使用 InProcessPipelineRunner 执行时,PubsubReader 失败并出现 NullPointerException

2023-12-29

我有简单的管道,仅执行读取 PubsubIO.Read.subscription 。在消耗大约 200 个元素后,每次运行都会失败,但有以下例外:

[error] (run-main-0) java.lang.RuntimeException: java.lang.NullPointerException
java.lang.RuntimeException: java.lang.NullPointerException
    at  com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.run(InProcessPipelineRunner.java:281)
    at com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.run(InProcessPipelineRunner.java:69)
    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:181)
    at com.sandbox.WriteLogsToBQ.main(WriteLogsToBQ.java:296)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
Caused by: java.lang.NullPointerException
    at com.google.cloud.dataflow.sdk.io.PubsubUnboundedSource$PubsubReader.ackBatch(PubsubUnboundedSource.java:612)
    at com.google.cloud.dataflow.sdk.io.PubsubUnboundedSource$PubsubCheckpoint.finalizeCheckpoint(PubsubUnboundedSource.java:297)
    at com.google.cloud.dataflow.sdk.runners.inprocess.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.startReader(UnboundedReadEvaluatorFactory.java:203)
    at com.google.cloud.dataflow.sdk.runners.inprocess.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.finishBundle(UnboundedReadEvaluatorFactory.java:172)
    at com.google.cloud.dataflow.sdk.runners.inprocess.TransformExecutor.finishBundle(TransformExecutor.java:163)
    at com.google.cloud.dataflow.sdk.runners.inprocess.TransformExecutor.run(TransformExecutor.java:119)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

我使用的是 SDK 版本 1.9.0。 1.6.1 中没有发生这种情况(成功获取 >10k 元素)。 有谁知道解决方法吗?我还注意到 1.9.0 的获取速度比 1.6.1 快得多。对于 1.6.1,它似乎使用 10 个元素的批次。


使用阅读表格主题进行测试:

p.apply(PubsubIO.Read.named("reading_topic_test").topic("projects/***/topics/SL_LogLogin"))

订阅是自动生成的,但随后管道失败,并显示:

Jan 05, 2017 2:06:33 PM com.google.cloud.dataflow.sdk.io.PubsubUnboundedSource apply
WARNING: Created subscription null to topic NestedValueProvider{value=NestedValueProvider{value=StaticValueProvider{value=projects/***/topics/SL_LogLogin}}}. 
Note this subscription WILL NOT be deleted when the pipeline terminates
[error] (run-main-0) com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder$PopulateDisplayDataException: 
Error while populating display data for component:com.google.cloud.dataflow.sdk.io.PubsubUnboundedSource$StatsFn
com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder$PopulateDisplayDataException: 
Error while populating display data for component: com.google.cloud.dataflow.sdk.io.PubsubUnboundedSource$StatsFn
    at    com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:664)
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:643)
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:637)
    at com.google.cloud.dataflow.sdk.transforms.ParDo.populateDisplayData(ParDo.java:1266)
    at com.google.cloud.dataflow.sdk.transforms.ParDo.access$200(ParDo.java:457)
    at com.google.cloud.dataflow.sdk.transforms.ParDo$Bound.populateDisplayData(ParDo.java:816)
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:657)
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:643)
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:637)
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.forRoot(DisplayData.java:630)
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.access$000(DisplayData.java:617)
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData.from(DisplayData.java:76)
    at com.google.cloud.dataflow.sdk.runners.inprocess.DisplayDataValidator.evaluateDisplayData(DisplayDataValidator.java:47)
    at com.google.cloud.dataflow.sdk.runners.inprocess.DisplayDataValidator.access$100(DisplayDataValidator.java:29)
    at com.google.cloud.dataflow.sdk.runners.inprocess.DisplayDataValidator$Visitor.visitTransform(DisplayDataValidator.java:62)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
    at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:103)
    at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:260)
    at com.google.cloud.dataflow.sdk.runners.inprocess.DisplayDataValidator.validateTransforms(DisplayDataValidator.java:43)
    at com.google.cloud.dataflow.sdk.runners.inprocess.DisplayDataValidator.validatePipeline(DisplayDataValidator.java:35)
    at com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.run(InProcessPipelineRunner.java:245)
    at com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.run(InProcessPipelineRunner.java:69)
    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:181)
    at com.sandbox.WriteLogsToBQ.main(WriteLogsToBQ.java:303)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
