风暴集群重复元组

2024-02-03

目前我正在开展一个项目,在该项目中我在四台 Unix 主机上设置了一个 Storm 集群。

拓扑本身如下:

  1. JMS Spout 侦听 MQ 以获取新消息
  2. JMS Spout 解析然后将结果发送到 Esper Bolt
  3. 然后,Esper Bolt 处理该事件并将结果发送到 JMS Bolt
  4. 然后,JMS Bolt 将消息发布回不同主题的 MQ 上

我意识到 Storm 是一个“至少一次”框架。但是,如果我收到 5 个事件并将这些事件传递到 Esper Bolt 进行计数,那么出于某种原因,我会在 JMS Bolt 中收到 5 个计数结果(所有值都相同)。

理想情况下,我想收到一个结果输出,有什么方法可以告诉 Storm 忽略重复的元组吗?

我认为这与我设置的并行性有关,因为当我只有一个线程时它会按预期工作:

 TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout(JMS_DATA_SPOUT, new JMSDataSpout(),2).setNumTasks(2);
    builder.setBolt("esperBolt", new EsperBolt.Builder().build(),6).setNumTasks(6)
            .fieldsGrouping(JMS_DATA_SPOUT,new Fields("eventGrouping"));
    builder.setBolt("jmsBolt", new JMSBolt(),2).setNumTasks(2).fieldsGrouping("esperBolt", new Fields("eventName"));

我还看到了 Trident 的“恰好一次”语义。然而,我并不完全相信这会解决这个问题。


如果您的 Esper Bolt 没有在其execute() 方法末尾显式 ack() 每个元组或使用 iBasicBolt 实现,那么它接收到的每个元组最终将在超时后由您的原始 JMS Spout 重播。

或者,如果您要求 Bolt“仅处理唯一消息”,请考虑将此处理行为添加到您的execute() 方法中。它可以首先检查本地 Guava 缓存的元组值的唯一性,然后进行相应的处理。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

