目前我正在开展一个项目,在该项目中我在四台 Unix 主机上设置了一个 Storm 集群。
拓扑本身如下:
- JMS Spout 侦听 MQ 以获取新消息
- JMS Spout 解析然后将结果发送到 Esper Bolt
- 然后,Esper Bolt 处理该事件并将结果发送到 JMS Bolt
- 然后,JMS Bolt 将消息发布回不同主题的 MQ 上
我意识到 Storm 是一个“至少一次”框架。但是,如果我收到 5 个事件并将这些事件传递到 Esper Bolt 进行计数,那么出于某种原因,我会在 JMS Bolt 中收到 5 个计数结果(所有值都相同)。
理想情况下,我想收到一个结果输出,有什么方法可以告诉 Storm 忽略重复的元组吗?
我认为这与我设置的并行性有关,因为当我只有一个线程时它会按预期工作:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(JMS_DATA_SPOUT, new JMSDataSpout(),2).setNumTasks(2);
builder.setBolt("esperBolt", new EsperBolt.Builder().build(),6).setNumTasks(6)
.fieldsGrouping(JMS_DATA_SPOUT,new Fields("eventGrouping"));
builder.setBolt("jmsBolt", new JMSBolt(),2).setNumTasks(2).fieldsGrouping("esperBolt", new Fields("eventName"));
我还看到了 Trident 的“恰好一次”语义。然而,我并不完全相信这会解决这个问题。
如果您的 Esper Bolt 没有在其execute() 方法末尾显式 ack() 每个元组或使用 iBasicBolt 实现,那么它接收到的每个元组最终将在超时后由您的原始 JMS Spout 重播。
或者,如果您要求 Bolt“仅处理唯一消息”,请考虑将此处理行为添加到您的execute() 方法中。它可以首先检查本地 Guava 缓存的元组值的唯一性,然后进行相应的处理。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)