kafka权威指南中文版之三

2023-11-17

第三章kafka producer---kafka写入消息

无论你将kafka作为一个消息队列,或者消息总线,还是一个数据存储平台,你都要通过生产者producerkafka写入数据,通过消费者consumer读取kafka的数据。

例如,一个信用卡事务处理系统,会有一个客户端应用或者一个在线商店应用,负责在交易发生时,将每一个事务发送到kafka,另一个应用通过规则引擎校验这个事务,决定接收或者拒绝,接收或者拒绝响应可以写入kafka,这样在线商店应用就可以收到这个响应。第三方应用可以读取这个事务数据和响应数据,存储到数据库中,以备后续分析(改进规则引擎)

kafka提供了Java客户端API,开发者可以使用这些API开发应用程序,与kafka交互。本章关注如何使用producer客户端来开阿发应用程序,将数据写入kafka。下一章将关注consumer客户端,如何读取kafka数据。

有很多场景需要将消息写入kafka:记录用户动作,用于审计或分析;存储日志消息;与其他应用程序异步通信;作为写入数据库之前的缓冲区等等。

不同的场景以为着不同的需求:每条消息是否都很重要或者说可以接收消息的丢失吗?可以接受偶尔收到重复的消息吗?对消息时延和吞吐量有严格的要求吗?

前面的信用卡事务处理系统的例子中,可以看出对消息的要求是严格的,既不允许丢失消息也不允许重复收到消息,消息延迟要在500毫秒以内,需要每秒100万条消息的吞吐量。

另一种场景,如存储网站的点击事件信息到kafka。此种场景,一些消息的丢失或者重复是可以接受的,消息延迟也可以很高,只要对用户体验没有影响就可以,也就是说,消息经过几秒中到达kafka是可以的,只要下一个网页在用户点击后迅速加载出来。此时的消息吞吐量取决于网站的用户量。

不同的场景,不同的需求,会影响producer API的使用方式及配置方式。

生产者producer概览

虽然producer API非常简单,但是在发送消息时,其底层发生了很多事情。图3-1展示了发送消息到kafka的主要步骤。

上图中,从创建一个ProducerRecord开始,ProducerRecord包含消息要发送到哪个Topic,消息的值,也可以声明一个keypartition。一旦将ProducerRecord发送,producer要做的就是序列化keyvalue对象为二进制数组,这样才可以通过网络发送。

接着,数据发送到一个partitioner。此时如果我们在ProducerRecord中声明了一个partition分区,partitioner仅仅将我们自定义的partition返回;如果没有声明partitionpartitioner将会选择一个partition,通常会根据key来选择partition。一旦选择了partitionproducer就知道这个消息要发送到哪个topic和哪个分区了。

接着,producer将这个消息加入到一个消息批次中,这个消息批次中的消息会发送到相同的topicpartition。此时会开辟一个独立的线程负责发送这批消息到合适的kafka broker

broker收到消息,会发回一个响应信息。如果这个消息成功写入kafkabroker会响应一个RecordMetadata对象(包括topicpartition、以及消息在partition中的offset)。如果broker没有将消息写入kafka,将会响应一个错误。当producer收到这个错误,可以尝试重发指定次数的消息,直到放弃。

本章中学习如何使用kafka producer,会涉及到图3-1中的大部分组件。学习如何创建一个kafkaProducerProducerRecord对象;学习如何使用默认的partitionerserializers发送消息;学习如何处理响应的错误;学习如何自定义serializerspartitioner。以及学习producer相关的大部分重要配置项。

 

创建kafkaProducer对象

发送消息到kafka,首先需要创建KafkaProducer对象。KafkaProducer对象需要3个必备的属性:

bootstrap.servers—kafka brokershost:port列表。此列表中不要求包含集群中所有的brokersproducer会根据连接上的broker查询到其他broker。建议列表中至少包含两个brokers,因为这样即使一个broker连接不上,可以连接另一个broker

key.serializer—kafka brokers期望的消息(keyvalue)二进制数组。Producer接口使用了参数化类型来定义key serializer,以此发送任何Java对象。这就意味着,producer必须知道如何将这些Java对象转换为二进制数组byte arrayskey.serializer应该设置为一个类,这个类实现了org.apache.kafka.common.serialization.Serializer接口,producer使用这个类将key对象序列化为byte arraykafka客户端包中有ByteArraySerializer (which doesn’t domuch), StringSerializer andIntegerSerializer三种类型的序列化器,如果发送常用类型的消息,不需要自定义序列化器。注意:即使发送只包含value的消息,也要设置key.serializer

value.serializer---key.serializer含义相同,其值可以与key.serializer相同,也可以不同。


下面的代码片段通过设置上述参数的方式创建了KafkaProducer对象:

private Properties kafkaProps = new Properties();//创建一个properties对象

kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");

//设置字符串类型的消息key和value

kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

//创建KafkaProducer对象,设置string泛型,传入properties对象

producer = new KafkaProducer<String, String>(kafkaProps);

 

 

