java代码kafka初始化producer和consumer

2023-11-10

目录

一、初始化producer对象(序列化消息)

生产者发送消息的三种方式

kafka生产者其它详细知识:

二、初始化consumer对象(反序列化消息)

consumer取消订阅的方式consumer.unsubscribe();

使用自定义的序列化


一、初始化producer对象(序列化消息)

kafka序列化消息是在生产端,序列化后,消息才能网络传输。而构造KafkaProducer代码如下:

Properties props = new Properties();
props.put("bootstrap.servers", "10.0.55.229:9092");//
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//props.put("value.serializer", "com.java.kafka.PersonUtilSerializer");
//Producer<String, Object> producer = new KafkaProducer<String, Object>(props);
kafkaProducer = new KafkaProducer<>(props);
	    int i=0;
	    while(true){
	    	i++;
	    	JsonObject  p = new JsonObject();
                p .put("id","1")
	    	p .put("name","tester")
	    	System.out.println(p.toString());
// 当指定发送消息的分区时,程序就不会根据key值再判断发往哪个分区了。
           // record = new ProducerRecord<>(TOPIC, 0, String.valueOf(i), messageStr);
	    	ProducerRecord<String, Object> record = new ProducerRecord<String,String>(TOPIC, p.toString());
	    	producer.send(record);
	    
	    }
	       producer.close();
	}

ProducerRecord 构造方法,该方法的参数 topic 和 value 属性是必填项,其余属性(比如:分区号、时间戳、key、headers)是选填项。对应的 ProducerRecord 的构造方法也有多种:使用者可根据场景来选择合适的 ProducerRecord 。

属性key.serializer和value.serializer就是key和value指定的序列化方式。无论是key还是value序列化和反序列化实现都是一样的。

StringSerializer是内置的字符串序列化方式

生产者发送消息的三种方式

Kafka 生产者发送消息有三种方式,分别为:普通发送(发后即忘)、同步发送、异步发送。

kafka生产者其它详细知识:

Kafka基础(二):生产者相关知识汇总

https://blog.csdn.net/CREATE_17/article/details/93396981

二、初始化consumer对象(反序列化消息)

kafka反序列化消息是在消费端。由于网络传输过来的是byte[],只有反序列化后才能得到生产者发送的真实的消息内容。而构造KafkaConsumer代码如下:

Properties props = new Properties();
props.put("bootstrap.servers", "10.0.55.229:9092");
props.put("key.deserializer",   "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", "group1");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
// 订阅test1 topic   使用subscribe()方法订阅主题 (一般推荐这种方式)
consumer.subscribe(Collections.singletonList("test1"));
// 另一种订阅的方式 使用assign()方法订阅确定主题和分区
//consumer.assign(Collections.singletonList(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())));
consumer.seekToBeginning(Arrays.asList(TOPIC_PARTITION));
	while(true){
             ConsumerRecords<String, String> records = consumer.poll(100);
	     System.err.println("print the size of records ,size="+records.count());
	     for(ConsumerRecord<String, String> record:records){
	        System.out.println(record);
	       }
}

属性key.deserializervalue.deserializer就是key和value指定的反序列化方式。

StringDeserializer是内置的字符串反序列化方式

consumer取消订阅的方式consumer.unsubscribe();

可以使用 unsubscribe() 方法来取消主题的订阅

使用方式:consumer.unsubscribe();

unsubscribe()方法即可以取消通过subscribe()方式实现的订阅,还可以取消通过assign()方式实现的订阅

consumer的两种不同订阅方式:subscribe()  和 assign()

使用自定义的序列化

写一个自定义的类,比如说我们要传递一个Person对象,那么我们就定义个Person对象的序列化和反序列化的类,并且实现Serializer接口,下面继续看,首先定义个Person类

public class Person implements Serializable{
 
	private String id ;
	private String name ;
	public String getId() {
		return id;
	}
	public void setId(String id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	@Override
	public String toString() {
		return "Person [id=" + id + ", name=" + name + "]";
	}
	
}

接下来,我们自定义一个序列化的类: PersonUtilSerializer

public class PersonUtilSerializer  implements Serializer<Person>{
 
