实时数据同步工具<Maxwell 操作案例>

2023-05-16

文章目录

  • 案例一:监控MySQL中的数据并输出到控制台
  • 案例二:Maxwell监控mysql的数据输出到kafka
  • 案例三:监控MySQL指定表的数据并输出到kafka

案例一:监控MySQL中的数据并输出到控制台

  1. 运行Maxwell来监控mysql数据的更新

    [root@bigdata01 maxwell-1.29.2]# bin/maxwell --user='maxwell' --password='123456' --host='bigdata01' --producer=stdout
    
  2. 向mysql的test_maxwell数据库中创建表test

    mysql> CREATE TABLE test(`name` VARCHAR(30),sex VARCHAR(10),score INT);
    

    Maxwell控制台的输出内容如下:

    11:20:31,509 INFO  AbstractSchemaStore - storing schema @Position[BinlogPosition[mysql-bin.000002:67898], lastHeartbeat=1664594424396] after applying 
    "create table test(
    `name` varchar(30),
    sex varchar(10),
    score int)" 
    to test_maxwell, new schema id is 2
    
  3. 向表test中插入一条数据

    mysql> INSERT INTO test VALUES('ZhangSan','male',28); 
    

    向test表中插入数据时在maxwell控制台输出的数据如下:

    {
    	"database":"test_maxwell", //数据库名字
    	"table":"test", //表名
    	"type":"insert", //操作类型
    	"ts":1664594529, //操作时间
    	"xid":1403, //操作id
    	"commit":true, //提交状态
    	"data":   //数据
    		{"name":"ZhangSan",
    		"sex":"male",
    		"score":28
    		}
    }
    

    当插入2条数据时Maxwell控制台的输出内容:
    在这里插入图片描述
    说明:当我们对表test插入两条数据,控制台就会有两个json日志,说明Maxwell是以数据行为为单位进行进行日志的采集

  4. 当我们修改test表中的一条数据时,

    mysql> UPDATE test SET score=26 WHERE  NAME='lisi';
    

    Maxwell控制台的输出如下:

    {
    	"database":"test_maxwell",
    	"table":"test",
    	"type":"update",  //对数据的操作类型
    	"ts":1664594697, //操作的时间
    	"xid":1855,   //操作id
    	"commit":true,  //任务提交状态true表示成功
    	"data":  //修改后的数据
    		{
    		"name":"lisi",
    		"sex":"male",
    		"score":26
    		},
    	"old":    //原来数据
    	{
    		"score":19
    	}
    }
    
  5. 删除test表中一条数据

    mysql> delete from test where name='lisi';
    

    maxwell 的控制台输出:

    {
        "database":"test_maxwell",
        "table":"test",
        "type":"delete",
        "ts":1664595306,
        "xid":3458,
        "commit":true,
        "data":
        {
            "name":"lisi",
            "sex":"male",
            "score":26
        }
    }
    

