kafka简介和使用

2023-11-11

1.       kafka介绍

1.1.主要功能

根据官网的介绍,ApacheKafka®是一个分布式流媒体平台,它主要有3种功能:

  1:It lets you publish and subscribe to streams of records.发布和订阅消息流,这个功能类似于消息队列,这也是kafka归类为消息队列框架的原因

  2:It lets you store streams of records in a fault-tolerant way.以容错的方式记录消息流,kafka以文件的方式来存储消息流

  3:It lets you process streams of records as they occur.可以再消息发布的时候进行处理

1.2. 使用场景

1:Building real-time streaming data pipelines that reliably get data between systems or applications.在系统或应用程序之间构建可靠的用于传输实时数据的管道,消息队列功能

2:Building real-time streaming applications that transform or react to the streams of data。构建实时的流数据处理程序来变换或处理数据流,数据处理功能

 

1.3. 详细介绍

Kafka目前主要作为一个分布式的发布订阅式的消息系统使用,下面简单介绍一下kafka的基本机制

 1.3.1 消息传输流程

 

Producer即生产者,向Kafka集群发送消息,在发送消息之前,会对消息进行分类,即Topic,上图展示了两个producer发送了分类为topic1的消息,另外一个发送了topic2的消息。

Topic即主题,通过对消息指定主题可以将消息分类,消费者可以只关注自己需要的Topic中的消息

Consumer即消费者,消费者通过与kafka集群建立长连接的方式,不断地从集群中拉取消息,然后可以对这些消息进行处理。

从上图中就可以看出同一个Topic下的消费者和生产者的数量并不是对应的。

1.3.2 kafka服务器消息存储策略

 

谈到kafka的存储,就不得不提到分区,即partitions,创建一个topic时,同时可以指定分区数目,分区数越多,其吞吐量也越大,但是需要的资源也越多,同时也会导致更高的不可用性,kafka在接收到生产者发送的消息之后,会根据均衡策略将消息存储到不同的分区中。

 

  在每个分区中,消息以顺序存储,最晚接收的的消息会最后被消费。

  1.3.3 与生产者的交互

 

    生产者在向kafka集群发送消息的时候,可以通过指定分区来发送到指定的分区中

               也可以通过指定均衡策略来将消息发送到不同的分区中

    如果不指定,就会采用默认的随机均衡策略,将消息随机的存储到不同的分区中

  1.3.4  与消费者的交互

  

 在消费者消费消息时,kafka使用offset来记录当前消费的位置。在kafka的设计中,可以有多个不同的group来同时消费同一个topic下的消息,如图,我们有两个不同的group同时消费,他们的的消费的记录位置offset各不项目,不互相干扰。对于一个group而言,消费者的数量不应该多余分区的数量,因为在一个group中,每个分区至多只能绑定到一个消费者上,即一个消费者可以消费多个分区,一个分区只能给一个消费者消费。因此,若一个group中的消费者数量大于分区数量的话,多余的消费者将不会收到任何消息。

2.       Kafka安装与使用

2.1.       下载

  你可以在kafka官网 http://kafka.apache.org/downloads下载到最新的kafka安装包,选择下载二进制版本的tgz文件,根据网络状态可能需要fq,这里我们选择的版本是0.11.0.1,目前的最新版

2.2.       安装

  Kafka是使用scala编写的运行与jvm虚拟机上的程序,虽然也可以在windows上使用,但是kafka基本上是运行在linux服务器上,因此我们这里也使用linux来开始今天的实战。

  首先确保你的机器上安装了jdk,kafka需要java运行环境,以前的kafka还需要zookeeper,新版的kafka已经内置了一个zookeeper环境,所以我们可以直接使用

  说是安装,如果只需要进行最简单的尝试的话我们只需要解压到任意目录即可,这里我们将kafka压缩包解压到/home目录

