第一章 初识Kafka
kafka是一款发布订阅的消息系统,具体结构从大向下可以列举为:
1个Kafka集群种有N个broker,一个broker有N个主题分区
broker指的是一个独立的Kafka服务器
主题指的是消息的分类
为什么要选用Kafka
发布订阅的消息系统很多,选择Kafka的原因如下:
多个生产者
Kafka可以支持多个生产者,这是因为Kafka是以主题(TOPIC)来区分消息的,多个生产者可以向同一个主题发送消息,从而支持多个生产者。
多个消费者
一个主题可以被多个消费者消费,并且可以有两种消费方式:
第一种是消费时,一个消息仅消费一次(多个应用设置同一个消费组)
第二种是消费时,一个消息同时被多个应用消费(多个应用,设置为不同的消费组)
基于磁盘的数据存储
Kafka的消息会被提交到磁盘种,因此当消费者挂掉时,消息也不会消失。
伸缩性
独立的Kafka服务器是一个Broker,当性能不能够时,可以扩展为Kafka集群
高性能
- Cache Filesystem Cache PageCache缓存
- 顺序写 由于现代的操作系统提供了预读和写技术,磁盘的顺序写大多数情况下比随机写内存还要快。
- Zero-copy 零拷技术减少拷贝次数
- Batching of Messages 批量量处理。合并小的请求,然后以流的方式进行交互,直顶网络上限。
- Pull 拉模式 使用拉模式进行消息的获取消费,与消费端处理能力相符。
使用场景
活动跟踪
分析用户与前端交互的日志信息
传递消息
邮件,通知
度量指标和日志记录
系统日志或者系统指标的分析
提交日志
例如数据库的日志信息可以用来进行缓存更新
流处理
对Kafka的消息进行简单的统计处理
第二章 安装Kafkas
zookeeper
Kafka使用zookeeper保存集群的元数据信息和消费者信息,zookeeper的选举算法见
https://blog.csdn.net/wx1528159409/article/details/84622762?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.control&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.control
第三章 Kafka生产者
生产者场景
不同的生产者会有不同的场景,从来产生不同的API使用的方式。主要考虑以下几点:
- 是否允许丢失数据
- 是否允许重复消费
- 延迟和吞吐量需求是怎么样的
创建Kafka生产者
必填属性
bootsrap.servers:集群地址,不需要包含所有的broker地址,会从其中某个broker中获取其他broker地址。
key.serializer:key的序列化方式
value.serializer:value的序列化方式
可选参数
-
acks:指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入时成功的。该参数对消息丢失的可能性有重要影响
acks=0,生产者不判断消息是否写入成功,消息丢失可能性最大,吞吐量最大。
acks=1,broker中leader节点收到消息时,生产者就会收到消息写入成功的响应,消息丢失可能性适中,吞吐量适中
acks=all,所有的broker节点都收到消息时,生产者会收到消息写入成功的响应,消息丢失可能性最低,吞吐量最低
-
buffer.memory:缓存区大小,生产者消息会发送到缓冲区,然后缓冲区发送到服务器。如果进缓冲区的速度大于出缓冲区的速度,要么发送阻塞,要么出现异常。max.block.ms可以设置缓冲区的时间
-
compression.type:该参数设压缩模式,默认情况下不进行压缩。可以设置为snappy,gzip,lz4。
-
reries:重试次数,同时可以设置retry.backoff.ms设置重试间隔
-
batch.size:多个消息要被发送到同一个分区时,会进行打包发送,这里设置的是批次的大小(打包的大小)
-
linger.ms:设置了生产者在发送批次之前等待更多消息加入批次的时间,Kafka会在批次大小填满或者等待linger.ms时间进行发送。
-
client.id:可以是任意字符串,服务器用它识别消息的的来源
-
max.in.flight.requests.per.connection:制定生产者在收到服务器响应之前可以发送多少个消息,设置为1,表示按照顺序写入服务器
-
max.block.ms:可以设置缓冲区的时间
-
max.request.size:生产者发送的请求大小,注意和批次大小要相互匹配,也要和接收者(message.max.bytes)相互匹配
-
receive.buffer.bytes 和 send.buffer.bytes:指定TPC socket的缓冲区大小,设置为-1是系统默认值
发送消息方式
-
发送并忘记
调用send()方法,不关心异常情况,适用于允许丢数据的场景
这里不关心服务端发生的异常,但是生产者那一端会有其他异常,例如SerializationExcepion(序列化异常),BufferExhaustedException或TimeoutException(缓冲区已满),InterruptException(线程中断)
-
同步发送
调用send()方法,返回一个Futrue对象,调用get()方法进行等待,可以知道消息是否发送成功。适用于不允许丢数据的场景
KafkaProducer一般会有两类错误,一种是可重试错误,这类错误可以通过重发消息解决,例如“无主(no leader)”异常,可以重新为分区选举leader来接近。一种是无法通过重试来接近的异常,例如"消息过大"的异常,这个时候直接泡醋异常,不会进行任何重试。
-
异步发送
调用send()方法,设置回调函数,服务器在返回响应时调用该函数。
第四章 Kafka消费者
KafkaConsumer概念
消费者和消费者群组
为了保证消费端的横向扩展,消费者群组概念被提出,不过值得注意的是,消费者的横向扩展的上线与分区个数有关系。如下图:
消费者群组和分区再均衡
分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡。
什么时候再平衡?
消费者发送那个心跳到群组协调器的broker来维持它们和群组的从属关系,以及它们对分区的所有权关系。如果消费者没有以正常时间间隔发送心跳,那么会触发再平衡。
分配分区时一个怎样的一个过程?
当消费者要加入群组时,它向群组协调器发送一个JoinGroup请求。第一个加入群组的消费者成为“群主”。群主从协调器中获取群组成员列表,并负责给每一个消费者分配分区。
创建Kafka消费者
消费者需要填充的属性。
bootsrap.servers:集群地址,不需要包含所有的broker地址,会从其中某个broker中获取其他broker地址。
key.serializer:key的序列化方式
value.serializer:value的序列化方式
group.id:群组id,非必须,但是常规的情况下都会填
订阅主题
创建好消费者对象后,调用subsceibe()方法接受一个主题列表作为参数。
值得注意的时subsceibe()方法也可以传入一个正则表达式,正则表达式可以匹配多个主题,如果有人创建了新的主题,并且主题名字和正则表达式匹配,那么会立即触发一次再平衡,消费者就可以读取新添加的主题。
轮询
消费轮询时消费者Api的核心,通过轮询向服务器请求数据。一旦消费者订阅了主题,轮询就会处理所有的细节,包括:群组协调 / 分区再平衡 / 发送心跳 / 获取数据,开发者只需要使用一组简单的Api来处理从分区返回的数据。消费者代码主要部分如下:
Properties props = new Properties();
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Collections.singletonList("topic1"));
try {
// 无限循环,通过秩序轮询Kafka请求数据
while (true){
/**
* 灰常重要的代码,在这行代码里做了以下几件事情
* 1 群组协调:第一次调用时,会负责查找 GroupCoordinator,然后加入群组
* 2 分区再均衡:如果发生再平衡,在这代码里面执行
* 3 发送心跳:心跳也是这里发出的,因此要保证轮询期间所作的任何处理工作应该尽快完成
* 4 获取数据
*/
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
poll.forEach(item->{
// 返回的 ConsumerRecord 对象记录了topic,分区,offset
String format = MessageFormat.format("topic={0}, partition={1}, offset={2}, consumer={3}, value={4}",
item.topic(),
item.partition(),
item.offset(),
item.key(),
item.value());
System.out.println(format);
});
}
} finally {
// 退出程序之前,执行消费者退出,可以立即告诉群组协调该消费者已经gg
consumer.close();
}
消费者配置
- fetch.min.bytes:指定了消费者从服务器获取记录的最小字节数。消费者向broker请求数据,broker返回消费者的最小字节数。
- fetch.max.wait.ms:指定了消费者从服务器中最大的等待时间。消费者向broker发送请求,如果超过这个时间,broker就算没有集齐fetch.min.bytes数据,也会直接返回。
- max.partition.fetch.bytes:指定了服务器从每个分区返回给消费者的最大字节数。假设设置为1M,有20个分区,5个消费者,那么返回给消费者的最大数量围殴4M(20/5 * 1)
- session.timeout.ms:制定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是3秒,
- auto.offset.reset:指定了读取一个没有偏移量的分区或者偏移量无效的情况下该如何处理,默认值时latest,意思时在偏移量无效时,消费者从最新的记录开始读取数据。
- enable.auto.commit:指定了消费者是否自动提交偏移量,默认值时true。为了避免重复消费或数据丢失,可以设置为false,自己控制何时提交偏移量。
- partition.assignment.strategy:指定分区分配哪个消费者的策略
- client.id:broker用它来标识从客户端发送过来的消息
- max.poll.records:控制单词调用call()方法能够返回的记录数量,可以帮你控制在轮询里需要处理的数据量。
- receive.buffer.bytes 和 send.buffer.bytes:指定TPC socket的缓冲区大小,设置为-1是系统默认值
提交和偏移量
提交是指更新分区当前位置的操作
消费者如何提交偏移量呢?
消费者向一个叫做_consumer_offset的特殊主题发送消息,消息里面包含每个分区的偏移量。一般情况下消费者如果一直运行,那么偏移量没有什么用处。不过,如果消费者发生崩溃或者有新的消费者加入群组,就会触发再平衡,完成再平衡后,每个消费者可能分配到新的分区,而不是之前处理的那个。此时,消费者要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。
什么情况会有重复消费?
首先要明白,消费者客户端是批量处理,如果 broker中提交的偏移量 < 消费者客户端处理数据的偏移量。那么它们之差就是重复消费的数据。
什么情况会丢数据?
broker中提交偏移量 > 消费者客户端处理数据的偏移量,那么会丢失数据。
自动提交
enable.auto.commit设置为true,那么每过5s,消费者会自动把从poll()方法接收到的最大偏移量提交上去。提交时间间隔由auto.commit.interval.ms控制,默认是5s。
自动提交会产生重复消费的情况,无法避免。具体场景是,默认5s提交,如果在3s的时候进行了在均衡,那么会重复消费0~3s的数据。可以通过减少时间间隔来减少重复消费的数据量。
提交当前偏移量
把auto.commit.offset设置为false,使用commitSync()手动提交偏移量。
while (true){
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
poll.forEach(item->{
// 返回的 ConsumerRecord 对象记录了topic,分区,offset
String format = MessageFormat.format("topic={0}, partition={1}, offset={2}, consumer={3}, value={4}",
item.topic(),
item.partition(),
item.offset(),
item.key(),
item.value());
System.out.println(format);
});
try {
// 处理完消息后,调用commitSync同步提交当前批次最新的偏移量
consumer.commitSync();
} catch (CommitFailedException e){
System.out.println(e);
}
}
异步提交
手动提交有一个不足之处,在broker对提交请求做出回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。我们可以通过异步提交API,无需等待broker响应,提交偏移量。
while (true){
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
poll.forEach(item->{
// 返回的 ConsumerRecord 对象记录了topic,分区,offset
String format = MessageFormat.format("topic={0}, partition={1}, offset={2}, consumer={3}, value={4}",
item.topic(),
item.partition(),
item.offset(),
item.key(),
item.value());
System.out.println(format);
});
// 注意这里是异步提交偏移量
consumer.commitAsync();
}
同步和异步组合提交
同步和异步的组合,实际上的意思是正常情况下异步同步,如果出现异常,那么在异常处理中进行同步提交。
try {
while (true){
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
poll.forEach(item->{
String format = MessageFormat.format("topic={0}, partition={1}, offset={2}, consumer={3}, value={4}",
item.topic(),
item.partition(),
item.offset(),
item.key(),
item.value());
System.out.println(format);
});
// 正常情况下异步提交
consumer.commitAsync();
}
} finally {
try {
// 如果出现异常同步一直提交
consumer.commitSync();
} finally {
consumer.close();
}
}
提交特定的偏移量
关注提交特定偏移量的Api
**
* Commit the specified offsets for the specified list of topics and partitions.
*/
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets)
/**
* Commit the specified offsets for the specified list of topics and partitions to Kafka.
*/
@Override
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
再均衡监听器
消费者在退出和进行分区在均衡之前,会进行一些清理工作,这是Kafka自身的操作。假如你需要在消费者再均衡时定义自己的操作,那么需要在订阅Topic时传入一个回调函数,在回调函数中可以编写再均衡的逻辑。订阅的源码如下:
@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
if (pattern == null)
throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null");
acquireAndEnsureOpen();
try {
throwIfNoAssignorsConfigured();
log.debug("Subscribed to pattern: {}", pattern);
this.subscriptions.subscribe(pattern, listener);
this.metadata.needMetadataForAllTopics(true);
this.coordinator.updatePatternSubscription(metadata.fetch());
this.metadata.requestUpdate();
} finally {
release();
}
}
ConsumerRebalanceListener接口如下:
package org.apache.kafka.clients.consumer;
import java.util.Collection;
import org.apache.kafka.common.TopicPartition;
public interface ConsumerRebalanceListener {
// 该方法会在再均衡开始之前和消费停止读取消息之后被调用。如果在这里提交偏移量,下一个接管分区的消费者,就知道从哪里开始读取了
void onPartitionsRevoked(Collection<TopicPartition> partitions);
// 方法会在重新分配分区之后和消费者开始读取消息之前被调用。
void onPartitionsAssigned(Collection<TopicPartition> partitions);
}
从特定偏移量处开始处理记录
poll()方法是从各个分区的最新偏移量处开始处理消息。不过,有时候我们也需要从特定的偏移量处开始读取数据。修改分区的偏移量的有以下API
package org.apache.kafka.clients.consumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import java.io.Closeable;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
public interface Consumer<K, V> extends Closeable {
/**
* @see KafkaConsumer#seek(TopicPartition, long),指定偏移量读取
*/
void seek(TopicPartition partition, long offset);
/**
* @see KafkaConsumer#seekToBeginning(Collection) 从分区的起始位置读取
*/
void seekToBeginning(Collection<TopicPartition> partitions);
/**
* @see KafkaConsumer#seekToEnd(Collection) 从分区的最终位置读取
*/
void seekToEnd(Collection<TopicPartition> partitions);
}
seek(TopicPartition partition, long offset) 用于查找特定偏移量的API有很多用途,比如向后回退几个消息或者向前跳过几个消息(对时间比较敏感的应用程序在处理滞后的情况下希望能够向前跳过若干个消息)。在使用Kafka以外的系统来存储偏移量时,它将给我们带来更大的惊喜。
假设以下一个场景,Kafka消费数据,然后入库,假设我们不想丢失数据也不想在数据库里多次保存该数据(保证不丢失数据和重复消费)。这种情况我们可以把偏移量的存储和数据的存储放在同一个事务里,仅能同时成功或者同时失败。然后Kafka启动或者再平衡的时候从数据库中读取这个偏移量,然后指定偏移量,继续开始消费。
如何退出
如果确定要退出循环,需要通过外部线程调用 consumer.wakeup(),注意 consumer.wakeup() 时消费者唯一一个可以从其他线程里安全调用的方法。调用 consumer.wakeup() 可以退出 poll(),并抛出 WakeupException 异常,或者如果调用 consumer.wakeup() 时线程没有等待轮询,那么异常将在下一轮调用 poll() 抛出。
反序列化器
以后再总结把。。。。
独立消费者
之前讨论的消费群组,分区被自动分配给群组里的消费者,在群组里新增或移除消费者时自动触发再均衡。通常情况下,这些行为刚好是你所需要的,不过有时候你需要一些更简单的东西。比如,你可能只需要一个消费者从一个主题的所有分区或者某个特定的分区读取数据。这个时候,就不需要消费者群组和再均衡了,只需要把主题或者分区分配给消费者,然后开始读取消息并提交偏移量。
/**
* 向集群请求主题可用的分区
*/
List<PartitionInfo> partitionInfoList = consumer.partitionsFor("topic");
List<TopicPartition> partitionList = partitionInfoList.stream()
.map(item -> new TopicPartition(item.topic(), item.partition()))
.collect(Collectors.toList());
/**
* 知道需要哪些分区后,调用 assign() 方法
* 除了不会发生再平衡,也不需要手动查找分区,其他的看起来一切正常。
* 不过要记住,如果主题添加了新的分区,消费者并不会收到通知。所以,需要周期性地调用 consumer.partitionsFor() 方法来检查是否有新的分区加入;
*
*/
consumer.assign(partitionList);
while (true){
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
poll.forEach(item->{
// 返回的 ConsumerRecord 对象记录了topic,分区,offset
String format = MessageFormat.format("topic={0}, partition={1}, offset={2}, consumer={3}, value={4}",
item.topic(),
item.partition(),
item.offset(),
item.key(),
item.value());
System.out.println(format);
});
consumer.commitSync();
}
第五章 深入Kafka
集群成员关系
Kafka使用Zookeeper来维护集群成员的信息。每个broker(部署Kafka的服务器)都有一个唯一标识,这个标识可以再配置文件里指定,也可以自动生成。在broker启动的时候,它通过创建临时节点把自己的ID注册到Zookeeper。Kafka组件订阅Zookeeper的 /brokers/ids (broker 在 Zookeeper 上的注册路径),当有broker加入集群或退出集群时,这些组件就可以获得通知。
控制器
控制器也是一个 broker,只不过它除了具有一般的 broker的功能之外,还负责分区首领的选举。集群里第一个启动的 broker 通过在 Zookeeper 里创建一个临时节点 /controller 让自己成为控制器。其他 broker 启动时也会尝试创建这个节点,此时会收到一个“节点已存在的异常”,然后“意识到”控制器节点已存在,也就是说集群里已经有一个控制器了。其他broker在控制器节点上创建 Zookeeper wathc对象,这样它们就可以收到这个节点的变更通知。这种方式可以确保集群里一次只有一个控制器存在。
如果控制器被关闭或者与 Zookeeper 断开连接,Zookeeper 上的临时节点就会消失。集群里的其他 broker 通过 watch 对象得到控制器节点消息的通知,它们会尝试让自己成为新的控制器。第一个在Zookeeper里成功创建控制器节点的broker就会成为新的控制器,其他节点会收到“节点已存在”的异常,然后再新的控制器节点上再次创建watch对象。每个新选出的控制器,通过Zookeeper的条件递增操作获得一个全新,数值更大的controller epoch。其他broker在知道当前controller epoch后,如果收到由控制器发出的包含较旧 epoch 的消息,就会忽略它们。
当控制器发现 broker 已经离开集群(通过观察相关的 Zookeeper 路径),它就知道,那些失去首领的分区需要一个新首领(这些分区的首领刚好是在这个 broker 上)。控制器遍历这些分区,并确定谁应该成为新首领(简单来说就是分区副本列表里的下 个副本),然后向所有包含新首领或现有跟随者的 broke 发送请求。该请求消息包含了谁是新首领以及谁是分区跟随者的信息。随后,新首领开始处理来自生产者和消费者的请求,而跟随者开始从新首领那里复制消息。
当控制器发现 broker 加入集群时,它会使用 broker 来检查新加入的 broker 是否包含现有分区的副本。如果有,控制器就把变更通知发送给新加入的 broker 和其他 broker,新broker 上的副本开始从首领那里复制消息。
简而言之, Kafka 使用 Zookeeper 的临时节点来选举控制器, 并在节点加入集群或退出集群时通知控制器。控制器负责在节点加入或离开集群时进行分区首领选举。控制器使用epoch 来避免“脑裂” 。“脑裂”是指两个节点同时认为自己是当前的控制器。
复制
https://blog.csdn.net/qq_35689573/article/details/86699256
首先说明,Kafka使用主题(topic)来组织数据,每个主题被分为若干个分区(Partition),每个不同分区有多个副本。那些副本保存在broker上,每个broker可以保存成百上千个属于不同主题和分区的副本。
- leader 副本(首领副本):每个分区有个首领副本,为了保证一致性,所有生产者请求和消费都是通过首领副本。
- flower副本(跟随者副本):首领以外的副本都是跟随着副本。跟随着副本不处理来自客户端的请求,它的唯一任务就是从首领副本复制消息,保持与首领一致的状态。如果首领发生崩溃,其中的一个跟随着会被提升为新的首领。
处理请求
broker的大部分工作是处理客户端,分区副本和控制器发送给分区首领的请求。所有请求消息包含一个标准消息头:
- Request type(也就是 API key)
- Request version(broker可以处理不同版本的客户端请求,并根据客户端版本做出不同的响应)
- Correlation ID 一个具有唯一性的数字,用于标识请求消息,同时也会出现在响应消息和错误日志里
- Client ID 用户标识发送请求的客户端
第六章 可靠的数据传递
可靠性保证
Kafka可以在可靠性做出以下保证:
-
Kafka可以保证分区消息的顺序。如果同一个生产者向同一个分区写入消息,而且消息B在消息A之后写入,那么Kafka可以保证消息B的偏移量比消息A的偏移量大,而且消费者会先读取消息A再读取消息B。
- Kafka保证,只要有一个副本是活跃的,那么已提交的消息就不会丢失。
-
消费者只能读取已提交的消息。
-
生产者可以选择接收不同的类型的确认。比如消息被完全提交时的确认(acks=all),或者消息在被写入leader 副本时确认(acks=1),或者消息被发送到网络时确认(acks=0)。
作是处理客户端,分区副本和控制器发送给分区首领的请求。所有请求消息包含一个标准消息头:
- Request type(也就是 API key)
- Request version(broker可以处理不同版本的客户端请求,并根据客户端版本做出不同的响应)
- Correlation ID 一个具有唯一性的数字,用于标识请求消息,同时也会出现在响应消息和错误日志里
- Client ID 用户标识发送请求的客户端
第六章 可靠的数据传递
可靠性保证
Kafka可以在可靠性做出以下保证:
-
Kafka可以保证分区消息的顺序:如果同一个生产者向同一个分区写入消息,而且消息B在消息A之后写入,那么Kafka可以保证消息B的偏移量比消息A的偏移量大,而且消费者会先读取消息A再读取消息B。
- Kafka保证,只要有一个副本是活跃的,那么已提交的消息就不会丢失。
-
消费者只能读取已提交的消息。
-
生产者可以选择接收不同的类型的确认:比如消息被完全提交时的确认(acks=all),或者消息在被写入leader 副本时确认(acks=1),或者消息被发送到网络时确认(acks=0)。