kafka confluent schema registry 实现一个topic支持多个不同schema的表消费(包含报错信息及解决方式)

2023-11-01

背景:上篇文章已经说明confluent schema registry优点及如何实现。本文实现kafka confluent schema registry 一个topic多个不同结构的表消费需求

上篇文章:kafka Confluent Schema Registry 简单实践_温柔的小才的博客-CSDN博客

第一步

说明:在上篇文章基础上做修改。首先在原有topic下注册多个schema(这里注册两个做示范)。

#进入kafka的目录下执行,启动kafka
nohup bin/kafka-server-start.sh config/server.properties 2>&1 &
#注:kafka集群中每个节点kafka都需要启动
 
 
#进入confluent的目录下执行,启动Conflfluent Schema Registry
./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties


#test-topic5下注册第一个schema
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"id\": \"id\", \"type\": \"int\"}, {\"time\": \"time\", \"type\": \"int\"}]}"}' \
http://hadoop01:8081/subjects/test-topic5-value/versions


#test-topic5下注册第二个schema
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"id_num\", \"type\": \"int\"}, {\"name\": \"time\", \"type\": \"string\"}, {\"name\": \"number\", \"type\": \"int\"}]}"}' \ 
http://hadoop01:8081/subjects/test-topic5-value/versions

这样就在 http://hadoop01:8081/subjects/test-topic5-value/versions(这里是上面语句的网址,记得替换为自己节点的,语句中hadoop01请用自己节点ip替换)下注册了两个schema,可以用id查看。

查看id为1的网址:http://hadoop01:8081/subjects/test-topic5-value/versions/1

  查看id为2的网址:http://hadoop01:8081/subjects/test-topic5-value/versions/2

 说明可以注册多个schema

第二步

需要改一个配置:将compability设置为NONE

#将compability设置为NONE
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "NONE"}' \
http://hadoop01:8081/config

更改原因:注册新架构时,架构注册表服务器可以强制执行某些兼容性规则。当前,支持以下兼容性规则。

向后兼容(默认):如果新架构可用于读取所有先前架构中写入的数据,则该架构向后兼容。向后兼容性对于将数据加载到Hadoop等系统中很有用,因为人们始终可以使用最新架构查询所有版本的数据。

前向兼容性: 如果所有先前的模式都可以读取以该模式编写的数据,则新模式是前向兼容的。前向兼容性对于只能处理特定版本(不一定总是最新版本)中的数据的消费者应用程序很有用。

完全兼容:如果新模式既向后兼容又向前兼容,则完全兼容。

不兼容:新模式可以是任何模式,只要它是有效的Avro。

第三步:

之前文章中的代码做一下修改,注意下四个脚本topic为同一个(主要更改都在代码后面加了注释

生产者1号

public class product_kafka {
    public static final String USER_SCHEMA = "{\"type\": \"record\", \"name\": \"User\", " +
            "\"fields\": [{\"name\": \"id\", \"type\": \"int\"}, " +
            "{\"name\": \"name\",  \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}";

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop01:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 使用Confluent实现的KafkaAvroSerializer
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // 添加schema服务的地址,用于获取schema
        props.put("schema.registry.url", "http://hadoop01:8081");
        Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(USER_SCHEMA);
        Random rand = new Random();
        int id = 0;
        while (id < 100) {
            id++;
            String name = "name" + id;
            int age = rand.nextInt(40) + 1;
            GenericRecord user = new GenericData.Record(schema);
            user.put("id", id);
            user.put("name", name);
            user.put("age", age);
            
//            ProducerRecord<String, GenericRecord> record = new ProducerRecord<String, GenericRecord>("test-topic6",user);  //修改前
            ProducerRecord<String, GenericRecord> record = new ProducerRecord<String, GenericRecord>("test-topic6","table_name1",user);  //修改后
            
            producer.send(record);
            Thread.sleep(1000);
        }

        producer.close();
    }
}

生产者2号

public class product_kafka2 {
    public static final String USER_SCHEMA = "{\"type\": \"record\", \"name\": \"User\", " +
            "\"fields\": [{\"name\": \"id_num\", \"type\": \"int\"}, " +
            "{\"name\": \"time\",  \"type\": \"string\"}, {\"name\": \"number\", \"type\": \"int\"}]}";

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop01:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 使用Confluent实现的KafkaAvroSerializer
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // 添加schema服务的地址,用于获取schema
        props.put("schema.registry.url", "http://hadoop01:8081");
        Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(USER_SCHEMA);
        Random rand = new Random();
        int id = 0;
        while (id < 100) {
            id++;
            String name = "2022" + id;
            int age = rand.nextInt(40) + 1;
            GenericRecord user = new GenericData.Record(schema);
            user.put("id_num", id);
            user.put("time", name);
            user.put("number", age);
//            ProducerRecord<String, GenericRecord> record = new ProducerRecord<String, GenericRecord>("test-topic6",user); //修改前
            ProducerRecord<String, GenericRecord> record = new ProducerRecord<String, GenericRecord>("test-topic6","table_name2",user);

            producer.send(record);
            Thread.sleep(1000);
        }

