在命令行中在Kafka中创建多个消费者

2024-03-09

我是卡夫卡的新手。当我在命令行中运行快速启动示例时,我发现无法在命令行中创建多个使用者。

健康)状况:

我构建了一个名为 test 的主题,包含 3 个分区,并且还针对该主题构建了一个生产者。

然后我想创建两个不同的消费者,共享关于此主题的名为 test1 的同一消费者组。

我运行了如下命令两次:

   bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --group test1

第一个有效,但当我第二次运行时,第一个会断开连接,第二个有效。

那么如何在命令行中的同一个消费者组中创建两个或多个消费者呢?

    WARN Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1146)

除了使用--consumer.config像 secfree 的答案一样的选项,您也可以使用

--consumer-property group.id=your_group

无需编辑配置文件即可指定组名称的选项。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在命令行中在Kafka中创建多个消费者 的相关文章

  • Consumer.endOffsets 在 Kafka 中如何工作?

    假设我有一个无限期运行的计时器任务 它会迭代 kafka 集群中的所有消费者组 并输出每个组的所有分区的延迟 提交偏移量和结束偏移量 与 Kafka 控制台消费者组脚本的工作方式类似 只不过它适用于所有组 就像是 单个消费者 不工作 不返回
  • 基于流的应用程序中的受控/手动错误/恢复处理

    我正在开发一个基于的应用程序Apache Flink 它利用Apache Kafka用于输入和输出 该应用程序可能会被移植到Apache Spark 所以我也将其添加为标签 问题仍然相同 我要求通过 kafka 接收的所有传入消息必须按顺序
  • 当我按键对数据进行分区,然后向 Kafka 中的主题添加新分区时,会发生什么?

    当我按键对数据进行分区 然后向 Kafka 中的主题添加新分区时 会发生什么 现有记录是否会发生变化 未来的数据将如何分区 当新分区添加到特定主题时 现有数据的分区不会改变 Kafka 不会尝试重新分发现有记录 此修改只会对新记录产生影响
  • 如何在Golang中创建kafka消费者组?

    可用的库是sarama https github com Shopify sarama 或其扩展萨拉玛簇 https github com bsm sarama cluster 但是没有提供消费者组示例 不在sarama https god
  • 使用 Kafka Streams 进行 OpenTracing - 如何?

    我正在尝试将 Jaeger 跟踪集成到 K Streams 中 我计划将跟踪添加到几个最重要的管道中 并且想知道将 Traceid 从一个管道传递到另一个管道的好方法是什么 这是我到目前为止所做的 在流处理管道开始时 我启动一个服务器范围并
  • Spring Cloud Stream动态通道

    我正在使用 Spring Cloud Stream 想要以编程方式创建和绑定通道 我的用例是 在应用程序启动期间 我收到要订阅的 Kafka 主题的动态列表 如何为每个主题创建一个频道 我最近遇到了类似的场景 下面是我动态创建 Subscr
  • 卡夫卡幂等生产者

    卡夫卡文档说 幂等生产者可以使用相同的生产者会话 但我无法理解这一点 比如说 Kafka 为每条消息添加序列号 最后一个序列号保存在 Kafka 中 不确定它在哪里维护 它如何生成序列号以及它保存在哪里 为什么当生产者崩溃并再次出现时它无法
  • 如何使用不同的kafka主题配置Kubernetes部署的微服务的每个pod/进程?

    在我们的应用程序中 有多个不同 kafka 主题的消费者 例如 Cosumer C1 Cosumer C2 Cosumer C3 Cosumer C4 Cosumer C5 以及不同的 kafka 主题 例如主题 1 主题 2 主题 3 主
  • GCP Dataproc 作业未找到存储在存储桶中的 SSL pem 证书

    我有一个 GCP Dataproc 集群 我正在尝试部署一个 pyspark 作业 该作业使用 SSL 生成一个主题 pem 文件存储在存储桶 gs dataproc kafka code code 中 我正在使用下面所示的代码访问 pem
  • kafka启动失败(版本0.8.0 beta1)

    我正在尝试在独立模式 在ec2上 上使用zookeeper版本 3 3 6 启动kafka服务 所以我运行 1 sbt update 2 sbt package 3 sbt assembly package dependency 然后启动z
  • 如何使用 haproxy 负载均衡器 Kafka Bootstrap?

    我有一个 kafka 集群 由 3 台在 AWS 上运行的机器组成 卡夫卡1到卡夫卡3 我正在使用新型卡夫卡消费者 gt 0 8 我知道kafka客户端连接到其中一台kafka服务器 获取服务器元数据 然后直接连接到代理 我想确保在代理发生
  • 由于 jaas.conf 不正确而导致 Kafka TopicAuthorizationException

    我指的是JAAS登录配置文件 https docs oracle com javase 7 docs technotes guides security jgss tutorials LoginConfigFile html 它讨论了两种指
  • Kafka Java 消费者从未收到任何消息

    我正在尝试设置一个基本的 Java 消费者来接收来自 Kafka 主题的消息 我已经跟踪了样本 https cwiki apache org confluence display KAFKA Consumer Group Example h
  • 即使在kafka机器重新启动后,如何保留kafka保留字节和kafka保留段[重复]

    这个问题已经存在了 we set retention bytes价值 104857600对于主题 topic test root confluent01 kafka topics zookeeper localhost 2181 alter
  • 是否可以使用 Kafka Streams 访问消息头?

    随着添加Headers http apache spinellicreations com kafka 0 11 0 0 javadoc org apache kafka common header Header html到记录 生产者记录
  • Apache Kafka 消费者组的偏移量如何过期?

    当我注意到一些奇怪的行为时 我正在对一个旧主题进行一些测试 阅读 Kafka 的日志时 我注意到这条 删除了 8 个过期的偏移量 消息 GroupCoordinator 1001 Stabilized group GROUP NAME ge
  • Kafka 适合运行公共 API 吗?

    我有一个想要发布的事件流 它被划分为主题 不断更新 需要水平扩展 并且没有 SPOF 很好 并且可能需要在某些情况下重播旧事件 所有的功能似乎都与 Kafka 的功能相匹配 我想通过任何人都可以连接并获取事件的公共 API 将其发布到全世界
  • Kafka REST 代理 API 有哪些好处?

    我不知道Kafka REST Proxy API的优点 它是一个 REST API 所以我知道它对于管理来说很方便 人们为什么使用 Kafka REST 代理 API 添加对生产者或消费者的 Maven 依赖是否很麻烦 另外 我知道kafk
  • 找不到 io.confluence:kafka-protobuf-serializer:6.0.0

    直接的问题是 为什么 Gradle 没有解决我添加的这个依赖关系 dependencies kafka protobuf serializer implementation io confluent kafka protobuf seria
  • 命名 kafka 主题的最佳实践是什么?

    我们是 kafka 的新手 我们有几个团队正在开发一些相互发布 订阅事件的应用程序 由于kafka主题名称将在团队之间共享 那么命名有什么最佳实践吗 基本上我们不希望看到 A 团队命名主题companyname appname events

