使用canal连接kafka

2023-05-16

这篇主要是项目还原,目的是记录构建时遇到的各种奇葩坑,避免下次迷路。废话不多说,直接上手。

默认已安装dockerdocker-composenodejsyarntypescript

  1. 首先根据 kafka-docker 这个官方的仓库下的docker-compose.yml复制一份到自己的项目中
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    build: .
    ports:
      - "9092"
    environment:
      # 更改为自己的ip地址,最好是ip地址,我先使用localhost
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

kafka下的build项,更改为kafka镜像,可以从dockerhub中查找指定版本的kafka,这里使用wurstmeister/kafka:2.13-2.7.0

environment下添加配置属性

...
  kafka:
    image: wurstmeister/kafka:2.13-2.7.0
    ports:
      - "9092:9092" #向外暴露端口
    environment:
      KAFKA_BROKER_ID: 1 #新增一个broker id
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "test:2:1" #新增一个topic或多个 
...

然后拉取镜像,并运行起来

$ docker-compose up
  1. 编写ProducerCustomer

    kafkajs

    //config.ts //简单的配置
    const Config = {
        brokers: [
            "localhost:9092" //kafka的服务器
        ],
        topic: 'test' //与kafka添加的topcs一样
    }
    
    export default Config;
    
    
    //kafka.ts //实例化一个kafkajs对象
    import { Kafka } from "kafkajs";
    import Config from "./config";
    
    const kafka = new Kafka({
        clientId: 'kafkajs',
        brokers: Config.brokers
    });
    
    export default kafka;
    
    //producer.ts //kafka Producer
    import { Message } from "kafkajs";
    import Config from "./config";
    import kafka from "./kafka";
    
    async function producer(messages: Message[]) {
        const producer = kafka.producer();
        await producer.connect();
        await producer.send({
            topic: Config.topic,
            messages
        });
        await producer.disconnect()
    }
    
    export default producer;
    
    //consumer.ts //kafka Consumer
    import kafka from "./kafka";
    import Config from "./config";
    import { ConsumerConfig, EachMessagePayload } from "kafkajs";
    
    async function consumer(config: ConsumerConfig) {
        const consumer = kafka.consumer(config);
        await consumer.connect()
        await consumer.subscribe({
            topic: Config.topic,
            fromBeginning: true
        });
        await consumer.run({
            eachMessage: async ({topic, partition, message}: EachMessagePayload): Promise<void> => {
                console.log({
                    value: message.value.toString(),
                    topic,
                    partition
                })
            }
        })
    }
    
    export default consumer;
    
    //index.ts //函数入口
    import producer from "./producer";
    import consumer from './consumer';
    
    async function start() {
        await producer([
            {value: 'Hello'},
            {value: ','},
            {value: 'I\'m'},
            {value: 'kafkajs'}
        ])
    
        await consumer({
            groupId: 'consumer-1'
        })
        await consumer({
            groupId: 'consumer-2'
        })
    }
    
    start()
    .catch(console.log)
    

    然后编译文件,并运行,可以看到我们消息从Producer发送到了Consumer

  2. 接入canal

    修改docker-compose.yml,添加canal的镜像和相关配置,同时添加一个测试的mysql镜像(注,由于项目需求,我还配置了wordpress镜像)

    canal配置说明

    ...
      canal:
        image: canal/canal-server:v1.1.4
        environment:
          - canal.instance.mysql.slaveId=54321 #slave id 不要与mysql的一样就行
          - canal.instance.master.address=mysql:3306 #mysql地址
          - canal.instance.dbUsername=kafka #mysql 对应的用户名
          - canal.instance.dbPassword=kafka #mysql 对应的密码
          - canal.instance.parser.parallel=false #由于我用的虚拟机,cpu为1G,所以设为false
          - canal.instance.filter.regex=kafka\.user #数据库中要监听的表,详细看官方说明
          - canal.mq.dynamicTopic=.*\..* #动态生成topic
          - canal.zkServers=zookeeper:2181 #链接zookeeper集群的链接信息
          #canal.properties 配置
          - canal.serverMode=kafka #MQ使用的kafka
          - canal.mq.servers=kafka:9092 #kafka地址
        depends_on:
          - zookeeper
          - kafka
    
      mysql:
        image: mysql:5.7
        restart: always
        volumes:
          - ./configuration/conf.d/binlog.cnf:/etc/mysql/conf.d/binlog.cnf #为了让mysql开启bin_log模式的配置
        restart: always
        environment:
          MYSQL_ROOT_PASSWORD: root_password_can_you
          MYSQL_DATABASE: kafkadb
          MYSQL_USER: kafka
          MYSQL_PASSWORD: kafka
        ports:
          - 3306:3306
    
    

    binlog.cnf配置文件内容,canal官方说明

    [mysqld]
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
    

    拉取镜像,启动项目

    $ docker-compose up
    

    更改mysql权限 ,使用root登录到mysql

    CREATE USER kafka IDENTIFIED BY 'kafka';  # 创建与docker-compose.yml中对应的用户和密码
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'kafka'@'%'; #给mysql用户权限
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; #也可以给所有权限
    FLUSH PRIVILEGES;
    

    创建一个数据库kafkadb并添加一个user

    user表插入数据

    INSERT INTO user ( `id`, `username`) VALUES ( 1, 'yan');
    

    好像没有数据过来(至少我的是这样)

  3. 排查问题

    首先查看是否镜像运行正常

    $ docker ps 
    

    发现没有问题,只有依次排查每个镜像日志,先从canal查起

    $ docker exec -it <canal 镜像> bash
    #然后进入canal-server文件夹
    $ cd canal-server
    $ cat logs/example/example.log
    #发现出错了,以下为片段
    # Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
    

    百度后,发现和这个问题很像,那应该就是我们前面说的kafkaip设置成localhost导致的,尝试更改一下,问题解决

    再插入数据,可以看到数据被接收到了

