我正在try 使用Goroutines编写一个简单的工作者池.

  • 我写的代码是惯用的吗?如果不是,那么应该改变什么呢?
  • 我希望能够将工作线程的最大数量设置为5,如果5个线程都忙,挡路会一直设置到某个工作线程可用.我如何将其扩展到最多只有5名员工的池?我是不是要生下5只静电猩猩,然后给每只work_channel只?

代码:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func worker(id string, work string, o chan string, wg *sync.WaitGroup) {
    defer wg.Done()
    sleepMs := rand.Intn(1000)
    fmt.Printf("worker '%s' received: '%s', sleep %dms\n", id, work, sleepMs)
    time.Sleep(time.Duration(sleepMs) * time.Millisecond)
    o <- work + fmt.Sprintf("-%dms", sleepMs)
}

func main() {
    var work_channel = make(chan string)
    var results_channel = make(chan string)

    // create goroutine per item in work_channel
    go func() {
        var c = 0
        var wg sync.WaitGroup
        for work := range work_channel {
            wg.Add(1)
            go worker(fmt.Sprintf("%d", c), work, results_channel, &wg)
            c++
        }
        wg.Wait()
        fmt.Println("closing results channel")
        close(results_channel)
    }()

    // add work to the work_channel
    go func() {
        for c := 'a'; c < 'z'; c++ {
            work_channel <- fmt.Sprintf("%c", c)
        }
        close(work_channel)
        fmt.Println("sent work to work_channel")
    }()

    for x := range results_channel {
        fmt.Printf("result: %s\n", x)
    }
}

推荐答案

在任何意义上,您的解决方案都不是一个worker goroutine池:您的代码不限制并发goroutine,也不"重用"goroutine(它总是在收到新作业(job)时启动一个新的goroutine).

生产者-消费者模式

Bruteforce MD5 Password cracker所示,你可以使用producer-consumer pattern.你可以有一个指定的producer goroutine来生成作业(job)(要做的事情/计算),并通过jobs通道发送它们.您可以有一个包含consumer个goroutine(例如其中5个)的固定池,这些goroutine将在传递作业(job)的通道上循环,每个goroutine将执行/完成接收到的作业(job).

producer goroutine只需在生成并发送所有作业(job)时关闭jobs通道,就可以正确地向consumers发出不再有作业(job)的信号.通道上的for ... range构造处理"关闭"事件并正确终止.请注意,在关闭通道之前发送的所有作业(job)仍将被传递.

这将产生一个干净的设计,将产生固定(但任意)数量的goroutines,并且它将始终利用consumer%的CPU(如果goroutines的数量大于CPU核心的数量).它还有一个优点,那就是通过适当 Select 信道容量(缓冲信道)和consumer条Goroutine的数量,可以对其进行"节流".

请注意,此模型具有指定的生产者goroutine,这不是强制的.您也可以有多个Goroutine来生成作业(job),但是您也必须对它们进行同步操作,以便仅在所有生产者goroutine都完成生成作业(job)时才关闭jobs通道-否则,在通道jobs已经关闭的情况下try 发送另一个作业(job)会导致运行时死机.生产作业(job)通常很便宜,而且生产速度比执行速度快得多,所以这种在多人消费/执行作业(job)的情况下在1个Goroutine中生产作业(job)的模式在实践中是很好的.

Handling results:

如果作业(job)有结果,您可以 Select 使用指定的result个频道来传递结果("发送回"),也可以 Select 在作业(job)完成/完成时在消费者中处理结果.后者甚至可以通过一个处理结果的"回调"函数来实现.重要的是结果是否可以独立处理,或者是否需要合并(例如map reduce framework)或聚合.

如果您使用results通道,您还需要一个从其接收值的goroutine,以防止消费者被阻塞(如果results的缓冲区被填满,就会发生这种情况).

With results channel

