Flink广播状态如何初始化?

2023-12-13

我们正在尝试构建一个用例,其中来自流的数据通过计算公式运行,但公式本身也应该(很少)可更新。 通过阅读文档,在我看来,Flink 广播状态非常适合这种情况。

作为实验,我构建了一个简化版本:假设我有一个整数流,以及第二个包含这些整数的乘法因子的流(我可以随意发送值)。第二个流的频率非常低,事件之间很可能是几天或几周。 目前,这两者都被实现为简单的套接字服务器,最终产品将使用 Kafka。

在我的示例应用程序中,这一切都有效,但我留下了一个问题:当系统启动并且广播流上还没有发生任何事情时会发生什么?我可以从哪里获得默认(或上次使用的)系数?在我的示例中,我现在通过硬编码一个值来解决它,但这不是我可以使用的东西。

在我的实验项目中,我对此感到有点困惑,因为 {processElement} 只能获得只读广播状态,但是processBroadcastElement在有可能需要很长时间的更新之前不会被调用。 我的计划是存储数据库中使用的公式,并在作业(重新)开始时以某种方式读取它,但我还没有找到一种方法来完成这项工作。欢迎来自更多知识渊博的人的任何建议,这是我的第一个 Flink 项目,所以我正在努力寻找解决办法。

工作示例在这里:https://github.com/tonvanbart/flink-broadcast-example/tree/mapstate-attemptFlink代码在类中BroadcastState.

提前致谢。


如果系统从检查点/保存点重新启动,那么您就拥有了广播的最后一个因素(通过状态),对吧?所以我认为问题在于它最初启动时要做什么。

如果是这样,那么这是您正在使用的模式的常见问题,您实际上希望阻止整数流,直到从广播流中获取初始值。

目前,常见的解决方案是在运算符中缓冲整数流(使用状态),直到获得初始值,但这可能会导致无界状态,具体取决于整数进入的速度以及需要等待的时间。

