Springboot2(44)集成canal

2023-11-12

源码地址

springboot2教程系列

canal高可用部署安装和配置参数详解

前言

canal是阿里巴巴的基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql。

可以用于比如数据库数据变化的监听从而同步缓存(如Redis)数据等。

由于项目中基本都是使用的Spring-Boot,所以写了一个基于Spring-Boot的starter方便使用。

特点

使用方便。可以通过简单的配置就可以开始使用,当对某些操作感兴趣的时候可以通过注解或者注入接口实现的方式监听对应的事件。

实现

注解方式 (insert为例)

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@ListenPoint(eventType = CanalEntry.EventType.INSERT)
public @interface InsertListenPoint {

    /**
     * canal 指令
     * default for all
     *
     */
    @AliasFor(annotation = ListenPoint.class)
    String destination() default "";

    /**
     * 数据库实例
     *
     */
    @AliasFor(annotation = ListenPoint.class)
    String[] schema() default {};

    /**
     * 监听的表
     * default for all
     *
     */
    @AliasFor(annotation = ListenPoint.class)
    String[] table() default {};

}

创建canal客户端(连接实例)

private CanalConnector processInstanceEntry(Map.Entry<String, CanalProperties.Instance> instanceEntry) {
		//获取配置
		CanalProperties.Instance instance = instanceEntry.getValue();
		//声明连接
		CanalConnector connector;
		//是否是集群模式
		if (instance.isClusterEnabled()) {
			//zookeeper 连接集合
			for (String s : instance.getZookeeperAddress()) {
				String[] entry = s.split(":");
				if (entry.length != 2) {
					throw new CanalClientException("zookeeper 地址格式不正确,应该为 ip:port....:" + s);
				}
			}
			//若集群的话,使用 newClusterConnector 方法初始化
			connector = CanalConnectors.newClusterConnector(StringUtils.join(instance.getZookeeperAddress(), ","), instanceEntry.getKey(), instance.getUserName(), instance.getPassword());
		} else {
			//若不是集群的话,使用 newSingleConnector 初始化
			connector = CanalConnectors.newSingleConnector(new InetSocketAddress(instance.getHost(), instance.getPort()), instanceEntry.getKey(), instance.getUserName(), instance.getPassword());
		}
		//canal 连接
		connector.connect();
		if (!StringUtils.isEmpty(instance.getFilter())) {
			//canal 连接订阅,包含过滤规则
			connector.subscribe(instance.getFilter());
		} else {
			//canal 连接订阅,无过滤规则
			connector.subscribe();
		}
		
		//canal 连接反转
		connector.rollback();
		//返回 canal 连接
		return connector;
	}

获取数据

public void run() {
		//错误重试次数
		int errorCount = config.getRetryCount();
		//捕获信息的心跳时间
		final long interval = config.getAcquireInterval();
		//当前线程的名字
		final String threadName = Thread.currentThread().getName();
		//若线程正在进行
		while (running && !Thread.currentThread().isInterrupted()) {
			try {
				//获取消息
				Message message = connector.getWithoutAck(config.getBatchSize());
				//获取消息 ID
				long batchId = message.getId();
				//消息数
				int size = message.getEntries().size();
				//debug 模式打印消息数
				if (logger.isDebugEnabled()) {
					logger.debug("{}: 从 canal 服务器获取消息: >>>>> 数:{}", threadName, size);
				}
				//若是没有消息
				if (batchId == -1 || size == 0) {
					if (logger.isDebugEnabled()) {
						logger.debug("{}: 没有任何消息啊,我休息{}毫秒", threadName, interval);
					}
					//休息
					Thread.sleep(interval);
				} else {
					//处理消息
					distributeEvent(message);
				}
				//确认消息已被处理完
				connector.ack(batchId);
				//若是 debug模式
				if (logger.isDebugEnabled()) {
					logger.debug("{}: 确认消息已被消费,消息ID:{}", threadName, batchId);
				}
			} catch (CanalClientException e) {
				//每次错误,重试次数减一处理
				errorCount--;
				logger.error(threadName + ": 发生错误!! ", e);
				try {
					//等待时间
					Thread.sleep(interval);
				} catch (InterruptedException e1) {
					errorCount = 0;
				}
			} catch (InterruptedException e) {
				//线程中止处理
				errorCount = 0;
				connector.rollback();
			} finally {
				//若错误次数小于 0
				if (errorCount <= 0) {
					//停止 canal 客户端
					stop();
					logger.info("{}: canal 客户端已停止... ", Thread.currentThread().getName());
				}
			}
		}
		//停止 canal 客户端
		stop();
		logger.info("{}: canal 客户端已停止. ", Thread.currentThread().getName());
	}

