数据流进入 Beam Pipeline 时的附加参数

2024-02-05

我正在研究 Dataflow,我已经通过 Python SDK 构建了自定义管道。 我想将数据流 UI 上的参数添加到我的自定义管道中。 使用附加参数。参考者https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#staticvalue https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#staticvalue

然后我就改变了add_argument to add_value_provider_argument遵循谷歌文档

class CustomParams(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):        
        parser.add_value_provider_argument(
            "--input_topic",
            type = str,
        )
        parser.add_value_provider_argument(
            "--window_size",
            type = int,
            default = 5,
        )

def run():
    pipeline_options = PipelineOptions(pipeline_args, .....)
    custom_param = pipeline_options.view_as(CustomParams)
    .....
    pipeline | "Read PubSub Message" >> beam.io.ReadFromPubSub(custom_param.input_topic)

之后,我尝试制作 GCP 模板。上传脚本看起来像

  python custom_pipeline.py \
    --runner DataflowRunner \
    --project YOUR_PROJECT_ID \
    --staging_location gs://YOUR_BUCKET_NAME/staging \
    --temp_location gs://YOUR_BUCKET_NAME/temp \
    --template_location gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME

但是当我创建上传到 GCS 的模板时出现错误,如下所示

TypeError: expected string or bytes-like object

在线上beam.io.ReadFromPubSub()

它看起来像我得到的东西add_value_provider_argument Is 运行时值提供者目的。所以我很困惑我必须做什么才能解决这个问题?

我尝试解决这个问题,例如

转换数据类型

beam.io.ReadFromPubSub(str(custom_param.input_topic))

但出现这个错误,

ValueError: PubSub topic must be in the form "projects/<project>/topics/<topic>" (got "RuntimeValueProvider(option: input_topic, type: str, default_value: '...')").

那么请有人对此进行故障排除吗?我不知道该怎么办。


正如@mk_sta 提到的

ReadFromPubSub 模块似乎不接受 ValueProvider。你检查过这个堆栈线程吗?

并解释说thread https://stackoverflow.com/questions/58838705/usage-problem-add-value-provider-argument-on-a-streaming-stream-apache-beam-p/58869194#58869194, ReadFromPubSub does not currently accept ValueProvider arguments since it is implemented as a native transform in Dataflow.

你可以检查接受运行时参数的 I/O 方法 https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#pipeline-io-and-runtime-parameters为了ValueProvider不同 SDK 的支持。

那么此时如果从Python SDK切换到Java SDK,Read https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L588PubSubIO 确实支持 ValueProvider。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

