如何统计Apache Flink在给定时间窗口内处理的记录数

2023-12-20

在flink中定义一个时间窗口后如下:

val lines = socket.timeWindowAll(Time.seconds(5))

如何计算该特定 5 秒窗口内的记录数?


执行计数聚合的最有效方法是ReduceFunction。然而,reduce有输入和输出类型必须相同的限制。所以你必须将输入转换为Int在应用窗口之前:

val socket: DataStream[(String)] = ???

val cnts: DataStream[Int] = socket
  .map(_ => 1)                    // convert to 1
  .timeWindowAll(Time.seconds(5)) // group into 5 second windows
  .reduce( (x, y) => x + y)       // sum 1s to count
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何统计Apache Flink在给定时间窗口内处理的记录数 的相关文章

随机推荐

  • 简单的 UIView drawRect 没有被调用

    我不明白这里出了什么问题 我有一个非常简单的 UIViewController 和一个非常简单的 viewDidLoad 方法 void viewDidLoad NSLog making game view GameView v GameV
  • Python-获取目录中所有文件和子文件夹的相对路径

    我正在寻找一种获取特定文件夹内文件和 子 文件夹的相对路径的好方法 对于我目前使用的方法os walk 它正在工作 但对我来说似乎并不 Pythonic myFolder myfolder fileSet set yes I need a
  • 手动设置Session过期时间-CodeIgniter

    如何在 codeigniter 中动态设置会话过期时间 例如 如果用户登录并具有以下角色 admin 过期时间应该比没有权限的用户登录时要长admin role Thanks 您可以通过在配置文件中增加此变量来更新会话过期时间 config
  • 我应该对不透明对象使用整数 ID 还是指针?

    我正在一些图形 API DirectX9 和 DirectX11 之上编写一个抽象层 我想听听您的意见 传统上 我会为每个我想要抽象的概念创建一个基类 因此 在典型的 OO 方式中 我将拥有一个 Shader 类和 2 个子类 DX9Sha
  • Django 动态模型字段

    我正在研究一个多租户应用程序中 一些用户可以定义自己的数据字段 通过管理员 以收集表单中的附加数据并报告数据 后一点使得 JSONField 不是一个很好的选择 所以我有以下解决方案 class CustomDataField models
  • 如何使 AWS EC2 上的 Tomcat 从外部本地主机可用

    我正在尝试在 AWS Linux 服务器上运行 Tomcat 我已经安装了 Tomcat 并从命令行测试了它以确保它正常工作 但我无法从另一台计算机访问它 细节 该实例在安全组中开放了用于 HTTP 的 80 传入端口 我已经通过远程登录到
  • 重写解析表达式语法(PEG),无需左递归

    Using https github com JetBrains Grammar Kit https github com JetBrains Grammar Kit如何在没有左递归的情况下重写语法 grammar exprs exprs
  • Bison 语义谓词语法错误,杂散“#”

    我正在尝试使用 Bison 的语义谓词 https www gnu org software bison manual html node Semantic Predicates html Semantic Predicates功能 但我在
  • Google 云消息服务器的 IP 地址

    我即将在服务器上部署 GCM 的实现 并且需要通过 IP 打开适当的防火墙 有谁知道在哪里可以找到 android googleapis com 的 IP 地址范围 Thanks 我发现 android googleapis com 的 I
  • 无法在 Windows 10 上使用 Docker Toolbox 共享/挂载卷

    我正在尝试使用 docker 设置我的项目 我在 Windows 10 家庭版上使用 Docker Toolbox 我对码头工人很陌生 据我了解 我必须将文件复制到新容器并添加一个卷 以便我可以保留 gulp 所做的更改 这是我的文件夹结构
  • Mongodb count 与 findone

    我的问题是 有一组用户 我试图找到 用户是否使用 id xxx has somevalue gt 5 我想知道 使用什么会更快find count gt 0 or findOne null 或者也许还有其他更快 更好的方法 查询时间之间的差
  • 使用 guice 构建带有注入类的框架,初始化的正确方法是什么?

    我正在尝试编写一个框架 其中任意 bean 类都通过我的 API 中的类注入 并且它们可以与这两个类交互 也可以根据定义的注释触发回调 这是一个示例 bean Experiment static class TestExperiment p
  • Android NDK:您确定您的 NDK_MODULE_PATH 变量已正确定义吗?

    最近 3天前 开始学习Android Studio 我购买了一个 Eclipse 游戏项目来玩 但出现错误 当我修复该错误时 我收到一个新错误 目前的错误如下 构建命令失败 执行过程时出错 C Users user AppData Loca
  • 不考虑回到起点的旅行商问题(TSP)的问题名称是什么?

    我想知道 TSP 的问题名称是什么 不考虑返回起点的方式 以及解决这个问题的算法是什么 我研究了最短路径问题 但这不是我想要的 问题只是从 2 个指定点找到最短路径 但我要寻找的是我们给出n个点并且只输入1个起点的问题 然后 找到经过所有点
  • 在应用程序和扩展程序之间共享捆绑资源

    我的照片共享扩展计划使用相同的设计资源 用于导航和向照片添加 图章 贴纸 如应用程序沙盒设计指南中所述 沙盒应用程序组 需要共享文件和其他信息的可以请求容器 目录作为其权利的一部分 这些目录是存放的 在 Library Group Cont
  • 如何获取 XGBClassifier 的预测 p 值?

    我想知道 XGBClassifier 对它所做的每个预测的置信度如何 有可能有这样的价值吗 或者 predict proba 是否已经间接成为模型的置信度 你的直觉确实是正确的 predict proba返回每个示例属于给定类别的概率 来自
  • 读取注册表项的性能?

    我想知道通过标准 C 库从 Windows 注册表读取注册表值需要多长时间 以毫秒为单位 在这种情况下 我正在阅读一些代理设置 我应该期望什么数量级的值 有没有好的基准数据可用 我正在运行 WS2k8 R2 amd64 加分点 操作系统 s
  • Django REST框架范围过滤器

    如何在 Django REST Framework 中对日期和数字进行范围过滤 其他过滤器 lt gt 等 工作正常 我尝试了很多变体 例如 import rest framework filters as filters class Or
  • 如何在 PHP 中将查询字符串转换为斜杠 URL?

    我想将 URL 转换为 http localhost projectname api index php type login to http localhost projectname api login Convert在这里不是一个常用
  • 如何统计Apache Flink在给定时间窗口内处理的记录数

    在flink中定义一个时间窗口后如下 val lines socket timeWindowAll Time seconds 5 如何计算该特定 5 秒窗口内的记录数 执行计数聚合的最有效方法是ReduceFunction 然而 reduc