后记

其实在部署之间,遇到了很多问题,由于这次是问题重现,有些问题并没有再出现

例如有自己写的Producer程序推送消息时,报错There is no leader for this topic-partition as we are in the middle of a leadership election 这是由于,没有设置KAFKA_BROKER_ID导致每次构建项目,都重新生成了brokder id,可以在构建项目时在其后添加--no-recreate ,可以再这里找到 。记得使用docker-compose rm -vfs删除后再构建项目

也有zookeeper报错Zookeeper Report Error:KeeperErrorCode = NoNode,可以再这里找到

等等

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用canal连接kafka 的相关文章

随机推荐

  • Android S GTS 常见的 fail 项

    此文章只是一篇总结 xff0c 针对 MTK 平台近期的 GTS 做个简单的整理回顾 xff0c 后期不断扩展 GTS GtsJniUncompressHostTestCases com google android gts jniunco
  • 正则表达式底层实现 matcher.find

    matcher find 完成的任务 xff08 考虑分组 xff09 什么是分组 xff0c 比如 d d d d 正则表达式中有 表示分组 第1个 表示第1组 第2个 表示第2组 1 根据指定的规则 定位满足规则的子字符串 比如 19
  • MyBatisPlus配置与实现

    目录 基于SpringBoot使用MyBatisPlus 标准数据层开发 Lombok 分页功能 DQL编程控制 构建条件查询 null判定 查询投影 聚合查询 分组查询 查询条件 模糊查询 排序查询 映射匹配兼容性 DML编程控制 id生
  • 深入理解SpringApplication.run(PeaApplication.class,args)(1)

    运行流程 xff1a 前言 本篇将对SpringApplication run xff09 方法进行源码溯源 xff0c 深入理解该方法 在进入该方法后 xff0c 把第一个class参数转化为数组类型 xff0c 调用同名方法 这里有2个
  • Nacos注册中心

    国内公司一般都推崇阿里巴巴的技术 xff0c 比如注册中心 xff0c SpringCloudAlibaba也推出了一个名为Nacos的注册中心 认识和安装Nacos Nacos是阿里巴巴的产品 xff0c 现在是SpringCloud中的
  • Activity的启动和结束

    onCreate xff1a 创建活动 此时会把页面布局加载进内存 xff0c 进入了初始状态 onStart xff1a 开启活动 此时会把活动页面显示在屏幕上 xff0c 进入了就绪状态 onResume xff1a 恢复活动 此时活动
  • Nacos集群搭建

    官方给出的Nacos集群图 xff1a 其中包含3个nacos节点 xff0c 然后一个负载均衡器代理3个Nacos 这里负载均衡器可以使用nginx 我们计划的集群结构 xff1a 2 搭建集群 搭建集群的基本步骤 xff1a 搭建数据库
  • Feign远程调用

    目录 Feign替代RestTemplate 1 xff09 引入依赖 2 xff09 添加注解 3 xff09 编写Feign的客户端 Load balancer does not have available server for cl
  • Gateway服务网关

    目录 为什么需要网关 1 xff09 创建gateway服务 xff0c 引入依赖 2 xff09 编写启动类 3 xff09 编写基础配置和路由规则 4 xff09 重启测试 5 xff09 网关路由的流程图 断言工厂 过滤器工厂 路由过
  • 解决方案|pyltp RuntimeError: incompatible native format - size of long(For Mac OS)

    环境 系统 MAC OS 版本 Python 3 7 开发环境 Pycharm 问题 解决方案 报错定位 将Line 26 39 pisrl win model 39 改为 39 pisrl model 39
  • 安装Docker

    目录 0 安装Docker 1 CentOS安装Docker 1 1 卸载 xff08 可选 xff09 1 2 安装docker 1 3 启动docker 1 4 配置镜像加速 2 CentOS7安装DockerCompose 2 1 下
  • Linux命令提示符和命令格式

    一 Linux命令提示符 如上图 xff0c 终端命令提示符为dylan 64 dyalns ubuntu xff0c 表示当前终端的状态 span class token operator span dylan xff1a Linux是一
  • Ubuntu 报错:E: Package ' *** ' has no installation candidate

    在Ubuntu中安装软件的时候有时候会出现 Package has no installation candidate 的问题 xff0c 如下图所示 xff1a 此时 xff0c 亲测有效的方式是 xff1a 打开终端 xff0c 在终端
  • SpringBoot项目配置

    1 properties配置文件 修改上下文路径 server servlet context path 61 springBoot01 修改端口号 server port 61 8080 数据库的配置信息 spring datasourc
  • Java中集合工具类collections

    一 什么是collections类 xff1a xff08 1 xff09 Collections类是Java提供的一个集合操作工具类 它包含了大量的静态方法 xff0c 用于实现对集合元素的排序 查找和替换等操作 xff08 2 xff0
  • xubuntu系统中设置上边框隐藏

    问题描述 在xubuntu系统中使用软件时 xff0c 打开的软件弹窗都会置于最上方 xff0c 如果软件全屏使用的话 xff0c 上方的边框很影响用户使用体验 原因分析 xff1a 该问题属于panel的设置问题 在界面操作上如下 xff
  • apache2更换php版本最快捷方法

    将php7 0 更改为php5 6 安装php apt get install php5 php5 mysql libapache2 mod php5 禁用php7 0 a2dismod php7 0 启用php5 6 a2enmod ph
  • 修改MDK5(keil5)工程文件名称

    1 打开工程文件夹 xff0c 将Template uvoptx和Template uvproj文件改成LockAmplifier uvoptx和 LockAmplifier uvproj xff08 根据自己需要修改相应的名称 xff09
  • kafka 报错 no leader

    作为Producer向kafka发送消息时 xff0c 报出错误 There is no leader for this topic partition as we are in the middle of a leadership ele
  • 使用canal连接kafka

    这篇主要是项目还原 xff0c 目的是记录构建时遇到的各种奇葩坑 xff0c 避免下次迷路 废话不多说 xff0c 直接上手 默认已安装docker xff0c docker compose xff0c nodejs xff0c yarn