为什么 Flink 在 DataStream join + Global window 上发出重复记录?

2024-01-28

我正在学习/试验 Flink,并且观察到 DataStream 连接的一些意外行为,并且想了解发生了什么......

假设我有两个流,每个流有 10 条记录,我想将其加入到id场地。假设一个流中的每条记录在另一个流中都有一个匹配的记录,并且 ID 在每个流中都是唯一的。还假设我必须使用全局窗口(要求)。

使用 DataStream API 加入(我在 Scala 中的简化代码):

val stream1 = ... // from a Kafka topic on my local machine (I tried with and without .keyBy)
val stream2 = ... 

stream1
  .join(stream2)
  .where(_.id).equalTo(_.id)
  .window(GlobalWindows.create()) // assume this is a requirement
  .trigger(CountTrigger.of(1))
  .apply {
    (row1, row2) => // ... 
  }
  .print()

Result:

  • 一切都按预期打印,第一个流中的每条记录都与第二个流中的记录连接在一起。

However:

  • 如果我将其中一个记录(例如,带有更新的字段)从一个流重新发送到该流,则会发出两个重复的连接事件 ????
  • 如果我重复该操作(有或没有更新的字段),我将收到 3 个发出的事件,然后是 4、5 个等......????

Flink 社区有人能解释一下为什么会发生这种情况吗?我预计每次只会发出 1 个事件。是否可以通过全局窗口来实现这一点?

相比之下,Flink Table API 在同一场景中的行为符合预期,但对于我的项目,我对 DataStream API 更感兴趣。

Table API 示例,按预期工作:

tableEnv
  .sqlQuery(
    """
      |SELECT *
      |  FROM stream1
      |       JOIN stream2
      |       ON stream1.id = stream2.id
    """.stripMargin)
  .toRetractStream[Row]
  .filter(_._1) // just keep the inserts
  .map(...)
  .print() // works as expected, after re-sending updated records

谢谢你,

Nicolas


问题是记录永远不会从全局窗口中删除。因此,每当新记录到达但旧记录仍然存在时,您就会在全局窗口上触发联接操作。

因此,要让它在您的情况下运行,您需要实现一个自定义evictor https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#evictors。我在一个最小的工作示例中扩展了您的示例,并添加了驱逐器,我将在代码片段后对其进行解释。

val data1 = List(
  (1L, "myId-1"),
  (2L, "myId-2"),
  (5L, "myId-1"),
  (9L, "myId-1"))

val data2 = List(
  (3L, "myId-1", "myValue-A"))

val stream1 = env.fromCollection(data1)
val stream2 = env.fromCollection(data2)

stream1.join(stream2)
  .where(_._2).equalTo(_._2)
  .window(GlobalWindows.create()) // assume this is a requirement
  .trigger(CountTrigger.of(1))
  .evictor(new Evictor[CoGroupedStreams.TaggedUnion[(Long, String), (Long, String, String)], GlobalWindow](){
    override def evictBefore(elements: lang.Iterable[TimestampedValue[CoGroupedStreams.TaggedUnion[(Long, String), (Long, String, String)]]], size: Int, window: GlobalWindow, evictorContext: Evictor.EvictorContext): Unit = {}

    override def evictAfter(elements: lang.Iterable[TimestampedValue[CoGroupedStreams.TaggedUnion[(Long, String), (Long, String, String)]]], size: Int, window: GlobalWindow, evictorContext: Evictor.EvictorContext): Unit = {
      import scala.collection.JavaConverters._
      val lastInputTwoIndex = elements.asScala.zipWithIndex.filter(e => e._1.getValue.isTwo).lastOption.map(_._2).getOrElse(-1)
      if (lastInputTwoIndex == -1) {
        println("Waiting for the lookup value before evicting")
        return
      }
      val iterator = elements.iterator()
      for (index <- 0 until size) {
        val cur = iterator.next()
        if (index != lastInputTwoIndex) {
          println(s"evicting ${cur.getValue.getOne}/${cur.getValue.getTwo}")
          iterator.remove()
        }
      }
    }
  })
  .apply((r, l) => (r, l))
  .print()

应用窗口函数(本例中为 join)后将应用驱逐器。如果第二个输入中有多个条目,目前尚不完全清楚您的用例应该如何工作,但目前,逐出器仅适用于单个条目。

每当有新元素进入窗口时,就会立即触发窗口函数(count = 1)。然后使用具有相同键的所有元素来评估连接。之后,为了避免重复输出,我们删除当前窗口中第一个输入中的所有条目。由于第二输入可能在第一输入之后到达,因此当第二输入为空时不执行驱逐。请注意,我的 scala 已经很生锈了;你将能够以更好的方式编写它。运行的输出是:

Waiting for the lookup value before evicting
Waiting for the lookup value before evicting
Waiting for the lookup value before evicting
Waiting for the lookup value before evicting
4> ((1,myId-1),(3,myId-1,myValue-A))
4> ((5,myId-1),(3,myId-1,myValue-A))
4> ((9,myId-1),(3,myId-1,myValue-A))
evicting (1,myId-1)/null
evicting (5,myId-1)/null
evicting (9,myId-1)/null

最后一点:如果表 API 已经提供了一种简洁的方法来完成您想要的操作,那么我会坚持使用它,然后将其转换为数据流 https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/common.html#integration-with-datastream-and-dataset-api需要的时候。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

为什么 Flink 在 DataStream join + Global window 上发出重复记录? 的相关文章

