1:整体架构流程
2:目录详情
- consumer 消息消费者项目
- provider 方法提供者项目
- mq RocketMQ项目
- point 打点接口项目
3:关键代码详解
3.1:mq项目
mq.properties
mq.defaultmqgroup=xd-mq-group //默认消息组名称
mq.namesrvaddr=localhost:9876 //rocketMQ地址,这里我用的是window docker启用的,具体说明在下面
mq.instancename=xd-rmq-instance
复制代码
PdBaseMqService
用于初始化RocketMQ 服务 和 提供生成消息方法
// 从配置文件中获取
Resource resource = new ClassPathResource("mq.properties");
Properties p = new Properties();
try {
p.load(resource.getInputStream());
} catch (IOException e) {
logger.error("配置文件异常");
}
producer = new DefaultMQProducer(String.valueOf(p.getProperty("mq.defaultmqgroup")));
producer.setNamesrvAddr(String.valueOf(p.getProperty("mq.namesrvaddr")));
producer.setInstanceName(String.valueOf(p.getProperty("mq.instancename")));
try {
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(2);
logger.info("MQ生产者,Producer Started...");
} catch (Exception e) {
logger.error("MQ生产者,异常:" + e.getMessage(), e);
}
复制代码
try {
Message message = new Message(topic, tags, JSON.toJSONString(object).getBytes());
logger.info("MQ生产者,【异步】发送消息:topic=" + topic + ",tags=" + tags);
// 异步发送
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
//logger.info("消息发送结果:result=" + sendResult.getSendStatus().name() + ",msgId=" + sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
logger.error("消息发送结果,异常了:" + e.getMessage(), e);
}
}, TIMEOUT);
} catch (Exception e) {
logger.error("MQ生产者,发送消息,异常:" + e.getMessage(), e);
return false;
}
复制代码
ProducerMqService
定义 RocketMQ的Topic 和 Tag 及发送消息的方法
// 定义主题,一般一个项目,使用同一个
public static String TOPIC = "XdTopic";
//================================
// 定义Tag,实际业务
//================================
/**
* 应用事件打点
*/
public static String TAG_DOT_APP_EVENT_HIS = "tagDotAppEventHis";
public boolean sendDotAppEventHis(DotAppEventHisVo dotAppEventHisVo);
复制代码
ProducerMqServiceImpl
执发送消息的方法
@Override
public boolean sendDotAppEventHis(DotAppEventHisVo dotAppEventHisVo) {
try {
return PdBaseMqService.sendMsg(ProducerMqService.TOPIC, ProducerMqService.TAG_DOT_APP_EVENT_HIS, dotAppEventHisVo);
} catch (Exception e) {
logger.error("mq异步化:应用事件打点异常:" + e.getMessage(), e);
return false;
}
}
复制代码
3.2:provider方法提供者
ProducerMqServiceImpl
providers.xml dubbo xml方式封装方法
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://code.alibabatech.com/schema/dubbo
http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<!-- 配置可参考 http://dubbo.io/User+Guide-zh.htm -->
<!-- 服务提供方应用名,用于计算依赖关系 -->
<dubbo:application name="dubbo-provider" owner="dubbo-provider" />
<!-- 定义 zookeeper 注册中心地址及协议 -->
<dubbo:registry protocol="zookeeper" address="localhost:2181" client="zkclient" />
<!-- 定义 Dubbo 协议名称及使用的端口,dubbo 协议缺省端口为 20880,如果配置为 -1 或者没有配置 port,则会分配一个没有被占用的端口 -->
<dubbo:protocol name="dubbo" port="-1" />
<!-- 实际业务 -->
<dubbo:service interface="com.xd.mq.service.ProducerMqService" ref="producerMqService" timeout="10000" />
<bean id="producerMqService" class="com.xd.mq.service.impl.ProducerMqServiceImpl" />
</beans>
复制代码
**注意:zookeeper 我用的是本地环境,换为你们的zookeeper即可 **
3.3:point打点项目
ProducerMqServiceImpl
consumers.xml 注册使用dubbo的方法
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://code.alibabatech.com/schema/dubbo
http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<!-- 配置可参考 http://dubbo.io/User+Guide-zh.htm -->
<!-- 消费方应用名,用于计算依赖关系,不是匹配条件,不要与提供方一样 -->
<dubbo:application name="md-webapi-dus" owner="md-webapi-dus" />
<!-- 定义 zookeeper 注册中心地址及协议 -->
<dubbo:registry protocol="zookeeper" address="localhost:2181" client="zkclient" />
<!-- 实际业务 -->
<dubbo:reference id="producerMqService" interface="com.xd.mq.service.ProducerMqService" />
</beans>
复制代码
3.2:consumer消费者
ConsumerMqServiceImpl 订阅消息
package com.xd.consumer.service.impl;
![image.png](https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/b7f657ba58ad4eeb981a4471acd14122~tplv-k3u1fbpfcp-watermark.image?)
import com.google.gson.Gson;
import com.xd.consumer.base.CmBaseMqService;
import com.xd.mq.service.ProducerMqService;
import com.xd.mq.vo.DotAppEventHisVo;
import com.xd.consumer.service.ConsumerMqService;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* MQ消费者
*
* @author Minbo
*/
@Service
public class ConsumerMqServiceImpl implements ConsumerMqService {
protected static Logger logger = LoggerFactory.getLogger(com.xd.consumer.service.impl.ConsumerMqServiceImpl.class);
@Override
public boolean consume() {
try {
DefaultMQPushConsumer consumer = CmBaseMqService.getConsumer();
// 订阅所有消息
consumer.subscribe(ProducerMqService.TOPIC, "*");
logger.info("消费者,订阅所有消息");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
Gson gson = new Gson();
logger.info("消费者,准备消费消息:tag= " + msg.getTags());
// 区分不同Tag,不同处理方式
switch (msg.getTags()) {
// 应用事件打点
case ProducerMqService.TAG_DOT_APP_EVENT_HIS:
DotAppEventHisVo dotAppEventHisVo = gson.fromJson(new String(msg.getBody()),
DotAppEventHisVo.class);
logger.info("消费消息:得到内容为:" + dotAppEventHisVo.toString());
break;
default:
logger.error("无处理类型,请检查。tag=" + msg.getTags(), new RuntimeException("未知Tag类型"));
break;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
logger.info("消费者,启动成功...");
} catch (Exception e) {
logger.error("消费者,启动异常:" + e.getMessage(), e);
}
return true;
}
}
复制代码
注意事项: 需要自己备好 RocketMQ 和 zookeeper的调用地址,我是window用docker运行的打包镜像运行的,这两个地址需要更换成自己的 本次工程也已上传到 github,看这里:dus-system