把数据注入到相应的注解方法处理

/**
	 * 处理注解方式的 canal 监听器
	 *
	 * @param destination canal 指令
	 * @param schemaName  实例名称
	 * @param tableName   表名称
	 * @param rowChange   数据
	 * @return
	 */
	protected void distributeByAnnotation(String destination,
										  String schemaName,
										  String tableName,
										  CanalEntry.RowChange rowChange) {

		//对注解的监听器进行事件委托
		if (!CollectionUtils.isEmpty(annoListeners)) {
			annoListeners.forEach(point -> point
					.getInvokeMap()
					.entrySet()
					.stream()
					.filter(getAnnotationFilter(destination, schemaName, tableName, rowChange.getEventType()))
					.forEach(entry -> {
						Method method = entry.getKey();
						method.setAccessible(true);
						try {
							CanalMsg canalMsg = new CanalMsg();
							canalMsg.setDestination(destination);
							canalMsg.setSchemaName(schemaName);
							canalMsg.setTableName(tableName);

							Object[] args = getInvokeArgs(method, canalMsg, rowChange);
							method.invoke(point.getTarget(), args);
						} catch (Exception e) {
							
						}
					}));
		}
	}

注解监听的使用

/**
 * 注解方法测试
 *
 */
@CanalEventListener
public class MyAnnoEventListener {
	
	@InsertListenPoint
	public void onEventInsertData(CanalMsg canalMsg, CanalEntry.RowChange rowChange) {
		System.out.println("======================注解方式(新增数据操作)==========================");
		List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
		for (CanalEntry.RowData rowData : rowDatasList) {
			String sql = "use " + canalMsg.getSchemaName() + ";\n";
			StringBuffer colums = new StringBuffer();
			StringBuffer values = new StringBuffer();
			rowData.getAfterColumnsList().forEach((c) -> {
				colums.append(c.getName() + ",");
				values.append("'" + c.getValue() + "',");
			});
			
			
			sql += "INSERT INTO " + canalMsg.getTableName() + "(" + colums.substring(0, colums.length() - 1) + ") VALUES(" + values.substring(0, values.length() - 1) + ");";
			System.out.println(sql);
		}
		System.out.println("\n======================================================");
		
	}
	
	
}

