我试图使用线程将三种不同数据类型的插入卸载到Postgres数据库中,但在borrow 判断器上遇到了问题.
我try 了几种不同的方法,结果我都不知道自己在做什么了.因此,我try 简化所有操作,删除了DBM struct 的所有属性,并对其进行了设置,以便每个方法都将对PgPool的引用作为参数,并 for each 方法调用创建初始PgPool的克隆.所有方法都带有&;self参数.我还试图 for each 不起作用的操作复制DBM(除了感觉非常笨拙之外).
我预计这会起作用,无论是在这里的代码示例中,还是在我使用DBM引用或克隆版本的变体中都是如此.相反,我从借入判断器收到了以下消息,虽然我理解了它,但我不确定如何最好地适应它.
76 | let aton_handle = task::spawn(dbm.insert_aton_data(
| _______________________________________^
77 | | connection_pool.clone(),
78 | | split_messages.aton_data,
79 | | &log_id,
80 | | ));
| | ^
| | |
| |_________borrowed value does not live long enough
| argument requires that `dbm` is borrowed for `'static`
...
显示违规代码的代码片段:
let connection_pool = PgPool::connect(&connection_string)
.await
.expect("Failed to connect to Postgres");
let dbm = DbMethods {};
// Make API calls etc..
if let Some(messages) = last_hour.ais_response.ais_latest_responses {
// TODO: Handle errors.
let split_messages = process_ais_items(messages).unwrap();
// TODO: Create thread for each message type and do DB inserts.
let aton_handle = task::spawn(dbm.insert_aton_data(
connection_pool.clone(),
split_messages.aton_data,
&log_id,
));
// ... other handles.
let _ = tokio::try_join!(aton_handle, static_handle, position_handle);
}
方法:
pub async fn insert_aton_data(
&self,
db_pool: PgPool,
aton_data: Vec<AISAtonData>,
log_id: &Uuid,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// let pool = PgPool::connect(&self.connection_string).await?;
let tx = db_pool.begin().await?;
for data in aton_data {
sqlx::query!(
"INSERT INTO ais.ais_aton_data (
type_field, message_type, mmsi, msgtime, dimension_a, dimension_b, dimension_c, dimension_d,
type_of_aids_to_navigation, latitude, longitude, name, type_of_electronic_fixing_device, log_id
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
data.type_field, data.message_type, data.mmsi, convert_to_datetime_option(data.msgtime), data.dimension_a, data.dimension_b,
data.dimension_c, data.dimension_d, data.type_of_aids_to_navigation, data.latitude,
data.longitude, data.name, data.type_of_electronic_fixing_device, log_id
).execute(&db_pool).await?;
}
tx.commit().await?;
Ok(())
}
当前解决方案
为了继续前进,我决定改变将数据库操作作为 struct 上的方法的模式,而将它们作为函数.线程现在可以工作了.希望有人能给我解释一下,我怎样才能做到把它们作为方法.
以下是我目前使用的代码,以防其他人想要做类似的事情.并不是说这是一个好主意,但使用sqlx and Postgres UNNESTING进行批量插入可能非常值得一看.
async fn insert_ais_items(connection_pool: PgPool, log_id: Uuid, last_hour: LastHourAISMessage) -> Result<(), Box<dyn Error>> {
if let Some(messages) = last_hour.ais_response.ais_latest_responses {
// TODO: Handle errors.
let split_messages = process_ais_items(messages).unwrap();
let aton_handle = task::spawn(insert_aton_data(
connection_pool.clone(),
split_messages.aton_data,
log_id.clone(),
));
let static_handle = task::spawn(insert_static_data(
connection_pool.clone(),
split_messages.static_data,
log_id.clone(),
));
let position_handle = task::spawn(insert_position_data(
connection_pool.clone(),
split_messages.position_data,
log_id.clone(),
));
let res = tokio::try_join!(aton_handle, static_handle, position_handle);
match res {
Ok(..) => {
debug!("Threads completed");
}
Err(error) => warn!("There was an error in one of the threads: {:?}", error)
}
}
Ok(())
}