我目前正在Rocket上构建一个API引擎,和现代应用程序一样,我想包括一个自动调度程序,以便在Rocket API运行时运行异步任务(也称为crons).

我决定使用火箭整流罩来启用所说的日程表建立在东京-日程表上的"升空"事件.

事实是,我设置了所有需要的部分(登录到数据库、 struct 和特征),但在编译关于我的整流罩的生命周期时,我得到了一个奇怪的错误.

下面是我的代码演练:

->这是我的"命令"模块,包含与应用程序一起构建和移动命令(也称为crons)的所有 struct 部分.

/// Synthetize a command execution result.
pub enum CommandResult {
    SUCCESS,
    ERROR(String),
    SKIPPED(String),
}

/// Trait to define structs as runnable async crons with tokio_scheduler
#[async_trait]
pub trait Command: Send + Sync {
    /// returns the current command name
    fn get_command_name(&self) -> String;

    /// returns the current command argument payload
    fn get_command_args(&self) -> Option<HashMap<String, String>>;

    /// returns the "cron_middleware"
    fn get_cron_middleware(&self) -> CronLogMiddleware<CronLogRepository>;

    /// real body for the command execution, must be overriden in impls.
    async fn do_run(&self) -> Result<CommandResult>;

    /// starts the command process by validating command lock, and registering an open cron log into database.
    async fn begin(&self) -> Result<CronLog> {
        // ...
    }

    /// ends the command process by releasing command lock, and registering the result of the command to an opened cron log into database.
    async fn end(&self, cron_log: &CronLog, result: CommandResult) -> Result<()> {
        // ...
    }

    /// hidden runner of commands, uses begin, end and do_run, and will be used by runner.
    async fn run(&self) -> Result<()> {
        // ...
    }

    /// generates a unique key for this command name + args, for locks purposes
    fn generate_unicity_key(&self) -> String {
        // ...
    }

    /// converts command args as a string payload
    #[allow(clippy::or_fun_call)]
    fn get_command_args_as_string(&self) -> String {
        // ...
    }
}

/// struct to move a command + its cron schedule into scheduler.
pub struct CommandHandle<T: Command + ?Sized + Send + Sync> {
    pub command: Box<T>,
    pub schedule: String,
}

然后,出于测试目的,我创建了一个测试命令 struct ,如下所示:

/// a testing purpose command
pub struct TestCommand {
    pub name: String,
    pub args: Option<HashMap<String, String>>,
    pub cron_log_middleware: CronLogMiddleware<CronLogRepository>,
}

#[async_trait]
impl Command for TestCommand {
    // accessors (get_... functions)

    async fn do_run(&self) -> Result<CommandResult> {
        debug!("executed !");

        Ok(CommandResult::SUCCESS)
    }
}

火箭建造者看起来是这样的:

    let mut sched = CronScheduler::default();

    sched.add_cron(CommandHandle {
        command: Box::new(TestCommand {
            name: "app:test".to_string(),
            args: None,
            cron_log_middleware: cron_log_middleware.clone(),
        }),
        schedule: "*/1 * * * *".to_string(),
    });

    // then I add sched to rocket with .manage()

整流罩看起来是这样的:


/// a rocket fairing enabling async tasks (eg crons) while rocket is launching
#[derive(Default)]
pub struct CronScheduler {
    crons: Vec<CommandHandle<dyn Command>>,
}

impl CronScheduler {
    /// adds a cron (eg CommandHandle with a given command) to run with the scheduler.
    pub fn add_cron(&mut self, cron: CommandHandle<dyn Command>) {
        self.crons.push(cron);
    }
}

#[rocket::async_trait]
impl Fairing for CronScheduler {
    //...
                        v -- error is here
    async fn on_liftoff(&self, _rocket: &Rocket<Orbit>) {
        let sched = SchedulerBuilder::build().await;

        for handle in self.crons.iter() {
            let job = Job::new_cron_job_async(handle.schedule.as_str(), |_uid, _lock| {
                Box::pin(async move {
                    handle.command.run().await;
                })
            })
            .unwrap();

            sched.add(job).await.unwrap();
        }

        sched.start().await.unwrap();
    }
}

AAA和我得到了这个错误:

error[E0759]: `self` has lifetime `'life0` but it needs to satisfy a `'static` lifetime requirement
  --> src/core/fairings/cron_scheduler.rs:34:26
   |