        producer.close();
    }
}

消费者一号(消费的是id为1,即表名为tablename1,表结构为id,name,age的表数据)

public class consumer_kafka {
    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop01:9092");
        props.put("group.id", "test1");
//        props.put("group.id", "test1");//修改前,主要是因为kafka特性,为了同组数据消费冲突,消费者需要不同组,消费者1为test1,消费者2为test2
        props.put("enable.auto.commit", "false");
        // 配置禁止自动提交,每次从头消费供测试使用
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 使用Confluent实现的KafkaAvroDeserializer
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        // 添加schema服务的地址,用于获取schema
        props.put("schema.registry.url", "http://hadoop01:8081");
        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Collections.singletonList("test-topic6"));

        try {
            while (true) {
                ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(1000).toMillis());
                for (ConsumerRecord<String, GenericRecord> record : records) {
                    GenericRecord user = record.value();
                    if (record.key().equals("table_name1")){   //结合生产者数据,加表判断。
                        System.out.println("value = [user.id = " + user.get("id") + ", " + "user.name = "
                                + user.get("name") + ", " + "user.age = " + user.get("age") + "], "
                                + "partition = " + record.partition() + ", " + "offset = " + record.offset());
                    }

                }
            }
        } finally {
            consumer.close();
        }
    }
}

消费者2号(消费的是id为2,即表名为tablename2,表结构为id_num,time,number的表数据)

