具有动态数量的并行消费者的 Kafka 工作队列

2024-03-12

我想用Kafka来“分工”。我想将工作实例发布到某个主题,并运行由相同使用者组成的云来处理它们。当每个消费者完成其工作时,它将从该主题中提取下一个工作。每项工作只能由一个消费者处理一次。处理工作非常昂贵,因此我需要在许多机器上运行许多消费者才能跟上。我希望消费者的数量根据需要增加和减少(我计划为此使用 Kubernetes)。

我发现了一种为每个消费者创建唯一分区的模式。这就“分工了”,但是分区的数量是在创建主题时设置的。此外,必须在命令行上创建主题,例如

bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 3 --topic divide-topic --create --replication-factor 1

...

for n in range(0,3):
    consumer = KafkaConsumer(
                     bootstrap_servers=['localhost:9092'])
    partition = TopicPartition('divide-topic',n)
    consumer.assign([partition])
    ...

我可以为每个消费者创建一个独特的主题,并编写自己的代码来将工作分配给这些主题。这看起来很恶心,而且我仍然必须通过命令行创建主题。

具有动态数量的并行消费者的工作队列是一种常见的体系结构。我不可能是第一个需要这个的人。使用 Kafka 的正确方法是什么?


您发现的模式是准确的。请注意,也可以使用以下命令创建主题卡夫卡管理 API http://kafka.apache.org/11/javadoc/org/apache/kafka/clients/admin/AdminClient.html#createTopics-java.util.Collection- and 还可以添加分区 http://kafka.apache.org/documentation/#basic_ops_modify_topic创建主题后(有一些陷阱)。

在 Kafka 中,划分工作和允许扩展的方法是使用分区 http://kafka.apache.org/documentation/#kafka_mq。这是因为在消费者组中,每个分区在任何时候都被单个消费者消费。

例如,您可以有一个具有 50 个分区的主题和一个订阅该主题的消费者组:

  • 当吞吐量较低时,组中只能有少数消费者,他们应该能够处理流量。

  • 当吞吐量增加时,您可以添加使用者(最多可达分区数量(本例中为 50))来承担部分工作。

在这种情况下,50 个消费者是扩展的极限。消费者公开了许多指标(例如延迟),让您可以随时决定是否有足够的指标

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

