我有我的应用程序由服务和http服务器组成.

  • 服务有一些与操作系统API有关的事情,等待事件等,它有用于此目的的循环以及异步数据库写入.我用异步功能启动了这项服务.
  • 服务器(用火箭编写)也使用异步请求hadnles,因为我目前使用的是使用异步的SeaORM.

Problem:个 当我用请求访问服务器时,它从不在处理程序中启动异步任务,除非我的服务循环中的事件被激发.当在我的服务中触发事件时,处理程序完成得很好,但在下一次请求时是相同的.

I tried to:

  • 使用tokio::spawntokio::task::spawn,它们的工作方式完全相同(阻塞执行)
  • 据我所知,我不能产生普通的线程,因为我无论如何也不会有.await个线程.
  • 另外,我试着用#[rocket::main(worker_threads = 4)]来标记main,这应该会产生更多的异步线程?但它仍然是一样的.

我怎么才能克服这一点呢? 我能想到的只是使用另一个ORM,比如diesel,它不是异步的,因为我目前没有使用ORM以外的任何其他地方的异步,它可以工作,但我不认为这是一个好的解决方案. 另一个 idea 是向我的循环添加标记,这样在触发服务事件之前它不会被卡住,但这看起来也很奇怪,处理延迟仍然取决于此.

最小可重现示例:

extern crate rocket;

use std::sync::mpsc::{channel, Sender};

use once_cell::sync::OnceCell;
use rocket::serde::{json::Json, Deserialize, Serialize};
use rocket::State;
use sea_orm::{entity::prelude::*, Database, Set};
use sea_orm::{DbBackend, Schema};
use tokio::join;
use windows::{
    w,
    Win32::Foundation::HWND,
    Win32::UI::{
        Accessibility::{SetWinEventHook, HWINEVENTHOOK},
        WindowsAndMessaging::{MessageBoxW, EVENT_SYSTEM_FOREGROUND, MB_OK},
    },
};

thread_local! {
    static TX: OnceCell<Sender<RawWindowEvent>>= OnceCell::new()
}

#[rocket::main]
async fn main() {
    let db = Database::connect("sqlite://data.db?mode=rwc")
        .await
        .unwrap();

    let builder = db.get_database_backend();

    let stmt = builder.build(
        Schema::new(DbBackend::Sqlite)
            .create_table_from_entity(Entity)
            .if_not_exists(),
    );

    db.execute(stmt).await.unwrap();

    let server = rocket::build()
        .manage(db.clone())
        .mount("/", routes![get_events])
        .launch();

    let service = tokio::spawn(service(db.clone()));

    join!(server, service);
}

#[get("/event")]
async fn get_events(db: &State<DatabaseConnection>) -> Json<Vec<Model>> {
    let db = db as &DatabaseConnection;

    let events = Entity::find().all(db).await.unwrap();

    Json(events)
}

extern "system" fn win_event_hook_callback(
    child_id: HWINEVENTHOOK,
    hook_handle: u32,
    event_id: HWND,
    window_handle: i32,
    object_id: i32,
    thread_id: u32,
    timestamp: u32,
) -> () {
    let event = RawWindowEvent {
        child_id,
        hook_handle,
        event_id,
        window_handle,
        object_id,
        thread_id,
        timestamp,
    };

    TX.with(|f| {
        let tx: &Sender<RawWindowEvent> = f.get().unwrap();

        tx.send(event).unwrap();
    });
}

async fn service(db: DatabaseConnection) {
    let (tx, cx) = channel::<RawWindowEvent>();

    std::thread::spawn(move || {
        TX.with(|f| f.set(tx)).unwrap();

        let hook = unsafe {
            SetWinEventHook(
                EVENT_SYSTEM_FOREGROUND,
                EVENT_SYSTEM_FOREGROUND,
                None,
                Some(win_event_hook_callback),
                0,
                0,
                0,
            )
        };

        let _ = unsafe { MessageBoxW(None, w!("Text"), w!("Text"), MB_OK) };
    });

    loop {
        let event = cx.recv();

        if (event.is_err()) {
            break;
        }

        let event = event.unwrap();

        // There goes some event processing with another windows api calls or simple calculations...

        let record = ActiveModel {
            timestamp: Set(event.timestamp),
            ..Default::default()
        };

        Entity::insert(record).exec(&db).await.unwrap();
    }
}

#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)]
#[serde(crate = "rocket::serde")]
#[sea_orm(table_name = "event")]
pub struct Model {
    #[sea_orm(primary_key)]
    pub id: i32,
    pub timestamp: u32,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}

#[derive(Debug, Clone, Copy)]
pub struct RawWindowEvent {
    pub child_id: HWINEVENTHOOK,
    pub hook_handle: u32,
    pub event_id: HWND,
    pub window_handle: i32,
    pub object_id: i32,
    pub thread_id: u32,
    pub timestamp: u32,
}

Cargo.toml个中的依赖项:

[dependencies]
dotenv = "0.15.0"

tokio = { version = "1.28.2", features = ["full"] }

sea-orm = { version = "^0.11", features = [ "sqlx-sqlite", "runtime-tokio-native-tls", "macros" ] }

rocket = {version = "0.5.0-rc.3", features = ["json"]}

once_cell = "1.17.1"

[dependencies.windows]
version = "0.48.0"
features = [
    "Win32_Foundation",
    "Win32_UI_Accessibility",
    "Win32_UI_WindowsAndMessaging",
    "Win32_System_Threading",
    "Win32_System_ProcessStatus"
]

推荐答案

您使用的是同步通道,因此阻塞了运行时.使用tokio:tokio::sync::mpsc中定义的频道.

Rust相关问答推荐

如何在Rust中为具有多个数据持有者的enum变体编写文档 comments ?

空字符串转换为Box字符串时是否分配?<>

从Rust调用C++虚拟方法即使在成功执行之后也会引发Access违规错误

使用Rust s serde_json对混合数据类型进行优化'

抽象RUST中的可变/不可变引用

如何使用syn插入 comments ?

如何为rust trait边界指定多种可能性

在Rust中是否可以使用Rc自动化约束传播

Rust编译器似乎被结果类型与anyhow混淆

链表堆栈溢出

为什么BufReader实际上没有缓冲短寻道?

Rust 并行获取对 ndarray 的每个元素的可变引用

Google chrome 和 Apple M1 中的计算着色器

从Rust 的临时文件中创建引用是什么意思?

如何在 Rust 中按 char 对字符串向量进行排序?

如何在 Rust 中将 Vec> 转换为 Vec>?

在 Rust 中,将可变引用传递给函数的机制是什么?

Rust 跨同一文件夹中文件的可见性

基于名称是否存在的条件编译

为什么当borrow 变量发生变化时,borrow 变量不会改变?