我一直在try 使用气流来安排DAG.
出于上述目的,我需要设置s3连接.但airflow提供的UI并没有那么直观(http://pythonhosted.org/airflow/configuration.html?highlight=connection#connections).有人成功地建立了s3连接吗?如果有,你们有什么最佳实践吗?
谢谢
我一直在try 使用气流来安排DAG.
出于上述目的,我需要设置s3连接.但airflow提供的UI并没有那么直观(http://pythonhosted.org/airflow/configuration.html?highlight=connection#connections).有人成功地建立了s3连接吗?如果有,你们有什么最佳实践吗?
谢谢
编辑:此答案将您的密钥存储在plain text中,可以是security risk,不推荐使用.最好的方法是将访问密钥和密钥放入登录/密码字段,如下面的其他答案所述.
很难找到参考资料,但经过一点挖掘后,我成功了.
使用以下属性创建新连接:
Conn Id: my_conn_S3
Conn Type: S3
Extra:
{"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}
my_conn_S3
S3
{"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}
要使用此连接,您可以在下面找到一个简单的S3传感器测试.这个测试的 idea 是设置一个传感器,监视S3中的文件(T1任务),一旦满足以下条件,它就会触发一个bash命令(T2任务).
airflow webserver
开始.airflow scheduler
开始.dag定义中的调度间隔设置为"@once",以便于调试.
要再次运行该任务,请保持原样,删除存储桶中的文件,然后通过 Select 第一个任务(在图表视图中)并 Select "清除"所有"过go "、"future "、"upstream "、"下游"重试....活动这应该会再次启动DAG.
告诉我进展如何.
"""
S3 Sensor Connection Test
"""
from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, BashOperator, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2016, 11, 1),
'email': ['something@here.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('s3_dag_test', default_args=default_args, schedule_interval= '@once')
t1 = BashOperator(
task_id='bash_test',
bash_command='echo "hello, it should work" > s3_conn_test.txt',
dag=dag)
sensor = S3KeySensor(
task_id='check_s3_for_file_in_s3',
bucket_key='file-to-watch-*',
wildcard_match=True,
bucket_name='S3-Bucket-To-Watch',
s3_conn_id='my_conn_S3',
timeout=18*60*60,
poke_interval=120,
dag=dag)
t1.set_upstream(sensor)