将 MutationGroup 流式传输到 Spanner 中

2024-02-03

我正在尝试使用 SpannerIO 将 MutationGroups 流式传输到扳手中。 目标是每 10 秒写入新的 MuationGroup,因为我们将使用 Spanner 来查询近期 KPI。

当我不使用任何 Windows 时,出现以下错误:

Exception in thread "main" java.lang.IllegalStateException: GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.
    at org.apache.beam.sdk.transforms.GroupByKey.applicableTo(GroupByKey.java:173)
    at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:204)
    at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:120)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:472)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:286)
    at org.apache.beam.sdk.transforms.Combine$PerKey.expand(Combine.java:1585)
    at org.apache.beam.sdk.transforms.Combine$PerKey.expand(Combine.java:1470)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:491)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:299)
    at org.apache.beam.sdk.io.gcp.spanner.SpannerIO$WriteGrouped.expand(SpannerIO.java:868)
    at org.apache.beam.sdk.io.gcp.spanner.SpannerIO$WriteGrouped.expand(SpannerIO.java:823)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:472)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:286)
    at quantum.base.transform.entity.spanner.SpannerProtoWrite.expand(SpannerProtoWrite.java:52)
    at quantum.base.transform.entity.spanner.SpannerProtoWrite.expand(SpannerProtoWrite.java:20)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:491)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:299)
    at quantum.entitybuilder.pipeline.EntityBuilderPipeline$Write$SpannerWrite.expand(EntityBuilderPipeline.java:388)
    at quantum.entitybuilder.pipeline.EntityBuilderPipeline$Write$SpannerWrite.expand(EntityBuilderPipeline.java:372)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:491)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:299)
    at quantum.entitybuilder.pipeline.EntityBuilderPipeline.main(EntityBuilderPipeline.java:122)
:entityBuilder FAILED

由于上述错误,我假设输入集合需要窗口化并触发,因为 SpannerIO 使用 GroupByKey (这也是我的用例所需要的):

        ...
        .apply("1-minute windows", Window.<MutationGroup>into(FixedWindows.of(Duration.standardMinutes(1)))
            .triggering(Repeatedly.forever(AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(10))
            ).orFinally(AfterWatermark.pastEndOfWindow()))
            .discardingFiredPanes()
            .withAllowedLateness(Duration.ZERO))
        .apply(SpannerIO.write()
                    .withProjectId(entityConfig.getSpannerProject())
                    .withInstanceId(entityConfig.getSpannerInstance())
                    .withDatabaseId(entityConfig.getSpannerDb())
                    .grouped());

当我这样做时,我在运行时遇到以下异常:

java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
        org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
        com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.issueSideInputFetch(StreamingModeExecutionContext.java:631)
        com.google.cloud.dataflow.worker.StreamingModeExecutionContext$UserStepContext.issueSideInputFetch(StreamingModeExecutionContext.java:683)
        com.google.cloud.dataflow.worker.StreamingSideInputFetcher.storeIfBlocked(StreamingSideInputFetcher.java:182)
        com.google.cloud.dataflow.worker.StreamingSideInputDoFnRunner.processElement(StreamingSideInputDoFnRunner.java:71)
        com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:323)
        com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:43)
        com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
        com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:271)
        org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:219)
        org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:69)
        org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:517)
        org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:505)
        org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn.processElement(ValueWithRecordId.java:145)

经过进一步调查,似乎是由于.apply(Wait.on(input))在 SpannerIO 中:它有一个全局侧输入,似乎不适用于我的固定窗口,如文档所示Wait.java state:

If signal is globally windowed, main input must also be. This typically would be useful
 *       only in a batch pipeline, because the global window of an infinite PCollection never
 *       closes, so the wait signal will never be ready.

作为临时解决方法,我尝试了以下方法:

  • 添加带有触发器的全局窗口而不是固定窗口:

        .apply("globalwindow", Window.<MutationGroup>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterProcessingTime
                        .pastFirstElementInPane()
                        .plusDelayOf(Duration.standardSeconds(10))
                ).orFinally(AfterWatermark.pastEndOfWindow()))
                .discardingFiredPanes()
                .withAllowedLateness(Duration.ZERO))
    

    这会导致仅当我耗尽管道时才写入扳手。我的印象是Wait.on()信号仅在全局窗口关闭时触发,并且不适用于触发器。

  • 禁用.apply(Wait.on(input))在 SpannerIO 中:

    这会导致管道卡在视图创建上 在此 SO 帖子中进行了描述:SpannerIO Dataflow 2.3.0 卡在 CreateDataflowView 中 https://stackoverflow.com/questions/49273528/spannerio-dataflow-2-3-0-stuck-in-createdataflowview.

    当我检查工作日志以获取线索时,我确实收到以下警告:

    logger:  "org.apache.beam.sdk.coders.SerializableCoder"
    message:  "Can't verify serialized elements of type SpannerSchema have well defined equals method. This may produce incorrect results on some PipelineRunner
    logger:  "org.apache.beam.sdk.coders.SerializableCoder"   
    message:  "Can't verify serialized elements of type BoundedSource have well defined equals method. This may produce incorrect results on some PipelineRunner"
    

请注意,一切都适用于 DirectRunner,并且我正在尝试使用 DataflowRunner。

对于我可以尝试让它运行的事情,有人有任何其他建议吗?我很难想象我是唯一一个尝试将 MutationGroups 流式传输到扳手中的人。

提前致谢!


目前,Beam Streaming 不支持 SpannerIO 连接器。请关注这个请求请求 https://github.com/apache/beam/pull/6478它增加了对扳手 IO 连接器的流支持。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

