如何更好地使用Kafka?

2023-11-17

引言| 要确保Kafka在使用过程中的稳定性,需要从kafka在业务中的使用周期进行依次保障。主要可以分为:事先预防(通过规范的使用、开发,预防问题产生)、运行时监控(保障集群稳定,出问题能及时发现)、故障时解决(有完整的应急预案)这三阶段。

事先预防

事先预防即通过规范的使用、开发,预防问题产生。主要包含集群/生产端/消费端的一些最佳实践、上线前测试以及一些针对紧急情况(如消息积压等)的临时开关功能。

Kafka调优原则:

1.确定优化目标,并且定量给出目标(Kafka 常见的优化目标是吞吐量、延时、持久性和可用性)。

2.确定了目标之后,需要明确优化的维度。

通用性优化:操作系统、JVM 等。

针对性优化:优化 Kafka 的 TPS、处理速度、延时等。

(一)生产端最佳实践

  • 参数调优

  • 使用 Java 版的 Client;
  • 使用 kafka-producer-perf-test.sh 测试你的环境;
  • 设置内存、CPU、batch 压缩;
  • batch.size:该值设置越大,吞吐越大,但延迟也会越大;
  • linger.ms:表示 batch 的超时时间,该值越大,吞吐越大、但延迟也会越大;
  • max.in.flight.requests.per.connection:默认为5,表示 client 在 blocking 之前向单个连接(broker)发送的未确认请求的最大数,超过1时,将会影响数据的顺序性;
  • compression.type:压缩设置,会提高吞吐量;
  • acks:数据 durability 的设置;
  • 避免大消息(占用过多内存、降低broker处理速度);
  • broker调整:增加 num.replica.fetchers,提升 Follower 同步 TPS,避免 Broker Full GC 等;
  • 当吞吐量小于网络带宽时:增加线程、提高 batch.size、增加更多 producer 实例、增加 partition 数;
  • 设置 acks=-1 时,如果延迟增大:可以增大 num.replica.fetchers(follower 同步数据的线程数)来调解;
  • 跨数据中心的传输:增加 socket 缓冲区设置以及 OS tcp 缓冲区设置。
  • 开发实践

a.做好Topic隔离

根据具体场景(是否允许一定延迟、实时消息、定时周期任务等)区分kafka topic,避免挤占或阻塞实时业务消息的处理。

b.做好消息流控

如果下游消息消费存在瓶颈或者集群负载过高等,需要在生产端(或消息网关)实施流量生产速率的控制或者延时/暂定消息发送等策略,避免短时间内发送大量消息。

c.做好消息补推

手动去查询丢失的那部分数据,然后将消息重新发送到mq里面,把丢失的数据重新补回来。

d.做好消息顺序性保障

如果需要在保证Kafka在分区内严格有序的话(即需要保证两个消息是有严格的先后顺序),需要设置key,让某类消息根据指定规则路由到同一个topic的同一个分区中(能解决大部分消费顺序的问题)。


但是,需要避免分区内消息倾斜的问题(例如,按照店铺Id进行路由,容易导致消息不均衡的问题)。

1.生产端:消息发送指定key,确保相同key的消息发送到同一个partition。

2.消费端:单线程消费或者写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue。

e.适当提高消息发送效率

批量发送:kafka先将消息缓存在内存中的双端队列(buffer)中,当消息量达到batch size指定大小时进行批量发送,减少了网络传输频次,提高了传输效率;

端到端压缩消息:将一批消息打包后进行压缩,发送给 Broker 服务器后,但频繁的压缩和解压也会降低性能,最终还是以压缩的方式传递到消费者的手上,在 Consumer 端进行解压;

异步发送:将生产者改造为异步的方式,可以提升发送效率,但是如果消息异步产生过快,会导致挂起线程过多,内存不足,最终导致消息丢失;

索引分区并行消费:当一个时间相对长的任务在执行时,它会占用该消息所在索引分区被锁定,后面的任务不能及时派发给空闲的客户端处理,若服务端如果启用索引分区并行消费的特性,就可以及时的把后面的任务派发给其他的客户端去执行,同时也不需要调整索引的分区数(但此类消息仅适用于无需保证消息顺序关系的消息)。

f.保证消息发送可靠性

Producer:如果对数据可靠性要求很高的话,在发送消息的时候,需要选择带有 callBack 的api进行发送,并设置 acks、retries、factor等等些参数来保证Producer发送的消息不丢失。

Broker:kafka为了得到更高的性能和吞吐量,将数据异步批量的存储在磁盘中,并采用了批量刷盘的做法,如果对数据可靠性要求很高的话,可以修改为同步刷盘的方式提高消息的可靠性。

(二)消费端最佳实践

  • 参数调优

  • 吞吐量:调整partition 数、OS page cache(分配足够的内存来缓存数据);
  • offset topic(__consumer_offsets):offsets.topic.replication.factor(默认为3)、offsets.retention.minutes(默认为1440,即 1day);
  • offset commit较慢:异步 commit 或 手动 commit;
  • fetch.min.bytes 、fetch.max.wait.ms;
  • max.poll.interval.ms:调用 poll() 之后延迟的最大时间,超过这个时间没有调用 poll() 的话,就会认为这个 consumer 挂掉了,将会进行 rebalance;
  • max.poll.records:当调用 poll() 之后返回最大的 record 数,默认为500;
  • session.timeout.ms;
  • Consumer Rebalance:check timeouts、check processing times/logic、GC Issues;
  • 网络配置。

  • 开发实践

a.做好消息消费幂等