您可以尝试的其他方法是包装您的整数源(使其成为委托)并且在您知道某些内容已被广播之前不要发出任何值。例如。使广播的内容进入可查询状态,并定期检查直到该状态存在。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink广播状态如何初始化? 的相关文章

  • 从 iOS 服务检测屏幕开/关

    我正在开发一个作为服务在后台运行的网络监控应用程序 当屏幕打开或关闭时是否可以收到通知 来电 它通过使用以下代码存在于Android中 private void registerScreenOnOffReceiver IntentFilte
  • 在任务管理器之间均匀分配 Flink 运算符

    我正在 15 台机器的裸机集群上构建 Flink 流应用程序原型 我使用带有 90 个任务槽 15x6 的纱线模式 该应用程序从单个 Kafka 主题读取数据 Kafka主题有15个分区 所以我也将源算子的并行度设置为15 但是 我发现 F
  • flink kafka生产者在检查点恢复时以一次模式发送重复消息

    我正在写一个案例来测试 flink 两步提交 下面是概述 sink kafka曾经是kafka生产者 sink stepmysql接收器是否扩展two step commit sink comparemysql接收器是否扩展two step
  • Flink 日志记录限制:如何将日志记录配置传递给 Flink 作业

    我有一个 flink 作业 它使用 logback 作为日志记录框架 因为日志需要发送到logstash 而 logback 有一个 logstash 附加程序 Logstash logback appender Appender 工作正常
  • Apache Beam 计数器/指标在 Flink WebUI 中不可用

    我正在使用 Flink 1 4 1 和 Beam 2 3 0 并且想知道是否可以在 Flink WebUI 或任何地方 中提供可用的指标 如 Dataflow WebUI 中那样 我用过类似的计数器 import org apache be
  • Apache Flink AWS S3 Sink 是否需要 Hadoop 进行本地测试?

    我对 Apache Flink 比较陌生 我正在尝试创建一个简单的项目 将文件生成到 AWS S3 存储桶 根据文档 我似乎需要安装 Hadoop 才能执行此操作 如何设置本地环境来测试此功能 我在本地安装了 Apache Flink 和
  • Flink CEP:对于不同类型的事件,使用哪种方法加入数据流?

    假设我有两种不同类型的数据流 一种提供天气数据 另一种提供车辆数据 我想使用 Flink 对数据进行复杂的事件处理 Flink 1 3 x 中哪种方法是正确的使用方法 我看到了不同的方法 如 Union Connect Window Joi
  • 在 Flink 流中使用静态 DataSet 丰富 DataStream

    我正在编写一个 Flink 流程序 其中我需要使用一些静态数据集 信息库 IB 来丰富用户事件的数据流 对于例如假设我们有一个买家的静态数据集 并且有一个传入的事件点击流 对于每个事件 我们希望添加一个布尔标志来指示事件的执行者是否是买家
  • 将 Docker 容器连接到网络接口/设备而不是 IP 地址

    经过仔细的研究 测试和摆弄 我只能找到通过从 IP 端口转发来将 Docker 容器连接到给定接口的方法 这可以通过添加来完成 p Host IP Host Port Container Port to a docker run命令 我有一
  • 尝试升级到 flink 1.3.1 时出现异常

    我尝试将集群中的 flink 版本升级到 1 3 1 以及 1 3 2 但我的任务管理器中出现以下异常 2018 02 28 12 57 27 120 ERROR org apache flink streaming runtime tas
  • 创建具有通用返回类型的 FlinkSQL UDF

    我想定义函数MAX BY接受类型值T和类型的订购参数Number并根据排序从窗口返回最大元素 类型为T 我试过了 public class MaxBy
  • Apache Flink 动态设置 JVM_OPT env.java.opts

    是否可以设置自定义 JVM 选项env java opts提交作业时未在作业中指定conf flink conf yaml file 我问的原因是我想在 log4j 中使用一些自定义变量 我也在 YARN 上运行我的工作 我已经使用 CLI
  • Flink 窗口:聚合并输出到接收器

    我们有一个数据流 其中每个元素都是这种类型 id String type Type amount Integer 我们想要聚合这个流并输出总和amount每周一次 目前的解决方案 Flink 管道示例如下所示 stream keyBy ty
  • .NET 选项将视频文件流式传输为网络摄像头图像

    我有兴趣开发一个应用程序 它允许我从 xml 构建视频列表 包含视频标题 持续时间等 并将该列表作为我的网络摄像头流播放 这意味着 如果我要访问 ustream tv 或在实时通讯软件上激活我的网络摄像头 我的视频播放列表将注册为我的活动网
  • 使用多个 NIC 广播 UDP 数据包

    我正在 Linux 中为相机控制器构建嵌入式系统 非实时 我在让网络做我想做的事情时遇到问题 该系统有 3 个 NIC 1 个 100base T 和 2 个千兆端口 我将较慢的连接到相机 这就是它支持的全部 而较快的连接是与其他机器的点对
  • Flink从hdfs读取数据

    我是 Flink 的新生 我想知道如何从 hdfs 读取数据 有人可以给我一些建议或一些简单的例子吗 谢谢你们 如果您的文件采用文本文件格式 则可以使用 ExecutionEnvironment 对象中的 readTextFile 方法 这
  • 连接广播而不是服务器后AsyncUdpSocket接收数据的小问题

    我的 AsyncUdpSocket 有问题 我曾经连接到服务器 发送一些数据并获得一些响应 现在 由于我不知道服务器的实际地址 我不得不更改代码并将数据发送到广播地址 255 255 255 255 这是我的代码 NSString bcho
  • 我想使用 Flink 的 Streaming File Sink 写入 ORC 文件,但它无法正确写入文件

    我正在从 Kafka 读取数据并尝试将其以 ORC 格式写入 HDFS 文件系统 我使用了他们官方网站上的以下链接参考 但我可以看到Flink为所有数据写入完全相同的内容并生成这么多文件并且所有文件都可以103KB https ci apa
  • Flink Kafka Producer 中的 Exactly-once 语义

    我正在尝试使用 Kafka Source 和 Sink 测试 Flink 的一次性语义 运行 flink 应用程序 只需将消息从一个主题传输到另一个主题 并行度 1 检查点间隔 20 秒 使用 Python 脚本每 2 秒生成递增整数的消息
  • 如何在 Android Q 上将照片广播到图库

    我使用这些代码拍照并将照片广播到画廊 它有效 我发现我的广播功能使用MediaStore Images ImageColumns DATA and Intent ACTION MEDIA SCANNER SCAN FILE 并且这些已被弃用

