kafka Avro 多个主题的消息反序列化器

2024-05-09

我正在尝试以 avro 格式反序列化 kafka 消息,我使用以下代码:https://github.com/ivangfr/springboot-kafka-debezium-ksql/blob/master/kafka-research-consumer/src/main/java/com/mycompany/kafkaresearchconsumer/kafka/ReviewsConsumerConfig.java https://github.com/ivangfr/springboot-kafka-debezium-ksql/blob/master/kafka-research-consumer/src/main/java/com/mycompany/kafkaresearchconsumer/kafka/ReviewsConsumerConfig.java

上面的代码作为单个主题对我来说工作得很好,但我必须侦听来自多个主题的消息并创建多个 AvroGenerate 文件,但我陷入配置中,因为确认需要多个 avro 类型对象。请考虑以下问题:

https://github.com/ivangfr/springboot-kafka-debezium-ksql/issues/3 https://github.com/ivangfr/springboot-kafka-debezium-ksql/issues/3


在您的配置中使用以下行:

props.put( KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName() );

代替:

props.put( SpecificAvroWithSchemaDeserializer.AVRO_VALUE_RECORD_TYPE, mysql.researchdb.institutes.Value.class );
props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroWithSchemaDeserializer.class );

最终代码将是:

package com.moglix.netsuite.kafka;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class ReviewsConsumerConfig
{

@Value( "${kafka.bootstrap-servers}" )
private String bootstrapServers;

@Value( "${kafka.schema-registry-url}" )
private String schemaRegistryUrl;

@Value( "${kafka.reviews.start-offset}" )
private String orderStartOffset;

@Value( "${kafka.reviews.max-poll-records}" )
private Integer maxPollRecords;

@Bean
public <T> ConcurrentKafkaListenerContainerFactory<String, T> kafkaListenerContainerFactory()
{
    ConcurrentKafkaListenerContainerFactory<String, T> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory( consumerFactory1() );
    factory.setBatchListener( true );
    factory.getContainerProperties().setAckMode( ContainerProperties.AckMode.MANUAL );
    factory.getContainerProperties().setSyncCommits( true );
    return factory;
}

@Bean
public <T> ConsumerFactory<String, T> consumerFactory1()
{
    return new DefaultKafkaConsumerFactory<>( consumerConfigs1() );
}

@Bean
public Map<String, Object> consumerConfigs1()
{
    Map<String, Object> props = new HashMap<>();
    props.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers );
    props.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class );
    props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName() );
    props.put( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, orderStartOffset );

    props.put( AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl );
    props.put( KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true );
    props.put( KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());// This is main line for my problem solution
    //props.put( SpecificAvroWithSchemaDeserializer.AVRO_VALUE_RECORD_TYPE, mysql.researchdb.institutes.Value.class );

    props.put( ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords );
    props.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false );
    return props;
}

}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

