汇总目录链接:【Spring Boot实战与进阶】学习目录
文章目录
- 一、简介
- 二、集成Kafka消息队列
- 1、引入依赖
- 2、配置文件
- 3、测试生产消息
- 4、测试消费消息
一、简介
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
二、集成Kafka消息队列
1、引入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.9.0</version>
</dependency>
2、配置文件
spring:
kafka:
bootstrap-servers: 192.168.60.123:9092
consumer:
group-id: test
enable-auto-commit: false
max-poll-records: 100
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
ack-mode: manual_immediate
3、测试生产消息
@SpringBootTest
class ScriptApplicationTests {
@Resource
private KafkaTemplate kafkaTemplate;
@Test
void contextLoads() {
kafkaTemplate.send("delay-message", "123456");
}
}
看下Kafka的UI界面(Kafka-map):
![在这里插入图片描述](https://img-blog.csdnimg.cn/47f38299036c44308aa1ef8fbbc0f0c0.png)
4、测试消费消息
@Slf4j
@Component
public class KafkaConsumer {
@KafkaListener(id = "test"
,topics = "delay-message"
,groupId = "test"
,concurrency = "1"
,autoStartup = "true")
public void ProductInsertEvent1(ConsumerRecord<String, String> record, Acknowledgment ack) {
Optional<String> kafkaMessage = Optional.ofNullable(record.value());
kafkaMessage.ifPresent(msg -> {
log.info("KafkaConsumer kafka value:{}", msg);
ack.acknowledge();
});
}
}
![在这里插入图片描述](https://img-blog.csdnimg.cn/a6232e04e2574675a765f894f3033d96.png)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)