如何将两个不同Spout的输出发送到同一个Bolt?

2023-12-02

我有两个 Kafka Spout,我想将它们的值发送到同一个 Bolt。

是否可以 ?


对的,这是可能的:

TopologyBuilder b = new TopologyBuilder();
b.setSpout("topic_1", new KafkaSpout(...));
b.setSpout("topic_2", new KafkaSpout(...));
b.setBolt("bolt", new MyBolt(...)).shuffleGrouping("topic_1").shuffleGrouping("topic_2");

您也可以使用任何其他分组。

Update:

为了区分消费者bolt中的元组(即topic_1或topic_2),有两种可能性:

1) 您可以使用操作员 ID(如 @user-4870385 建议):

if(input.getSourceComponent().equalsIgnoreCase("topic_1")) {
    //do something
} else {
    //do something
}

2)您可以使用流名称(按照@zenbeni的建议)。对于这种情况,两个 Spout 都需要声明命名流,并且 Bolt 需要通过流名称连接到 Spout:

public class MyKafkaSpout extends KafkaSpout {
  final String streamName;

  public MyKafkaSpout(String stream) {
    this.streamName = stream;
  }

  // other stuff omitted

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // compare KafkaSpout.declareOutputFields(...)
    declarer.declare(streamName, _spoutConfig.scheme.getOutputFields());
  }
}

构建拓扑,现在需要使用流名称:

TopologyBuilder b = new TopologyBuilder();
b.setSpout("topic_1", new MyKafkaSpout("stream_t1"));
b.setSpout("topic_2", new MyKafkaSpout("stream_t2"));
b.setBolt("bolt", new MyBolt(...)).shuffleGrouping("topic_1", "stream_t1").shuffleGrouping("topic_2", "stream_t2");

In MyBolt流名称现在可用于区分输入元组:

// in my MyBolt.execute():
if(input.getSourceStreamId().equals("Topic1")) {
  // do something
} else {
  // do something
}

讨论:

虽然second使用流名称的方法更自然(根据@zenbeni),first更灵活(IHMO)。流名称由spout/bolt直接声明(即在编写spout/bolt代码时);相反,操作员 ID 是在拓扑组合在一起时分配的(即,在喷口/螺栓被放置时)used).

假设我们获得三个螺栓作为类文件(没有源代码)。前两个应该用作生产者,并且都声明具有相同名称的输出流。如果第三个消费者通过流来区分输入元组,则这将不起作用。即使两个给定的生产者 Bolt 声明不同的输出流名称,预期的输入流名称也可能会硬编码在消费者 Bolt 中,并且可能不匹配。因此,它也不起作用。但是,如果消费者 Bolt 使用组件名称(即使它们是硬编码的)来区分传入的元组,则可以正确分配预期的组件 ID。

