我看到很多关于如何让Go等待x个Goroutine完成的教程和示例,但我要做的是确保始终有x个Goroutine在运行,所以一个新的Goroutine一结束就会启动.

具体地说,我有几十万件"事情要做",那就是处理从MySQL中传出的一些东西.所以它的工作原理是这样的:

db, err := sql.Open("mysql", connection_string)
checkErr(err)
defer db.Close()

rows,err := db.Query(`SELECT id FROM table`)
checkErr(err)
defer rows.Close()

var id uint
for rows.Next() {
    err := rows.Scan(&id)
    checkErr(err)
    go processTheThing(id)
    }
checkErr(err)
rows.Close()

目前,它将发布数十万条processTheThing()个线程.我需要的是最多启动x个(我们称之为20个)goroutines.因此,它首先为前20行启动20,然后在当前的一个goroutine完成后,它将为下一个id启动一个新的goroutine.因此,在任何时间点,总有20人在 run .

我相信这很简单/标准,但我似乎找不到任何教程或示例的好解释,也找不到如何做到这一点的好解释.

推荐答案

谢谢大家帮我解决这个问题.然而,我觉得没有人真的提供了既有效又简单易懂的东西,尽管你们都帮助我理解了这项技术.

我最后所做的是,我认为作为我的具体问题的答案,我更容易理解和实际,所以我会把它贴在这里,以防其他人有同样的问题.

不知何故,这最终看起来很像OneofOne发布的内容,这很棒,因为现在我明白了.但是OneofOne的代码一开始我发现很难理解,因为要将函数传递给函数,这让我很难理解位是用来做什么的.我认为这样做更有意义:

package main

import (
"fmt"
"sync"
)

const xthreads = 5 // Total number of threads to use, excluding the main() thread

func doSomething(a int) {
    fmt.Println("My job is",a)
    return
}

func main() {
    var ch = make(chan int, 50) // This number 50 can be anything as long as it's larger than xthreads
    var wg sync.WaitGroup

    // This starts xthreads number of goroutines that wait for something to do
    wg.Add(xthreads)
    for i:=0; i<xthreads; i++ {
        go func() {
            for {
                a, ok := <-ch
                if !ok { // if there is nothing to do and the channel has been closed then end the goroutine
                    wg.Done()
                    return
                }
                doSomething(a) // do the thing
            }
        }()
    }

    // Now the jobs can be added to the channel, which is used as a queue
    for i:=0; i<50; i++ {
        ch <- i // add i to the queue
    }

    close(ch) // This tells the goroutines there's nothing else to do
    wg.Wait() // Wait for the threads to finish
}

Go相关问答推荐

禁用Golang中的终端

带有条件的for循环中缺少RETURN语句

使用ciph.AEAD.Seal()查看内存使用情况

未对GoFr中的所有请求进行跟踪

GoFR HTTP服务初始化中Open遥测传输和超时配置的说明

使用!NOT运算符的Golang文本/模板多个条件

如何在gofiber/websocket/v2中设置状态代码和原因

在Go中旋转矩阵

在运行时更改 Go lang slog 的日志(log)级别

如何根据中间件的请求设置上下文值?获取 go-staticcheck 问题

无法读取postman 中的表单数据

为什么我只收到部分错误而不是我启动的 goroutines 的所有错误?

无法使用带有 422 的 go-github 创建提交 - 更新不是快进

上传图片失败,出现错误dial tcp: lookup api.cloudinary.com: no such host

使用 unsafe.Pointer 将 struct point直接转换为另一个 struct 是否安全?

Apache Beam 左加入 Go

使用 oklog/run 来自 Go 编译器的错误(无值)用作值

如何扩充 ResponseWriter 的 Header() 返回的 map

函数参数的判断顺序是什么?

如何使用 context.WithCancel 启动和停止每个会话的心跳?