我在 Spring Kafka 设置中使用 Avro 和 Schema 注册表。
我想以某种方式处理SerializationException
,在反序列化期间可能会抛出该异常。
我找到了以下两个资源:
https://github.com/spring-projects/spring-kafka/issues/164 https://github.com/spring-projects/spring-kafka/issues/164
如何配置 spring-kafka 忽略格式错误的消息? https://stackoverflow.com/questions/45584504/how-do-i-configure-spring-kafka-to-ignore-messages-in-the-wrong-format
这些资源建议我返回 null 而不是抛出SerializationException
反序列化并监听时KafkaNull
。这个解决方案效果很好。
然而,我希望能够抛出异常而不是返回 null。
KIP-161 https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers and KIP-210 https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce提供更好的功能来处理异常。我确实在 Spring Cloud 中找到了一些提到 KIP-161 的资源,但没有任何关于 Spring-Kafka 的具体信息。
有谁知道怎么抓SerializationException
在 Spring Boot 中?
我正在使用 Spring Boot 2.0.2
编辑:我找到了解决方案。
我宁愿抛出异常并捕获它,也不愿返回 null 或KafkaNull
。我在多个不同的项目中使用我的自定义 Avro 序列化器和反序列化器,其中一些不是 Spring。如果我更改了 Avro 序列化器和反序列化器,则需要更改其他一些项目以期望反序列化器返回 null。
我想关闭容器,这样我就不会丢失任何消息。在生产中永远不应该出现 SerializationException。 SerializationException 应该只有在架构注册表关闭或未格式化的消息以某种方式发送到生产 kafka 时才会发生。不管怎样,SerializationException 应该很少发生,如果发生,我想关闭容器,这样就不会丢失任何消息,并且我可以调查问题。
只需考虑一下,它将捕获您的消费者容器中的所有异常。在我的具体情况下,我只想在它是时才关闭SerializationException
public class SerializationExceptionHandler extends ContainerStoppingErrorHandler {
@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
MessageListenerContainer container) {
//Only call super if the exception is SerializationException
if (thrownException instanceof SerializationException) {
//This will shutdown the container.
super.handle(thrownException, records, consumer, container);
} else {
//Wrap and re-throw the exception
throw new KafkaException("Kafka Consumer Container Error", thrownException);
}
}
}
该处理程序被传递到消费者容器。下面是一个示例KafkaListenerContainerFactory
bean.
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory(JpaTransactionManager jpa, KafkaTransactionManager<?, ?> kafka) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(1);
factory.getContainerProperties().setPollTimeout(3000);
factory.getContainerProperties().setErrorHandler(new SerializationExceptionHandler());
factory.getContainerProperties().setTransactionManager(chainedTxM(jpa, kafka));
return factory;
}