消息消费的幂等主要根据业务逻辑做调整。

以处理订单消息为例

1.由订单编号+订单状态唯一的幂等key,并存入redis;

2.在处理之前,首先会去查Redis是否存在该Key,如果存在,则说明已经处理过了,直接丢掉;

3.如果Redis没处理过,则将处理过的数据插入到业务DB上,再到最后把幂等Key插入到Redis上;

简而言之,即通过Redis做前置处理 + DB唯一索引做最终保证来实现幂等性。

b.做好Consumer隔离

在消息量非常大的情况下,实时和离线消费者同时消费一个集群,离线数据繁重的磁盘 IO 操作会直接影响实时业务的实时性和集群的稳定性。

根据消费的实时性可以将消息消费者行为划分两类:实时消费者和离线消费者。

实时消费者:对数据实时性要求较高;在实时消费的场景下,Kafka 会利用系统的 page cache 缓存,直接从内存转发给实时消费者(热读),磁盘压力为零,适合广告、推荐等业务场景。

离线消费者(定时周期性消费者):通常是消费数分钟前或是数小时前的消息,这类消息通常存储在磁盘中,消费时会触发磁盘的 IO 操作(冷读),适合报表计算、批量计算等周期性执行的业务场景。

c.避免消息消费堆积

  • 延迟处理、控制速度,时间范围内分摊消息(针对实时性不高的消息);
  • 生产速度大于消费速度,这样可以适当增加分区,增加consumer数量,提升消费TPS;
  • 避免很重的消费逻辑,优化consumer TPS:

是否有大量DB操作;

下游/外部服务接口调用超时;

是否有lock操作(导致线程阻塞);

需要特别关注kafka异步链路中的涉及消息放大的逻辑。

  • 如果有较重的消费逻辑,需要调整xx参数,避免消息没消费完时,消费组退出,造成reblance等问题;
  • 确保consumer端没有因为异常而导致消费hang住;
  • 如果使用的是消费者组,确保没有频繁地发生rebalance;
  • 多线程消费,批量拉取处理。

注:批量拉取处理时,需注意下kafka版本,spring-kafka 2.2.11.RELEASE版本以下,如果配置kafka.batchListener=true,但是将消息接收的元素设置为单个元素(非批量List),可能会导致kafka在拉取一批消息后,仅仅消费了头部的第一个消息。

d.避免Rebalance问题

  • 触发条件:

1.消费者数量变化:新消费者加入、消费者下线(未能及时发送心跳,被“踢出”Group)、消费者主动退出消费组(Consumer 消费时间过长导致);

2.消费组内订阅的主题或者主题的分区数量发生变化;

3.消费组对应的 GroupCoorinator 节点发生变化。

  • 如何避免非必要rebalance(消费者下线、消费者主动退出消费组导致的reblance):

1.需要仔细地设置session.timeout.ms(决定了 Consumer 存活性的时间间隔)和heartbeat.interval.ms(控制发送心跳请求频率的参数) 的值。

2.max.poll.interval.ms参数配置:控制 Consumer 实际消费能力对 Rebalance 的影响,限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。默认值是 5 分钟,表示 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求,Coordinator 也会开启新一轮 Rebalance。具体可以统计下历史的时间花费,把最长的时间为参考进行设置。

e.保证消息消费可靠性

一般情况下,还是client 消费 broker 丢消息的场景比较多,想client端消费数据不能丢,肯定是不能使用autoCommit的,所以必须是手动提交的。

Consumer自动提交的机制是根据一定的时间间隔,将收到的消息进行commit。commit过程和消费消息的过程是异步的。也就是说,可能存在消费过程未成功(比如抛出异常),commit消息已经提交了,则此时消息就丢失了。

f.保证消息消费顺序性

1.不同topic(乱序消息):如果支付与订单生成对应不同的topic,只能在consumer层面去处理了。

2.同一个topic(乱序消息):一个topic可以对应多个分区,分别对应了多个consumer,与“不同topic”没什么本质上的差别。(可以理解为我们的服务有多个pod,生产者顺序发送消息,但被路由到不同分区,就可能变得乱序了,服务消费的就是无序的消息)。

3.同一个topic,同一个分区(顺序消息):Kafka的消息在分区内是严格有序的,例如把同一笔订单的所有消息,按照生成的顺序一个个发送到同一个topic的同一个分区。

针对乱序消息

例如:订单和支付分别封装了各自的消息,但是消费端的业务场景需要按订单消息->支付消息的顺序依次消费消息。

宽表(业务主题相关的指标、维度、属性关联在一起的一张数据库表):消费消息时,只更新对应的字段就好,消息只会存在短暂的状态不一致问题,但是状态最终是一致的。例如订单,支付有自己的状态字段,订单有自己的状态字段,售后有自己的状态字段,就不需要保证支付、订单、售后消息的有序,即使消息无序,也只会更新自己的状态字段,不会影响到其他状态;

消息补偿机制:将消息与DB进行对比,如果发现数据不一致,再重新发送消息至主进程处理,保证最终一致性;

MQ队列:一个中间方(比如redis的队列)来维护MQ的顺序;

业务保证:通过业务逻辑保障消费顺序;

针对顺序消息

两者都是通过将消息绑定到定向的分区或者队列来保证顺序性,通过增加分区或者线程来提升消费能力。

1.Consumer单线程顺序消费

生产者在发送消息时,已保证消息在分区内有序,一个分区对应了一个消费者,保证了消息消费的顺序性。

2.Consumer多线程顺序消费(具体策略在后面章节)

