需求:自定义数据源,产出交易订单数据,设置基于事件时间窗口统计。
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Trade-order event POJO carried through the Flink pipeline.
 * Lombok generates getters/setters ({@code @Data}), a no-arg constructor and an
 * all-args constructor in field-declaration order — do not reorder the fields,
 * callers rely on the (id, userId, money, orderTime) constructor signature.
 *
 * @author liu a fu
 * @version 1.0
 * @date 2021/3/9 0009
 * @DESC Custom source emits these orders; windows aggregate on the event time
 *       stored in {@code orderTime}.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Order {
// Order id (a UUID fragment produced by the source).
private String id;
// User id, e.g. "u1000" / "u1001".
private String userId;
// Order amount; presumably whole currency units — confirm with the producer.
private Integer money;
// Event time as epoch milliseconds; used as the event-time timestamp.
private Long orderTime;
}
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
 * Custom parallel source that continuously emits {@link Order} events,
 * extending {@code RichParallelSourceFunction}.
 * Event times are deliberately shifted into the past by a random lag so the
 * stream reaches Flink out of order (to exercise watermark handling).
 *
 * @author liu a fu
 * @version 1.0
 * @date 2021/3/9 0009
 */
public class OrderSource extends RichParallelSourceFunction<Order> {
// Loop flag. Must be volatile: cancel() is invoked from a different thread
// than run(), and without volatile the running loop may never see the change.
private volatile boolean isRunning = true;

// Emits one order every 2 seconds until cancelled.
@Override
public void run(SourceContext<Order> ctx) throws Exception {
    // Random lags in seconds subtracted from "now" to simulate out-of-order events.
    int[] times = new int[]{0, 0, 8, 0, 10, 5, 15, 12};
    Random random = new Random();
    while (isRunning) {
        Order order = new Order(
            UUID.randomUUID().toString().substring(1, 18), // pseudo-random order id (UUID fragment)
            "u100" + random.nextInt(2),                    // userId: "u1000" or "u1001"
            random.nextInt(100) + 1,                       // money: 1..100
            // Event time = wall clock minus a random lag, so data arrives out of order.
            System.currentTimeMillis() - times[random.nextInt(times.length)] * 1000
        );
        System.out.println("Order >>>" + order);
        // Emit the record downstream.
        ctx.collect(order);
        // Throttle: one order every 2 seconds.
        TimeUnit.SECONDS.sleep(2);
    }
}

@Override
public void cancel() {
    // Stop the emit loop; run() observes this via the volatile flag.
    isRunning = false;
}
}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
 * Tumbling event-time window demo: every 5 seconds, sum each user's order amount.
 * A {@code BoundedOutOfOrdernessTimestampExtractor} supplies the watermark
 * (max seen event time minus 2 seconds) so mildly late/out-of-order events are
 * still assigned to the correct window.
 *
 * @author liu a fu
 * @version 1.0
 * @date 2021/3/9 0009
 */
public class StreamEventTimeWatermarkWindow {
public static void main(String[] args) throws Exception {
    // 1. Execution environment; single parallelism keeps console output readable.
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    // step1: switch the stream to event-time semantics.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // 2. Source: custom generator producing out-of-order Order events.
    DataStreamSource<Order> source = env.addSource(new OrderSource());

    // step2: extract the event-time field and tolerate up to 2 seconds of
    // out-of-orderness; watermark = max event time seen - 2s.
    SingleOutputStreamOperator<Order> withTimestamps = source.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.seconds(2)) {
                @Override
                public long extractTimestamp(Order order) {
                    return order.getOrderTime();
                }
            }
    );

    // 3. Transformation: (userId, money) pairs, keyed by user,
    //    summed over 5-second tumbling event-time windows.
    SingleOutputStreamOperator<Tuple2<String, Integer>> summed = withTimestamps
            .map(new MapFunction<Order, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(Order order) throws Exception {
                    return Tuple2.of(order.getUserId(), order.getMoney());
                }
            })
            .keyBy(0)                    // group by userId (tuple field 0)
            .timeWindow(Time.seconds(5)) // step3: 5s tumbling event-time window
            .sum(1);                     // aggregate the money field

    // 4. Sink.
    summed.printToErr();
    // 5. Trigger execution.
    env.execute(StreamEventTimeWatermarkWindow.class.getSimpleName());
}
}
- 运行程序并分析结果,但从默认输出中不容易看到窗口的时间范围及各条数据的时间值,所以需要使用 apply 函数进行聚合,以便同时输出窗口信息。
- 自己编写上述代码功能:在为每条数据设置水位线时使用 AssignerWithPeriodicWatermarks,并用 apply 函数聚合数据,获取窗口的起始和结束时间。具体代码如下:
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
/**
 * Tumbling event-time window demo (debug variant): every 5 seconds, sum each
 * user's order amount. A hand-written {@code AssignerWithPeriodicWatermarks}
 * prints each element's event time and the current watermark, and an
 * {@code apply()} WindowFunction reports the window's start/end times together
 * with the event times it contained.
 *
 * @author liu a fu
 * @version 1.0
 * @date 2021/3/9 0009
 */
