无法使用传入消息调用 Kafka Listener 方法

2024-01-19

我使用 Spring Boot 应用程序在 Kafka Producer 中将其转换为 toString() 来发送 JSON 数组,但在 Consumer 中收到以下错误:

org.springframework.kafka.listener.ListenerExecutionFailedException: 无法使用传入消息调用侦听器方法端点处理程序详细信息: 方法 [public void com.springboot.service.KafkaReciever.recieveData(com.springboot.model.Student,java.lang.String) 抛出 java.lang.Exception] 豆[com.springboot.service.KafkaReciever@5bb3d42d];嵌套异常是 org.springframework.messaging.converter.MessageConversionException: 无法处理消息;嵌套异常是 org.springframework.messaging.converter.MessageConversionException: 无法从 [java.lang.String] 转换为 [com.springboot.model.Student] 用于 GenericMessage [有效负载=[com.springboot.model.Student@5e40dc31, com.springboot.model.Student@235e68b8],标题= {kafka_offset = 45, kafka_receivedMessageKey=null,kafka_receivedPartitionId=0, kafka_receivedTopic=myTopic-kafkasender}], 失败消息=通用消息 [有效负载=[com.springboot.model.Student@5e40dc31, com.springboot.model.Student@235e68b8],标题= {kafka_offset = 45, kafka_receivedMessageKey=null,kafka_receivedPartitionId=0, kafka_receivedTopic=myTopic-kafkasender}]

配置文件:

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.boot.server}")
    private String kafkaServer;

    @Value("${kafka.consumer.group.id}")
    private String kafkaGroupId;

    @Bean
    public ConsumerFactory<String, String> consumerConfig() {

         Properties props = new Properties();

         props.put("bootstrap.servers", "localhost:9092");
         props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
         props.put("message.assembler.buffer.capacity", 33554432);
         props.put("max.tracked.messages.per.partition", 24);
         props.put("exception.on.message.dropped", true);
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         props.put("segment.deserializer.class", DefaultSegmentDeserializer.class.getName());

         return new DefaultKafkaConsumerFactory(props, null, new StringDeserializer());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> listener = new ConcurrentKafkaListenerContainerFactory<>();
        listener.setConsumerFactory(consumerConfig());
        return listener;
    }
}

接收文件:

@Service
public class KafkaReciever {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaReciever.class);

    @KafkaListener(topics = "${kafka.topic.name}", group = "${kafka.consumer.group.id}")
    public void recieveData(@Payload Student student, @Header(KafkaHeaders.MESSAGE_KEY) String messageKey) throws Exception{
        LOGGER.info("Data - " + student + " recieved");
    }
}

发布 JSON:

 [{
        "studentId": "Q45678123",
        "firstName": "Anderson",
        "lastName": "John",
        "age": "12",
        "address": {
          "apartment": "apt 123",
          "street": "street Info",
          "state": "state",
          "city": "city",
          "postCode": "12345"
        }
    },
    {
        "studentId": "Q45678123",
        "firstName": "abc",
        "lastName": "xyz",
        "age": "12",
        "address": {
          "apartment": "apt 123",
          "street": "street Info",
          "state": "state",
          "city": "city",
          "postCode": "12345"
        }
    }]

我得到以下消费者输出:

[com.springboot.model.Student@5e40dc31, com.springboot.model.Student@235e68b8]

无法反序列化 START_ARRAY 之外的 com.springboot.model.Student 实例

如果使用 json 反序列化器,您有一个列表,而不是单个学生

@Payload List<Student> student

或者,如果使用字符串解串器,您有一个 JSON 字符串,您必须手动解析它

receiveData(@Payload String student ... ) { 
    JsonNode data = new ObjectMapper().readTree(student); // for example, but should extract ObjectMapper to a field
}

关于您的其他输出,请参阅如何打印我的 Java 对象而不得到“SomeType@2f92e0f4”? https://stackoverflow.com/questions/29140402/how-do-i-print-my-java-object-without-getting-sometype2f92e0f4

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

无法使用传入消息调用 Kafka Listener 方法 的相关文章

随机推荐