Apache Kafka - 主题/分区上的 KafkaStream

2024-01-08

我正在为大容量高速分布式应用程序编写 Kafka Consumer。我只有一个主题,但收到的消息率非常高。拥有多个分区来服务更多消费者将适合此用例。最好的消费方式是拥有多个流读取器。根据文档或可用示例,ConsumerConnector 给出的 KafkaStream 数量基于主题数量。想知道如何[基于分区]获得多个 KafkaStream 读取器,以便我可以为每个流跨一个线程,或者在多个线程中从同一个 KafkaStream 读取将实现从多个分区的并发读取?

非常感谢任何见解。


想分享我从邮件列表中找到的内容:

您在主题映射中传递的数字控制将主题划分为多少个流。在你的例子中,如果你传入 1,所有 10 个分区的数据将被输入到 1 个流中。如果传入2,则2个流中的每一个都会从5个分区获取数据。如果你传入 11 个,其中 10 个将分别从 1 个分区获取数据,而 1 个流将什么也得不到。

通常,您需要在其自己的线程中迭代每个流。这是因为如果没有新事件,每个流都可能永远阻塞。

示例片段:

topicCount.put(msgTopic, new Integer(partitionCount));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = connector.createMessageStreams(topicCount);
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(msgTopic);

for (final KafkaStream stream : streams) {
    ReadTask task = new ReadTask(stream, msgTopic);
    task.addObserver(this.msgObserver);
    tasks.add(task); executor.submit(task);
}