单线程顺序消费的扩展能力很差。为了提升消费者的处理速度,除了横向扩展分区数,增加消费者外,还可以使用多线程顺序消费。

将接收到的kafka数据进行hash取模(注意:如果kafka分区接受消息已经是取模的了,这里一定要对id做一次hash再取模)发送到不同的队列,然后开启多个线程去消费对应队列里面的数据。

此外,这里通过配置中心进行开关、动态扩容/缩容线程池。

g.处理Consumer的事务

通过事务消息,可以很好的保证一些业务场景的事务逻辑,不会因为网络不可用等原因出现系统之间状态不一致。

当更新任何一个服务出现故障时就抛出异常,事务消息不会被提交或回滚,消息服务器会回调发送端的事务查询接口,确定事务状态,发送端程序可以根据消息的内容对未做完的任务重新执行,然后告诉消息服务器该事务的状态。

(三)集群配置最佳实践

  • 集群配置

Broker 评估:每个 Broker 的 Partition 数不应该超过2k、控制 partition 大小(不要超过25GB)。

集群评估(Broker 的数量根据以下条件配置):数据保留时间、集群的流量大小。

集群扩容:磁盘使用率应该在 60% 以下、网络使用率应该在 75% 以下。

集群监控:保持负载均衡、确保 topic 的 partition 均匀分布在所有 Broker 上、确保集群的阶段没有耗尽磁盘或带宽。

  • Topic 评估

1.Partition 数:

Partition 数应该至少与最大 consumer group 中 consumer 线程数一致;

对于使用频繁的 topic,应该设置更多的 partition;

控制 partition 的大小(25GB 左右);

考虑应用未来的增长(可以使用一种机制进行自动扩容);

2.使用带 key 的 topic;

3.partition 扩容:当 partition 的数据量超过一个阈值时应该自动扩容(实际上还应该考虑网络流量)。

  • 分区配置

设置多个分区在一定程度上是可以提高消费者消费的并发度,但是分区数量过多时可能会带来:句柄开销过大、生产端占用内存过大、可能增加端到端的延迟、影响系统可用性、故障恢复时间较长等问题。

根据吞吐量的要求设置 partition 数:

1.假设 Producer 单 partition 的吞吐量为 P

2.consumer 消费一个 partition 的吞吐量为 C

3.而要求的吞吐量为 T

4.那么 partition 数至少应该大于 T/P、T/c 的最大值

(四)性能调优

调优目标:高吞吐量、低延时。

  • 分层调优

自上而下分为应用程序层、框架层、JVM层和操作系统层,层级越靠上,调优的效果越明显。

  • 吞吐量(TPS)调优
  • 延时调优

(五)稳定性测试

kafka的稳定性测试主要在业务上线前针对Kafka实例/集群健康性、高可用性的测试。

  • 健康性检查

1.检查实例:查看Kafka 实例对象中拿到所有的信息(例如 IP、端口等);

2.测试可用性:访问生产者和消费者,测试连接。

  • 高可用测试

单节点异常测试:重启Leader副本或Follower副本所在Pod

步骤:

1.查看topic的副本信息

2.删除相应pod

3.脚本检测Kafka的可用性

预期:对生产者和消费者的可用性均无影响。

集群异常测试:重启所有pod

步骤:

1.删除所有pod

2.脚本检测Kafka的可用性

预期:所有broker ready后服务正常。

运行时监控

运行时监控主要包含集群稳定性配置与Kafka监控的最佳实践,旨在及时发现Kafka在运行时产生的相关问题与异常。

(一)集群稳定性监控

  • 腾讯云CKafka集群配置

合理进行kafka实例配,主要关注这几个数据:

  1. 磁盘容量和峰值带宽
  2. 消息保留时长;
  3. 动态保留策略;

a.磁盘容量和峰值带宽

可根据实际业务的消息内容大小、发送消息qps等进行预估,可以尽量设置大点;具体数值可根据实例监控查看,如果短时间内磁盘使用百分比就达到较高值,则需扩容。

峰值带宽=最大生产流量*副本数

b.消息保留时长

消息即使被消费,也会持久化到磁盘存储保留时长的时间。该设置会占用磁盘空间,如果每天消息量很大的话,可适当缩短保留时间。

c.动态保留策略

推荐开启动态保留设置。当磁盘容量达到阈值,则删除最早的消息,最多删除到保底时长范围外的消息(淘汰策略),可以很大程度避免磁盘被打满的情况。

但有调整时不会主动通知,但我们可以通过配置告警感知磁盘容量的变化。

  • 自建Kafka集群配置

1.设置日志配置参数以使日志易于管理;

2.了解 kafka 的(低)硬件需求;

3.充分利用 Apache ZooKeeper;

4.以正确的方式设置复制和冗余;

5.注意主题配置;

6.使用并行处理;

7.带着安全性思维配置和隔离 Kafka;

8.通过提高限制避免停机;

9.保持低网络延迟;

10.利用有效的监控和警报。

  • 资源隔离

a.Broker级别物理隔离

如果不同业务线的 topic 会共享一块磁盘,若某个consumer 出现问题而导致消费产生 lag,进而导致频繁读盘,会影响在同一块磁盘的其他业务线 TP 的写入。

解决:Broker级别物理隔离:创建Topic、迁移Topic、宕机恢复流程

b.RPC队列隔离

Kafka RPC 队列缺少隔离,一旦某个 topic 处理慢,会导致所有请求 hang 住。

解决:需要按照控制流、数据流分离,且数据流要能够按照 topic 做隔离。

