在 Airflow 2.0 中运行多个 Athena 查询

2023-12-21

我正在尝试创建一个 DAG,其中一项任务执行athena查询使用boto3。它适用于一个查询,但是当我尝试运行多个 athena 查询时,我遇到了问题。

这个问题可以通过以下方式解决:-

  1. 如果一个人经过this https://www.ilkkapeltola.fi/2018/04/simple-way-to-query-amazon-athena-in.html博客,可以看出athena uses start_query_execution触发查询并get_query_execution为了得到status, queryExecutionId以及有关查询的其他数据(文档athena https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena.html)

遵循上述模式后,我有以下代码:-

import json
import time
import asyncio
import boto3
import logging
from airflow import DAG
from airflow.operators.python import PythonOperator


def execute_query(client, query, database, output_location):
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
        },
        ResultConfiguration={
            'OutputLocation': output_location
        }
    )

    return response['QueryExecutionId']


async def get_ids(client_athena, query, database, output_location):
    query_responses = []
    for i in range(5):
        query_responses.append(execute_query(client_athena, query, database, output_location))    

    res = await asyncio.gather(*query_responses, return_exceptions=True)

    return res

def run_athena_query(query, database, output_location, region_name, **context):
    BOTO_SESSION = boto3.Session(
        aws_access_key_id = 'YOUR_KEY',
        aws_secret_access_key = 'YOUR_ACCESS_KEY')
    client_athena = BOTO_SESSION.client('athena', region_name=region_name)

    loop = asyncio.get_event_loop()
    query_execution_ids = loop.run_until_complete(get_ids(client_athena, query, database, output_location))
    loop.close()

    repetitions = 900
    error_messages = []
    s3_uris = []

    while repetitions > 0 and len(query_execution_ids) > 0:
        repetitions = repetitions - 1
        
        query_response_list = client_athena.batch_get_query_execution(
            QueryExecutionIds=query_execution_ids)['QueryExecutions']
      
        for query_response in query_response_list:
            if 'QueryExecution' in query_response and \
                    'Status' in query_response['QueryExecution'] and \
                    'State' in query_response['QueryExecution']['Status']:
                state = query_response['QueryExecution']['Status']['State']

                if state in ['FAILED', 'CANCELLED']:
                    error_reason = query_response['QueryExecution']['Status']['StateChangeReason']
                    error_message = 'Final state of Athena job is {}, query_execution_id is {}. Error: {}'.format(
                            state, query_execution_id, error_message
                        )
                    error_messages.append(error_message)
                    query_execution_ids.remove(query_response['QueryExecutionId'])
                
                elif state == 'SUCCEEDED':
                    result_location = query_response['QueryExecution']['ResultConfiguration']['OutputLocation']
                    s3_uris.append(result_location)
                    query_execution_ids.remove(query_response['QueryExecutionId'])
                 
                    
        time.sleep(2)
    
    logging.exception(error_messages)
    return s3_uris


DEFAULT_ARGS = {
    'owner': 'ubuntu',
    'depends_on_past': True,
    'start_date': datetime(2021, 6, 8),
    'retries': 0,
    'concurrency': 2
}

with DAG('resync_job_dag', default_args=DEFAULT_ARGS, schedule_interval=None) as dag:

    ATHENA_QUERY = PythonOperator(
        task_id='athena_query',
        python_callable=run_athena_query,
        provide_context=True,
        op_kwargs={
            'query': 'SELECT request_timestamp FROM "sampledb"."elb_logs" limit 10;', # query provide in athena tutorial
            'database':'sampledb',
            'output_location':'YOUR_BUCKET',
            'region_name':'YOUR_REGION'
        }
    )

    ATHENA_QUERY

运行上面的代码时,我收到以下错误:-

