我有一个函数,我想定义一个最大数量的go routine ,我有一个列表,我浏览这个列表,我通过通道向go routine 发送一条消息,在这个go routine 中,我将调用一个函数,要么得到一个答案,要么得到一个错误,当它不是错误时,我想把返回保存在一个片段中,当它是错误时,我想停止go routine 并进行调用.

type response struct {
    value string
}

func Testing() []response {

    fakeValues := getFakeValues()

    maxParallel := 25
    wg := &sync.WaitGroup{}
    wg.Add(maxParallel)

    if len(fakeValues) < maxParallel {
        maxParallel = len(fakeValues)
    }

    errReceive := make(chan error, 1)
    defer close(errReceive)

    response := make([]response, 0)
    valuesChan := make(chan string, 1)

    for i := 0; i < maxParallel; i++ {
        go func(valuesChan <-chan string, errReceive chan error) {
            for value := range valuesChan {
                resp, err := getFakeResult(value)
                if err != nil {
                    errReceive <- err
                }

                response = append(response, resp)
            }
            wg.Done()
        }(valuesChan, errReceive)
    }

    for _, val := range fakeValues {
        valuesChan <- val
    }

    close(valuesChan)
    wg.Wait()

    err := <-errReceive
    if err != nil {
        // make any thing
    }

    return response
}

func getFakeValues() []string {
    return []string{"a", "b"}
}

func getFakeResult(val string) (response, error) {
    if val == "a" {
        return response{}, fmt.Errorf("ooh noh:%s", val)
    }

    return response{
        value: val,
    }, nil
}

推荐答案

你可以将上下文与cancel一起使用,让go routine 知道它们应该停止.

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

wg := &sync.WaitGroup{}
wg.Add(1)
go func(ctx context.Context) {
    defer wg.Done()
    for {
        select {
        case <-ctx.Done():
            fmt.Println("context is done")
            return
        case <-time.After(time.Second):
            fmt.Println("work")
        }
    }
}(ctx)

time.Sleep(time.Second * 5)
cancel()
wg.Wait()

https://go.dev/play/p/qe2oDppSnaF


下面是一个在您的用例上下文中更好地展示它的示例.

type result struct {
    err error
    val int
}
rand.Seed(time.Now().UnixNano())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

rchan := make(chan result, 5)
wg := &sync.WaitGroup{}

for i := 0; i < 5; i++ {
    wg.Add(1)
    go func(ctx context.Context) {
        defer wg.Done()
        for {
            select {
            case <-ctx.Done():
                fmt.Println("context is done")
                return
            case <-time.After(time.Second):
                n := rand.Intn(100)
                if n > 90 {
                    rchan <- result{err: fmt.Errorf("error %d", n)}
                } else {
                    rchan <- result{val: n}
                }
            }
        }
    }(ctx)
}

go func() {
    wg.Wait()
    close(rchan)
}()

for res := range rchan {
    if res.err != nil {
        fmt.Println(res.err)
        cancel()
        break
    } else {
        fmt.Println(res.val)
    }
}

https://go.dev/play/p/Z63n1h2A81o

Go相关问答推荐

Go协议缓冲区导入问题

日志(log)文件不在 golang 的日志(log)目录中

Go-如何在递归函数中关闭通道

仅使用公共 api 对 alexedwards/scs 进行简单测试

更改多对多连接表的名称

设置 graphql 的最大文件上传大小(golang)

具有两个参数的动态规划:天数和优惠券

为什么不同的 Wireguard 私钥会产生相同的公钥?

枚举的 Golang 验证器自定义验证规则

如何将多个切片打印为一个切片?

Golang Getrlimit 返回与 ulimit 不同的值

级联调用泛型函数时的泛型类型推断

如何在 golang 中同时加载 .env 文件和 os 环境变量

如何在 GORM 中迭代一个 int 数组

Golang - 无法从 pipped Windows 命令中获取结果

httprouterhttp.HandlerFunc() 是如何工作的?

如何从 docker-compose 命令运行 2 个不同的命令:

为什么 Go 中的 maps.Keys() 将 map 类型指定为 M?

使用不安全的指针从 [] 字符串中获取值

如何访问 Go 模板中数组的第一个索引的值