1.将 call 队列按照拆解成多个,并且为每个 call 队列都分配一个线程池。

2.一个队列单独处理 controller 请求的队列(隔离控制流),其余多个队列按照 topic 做 hash 的分散开(数据流之间隔离)。

如果一个 topic 出现问题,则只会阻塞其中的一个 RPC 处理线程池,以及 call 队列,可以保障其他的处理链路是畅通的。

  • 智能限速

整个限速逻辑实现在 RPC 工作线程处理的末端,一旦 RPC 处理完毕,则通过限速控制模块进行限速检测。

1.配置等待时间,之后放入到 delayed queue 中,否则放到 response queue 中。

2.放入到 delayed queue 中的请求,等待时间达到后,会被 delayed 线程放入到 response queue 中。

3.最终在 response queue 中的请求被返回给 consumer。

(二)Kafka监控

白盒监控:服务或系统自身指标,如CPU 负载、堆栈信息、连接数等;

黑盒监控:一般是通过模拟外部用户对其可见的系统功能进行监控的一种监控方式,相关指标如消息的延迟、错误率和重复率等性能和可用性指标。

  • 腾讯云CKafka告警

针对CKafka,需要配置告警(此类告警一般为消息积压、可用性、集群/机器健康性等检查)。

a.指标

如:实例健康状态、节点数量、健康节点数量、问题分区数、生产消息数、消费请求数、jvm内存利用率、平均生产响应时间、分区消费偏移量等。

具体指标可以参考:https://cloud.tencent.com/document/product/597/54514

b.配置

配置文档:https://cloud.tencent.com/document/product/597/57244

选择监控实例,配置告警内容和阈值。

一般会对当前服务自身的kafka集群做告警配置,但是如果是依赖自身消息的下游服务出现消费问题,我们是感知不到了;而且针对消费端服务不共用同一个集群的情况,出现消息重复发送的问题,服务自身是很难发现的。

c.预案

在业务上线前,最好梳理下自身服务所涉及的topic消息(上游生产端和下游消费端),并细化告警配置,如果出现上游kafka异常或者下游kafka消息堆积可以及时感知。特别需要把可能有瞬时大量消息的场景(如批量数据导入、定时全量数据同步等)做一定的告警或者预案,避免服务不可用或者影响正常业务消息。

  • 自建告警平台

通过自建告警平台配置对服务自身的异常告警,其中包括对框架在使用kafka组件时抛出与kafka消费逻辑过程中抛出的业务异常。

其中,可能需要异常升级的情况(由于)单独做下处理(针对spring kafka):

1.自定义kafka异常处理器:实现KafkaListenerErrorHandler接口的方法,注册自定义异常监听器,区分业务异常并抛出;

2.消费Kafka消息时,将@KafkaListener的errorHandler参数设置为定义的Kafka异常处理器;

3.此后,指定的业务异常会被抛出,而不会被封装成Spring kafka的框架异常,导致不能清晰地了解具体异常信息。

  • Kafka监控组件

目前业界并没有公认的解决方案,各家都有各自的监控之道。

Kafka Manager:应该算是最有名的专属 Kafka 监控框架了,是独立的监控系统。

Kafka Monitor:LinkedIn 开源的免费框架,支持对集群进行系统测试,并实时监控测试结果。

CruiseControl:也是 LinkedIn 公司开源的监控框架,用于实时监测资源使用率,以及提供常用运维操作等。无 UI 界面,只提供 REST API。

JMX 监控:由于 Kafka 提供的监控指标都是基于 JMX 的,因此,市面上任何能够集成 JMX 的框架都可以使用,比如 Zabbix 和 Prometheus。已有大数据平台自己的监控体系:像 Cloudera 提供的 CDH 这类大数据平台,天然就提供 Kafka 监控方案。

JMXTool:社区提供的命令行工具,能够实时监控 JMX 指标。答上这一条,属于绝对的加分项,因为知道的人很少,而且会给人一种你对 Kafka 工具非常熟悉的感觉。如果你暂时不了解它的用法,可以在命令行以无参数方式执行一下kafka-run-class.sh kafka.tools.JmxTool,学习下它的用法。

  • Kafka Monitor

其中,Kafka Monitor通过模拟客户端行为,生产和消费数据并采集消息的延迟、错误率和重复率等性能和可用性指标,可以很好地发现下游的消息消费情况进而可以动态地调整消息的发送。(使用过程中需注意对样本覆盖率、功能覆盖率、流量、数据隔离、时延的控制)

Kakfa Monitor 优势

1.通过为每个 Partition 启动单独的生产任务,确保监控覆盖所有 Partition。

2.在生产的消息中包含了时间戳、序列号,Kafka Monitor 可以依据这些数据对消息的延迟、丢失率和重复率进行统计。

3.通过设定消息生成的频率,来达到控制流量的目的。

4.生产的消息在序列化时指定为一个可配置的大小(验证对不同大小数据的处理能力、相同消息大小的性能比较)。

5.通过设定单独的 Topic 和 Producer ID 来操作 Kafka 集群,可避免污染线上数据,做到一定程度上的数据隔离。

基于Kafka Monitor的设计思想,可以针对业务特点引入对消息的延迟、错误率和重复率等性能的监控告警指标。

故障时解决

防微杜渐,遇到问题/故障时有完整的应急预案,以快速定位并解决问题。

(一)Kafka消息堆积紧急预案

问题描述:消费端产生消息积压,导致依赖该消息的服务不能及时感知业务变化,导致一些业务逻辑、数据处理出现延迟,容易产生业务阻塞和数据一致性问题。