可以看出,通过设置properties对象的不同参数,可以完成对producer对象的控制。kafka官方文档中列出了所有的配置参数,本章会学习其中一些重要的配置参数。

上述代码片段实例化了一个producer,接着就可以发送消息了。有三个主要方法用来发送消息:

Fire-and-forget----此方法用来发送消息到broker,不关注消息是否成功到达。大部分情况下,消息会成功到达broker,因为kafka是高可用了,producer会自动重试发送。但是,还是会有消息丢失的情况;

SynchronousSend(同步发送)---发送一个消息,send()方法返回一个Future对象,使用此对象的get()阻塞方法可以指定send方法是否执行成功。

Asynchronous Send(异步发送)---以回调函数的形式调用send()方法,当收到broker的响应,会触发毁掉函数执行。

上述所有的情况,有一点要认识到:发送消息到kafka是可能失败的,需要有处理这些失败的计划。一个producer对象可以通过多线程的方式发送消息,也可以使用多个producer发送消息。

下面通过实例演示如何使用上述三种方式发送消息,以及如何处理可能发生的异常。

发送消息到kafka

发送消息到kafka最简单的方式如下:

ProducerRecord<String, String> record =

    new ProducerRecord<>("CustomerCountry", "Precision Products", "France");

try {

  producer.send(record);

} catch (Exception e) {

    e.printStackTrace();

}

 

1、  producer对象的send方法接收一个ProducerRecord对象,所以先创建一个ProducerRecord对象。ProducerRecord有多个构造方法,后续会讨论。这里我们使用了三个参数的构造函数:string类型的topicstring类型的keystring类型的valuekeyvalue的类型必须与serializersProducer的泛型一致。

2、  使用Producersend方法发送ProducerRecord对象。在前面的Producer架构图中显示,消息会先放到缓冲区,然后启用一个独立线程发送到brokersend方法返回一个包含RecordMetadataFuture对象,这里我们忽略了返回值,不关注消息发送是否成功。这种发送消息的方式在允许消息丢失的场景下使用。

3、  虽然我们忽略了消息发送到kafka的异常,但是在消息发送到kafka之前,还是有可能发生异常的。如序列化消息失败异常SerializationException、缓冲区用尽异常BufferExhaustedException(配置了producer信息,指定在缓冲区满时,不是阻塞,而是抛出异常的情况)、发送中断异常InterruptException

同步发送消息到kafka

ProducerRecord<String, String> record =

    new ProducerRecord<>("CustomerCountry", "Precision Products", "France");

producer.send(record).get();

 

这里我们使用Future.get()方法来等待消息发送结果,直到收到kafka的响应。当kafka broker返回错误时,Future对象会抛出异常,我们的应用程序可以捕获异常。如果没有异常,我们会得到RecordMetadata对象,从中可以获取到消息的offset等信息。

KafkaProducer有两类错误。一类是重试类错误,这类错误通过再次发送消息可以解决,例如连接错误(重试可能会连接成功)”no-leader”错误。重试次数是可以配置的,只有在重试次数用完后,错误依然存在,此时客户端才会收到重试类错误。另一类错误是非重试类错误,就是说不能通过重试来解决的错误。例如 message size too large错误。此时KafkaProducer将不会重试,直接返回异常。

异步发送消息

假设客户端程序与kafka集群之间的网络轮询时间为10ms。如果我们在发送消息后等待响应,发送100条消息将会消费1s的时间。另一方面讲,如果我们只是发送消息,不等待响应,此时发送100条消息将会耗费更少的时间。在大部分场景下,我们真的不需要等待一个响应,响应中的信息(topicpartitionoffset)有时候不是客户端必须的。也就是说,我们需要知道什么时候发送失败,这样我们可以抛出异常或者写入错误日志文件中,以备后续分析。

为了能够异步发送消息,并且能处理错误,这种场景需要为Producer添加一个callback回调函数:

private class DemoProducerCallback implements Callback {

       @Override

    public void onCompletion(RecordMetadata recordMetadata, Exception e) {

           if (e != null) {

               e.printStackTrace();

        }

    }

}

 

ProducerRecord<String, String> record =

       new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");

producer.send(record, new DemoProducerCallback());

 

Serializers序列化器


本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

kafka权威指南中文版之三 的相关文章

