您正在做的是扇出模式,也就是说,多个端点正在侦听单个输入源.此模式的结果是,每当输入源中有消息时,这些侦听器中只有一个能够获得消息.唯一的例外是close
%的频道.这close
个将被所有的听众识别,因此是一个"广播".
但是您想要做的是广播一条从连接中读取的消息,所以我们可以这样做:
当监听人数已知时
让每个工作人员收听专用广播频道,并将消息从主频道发送到每个专用广播频道.
type worker struct {
source chan interface{}
quit chan struct{}
}
func (w *worker) Start() {
w.source = make(chan interface{}, 10) // some buffer size to avoid blocking
go func() {
for {
select {
case msg := <-w.source
// do something with msg
case <-quit: // will explain this in the last section
return
}
}
}()
}
然后我们可以有一群工人:
workers := []*worker{&worker{}, &worker{}}
for _, worker := range workers { worker.Start() }
然后启动我们的监听器:
go func() {
for {
conn, _ := listener.Accept()
ch <- conn
}
}()
还有一个调度员:
go func() {
for {
msg := <- ch
for _, worker := workers {
worker.source <- msg
}
}
}()
当监听人数未知时
在这种情况下,上面给出的解决方案仍然有效.唯一的区别是,无论何时需要新的Worker,您都需要创建一个新的Worker,启动它,然后将其推入workers
个切片.但是此方法需要线程安全片,而线程安全片需要围绕它的锁.其中一个实现可能如下所示:
type threadSafeSlice struct {
sync.Mutex
workers []*worker
}
func (slice *threadSafeSlice) Push(w *worker) {
slice.Lock()
defer slice.Unlock()
workers = append(workers, w)
}
func (slice *threadSafeSlice) Iter(routine func(*worker)) {
slice.Lock()
defer slice.Unlock()
for _, worker := range workers {
routine(worker)
}
}
无论何时您想要启动一名员工:
w := &worker{}
w.Start()
threadSafeSlice.Push(w)
并且您的调度员将更改为:
go func() {
for {
msg := <- ch
threadSafeSlice.Iter(func(w *worker) { w.source <- msg })
}
}()
最后一句话:永远不要离开一个摇摇晃晃的高尔夫球场
其中一个很好的做法是:永远不要留下一只摇摆的猩猩.所以当你听完的时候,你需要关闭你emits 的所有大猩猩.这将通过worker
中的quit
通道完成:
首先,我们需要创建一个全局quit
信令信道:
globalQuit := make(chan struct{})
每当我们创建一个worker时,我们都会将globalQuit
个通道分配给它作为退出信号:
worker.quit = globalQuit
然后,当我们想要关闭所有员工时,我们只需执行以下操作:
close(globalQuit)
由于close
将被所有侦听的Goroutine识别(这是您理解的一点),因此将返回所有Goroutine.记住也要关闭您的调度程序 routine ,但我将把它留给您:)