方案:问题排查、扩容升配策略、消息Topic转换策略、可配置多线程的消费策略。

  • 问题排查

遇到消息积压时,具体可以从以下几个角度去定位问题原因:

1.消息生产端数据量是否存在陡升的情况。

2.消息消费端消费能力是否有下降。

3.消息积压是发生在所有的partition还是所有的partition都有积压情况。

对于第1、2点导致的消息积压:为暂时性的消息积压,通过扩分区、扩容升配、多线程消费、批量消费等方式提高消费速度能在一定程度上解决这类问题。

对于第3点导致的消息积压:可以采用消息Topic中转策略。

  • 扩容升配策略

1.检查生产端消费发送情况(主要检查是否继续有消息产生、是否存在逻辑缺陷、是否有重复消息发送);

2.观察消费端的消费情况(预估下堆积消息的处理清理以及是否有降低趋势);

3.若为生产端问题,则评估是否可以通过增加分区数、调整偏移量、删除topic(需要评估影响面)等解决;

4.消费端新增机器及依赖资源,提高消费能力;

5.如果涉及数据一致性问题,需要通过数据比对、对账等功能进行校验。

  • 配置多线程的消费策略

简而言之,即线程池消费+动态线程池配置策略:将接收到的kafka数据进行hash取模(如果kafka分区接受消息已经是取模的了,这里一定要对id做一次hash再取模)发送到不同的队列,然后开启多个线程去消费对应队列里面的数据。

设计思路:

1.在应用启动时初始化对应业务的顺序消费线程池(demo中为订单消费线程池);

2.订单监听类拉取消息提交任务至线程池中对应的队列;

3.线程池的线程处理绑定队列中的任务数据;

4.每个线程处理完任务后增加待提交的offsets标识数;

5.监听类中校验待提交的offsets数与拉取到的记录数是否相等,如果相等则;

6.手动提交offset(关闭kafka的自动提交,待本次拉取到的任务处理完成之后再提交位移)

另外,可以根据业务流量调整的线程配置与pod的配置,如高峰期设置一个相对较高的并发级别数用来快速处理消息,平峰期设置一个较小的并发级别数来让出系统资源。这里,可以参考美团提供的一种配置中心修改配置动态设置线程池参数的思路,实现动态的扩容或者缩容。

实现了动态扩容与缩容

1.通过配置中心刷新OrderKafkaListener监听类中的配置concurrent的值。

2.通过set方法修改concurrent的值时,先修改stopped的值去停止当前正在执行的线程池。

3.执行完毕后通过新的并发级别数新建一个新的线程池,实现了动态扩容与缩容。

此外,还可以新增开关,它设置为true是可以中断启动中的线程池,故障时进行功能开关。

注意:如果涉及数据一致性问题,需要通过数据比对、对账等功能进行校验。

  • Topic中转策略

当消息积压是发生在所有的partition还是所有的partition都有积压情况时,只能操作临时扩容,以更快的速度去消费数据了。

设计思路:

1.临时建立好原先10倍或者20倍的queue数量(新建一个topic,partition是原来的10倍);

2.然后写一个临时分发消息的consumer程序,这个程序部署上去消费积压的消息,消费之后不做耗时处理,直接均匀轮询写入临时建好分10数量的queue里面;

3.紧接着征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的消息;

4.这种做法相当于临时将queue资源和consumer资源扩大10倍,以正常速度的10倍来消费消息。

5.等快速消费完了之后,恢复原来的部署架构,重新用原来的consumer机器来消费消息。

改进

1.consumer程序可以写在服务里面;

2.指定一个“预案topic”,在服务中预先写好对“预案topic”;

3.采用策略模式进行”业务topic“->“预案topic”的转换。

注意

1.如果涉及数据一致性问题,需要通过数据比对、对账等功能进行校验;

2.需要有个单独的topic转换服务,或修改服务代码,或在事前将多线程逻辑写好。

(二)Kafka消费异常导致消费阻塞

问题描述:某个消息消费异常或者某个操作较为耗时,导致单个pod的消费能力下降,甚至产生阻塞。

方案:设置偏移量;开关多线程的消费策略。

  • 设置偏移量

1.调整偏移量:联系运维,将offset后移一位;

2.消息补推:针对跳过的消息或某个时间段内的数据进行消息补推;


3.如果涉及数据一致性问题,需要通过数据比对、对账等功能进行校验。

  • 开关多线程的消费策略

参考上面的“可配置多线程的消费策略”,在发生阻塞时开启多线程消费开关。

注:需要修改代码或者在事前将多线程逻辑写好

(三)Kafka消息丢失预案

问题描述:服务没有按照预期消费到kafka消息,导致业务产生问题。

方案:根因分析;消息补推。

  • 根因分析

1.生产端是否成功发送消费(源头丢失)

Broker丢失消息:Kafka为了得到更高的性能和吞吐量,将数据异步批量的存储在磁盘中,异步刷盘有肯能造成源头数据丢失;

Producer丢失消息:发送逻辑存在Bug,导致消息为发送成功。

解决:需要检查生产端与集群健康性;消息补发。

2.是否被成功消费

Consumer自动提交的机制是根据一定的时间间隔,将收到的消息进行commit。commit过程和消费消息的过程是异步的。也就是说,可能存在消费过程未成功(比如抛出异常),commit消息已经提交了。

此外,如果消费逻辑有bug,也导致消息丢失的假象。

解决:修复问题,视情况修改消费确认机制。

