我需要同时写入多个文件,但是我有一个奇怪的行为,当执行太多并发GO routine 时,写入所需的时间会增加. 我用下面的代码示例重现了这个行为:
package main
import (
"fmt"
"log"
"time"
"math/rand"
"sync"
"os"
"io/ioutil"
)
var wg sync.WaitGroup
var mu sync.Mutex
func WriteToFile(data []byte, fileName string) error {
mu.Lock()
defer mu.Unlock()
err := ioutil.WriteFile(fileName, data, 0666)
if err != nil {
return err
}
return nil
}
func GenerateFile(index int) error {
defer wg.Done()
start := time.Now()
elapsed := time.Since(start)
buf := make([]byte, 7500000)
rand.Read(buf) // generate random data
randomFileName := fmt.Sprintf("/tmp/gotest-%v", rand.Int())
err := WriteToFile(buf, randomFileName)
if err != nil {
return err
}
defer os.Remove(randomFileName)
elapsed = time.Since(start)
log.Printf("goroutine id: %v, generate file %s done in %s", index, randomFileName, elapsed)
return nil
}
func main() {
start := time.Now()
for i := 0; i < 30; i++ {
wg.Add(1)
go GenerateFile(i)
}
wg.Wait()
elapsed := time.Since(start)
log.Printf("done in %s", elapsed)
}
我得到以下输出:
2022/09/29 15:56:42 generate file /tmp/gotest-6971393788758345733 done in 11.900753ms
2022/09/29 15:56:42 generate file /tmp/gotest-4673598773814253679 done in 19.610377ms
2022/09/29 15:56:42 generate file /tmp/gotest-4521113305882394455 done in 30.778525ms
2022/09/29 15:56:42 generate file /tmp/gotest-8353995959628873274 done in 44.008548ms
2022/09/29 15:56:42 generate file /tmp/gotest-1525622757191165040 done in 54.167972ms
2022/09/29 15:56:42 generate file /tmp/gotest-5591269383853180110 done in 62.932048ms
2022/09/29 15:56:42 generate file /tmp/gotest-6764721932990560969 done in 69.798237ms
2022/09/29 15:56:42 generate file /tmp/gotest-2295272693261924206 done in 78.44878ms
2022/09/29 15:56:42 generate file /tmp/gotest-2676611775321001319 done in 87.014884ms
2022/09/29 15:56:42 generate file /tmp/gotest-3509686945670964049 done in 96.566297ms
2022/09/29 15:56:42 generate file /tmp/gotest-4836444971131413206 done in 105.83336ms
2022/09/29 15:56:42 generate file /tmp/gotest-1300813662850892446 done in 114.375852ms
2022/09/29 15:56:42 generate file /tmp/gotest-8820441258575272048 done in 123.49409ms
2022/09/29 15:56:42 generate file /tmp/gotest-4521621037307893446 done in 131.889565ms
2022/09/29 15:56:42 generate file /tmp/gotest-3672417531832062779 done in 141.114569ms
2022/09/29 15:56:42 generate file /tmp/gotest-1086595338191073308 done in 150.002702ms
2022/09/29 15:56:43 generate file /tmp/gotest-3614980454862600779 done in 159.816332ms
2022/09/29 15:56:43 generate file /tmp/gotest-8835923576403879976 done in 168.48278ms
2022/09/29 15:56:43 generate file /tmp/gotest-234153523891197266 done in 177.340838ms
2022/09/29 15:56:43 generate file /tmp/gotest-4694585587220869374 done in 185.675593ms
2022/09/29 15:56:43 generate file /tmp/gotest-790814872699550686 done in 194.672799ms
2022/09/29 15:56:43 generate file /tmp/gotest-1109155725008951319 done in 203.573936ms
2022/09/29 15:56:43 generate file /tmp/gotest-2107324352293381651 done in 212.619876ms
2022/09/29 15:56:43 generate file /tmp/gotest-2262414210446865118 done in 221.291951ms
2022/09/29 15:56:43 generate file /tmp/gotest-5425047562930316945 done in 230.636649ms
2022/09/29 15:56:43 generate file /tmp/gotest-5121293724555456542 done in 239.258288ms
2022/09/29 15:56:43 generate file /tmp/gotest-8761336775543022197 done in 247.899475ms
2022/09/29 15:56:43 generate file /tmp/gotest-3896102679035637543 done in 259.054695ms
2022/09/29 15:56:43 generate file /tmp/gotest-6219168972567815878 done in 266.248078ms
2022/09/29 15:56:43 generate file /tmp/gotest-8075914412323098705 done in 275.679818ms
2022/09/29 15:56:43 done in 276.64278ms
我预计所有的请求应该大致同时完成,因为我正在启动Goroutine,并且将花费与第一个Goroutine相同的时间.如果删除数据生成和文件写入部分,请求将同时返回.此外,我也进行了工作人池的实验,但如果我添加太多工作人员,则总体时间仍在增加.
我不理解这种行为.有人能给我一个解释吗?
编辑:另外,在Go中有没有增加并行文件写入的解决方案?
编辑2:我try 在将数据写入磁盘之前预先生成数据,并使用非常小的字节片段(5),行为仍然是一样的.
编辑3: 好吧,我不了解所有的细节,但我想这与Golang调度器和Way it handles syscalls有关. 基于this article中描述的工作池和作业(job)队列,我更新了我的示例,并获得了一致的写入文件的结果.
package main
import (
"os"
"io/ioutil"
"fmt"
"time"
"log"
"math/rand"
"sync"
)
var (
MaxWorker = 2
wg sync.WaitGroup
)
// Job represents the job to be run
type Job struct {
FileIndex int
}
// A buffered channel that we can send work requests on.
var JobQueue chan Job
func GenerateFile(index int) error {
defer wg.Done()
start := time.Now()
elapsed := time.Since(start)
data := make([]byte, 7500000)
rand.Read(data) // generate random data
randomFileName := fmt.Sprintf("/tmp/gotest-%v", rand.Int())
err := ioutil.WriteFile(randomFileName, data, 0666)
if err != nil {
return err
}
defer os.Remove(randomFileName)
elapsed = time.Since(start)
log.Printf("goroutine id: %v, generate file %s done in %s", index, randomFileName, elapsed)
return nil
}
// Worker represents the worker that executes the job
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}
func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan bool)}
}
// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
GenerateFile(job.FileIndex)
case <-w.quit:
// we have received a signal to stop
return
}
}
}()
}
// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}
type Dispatcher struct {
// A pool of workers channels that are registered with the dispatcher
WorkerPool chan chan Job
MaxWorkers int
}
func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{WorkerPool: pool, MaxWorkers: maxWorkers}
}
func (d *Dispatcher) Run() {
// starting n number of workers
for i := 0; i < d.MaxWorkers; i++ {
worker := NewWorker(d.WorkerPool)
worker.Start()
}
go d.dispatch()
}
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
// a job request has been received
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := <-d.WorkerPool
// dispatch the job to the worker job channel
jobChannel <- job
}(job)
}
}
}
func main() {
dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()
JobQueue = make(chan Job)
start := time.Now()
for i := 0; i < 30; i++ {
wg.Add(1)
work := Job{FileIndex: i}
// Push the work onto the queue.
JobQueue <- work
}
wg.Wait()
elapsed := time.Since(start)
log.Printf("done in %s", elapsed)
}
这为我提供了以下输出:
2022/10/03 11:54:53 goroutine id: 0, write file done in 11.838574ms
2022/10/03 11:54:53 goroutine id: 2, write file done in 19.516134ms
2022/10/03 11:54:53 goroutine id: 1, write file done in 14.525345ms
2022/10/03 11:54:53 goroutine id: 3, write file done in 15.714428ms
2022/10/03 11:54:53 goroutine id: 4, write file done in 14.858648ms
2022/10/03 11:54:53 goroutine id: 6, write file done in 13.567686ms
2022/10/03 11:54:53 goroutine id: 5, write file done in 13.952953ms
2022/10/03 11:54:53 goroutine id: 8, write file done in 12.848465ms
2022/10/03 11:54:53 goroutine id: 7, write file done in 16.949355ms
2022/10/03 11:54:53 goroutine id: 10, write file done in 18.237936ms
2022/10/03 11:54:53 goroutine id: 9, write file done in 14.750701ms
2022/10/03 11:54:53 goroutine id: 11, write file done in 15.03977ms
2022/10/03 11:54:53 goroutine id: 12, write file done in 14.320434ms
2022/10/03 11:54:53 goroutine id: 14, write file done in 16.248813ms
2022/10/03 11:54:53 goroutine id: 13, write file done in 15.655994ms
2022/10/03 11:54:54 goroutine id: 16, write file done in 13.760309ms
2022/10/03 11:54:54 goroutine id: 15, write file done in 14.58592ms
2022/10/03 11:54:54 goroutine id: 18, write file done in 15.343521ms
2022/10/03 11:54:54 goroutine id: 17, write file done in 15.035341ms
2022/10/03 11:54:54 goroutine id: 20, write file done in 14.53919ms
2022/10/03 11:54:54 goroutine id: 19, write file done in 14.59754ms
2022/10/03 11:54:54 goroutine id: 22, write file done in 14.190191ms
2022/10/03 11:54:54 goroutine id: 21, write file done in 14.810599ms
2022/10/03 11:54:54 goroutine id: 24, write file done in 14.843376ms
2022/10/03 11:54:54 goroutine id: 23, write file done in 15.578197ms
2022/10/03 11:54:54 goroutine id: 26, write file done in 15.244726ms
2022/10/03 11:54:54 goroutine id: 25, write file done in 14.845854ms
2022/10/03 11:54:54 goroutine id: 27, write file done in 14.719713ms
2022/10/03 11:54:54 goroutine id: 28, write file done in 15.499384ms
2022/10/03 11:54:54 goroutine id: 29, write file done in 15.203875ms
2022/10/03 11:54:54 done in 241.901091ms
如果有人能给出一个清楚的解释,那就太好了