随机推荐

  • F# 和 ADO.NET - 惯用的 F#

    我刚刚开始学习 F 我昨晚写了这段 F ADO NET 代码 您将通过哪些方式改进语法 使其感觉像惯用的 F let cn new OleDbConnection cnstr let sql SELECT FROM People let d
  • C:按某个键终止程序

    在C语言中 我通常使用getch 函数等待按下键然后结束程序 但最近我读到 由于它不是标准函数 因此使用它是不好的编程习惯 因此 而不是以下内容 int main dosomething getch wait for the user to
  • 如何从矩阵中删除所有带有 NA 的行?

    I have 一个矩阵y它有两列 行数不同 取决于输入参数 第一列中的所有元素都是整数 I need 对于每一行 如果第二列的元素为 NA 我需要删除这一行 我该怎么做 我唯一的想法是创建另一个矩阵 如果第一个矩阵中的一行没有 NA 则将其
  • 从页面读取 XML 响应

    我正在使用 C 和 ASP net 对网页执行 POST 如何读取 XML 响应以确定我的提交是否有错误或是否成功 这是我尝试过的 但它只会返回成功 失败消息 不会显示从页面返回的实际 xml private void Perform th
  • 使用 jquery 滑动导航栏

    我试图为网站创建一个水平导航栏 现在我需要提供一个滑动效果如下 导航菜单有 5 个链接 第 5 个链接是右箭头图像 单击此第 5 个链接 右箭头 时 所有其他 4 个链接应折叠到此第 5 个菜单中 并且右箭头应替换为左箭头 现在只有一个链接
  • pandas多索引如何按第二级屏蔽数据

    我有一个具有多索引的数据框 如下所示 Date Period Value n 20130101 0 12 n 20130101 1 13 20130102 0 13 20130102 1 14 第一级是日期 第二级是周期 我想将周期不为零的
  • 从 Joomla 表单字段插入数据库

    我是 Joomla 的初学者 开发并创建了一个非常简单的模块 如何创建包含 3 个文本字段的表单 然后将输入的值保存到数据库表中 试试这个例子 我们将把用户的名字和姓氏发布到表中 在您的数据库中创建一个表 注意它应该有前缀 jos 我们将这
  • 撤消 git 合并

    我对 Git 没有那么丰富的经验 现在我遇到了一个大问题 这是我当前的分支的样子 feature F1 F2 master M0 M1 M2 M3 M4 bugfix B1 B2 情况 有人做了一件非常糟糕的事情并推送了一个非常糟糕的合并
  • MySQL 插入错误:ER_BAD_FIELD_ERROR:“字段列表”中未知列“2525”

    var convID 2525 var contactUsername blabla var userId 100 var contactId 200 var sql INSERT INTO contacts FK OWNERID FK U
  • 如何/在哪里使用 NSNumberFormatter?

    我是代码新手 在查看了一些答案后仍然需要帮忙 在我的代码中 func labelInformation numLabels text newLabel text 当前结果 228500 23 期望的结果 228 500 23 如何 在哪里使
  • 短信收件箱中对联系人表的错误引用

    我正在尝试从手机的短信收件箱中查找与短信相对应的联系方式 据我了解person列是外键 id的列ContactsContract Contacts 我的问题是我得到了错误的值person来自短信查询的值 一些person联系人表中不存在 I
  • 静态变量还是通过 Bundle 传递变量? [复制]

    这个问题在这里已经有答案了 假设我有一个 ListView 并在列表上设置了一个 OnItemClickListener 传递变量的最佳方式是什么 静态变量 public static String example onItemClick
  • 由于 teamcity,pycharm 调试控制台不可读

    几天以来 我的 pycharm 调试控制台被 teamcity 信息系统性地污染了 例如当我想打印出变量值时 将显示以下内容 gt gt gt df teamcity testStdOut timestamp 2017 11 02T15 5
  • 我希望能够使用故事板左右滚动 UITableViewCell。这不可能吗?

    我已经尝试了所有我能想到的方法 但没有任何效果 与 iOS 7 的 Mail app 中的效果类似 我希望能够左右滑动表格视图的单元格 我希望能够在情节提要和自动布局中执行此操作 因为我的大部分应用程序都是使用这些完成的 我理解带有故事板的
  • 将 WebRTC (AudioTrackSinkInterface) 原始音频写入光盘

    我正在尝试录制 WebRTC 传输的音频PeerConnection MediaStream 我在音轨中添加了一个接收器 它实现了AudioTrackSinkInterface 它实现了OnData method void TestAudi
  • 确定 C# 中的调用对象类型

    不管这是否是一个好主意 是否可以实现一个接口 其中执行函数知道调用对象的类型 class A private C public int doC int input return C DoSomething input class B pri
  • JPA 实体管理器 - 如何运行 SQL 脚本文件?

    我有一个 SQL 脚本文件 它删除并重新创建各种表以及将各种记录插入到这些表中 该脚本在 SQL 查询控制台中执行时运行良好 但我需要它由实体管理器执行 关于我如何能够做到这一点有什么想法吗 Thanks H 参加聚会迟到了 但我是这样做的
  • Laravel 为资源控制器命名路由

    使用 Laravel 4 2 是否可以为资源控制器路由分配名称 我的路线定义如下 Route resource faq ProductFaqController 我尝试向路线添加名称选项 如下所示 Route resource faq Pr
  • 专家 R 用户,您的 .Rprofile 中有什么? [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 为什么 Flink 在 DataStream join + Global window 上发出重复记录?

    我正在学习 试验 Flink 并且观察到 DataStream 连接的一些意外行为 并且想了解发生了什么 假设我有两个流 每个流有 10 条记录 我想将其加入到id场地 假设一个流中的每条记录在另一个流中都有一个匹配的记录 并且 ID 在每