Centos7.9搭建kafka-3.2.0集群,三台机器组成,并安装安装eagle 监控

2023-10-26

目录

1、准备工作

2、搭建zookeeper集群

3、搭建kafka集群

4、测试kafka集群

4.1 命令测试

4.2 java代码测试

5、安装eagle 监控


1、准备工作

kafka-3.2.0的单机安装教程如下:

CentOS7.9安装kafka-3.2.0和window10 下安装kafka-3.2.0_fyihdg的博客-CSDN博客CentOS7.9安装kafka-3.2.0https://blog.csdn.net/fyihdg/article/details/125552717

在这个文章基础上搭建的集群,我的是三台服务器。

192.168.1.61

192.168.1.62

192.168.1.63

软件包已经上传到了服务器:

修改主机名 

 vi /etc/hosts

 我三台的hosts文件都是修改成如下

192.168.1.61 node01
192.168.1.62 node02
192.168.1.63 node03

 java 环境变更也早配置好了

安装时间同步插件:

yum install ntp -y

 同步服务器时间命令:

ntpdate ntp1.aliyun.com

clock -w

 2、搭建zookeeper集群

  修改zk配置文件,zoo.cfg

vi zoo.cfg

修改192.168.1.61的服务器,修改的地方有:

dataDir=/root/zkdata

 与单机的区别就是要添加以下信息:

server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888

记得创建目录:

mkdir /root/zkdata

 剩下的两台都是相同的操作。

修改myid

 192.168.1.61上执行:
 echo 1 > /root/zkdata/myid

 192.168.1.62上执行:
 echo 2 > /root/zkdata/myid


 192.168.1.63上执行:
 echo 3 > /root/zkdata/myid

 

 

 切换到目录:/opt/apache-zookeeper-3.8.0/bin,三台服务器执行,

./zkServer.sh start zoo.cfg

 然后查看状态:

 

 可以看到,63这台是主节点,zookeeper 集群搭建好了。

3、搭建kafka集群

切换到kafa文件目录,然后解压:

 tar -zxf kafka_2.13-3.2.0.tgz -C /opt

切换到目录:/opt/kafka_2.13-3.2.0/config, 

修改配置文件: server.properties,192.168.1.61修改如下:

listeners=PLAINTEXT://node01:9092

log.dirs=/usr/kafka-logs

zookeeper.connect=node01:2181,node02:2181,node03:2181

修改配置文件: server.properties,192.168.1.62修改如下:

broker.id=1

listeners=PLAINTEXT://node02:9092

log.dirs=/usr/kafka-logs

zookeeper.connect=node01:2181,node02:2181,node03:2181

修改配置文件: server.properties,192.168.1.63修改如下:

broker.id=2

listeners=PLAINTEXT://node03:9092

log.dirs=/usr/kafka-logs

zookeeper.connect=node01:2181,node02:2181,node03:2181

配置就完成了

4、测试kafka集群

4.1 命令测试

 启动kafka服务:

3台机器分别执行以下命令: 

./bin/kafka-server-start.sh  -daemon  config/server.properties

创建主题:

./bin/kafka-topics.sh  --bootstrap-server node01:9092,node02:9092,node03:9092  --create  --topic topic01  --partitions 3 --replication-factor 2

 如果三台机器都创建主题的话,会报错:“Error while executing topic command : Topic 'topic02' already exists.”

查看有多少队列:

./bin/kafka-topics.sh 	  --bootstrap-server node01:9092,node02:9092,node03:9092 --list

查看主题详细信息:

./bin/kafka-topics.sh  --bootstrap-server node01:9092,node02:9092,node03:9092  --describe   --topic topic01

看一下/usr/kafka-logs这个目录有什么内容

 如有需要可以修改:

./bin/kafka-topics.sh  --bootstrap-server node01:9092,node02:9092,node03:9092   --alter  --topic topic02  --partitions 3

删除:

./bin/kafka-topics.sh     --bootstrap-server node01:9092,node02:9092,node03:9092   --delete   --topic topic03

订阅

输入命令:

./bin/kafka-console-consumer.sh  --bootstrap-server node01:9092,node02:9092,node03:9092  --topic topic01  --group g1 --property print.key=true --property print.value=true  --property key.separator=,

 生产者:

./bin/kafka-console-producer.sh   --broker-list node01:9092,node02:9092,node03:9092   --topic topic01

 

 

     消费组

  ./bin/kafka-consumer-groups.sh  --bootstrap-server node01:9092,node02:9092,node03:9092    --list  g1