随机推荐

  • 如何调试 git svn 获取无效的 XML

    我尝试使用将 SVN 存储库转换为 gitgit2svn 我的服务器使用用户名和密码 I did svn2git https example com svn bar 并得到错误 Initialized empty Git repositor
  • CSS 水平滚动溢出与 jQuery 滑块

    我正在尝试设置一个全屏 jquery 滑块 我将项目分为两个步骤 1 css 和 2 js 1 CSS 下面是我正在拍摄的图片 没有固定高度 下面是我到目前为止不起作用的代码
  • 如何在给定时间后重新加载网页的一部分?

    我想做的是 根据特定条件显示一条消息 因此 我将在给定时间后连续读取数据库 并相应地向用户显示消息 但我希望该消息仅在页面的一部分 比如说 DIV 上更新 任何帮助 将不胜感激 谢谢 这可以使用setInterval and jQuery
  • 如何将日期对象添加到 C# 列表? [关闭]

    这个问题不太可能对任何未来的访客有帮助 它只与一个较小的地理区域 一个特定的时间点或一个非常狭窄的情况相关 通常不适用于全世界的互联网受众 为了帮助使这个问题更广泛地适用 访问帮助中心 早些时候我已经发布了这个问题排序日期和时间 答案是mi
  • 虚拟析构函数是继承的吗?

    如果我有一个带有虚拟析构函数的基类 派生类也有声明虚拟析构函数吗 class base public virtual base class derived base public virtual derived 1 derived 2 具体
  • Android 中的水平 ListView?

    是否有可能使ListView水平的 我已经使用图库视图完成了此操作 但所选项目会自动到达屏幕中央 我不希望所选项目位于我单击的同一位置 我该如何解决这个问题 我的想法是设置ListView带有水平滚动条 分享你的想法 根据 Android
  • 正则表达式在 C 中失败,在线测试通过

    当我在 C 中使用应该有效的正则表达式时 它会失败 当我将这个正则表达式粘贴到此处时 https regex101 com并测试它似乎很好 正如预期的那样 clang 3 8 0 include
  • 我的第一个计算器需要帮助

    所以我刚刚开始学习编码 已经学习了大约 1 周 我想尝试制作一个可以执行 和 的计算器 但无法弄清楚如何让用户选择他想要使用的内容 有没有人可以帮助我 这是代码 int x int y Console WriteLine Welcome t
  • C++ 中的令牌及其使用方式

    在 Bjarne Stroustrup 的 使用 C 编程原理与实践 一书中 他解释了如何使用标记来阻止 C 自动使用简单计算器上的运算顺序 他在书中给出的代码不起作用 要么是因为我的愚蠢 要么是因为我遗漏了一些东西 我知道令牌本质上是将代
  • 如何知道 jQuery 选项卡上的选项卡是否已启用?

    我在 jQuery UI Tabs 的 API 中找不到 http docs jquery com UI Tabs 一种知道是否启用某个选项卡的方法 我需要它 因为在我的应用程序的事件中 我只想在禁用某个选项卡时才启用该选项卡 你知道如何从
  • 声明 JavaScript 数组时“{}”和“[]”有什么区别?

    声明 JavaScript 数组时 和 有什么区别 通常我声明像 var a 将数组声明为的含义是什么var a 似乎没有人解释数组和对象之间的区别 正在声明一个数组 正在声明一个对象 数组具有对象的所有功能以及附加功能 您可以将数组视为对
  • 使用不同的列/线宽度绘制热图

    我正在模拟一些东西 想找出两个参数的影响 因此 我改变它们 并查找每对参数值的结果 并得到如下结果 0 1000 2000 3000 4000 5000 0 13 2 14 8 19 9 25 5 27 3 1000 21 3 25 9 3
  • 什么时候我们需要多个Dispatcher Servlet?

    哪些场景下我们需要多个Dispatcher Servlets 谁能告诉我的用例multiple Dispatcher Servlets 我认为每个用例都可以通过使用来解决single Dispatcher Servlet 来自文档 Web
  • 由于错误,无法使用特定项目的 gh-pages 进行部署:无法生成 sh:没有这样的文件或目录

    继续报错 错误 无法生成 sh 没有这样的文件或目录 致命 无法分叉 该项目构建正确 但在尝试部署我的网站时失败 网站是在我的 Windows 8 电脑上使用 create react app 使用命令 gh pages d build 创
  • 如何使用图标呈现数据绑定的 WinForms DataGridView 列?

    在我的 C Windows 窗体应用程序中 我有一个DataGridView绑定到一个BindingList
  • 分页链接中重复参数的问题?

    问题是 当我加载第 2 页时 URL 变为 这很好 但是当我翻到第 3 页时 它变成了 依此类推 每次从分页链接加载新页面时 它都会添加一个当前页面参数 我想知道这个问题如何解决 这是分页功能的一些代码 build the paginati
  • 如何在 Rails 6 中使用自定义 jQuery

    我已经思考这个问题好几天了 由于某种原因 我的语义用户界面 jQuery 无法工作 这就是我所做的 在我的 webpack environment js 上 const environment require rails webpacker
  • gridview中的Oracle数据库表

    我想从 Oracle 数据库中的查询中获取结果并将其放入网格视图中 现在我的问题是 我不知道如何在网格视图中输出它 我正在使用工具箱中的 gridview 并且我的 Oracle 连接正常 我也有权利SELECT查询 我可以将其输出到列表框
  • Java 每 0.5 秒在动画中移动 jlabel

    我想要简单的动画每 0 5 秒设置一次位置 但它不会仅在循环结束时设置位置动画 int x 1 整数y 1 while x lt 100 jLabel1 setLocation x y x x 10 y y 10 try Thread sl
  • Flink广播状态如何初始化?

    我们正在尝试构建一个用例 其中来自流的数据通过计算公式运行 但公式本身也应该 很少 可更新 通过阅读文档 在我看来 Flink 广播状态非常适合这种情况 作为实验 我构建了一个简化版本 假设我有一个整数流 以及第二个包含这些整数的乘法因子的