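The problem is that records are never removed from the global window. Every time a new record arrives, the join fires on the global window while all the old records are still in it, so the old matches are emitted again.
So, to make this work in your case, you need to implement a custom evictor: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#evictors. I extended your example into a minimal working example and added the evictor, which I explain after the code snippet.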
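To make the problem concrete, here is a minimal plain-Scala simulation. It uses no Flink at all, and the object name `NoEvictionDemo` and the `Either` encoding (Left for the first input, Right for the second) are my own assumptions. It models a per-key global window that fires on every element, like `CountTrigger.of(1)`, and never removes anything:

```scala
// Plain-Scala simulation, not Flink code: one buffer per key that is never cleared.
object NoEvictionDemo {
  type In1 = (Long, String)         // first input: (timestamp, id)
  type In2 = (Long, String, String) // second input: (timestamp, id, value)
  type Elem = Either[In1, In2]      // Left = first input, Right = second input

  def run(arrivals: Seq[Elem]): Seq[(In1, In2)] = {
    var windows = Map.empty[String, Vector[Elem]]
    val out = Vector.newBuilder[(In1, In2)]
    for (e <- arrivals) {
      val key = e match {
        case Left(l)  => l._2
        case Right(r) => r._2
      }
      // Append to the key's "global window"; nothing is ever removed.
      val buf = windows.getOrElse(key, Vector.empty[Elem]) :+ e
      windows += key -> buf
      // Fire on every element and join everything currently in the window.
      val ones = buf.collect { case Left(l) => l }
      val twos = buf.collect { case Right(r) => r }
      for (l <- ones; r <- twos) out += ((l, r))
    }
    out.result()
  }
}
```

Suppose the lookup value `(3L, "myId-1", "myValue-A")` arrives after `(1L, "myId-1")` and before `(5L, "myId-1")` and `(9L, "myId-1")`. This yields six join results, and `(1L, "myId-1")` is emitted three times, once per firing.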
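import org.apache.flink.streaming.api.datastream.CoGroupedStreams
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.evictors.Evictor
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue
import scala.collection.JavaConverters._

val env = StreamExecutionEnvironment.getExecutionEnvironment

val data1 = List(
  (1L, "myId-1"),
  (2L, "myId-2"),
  (5L, "myId-1"),
  (9L, "myId-1"))
val data2 = List(
  (3L, "myId-1", "myValue-A"))
val stream1 = env.fromCollection(data1)
val stream2 = env.fromCollection(data2)

// Element type stored in the join window: one element from either input.
type Elem = CoGroupedStreams.TaggedUnion[(Long, String), (Long, String, String)]

stream1.join(stream2)
  .where(_._2).equalTo(_._2)
  .window(GlobalWindows.create()) // assume this is a requirement
  .trigger(CountTrigger.of[GlobalWindow](1)) // fire the join on every new element
  .evictor(new Evictor[Elem, GlobalWindow] {
    // Nothing is removed before the join runs.
    override def evictBefore(elements: java.lang.Iterable[TimestampedValue[Elem]], size: Int,
                             window: GlobalWindow, evictorContext: Evictor.EvictorContext): Unit = {}

    // Runs after the join: keep only the most recent element of the second input.
    override def evictAfter(elements: java.lang.Iterable[TimestampedValue[Elem]], size: Int,
                            window: GlobalWindow, evictorContext: Evictor.EvictorContext): Unit = {
      val lastInputTwoIndex = elements.asScala.toSeq.lastIndexWhere(_.getValue.isTwo)
      if (lastInputTwoIndex == -1) {
        // The lookup value has not arrived yet, so keep everything for a later join.
        println("Waiting for the lookup value before evicting")
        return
      }
      val iterator = elements.iterator()
      for (index <- 0 until size) {
        val cur = iterator.next()
        if (index != lastInputTwoIndex) {
          println(s"evicting ${cur.getValue.getOne}/${cur.getValue.getTwo}")
          iterator.remove()
        }
      }
    }
  })
  .apply((left, right) => (left, right))
  .print()

env.execute()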
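The evictor is applied after the window function (the join, in this case). It is not entirely clear to me how your use case should behave when the second input has several entries for the same key. For now, the evictor keeps only the most recent one, so it effectively supports a single lookup value per key.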
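The single-entry limitation follows from the selection rule inside `evictAfter`. Here is a plain-Scala sketch of just that rule. The name `survivors` and the `Either` encoding, which stands in for `TaggedUnion`, are illustrative assumptions and not Flink API:

```scala
// Plain-Scala version of the evictAfter selection rule (not Flink API).
// Left = element from the first input, Right = element from the second input.
object EvictAfterSketch {
  def survivors[A, B](window: Seq[Either[A, B]]): Seq[Either[A, B]] = {
    val lastTwo = window.lastIndexWhere(_.isRight)
    if (lastTwo == -1) window   // no lookup value yet: evict nothing
    else Seq(window(lastTwo))   // keep only the most recent lookup value
  }
}
```

With two lookup values `A` and `B` in the same window, only `B` survives the eviction, so `A` can no longer be joined with records that arrive later.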
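The window function fires as soon as a new element enters the window (count = 1). The join is then evaluated over all elements with the same key. Afterwards, to avoid duplicate output, the evictor removes everything except the most recent element of the second input, i.e. all entries of the first input in the current window. Because the second input may arrive after the first, nothing is evicted while the window does not yet contain an element of the second input. Note that my Scala is quite rusty; you will probably be able to write this more elegantly. The output of a run is: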
Waiting for the lookup value before evicting
Waiting for the lookup value before evicting
Waiting for the lookup value before evicting
Waiting for the lookup value before evicting
4> ((1,myId-1),(3,myId-1,myValue-A))
4> ((5,myId-1),(3,myId-1,myValue-A))
4> ((9,myId-1),(3,myId-1,myValue-A))
evicting (1,myId-1)/null
evicting (5,myId-1)/null
evicting (9,myId-1)/null
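The fire-join-evict cycle can also be simulated in plain Scala without Flink, as a sketch under my own assumptions. `Either` stands in for `TaggedUnion`, a `Map` of `Vector`s stands in for the per-key window state, and `EvictionCycleSketch` is a hypothetical name. The arrival order mirrors the run above, where the lookup value arrives after the three `myId-1` records. One extra record, `(11L, "myId-1")`, is appended to show that later records are joined exactly once:

```scala
// Plain-Scala simulation of CountTrigger.of(1) + join + the evictAfter rule (not Flink code).
object EvictionCycleSketch {
  type In1 = (Long, String)         // first input: (timestamp, id)
  type In2 = (Long, String, String) // second input: (timestamp, id, value)
  type Elem = Either[In1, In2]      // Left = first input, Right = second input

  def run(arrivals: Seq[Elem]): Seq[(In1, In2)] = {
    var windows = Map.empty[String, Vector[Elem]]
    val out = Vector.newBuilder[(In1, In2)]
    for (e <- arrivals) {
      val key = e match {
        case Left(l)  => l._2
        case Right(r) => r._2
      }
      val buf = windows.getOrElse(key, Vector.empty[Elem]) :+ e
      // The join fires on every new element, over everything in the key's window.
      val ones = buf.collect { case Left(l) => l }
      val twos = buf.collect { case Right(r) => r }
      for (l <- ones; r <- twos) out += ((l, r))
      // evictAfter: once a lookup value exists, keep only the latest one.
      val lastTwo = buf.lastIndexWhere(_.isRight)
      windows += key -> (if (lastTwo == -1) buf else Vector(buf(lastTwo)))
    }
    out.result()
  }
}
```

Records 1, 5 and 9 are joined once when the lookup value arrives. Record 11 is joined on its own, because the earlier records were already evicted.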
A final note: if the Table API already offers a concise way to do what you want, I would stick with it and convert the result to a DataStream when needed: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/common.html#integration-with-datastream-and-dataset-api