我想把postgres表读入数据工程管道的数据框中.我用气流来安排这些任务.我在Airflow中创建了一个名为postgres_product_db的连接,并try 使用get_pandas_df来获取记录.

db_hook = PostgresHook('postgres_product_db')
fetch_item = db_hook.get_pandas_df(request)

但它把错误当作

UnicodeDecodeError: 'ascii' codec can't decode byte 0xc3 in position 3: ordinal not in range(128)

完整错误日志(log):

 [2022-04-05, 06:46:27 UTC] {taskinstance.py:1774} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/home/azureuser/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 174, in execute
        return_value = self.execute_callable()
      File "/home/azureuser/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 188, in execute_callable
        return self.python_callable(*self.op_args, **self.op_kwargs)
      File "/home/azureuser/airflow/dags/foodstar_store1_pricing_update_program.py", line 81, in fetch_inventory
        inventory = db_hook.get_pandas_df(request)
      File "/home/azureuser/.local/lib/python3.8/site-packages/airflow/hooks/dbapi.py", line 138, in get_pandas_df
        return psql.read_sql(sql, con=conn, params=parameters, **kwargs)
      File "/home/azureuser/.local/lib/python3.8/site-packages/pandas/io/sql.py", line 566, in read_sql
        return pandas_sql.read_query(
      File "/home/azureuser/.local/lib/python3.8/site-packages/pandas/io/sql.py", line 2094, in read_query
        data = self._fetchall_as_list(cursor)
      File "/home/azureuser/.local/lib/python3.8/site-packages/pandas/io/sql.py", line 2108, in _fetchall_as_list
        result = cur.fetchall()
    UnicodeDecodeError: 'ascii' codec can't decode byte 0xc3 in position 3: ordinal not in range(128)

在正常情况下,为了克服这个问题,我设置了编码

conn=psycopg2.connect(
database="xxxxx", user='xxxxx', password='xxxx', host='xxxx.xxx.xxx.xx.xxx', port='5432')
conn.set_client_encoding('UNICODE')
cur=conn.cursor()

但找不到任何选项来设置Postgreshake中的客户端_编码.连接中有一个额外选项,我试图将其设置为{ encode: 'UNICODE' }.但这也带来了错误.有人能帮忙吗?

推荐答案

client_encoding等于run time config.

db_hook = PostgresHook('postgres_product_db')
sql="SET client_encoding = 'UTF8'; SELECT col FROM my_table "
fetch_item = db_hook.get_pandas_df(sql=sql)

你没有问过,但是如果PostgresOperator适用,那么用法可以是:

from airflow.providers.postgres.operators.postgres import PostgresOperator

op = PostgresOperator(
    task_id="my_task",
    sql=sql,
    runtime_parameters={'set_client_encoding': 'UNICODE'},
)

这项功能是在PR中添加的,适用于apache-airflow-providers-postgres>=4.1.0

Sql相关问答推荐

Postgresql:从jsons数组到单个id索引的json

SQL:如何在表中同时使用GROUPING和CONDITION?

正在编写查询.我需要将订阅的时间段分为第一个订阅中包含的另一个订阅之前和之后的时间段

在查询Oracle SQL中创建替代ID

合并分层表SQL中的第一个非空、变化的空位置

MS Access问题查询中的自定义字段

从类似JSON的字符串列创建新列

postgres中的条件索引和触发器

如何使用聚合连接两个表

配置单元查询失败:无法识别';附近的输入;LEFT'';(select子句中的';';col'

Postgresql - 如何根据阈值计算累积和

用户定义的标量值函数是否仍然会阻止并行性?

获取所有用户的第一次和最后一次发货以及到达日期

使用row_number() over partition by保留首次出现且值不为空的行的方法

在自引用表中使用分组和计数的SQL查询语句

使用日期和间隔作为键加入 Athena 上的表?

SQLite 中的过滤运行总和视图

在 SQL 的每行选项中 Select 最大值

为什么 Oracle 在一个查询中对同一张表同时执行 TABLE SCAN 和 INDEX UNIQUE SCAN?

在 sql 中合并系列以删除重复项