使用方法

  • 注意:基于已经有了数据库环境和canal-server环境的前提。

  • 获取源码。将源码中的starter-canl项目打包引入或者通过maven安装到仓库。
    在自己的Spring-Boot项目中:
    加入配置

    #false时为单机模式,true时为zookeeper高可用模式
    canal.client.instances.example.clusterEnabled: true
    #canal.client.instances.example.host: 10.10.2.137
    #zookeeper 地址
    canal.client.instances.example.zookeeperAddress: 10.10.2.137:2181,10.10.2.138:2181,10.10.2.139:2181
    
    canal.client.instances.example.port: 11111
    canal.client.instances.example.batchSize: 1000
    canal.client.instances.example.acquireInterval: 1000
    canal.client.instances.example.retryCount: 20
    

    编写自己的Listener(参照canal-test中的MyEventListener)
    启动。—》OK!

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Springboot2(44)集成canal 的相关文章

  • 使用canal连接kafka

    这篇主要是项目还原 xff0c 目的是记录构建时遇到的各种奇葩坑 xff0c 避免下次迷路 废话不多说 xff0c 直接上手 默认已安装docker xff0c docker compose xff0c nodejs xff0c yarn
  • springboot2.x The character [_] is never valid in a domain name

    访问springcloud工程 xff0c springboot版本为2 1 15 xff0c 使用域名访问接口报400 但是直接用ip不报错 发现是集成的tomcat版本不支持域名下划线 大约是8 5 31以后的版本不支持带下划线的域名
  • 数据同步之初识Canal

    git地址 xff1a 阿里巴巴Canal的Git地址 Canal基于日志增量订阅和消费的业务包括 xff1a 数据库镜像 数据库实时备份索引构建和实时维护 拆分异构索引 倒排索引 业务cache刷新 带业务逻辑的增量数据处理 Mysql
  • Java Canal binlog 日志监控

    参考地址 超详细的Canal入门 xff0c 看这篇就够了 xff01 java技术爱好者 R的博客 CSDN博客 canal 有需要的参考博客 xff01 xff01 xff01 xff01 xff01 xff01
  • Canal监控MySQL数据到Kafka详细步骤(jdk+zookeeper+kafka+canal+mysql)

    目录 一 前言二 环境准备三 安装JDK四 安装zookeeper五 安装Kafka六 安装MySQL七 安装canal服务端 xff08 canal监控mysql数据发送到kafka xff09 八 测试是否可以监控到数据九 结语 一 前
  • SpringBoot2.x学习(二):为属性注入配置文件中的值:@ConfigurationProperties注解的使用

    文章目录 一 64 ConfigurationProperties 简单介绍二 64 ConfigurationProperties 使用示范1 创建两个 javaBean2 在 SpringBoot 全局配置文件写入需要注入的值2 1 a
  • canal监听mysql实践

    canal监听mysql实践 canal是用java开发的基于数据库增量日志解析 xff0c 提供增量数据订阅 amp 消费的中间件 目前 xff0c canal主要支持了MySQL的binlog解析 xff0c 解析完成后才利用canal
  • Elasticsearch7.9集群部署,head插件,canal同步mysql数据到es,亲自测试,无坑

    Elasticsearch集群部署 1 服务器规划 10 4 7 11 node1 10 4 7 12 node2 10 4 7 13 node3 1 集群相关 一个运行中的 Elasticsearch 实例称为一个节点 xff0c 而集群
  • springboot2.x +kafka使用和源码分析九(KafkaListenerEndpointRegistry暂停启动容器)

    我们在运行中如果需要暂停启动容器时可以通过此类KafkaListenerEndpointRegistry来处理 KafkaListenerEndpointRegistry源码 只解释了核心代码 public class KafkaListe
  • prometheus+grafana监控mysql、canal服务器

    一 prometheus配置 1 prometheus安装 1 1官网下载安装包 xff1a https prometheus io download 1 2解压安装包 xff1a tar zxvf prometheus 2 6 1 lin
  • 不同业务场景下数据同步方案设计

    企业开发实践中通常需要提供数据搜索的功能 例如 电商系统中的商品搜索 订单搜索等 通常 搜索任务通常由搜索引擎担当 如Elasticsearch 而我们的原始数据为了安全性等问题通常存储在关系型数据库中 在搜索数据前 我们需要先将数据从关系
  • IDEA日志输出格式控制、文件记录日志

    目录 一 日志输出格式控制 二 文件记录日志 一 日志输出格式控制 了解一下控制台日志显示格式 如何设置 d 日期时间 m 消息 n 换行 p 日志级别 5p 日志级别宽度设为5个字母 因为最长的debug是5个字母 clr 5p 日志级别
  • SpringBoot2的异常处理、Aop(事务)、拦截器

    目录 一 异常处理 一 ControllerAdvice ExceptionHandler 注解处理异常 二 自定义 HandlerExceptionResolver 类处理异常 二 事务Aop的相关使用 主要说明事务的使用方式 一 事务的
  • Spring boot Mybatis type-aliases-package错误解决

    背景 最近在练习spring boot 2 7 0整合mybatis 2 1 3时 发现在使用mybatis type aliases package配置后 xml中的别名会出现爆红的现象 错误复现 配置文件中 使用mybatis type
  • Springboot2(27)集成netty实现反向代理(内网穿透)

    源码地址 springboot2教程系列 其它netty文件有博客 Springboot2 24 集成netty实现http服务 类似SpingMvc的contoller层实现 Springboot2 25 集成netty实现文件传输 Sp
  • 深入解析中间件之-Canal

    canal 阿里巴巴mysql数据库binlog的增量订阅 消费组件 MySQL binlog MySQL主从复制 mysql服务端修改配置并重启 1 2 3 4 5 6 7 8 9 10 11 12 vi etc my cnf mysql
  • Springboot2(44)集成canal

    源码地址 springboot2教程系列 canal高可用部署安装和配置参数详解 前言 canal是阿里巴巴的基于数据库增量日志解析 提供增量数据订阅 消费 目前主要支持了mysql 可以用于比如数据库数据变化的监听从而同步缓存 如Redi
  • 使用canal同步数据,踩坑排雷全过程

    1 mysql配置 1 检查binlog功能是否有开启 mysql gt show variables like log bin Variable name Value log bin OFF 1 row in set 0 00 sec 如
  • canal简介及canal部署、原理和使用介绍

    阿里canal简介及canal部署 原理和使用介绍 canal入门 什么是canal 阿里巴巴B2B公司 因为业务的特性 卖家主要集中在国内 买家主要集中在国外 所以衍生出了杭州和美国异地机房的需求 从2010年开始 阿里系公司开始逐步的尝
  • 实战:实现缓存和数据库一致性方案

    原创 微信公众号 阿Q说代码 欢迎分享 转载请保留出处 哈喽大家好 我是阿Q 最近不是正好在研究 canal 嘛 刚巧前两天看了一篇关于解决缓存与数据库一致性问题的文章 里边提到了一种解决方案是结合 canal 来操作的 所以阿Q就想趁热打