当然,可以从给定的类继承(如果没有声明final并覆盖declareOutputFields(...)为了分配自己的流名称。然而,这是更多额外的工作要做。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何将两个不同Spout的输出发送到同一个Bolt? 的相关文章

随机推荐

  • 触发 valueChanged 事件后从 JTable 中删除行

    我正在使用 ListSelectionListener 来更新我的JTextField countryTxt 来自所选行 import java awt BorderLayout import java awt event ActionEv
  • 解组 Parcelable 时出现问题

    我有一些实现的类可分包其中一些类作为属性相互包含 我正在将课程整理成Parcel在活动之间传递它们 将它们编组到包裹工作正常 但是当我尝试解组它们时 出现以下错误 AndroidRuntime E Caused by android os
  • Subversion 管理的项目的 xcopy ASP.NET 部署

    我目前正在使用 Subversion 来管理我的 ASP NET 网站 我发现每当我将网站上传到服务器时 我都会复制大量隐藏的 svn 文件夹以及其中可能包含的任何内容 有人有任何建议来避免这种情况吗 我并不特别想要生产服务器上的那些隐藏的
  • Java Keystore.getKey() 缓慢,同时密钥存储大小增加

    我正在使用 java 密钥存储来存储和检索加密密钥 当我的密钥存储大小很小时 它的工作速度更快 但是 一旦我的密钥存储大小增加 密钥存储操作就会变慢 我正在linux平台上工作 Java版本Jdk 1 8 和 safenet 作为提供商 我
  • 在 onPause、onStop 和 onDestroy 方法中调用超类方法的正确顺序是什么?为什么?

    我刚刚浏览了 Android 开发人员网站 刷新了 Activity 生命周期 在每个代码示例中 超类方法旁边都有一条注释 上面写着 始终首先调用超类方法 虽然这在创建半周期 onCreate onStart 和 onResume 中有意义
  • 在 WPF 应用程序中禁用 Aero Peek

    我想在我的 WPF 应用程序中禁用 Aero Peek 当用户将鼠标放在 显示桌面 按钮上时 我的应用程序必须可见 我使用这个 PInvoke 签名 Flags public enum DwmWindowAttribute uint DWM
  • React 如何有条件地覆盖 Material-UI 中的 TextField 错误颜色?

    我在用着React Material UI库 我想有条件地覆盖 TextField 的错误颜色 当错误属于某种类型时 我需要将 helperText 边框 文本和所需标记颜色更改为黄色 像这样的东西 否则 我想为所有其他类型的错误保留默认颜
  • Android 从通话记录屏幕中获取条目

    我已经能够使用从互联网上获得的以下代码访问手机的通话记录屏幕 是否可以通过单击获取单个条目的条目详细信息 例如号码 通话时间等 Intent showCallLog new Intent showCallLog setAction Inte
  • 错误:打开失败:ENOENT(没有此类文件或目录)

    我试图创建一个文件来保存相机中的图片 结果发现我无法创建该文件 但我实在找不到错误所在 你能看一下并给我一些建议吗 private File createImageFile File imageFile null String stamp
  • 下载 Google App Engine 数据库

    在 Google App Engine 数据存储区中创建表 实体 后 我创建了 Web 应用程序并将其部署在 Google App Engine 中 我怀疑是否可以下载实体 数据库 启用remote api 将其添加到您的web xml
  • 通过Java修改XPath表达式?

    我有以下类型的 XPath 表达式 id test id and some other attribute some value 我想把它转换成 resourceId android id test id and some other at
  • 如何在 Java 中将字符串时间转换为 Long 毫秒

    我正在尝试使用下面的代码将时间字符串转换为毫秒 因为我会把时间用作倒计时器 问题是时间来自数据库并且是 varchar 类型 我尝试了这段代码 但它没有给我正确的输出 String timeDuration 10 00 for exampl
  • SpreadsheetAddRows 在中等大小的查询上失败

    编辑 我更改了名称 因为有一个类似的问题如何修复添加大型查询时 SpreadSheet 添加行函数崩溃的问题 那里描述了我的问题 所以我更简洁地进行了分述 问题是我的查询结果的电子表格添加行以我认为适中的大小 1600 行 27 列 轰炸了
  • 未找到对象 - XAMPP

    我最近在我的笔记本电脑上安装了 XAMPP windows 7 64 位 我刚刚用 HTML 编写了一个非常基本的 Hello World 脚本 但是 当我尝试运行它时 它显示 未找到对象 错误 404 这就是我正在做的 写了剧本 将其保存
  • 更改 tkinter 窗口边框样式

    我想让我的窗框看起来像旧的 windows 95 风格 现在 当我创建窗口时 tkinter 会自动采用我的操作系统 Windows 10 的样式 有办法改变这个吗 不 没有什么特别可以使窗口边框看起来不同 您唯一的选择是完全删除边框 例如
  • NETLOGO:使用上一个刻度的变量

    是否有一些原语可以使用前一个刻度的变量值 我尝试计算一个代理的 价格 变量 我的意思是使用包含其他代理的 价格 变量但来自上一个刻度的公式 不 NetLogo 中没有内置方法可以执行此操作 您最好的选择可能是创建一个名为以下内 容的变量va
  • 将 ioctl() 调用从 unix 移植到 linux,FIONBIO 出错

    我想使用 ioctl 来获取准备读取的字节数 我这样做的方式是 mysocket socket ioctl mysocket FIONBIO zero connect ioctl mysocket FIONREAD numBytes rea
  • 生成 [-1, 1] 范围内的随机数数组

    我正在尝试在 Fortran 中的 2D 数组中生成 1 1 范围内的随机数 我知道为了生成 1 和 1 范围内的随机数 我必须将数字重新调整为 0 2 但我不知道如何填充矩阵x 10 10 with randomReal 这是代码 pro
  • InstallShield 限量版支持 64 位安装程序吗?

    我刚刚开始学习 InstallShield LE 因为它似乎是唯一 官方 支持的安装程序项目 但我有一个简单的问题 我什至无法得到绝对的答案 ISLE 是否支持构建 64 位安装程序 我之所以这么问 是因为我发现至少有两篇文章说这是不可能的
  • 如何将两个不同Spout的输出发送到同一个Bolt?

    我有两个 Kafka Spout 我想将它们的值发送到同一个 Bolt 是否可以 对的 这是可能的 TopologyBuilder b new TopologyBuilder b setSpout topic 1 new KafkaSpou