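I have built a proof of concept showing that tokio::io::copy hangs forever when switching between cellular/WiFi/wired networks if the reader is a reqwest::async_impl::Response wrapped in a tokio_io::AsyncRead using FuturesAsyncReadCompatExt.

This happens on macOS and iOS, which are the platforms I have access to.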

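// Imports needed by this snippet. ApiError, wrap_api_err and ProgressReadAdapter
// are helpers from the GitHub repo mentioned below and are not shown here.
use std::fs::File;
use std::path::PathBuf;
use std::time::Duration;

use futures::TryStreamExt; // map_err, into_async_read
use reqwest::{ClientBuilder, Response}; // bytes_stream needs reqwest's "stream" feature
use tempfile::NamedTempFile;
use tokio::io::AsyncWriteExt; // flush
use tokio_util::compat::FuturesAsyncReadCompatExt; // compat

#[tokio::main()]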
async fn main() {
    let mut target_file = std::env::current_dir().unwrap();
    target_file.push("bbb.mp4");
    println!("File will be downloaded to {target_file:?}");
    let client = ClientBuilder::default()
        // Doesn't seem to help
        .tcp_keepalive(Some(Duration::from_secs(1)))
        // Doesn't seem to help
        .connect_timeout(Duration::from_secs(1))
        .build()
        .unwrap();
    let response = client.get("http://distribution.bbb3d.renderfarming.net/video/mp4/bbb_sunflower_native_60fps_stereo_abl.mp4").send().await.unwrap();
    match response_to_file(response, target_file).await {
        Ok(_) => println!("Everything OK"),
        Err(err) => eprintln!("{err}"),
    }
}

async fn response_to_file(response: Response, path: PathBuf) -> Result<(), ApiError> {
    let download = response.bytes_stream();

    let download = download
        .map_err(|e| futures::io::Error::new(futures::io::ErrorKind::Other, e))
        .into_async_read();

    let download = download.compat();

    // Wrap download to be able to get progress in terminal
    let mut download = ProgressReadAdapter::new(download);

    let temp_file = tokio::task::spawn_blocking(NamedTempFile::new)
        .await
        .wrap_api_err()?
        .wrap_api_err()?;

    let mut outfile = tokio::fs::File::create(temp_file.path())
        .await
        .wrap_api_err()?;

    // Code hangs here forever after a network switch
    tokio::io::copy(&mut download, &mut outfile)
        .await
        .wrap_api_err()?;

    outfile.flush().await.wrap_api_err()?;

    let _persisted_file: File = tokio::task::spawn_blocking(move || temp_file.persist(path))
        .await
        .wrap_api_err()?
        .wrap_api_err()?;

    Ok(())
}

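The code above uses a few helpers, such as wrap_api_err, that can be found on GitHub, but I don't think they matter for analyzing the problem.

The main question is: how can I make response_to_file exit with an Err after switching networks?

A secondary question: if there is no easy way to fix this code, how can I stream a network resource to a temporary file in a way that actually exits cleanly on error?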

Recommended answer

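I was finally able to reach some conclusions.

This issue on curl's GitHub page led me to believe that the cause is not reqwest but the underlying network stack on macOS/iOS.

I asked this question on seanmonstar/reqwest, and Sean replied that an issue already exists for a similar idea (low/no activity timeouts in reqwest).

Basically, what I believe is happening is that the network stack has my outstanding response and keeps trying to read more data from the underlying TCP connection, even though the original WiFi connection has been "disconnected". According to the curl discussion, this is just something that happens, and it is not an error at the TCP/HTTP level, so it can't really be detected by client libraries.

What client libraries can do is detect that no data is arriving for the response (at least in the case of reqwest). Currently, reqwest doesn't have this functionality built in, but it can be emulated with a little work.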
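To make the "no data is arriving" idea concrete, here is a minimal std-only sketch of an inactivity timeout. It is a model, not reqwest code: the channel stands in for the stream of response chunks, and copy_with_idle_timeout is a hypothetical name chosen for illustration. In an async program the same shape could presumably be had by wrapping each chunk read in tokio::time::timeout.

```rust
use std::io::{Error, ErrorKind, Result};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Drains `chunks` into `out` and returns the number of bytes copied.
/// Fails with `ErrorKind::TimedOut` if no chunk arrives within `idle`.
/// The timer restarts after every chunk, so it bounds inactivity,
/// not the total duration of the transfer.
/// (Hypothetical helper; the channel models the response body.)
fn copy_with_idle_timeout(
    chunks: &Receiver<Vec<u8>>,
    out: &mut Vec<u8>,
    idle: Duration,
) -> Result<usize> {
    let mut total = 0;
    loop {
        match chunks.recv_timeout(idle) {
            // A chunk arrived in time: append it and wait for the next one.
            Ok(chunk) => {
                total += chunk.len();
                out.extend_from_slice(&chunk);
            }
            // The sender was dropped: treat this as a normal end of stream.
            Err(RecvTimeoutError::Disconnected) => return Ok(total),
            // Nothing arrived for `idle`: report a stall instead of hanging.
            Err(RecvTimeoutError::Timeout) => {
                return Err(Error::new(ErrorKind::TimedOut, "download stalled"));
            }
        }
    }
}
```

A sender that delivers its chunks and is then dropped yields Ok with the byte count, while a sender that goes quiet but stays alive (the analogue of the dead-but-not-closed TCP connection) yields a TimedOut error once the idle period has passed.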

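Using this StackOverflow answer as a starting point, I built an AsyncRead wrapper that detects a stalled response and exits cleanly with an error after a given time has passed.

The complete code can be found in my GitHub repo bes/network-switch-hang, which started out as the repo for the bug proof of concept but now also serves as the answer.

For completeness, here are the most important parts of the code, at least until reqwest grows a native way of detecting stalled responses.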

main.rs

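// Imports needed by this file. ApiError and wrap_api_err are helpers from the
// GitHub repo; StalledReadMonitor is defined in stalled_monitor.rs below.
use std::fs::File;
use std::path::PathBuf;
use std::time::Duration;

use futures::TryStreamExt; // map_err, into_async_read
use reqwest::{ClientBuilder, Response}; // bytes_stream needs reqwest's "stream" feature
use tempfile::NamedTempFile;
use tokio::io::AsyncWriteExt; // flush
use tokio_util::compat::FuturesAsyncReadCompatExt; // compat

#[tokio::main()]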
async fn main() {
    let mut target_file = std::env::current_dir().unwrap();
    target_file.push("bbb.mp4");
    println!("File will be downloaded to {target_file:?}");
    let client = ClientBuilder::default()
        .connect_timeout(Duration::from_secs(5))
        .build()
        .unwrap();
    let response = client.get("http://distribution.bbb3d.renderfarming.net/video/mp4/bbb_sunflower_native_60fps_stereo_abl.mp4").send().await.unwrap();
    match response_to_file(response, target_file).await {
        Ok(_) => println!("Everything OK"),
        Err(err) => eprintln!("{err}"),
    }
}

async fn response_to_file(response: Response, path: PathBuf) -> Result<(), ApiError> {
    let download = response.bytes_stream();

    let download = download
        .map_err(|e| futures::io::Error::new(futures::io::ErrorKind::Other, e))
        .into_async_read();

    let download = download.compat();

    // Wrap download to be able to detect stalled downloads
    let mut download = StalledReadMonitor::new(download);

    let temp_file = tokio::task::spawn_blocking(NamedTempFile::new)
        .await
        .wrap_api_err()?
        .wrap_api_err()?;

    let mut outfile = tokio::fs::File::create(temp_file.path())
        .await
        .wrap_api_err()?;

    // Code used to hang here, but will now exit with an error after being stalled for
    // more than 5 seconds. See StalledReadMonitor for details.
    tokio::io::copy(&mut download, &mut outfile)
        .await
        .wrap_api_err()?;

    outfile.flush().await.wrap_api_err()?;

    let _persisted_file: File = tokio::task::spawn_blocking(move || temp_file.persist(path))
        .await
        .wrap_api_err()?
        .wrap_api_err()?;

    Ok(())
}

stalled_monitor.rs

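// Imports needed by this file. StalledError is a small error type from the
// GitHub repo and is not shown here.
use std::io::{ErrorKind, Result};
use std::ops::Add;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

use pin_project::pin_project;
use tokio::io::{AsyncRead, ReadBuf};
use tokio::time::{interval_at, Instant, Interval};

/// This monitor can wrap an [AsyncRead] and make sure that it is making progress.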
/// If the inner reader isn't making progress, we can stop the download.
/// The monitoring is done by keeping an [Interval] and measuring progress
/// by counting the number of bytes during each interval.
///
/// Please note that this monitor won't stop the download after _exactly_
/// five seconds of inactivity, but rather at the first interval tick that saw no
/// data since the previous tick. So the delay is between 5 and 10 seconds after
/// the last byte, and the average will be 7.5 seconds.
#[pin_project]
pub struct StalledReadMonitor<R: AsyncRead> {
    #[pin]
    inner: R,
    interval: Interval,
    interval_bytes: usize,
}

impl<R: AsyncRead> StalledReadMonitor<R> {
    pub fn new(inner: R) -> Self {
        Self {
            inner,
            interval: interval_at(
                Instant::now().add(Duration::from_millis(5_000)),
                Duration::from_millis(5_000),
            ),
            interval_bytes: 0,
        }
    }
}

impl<R: AsyncRead> AsyncRead for StalledReadMonitor<R> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<Result<()>> {
        let this = self.project();

        let before = buf.filled().len();
        let mut result = this.inner.poll_read(cx, buf);
        let after = buf.filled().len();

        *this.interval_bytes += after - before;
        match this.interval.poll_tick(cx) {
            Poll::Pending => {}
            Poll::Ready(_) => {
                if *this.interval_bytes == 0 {
                    println!("Rate is too low, aborting fetch");
                    result = Poll::Ready(Err(std::io::Error::new(
                        ErrorKind::TimedOut,
                        StalledError {},
                    )))
                }
                *this.interval_bytes = 0;
            }
        };
        result
    }
}
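The 5 to 10 second window mentioned in the doc comment can be checked with a small timer-free model of the same bookkeeping. This is only a sketch under simplifying assumptions: StallDetector and abort_time_ms are hypothetical names, time is a plain millisecond counter, and data is assumed to flow steadily until the last byte.

```rust
/// Pure model of the per-interval byte accounting done by the monitor.
/// (Hypothetical type for illustration; no real timers involved.)
#[derive(Debug, Default)]
struct StallDetector {
    interval_bytes: usize,
}

impl StallDetector {
    /// Record bytes read since the previous tick.
    fn record(&mut self, n: usize) {
        self.interval_bytes += n;
    }

    /// Called on every tick. Returns true if the interval that just ended
    /// saw no bytes, then resets the counter for the next interval.
    fn tick(&mut self) -> bool {
        let stalled = self.interval_bytes == 0;
        self.interval_bytes = 0;
        stalled
    }
}

/// Ticks fire at `period_ms`, `2 * period_ms`, and so on. Data flows steadily
/// until `last_byte_ms` and then stops. Returns the tick time (in ms) at
/// which the detector reports a stall.
fn abort_time_ms(period_ms: u64, last_byte_ms: u64) -> u64 {
    let mut detector = StallDetector::default();
    let mut tick = period_ms;
    loop {
        let interval_start = tick - period_ms;
        // The interval (interval_start, tick] saw data only if the last
        // byte arrived after the interval started.
        if last_byte_ms > interval_start {
            detector.record(1);
        }
        if detector.tick() {
            return tick;
        }
        tick += period_ms;
    }
}
```

With a 5000 ms period, a last byte arriving exactly on a tick is reported one period later, while a last byte arriving just after a tick is counted in the following interval and reported almost two periods later. That is where the 5 second best case and the (just under) 10 second worst case come from.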
