对于同一个功能,我有两种方法,我正在试着看看哪一种更惯用或更有表现力.

query_many_points()函数的目标是接受两个大小相同的数组,并通过query_point()函数将它们并发地发送到API,返回一个Responses的向量--一个包含从API反序列化的数据的定制 struct .API响应的顺序必须与输入的顺序完全匹配.我相信这两种方法都能做到这一点.

我最初是通过收集VEC中的期货,然后迭代到await并按顺序展开结果来编写这篇文章的.

Approach 1: Vec of handles

fn query_many_points(xs: &[f64], ys: &[f64]) -> anyhow::Result<Vec<Response>> {
    let agent = ureq::agent();
    let semaphore = Arc::new(Semaphore::new(TASKS_LIMIT));

    // Concurrently query all given coordinates, limited to TASKS_LIMIT concurrent tasks
    // Returned Vec<Response> preserves order of input coordinates
    let runtime = Runtime::new()?;
    runtime.block_on(async {
        // Create a vector of task handles, preserving order with input coordinates
        let handles = xs
            .iter()
            .zip(ys.iter())
            .map(|(&x, &y)| {
                // Obtain permit from semaphore when available
                let permit = semaphore.clone().acquire_owned();
                let agent = agent.clone();
                // Spawn new task and query point, returning handle containing Result<Response>
                tokio::spawn(async move {
                    let result = query_point(x, y, &agent);
                    drop(permit);
                    result
                })
            })
            .collect::<Vec<_>>(); // this line is where the approaches start to differ

        // Await all tasks to complete in order and collect into Vec<Response>
        let mut responses = Vec::with_capacity(handles.len());
        for handle in handles {
            responses.push(handle.await??);
        }

        Ok(responses)
    })
}

然后我找出了大约futures::stream::FuturesOrdered个,试了一试,想出了第二个解决方案.

Approach 2: FuturesOrdered

fn query_many_points(xs: &[f64], ys: &[f64]) -> anyhow::Result<Vec<Response>> {
    let agent = ureq::agent();
    let semaphore = Arc::new(Semaphore::new(TASKS_LIMIT));

    // Concurrently query all given points, limited to TASKS_LIMIT concurrent tasks
    // Returned Vec<Response> preserves order of input points
    let runtime = Runtime::new()?;
    runtime.block_on(async {
        // Create a FuturesOrdered of task handles
        let handles = xs
            .iter()
            .zip(ys.iter())
            .map(|(&x, &y)| {
                // Obtain permit from semaphore when available
                let permit = semaphore.clone().acquire_owned();
                let agent = agent.clone();
                // Spawn new task and query point, returning handle containing Result<Response>
                tokio::spawn(async move {
                    let result = query_point(x, y, &agent);
                    drop(permit);
                    result
                })
            })
            .collect::<FuturesOrdered<_>>(); // this line is where the approaches start to differ

        // Await completion of all tasks in order and collect into Vec<Response>
        handles.try_collect::<Vec<_>>().await?.into_iter().collect()
    })
}

哪种方法是最好的?还有没有其他可以改进的地方?两者维护的秩序完全相同,对吗?

推荐答案

考虑一下这样的情况,您不调用tokio::spawn,而只是从map返回future 本身.

如果你迭代Vec个期货,并在每个期货上调用await,那么一次只有一个future 开始,而下一个future 只有在前一个完成后才会开始.

如果你使用FuturesOrdered,它将一次启动多个期货,潜在地允许更多的并发性,因为一个启动的future 可以取得进展,而另一个必须等待.


在您的特定示例中,这并不重要--您 for each 将来派生一个任务,Tokio将开始自己执行它们(可能在另一个线程上),而您所连接的期货不会做任何工作,只会"等待任务完成".你最好还是用VEC吧.

Rust相关问答推荐

borrow 和内部IntoIterator

rust 蚀生命周期 行为

S在Cargo.toml中添加工作空间开发依赖关系的正确方法是什么?

`RwLockWriteGuard_,T`不实现T实现的特征

关于 map 闭合求和的问题

解析程序无法在Cargo 发布中 Select 依赖版本

如何轮询 Pin>?

如何对一个特征的两个实现进行单元测试?

要求类型参数有特定的大小?

在 Rust 中忽略 None 值的正确样式

有什么办法可以追踪泛型的单态化过程吗?

实现泛型的 Trait 方法中的文字

Rust 中的生命周期:borrow 的 mut 数据

&self 参数在 trait 的功能中是必需的吗?

Rust 中函数的类型同义词

为什么拥有 i32 所有权的函数需要它是可变的?

为什么基于 clap::Parser 读取的大量数字进行计算比硬编码该数字时慢?

如何为枚举中的单个或多个值返回迭代器

为什么这个闭包没有比 var 长寿?

如何在 Rust 的泛型函​​数中同时使用非拥有迭代器和消费迭代器?