风暴集群重复元组 的相关文章

  • 如何使用固定数量的工作线程实现简单线程

    我正在寻找最简单 最直接的方法来实现以下内容 主程序实例化worker 执行任务的线程 Only n任务可以同时运行 When n已达到 不再有工人 开始直到计数 正在运行的线程回落到下方n 我觉得Executors newFixedThr
  • 如何将变量的全部内容发送/导出到文本文件/xml 文件/剪贴板?

    我想将实例的内容 最好以树形形式 发送给某人 打印屏幕是不行的 因为类太复杂了 您需要将输出转回实例吗 在这种情况下 其他答案都是正确的 如果您只想手动检查实例的内容 理想情况下您的类都将实现toString 你可以将其重定向到一个文件 如
  • 在Java中将*s打印为三角形?

    我在 Java 课程中的作业是制作 3 个三角形 一份左对齐 一份右对齐 一份居中 我必须为什么类型的三角形制作一个菜单 然后输入需要多少行 三角形必须看起来像这样 到目前为止 我能够完成左对齐的三角形 但我似乎无法获得其他两个 我尝试用谷
  • 将一种类型的对象声明为另一种类型的实例有什么好处? [复制]

    这个问题在这里已经有答案了 可能的重复 Base b2 new Child 是什么意思 表示 https stackoverflow com questions 4447924 what does base b2 new child sig
  • 如何使用 Maven Failsafe 插件运行 JUnit 5 集成测试?

    当我运行命令时 Maven Failsafe 插件找不到我的 JUnit 5 集成测试mvn clean failsafe integration test 尽管它可以找到文件 我有junit jupiter api and junit j
  • 如何在log4j的配置文件中为文件附加器提供环境变量路径

    我有一个log4j xml配置文件 和一个RollingFileAppender我需要提供用于存储日志的文件路径 问题是我的代码将作为可运行的 jar 部署在 Unix 机器上 所以如果我传递这样的参数 value logs message
  • 如何在Java中优雅地处理SIGKILL信号

    当程序收到终止信号时如何处理清理 例如 我连接到一个应用程序 希望任何第三方应用程序 我的应用程序 发送finish注销时的命令 发送该信息最好说什么finish当我的应用程序被破坏时的命令kill 9 编辑1 kill 9无法被捕获 谢谢
  • JavaFX 2.0 FXML 子窗口

    经过多次搜索我发现了这个问题如何创建 javafx 2 0 应用程序 MDI https stackoverflow com questions 10915388 how to create a javafx 2 0 application
  • 迁移到Java 9或更高版本时是否需要切换到模块?

    我们目前正在从 Java 8 迁移到 Java 11 但是 升级我们的服务并没有我们预期的那么痛苦 我们基本上只需要更改我们的版本号build gradle文件和服务都顺利启动并运行 我们升级了库以及使用这些库的 微 服务 到目前为止没有问
  • 如何将现有的 SQLite3 数据库导入 Room?

    好吧 我在桌面上使用 SQLite3 创建了一个只需要读取的某些信息的数据库 我正在制作的应用程序不需要在此表中插入或删除信息 我在 Room 数据库层上做了相当多的谷歌搜索 所有文档都需要在构建应用程序时在 Room 中创建一个新的数据库
  • 在 Java 中创建 T 的新实例

    在C 中 我们可以定义一个泛型class A
  • Java - JPanel 内有边距和 JTextArea

    我想创建这样的东西 主面板有其边距 x 并且 TextArea 位于该面板的中心 几乎填满了面板 底部是另一个具有自定义尺寸 高度 y 的面板 可以使用某些快捷方式将其切换为可见和不可见 底部面板有 FlowLayout 和几个元素 问题是
  • 多线程——更快的方法?

    我有一堂有吸气剂的课程getInt 和一个二传手setInt 在某个领域 比如说领域 Integer Int 一个类的一个对象 比如说SomeClass The setInt 这里是同步的 getInt isn t 我正在更新的值Int来自
  • 配置jmxremote时无法正常停止tomcat

    我添加了一个jmxremotecatalina bat中的配置 set JAVA OPTS Dcom sun management jmxremote port 9004 Dcom sun management jmxremote ssl
  • 在循环中按名称访问变量

    我正在开发一个 Android 项目 并且有很多可绘制对象 这些绘图的名称都类似于icon 0 png icon 1 png icon 100 png 我想将这些可绘制对象的所有资源 ID 添加到整数 ArrayList 中 对于那些不了解
  • Java和手动执行finalize

    如果我打电话finalize 在我的程序代码中的一个对象上 JVM当垃圾收集器处理这个对象时仍然再次运行该方法吗 这是一个大概的例子 MyObject m new MyObject m finalize m null System gc 是
  • android 中的 java.net.URL ..新手问题

    我是java新手 正在尝试android开发 以下代码生成 malformedURLException 有人可以帮助我识别异常吗 任何提示都会非常有帮助 package com example helloandroid import and
  • JPA 将 BigDecimal 作为整数保存在数据库中

    我在数据库中有这个字段 ITEMCOST NUMERIC 13 DEFAULT 0 NOT NULL 在JAVA中 Entity中的字段定义如下 Column name ITEMCOST private BigDecimal itemCos
  • CXF:通过 SOAP 发送对象时如何排除某些属性?

    我使用 Apache CXF 2 4 2 当我将数据库中的某个对象返回给用户时 我想排除一些属性 例如密码 我怎样才能做到这一点无需创建临时的班级 有这方面的注释吗 根据 tomasz nurkiewicz 评论我应该使用 XmlTrans
  • 如何清理 Runtime.exec() 中使用的用户输入?

    我需要通过命令行调用自定义脚本 这些脚本需要很少的参数并在 Linux 机器上调用 当前版本容易出现各种shell注入 如何清理用户给出的参数 参数包括登录名和路径 Unix 或 Windows 路径 用户应该能够输入任何可能的路径 该路径