消费组详细

 ./bin/kafka-consumer-groups.sh  --bootstrap-server node01:9092,node02:9092,node03:9092     --describe   --group g1

4.2 java代码测试

 如果是window 10,要修改hosts文件,首先到目录:C:\Windows\System32\drivers\etc

然后修改hosts文件,添加集群的主机映射:

192.168.1.61    node01 
192.168.1.62    node02 
192.168.1.63    node03 

 

 新建代码:

public class KafkaConsumerDemo {
    public static void main(String[] args) {
        //1.创建Kafka链接参数
        Properties props=new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node01:9092,node02:9092,node03:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"group01");

        //2.创建Topic消费者
        KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props);
        //3.订阅topic开头的消息队列
        consumer.subscribe(Pattern.compile("^topic.*$"));

        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();
            while (recordIterator.hasNext()){
                ConsumerRecord<String, String> record = recordIterator.next();
                String key = record.key();
                String value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                System.out.println("key:"+key+",value:"+value+",partition:"+partition+",offset:"+offset);
            }
        }
    }
}

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hdg</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>


    <dependencies>

            <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<!--            <dependency>-->
<!--                <groupId>org.apache.kafka</groupId>-->
<!--                <artifactId>kafka-clients</artifactId>-->
<!--                <version>2.2.0</version>-->
<!--            </dependency>-->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-streams</artifactId>
                <version>2.2.0</version>
            </dependency>

            <!-- https://mvnrepository.com/artifact/log4j/log4j -->
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.17</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.25</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.25</version>
            </dependency>

            <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.9</version>
            </dependency>


    </dependencies>

</project>

 如果不配置hosts文件,可能会报:

 

 到此集群搭建成功。

5、安装eagle 监控

下载地址:

官网地址:EFAKhttps://www.kafka-eagle.org/

 

源码地址:GitHub - smartloli/EFAK: A easy and high-performance monitoring system, for comprehensive monitoring and management of kafka cluster.A easy and high-performance monitoring system, for comprehensive monitoring and management of kafka cluster. - GitHub - smartloli/EFAK: A easy and high-performance monitoring system, for comprehensive monitoring and management of kafka cluster.https://github.com/smartloli/EFAK 

我已经把安装包上传到CSDN,免积分可以下载:

kafka-eagle-bin-2.1.0.tar.gz-Java文档类资源-CSDN下载kafka-eagle-bin-2.1.0.tar.gz2022年7月份下载,最新版更多下载资源、学习资料请访问CSDN下载频道.https://download.csdn.net/download/fyihdg/85901920上传到服务器,我是安装在192.168.1.61这一台服务器,只在一台安装:

解压到 /opt

tar -xvf kafka-eagle-bin-2.1.0.tar.gz -C /opt

 

 可以看到, 还是一个压缩包,到/opt/kafka-eagle-bin-2.1.0目录,再解压:

tar -xvf efak-web-2.1.0-bin.tar.gz 

解压后:

移到到 /opt目录
mv efak-web-2.1.0 /opt


把源安装包删除

 rm -rf kafka-eagle-bin-2.1.0/

 

修改环境变量:

 vi /etc/profile

 使文件生效

source /etc/profile

 切换到目录:/opt/efak-web-2.1.0/conf,修改配置文件:system-config.properties

efak.zk.cluster.alias=cluster1
cluster1.zk.list=node01:2181,node02:2181,node03:2181
#cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181



cluster1.efak.offset.storage=kafka
#cluster2.efak.offset.storage=zk



#cluster2.efak.sasl.enable=false
#cluster2.efak.sasl.protocol=SASL_PLAINTEXT
#cluster2.efak.sasl.mechanism=PLAIN
#cluster2.efak.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
#cluster2.efak.sasl.client.id=
#cluster2.efak.blacklist.topics=
#cluster2.efak.sasl.cgroup.enable=false
#cluster2.efak.sasl.cgroup.topics=

######################################
# kafka ssl authenticate
######################################
#cluster3.efak.ssl.enable=false
#cluster3.efak.ssl.protocol=SSL
#cluster3.efak.ssl.truststore.location=
#cluster3.efak.ssl.truststore.password=
#cluster3.efak.ssl.keystore.location=
#cluster3.efak.ssl.keystore.password=
#cluster3.efak.ssl.key.password=
#cluster3.efak.ssl.endpoint.identification.algorithm=https
#cluster3.efak.blacklist.topics=
#cluster3.efak.ssl.cgroup.enable=false
#cluster3.efak.ssl.cgroup.topics=

