02-kafka集群搭建

2023-11-03

image.png

0. 服务器准备

  • 服务器规划
IP hostname 服务 服务
10.10.xxx.61 kafka-01 zookeeper-01 kafka-01
10.10.xxx.62 kafka-01 zookeeper-02 kafka-02
10.10.xxx.63 kafka-01 zookeeper-03 kafka-03
  • hostname
    三台服务器hostname设置分别如上表
  • hosts
    /etc/hosts 文件添加如下内容
10.10.239.61     kafka-01
10.10.239.62     kafka-02
10.10.239.63     kafka-03
  • java环境已安装

1. zookeeper集群

1.1 下载

下载地址: https://mirrors.bfsu.edu.cn/apache/zookeeper/
image.png

下载带bin的这个tar包,另一个现在的版本是源码。

下载错了报错如下

 [root@kafka-01 logs]# tailf zookeeper-root-server-kafka-01.out
错误: 找不到或无法加载主类 org.apache.zookeeper.server.quorum.QuorumPeerMain

1.2 拷贝

  • 拷贝
    下载完毕,分别拷贝到3台服务器的/data目录(我服务器的数据盘),解压缩如下。
[root@kafka-01 data]# ls
apache-zookeeper-3.6.2-bin  apache-zookeeper-3.6.2-bin.tar.gz
[root@kafka-01 data]# pwd
/data
  • 软连接
    因为目录放在数据盘且目录名保留了版本号,为了使用方便,我在/opt下创建一个软连接:
[root@kafka-01 data]# ln -s /data/apache-zookeeper-3.6.2-bin /opt/zookeeper
[root@kafka-01 opt]# ll
总用量 4
lrwxrwxrwx  1 root root   32 32 13:28 zookeeper -> /data/apache-zookeeper-3.6.2-bin

1.3 配置变量

/etc/profile 文件添加如下内容:

###### zookeeper #############
export ZK_HOME=/opt/zookeeper
export PATH=$ZK_HOME/bin:$PATH

引用变量

[root@kafka-01 opt]# source /etc/profile

1.4 配置

  • 复制配置文件
[root@kafka-01 opt]# cd /opt/zookeeper/conf/
[root@kafka-01 conf]# ll
总用量 16
-rw-r--r-- 1 cloud_user cloud_user  535 94 20:43 configuration.xsl
-rw-r--r-- 1 cloud_user cloud_user 3435 94 20:43 log4j.properties
-rw-r--r-- 1 cloud_user cloud_user 1148 94 20:43 zoo_sample.cfg
[root@kafka-01 conf]# cp zoo_sample.cfg zoo.cfg
  • 修改zoo.cfg 文件如下:
# 修改数据目录(这个目录稍后创建)
dataDir=/opt/zookeeper/data
#添加集群信息(后边myid需要和这个一致)。
server.1=10.10.xxx.61:2888:3888
server.2=10.10.xxx.62:2888:3888
server.3=10.10.xxx.63:2888:3888
  • 创建数据目录
[root@kafka-01 conf]# cd /opt/zookeeper/
[root@kafka-01 zookeeper]# mkdir data
  • myid
    给个节点写入myid,需要和前边配置文件中一致。三台分别写入1,2,3。
[root@kafka-01 zookeeper]# cd data/
[root@kafka-01 data]# echo 1 > myid

1.5 启动

  • 启动
[root@kafka-01 data]# zkServer.sh start
[root@kafka-01 data]# netstat -ntlp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      941/sshd
tcp        0      0 127.0.0.1:25            0.0.0.0:*               LISTEN      1392/master
tcp6       0      0 :::17822                :::*                    LISTEN      941/sshd
tcp6       0      0 :::37923                :::*                    LISTEN      17244/java
tcp6       0      0 :::2181                 :::*                    LISTEN      17244/java
tcp6       0      0 10.10.239.61:3888       :::*                    LISTEN      17244/java
tcp6       0      0 :::8080                 :::*                    LISTEN      17244/java
tcp6       0      0 :::22                   :::*                    LISTEN      941/sshd
tcp6       0      0 ::1:25                  :::*                    LISTEN      1392/master
  • 设置开机启动
