如何使用 haproxy 负载均衡器 Kafka Bootstrap?

2024-04-28

我有一个 kafka 集群,由 3 台在 AWS 上运行的机器组成: 卡夫卡1到卡夫卡3

我正在使用新型卡夫卡消费者(>0.8)。

我知道kafka客户端连接到其中一台kafka服务器,获取服务器元数据,然后直接连接到代理。

我想确保在代理发生故障的情况下,客户端仍然能够获取元数据。

为此,我有一个具有以下配置的 HAProxy 负载均衡器:

listen kafka
      bind *:9092
      mode tcp
      balance roundrobin
      no option clitcpka
      option forceclose
      timeout check 5s
      server kafka1 kafka1.example.com:9092 check inter 3s fastinter 1s
      server kafka2 kafka2.example.com:9092 check inter 3s fastinter 1s
      server kafka3 kafka3.example.com:9092 check inter 3s fastinter 1s

这个想法是,如果其中一个经纪人出现故障,它将被从轮换中删除,并且从其他经纪人之一获取元数据。它还允许我透明地向集群添加更多代理。

然而,这给我的卡夫卡客户带来了问题。 PipelineDB 直接拒绝从主题消费,Python 的融合 kafka 库也是如此。 Kafkacat 消费,但一段时间后开始出现错误:

% ERROR: Local: Broker transport failure: kafka.example.com:9092/bootstrap: Receive failed: Disconnected
% ERROR: Local: Broker transport failure: kafka.example.com:9092/bootstrap: Connection closed

我在网上找不到任何有关如何负载平衡 Kafka 引导程序的信息。另一种方法是仅配置具有多个 A 记录的 DNS 条目,但我遇到了其中一个代理关闭的问题。


