我正在try 使用多个Goroutine来读取同一个文件,其中每个Goroutine都被分配了一个字节来开始读取,并分配了一些行来读取lineLimit
个字节.
当文件可以放入内存时,通过将csv.ChunkSize
选项设置为chunkSize
变量,我成功地做到了这一点.但是,当文件大于内存时,我需要减少csv.ChunkSize
选项.我正try 着做这样的事情
package main
import (
"io"
"log"
"os"
"sync"
"github.com/apache/arrow/go/v11/arrow"
"github.com/apache/arrow/go/v11/arrow/csv"
)
// A reader to read lines from the file starting from the byteOffset. The number
// of lines is specified by linesLimit.
func produce(
id int,
ch chan<- arrow.Record,
byteOffset int64,
linesLimit int64,
filename string,
wg *sync.WaitGroup,
) {
defer wg.Done()
fd, _ := os.Open(filename)
fd.Seek(byteOffset, io.SeekStart)
var remainder int64 = linesLimit % 10
limit := linesLimit - remainder
chunkSize := limit / 10
reader := csv.NewInferringReader(fd,
csv.WithChunk(int(chunkSize)),
csv.WithNullReader(true, ""),
csv.WithComma(','),
csv.WithHeader(true),
csv.WithColumnTypes(map[string]arrow.DataType{
"Start_Time": arrow.FixedWidthTypes.Timestamp_ns,
"End_Time": arrow.FixedWidthTypes.Timestamp_ns,
"Weather_Timestamp": arrow.FixedWidthTypes.Timestamp_ns,
}))
reader.Retain()
defer reader.Release()
var count int64
for reader.Next() {
rec := reader.Record()
rec.Retain() // released at the other end of the channel
ch <- rec
count += rec.NumRows()
if count == limit {
if remainder != 0 {
flush(id, ch, fd, remainder)
}
break
} else if count > limit {
log.Panicf("Reader %d read more than it should, expected=%d, read=%d", id, linesLimit, count)
}
}
if reader.Err() != nil {
log.Panicf("error: %s in line %d,%d", reader.Err().Error(), count, id)
}
}
func flush(id int,
ch chan<- arrow.Record,
fd *os.File,
limit int64,
) {
reader := csv.NewInferringReader(fd,
csv.WithChunk(int(limit)),
csv.WithNullReader(true, ""),
csv.WithComma(','),
csv.WithHeader(false),
)
reader.Retain()
defer reader.Release()
record := reader.Record()
record.Retain() // nil pointer dereference error here
ch <- record
}
我try 了上述代码的多个版本,包括:
- 正在复制文件描述符
- 复制文件描述符的偏移量,打开同一文件 并寻求弥补这一点.
- 在呼叫
flush
或关闭第一个fd
之前关闭第一个读卡器.
无论我如何更改代码,错误似乎都是一样的.请注意,对flush
读取器的任何调用都会引发错误.包括reader.Next
和reader.Err()
.
我是不是用错了CSV读卡器?这是重用同一文件的问题吗?
编辑:我不知道这是否有帮助,但在flush
中打开一个没有任何Seek
的新FD可以避免错误(不知何故,任何Seek
都会导致原始错误出现).但是,如果没有Seek
,代码就不正确(即,删除Seek
会导致文件的一部分根本无法被任何Goroutine读取).