文章目录
- 案例一:监控MySQL中的数据并输出到控制台
- 案例二:Maxwell监控mysql的数据输出到kafka
- 案例三:监控MySQL指定表的数据并输出到kafka
案例一:监控MySQL中的数据并输出到控制台
-
运行Maxwell来监控mysql数据的更新
[root@bigdata01 maxwell-1.29.2]
-
向mysql的test_maxwell数据库中创建表test
mysql> CREATE TABLE test(`name` VARCHAR(30),sex VARCHAR(10),score INT);
Maxwell控制台的输出内容如下:
11:20:31,509 INFO AbstractSchemaStore - storing schema @Position[BinlogPosition[mysql-bin.000002:67898], lastHeartbeat=1664594424396] after applying
"create table test(
`name` varchar(30),
sex varchar(10),
score int)"
to test_maxwell, new schema id is 2
-
向表test中插入一条数据
mysql> INSERT INTO test VALUES('ZhangSan','male',28);
向test表中插入数据时在maxwell控制台输出的数据如下:
{
"database":"test_maxwell", //数据库名字
"table":"test", //表名
"type":"insert", //操作类型
"ts":1664594529, //操作时间
"xid":1403, //操作id
"commit":true, //提交状态
"data": //数据
{"name":"ZhangSan",
"sex":"male",
"score":28
}
}
当插入2条数据时Maxwell控制台的输出内容:
说明:
当我们对表test插入两条数据,控制台就会有两个json日志,说明Maxwell是以数据行为为单位进行进行日志的采集
-
当我们修改test表中的一条数据时,
mysql> UPDATE test SET score=26 WHERE NAME='lisi';
Maxwell控制台的输出如下:
{
"database":"test_maxwell",
"table":"test",
"type":"update", //对数据的操作类型
"ts":1664594697, //操作的时间
"xid":1855, //操作id
"commit":true, //任务提交状态true表示成功
"data": //修改后的数据
{
"name":"lisi",
"sex":"male",
"score":26
},
"old": //原来数据
{
"score":19
}
}
-
删除test表中一条数据
mysql> delete from test where name='lisi';
maxwell 的控制台输出:
{
"database":"test_maxwell",
"table":"test",
"type":"delete",
"ts":1664595306,
"xid":3458,
"commit":true,
"data":
{
"name":"lisi",
"sex":"male",
"score":26
}
}
案例二:Maxwell监控mysql的数据输出到kafka
-
普通输出到kafka
(1)此案例需要先启动zookeeper和kafka
启动zk,启动kafka
[root@bigdata01 maxwell-1.29.2]
---------------------bigdata01----------------
3648 Jps
2947 QuorumPeerMain
3331 Kafka
---------------------bigdata02----------------
2378 QuorumPeerMain
2939 Jps
2764 Kafka
---------------------bigdata03----------------
2965 Jps
2760 Kafka
2381 QuorumPeerMain
(2)启动Maxwell监控binlog
bin/maxwell --user='maxwell' --password='123456' --host='hadoop102'--producer=kafka --kafka.bootstrap.servers=bigdata01:9092 --kafka_topic=maxwell
[root@bigdata01 ~]
---------------------bigdata01----------------
4688 Maxwell
2947 QuorumPeerMain
3331 Kafka
4781 Jps
---------------------bigdata02----------------
(3)打开 kafka的控制台的消费者消费maxwell主题
bin/kafka-console-consumer.sh --bootstrap-server bigdata01:9092 --topic maxwell
[root@bigdata01 ~]
4688 Maxwell
2947 QuorumPeerMain
3331 Kafka
5195 Jps
4812 ConsoleConsumer
(4)对test_maxwell库中的test表进行操作
插入一条数据
mysql> INSERT INTO test VALUES('sisi','famale',100);
(5)kafka消费者端的输出
{
"database":"test_maxwell",
"table":"test",
"type":"insert",
"ts":1664595971,
"xid":4331,
"commit":true,
"data":{
"name":"wangwu",
"sex":"male",
"score":80
}
}
注意:
kafka 消费者端输出结果与我们使用maxwell监控mysql时输出到maxwell控制台的json数据:格式相同
-
Maxwell配置监控实现kafka分区控制
在公司生产环境中,我们一般都会用 maxwell 监控多个 mysql 库的数据,然后将这
些数据发往 kafka 的一个主题 Topic,并且这个主题也肯定是多分区的,为了提高并发度。
那么如何控制这些数据的分区问题,就变得至关重要,实现步骤如下:
(1)创建一个具有三个分区的topic
[root@bigdata01 kafka]
(2)修改Maxwell的配置文件config.properties
生产者环境设置为kafka,本文选择的分区模式是按数据库进行分区,控制分区模式包括 库名,表名,列名,主键
producer=kafka
kafka.bootstrap.servers=bigdata01:9092
host=bigdata01
user=maxwell
password=123456
kafka_topic=maxwell3
producer_partition_by=database
(3) 使用Maxwell的配置文件config.properties启动Maxwell进程
[root@bigdata01 maxwell-1.29.2]
(4)向Maxwell监控的数据库test_maxwell和test_maxwell3中分别插入一条数据,我们观察kafka的变化
maxwell监控哪些数据库可以再my.cnf文件中进行配置
[mysqld]
server_id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=test_maxwell
binlog-do-db=test_maxwell2
- 向test_maxwell库中的test表插入一条数据
INSERT INTO test VALUES(‘wangwu’,‘male’,80);
在kafka tool中查看kafka的状态
- 向test_maxwell2库中的test表插入一条数据
mysql> INSERT INTO test VALUES('sisi','famale',100);
在kafka tool查看kafka状态
注意:
我们可以看见对test_maxwell库的操作传输到了kafka中主题maxwell3的1号分区,而对test_maxwell2中表的操作传输到了2号分区,证实了我们定制的安装数据库进行分区的规则。
案例三:监控MySQL指定表的数据并输出到kafka
(1)运行zookeeper和kafka
[root@bigdata01 maxwell-1.29.2]
---------------------bigdata01----------------
3648 Jps
2947 QuorumPeerMain
3331 Kafka
---------------------bigdata02----------------
2378 QuorumPeerMain
2939 Jps
2764 Kafka
---------------------bigdata03----------------
2965 Jps
2760 Kafka
2381 QuorumPeerMain
(2)通过配置config.properties,定制化启动Maxwell并指定监控test_maxwell下的test表,并将数据传给kafka的maxwell3主题
- 配置config.properties文件中的filter参数,指定监控某个表
filter = exclude: *.*, include: test_maxwell.test
注意:
还可以设置 include: test_maxwell.*,通过此种方式来监控 mysql 某个库的所有
表,也就是说过滤整个库。读者可以自行测试。
(3)向test表中插入一条数据,看kafka tool的变化和kafka消费者端控制台的输出情况
mysql> INSERT INTO test VALUES('lili','famale',100);
-
在kafka tool查看
-
在kafka消费者端控制台查看
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)