我一直在try 使用气流来安排DAG.

出于上述目的,我需要设置s3连接.但airflow提供的UI并没有那么直观(http://pythonhosted.org/airflow/configuration.html?highlight=connection#connections).有人成功地建立了s3连接吗?如果有,你们有什么最佳实践吗?

谢谢

推荐答案

编辑:此答案将您的密钥存储在plain text中,可以是security risk,不推荐使用.最好的方法是将访问密钥和密钥放入登录/密码字段,如下面的其他答案所述.

很难找到参考资料,但经过一点挖掘后,我成功了.

太长,读不下go 了

使用以下属性创建新连接:

Conn Id: my_conn_S3

Conn Type: S3

Extra:

{"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}

长版本,设置UI连接:

  • 在Airflow UI上,转到管理>;连接
  • 使用以下属性创建新连接:
  • 康涅狄格州Id:my_conn_S3
  • 康涅狄格州类型:S3
  • 额外:{"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}
  • 将所有其他字段(主机、架构、登录)留空.

要使用此连接,您可以在下面找到一个简单的S3传感器测试.这个测试的 idea 是设置一个传感器,监视S3中的文件(T1任务),一旦满足以下条件,它就会触发一个bash命令(T2任务).

测试

  • 在运行DAG之前,确保您有一个名为"S3 bucket To Watch"的S3 bucket.
  • 添加以下s3_dag_测试.py到气流dags文件夹(~/afflow/dags)
  • airflow webserver开始.
  • 进入气流界面(http://localhost:8383/)
  • airflow scheduler开始.
  • 在主DAGs视图上打开"s3_dag_测试"dag.
  • Select "s3_dag_测试"以显示dag详细信息.
  • 在图形视图中,您应该能够看到它的当前状态.
  • "判断\u s3中的\u文件\u"任务应处于活动状态并正在运行.
  • 现在,将名为"file-to-watch-1"的文件添加到"S3 Bucket to-watch"中.
  • 第一项任务应该已经完成,第二项任务应该开始并完成.

dag定义中的调度间隔设置为"@once",以便于调试.

要再次运行该任务,请保持原样,删除存储桶中的文件,然后通过 Select 第一个任务(在图表视图中)并 Select "清除"所有"过go "、"future "、"upstream "、"下游"重试....活动这应该会再次启动DAG.

告诉我进展如何.

s3_-dag_测试.py;

"""
S3 Sensor Connection Test
"""

from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor,   BashOperator, EmailOperator, S3KeySensor
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 11, 1),
    'email': ['something@here.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('s3_dag_test', default_args=default_args, schedule_interval= '@once')

t1 = BashOperator(
    task_id='bash_test',
    bash_command='echo "hello, it should work" > s3_conn_test.txt',
    dag=dag)

sensor = S3KeySensor(
    task_id='check_s3_for_file_in_s3',
    bucket_key='file-to-watch-*',
    wildcard_match=True,
    bucket_name='S3-Bucket-To-Watch',
    s3_conn_id='my_conn_S3',
    timeout=18*60*60,
    poke_interval=120,
    dag=dag)

t1.set_upstream(sensor)

Main References:

Python-3.x相关问答推荐

Python ModuleNotFound错误,即使安装了模块

类型注释:pathlib. Path vs importlib. resources. abc. Traversable

如何使用PySide6创建切换框架?

在Python代码中包含NAN值时,以两个矩阵计算RMSE

Pandas 根据条件增加Dataframe列

使用Python抓取sofascore以获取有关球队阵容和投票的信息

torch.stack([t1, t1, t1], dim=1)与torch.hstack([t1, t1, t1])之间有什么区别?

使用Python按照其组/ID的紧密值的递增顺序映射数据框的两列

pip install saxonche v 12.1.0 产生 FileNotFoundError

我想使用命令提示符安装 cv2

将 rgb numpy 图像转换为 rgb 列表和相应的索引值

缺失时推断的数据类可选字段

过滤并获取数据框中条件之间的行

Snakemake 'run' 指令不产生错误信息

使用 selenium 加速网页抓取

聚合(aggregate)为最多包含两个元素的列表

有没有更好的方法来判断一个数字是否是两个数字的范围

TensorFlow:dataset.train.next_batch 是如何定义的?

AttributeError:LinearRegression 对象没有属性coef_

将 numpy.float64 列表快速转换为 Python 中的浮点数