	@Override
	public void configure(Map<String, ?> configs, boolean isKey) {
		
	}
 
	@Override
	public byte[] serialize(String topic, Person data) {
		
		return JSON.toJSONBytes(data);
	}
 
	@Override
	public void close() {
		// TODO Auto-generated method stub
		
	}
 
}

初始化producer实例对象

props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.java.kafka.PersonUtilSerializer");
Producer<String, Object> producer = new KafkaProducer<String, Object>(props);
	    int i=0;
	    while(true){
	    	i++;
	    	Person p = new Person();
	    	p.setId(i+"");
	    	p.setName("zhangsan-"+i);
	    	System.out.println(p.toString());
	    	ProducerRecord<String, Object> record = new ProducerRecord<String,Object>(TOPIC, p);
	    	producer.send(record);
	    
	    }
	       producer.close();
	}

自定义Serializer和Deserializer非常痛苦,而且上面还有很多异常情况没有处理,还有很多类型不支持,非常脆弱。复杂类型的支持更是一件痛苦的事情,不同版本之间的兼容性问题更是一个极大的挑战。由于Serializer和Deserializer影响到上下游系统,导致牵一发而动全身。自定义序列化&反序列化实现不是能力的体现,而是逗比的体现。所以强烈不建议自定义实现序列化&反序列化,推荐直接使用StringSerializer和StringDeserializer,然后使用json作为标准的数据传输格式。站在巨人的肩膀上,事半功倍。

Kafka中位移提交那些事儿

https://blog.csdn.net/weixin_45039616/article/details/107081148

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

java代码kafka初始化producer和consumer 的相关文章

  • Flink Dashboard的数据监控功能

    一 数据反压 1 1 数据反压是啥 数据反压是在实时数据处理中 数据处理流的某个节点上游产生数据的速度大于该节点处理数据速度 导致数据堆积 从该节点向上游传递 一直到数据源 并降低数据源的摄入速度 导致数据反压出现的常见场景 比如 GC导致

