Flink水位线不触发问题

2023-10-26

窗口计算时遇到好几次水位线不触发的情况,简单总结下。

首先,介绍下Flink的事件时间(EventTime)和水位线(Watermarks)的概念。

一、处理时间

如果要构造一个实时的流式应用,或早或晚都会接触到EventTime这个概念。现实场景中也会遇到消息乱序到达,这里会介绍到为什么需要事件时间和如何去处理乱序到达的数据。
ProcessingTime是Flink系统处理这条消息的时间,EventTime可以理解成是这条消息真实发生的时间。
举个例子,创建一个SlidingWindow,窗口大小为10秒,步长为5秒。关于窗口的更多概念,可以参考Flink官方文档——Windows

案例1:消息都按时到达

val text = senv.socketTextStream("localhost", 9999)
val counts = text.map {(m: String) => (m.split(",")(0), 1) }
    .keyBy(0)
    .timeWindow(Time.seconds(10), Time.seconds(5))
    .sum(1)
counts.print
senv.execute("ProcessingTime processing example")

如果source中有三条消息,对应的事件时间分别为13秒、13秒和16秒:
[站外图片上传中...(image-1e2304-1550219427900)]
它们会落到正确的窗口上,如下图所示。13秒产生的两条消息会落到窗口1[5s-15s]和窗口2[10s-20s]上,16秒产生的消息会落到窗口2[10s-20s]和窗口3[15s-25s]上。最后窗口fire掉时,三个窗口的count值分别为:(a,2), (a,3) and (a,1) ,和预期一致。

pr_ino_windows

 

案例2:消息delay

其中一条13秒产生的消息晚到了6s,那按上面的代码逻辑,这些消息会落到下面的窗口中:

pr_ooo_windows


延迟的消息会落到窗口2[10s-20s]和窗口3[15s-25s]上。这看起来对窗口2没有影响,因为结果都是3,但窗口3的结果却不一致了。

 

二、事件时间

因此此处我们采用事件时间,这里水位线的事件时间为当前系统的时间,当然你可以改成数据中的某个时间。

class TimestampExtractor extends AssignerWithPeriodicWatermarks[String] with Serializable {
  override def extractTimestamp(e: String, prevElementTimestamp: Long) = {
    e.split(",")(1).toLong 
  }
  override def getCurrentWatermark(): Watermark = { 
      new Watermark(System.currentTimeMillis)
  }
}
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val text = senv.socketTextStream("localhost", 9999)
                .assignTimestampsAndWatermarks(new TimestampExtractor) 
val counts = text.map {(m: String) => (m.split(",")(0), 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(10), Time.seconds(5))
      .sum(1)
counts.print
senv.execute("EventTime processing example")

结果如下图所示:
[站外图片上传中...(image-b2feb1-1550219427900)]
结果看起来好很多,窗口2和3都正确了,但窗口1却丢了一条数据。
Flink没有将delay的数据分配给窗口3是因为现在是检查消息的事件时间,因此不会放入窗口三中。而没有分配给窗口1的原因是delayed的消息到达系统的时间是19秒,窗口1已经fire掉了。此处就需要watermarks了。

三、水位线(水印)

我认为水位线是很重要和有趣的一个概念,我这里会大概描述下,如果想了解更多可以看Google一场精彩的talk,也可以看这个dataArtisans的blog。水位线简单理解就是一个timestamp,当Flink收到这个水印时,Flink理解会收到来自这个时间点之后的消息,也可以理解成告诉Flink运行到哪个事件时间了。
在这个案例中,其实就是告诉Flink一条消息可以迟到多久。
我现在设置水位线为现在的事件提前5秒,相当于告诉Flink我的消息可以迟到五秒。

override def getCurrentWatermark(): Watermark = { 
      new Watermark(System.currentTimeMillis - 5000)
  }

结果变成下图所示:

[站外图片上传中...(image-6dfb6e-1550219427900)]

四、允许延迟(Lateness)

如果采用“watermark - delay”,如果水位线不超过window_length + delay是不会被fire掉的,所以此刻可以采用allowedLateness方法。在window_end_time + allowed lateness之前,Flink都不会丢弃这条数据。
当消息到达时,Flink会提取它的时间,然后判断它是否在有效的延迟时间内,然后去判断是否fire掉窗口。
但是通过这种途径,一个窗口有可能被fire掉多次,如果需要exactly once processing的话,需要保证sink是幂等的。

五、水位线怎么不触发?

数据一直有序得进来,为什么没有窗口被fire掉?没有结果产出?

case1:提取时间失败

笔者和上游约定好了数据格式,extractTimestamp中提取的是某个字段为事件时间。研究数据发现约定好的字段突然不发了。

case2:提取时间有了,但是照样失败

上游按约定发了该字段后,系统在测试环境运行了一段时间,又没有结果产出了。
调试发现提取时间正常,checkAndGetNextWatermark也正常,但是为什么窗口没被fire掉呢。
因为时间的format变了,由到毫秒的timestamp变成了yyyymmddHHmmss,需要转成timestamp。

case3:一切正常,窗口不fire

EventTimeTrigger.java
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        // if the watermark is already past the window fire immediately
        return TriggerResult.FIRE;
    } else {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }
}

