我在GO中有一个简单的并发用例,但是我想不出一个优雅的解决方案来解决我的问题.

我想要编写一个方法fetchAll,该方法并行地从远程服务器查询未指定数量的资源.如果任何获取失败,我希望立即返回第一个错误.

我最初的实现泄露了Goroutines:

    package main

    import (
      "fmt"
      "math/rand"
      "sync"
      "time"
    )

    func fetchAll() error {
      wg := sync.WaitGroup{}
      errs := make(chan error)
      leaks := make(map[int]struct{})
      defer fmt.Println("these goroutines leaked:", leaks)

      // run all the http requests in parallel
      for i := 0; i < 4; i++ {
        leaks[i] = struct{}{}
        wg.Add(1)
        go func(i int) {
          defer wg.Done()
          defer delete(leaks, i)

          // pretend this does an http request and returns an error
          time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
          errs <- fmt.Errorf("goroutine %d's error returned", i)
        }(i)
      }

      // wait until all the fetches are done and close the error
      // channel so the loop below terminates
      go func() {
        wg.Wait()
        close(errs)
      }()

      // return the first error
      for err := range errs {
        if err != nil {
          return err
        }
      }

      return nil
    }

    func main() {
      fmt.Println(fetchAll())
    }

操场:https://play.golang.org/p/Be93J514R5

从阅读https://blog.golang.org/pipelines中我知道我可以创建一个信号通道来清理其他线程.或者,我可以使用context来完成它.但是看起来这么简单的用例应该有一个我没有的更简单的解决方案.

推荐答案

使用Error Group使这变得更加简单.这会自动等待所有提供的GO routine 成功完成,或者在任何一个 routine 返回错误的情况下取消所有剩余的 routine (在这种情况下,该错误就是泡沫返回给调用者的那个错误).

package main

import (
        "context"
        "fmt"
        "math/rand"
        "time"

        "golang.org/x/sync/errgroup"
)

func fetchAll(ctx context.Context) error {
        errs, ctx := errgroup.WithContext(ctx)

        // run all the http requests in parallel
        for i := 0; i < 4; i++ {
                errs.Go(func() error {
                        // pretend this does an http request and returns an error                                                  
                        time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)                                               
                        return fmt.Errorf("error in go routine, bailing")                                                      
                })
        }

        // Wait for completion and return the first error (if any)                                                                 
        return errs.Wait()
}

func main() {
        fmt.Println(fetchAll(context.Background()))
}

Go相关问答推荐

使用Gorm创建自定义连接表

获取k8s群集作用域运算符的命名空间

如何使用Gorilla WebSockets实现Http.Hijacker&;alexedwards/scs/v2

在Mac中使用uname获取处理器体系 struct 时,在为AMD64构建Go二进制时出现错误结果

Golang内置打印(Ln)函数&S行为怪异

golang testscript .txtar 语法,用于 stderr 或 stdout 中包含的文本

无法读取postman 中的表单数据

通过 Terraform 中的 MapNestedAtribute 进行迭代

用 fork 替换 Go 依赖:...用于两个不同的模块路径

我应该先解锁然后再广播吗?

使用GOTK3和librsvg在Go中如何加载内联SVG?

命令行参数在 Golang 程序中不正确地接受为参数

使用 Go 解组 SOAP 消息

CORS grpc 网关 GoLang

在 Golang 中,如何将接口作为泛型类型与 nil 进行比较?

在 Go 中发送 ack 和 term 后消息仍在 nats 限制队列中

Go 使用 struct 作为接口而不实现所有方法

HCL 解码:具有多个标签的块

Go 错误:Is() 和 As() 声称是递归的,是否有任何类型实现错误接口并支持这种递归 - 无错误?

即使没有竞争条件也没有得到任何输出