[root@kafka-01 init.d]# cat  > /etc/init.d/zookeeper.sh << EOF 
#!/bin/sh
#chkconfig: 2345 55 56
#description:zookeeper.sh
source /etc/profile
zkServer.sh start
EOF
[root@kafka-01 init.d]# chmod 755 /etc/init.d/zookeeper.sh
[root@kafka-01 init.d]# systemctl enable zookeeper.sh
zookeeper.sh.service is not a native service, redirecting to /sbin/chkconfig.
Executing /sbin/chkconfig zookeeper.sh on

说明:
脚本中引用一下profile文件,因为子shell中读不到JAVA_HOME 和 ZK_HOME

2. kafka

2.1 下载

2.2 拷贝到服务器

  • 下载完毕后,拷贝到/data 目录下,并解压缩。结果如下:
[root@kafka-01 data]# ll
总用量 79224
drwxr-xr-x 8 root root     4096 32 13:37 apache-zookeeper-3.6.2-bin
-rw-r--r-- 1 root root 12515974 32 13:27 apache-zookeeper-3.6.2-bin.tar.gz
drwxr-xr-x 7 root root     4096 32 15:28 kafka_2.13-2.7.0
-rw-r--r-- 1 root root 68583422 32 15:07 kafka_2.13-2.7.0.tgz
drwx------ 2 root root    16384 32 10:54 lost+found
  • 为方便使用,在/opt下创建一个软连接
[root@kafka-01 data]# ln -s /data/kafka_2.13-2.7.0 /opt/kafka
[root@kafka-01 data]# cd /opt/
[root@kafka-01 opt]# ll
总用量 4
lrwxrwxrwx  1 root root   22 32 15:33 kafka -> /data/kafka_2.13-2.7.0
lrwxrwxrwx  1 root root   32 32 13:28 zookeeper -> /data/apache-zookeeper-3.6.2-bin

2.2 配置文件

  • config目录
[root@kafka-01 config]# cd /opt/kafka/config/
[root@kafka-01 config]# ll
总用量 72
-rw-r--r-- 1 root root  906 1216 21:58 connect-console-sink.properties
-rw-r--r-- 1 root root  909 1216 21:58 connect-console-source.properties
-rw-r--r-- 1 root root 5321 1216 21:58 connect-distributed.properties
-rw-r--r-- 1 root root  883 1216 21:58 connect-file-sink.properties
-rw-r--r-- 1 root root  881 1216 21:58 connect-file-source.properties
-rw-r--r-- 1 root root 2247 1216 21:58 connect-log4j.properties
-rw-r--r-- 1 root root 2540 1216 21:58 connect-mirror-maker.properties
-rw-r--r-- 1 root root 2262 1216 21:58 connect-standalone.properties
-rw-r--r-- 1 root root 1221 1216 21:58 consumer.properties
-rw-r--r-- 1 root root 4674 1216 21:58 log4j.properties
-rw-r--r-- 1 root root 1925 1216 21:58 producer.properties
-rw-r--r-- 1 root root 6888 32 15:20 server.properties
-rw-r--r-- 1 root root 1032 1216 21:58 tools-log4j.properties
-rw-r--r-- 1 root root 1169 1216 21:58 trogdor.conf
-rw-r--r-- 1 root root 1205 1216 21:58 zookeeper.properties

我们需要修改 server.properties文件,修改如下:

  • 修改broker.id

三台不能一样,如分别是1,2,3

############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
  • 链接zookeeper
zookeeper.connect=10.10.xxx.61:2181,10.10.xxx.62:2181,10.10.xxx.63:2181
  • 修改后,过滤注释和空格查看内容如下:
[root@kafka-01 config]# cat server.properties |grep -v "^#"|grep -v "^$"
broker.id=1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.10.239.61:2181,10.10.239.62:2181,10.10.239.63:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

2.3 启动

  • nohup启动(不推荐)
nohup /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties 1>/dev/null 2>&1 &
  • 以demean模式启动(推荐)
/opt/kafka/bin/kafka-server-start.sh -daemon  /opt/kafka/config/server.properties
  • 开机启动
[root@kafka-03 init.d]# cat > /etc/init.d/kafka.sh << EOF
#!/bin/sh
#chkconfig: 2345 56 55
#description:kafka.sh
#####################
source /etc/profile
cd /opt/kafka/logs
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
EOF
[root@kafka-01 init.d]# chmod 755 /etc/init.d/kafka.sh
[root@kafka-01 init.d]# systemctl enable kafka.sh
kafka.sh.service is not a native service, redirecting to /sbin/chkconfig.
Executing /sbin/chkconfig kafka.sh on

