您可以使用为类似("carries deadlines, cancelation signals...")的内容创建的context
软件包.
创建一个能够发布context.WithCancel()
个取消信号的上下文(父上下文可能是context.Background()
返回的上下文).这将向您返回一个cancel()
函数,该函数可用于取消(或者更准确地说是取消意图)工作进程
我将使用以下work()
个实现,它模拟10%的失败概率,并模拟1秒的工作:
func work(i int) (int, error) {
if rand.Intn(100) < 10 { // 10% of failure
return 0, errors.New("random error")
}
time.Sleep(time.Second)
return 100 + i, nil
}
doAllWork()
人可能是这样的:
func doAllWork() error {
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // Make sure it's called to release resources even if no errors
for i := 0; i < 2; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
for j := 0; j < 10; j++ {
// Check if any error occurred in any other gorouties:
select {
case <-ctx.Done():
return // Error somewhere, terminate
default: // Default is must to avoid blocking
}
result, err := work(j)
if err != nil {
fmt.Printf("Worker #%d during %d, error: %v\n", i, j, err)
cancel()
return
}
fmt.Printf("Worker #%d finished %d, result: %d.\n", i, j, result)
}
}(i)
}
wg.Wait()
return ctx.Err()
}
测试方法如下:
func main() {
rand.Seed(time.Now().UnixNano() + 1) // +1 'cause Playground's time is fixed
fmt.Printf("doAllWork: %v\n", doAllWork())
}
输出(在Go Playground上试用):
Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #1 during 4, error: random error
Worker #0 finished 3, result: 103.
doAllWork: context canceled
如果没有错误,例如使用以下work()
功能时:
func work(i int) (int, error) {
time.Sleep(time.Second)
return 100 + i, nil
}
输出如下(在Go Playground上try ):
Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #0 finished 3, result: 103.
Worker #0 finished 4, result: 104.
Worker #1 finished 4, result: 104.
Worker #1 finished 5, result: 105.
Worker #0 finished 5, result: 105.
Worker #0 finished 6, result: 106.
Worker #1 finished 6, result: 106.
Worker #1 finished 7, result: 107.
Worker #0 finished 7, result: 107.
Worker #0 finished 8, result: 108.
Worker #1 finished 8, result: 108.
Worker #1 finished 9, result: 109.
Worker #0 finished 9, result: 109.
doAllWork: <nil>
Notes:
基本上,我们只是使用了上下文中的Done()
个通道,所以我们似乎可以同样轻松地(如果不是更简单的话)使用done
个通道,而不是Context
个,关闭通道来完成上面解决方案中cancel()
所做的事情.
这不是真的.This can only be used if only one goroutine may close the channel, but in our case any of the workers may do so.,并试图关闭已经关闭的通道死机(请参阅此处的详细信息:How does a non initialized channel behave?).因此,您必须确保close(done)
周围的某种同步/排除,这将使其可读性降低,甚至更加复杂.实际上,这正是cancel()
函数在幕后所做的事情,隐藏/抽象在您的视线之外,因此cancel()
可能会被多次调用,以使您的代码/使用它变得更简单.
如何从工人那里获取并返回错误?
为此,您可以使用错误通道:
errs := make(chan error, 2) // Buffer for 2 errors
在工作人员内部,当遇到错误时,将其发送到频道,而不是打印:
result, err := work(j)
if err != nil {
errs <- fmt.Errorf("Worker #%d during %d, error: %v\n", i, j, err)
cancel()
return
}
循环结束后,如果出现错误,则返回该值(否则返回nil
):
// Return (first) error, if any:
if ctx.Err() != nil {
return <-errs
}
return nil
这次输出(在Go Playground上试一下):
Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #0 finished 3, result: 103.
doAllWork: Worker #1 during 4, error: random error
请注意,我使用的缓冲通道的缓冲区大小等于工作进程的数量,这确保了在该通道上的发送始终是非阻塞的.这也使您可以接收和处理所有错误,而不仅仅是一个错误(例如第一个).另一种 Select 是使用缓冲通道仅保存1,并在其上执行非阻塞发送,可能如下所示:
errs := make(chan error, 1) // Buffered only for the first error
// ...and inside the worker:
result, err := work(j)
if err != nil {
// Non-blocking send:
select {
case errs <- fmt.Errorf("Worker #%d during %d, error: %v\n", i, j, err):
default:
}
cancel()
return
}