3.是否有其他服务共用了同一个消费组

多服务误用同一个消费组会导致消息一定比率或规律性丢失。

例如,创建用户的kafka消息,可能价格中心和促销服务误用了一个消费组,导致每个服务都是消费了部分消息,导致一些问题出现偶现的情况。

解决:修改配置,重启服务,各种建立的消费组;事前需要有检查是否有多个服务共用一个消费的情况(检测+比对)。

  • 消息补推

1.通过业务影响查询影响的数据信息;

2.构建kafka消息,进行消息补偿;

3.如果涉及数据一致性问题,需要通过数据比对、对账等功能进行校验。

针对每个对外发送的服务,生产端一般都需要有较为完善的消息补推接口,并且消费端也需要保障消息消费的幂等。

其他

(一)Kafka成本控制

机器、存储和网络

  • 机器

需要重新评估你的实例类型决策:你的集群是否饱和?在什么情况下饱和?是否存在其他实例类型,可能比你第一次创建集群时选择的类型更合适?EBS 优化实例与 GP2/3 或 IO2 驱动器的混合是否真的比 i3 或 i3en 机器(及其带来的优势)有更好的性价比?

  • 存储与网络

压缩在 Kafka 中并不新鲜,大多数用户已经知道了自己可以在 GZIP、Snappy 和 LZ4 之间做出选择。但自从KIP-110被合并进 Kafka,并添加了用于 Zstandard 压缩的压缩器后,它已实现了显著的性能改进,并且是降低网络成本的完美方式。

以生产者端略高的 CPU 使用率为代价,你将获得更高的压缩率并在线上“挤进”更多信息。

Amplitude在他们的帖子中介绍,在切换到 Zstandard 后,他们的带宽使用量减少了三分之二,仅在处理管道上就可以节省每月数万美元的数据传输成本。

  • 集群

不平衡的集群可能会损害集群性能,导致某些 borker 比其他 broker 的负载更大,让响应延迟更高,并且在某些情况下会导致这些 broker 的资源饱和,从而导致不必要的扩容,进而会影响集群成本。

此外,不平衡集群还面临一个风险:在一个 broker 出故障后出现更高的 MTTR(例如当该 broker 不必要地持有更多分区时),以及更高的数据丢失风险(想象一个复制因子为 2 的主题,其中一个节点由于启动时要加载的 segment 过多,于是难以启动)。

(二)消息消费的幂等

定义:

所谓幂等性,数学概念就是: f(f(x)) = f(x) 。f函数表示对消息的处理。通俗点来讲就是,在消费者收到重复消息进行重复处理时,也要保证最终结果的一致性。

比如,银行转账、下单等,不管重试多少次,都要保证最终结果一定是一致的。

  • 利用数据库的唯一约束

将数据库中的多个字段联合,创建一个唯一约束,即使多次操作也能保证表里至多存在一条记录(如创建订单、创建账单、创建流水等)。

此外,只要是支持类似“INSERT IF NOT EXIST”语义的存储类系统(如Redis的SETNX)都可以用于实现幂等消费。

  • 设置前置条件

1.给数据变更设置一个前置条件(版本号version、updateTime);

2.如果满足条件就更新数据,否则拒绝更新数据;

3.在更新数据的时候,同时变更前置条件中的数据(版本号+1、更新updateTime)。

  • 记录并检查操作

1.给每条消息都记录一个全局唯一 ID;

2.消费时,先根据这个全局唯一 ID 检查这条消息是否有被消费过;

3.如果没有消费过,则更新数据,并将消费状态置为“已消费”状态。

其中,在“检查消费状态,然后更新数据并且设置消费状态”中,三个操作必须作为一组操作保证原子性。

参考:

[1]https://iwiki.woa.com/pages/viewpage.action?pageId=1126809993
[2]https://www.infoq.cn/article/ucSru1uKkSswLXPcjQgC?source=app_share
[3]https://blog.csdn.net/qq_32179907/article/details/122599769
[4]https://blog.csdn.net/qq_32179907/article/details/122599769
[5]404 - 知乎
[6]https://blog.csdn.net/philip502/article/details/118997899utm_medium=distribute.wap_relevant.none-task-blog-2~default~baidujs_baidulandingword~default-0-118997899-blog-125192952.wap_relevant_multi_platform_whitelistv1&spm=1001.2101.3001.4242.1&utm_relevant_index=1[7]404 - 知乎
[8]404 - 知乎
[9]https://www.infoq.cn/article/contrast-with-kafka-and-jingdong-jmq?source=app_share
[10]https://www.infoq.cn/article/BF3mm9haDs-cdHCXOLlf?source=app_share
[11]https://www.infoq.cn/article/wmM8WXzLEgfGMKYpbF0N?source=app_share
[12]https://www.infoq.cn/article/Q0o*QzLQiay31MWiOBJH?source=app_share

