Problem Summary
I'm working on a project that needs to stream data to disk at very high speed on a single Linux server. An fio benchmark using the command below shows that I should be able to hit the required write speeds (>40 Gb/s) using io_uring.
fio --name=seqwrite --rw=write --direct=1 --ioengine=io_uring --bs=128k --numjobs=4 --size=100G --runtime=300 --directory=/mnt/md0/ --iodepth=128 --buffered=0 --numa_cpu_nodes=0 --sqthread_poll=1 --hipri=1
However, I can't replicate that performance with my own code, which uses the liburing helper library for the I/O. My current write speed is about 9 GB/s. I suspect the library's extra overhead might be the bottleneck, but before I give up on the nicer library code I have a few questions about my approach.
My approach
- Use liburing
- Take advantage of submission queue polling
- Rather than queueing gather/scatter I/O requests with writev(), queue requests that write to disk with the normal write() function. (I tried gather/scatter I/O requests, but they didn't seem to make much difference to my write speed.)
- Multithreaded, with one ring per thread
Additional Information
- Running a simplified version of this code (without threads) yields similar results.
- My debugger shows that I'm creating the number of threads specified in the NUM_JOBS macro. However, it doesn't show me the kernel threads created for SQ polling.
- My performance drops when running more than two threads
- The Linux server has 96 CPUs available
- The data is being written to a RAID0 configuration
- I'm running bpftrace -e 'tracepoint:io_uring:io_uring_submit_sqe {printf("%s(%d)\n", comm, pid);}' in a separate terminal, and it shows that a kernel thread dedicated to SQ polling is active.
- I've verified that the data written to disk matches what I expect exactly, in both size and content.
- I tried using the IORING_SETUP_ATTACH_WQ flag when setting up the rings. If anything, it made things slower.
- I've tried various block sizes; 128K seems to be the sweet spot
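For reference, the IORING_SETUP_ATTACH_WQ experiment amounts to sharing the first ring's async backend with the later rings. A minimal sketch of how that setup typically looks (the wq_fd field is part of the kernel's io_uring_params struct and ring_fd is liburing's handle for an initialized ring; the exact code I used may have differed):

```cpp
// Sketch: attach subsequent rings to the first ring's work queue.
// Assumes ringPool[0] was initialized first, without ATTACH_WQ.
memset(&params, 0, sizeof(params));
params.flags |= IORING_SETUP_SQPOLL | IORING_SETUP_ATTACH_WQ;
params.sq_thread_idle = 2000;
params.wq_fd = ringPool[0]->ring_fd; // share the first ring's backend
io_uring_queue_init_params(QUEUE_DEPTH, ring, &params);
```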
Questions
- I expected the kernel to spin up one thread per ring to handle SQ polling, but I don't know how to verify whether that's actually happening. Can I assume it is?
- Why does my performance drop when running more than two jobs? Is it contention between the threads for the file being written to? Or is it that there's really only one thread doing SQ polling, and it gets bogged down handling requests from multiple rings?
- Are there other flags or options I should be using that might help?
- Is it time to bite the bullet and make the io_uring calls directly?
The Code
For the sake of brevity, the code below is a simplified version that strips out a lot of error-handling code. However, this simplified version performs and functions the same as the fully featured code.
The main function
#include <fcntl.h>
#include <liburing.h>
#include <cstring>
#include <thread>
#include <vector>
#include "utilities.h"
#define NUM_JOBS 4 // number of single-ring threads
#define QUEUE_DEPTH 128 // size of each ring
#define IO_BLOCK_SIZE (128 * 1024) // write block size
#define WRITE_SIZE (IO_BLOCK_SIZE * 10000) // Total number of bytes to write
#define FILENAME "/mnt/md0/test.txt" // File to write to
char incomingData[WRITE_SIZE]; // Will contain the data to write to disk
void writeToFile(int fd, io_uring* ring, char* buffer, int size, int fileIndex); // forward declaration
int main()
{
// Initialize variables
std::vector<std::thread> threadPool;
std::vector<io_uring*> ringPool;
io_uring_params params;
int fds[2];
int bytesPerThread = WRITE_SIZE / NUM_JOBS;
int bytesRemaining = WRITE_SIZE % NUM_JOBS;
int bytesAssigned = 0;
utils::generate_data(incomingData, WRITE_SIZE); // this just fills the incomingData buffer with known data
// Open the file, store its descriptor
fds[0] = open(FILENAME, O_WRONLY | O_TRUNC | O_CREAT, 0644);
// initialize Rings
ringPool.resize(NUM_JOBS);
for (int i = 0; i < NUM_JOBS; i++)
{
io_uring* ring = new io_uring;
// Configure the io_uring parameters and init the ring
memset(&params, 0, sizeof(params));
params.flags |= IORING_SETUP_SQPOLL;
params.sq_thread_idle = 2000;
io_uring_queue_init_params(QUEUE_DEPTH, ring, &params);
io_uring_register_files(ring, fds, 1); // required for sq polling
// Add the ring to the pool
ringPool.at(i) = ring;
}
// Spin up threads to write to the file
threadPool.resize(NUM_JOBS);
for (int i = 0; i < NUM_JOBS; i++)
{
int bytesToAssign = (i != NUM_JOBS - 1) ? bytesPerThread : bytesPerThread + bytesRemaining;
threadPool.at(i) = std::thread(writeToFile, 0, ringPool[i], incomingData + bytesAssigned, bytesToAssign, bytesAssigned);
bytesAssigned += bytesToAssign;
}
// Wait for the threads to finish
for (int i = 0; i < NUM_JOBS; i++)
{
threadPool[i].join();
}
// Cleanup the rings
for (int i = 0; i < NUM_JOBS; i++)
{
io_uring_queue_exit(ringPool[i]);
delete ringPool[i];
}
// Close the file
close(fds[0]);
return 0;
}
The writeToFile function
void writeToFile(int fd, io_uring* ring, char* buffer, int size, int fileIndex)
{
io_uring_cqe *cqe;
io_uring_sqe *sqe;
int bytesRemaining = size;
int bytesToWrite;
int bytesWritten = 0;
int writesPending = 0;
while (bytesRemaining || writesPending)
{
while(writesPending < QUEUE_DEPTH && bytesRemaining)
{
/* In this first inner loop,
* Write up to QUEUE_DEPTH blocks to the submission queue
*/
bytesToWrite = bytesRemaining > IO_BLOCK_SIZE ? IO_BLOCK_SIZE : bytesRemaining;
sqe = io_uring_get_sqe(ring);
if (!sqe) break; // if can't get a sqe, break out of the loop and wait for the next round
io_uring_prep_write(sqe, fd, buffer + bytesWritten, bytesToWrite, fileIndex + bytesWritten);
sqe->flags |= IOSQE_FIXED_FILE;
writesPending++;
bytesWritten += bytesToWrite;
bytesRemaining -= bytesToWrite;
if (bytesRemaining == 0) break;
}
io_uring_submit(ring);
while(writesPending)
{
/* In this second inner loop,
* Handle completions
* Additional error handling removed for brevity
* The functionality is the same as with error handling in the case that nothing goes wrong
*/
int status = io_uring_peek_cqe(ring, &cqe);
if (status == -EAGAIN) break; // if no completions are available, break out of the loop and wait for the next round
io_uring_cqe_seen(ring, cqe);
writesPending--;
}
}
}
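On the completion side, the loop above peeks one CQE at a time. liburing also offers a batch peek that drains everything currently available in one call; a sketch of the same drain using it (variable names follow writeToFile above, and per-CQE error checking of res is still elided for brevity):

```cpp
// Drain all currently available completions in one call.
io_uring_cqe* cqes[QUEUE_DEPTH];
unsigned got = io_uring_peek_batch_cqe(ring, cqes, QUEUE_DEPTH);
for (unsigned i = 0; i < got; i++) {
    // cqes[i]->res holds each write's return value; check it here.
}
io_uring_cq_advance(ring, got); // mark all 'got' CQEs seen at once
writesPending -= got;
```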