将 MutationGroup 流式传输到 Spanner 中 的相关文章

随机推荐

  • 固定列标题宽度与正文列宽度不匹配

    标题与列宽不对齐 JsFiddle http jsfiddle net DyMSb 1 截屏 http s17 postimg org dybznay9b screen png 我在用着 ajax aspnetcdn com ajax jq
  • 以单下划线或双下划线开头的函数和变量

    我在各种编程语言 PHP 和 Python 中看到过以下划线开头的函数和变量 并且对其背后的含义感到困惑 假设 PHP 中使用正常约定 单下划线表示受保护的成员变量或方法 双下划线表示私有成员变量或方法 这源于当时 PHP 的 OOP 支持
  • 如何让Spring JMS从注释@JmsListener中选择目标队列名称

    任何帮助将不胜感激 我正在尝试使用 spring JMSListener 创建 MDB 的替代品 我希望将目的地名称作为注释传递 但我注意到org springframework jms listener DefaultMessageLis
  • 对象的 JVM 深度内存大小[重复]

    这个问题在这里已经有答案了 据我所知 众所周知的 Instrumentation Java 方法无法正确计算对象的深度大小 是否有可靠的方法在 JVM 上计算对象的正确深度大小 我正在考虑的用例是固定 或上限 内存大小的数据结构 即缓存 注
  • 如何显示多个 YouTube 视频而不重叠音频

    我有一个包含一些 YouTube 视频嵌入代码的页面 当用户在一个视频上单击 gt 播放 时 页面上的所有其他视频都需要暂停 否则它们的音频会与刚刚播放的新视频重叠 实现这一点最有效的方法是什么 好吧 这是我根据其他人的一些代码提出的解决方
  • postgreSQL中的@@Fetch_status

    我正在将数据库从 MS SQL Server 传输到 PostgreSQL 但此触发器有问题 CREATE TRIGGER added clients ON client FOR INSERT AS BEGIN DECLARE cursor
  • 如何强类型组合 mixin?

    我正在尝试使用函数组合通过 mixin 向对象添加行为 const pipe funcs args any gt any gt initial any gt funcs reduce object fn gt fn object initi
  • 快速,将文件发送到服务器

    我正在学习 swift 我使用下面的代码向服务器发送请求 它适用于简单的请求 我从服务器得到响应 我的问题是我无法将文件发送到服务器 code let parameters parameter let request NSMutableUR
  • 播放后重定向 html5 视频

    我有一个 html 5 视频 我删除了控制按钮并添加了 js 代码 以便用户在单击视频时播放视频 我需要做的是绑定一个额外的脚本 该脚本将在视频播放后重定向页面 而无需重新加载页面 下面是我的js代码 function play var v
  • 如何获取colspan的值

    我尝试过不同的 jQuery 方法 var num this attr colspan text var num this attr colspan val var num this td colspan val var num this
  • 在c#中将字符串转换为十进制

    我在使用decimal parse 将字符串转换为十进制值时遇到一些问题 这是我的代码行 fixPrice decimal Parse mItemParts Groups price Value Replace Replace Replac
  • 开发人员是否需要为在 Windows Azure Marketplace 上发布 SaaS 应用程序付费?

    目前我正在构建一个简单的 SaaS 驱动的 TMS 目的是在 Windows Azure Marketplace 中发布它 我无法找到任何定价 微软是否向开发者收取发布费用 是按月计算的吗 或者 Windows Azure 上托管的所有应用
  • OpenGL-OpenCL 互操作传输时间 + 位图纹理

    两部分问题 我正在开展一个学校项目 使用生命游戏作为实验 gpgpu 的工具 我使用 OpenCL 和 OpenGL 进行实时可视化 目标是让这个东西尽可能大 更快 经过分析 我发现帧时间主要由 CL 获取和释放 GL 缓冲区决定 并且时间
  • JavaScript 初学者遇到的引号问题

    我正在尝试从一本书 Jeremy McPeak 的 Beginner JavaScript 中学习 JS 但我坚持使用以下代码 html 中的结果是这样的 56 02 degrees centigrade is 56 as an integ
  • 如何分发带有依赖库的 Mac OS X?

    我有一个程序 特别是我的条目SO DevDays 倒计时应用挑战 https meta stackexchange com questions 20420 countdown app for devdays 21659 21659 它依赖于
  • 基于多个文件的存在激活 Maven 配置文件

    我想根据多个文件的存在来激活配置文件 在下面的示例中 如果两个文件都被激活 我希望配置文件被激活my marker and another marker exists
  • 包恢复失败。回滚包更改

    当我尝试在 VS2017 中为 asp net core 安装任何 nuget 包时 它不断显示每个 包的 包恢复失败 回滚包更改 您可以执行以下步骤 VS Tools Options Nuget 包管理器 General 清除所有 Nug
  • Gradle编译:如何从依赖关系中识别组和模块?

    有时 我不想添加所有依赖项 因此我需要从依赖项中排除一些依赖项 例如 compile com google http client google http client 1 20 0 exclude group org apache htt
  • ODBC Teradata 驱动程序 HY001 内存分配错误。什么意思?

    我正在使用 python 脚本 该脚本使用 teradata python 模块和类似于下面的脚本将一批数据插入 Teradata 它使用 ODBC 连接 偶尔会出现以下错误 HY001 Teradata ODBC Teradata Dri
  • 将 MutationGroup 流式传输到 Spanner 中

    我正在尝试使用 SpannerIO 将 MutationGroups 流式传输到扳手中 目标是每 10 秒写入新的 MuationGroup 因为我们将使用 Spanner 来查询近期 KPI 当我不使用任何 Windows 时 出现以下错