public class StreamEventTimeWatermarkWindowDebug {
public static void main(String[] args) throws Exception {
    // 1. Execution environment; single parallelism keeps console output readable.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    // step1: switch the stream to event-time semantics.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    // 2. Source: custom generator producing out-of-order Order events.
    DataStreamSource<Order> orderSource = env.addSource(new OrderSource());
    // FIX: use "HH" (24-hour clock). The original pattern used "hh" (12-hour),
    // which rendered afternoon event times and watermarks incorrectly.
    FastDateFormat format = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");
    // step2: assign timestamps and periodic watermarks by hand so both can be printed.
    SingleOutputStreamOperator<Order> orderDataStream = orderSource
            .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Order>() {
                // Maximum tolerated out-of-orderness: 2 seconds.
                private final long maxOutOfOrderness = 2000L;
                // Largest event time seen so far (offset by the lateness so the
                // very first watermark computation does not underflow Long.MIN_VALUE).
                private long currentMaxTimestamp = Long.MIN_VALUE + 2000L;
                // Last watermark emitted; a watermark must never move backwards.
                private long lastEmittedWatermark = Long.MIN_VALUE;

                @Nullable
                @Override
                public Watermark getCurrentWatermark() {
                    // Candidate watermark = max event time - allowed lateness.
                    long potentialWatermark = currentMaxTimestamp - maxOutOfOrderness;
                    // Only advance, never regress.
                    if (potentialWatermark >= lastEmittedWatermark) {
                        lastEmittedWatermark = potentialWatermark;
                    }
                    return new Watermark(lastEmittedWatermark);
                }

                @Override
                public long extractTimestamp(Order order, long previousElementTimestamp) {
                    // The event time comes from the order itself.
                    long eventTime = order.getOrderTime();
                    // Track the maximum event time seen so far.
                    if (eventTime > currentMaxTimestamp) {
                        currentMaxTimestamp = eventTime;
                    }
                    // Debug: print each element's event time alongside the current watermark.
                    System.out.println(
                            "userId: " + order.getUserId()
                            + ", money: " + order.getMoney()
                            + ", eventTime: " + format.format(eventTime)
                            + ", watermark: " + format.format(getCurrentWatermark().getTimestamp())
                    );
                    return eventTime;
                }
            });
    // 3. Transformation: per user, 5-second tumbling event-time windows.
    SingleOutputStreamOperator<String> sumStream = orderDataStream
            .keyBy("userId")
            // step3: 5-second tumbling event-time window.
            .timeWindow(Time.seconds(5))
            // apply() exposes the window metadata (start/end) alongside the elements.
            .apply(new WindowFunction<Order, String, Tuple, TimeWindow>() {
                @Override
                public void apply(Tuple tuple,
                                  TimeWindow window,
                                  Iterable<Order> input,
                                  Collector<String> out) throws Exception {
                    // Window boundaries for this pane.
                    String startTime = format.format(window.getStart());
                    String endTime = format.format(window.getEnd());
                    // Collect every order's event time and sum the amounts.
                    List<String> list = new ArrayList<>();
                    int orderSum = 0;
                    for (Order order : input) {
                        list.add(format.format(order.getOrderTime()));
                        orderSum += order.getMoney();
                    }
                    String output = "窗口>>> userId: " + tuple.toString()
                            + " -> [" + startTime + " ~ " + endTime
                            + "], sum: " + orderSum + ", Orders: " + list;
                    out.collect(output);
                }
            });
    // 4. Sink.
    sumStream.printToErr();
    // 5. Trigger execution.
    env.execute(StreamEventTimeWatermarkWindowDebug.class.getSimpleName());
}
}
- 分析上面的案例:如果数据延迟时间较久(即使通过 Watermark 机制处理乱序数据,也仍未被处理到的数据),可以使用 Allowed Lateness 机制和侧边输出流(side output)来处理这些延迟数据。