具有动态数量的并行消费者的 Kafka 工作队列 的相关文章

  • GKE 集群无法从同一项目(GitLab Kubernetes 集成)中的 GCR 注册表中提取 (ErrImagePull):为什么?

    因此 在谷歌搜索了一下之后 被那些在 Pull Secrets 方面遇到麻烦的人污染了 我将其发布在这里 以及 GCP 支持 据我所知将更新 我通过 GitLab Kubernetes 集成创建了一个集群 文档 https about gi
  • 不同命名空间中 k8s 的入口配置

    我需要在 azure k8s 上配置 Ingress Nginx 我的问题是是否可以在一个命名空间等中配置 ingress ingress nginx 和其他命名空间中的一些服务 例如 资源 我的文件看起来像这样 ingress nginx
  • Kafka 中的“__consumer_offsets”主题是什么

    当我运行此命令时 我得到 2 个主题 我知道我创建了测试主题 但我看到了一个名为 consumer offsets 的附加主题 从名称上看 它与消费者抵消有关 但它是如何使用的呢 bin kafka topics sh list zooke
  • 异步运行 PHP 任务

    我正在开发一个较大的 Web 应用程序 后端主要是 PHP 代码中有几个地方我需要完成某些任务 但我不想让用户等待结果 例如 当创建一个新帐户时 我需要向他们发送一封欢迎电子邮件 但是 当他们点击 完成注册 按钮时 我不想让他们等到电子邮件
  • Istio 的 `DestinationRule` 与 Kubernetes `Service` 之间的区别?

    我刚刚阅读了 istio 1 0 0 的文档 特别是它的概念 我想了解一件事 尤其是DestinationRule https istio io docs concepts traffic management rule configura
  • kafka启动失败(版本0.8.0 beta1)

    我正在尝试在独立模式 在ec2上 上使用zookeeper版本 3 3 6 启动kafka服务 所以我运行 1 sbt update 2 sbt package 3 sbt assembly package dependency 然后启动z
  • RabbitMQ 上的 Nack 和拒绝

    我想处理消费者从队列中获取的不成功的消息并将它们重新排队 想象一下我有这样的情况 P gt foo bar baz gt C 其中 foo bar 和 baz 是消息 如果消费者读到baz但出了问题 我可以使用basic reject or
  • 用于 SQL 更新语句的 Java 单工作线程

    我正在开发一个基于 Java 的服务器 其中有多个线程 每个连接的用户一个线程 一些额外的线程 会涉及到一些数据库连接 所以我在想服务器每次创建一个SELECT查询数据库时 它将为此启动一个新线程 以防止当前线程阻塞 我计划为此使用连接池
  • 从 pod 连接到其他 pod

    基本上 我有一个部署 它创建了 3 个自动扩展的容器 PHP FPM NGINX 和包含应用程序的容器 所有这些都设置了机密 服务和入口 该应用程序还在 PHP FPM 和 NGINX 之间共享项目 因此一切都已设置完毕 由于我想使用 K8
  • Kafka的消息键有什么特别的地方吗?

    我没有看到任何提及消息键 org apache kafka clients producer ProducerRecord key 除了它们可以用于主题分区 我可以自由地将我喜欢的任何数据放入密钥中 还是有一些我应该遵守的特殊语义 该密钥似
  • Kafka Streams - 减少大型状态存储的内存占用

    我有一个拓扑 见下文 可以读取一个非常大的主题 每天超过十亿条消息 这个 Kafka Streams 应用程序的内存使用量相当高 我正在寻找一些关于如何减少状态存储占用空间的建议 更多详细信息如下 Note 我并不是想逃避国有商店 我只是认
  • 使用java工具的类似Sidekiq的队列?

    我想要一个工作队列 其行为几乎与 ruby 的 sidekiq 完全相同 它不need使用 Redis 但它可以 我只是不能使用 ruby 甚至不能使用 Jruby 基本上 我希望能够创建使用某些参数运行的作业 并且工作池执行作业 工作人员
  • 使用一个或多个标准 FIFO 队列实现延迟队列 [关闭]

    很难说出这里问的是什么 这个问题是含糊的 模糊的 不完整的 过于宽泛的或修辞性的 无法以目前的形式得到合理的回答 如需帮助澄清此问题以便重新打开 访问帮助中心 help reopen questions 延迟队列是一种队列 其中每条消息都有
  • Kubernetes Service 没有活动 Endpoint

    我创建了一个Deployment Service and an Ingress 不幸的是 ingress nginx controllerpod 抱怨我的Service没有活动端点 controller go 920 Service
  • Docker nuget连接超时

    尝试利用官方jetbrains teamcity agentKubernetes 上的图像 我已经设法在 Docker 中运行 Docker 但尝试使用以下命令构建 ASP NET Core 映像docker build命令失败于dotne
  • 具有替代方案的重载方法值表

    我有编译器抱怨的以下代码 val state KTable String String builder table BARY PATH Materialized as PATH STORE 错误信息 error home developer
  • Julia:如何让多个工作人员访问模块中的函数?

    我有以下测试模块 MyMod jl 来在 Julia 中存储一些测试函数 一些核心函数是串行编写的 其他函数并行调用核心函数 module MyMod export Dummy distribute data getfrom recombi
  • Kubernetes:无法创建存储库

    我正在跟进Kubernetes 入门指南 http kubernetes io docs hellonode 一切都很顺利 直到我跑步 gcloud docker push gcr io
  • 由于 jaas.conf 不正确而导致 Kafka TopicAuthorizationException

    我指的是JAAS登录配置文件 https docs oracle com javase 7 docs technotes guides security jgss tutorials LoginConfigFile html 它讨论了两种指
  • 使用 zeppelin 在 kubernetes 上 Spark

    我按照本指南在使用 minikube 设置的本地 kubernetes 集群中运行 zeppelin 容器 https zeppelin apache org docs 0 9 0 SNAPSHOT quickstart kubernete

