在我的例子中,我有数千个Goroutine同时工作,相当于work()个.我也有一辆sync()号大猩猩.当sync启动时,我需要任何其他Goroutine在同步作业(job)完成后暂停一段时间.以下是我的代码:

var channels []chan int
var channels_mutex sync.Mutex

func work() {
  channel := make(chan int, 1)
  channels_mutex.Lock()  
  channels = append(channels, channel)
  channels_mutex.Unlock()
  for {
    for {
      sync_stat := <- channel // blocked here
      if sync_stat == 0 { // if sync complete
        break  
      }
    }
    // Do some jobs
    if (some condition) {
      return
    }
  }
}

func sync() {
  channels_mutex.Lock()
  // do some sync

  for int i := 0; i != len(channels); i++ {
    channels[i] <- 0
  }
  channels_mutex.Unlock()
}

现在的问题是,由于<-总是在读取时阻塞,所以每次转到sync_stat := <- channel都是阻塞.我知道如果通道被关闭,它不会被封锁,但因为我必须使用这个通道直到work()个出口,我没有找到任何方法重新打开一个关闭的通道.

我怀疑自己走错了路,所以任何帮助都很感激.有没有什么"优雅"的方式来暂停和恢复任何其他的大猩猩表演?

推荐答案

如果我没记错的话,您需要N个工人和一个控制器,可以随意暂停、恢复和停止工人.下面的代码正好可以做到这一点.

package main

import (
    "fmt"
    "runtime"
    "sync"
)

// Possible worker states.
const (
    Stopped = 0
    Paused  = 1
    Running = 2
)

// Maximum number of workers.
const WorkerCount = 1000

func main() {
    // Launch workers.
    var wg sync.WaitGroup
    wg.Add(WorkerCount + 1)

    workers := make([]chan int, WorkerCount)
    for i := range workers {
        workers[i] = make(chan int, 1)

        go func(i int) {
            worker(i, workers[i])
            wg.Done()
        }(i)
    }

    // Launch controller routine.
    go func() {
        controller(workers)
        wg.Done()
    }()

    // Wait for all goroutines to finish.
    wg.Wait()
}

func worker(id int, ws <-chan int) {
    state := Paused // Begin in the paused state.

    for {
        select {
        case state = <-ws:
            switch state {
            case Stopped:
                fmt.Printf("Worker %d: Stopped\n", id)
                return
            case Running:
                fmt.Printf("Worker %d: Running\n", id)
            case Paused:
                fmt.Printf("Worker %d: Paused\n", id)
            }

        default:
            // We use runtime.Gosched() to prevent a deadlock in this case.
            // It will not be needed of work is performed here which yields
            // to the scheduler.
            runtime.Gosched()

            if state == Paused {
                break
            }

            // Do actual work here.
        }
    }
}

// controller handles the current state of all workers. They can be
// instructed to be either running, paused or stopped entirely.
func controller(workers []chan int) {
    // Start workers
    setState(workers, Running)

    // Pause workers.
    setState(workers, Paused)

    // Unpause workers.
    setState(workers, Running)

    // Shutdown workers.
    setState(workers, Stopped)
}

// setState changes the state of all given workers.
func setState(workers []chan int, state int) {
    for _, w := range workers {
        w <- state
    }
}

Go相关问答推荐

SEARCH On Conflict Clause不考虑乐观锁定版本

Golang ==错误:OCI运行时创建失败:无法启动容器进程:exec:./" bin:stat./" bin:没有这样的文件或目录:未知

获取k8s群集作用域运算符的命名空间

golang 的条件储存库

如何使用 html/template 在 golang 中运行一个范围内的范围

使用反射在Go中递归迭代 struct 体和集合字段

Opensearch 错误 ping 弹性服务器:由未知权威签署的 x509 证书

如何将 base64 编码的公钥转换为 crypto.PublicKey 或 ecdsa.PublicKey

无法将 graphql-ws 连接到 gqlgen

无法使用 gocsv 读取引用字段

访问传递给可变参数函数的通用 struct 的特定字段

有没有什么方法可以在不使用 if/else 的情况下在 Golang 中处理 nil 指针?

Go:从 ssl 证书中获取 'subject/unstructeredName' 的值

使用innerxml在 Go 中编码 XML 是否仅适用于某些类型?

从 Makefile 运行时权限被拒绝

Go:等待多个通道的性能损失

golang jwt.MapClaims 获取用户ID

如何使用golang操作很长的字符串以避免内存不足

正确编码 JWT

map和struct golang的合并