2.3.       配置

  在kafka解压目录下下有一个config的文件夹,里面放置的是我们的配置文件

  consumer.properites 消费者配置,这个配置文件用于配置于2.5节中开启的消费者,此处我们使用默认的即可

  producer.properties 生产者配置,这个配置文件用于配置于2.5节中开启的生产者,此处我们使用默认的即可

  server.properties kafka服务器的配置,此配置文件用来配置kafka服务器,目前仅介绍几个最基础的配置

  1. broker.id 申明当前kafka服务器在集群中的唯一ID,需配置为integer,并且集群中的每一个kafka服务器的id都应是唯一的,我们这里采用默认配置即可
  2. listeners 申明此kafka服务器需要监听的端口号,如果是在本机上跑虚拟机运行可以不用配置本项,默认会使用localhost的地址,如果是在远程服务器上运行则必须配置,例如:listeners=PLAINTEXT:// 192.168.180.128:9092。并确保服务器的9092端口能够访问
  3. zookeeper.connect 申明kafka所连接的zookeeper的地址 ,需配置为zookeeper的地址,由于本次使用的是kafka高版本中自带zookeeper,使用默认配置即可zookeeper.connect=localhost:2181

2.4.       运行

  1. 启动zookeeper

cd进入kafka解压目录,输入

bin/zookeeper-server-start.sh config/zookeeper.properties

启动zookeeper成功后会看到如下的输出

 2.启动kafka

cd进入kafka解压目录,输入

bin/kafka-server-start.sh config/server.properties

启动kafka成功后会看到如下的输出

 

2.5.       第一个消息

    1 . 创建一个topic

    Kafka通过topic对同一类的数据进行管理,同一类的数据使用同一个topic可以在处理数据时更加的便捷

    在kafka解压目录打开终端,输入 

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

 创建一个名为test的topic

 

    创建topic后可以通过输入

bin/kafka-topics.sh --list --zookeeper localhost:2181

 

来查看已经创建的topic,

查看topic详细信息

​​​​​​​bin/kafka-topics.sh --describe --zookeeper localhost:2182 --topic test

2. 创建一个消息消费者

在kafka解压目录打开终端,输入   

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

 可以创建一个用于消费topic为test的消费者

 

 消费者创建完成之后,因为还没有发送任何数据,因此这里在执行后没有打印出任何数据

         不过别着急,不要关闭这个终端,打开一个新的终端,接下来我们创建第一个消息生产者

 3  创建一个消息生产者

    在kafka解压目录打开一个新的终端,输入

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

    在执行完毕后会进入的编辑器页面

在发送完消息之后,可以回到我们的消息消费者终端中,可以看到,终端中已经打印出了我们刚才发送的消息

 

问题记录:

场景:机器A(192.168.0.226  server1)往机器B(192.168.0.166 server2)定时吐数据。A吐数据失败显示如下

 

解决方法:

1、更改配置文件server.properties(192.168.0.166上的) 

 

配置文件改为listeners=PLAINTEXT://192.168.0.166:9092 或者listeners=PLAINTEXT://server2:9092

如果使用的是/kafka-console-producer.sh --broker-list server2:9092 --topic test

在A机器 /etc/hosts中加入一条 192.168.0.166 server2 

 

问题场景:

一台主机A地址为192.168.0.178内doker安装kafka并将9092端口映射为39092,从另一台主机B读取数据报如下错误

解决:

telnet 端口是通的,并且在主机A上执行相同命令可以取到数据。排除网络原因,检查kafka配置

将advertised.listeners 配置项改为192.168.0.178:39092后测试成功拉取到数据。

原因是advertised.listeners是暴露给外部的listeners,如果没有设置,会用listeners。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

