我有我的应用程序由服务和http服务器组成.
- 服务有一些与操作系统API有关的事情,等待事件等,它有用于此目的的循环以及异步数据库写入.我用异步功能启动了这项服务.
- 服务器(用火箭编写)也使用异步请求hadnles,因为我目前使用的是使用异步的SeaORM.
Problem:个 当我用请求访问服务器时,它从不在处理程序中启动异步任务,除非我的服务循环中的事件被激发.当在我的服务中触发事件时,处理程序完成得很好,但在下一次请求时是相同的.
I tried to:个
- 使用
tokio::spawn
、tokio::task::spawn
,它们的工作方式完全相同(阻塞执行) - 据我所知,我不能产生普通的线程,因为我无论如何也不会有
.await
个线程. - 另外,我试着用
#[rocket::main(worker_threads = 4)]
来标记main
,这应该会产生更多的异步线程?但它仍然是一样的.
我怎么才能克服这一点呢?
我能想到的只是使用另一个ORM,比如diesel
,它不是异步的,因为我目前没有使用ORM以外的任何其他地方的异步,它可以工作,但我不认为这是一个好的解决方案.
另一个 idea 是向我的循环添加标记,这样在触发服务事件之前它不会被卡住,但这看起来也很奇怪,处理延迟仍然取决于此.
最小可重现示例:
extern crate rocket;
use std::sync::mpsc::{channel, Sender};
use once_cell::sync::OnceCell;
use rocket::serde::{json::Json, Deserialize, Serialize};
use rocket::State;
use sea_orm::{entity::prelude::*, Database, Set};
use sea_orm::{DbBackend, Schema};
use tokio::join;
use windows::{
w,
Win32::Foundation::HWND,
Win32::UI::{
Accessibility::{SetWinEventHook, HWINEVENTHOOK},
WindowsAndMessaging::{MessageBoxW, EVENT_SYSTEM_FOREGROUND, MB_OK},
},
};
thread_local! {
static TX: OnceCell<Sender<RawWindowEvent>>= OnceCell::new()
}
#[rocket::main]
async fn main() {
let db = Database::connect("sqlite://data.db?mode=rwc")
.await
.unwrap();
let builder = db.get_database_backend();
let stmt = builder.build(
Schema::new(DbBackend::Sqlite)
.create_table_from_entity(Entity)
.if_not_exists(),
);
db.execute(stmt).await.unwrap();
let server = rocket::build()
.manage(db.clone())
.mount("/", routes![get_events])
.launch();
let service = tokio::spawn(service(db.clone()));
join!(server, service);
}
#[get("/event")]
async fn get_events(db: &State<DatabaseConnection>) -> Json<Vec<Model>> {
let db = db as &DatabaseConnection;
let events = Entity::find().all(db).await.unwrap();
Json(events)
}
extern "system" fn win_event_hook_callback(
child_id: HWINEVENTHOOK,
hook_handle: u32,
event_id: HWND,
window_handle: i32,
object_id: i32,
thread_id: u32,
timestamp: u32,
) -> () {
let event = RawWindowEvent {
child_id,
hook_handle,
event_id,
window_handle,
object_id,
thread_id,
timestamp,
};
TX.with(|f| {
let tx: &Sender<RawWindowEvent> = f.get().unwrap();
tx.send(event).unwrap();
});
}
async fn service(db: DatabaseConnection) {
let (tx, cx) = channel::<RawWindowEvent>();
std::thread::spawn(move || {
TX.with(|f| f.set(tx)).unwrap();
let hook = unsafe {
SetWinEventHook(
EVENT_SYSTEM_FOREGROUND,
EVENT_SYSTEM_FOREGROUND,
None,
Some(win_event_hook_callback),
0,
0,
0,
)
};
let _ = unsafe { MessageBoxW(None, w!("Text"), w!("Text"), MB_OK) };
});
loop {
let event = cx.recv();
if (event.is_err()) {
break;
}
let event = event.unwrap();
// There goes some event processing with another windows api calls or simple calculations...
let record = ActiveModel {
timestamp: Set(event.timestamp),
..Default::default()
};
Entity::insert(record).exec(&db).await.unwrap();
}
}
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)]
#[serde(crate = "rocket::serde")]
#[sea_orm(table_name = "event")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
pub timestamp: u32,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}
#[derive(Debug, Clone, Copy)]
pub struct RawWindowEvent {
pub child_id: HWINEVENTHOOK,
pub hook_handle: u32,
pub event_id: HWND,
pub window_handle: i32,
pub object_id: i32,
pub thread_id: u32,
pub timestamp: u32,
}
Cargo.toml
个中的依赖项:
[dependencies]
dotenv = "0.15.0"
tokio = { version = "1.28.2", features = ["full"] }
sea-orm = { version = "^0.11", features = [ "sqlx-sqlite", "runtime-tokio-native-tls", "macros" ] }
rocket = {version = "0.5.0-rc.3", features = ["json"]}
once_cell = "1.17.1"
[dependencies.windows]
version = "0.48.0"
features = [
"Win32_Foundation",
"Win32_UI_Accessibility",
"Win32_UI_WindowsAndMessaging",
"Win32_System_Threading",
"Win32_System_ProcessStatus"
]