数据流进入 Beam Pipeline 时的附加参数 的相关文章

  • 如何修复 TypeError: G 必须是 'd' 矩阵?

    目标 尝试通过优化过程运行玩具数据集 我遇到以下错误 TypeError Traceback most recent call last
  • 如何使用 BeautifulSoup 从表中选择特定行?

    So I have a question related to a previous question but I realized I needed to go one level more to get an 11 digit NDC
  • 如何从下面的html中提取数据?

    我想要从中提取数据的 Html 是 div class infoMessageInner p span class ng binding Fiber r best lld till adressen Tj nsterna kan du be
  • bs4 `next_sibling` VS `find_next_sibling`

    我在使用时遇到困难next sibling 并且类似地与next element 如果用作属性 我不会得到任何返回 但如果用作find next sibling or find next 然后就可以了 来自doc https www cru
  • 使用 python requests 模块时出现 HTTP 503 错误

    我正在尝试发出 HTTP 请求 但当前可以从 Firefox 浏览器访问的网站响应 503 错误 代码本身非常简单 在网上搜索一番后我添加了user Agent请求参数 但也没有帮助 有人能解释一下如何消除这个 503 错误吗 顺便说一句
  • 当变量取特定值时如何使 PyCharm 中断?

    我有一本大字典 其中一些元素偶尔会出现非法值 我想弄清楚非法值从何而来 PyCharm 应该不断监视我的字典的值 一旦它们中的任何一个取了非法值 它就应该中断并让我检查程序的状态 我知道我可以通过为我的字典创建一个 getter sette
  • FTP 下载冻结整个应用程序

    我正在尝试从 FTP 服务器下载一个大约 100 MB 的文件 这是一个测试 bin 文件 因为我正在测试该应用程序 我猜我将来想要下载的文件会更重 当我想下载文件时 整个应用程序就会冻结 几秒钟后它就会下载文件 该文件已完成 并且已成功下
  • Python排序算法[重复]

    这个问题在这里已经有答案了 我在Python中实现了不同的排序算法 以更好地理解它们 我想知道Python的内置排序方法实现什么类型的排序 这是一个叫做Timsort http en wikipedia org wiki Timsort由
  • pygame.error:文件不是 Windows BMP 文件(问题的延续)

    我最近开始使用 Mac 进行编码 因此 我必须从以前的计算机 Windows 中移走所有文件 长话短说 在发生此错误之前一切都很好 pygame error File is not a Windows BMP file 我检查了 Stack
  • 在Python中,如何将矩阵逆时针旋转90度?

    gt gt gt def rotate matrix k List List int For example if I have m 1 2 3 2 3 3 5 4 3 rotate matrix m should give me 3 3
  • 在数据流模板中调用 waitUntilFinish() 后可以运行代码吗?

    我有一个批处理 Apache Beam 作业 它从 GCS 获取文件作为输入 我的目标是根据执行后管道的状态将文件移动到两个 GCS 存储桶之一 如果管道执行成功 则将文件移动到存储桶 A 否则 如果管道在执行过程中出现任何未处理的异常 则
  • 有没有办法在 Python 3 中子类化生成器?

    除了显而易见的事情之外 我想我应该尝试一下 以防万一 def somegen input None yield gentype type somegen class subgen gentype def best function ever
  • 如何使用 Plotly 中的直方图将所有离群值分入一个分箱?

    所以问题是 我可以在 Plotly 中绘制直方图 其中所有大于某个阈值的值都将被分组到一个箱中吗 所需的输出 但使用标准情节Histogram类我只能得到这个输出 import pandas as pd from plotly import
  • 在flatpak项目中使用scrapy脚本

    我正在构建一个 flatpak 构建的项目 我有一个按钮 当单击它时我希望它运行 scrapy 脚本来抓取数据 窗口用户界面
  • Python 将日志滚动到变量

    我有一个使用多线程并在服务器后台运行的应用程序 为了无需登录服务器即可监控应用程序 我决定包括Bottle http bottlepy org为了响应一些HTTP端点并报告状态 执行远程关闭等 我还想添加一种查阅日志文件的方法 我可以使用以
  • 如何在 Python 中将 pdf 文件附加到 MIME 电子邮件?

    我正在制作一个自动邮件发送程序 Python 3 6 1 用于电子邮件营销 我在附加 PDF 文件时遇到问题 邮件中的 PDF 文件的文件名和页数是正确的 但 PDF 文件始终为空白 并且其大小增加 我尝试了三种不同的方法 其他两种方法不起
  • Sublime Text 插件开发中的全局 Python 包

    一 总结 我不知道 Sublime Text 插件开发人员如何使用 Sublime Text 查找全局 Python 包 而不是 Sublime Text 目录的 Python 包 Sublime Text使用自己的Python环境 而不是
  • 如何在Python中同时运行两只乌龟?

    我试图让两只乌龟一起移动 而不是一只接着另一只移动 例如 a turtle Turtle b turtle Turtle a forward 100 b forward 100 但这只能让他们一前一后地移动 有没有办法让它们同时移动 有没有
  • 小组芹菜链任务

    shared task def process record x return 1 2 4 4 5 6 shared task def add pro id return pro id 10 shared task def dmap it
  • pytesseract 无法从图像中识别复杂的数学公式

    我在用pytesseractpython 中的模块 pytesseract从图像中识别文本 但它不适用于包含复杂数学公式 例如根 推导 积分数学问题或方程 的图像 代码2 py Import modules from PIL import