kafka简介和使用 的相关文章

  • 《算法图解》总结第 6 章:广度优先搜索

    仅用于记录学习 欢迎批评指正 大神勿喷 系列文章目录 算法图解 总结第 1 章 二分查找 大O表示法 算法图解 总结第 2 章 数组和链表 选择排序 算法图解 总结第 3 章 while循环 递归 栈 算法图解 总结第 4 章 分而治之 快
  • LeetCode 102. 二叉树的层序遍历BFS

    LeetCode 102 二叉树的层序遍历BFS 给你二叉树的根节点 root 返回其节点值的 层序遍历 即逐层地 从左到右访问所有节点 示例 1 输入 root 3 9 20 null null 15 7 输出 3 9 20 15 7 示
  • Linux 下搭建 Kafka 环境

    安装步骤 准备软件目录 mkdir datalake 上传之前下载好的安装包到 datalake 目录下 jdk 8u181 linux x64 gz kafka 2 11 2 1 0 tgz zookeeper 3 4 5 tar gz
  • C++编写优先队列打印任务

    打印机的打印队列中 每一个打印任务都有一个优先级 为1 9的一个整数 9的优先级最高 1的优先级最低 打印按如下方法进行 1 取出打印队列中队首的打印任务J 2 如果打印队列中存在优先级高于J的打印任务 则将J移动到打印队列的队尾 否则 打
  • 第十四章 kafka专题之日志数据删除策略

    日志数据清理 为了控制磁盘的容量 需要对过去的消息进行清理 1 内部定时任务检测删除日志 默认是5分钟 2 日志清理参数配置 支持配置策略对数据进行清理 以segment为基本单位进行定期清理 当前正在使用的segment不会被清理 启用c
  • Java语言通过三种方法来实现队列

    队列 关于作者 作者介绍 博客主页 作者主页 简介 JAVA领域优质创作者 一名在校大三学生 在校期间参加各种省赛 国赛 斩获一系列荣誉 关注我 关注我学习资料 文档下载统统都有 每日定时更新文章 励志做一名JAVA资深程序猿 文章目录 队
  • 大数据技术之Kafka——Kafka入门

    目录 一 概述 1 1 为什么要有Kafka 1 2 定义 1 3 消息队列 1 消息队列的应用场景 2 消息队列的两种模式 1 4 基础架构 二 Producer生产者 2 1 生产者消息发送流程 2 1 1 发送原理 2 2 异步发送A
  • Kafka 顺序消费方案

    Kafka 顺序消费方案 前言 1 问题引入 2 解决思路 3 实现方案 前言 本文针对解决Kafka不同Topic之间存在一定的数据关联时的顺序消费问题 如存在Topic insert和Topic update分别是对数据的插入和更新 当
  • 黑马头条 热点文章实时计算、kafkaStream

    热点文章 实时计算 1 今日内容 1 1 定时计算与实时计算 1 2 今日内容 kafkaStream 什么是流式计算 kafkaStream概述 kafkaStream入门案例 Springboot集成kafkaStream 实时计算 用
  • flink 1.4版本flink table方式消费kafka写入hive方式踩坑

    最近在搞flink 搞了一个当前比较新的版本试了一下 当时运行了很长时间 hdfs里面查询有文件 但是hive里面查询这个表为空 后面用了很多种方式 一些是说自己去刷新hive表 如下 第一种方式刷新 alter table t kafka
  • STL deque 源码——deque特点、实现框架、源码分段剖析、常用函数总结(上)

    一 deque的一些特点 支持随机访问 即支持 以及at 但是性能没有vector好 可以在内部进行插入和删除操作 但性能不及list deque 两端 都能够快速插入和删除元素 而vector只能在尾端进行 deque的元素存取和迭代器操
  • windows python kafka 初级使用

    今天花了点时间在这个kafka上 因为我们工作中也用到了kafka 我这边对于kafka的理解是能用或者知道基本原理就行 实现在自己的windows环境搭建一次kafka 然后使用python进行数据的生产和消费 如果之后工作中对于kafk
  • kafka配置内外网访问

    listeners 学名叫监听器 其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务 advertised listeners 和 listeners 相比多了个 advertised Advertised 的
  • explain查看sql语句执行计划

    explain sql 执行结果字段描述 id select唯一标识 select type select类型 table 表名称 type 连接类型 possible keys 可能的索引选择 key 实际用到的索引 key len 实际
  • kafka(三)重平衡

    历史文章 kafka 一 kafka的基础与常用配置 文章目录 一 kafka消费者组 二 重平衡 Rebalance 2 1 重平衡触发条件 2 2 重平衡策略 2 2 1 Range 平均分配 2 2 2 RoundRobin 轮询分配
  • 【Docker安装部署Kafka+Zookeeper详细教程】

    Docker安装部署Kafka Zookeeper Docker拉取镜像 Docker拉取zookeeper的镜像 docker pull zookeeper Docker拉取kafka的镜像 docker pull wurstmeiste
  • 浏览器请求队列机制-请求为什么会阻塞

    前言 最近遇到一个问题 我1个站点链接2个后端服务 但1个后端服务有问题 导致访问超时 但请求接口都是分开的 自认为一个服务站点请求超时 不会影响到另外一个请求的 但不是 全部请求都发不出去 为什么呢 是不是浏览器有请求机制管理 正常情况前
  • Kafka 监控系统Eagle 使用教程 V1.4.0

    1 下载安装zookeeper 2 下载安装kafka 3 下载安装kafka eagle http download kafka eagle org tar zvxf kafka eagle bin 1 4 0 tar gz 4 配置JA
  • kafka的新API 得到最新一条数据

    业务的需要 需要得到最新的一条消息从kafka中 但是发现ConsumerRecords 这个对象并没有 get index 这种方式的获取并且只能 iterator 或者增强for 循环这种方式来循环 记录 但是有一个count 可以得到
  • 阿里技术官亲笔力作:Kafka限量笔记,一本书助你掌握Kafka的精髓

    前言 分布式 堪称程序员江湖中的一把利器 无论面试还是职场 皆是不可或缺的技能 而Kafka 这款分布式发布订阅消息队列的璀璨明珠 其魅力之强大 无与伦比 对于Kafka的奥秘 我们仍需继续探索 要论对Kafka的熟悉程度 恐怕阿里的大佬们

