前言
首先,你的JDK是否已经是8+了呢?
其次,你是否已经用上SpringBoot3了呢?
最后,这次分享的是SpringBoot3下的kafka发信息与消费信息。
一、场景说明
这次的场景是springboot3+多数据源的数据交换中心(数仓)需要消费Kafka里的上游推送信息,这里做数据解析处理入库TDengine。
二、使用步骤
1.引入库
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
简简单单,就这一个依赖就够了。
2.配置
spring:
kafka:
bootstrap-servers: localhost:9092
client-id: dc-device-flow-analyze
consumer:
group-id: dc-device-flow-analyze-consumer-group
max-poll-records: 10
auto-offset-reset: earliest
enable-auto-commit: false
auto-commit-interval: 1000
producer:
acks: 1
batch-size: 4096
buffer-memory: 40960000
client-id: dc-device-flow-analyze-producer
compression-type: zstd
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
retries: 3
properties:
spring.json.add.type.headers: false
max.request.size: 126951500
listener:
ack-mode: MANUAL_IMMEDIATE
concurrency: 1
type: BATCH
xiaotian:
analyze:
device:
flow:
topic:
consumer: device-flow
3.消费
import com.xiaotian.datagenius.service.DataTransService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.util.List;
@Slf4j
@Component
public class KafkaListenConsumer {
@Autowired
private DataTransService dataTransService;
@KafkaListener(topics = "${xiaotian.analyze.device.flow.topic.consumer}")
public void deviceFlowListen(List<ConsumerRecord> records, Acknowledgment ack) {
log.debug("=====设备流水deviceFlowListen消费者接收信息====");
try {
for (ConsumerRecord record : records) {
log.debug("---开启线程解析设备流水数据:{}", record.toString());
dataTransService.deviceFlowTransSave(record);
}
} catch (Exception e) {
log.error("----设备流水数据消费者解析数据异常:{}", e.getMessage(), e);
} finally {
ack.acknowledge();
}
}
}
消费与SpringBoot2的写法一样,没有任何改变。
4.发布信息
import cn.hutool.json.JSON;
import cn.hutool.json.JSONUtil;
import com.easylinkin.datagenius.core.Result;
import com.easylinkin.datagenius.core.ResultGenerator;
import com.easylinkin.datagenius.vo.KafkaMessageVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@Slf4j
@RestController
@RequestMapping("/kafka/push")
public class KafkaPushController {
@Autowired
private KafkaTemplate kafkaTemplate;
@PostMapping("/sendMsg")
public Result sendMsg(@RequestBody KafkaMessageVo kafkaMessageVo) {
String topic = kafkaMessageVo.getTopic();
String msg = kafkaMessageVo.getMessage();
log.debug(msg);
JSON msgJson = JSONUtil.parseObj(msg);
CompletableFuture<SendResult<String, Object>> completableFuture = kafkaTemplate.send(topic, UUID.randomUUID().toString(), msgJson);
completableFuture.thenAccept(result -> {
log.debug("发送成功:{}", JSONUtil.toJsonStr(kafkaMessageVo));
});
completableFuture.exceptionally(e -> {
log.error("发送失败", JSONUtil.toJsonStr(kafkaMessageVo), e);
return null;
});
return ResultGenerator.genSuccessResult();
}
}
这个发送信息就与springBoot2的写法一致了。原ListenableFuture类已过时了,现在SpringBoot3、JDK8+用CompletableFuture监听信息发送结果。
总结
1、SpringBoot3真香
2、Kafka的集成已经非常成熟了,资料也多。
我这里这个SpringBoot3集成Kafka发送信息目前觉得是独家,你能找到的应该都还是使用的ListenableFuture类。
好了,就写到这里,希望能帮到大家,uping!!!
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)