我需要

1. run a select query on MYSQL DB and fetch the records.              
2. Records are processed by python script.

我不确定该怎么做.xcom是go 这里的路吗?此外,MYSQLOperator只执行查询,不获取记录.有没有我可以使用的内置传输操作符?我如何在这里使用MYSQL钩子?

你可能想使用一个PythonOperator,它使用钩子来获取数据,

有人能解释一下如何处理同样的问题吗.

Refer - 100

def do_work():
    mysqlserver = MySqlHook(connection_id)
    sql = "SELECT * from table where col > 100 "
    row_count = mysqlserver.get_records(sql, schema='testdb')
    print row_count[0][0]

callMYSQLHook = PythonOperator(
    task_id='fetch_from_testdb',
    python_callable=mysqlHook,
    dag=dag
)

这是正确的方法吗?

t = MySqlOperator(
conn_id='mysql_default',
task_id='basic_mysql',
sql="SELECT count(*) from table1 where id > 10",
dag=dag)

推荐答案

当然,只需创建一个钩子或操作符并调用get_records()方法:https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/hooks/dbapi.html

Python-3.x相关问答推荐

Django 5.0.2和django_rest_framework

S的两极是什么,相当于大Pandas 的`.ilo‘方法?

PANDAS中当前数据帧的匹配与更新

如何查找以开头并替换的字符串

如何将多输入数据加载器传递给单输入模型

使用 NaN 计算 pct_change 时如何避免 bfill 或 ffill

为什么 return node.next 会返回整个链表?

导入在不同目录中定义的函数

为什么最简单的流光示例会出错?

列表中的重复数字与列表理解

类型提示和链式赋值以及多重赋值

日志(log)模块不适用于 Python3

multiprocessing.Queue 中的 ctx 参数

'~'(波浪号)运算符在 Python 中的应用

如何从同一文件夹中的模块导入功能?

Python中调用者函数的访问变量

用于 unicode 大写单词的 Python 正则表达式

Python - 类 __hash__ 方法和集合

python - Pandas - Dataframe.set_index - 如何保留旧的索引列

使用 python 3.0 的 Numpy