Contents
- Preface
- I. Environment
- II. Install ZooKeeper
- III. Install Kafka
- IV. Integrate with Spring Boot
- Summary
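Preface
Spring Boot and message middleware are both in ever wider use, so knowing how to integrate Spring Boot with at least one message broker is worth having in your toolkit.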
I. Environment
Windows / IntelliJ IDEA / JDK 1.8 / ZooKeeper 3.6.3 / kafka_2.12-2.8.0.tgz / Maven 3.6.1 / Offset Explorer
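II. Install ZooKeeper
1. Download: https://archive.apache.org/dist/zookeeper/ Tip: choose the archive whose name ends in -bin; the other one contains only the sources.
2. Unpack the archive and create a folder named data in the installation directory (for example D:\zookeeper\zookeeper-3.6.3).
3. In the conf folder, copy zoo_sample.cfg and rename the copy to zoo.cfg.
4. In zoo.cfg, set dataDir to the path of the data folder created in step 2. Write the path with forward slashes or doubled backslashes (for example dataDir=D:/zookeeper/zookeeper-3.6.3/data), because a single backslash is treated as an escape character when the file is parsed.
5. Open a command prompt in the bin directory and run zkServer.cmd to start ZooKeeper.
Tip: to register ZooKeeper as a Windows service, see https://www.cnblogs.com/alca0126/articles/14030586.html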
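Before moving on to Kafka, it can help to confirm that ZooKeeper is actually accepting connections. The following is a minimal sketch, assuming the default clientPort=2181 from zoo_sample.cfg; the class and method names are made up for this illustration, and a successful TCP connection only shows that something is listening on the port, not that ZooKeeper is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing reachable on that port.
            return false;
        }
    }

    public static void main(String[] args) {
        // 2181 is an assumption taken from the default zoo_sample.cfg.
        System.out.println("ZooKeeper listening: " + isListening("localhost", 2181, 1000));
    }
}
```

The same check with port 9092 can be used later to see whether the Kafka broker has come up.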
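III. Install Kafka
1. Download: http://kafka.apache.org/downloads Tip: pick a binary release rather than the -src package, which has to be built before it can be used.
2. Unpack the archive, then open server.properties in the config folder (for example D:\kafka\kafka_2.12-2.8.0\config).
(1) Set the directory where messages are stored, for example log.dirs=D:/kafka/kafka_2.12-2.8.0/kafka-logs. Use forward slashes or doubled backslashes here, since a single backslash is read as an escape character in a .properties file.
(2) Enable the listener: listeners=PLAINTEXT://:9092 Tip: this line is commented out by default.
3. With ZooKeeper already running, start the broker from a command prompt in the Kafka installation directory: .\bin\windows\kafka-server-start.bat .\config\server.properties
Tip: to register Kafka as a Windows service, see https://blog.csdn.net/baidu_39212797/article/details/114145325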
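One detail about the log.dirs setting in step 2 is easy to miss on Windows. As far as I know, server.properties is read with Java's standard properties parser, which treats a backslash as an escape character. The sketch below uses only java.util.Properties to show what happens to a Windows path written three different ways; the class name, the parse helper and the shortened path are made up for the illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

class PropertiesPathDemo {

    /** Parses one line of properties text and returns the value stored under key. */
    static String parse(String line, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(line));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props.getProperty(key);
    }

    public static void main(String[] args) {
        // File text: log.dirs=D:\kafka\kafka-logs  (single backslashes are dropped)
        System.out.println(parse("log.dirs=D:\\kafka\\kafka-logs", "log.dirs"));
        // File text: log.dirs=D:/kafka/kafka-logs  (kept as written)
        System.out.println(parse("log.dirs=D:/kafka/kafka-logs", "log.dirs"));
        // File text: log.dirs=D:\\kafka\\kafka-logs  (each pair becomes one backslash)
        System.out.println(parse("log.dirs=D:\\\\kafka\\\\kafka-logs", "log.dirs"));
    }
}
```

With single backslashes the separators disappear from the parsed value, so the data may end up in a directory other than the one intended. Forward slashes or doubled backslashes avoid that.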
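IV. Integrate with Spring Boot
1. Create a Spring Boot project; the quickest way is https://start.spring.io/
2. Import the generated project into IDEA.
3. Create the following packages and classes.
(1) UserLog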
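package com.example.demo.bean;

import lombok.Data;
import lombok.experimental.Accessors;

// Payload sent to Kafka. @Data generates getters, setters, equals, hashCode and toString;
// @Accessors(chain = true) makes each setter return this so that calls can be chained.
@Data
@Accessors(chain = true)
public class UserLog {
    private String userName;
    private String userId;
    private String state;
}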
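For readers who have not used Lombok, the sketch below is a hand-written approximation of what @Data with @Accessors(chain = true) provides for this class. It is not Lombok's actual generated code: equals and hashCode are left out for brevity, and the class is renamed UserLogPlain (a name made up here) so that it does not clash with the real one.

```java
class UserLogPlain {
    private String userName;
    private String userId;
    private String state;

    public String getUserName() { return userName; }
    public String getUserId() { return userId; }
    public String getState() { return state; }

    // With chain = true every setter returns this instead of void.
    public UserLogPlain setUserName(String userName) { this.userName = userName; return this; }
    public UserLogPlain setUserId(String userId) { this.userId = userId; return this; }
    public UserLogPlain setState(String state) { this.state = state; return this; }

    @Override
    public String toString() {
        return "UserLogPlain(userName=" + userName + ", userId=" + userId + ", state=" + state + ")";
    }

    public static void main(String[] args) {
        // Chained setters, in the same style the producer uses further down.
        UserLogPlain log = new UserLogPlain().setUserName("sjh").setUserId("0").setState("0");
        System.out.println(log);
    }
}
```

Returning this from the setters is what allows a whole object to be filled in with a single expression.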
(2) UserLogConsumer
package com.example.demo.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

@Component
@Slf4j
public class UserLogConsumer {

    // Called for every record on the user-log topic; the consumer group comes from
    // spring.kafka.consumer.group-id in application.properties.
    @KafkaListener(topics = {"user-log"})
    public void consumer(ConsumerRecord<?, ?> consumerRecord) {
        // A record's value may be null, so wrap it before use.
        Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
        log.info("record = {}", kafkaMessage);
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.err.println("Consumed message: " + message);
        }
    }
}
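The null handling in the listener can be tried out without a broker. The sketch below, which assumes nothing beyond java.util.Optional, shows what an Optional looks like when it is logged and how the present and empty cases differ; NullSafeValueSketch and describe are names made up for this example.

```java
import java.util.Optional;

class NullSafeValueSketch {

    /** Mirrors the listener's null check for a record value. */
    static String describe(Object value) {
        Optional<?> message = Optional.ofNullable(value);
        return message.isPresent()
                ? "Consumed message: " + message.get()
                : "No value in record";
    }

    public static void main(String[] args) {
        // Logging the Optional itself prints the wrapper, not just the payload.
        System.out.println(Optional.ofNullable("hello")); // Optional[hello]
        System.out.println(Optional.ofNullable(null));    // Optional.empty
        System.out.println(describe("hello"));
        System.out.println(describe(null));
    }
}
```

This is why the log line in the listener shows the value wrapped in Optional[...] while the System.err line shows the bare message.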
(3) TestController
package com.example.demo.controller;

import com.example.demo.producer.UserLogProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestController {

    @Autowired
    private UserLogProducer kafkaSender;

    // Sends five test messages, with user IDs 0 to 4, each time /kafka is requested.
    @RequestMapping(value = "/kafka")
    public String trigger() {
        for (int i = 0; i < 5; i++) {
            kafkaSender.sendLog(String.valueOf(i));
        }
        return "Kafka triggered successfully";
    }
}
(4) UserLogProducer
package com.example.demo.producer;

import com.alibaba.fastjson.JSON;
import com.example.demo.bean.UserLog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class UserLogProducer {

    // Typed to match the String serializers configured in application.properties.
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendLog(String userId) {
        UserLog userLog = new UserLog();
        userLog.setUserName("sjh").setUserId(userId).setState("0");
        System.err.println("Sending user log: " + userLog);
        // send() is asynchronous; the object is serialized to JSON and sent without a key.
        kafkaTemplate.send("user-log", JSON.toJSONString(userLog));
    }
}
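To make the payload concrete, here is a hand-written sketch of the kind of JSON string the producer puts on the topic for one UserLog. It does not use fastjson. The alphabetical key order is an assumption about fastjson's default behaviour, so the real output may order the keys differently, and the escaping covers only backslashes and double quotes.

```java
class UserLogJsonSketch {

    /** Quotes a string for JSON; control characters are not handled in this sketch. */
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /** Builds a JSON object for the three UserLog fields (key order is an assumption). */
    static String toJson(String userName, String userId, String state) {
        return "{\"state\":" + quote(state)
                + ",\"userId\":" + quote(userId)
                + ",\"userName\":" + quote(userName) + "}";
    }

    public static void main(String[] args) {
        System.out.println(toJson("sjh", "0", "0"));
    }
}
```

Because the value is sent as a plain string, the consumer receives this text as is and would need to parse it again to get a UserLog object back.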
4. Edit application.properties
spring.application.name=kafka-user
server.port=8081
#============== kafka ===================
# Kafka broker address(es); separate multiple brokers with commas
spring.kafka.bootstrap-servers=localhost:9092
#=============== producer =======================
spring.kafka.producer.retries=0
# Maximum size of one batch of records, in bytes (not a message count)
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# Serializers for the message key and value
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== consumer =======================
# Default consumer group id
spring.kafka.consumer.group-id=user-log-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
# Deserializers for the message key and value
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
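Each spring.kafka.* entry above corresponds to a setting of the underlying Kafka client. The sketch below lists those native keys in two plain maps so the correspondence is easy to see; it only builds the maps and does not create a client. The class and method names are made up for this example, and the values simply repeat the ones from application.properties.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class KafkaClientConfigSketch {

    /** Native producer keys matching the spring.kafka.producer.* entries. */
    static Map<String, Object> producerConfig() {
        Map<String, Object> config = new LinkedHashMap<>();
        config.put("bootstrap.servers", "localhost:9092");
        config.put("retries", 0);
        config.put("batch.size", 16384);        // bytes: 16 * 1024
        config.put("buffer.memory", 33554432L); // bytes: 32 * 1024 * 1024
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return config;
    }

    /** Native consumer keys matching the spring.kafka.consumer.* entries. */
    static Map<String, Object> consumerConfig() {
        Map<String, Object> config = new LinkedHashMap<>();
        config.put("bootstrap.servers", "localhost:9092");
        config.put("group.id", "user-log-group");
        config.put("auto.offset.reset", "earliest");
        config.put("enable.auto.commit", true);
        config.put("auto.commit.interval.ms", 100); // milliseconds
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return config;
    }

    public static void main(String[] args) {
        producerConfig().forEach((key, value) -> System.out.println(key + " = " + value));
        consumerConfig().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Knowing the native names helps when looking a setting up in the Kafka documentation, which lists them in this dotted form.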
5. pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.4</version>
        <relativePath/>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.44</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
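6. Start the application, open http://localhost:8081/kafka in a browser, and check the console, which should show the five messages being sent and consumed.
Done!
7. For a Kafka GUI tool and how to use it, see https://www.cnblogs.com/miracle-luna/p/11299345.html
8. For a ZooKeeper GUI tool, see https://www.cnblogs.com/xubao/p/10693932.html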
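The request from step 6 can also be sent from code instead of a browser. The following is a minimal sketch using only the JDK's HttpURLConnection, assuming the application is running locally on port 8081 as configured; TriggerEndpoint and get are names made up for this example.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

class TriggerEndpoint {

    /** Sends a GET request and returns the response body as one string. */
    static String get(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(2000);
        conn.setReadTimeout(5000);
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
            return body.toString();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumes the Spring Boot application from this article is running locally.
        System.out.println(get("http://localhost:8081/kafka"));
    }
}
```

Each call makes the controller send another five messages, so the consumer output in the application console grows accordingly.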
Summary
Real understanding comes from practice.