案例二:Maxwell监控mysql的数据输出到kafka

  1. 普通输出到kafka
    (1)此案例需要先启动zookeeper和kafka

    启动zk,启动kafka
    [root@bigdata01 maxwell-1.29.2]# xcall.sh jps
    ---------------------bigdata01----------------
    3648 Jps
    2947 QuorumPeerMain
    3331 Kafka
    ---------------------bigdata02----------------
    2378 QuorumPeerMain
    2939 Jps
    2764 Kafka
    ---------------------bigdata03----------------
    2965 Jps
    2760 Kafka
    2381 QuorumPeerMain
    

    (2)启动Maxwell监控binlog

    bin/maxwell --user='maxwell' --password='123456' --host='hadoop102'--producer=kafka --kafka.bootstrap.servers=bigdata01:9092 --kafka_topic=maxwell
    
    [root@bigdata01 ~]# xcall.sh jps
    	---------------------bigdata01----------------
    	4688 Maxwell
    	2947 QuorumPeerMain
    	3331 Kafka
    	4781 Jps
    	---------------------bigdata02----------------
    

    (3)打开 kafka的控制台的消费者消费maxwell主题

    bin/kafka-console-consumer.sh --bootstrap-server bigdata01:9092 --topic maxwell
    
    [root@bigdata01 ~]# jps
    4688 Maxwell
    2947 QuorumPeerMain
    3331 Kafka
    5195 Jps
    4812 ConsoleConsumer
    

    (4)对test_maxwell库中的test表进行操作
    插入一条数据

    mysql> INSERT INTO test VALUES('sisi','famale',100);
    

    (5)kafka消费者端的输出

    {
    	"database":"test_maxwell",
    	"table":"test",
    	"type":"insert",
    	"ts":1664595971,
    	"xid":4331,
    	"commit":true,
    	"data":{
    		"name":"wangwu",
    		"sex":"male",
    		"score":80
    	}
    }
    

    注意:kafka 消费者端输出结果与我们使用maxwell监控mysql时输出到maxwell控制台的json数据:格式相同

  2. Maxwell配置监控实现kafka分区控制

    在公司生产环境中,我们一般都会用 maxwell 监控多个 mysql 库的数据,然后将这
    些数据发往 kafka 的一个主题 Topic,并且这个主题也肯定是多分区的,为了提高并发度。
    那么如何控制这些数据的分区问题,就变得至关重要,实现步骤如下:

    (1)创建一个具有三个分区的topic

    [root@bigdata01 kafka]# bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 3 --replication-factor 3 --topic maxwell3
    

    (2)修改Maxwell的配置文件config.properties
    生产者环境设置为kafka,本文选择的分区模式是按数据库进行分区,控制分区模式包括 库名,表名,列名,主键

    producer=kafka
    kafka.bootstrap.servers=bigdata01:9092
    
    # mysql login info
    host=bigdata01
    user=maxwell
    password=123456
    
    kafka_topic=maxwell3
    producer_partition_by=database
    

    (3) 使用Maxwell的配置文件config.properties启动Maxwell进程

    [root@bigdata01 maxwell-1.29.2]# bin/maxwell --config ./config.properties
    

    (4)向Maxwell监控的数据库test_maxwell和test_maxwell3中分别插入一条数据,我们观察kafka的变化

    maxwell监控哪些数据库可以再my.cnf文件中进行配置
    [mysqld]
    server_id=1
    log-bin=mysql-bin
    binlog_format=row
    binlog-do-db=test_maxwell
    binlog-do-db=test_maxwell2

    • 向test_maxwell库中的test表插入一条数据

    INSERT INTO test VALUES(‘wangwu’,‘male’,80);

    在kafka tool中查看kafka的状态
    在这里插入图片描述

    • 向test_maxwell2库中的test表插入一条数据
    mysql> INSERT INTO test VALUES('sisi','famale',100);
    

    在kafka tool查看kafka状态
    在这里插入图片描述

    注意:我们可以看见对test_maxwell库的操作传输到了kafka中主题maxwell3的1号分区,而对test_maxwell2中表的操作传输到了2号分区,证实了我们定制的安装数据库进行分区的规则。

案例三:监控MySQL指定表的数据并输出到kafka

(1)运行zookeeper和kafka

[root@bigdata01 maxwell-1.29.2]# xcall.sh jps
	---------------------bigdata01----------------
	3648 Jps
	2947 QuorumPeerMain
	3331 Kafka
	---------------------bigdata02----------------
	2378 QuorumPeerMain
	2939 Jps
	2764 Kafka
	---------------------bigdata03----------------
	2965 Jps
	2760 Kafka
	2381 QuorumPeerMain

(2)通过配置config.properties,定制化启动Maxwell并指定监控test_maxwell下的test表,并将数据传给kafka的maxwell3主题

  • 配置config.properties文件中的filter参数,指定监控某个表
filter = exclude: *.*, include: test_maxwell.test

注意:还可以设置 include: test_maxwell.*,通过此种方式来监控 mysql 某个库的所有
表,也就是说过滤整个库。读者可以自行测试。

  • 启动Maxwell,在Maxwell的目录下
    bin/maxwell --config ./config.properties
    

(3)向test表中插入一条数据,看kafka tool的变化和kafka消费者端控制台的输出情况