只需将所有三个引导服务器指定为逗号分隔列表即可。然后,如果一个发生故障,它只会查询下一个的元数据。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用 haproxy 负载均衡器 Kafka Bootstrap? 的相关文章

  • 卡夫卡高级消费者 error_code=15

    当尝试使用高级消费者 使用全新的消费者组 从 Kafka 进行消费时 消费者永远不会开始运行 当我将日志记录级别切换为调试时 我可以看到以下两行一遍又一遍地重复 DEBUG AbstractCoordinator 09 43 51 192
  • 卡夫卡幂等生产者

    卡夫卡文档说 幂等生产者可以使用相同的生产者会话 但我无法理解这一点 比如说 Kafka 为每条消息添加序列号 最后一个序列号保存在 Kafka 中 不确定它在哪里维护 它如何生成序列号以及它保存在哪里 为什么当生产者崩溃并再次出现时它无法
  • 尝试升级到 flink 1.3.1 时出现异常

    我尝试将集群中的 flink 版本升级到 1 3 1 以及 1 3 2 但我的任务管理器中出现以下异常 2018 02 28 12 57 27 120 ERROR org apache flink streaming runtime tas
  • 如何强制消费者读取kafka中的特定分区

    我有一个应用程序 用于从 1 个 Kafka 生产者生成的 URL 流中下载特定的 Web 内容 我创建了一个有 5 个分区的主题 有 5 个 kafka 消费者 但网页下载的超时时间为 60 秒 当下载其中一个 URL 时 服务器会假设消
  • Kafka 连接教程停止工作

    我在此链接中执行了步骤 7 使用 Kafka Connect 导入 导出数据 http kafka apache org documentation html quickstart http kafka apache org documen
  • 无法将 HAProxy 实现为 HTTPS 的转发代理

    我正在尝试使用 HAProxy 作为转发代理 它适用于 HTTP 但不适用于 HTTPS 下面是我的 HTTP HAProxy 配置 listen forward http proxy bind 80 http request do res
  • 批量插入成功后更新 Kafka 提交偏移量

    我有一个 spring kafka 消费者 它读取记录并将其移交给缓存 计划任务会定期清除缓存中的记录 我想仅在批次成功保存到数据库后更新 COMMIT OFFSET 我尝试将确认对象传递给缓存服务以调用确认方法 如下所示 public c
  • 通过 Kafka 消费者重试维持订单保证

    我正在为基于 Kafka 的数据处理管道中的消费者重试设计一个架构 我们正在使用 Kafka 生产者和消费者 并且正在考虑重试主题 如果消费出错 将在这些主题上发送消息 将会有消费者以一定的节奏运行这些重试主题 我读了很多参考架构 但没有一
  • 无法对 @KafkaListener 带注释的方法进行单元测试

    我正在尝试在 Spring 中对 kafka 消费者类进行单元测试 我想知道如果 kafka 消息发送到它的主题 则侦听器方法被正确调用 我的消费者类注释如下 KafkaListener topics kafka topics myTopi
  • Kafka的消息键有什么特别的地方吗?

    我没有看到任何提及消息键 org apache kafka clients producer ProducerRecord key 除了它们可以用于主题分区 我可以自由地将我喜欢的任何数据放入密钥中 还是有一些我应该遵守的特殊语义 该密钥似
  • WebSocket 和负载平衡是瓶颈吗?

    当有一堆充当 WebSocket 无人机的系统和这些无人机前面的负载均衡器时 当 WebSocket 请求进入 LB 时 它会选择一个 WebSocket 无人机 并建立 WebSocket 我在 ELB 上使用 AWS ELB tcp S
  • 在SSL模式下使用apache kafka

    我正在尝试在 SSL 1 way 模式下设置 kafka 我已经阅读了官方文档并成功生成了证书 我将记下两种不同情况的行为 此设置只有一名经纪人和一名动物园管理员 案例 1 经纪人间通信 明文 我的相关条目server properties
  • Kafka 0.8.2 中是否可以向现有主题添加分区

    我有一个Kafka https kafka apache org 集群运行有 2 个分区 我一直在寻找一种将分区计数增加到 3 的方法 但是 我不想丢失有关该主题的现有消息 我尝试停下来Kafka https kafka apache or
  • 切换到负载均衡器后,django 帖子收到 CSRF 验证失败

    我有一个有效的登录模板 可以发布帖子 如下所示
  • Apache Kafka 消费者组的偏移量如何过期?

    当我注意到一些奇怪的行为时 我正在对一个旧主题进行一些测试 阅读 Kafka 的日志时 我注意到这条 删除了 8 个过期的偏移量 消息 GroupCoordinator 1001 Stabilized group GROUP NAME ge
  • Kafka REST 代理 API 有哪些好处?

    我不知道Kafka REST Proxy API的优点 它是一个 REST API 所以我知道它对于管理来说很方便 人们为什么使用 Kafka REST 代理 API 添加对生产者或消费者的 Maven 依赖是否很麻烦 另外 我知道kafk
  • 是否可以使用Kafka传输文件?

    我每天都会生成数千个文件 我想使用 Kafka 进行流式传输 当我尝试读取该文件时 每一行都被视为一条单独的消息 我想知道如何将每个文件的内容作为 Kafka 主题中的单个消息 以及消费者如何将 Kafka 主题中的每条消息写入单独的文件中
  • kafka Avro 多个主题的消息反序列化器

    我正在尝试以 avro 格式反序列化 kafka 消息 我使用以下代码 https github com ivangfr springboot kafka debezium ksql blob master kafka research c
  • 使用Spring Cloud Stream Kafka动态更改instanceindex

    如同 在运行时更改 spring cloud stream 实例索引 计数 https stackoverflow com questions 37579939 changing spring cloud stream instance i
  • Kafka:隔离级别的影响

    我有一个用例 我需要 Kafka 分区中的 100 可靠性 幂等性 无重复消息 以及顺序保留 我正在尝试使用事务 API 来建立概念验证来实现这一目标 有一个名为 isolation level 的设置 我很难理解 In this arti