public class consumer_kafka2 {
    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop01:9092");
        props.put("group.id", "test2");
//        props.put("group.id", "test2");//修改前,主要是因为kafka特性,为了同组数据消费冲突,消费者需要不同组,消费者1为test1,消费者2为test2
        props.put("enable.auto.commit", "false");
        // 配置禁止自动提交,每次从头消费供测试使用
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 使用Confluent实现的KafkaAvroDeserializer
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        // 添加schema服务的地址,用于获取schema
        props.put("schema.registry.url", "http://hadoop01:8081");
        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Collections.singletonList("test-topic6"));

        try {
            while (true) {
                ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(1000).toMillis());
                for (ConsumerRecord<String, GenericRecord> record : records) {
                    GenericRecord user = record.value();
                    if (record.key().equals("table_name2")){  //结合生产者数据,加表判断。
                        System.out.println("value = [user.id_num = " + user.get("id_num") + ", " + "user.time = "
                            + user.get("time") + ", " + "user.number = " + user.get("number") + "], "
                            + "partition = " + record.partition() + ", " + "offset = " + record.offset());
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}

启动后的结果展示:

消费者1号:

消费者2号 

报错及处理

1.Schema being registered is incompatible with an earlier schema

原因:注册的多个schema之间不兼容

解决方式:执行本文中第二步,将compability设置为NONE

#将compability设置为NONE
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "NONE"}' \
http://hadoop01:8081/config

2.org.apache.kafka.common.errors.SerializationException: Error serializing Avro message

原因:没有启动 confluent schema registry

解决方式:执行第一步中的

#进入confluent的目录下执行,启动Conflfluent Schema Registry
./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

3.其他情况:检查kafka集群是否启动,节点ip是否配置正确。

有问题或者想法欢迎评论留言

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

kafka confluent schema registry 实现一个topic支持多个不同schema的表消费(包含报错信息及解决方式) 的相关文章

随机推荐

  • BigInteger和BigDecimal的使用

    1 BigInteger类的常用方法 java math BigInteger类表示一个超大的整数 而且支持任意精度整数的四则运算 加减乘除 常用方法 方法 含义 public BigInteger String val 将 BigInte
  • 模型与动画作业

    一 智能巡逻兵 游戏设计要求 创建一个地图和若干巡逻兵 使用动画 每个巡逻兵走一个3 5个边的凸多边型 位置数据是相对地址 即每次确定下一个目标位置 用自己当前位置为原点计算 巡逻兵碰撞到障碍物 则会自动选下一个点为目标 巡逻兵在设定范围内
  • canvas正交坐标系旋转--监听滚轮

    canvas 简单学习canvas 功能实现 效果 简单学习canvas
  • Java读取excel的方式,一篇文章看懂(详细)

    目录 一 excel读取的两种方式 1 1 jxl 和 poi 的区别和选择 二 jxl 的使用 2 1 导入相关依赖 2 2 操作 三 poi 的使用 3 1 导入相关依赖 3 2 操作 四 总结 一 excel读取的两种方式 Java中
  • Java 简单控制台项目之家庭记账本 --- 凌宸1642

    项目一 家庭记账本 模拟实现一个基于文本界面的 模拟实现一个基于文本界面的 家庭记账软件 家庭记账软件 主要涉及以下知识点 变量的定义 基本数据类型的使用 循环语句 分支语句 方法声明 调用和返回值的接收 简单的屏幕输出格式控制 该软件能够
  • Oracle VM Virtualbox虚拟机教程

    Oracle Virtualbox的下载 我们使用不太操作系统的电脑来运行不同的功能和应用 常见的操作系统有Windows MacOS Linux Crome等等 通常我们在电脑上安装一个系统就不动了 当然也有双系统 但是比较麻烦 废时费力
  • proteus中的标签的使用方法。

    我们在proteus中绘制原理图的时候常常会遇到元器件的连线比较繁杂 导致原理图看起来比较乱 或者不是太好看 我们一般采用两种比较典型的方式来解决 一 标签 1 终端模式下的标签 终端模式下的标签须确保我们已经选择了终端模式 如下图 双击终
  • 使用Flask-Migrate迁移数据库

    1 安装 pipenv install flask migrate from flask import Flask from flask sqlalchemy import SQLAlchemy from flask migrate imp
  • static作用(修饰函数、局部变量、全局变量)

    转载自 http www cnblogs com stoneJin archive 2011 09 21 2183313 html 在C语言中 static的字面意思很容易把我们导入歧途 其实它的作用有三条 1 先来介绍它的第一条也是最重要
  • C++面试题:虚函数(virtual)可以是内联函数(inline)吗?

    原文链接 https github com huihut interview 答案 虚函数可以是内联函数 内联是可以修饰虚函数的 但是当虚函数表现多态性的时候不能内联 理由如下 内联是在发生在编译期间 编译器会自主选择内联 而虚函数的多态性
  • python 通过文件头获取文件类型mimetype

    一 MIME Type是什么 资源的媒体类型 MIME Multipurpose Internet Mail Extensions 多用途互联网邮件扩展类型 是设定某种扩展名的文件用一种应用程序打开的方式类型 当该扩展名文件被访问时 浏览器
  • 设计模式——代理模式

    代理模式概述 代理模式是Java开发中使用较多的一种设计模式 代理设计就是为其他对象提供一种代理以控制对这个对象的访问 代理类似中介的身份 应用场景 安全代理 屏蔽对真实角色的直接访问 远程代理 通过代理类处理远程方法调用 RMI 延迟加载
  • 调试HX711

    体重电路板HX711 1 下载程序 如果不正常用万用表测量输出电压是否正常 看BOOT键是否打开 RX TX是否接对 2 首先确保程序正确 I O口对应正确 3 连接体重计 如果串口接收数据不正常 首先检查称重传感器连线问题 然后用万用表测
  • Linux入门笔记-尚硅谷韩顺平-基础篇&实操篇

    文章目录 课程导论 基础篇 Linux入门 Linux介绍 Linux和Unix的关系 Linux和Windows比较 基础篇 Linux的目录结构 基本介绍 具体的目录结构 实操篇 vi和vim的使用 vi和vim的基本介绍 vi和vim
  • 节点编译问题 | FISCO BCOS开发问题排查

    1 源码编译慢 1 1 case1 先前没有编译过源码 修改 etc hosts文件 添加如下内容可加速依赖包的下载 140 82 113 4 github com 185 199 108 153 assets cdn github com
  • 【科普贴】SIM卡接口协议(ISO7816-3)详解

    一 SIM卡介绍 SIM Subscriber Identity Module 卡是GSM系统的移动用户所持有的IC卡 称为身份识别卡 SIM卡内部含有微处理器 它由CPU 8位 RAM 工作存储器 6 16KB ROM 程序存储器 3 8
  • Android.mk 语法详解

    Android mk 语法详解 转 http blog sina com cn s blog 602f8770010148ce html 0 Android mk简介 Android mk文件用来告知NDK Build 系统关于Source
  • 文件删不掉需要管理员权限?分享解决方法

    文件删不掉需要管理员权限 正常情况下 我们使用电脑时 遇到不需要的文件都可以直接手动删除 但有时候会出现无法删除文件的现象 提示 文件夹访问被拒绝 你需要提供管理员权限才能删除此文件 这时该怎么处理呢 下面就一起来看看吧 遇到需要管理员权限
  • 百度自动驾驶平台生态部负责人张亮:Apollo开放平台,连接技术场景 赋能人才生态

    社会的经济发展 国家政策的支持 科学技术的不断进步 消费者的购买热情 共同推动了自动驾驶行业的发展 自动驾驶汽车会使交通事故的发生率大大降低 方便更多的人开车出行 交通秩序变得更好 出行效率变得更高 2022年7月21日 由中国开源软件推进
  • kafka confluent schema registry 实现一个topic支持多个不同schema的表消费(包含报错信息及解决方式)

    背景 上篇文章已经说明confluent schema registry优点及如何实现 本文实现kafka confluent schema registry 一个topic多个不同结构的表消费需求 上篇文章 kafka Confluent