如何强制消费者读取kafka中的特定分区

2024-04-19

我有一个应用程序,用于从 1 个 Kafka 生产者生成的 URL 流中下载特定的 Web 内容。我创建了一个有 5 个分区的主题,有 5 个 kafka 消费者。但网页下载的超时时间为 60 秒。 当下载其中一个 URL 时,服务器会假设消息丢失并将数据重新发送给不同的消费者。

我已经尝试了中提到的所有内容

Kafka消费者配置/性能问题 https://stackoverflow.com/questions/39888281/kafka-consumer-configuration-performance-issues

and

https://github.com/spring-projects/spring-kafka/issues/202 https://github.com/spring-projects/spring-kafka/issues/202

但我每次都会遇到不同的错误。

是否可以将特定消费者与kafka中的分区绑定? 我在我的应用程序中使用 kafka-python


我错过了 Kafka-python 的文档。我们可以使用 TopicPartition 类为特定消费者分配一个分区。

http://kafka-python.readthedocs.io/en/master/ http://kafka-python.readthedocs.io/en/master/

>>> # manually assign the partition list for the consumer
>>> from kafka import TopicPartition
>>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
>>> consumer.assign([TopicPartition('foobar', 2)])
>>> msg = next(consumer)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何强制消费者读取kafka中的特定分区 的相关文章

  • Kafka结构化流KafkaSourceProvider无法实例化

    我正在开发一个流项目 其中有一个 ping 统计数据的 kafka 流 如下所示 64 bytes from vas fractalanalytics com 192 168 30 26 icmp seq 1 ttl 62 time 0 9
  • 如何在KafkaStream应用程序中获取partitionId和TopicName

    我们如何从 KafkaStream 获取主题名称和分区 id 对于任何其他 Kafka 消费者 我们可以获得主题名称和分区 ID 如下所示 ConsumerRecords
  • 如何在java程序中获取kafka消耗滞后

    我写了一个java程序来消费来自kafka的消息 我想监控消费延迟 如何通过java获取它 顺便说一句 我用
  • python 脚本在 docker 内运行时无法导入 kafka 库 [重复]

    这个问题在这里已经有答案了 我有以下 python 脚本 可以从 twitter 中提取推文并将其发送到 kafka 主题 该脚本运行完美 但是当我尝试在 docker 容器内运行它时 它无法导入 kafka 库 它说 语法错误 语法无效
  • 通过API服务端点消费来自Kafka主题的消息

    目前 我有一个 API 服务端点 用 netcore6 C 编写 它将接受消息对象并将其保存到数据库 然后将该消息发布到 kafka topic 2 如何利用我的 API 服务端点始终监听 观看并连接到 kafka topic 1 一旦新消
  • 处理 Kafka Broker 宕机时的故障

    我有一个 Kafka 代理正在运行 消息已成功消费 但我想处理 Kafka 代理在 Kafka 消费者端出现故障的情况 我读过了this https github com spring projects spring kafka issue
  • 无法对 @KafkaListener 带注释的方法进行单元测试

    我正在尝试在 Spring 中对 kafka 消费者类进行单元测试 我想知道如果 kafka 消息发送到它的主题 则侦听器方法被正确调用 我的消费者类注释如下 KafkaListener topics kafka topics myTopi
  • Kafka 一遍又一遍地重放消息 - 心跳会话已过期 - 标记协调器已死亡

    使用 python kafka api 从只有少量消息的主题中读取消息 Kafka 不断地一遍又一遍地重放队列中的消息 它从我的主题接收一条消息 返回每条消息内容 然后抛出ERROR Heartbeat session expired ma
  • Kafka中如何使用事务以及如何使用abortTransaction?

    我是 kafka 新手 我使用 Kafka Producer Java api 面对Kafka的这个问题 Kafka Invalid transition attempted from state COMMITTING TRANSACTIO
  • 无法向 kafka 主题发送消息

    我正在使用 Kafka Play 以及 Scala 这是我的代码 我想在其中发送消息到kafka服务器 主题名称是 测试主题 尽管我没有在主题中看到我发送的消息 但我没有收到任何错误 这里有什么问题吗 import kafka produc
  • 我们如何读取给定时间范围内的Kafka主题?

    我需要读取 Kafka 主题中给定时间范围内的消息 我能想到的解决方案是首先找出时间范围开始的最大偏移量 然后继续消费消息 直到所有分区上的偏移量超过时间范围的末尾 有没有更好的方法来解决这个问题 谢谢 好吧 您肯定必须首先搜索适合时间范围
  • Apache Kafka 与 Apache Storm

    Apache Kafka 分布式消息系统Apache Storm 实时消息处理 我们如何在实时数据管道中使用这两种技术来处理事件数据 就实时数据管道而言 在我看来 两者的工作都是相同的 我们如何在数据管道上使用这两种技术 您可以使用 Apa
  • Kafka 适合运行公共 API 吗?

    我有一个想要发布的事件流 它被划分为主题 不断更新 需要水平扩展 并且没有 SPOF 很好 并且可能需要在某些情况下重播旧事件 所有的功能似乎都与 Kafka 的功能相匹配 我想通过任何人都可以连接并获取事件的公共 API 将其发布到全世界
  • 事务性 Kafka 生产者

    我正在尝试让我的卡夫卡生产者具有事务性 我正在发送 10 条消息 如果发生任何错误 则不应向 kafka 发送任何消息 即不发送或全部消息 我正在使用 Spring Boot KafkaTemplate Configuration Enab
  • Kafka 主题删除不起作用

    我使用的是 Kafka 0 8 2 版本 在开发过程中 我想我可能需要删除一个主题 所以我所做的是将以下行放入服务器配置文件中并启动两个 kafka 服务器 delete topic enable true 当我需要删除一个主题并运行以下命
  • Kafka Streams 内部数据管理

    在我的公司 我们广泛使用 Kafka 但出于容错的原因 我们一直使用关系数据库来存储多个中间转换和聚合的结果 现在我们正在探索 Kafka Streams 作为一种更自然的方式来做到这一点 通常 我们的需求非常简单 其中一个例子是 监听输入
  • Kafka:隔离级别的影响

    我有一个用例 我需要 Kafka 分区中的 100 可靠性 幂等性 无重复消息 以及顺序保留 我正在尝试使用事务 API 来建立概念验证来实现这一目标 有一个名为 isolation level 的设置 我很难理解 In this arti
  • Kafka Streams 如何处理包含不完整数据的分区?

    Kafka Streams 引擎将一个分区映射到一个工作线程 即 Java 应用程序 以便该分区中的所有消息都由该工作线程处理 我有以下场景 并试图了解它是否仍然可行 我有一个主题 A 有 3 个分区 发送给它的消息由 Kafka 随机分区
  • Kafka Streams - 跳跃窗口 - 去重键

    我正在 4 小时窗口上进行跳跃窗口聚合 每 5 分钟前进一次 由于跳跃窗口重叠 我得到了具有不同聚合值的重复键 TimeWindows of 240 60 1000L advanceBy 5 60 1000L 如何消除具有重复数据的重复键或
  • kafka 连接 s3 源无法与 Minio 一起使用

    我已经验证了与 minio 的连接 确保凭据工作正常并且可以访问 minio 另外 如果我尝试任何其他值store url http minio 9000我无法保存配置 所以我猜想在可见性方面不存在问题卡夫卡连接容器和minio容器 我不确

