我正在从rusqlite迁移,在那里我使用get_interrupt_handle立即中止来自另一个线程的查询(当用户更改过滤器参数时).

以下是我当前代码的一个示例.我最多只能在每await次之前添加一个中断判断,但如果初始查询需要很长时间才能返回第一个结果,那么这并不管用.

struct Query {
    title: String,
}

fn start_async(requests: crossbeam::channel::Receiver<Query>) {
    thread::spawn(move || {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime.block_on(run_db_thread(requests));
    });
}

async fn run_db_thread(requests: crossbeam::channel::Sender<Query>) {
    let connection = SqliteConnection::connect("test.sqlite").await?;
    loop {
        if let Ok(query) = requests.recv() {
            do_query(&connection, &query).await?;
        }
    }
}

async fn do_query(connection: &SqliteConnection, query: &Query) -> Result<(), Box<dyn Error>> {
    let mut stream = sqlx::query("SELECT title, authors, series FROM Books where title like ?")
        .bind(&format!("%{}%", query.title))
        .fetch(&connection);
    while let Some(row) = stream.next().await {
        let (title, authors, series) = row?;
        println!("{} {} {}", title, authors, series);
    }
}

当新的Query到达通道时,有没有办法中断正在运行的SQLX执行?如果需要的话,我很乐意单独发个信号.

推荐答案

All期货本质上是可取消的--这是async相对于阻塞多线程代码的好处(和危害)之一.你只是放弃future ,而不是进一步调查它.

您需要做的第一件事是将阻塞通道更改为async通道-这允许在运行查询时混合判断通道.然后,您可以使用各种future 的操作工具来决定是否使用continue来运行查询.我决定用select!来做这件事,它会轮询几个期货,并根据最先完成的那个来运行代码.

(这方面可能有更好的工具;我熟悉异步的工作原理,但还没有编写很多复杂的异步代码.)

use futures::future::OptionFuture;
use std::time::Duration;
use tokio::sync::mpsc;

async fn run_db_thread(mut requests: mpsc::Receiver<Query>) {
    // This variable holds the current query being run, if there is one
    let mut current_query_future = OptionFuture::default();

    loop {
        tokio::select! {
            // If we receive a new query, replace the current query with it.
            Some(query) = requests.recv() => {
                println!("Starting new query {query:?}");
                current_query_future = OptionFuture::from(Some(Box::pin(async move {
                    let answer = do_query(&query).await;
                    println!("Finished query {query:?} => {answer:?}");
                    answer
                })));
                // Note that we did not `.await` the new future.
            }

            // Poll the current query future, and check if it is done yet.
            Some(_answer) = &mut current_query_future => {
                // Stop polling the completed future.
                current_query_future = None.into();
            }

            // We get here if both of the above branches saw None, which means that the
            // channel is closed, *and* there is no query to run.
            else => {
                println!("Channel closed; run_db_thread() exiting");
                break;
            }
        }
    }
}

/// Example to drive the loop.
#[tokio::main]
async fn main() {
    let (sender, receiver) = mpsc::channel(1);
    tokio::spawn(run_db_thread(receiver));

    for (i, delay) in [1000, 1000, 1, 1, 1, 1000, 1000].into_iter().enumerate() {
        sender.send(Query(i)).await.unwrap();
        tokio::time::sleep(Duration::from_millis(delay)).await;
    }
    println!("main() exiting");
}

// Skeleton data types to make the example compile
#[derive(Debug)]
#[allow(dead_code)]
struct Query(usize);
#[derive(Debug)]
struct Answer;
async fn do_query(_q: &Query) -> Answer {
    tokio::time::sleep(Duration::from_millis(100)).await;
    Answer
}

此示例代码打印:

Starting new query Query(0)
Finished query Query(0) => Answer
Starting new query Query(1)
Finished query Query(1) => Answer
Starting new query Query(2)
Starting new query Query(3)
Starting new query Query(4)
Starting new query Query(5)
Finished query Query(5) => Answer
Starting new query Query(6)
Finished query Query(6) => Answer
main() exiting

也就是说,查询0、1、5和6已完成,但查询2、3和4在它们可以完成之前被到达的新查询取消.

Rust相关问答推荐

当为a Self:IntoIterator设置trait bind `时,获取`a T `不是迭代器"&'"<'>&'

无法在线程之间安全地发送future (&Q;)&错误

rust 蚀生命周期 行为

为什么BitVec缺少Serialize trait?

具有对同一类型的另一个实例的可变引用的

默认特征实现中的生命周期问题

如何循环遍历0..V.len()-1何时v可能为空?

是否可以使用Rust宏来构建元组的项?

我是否可以在Ruust中修改 struct 实例上的字符串,以使其在修改后具有相同的字符串生存期?

如何迭代存储在 struct 中的字符串向量而不移动它们?

为什么 js_sys Promise::new 需要 FnMut?

Rust 如何将链表推到前面?

将引用移动到线程中

在1.5n次比较中找到整数向量中的最大和次大整数

为什么我不能克隆可克隆构造函数的Vec?

Rust 中 `Option` 的内存开销不是常量

是否可以预测堆栈溢出?

Rust - 在线程之间不安全地共享没有互斥量的可变数据

您如何使用枚举反序列化字符串,其中任何其他值反序列化为新类型变体同时保留字符串?

有没有办法使用 NASM 语法进行内联汇编?