我尝试配置阿帕奇卡夫卡 in 弹簧靴应用。我读了这个文档 https://docs.spring.io/spring-boot/docs/current/reference/html/boot-features-messaging.html#boot-features-kafka并按照以下步骤操作:
1)我将此行添加到aplication.yaml
:
spring:
kafka:
bootstrap-servers: kafka_host:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringDeserializer
value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
2)我创建新主题:
@Bean
public NewTopic responseTopic() {
return new NewTopic("new-topic", 5, (short) 1);
}
现在我想用KafkaTemplate
:
private final KafkaTemplate<String, byte[]> kafkaTemplate;
public KafkaEventBus(KafkaTemplate<String, byte[]> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
但 Intellij IDE 的亮点是:
为了解决这个问题,我需要创建 bean:
@Bean
public KafkaTemplate<String, byte[]> myMessageKafkaTemplate() {
return new KafkaTemplate<>(greetingProducerFactory());
}
并传递给构造函数属性greetingProducerFactory()
:
@Bean
public ProducerFactory<String, byte[]> greetingProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka_hist4:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
但那么设置有什么意义呢?应用程序.yaml 是否需要创建ProducerFactory手动?