Consider this function:

func doAllWork() error {
    var wg sync.WaitGroup

    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 10; j++ {
                _, err := work(j)
                if err != nil {
                    // can't use `return err` here
                    // what should I put instead?
                    os.Exit(0)
                }
            }
        }()
    }
    wg.Wait()

    return nil
}

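In each goroutine, the function work() is called 10 times. If one call to work() returns an error in any of the running goroutines, I want all the goroutines to stop immediately and the program to exit.

Edit: this question is different from "how to stop a goroutine", because here I need to shut down all the goroutines if an error occurs in any one of them.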

Recommended answer

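You can use the context package, which was created for things like this ("carries deadlines, cancelation signals...").

Create a context that can publish cancelation signals with context.WithCancel() (the parent context may be the one returned by context.Background()). This returns a cancel() function that can be used to cancel (or, more precisely, to signal the intent to cancel) the worker goroutines.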
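To make the behavior of the cancel() function concrete, here is a minimal sketch of what a cancelable context exposes. The helper names isCanceled and demoCancel are hypothetical and exist only for this illustration; the context calls themselves are from the standard library.

```go
package main

import (
	"context"
	"fmt"
)

// isCanceled reports, without blocking, whether ctx has been canceled.
func isCanceled(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}

// demoCancel (hypothetical helper) returns the cancellation state before
// and after calling cancel(), plus the resulting ctx.Err().
func demoCancel() (before, after bool, err error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // always release the context's resources

	before = isCanceled(ctx)

	cancel()
	cancel() // calling cancel again is a harmless no-op

	after = isCanceled(ctx)
	return before, after, ctx.Err()
}

func main() {
	before, after, err := demoCancel()
	fmt.Println(before, after, err) // false true context canceled
}
```

The Done() channel is closed by the first cancel() call, so every goroutine selecting on it observes the cancellation at once.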

I will use the following work() implementation, which simulates a 10% chance of failure and 1 second of work:

func work(i int) (int, error) {
    if rand.Intn(100) < 10 { // 10% chance of failure
        return 0, errors.New("random error")
    }
    time.Sleep(time.Second)
    return 100 + i, nil
}

doAllWork() may look like this:

func doAllWork() error {
    var wg sync.WaitGroup

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // Make sure it's called to release resources even if no errors

    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()

            for j := 0; j < 10; j++ {
                // Check whether an error occurred in any other goroutine:
                select {
                case <-ctx.Done():
                    return // Error somewhere, terminate
                default: // A default case is required to avoid blocking
                }
                result, err := work(j)
                if err != nil {
                    fmt.Printf("Worker #%d during %d, error: %v\n", i, j, err)
                    cancel()
                    return
                }
                fmt.Printf("Worker #%d finished %d, result: %d.\n", i, j, result)
            }
        }(i)
    }
    wg.Wait()

    return ctx.Err()
}

Test it like this:

func main() {
    rand.Seed(time.Now().UnixNano() + 1) // +1 'cause Playground's time is fixed
    fmt.Printf("doAllWork: %v\n", doAllWork())
}

Output (try it on the Go Playground):

Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #1 during 4, error: random error
Worker #0 finished 3, result: 103.
doAllWork: context canceled

If there are no errors, for example when using the following work() function:

func work(i int) (int, error) {
    time.Sleep(time.Second)
    return 100 + i, nil
}

The output is as follows (try it on the Go Playground):

Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #0 finished 3, result: 103.
Worker #0 finished 4, result: 104.
Worker #1 finished 4, result: 104.
Worker #1 finished 5, result: 105.
Worker #0 finished 5, result: 105.
Worker #0 finished 6, result: 106.
Worker #1 finished 6, result: 106.
Worker #1 finished 7, result: 107.
Worker #0 finished 7, result: 107.
Worker #0 finished 8, result: 108.
Worker #1 finished 8, result: 108.
Worker #1 finished 9, result: 109.
Worker #0 finished 9, result: 109.
doAllWork: <nil>

Notes:

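Basically we only used the Done() channel of the context, so it may seem that we could just as easily (if not more easily) use a plain done channel instead of the Context, and close that channel to do what cancel() does in the solution above.

This is not true. A plain done channel can only be used if exactly one goroutine may close it, but in our case any of the workers may do so, and attempting to close an already closed channel panics (see details here: How does a non initialized channel behave?). So you would have to ensure some kind of synchronization / exclusion around close(done), which makes the code less readable and more complex. This is exactly what the cancel() function does under the hood, hidden / abstracted away from you, so cancel() may be called multiple times, which keeps your code simpler.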
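For comparison, here is a sketch of the extra synchronization a plain done channel would need. The stopper type and stopConcurrently function are hypothetical names invented for this illustration; guarding the close with sync.Once is one possible approach, not necessarily how the context package is implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// stopper (hypothetical) wraps a done channel whose close is guarded by
// sync.Once, so Stop may be called repeatedly and from many goroutines.
type stopper struct {
	once sync.Once
	done chan struct{}
}

func newStopper() *stopper {
	return &stopper{done: make(chan struct{})}
}

// Stop closes the done channel exactly once; later calls do nothing.
func (s *stopper) Stop() {
	s.once.Do(func() { close(s.done) })
}

// Done returns the channel that is closed by the first Stop call.
func (s *stopper) Done() <-chan struct{} {
	return s.done
}

// stopConcurrently calls Stop from n goroutines and reports whether the
// done channel ended up closed.
func stopConcurrently(n int) bool {
	s := newStopper()
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.Stop() // a bare close(s.done) would panic on the second call
		}()
	}
	wg.Wait()

	select {
	case <-s.Done():
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(stopConcurrently(5)) // true
}
```

This works, but it is boilerplate that context.WithCancel() already provides, along with deadlines and parent/child propagation.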

How to get and return the error(s) from the workers?

For this, you can use an error channel:

errs := make(chan error, 2) // Buffer for 2 errors

Inside the workers, when an error is encountered, send it on the channel instead of printing it:

result, err := work(j)
if err != nil {
    errs <- fmt.Errorf("Worker #%d during %d, error: %v", i, j, err)
    cancel()
    return
}

After the loop (once wg.Wait() has returned), if there was an error, return it (otherwise return nil):

// Return (first) error, if any:
if ctx.Err() != nil {
    return <-errs
}
return nil

Output this time (try it on the Go Playground):

Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #0 finished 3, result: 103.
doAllWork: Worker #1 during 4, error: random error

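Note that I used a buffered channel whose buffer size equals the number of workers, which ensures that sends on it never block. This also lets you receive and process all errors, not just one (e.g. the first). Another option is to use a buffered channel that holds only 1 error and do a non-blocking send on it, which could look like this: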

errs := make(chan error, 1) // Buffered only for the first error

// ...and inside the worker:

result, err := work(j)
if err != nil {
    // Non-blocking send:
    select {
    case errs <- fmt.Errorf("Worker #%d during %d, error: %v", i, j, err):
    default:
    }
    cancel()
    return
}
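Putting the pieces together, the following is a self-contained sketch of the first-error variant. It departs from the answer in a few labeled ways: doAllWork takes the work function as a parameter so it can be exercised deterministically, the sample functions failAtThree and alwaysOK are hypothetical, the error is wrapped with %w, and the final receive is non-blocking rather than guarded by ctx.Err().

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// failAtThree (hypothetical) fails deterministically when i == 3.
func failAtThree(i int) (int, error) {
	if i == 3 {
		return 0, errors.New("boom")
	}
	return 100 + i, nil
}

// alwaysOK (hypothetical) never fails.
func alwaysOK(i int) (int, error) { return 100 + i, nil }

// doAllWork runs two workers and returns the first reported error, if any.
// Passing work as a parameter is an assumption made for this sketch.
func doAllWork(work func(int) (int, error)) error {
	var wg sync.WaitGroup

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	errs := make(chan error, 1) // room for the first error only

	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for j := 0; j < 10; j++ {
				select {
				case <-ctx.Done():
					return // another worker failed
				default:
				}
				if _, err := work(j); err != nil {
					select {
					case errs <- fmt.Errorf("worker #%d during %d: %w", i, j, err):
					default: // an error was already reported
					}
					cancel()
					return
				}
			}
		}(i)
	}
	wg.Wait()

	// Non-blocking receive: the channel is empty if every call succeeded.
	select {
	case err := <-errs:
		return err
	default:
		return nil
	}
}

func main() {
	fmt.Println(doAllWork(failAtThree) != nil) // true
	fmt.Println(doAllWork(alwaysOK))           // <nil>
}
```

Which worker reports the error is not deterministic, so the example only checks that some error comes back.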