随机推荐

  • 将x和y轴添加到所有facet_wrap

    通常希望尽量减少绘图中的墨水 我有一个多面情节 facet wrap 并希望去除尽可能多的墨水但保持可读性 我已经按照我的意愿进行了设置 除了 x 轴和 y 轴不存在于面 子图 中 除非位于最左侧或底部 去除了这么多墨水后 我相信眼睛需要这
  • 重载解析算法中如何确定歧义?

    我试图理解重载解析方法 为什么这是模棱两可的 void func double int int double void func int double double double void main func 1 2 3 4 但这不是吗 v
  • XCode 警告:“/* 在块注释内”

    我真的很喜欢通过这样注释来临时启用和禁用代码部分 some code 注意 代替 在最后 然而 XCode 不断向我发出警告 within block comment 有没有办法 自定义禁用 特定警告 为什么 我会告诉你为什么 因为我可以轻
  • 如何将 std::max_element 用于结构

    我想用std max element对于结构体VAR T基于id元素 但我无法将起始点和停止点与该函数的输入链接起来 typedef struct VAR int id char b 16 VAR int a 0 strcpy b VAR
  • 如何从警报框中获取文本?

    我需要从警报框中获取文本 我没有足够的声誉来上传图像 所以我上传代码而不是图像 有没有办法使用 Greasemonkey 在 Chrome 上 从弹出窗口 获取文本 查询不清楚 但是如果我理解正确的话 页面上有一个 JavaScript 会
  • JRHtmlExporter 现已弃用。如何定义图片保存路径?

    The JRHtml导出器类现在已弃用 JasperReports 6 x 我将这个类的用法替换为Html导出器 但我找不到等效的函数来替换exporter setParameter JRHtmlExporterParameter IMAG
  • Summernote 图像上传和替代方案不起作用

    我在我的网站上使用 Summernote 编辑器 并使用其网站上提到的 Click2edit 方法实现它here http www usrtriton nl assets bower summernote example html 然而 如
  • 在背景中绘图

    我有一个 IOS 应用程序 需要更新视图以响应用户或外部事件 绘制时间可以很短也可以很长 几秒 具体取决于视图中的内容 现在 绘图是在视图的 drawRect 方法中进行的 当绘图很长且存在大量用户交互时 应用程序将变得无响应 当需要更新时
  • 运行 liquibase 时出现值“CONTINUE”不是构面有效错误

    当我尝试运行以下命令时 java jar liquibase 3 4 1 jar classpath postgresql 9 2 1004 jdbc4 jar logLevel severe url jdbc postgresql loc
  • 是否可以更改 Postgres 中列的自然顺序?

    是否可以更改 Postgres 8 1 中列的自然顺序 我知道你不应该依赖列顺序 它不是基本的到我正在做的事情 我只需要它使一些自动生成的东西以更令人愉悦的方式出现 以便字段顺序从 pgadmin 通过后端一直到前端匹配 实际上 您可以直接
  • 复选框的表单模型绑定

    我正在使用 Laravel 4 1 在我的应用程序中我需要显示一个带有预填充复选框的表单 但我尝试使用表单模型绑定来做到这一点 但它不起作用 Form model user array route gt settings notify di
  • 如何在 Python 中将自定义类设为集合

    我有 Matlab 背景 在 matlab 中 我可以创建一个类定义 然后创建一个对象数组 我可以轻松地使用索引取消引用每个对象 此外 当我从对象数组 没有索引 调用方法时 我可以访问数组中的所有对象 例如 假设 myNewClass 具有
  • DotNetOpenAuth 可在哪些 .NET 框架上使用?

    大多数 全部 OAuth 资源 有关协议和代码库的信息 以便在您自己的应用程序中轻松使用它们 人们似乎在互联网上找到的资源似乎假设您正在使用它的应用程序是一个 Web 应用程序 不过我想开始使用 OAuth我的 Windows Mobile
  • 使用jdatabase更新数据库中的记录

    如何使用数据库更新 Joomla 3 中的记录 这是我到目前为止所拥有的 db JFactory getDBO query db gt getQuery true query gt update test AS h query gt set
  • Arrays.asList 给出 UnsupportedOperationException [重复]

    这个问题在这里已经有答案了 The List由返回Arrays asList不能通过使用诸如add or remove 但如果你把它传递给Collections sort方法 它可以毫无问题地对数组进行排序 我预计会出现异常 这似乎是一种非
  • 无法重新安装 Python 2.7 的 PyTables

    除了 2 7 之外 我还安装了 Python 2 7 当再次安装 PyTables 2 7 时 我收到此错误 发现已安装 numpy 1 5 1 软件包 错误 找不到本地 HDF5 安装 您可能需要明确说明本地 HDF5 标头和 可以通过设
  • 为什么这个构造函数无法在 Codeigniter 中加载?

  • 如何在javafx-8中setEnabled()?

    与 Swing 不同 Javafx 8 似乎没有用于 UI 控件的 setEnabled 方法 或等效方法 解决方法的建议 您可以使用setDisable 而不是 javaFx 中的 setEnabled 喜欢button setDisab
  • 如何过滤每个返回行的 json 数组?

    我有一个带有 json 字段的表 其中存储了对象数组 我想查询此表 并通过使用某些条件过滤它们 为每个返回的行仅返回 json 数组对象的子集 例如对于行 id 1 jsonColumn field abc field def field
  • 数据流进入 Beam Pipeline 时的附加参数

    我正在研究 Dataflow 我已经通过 Python SDK 构建了自定义管道 我想将数据流 UI 上的参数添加到我的自定义管道中 使用附加参数 参考者https cloud google com dataflow docs guides