随机推荐

  • 数字系统的信息表示

    数字系统的信息表示 1 什么是信息 2 数字系统是如何表示一个连续值的信息 3 使用数字信号的优势 4 将模拟信号表示成数字信号形式过程 5 为什么数字系统要采用二进制 6 噪声容限 1 什么是信息 信息是对物质世界与人类社会中存在的各种各
  • 网络安全中的欺骗攻击与防御技术

    在Internet上计算机之间相互进行的交流建立在两个前提之下 认证 信任 认证是网络上的计算机用于相互间进行识别的一种鉴别过程 经过认证的过程 获准相互交流的计算机之间就会建立起相互信任的关系 信任和认证具有逆反关系 即如果计算机之间存在
  • 介值定理究竟在讲什么?

    介值定理 书本上的定义 翻译成人话就是 函数最原始的定义 我们初中就知道 一个函数最根本的性质就是 函数值 自变量值 一一对应 所以介值定理就是在反复说一件事 一个数如果属于值域 在定义域内 一定能够找到一个 自变量 与其对应 当然这个结论
  • Shell—关于source,bash如何执行

    通过对一个脚本问题的分析 发现了自己的一个知识误区 我想 有必要写篇博客总结一下 关于source source test sh 与 test sh 二者用法相同 是读取脚本test sh中的内容 依次在当前脚本中执行 且不会建立新的子sh
  • 【论文精读IEEE_2023_6】FlowFace++: Explicit Semantic Flow-supervised End-to-End Face Swapping

    论文精读CVPR 2023 6 FlowFace Explicit Semantic Flow supervised End to End Face Swapping 一 前言 Abstract I INTRODUCTION II RELA
  • matlab 三维激光雷达点云的路缘检测与跟踪

    目录 Introduction Download Lidar Data Set Preprocess Data Detect Road Shape Detect Road Curbs Track Curb Points Analyze Dr
  • 记录用ConstraintLayout实现控件view最大高度的过程

    背景 我项目里用到个popupWindow 内容是掉接口获取的list 长度不固定 就想着弄个最大高度 让他在内容过多的时候不会太长怼到屏幕底部 开整 看constraintLayout的文章说用android maxHeight 250d
  • how to activate XMind8 to pro version.

    From activate Xmind 8 in step 3 run setup sh in sudo command and use the following command to run XMind XMind Activate X
  • 立创开源

    一 项目说明 我们在使用单片机设计项目时经常需要用到ADC功能 但是众所周知 单片机是很脆弱的东西 一旦采样 分压后 的电压超过3 3v就会瞬间罢工 在烧毁4 5个单价不菲的单片机后我认为使用外部ADC很有必要 由此本项目诞生 二 原理图
  • 【解决】nltk.download()报错:errno54: connection reset by peer

    报错详情 import nltk gt gt gt nltk download nltk data Error loading
  • github不再支持账号密码解决方案

    今天在向github上传代码的时候 突然不能上传了 终端报错信息如下 remote Support for password authentication was removed on August 13 2021 Please use a
  • C语言中将变量的数值打印到.txt文件

    1 C语言中只需加入以下代码即可将变量的数值打印到 txt文件中 FILE fp fopen dayin txt a fprintf fp d n 变量名 fclose fp 2 若想每次运行C程序就覆盖掉上一次生成的 txt文件 则需要先
  • 深入PCI与PCIe之一:硬件篇

    PCI总线和设备树是X86硬件体系内很重要的组成部分 几乎所有的外围硬件都以这样或那样的形式连接到PCI设备树上 虽然Intel为了方便各种IP的接入而提出IOSF总线 但是其主体接口 primary interface 还依然是PCIe形
  • 人才盘点的主角是业务部门负责人还是HR?

    你的企业 是否存在以下问题 人才储备不足 关键岗位人员离职后 没有合适的马上接替 人才质量不高 企业战略变革转型期 不知谁可以引领和驱动变革 人才现状不清 新的业务 新的项目要开拓 不知合适的人才在哪里 人才分布不均 成熟业务部门人才扎堆
  • gin框架38--使用中间件

    gin框架38 使用中间件 介绍 案例 说明 介绍 本文主要介绍如何在gin框架中使用中间件 并通过案例加以说明 使用MyBenchLogger中间件来输出特有的日志 用AuthRequire中间件来实现基础认证 案例 package ma
  • Cost function

    cost function的形式 cost function的推导满足以下过程 1 认为error 满足某个分布 写出样本点xi的样本的error 2 认为样本点是相互独立的 推导出其对数似然函数 3 求偏导 是得导函数为0 分离常数部分
  • Java 高精度计算 BigDecimal 和 BigInteger

    BigDecimal 在 Java 中 表示小数值一般使用 float 或者 double 类型 可以用于科学计算或者工程计算等 数学意义上的小数是连续的 但 float 和 double 只能表示其中的一些离散点 如果我们要表示的数值刚好
  • Webpack 中常用的loader和plugin已经webpack如何配置

    dist文件夹存放打包后的文件 动态获取出口路径 需要有webpack init 生成package js文件 1 1webpack是什么 webpack 是一种前端资源构建工具 一个静态模块打包器 modulebundler 在 webp
  • IEnumerable vs IEnumerator

    对对象的存储对每一种语言都很重要 例如C 中的Iterator C 中的IEnumerator Java中的Iterator等 C 所有的集合类都定义了一个叫iterator的数据成员 可以通过此对象实现对集合的traverse vecto
  • kafka权威指南中文版之三

    第三章kafka producer 向kafka写入消息 无论你将kafka作为一个消息队列 或者消息总线 还是一个数据存储平台 你都要通过生产者producer向kafka写入数据 通过消费者consumer读取kafka的数据 例如 一