默认情况下,流是blocking.TCP/IP流、文件系统流、管道流,它们都是阻塞的.当你告诉一个流给你一个字节块时,它会停止并等待,直到它有给定的字节数,或者直到其他事情发生(一个interrupt,一个流结束,一个错误).
The operating systems are eager to return the data to the reading process, so if all you want is to wait for the next line and handle it as soon as it comes in then the method suggested by Shepmaster in Unable to pipe to or from spawned child process more than once (and also in his answer here) works.
Though in theory it doesn't have to work, because an operating system is allowed to make the BufReader
wait for more data in read
, but in practice the operating systems prefer the early "short reads" to waiting.
当需要处理多个流(如子进程的stdout
和stderr
)或多个进程时,这种基于BufReader
的简单方法变得更加危险.例如,当子进程等待您排空其stderr
管道,而您的进程在等待其空stdout
管道时被阻塞时,基于BufReader
的方法可能会死锁.
同样,如果不希望程序无限期地等待子进程,则不能使用BufReader
.也许你想在子元素还在工作时显示一个进度条或计时器,而不给你任何输出.
如果您的操作系统碰巧不急于将数据返回到进程(更喜欢"完全读取"而不是"短读取"),则不能使用基于BufReader
的方法,因为在这种情况下,子进程打印的最后几行可能会出现在灰色区域:操作系统得到了它们,但它们不够大,无法填满BufReader
的缓冲区.
BufReader
仅限于Read
接口允许它对流执行的操作,它的阻塞程度不低于底层流.为了提高效率,它会将输入分块输入,告诉操作系统尽可能多地填充可用的缓冲区.
你可能想知道为什么在这里读取数据块是如此重要,为什么BufReader
不能一个字节一个字节地读取数据.问题是,要从流中读取数据,我们需要操作系统的帮助.另一方面,我们不是操作系统,我们与它隔离工作,以便在流程出现问题时不会弄乱它.因此,为了调用操作系统,需要转换到"内核模式",这也可能导致"上下文切换".这就是为什么调用操作系统读取每一个字节的成本很高.我们需要尽可能少的操作系统调用,因此我们可以批量获取流数据.
要想在没有阻塞的情况下等待流,你需要一个non-blocking stream.MIO promises to have the required non-blocking stream support for pipes,很可能是PipeReader,但我还没查过.
流的非阻塞性质应使其能够分块读取数据,而不管操作系统是否喜欢"短读".因为非阻塞流从不阻塞.如果流中没有数据,它会简单地告诉你.
在没有非阻塞流的情况下,您将不得不求助于生成线程,以便阻塞读取将在单独的线程中执行,从而不会阻塞主线程.您可能还希望逐字节读取流,以便在操作系统不喜欢"短读"时立即对行分隔符做出react .这里有一个有效的例子:https://gist.github.com/ArtemGr/db40ae04b431a95f2b78.
注意:下面是一个函数示例,它允许通过共享的字节向量监视程序的标准输出:
use std::io::Read;
use std::process::{Command, Stdio};
use std::sync::{Arc, Mutex};
use std::thread;
/// Pipe streams are blocking, we need separate threads to monitor them without blocking the primary thread.
fn child_stream_to_vec<R>(mut stream: R) -> Arc<Mutex<Vec<u8>>>
where
R: Read + Send + 'static,
{
let out = Arc::new(Mutex::new(Vec::new()));
let vec = out.clone();
thread::Builder::new()
.name("child_stream_to_vec".into())
.spawn(move || loop {
let mut buf = [0];
match stream.read(&mut buf) {
Err(err) => {
println!("{}] Error reading from stream: {}", line!(), err);
break;
}
Ok(got) => {
if got == 0 {
break;
} else if got == 1 {
vec.lock().expect("!lock").push(buf[0])
} else {
println!("{}] Unexpected number of bytes: {}", line!(), got);
break;
}
}
}
})
.expect("!thread");
out
}
fn main() {
let mut cat = Command::new("cat")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("!cat");
let out = child_stream_to_vec(cat.stdout.take().expect("!stdout"));
let err = child_stream_to_vec(cat.stderr.take().expect("!stderr"));
let mut stdin = match cat.stdin.take() {
Some(stdin) => stdin,
None => panic!("!stdin"),
};
}
我使用几个助手来控制SSH会话:
try_s! (stdin.write_all (b"echo hello world\n"));
try_s! (wait_forˢ (&out, 0.1, 9., |s| s == "hello world\n"));
另请注意,异步std中的await
on read调用也会阻塞.它只是阻止了一个系统线程,而不是阻止了一个future 链(本质上是一个无堆栈的绿色线程).poll_read是非阻塞接口.在async-std#499年,我问开发人员这些API是否有短读保证.
另外,可能有a similar concern in Nom个:"we would want to tell the IO side to refill according to the parser's result (Incomplete or not)"
另外,了解如何在交叉术语中实现流读取可能会很有趣.对于Windows,在poll.rs中,他们使用的是本机WaitForMultipleObjects.在unix.rs年,他们使用的是mio poll
.