#这个是我window mysql
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://192.168.1.66:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=root

切换到目录:/opt/kafka_2.13-3.2.0/bin,3台服务器都要修改,添加: 

vi kafka-server-start.sh

 export JMX_PORT="7788"

 3台服务器配置好后,重启一下

./kafka-server-stop.sh 


./bin/kafka-server-start.sh  -daemon  config/server.properties

 切换到目录:/opt/efak-web-2.1.0/bin,查看,发现没有执行权限:

授权:

chmod 777 *

 启动:

./ke.sh  start

 

如果不能连接,比如报以下错误:

发现没开启远程访问

解决办法:

在装有MySQL的机器上登录MySQL mysql -u root -p密码
执行use mysql;
执行update user set host = '%' where user = 'root';这一句执行完可能会报错,不用管它。
执行FLUSH PRIVILEGES;

经过上面4步,就可以解决这个问题了。 

输入访问地址:http://192.168.1.61:8048

 * Account:admin ,Password:123456

 如果发现无法登录,可以查看日志:/opt/efak-web-2.1.0/logs

less  log.log

比如说,会报:出现The server time zone value ‘�й���׼ʱ��’ is unrecogni异常

mysql连接加上&serverTimezone=UTC

又发现报错了

### The error occurred while setting parameters
### SQL: insert into   ke_metrics(`cluster`,`broker`,`type`,`key`,`value`,`timespan`,`tm`)   values         (?,?,?,?,?,?,?)    ,     (?,?,?,?,?,?,?)    ,     (?,?,?,?,?,?,?)    ,     (?,?,?,?,?,?,?)    ,     (?,?,?,?,?,?,?)    ,     (?,?,?,?,?,?,?)    ,     (?,?,?,?,?,?,?)    ,     (?,?,?,?,?,?,?)    ,     (?,?,?,?,?,?,?)    ,     (?,?,?,?,?,?,?)    ,     (?,?,?,?,?,?,?)    ,     (?,?,?,?,?,?,?)    ,     (?,?,?,?,?,?,?)    ,     (?,?,?,?,?,?,?)
### Cause: java.sql.SQLSyntaxErrorException: Table 'ke.ke_metrics' doesn't exist
; bad SQL grammar []; nested exception is java.sql.SQLSyntaxErrorException: Table 'ke.ke_metrics' doesn't exist

这是因为mysql8.0不支持,我们得使用mysql5.7版本!!

自动创建数据库和表了: 

搭建成功了,一定要注意,不能使用mysql8.0,否则是无法进入的 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Centos7.9搭建kafka-3.2.0集群,三台机器组成,并安装安装eagle 监控 的相关文章