kafka Avro 多个主题的消息反序列化器 的相关文章

  • 在Windows上安装Java 11 OpenJDK(系统路径问题)

    Java 11 最近发布了 众所周知 这个版本没有安装文件 当然 要在没有安装程序的情况下安装 Java 我将系统设置 PATH 和 JAVA HOME 设置为解压缩 Java 11 的文件夹的地址 根据对类似问题的已接受回复建议 唯一的事
  • 线程自动利用多个CPU核心?

    假设我的应用程序运行 2 个线程 例如渲染线程和游戏更新线程 如果它在具有多核 CPU 当今典型 的移动设备上运行 我是否可以期望线程在可能的情况下自动分配给不同的核心 我知道底层操作系统内核 Android linux内核 决定调度 我的
  • 解决错误:日志已在具有多个实例的atomikos中使用

    我仅在使用atomikos的实时服务器上遇到问题 在我的本地服务器上它工作得很好 我在服务器上面临的问题是 init 中出错 日志已在使用中 完整的异常堆栈跟踪 java lang RuntimeException Log already
  • Java8无符号算术

    据广泛报道 Java 8 具有对无符号整数的库支持 然而 似乎没有文章解释如何使用它以及有多少可能 有些函数 例如 Integer CompareUnsigned 很容易找到 并且似乎可以实现人们所期望的功能 但是 我什至无法编写一个简单的
  • Convert.FromBase64String 方法的 Java 等效项

    Java 中是否有相当于Convert FromBase64String http msdn microsoft com en us library system convert frombase64string aspx which 将指
  • java中删除字符串中的特殊字符?

    如何删除字符串中除 之外的特殊字符 现在我用 replaceAll w s 它删除了所有特殊字符 但我想保留 谁能告诉我我该怎么办 Use replaceAll w s 我所做的是将下划线和连字符添加到正则表达式中 我添加了一个 连字符之前
  • 如何在 Java 中禁用 System.out 以提高速度

    我正在用 Java 编写一个模拟重力的程序 其中有一堆日志语句 到 System out 我的程序运行速度非常慢 我认为日志记录可能是部分原因 有什么方法可以禁用 System out 以便我的程序在打印时不会变慢 或者我是否必须手动检查并
  • HDFS:使用 Java / Scala API 移动多个文件

    我需要使用 Java Scala 程序移动 HDFS 中对应于给定正则表达式的多个文件 例如 我必须移动所有名称为 xml从文件夹a到文件夹b 使用 shell 命令我可以使用以下命令 bin hdfs dfs mv a xml b 我可以
  • jdbc4.MySQLSyntaxErrorException:数据库中不存在表

    我正在使用 SpringBoot 开发一个网络应用程序 这是我的application properties文件来指定访问数据库的凭据 spring datasource driverClassName com mysql jdbc Dri
  • hibernate总是自己删除表中的所有数据

    您好 我正在开发一个 spring mvc 应用程序 它使用 hibernate 连接到存储文件的 mysql 数据库 我有两个方法 一个方法添加我选择的特定文件路径中的所有文件 另一种方法调用查询以返回从 mysql 存储的文件列表 问题
  • 请求位置更新参数

    这就是 requestLocationUpdates 的样子 我使用它的方式 requestLocationUpdates String provider long minTime float minDistance LocationLis
  • Spring Data 与 Spring Data JPA 与 JdbcTemplate

    我有信心Spring Data and Spring Data JPA指的是相同的 但后来我在 youtube 上观看了一个关于他正在使用JdbcTemplate在那篇教程中 所以我在那里感到困惑 我想澄清一下两者之间有什么区别Spring
  • 检查 protobuf 消息 - 如何按名称获取字段值?

    我似乎无法找到一种方法来验证 protobuf 消息中字段的值 而无需显式调用其 getter 我看到周围的例子使用Descriptors FieldDescriptor实例到达消息映射内部 但它们要么基于迭代器 要么由字段号驱动 一旦我有
  • Keycloak - 自定义 SPI 未出现在列表中

    我为我的 keycloak 服务器制作了一个自定义 SPI 现在我必须在管理控制台上配置它 我将 SPI 添加为模块 并手动安装 因此我将其放在 module package name main 中 并包含 module xml 我还将其放
  • Android JNI C 简单追加函数

    我想制作一个简单的函数 返回两个字符串的值 基本上 java public native String getAppendedString String name c jstring Java com example hellojni He
  • Netbeans 8 不会重新加载静态 Thymeleaf 文件

    我通过 Maven 使用 Spring Boot 和 Thymeleaf 当我进行更改时 我似乎无法让 Netbeans 自动重新部署我的任何 Thymeleaf 模板文件 为了看到更改 我需要进行完整的清理 构建 运行 这需要太长的时间
  • 查看Jasper报告执行的SQL

    运行 Jasper 报表 其中 SQL 嵌入到报表文件 jrxml 中 时 是否可以看到执行的 SQL 理想情况下 我还想查看替换每个 P 占位符的值 Cheers Don JasperReports 使用 Jakarta Commons
  • java迭代器内部是如何工作的? [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我有一个员工列表 List
  • JAVA - 如何从扫描仪读取文件中检测到“\n”字符

    第一次海报 我在读取文本文件的扫描仪中读取返回字符时遇到问题 正在读取的文本文件如下所示 test txt start 2 0 30 30 1 1 90 30 0 test txt end 第一行 2 表示两个点 第二行 位置索引 0 xp
  • Jackson 将单个项目反序列化到列表中

    我正在尝试使用一项服务 该服务为我提供了一个带有数组字段的实体 id 23233 items name item 1 name item 2 但是 当数组包含单个项目时 将返回该项目本身 而不是包含一个元素的数组 id 43567 item

随机推荐

  • 管理文件字段当前 url 不正确

    在 Django 管理中 只要有 FileField 编辑页面上就会有一个 当前 框 其中包含指向当前文件的超链接 但是 此链接会附加到当前页面 url 因此会导致 404 因为不存在这样的页面 例如 http 127 0 0 1 8000
  • 当创建 Android Jetpack Compose AndroidView 的参数发生变化时,如何替换它?

    我有一个应用程序 显示封装在其中的几个不同视图AndroidView 在下面重现的简单示例中 这些只是TextView实例 问题是更改文本 在本例中循环显示三个不同的值 似乎不会更新应用程序显示的内容 sealed class AppVie
  • Java 中意外的负数

    import java util public class Prac9FibonacciNumbers public static void main String args int x new int 100 x 0 1 x 1 1 fo
  • 如何在 PHP 中递归删除目录及其全部内容(文件+子目录)? [复制]

    这个问题在这里已经有答案了 如何在 PHP 中删除目录及其全部内容 文件和子目录 手册页中的用户贡献部分rmdir http www php net rmdir包含一个不错的实现 function rrmdir dir if is dir
  • 内联 svg 不显示在 xhtml 中

    我创建了一个带有内联 SVG 的 XHTML 文件 当测试为 XHTML 时 它不会显示 但当测试为 HTML 时 它会显示 我已经搜索过互联网并相信我已经指定了正确的名称空间等 但是 我很困惑为什么它没有显示 请帮助我理解我做错了什么 注
  • 如何使用 UUID 生成唯一的正 Long

    我需要为我的数据库主键列生成唯一的长 ID 我以为我可以用UUID randomUUID getMostSignificantBits 但有时它也会产生一些负多头 这对我来说是个问题 是否可以从 UUID 中仅生成正长 将会有数十亿个条目
  • Apscheduler 运行一次然后抛出 TypeError

    我正在尝试每小时将某人的 soundcloud 关注者列表添加到数据库中 我有代码可以提取他们的关注者列表并将它们添加到数据库中 但是当我将它与 apscheduler 一起使用时 我遇到了错误 这是错误的示例 Traceback most
  • 如何获取调用函数的“this”值?

    如果我有一个这样的函数 function foo this console log this function bar bar prototype func function foo this var test new bar test f
  • 我可以仅在少数情况下关闭模拟吗

    我有一个始终使用模拟的应用程序 但是 当用户以管理员身份登录时 一些操作需要他们写入服务器本身 现在 如果这些用户在实际服务器上没有权限 有些用户没有 则不会让他们写入 我想做的是关闭几个命令的模拟 有没有办法做这样的事情 using Ho
  • 如何最好地实现多个重叠元素的翻转和推出事件?

    Problem 我正在开发一个网站 其中有一个 拨号盘 显示代表伞式公司不同部门的多个选项卡 目前我已经用 HTML CSS 准备好了一切 每个选项卡的定位 内圈处于较高位置z index因为选项卡在滚动时需要向外动画 我可以实现这部分 选
  • 如何从数据框的单元格中获取值?

    我构建了一个条件 从我的数据框中提取一行 d2 df df l ext l ext df item item df wn wn df wd 1 现在我想从特定列中获取一个值 val d2 col name 但结果 我得到一个包含一行和一列
  • 使用JS将图像的特定背景颜色设置为透明

    我正在使用以下代码来修改图像的透明度 然而 我想做的只是修改图像的背景颜色并将其 alpha 通道设置为 0 而不是整个图像 以下代码将整个图像的 Alpha 透明度设置为 0 var ctx this data getContext 2d
  • 加载 ini 文件时发生致命异常

    我的项目文件夹是 demo 其中有文件夹 application library 和 public 在应用程序文件夹中 我有一个名为 configs 的文件夹 其中有一个文件 application ini 其中包含我的数据库参数 因此 在
  • C# datagridview 列转入数组

    我正在用 C 构建一个程序 并在其中包含一个 datagridview 组件 datagridview 有固定数量的列 2 我想将其保存到两个单独的数组中 但行数确实发生了变化 我怎么能这样做呢 假设一个名为 dataGridView1 的
  • 具有适当后退按钮支持的 jQuery Lightbox

    在进行了一些可用性测试后 我发现参与者打开了jQuery 灯箱 http leandrovieira com projects jquery lightbox 查看更大的图像 然后 他们只需点击浏览器后退按钮 而不是单击 关闭 按钮 这会将
  • Symfony 4.1 组件 - 依赖注入问题

    我正在用 PHP 重构旧应用程序 我正在尝试使用 Symfony 依赖注入组件将服务注入控制器 或其他服务 但我不知道如何实现这一点 因为 symphony 文档比框架组件更适合使用框架 我已经有了自己的内核 包含所有服务和控制器的容器 控
  • HTML5 拖放 - 没有透明度?

    当我将一个元素拖放到页面上时 该元素会变成 幻影 基本上它获得了一些透明度值 有什么办法可以做到吗opacity 1 看来是做不到了 拖动的元素被放入具有自己的不透明度 低于 1 的容器中 这意味着虽然您可以降低拖动元素的不透明度 但您无法
  • 找不到 io.confluence:kafka-protobuf-serializer:6.0.0

    直接的问题是 为什么 Gradle 没有解决我添加的这个依赖关系 dependencies kafka protobuf serializer implementation io confluent kafka protobuf seria
  • 如何从最新版本的 Ubuntu (18.10) 运行使用 SystemD 的 Docker 容器?

    我正在尝试执行使用 ubuntu latest 构建的 Docker 映像 并且在运行容器时不断收到 SystemD 错误消息 System has not been booted with systemd as init system P
  • kafka Avro 多个主题的消息反序列化器

    我正在尝试以 avro 格式反序列化 kafka 消息 我使用以下代码 https github com ivangfr springboot kafka debezium ksql blob master kafka research c