apache beam.io.BigQuerySource use_standard_sql 作为数据流运行程序运行时不起作用

2024-03-30

我有一个数据流作业,我将首先从 bigquery 查询中读取(在标准 sql 中)。它在直接运行模式下完美运行。但是,我尝试在数据流运行程序模式下运行此数据流并遇到此错误:

响应: , content

显然 use_standard_sql 参数在数据流运行程序模式下不起作用。 版本: 阿帕奇光束:2.24.0 蟒蛇:3.8

last_update_date = pipeline | 'Read last update date' >> beam.io.Read(beam.io.BigQuerySource(
    query='''
        SELECT
            MAX(date) AS date
        FROM
            GoogleSearchConsole.search_query_analytics_log
    ''',
    use_standard_sql=True
))

尝试以下从 Bigquery 读取数据并将数据写入 Bigquery 的代码。 代码是 apache beam 数据流运行程序代码:-

#------------Import Lib-----------------------#
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import apache_beam as beam, os, sys, argparse, logging
from apache_beam.options.pipeline_options import SetupOptions

#------------Set up BQ parameters-----------------------#
# Replace with Project Id
project = 'xxxxx'
#plitting Of Records----------------------#

def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    parser.add_argument(
          '--cur_suffix',
          dest='cur_suffix',
          help='Input table suffix to process.')
    known_args, pipeline_args = parser.parse_known_args(argv)


    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    p1 = beam.Pipeline(options=pipeline_options)


    logging.info('***********')
    logging.info(known_args.cur_suffix)
    data_loading = (
        p1
        | 'ReadFromBQ' >> beam.io.Read(beam.io.BigQuerySource(query='''SELECT SUBSTR(_time, 1, 19) as _time, dest FROM `project.dataset.table`''', use_standard_sql=True))
    )

    project_id = "xxxxxxx"
    dataset_id = 'AAAAAAAA'
    table_schema_Audit = ('_time:DATETIME, dest:STRING')

#---------------------Type = audit----------------------------------------------------------------------------------------------------------------------
    result = (
    data_loading
        | 'Write-Audit' >> beam.io.WriteToBigQuery(
                                                    table='YYYYYYY',
                                                    dataset=dataset_id,
                                                    project=project_id,
                                                    schema=table_schema_Audit,
                                                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                                    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
                                                    ))



    result = p1.run()
    result.wait_until_finish()


if __name__ == '__main__':
  path_service_account = 'ABGFDfc927.json'
  os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = path_service_account
  run()

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

