我已经使用apache-beam编写了一个小管道,它使用Beam-postgres作为输入连接器,从一个DB表创建一个PCollection. 代码如下所示-

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from psycopg.rows import dict_row
from beam_postgres.io import ReadAllFromPostgres
def __trigger_bill_fetch_job(self):
        print("Triggering bill-fetch job")            
        pipeline = beam.Pipeline()   
       
        read_from_db = ReadAllFromPostgres(
            "host={host} dbname={dbName} user={user} password={password}",
            "SELECT * FROM comparison_bill_data_requests WHERE status='PENDING' AND bill_event_received=true and bill_detail_event_received=true",
            dict_row,
        )

        result = pipeline | "ReadPendingRecordsFromDB" >> read_from_db | "Print result" >> beam.Map(print)

        pipeline.run().wait_until_finish()          
        print("read_from_db done", read_from_db)

IF beam.Map(print)的输出为

{'id': '1', 'bill_id': 'bill-1', 'account_id': None, 'bill_event_received': True, 'bill_detail_event_received': True, 'status': 'PENDING', 'commodity_type': None, 'bill_start_date': datetime.datetime(2023, 12, 6, 11, 52, 28, 78945), 'bill_end_date': datetime.datetime(2023, 12, 6, 11, 52, 28, 78945), 'tenant_id': 'tenant-1', 'created_at': datetime.datetime(2023, 12, 6, 11, 52, 28, 78945), 'updated_at': datetime.datetime(2023, 12, 6, 11, 53, 21, 300224)}

我如何才能将其解析为对象?我想把这个传递到管道中的下一步,并想访问它的属性.我该怎么做呢?

推荐答案

根据print的结果,beam_postgres包在输入连接器中从Postgres返回PCollection,即Dict.

您可以在beam.Map中调用一个函数,并在PCollection:

from typing import Dict

import apache_beam as beam
from beam_postgres.io import ReadAllFromPostgres
from psycopg.rows import dict_row

def __trigger_bill_fetch_job(self):
    print("Triggering bill-fetch job")
    pipeline = beam.Pipeline()

    read_from_db = ReadAllFromPostgres(
        "host={host} dbname={dbName} user={user} password={password}",
        "SELECT * FROM comparison_bill_data_requests WHERE status='PENDING' AND bill_event_received=true and bill_detail_event_received=true",
        dict_row,
    )

    result = pipeline | "ReadPendingRecordsFromDB" >> read_from_db | "Print result" >> beam.Map(to_fields)

    pipeline.run().wait_until_finish()
    print("read_from_db done", read_from_db)


def to_fields(row: Dict) -> Dict:
    field1 = row["field1"]
    field2 = row["field2"]

    return row

在本例中,我从beam.Map操作符调用了to_fields方法. 在此方法中,我们可以访问PCollection中的当前元素,即Dict,获取所需的字段并在结果中应用转换.

Python相关问答推荐

实现的差异取决于计算出的表达是直接返回还是首先存储在变量中然后返回

Image Font生成带有条形码Code 128的条形码时出现枕头错误OSErsor:无法打开资源

分组数据并删除重复数据

对某些列的总数进行民意调查,但不单独列出每列

如何避免Chained when/then分配中的Mypy不兼容类型警告?

C#使用程序从Python中执行Exec文件

OR—Tools中CP—SAT求解器的IntVar设置值

实现神经网络代码时的TypeError

Polars asof在下一个可用日期加入

在Python中使用if else或使用regex将二进制数据如111转换为001""

从源代码显示不同的输出(机器学习)(Python)

如果有2个或3个,则从pandas列中删除空格

如何过滤组s最大和最小行使用`transform`'

Pandas 数据帧中的枚举,不能在枚举列上执行GROUP BY吗?

提取数组每行的非零元素

Python—在嵌套列表中添加相同索引的元素,然后计算平均值

为罕见情况下的回退None值键入

有了Gekko,可以创建子模型或将模型合并在一起吗?

类型对象';敌人';没有属性';损害';

Django更新视图未更新