一. 基础概述
-
Bus 消息总线,支持 RabbitMQ 和 Kafka 消息,整合了 Java 事件处理机制和消息中间件功能,将分布式系统节点与消息系统连接起来的框架
-
通过 Bus 消息总线,实现管理和传播分布式系统间的消息,就像一个分布式执行器,可以用于广播消息,事件推送,实现指定功能,可以当做微服务,服务与服务之间的通信通道
-
通过 config server 分布式配置中心+ Bus 消息总线的组合使用,解释基本原理: 消息总线内部集成了消息中间件,连接到 Config Server 配置中心服务的 Client 服务,监听 server 上消息中间件的指定 topic 主题,当 server 端接收到刷新请求时,将请求以 topic 主题模式进行广播给订阅该主题的 Client 端,进而实现一次执行多处生效,与指定生效的功能
-
有两种方式:一是通过 Config Server 端接收请求然后广播给连接到该 server 的 Client端,还有一种通过某个 Client 端接收请求,将请求(推荐使用方式一通过 Server 端接收请求进行广播,通过Client 端接收的弊端:Client 时一个单纯的微服务,违背了单一职责)
-
使用 Bus 消息总线首先要安装启动消息中间件,可以是 RabbitMq 或 kafka
二. Config 分布式服务配置中心 + Bus 消息总线 实现数据刷新
Config server 服务
- pom 文件引入依赖(需要暴露刷新端口,引入 spring-boot-starter-actuator)
<!--config server-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-server</artifactId>
</dependency>
<!-- 添加消息总线RabbitMQ支持 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<!--actuator 会用到 web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--eureka client-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
- application.yml 文件中增加整合 RabbitMQ 配置,与暴露刷新端口配置
server:
port: 3344 #当前服务端口号
spring:
application:
name: cloud-config-center-service #当前服务名称
cloud:
config:
server:
git:
#连接到的 gitHub 存放配置文件的仓库地址
uri: https://github.com/weishuaiaaaa/springcloud-config01.git
#搜索仓库中"config01"文件夹下的配置文件,仓库中必须要有该文件夹
search-paths:
- config01
#gitHub 登入用户名密码
username: weishuaiaaaa@163.com
password: 1994sss
##读取仓库的哪个分支
label: master
#Bus 消息总线使用,配置 rabbitMQ 消息中间件
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
#rabbitmq相关设置 ,暴露 bus刷新配置的端点
management:
endpoints:
web:
exposure:
include: 'bus-refresh'
#=================eureka相关配置======================
eureka:
client:
register-with-eureka: true #true 表示将当前服务注册到eureka注册中心
#true 表示是否在注册中心抓取已有的注册信息,集群环境时必须为true,配合ribbon进行负载
fetchRegistry: true
service-url:
#eureka 注册中心访问连接,集群环境多个注册地址
defaultZone: http://127.0.0.1:7001/eureka,http://127.0.0.1:7002/eureka
instance:
instance-id: configService3344 #配置当前服务向eureka注册中心注册时显示的服务器主机名称
prefer-ip-address: true #配置在开发人员直接访问eureka服务器时显示当前eureka上注册的服务的ip
lease-renewal-interval-in-seconds: 1 #指定定时向eureka注册中心发送代表当前服务心跳包的时间默认30秒
lease-expiration-duration-in-seconds: 2 # Eureka 接收到当前服务最后一次发送代表正常心跳包的等待时间,超过则将当前服务在 Eureka 上踢除
#=================eureka相关配置end======================
- ``启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.config.server.EnableConfigServer;
@SpringBootApplication
@EnableConfigServer
public class ConfigCenterMain3344 {
public static void main(String[] args) {
SpringApplication.run(ConfigCenterMain3344.class, args);
}
}
通过 Config Server 端读取配置文件的 Client 服务
此处就一一个 3355 服务为示例,可能还会存在多个读取配置文件的 Client 服务
- pom 文件中引入依赖
<!-- 添加消息总线RabbitMQ支持 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<!--config Client 依赖-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--eureka client-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
- bootstrap.yml 文件中增加 RabbitMQ 配置与暴露刷新端点配置
server:
port: 3355
spring:
application:
name: config-client3355
cloud:
#Config客户端配置
config:
label: master #分支名称
name: config #配置文件名称
profile: dev #读取后缀名称 上诉3个综合就是 master分支上 config-dev.yml
uri: http://localhost:3344
#注意点此处的rabbitmq 配置是与上面的spring对齐的否则会报错
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
#暴露监控端点
management:
endpoints:
web:
exposure:
include: "*"
#=================eureka相关配置======================
eureka:
client:
register-with-eureka: true #true 表示将当前服务注册到eureka注册中心
#true 表示是否在注册中心抓取已有的注册信息,集群环境时必须为true,配合ribbon进行负载
fetchRegistry: true
service-url:
#eureka 注册中心访问连接,集群环境多个注册地址
defaultZone: http://127.0.0.1:7001/eureka,http://127.0.0.1:7002/eureka
instance:
instance-id: configClient3355 #配置当前服务向eureka注册中心注册时显示的服务器主机名称
prefer-ip-address: true #配置在开发人员直接访问eureka服务器时显示当前eureka上注册的服务的ip
lease-renewal-interval-in-seconds: 1 #指定定时向eureka注册中心发送代表当前服务心跳包的时间默认30秒
lease-expiration-duration-in-seconds: 2 # Eureka 接收到当前服务最后一次发送代表正常心跳包的等待时间,超过则将当前服务在 Eureka 上踢除
#=================eureka相关配置end======================
- 启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
@SpringBootApplication
@EnableEurekaClient
public class ConfigClientMain3355 {
public static void main(String[] args) {
SpringApplication.run(ConfigClientMain3355.class,args);
}
}
最终实现的效果
- 假设修改了 gitHub 上的配置文件,在不重启 Client 服务的情况下向让 Client 获取到新的数据,在以前,通过手动执行一个 post 请求,手动触发指定 Client 刷新获取新的数据,有几个 Client 就要请求几个
- 现在可以通过手动执行 Post 请求,触发 Server 端刷新,通过Server 端将刷新请求广播给连接该 Server 的所有 Client 端,并且可以在请求的末尾添加 “Client 服务名称:服务端口号” 指定哪个 Client 生效,不指定的则不会去执行刷新
- 发送 Post 请求,设置连接到该 server 所有 Client 执行刷新获取新的数据
“http://config server ip 地址: server端口号/actuator/Config Server 服务中通过yml配置的暴露的刷新路径”
- 发送 Post 请求,设置连接到该 server 指定 Client 执行刷新获取新的数据
“http://config server ip 地址: server端口号/actuator/Config Server 服务中通过yml配置的暴露的刷新路径/client服务名称:服务端口号”
- 当启动 Config Server 端与 Client 端后,底层实际通过 Bus 消息总线整个的消息中间件,创建了一个队列,连接 Server 端的 Client 监听该队列,当接收到请求后,Server 端会议 Topic 模式向队列中发送消息,匹配到的 Client 接收消息进行处理,执行刷新功能