我使用 Kinesis 数据流作为源,使用 elasticsearch 作为接收器。
在 AWS Kinesis Data 分析应用程序中运行 Flink 作业。
事件示例:
{"area":"sessions","userId":4450,"date":"2021-12-03T11:00:00","videoDuration":5}
我正在从前端收集这些视频观看事件,同时视频每 5 秒为一名用户播放一次。这些事件用于计算用户的观看时间。
假设如果一个用户正在观看视频,则前端每 5 秒生成一次此事件并摄取到 Kinesis 数据流中。有 10,000 个用户观看视频,因此一分钟内总共生成了 120,000 个事件。
加工用120,000 个事件我的 Flink 工作几乎需要〜4分钟的时间。这是相当长的一段时间了。
那么如何才能提高工作绩效呢?我需要在1分钟。
我的工作是这样的:
stream
.keyBy(e -> e.getUserId())
.timeWindow(Time.seconds(60))
.reduce(new MyReduceFunction()) //sum of video duration for user
.map(<enrich event using some data from redis>)
.addSink(<elasticsearch sink>);
// Reduce function
private static class MyReduceFunction implements ReduceFunction<TrackingData> {
@Override
public TrackingData reduce(TrackingData trackingData, TrackingData t1) throws Exception {
trackingData.setVideoDuration(trackingData.getVideoDuration() + t1.getVideoDuration());
return trackingData;
}
}
那么这项工作首先从 Kinesis Data 流接收事件,然后通过该流键入userId
然后我做了一些videoDuration
1 分钟后,这些数据进入丰富功能,在该功能中,我从 Redis 读取一些数据并丰富该事件,然后将该事件放入 Elasticsearch。
我尝试过增加作业的并行度,它为 1 个并行度(大约 4 分钟)提供了最佳性能。如果我增加并行度,就会花费更多时间,这很奇怪。尝试过 2、4、8、16 等。增加并行性应该可以加快处理速度,不是吗?
任何人都可以帮助我在 Flink 作业中缺少什么或做错了什么,我需要做什么才能在 1 分钟内加速这些事件?