如何捕获Kafka-Spring中的反序列化错误?

2023-12-30

我正在启动一个使用 kafka 消息的应用程序。

我跟着Spring文档 https://docs.spring.io/spring-kafka/docs/2.2.4.RELEASE/reference/#error-handling-deserializer关于反序列化错误处理以捕获反序列化异常。我尝试过 failedDeserializationFunction 方法。

这是我的消费者配置类

@Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        
        /*  Error Handling */
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
        consumerProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
        consumerProps.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, FailedNTCMessageBodyProvider.class);

        return consumerProps;
    }

    @Bean
    public ConsumerFactory<String, NTCMessageBody> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new JsonDeserializer<>(NTCMessageBody.class));
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

这是 BiFunction 提供者

public class FailedNTCMessageBodyProvider implements BiFunction<byte[], Headers, NTCMessageBody> {

    @Override
    public NTCMessageBody apply(byte[] t, Headers u) {
        return new NTCBadMessageBody(t);
    }

}

public class NTCBadMessageBody extends NTCMessageBody{

    private final byte[] failedDecode;

    public NTCBadMessageBody(byte[] failedDecode) {
        this.failedDecode = failedDecode;
    }

    public byte[] getFailedDecode() {
        return this.failedDecode;
    }

}

当我仅发送有关该主题的一条损坏的消息时,我收到此错误(循环中):

org.apache.kafka.common.errors.SerializationException:反序列化键/值时出错

我理解 ErrorHandlingDeserializer2 应该委托 NTCBadMessageBody 类型并继续消费。我还看到(在调试模式下)它从未进入 NTCBadMessageBody 类的构造函数。


使用错误处理反序列化器。

当反序列化器无法反序列化消息时,Spring 无法处理该问题,因为它发生在 poll() 返回之前。为了解决这个问题,2.2版本引入了ErrorHandlingDeserializer。该解串器委托给真正的解串器(键或值)。如果委托无法反序列化记录内容,ErrorHandlingDeserializer 将返回 DeserializationException,其中包含原因和原始字节。使用记录级 MessageListener 时,如果键或值包含 DeserializationException,则会使用失败的 ConsumerRecord 调用容器的 ErrorHandler。使用 BatchMessageListener 时,失败的记录会与批处理中的剩余记录一起传递给应用程序,因此应用程序侦听器有责任检查特定记录中的键或值是否为 DeserializationException。

您可以使用 DefaultKafkaConsumerFactory 构造函数,该构造函数采用键和值 Deserializer 对象,并连接到使用适当委托配置的适当 ErrorHandlingDeserializer 中。或者,您可以使用 ErrorHandlingDeserializer 使用的使用者配置属性来实例化委托。属性名称为 ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS 和 ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS;属性值可以是类或类名

package com.mypackage.app.config;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.mypacakage.app.model.kafka.message.KafkaEvent;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

import lombok.extern.slf4j.Slf4j;

