我正在try 使用缓冲通道在Golang创建生产者-消费者消息队列系统.这是我的实现.

package main

import "fmt"

type MessageQueue struct {
    storage chan int
    count   int
}

var done = make(chan bool)

func NewMessageQueue(count int) *MessageQueue {
    ret := &MessageQueue{
        count:   count,
        storage: make(chan int, count),
    }
    return ret
}

func (m *MessageQueue) Produce() {
    for i := 0; i < m.count; i++ {
        m.storage <- i + 1
    }
    done <- true
}

func (m *MessageQueue) Consume(f func(int) int) {
    for each := range m.storage {
        fmt.Printf("%d ", f(each))
    }
}

func main() {
    op1 := func(a int) int {
        return a * a
    }
    msq := NewMessageQueue(10)
    go msq.Produce()
    go msq.Consume(op1)
    <-done
}

但不幸的是,当我运行go run main.go时,我无法获得输出,但是为了判断是否存在任何竞争条件,当我try go run -race main.go时,我确实获得了输出.我无法理解为什么会发生这种事.有人能帮我吗?

推荐答案

当你的制作人可以发送值时,它会在done频道上发送一个值,这样你的应用程序就可以立即终止.

相反,当生产者完成时,它应该关闭m.storage通道,发出不再发送值的信号,并且不要在done上发送值,因为您还没有完成!

当值被消耗时,您就完成了,所以在Consume()中的done上发送一个值:

func (m *MessageQueue) Produce() {
    for i := 0; i < m.count; i++ {
        m.storage <- i + 1
    }
    close(m.storage)
}

func (m *MessageQueue) Consume(f func(int) int) {
    for each := range m.storage {
        fmt.Printf("%d ", f(each))
    }
    done <- true
}

这将输出(在Go Playground上try ):

1 4 9 16 25 36 49 64 81 100 

done通道是必需的,因为main goroutine中没有消费,main goroutine必须等待它结束.

如果在main goroutine上进行消费,可以删除done频道:

msq := NewMessageQueue(10)
go msq.Produce()
msq.Consume(op1)

Go Playground号上试试这个.

Go相关问答推荐

Go -SDP服务器读缓冲区不会更改任何内容

切换选项卡时,Goland IDE中的光标自动转移

";无效的复制因子;融合Kafka Go客户端

如何模拟 stripe 需要 webhooks 的捕获事件?

Golang Fiber Render - 将数据发送到多个布局

如何将Golang测试用例的测试覆盖率值与特定阈值进行比较

如何在 Go 中将 int 转换为包含 complex128 的泛型类型?

Golang:如何在不转义每个动作的情况下呈现模板的模板?

go-echo中如何防止+转义

Protobuf.Any - 从 json.RawMessage 解组

如果 transaction.Commit 对带有 postgres 连接的 SQL 失败,您是否需要调用 transaction.RollBack

使用 go.work 文件在多个测试文件上运行 go test 命令

在 GORM 中,如何在特定时区配置 autoCreateTime 和 autoUpdateTime?

如果服务器在客户端的 gRPC 中不可用,则等待的方法

为什么 go.mod 中的所有依赖都是间接的?

Golang 'defer' 导致发送(接收)API 响应延迟

行之间的模板交替设计

我该如何做错误处理惯用的方式

(如何)我可以基于接口抽象地实现Stringer吗?

有没有一种方法可以确保传递的值具有使用泛型的某些字段?