随机推荐

  • Android Oreo 上的操作系统阻止了地理围栏转换 PendingIntent

    这只发生在 Android Oreo 上 我正在使用 Play 服务 11 4 2 我正在使用 GeofencingClient 和 addGeofences 方法将地理围栏注册到正在处理地理围栏转换的 IntentService 中 并将
  • 在 Javascript 中获取类的所有实例

    我以为这个问题已经有了答案 但我似乎找不到答案 如何在 Javascript 中的此类的所有实例上运行特定的类方法 这必须在我不知道实例名称的情况下完成 我想我可以在类中使用某种静态变量来存储所有实例 但这在 JS 中似乎不存在 那么如何在
  • 在 Angular dart 上设置全局 Http 请求标头

    如何配置 Http 服务 向调用添加标头 我尝试以下方法 class GlobalHttpHeaders static setup Injector inj HttpDefaultHeaders http inj get HttpDefau
  • 如何在表单数组中添加无效的表单控件而不影响其功能

    我想创建一个动态表单 将表单控件 必需的表单控件 添加到表单数组中 表单控件无效 因为它需要由用户填写 为空 但是当我添加表单控件时 出现错误 ExpressionChangedAfterItHasBeenCheckedError 表达式在
  • 如何使用 Moq 返回数据或值列表?

    谁能告诉我如何使用 Moq 框架使用模拟对象返回数据列表并将返回的数据列表分配给另一个 List 变量 public class SomeClass public virtual List
  • 如何在 Python 3.2 程序中优雅地包含 Python 3.3 from None 异常语法?

    我正在尝试重新引发异常 以便为用户提供有关实际错误的更好信息 Python 3 3 包括PEP 409 http www python org dev peps pep 0409 它添加了raise NewException from No
  • 获取 Bash 和 KornShell (ksh) 中命令的退出代码

    我想写这样的代码 command some command safeRunCommand command safeRunCommand cmnd 1 cmnd if 0 then printf Error when executing co
  • 如何使用相机谷歌地图 xcode 移动标记(图钉)

    我在我的应用程序中使用谷歌地图 API 我的应用程序中有两个按钮 第一个按钮在我的地图中添加一个标记 图钉 现在我想要第二个按钮将添加的图钉水平移动到页面中心 并使其移动到页面顶部的 25 我希望相机 用户正在查看的区域 也移动它 这是我的
  • 使用 python 从 XSD 文件创建特定的 XML 文件

    我有一个现有的 xsd 架构 并且需要创建 希望使用 Python 带有一些特定输入的 XML 文件 最好的方法是什么 我尝试了 Element Tree 和 xmlschema 但我无法判断它们是否允许从已知的 XSD 架构开始生成 XM
  • 您应该通过属性访问同一类中的变量吗?

    如果您有一个获取和设置实例变量的属性 那么通常您总是使用该类外部的属性来访问它 我的问题是你也应该在课堂上这样做吗 如果有的话 我总是使用该属性 即使是在班级内 但我想听到一些支持和反对的论据 以确定哪个是最正确的以及为什么 或者这只是项目
  • 使 HTML5 视频海报与视频本身大小相同

    有谁知道如何调整 HTML5 视频海报的大小 使其适合视频本身的确切尺寸 这是一个显示问题的 jsfiddle http jsfiddle net zPacg 7 http jsfiddle net zPacg 7 这是代码 HTML
  • Console.ReadLine() 末尾没有换行符?

    问题很简单 当我使用 Console ReadLine 控制台上打印的下一个内容将在下一行 有什么办法可以继续打印该行吗 提前致谢 请检查 控制台 Read 这不会导致新行或换行
  • MySQL 监听通知等效项

    是否有相当于 PostgresQL 的notify http www postgresql org docs 9 1 static sql notify html and listen http www postgresql org doc
  • 如何在 C# 中将 IEnumerable 转换为 Enum?

    我已将多个字符串解析为枚举标志 但看不到将它们合并为单个枚举位字段的巧妙方法 我使用的方法循环遍历字符串值 然后 将值转换为 Enum 对象 如下所示 Flags public enum MyEnum None 0 First 1 Seco
  • Spring - 使用 new 是一种不好的做法吗?

    正在创建对象by hand 即使用new操作员而不是注册Springbean 和使用依赖注入被认为是不好的做法吗 我的意思是 确实Spring IoC容器必须了解应用程序中的所有对象吗 如果是这样 为什么 你希望 Spring 创建 bea
  • dask groupby 不合并分区

    我有一组数据 我想要对其进行一些简单的 groupby count 操作 但我似乎无法使用 dask 来完成此操作 我很可能不理解 dask 中执行 groupby reduce 的方式 特别是当索引位于分组键中时 所以我将用玩具数据来说明
  • 当父类也实现 IDisposable 时,在子类上实现 IDisposable

    我有一个父类和子类都需要实现IDisposable 应该在哪里virtual and base Dispose 通话发挥作用 当我刚刚覆盖Dispose bool disposing 打电话 说我实现了感觉真的很奇怪IDisposable没
  • PyCharm 虚拟环境和 Anaconda 环境有什么区别?

    当我在 PyCharm 中创建新项目时 它会创建一个新的虚拟环境 我读到 当我执行Python脚本时 它们是使用此环境中的解释器而不是系统环境来执行的 因此 如果我需要安装一些软件包 我只能将它们安装在这个环境中 而不是在系统环境中 这很酷
  • 绘图 Deedle 框架

    我有以下代码 let mychart frame GetAllSeries gt Seq iter fun key value gt Chart Line value Name key gt Chart Combine where fram
  • 如何使用 haproxy 负载均衡器 Kafka Bootstrap?

    我有一个 kafka 集群 由 3 台在 AWS 上运行的机器组成 卡夫卡1到卡夫卡3 我正在使用新型卡夫卡消费者 gt 0 8 我知道kafka客户端连接到其中一台kafka服务器 获取服务器元数据 然后直接连接到代理 我想确保在代理发生