如何使用 Spring-Kafka 通过 Confluence Schema 注册表读取 AVRO 消息?

2023-11-27

如何使用 Spring-Kafka 通过 Confluence Schema 注册表读取 AVRO 消息?有样品吗?我在官方参考文档中找不到它。


下面的代码可以读取 customer-avro 主题的消息。这是我定义的值的 AVRO 模式。

{
     "type": "record",
     "namespace": "com.example",
     "name": "Customer",
     "version": "1",
     "fields": [
       { "name": "first_name", "type": "string", "doc": "First Name of Customer" },
       { "name": "last_name", "type": "string", "doc": "Last Name of Customer" },
       { "name": "age", "type": "int", "doc": "Age at the time of registration" },
       { "name": "height", "type": "float", "doc": "Height at the time of registration in cm" },
       { "name": "weight", "type": "float", "doc": "Weight at the time of registration in kg" },
       { "name": "automated_email", "type": "boolean", "default": true, "doc": "Field indicating if the user is enrolled in marketing emails" }
     ]
}

下面是一个完整的代码片段,用于通过手动提交来读取此示例。

import com.example.Customer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Calendar;
import java.util.Collections;
import java.util.Properties;

public class KafkaAvroJavaConsumerV1Demo {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // normal consumer
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        properties.put("group.id", "customer-consumer-group-v1");
        properties.put("auto.commit.enable", "false");
        properties.put("auto.offset.reset", "earliest");

        // avro part (deserializer)
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());
        properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");
        properties.setProperty("specific.avro.reader", "true");

        KafkaConsumer<String, Customer> kafkaConsumer = new KafkaConsumer<>(properties);
        String topic = "customer-avro";
        kafkaConsumer.subscribe(Collections.singleton(topic));

        System.out.println("Waiting for data...");

        while (true){
            System.out.println("Polling at " + Calendar.getInstance().getTime().toString());
            ConsumerRecords<String, Customer> records = kafkaConsumer.poll(1000);

            for (ConsumerRecord<String, Customer> record : records){
                Customer customer = record.value();
                System.out.println(customer);
            }

            kafkaConsumer.commitSync();
        }
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用 Spring-Kafka 通过 Confluence Schema 注册表读取 AVRO 消息? 的相关文章