mysql> INSERT INTO test VALUES('lili','famale',100);
  • 在kafka tool查看
    在这里插入图片描述

  • 在kafka消费者端控制台查看
    在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

实时数据同步工具<Maxwell 操作案例> 的相关文章

  • Canal AdminGuide

    背景 先前开源了一个开源项目 xff1a 阿里巴巴开源项目 基于mysql数据库binlog的增量订阅 amp 消费 本文主要是介绍一下如何部署 amp 使用 环境要求 1 操作系统 a 纯java开发 xff0c windows linu
  • 【Android病毒分析报告】 - Usbcleaver

    本文章由Jack Jia编写 xff0c 转载请注明出处 文章链接 xff1a http blog csdn net jiazhijun article details 9179749 作者 xff1a Jack Jia 邮箱 xff1a
  • 【浅墨著作】《逐梦旅程:Windows游戏编程之从零开始》勘误&配套源代码下载...

    I 39 m back 恩 xff0c 几个月不见 xff0c 大家还好吗 xff1f 这段时间真的好多童鞋在博客里留言说或者发邮件说浅墨你回来继续更新博客吧 woxiangnifrr童鞋说每天都在来浅墨的博客逛一下看有没有更新 xff0c
  • Anychart图表系列九之Dashboard

    Dashboard中文释义为仪表盘 控制台 xff0c 这个用词很生动 xff0c 它指的是多个图组合在一起的效果 xff0c 就像模具工作中的操作台一样由多种图形仪器组成 在项目中 xff0c 特别是高管平台 xff0c 领导看重的是多套
  • 若依前后端分离版本启动教程

    第一步 xff1a 搜索若依官网 第二步 xff1a 选择前后端分离版本 第三步 复制链接 第四步 随便在自己的电脑找找一个文件夹 xff0c 执行git命令 xff08 git安装使用教程这里不做说明 xff09 回车 96 96 96
  • char指针与char数组的区别(非常详细!)

    首先看指针 char p 61 helloworld 在这里p 是一个变量 其类型为指针类型 并且指向一个字符串 字符串内容为 helloworld 如果要访问p 2 的话 就需要先从p 中取出地址 该地址为 helloworld 的首地址
  • 大学六年我读过的一些书

    明天就是2014年 xff0c 今晚是13年最后一天 xff0c 我想不少人会写一下自己13年的一些心得 xff0c 在上一篇日志中 xff0c 该总结的已经总结 xff0c 想了解更多的已经加我个人微信了 我在想 xff0c 大学其实很多
  • Ubuntu硬盘分区和分区文件类型及Linux分区基础知识

    Ubuntu硬盘分区的常用系统 xff0c 于是我学习研究了Ubuntu硬盘分区 xff0c 在这里对大家详细介绍下Ubuntu硬盘分区应用 xff0c 希望对大家有用Ubuntu硬盘分区包含了非常好的翻译和容易使用的架构 如果你准备在硬盘
  • 欢迎使用CSDN-markdown编辑器

    我和谁都不争 xff0c 和谁争 xff0c 我都不屑 xff1b 我爱大自然 xff0c 其次就是艺术 xff1b 我双手烤着生命之火取暖 xff0c 火萎了 xff0c 我也准备走了 本Markdown编辑器使用StackEdit修改而
  • 从高考到程序员--------你到底在追求着什么?------串烧似的文章

    随便写写 xff0c 万事开头难 xff0c 不知道应该怎么写出来 xff0c 因为只有自己经历的才能是最美好的回忆 xff0c 不知道这个编辑器写的效果如何 xff0c 凑合着看吧 xff0c 从高考到程序员征文看了几篇 xff0c 决定
  • AI以来----已不再是未来

    在过去的2016年 xff0c 可以说AI的核心发展一直围绕着开源和生态 国际巨头纷纷涉足深度学习平台 xff0c 让原本是高高在上的人工智能 xff0c 迅速落地细化到了行业的各个领域 随着AI技术正在为全球开发者所用 xff0c 极大的
  • 即兴嘻哈

    kao ha wowo 深夜不能寐 xff0c 那个翻来覆也不能把觉睡 一首即兴style说说ni为何半夜还不睡 xff0c 一线城市压力大来人总疲惫 xff0c 为了买房买车拼命都在向前奔 年轻人出来打拼为何这么累 xff0c 多少人为了
  • 裸辞3个月扛不住后,随便接了offer更惨!

    最近发现年底找工作的人不少 xff0c 部门里就2个hc xff0c 一周能收2000 43 简历 xff0c 这比例有点 过分 了 虽说大部分是年底先看看机会试试水 xff0c 准备年后冲击的 xff0c 但看简历里也有不少中间裸辞的 x
  • Docker常用命令-自用

    Docker常用命令 自用 1 镜像仓库 1 1 登陆 登出 登陆 默认Docker Hub docker login u span class token punctuation span username span class toke
  • 程序猿的浪漫之二进制表白篇

    那天情人节 xff0c 我给她发了一串数字 xff08 01001001 00100000 01101100 01101111 01110110 01100101 00100000 01111001 01101111 01110101 xf
  • Dockerfile使用教程

    第一 创建一个spring boot项目 第二 创建 xff1a Dockerfile文件 xff08 在项目根目录下 xff09 第三 复制粘贴 FROM java 8 设置基础镜像 FROM openjdk 8 jdk alpine 指
  • C++ 用socket封装成http

    HttpSocket h interface for the CHttpSocket class if defined AFX HTTPSOCKET H F49A8F82 A933 41A8 AF47 68FBCAC4ADA6 INCLUD
  • echarts更换主题

    前言 本篇文章基于上一篇文章 xff1a vue工程整合echarts xff0c 因此这里主要讲的是主题是如何配置的 xff0c 其他代码在这里不再赘述 官网中已经说明了echarts除了有默认的主题外 xff0c 并且内置了dark主题
  • Java final修饰符详解

    final 在 Java 中的意思是最终 xff0c 也可以称为完结器 xff0c 表示对象是最终形态的 xff0c 不可改变的意思 final 应用于类 方法和变量时意义是不同的 xff0c 但本质是一样的 xff0c 都表示不可改变 x
  • Python Requests库安装和使用

    Python 提供了多个用来编写爬虫程序的库 xff0c 除了前面已经介绍的 urllib 库之外 xff0c 还有一个很重的 Requests 库 xff0c 这个库的宗旨是 让 HTTP 服务于人类 Requests 是 Python