apache beam.io.BigQuerySource use_standard_sql 作为数据流运行程序运行时不起作用 的相关文章

  • bigquery 允许的表数量是否有限制

    BigQuery 中可以拥有的表数量有限制吗 我正在尝试创建多个小表以减少查询成本 谢谢 表的数量没有限制 由于查询字符串的长度有 10k 的限制 因此您可能会在查询所有这些内容时遇到问题
  • 在 Cloud Dataflow 中进行 ETL 和解析 CSV 文件

    我是云数据流和 Java 的新手 所以我希望这是正确的问题 我有一个 csv 文件 其中有 n 个列和行 可以是字符串 整数或时间戳 我需要为每一列创建一个新的 PCollection 吗 我在示例中找到的大多数文档都类似于 PCollec
  • 如何在 Google BigQuery 中创建临时表

    有没有办法通过以下方式在 Google BigQuery 中创建临时表 SELECT INTO
  • 如何在 BigQuery 中构建“星级”报告(或迷你图或颜色渐变)

    假设我有以下示例输入 WITH Ratings AS SELECT A name 2 score UNION ALL SELECT B name 0 score UNION ALL SELECT C name 5 score UNION A
  • BigQuery更新如何获取更新的行数

    我正在使用 Google Cloud Functions 连接到 Google Bigquery 数据库并更新一些行 云函数是使用Python 3编写的 当我通过函数运行更新 dml 时 我需要帮助弄清楚如何获取结果消息或更新 更改的行数
  • 如何在 python apache beam 中展平多个 Pcollection

    应该如何实现位于以下位置的以下逻辑 https beam apache org documentation pipelines design your pipeline https beam apache org documentation
  • bigquery 中的条件连接

    我有两张桌子 表 1 是单列整数 表 2 有三列 start integer end integer data 简单的查询是将整数列与数据连接起来 其中 integer gt start integer AND integer lt end
  • Bigquery - json_array 来自字段的额外多个元素

    我的表有一个 JSON 字段 如下所示 每个条目中可以有任意数量的评论 entry 1234 comment 6789 seconds 1614864327 nanoseconds 606000000 message hello world
  • 将带有变量的循环转换为 BigQuery SQL

    我有数千个脚本 其中包括循环数据集并使用变量进行累积 例如 assuming that ids is populated from some BQ table ids 1 2 3 4 5 var1 v1 initialize variabl
  • 写入 BigQuery 时处理卡住

    我正在使用云数据流将数据从 Pub Sub 消息导入到 BigQuery 表 我正在使用 DynamicDestinations 因为这些消息可以放入不同的表中 我最近注意到该进程开始消耗所有资源 并且消息表明该进程被卡住开始显示 Proc
  • 如何获取有权访问bigquery中的表的所有用户/组/服务帐户

    from pprint import pprint from google oauth2 import service account import googleapiclient discovery credentials service
  • BigQuery Crashlytics - 无崩溃的用户/会话

    我已将 firebase crashlytics 数据链接到 bigquery 并设置 google 提供的数据工作室模板 除了我的仪表板所需的最重要的指标之外 还有很多重要的数据 无崩溃用户 and 无崩溃会话以百分比表示 在我可以用来计
  • 压缩保存在Google云存储中的文件

    是否可以压缩已保存在 Google 云存储中的文件 这些文件由 Google 数据流代码创建和填充 数据流无法写入压缩文件 但我的要求是将其保存为压缩格式 标准 TextIO Sink 不支持写入压缩文件 因为从压缩文件中读取的可扩展性较差
  • Google BigQuery:检索每行的最后版本

    我有一个 Google BigQuery 表 其中包含所有版本的资源 每次创建 更新 删除资源时 都会添加一个新行 并递增版本号 该数字将是添加行时的时间戳 ID ResourceID Action Count Timestamp ABC
  • BigQuery 数据类型

    我正在开始一个新项目 只是想在定义表模式之前进行验证 BigQuery 是否支持以下以外的其他功能 string integer float boolean BigQuery 数据类型官方文档 https cloud google com
  • 可以在不填充数据的情况下创建 BigQuery 表/架构吗?

    是否可以在不先用数据填充的情况下创建表模式 最好使用 Google 的 python 客户端 谷歌的文档似乎没有提供明确的是或否的答案 他们建议创建一个表 https cloud google com bigquery docs table
  • 从 ISO 周中提取日期 (201905) BigQuery

    我需要从 ISO 周数中提取星期日的日期 即 201905 它需要位于 standardSQL 中 因为它将使用不支持旧版本的脚本进行调度 我尝试从 Google Sheets 调整工作公式 但无法弄清楚 Google Sheets 的原始
  • GCP BigQuery如何通过python api设置表的到期日期

    我正在使用 BigQuery Python API 创建表 并且想为该表设置一个到期日期 以便该表会在特定天数后自动删除 这是我的代码 client bq Client job config bq QueryJobConfig datase
  • bigquery DataFlow 错误:在 EU 中读写时无法在不同位置读写

    我有一个简单的 Google DataFlow 任务 它从 BigQuery 表中读取数据并写入另一个表 如下所示 p beam io Read beam io BigQuerySource query select dia import
  • Bigquery 中数组对之间的余弦相似度

    我创建了一个表 其中有一对 ID 和每个 ID 的坐标 以便我可以计算它们之间的成对余弦相似度 The table looks like this 坐标的维度数当前为 128 但可能会有所不同 但同一个表中一对 ID 的数字维度始终相同 c

