我试图使用线程将三种不同数据类型的插入卸载到Postgres数据库中,但在borrow 判断器上遇到了问题.

我try 了几种不同的方法,结果我都不知道自己在做什么了.因此,我try 简化所有操作,删除了DBM struct 的所有属性,并对其进行了设置,以便每个方法都将对PgPool的引用作为参数,并 for each 方法调用创建初始PgPool的克隆.所有方法都带有&self参数.我还试图 for each 不起作用的操作复制DBM(除了感觉非常笨拙之外).

我预计这会起作用,无论是在这里的代码示例中,还是在我使用DBM引用或克隆版本的变体中都是如此.相反,我从借入判断器收到了以下消息,虽然我理解了它,但我不确定如何最好地适应它.

76 |           let aton_handle = task::spawn(dbm.insert_aton_data(
   |  _______________________________________^
77 | |             connection_pool.clone(),
78 | |             split_messages.aton_data,
79 | |             &log_id,
80 | |         ));
   | |         ^
   | |         |
   | |_________borrowed value does not live long enough
   |           argument requires that `dbm` is borrowed for `'static`
...

显示违规代码的代码片段:

let connection_pool = PgPool::connect(&connection_string)
        .await
        .expect("Failed to connect to Postgres");
    let dbm = DbMethods {};

    // Make API calls etc..


    if let Some(messages) = last_hour.ais_response.ais_latest_responses {
        // TODO: Handle errors.
        let split_messages = process_ais_items(messages).unwrap();

        // TODO: Create thread for each message type and do DB inserts.
        let aton_handle = task::spawn(dbm.insert_aton_data(
            connection_pool.clone(),
            split_messages.aton_data,
            &log_id,
        ));
        // ... other handles.

        let _ = tokio::try_join!(aton_handle, static_handle, position_handle);
    }

方法:

pub async fn insert_aton_data(
        &self,
        db_pool: PgPool,
        aton_data: Vec<AISAtonData>,
        log_id: &Uuid,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        // let pool = PgPool::connect(&self.connection_string).await?;
        let tx = db_pool.begin().await?;

        for data in aton_data {
            sqlx::query!(
                "INSERT INTO ais.ais_aton_data (
                    type_field, message_type, mmsi, msgtime, dimension_a, dimension_b, dimension_c, dimension_d,
                    type_of_aids_to_navigation, latitude, longitude, name, type_of_electronic_fixing_device, log_id
                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
                data.type_field, data.message_type, data.mmsi, convert_to_datetime_option(data.msgtime), data.dimension_a, data.dimension_b,
                data.dimension_c, data.dimension_d, data.type_of_aids_to_navigation, data.latitude,
                data.longitude, data.name, data.type_of_electronic_fixing_device, log_id
            ).execute(&db_pool).await?;
        }
        tx.commit().await?;

        Ok(())
    }

当前解决方案

为了继续前进,我决定改变将数据库操作作为 struct 上的方法的模式,而将它们作为函数.线程现在可以工作了.希望有人能给我解释一下,我怎样才能做到把它们作为方法.

以下是我目前使用的代码,以防其他人想要做类似的事情.并不是说这是一个好主意,但使用sqlx and Postgres UNNESTING进行批量插入可能非常值得一看.

async fn insert_ais_items(connection_pool: PgPool, log_id: Uuid, last_hour: LastHourAISMessage) -> Result<(), Box<dyn Error>> {
    if let Some(messages) = last_hour.ais_response.ais_latest_responses {
        // TODO: Handle errors.
        let split_messages = process_ais_items(messages).unwrap();

        let aton_handle = task::spawn(insert_aton_data(
            connection_pool.clone(),
            split_messages.aton_data,
            log_id.clone(),
        ));
        let static_handle = task::spawn(insert_static_data(
            connection_pool.clone(),
            split_messages.static_data,
            log_id.clone(),
        ));
        let position_handle = task::spawn(insert_position_data(
            connection_pool.clone(),
            split_messages.position_data,
            log_id.clone(),
        ));

        let res = tokio::try_join!(aton_handle, static_handle, position_handle);
        match res {
            Ok(..) => {
                debug!("Threads completed");
            }
            Err(error) => warn!("There was an error in one of the threads: {:?}", error)
        }
    }

    Ok(())
}

推荐答案

如果您想要的只是方法语法,那么您可以通过让编译器执行constant promotion操作来绕过编译器错误,只需在您声明的dbm处添加一个借入:

let dbm = &Dbm {};

或者对Dbm执行Copy,并将接收器从&self更改为self.

但我真的建议不要为了使用方法而使用它,相反,您可以将函数分组到一个模块中,并使用模块名称调用它们,将其从.更改为:::

mod dbm {
    pub async fn insert_aton_data(
        db_pool: PgPool,
        aton_data: Vec<AISAtonData>,
        log_id: Uuid,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { 
        //…
    }
}

// and later call it like
let handle = task::spawn(dbm::insert_aton_data(
    connection_pool.clone(),
    split_messages.aton_data,
    log_id.clone(),
));

注意:我也从&Uuid更改为Uuid,因为这可能会导致相同的问题

另请注意:如果将Dbm声明为struct Dbm;,则在创建它的实例时不必添加{}.

Postgresql相关问答推荐

Org.postgresql.util.PSQLException:错误:函数LOWER(BYTEA)不存在

无法在kubernetes中设置postgres复制

如何将 jackc/pgx 与连接池、上下文、准备好的语句等一起使用

无法将 json 范围读取为 pgtype.Int4range

Npgsql中准备好的语句和参数化查询有什么区别?

postgres 如何计算多列哈希?

Postgres 转储按顺序插入

sql:转换参数 $1 类型:不支持的类型 []int,in 的一部分

PostgreSQL 查询到 Excel 表

Windows PSQL 命令行: is there a way to allow for passwordless login?

Postgresql - 在 Big Data 库中使用数组的性能

PostgreSQL 提示:You will need to rewrite or cast the expression. column "state" is of type status but expression is of type character varying

MAC OS X 上的 Postgres 权限被拒绝

Docker - Postgres 和 pgAdmin 4:Connection refused

无法使用 sequelize 从本地 node 应用程序连接到 heroku postgresql 数据库

Postgis 中 2 点之间的距离,单位为 4326 米

在同一分区上应用多个窗口函数

如何正确索引多对多关联表?

带有 WITH 子句的查询时出现 Postgresmissing FROM-clause entry错误

PostgreSQL 无法启动:server.key具有组或世界访问权限