参考:http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201201.mbox/%3CCA+sH[电子邮件受保护]%3E http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201201.mbox/%3CCA+sHyy_Z903dOmnjp7_yYR_aE2sRW-x7XpAnqkmWaP66GOqf6w@mail.gmail.com%3E

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apache Kafka - 主题/分区上的 KafkaStream 的相关文章

  • 用 @DataJpaTest 注释的测试不是用 @Autowired 注释的自动装配字段

    我有一个 Spring Boot 应用程序 其中包含 Spring Data Jpa 存储库 我需要围绕这个存储库运行单元 或组件 测试 我对 Spring Data Jpa 没有太多经验 这是我的测试 这很简单 我无法让它通过 impor
  • 如何打印整个字符串池?

    我想打印包含文字的整个字符串池String使用添加的对象intern 就在垃圾收集之前 JDK有没有隐式的方法来进行这样的操作 我们如何检查字符串池 EDIT The comment suggests that there may be a
  • 使用 Checkstyle Plugin 时从插件调用代码时出现问题:“org.eclipse.jface”

    我正在尝试在 Rational Software Architect 7 0 0 4 上使用 eclipse cs 插件 我最近卸载了旧的 beta2 版本并安装了 beta3 插件本身按照之前的配置工作 但是每当我尝试通过 Windows
  • JTree 节点不会被直观地选择

    不知何故 我无法为我的 JTree 节点启用 选择突出显示 我正在我的项目中使用自定义单元格渲染器 这很可能导致此问题 这是完整的渲染器类代码 protected class ProfessionTreeCellRenderer exten
  • 无法加载 jar 文件的主类

    我使用 Eclipse IDE 开发了一个应用程序 创建应用程序后 我以 jar 格式导出项目 当我尝试运行此 jar 文件时 出现错误 无法加载主类 请帮忙 当您将项目导出为 jar 时 请参阅此所以问题 https stackoverf
  • Java 泛型/类型调度问题

    考虑以下程序 import java util List import java util ArrayList public class TypeTest public static class TypeTestA extends Type
  • PropertySources 中各种源的优先级

    Spring引入了新的注释 PropertySources对于所有标记为的类 Configuration since 4 0 需要不同的 PropertySource作为论证 PropertySources PropertySource c
  • Spring Stomp over Websocket:流式传输大文件

    我的SockJs客户端在网页中 发送帧大小为16K的消息 消息大小限制决定了我可以传输的文件的最大大小 以下是我在文档中找到的内容 Configure the maximum size for an incoming sub protoco
  • 使用 Java 在浏览器中下载 CSV 文件

    我正在尝试在 Web 应用程序上添加一个按钮 单击该按钮会下载一个 CSV 文件 该文件很小 大小仅约 4KB 我已经制作了按钮并附加了一个侦听器 文件也准备好了 我现在唯一需要做的就是创建单击按钮时下载 csv 文件的实际事件 假设 fi
  • Java:VM 如何在 32 位处理器上处理 64 位“long”

    JVM 如何在 32 位处理器上处理 64 位的原始 long 在多核 32 位机器上可以并行利用多个核心吗 64 位操作在 32 位机器上慢了多少 它可能使用多个核心来运行不同的线程 但不会并行使用它们进行 64 位计算 64 位长基本上
  • 如何在java中使jpeg无损?

    有没有人可以告诉我如何使用编写 jpeg 文件losslessjava中的压缩 我使用下面的代码读取字节来编辑字节 WritableRaster raster image getRaster DataBufferByte buffer Da
  • 为什么 ConcurrentHashMap::putIfAbsent 比 ConcurrentHashMap::computeIfAbsent 更快?

    使用 ConcurrentHashMap 我发现computeIfAbsent 比putIfAbsent 慢两倍 这是简单的测试 import java util ArrayList import java util List import
  • 如何在没有 Control.Invoke() 的情况下从后台线程修改控件属性

    最近 我们遇到了一些旧版 WinForms 应用程序 我们需要更新一些新功能 在专家测试该应用程序时 发现一些旧功能被破坏 无效的跨线程操作 现在 在您认为我是新手之前 我确实有一些 Windows 窗体应用程序的经验 我不是专家 但我认为
  • 从 html 页面和 javascript 调用 java webservice

    我正在尝试从 javascript 调用 java 实现的 Web 服务 使用 NetBeans IDE 我读过很多关于 jQuery 和 AJAX 的内容 但我似乎无法掌握它 假设我的 Web 服务 WSDL 位于 http localh
  • 在 AKKA 中,对主管调用 shutdown 是否会停止其监督的所有参与者?

    假设我有一位主管连接了 2 位演员 当我的应用程序关闭时 我想优雅地关闭这些参与者 调用supervisor shutdown 是否会停止所有参与者 还是我仍然需要手动停止我的参与者 gracias 阻止主管 https github co
  • ExceptionHandler 不适用于 Throwable

    我们的应用程序是基于 Spring MVC 的 REST 应用程序 我正在尝试使用 ExceptionHandler 注释来处理所有错误和异常 I have ExceptionHandler Throwable class public R
  • 来自客户端的超时 Web 服务调用

    我正在使用 RestEasy 客户端调用网络服务 一项要求是 如果调用运行时间超过 5 秒 则中止 超时调用 我如何使用 RestEasy 客户端实现这一目标 我只看到服务器端超时 即如果在一定时间内未完成请求 Rest Easy 网络服务
  • 我想要一个 Java 阿拉伯语词干分析器

    我正在寻找阿拉伯语的 Java 词干分析器 我找到了一个名为 AraMorph 的库 但它的输出是无法控制的 并且它会形成不需要的单词 还有其他阿拉伯语词干分析器吗 这是新的阿拉伯语词干分析器 Assem 的阿拉伯语轻词干分析器 http
  • 如何将实例变量传递到 Quartz 作业中?

    我想知道如何在 Quartz 中外部传递实例变量 下面是我想写的伪代码 如何将 externalInstance 传递到此作业中 public class SimpleJob implements Job Override public v
  • Java中有类似分支/跳转表的东西吗?

    Java有类似分支表或跳转表的东西吗 分支表或跳转表是 根据维基百科 http en wikipedia org wiki Branch table 用于描述使用分支指令表将程序控制 分支 转移到程序的另一部分 或可能已动态加载的不同程序

