The async example是有用的,但对于Rust和Tokio来说是新手,我正在努力解决如何一次处理N个请求,使用来自向量的URL,并 for each URL创建一个响应HTML的迭代器作为字符串.

这怎么可能呢?

推荐答案

Concurrent requests

从0.10开始:

use futures::{stream, StreamExt}; // 0.3.5
use reqwest::Client; // 0.10.6
use tokio; // 0.2.21, features = ["macros"]

const CONCURRENT_REQUESTS: usize = 2;

#[tokio::main]
async fn main() {
    let client = Client::new();

    let urls = vec!["https://api.ipify.org"; 2];

    let bodies = stream::iter(urls)
        .map(|url| {
            let client = &client;
            async move {
                let resp = client.get(url).send().await?;
                resp.bytes().await
            }
        })
        .buffer_unordered(CONCURRENT_REQUESTS);

    bodies
        .for_each(|b| async {
            match b {
                Ok(b) => println!("Got {} bytes", b.len()),
                Err(e) => eprintln!("Got an error: {}", e),
            }
        })
        .await;
}

stream::iter(urls)

stream::iter

取一组字符串并将其转换为Stream.

.map(|url| {

StreamExt::map

对流中的每个元素运行异步函数,并将元素转换为新类型.

let client = &client;
async move {

获取对Client的显式引用,并将引用(而不是原始Client)移动到匿名异步块中.

let resp = client.get(url).send().await?;

使用Client的连接池启动异步GET请求,并等待请求.

resp.bytes().await

请求并等待响应的字节数.

.buffer_unordered(N);

StreamExt::buffer_unordered

将期货流转换为期货价值流,同时执行期货.

bodies
    .for_each(|b| {
        async {
            match b {
                Ok(b) => println!("Got {} bytes", b.len()),
                Err(e) => eprintln!("Got an error: {}", e),
            }
        }
    })
    .await;

StreamExt::for_each

将流转换回单个future ,打印出沿途接收的数据量,然后等待future 完成.

另见:

无限制执行

如果愿意,还可以将迭代器转换为future 迭代器,并使用future::join_all:

use futures::future; // 0.3.4
use reqwest::Client; // 0.10.1
use tokio; // 0.2.11

#[tokio::main]
async fn main() {
    let client = Client::new();

    let urls = vec!["https://api.ipify.org"; 2];

    let bodies = future::join_all(urls.into_iter().map(|url| {
        let client = &client;
        async move {
            let resp = client.get(url).send().await?;
            resp.bytes().await
        }
    }))
    .await;

    for b in bodies {
        match b {
            Ok(b) => println!("Got {} bytes", b.len()),
            Err(e) => eprintln!("Got an error: {}", e),
        }
    }
}

我鼓励您使用第一个示例,因为您通常希望限制并发性,bufferbuffer_unordered有助于提高并发性.

Parallel requests

并发请求通常已经足够好了,但有时您需要处理need个并行请求.在这种情况下,您需要生成一个任务.

use futures::{stream, StreamExt}; // 0.3.8
use reqwest::Client; // 0.10.9
use tokio; // 0.2.24, features = ["macros"]

const PARALLEL_REQUESTS: usize = 2;

#[tokio::main]
async fn main() {
    let urls = vec!["https://api.ipify.org"; 2];

    let client = Client::new();

    let bodies = stream::iter(urls)
        .map(|url| {
            let client = client.clone();
            tokio::spawn(async move {
                let resp = client.get(url).send().await?;
                resp.bytes().await
            })
        })
        .buffer_unordered(PARALLEL_REQUESTS);

    bodies
        .for_each(|b| async {
            match b {
                Ok(Ok(b)) => println!("Got {} bytes", b.len()),
                Ok(Err(e)) => eprintln!("Got a reqwest::Error: {}", e),
                Err(e) => eprintln!("Got a tokio::JoinError: {}", e),
            }
        })
        .await;
}

主要区别在于:

  • 我们使用tokio::spawn在单独的tasks中执行工作.
  • 我们必须赋予每项任务它自己的reqwest::Client.作为recommended,我们克隆了一个共享客户机以利用连接池.
  • 当任务无法加入时,还有一个额外的错误情况.

另见:

Rust相关问答推荐

程序退出后只写入指定管道的数据

Rust TcpStream不能在读取后写入,但可以在不读取的情况下写入.为什么?

使用模块中的所有模块,但不包括特定模块

使用铁 rust S还原对多个数组执行顺序kronecker积

用 rust 蚀中的future 展望 struct 的future

无法从流中读取Redis请求

如何将单个 struct 实例与插入器一起传递到Rust中的映射

铁 rust 中双倍或更多换行符的更好练习?

根据填充系数以相对大小在给定空间中布局项目

为什么我需要 to_string 函数的参考?

当我编译 Rust 代码时,我是否缺少 AVX512 的目标功能?

使用 pyo3 将 Rust 转换为 Python 自定义类型

std mpsc 发送者通道在闭包中使用时关闭

返回迭代器考虑静态生命周期类型

Rust中如何实现一个与Sized相反的负特性(Unsized)

在线程中运行时,TCPListener(服务器)在 ip 列表中的服务器实例之前没有从客户端接受所有客户端的请求

判断对象是 PyDatetime 还是 Pydate 的实例?

类型组的通用枚举

有没有办法使用 NASM 语法进行内联汇编?

函数参数的 Rust 功能标志