如何在kafka中创建新的消费者组

2023-11-22

我按照快速入门指南上的说明在本地运行 kafkahere,

然后我在中定义了我的消费者组配置config/consumer.properties这样我的消费者就可以从定义的中选择消息group.id

运行以下命令,

bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092

结果是,

test-consumer-group  <-- group.id defined in conf/consumer.properties
console-consumer-67807 <-- when connecting to kafka via kafka-console-consumer.sh

我能够通过基于 python 的消费者连接到 kafka,该消费者配置为使用提供group.id i.e test-consumer-group

首先,我无法理解kafka如何/何时创建消费者组。看来它加载了conf/consumer.properties在某个时间点,此外它还隐式地创建了消费者组(在我的例子中console-consumer-67807)通过连接时kafka-console-consumer.sh.

比方说,我如何明确创建自己的消费者组my-created-consumer-group ?


您不会显式创建消费者组,而是构建始终属于某个消费者组的消费者。无论您使用哪种技术(Spark、Spring、Flink...),每个 Kafka Consumer 都会有一个 Consumer Group。消费者组可以为每个单独的消费者进行配置。

似乎它在某个时间点加载conf/consumer.properties,此外,它在通过 kafka-console-consumer.sh 连接时隐式创建消费者组(在我的例子中是 console-consumer-67807)

如果您没有告诉您的控制台用户实际使用该文件,则不会考虑该文件。

有以下几种方法可以提供消费者组的名称:

带有属性文件的控制台 Consumer (--consumer.config)

文件是这样的config/consumer.properties应该看起来像

# consumer group id
group.id=my-created-consumer-group

这就是你如何确保控制台消费者接受这个group.id考虑到:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --consumer.config /path/to/config/consumer.properties

使用 --group 控制台消费者

对于控制台消费者,消费者组会自动创建,带有前缀“console-consumer”和后缀(例如 PID),除非您通过添加来提供自己的消费者组--group:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --group my-created-consumer-group

基于标准代码的消费者 API

当使用标准 JAVA/Scala/... Consumer API 时,您可以通过以下属性提供 Consumer Group:

