时间(Time)
Flink支持三种不同的时间:
-
处理时间(processing time):指当前操作的时间(如:map())。当程序以处理时间运行,所有基于时间的操作(窗口),都依赖各个operator的机器时间。
-
事件时间(event time):事件时间可以理解称为数据产生的时间,是和数据绑定的。使用事件时间时必须设置其水印,在一定范围内可以处理乱序数据。
-
接收时间(ingestion time):指事件进入Flink程序的时间,在source operator时生成。和事件时间不同,接收时间不能处理任何无序的数据,同时也不需要指定水印。
设置时间属性
程序的第一部分通常就是设置时间类型(上述三种)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));
stream
.keyBy( (event) -> event.getUser() )
.timeWindow(Time.hours(1))
.reduce( (a, b) -> a.add(b) )
.addSink(...);
事件时间和水印(Event Time and Watermarks)
在基于事件时间的流处理中,需要一种方法来度量时间进度,比如每小时触发一次的窗口操作。显然不能直接使用事件时间(事件时间不能保证绝对有序增长),而需要使用水印来衡量时间进度。
水印流作为数据流的一部分,一个时间戳 t。这个Watermark(t)表示事件到达了时间点(t)(一般是当前到达的事件的最大事件时间-容忍的迟到或乱序的最长时间),也就是说不应该 t ’ < = t(即事件与时间戳小于或等于水印)的事件进入当前操作符。
下图是一个事件按顺序(时间顺序)排列,水印在流中只是当做周期(窗口)的标记。
对于无序的的数据流,水印至关重要。不管事件是否按照时间戳排序,水印都会标记在某个时间点之后,所有这个时间戳之前的数据都应该到达了。水印可以让无序的数据在每个窗口之间保持有序。
上图中W(11),和w(17)之间理解成一个窗口。数据小于W(11)时间戳的被认为是无效数据,而大于W(17)会被缓存起来不会参与窗口的计算以及向下游发送。
在并行流中的水印(Watermarks in Parallel Streams)
水印一般在source function 或者之后生成,并且每个source function的并行任务独立生成各自的水印。
水印产生,并会向下游发送,更新下游的事件时间。同时也是下游生成新水印的依据。一下有多个输入流的操作union,或者keyBy之后,或者partition之后。此类操作符使用的水印生成的事件时间是所有流入水印里面最小的一个。
生成时间戳和分配水印(Generating Timestamps / Watermarks)
如果Flink使用事件时间处理,那么就需要为每个元素都分配或者指定时间戳。一般都是通过事件的某个字段来提取。
有两种方法来分配时间戳和生成水印:
- 直接在source数据流中
- 在Flink中使用时间戳分配程序/水印生成器:时间戳分配程序和定水印生成器
以下例子流使用时间戳和水印生成器生成水印和一个新的带时间戳属性的元素:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyEvent> stream = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter(), typeInfo);
DataStream<MyEvent> withTimestampsAndWatermarks = stream
.filter( event -> event.severity() == WARNING )
.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
withTimestampsAndWatermarks
.keyBy( (event) -> event.getGroup() )
.timeWindow(Time.seconds(10))
.reduce( (a, b) -> a.add(b) )
.addSink(...);
水印和时间戳生成器:getCurrentWatermark()
方法被调用返回一个水印,如果水印非空并且大于前一个水印,则新的水印会被发送。
public class MyTimestampsAndWatermarks implements AssignerWithPeriodicWatermarks<MyEvent> {
private Watermark current;
long currentMaxTimestamp = 0L;
long maxOutOfOrderless = 5000L; //5秒延时
@Nullable
@Override
public Watermark getCurrentWatermark() {
current = new Watermark(currentMaxTimestamp - maxOutOfOrderless);
return current;
}
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
long timeStamp = element.eventTime.getTime();
currentMaxTimestamp = Math.max(timeStamp, currentMaxTimestamp);
return timeStamp;
}
}
Notice: 窗口最终由事件触发。如30秒的窗口,若事件在前29秒全部发送完毕,并且sourceFunction不会再产生数据会导致窗口无法触发。
更多有关水印的使用请浏览官网Generating Timestamps / Watermarks。
本教程的所有示例代码都已上传至Github仓库flink-toturial
下一篇Flink简单教学6-operator
关注我的公众号
了解我的最新动向
收藏我的个人博客