随机推荐

  • bash 命令输出作为参数

    假设命令alpha产生这个输出 a b c d 如果我运行命令 beta alpha then beta将使用四个参数执行 a b c and d 但是如果我运行命令 beta alpha then beta将使用一个参数执行 a b c
  • UIWebView 和 Safari 比较

    UIWebView 是否使用与 Mobile Safari 相同的 JavaScript 引擎 另外 UIWebView 是否像 Mobile Safari 一样支持所有 HTML5 功能 我特别关心 Web SQL 和 Web Worke
  • Android 短划线/虚线问题?

    当我使用时Android 虚线 its 在小屏幕上运行良好 but 不适用于 Samsung S3 设备及更高版本 截图 And 可绘制 dashline xml
  • 如何在 VSTS 中使用 NUnit?

    我正在尝试在 Visual Studio Team System 中使用 NUnit 3 但他们似乎让这变得非常困难 我已在构建过程和高级执行选项中添加了测试程序集步骤 gt 自定义测试适配器的路径 我已按照帮助中的建议放入 NUnitVi
  • 检查列表是否为空(Raku)

    常见问题解答 在 Raku 中如何检查列表是否为空 还有比以下更惯用的方法吗 my l say l elems 0 say l say l Bool The 名单上的文档 https docs perl6 org type List推荐智能
  • 收到警告“[org.springframework.web.servlet.PageNotFound](默认任务-1)没有 GET /ProjectFE/ 的映射”

    正如标题所示 我收到一个错误 org springframework web servlet PageNotFound default task 1 No mapping for GET ProjectFE 我应该如何解决这个问题 另外 代
  • Angular:将项目推送到列表不会更新视图

    当我将项目推送到数组时 视图不会刷新列表 table tbody tr td product Code td td product Name td tr tbody form
  • 在golang中将chan转换为non chan

    是否可以让函数funcWithNonChanResult有如下接口 func funcWithNonChanResult int 如果我想让它使用函数funcWithChanResult与接口 func funcWithChanResult
  • RAD(Rational Application Developer)——清理、发布、重新启动

    我目前在 websphere 6 1 环境中使用 Spring 3 0 框架 我使用的 IDE 是 RAD Rational Application Developer 7 5 昨晚我正在解决一个问题 我发现在工作区进行更改后 我的代码并未
  • 用于模板更新的 Meteor 反应函数

    我想解决 Meteor js 中的以下问题 我有一个 HTML 元素 仅当用户登录时才会出现在模板中 if currentUser
  • 打印大文本时,文本行被水平切成两半

    我有一个 div 偶尔包含很长的文本 一个 div 中不止一页 这需要打印 并且在打印时 当页面制动时 某些行会被水平分割 尝试了我在 Stack Overflow 和其他来源上能找到的所有内容 我已经尝试了分页符的所有组合 尝试将正文边距
  • 对于带有非 TextView 视图的 ExpandableListView 使用什么适配器?

    我有一个 ExpandableListView 我想在其中包含 TextView 以外的控件 显然 SimpleExandableListViewAdapter 假定所有控件都是 TextView 如果不是 则会生成强制转换异常 推荐的解决
  • 将 NSAttributedString 和 NSTextAttachment 保存到文件中。如何?

    我有一个NSTextView 其中可能包含富文本或带有图像的富文本NSTextAttachment 我添加附件的方法如下 NSImage image NSImage imageNamed image NSTextAttachmentCell
  • 导航栏上有超过 1 个 rightBarButtonItem

    我想在导航栏上有两个 rightBarButtonItems 一个用于编辑 另一个用于添加 显然我无法使用 Interface Builder 来实现它 有人知道如何以编程方式做到这一点吗 谢谢 它现在包含在 iOS 5 中 称为 righ
  • ForkJoinPool#awaitQuithesis 实际上是如何工作的?

    我有下一个实施RecursiveAction 此类的唯一目的 是从 0 到 9 打印 但如果可能的话 从不同的线程打印 public class MyRecursiveAction extends RecursiveAction priva
  • 有没有办法使用 facebook graphQL 接收扁平化数据?

    我刚刚发现 还有像 GraphQL 这样神奇的东西 我很好奇使用这个时是否有任何方法 fields id name description place name location city cover source 得到这个 id 1001
  • 保护 Firestore 中的特定文档字段

    我正在尝试创建一个简单的系统 允许用户使用他们提供的信息创建帐户 现在 我将所有数据存储在一个集合中users其中有一些代表用户的文档 我想保留用户的一些敏感数据 例如电子邮件地址和电话号码 在 Firebase 数据库中 我会创建如下内容
  • 按 Soundex(或类似)“亲密程度”排序

    有什么方法可以让 MySQL 根据结果与搜索词的 听起来 接近程度来对结果进行排序吗 我正在尝试对包含用户输入的城市名称的字段进行排序 存在变体和拼写错误 我想在顶部显示 最接近 的匹配项 我知道 soundex 可能不是最好的算法 但如果
  • 从命令行安装驱动程序 (.inf) 文件

    我正在 Windows 8 1 中工作 我需要安装驱动程序文件 inf文件 从命令行 我需要使用哪个命令 我知道我还有很多其他方法来安装 inf文件 但我必须从命令行安装它 请帮我 提前致谢 Granger 的评论也适用于 Windows
  • Apache Kafka - 主题/分区上的 KafkaStream

    我正在为大容量高速分布式应用程序编写 Kafka Consumer 我只有一个主题 但收到的消息率非常高 拥有多个分区来服务更多消费者将适合此用例 最好的消费方式是拥有多个流读取器 根据文档或可用示例 ConsumerConnector 给