阅读原文

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何更好地使用Kafka? 的相关文章

  • 如何使用Spring WebClient进行同步调用?

    Spring Framework in 休息模板 https docs spring io spring framework docs current javadoc api org springframework web client R
  • Android 中的列表(特别是 RecyclerView 和 CardView)如何工作

    请原谅我问这个问题 但我是 Android 开发新手 尽管我正在尝试了解developer android com 网站上的基础知识 但大多数示例 即使他们说它们是为 Android Studio 构建的 尚未设置为使用 Gradle 因此
  • (Java) App Engine 中的静态文件无法访问

    The 示例文档 http code google com appengine docs java gettingstarted staticfiles html表示您只需将文件放在 war 或子目录 中 并且应该可以从主机访问它们 只要它
  • 文本在指定长度后分割,但不要使用 grails 打断单词

    我有一个长字符串 需要将其解析为长度不超过 50 个字符的字符串数组 对我来说 棘手的部分是确保正则表达式找到 50 个字符之前的最后一个空格 以便在字符串之间进行彻底的分隔 因为我不希望单词被切断 public List
  • 在 Struts 2 中传递 URL 参数而不使用查询字符串

    我想使用类似的 URL host ActionName 123 abc 而不是像这样传递查询字符串 host ActionName parm1 123 parm2 abc 我怎样才能在 Struts 2 中做到这一点 我按照下面的方法做了
  • 如何检测图像是否像素化

    之前有人在 SO 上提出过这样的问题 在Python中检测像素化图像 https stackoverflow com questions 12942365 detecting a pixelated image in python还有关于q
  • 是否可以从 servlet 内部以编程方式设置请求上下文路径?

    这是一个特殊情况 我陷入了处理 企业 网络应用程序的困境 企业应用程序正在调用request getContext 并将其与另一个字符串进行比较 我发现我可以使用 getServletContext getContextPath 获取 se
  • 从直方图计算平均值和百分位数?

    我编写了一个计时器 可以测量任何多线程应用程序中特定代码的性能 在下面的计时器中 它还会在地图中填充花费了 x 毫秒的调用次数 我将使用这张图作为我的直方图的一部分来进行进一步的分析 例如调用花费了这么多毫秒的百分比等等 public st
  • 添加到列表时有没有办法避免循环?

    我想知道这样的代码 List
  • 通过 appassembler-maven-plugin 生成的脚本无法在 Spring Boot 应用程序中找到主类

    我使用 appassembler maven plugin 生成的启动脚本有问题 我有一个基本的 spring boot 应用程序 只有一个类 SpringBootApplication public class ScriptDemoApp
  • Java:如何确定文件所在的驱动器类型?

    Java 是否有一种独立于平台的方法来检测文件所在的驱动器类型 基本上我有兴趣区分 硬盘 可移动驱动器 如 USB 记忆棒 和网络共享 JNI JNA 解决方案不会有帮助 可以假设 Java 7 您可以使用 Java 执行 cmd fsut
  • 我们如何测试包私有类?

    我正在看书Effective Java in Item 13 Minimize the accessibility of classes and members 它提到 为了方便测试 您可能想让类 接口或成员更易于访问 这在某种程度上是好的
  • JAVA中遍历JSON数据

    我是 JSON 新手 我使用 HTTPUrlConnections 并在 JAVA 程序中获得一些响应 响应数据将类似于 data id 1 userId 1 name ABC modified 2014 12 04 created 201
  • Java - 从 XML 文件读取注释

    我必须从 XML 文件中提取注释 我找不到使用 JDOM 或其他东西来让它们使用的方法 目前我使用 Regex 和 FileReader 但我不认为这是正确的方法 您可以使用 JDOM 之类的东西从 XML 文件中获取注释吗 或者它仅限于元
  • 避免 Java 中的重复导入:继承导入?

    有没有办法 继承 导入 Example 常见枚举 public enum Constant ONE TWO THREE 使用此枚举的基类 public class Base protected void register Constant
  • 如何处理 StaleElementReferenceException

    我正在为鼠标悬停工作 我想通过使用 for 循环单击每个链接来测试所有链接的工作条件 在我的程序中 迭代进行一次 而对于下一次迭代 它不起作用并显示 StaleElementReferenceException 如果需要 请修改代码 pub
  • JMS 中的 MessageListener 和 Consumer 有什么区别?

    我是新来的JMS 据我了解Consumers能够从队列 主题中挑选消息 那么为什么你需要一个MessageListener因为Consumers会知道他们什么时候收到消息吗 这样的实际用途是什么MessageListener 编辑 来自Me
  • 何时在 hibernate 中使用 DiscriminatorValue 注解

    在 hibernate 中使用 DiscriminatorValue 注释的最佳场景是什么以及何时 这两个链接最能帮助我理解继承概念 http docs oracle com javaee 6 tutorial doc bnbqn html
  • HttpClient请求设置属性问题

    我使用这个 HttpClient 库玩了一段时间 几周 我想以某种方式将属性设置为请求 不是参数而是属性 在我的 servlet 中 我想使用 Integer inte Integer request getAttribute obj 我不
  • 将对象从手机共享到 Android Wear

    我创建了一个应用程序 在此应用程序中 您拥有包含 2 个字符串 姓名和年龄 和一个位图 头像 的对象 所有内容都保存到 sqlite 数据库中 现在我希望可以在我的智能手表上访问这些对象 所以我想实现的是你可以去启动 启动应用程序并向左和向