看了下EventTimeTrigger的源码,只有当当前的水位线越过窗口,即时间大于窗口的endTime才会触发Fire的操作。我们的处理流程没有触发,那就说明我们的水位线没有更新到合适的值。调试后发现当前的水位线一直停留在初始的最小的long值。

BoundedOutOfOrdernessTimestampExtractor.java
@Override
public final Watermark getCurrentWatermark() {
    // this guarantees that the watermark never goes backwards.
    long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
    if (potentialWM >= lastEmittedWatermark) {
        lastEmittedWatermark = potentialWM;
    }
    return new Watermark(lastEmittedWatermark);
}

debug发现lastEmittedWatermark确实有更新,这说明这个地方是触发了Watermark的值。但是debug的过程中,发现时不时会出现初始值的水位线。

SystemProcessingTimeService.java
TimestampsAndPeriodicWatermarksOperator.java
@Override
public void onProcessingTime(long timestamp) throws Exception {
    // register next timer
    Watermark newWatermark = userFunction.getCurrentWatermark();
    if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
        currentWatermark = newWatermark.getTimestamp();
        // emit watermark
        output.emitWatermark(newWatermark);
    }
    long now = getProcessingTimeService().getCurrentProcessingTime();
    getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}

TimestampsAndPeriodicWatermarksOperator会做判断:如果新的水位线小于当前的水位线,就不会更新了。

终于,顺着StreamInputProcessor–>StatusWatermarkValve理了下来,看见这样的处理逻辑:

StreamInputProcessor.java
StatusWatermarkValve.java
private void findAndOutputNewMinWatermarkAcrossAlignedChannels() {
    long newMinWatermark = Long.MAX_VALUE;
    // determine new overall watermark by considering only watermark-aligned channels across all channels
    for (InputChannelStatus channelStatus : channelStatuses) {
        if (channelStatus.isWatermarkAligned) {
            newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
        }
    }
    // we acknowledge and output the new overall watermark if it is larger than the last output watermark
    if (newMinWatermark > lastOutputWatermark) {
        lastOutputWatermark = newMinWatermark;
        outputHandler.handleWatermark(new Watermark(lastOutputWatermark));
    }
}

这里会将所有的channel status的水位线做个汇总:取最小的水位线。那是不是问题出在这里?后面debug了下看看,确实,这个地方有的channel status下的水位线一直是最小的long值那个不正常的水位线,进而导致整体的水位线发送不出去。

那么为什么会出现这种情况呢,百思不得其解。

当 [window_start_time,window_end_time) 有数据,watermark Time大于等于window_end_time时,会触发window trigger。

因为之前运行都是正常的,检查了数据也没问题。去翻改动,有影响的可能就是改了一些算子的并行度。

assigntimestampandwatermarks和map的并行度一样了就不能生成水位线了?

