搭建: canal部署与实例运行

2023-11-01

1、准备:

github:https://github.com/alibaba/canal

里面有包括canal的文档,server端 client端的 例子 源码包等等。

2、canal概述:

canal是应阿里巴巴存在杭州和美国的双机房部署,存在跨机房同步的业务需求而提出的。

早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。ps. 目前内部使用的同步,已经支持mysql5.x和oracle部分版本的日志解析


基于日志增量订阅&消费支持的业务:


数据库镜像
数据库实时备份
多级索引 (卖家和买家各自分库索引)
search build
业务cache刷新
价格变化等重要业务消息

keyword:数据库同步,增量订阅&消费。

3、canal工作原理:



从上层来看,复制分成三步:


master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
slave将master的binary log events拷贝到它的中继日志(relay log);
slave重做中继日志中的事件,将改变反映它自己的数据。

4、部署canal:

部署canal-server:

(1)开启mysql的binlog功能,并配置binlog模式为row。

在my.cnf 加入如下:

[mysqld]
log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW #选择row模式
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复

(2)在mysql中 配置canal数据库管理用户,配置相应权限(repication权限)

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
(3)下载canal https://github.com/alibaba/canal/releases

解压到相应文件夹

tar -zxvf canal canal
canal 文件目录结构

drwxr-xr-x 2 jianghang jianghang  136 2013-02-05 21:51 bin
drwxr-xr-x 4 jianghang jianghang  160 2013-02-05 21:51 conf
drwxr-xr-x 2 jianghang jianghang 1.3K 2013-02-05 21:51 lib
drwxr-xr-x 2 jianghang jianghang   48 2013-02-05 21:29 logs

修改配置 instance.properties

vim canal/conf/example/instance.properties

#################################################
## mysql serverId
canal.instance.mysql.slaveId = 1234

position info,需要改成自己的数据库信息

canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =

username/password,需要改成自己的数据库信息

canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName = canal_test
canal.instance.connectionCharset = UTF-8

table regex

canal.instance.filter.regex = .\…

#################################################


然后cd到bin目录 启动和停止canal-server

启动

./startup.sh
停止

./stop.sh

验证启动状态,查看log文件

vim canal/log/canal/canal.log

2014-07-18 10:21:08.525 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2014-07-18 10:21:08.609 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.12.109.201:11111]
2014-07-18 10:21:09.037 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

上述日志信息显示启动canal成功

运行canal-client实例:

(1)建立实例maven工程

mvn archetype:create -DgroupId=com.alibaba.otter -DartifactId=canal.sample

(2)添加pom依赖:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.0.12</version>
</dependency>
(3)更新依赖 mvn install

(4)canal-client.java 实例代码