随机推荐

  • S7协议抓包分析(附pcap数据包)

    一 S7协议概述 1 S7协议简介 S7comm S7 通信 是西门子专有协议 可在西门子 S7 300 400 系列的可编程逻辑控制器 PLC 之间运行 它用于 PLC 编程 PLC 之间的数据交换 从 SCADA 监控和数据采集 系统访
  • 算法:单圈绝对值编码器处理成多圈的

    硬件描述 单圈编码器的分辨率是4096 功能描述 将单圈数据处理处理成多圈数据 起始圈数是1000圈 long GetCurrentAbsTotalValue long lValue long m absEncTotal 计算编码器总时间片
  • STM32与USB3300共同实现USB OTG HS的CDC串口通信速度测试

    项目场景 STM32和上位机传统通信方式就是串口 IIC SPI等 IIC和SPI一般不常用 串口是用的最多的通信方式 然而串口一般用于输出调试信息这种对传输速度没要求的场景 那种大容量数据快速传输的场景 串口显得捉襟见肘 STM32自带U
  • Siebel是什么意思

    简介 Siebel是电子商务软件的突出供应商 其客户关系管理 CRM 企业资源管理 ERM 以及合作关系管理 PRM 应用设计用于实现企业这些方面的自动化以及允许企业在互联网和零售或电话中心网络等其它渠道来执行和调节相关任务 Sieble的
  • Unity中的Animator动画详解

    Unity中的Animator动画详解 Animator动画导入 Animator动画详解 动画类型选择 Rig面板属性 Mode面板属性 Animation面板属性 动画片段 控制使用 Animator动画 创建动画控制器 添加需要播放的
  • 疑似APT组织响尾蛇的JavaScript脚本调试分析

    APT组织响尾蛇JavaScript脚本调试分析 样本描述 样本分析 投递手法 HTA JS代码 JavaScript调试方式 IE 打印参数 代码逻辑 样本描述 响尾蛇投递与巴基斯坦外交政策有关的LNK文件 LNK文件不携带主要的恶意代码
  • 认识一下以太坊、EOS和Hyperledger等不同的区块链

    不同的区块链智能合约和区块链技术现在风靡一时 越来越多的人出于某种原因试图进入这个神奇的世界 如果你是这项技术的新手并正在寻找基于区块链的开发平台的快速入门 那么本指南非常适合你 我们将重点关注和比较的平台是 Ethereum EOS Hy
  • eclipse上配置JavaFX完整教程

    1 选择菜单栏Help中的Install New Software 2 点击Add添加安装JavaFx环境 name e fx clipse Location http download eclipse org efxclipse upda
  • QT4、5、6各版本之间的特点和选择

    文章目录 0 引入 1 稳定版本 2 各版本冷知识 持续补充 3 5 0系列主要版本特性 4 建议 0 引入 QT用的最多的是QT5系列 qt6系列目前虽然是一个大版本 增加了更多的系列 但是不稳定且要求win10以及以上版本 1 稳定版本
  • 图像处理系列——直方图之灰度直方图(Image Histogram)

    目录 0 前言 1 理论知识 2 数学原理 3 案例分析 4 代码实现
  • 在CSS之中实现

  • C语言基础练习题

    文章目录 一 初始C语言 题目1 7 前言 最近做的一些C语言的基础题目 可能有错 最近找忙着找工作 内容 一 依次输入10个数 求出之中的最大数 int main int a i t i 2 scanf d a 先输入第一个数的值 t a
  • 论文格式检测网站经验总结第一弹

    这是我的第一个实战项目 前前后后开发了大致有两个多月的时间 在开发过程中我担任的是整个项目的后端部分 只有我一个人 采用的编程语言是Python 后端框架为Django 我总结了主要的知识点如下 Django的基本使用 视图函数 路由 se
  • SVG基础教程(超级详细)

    一 内置图形 rect 矩形 circle 圆 ellipse 椭圆 line 直线 polyline 折线 polygon 多边形 path 路径 二 内置图形的html属性或 css样式 fill 填充颜色 fill opacity 填
  • Linux网络编程基础

    Linux网络编程基础 1 协议的概念 什么是协议 典型协议 网络程序设计模式 分层模型 TCP IP四层模型 实际开发中常用模型 通信过程 协议的概念 从应用的角度出发 协议可理解为 规则 是数据传输和数据的解释的规则 假设 A B双方欲
  • jenkins升级

    jenkins升级 最关心的问题莫过于其中的job保存住 新版本中启动后可以直接使用 答案是可以的 以centos为例 不管是war部署到tomcat下面启动 还是直接通过java jar 方式启动 默认初始化目录都在 root jenki
  • 将训练好的模型应用——onnxruntime、TensorRT安装

    本文可作为ubuntu20 04 NVIDIA 3060配置CUDA cuDNN anaconda pytorch过程 20230226 的后续 onnxruntime安装 先在相应的aconoda环境进行python 然后输入import
  • Java 反射详解和使用

    目录 1 反射的概述 2 Java反射常用API 3 反射的应用 4 反射的优缺点 5 代码实现 总结 1 反射的概述 反射 Reflection 机制是Java语言特性之一 是Java被视为动态 或准动态 语言的一个关键特性 JAVA反射
  • 互联网安全架构

    web安全架构 上 开始之前这们说一下 web网站其实防御也相当重要 不管是服务器防御 后台数据防御 数据库防御都是必须滴 那我们说说常见的几种 后续再给大家分享 api接口安全性设计 黑名单白名单 以及防御DDOS XSS攻击 SQL注入
  • kafka简介和使用

    1 kafka介绍 1 1 主要功能 根据官网的介绍 ApacheKafka 是一个分布式流媒体平台 它主要有3种功能 1 It lets you publish and subscribe to streams of records 发布