于是修改了assigntimestampandwatermarks的并行度,window正常fire掉了。

分析原因:
Flink source用到了FlinkKafkaConsumer010,没有指定KafkaPartitioner的话,会通过FixedPartitioner来给出默认的partitioner方法:

public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        Preconditions.checkArgument(partitions != null && partitions.length > 0, "Partitions of the target topic is empty.");
        return partitions[this.parallelInstanceId % partitions.length];
    }

parallelInstanceId代表着Flink consumer程序的并行度ID,假如FlinkKafkaConsumer010的并行度是12,那么这12个线程的ID分别是1-12.

parallelInstances代表着总的并行度,即12。

partitions是一个kafka partition的数组,kafka的topic的partitions是4(因为性能测试,换到只有一个节点的kafka)。

Flink partition的规则,就是Flink的并行度ID除以kafka partition length取余。

因此kafka编号为1-4的partition分别对应source node的1-4的partition,那么source node5-12的partition就为空了。

默认的partition策略是按照Flink的并行度ID与kafka中partition的数量取余的方法分配的,而与数据本身没有关系。

source node的partition为5-12的接收不到数据,

Watermark的生成是数据驱动的,只有当TimestampAndWatermarkAssigner”看到”数据时,watermark才会生效。

而map和assigntimestampandwatermarks并发度一样的话,这八个partition的watermark不会修改,所以会出现watermark的初始值一直存在的情况。

当assigntimestampandwatermarks的并行度修改后,事件会被shuffled across(洗牌),因此到了TimestampAndWatermarkAssigner不会有空的partition存在了。

以上。

五 实际定位

  1. 问题:发现程序没有输出,没有任何报错。
  2. 可以先去任务的Appmaster上看watermark运行情况.来定位是否是watermarke的问题

 

  1. 如果是显示no watermark ,按照上述的case 进行诊断。
  2. 在assignTimestampsAndWatermarks 后setParallelism(2) 。具体并行度根据实际需求设置,在这里kafkatopic只有2个分区,并行度设置为2

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink水位线不触发问题 的相关文章