3. 使用密码

  • 停止kafka
./kafka-server-stop.sh

3.1 配置文件

进入config目录修改如下文件

[root@kafka-01 config]# pwd
/opt/kafka/config
  • server.properties

说明:listeners的地址三台服务器根据实际情况填写各自的ip地址。

listeners=SASL_PLAINTEXT://10.10.xxx.61:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
  • kafka_server_jaas.conf
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="kafka"
    password="xxe888888"
    user_kafka="xxe888888"
    user_boe="xxe888888";
};
  • kafka_client_jaas.conf
    在config目录添加kafka_client_jaas.conf
KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="mooc"
        password="xxe888888";
};

3.2 修改启动脚本

进入bin目录

[root@kafka-01 bin]# pwd
/opt/kafka/bin
  • kafka-server-start.sh
    修改kafka服务的用户名密码

说明:用${KAFKA_OPTS} 指定用户密码文件。注意写在执行kafka-run-class.sh 脚本前

if [ "x$KAFKA_OPTS" = "x"  ]; then
    export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
fi
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
  • kafka-console-producer.sh/ kafka-console-consumer.sh
    修改客户端的用户名密码。

说明:用${KAFKA_OPTS} 指定用户密码文件。注意写在执行kafka-run-class.sh 脚本前

if [ "x$KAFKA_OPTS" = "x"  ]; then
    export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/kafka_client_jaas.conf"
fi
  • 启动服务
/opt/kafka/bin/kafka-server-start.sh -daemon  /opt/kafka/config/server.properties

4. 测试

  • 创建topic
./kafka-topics.sh  --zookeeper localhost:2181 --create --topic Test  --partitions 1 --replication-factor 1
  • 生产者
    打开一个终端,执行如下命令,进入console后随便输入一串数字
[root@kafka-01 bin]# ./kafka-console-producer.sh   --broker-list 10.10.xxx.62:9092 --topic Test --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
>111
>hongjian
  • 消费者
    打开生产者的同时,打开一个消费者。可以看到接收到生产者的消息了。