随机推荐

  • 当属性设置太早时,UIImageView 不显示图像

    我有一个图像想要显示在UIView 在界面生成器中 UIView是根和UIImageView是它的孩子 视图连接到视图控制器的视图出口 图像视图连接到图像视图出口 property nonatomic retain IBOutlet UII
  • Amazon WorkMail 账户无法接收电子邮件

    我之前设置了 AWS WorkMail 组织和电子邮件地址 并且正在使用托管在 Route 53 上的自定义域 这已成功运行 但是现在我已经创建了第二个 WorkMail 地址 我无法接收到它的电子邮件 尽管我可以从它发送电子邮件 我收到以
  • 使用 laravel 和 vuejs 导出 Excel

    我喜欢使用 Laravel 和 Vuejs 导出 Excel 不知何故 代码返回真实值 但无法下载 Excel 文件 如果我执行正常请求 它将下载文件 但在 axios 请求中 它不会导出文件 我正在使用 php artisan make
  • 复制另一个账户拥有的 AMI 时出现问题

    我正在尝试从一个账户 A 复制一个由另一个账户 B 拥有的 AMI 我之前已经构建过此 AMI 帐户 A 具有 AMI 的启动权限 但当我复制时 收到以下错误消息 Images from AWS Marketplace cannot be
  • 在 R 中使用 foreach 读取全局变量

    我正在尝试使用 RStudio 在具有 16 核 CPU 和 64 GB RAM 的 Windows 服务器上运行 foreach 循环 使用 doParallel 包 工作 进程从 for 循环外部复制所有变量 通过运行 foreach
  • 如何生成包含 JSON 结构的下载文件?

    我的控制器中有这个方法 public IActionResult Download return Json context Users 我注意到它生成了正确的 JSON 结构but它在浏览器中呈现为普通文本 我希望将其下载到客户的计算机上
  • 输出模板内的嵌套类

    我试图重载 ostream 运算符以允许输出模板内的嵌套类 但是 编译器无法将实际的函数调用绑定到我的重载 template
  • iOS:(Swift)如何显示距当前位置的距离并在注释副标题上存在注释

    我目前正在使用 Swift 语言开发基于 iOS 的地图应用程序 我想要一个建议 因为在我在地图视图上绘制所有引脚之后 我使用名为 Alamofire 的 JSON 框架从服务器接收数据 我希望地图上所有注释的副标题显示距用户当前位置的距离
  • Fabricjs,旋转对象后是否可以获得虚拟矩形的左、上、右位置

    旋转物体后是否可以获得虚拟矩形的左 上 右位置 您正在寻找的是边界矩形你的对象 获取边界矩形 忽略Vpt Object 返回对象边界矩形的坐标 左 上 宽 高 该框旨在与画布轴对齐 返回 具有 left top width height 属
  • 在源代码中保护密码?

    我的代码中有一个密码 需要连接到 sftp 服务器 混淆 或隐藏在代码中的最佳方法是什么 Thanks 不要将密码存储在源代码中 而是将其存储在 App Config 或 Web Config 内的受保护部分中 See 使用受保护的配置加密
  • C++ 在模板类中重载运算符<< [重复]

    这个问题在这里已经有答案了 可能的重复 为模板类重载友元运算符 https stackoverflow com questions 4660123 overloading friend operator for template class
  • 回历日期格式

    我有一些从网站下载的数据 其中一列包含回历日期 为了使该列成为正确的日期列 我应用了以下格式 但问题是 除非我输入单元格 通过双击或 F2 然后按 Enter 键 否则它不会被视为日期并向右对齐 因为行数很大 所以我使用的方法不实用 我尝试
  • php 中 HTTPRequest 的替代方案

    我在 php 脚本中使用 HttpRequest 类 但是当我将此脚本上传到托管提供商的服务器时 执行它时出现致命错误 致命错误 在第 87 行 中找不到类 HttpRequest 我相信原因是因为我的托管提供商的 php ini 配置不包
  • android模拟器的IP地址是多少,[重复]

    这个问题在这里已经有答案了 android模拟器的IP地址是多少 根据我在互联网上找到的几个信息 127 0 0 1 10 0 0 2或10 0 015从Web服务器 apache 调用模拟器 模拟器的端口是固定的吗 10 0 2 1 路由
  • 如何正确广播 NumPy 数组的数组索引

    简短的介绍 我有两个 numpy 数组 data data shape是一个包含 X 个条目的元组 indices indices shape是元组 X Y indices基本上是一个索引数组的列表 沿第二个维度的数组指定相应维度的索引列表
  • 在新 API 的两个单独文件中初始化 Firebase 引用

    我已升级到新的 API 但不知道如何在两个单独的文件中初始化 Firebase 引用 CASE 1 1st file var config firebase initializeApp config var rootRef firebase
  • ArrayList初始化相当于数组初始化[重复]

    这个问题在这里已经有答案了 我知道您可以在实例化期间初始化数组 如下所示 String names new String Ryan Julie Bob 有没有办法用 ArrayList 做同样的事情 或者我必须单独添加内容array add
  • 以编程方式运行 Ansible playbook?

    我有一个 python 应用程序 它调用下面的代码 并计划通过 Ansible API 以编程方式运行 Ansible playbook 而不是使用子进程之类的东西 下面的代码运行但实际上似乎没有执行任何内容 获取结果的输出只会给我一个看起
  • 如何在 bash 脚本中自动按 [ENTER] 继续

    我有一个 bash 脚本 可以帮助自动安装一些应用程序 One app requests that I press ENTER to continue or CTRL C to cancel How can I automate my sc
  • 风暴集群重复元组

    目前我正在开展一个项目 在该项目中我在四台 Unix 主机上设置了一个 Storm 集群 拓扑本身如下 JMS Spout 侦听 MQ 以获取新消息 JMS Spout 解析然后将结果发送到 Esper Bolt 然后 Esper Bolt