随机推荐

  • 仅当 S 和 T 不同时,Func 的 T 才从 lambda 表达式的输出推断出来?

    When S and T是不同的 这有效 public static void Fun
  • SugarORM 从多个表查询?

    我正在为我的 Android 应用程序使用 SugarORM 在我的项目中 我有几个表 我想知道是否有一种方法可以将它们连接到另一个具有多个表中的列的类对象中 如果是 那么示例将非常有帮助 SugarORM 提供了一个用于简单查询的查询生成
  • 在 beforeRemote 远程挂钩内添加过滤器

    我有一个问题 在 Loopback 的文档中找不到答案 说我有一个模型Company和一个模型Employee 之间存在 1Xn 关系Company和它的Employees When api Employees被调用时 服务器返回所有员工
  • Windows 服务应该在哪里写入应用程序数据?

    我们有一个 Windows 服务作为我们软件的一部分 即使没有用户登录系统 它也可以在后台将数据传输到远程数据库 目前 这些数据缓存在 Program Files 目录中 我认为这不是一个好主意 因为它要求我们减少 Program File
  • 编程语言中的协变和逆变有什么区别? [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 谁能解释一下协变和逆变的概念 编程语言理论 协方差非常简单 最好从某个集合类的角度来思考List 我们可以参数化 the List具有
  • Xamarin 表单按钮没有边框问题

    我尝试在视图中呈现可点击项目的列表 我想添加一个带有图像和白色边框的按钮 第一个 我发现 StackLayout ViewCell 中的按钮无法渲染边框
  • 如何同时使用 2 个范围滑块?

    我想使用 2 个范围滑块同时根据年龄和身高过滤表中的数据 我已经使用以下方法实现了 2 个范围滑块 年龄和身高 d3 slider js https github com MasterMaps d3 slider and a dc data
  • 身份验证超时无法正常工作

    我有一个 ASP NET 4 网站 我在 web Config 中将身份验证超时设置为 100 分钟 但是当用户使用网站时 网站甚至在 3 分钟后突然提示登录 下面的代码是我的 web Config 文件
  • 寻找成熟的 M-Tree 实现 [关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我正在寻找一个成熟的 java M Tree 实现 甚至任何 M Tree 实现 除了我找到的唯一实现 http en wikipedia
  • Mockito.anyString() 在 Kotlin 中因 NPE 崩溃

    我正在使用浓缩咖啡 我想为内容解析器创建一个模拟响应 当我使用时 when context activity contentResolver query ArgumentMatchers isA Uri class java Argumen
  • famo.us:如何处理 textbox.onchange 事件

    我在famo us 大学上没有看到任何带有文本输入的教程 如何将文本框表面添加到我的应用程序并处理onchange events 有点难以理解你想要做什么 但是让我们从你的第一个问题开始 目前没有 onchange 处理程序选项 因此如果您
  • 为什么 JavaScript 中的数字是不可变的?

    我在这里阅读了问题和答案 javascript 数字 不可变 https stackoverflow com questions 8248568 javascript numbers immutable 但我还不清楚为什么数字 原始类型 是
  • 如何处理重复事件中的 DST 和 TZ?

    dateutil rrule 是否支持夏令时和夏令时 需要类似于 iCalendar RRULE 的东西 如果不是 如何解决这个问题 安排重复事件和 DST 偏移量更改 Imports gt gt gt from django utils
  • 编组无法从 Go 访问的 C 对象

    有一些 C 对象 例如联合体 包含位域的结构体以及其对齐方式与 Go 的 ABI 不同的结构体 无法从 Go 访问 其中一些结构无法更改为可从 Go 代码访问 因为它们是现有库的 API 的一部分 因此 要将这些对象编组到 Go 结构中 我
  • 测试自定义 AuthorizationAttribute 时抛出 NullReferenceException

    我看了一下 如何进行单元测试来测试检查请求标头的方法 https stackoverflow com questions 9263457 how do i make a unit test to test a method that che
  • Docker 卷挂载不存在

    我在 OS X 上运行 Docker 1 11 并试图找出本地卷的写入位置 我通过运行创建了一个 Docker 卷docker volume create name mysql 然后我跑了docker volume inspect mysq
  • Facebook 政策:我的应用程序可以自动将故事发布到我的 Facebook 流吗?

    根据Facebook 平台政策 http developers facebook com policy 您不得预先填写 user message 通过a发送的参数或内容 扩展权限 例如状态 更新或注释 除非用户 之前生成的内容 工作流程 这
  • 仅使用 VBA 宏将可见行的值从一个工作簿复制到新工作簿中

    我有一些宏可以将工作表 2 从现有工作簿复制到新工作簿 此代码按其应有的方式工作 只是存在不应在新工作簿上显示的隐藏行 这是我编写的代码 用于复制工作表并仅粘贴其值 Dim Output As Workbook Dim FileName A
  • PostgreSQL regexp_replace 与匹配的表达式

    我正在使用 PostgreSQLregexp replace函数来转义字符串中的方括号 括号和反斜杠 以便我可以将该字符串用作正则表达式模式本身 在使用该字符串之前还对该字符串进行了其他操作 但它们超出了本问题的范围 想法是替换 with
  • 如何强制消费者读取kafka中的特定分区

    我有一个应用程序 用于从 1 个 Kafka 生产者生成的 URL 流中下载特定的 Web 内容 我创建了一个有 5 个分区的主题 有 5 个 kafka 消费者 但网页下载的超时时间为 60 秒 当下载其中一个 URL 时 服务器会假设消