随机推荐

  • 如何提高@patch和MagicMock语句的可读性和可维护性(避免长名称和字符串标识)?

    在我的测试代码中 我有很多样板表达式 Magic return 我还有很长的字符串来标识要模拟的函数的路径 重构期间不会自动替换字符串 我更愿意直接使用导入的函数 示例代码 from mock import patch MagicMock
  • 如何在远程存储库上运行 hg recovery 命令

    在 teamcity 中运行构建时出现以下错误 Failed to collect changes error C Program Files TortoiseHg hg exe config ui interactive False pu
  • 在 cakephp 中分配布局

    我们可以在该特定控制器中为整个控制器定义一个布局吗 我之前已经在应用程序控制器的过滤器之前用于此目的 但它不再解决它 所以我需要在控制器中应该有一些适用于的布局定义该控制器的所有操作 Regards use it 在你的行动中 this g
  • JavaScript - 对象字面量的优点

    我读过 我应该使用对象文字 而不是简单地编写一堆函数 对象字面量有什么优点 有例子吗 正如 Russ Cam 所说 您可以避免污染全局命名空间 这在当今组合来自多个位置 TinyMCE 等 的脚本时非常重要 正如 Alex Sexton 所
  • 如何使用 WebApplicationFactory 覆盖 Autofac 容器中的服务

    我正在使用 WebApplicationFactory 编写一些集成测试 我使用 Autofac 作为我的依赖解析器 在我的测试中 我试图覆盖其中一项注册 以便我可以模拟其中一项依赖项 使用aspnetcore默认的ConfigureSer
  • 如何将html5画布保存到服务器

    我将一些图像加载到我的画布上 然后在加载后我想单击一个按钮将该画布图像保存到我的服务器上 我可以看到脚本工作正常 直到它到达 toDataURL 部分并且我的函数停止执行 我究竟做错了什么 这是我的代码
  • Android View 背景意外变化

    我正在构建一个具有大量屏幕的应用程序 大多数屏幕的顶部都有一个带有背景颜色的视图 我经常使用 view setBackgroundColor color 更改颜色 奇怪的事情来了 有时在设置一个视图的颜色后 例如 f14fb7 在应用程序中
  • 将阿拉伯数字转换为英语

    我正在寻找一种将阿拉伯数字字符串 转换为英语的方法 数字字符串 0123456789 Private Sub Button1 Click ByVal sender As System Object ByVal e As System Eve
  • 如何将多个局部变量传递给嵌套部分

    这应该是非常简单且有据可查的 我已经这样做了好几次了 尽管有些事情仍然让我很烦恼 我有一个调用嵌套部分的部分结构 在某个时刻一render调用需要将额外的变量传递给部分 尽管部分的渲染失败并显示 undefined local variab
  • Swing 菜单 Java 7 mac osx

    我一直在 mac os x 上测试我的 Swing 应用程序 它在小程序上运行 当我在浏览器中运行此小程序时 我注意到 JMenus JMenuItems 上的鼠标悬停无法正常工作 这是一个重现该问题的小程序 package com mac
  • 如何在 Sublime Text 中使用控制台

    我正在使用 Sublime Text 2 来编写程序 并希望在其中运行控制台来编译和运行它们 有没有办法在 Sublime Text 2 中嵌入控制台命令行 已经在那里了吗 我同时使用 Windows 和 Linux 我想你可以尝试创建一个
  • 推送事件不会触发推送路径上的工作流程

    我目前正在测试 GitHub Actions 工作流程这个存储库 https github com GuillaumeFalourd poc github actions 我正在尝试使用这个工作流程 https github com Gui
  • 禁止 (#403) - 你不能执行此操作 [Yii2]

    我尝试添加菜单map在后端 我用yii2 advanced 这是我的 控制器 代码 public function actionMap return this gt render map 但是 当我尝试使用此网址访问它时http local
  • opencv中如何根据深度颜色分割连通区域

    I have a picture like which i need to segment the picture into 8 blocks 我尝试过这种阈值方法 img gray cv2 imread input file cv2 IM
  • 如何获得欧米茄(n)

    我有公式 a n n a n 1 1 a 0 0 如果没有主定理 我怎样才能从中得到 Omega Theta 或 O 表示法 或者有人有一个很好的网站来理解解释 马斯特定理甚至不适用 所以不能使用它并不是太大的限制 此处有效的方法是猜测上限
  • 在 R 中:计算精确率/召回率曲线下的面积 (AUPR)?

    假设我有两个矩阵 A代表标签矩阵 B代表A对应的预测概率矩阵 现在我想根据矩阵A和B计算AUPR 精确率 召回率曲线下的面积 对于常见的AUC Area Under Precision Recall Curve ROC Curve R中有很
  • 将多个预制件分配给一个只允许添加一个的脚本

    我有一个脚本 它使用 LeanTween 将对象 预制 放入预制路径上 效果很好 其工作原理是 您可以将一个对象分配给附加有 Moveable 脚本的 路径添加器 MoveController 但是 我需要能够将运行时创建的新预制件添加到
  • 使用 printf 在 c 中 fork() [重复]

    这个问题在这里已经有答案了 有 2 个不同的程序 它们都很小 例如 int main printf print hello fork int main printf print hello n fork 输出 1 是 print hello
  • 根据列删除配置单元中的重复行

    我有一个包含 10 列的 HIVE 表 其中前 9 列将有重复的行 而第 10 列则不会 因为 CREATE DATE 将具有创建日期 例子 如果我今天在表中插入 10 行 它将具有 CREATE DATE 作为今天的日期 如果我明天再次插
  • apache beam.io.BigQuerySource use_standard_sql 作为数据流运行程序运行时不起作用

    我有一个数据流作业 我将首先从 bigquery 查询中读取 在标准 sql 中 它在直接运行模式下完美运行 但是 我尝试在数据流运行程序模式下运行此数据流并遇到此错误 响应 content 显然 use standard sql 参数在数