@EnableKafka
@Configuration
@Slf4j
public class KafkaConsumerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String servers;

    @Value("${listener.group-id}")
    private String groupId;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaEvent> ListenerFactory() {
    
        ConcurrentKafkaListenerContainerFactory<String, KafkaEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        factory.setRetryTemplate(retryTemplate());
        factory.setErrorHandler(((exception, data) -> {
            /*
             * here you can do you custom handling, I am just logging it same as default
             * Error handler does If you just want to log. you need not configure the error
             * handler here. The default handler does it for you. Generally, you will
             * persist the failed records to DB for tracking the failed records.
             */
            log.error("Error in process with Exception {} and the record is {}", exception, data);
        }));

        return factory;

    }

    @Bean
    public ConsumerFactory<String, KafkaEvent> consumerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
        config.put(JsonDeserializer.VALUE_DEFAULT_TYPE,
                "com.mypackage.app.model.kafka.message.KafkaEvent");
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "com.mypackage.app");

        return new DefaultKafkaConsumerFactory<>(config);
    }

    private RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        /*
         * here retry policy is used to set the number of attempts to retry and what
         * exceptions you wanted to try and what you don't want to retry.
         */
        retryTemplate.setRetryPolicy(retryPolicy());

        return retryTemplate;
    }

    private SimpleRetryPolicy retryPolicy() {
        Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();

        // the boolean value in the map determines whether exception should be retried
        exceptionMap.put(IllegalArgumentException.class, false);
        exceptionMap.put(TimeoutException.class, true);
        exceptionMap.put(ListenerExecutionFailedException.class, true);

        return new SimpleRetryPolicy(3, exceptionMap, true);
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何捕获Kafka-Spring中的反序列化错误? 的相关文章

  • JavaMail Gmail 问题。 “准备启动 TLS”然后失败

    mailServerProperties System getProperties mailServerProperties put mail smtp port 587 mailServerProperties put mail smtp
  • 解决错误:日志已在具有多个实例的atomikos中使用

    我仅在使用atomikos的实时服务器上遇到问题 在我的本地服务器上它工作得很好 我在服务器上面临的问题是 init 中出错 日志已在使用中 完整的异常堆栈跟踪 java lang RuntimeException Log already
  • 如何查找 Android 设备中的所有文件并将它们放入列表中?

    我正在寻求帮助来列出 Android 外部存储设备中的所有文件 我想查找所有文件夹 包括主文件夹的子文件夹 有办法吗 我已经做了一个基本的工作 但我仍然没有得到想要的结果 这不起作用 这是我的代码 File files array file
  • IntelliJ IDEA 创建的 JAR 文件无法运行

    我在 IntelliJ 中编写了一个跨越几个类的程序 当我在 IDE 中测试它时它运行良好 但是 每当我按照教程将项目制作成 jar 可执行文件时 它就不会运行 双击 out 文件夹中的文件时 该文件不会运行 并显示 无法启动 Java J
  • 在浏览器中点击应用程序时播放框架挂起

    我正在 Play 中运行一个应用程序activator run 也许 5 次中有 3 次 它会挂起 当我去http localhost 9000 它就永远坐在那里旋转 我看到很多promise timed out错误也 我应该去哪里寻找这个
  • Convert.FromBase64String 方法的 Java 等效项

    Java 中是否有相当于Convert FromBase64String http msdn microsoft com en us library system convert frombase64string aspx which 将指
  • 如何在 Java 中禁用 System.out 以提高速度

    我正在用 Java 编写一个模拟重力的程序 其中有一堆日志语句 到 System out 我的程序运行速度非常慢 我认为日志记录可能是部分原因 有什么方法可以禁用 System out 以便我的程序在打印时不会变慢 或者我是否必须手动检查并
  • Java 页面爬行和解析之 Crawler4j 与 Jsoup

    我想获取页面的内容并提取其中的特定部分 据我所知 此类任务至少有两种解决方案 爬虫4j https github com yasserg crawler4j and Jsoup http jsoup org 它们都能够检索页面的内容并提取其
  • 如何在jsp代码中导入java库?

    我有以下jsp代码 我想添加 java io 等库 我怎样才能做到这一点
  • 无法理解 Java 地图条目集

    我正在看一个 java 刽子手游戏 https github com leleah EvilHangman blob master EvilHangman java https github com leleah EvilHangman b
  • 序列化对象以进行单元测试

    假设在单元测试中我需要一个对象 其中所有 50 个字段都设置了一些值 我不想手动设置所有这些字段 因为这需要时间而且很烦人 不知何故 我需要获得一个实例 其中所有字段都由一些非空值初始化 我有一个想法 如果我要调试一些代码 在某个时候我会得
  • 如何将文件透明地传输到浏览器?

    受控环境 IE8 IIS 7 ColdFusion 当从 IE 发出指向媒体文件 例如 mp3 mpeg 等 的 GET 请求时 浏览器将启动关联的应用程序 Window Media Player 我猜测 IIS 提供文件的方式允许应用程序
  • 归并排序中的递归:两次递归调用

    private void mergesort int low int high line 1 if low lt high line 2 int middle low high 2 line 3 mergesort low middle l
  • 如何在 JFreeChart TimeSeries 图表上显示降雨指数和温度?

    目前 我的 TimeSeries 图表每 2 秒显示一个位置的温度 现在 如果我想每2秒显示一次降雨指数和温度 我该如何实现呢 这是我的代码 import testWeatherService TestWeatherTimeLapseSer
  • 检查 protobuf 消息 - 如何按名称获取字段值?

    我似乎无法找到一种方法来验证 protobuf 消息中字段的值 而无需显式调用其 getter 我看到周围的例子使用Descriptors FieldDescriptor实例到达消息映射内部 但它们要么基于迭代器 要么由字段号驱动 一旦我有
  • Java直接内存:在自定义类中使用sun.misc.Cleaner

    在 Java 中 NIO 直接缓冲区分配的内存通过以下方式释放 sun misc Cleaner实例 一些比对象终结更有效的特殊幻像引用 这种清洁器机制是否仅针对直接缓冲区子类硬编码在 JVM 中 或者是否也可以在自定义组件中使用清洁器 例
  • org.jdesktop.application 包不存在

    几天以来我一直在构建一个 Java 桌面应用程序 一切都很顺利 但是今天 当我打开Netbeans并编译文件时 出现以下编译错误 Compiling 9 source files to C Documents and Settings Ad
  • 将 JSON 参数从 java 发布到 sinatra 服务

    我有一个 Android 应用程序发布到我的 sinatra 服务 早些时候 我无法读取 sinatra 服务上的参数 但是 在我将内容类型设置为 x www form urlencoded 之后 我能够看到参数 但不完全是我想要的 我在
  • 当单元格内的 JComboBox 中有 ItemEvent 时,如何获取 CellRow

    我有一个 JTable 其中有一列包含 JComboBox 我有一个附加到 JComboBox 的 ItemListener 它会根据任何更改进行操作 但是 ItemListener 没有获取更改的 ComboBox 所在行的方法 当组合框
  • 休眠以持久保存日期

    有没有办法告诉 Hibernate java util Date 应该持久保存 我需要这个来解决 MySQL 中缺少的毫秒分辨率问题 您能想到这种方法有什么缺点吗 您可以自己创建字段long 或者使用自定义的UserType 实施后User

随机推荐