我是新来的,我正在try 创建一个简单的聊天服务器,在那里客户端可以向所有连接的客户端广播消息.

在我的服务器中,我有一个goroutine(无限for循环),它接受连接,所有连接都由一个通道接收.

go func() {
    for {
        conn, _ := listener.Accept()
        ch <- conn
        }
}()

然后,我 for each 连接的客户端启动一个处理程序(Goroutine).在处理程序内部,我试图通过迭代通道向所有连接广播.

for c := range ch {
    conn.Write(msg)
}

但是,我不能广播,因为(我想从文档上看)在迭代之前需要关闭通道.我不确定何时应该关闭频道,因为我想不断接受新的连接,而关闭频道不会让我这样做.如果有人能帮助我,或提供一种更好的方式向所有连接的客户端广播消息,将不胜感激.

推荐答案

您正在做的是扇出模式,也就是说,多个端点正在侦听单个输入源.此模式的结果是,每当输入源中有消息时,这些侦听器中只有一个能够获得消息.唯一的例外是close%的频道.这close个将被所有的听众识别,因此是一个"广播".

但是您想要做的是广播一条从连接中读取的消息,所以我们可以这样做:

当监听人数已知时

让每个工作人员收听专用广播频道,并将消息从主频道发送到每个专用广播频道.

type worker struct {
    source chan interface{}
    quit chan struct{}
}

func (w *worker) Start() {
    w.source = make(chan interface{}, 10) // some buffer size to avoid blocking
    go func() {
        for {
            select {
            case msg := <-w.source
                // do something with msg
            case <-quit: // will explain this in the last section
                return
            }
        }
    }()
}

然后我们可以有一群工人:

workers := []*worker{&worker{}, &worker{}}
for _, worker := range workers { worker.Start() }

然后启动我们的监听器:

go func() {
for {
    conn, _ := listener.Accept()
    ch <- conn
    }
}()

还有一个调度员:

go func() {
    for {
        msg := <- ch
        for _, worker := workers {
            worker.source <- msg
        }
    }
}()

当监听人数未知时

在这种情况下,上面给出的解决方案仍然有效.唯一的区别是,无论何时需要新的Worker,您都需要创建一个新的Worker,启动它,然后将其推入workers个切片.但是此方法需要线程安全片,而线程安全片需要围绕它的锁.其中一个实现可能如下所示:

type threadSafeSlice struct {
    sync.Mutex
    workers []*worker
}

func (slice *threadSafeSlice) Push(w *worker) {
    slice.Lock()
    defer slice.Unlock()

    workers = append(workers, w)
}

func (slice *threadSafeSlice) Iter(routine func(*worker)) {
    slice.Lock()
    defer slice.Unlock()

    for _, worker := range workers {
        routine(worker)
    }
}

无论何时您想要启动一名员工:

w := &worker{}
w.Start()
threadSafeSlice.Push(w)

并且您的调度员将更改为:

go func() {
    for {
        msg := <- ch
        threadSafeSlice.Iter(func(w *worker) { w.source <- msg })
    }
}()

最后一句话:永远不要离开一个摇摇晃晃的高尔夫球场

其中一个很好的做法是:永远不要留下一只摇摆的猩猩.所以当你听完的时候,你需要关闭你emits 的所有大猩猩.这将通过worker中的quit通道完成:

首先,我们需要创建一个全局quit信令信道:

globalQuit := make(chan struct{})

每当我们创建一个worker时,我们都会将globalQuit个通道分配给它作为退出信号:

worker.quit = globalQuit

然后,当我们想要关闭所有员工时,我们只需执行以下操作:

close(globalQuit)

由于close将被所有侦听的Goroutine识别(这是您理解的一点),因此将返回所有Goroutine.记住也要关闭您的调度程序 routine ,但我将把它留给您:)

Go相关问答推荐

追加一个字节数组的分配比2个字节数组的分配要少得多

为什么在GO中打印变量会导致堆栈溢出?

如何使用Docker Compose配置Go,使main. go文件位于/CMD文件夹中

租户GUID X的租户不存在self 邮箱帐户的租户(我是唯一的成员)

golang 的持久隐蔽服务

如何模拟go的Elastic search SDK?

Golang Gorm Fiber / argon2.Config 未定义

如何将 goose 迁移与 pgx 一起使用?

Golang 网络应用程序安全性:您是否应该判断输入是否为有效的 utf-8?

Yocto 无法交叉编译 GoLang Wails 应用程序

设置 graphql 的最大文件上传大小(golang)

具有两个参数的动态规划:天数和优惠券

如何将 base64 编码的公钥转换为 crypto.PublicKey 或 ecdsa.PublicKey

有没有办法计算枚举中定义的项目总数?

上传图片失败,出现错误dial tcp: lookup api.cloudinary.com: no such host

Golang 构建多平台问题

如何在 golang revel 中获取动态应用程序配置

在删除级联时无法在 Gorm 中按预期工作

我该如何做错误处理惯用的方式

是否存在一个Go泛型类型约束,该约束捕获了将类型用作映射中的键的能力?