Flink 处理事件太慢

2024-01-11

我使用 Kinesis 数据流作为源,使用 elasticsearch 作为接收器。 在 AWS Kinesis Data 分析应用程序中运行 Flink 作业。

事件示例:

{"area":"sessions","userId":4450,"date":"2021-12-03T11:00:00","videoDuration":5} 

我正在从前端收集这些视频观看事件,同时视频每 5 秒为一名用户播放一次。这些事件用于计算用户的观看时间。

假设如果一个用户正在观看视频,则前端每 5 秒生成一次此事件并摄取到 Kinesis 数据流中。有 10,000 个用户观看视频,因此一分钟内总共生成了 120,000 个事件。

加工用120,000 个事件我的 Flink 工作几乎需要〜4分钟的时间。这是相当长的一段时间了。

那么如何才能提高工作绩效呢?我需要在1分钟。

我的工作是这样的:

        stream
                .keyBy(e -> e.getUserId())
                .timeWindow(Time.seconds(60))
                .reduce(new MyReduceFunction()) //sum of video duration for user
                .map(<enrich event using some data from redis>)
                .addSink(<elasticsearch sink>);

// Reduce function 

 private static class MyReduceFunction implements ReduceFunction<TrackingData> {
        @Override
        public TrackingData reduce(TrackingData trackingData, TrackingData t1) throws Exception {
                trackingData.setVideoDuration(trackingData.getVideoDuration() + t1.getVideoDuration());
                return trackingData;
        }
    }

那么这项工作首先从 Kinesis Data 流接收事件,然后通过该流键入userId然后我做了一些videoDuration1 分钟后,这些数据进入丰富功能,在该功能中,我从 Redis 读取一些数据并丰富该事件,然后将该事件放入 Elasticsearch。

我尝试过增加作业的并行度,它为 1 个并行度(大约 4 分钟)提供了最佳性能。如果我增加并行度,就会花费更多时间,这很奇怪。尝试过 2、4、8、16 等。增加并行性应该可以加快处理速度,不是吗?

任何人都可以帮助我在 Flink 作业中缺少什么或做错了什么,我需要做什么才能在 1 分钟内加速这些事件?


None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink 处理事件太慢 的相关文章

随机推荐

  • 内部样式表在 IE9 中无法与 jsf 一起工作

    我正在使用 jsf2 0 和 primefaces 我的应用程序在包括 IE8 在内的所有浏览器上运行良好 但是当我在 IE9 中运行我的应用程序时 我的内部样式表甚至没有被浏览器 IE9 采用 样式被破坏了 我头上的CSS
  • Gson根据字段名自定义反序列化逻辑

    我的课是这样的 class Foo public String duration public String height 我的 json 数据看起来像 duration 12200000 height 162 现在我想反序列化它 Foo
  • MVC4 WebAPI 的正确架构是什么

    我进行了一些搜索 但没有找到此问题的可用答案 好吧 我的MVC 3架构是这样的 Project EDM contains only the entity framework edmx file and its tt and cs entit
  • 如何使用curl进行POST而不收到HTTP错误422“无法处理的实体”?

    我正在尝试向登录端点发送 POST 请求 我不断收到 HTTP 错误422 Unprocessable Entity 我该如何克服这个错误 以下是我尝试过的一些命令示例 curl v X POST F user email email pr
  • 没有足够的权限访问 S3 中的数据

    我正在关注以下教程入门 控制台 Amazon Personalize https docs aws amazon com personalize latest dg getting started console htmlAmazon Sa
  • 在 angularJS select 指令中设置所选项目

    我在 Angular 的 select 指令中设置所选项目时遇到问题 我不知道这是一个bug还是Angular的设计者有意识的设计 但它确实使 select 指令的用处大大降低 描述 我的应用程序与 REST API 通信以从数据库接收实体
  • 我可以只向某些人分发我的 iPhone 应用程序吗?

    我想开发一个只有我的客户才能使用的特定应用程序 如何限制该应用程序只能由我认可的人下载 Thanks 通过应用程序商店正常分发应用程序 并需要解锁密钥才能运行它 仅将解锁密钥分发给 批准的 用户
  • 升级到 Spring 3.2 后出现 HttpMediaTypeNotAcceptableException

    将 Spring MVC 应用程序升级到 Spring 3 2 后 在访问某些 URL 时出现以下异常 org springframework web HttpMediaTypeNotAcceptableException Could no
  • 暂停 android 下载管理器

    在我的应用程序中 我从服务器下载电影 其中一些非常大 4GB 或更大 我尝试将自己的下载管理器实现为一项服务 但效果并不好 在某些设备上 应用程序会在没有任何通知的情况下自行崩溃 总体而言下载速度似乎太慢 所以 我想使用 Android 的
  • 反序列化对象时出错

    我有一个 JSON 字符串 target FDOL00001 datapoints y 72 564 x 1523858700 target FDOL00001 datapoints y 86 366 x 1523858700 target
  • 在 PHP 中将十六进制颜色转换为 RGB 值

    转换十六进制颜色值的好方法是什么 ffffff转换为单个 RGB 值255 255 255使用 PHP 如果你想将十六进制转换为RGB 你可以使用sscanf https www php net manual en function ssc
  • PHP 服务器端打印

    我过去几个小时一直在谷歌搜索 似乎找不到答案 我确实接近了这个问题 https stackoverflow com questions 1648399 starting serverside print job via php 我的 Win
  • 带单行的 Mercurial 日志

    常规的hg log命令给出每个变更集至少有 4 行的输出 例如 changeset 238 03a214f2a1cf user My Name lt email protected cdn cgi l email protection gt
  • 在联系页面添加地址簿

    我想在我的地址簿中添加contact页面 我想以编程方式执行此操作i e不使用nib files 谁能给我推荐一个不错的教程或示例代码 我已经使用了 iPatel 给出的答案的代码 当我运行时它抛出异常并且应用程序正在终止 感谢致敬 这是编
  • 从 MySQL 中提取所有 JSON 键

    我有一个 JSON 列属性 例如 a 2 b 5 c 3 a 5 d 1 c 7 e 1 f 7 如何从 MySQL 获取所有不同的 顶级 键名 像这样 a b c d e f 谢谢你 测试表 id json col 2 a 2 b 5 3
  • 从毫秒计算周数、天数和小时数

    周围有很多类似的问题 但没有一个解决这个计算 使用 javascript i 很容易找到 ex 的黑白 2 个日期的毫秒数 var mil Math floor new Date 1 1 2012 new Date 1 7 2012 mil
  • OnItemClick 侦听器和单击的视图项的可见性

    我有一个 ListView 其中每个项目都包含一个Textview and ImageView其中imageView is invisible通过单击每个列表视图项目 它将visible我的代码是 Override public void
  • Flask sqlAlchemy 与 Flask_Marshmallow 的验证问题

    使用flask marshmallow进行输入验证 并使用scheme load 我无法捕获模型中 validates装饰器生成的错误 我捕获了资源中的结果和错误 但错误会直接发送给用户 模型 py python from sqlalche
  • 通过上下文菜单运行 cmd 时 PATH 变量不同

    我刚刚花了最后一个小时试图找出为什么我的 PATH 变量没有更新我的 cmd exe 现在我发现它确实更新了 但仅限于某些条件 我通过更新它Win Break gt Change Settings gt Advanced gt Enviro
  • Flink 处理事件太慢

    我使用 Kinesis 数据流作为源 使用 elasticsearch 作为接收器 在 AWS Kinesis Data 分析应用程序中运行 Flink 作业 事件示例 area sessions userId 4450 date 2021