我经常需要从Microsoft SQL服务器获取大量数据,以便在Rust中使用Polar进行操作,根据企业安全策略,我或多或少都被迫使用ODBC进行这些连接.ODBC要求限制我使用像ConnectorX这样成熟且功能强大的库.我可以使用ARROW_ODBC将查询结果从Arrow连接并高效地读取到RecordBatch对象中,但无法将这些RecordBatch对象转换为Polars DataFrame.

因为RecordBatchSeries的实际数据组件具有相同的底层表示形式,所以我认为可以从RecordBatch的零拷贝创建DataFrame.

然而,在columns.push(Series::from_arrow(&schema.fields().get(i).unwrap().name(), *column)?);中,我得到了错误:

mismatched types
expected struct `std::boxed::Box<(dyn polars::export::polars_arrow::array::Array + 'static)>`
   found struct `Arc<dyn arrow::array::Array>`

我的印象是,Arc<dyn Array>ArrayRef,是真正的问题,也许我有一个Arc<dyn arrow::array::Array>Series::from_arrow()期待一个北极星Arc<Array>?如果是这样,我该如何解决?

我的完整代码如下以供参考.

use arrow_odbc::{odbc_api::{Environment, ConnectionOptions}, OdbcReaderBuilder};
use arrow::record_batch::RecordBatch;
use polars::prelude::*;
use anyhow::Result;

const CONNECTION_STRING: &str = "...";

pub fn test() -> Result<()> {

    let odbc_environment = Environment::new()?;
     
    let connection = odbc_environment.connect_with_connection_string(
        CONNECTION_STRING,
        ConnectionOptions::default()
    )?;

    let cursor = connection.execute("SELECT * FROM Backcast_Power_Plant_Map", ())?.unwrap();

    let arrow_record_batches = OdbcReaderBuilder::new().build(cursor)?;

    fn record_batch_to_dataframe(batch: &RecordBatch) -> Result<DataFrame, PolarsError> {
        let schema = batch.schema();
        let mut columns = Vec::with_capacity(batch.num_columns());
        for (i, column) in batch.columns().iter().enumerate() {
            columns.push(Series::from_arrow(&schema.fields().get(i).unwrap().name(), *column)?);
        }
        Ok(DataFrame::from_iter(columns))
    }

    for batch in arrow_record_batches {
        dbg!(record_batch_to_dataframe(&batch?));
    }

    Ok(())
}

推荐答案

看起来polarsarrow-odbc使用不同的箭箱:polars使用polars-arrowarrow-odbc使用arrow.前者的数组类型是Box<dyn polars_arrow::array::Array>,而后者的类型是ArrayRef,这是Arc<dyn arrow::array::Array>的别名.

对我们来说幸运的是,polars-arrow箱中有一个兼容层.您可以通过From个Ims在两种类型(或更多类型)之间进行转换:

use anyhow::Result;
use arrow::record_batch::RecordBatch;
use arrow_odbc::{
    odbc_api::{ConnectionOptions, Environment},
    OdbcReaderBuilder,
};
use polars::prelude::*;

const CONNECTION_STRING: &str = "...";

pub fn test() -> Result<()> {
    let odbc_environment = Environment::new()?;

    let connection = odbc_environment
        .connect_with_connection_string(CONNECTION_STRING, ConnectionOptions::default())?;

    let cursor = connection
        .execute("SELECT * FROM Backcast_Power_Plant_Map", ())?
        .unwrap();

    let arrow_record_batches = OdbcReaderBuilder::new().build(cursor)?;

    fn record_batch_to_dataframe(batch: &RecordBatch) -> Result<DataFrame, PolarsError> {
        let schema = batch.schema();
        let mut columns = Vec::with_capacity(batch.num_columns());
        for (i, column) in batch.columns().iter().enumerate() {
            let arrow = Box::<dyn polars_arrow::array::Array>::from(&**column);
            columns.push(Series::from_arrow(
                &schema.fields().get(i).unwrap().name(),
                arrow,
            )?);
        }
        Ok(DataFrame::from_iter(columns))
    }

    for batch in arrow_record_batches {
        dbg!(record_batch_to_dataframe(&batch?));
    }

    Ok(())
}

请注意,这需要具有arrow_rs功能的polars-arrow作为依赖项.

据我所知,这并没有复制实际数据.

Rust相关问答推荐

关联类型(类型参数)命名约定

使用Clap时如何将String作为Into Str参数传递?

闭包不会发送,即使它只捕获发送变量

使用极点数据帧时,找不到枚举结果的方法lazy()

什么时候和为什么S最好是按值或引用传递简单类型

可以为rust构建脚本编写单元测试吗?

JSON5中的变量类型(serde)

是否提供Bundle 在可执行文件中的warp中的静态文件?

为什么我必须使用 PhantomData?在这种情况下它在做什么?

实现 Deref 的 struct 可以返回对外部数据的引用吗?

如何将一个矩阵的列分配给另一个矩阵,纳尔代数?

如何正确使用git2::Remote::push?

Rust 异步循环函数阻塞了其他future 任务的执行

在1.5n次比较中找到整数向量中的最大和次大整数

如何将 C++ 程序链接到 Rust 程序,然后将该 Rust 程序链接回 C++ 程序? (cpp -> rust -> cpp)

如何在 Rust 中将枚举变体转换为 u8?

如何在 Emacs Elisp 中获得类似格式化的 LSP?

为什么可以在迭代器引用上调用 into_iter?

没有通用参数的通用返回

为什么 `ref` 会导致此示例*取消引用*一个字段?