随机推荐

  • vscode c++ 的环境配置 (完美版)

    怎么下载MinGW64 https blog csdn net skh2015java article details 85075032 vscode c 的环境配置 https blog csdn net qq 43041976 arti
  • ElasticSearch--Field的使用

    目录 一 Field的介绍 二 Field的属性介绍 三 常用的Field类型 一 text文本字段 二 keyword关键字字段 三 date日期类型 四 Numeric类型 四 Field属性的设置标准 一 Field的介绍 上周的一篇
  • 顺丰科技 Hudi on Flink 实时数仓实践

    关注 Flink 中文社区 获取更多技术干货 摘要 本文作者刘杰 介绍了顺丰科技数仓的架构 趟过的一些问题 使用 Hudi 来优化整个 job 状态的实践细节 以及未来的一些规划 主要内容为 数仓架构 Hudi 代码躺过的坑 状态优化 未来
  • 【MindSpore易点通】深度学习系列-那些介于模糊与清楚之间的一些概念

    之前小编就给大家提过正则化 超链接 其实还有很多定义大家是有点模糊又有点清楚的 今天好好带大家一起捋一遍 1训练集 验证集 测试集 正确地配置训练 验证和测试数据集 会很大程度上帮助大家创建高效的神经网络 即使是深度学习专家也不太可能一开始
  • Ubuntu18.4开机时进入命令行界面或进入bios设置

    开机时进入命令行界面 开机时按ctrl alt Fx Fx是从F1到F6选择一个 ctrl alt F7切换到图形界面 开机时进入bios设置 开机时按F2
  • c++实现合并两个有序链表

    leetcode题目 力扣 执行结果 代码实现 Definition for singly linked list struct ListNode int val ListNode next ListNode val 0 next null
  • 输入引脚时钟约束_时钟树例外(exclude pin、stop pin、non_stop pin、float pin)

    时钟树例外 exclude pin stop pin non stop pin float pin 回复 以下关键词 查看更多IC设计教程 目前支持的关键词有 Innovus ICC or IC Compiler DC or Design
  • 等保2.0测评综合得分计算

    文章目录 概述 公式及说明 分类计算实例 单一对象 多个对象 结果 未经本人许可 不能转载 转发 2021 6 20更新 2021新版的等保测评报告6 17出炉 6 18启用 新版综合得分计算可以看这里 这里 新版测评综合得分计算实例看 这
  • spring中的单元测试的策略

    本文主要介绍使用spring提供的对junit的扩展机制来进行单元测试 没有设计mock方面的测试 一 Spring提供的JUnit框架扩展 AbstractSpringContextTests spring中使用spring上下文测试的J
  • js高级 7.原型链继承

    原型链继承 套路 定义父类型构造函数 给父类型的原型添加方法 定义子类型的构造函数 创建父类型的对象赋值给子类型的原型 将子类型原型的构造属性设置为子类型 给子类型原型添加方法 创建子类型的对象 可以调用父类型的方法 关键 子类型的原型为父
  • AI人工智能Mojo语言:AI的新编程语言

    推荐 使用 NSDT场景编辑器 快速搭建3D应用场景 Mojo的主要功能包括 类似Python的语法和动态类型使Python开发人员易于学习Mojo 因为Python是现代AI ML开发背后的主要编程语言 使用Mojo 您可以导入和使用任何
  • RPC(远程过程调用)详解

    转自 https blog csdn net daaikuaichuan article details 88595202 仅用于自己学习使用 如有侵权删 一 RPC是什么 RPC是指远程过程调用 也就是说两台服务器A B 一个应用部署在A
  • 单片机串口中断以及消息收发处理——对接受信息进行判断实现控制

    目录 本次自己捣鼓的问题 自己摸索的一个实验 实现效果 初步基础 实现步骤 实验结果 主要代码 本次自己捣鼓的问题 自己摸索的一个实验 以51的单片机来说 用定时器2作为串口1来进行串口实验 检验以下的数据 任意数据 hello 1 yzh
  • npm node-sass 安装错误

    控制台运行npm install时报错 报错信息如下 npm ERR code ELIFECYCLE npm ERR errno 1 npm ERR node sass 4 9 2 postinstall node scripts buil
  • log4j2漏洞原理简述

    影响版本 Apache Log4j 2 x lt 2 14 1 jdk不知道 有知道的师傅麻烦告诉下 漏洞原理 由于Apache Log4j存在递归解析功能 lookup 未取得身份认证的用户 可以从远程发送数据请求输入数据日志 轻松触发漏
  • JAVA 基数排序算法(详细实现过程介绍)

    基数排序 是将整数按位数切割成不同的数字 然后按每个位数分别比较从而得到有序的序列 本文以数组中元素均为正整数来演示 给定一个数组 arr 53 3 542 748 14 214 准备十个桶 0 9 第一轮按照元素的个位数排序 0 9的各个
  • ffmpeg提取视频分辨率输出为批处理变量

    在使用ffmpeg批处理编码视频时候 如果导入的素材尺寸不一样 得每次输入分辨率很麻烦 这里提供一个自动提取拖入的视频文件的分辨率的批处理脚本 另存为bat即可 需要文件夹下有ffprobe exe 原理是通过ffprobe exe把媒体信
  • Python 列表的定义

    视频版教程 Python3零基础7天入门实战视频教程 容器 容器是一种可以存储多个元素的数据类型 Python中的容器有 列表list 元组tuple 字符串str 集合set 字典dict 列表list 列表是多个元素的集合 列表的定义
  • 安装LR提示“此计算机缺少 vc2005_sp1_with_atl_fix_redist,请安装所有缺少的必要组件,然后重新运行此安装“

    安装LoadRunner 11时弹窗提示 Micosoft Visual C 2005 SP1 可再发行组件包 X86 命令行选项语法错误 键入命令 可获得帮助信息 或者弹窗提示 此计算机缺少 vc2005 sp1 with atl fix
  • 如何更好地使用Kafka?

    引言 要确保Kafka在使用过程中的稳定性 需要从kafka在业务中的使用周期进行依次保障 主要可以分为 事先预防 通过规范的使用 开发 预防问题产生 运行时监控 保障集群稳定 出问题能及时发现 故障时解决 有完整的应急预案 这三阶段 事先