随机推荐

  • Go实现两个Goroutine通信

    实现两个goroutine通信 要求如下 实现pingpong效果 保证程序能任意时长执行 且收到ctrl c信号之后 全身而退 即保证各个goroutine完整退出 在第三个goroutine中 可随时查找前两个goroutine各自发送
  • ASN.1编码方式详解

    ASN ASN 1 Abstract Syntax Notation dot one 抽象记法1 数字1被ISO加在ASN的后边 是为了保持ASN的开放性 可以让以后功能更加强大的ASN被命名为ASN 2等 但至今也没有出现 描述了一种对数
  • react学习记录-状态提升

    文章目录 前端应用状态管理 状态提升 课后习题 前端应用状态管理 状态提升 我们在使用state存放数据的时候 应当倾向于把数据放在父组件上 这是为了在父组件下的兄弟组件都在依赖或者影响这个数据的时候 能够进行共享 避免让两个子孙组件分别取
  • c语言求斐波那契数列n项以及前n项和

    斐波那契数列的定义 f n f n 1 f n 2 n gt 1 n 0 f 0 0 n 1 f 1 1 自定义函数 递归写法 fbi 此函数求x项的斐波那契数 int fbi int x 斐波那契 if x lt 0 return 0 e
  • 支付场景的测试用例

    功能测试 1 是否可以正常支付 2 支付金钱的最大值 最小值 错误金额 3 支付中断继续支付是否成功 4 支付中断后结束支付 5 支付中断结束后再一次支付 兼容性 1 pc端 手机端 平板电脑 安全性 1 姓名会不会显示给对方 2 对方账户
  • 信息系统项目管理师第七章-----项目管理一般知识

    考察知识点 项目管理概述 项目的属性 项目管理的特征 项目管理的知识体系 项目生命周期 项目生命周期的特点 阶段的划分 产品生命周期与项目生命周期的关系 项目的组织方式 职能型 项目型 矩阵式 总结 相关术语解析 项目干系人stakehol
  • 光猫改桥接,手机如何访问光猫后台,只要一根网线就可以了

    光猫改桥接之后一般有四种方法可以访问光猫后台 一 电脑用网线直连光猫的LAN口 然后设置好对应IP 二 光猫自带WiFi功能的 就直接开启WiFi 连接对应的WiFi 三 路由器的WAN口支持双模式 即可以进行PPPOE拨号 同时支持DHC
  • django根据已有数据库表生成model类。Django生成迁移文件,将迁移文件迁移到数据库

    根据已有的数据库表生成Django框架的APP中的models py代码 生成模型文件命令 python manage py inspectdb 将模型导入APP python manage py inspectdb gt app mode
  • 华三vlan配置

    基于MAC地址划分vlan 配置思路 创建VLAN 100 VLAN 200 配置Device A和Device C的上行端口为Trunk端口 并允许VLAN 100和VLAN 200的报文通过 配置 Device B 的下行端口为Trun
  • FindWinow

    1 MFC中的Caption属性就是windowName属性
  • 初始TypeScript

    来自刚接触TypeScript的小菜鸡 养生青年阿贺 一 什么是TypeScript 1 TypeScript 是一款编程式语言 微软开发的 2 typescript式Javascript的超集 遵循了最新的ES6 ES5的规范 types
  • join后 on , and ,where 实例测试

    目录 一 join实现方式 一 原理 二 join 后用 on and 还是 where 区别 一 原理 二 on where实例 1 创建表和数据 2 测试语句和结果 三 on and实例 1 添加数据 2 测试语句和结果 一 join实
  • 时序预测

    时序预测 MATLAB实现NAR非线性自回归模型时间序列预测 目录 时序预测 MATLAB实现NAR非线性自回归模型时间序列预测 效果一览 基本介绍 程序设计 参考资料 效果一览
  • Linux创建100个用户并设置密码

    创建100个用户并且设置123456为密码 bin bash for i in 1 100 do useradd a i echo 123456 passwd stdin a i done 删除100个用户 要先进root用户 bin sh
  • 通过输出流将文档下载到本地

    导出Excel文档到本地 param path 文件将要保存的目录 ApiOperation value 下载Excel模板 RequestMapping value downloadDepartment xls method Reques
  • C++子类和基类的相互转换

    C 子类和基类的相互转换 1 基类对象指针可以隐式转换为子类对象指针 2 子类对象转换为基类对象 基类必须存在虚函数表 不然访问报错 include
  • QT信号与槽的连接方式

    一 Qt AutoConnectionQt AutoConnection表示系统自动选择相应的连接方式 如果信号与槽在同一线程 就采用Qt DirectConnection 如果信号与槽不在同一线程 将采用Qt QueuedConnecti
  • 优秀的NAS不光只有群晖,看看威联通在安全性上如何K掉群晖

    声明 此贴转载纳斯网 感谢kala版主的呕心评测 让大家NAS有了更深入的了解 有了更多的选择 第一 为什么选择nas 其实nas对于我们来讲 第一大用处是什么 就是安全性 我想很多人都想把nas做成家里的数据中心吧 对应数据中心当然是希望
  • [GXYCTF2019]BabyUpload

    GXYCTF2019 BabyUpload 0x01漏洞类型 文件上传 经过测试 发现存在以下waf 不全 1 jpg不能过大 2 检查内容
  • Springboot2(44)集成canal

    源码地址 springboot2教程系列 canal高可用部署安装和配置参数详解 前言 canal是阿里巴巴的基于数据库增量日志解析 提供增量数据订阅 消费 目前主要支持了mysql 可以用于比如数据库数据变化的监听从而同步缓存 如Redi