随机推荐

  • 如何从 SwiftUI 部分中删除背景颜色? [复制]

    这个问题在这里已经有答案了 我有一个 SwiftUI 部分 如下所示 struct FormView View var body some View Form Section Button action HStack Spacer Text
  • HTTP 错误 504:尝试阅读 Reddit 评论帖子时网关超时

    我在尝试从 reddit 获取评论的 http 时遇到错误 各种 URL 都发生过这种情况 并非所有 URL 都带有特殊字符 这就是其中之一 在一小时内 可能有 1000 个或更多对 reddit com 域的请求 hdr User Age
  • 使用并行线程提高 Python 执行速度

    假设我有这个示例代码 x foo1 something1 y foo2 something2 z max x y 我想通过使用线程来提高这段代码的执行时间 希望它有帮助 不是吗 我想让事情尽可能简单 所以基本上我想做的是创建两个同时工作的线
  • Google App Engine 灵活环境 0 个实例

    在过去的一周里 我发现我的 GAE 灵活环境中的实例数量降至 0 并且没有新实例启动 我对灵活环境的理解是 这不应该是可能的 https cloud google com appengine docs the appengine envir
  • 暂停播放时 MPNowPlayingInfoCenter 未正确反应

    我试图让 MPNowPlayingInfoCenter 在暂停播放时正常工作 我有一个使用 AVPlayer 进行播放的流媒体音乐应用程序 并且我正在通过 Airplay 在 Apple TV 中播放 除了暂停之外的所有内容似乎都在 App
  • 理解此警告:可序列化类未声明静态最终serialVersionUID

    我有一些静态初始化代码 someMethodThatTakesAHashMap new HashMap
  • 在 R 中保留 dcast 中的变量

    我正在使用dcastR 中的函数将长格式数据集转换为宽格式数据集 我有一个ID数字 一个分类变量 CAT 和一个连续变量 AMT 但是 我还有一个变量SEX 对于给定的所有行都是相同的ID数字 这段代码可以创建宽格式数据集 但我输了SEX
  • SwiftUI navigationBarItems 在 TabView 中消失

    我有一个包含导航栏项目的视图 并将该视图嵌入到TabView 但这样做时 栏项目不再出现 如果我在 a 之外调用视图TabView一切都按预期进行 下面是一个小示例项目来说明我的问题 请注意TabView最初没有被调用ContentView
  • 如何在 Vue js 中使 localStorage 中的数据响应

    我在 Vue js 项目中使用 localStorage 作为数据源 我可以读写 但找不到反应性使用它的方法 我需要刷新才能看到我所做的任何更改 我使用数据作为多个组件的道具 当我写入时localStorage从我触发的组件forceUpd
  • C++ 中的 HMAC SHA256 (DynamoDB)

    我正在尝试通过 REST Web API 连接到 DynamoDB 它要求我使用 HMAC SHA256 生成签名 我已经让 SHA 256 工作了 但我似乎无法让 HMAC 工作 这里是 C 代码 使用 OpenSSL string hm
  • Kotlin 中 open 和 override 方法之间的区别?

    open class Base open fun v fun nv class Derived Base override fun v 这是一个例子 有人可以解释一下区别吗 这里 open 关键字是强制性的吗 是的 两者都有open在您的示
  • 如何使用异步方法加载数据库数据并保持 UI 响应

    我制作了一个运行良好的大型应用程序 除了它的 UI winforms 在使用 webclient 从 web 检索数据时冻结 链接是嗯 不是最快的 或者从数据库检索查询的数据连接 这是存储在一个遥远 缓慢的服务器中 无法避免它 因此 我想到
  • UDP 服务器套接字缓冲区溢出

    我正在 Linux 上编写 C 应用程序 我的应用程序有一个 UDP 服务器 它在某些事件上向客户端发送数据 UDP 服务器还接收来自客户端的一些反馈 确认 为了实现这个应用程序 我使用了一个 UDP 套接字 例如int fdSocket
  • Win Vista/7 下的 WriteFile 错误#5“拒绝访问”

    我有一个 C 控制台应用程序 可以读取 1GB SD 卡 修复不正确关闭的文件并相应地写入 FAT 表 SD 卡一开始是由定制设备中的固件写入的 它在 Xp 之前工作正常 在 Win Vista 7 中停止工作 我尝试提升权限 在管理员帐户
  • Symfony 2 根据用户代理属性加载不同的模板

    是否有可能 以及如何 确定用户是否正在使用移动设备 在这种情况下强制 symfony 2 加载不同的模板 并回退默认的 html 模板 id 喜欢做的是 在不修改任何控制器的情况下加载不同的模板 UPDATE 真正的问题不是检测部分 它实际
  • 我的 iPhone 应用程序使用了多少内存(来自模拟器)

    我知道这与 Instruments 有关 但这有点令人困惑 并且在 Google 上搜索 Instruments 并没有多大帮助 我想知道我的应用程序运行情况如何 例如它使用了多少内存 我只是不知道在哪里可以找到类似的东西 据我们从模拟器中
  • Postgres COPY TO NULL 整数

    我有一个包含各种列的 CSV 其中一列包含整数数据 但是 当运行副本时 COPY soc FROM soc asc WITH DELIMITER 我得到以下信息 ERROR invalid input syntax for integer
  • ANTLR4 将 ParserRuleContext 树展平为数组

    如何压平一个ParserRuleContext将子树放入令牌数组中 这ParserRuleContext getTokens int ttype 看起来不错 但什么是ttype 是token类型吗 如果我想包含所有令牌类型 应使用什么值 P
  • 如何在 C# 中从两个列表创建单个对象对列表?

    我有两个对象列表 列表 A 和列表 B 我需要创建列表 C 将列表 A 和列表 B 组合成对 例如 List A object a1 object a2 object a3 List B object b1 object b2 object
  • 在命令行中在Kafka中创建多个消费者

    我是卡夫卡的新手 当我在命令行中运行快速启动示例时 我发现无法在命令行中创建多个使用者 健康 状况 我构建了一个名为 test 的主题 包含 3 个分区 并且还针对该主题构建了一个生产者 然后我想创建两个不同的消费者 共享关于此主题的名为