我不会将简单的string个值作为作业(job)和结果发送,而是创建一个包装器类型,它可以容纳任何附加信息,因此更加灵活:

type Job struct {
    Id     int
    Work   string
    Result string
}

注意,Job struct 也包装了结果,所以当我们发回结果时,它还包含原始的Job作为上下文often very useful.还要注意,只在通道上发送指针(*Job)而不是Job个值是有益的,这样就不需要制作Job的"无数"副本,而且Job struct值的大小也变得无关紧要.

Here is how this producer-consumer could look like:

我会使用2sync.WaitGroup个值,它们的作用如下:

var wg, wg2 sync.WaitGroup

生产者负责生成要执行的作业(job):

func produce(jobs chan<- *Job) {
    // Generate jobs:
    id := 0
    for c := 'a'; c <= 'z'; c++ {
        id++
        jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)}
    }
    close(jobs)
}

当完成(没有更多的工作)时,jobs通道关闭,这向消费者发出不会再有工作到达的信号.

请注意,produce()jobs通道视为send only,因为这就是制作人只需要对其执行的操作:send个作业(job)(除了closing个之外,send only通道也允许这样做).生成器中的意外接收将是编译时错误(在编译时及早检测到).

只要可以接收到作业(job),消费者的责任就是接收作业(job),并执行它们:

func consume(id int, jobs <-chan *Job, results chan<- *Job) {
    defer wg.Done()
    for job := range jobs {
        sleepMs := rand.Intn(1000)
        fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs)
        time.Sleep(time.Duration(sleepMs) * time.Millisecond)
        job.Result = job.Work + fmt.Sprintf("-%dms", sleepMs)
        results <- job
    }
}

请注意,consume()jobs通道视为receive only;消费者只需从它到receive.类似地,results频道对于消费者是send only.

还要注意,这里关闭results通道cannot是因为有多个消费者Goroutine,并且只有第一次try 关闭它才会成功,而进一步的try 将导致运行时死机!results通道可以(必须)在所有消费者Goroutine结束之后关闭,因为这样我们可以确保不会在results通道上发送更多的值(结果).

我们有需要分析的结果:

func analyze(results <-chan *Job) {
    defer wg2.Done()
    for job := range results {
        fmt.Printf("result: %s\n", job.Result)
    }
}

正如您所看到的,只要结果可能出现,这也会接收结果(直到关闭results个通道).分析仪的results通道是receive only.

请注意通道类型的使用:只要足够,就只使用unidirectional通道类型,以便在编译时尽早检测和防止错误.如果你需要两个方向,只使用bidirectional通道类型.

这就是所有这些粘在一起的方式:

func main() {
    jobs := make(chan *Job, 100)    // Buffered channel
    results := make(chan *Job, 100) // Buffered channel

    // Start consumers:
    for i := 0; i < 5; i++ { // 5 consumers
        wg.Add(1)
        go consume(i, jobs, results)
    }
    // Start producing
    go produce(jobs)

    // Start analyzing:
    wg2.Add(1)
    go analyze(results)

    wg.Wait() // Wait all consumers to finish processing jobs

    // All jobs are processed, no more values will be sent on results:
    close(results)

    wg2.Wait() // Wait analyzer to analyze all results
}

Example output:

以下是示例输出:

如您所见,在所有作业(job)进入队列之前,结果会出现并得到分析:

worker #4 received: 'e', sleep 81ms
worker #0 received: 'a', sleep 887ms
worker #1 received: 'b', sleep 847ms
worker #2 received: 'c', sleep 59ms
worker #3 received: 'd', sleep 81ms
worker #2 received: 'f', sleep 318ms
result: c-59ms
worker #4 received: 'g', sleep 425ms
result: e-81ms
worker #3 received: 'h', sleep 540ms
result: d-81ms
worker #2 received: 'i', sleep 456ms
result: f-318ms
worker #4 received: 'j', sleep 300ms
result: g-425ms
worker #3 received: 'k', sleep 694ms
result: h-540ms
worker #4 received: 'l', sleep 511ms
result: j-300ms
worker #2 received: 'm', sleep 162ms
result: i-456ms
worker #1 received: 'n', sleep 89ms
result: b-847ms
worker #0 received: 'o', sleep 728ms
result: a-887ms
worker #1 received: 'p', sleep 274ms
result: n-89ms
worker #2 received: 'q', sleep 211ms
result: m-162ms
worker #2 received: 'r', sleep 445ms
result: q-211ms
worker #1 received: 's', sleep 237ms
result: p-274ms
worker #3 received: 't', sleep 106ms
result: k-694ms
worker #4 received: 'u', sleep 495ms
result: l-511ms
worker #3 received: 'v', sleep 466ms
result: t-106ms
worker #1 received: 'w', sleep 528ms
result: s-237ms
worker #0 received: 'x', sleep 258ms
result: o-728ms
worker #2 received: 'y', sleep 47ms
result: r-445ms
worker #2 received: 'z', sleep 947ms
result: y-47ms
result: u-495ms
result: x-258ms
result: v-466ms
result: w-528ms
result: z-947ms

Go Playground上试用完整的应用程序.

Without a results channel

如果我们不使用results通道,但是消费者Goroutines会立即处理结果(在我们的例子中是打印结果),那么代码会大大简化.在本例中,我们不需要2sync.WaitGroup个值(第二个值只需要等待分析器完成).

如果没有results个通道,完整的解决方案如下:

var wg sync.WaitGroup

type Job struct {
    Id   int
    Work string
}

func produce(jobs chan<- *Job) {
    // Generate jobs:
    id := 0
    for c := 'a'; c <= 'z'; c++ {
        id++
        jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)}
    }
    close(jobs)
}

func consume(id int, jobs <-chan *Job) {
    defer wg.Done()
    for job := range jobs {
        sleepMs := rand.Intn(1000)
        fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs)
        time.Sleep(time.Duration(sleepMs) * time.Millisecond)
        fmt.Printf("result: %s\n", job.Work+fmt.Sprintf("-%dms", sleepMs))
    }
}

func main() {
    jobs := make(chan *Job, 100) // Buffered channel

    // Start consumers:
    for i := 0; i < 5; i++ { // 5 consumers
        wg.Add(1)
        go consume(i, jobs)
    }
    // Start producing
    go produce(jobs)

    wg.Wait() // Wait all consumers to finish processing jobs
}

输出与results通道的输出"相似"(当然,执行/完成顺序是随机的).

Go Playground上试试这个变种.

Go相关问答推荐

Makefile:现有文件上没有这样的文件或目录,不加载环境变量

获取k8s群集作用域运算符的命名空间

困扰围棋官方巡回赛的S建议所有方法都使用同一类型的接收器

Go:如何在不加载正文的情况下创建 http 代理通行证

Go Programming Language书上的例子server2错了吗?

Golang 到 wasm 编译使用 tinygo.使用 wasmtime 执行

linter 警告:返回值被忽略

如何在 `hashicorp / terraform-exec` 中将 `ApplyConfig` 传递给 `tf.Apply()`?

加密/椭圆:try 在无效点上进行操作

无法访问 Go 模块导入的远程存储库

将 []float64 像素切片转换为图像

Go 中 SDL Surface 的 OpenGL 纹理

Go 加密库创建的 PKCS1 公钥与openssl rsa ...之间的区别

无限期运行 Go routine (完成后重新启动)

Go GCP 同时模拟两个服务帐户

httprouterhttp.HandlerFunc() 是如何工作的?

Go 导入范围查找 protobuf 类型

Dynamodb.ScanInput - 不能使用expr.Names()(类型 map[string]*string)作为类型 map[string]string

无法识别同步错误.使用一次

如何发送带有登录数据的 GET 请求并将 cookie 数据保存到 txt 文件?