随机推荐

  • C语言 十进制转十六进制

    问题描述 十六进制数是在程序设计时经常要使用到的一种整数的表示方式 它有0 1 2 3 4 5 6 7 8 9 A B C D E F共16个符号 xff0c 分别表示十进制数的0至15 十六进制的计数方法是满16进1 xff0c 所以十进
  • Pandas知识点超全总结

    Pandas知识点超全总结 一 数据结构1 Series1 创建2 切片 修改3 其他属性 2 DataFrame1 创建2 切片3 增加 修改4 删除5 查看 二 读写数据1 读数据1 excel文件2 csv文件3 sql文件 2 写数
  • socket通信小结

    1 网络中的进程之间如何进行通信 区别于本地的进程间通信 xff0c 我们首要解决的问题是如何唯一标识一个进程 xff0c 否则通信无从谈起 xff01 在本地可以通过进程PID来唯一标识一个进程 xff0c 但是在网络中这是行不通的 其实
  • eclipse alt+/代码智能提示总是报错:problems during content assist

    解决办法如下图 xff1a 依次点击红框标记的地方即可解决问题
  • linux C 遍历目录及其子目录 opendir -> readdir -> closedir

    在 linux 下遍历某一目录下内容 LINUX 下历遍目录的方法一般是这样的 xff1a 打开目录 gt 读取 gt 关闭目录 相关函数是 opendir gt readdir gt closedir xff0c 其原型如下 xff1a
  • 最全面的 linux 信号量解析

    一 xff0e 什么是信号量 信号量的使用主要是用来保护共享资源 xff0c 使得资源在一个时刻只有一个进程 xff08 线程 xff09 所拥有 信号量的值为正的时候 xff0c 说明它空闲 所测试的线程可以锁定而使用它 若为 0 xff
  • vue3学习一:let和const

    在函数的内部 xff0c 定义name变量 xff0c 当i等于false时 xff0c 按照Java语言 xff0c else里是拿不到name的 xff0c 并且会显示报错 xff0c 但是却不会报错 xff0c 这是因为javascr
  • PHP 中最全的设计模式(23种)

    PhpDesignPatterns PHP 中的设计模式 一 Introduction 介绍 设计模式 xff1a 提供了一种广泛的可重用的方式来解决我们日常编程中常常遇见的问题 设计模式并不一定就是一个类库或者第三方框架 xff0c 它们
  • 异常详细信息: System.NullReferenceException: 未将对象引用设置到对象的实例。

    我遇到的出现这种错误的原因一般是以下几种情况 xff1a 1 在绑定数据控件的时候 xff0c 建立数据库连接 OleDbConnection conn 61 new OleDbConnection 34 provider 61 micro
  • 连接不上Github,网络超时的检查和解决办法

    先附上图片吧 连接不上github但是其它网站都是正常上网 解决办法 win 43 r打开运行输入cmd 再命令行里输入 ping github com 看看返回值 我的全是请求超时 xff0c 发送的数据包全丢 xff08 我先ping了
  • 2018校招笔试真题汇总

    2018校招笔试真题汇总 最近看好多牛油贡献了很多考试的真题 xff0c 我把他们汇总在一起给到大家 xff0c 也感谢这些牛油的贡献 xff0c 只要进这个汇总贴的 xff0c 你们都将每人获得一份牛客送出的礼物一份 科大讯飞 xff1a
  • 【2】Docker的启动与停止

    1 xff09 启动 docker systemctl start docker 2 xff09 查看 docker 状态 systemctl status docker 3 xff09 查看 docker 概要信息 docker info
  • 【8】Docker中部署Redis

    1 xff09 拉取镜像 docker pull redis 这是我在 VMware 的 CentOS 中装过的 redis 版本 xff0c 拉取该指定版本使用 docker pull redis 5 0 12 命令 xff0c 不过下面
  • 操作系统学习之系统调用

    目录 一 操作系统学习之系统调用 1 什么是系统调用 2 系统调用有什么用 3 为什么需要系统调用 4 系统调用的具体流程 1 xff09 执行过程 2 如何实现用户态与内态之间的切换 3 系统调用常见名词 4 系统调用如何返回 传递返回值
  • 22-Docker-常用命令详解-docker pull

    常用命令详解 docker pull 前言docker pull语法格式options说明 使用示例未指定tag a 拉取所有 tagged 镜像 前言 本篇来学习docker pull命令 docker pull 作用 xff1a 从镜像
  • 720p,1080p对应像素解释

    720P是1280 720 61 921600 xff0c 即 分辨率为921600 xff0c 即大约92万像素 xff0c 921600接近100万像素 xff08 1280是按照16 9算出来的 xff0c 4 3的另算 xff0c
  • vue3学习二:模板字符串

    模板字符串是为了解决字符串拼接问题 xff0c 在es5中 xff0c 字符串拼接是这样的 xff1a let name 61 34 wjdsg 34 console log 34 您好 34 43 name 而在es6中可以使用模板字符串
  • Canal 读取 mysql bin_log

    场景 xff1a 在微服务开发的过程中多个项目协同完成一个功能 xff0c 工程与工程之间存在数据上的解耦 xff0c 底层服务为上层服务提供数据 而底层服务有需要对数据进行管理 解决方案 xff1a 基本底层服务 通过 canal 获取
  • PuTTY连接Linux服务器被拒绝问题

    PuTTY连接Linux服务器被拒绝问题 1 使用命令 xff1a ssh localhost 查看是否安装ssh1 2需要手动安装ssh1 2 1 输入命令 xff1a 1 2 2 若是出现下图所示 xff1a 1 2 3 查看进程 xf
  • 实时数据同步工具<Maxwell 操作案例>

    文章目录 案例一 xff1a 监控MySQL中的数据并输出到控制台案例二 xff1a Maxwell监控mysql的数据输出到kafka案例三 xff1a 监控MySQL指定表的数据并输出到kafka 案例一 xff1a 监控MySQL中的