随机推荐

  • 如何设置jquery按钮的活动状态

    我有一个使用 jquery 按钮的正确导航 如果用户点击它 页面就会加载 我需要的是 一旦页面重新加载 按钮应该显示它已被选中 如何使用 jquery 按钮执行此操作 为什么不使用 jQuery UI Tab 小部件 您也可以尝试使用按钮集
  • DataGridView:仅当滚动到底部时自动向下滚动

    我有一个程序 它使用 dataGridView 来显示通过向 dataGridView 添加行来每秒自动更新的数据 当我想在开头阅读一些内容时 我会向上滚动 即使数据更新 滚动条也不会向下滚动 这很好 但我希望滚动条仅在位于 dataGri
  • 如何在 LibreOffice 中运行 python 宏?

    当我去工具 gt 宏 gt 组织宏 gt Python我得到这个对话框 It is 不可能创建新的 Python 宏 显然 LibreOffice 有没有Python编辑器所以我必须在其他地方编写宏 然后执行它们 但是我不知道where放置
  • 我不断收到此错误:“阅读器关闭时调用 Read 的尝试无效”

    这是我的代码 我关闭并打开阅读器 但它仍然无法工作 几个线程可以同时访问这个函数 但是有一个锁 它一开始会工作几次 但迟早我会收到异常 阅读器关闭时调用 Read 的尝试无效 private IList
  • 在 El Capitan 10.11.6 上安装 Tensorflow 1.10

    我试图在我的旧Mac上安装tensorflow 1 10 但每次都会遇到同样的问题 一旦我启动 python shell 我就会收到以下错误 我确实尝试先将其安装在 virtualenv 中 之后 我尝试仅使用 pip 安装它并得到相同的错
  • Tempus Dominus Bootstrap4 需要 moment.js。 (日期时间选择器)

    我正在尝试使用 Tempus Dominus Bootstrap4 添加 DateTimePicker 但我收到以下错误 I use Laravel as a front end So I use laravel mix 我的刀片文件在下面
  • .htaccess 重写规则中的模式干扰

    在 htaccess 中定义重写规则时 我遇到了模式干扰问题 我试图重写的网站链接是 example com item work gt example com work example com item work tile x gt exa
  • 使用 ActiveAdmin 进行多步骤表单?

    是否可以使用 ActiveAdmin 创建多步骤表单 如果没有 是否可以在提交表单后添加另一个重定向到的页面 不是默认索引 显示或表单页面 我自己也一直在苦恼这个问题 我发现您可以使用 ActiveAdmin 文件中的集合操作添加自己的页面
  • Gwt 2.8-rc1 超级开发模式不适用于 IntelliJ IDEA 2016.2.5

    我的项目在超级开发模式下使用 GWT 2 7 和 2 8 beta1 正确启动 不幸的是 自 2 8 rc1 发布以来 它没有启动 看起来 GWT 项目依赖项配置不正确 IDE IntelliJ IDEA 2016 2 5 C Progra
  • 如何在 karma angularjs 中对 setInterval 进行单元测试

    app directive shuffleBlocks function timeout return link function sco ele att if itemCnt lt 1 return Trigger function fu
  • UITableViewCell 布局在重复使用单元格之前不会更新

    我有一个 UITableView 其中填充了自动调整大小的单元格 UITableView设置相当简单 tableView estimatedRowHeight 70 tableView rowHeight UITableViewAutoma
  • 将 angularjs 值传递给 PHP 变量

    我从 AngularJS 和 ngStorage 开始 我可以成功保存并显示数据 我像以前一样显示值 myobj session 我想将任何存储的值传递到 php 变量中 下面显示的是我的想象逻辑 我知道那是行不通的 我的问题是如何以正确的
  • 如何让 jest 使用 ES6 依赖项

    所以我有一个依赖包 我将其拉入我的 node modules 文件夹中 这个包中有一个像这样的导出 Object
  • PHP sleep() 导致 CPU 使用率高

    我正在运行一个大部分时间处于睡眠状态的 CLI 脚本 每隔 10 秒左右 脚本就会执行一些操作 问题是 脚本在睡眠时 CPU 使用率为 94 我设置的方法是 while 1 sleep 10 doStuff 虽然这按预期工作 但存在一个明显
  • 字符+字符=整数?为什么?

    为什么要加两个char在 C 中结果为int type 例如 当我这样做时 var pr R G B Y P the pr变量变成int类型 我希望它是一个string类型值为 RGBYP 为什么C 要这样设计呢 默认实现不是添加两个cha
  • 从Excel VBA的下拉列表中选择特定项目

    我正在为我的办公室设计一个仪表板 这一切都有效 但我想添加一个选项 而不是在下拉列表中搜索 250 多个项目 您还可以单击一个单元格 下拉列表将更改为该值 并且分配的宏将为该下拉列表运行 到目前为止 我不知道如何让 vba 从下拉列表中选择
  • 具有列表视图的 Android 小部件正在刷新具有丑陋的短“闪烁”效果的项目

    我想制作一个带有 ListView 的小部件 您可以在其中添加 listItems 来显示计数器计时器 以查看您有多少时间来处理某个事件 这是我第一次使用小部件 我不知道我的方法是好还是坏 到目前为止我得到了这个 AppWidgetProv
  • Javascript - 所有嵌套的 forEach 循环完成后的回调

    我确信这是一个相当简单的任务 但我现在无法全神贯注 我有一组嵌套的 forEach 循环 当所有循环运行完毕时 我需要一个回调 我愿意使用 async js 这就是我正在处理的 const scanFiles function accoun
  • 如何使用 BiWeekly 库和 Java Mail API 创建现有事件并发送更新?

    我在用着BiWeekly http sourceforge net projects biweekly 库来创建 VEVENT 然后使用以下命令发送它Java 邮件 API https java net projects javamail
  • 具有动态数量的并行消费者的 Kafka 工作队列

    我想用Kafka来 分工 我想将工作实例发布到某个主题 并运行由相同使用者组成的云来处理它们 当每个消费者完成其工作时 它将从该主题中提取下一个工作 每项工作只能由一个消费者处理一次 处理工作非常昂贵 因此我需要在许多机器上运行许多消费者才