34 |       async fn on_liftoff(&self, rocket: &Rocket<Orbit>) {
   |                            ^^^^ this data with lifetime `'life0`...
...
39 | /                 Box::pin(async move {
40 | |                     handle.command.run().await;
41 | |                 })
   | |__________________- ...is used and required to live as long as `'static` here
   |
note: `'static` lifetime requirement introduced by the return type
  --> src/core/fairings/cron_scheduler.rs:34:5
   |
34 |       async fn on_liftoff(&self, rocket: &Rocket<Orbit>) {
   |       ^^^^^ requirement introduced by this return type
...
39 | /                 Box::pin(async move {
40 | |                     handle.command.run().await;
41 | |                 })
   | |__________________- because of this returned expression

我的货架也缩短了:

[dependencies]
rocket = {version = "0.5.0-rc.2", features = ["json"]}
// ...
tokio-cron-scheduler = {version = "0.8.1", features = ["signal"]}
// ...

我try 了不同的解决方案,并指出这就是导致问题的命令,就好像我将"Box::Pin(...)"的内容替换为类似println!的内容一样,没有任何问题.

我不知道这是异步性和火箭异步性之间的冲突,还是其他,但我搞不清楚.

编辑1:缩短了大量代码,因为罚单太长了.

编辑2:由于经过验证的答案,找到了解决方案;如果可以帮助任何人,这里是最终的代码补丁.

FTR,我不得不自己实现Clone(不使用宏),并使用固定答案的代码作为参考.

// command 

pub struct CommandHandle<T: Command + ?Sized + Send + Sync> {
    pub command: Arc<T>,
    pub schedule: String,
}

impl<T> Clone for CommandHandle<T>
where
    T: Command + ?Sized + Send + Sync,
{
    fn clone(&self) -> Self {
        Self {
            command: self.command.clone(),
            schedule: self.schedule.clone(),
        }
    }
}

// fairing 

    async fn on_liftoff(&self, _rocket: &Rocket<Orbit>) {
        let sched = SchedulerBuilder::build().await;

        for handle in self.crons.iter() {
            let schedule = handle.schedule.clone();

            let handle = handle.clone();

            let job = Job::new_cron_job_async(schedule.as_str(), move |_uid, _lock| {
                let handle = handle.clone();

                Box::pin(async move {
                    handle.command.run().await.unwrap();
                })
            })
            .unwrap();

            sched.add(job).await.unwrap();
        }

        sched.start().await.unwrap();
    }

推荐答案

Job::new_cron_job_async要求您的闭包是'static,但并非如此,因为handle是对self.crons的引用.

快速查看一下您的 struct ,如果您在CommandHandle中使用Arc而不是Box,那么它很容易被克隆,因此可以为cron作业(job)提供'static句柄:

pub struct CommandHandle<T: Command + ?Sized + Send + Sync> {
    pub command: Arc<T>, // <------------
    pub schedule: String,
}

impl Clone for CommandHandle ...
for handle in self.crons.iter() {
    let handle = handle.clone(); // <------------               vvvv
    let job = Job::new_cron_job_async(handle.schedule.as_str(), move |_uid, _lock| {
        let handle = handle.clone(); // <------------
        Box::pin(async move {
            handle.command.run().await;
        })
    })
    .unwrap();

    sched.add(job).await.unwrap();
}

这很难验证,因为您发布的代码是不完整的,但我相信您需要上面的两个克隆,因为函数需要所有权为'static,但也必须为FnMut才能多次调用,因此句柄不能直接移动到async块中.

Rust相关问答推荐

使用元组执行条件分支的正确方法

无法在线程之间安全地发送future (&Q;)&错误

这个规则关于或模式到底是什么意思?如果表达片段的类型与p_i|q_i...&q;不一致,就会形成

如何模拟/创建ReqData以测试Actix Web请求处理程序?

在IntoIter上调用.by_ref().Take().rev()时会发生什么情况

正在将带有盒的异步特征迁移到新的异步_fn_in_特征功能

减少指示ProgressBar在Rust中的开销

为什么我必须使用 PhantomData?在这种情况下它在做什么?

这个不安全的 Rust 代码有什么问题,所以它可以在 Windows 上运行,但不能在 Ubuntu 上运行?

为什么 js_sys Promise::new 需要 FnMut?

为什么某些类型参数仅在特征边界中使用的代码在没有 PhantomData 的情况下进行编译?

在没有任何同步的情况下以非原子方式更新由宽松原子操作 Select 的值是否安全?

Option<&T> 如何实现复制

如何基于常量在Rust中跳过一个测试

为什么Rust中无法推断生命周期?

实现AsyncWrite到hyper Sender时发生生命周期错误

在运行时在 Rust 中加载字体

切片不能被 `usize` 索引?

实现不消费的迭代器

火箭整流罩、tokio-scheduler 和 cron 的生命周期问题