1、什么是canal
canal是用java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用来处理获得的相关数据。(数据库同步需要阿里的otter中间件,基于canal)
2、canal使用场景
(1)阿里otter(阿里用于进行异地数据库之间的同步框架)中间件的一部分,这是原始场景;
(2)更新缓存:如果有大量的请求发送到mysql的话,mysql查询速度慢,QPS上不去,光查mysql可能会瘫痪,那就可以在前面加个缓存,这个缓存有2个主要的问题。一是缓存没有怎么办,二是数据不一致怎么办。对于第一个问题查缓存没有就查询mysql,mysql再往缓存中写一份。对于第二个问题,如果数据库修改了,那就采用异步的方式进行修改,启动一个canal服务,监控mysql,只要一有变化就同步缓存,这样mysql和缓存就能达到最终的一致性;
(3)抓取业务数据新增变化表,用于制作拉链表:做拉链表是需要有增加时间和修改时间的,需要数据今天新增和变化的数据,如果时间不全就没办法知道哪些是修改的。可以通过canal把变化的抽到自己的表里,以后数据就从这个表出。
(4)取业务表的新增变化数据,用于制作实时统计
3、canal工作原理
1. MySQL主备复制原理
canal的工作原理:把自己伪装成slave,从master复制数据。读取binlog是需要master授权的,因为binlog是加密的,授权分用户名密码才能读。master授权后不知道读他的binlog的是从机还是canal,他的所有传输协议都符合从机的标准,所以master一直以为是从机读的。
4、 Canal 实战
需求分析:
(1)、注册用户成功,需要发送邮箱信息;
方式1:我们使用AOP切面,注册成功后,发送mq消息,监听mq消息队列,发送邮件。这样会存在业务层直接有耦合关系
方式2:使用canal监听数据库,监听用户表,如果用户表有数据插入,发送mq消息到队列中。监听mq消息队列,发送邮件。这样监听发送邮件与注册的业务没有任何耦合关系;
(2)、修改或者删除或者增加数据后,redis缓存中数据没有同步更新
方式1:使用AOP切面,@CacheEvict(value = "ums", key = "'resource'", beforeInvocation = false),先执行修改或者新增或者删除的代码,执行完成后,删除redis中的key。下次执行查询的数据后,将数据写到redis中@Cacheable(value = "ums", key = "'resource'");
这种方式有个弊端:
1.修改、删除、新增,每个方法上都需要添加@CacheEvict,删除redis中的key
2.每次执行修改、删除、新增后,执行查询的时候,先要去数据库查询,然后在同步到redis中;
方式2:使用canal监听数据库,监听对应的表,如果表中有增、删、改,发送mq消息,监听到mq消息队列,则删除redis中的对应key,重新读取最新的数据写入。这样修改或者增加完成后,返回查询数据,数据是正确的,并且返回后查询不需要等待较长时间,因为开辟了其他线程消费mq消息;
环境配置
所有的应用都部署到docker容器上
MySQL开启 binlog 日志
1.docker查看mysql工作目录 docker inspect xx
MergedDir表示工作目录
2.进入工作目录
查看mysql配置文件
3.编辑mysql配置文件,在mysqld下增加如下配置
[mysqld]
log-bin=mysql-bin # 开启binlog
binlog-format=ROW # 选择ROW模式
server_id=1 # 配置MySQL replaction需要定义,不和Canal的slaveId重复即可
4.重启docker容器
docker restart myMysql
2. RabbitMQ 队列创建
canal配置与启动
1.下载镜像
[root@localhost /]# docker run --name myCanal -d -p 11111:11111 canal/canal-server
-d 后台启动 --name 起名字 -p端口映射 canal/canal-server镜像名称
查看canal工作目录
docker inspect myCanal
进入工作目录,修改配置文件
我们不用linux进入工作目录了,我们使用idea的sftp的功能进入工作目录修改配置文件
修改配置文件信息
修改文件后上传到服务器
3、springboot整合canal
引入jar包
<!-- canal数据库监听,依赖的jar包。 需要注意canal监听的时候还需要mybatis-plus相关jar,没有监听的时候会报错 -->
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
</dependency>
<!-- 依赖 数据库已经mybatis-plus相关jar包 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
</dependency>
springboot配置文件,配置canal信息
#如果配置了数据库监听发送mq消息到队列中,这个配置就不要了,因为这个默认是按照tcp连接方式连接的,我们修改了配置文件,连接方式修改为了mq
canal:
server: 192.168.75.131:11111
destination: example
编写测试代码
import com.powernode.entity.UmsUser;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;
import javax.annotation.Resource;
/**
* canal监听数据库,如果ums_user数据库插入数据,则发送邮件
* 注意:
* 如果我们先删除或者添加或者修改数据,服务启动后,依然会监听到。因此服务短暂的暂停不影响数据的完整性
*
* canal用在数据同步很好,例如:数据库更新,缓存也可以进行更新
*
* CanalTable 监听那个表
*/
@Component
@CanalTable(value = "ums_user")
public class UserHandler implements EntryHandler<UmsUser> {
@Resource
private JavaMailSender javaMailSender;
/**
* 监听添加
* @param umsUser 添加的umsUser对象数据
*/
public void insert(UmsUser umsUser) {
System.out.println("监听添加");
System.out.println("添加====" + umsUser);
//监听到添加数据后,进行发送邮件,不需要在往mq中发现消息,mq消费消息发送邮件了。彻底与业务层解耦了
}
/**
* 监听修改
* @param before 监听修改前user对象的被修改字段的数据,其他没有修改的都是null
* 修改前=======UmsUser(name=BB, phone=null, email=null, icon=null, password=null, active=null, sort=null, fileInfoListList=null, description=null)
* @param after 监听修改后的user对象的数据
*/
public void update(UmsUser before, UmsUser after) {
System.out.println("监听修改");
System.out.println("修改前=======" + before);
System.out.println("修改后=======" + after);
}
/**
* 监听删除
* @param umsUser 删除umsUser对象数据
*/
public void delete(UmsUser umsUser) {
System.out.println("监听删除");
System.out.println("删除====" + umsUser);
}
}
启动spring项目
修改ums_user表信息,监听到休息的信息。
注意:canal只监听,增删改,不监听查询
通过上面我们可以发现,通过canal监听可以,对数据库的表进行监听,结合EntryHandler实现的增删改的方法,我们拿到增加、删除、修改的数据。可以对数据进行操作。
例如。添加数据后,我们可以在增加方法里面发送邮件;
4、springboot + canal +rabbitmq 整合
上面那种方法数据库表变更后,就会canal就会监听到,调用对应的方法处理。
我们可以使用异步进行处理,表变更后,发送mq消息到队列,消费消息发送邮件;
继续修改canal instance.properties配置文件
修改canal.properties配置文件信息
删除springboot配置文件中canal配置信息,因为如果使用mq连接形式就不是tcp了
编写测试代码,ums_user表修改、添加、删除数据,发送邮件
package com.powernode.core;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* canal监听数据库所有的表,如果数据库ums_user表插入数据,就给email队列中发送mq消息
*/
@Slf4j
@Component
public class ListenerMysqlEmail {
public static final String QUEUE_NAME = "email";
@Resource
JavaMailSender javaMailSender;
@Value("${spring.mail.username}")
String from;
/**
* canal监控
* 消费mq, 判断如果监控到shop.ums_user表数据有更新或者新增或者删除,发送邮件
* @param message
*/
@RabbitListener(queues = QUEUE_NAME)
public void listenerMysqlEmailQueue(Message message) {
String body = new String(message.getBody());
List<String> typeList = Stream.of("INSERT", "UPDATE", "DELETE").collect(Collectors.toList());
Map<String, Object> result = JSONObject.parseObject(body, Map.class);
if("shop".equals(result.get("database")) && "ums_user".equals(result.get("table"))){
if(typeList.contains((String) result.get("type"))){
MimeMessage mimeMessage = javaMailSender.createMimeMessage();
try {
MimeMessageHelper mimeMessageHelper = new MimeMessageHelper(mimeMessage,true);
List<Map<String, Object>> data =(List)result.get("data");
for (Map<String, Object> map:data) {
String text = "<p>尊敬的" +map.get("name") + "先生/女士" + "</p><br/>" + "<p>系统为您创建了了用户,登录名是您的手机号或者邮箱</p></br>"
+ "<p>密码是" + map.get("password") +"</p>";
mimeMessageHelper.setSubject("系统消息");
mimeMessageHelper.setFrom(from);
mimeMessageHelper.setTo((String) map.get("email"));
mimeMessageHelper.setText(text);
javaMailSender.send(mimeMessage);
}
} catch (MessagingException e) {
e.printStackTrace();
log.error("邮件发送异常" + e);
}
}
}
}
}
5、编写测试代码,修改ums_resource表信息,删除redis缓存,重新刷新redis
package com.powernode.core;
import com.alibaba.fastjson.JSONObject;
import com.powernode.service.UmsResourceService;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@Component
public class ListenerResource {
@Resource
private UmsResourceService umsResourceService;
@Resource
private RedisTemplate redisTemplate;
/**
* 监听数据库
* 如果监听到了shop.ums_resource表数据更新或者删除,更新redis数据
* @param message
*/
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "email", durable = "true"),
exchange = @Exchange(value = "canal.exchage"),
key = "canalEmail"
)
})
public void handleDataChange(Message message) {
String body = new String(message.getBody());
List<String> typeList = Stream.of("INSERT", "UPDATE", "DELETE").collect(Collectors.toList());
Map<String, Object> result = JSONObject.parseObject(body, Map.class);
if ("shop".equals(result.get("database")) && "ums_resource".equals(result.get("table"))) {
if (typeList.contains((String) result.get("type"))) {
//如果是增删改,删除redis中的key,重新将数据刷新到redis中
if (redisTemplate.delete("ums::resource")) {
umsResourceService.getAll();
}
}
}
}
}