Caused by: java.lang.NullPointerException: Input display value cannot be null
    at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:228)
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.addItemIf(DisplayData.java:707)
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.add(DisplayData.java:685)
    at com.google.cloud.dataflow.sdk.io.PubsubUnboundedSource$StatsFn.populateDisplayData(PubsubUnboundedSource.java:1147)
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:657)
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:643)
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:637)
    at com.google.cloud.dataflow.sdk.transforms.ParDo.populateDisplayData(ParDo.java:1266)
    at com.google.cloud.dataflow.sdk.transforms.ParDo.access$200(ParDo.java:457)
    at com.google.cloud.dataflow.sdk.transforms.ParDo$Bound.populateDisplayData(ParDo.java:816)
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:657)
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:643)
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:637)
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.forRoot(DisplayData.java:630)
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData$InternalBuilder.access$000(DisplayData.java:617)
    at com.google.cloud.dataflow.sdk.transforms.display.DisplayData.from(DisplayData.java:76)
    at com.google.cloud.dataflow.sdk.runners.inprocess.DisplayDataValidator.evaluateDisplayData(DisplayDataValidator.java:47)
    at com.google.cloud.dataflow.sdk.runners.inprocess.DisplayDataValidator.access$100(DisplayDataValidator.java:29)
    at com.google.cloud.dataflow.sdk.runners.inprocess.DisplayDataValidator$Visitor.visitTransform(DisplayDataValidator.java:62)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
    at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:103)
    at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:260)
    at com.google.cloud.dataflow.sdk.runners.inprocess.DisplayDataValidator.validateTransforms(DisplayDataValidator.java:43)
    at com.google.cloud.dataflow.sdk.runners.inprocess.DisplayDataValidator.validatePipeline(DisplayDataValidator.java:35)
    at com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.run(InProcessPipelineRunner.java:245)
    at com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.run(InProcessPipelineRunner.java:69)
    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:181)
    at com.sandbox.WriteLogsToBQ.main(WriteLogsToBQ.java:303)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)

这是由https://github.com/GoogleCloudPlatform/DataflowJavaSDK/pull/547 https://github.com/GoogleCloudPlatform/DataflowJavaSDK/pull/547。对于造成的麻烦,我们深表歉意。

根本原因是 incubator-beam 和 GoogleDataflowSDK 之间的向后移植错误,该错误无法初始化在按作业订阅的情况下返回的订阅。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 InProcessPipelineRunner 执行时,PubsubReader 失败并出现 NullPointerException 的相关文章