随机推荐

  • X11协议基础与实践

    X11协议基础与实践 概念 X11 X Window System 是一种位图显示的视窗系统 X表示X协议 11是协议版本号 X 协议主要由 X server 和 X client 组成 l X server 管理主机上与显示相关的硬件设置
  • 对角线遍历

    param number matrix return number var findDiagonalOrder function matrix if matrix null matrix length 0 return let m matr
  • “40道高频区块链面试题”——我的一些看法

    最近看到了一篇文章如下 超强攻略 40道高频区块链面试题大放送 年底跳槽看过来 地址我也贴出来吧 https mp weixin qq com s 3Fa2XG4R11QDfMSAaBCngw 哦 CSDN的地址也出来了 https blo
  • vscode好用的前端插件和快捷键

    用到好用的vscode插件 总结一下 文章目录 一 常用主题 1 Material Theme主题 2 Community Material Theme主题 3 vscode icons 二 基础插件 1 Code Spell checke
  • java生成二维码图片(有logo),并在图片下方附文字

    logo配置类 Created by Amber Wang on 2017 11 27 17 25 import java awt public class LogoConfig logo默认边框颜色 public static final
  • 【数据结构】 实现 堆 结构 ---超细致解析

    目录 二叉树的性质 二叉树的存储结构 顺序存储 链式存储 堆的概念和性质 堆的实现 堆的初始化 堆的插入 向上调整函数 堆的删除 向下调整函数 向上建堆 向下建堆 TopK问题 二叉树的性质 在我们实现堆之前我们要知道堆的实现是依靠的是二叉
  • C# Thread启动线程时传递参数

    目录 1 不带参数 使用ThreadStart 2 带一个参数 使用ParameterizedThreadStart 3 带多个参数 1 不带参数 使用ThreadStart private void funcName public voi
  • Java反射学习记录

    一 反射概述 反射允许程序在运行中获取类的内部信息 例如构造器 成员变量 成员方法等 类加载之后 在堆中生成一个Class类的对象 一个类只有一个Class对象 这个对象包含类的完整结构信息 二 入门案例 通过配置文件中的内容生成指定类的对
  • 创建完整团队的艺术:敏捷如何改变我们与客户的工作方式

    来源 Ackarlix博客 http www ackarlix com 十年前 敏捷宣言 的作者们希望我们重新思考 我们作为程序员与客户协作的方式 我和我的博士学位顾问Robert Biddle以及James Noble都深受启发 充满希望
  • python --- multiprocessing实现多进程

    文章目录 进程理论知识 multiprocessing模块实现多进程 进程池 进程理论知识 进程就是正在运行的程序 是计算机进行资源分配的最小单位 各个进程都有独立的数据 相互隔离 Linux里进程的状态 R 运行状态runable S 中
  • 各种网络协议的类型、优缺点、作用

    一 网络协议的定义 网络协议是一种特殊的软件 是计算机网络实现其功能的基本机制 网络协议的本质是规则 即各种硬件和软件必须遵循的共同规则 网络协议并不是一套单独的软件 他融合于其他所有软件系统中 协议在网络中无所 不在 二 常用的网络协议
  • 24

    以下内容出自 MySQL 实战 45 讲 https time geekbang org column article 76446 24 MySQL是怎么保证主备一致的 MySQL 主备的基本原理 如图所示就是基本的主备切换流程 M S结构
  • socket可读,可写的条件

    socket可读可写条件 经常做为面试题被问 因为它考察被面试者对网络编程的基础了解的是不是够深入 要了解socket可读可写条件 我们先了解几个概念 1 接收缓存区低水位标记 用于读 和发送缓存区低水位标记 用于写 每个套接字有一个接收低
  • Oracle数据库运维、备份常用指令

    Oracle数据库运维 备份常用指令 1 Oracle数据泵备份导出 1 1 准备工作 在linux系统下创建导出结果存放的文件夹 切记要切换到oracle用户创建 否则会出现权限问题 su oracle mkdir home oracle
  • keepalived高可用服务的VIP地址无法访问

    环境 keepalived nginx实现高可用 VIP地址可以正常生成 也可正常漂移 可以实现故障切换 VIP地址只能在本地服务器ping通 其他内网服务器上无法ping通VIP地址 防火墙和selinux都已关闭 原因一 服务器启动了i
  • day37 445 数字反转 (字符串处理、模拟)

    445 数字反转 给定一个整数 请将该数各个位上数字反转得到一个新数 新数也应满足整数的常见形式 即除非给定的原数为零 否则反转后得到的新数的最高位数字不应为零 输入格式 输入共1行 1个整数N 输出格式 输出共1行 1个整数表示反转后的新
  • 集群基础7——keepalived脑裂

    文章目录 一 脑裂概念 二 脑裂产生原因 三 解决方案 四 脑裂监控 一 脑裂概念 在高可用 HA 系统中 当联系2个节点的 心跳线 断开时 本来为一整体 动作协调的HA系统 就分裂成为2个独立的个体 由于相互失去了联系 都以为是对方出了故
  • 常用 Git 命令行操作

    本文记录了一些常用 Git 命令行操作的具体使用方式 git clone git clone REPOSITORY URL 拉取仓库 并使用仓库名作为本地文件名 git clone REPOSITORY URL FOLDER 拉取仓库 并使
  • android new intent(),Android:关于onNewIntent()触发机制及注意事项

    在阅读该篇日志前 先熟悉一下Android的四种启动模式 因为onNewIntent并不是在所有启动模式下都会执行的 一 onNewIntent 在IntentActivity中重写下列方法 onCreate onStart onResta
  • Flink水位线不触发问题

    窗口计算时遇到好几次水位线不触发的情况 简单总结下 首先 介绍下Flink的事件时间 EventTime 和水位线 Watermarks 的概念 一 处理时间 如果要构造一个实时的流式应用 或早或晚都会接触到EventTime这个概念 现实