我正在Rust中制作一个小的ncurses应用程序,需要与子进程进行通信.我已经有了一个用Common Lisp编写的原型.我正试图重写它,因为CL对这样一个小工具使用了大量内存.

我在弄清楚如何与子流程交互时遇到了一些困难.

我现在做的大致是:

  1. 创建流程:

    let mut program = match Command::new(command)
        .args(arguments)
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
    {
        Ok(child) => child,
        Err(_) => {
            println!("Cannot run program '{}'.", command);
            return;
        }
    };
    
  2. 将其传递给一个无限循环(直到用户退出),该循环读取和处理输入,并监听如下输出(并将其写入屏幕):

    fn listen_for_output(program: &mut Child, output_viewer: &TextViewer) {
        match program.stdout {
            Some(ref mut out) => {
                let mut buf_string = String::new();
                match out.read_to_string(&mut buf_string) {
                    Ok(_) => output_viewer.append_string(buf_string),
                    Err(_) => return,
                };
            }
            None => return,
        };
    }
    

然而,调用read_to_string会阻止程序,直到进程退出.从我所看到的read_to_endread似乎也被阻挡了.如果我try 运行像ls这样立即退出的程序,它会工作,但是对于像pythonsbcl这样不退出的程序,它只会在我手动终止子进程后继续.

基于this answer,我将代码改为使用BufReader:

    fn listen_for_output(program: &mut Child, output_viewer: &TextViewer) {
        match program.stdout.as_mut() {
            Some(out) => {
                let buf_reader = BufReader::new(out);
                for line in buf_reader.lines() {
                    match line {
                        Ok(l) => {
                            output_viewer.append_string(l);
                        }
                        Err(_) => return,
                    };
                }
            }
            None => return,
        }
    }

然而,问题依然存在.它将读取所有可用的行,然后阻塞.由于该工具应该与任何程序一起工作,因此在try 读取之前,无法猜测输出何时结束.似乎也没有办法将超时设置为BufReader.

推荐答案

默认情况下,流是blocking.TCP/IP流、文件系统流、管道流,它们都是阻塞的.当你告诉一个流给你一个字节块时,它会停止并等待,直到它有给定的字节数,或者直到其他事情发生(一个interrupt,一个流结束,一个错误).

The operating systems are eager to return the data to the reading process, so if all you want is to wait for the next line and handle it as soon as it comes in then the method suggested by Shepmaster in Unable to pipe to or from spawned child process more than once (and also in his answer here) works.
Though in theory it doesn't have to work, because an operating system is allowed to make the BufReader wait for more data in read, but in practice the operating systems prefer the early "short reads" to waiting.

当需要处理多个流(如子进程的stdoutstderr)或多个进程时,这种基于BufReader的简单方法变得更加危险.例如,当子进程等待您排空其stderr管道,而您的进程在等待其空stdout管道时被阻塞时,基于BufReader的方法可能会死锁.

同样,如果不希望程序无限期地等待子进程,则不能使用BufReader.也许你想在子元素还在工作时显示一个进度条或计时器,而不给你任何输出.

如果您的操作系统碰巧不急于将数据返回到进程(更喜欢"完全读取"而不是"短读取"),则不能使用基于BufReader的方法,因为在这种情况下,子进程打印的最后几行可能会出现在灰色区域:操作系统得到了它们,但它们不够大,无法填满BufReader的缓冲区.

BufReader仅限于Read接口允许它对流执行的操作,它的阻塞程度不低于底层流.为了提高效率,它会将输入分块输入,告诉操作系统尽可能多地填充可用的缓冲区.

你可能想知道为什么在这里读取数据块是如此重要,为什么BufReader不能一个字节一个字节地读取数据.问题是,要从流中读取数据,我们需要操作系统的帮助.另一方面,我们不是操作系统,我们与它隔离工作,以便在流程出现问题时不会弄乱它.因此,为了调用操作系统,需要转换到"内核模式",这也可能导致"上下文切换".这就是为什么调用操作系统读取每一个字节的成本很高.我们需要尽可能少的操作系统调用,因此我们可以批量获取流数据.

要想在没有阻塞的情况下等待流,你需要一个non-blocking stream.MIO promises to have the required non-blocking stream support for pipes,很可能是PipeReader,但我还没查过.