随机推荐

  • Goland The selected directory is not a valid home for Go Sdk

    1 前言 初学 Golang 今天在配置好 Golang SDK 后 安装 goland IED 编辑器 在配置 goland GOROOT SDK 的过程中 一直报错如下 The selected directory is not a v
  • 什么是边缘计算(Edge AI)?

    什么是边缘计算 Edge AI 道翰天琼认知智能机器人平台API接口大脑为您揭秘 边缘AI发源于边缘计算 边缘计算也称为边缘处理 是一种将服务器放置在本地设备附近网络技术 这有助于降低系统的处理负载 解决数据传输的延迟问题 这样的处理是在传
  • KeyError: 'Spider not found: xxxx'

    保证确实由有Spider的情况下 可以查看你的scrapy cfg文件是否丢失
  • android studio 设置model设置为library

    如图所示 我的项目里面是两个model 我现在把第二个flowlayout设置为library来用 在App中引用flowlayout 为了防止今后忘记 特此标注一下 首先第一步 找到我们要做library的model的build文件 我这
  • element步骤条增加锚点的实现

    element的步骤条默认样式是无法点击的 需求中有点击步骤条 页面滚动到特定锚点需求 实现如下
  • 【云原生之Docker实战】使用Docker部署PhotoPrism照片管理系统

    云原生之Docker实战 使用Docker部署PhotoPrism照片管理系统 一 PhotoPrism介绍 1 PhotoPrism简介 2 PhotoPrism特点 二 检查宿主机系统版本 三 检查本地docker环境 1 检查dock
  • jumpserver堡垒机 (资源)

    23 5 jumpserver介绍 官网www jumpserver org 跳板机概述 跳板机就是一台服务器 开发戒运维人员在维护过程中首先要统一登录到这台服务器 然后再登录到目标 设备迚行维护和操作 堡垒机概述 堡垒机 即在一个特定的网
  • (附源码)springboot高校宿舍交电费系统 毕业设计031552

    Springboot高校宿舍交电费系统 摘 要 科技进步的飞速发展引起人们日常生活的巨大变化 电子信息技术的飞速发展使得电子信息技术的各个领域的应用水平得到普及和应用 信息时代的到来已成为不可阻挡的时尚潮流 人类发展的历史正进入一个新时代
  • 关于Python爬虫Scrapy在高并发下DNS查找失败解决方案

    使用场景 检测80w URL 可否打开 配置 高端配置 20 进程 500 CONCURRENT REQUESTS 运行一段时间后会有DNSLookup什么的错误 也就是查找超时 但是在浏览器里可以打开这个网页 首先做一些可能的无用功 爬虫
  • LeetCode——剑指 Offer 39. 数组中出现次数超过一半的数字

    剑指 Offer 39 数组中出现次数超过一半的数字 题目 数组中有一个数字出现的次数超过数组长度的一半 请找出这个数字 你可以假设数组是非空的 并且给定的数组总是存在多数元素 示例 1 输入 1 2 3 2 2 2 5 4 2 输出 2
  • Ajax简要分析使用

    先抛出一般结构 ajax type get url Stu Servlet data type select student id stu id message p success function data alert data 当然是j
  • Ubuntu22.04使用中文输入法

    安装的时候选择了英文安装 之后切换到中文 忘记还要写中文注释 发现在语言设置里不能添加输入法 仔细找了以下发现输入法的设置改到了键盘设置里 网络上查到的大部分都是老版本的ubuntu 这个是2204版本 输入法设置位置不同
  • 闪回事务查询+闪回事务查询案例

    闪回事务查询 1闪回事务查询是闪回版本查询的一个扩充 2闪回事务查询可以审计某个事务或者撤销一个已经提交的事务 闪回事务查询案例 测试数据 create table sct4 id number 4 name varchar2 20 ins
  • uos,qt,linuxdeployqt,qt-installer-framework, 生成安装包的记录

    注 使用源码生成安装包的环境要求 已安装QT v5 5 24 DTK QTcreator linuxdeployqt qt installer framework v5 9 的UOS v20 1 打开QTcreator 新建项目 2 选择侧
  • python随机生成验证码,数字+大小写字母

    ASCII码的对照链接 大写字母的十进制范围是 65 91 小写字母的十进制范围是 97 123 数字的十进制范围是 48 58 思路 1 先在空链表中添加大小写字母和数字 2 从列表中随机选择四个验证码 3 将列表转化成字符串输出 代码如
  • python 进行排序的两种方式 sort和sorted

    方法1 用List的成员函数sort进行排序 方法2 用内建函数sorted进行排序 sort函数定义 sort cmp None key None reverse False sorted函数定义 sorted iterable cmp
  • Cannot invoke “String.equalsIgnoreCase(String)“ because “code“ is null

    问题 同时开启多个项目 端口号不一致导致项目前后端错乱匹配 解决办法 后端 ruoyi admin下的application yml中的port 端口号 前端 vue config js里的port 端口号修改一致
  • cpp 解析HTML之 htmlcxx

    html与xml格式上比较相似 但xml不并一定能支持html的解析 这里介绍一个c 解析html的开源项目 htmlcxx 一 代码示例 1 项目源码下载之后 使用vs打开即可 默认为生成 lib静态库及MTd模式 可以在属性中修改指定为
  • httprunner测试框架3--har2case录制脚本

    har2case录制脚本 录制脚本 只是一个过渡 可以将录制的 har脚本快速转化成httprunner脚本文件 不能依靠录制 har2case可以将 har文件转化成yaml格式或者json格式的httprunner的脚本 可以借助fid
  • java代码kafka初始化producer和consumer

    目录 一 初始化producer对象 序列化消息 生产者发送消息的三种方式 kafka生产者其它详细知识 二 初始化consumer对象 反序列化消息 consumer取消订阅的方式consumer unsubscribe 使用自定义的序列