阿里云服务器安装及部署canal
1、环境部署
1.1、mysql开启binlog模式
(1)查看当前mysql是否开启binlog模式。
SHOW VARIABLES LIKE '%log_bin%'
如果log_bin的值为OFF是未开启,为ON是已开启。
(2)修改/etc/my.cnf 需要开启binlog模式。
[mysqlId]
log-bin=mysql-bin
binlog-format=ROW
server_id=1
修改完成之后,重启mysqld的服务。
(3)进入mysql
mysql -h localhost -u root -p
(4)创建账号 用于测试使用
使用root账号创建用户并授予权限
create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
1.2、 canal服务端安装配置
(1)下载地址canal
https://github.com/alibaba/canal/releases/tag/canal-1.0.24
(2)下载之后 上传到linux系统中,解压缩到指定的目录/usr/local/canal
解压缩之后的目录结构如下:
(3)修改 exmaple下的实例配置
vi conf/example/instance.properties
修改如图所示的几个参数。
(4)启动服务:
[root@localhost canal]# ./bin/startup.sh
(5)查看日志:
cat /usr/local/canal/logs/canal/canal.log
这样就表示启动成功了。(注意,红框中的内容为启动canal的ip及端口号,ip和端口号配置在usr/local/canal/conf中的canal.properties)
通过图片中的红框所示,其实我在这里并没有配置canal.ip的值,只配置了canal.port的值,这也是canal的默认端口号11111,如果这里没有配置ip地址,启动canal会自动分配一个ip地址,如第五步查看日志的第一张图所示;(ps:这里我试过自定义阿里云的地址,发现无法启动,最好还是留空让它自动配置吧!)
1.3数据监控微服务
当用户执行数据库的操作的时候,binlog 日志会被canal捕获到,并解析出数据。我们就可以将解析出来的数据进行相应的逻辑处理。
我们这里使用的一个开源的项目,它实现了springboot与canal的集成。比原生的canal更加优雅。
https://github.com/chenqian56131/spring-boot-starter-canal
使用前需要将starter-canal安装到本地仓库。(如何安装到本地仓库百度即可)
我们可以参照它提供的canal-test,进行代码实现。
1.3.1、微服务搭建
(1)创建工程模块changgou_canal,pom引入依赖
<dependency>
<groupId>com.xpand</groupId>
<artifactId>starter-canal</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
(2)创建包com.项目名.canal ,包下创建启动类
@SpringBootApplication
@EnableCanalClient //声明当前的服务室canal的客户端
public class CanalApplication {
public static void main(String[] args) {
SpringApplication.run(CanalApplication.class,args);
}
}
(3)添加配置文件application.properties
canal.client.instances.example.host=xxx.xxx.xxx.xx #注意,这里填写的ip地址并非canal日志里的自动分配的启动ip地址,而是要填写自己的阿里云地址(始终canal是在自己的阿里云上部署,使用阿里云地址链接即可)
canal.client.instances.example.port=11111
canal.client.instances.example.batch-size=1000
spring.rabbitmq.host=xxx.xxx.xxx.xx #ip地址同上
(4)创建com.项目名.canal.listener包,包下创建类
@CanalEventListener //声明当前的类是canal的监听类
public class BusinessListener {
/**
*
* @param eventType 当前操作数据库的类型
* @param rowData 当前操作数据库的数据
*/
@ListenPoint(schema = "数据库名称",table = "监听的表名称") //监听的数据库与表
public void adUpdate(CanalEntry.EventType eventType,CanalEntry.RowData rowData){
System.out.println("表数据发生改变");
//获取改变之前的数据
rowData.getBeforeColumnsList().forEach((c) -> System.out.println("改变前的数据:"+c.getName()+"::"+c.getValue()));
//获取改变之后的数据
rowData.getAfterColumnsList().forEach((c) -> System.out.println("改变之后的数据:"+c.getName()+"::"+c.getValue()));
}
}
测试:启动数据监控微服务,修改监听数据库中的xx表,观察控制台输出。