编辑:这个答案将您的密钥存储在纯文本这可以是一个安全风险并且不推荐。最好的方法是将访问密钥和秘密密钥放在登录/密码字段中,如下面其他答案中所述。
结束编辑
很难找到参考资料,但经过一番挖掘后,我能够使其发挥作用。
TLDR
创建具有以下属性的新连接:
Conn Id:my_conn_s3
康涅狄格州类型: S3
Extra:
{"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}
长版本,设置 UI 连接:
- 在 Airflow UI 上,转到管理 > 连接
- 创建具有以下属性的新连接:
- 康涅狄格州 ID:
my_conn_S3
- 康涅狄格州类型:
S3
- Extra:
{"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}
- 将所有其他字段(主机、架构、登录)保留为空。
要使用此连接,您可以在下面找到一个简单的 S3 传感器测试。此测试的想法是设置一个传感器来监视 S3 中的文件(T1 任务),一旦满足以下条件,它就会触发 bash 命令(T2 任务)。
Testing
- 在运行 DAG 之前,请确保您有一个名为“S3-Bucket-To-Watch”的 S3 存储桶。
- 将以下 s3_dag_test.py 添加到气流 dags 文件夹(~/airflow/dags)
- Start
airflow webserver
.
- 转到 Airflow UI (http://localhost:8383/)
- Start
airflow scheduler
.
- 在主 DAG 视图上打开“s3_dag_test”DAG。
- 选择“s3_dag_test”以显示 dag 详细信息。
- 在图表视图上,您应该能够看到它的当前状态。
- “check_s3_for_file_in_s3”任务应处于活动状态并正在运行。
- 现在,将名为“file-to-watch-1”的文件添加到“S3-Bucket-To-Watch”中。
- 第一个任务应该已经完成,第二个任务应该开始并完成。
dag 定义中的schedule_interval 设置为“@once”,以方便调试。
要再次运行它,请保持所有内容不变,删除存储桶中的文件,然后通过选择第一个任务(在图形视图中)并选择“清除”所有“过去”、“未来”、“上游”、“下游”来重试.... 活动。这应该会再次启动 DAG。
让我知道进展如何。
s3_dag_test.py;
"""
S3 Sensor Connection Test
"""
from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, BashOperator, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2016, 11, 1),
'email': ['[email protected] /cdn-cgi/l/email-protection'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('s3_dag_test', default_args=default_args, schedule_interval= '@once')
t1 = BashOperator(
task_id='bash_test',
bash_command='echo "hello, it should work" > s3_conn_test.txt',
dag=dag)
sensor = S3KeySensor(
task_id='check_s3_for_file_in_s3',
bucket_key='file-to-watch-*',
wildcard_match=True,
bucket_name='S3-Bucket-To-Watch',
s3_conn_id='my_conn_S3',
timeout=18*60*60,
poke_interval=120,
dag=dag)
t1.set_upstream(sensor)
Main References:
- https://gitter.im/apache/incubator-airflow https://gitter.im/apache/incubator-airflow
- https://groups.google.com/forum/#!topic/airbnb_airflow/TXsJNOBBfig https://groups.google.com/forum/#!topic/airbnb_airflow/TXsJNOBBfig
- https://github.com/apache/incubator-airflow https://github.com/apache/incubator-airflow