随机推荐

  • VSCode+Idea 一个增删改查demo

    一个简单的增删改查demo 开发工具 Visual Studio Code Idea Navicat demo 前端 后台 开发工具 Visual Studio Code 一款前端开发工具 下载地址 https code visualstu
  • 微信小程序开发教程 #043 - 在小程序开发中使用 npm

    本文介绍了如何在微信小程序开发中使用 npm 中包的功能 大大提高微信小程序的开发效率 同时也是微信小程序系列教程的视频版更新 微信小程序在发布之初没有对 npm 的支持功能 这也是目前很多前端开发人员在熟悉了 npm 生态环境后 对微信小
  • 【腾讯云 Cloud Studio 实战训练营】丝滑体验:用 Cloud Studio 实现一个精确计时的时钟

    当今的云计算技术已经越来越成熟 基于云计算技术进行云端开发已经成为最新趋势 而 Cloud Studio 是一个基于云计算的 Web 端开发微服务平台 提供了代码编辑器 调试器 代码库 以及自动构建和部署工具等各种功能 帮助开发者在云端开发
  • druid 配置

    spring datasource druid连接池 type com alibaba druid pool DruidDataSource 数据库驱动 driver com mysql jdbc Driver 最大连接池数量 max ac
  • 开源协议详解

    开源在今天的软件业已经很普遍 但开源是否意味着使用者可以对开源后的代码为所欲为呢 答案是否定的 开源运动同样有自己的游戏规则和道德准则 不遵行这些规则不但损害开源运动的健康发展 也会对违规者造成名誉和市场上的损失 更可能陷入法律纠纷和赔偿
  • gitlab Undefined method `provider' for nil:nilclass 登陆提示处理

    使用管理员用户 在管理区域 用户管理里面 搜索对应用户 修改用户身份里面的LDAP身份为正确信息
  • 【基础】Unity:Application的常用方法

    Application的常用方法 static void LoadLevel int index static void LoadLevel string name static void CaptureScreenShot string
  • php使用区块链_PHP实现区块链

    作者 列旭松 来源 高可用架构 原文链接 http t cn RgjsJ1i 著作权归作者所有 商业转载请联系作者获得授权 非商业转载请注明出处 来自 Linux内核那些事 微信号 like linux 作者 列旭松 唯品会资深工程师 曾任
  • C++标准演绎(未完)

    作者 略游 q群 512 001 256 一 词汇定义 标准 standard C 语言标准 在代码世界里 我们假设与公理等价 结论 由标准推导出的事实 规定 便于讨论 我们设定的一些规则 类型 type 同一类型 它们在C 内存布局一致
  • 简谈拉电阻

    简谈拉电阻 前言 拉电阻 弱拉和强拉 上拉和下拉 前言 电路设计中经常设计到拉电阻的概念 与常用的GPIO口的配置也息息相关 网上也有很多的总结 不多累述 简单的总结拉电阻相关的一些概念 拉电阻 拉电阻分为上拉电阻 pull up 和下拉电
  • powerdesigner常用配置-修改外键设置

    文章目录 取消自动生成外键列 PowerDesigner给两个表添加reference 中间显示外键信息步骤 取消自动生成外键列 PowerDesigner给两个表添加reference 中间显示外键信息步骤
  • Floyd算法(三)之 Java详解

    前面分别通过C和C 实现了弗洛伊德算法 本文介绍弗洛伊德算法的Java实现 目录 1 弗洛伊德算法介绍 2 弗洛伊德算法图解 3 弗洛伊德算法的代码说明 4 弗洛伊德算法的源码 转载请注明出处 http www cnblogs com sk
  • 记一次XFS文件系统崩溃的处理

    1 问题出现 当使用rpm安装服务时 出现如下问题 当使用yum时也是一样 查了 var log messages发现以下错误 看起来是XFS的问题 可以发现dm 0实际是bel root的问题 查了网上很多解决方法都是说要使用xfs re
  • TCP/IP 报文格式(IP数据包、TCP报头、UDP报头)

    TCP IP 报文格式 IP数据包 TCP报头 UDP报头 一 IP包格式 IP数据包 是一种可以变长的分组 由首部与数据负载组成 首部长度为20 60字节 Byte 后40字节是可选的 但长度不固定 前20字节格式为固定 数据负载部分的长
  • 医疗器械维修工程师好做吗?赚钱吗?

    彩虹医疗器械维修培训第二期长期班安排 学技术 考证书 工作技术支持 彩虹介绍 彩虹医械维修培训中心成立于2003年 至今已有十余年的时间 为满足社会需求 推进医疗器械维修技术发展 开展医疗器械的维修技术培训 在社会上取得了良好的反应和口碑
  • 嵌入式Linux驱动开发(I2C专题)(四)

    编写APP直接访问EEPROM 参考资料 Linux驱动程序 drivers i2c i2c dev c I2C Tools 4 2 https mirrors edge kernel org pub software utils i2c
  • 在 Kubernetes 上体验 EMQX 5.0 的 MQTT over QUIC 特性

    引言 作为全球领先的开源分布式 MQTT Broker EMQX 在 5 0 版本中引入了 MQTT over QUIC 将 MQTT 协议的优势与 QUIC 的特性相结合 通过充分利用 QUIC 协议低连接开销和多路复用的特点 MQTT
  • 入职避坑指南(杭州)

    前言 求职都说金三银四 今天和大家分享一个亲身经历的事情 今天讲的是杭州 群电商公司 规模500人左右 说说我被公司白瞟的亲身经历 希望看到这篇文章的同学能避开这些坑 面试过程 公司比较远 附近没有地铁 大概坐了一个多小时的公交车去面试 到
  • CompletableFuture使用详解

    https blog csdn net admin123404 article details 111168902
  • Centos7.9搭建kafka-3.2.0集群,三台机器组成,并安装安装eagle 监控

    目录 1 准备工作 2 搭建zookeeper集群 3 搭建kafka集群 4 测试kafka集群 4 1 命令测试 4 2 java代码测试 5 安装eagle 监控 1 准备工作 kafka 3 2 0的单机安装教程如下 CentOS7