/**
 * Created by hp on 14-7-17.
 */
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.client.*;
import org.jetbrains.annotations.NotNull;
public class ClientSample {
    public static void main(String args[]) {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmtryCount = 1200;
            while (emptyCount < totalEmtryCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }
    private static void printEntry(@NotNull List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }
    private static void printColumn(@NotNull List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

(5)运行java实例

启动后看到控制端信息:

empty count : 1
empty count : 2
empty count : 3
empty count : 4

(6)触发数据库变更

create table test (
uid int (4) primary key not null auto_increment,
name varchar(10) not null);

insert into test (name) values('10');

(7)client 抓取mysql信息:

================> binlog[mysql-bin.000016:3281] , name[canal_test,test] , eventType : INSERT
uid : 7    update=false
name : 10    update=false
empty count : 1
empty count : 2

5、部署过程中产生问题:

(1)启动失败,log日志中地址正在使用

1、11111端口正在被占用 可以用 ls -i:11111 查看监听进程谁占用端口 或者 用 ps -ef | grep 11111 查看哪个进程占用端口号 然后 kill -9 进程号 杀掉占用进程

2、可以编辑 canal/conf/canal.properties 中的端口号 ,改为不占用的端口

(2)canal无法抓取mysql触发数据库改变的信息

1、检查mysql是否打开binlog写入功能 检查binlog 是否为行模式。

show variables like "binlog_format"
2、检查my.cnf 和 instance.properties 等配置文件填写信息是否正确。

3、检查client 代码 调试实例代码

4、版本兼容问题,canal 1.8 换成 canal 1.7 继续测试

5、查看所有日志文件 分析日志











本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

搭建: canal部署与实例运行 的相关文章

  • 由于缺少 PHP 扩展,CakePHP 3 无法连接到数据库

    我正在尝试使用 WT NMP 安装 cakePHP 3 0 0 但收到以下消息 CakePHP 无法连接到数据库 由于以下原因无法使用数据库驱动程序 Cake Database Driver Mysql 缺少 PHP 扩展或未满足的依赖项
  • 限制分页页数

    objConnect mysql connect localhost root or die mysql error objDB mysql select db Test strSQL SELECT FROM UserAddedRecord
  • Errno 121,写入或更新时出现重复密钥?

    SET OLD UNIQUE CHECKS UNIQUE CHECKS UNIQUE CHECKS 0 SET OLD FOREIGN KEY CHECKS FOREIGN KEY CHECKS FOREIGN KEY CHECKS 0 S
  • 如何使用php在mysql数据库中添加照片? [关闭]

    这个问题不太可能对任何未来的访客有帮助 它只与一个较小的地理区域 一个特定的时间点或一个非常狭窄的情况相关 通常不适用于全世界的互联网受众 为了帮助使这个问题更广泛地适用 访问帮助中心 help reopen questions 我对 PH
  • 连接 3 三张表

    我有这个图表应该可以解释我的情况 我需要一些关于连接 3 个表的帮助 我不知道如何做这种事情 因此 我可以通过执行以下操作来经历一段检索记录的 while 循环 img src alt Album AlbumID 使用内部联接 http w
  • PHP:如何检查总数。 URL 中的参数?

    我正在使用 REQUEST 检索参数 有没有办法找到总数 URL 中的参数 而不是检索每个参数然后进行计数 这将为您提供总数 分隔的 URL 查询参数 count explode SERVER QUERY STRING 如果您只想要唯一的参
  • 数据读取过程中遇到致命错误

    我正在进行定期更新表扫描 Using connect1 As New MySqlConnection ConnectLocalhost serverString connect1 Open Dim cmd New MySqlCommand
  • 您可以使用 MySQL 查询来完整创建数据库的副本吗

    我有一个包含 5 个表的 MySQL 数据库的实时版本和一个测试版本 我不断使用 phpMyAdmin 将实时版本中的每个表复制到测试版本 有谁有mysql查询语句来制作数据库的完整副本吗 查询字符串需要考虑结构 数据 自动增量值以及与需要
  • 如何在 Laravel 5 中使用 Orchestral/Tenanti 构建具有多个数据库的多租户应用程序?

    我正在尝试使用 Laravel 5 构建和应用程序 它应该是使用多个数据库的多租户数据库架构 我的雇主出于安全目的要求这样做 我尝试手动管理主数据库迁移和租户迁移 但失败了 所以我决定借助 Laravel 特定包的帮助 这应该是我所需要的
  • 主键默认可以为NULL吗?为什么这样描述呢?

    我有一张桌子 当我describe这是 mysql gt DESC my table Field Type Null Key Default Extra contact id int 11 NO PRI NULL auto incremen
  • 从 CSV 到 MySQL 的换行问题

    我正在将 csv 文件导入 MySQL 除了文件中的换行符之外 一切正常 我的 csv 行之一如下所示 42 E A R Classic Earplugs ear images ear classic jpg 5 Proven size s
  • MySQL 5:我的 GROUP BY 字段的顺序重要吗?

    Peeps 我的 MySQL 查询中有一些聚合 计算字段 我的 GROUP BY 子句是动态生成的 具体取决于用户在 Web 表单中选择的选项 很好奇 GROUP BY 子句中列出的字段顺序是否会对计算产生任何影响 例如 SUM AVERA
  • 用于全文搜索和 2 亿多条记录的数据库

    我即将创建一个包含至少 2 亿个条目的庞大数据库 数据库需要可使用全文进行搜索 并且速度应该很快 我的数据库从许多不同的数据源获取数据 我需要定期导入新的或更新的数据 将我的所有数据存储在像 mysql 这样的关系数据库中 然后创建一个 n
  • PDO获取最后插入的ID

    我有一个查询 我想获取插入的最后一个 ID 字段ID是主键并且自动递增 我知道我必须使用这个声明 LAST INSERT ID 该语句适用于如下查询 query INSERT INTO cell place ID VALUES LAST I
  • MySQL - 查找接近的匹配项

    MySQL 有没有办法在文本字段中找到紧密匹配的内容 说找到 email protected cdn cgi l email protection当搜索时 email protected cdn cgi l email protection
  • 在 Python 中,如果我有 unix 时间戳,如何将其插入 MySQL 日期时间字段?

    我正在使用 Python MySQLDB 我想将其插入 Mysql 中的 DATETIME 字段 我该如何使用cursor execute 来做到这一点 要将 UNIX 时间戳转换为 Python 日期时间对象 请使用datetime fr
  • DataTables 第 2 页的分页未调用放大弹出窗口

    所以我有这个启用分页的数据表 我编码了一种方式 以便用户可以编辑表的行 当用户调用它在放大弹出窗口中打开的编辑页面时 它在第 1 页 从第 2 页起都运行良好 DataTable 及其前面停止调用 Magnific Popup 我只是不明白
  • 如何修复“未知变量'sql-mode = ANSI'”?

    使用 MySQL 5 5 27 在 my cnf 中我有 mysql sql mode ANSI 启动 mysql 控制台后我得到 unknown variable sql mode TRADITIONAL MySQL 文档服务器 SQL
  • Monkeyrunner/jython 中未找到 JDBC 驱动程序错误

    我需要在中插入一些东西DB 我在用着JDBC as a connector jython the script mysql数据库和脚本正在运行CentOS 我的代码看起来像这样 from com android monkeyrunner i
  • mysql 查询选择当月的所有行?

    我有一个名为 startdate 的日期时间类型的列 我必须获取当前月份的开始日期和结束日期之间的所有行 即从 1 11 2014 到 30 11 2014 select from your table where year curdate

随机推荐

  • 写一个函数求100至200之间的素数及其个数

  • 2023 年全球威胁趋势预测:新兴威胁汹涌来袭

    总体而言 未来网络犯罪及其攻击手段将持续快速扩展 全球网络安全领导者Fortinet Nasdaq FTNT 近日发布 2023年及未来网络威胁趋势预测报告 该报告由Fortinet全球威胁情报和研究团队 FortiGuard Labs 制
  • STM32F103 UART4串口使用DMA接收不定长数据和DMA中断发送

    一 前言 使用DMA通信的好处是 不占用单片机资源 不像普通串口中断 发送一个字节触发一次中断 发送100个字节触发100次中断 接收一个字节触发一次中断 接收200个字节触发200次中断 数据接收完毕触发一次DMA中断 发送数据完毕触发一
  • 预备研究主题:给出经纬度范围(矩形或圆形)快速从大批量经纬度中按批量取出信息点

    预备研究主题 给出经纬度范围 矩形或圆形 快速从大批量经纬度中按批量取出信息点 目前从数据库中取的方法 已可以解决 但信息数量有限 数据一多就显得较慢 如果生成文本文件根据索引去查找应该可以解决 最主要的是数据切分方法和快速索引查找方法 可
  • 写给信息学竞赛选手的趣味编程 基于DevC++的SDL2图形程序设计(-)环境搭建篇

    系列文章目录 前言 大部分玩信息学竞赛的朋友使用DevCPP集成开发环境 IDE 进行程序设计 这个IDE操作简单 实现功能也比较简单 应对类似控制台输入输出的各种竞赛算法实现绰绰有余 但对于动画 游戏等图形程序设计实现略显单薄 而很多同学
  • thymeleaf 引入js css

    http www cnblogs com suncj p 4028768 html 我一般 都是
  • Nginx安装部署学习

    什么是Nginx Nginx engine x 是一个高性能的HTTP和反向代理web服务器 同时也提供了IMAP POP3 SMTP服务 其特点是占有内存少 并发能力强 协议 BSD like Nginx 安装 1 部署执行命令 apt
  • phpstorm部署sftp的root path跟mappings的问题

    在部署phpstorm的sftp时要注意root path的设置 例如如果想要将本地的 var www目录映射到服务器上的 var www目录 那么如果你的root path为 var www 然后mappings里面的server pat
  • C/C++基本数据类型

    学了 C 然后 C 然后 MFC Windows 然后是 C 其中数据类型很多 由基本类型衍生的 typedef 类型也 N 多 熟知基本数据类型是我们正确表达实际问题中各种数据的前提 因此我分类总结了一下 C C Windows C 基本
  • 信管备考脑图

    参考网址 https zhuanlan zhihu com p 97968103 参考教材 1 信息化与信息系统
  • 中职网络安全2022年赛题Windows(SMB服务漏洞结合NTLM中继进行渗透测试)解析

    文章目录 目录 一 赛题 二 解析 一 赛题 二 解析 任务实施 P066 综合渗透测试 使用SMB服务漏洞结合NTLM中继进行渗透测试 实验环境说明 渗透机 p10 kali 6 1 Kali Linux 用户名 root 密码 toor
  • 使用git强行切换分支

    两行代码轻松搞定 git status 一下 一堆文件为暂存的 git clean dfx 可以直接全部干掉它门 然后git checkout v2 0 切换到2 0分支去 git clean dfx public vue static j
  • Python 导入模块的3种方式

    回到顶部 一 定义 模块就是用一堆的代码实现了一些功能的代码的集合 通常一个或者多个函数写在一个 py文件里 而如果有些功能实现起来很复杂 那么就需要创建n个 py文件 这n个 py文件的集合就是模块 如果不懂可以先看下面这篇博文 http
  • Markdown基础教程

    1 标题 Markdown支持6级标题 一级标题 二级标题 三级标题 四级标题 五级标题 六级标题 标题的效果 一级标题 二级标题 三级标题 四级标题 五级标题 六级标题 2 段落及区块引用 Markdown提供了一个特殊符号 gt 用于段
  • 网红表白弹窗

    如何使用Python表白 先看效果图 一 具体步骤 1 首先我们要安装tkinter库 pip install tkinter 等待安装完成即可 2 使用步骤引入库 from tkinter import from tkinter impo
  • 配置MAC刷新ARP功能

    在以太网中 MAC地址表项用于指导设备进行二层数据转发 ARP表项通过IP地址和MAC地址的映射指导设备进行不同网段间的通信 MAC地址表项的出接口通过报文触发刷新的 ARP表项的出接口是在老化时间到后通过老化探测进行刷新的 这样就可能会出
  • 显著性检验【t-test、方差分析、ks检验】

    显著性检验 t test 方差分析 ks检验 0 目录 1显著性检验基本定义 what 2 使用显著性检验的意义 why 3 显著性检验的具体操作流程 how 1 显著性检验基本定义 统计假设检验 Statistical hypothesi
  • vue 按需引入 element-ui 组件

    新建 plugins element ui js 文件 在里面写入需要引入的组件 import Vue from vue import Button Dialog Form FormItem Loading Message Paginati
  • java char判断相等_java面试题-基础

    1 一个 java 源文件中是否可以包括多个类 不是内部类 有什么限制 可以有多个类 但只能有一个public的类 并且public的类名必须与文件名相一致 2 Java有没有goto java中的保留字 现在没有在java中使用 3 说说
  • 搭建: canal部署与实例运行

    1 准备 github https github com alibaba canal 里面有包括canal的文档 server端 client端的 例子 源码包等等 2 canal概述 canal是应阿里巴巴存在杭州和美国的双机房部署 存在