Properties settings = new Properties();
settings.put(ConsumerConfig.GROUP_ID_CONFIG, "basic-consumer");
// set more properties

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
consumer.subscribe(Arrays.asList("test-topic")
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在kafka中创建新的消费者组 的相关文章

随机推荐

  • 如何在没有 Transfer Encoding: chunked 的情况下发送 HTTP 响应?

    我有一个响应 Twilio API 的 Java Servlet Twilio 似乎不支持我的回复所使用的分块传输 我怎样才能避免使用Transfer Encoding chunked 这是我的代码 response is HttpServ
  • Web 服务无法序列化接口

    我有一个像这样的界面 public interface IDocument ISerializable Boolean HasBeenUploaded get set void ISerializable GetObjectData Ser
  • RXTX 在 Windows 7 64 位下无法列出或找到端口

    早上好 我在 Windows 7 64 位上使用 或安装 rxtx 时遇到问题 我之前在 x86 win XP 系统上使用过它 没有出现任何问题 由于由于某种原因重新安装到这个新系统 rxtx 无法找到任何端口 我尝试过 rxtx 安装 C
  • C中整数的大小[重复]

    这个问题在这里已经有答案了 可能的重复 int 的大小是否取决于编译器和 或处理器 整数的大小是否取决于编译器 操作系统或处理器 如果我在 32 位操作系统或 64 位操作系统上使用 gcc 并在 32 位计算机或 64 位计算机上运行 在
  • 如何让 Emacs 启动更快?

    I use Emacsv 22 控制台版本 可以远程使用PuTTY或本地与Konsole 作为我在 Linux 上的主要文本编辑器 每次启动时都需要一段时间来加载 可能几乎一秒钟 尽管我从未计时 我经常打开和关闭 Emacs 因为我更喜欢使
  • 在 R 中的同一数据框中绑定具有相似列名的列

    我有一个看起来有点像这样的数据框 df lt data frame 0 2 1 3 2 4 5 7 6 8 2 4 0 2 1 3 2 4 colnames df lt rep c a b c 3 gt df a b c a b c a b
  • 从 Zip 文件中删除文件夹

    我正在尝试从 Zip 文件中删除文件夹 所以我的文件结构是这样的 首先内部 我尝试使用这里的代码从 Zip 中删除文件作者 Siddharth Rout 但它只移动文件 显然文件夹变空了 但并未从 Zip 中删除 Code Sub del
  • 以编程方式添加的捆绑产品未显示在前端

    我正在尝试从 PHP 脚本将捆绑产品插入 Magento 数据库 有问题的版本是社区 1 5 1 0 我尝试了问题中描述的方法 使用简单商品的 SKU ID 以编程方式在 Magento 中添加捆绑产品 插入的产品在管理部分中显示得很好 我
  • 是否应该在 Web 应用程序中禁用实体框架延迟加载?

    我听说您应该在 Web 应用程序中禁用 EF 的延迟加载功能 ASP NET Here and here 对于初学者 现在我在这里真的很困惑 因为我一直认为应该始终启用延迟加载 因为它可以防止从数据库获取不必要的数据 所以 现在我的问题是
  • 在 iOS Core Data 中存储高精度纬度/经度数字

    我正在尝试将纬度 经度存储在核心数据中 这些最终精度为 6 到 20 位 无论出于何种原因 我将它们作为核心数据中的浮点数 将它们四舍五入而不给我返回确切的值 我尝试了 十进制 类型 但也没有运气 NSStrings 是我唯一的选择吗 ED
  • grunt-contrib-jasmine 和 PhantomJS 安全性

    我收到错误 XMLHttpRequest 无法加载https my api domain com Access Control Allow Origin 不允许 Origin file 当我尝试运行一些通过 grunt contrib ja
  • 使用关联数组的 D3 日历视图

    我想创建一个日历视图 如下例所示 http bl ocks org 4063318 其实我正在尝试修改它 我有一个像这样的关联数组 AdminCourt 2012 10 02 2 2012 10 09 2 2012 10 16 1 Cons
  • Java 无法使用正则表达式 \s,显示:无效的转义序列

    我想用 替换字符串中的所有空白字符 用 ss 替换所有 它对于 效果很好 但不知何故 eclipse 不允许我使用 s 作为空白 我尝试了 t 但它也不起作用 我收到以下错误 无效的转义序列 有效的转义序列 是 b t n f r 这是我的
  • 将元素值反序列化为字符串,尽管它包含混合内容

    假设这样的 XML
  • 将数据框日期列的 dd-mm-yyyy 日期格式更改为 yyyy-mm-dd [重复]

    这个问题在这里已经有答案了 我有这个熊猫数据框df Name Date Score Score2 Joe 26 12 2007 53 45 53 4500 Joe 27 12 2007 52 38 52 7399 Joe 28 12 200
  • 单元测试控制台 C# 应用程序的最佳方法

    我有一个简单的控制台应用程序 它是用一个普通的 main 来触发的 整个程序都在 main 中 它使用命令行解析器库 然后 我在解决方案中有第二个项目 其中包含应用程序的单元测试 但我似乎没有找到从测试中启动主程序进程的好方法 我当前实际启
  • 如何获取UIImage的dpi/ppi?

    iOS 中如何获取图像的 dpi ppi 也许原始图像文件包含这些信息 所以我可以从 NSData 获取 ppi dpi 谢谢 要从 NSData 中存储的图像中提取 DPI 请在项目中包含 Apple 的 ImageIO 框架并使用以下命
  • ASP.NET MVC3:通过控制器加载图像

    我尝试使用来自的答案here 但没有成功 我有以下代码 public ActionResult ShowImage using FileStream stream new FileStream Path Combine Server Map
  • 在 Jupyter Notebook 中使用 Tkinter

    我刚刚开始使用 Tkinter 并尝试在 python 中创建一个简单的弹出框 我从网站上复制粘贴了一段简单的代码 from Tkinter import master Tk Label master text First Name gri
  • 如何在kafka中创建新的消费者组

    我按照快速入门指南上的说明在本地运行 kafkahere 然后我在中定义了我的消费者组配置config consumer properties这样我的消费者就可以从定义的中选择消息group id 运行以下命令 bin kafka cons