I've written a small pipeline with apache-beam that uses beam-postgres as the input connector to create a PCollection from a DB table. The code looks like this:
```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from psycopg.rows import dict_row
from beam_postgres.io import ReadAllFromPostgres


def __trigger_bill_fetch_job(self):
    print("Triggering bill-fetch job")
    pipeline = beam.Pipeline()
    read_from_db = ReadAllFromPostgres(
        # libpq connection string; the {...} placeholders stand in for real values
        "host={host} dbname={dbName} user={user} password={password}",
        "SELECT * FROM comparison_bill_data_requests WHERE status='PENDING' AND bill_event_received=true AND bill_detail_event_received=true",
        # dict_row makes psycopg return each row as a dict keyed by column name
        dict_row,
    )
    result = pipeline | "ReadPendingRecordsFromDB" >> read_from_db | "Print result" >> beam.Map(print)
    pipeline.run().wait_until_finish()
    # prints the transform object itself, not the rows it read
    print("read_from_db done", read_from_db)
```
The output of `beam.Map(print)` is:

```
{'id': '1', 'bill_id': 'bill-1', 'account_id': None, 'bill_event_received': True, 'bill_detail_event_received': True, 'status': 'PENDING', 'commodity_type': None, 'bill_start_date': datetime.datetime(2023, 12, 6, 11, 52, 28, 78945), 'bill_end_date': datetime.datetime(2023, 12, 6, 11, 52, 28, 78945), 'tenant_id': 'tenant-1', 'created_at': datetime.datetime(2023, 12, 6, 11, 52, 28, 78945), 'updated_at': datetime.datetime(2023, 12, 6, 11, 53, 21, 300224)}
```
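Because `dict_row` is passed as the row factory, each element reaching the next transform is a plain Python `dict` keyed by column name, so a downstream step can already read fields by key. A minimal sketch, assuming the column names shown in the output above; `summarize_row` is a hypothetical helper, not part of Beam or beam-postgres:

```python
from typing import Any, Dict, Optional, Tuple


def summarize_row(row: Dict[str, Any]) -> Tuple[str, str, Optional[str]]:
    """Read a few columns from one dict_row record by key.

    Hypothetical helper, meant to be used as beam.Map(summarize_row)
    right after the ReadPendingRecordsFromDB step.
    """
    # Keys are the column names of comparison_bill_data_requests.
    # NULL columns (e.g. account_id) arrive as None; .get also tolerates
    # a column that is absent from the SELECT list.
    return row["bill_id"], row["tenant_id"], row.get("account_id")
```

It would be chained after the read, e.g. `pipeline | "ReadPendingRecordsFromDB" >> read_from_db | "Summarize" >> beam.Map(summarize_row) | beam.Map(print)`.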
How can I parse this into an object? I want to pass it to the next step in the pipeline and access its attributes. How can I do that?
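To get attribute access (`req.bill_id` instead of `row["bill_id"]`), one option is to map each dict to a dataclass in a separate step. A sketch, assuming the field names and types inferred from the printed row; `BillDataRequest` and `from_row` are hypothetical names, and the types should be checked against the actual table definition:

```python
import dataclasses
import datetime
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class BillDataRequest:
    """Typed view of one comparison_bill_data_requests row.

    Field names and types are inferred from the printed dict (assumption).
    """

    id: str
    bill_id: str
    account_id: Optional[str]
    bill_event_received: bool
    bill_detail_event_received: bool
    status: str
    commodity_type: Optional[str]
    bill_start_date: datetime.datetime
    bill_end_date: datetime.datetime
    tenant_id: str
    created_at: datetime.datetime
    updated_at: datetime.datetime

    @classmethod
    def from_row(cls, row: Dict[str, Any]) -> "BillDataRequest":
        """Build an instance from a dict_row record, ignoring extra columns."""
        names = {f.name for f in dataclasses.fields(cls)}
        return cls(**{k: v for k, v in row.items() if k in names})


# Wiring into the pipeline from the question (requires apache_beam):
# result = (
#     pipeline
#     | "ReadPendingRecordsFromDB" >> read_from_db
#     | "ToObject" >> beam.Map(BillDataRequest.from_row)
#     | "UseAttributes" >> beam.Map(lambda req: print(req.bill_id, req.status))
# )
```

Defining the class at module level (not inside `__trigger_bill_fetch_job`) is the safer choice, since Beam may need to pickle both the elements and the mapping function when passing them between steps. `from_row` drops columns the class does not declare but raises `TypeError` if a declared column is missing. psycopg 3 also provides `psycopg.rows.class_row(BillDataRequest)`, which could replace `dict_row` directly, assuming beam-postgres hands the row factory to psycopg unchanged; that variant requires every selected column to match a field of the class.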