Kafka生产者客户端整体架构如图:
整个生产者客户端主要有两个线程,主线程以及Sender线程。Producer在主线程中产生消息,然后通过拦截器,序列化器,分区器之后缓存到消息累加器RecordAccumulator中。Sender线程从RecordAccumulator中获取消息并发送到kafka中。
RecordAccumulator主要用来缓存消息,这样发送的时候进行批量发送以便减少相应的网络传输。RecordAccumulator缓存的大小可以通过配置参数buffer.memory配置,默认是32M。如果创建消息的速度过快,超过sender发送给kafka服务器的速度,会导致缓存空间不足,这个时候sender线程可能会阻塞或者抛出异常,max.block.ms配置决定阻塞的最大时间。
RecordAccumulator中为每个分区维护了一个双端队列,队列中的内容是ProducerBatch,即Deque<ProduderBatch>,创建消息写入到尾部,发送消息从头部读取。ProducerBatch是消息发送的一个批次,里面包含了一个或多个ProducerRecord。
Sender从RecordAccumulator中获取到缓存的消息,会将<分区,Dequeue<ProducerBatch>>
转换为<Node,List<ProruderBatch>>,Node表示的是kafka集群的broker节点,生产者客户端与具体broker节点建立的连接。也就是向具体的broker节点发送消息而不关心具体分区。
转换为<Node,List<ProruderBatch>>后,sender还会进一步封装转换成<Node,Request>形式,将请求发送给各个Node。
请求在发送给Kafka之前还会保存到InFlightRequests中,形式为: Map<NodeId,Dequeue<Request>>
主要作用是缓存了已经发出去但是还未收到响应的请求。InFlightRequests通过配置参数max.flight.requests.per.connection
可以限制每个链接最多缓存数量,默认值为5,即每个链接最多只能缓存5个未响应的请求,超过该参数之后就不能继续像这个连接发送请求。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)