随机推荐

  • iTunes 应用程序加载器 - 自动化

    最近 Apple 更改了 iTunes Connect 界面 要求人们使用应用程序加载器上传应用程序 这很好 但我需要一个脚本来自动化我的工作 像 App Loader 这样的应用程序如何实现自动化 我正在考虑用 AppleScript 编
  • Dispose 方法的线程安全性?

    MSDN 很好地记录了 BCL 类型的实例成员的线程安全性 但我从未真正看到过信息表明如何Dispose的方法IDisposable可以调用类型 Is the Dispose方法a 保证对所有类都是线程安全的 b 从不保证是线程安全的 c
  • 如何在空安全 Dart 中将 List 转换为 List

    我有一个飞镖清单 List
  • Qt Creator 的编译器配置

    我是 Qt 及其 IDE Qt Creator 的新手 请原谅我的无知 当我尝试编译代码时出现异常 Qt Creator 需要设置一个编译器来构建 在套件选项中配置编译器 我已经安装了 MingGW 来编译 C 代码 但是 我在为 Qt C
  • 将多个绘图保存在一个 PDF 文件中

    绘图模块 def plotGraph X Y fignum random randint 0 sys maxint plt figure fignum Plotting arrangements return fignum 主模块 impo
  • 在 PostMan 中模拟特定的 CURL

    我正在使用 Postman 测试对 API 服务器的一些 Curl 请求 API开发人员给了我们curl命令 但我无法从邮递员发送它 如何向邮递员提出这样的请求 curl X POST https api server com API in
  • 向多个表插入数据的最佳方式 MVC ASP

    我有4张桌子 操作表 客户端表 客户端详细信息 操作资源 客户表 ClientID Name Surname Birthday 版本编号 客户详情 ClientID Email Adress 电话 操作表 操作ID Date Time Cl
  • C语言中“<<”是什么意思?

    这是什么意思 define WS RECURSIVE 1 lt lt 0 我明白它将定义WS Recursive 1 lt lt 0 但什么是 lt lt mean Thanks lt lt is the 左移运算符 正在转移号码1向左转0
  • 对于 Node.js 上的并发任务,哪个更好?纤维?网络工作者?或线程?

    我前段时间偶然发现了node js并且非常喜欢它 但很快我发现它严重缺乏执行CPU密集型任务的能力 因此 我开始谷歌搜索并得到了解决问题的答案 Fibers Webworkers 和 Threads thread a gogo 现在使用哪一
  • 更改 FlowDirection 时复选框勾选镜像

    操作系统 微软Windows 8 1 开发应用程序 Microsoft Visual Studio 2013 WPF应用程序开发 我已经从 Windows 7 迁移到 Windows 8 1 现在我在 VS2012 中开发的旧应用程序有一种
  • 关于c中“非声明”的优化?

    在学习编译器优化时 我在中编写代码C under Linux with GCC版本gcc version 4 4 5 Ubuntu Linaro 4 4 4 14ubuntu5 1 要了解not a statement nop 用C语言 我
  • 用于在列表框中添加/删除项目的事件 c#.NET

    我有一个列表框控件item动态添加和手动删除 由于 删除项目 按钮 当项目数量发生变化时 我想更新用户界面的其他部分 即 您必须选择一些文件 的标题 和项目计数标题 如何添加事件处理程序或有效地添加事件处理程序以在项目数量更改时触发 例如一
  • 通过Web服务从服务器发送pdf文件到客户端

    由于我是网络服务新手 您能告诉我问题的答案吗 我的问题是 我想实现一个 Web 服务 当客户端调用此 Web 服务时 该服务将 pdf 文件发送到客户端 请有人帮我提供一段不错的代码或解释 现在有人可以解决我的错误吗 12 23 09 42
  • PreRenderView 在每次回发时增量调用

    我对执行的顺序和数量有疑问f event type preRenderView 在我在这里搜索的过程中 我像往常一样找到了 BalusC 的答案this and this与我的问题相关的帖子 仍然给我留下了两个问题 当我放一个f event
  • 黑莓模拟器无法连接到互联网

    我的 BB 模拟器无法连接到互联网 我使用代理 并在 HTTP HANDLER 标题下的 rimpublic property 中输入了以下内容 application handler http proxyEnabled true appl
  • GLSL-ES 2.0 中顶点属性是否可以是数组?

    在 GLSL ES 中可以有数组 例如 GLSL ES 规格给出了以下作为数组的统一变量的示例 uniform vec4 lightPosition 4 顶点属性是否可以是数组 换句话说 根据规范 以下内容合法吗 attribute vec
  • 如何在实体框架代码优先中删除表?

    我正在使用具有自动迁移功能的实体框架 因此 当我向上下文添加新模型时 我的数据库会更新并创建新表 我想做的恰恰相反 将表从数据库中完全删除 但是 从 Context 类中删除定义不起作用 public class CompanyContex
  • Xcode:使用核心图像与 alpha 进行合成

    我想创建一个 CoreImage 滤镜链 并能够通过将其单独效果与 alpha 或不透明度设置合成来控制链中每个滤镜的 强度 但我没有看到与 alpha 合成的方法或文档中的不透明度 我猜我可以跳出核心图像过滤器链并与核心图形上下文进行合成
  • 将 JavaScript 变量值传递给输入类型隐藏值

    我想将两个整数的乘积值分配给 html 文档中已有的隐藏字段 我正在考虑获取 javascript 变量的值 然后将其传递给隐藏的输入类型 我很难解释 但这就是它应该如何工作 脚本示例 上面计算了产品 我希望产品位于隐藏字段中
  • 如何使用 Spring-Kafka 通过 Confluence Schema 注册表读取 AVRO 消息?

    如何使用 Spring Kafka 通过 Confluence Schema 注册表读取 AVRO 消息 有样品吗 我在官方参考文档中找不到它 下面的代码可以读取 customer avro 主题的消息 这是我定义的值的 AVRO 模式 t