有5000万个文件,存储在一台ubuntu电脑上,我想把这5000万个文件合并成几个大文件,怎么才能最快呢?
我试着编写一个go程序来读取文件,然后把内容写入到输出文件中,但我发现它太慢了.实际速度约为每秒只能合并30-40个文件,完成整个过程需要16天以上.
有什么好方法可以快速合并吗?
这是我写的go代码:
const fileSizeLimit = (1 << 30) * 4 // 4GB — rotate to a new output segment once this many bytes are written
const filesStorePath = "<>"         // root directory holding the input files (placeholder in this post)
// main merges every file listed in the "-d" file (one name per line,
// relative to filesStorePath) into large numbered segments under the
// "-o" directory, starting at segment number "-f".
func main() {
	fileNamesFile := ""
	outBasePath := ""
	startId := 0
	//del := false
	flag.StringVar(&fileNamesFile, "d", "", "filenames file")
	flag.StringVar(&outBasePath, "o", "", "out dir")
	flag.IntVar(&startId, "f", 0, "start fn")
	//flag.BoolVar(&del, "del", false, "del file")
	flag.Parse()

	start := time.Now()
	fmt.Printf("start:%s\n", start.Format("2006-01-02 15:04:05"))
	fmt.Printf("file names = %s\n", fileNamesFile)
	fmt.Printf("out dir = %s\n", outBasePath)

	// BUG FIX: the original ignored this error and silently processed an
	// empty list when the file could not be read.
	allList, err := ioutil.ReadFile(fileNamesFile)
	if err != nil {
		fmt.Printf("read [%s] fail! err: %s \n", fileNamesFile, err)
		os.Exit(1)
	}
	all := strings.Split(string(allList), "\n")
	total := len(all)

	store := newStoreItems(outBasePath, startId)

	uiLiveWriter := uilive.New()
	uiLiveWriter.Start()

	finish := make(chan bool, 1)
	pos := 0
	readCount := 0

	// Reader goroutine: appends each listed file to the store.
	// NOTE(review): pos/readCount are read by the progress goroutine
	// without synchronization — benign for a progress display, but still
	// a data race under -race; use sync/atomic for a clean fix.
	go func() {
		for i := 0; i < total; i++ {
			pos = i
			fn := all[i]
			f := path.Join(filesStorePath, fn)
			if content, err := ioutil.ReadFile(f); err == nil {
				store.write(content)
				// BUG FIX: readCount was never incremented, so the
				// reported speed was always 0/s.
				readCount++
			}
		}
		// BUG FIX: the original never signaled completion, so the
		// program could only ever exit via an external signal.
		finish <- true
	}()

	// Progress reporter: repaint one status line per second via uilive.
	go func() {
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()
		for range ticker.C {
			cost := time.Since(start).Seconds()
			content := fmt.Sprintf("read %d/%d(%.2f%%), file=%d/%d, speed=%d/s\ttime %s\n",
				pos, total, float64(pos)/float64(total)*100,
				store.index, store.getSize(),
				int(float64(readCount)/cost),
				(time.Duration(cost) * time.Second).String())
			_, _ = fmt.Fprint(uiLiveWriter, content)
		}
	}()

	// BUG FIX: os.Kill / syscall.SIGKILL cannot be caught or ignored —
	// registering them is a no-op that `go vet` flags. Trap only the
	// interruptible signals.
	osSignals := make(chan os.Signal, 1)
	signal.Notify(osSignals, os.Interrupt, syscall.SIGTERM)
	go func() {
		s := <-osSignals
		fmt.Println("stop !", s)
		finish <- false
	}()

	<-finish
	close(finish)

	// BUG FIX: flush and close the store before exiting — the original
	// dropped up to 1 GiB of still-buffered merged data on shutdown.
	_ = store.w.Flush()
	_ = store.file.Close()

	_, _ = fmt.Fprintln(uiLiveWriter, "Finished ")
	uiLiveWriter.Stop() // flush and stop rendering
	fmt.Println("readCount", readCount)
	fmt.Println("exit 0")
}
// storeItems concatenates the contents of many small input files into
// large numbered output segments created under basePath.
type storeItems struct {
	basePath string        // output directory; segments are named by their numeric index
	w        *bufio.Writer // 1 GiB buffered writer over the current segment
	file     *os.File      // current output segment
	size     int           // bytes written to the current segment, incl. '\n' separators
	rowSize  int64         // NOTE(review): never read or written in the visible code — confirm before removing
	index    int           // numeric name of the current segment
	lock     sync.Mutex    // NOTE(review): declared but never locked here; write() is not goroutine-safe as shown
}
// newStoreItems opens (appending) the first output segment,
// "<storePath>/<startFn>", and wraps it in a 1 GiB buffered writer.
// The process exits if the segment cannot be created: every subsequent
// write would otherwise be silently lost.
func newStoreItems(storePath string, startFn int) *storeItems {
	fn := path.Join(storePath, strconv.Itoa(startFn))
	f, err := os.OpenFile(fn, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0600)
	if err != nil {
		fmt.Printf("create [%s] fail! err: %s \n", fn, err)
		// BUG FIX: the original printed the error but kept going with a
		// nil *os.File, so the bufio.Writer buffered up to 1 GiB and then
		// silently discarded everything.
		os.Exit(1)
	}
	return &storeItems{
		basePath: storePath,
		w:        bufio.NewWriterSize(f, util.GIGABYTE),
		file:     f,
		size:     0,
		index:    startFn,
	}
}
// getSize reports how many bytes have been written to the current
// output segment, including the '\n' separators appended by write.
func (s *storeItems) getSize() (n int) {
	n = s.size
	return
}
// nextFile rotates to a new output segment: it flushes and closes the
// current file, then opens "<basePath>/<index+1>" for appending and
// resets the segment byte counter. Exits the process if the new
// segment cannot be created.
func (s *storeItems) nextFile() *os.File {
	if s.file != nil {
		_ = s.w.Flush()
		_ = s.file.Close()
	}
	nextIndex := s.index + 1
	fn := path.Join(s.basePath, strconv.Itoa(nextIndex))
	f, err := os.OpenFile(fn, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0600)
	if err != nil {
		// BUG FIX: the original ignored this error and wrapped a nil
		// *os.File in the bufio.Writer, silently discarding all data
		// written afterwards. Report and exit, matching newStoreItems.
		fmt.Printf("create [%s] fail! err: %s \n", fn, err)
		os.Exit(1)
	}
	s.file = f
	s.w = bufio.NewWriterSize(f, util.GIGABYTE)
	s.index = nextIndex
	s.size = 0
	return s.file
}
// write appends b plus a trailing '\n' separator to the current
// segment and rotates to a new output file once the segment reaches
// fileSizeLimit bytes.
func (s *storeItems) write(b []byte) {
	_, _ = s.w.Write(b)
	// WriteByte is sufficient for a single ASCII separator.
	_ = s.w.WriteByte('\n')
	s.size += len(b) + 1
	// BUG FIX: the original tested s.w.Size(), but bufio.Writer.Size
	// returns the *capacity* of the underlying buffer (a constant 1 GiB
	// here), never the bytes written — so the comparison against the
	// 4 GiB limit was always false and output files were never rotated.
	// Compare the tracked logical segment size instead.
	if s.size >= fileSizeLimit {
		// cut off file
		s.nextFile()
	}
}
执行输出:
start:2022-07-22 05:03:09
file names = ***
out dir = ***
read 9057/50803783(0.02%), file=0/48151629, speed=40/s time 3m41s
观察到的系统读写:读:4 M/s~9 M/s
我也尝试过使用 awk 和 cat 命令,但效果与 go 程序差不多.
head ~/filename.txt -n 10000 | xargs awk '1' >> ~/out/0
sed -i '1,10000d' ~/filename.txt