随机推荐

  • Heroku 和网页抓取

    我有一个 nokigiri 网络抓取工具 它发布到我正在尝试发布到 heroku 的数据库 我有一个 sinatra 应用程序前端 我想从数据库中提取它 我是 Heroku 和 Web 开发的新手 不知道处理此类问题的最佳方法 我是否必须将
  • 如果没有预处理步骤,Qt 的可用性如何?

    我认为库要求使用特殊工具对我的源代码进行预处理是不合理的 也就是说 有几个人向我推荐了 Qt 库来进行跨平台 GUI 开发 如果没有预处理步骤 Qt 的可用性如何 编辑 好吧 大家 我并不是说这个问题是对 Qt 的攻击 太多的 Qt 粉丝都
  • 将 php 异常重新抛出到更高级别的 catch 块中

    我试图将异常从特定的 catch 块传递到更通用的 catch 块 但它似乎不起作用 当我尝试以下操作时 出现 500 服务器错误 这可能吗 我意识到有一些简单的解决方法 但是说 嘿 我不想处理这个错误 让我们让更通用的异常处理程序来处理它
  • 使用 awk 打印从零开始的行号

    谁能告诉我如何使用 awk 打印包括零的行号 这是我的输入文件 当我运行下面的 awk 命令时我得到 awk print NR 0 stackfile2 txt tr gt actual output txt 而我的预期输出是 如何打印以零
  • JavaScript 在 Thymeleaf 和 Spring Boot 中无法按预期工作

    JavaScript 在 thymeleaf 中不起作用 在 Spring Boot Thymeleaf 中 首先可以打开模态 但第二 第三 东西无法打开模态 每个事物都有类名 但只有第一个事物可以打开模态 我认为 JavaScript 只
  • 将实例方法作为函数指针传递给 C 库

    我正在编写一个使用 C 库的 Objective C 应用程序 我目前面临的问题是 C 库有一个结构 其中某些字段是函数指针 稍后用作回调 如何将 Objective C 实例方法转换为函数指针并将其传递给库 您需要在 Objective
  • 从列表中创建随机对

    我正在尝试创建一个程序来打印列表中的元素对 我需要创建一个字典 一开始是空的 可以在其中存储值 循环遍历列表以形成一对并确保没有重复项 当我在列表中循环时 我需要获取一个随机数 然后可以使用它来删除元素 使用 pop 方法从列表中删除随机选
  • 在服务器端验证 recapcha

    我在我们的一个项目中使用 Google 验证码 我在aspx页面中添加了recaptcha控件 现在我想验证在 recaptcha 中输入的值是否正确 我如何在按钮单击事件中完成它 我正在使用c if Page IsValid Will b
  • 获取 Android 上触摸事件的坐标

    我是 Android 新手 我已经完成了 hello world 教程 并且对正在发生的事情有了基本的了解 我对 T Mobile Pulse 的触摸屏特别感兴趣 所以为了让我开始 我希望能够在屏幕上写下 tocuh 事件的坐标 假设用户触
  • 如果有多个单词,则提取字符串中逗号后的最后一个单词,否则提取第一个单词

    我有数据 其中文字如下 location lt c xyz sss New Zealand USA Pris France id lt c 1 2 3 df lt data frame location id 我想从数据中提取国家名称 棘手
  • 如何避免 WPF 中的 Window 小于 UserControl 的最小尺寸?

    我有一个用户控件 状态栏 它具有隐式最小尺寸 不是通过属性设置 我的意思是 当它达到最小尺寸时 它不能减小并且会被裁剪 有没有办法让主窗口知道 UserControl 将被裁剪并且不允许它减小其大小 对于 WPF 这样的智能布局系统 它必须
  • 批次:后面带星号的百分比是多少?

    我在批处理文件中有这一行 Test exe 我在谷歌搜索 找到了这个解释 某些 DOS 版本使用此符号和百分号 来表示批处理文件中命令行上的所有参数 但我还是不明白这个命令怎么用 谢谢你 这用于将您传递到批处理文件的参数转发到另一个应用程序
  • 带背景图像的 JTextArea 的内部填充

    我的最终目标是拥有一个JTextArea带有背景图像 我在网上找到了代码 向我展示了如何执行此操作 但现在我遇到了图像顶部文本的问题 这就是我的意思 有什么方法可以添加一种向内缩进 以便文本不与图像边缘重叠 这是原始评论气泡图像 这是代码
  • 从Jquery中的hover()获取悬停单词?

    我想根据鼠标悬停在其上的单词进行自动翻译 我用 p hover function var hoveredWord this text translate hoveredWord en function to translate a word
  • checkbox数组返回nodejs中最后检查的值,而不是整个数组

    我试图通过 req body 从复选框中获取选中的值 如果我只检查一个就没有问题 并且在 req body 上有一个具有该值的对象 如果我检查多个 那么它会返回我连续检查的最后一个 我使用express ejs 和express json
  • 如何获得对 HTA 打开的对话框的 IE9 标准支持?

    我正在将一些旧的 HTA 从 Quirks 转换为 IE9 标准 但是 HTA 打开的模态和非模态对话框似乎不支持 IE9 标准模式 测试对话框
  • Phonegap如何获取应用程序内的应用程序名称?

    当我在手机上打开应用程序时 我想使用phonegap 获取应用程序的名称 并将该名称传递给网址 有没有办法获取应用程序名称 目前 我在应用程序打开时执行此操作 但我需要将应用程序的名称传递到 URL 的末尾 location href ht
  • Android File.delete 不起作用

    我尝试将图像文件保存到 SD 卡后删除它 但删除功能不起作用 任何帮助将不胜感激 这是我的代码 Save image to SD card String path Environment getExternalStorageDirector
  • JXL 数字格式和单元格类型

    我正在使用 JXL 编写 Excel 文件 客户希望某一列显示保留一位小数的数字 他们还希望细胞类型为 数字 当我使用以下 测试 代码时 数字显示正确 但单元格类型为 自定义 File excelFile new File C Users
  • 使用 InProcessPipelineRunner 执行时,PubsubReader 失败并出现 NullPointerException

    我有简单的管道 仅执行读取 PubsubIO Read subscription 在消耗大约 200 个元素后 每次运行都会失败 但有以下例外 error run main 0 java lang RuntimeException java