[root@kafka-01 bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Test --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN --from-beginning
111
hongjian

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

02-kafka集群搭建 的相关文章

随机推荐

  • 计算机机房一般在几楼,21层的楼房设备层一般在几楼

    目前高层楼房的设备层一般在地下室 一层和屋面 除了基层之外一般都不会有设备在其他楼层的情况 所以21层楼房设备层应该也是在这些楼层 通常地下室是高层建筑设备最密集的地方 各类强电 弱电 生活水 消防水 污水处理等设备间都会在地下室 当然也有
  • Python操作xlwings

    xlwings的安装 xlwings库使用pip安装 在控制台输入 pip install xlwings 工作簿操作 coding UTF 8 导入所需的第三方模块 import xlwings as xw 关闭警告 屏幕刷新 app d
  • 1.elasticsearch文档存储(保存

    README 0 本文部分内容 数据 总结自 es 开发文档 Document APIs Elasticsearch Guide 7 2 Elastic 1 本文的es版本是7 2 1 2 elasticsearch 是一个数据存储 检索和
  • ROC,AUC,PR

    1 召回率 准确率 ROC曲线 AUC PR曲线这些基本概念 这个是今天的重点 传统机器学习里面也是很重要的一点 刚好可以重新学习一下 像AUC其实它就是一个用来评判你的一个模型的准确率的 因为在普通的正确率中 会因为样本的不平衡 而正确率
  • 人物角色群体攻击判定(三)Physics.OverlapSphere(群体攻击)

    使用Physics OverlapSphere来检测不方便调试 其他都可以 核心代码 检测敌人 public void CheckEnemy Collider cols Physics OverlapSphere this transfor
  • 数组模拟环形队列(思路分析) [数据结构与算法][Java]

    数组模拟环形队列 思路分析 使用数组模拟环形队列 就可以解决使用数组模拟队列中的遗留问题了 那么我们要如何使用数组模拟环形队列 相当于前面讲过的数组模拟非环形队列 也就是一般队列 我们这里有如下的变化 front变量的含义发生了改变 fro
  • sessionStorage什么时候失效

    最近在调试程序的时候无意间看到 cookie 的过期时间是 session 这个 session 表示的是什么时候过期 牵扯出来另一个存储方案 sessionStorage 存储的数据又是什么时候过期呢 在查找相关资料的时候总会看到会话结束
  • SpringCloudAlibaba负载均衡器-Ribbbon

    SpringCloudAlibaba负载均衡器 Ribbon 文章目录 SpringCloudAlibaba负载均衡器 Ribbon 1 什么是Ribbon 1 1 客户端的负载均衡 1 2服务端的负载均衡 1 3 常见的负载均衡算法 2
  • 【合天网安】SQLi-Labs系列之字符型报错注入

    SQL注入介绍 SQLi sql injection 我们称之为sql注入 何为sql 英文 Structured Query Language 叫做结构化查询语言 常见的结构化数据库有MySQL MS SQL Oracle以及Postgr
  • IDEA中快速搜索Jar包里面的内容

    版权声明 本文为CSDN博主 IT model 的原创文章 遵循 CC 4 0 BY SA 版权协议 转载请附上原文出处链接及本声明 原文链接 https blog csdn net IT model article details 888
  • vue3中引入全局的less 和配置代理

    使用vue cli搭建的项目 在项目中引入公共less变量 创建vue config js 内容如下 use strict const path require path const proxyUrlPort http xxxx modul
  • Failed to start mysqld.service: Unit not found

    很多人对本博客的方法提出了质疑 在此我解释一下 由于MySQL在CentOS7中收费了 所以已经不支持MySQL了 取而代之在CentOS7内部集成了mariadb 而安装MySQL的话会和MariaDB的文件冲突 所以本文建议直接安装ma
  • C# 系统应用之TreeView控件 (一).显示树状磁盘文件目录及加载图标

    在C 系统应用毕设U盘防御软件中需要实现文件不可恢复的删除 首先需要实现类似于资源管理器的界面 通过TreeView控件显示 我的电脑 所有磁盘文件树状目录并加载相应图标 显示结果如下图所示 一 界面设计 主窗体是一个Windowss窗体文
  • 简单的hashmap的实现

    package com public class linkList public final class Node public Object k public Object v public Node next null public N
  • suparc服务器没信号,SupARC对战平台新手上手教程

    11对战平台1 2 8 3 官方最新版 类型 修改器 游戏工具 大小 88 1M语言 中文 评分 9 8 标签 立即下载 这里有各种版本的SupARC客户端供选择 另外还有很多ROM 客户端下载成功后需要玩家注册新账号 新手玩家需要点击 注
  • dell服务器开启64位支持,dell服务器虚拟化开启(戴尔bios设置虚拟化)

    开机按f2进入bios设置界面 将光标移至 advanced 再使用上下方向键将光标移至 以上就是设置戴尔笔记本硬盘模式为ahci教程 有遇到戴尔笔记本不懂的如何修改硬 您好 戴尔电脑 一般进入bios的方法为开机按f2 而开机按f12是选
  • springboot毕设项目外文学术期刊遴选服务平台ba094(java+VUE+Mybatis+Maven+Mysql)

    springboot毕设项目外文学术期刊遴选服务平台ba094 java VUE Mybatis Maven Mysql 项目运行 环境配置 Jdk1 8 Tomcat8 5 Mysql HBuilderX Webstorm也行 Eclis
  • Python爬取租房信息并保存至Excel文件

    Python爬取租房信息并保存至Excel文件 爬取网页 解析数据 保存数据 本案例为Python编写Spider程序 获取租房相关信息 并保存至Excel文件 大致分为爬取网页 解析数据 保存数据三个步骤 程序具有通用性 只需获取目标网站
  • 大一下第一场

    大一下第一场比赛 太菜了 总结 小错误太多 应该提高严密性 long long scanf lld 有 gt lt 不要忘记了等于的情况 成绩那题 我默认把它看成了小数 其实还有整数的情况 一题有一些小细节没注意导致花了很多时间 诶 长寿的
  • 02-kafka集群搭建

    文章目录 0 服务器准备 1 zookeeper集群 1 1 下载 1 2 拷贝 1 3 配置变量 1 4 配置 1 5 启动 2 kafka 2 1 下载 2 2 拷贝到服务器 2 2 配置文件 2 3 启动 3 使用密码 3 1 配置文