[2021-06-16 20:34:52,981] {taskinstance.py:1455} ERROR - An asyncio.Future, a coroutine or an awaitable is required
Traceback (most recent call last):
  File "/home/ubuntu/venv/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/ubuntu/venv/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/ubuntu/venv/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1315, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/ubuntu/venv/lib/python3.6/site-packages/airflow/operators/python.py", line 117, in execute
    return_value = self.execute_callable()
  File "/home/ubuntu/venv/lib/python3.6/site-packages/airflow/operators/python.py", line 128, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/ubuntu/iac-airflow/dags/helper/tasks.py", line 93, in run_athena_query
    query_execution_ids = loop.run_until_complete(get_ids(client_athena, query, database, output_location))
  File "/usr/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
    return future.result()
  File "/home/ubuntu/iac-airflow/dags/helper/tasks.py", line 79, in get_ids
    res = await asyncio.gather(*query_responses, return_exceptions=True)
  File "/usr/lib/python3.6/asyncio/tasks.py", line 602, in gather
    fut = ensure_future(arg, loop=loop)
  File "/usr/lib/python3.6/asyncio/tasks.py", line 526, in ensure_future
    raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
TypeError: An asyncio.Future, a coroutine or an awaitable is required

我无法找到我出错的地方。希望对这个问题有一些提示


我认为你在这里所做的事情并不是真正需要的。 您的问题是:

  1. 并行执行多个查询。
  2. 能够康复queryExecutionId每个查询。

这两个问题都可以通过使用简单地解决AWSAthenaOperator。接线员已经为您处理了您提到的一切。

Example:

from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.dummy import DummyOperator
from airflow.providers.amazon.aws.operators.athena import AWSAthenaOperator


with DAG(
    dag_id="athena",
    schedule_interval='@daily',
    start_date=days_ago(1),
    catchup=False,
) as dag:

    start_op = DummyOperator(task_id="start_task")
    query_list = ["SELECT 1;", "SELECT 2;" "SELECT 3;"]

    for i, sql in enumerate(query_list):
        run_query = AWSAthenaOperator(
            task_id=f'run_query_{i}',
            query=sql,
            output_location='s3://my-bucket/my-path/',
            database='my_database'
        )
        start_op >> query_op

只需添加更多查询即可动态创建 Athena 任务query_list:

请注意,QueryExecutionId is 推送到xcom https://github.com/apache/airflow/blob/6236e7e205f3dcff3b1f003b7897ba3f549454be/airflow/providers/amazon/aws/operators/athena.py#L123因此,如果需要,您可以在下游任务中访问 。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 Airflow 2.0 中运行多个 Athena 查询 的相关文章

  • Pygame读取MIDI输入

    我参考了Pygame MIDI 文档 https www pygame org docs ref midi html and 这段代码 https stackoverflow com questions 62983509 pygame mi
  • 将 pandas 数据框中的列减去其第一个值

    我需要将 pandas 数据帧的一列中的所有元素减去其第一个值 在这段代码中 pandas 抱怨 self inferred type 我猜这是循环引用 df Time df Time df Time 0 在这段代码中 pandas 抱怨为
  • 在 Python 中使用 XPath 和 LXML

    我有一个 python 脚本 用于解析 XML 并将某些感兴趣的元素导出到 csv 文件中 我现在尝试更改脚本以允许根据条件过滤 XML 文件 等效的 XPath 查询将是 DC Events Confirmation contains T
  • 替换字符串列表中的 \x00 的最佳方法?

    我有一个来自已解析 PE 文件的值列表 其中包括 x00每个部分末尾的空字节 我希望能够删除 x00字符串中的字节而不删除所有字节 x 文件中的 s 我试过做 replace and re sub 但并没有取得太大成功 使用Python 2
  • 类属性在功能上依赖于其他类属性

    我正在尝试使用静态类属性来定义另一个静态类属性 我认为可以通过以下代码来实现 f lambda s s 1 class A foo foo bar f A foo 然而 这导致NameError name A is not defined
  • Python 中 genfromtxt() 的可变列数?

    我有一个 txt具有不同长度的行的文件 每一行都是代表一条轨迹的一系列点 由于每条轨迹都有自己的长度 因此各行的长度都不同 也就是说 列数从一行到另一行不同 据我所知 genfromtxt Python 中的模块要求列数相同 gt gt g
  • 使用正则表达式解析 Snort 警报文件

    我正在尝试使用 Python 中的正则表达式从 snort 警报文件中解析出源 目标 IP 和端口 和时间戳 示例如下 03 09 14 10 43 323717 1 2008015 9 ET MALWARE User Agent Win9
  • 行为:如何从另一个文件导入步骤?

    我刚刚开始使用behave http pythonhosted org behave 一个Pythonic BDD框架 使用小黄瓜语法 http docs behat org guides 1 gherkin html 行为需要一个特征 例
  • Pandas:根据列名进行列的成对乘法

    我有以下数据框 gt gt gt df pd DataFrame ap1 X 1 2 3 4 as1 X 1 2 3 4 ap2 X 2 2 2 2 as2 X 3 3 3 3 gt gt gt df ap1 X as1 X ap2 X a
  • Python While 循环,and (&) 运算符不起作用

    我正在努力寻找最大公因数 我写了一个糟糕的 运算密集型 算法 它将较低的值减一 使用 检查它是否均匀地划分了分子和分母 如果是 则退出程序 但是 我的 while 循环没有使用 and 运算符 因此一旦分子可整除 它就会停止 即使它不是正确
  • 我可以使用 dask 创建 multivariate_normal 矩阵吗?

    有点相关这个帖子 https stackoverflow com questions 52337612 random multivariate normal on a dask array 我正在尝试复制multivariate norma
  • 使用循环将对象添加到列表(python)

    我正在尝试使用 while 循环将对象添加到列表中 基本上这就是我想做的 class x pass choice raw input pick what you want to do while choice 0 if choice 1 E
  • 使用 python 将文本发送到带有逗号分隔符的列

    如何使用分隔符 在 Excel 中将一列分成两列 并使用 python 命名标题 这是我的代码 import openpyxl w openpyxl load workbook DDdata xlsx active w active a a
  • 负整数的Python表示

    gt gt gt x 4 gt gt gt print b format x x 4 100 gt gt gt mask 0xFFFFFFFF gt gt gt print b format x mask x mask 4294967292
  • ValueError:无法插入 ID,已存在

    我有这个数据 ID TIME 1 2 1 4 1 2 2 3 我想按以下方式对数据进行分组ID并计算每组的平均时间和规模 ID MEAN TIME COUNT 1 2 67 3 2 3 00 1 如果我运行此代码 则会收到错误 ValueE
  • 使用 lambda 函数更改属性值

    我可以使用 lambda 函数循环遍历类对象列表并更改属性值 对于所有对象或满足特定条件的对象 吗 class Student object def init self name age self name name self age ag
  • 是否可以写一个负的python类型注释

    这可能听起来不合理 但现在我需要否定类型注释 我的意思是这样的 an int Not Iterable a string Iterable 这是因为我为一个函数编写了一个重载 而 mypy 不理解我 我的功能看起来像这样 overload
  • CSV 在列中查找最大值并附加新数据

    大约两个小时前 我问了一个关于从网站读取和写入数据的问题 从那时起 我花了最后两个小时试图找到一种方法来从输出的 A 列读取最大日期值 将该值与刷新的网站数据进行比较 并将任何新数据附加到 csv 文件而不覆盖旧的或创建重复项 目前 100
  • 将 Scikit-Learn OneHotEncoder 与 Pandas DataFrame 结合使用

    我正在尝试使用 Scikit Learn 的 OneHotEncoder 将 Pandas DataFrame 中包含字符串的列替换为 one hot 编码的等效项 我的下面的代码不起作用 from sklearn preprocessin
  • 从时间序列生成日期特征

    我有一个数据框 其中包含如下列 Date temp data holiday day 01 01 2000 10000 0 1 02 01 2000 0 1 2 03 01 2000 2000 0 3 30 01 2000 200 0 30

随机推荐