流的非阻塞性质应使其能够分块读取数据,而不管操作系统是否喜欢"短读".因为非阻塞流从不阻塞.如果流中没有数据,它会简单地告诉你.

在没有非阻塞流的情况下,您将不得不求助于生成线程,以便阻塞读取将在单独的线程中执行,从而不会阻塞主线程.您可能还希望逐字节读取流,以便在操作系统不喜欢"短读"时立即对行分隔符做出react .这里有一个有效的例子:https://gist.github.com/ArtemGr/db40ae04b431a95f2b78.

注意:下面是一个函数示例,它允许通过共享的字节向量监视程序的标准输出:

use std::io::Read;
use std::process::{Command, Stdio};
use std::sync::{Arc, Mutex};
use std::thread;

/// Pipe streams are blocking, we need separate threads to monitor them without blocking the primary thread.
fn child_stream_to_vec<R>(mut stream: R) -> Arc<Mutex<Vec<u8>>>
where
    R: Read + Send + 'static,
{
    let out = Arc::new(Mutex::new(Vec::new()));
    let vec = out.clone();
    thread::Builder::new()
        .name("child_stream_to_vec".into())
        .spawn(move || loop {
            let mut buf = [0];
            match stream.read(&mut buf) {
                Err(err) => {
                    println!("{}] Error reading from stream: {}", line!(), err);
                    break;
                }
                Ok(got) => {
                    if got == 0 {
                        break;
                    } else if got == 1 {
                        vec.lock().expect("!lock").push(buf[0])
                    } else {
                        println!("{}] Unexpected number of bytes: {}", line!(), got);
                        break;
                    }
                }
            }
        })
        .expect("!thread");
    out
}

fn main() {
    let mut cat = Command::new("cat")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .expect("!cat");

    let out = child_stream_to_vec(cat.stdout.take().expect("!stdout"));
    let err = child_stream_to_vec(cat.stderr.take().expect("!stderr"));
    let mut stdin = match cat.stdin.take() {
        Some(stdin) => stdin,
        None => panic!("!stdin"),
    };
}

我使用几个助手来控制SSH会话:

try_s! (stdin.write_all (b"echo hello world\n"));
try_s! (wait_forˢ (&out, 0.1, 9., |s| s == "hello world\n"));

另请注意,异步std中的await on read调用也会阻塞.它只是阻止了一个系统线程,而不是阻止了一个future 链(本质上是一个无堆栈的绿色线程).poll_read是非阻塞接口.在async-std#499年,我问开发人员这些API是否有短读保证.

另外,可能有a similar concern in Nom个:"we would want to tell the IO side to refill according to the parser's result (Incomplete or not)"

另外,了解如何在交叉术语中实现流读取可能会很有趣.对于Windows,在poll.rs中,他们使用的是本机WaitForMultipleObjects.在unix.rs年,他们使用的是mio poll.

Rust相关问答推荐

空字符串转换为Box字符串时是否分配?<>

在rust中如何修改一个盒装函数并将其赋回?

如何在Rust中表示仅具有特定大小的数组

通过不同的字段进行散列和排序的 struct (需要不同的EQ实现)

在使用#[NO_STD]时,如何在Rust中收到紧急消息?

闭包不会发送,即使它只捕获发送变量

不能在一个代码分支中具有不变的自身borrow ,而在另一个代码分支中具有可变的self borrow

我如何制作一个变异迭代器来锁定内部数据直到删除?

由于生存期原因,返回引用的闭包未编译

在 Rust 中,在需要引用 self 的 struct 体方法中使用闭包作为 while 循环条件

Rust Axum 框架 - 解包安全吗?

tokio::spawn 有和没有异步块

`use` 和 `crate` 关键字在 Rust 项目中效果不佳

Rust中是否可以在不复制的情况下从另一个不可变向量创建不可变向量?

Rust中的标记特征是什么?

(let b = MyBox(5 as *const u8); &b; ) 和 (let b = &MyBox(5 as *const u8); ) 之间有什么区别

打印 `format_args!` 时borrow 时临时值丢失

为什么在 macOS / iOS 上切换 WiFi 网络时 reqwest 响应会挂起?

Rust 为什么 (u32, u32) 的枚举变体的大小小于 (u64)?

为什么 Rust 标准库同时为 Thing 和 &Thing 实现特征?