I am using a Spring Boot application to send a JSON array from a Kafka producer, converting it with toString() before sending, but I get the following error in the consumer:
org.springframework.kafka.listener.ListenerExecutionFailedException:
Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.springboot.service.KafkaReciever.recieveData(com.springboot.model.Student,java.lang.String)
throws java.lang.Exception]
Bean [com.springboot.service.KafkaReciever@5bb3d42d]; nested exception is
org.springframework.messaging.converter.MessageConversionException:
Cannot handle message; nested exception is
org.springframework.messaging.converter.MessageConversionException:
Cannot convert from [java.lang.String] to
[com.springboot.model.Student] for GenericMessage
[payload=[com.springboot.model.Student@5e40dc31,
com.springboot.model.Student@235e68b8], headers={kafka_offset=45,
kafka_receivedMessageKey=null, kafka_receivedPartitionId=0,
kafka_receivedTopic=myTopic-kafkasender}],
failedMessage=GenericMessage
[payload=[com.springboot.model.Student@5e40dc31,
com.springboot.model.Student@235e68b8], headers={kafka_offset=45,
kafka_receivedMessageKey=null, kafka_receivedPartitionId=0,
kafka_receivedTopic=myTopic-kafkasender}]
Configuration file:
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.boot.server}")
    private String kafkaServer;

    @Value("${kafka.consumer.group.id}")
    private String kafkaGroupId;

    @Bean
    public ConsumerFactory<String, String> consumerConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
        props.put("message.assembler.buffer.capacity", 33554432);
        props.put("max.tracked.messages.per.partition", 24);
        props.put("exception.on.message.dropped", true);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put("segment.deserializer.class", DefaultSegmentDeserializer.class.getName());
        return new DefaultKafkaConsumerFactory(props, null, new StringDeserializer());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> listener = new ConcurrentKafkaListenerContainerFactory<>();
        listener.setConsumerFactory(consumerConfig());
        return listener;
    }
}
Receiver file:
@Service
public class KafkaReciever {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaReciever.class);

    @KafkaListener(topics = "${kafka.topic.name}", group = "${kafka.consumer.group.id}")
    public void recieveData(@Payload Student student, @Header(KafkaHeaders.MESSAGE_KEY) String messageKey) throws Exception {
        LOGGER.info("Data - " + student + " recieved");
    }
}
JSON being posted:
[{
    "studentId": "Q45678123",
    "firstName": "Anderson",
    "lastName": "John",
    "age": "12",
    "address": {
        "apartment": "apt 123",
        "street": "street Info",
        "state": "state",
        "city": "city",
        "postCode": "12345"
    }
},
{
    "studentId": "Q45678123",
    "firstName": "abc",
    "lastName": "xyz",
    "age": "12",
    "address": {
        "apartment": "apt 123",
        "street": "street Info",
        "state": "state",
        "city": "city",
        "postCode": "12345"
    }
}]
I get the following output in the consumer:
[com.springboot.model.Student@5e40dc31, com.springboot.model.Student@235e68b8]
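For reference, the output above has the shape Java produces when toString() is called on a list whose element class does not override toString(). The following is only a minimal sketch of that behaviour, assuming the producer calls toString() on a List<Student>; the class ToStringPayloadDemo, its nested Student stand-in, and the method payloadViaToString are hypothetical names used for illustration and are not taken from the application.

```java
import java.util.Arrays;
import java.util.List;

public class ToStringPayloadDemo {

    // Hypothetical stand-in for com.springboot.model.Student.
    // It deliberately does not override toString().
    static class Student {
        final String studentId;
        final String firstName;

        Student(String studentId, String firstName) {
            this.studentId = studentId;
            this.firstName = firstName;
        }
    }

    // Assumption: mirrors a producer that builds the payload by calling toString() on the list.
    static String payloadViaToString(List<Student> students) {
        return students.toString();
    }

    public static void main(String[] args) {
        List<Student> students = Arrays.asList(
                new Student("Q45678123", "Anderson"),
                new Student("Q45678123", "abc"));

        String payload = payloadViaToString(students);

        // Prints class names followed by '@' and a hash; the hash values differ between runs.
        System.out.println(payload);

        // The string carries no field names or values, so it is not JSON.
        System.out.println(payload.contains("studentId"));
    }
}
```

If that assumption holds, the string sent to the topic never contained the student fields, so no consumer-side converter could rebuild a Student from it. A JSON serializer on the producer side (for example Jackson's ObjectMapper.writeValueAsString) would be one way to send real JSON. Note also that the posted payload is an array, while the listener method declares a single Student parameter.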