上下文

我正在读到this blog页.提供了一些代码:

async fn parse_line(socket: &TcpStream) -> Result<String, Error> {
    let len = socket.read_u32().await?;
    let mut line = vec![0; len];
    socket.read_exact(&mut line).await?;
    let line = str::from_utf8(line)?;
    Ok(line)
}

loop {
    select! {
        line_in = parse_line(&socket) => {
            if let Some(line_in) = line_in {
                broadcast_line(line_in);
            } else {
                // connection closed, exit loop
                break;
            }
        }
        line_out = channel.recv() => {
            write_line(&socket, line_out).await;
        }
    }
}

作者声称,如果channel在parse_line执行时收到一条消息,则parse_line可能会以损坏状态结束.

问题

在什么时候parse_line可以被打断?是在any点吗?根据我目前的理解--这可能是错误的--Rust可以在Await语句中切换线程上的任务,但在这些点上存储状态,以便可以继续工作.

我想像的是

我设想在parse_line中,Rust将字节加载到line变量中.在读取了一定数量的字节之后(对于某个ASCII字符,可能只有一半的字节),并且在等待更多字节进入时,channel接收到一些东西,并且上下文切换.

完成channel.recv()任务后,Rust返回读取输入,但是提供字节的用户取消了请求,现在没有其他内容可读取.

现在str::from_utf8(line)?抛出UTF-8错误,因为line具有不完整的ASCII字符.

推荐答案

TL;DR:不是在any点,只在.await秒.


异步代码被降低到实现Future的状态机中..await在循环中调用Future::poll(),在调用方返回Poll::Pending并在Poll::Ready结束时暂停调用方.这本质上是运行内在的future ,直到完成.

然而,如果我们拨打poll()几次,但没有完成future 会发生什么?在这种情况下,我们调用poll()的几次已经将future 推进到了执行的某个时间点,到此为止.我们有cancelled个future .

关键的观察是,这个执行点必须在.await以内.这是因为同步,没有-.await代码在future 转换为poll()实现中的直接同步代码,我们不能止步于此.我们只能在从poll()返回后停止,这发生在.await的中间或完成时.

然而,并非所有的期货都是cancellation safe.有些人,像你的parse_line(),失go 工作时,他们被取消.如果我们在第二个.await处取消,则长度已经从套接字中读取(并丢弃),但主体还没有,因此我们丢失了该长度.我们无法恢复它,并且下一次调用该函数时,它将看到来自套接字的损坏数据(或者只是跳过一条记录).

select!取消除第一个完成的期货之外的所有期货,因此此代码有一个错误.

解决办法是永远不要失go future ,而是把它留到以后:

let mut parse_line_fut = std::pin::pin!(parse_line(&socket));
loop {
    select! {
        line_in = parse_line_fut.as_mut() => {
            if let Ok(line_in) = line_in {
                broadcast_line(line_in);

                parse_line_fut.set(parse_line(&socket));
            } else {
                // connection closed, exit loop
                break;
            }
        }
        line_out = channel.recv() => {
            write_line(&socket, line_out).await;
        }
    }
}

Rust相关问答推荐

这个规则关于或模式到底是什么意思?如果表达片段的类型与p_i|q_i...&q;不一致,就会形成

integer cast as pointer是什么意思

S,一般性状和联想型性状有什么不同?

什么时候和为什么S最好是按值或引用传递简单类型

在Rust中,Box:ed struct 与普通 struct 在删除顺序上有区别吗?

写入引用会更新基础值,但引用会打印意外的值

在铁 rust 中,如何一次只引用几件事中的一件?

如何重命名 clap_derive 中的子命令占位符?

如何强制匹配的返回类型为()?

为什么我们有两种方法来包含 serde_derive?

在 Rust 中实现资源消耗的安全包装器

在每个循环迭代中删除borrow

切片不能被 `usize` 索引?

改变不实现克隆的 dioxus UseState struct

发生移动是因为 `data` 的类型为 `Vec`,它没有实现 `Copy` 特性

你能告诉我如何在 Rust 中使用定时器吗?

在 Rust 中为泛型 struct 编写一次特征绑定

在 Rust 中枚举字符串的最佳方式? (字符()与 as_bytes())

如何在宏中的多个参数上编写嵌套循环?

如何在 Rust 中使用特征标志来捕获多行代码?