kafka基本知识

2023-11-03

kafka

消息队列是什么,解决什么样的问题,有什么常见的应用场景

MQ(message queue)消息队列是本质上是队列(先进先出的数据结构),生产者将消息放到队列上,消费者通过,消息的消费者通过拉取或者订阅推送的机制来获取消息。

解决的问题:

mq本质上是一种通信机制,它屏蔽到了底层繁琐的http或者tcp通信的细节,提供一套消息队列机制。

解耦:当一个业务需要多个模块的配合,一条消息需要多个系统的处理时,可以使用消息队列机制降低整个系统的耦合度。

异步:主任务执行结束之后,从业务通过异步的方式进行处理,降低系统的响应时间,提高用户体验。

削峰:在多线程的系统当中业务采用异步的方式处理,提高整个系统的性能,避免系统瘫痪。

消息队列的分类

当前消息的队列的分类,当前消息队列的分类大致上分为有broker的消息队列和无broker的消息队列(代表为zero MQ),有broker的队列又分为重Topic(kafka,Rocket MQ,Active MQ)的队列和轻Topic的队列(Rabbit MQ)。

kafka的基本介绍(给一个定义)

kafka是一个基于zookeeper协调的,支持分区的多副本的分布式系统

kafka的基本概念(架构)

kafka是一个基于生产者和消费者的消息队列,其中的消息队列由broker(中间件处理节点)存储和管理,生产者负责将消息发送到broker上,消费者从broker上消费消息。

broker中的各种概念

Topic

topic是消息的主题,kafka使用topic将消息分类

默认主题:

kafka中存在_consumer_offset这个topic,它用于记录所有的consumerGroup的消费偏移量,将ConsumerGroupID+Topic+partition作为key将offset作为值存储。由于这个主题可能会引起大并发问题所以kafka默认将其分为50个分区来存储,并且根据hash(consumer——groupid)来计算消息锁使用的分区。

Partition

物理上的概念,⼀个topic可以分为多个partition,每个partition内部消息是有序的 。

使用partition的的好处主要是可以将 分布式存储和并发写。

所有的消息全部存储在data/kafka-logs/topic-partition中

副本

副本顾名思义就是分区的备份。

如果存在多个broker就可以为分区设置多个备份,分区中有这么几个需要知道的概念。replicas代表当前分区的所有节点,isr代表已同步的节点是,leader和flower是相对的概念,kafka会在多个副本中选取一个副本作为leader,所有对这个分区的读写操作都在再leader上完成,并且在读写完成之后将消息同步到flower上,当leader挂掉了就会使用选举机制启动其中一个flower作为新的leader。

producer细节

基本的消息发送流程:

设定消息发送的broker地址序列化和反序列化规则之后,通过这些规则创建一个Producer用于发送消息。在发送消息的时候会在producer端创建一个32k的缓冲区,并且有一个专门的线程去缓冲区中获取消息并且发送,当消息数据满16k或者时间到达10ms时会发送数据。

发送的方式:

消息发送的方式分为同步发送和异步发送两种,同步发送时业务会暂停等待ack的回复,如果没有收到回复就会默认重新发送,如果收到ack需要查看当前ack的配置(配置有三种0代表只要收到ack不管broker的情况直接代表消息发送成功,1代表需要broker中将leader的消息同步到log中,-1则代表不但需要同步到leader中还需要同步到flower中至于同步到几个flower中由min-insync-replicas指定。这三种ack的设置性能上递减,安全性上递增。)。异步发送的方式就是在发送消息的时候不会阻塞等待,而是使用一个回调方法来后续跟踪消息的情况,这种方式容易丢失消息。

发送到的地方:

在发送的时候可以指定分区也可以使用一个key使用hash算法指定分区。

consumer细节

consumergroup的概念:多个消费者属于一个消费者组,一个消费者组中的消费者只能消费一次同一条消息。我们可以查看当前消费者组中的消费者有哪些,每个消费者的current_offset,log_end_offset,lag等

单播消息和多播消息:
单播消息值得是一条消息只能被一个消费者组中的一个消费者消费也就是一个消息只被消费一次,多播消息指的是可以创建多个消费者组这样多个消费者组中就能够有多个消费者能够消费同一条消息。

消费者和分区的关系:
消费者和分区的关系是一个一对多的关系一个消费者可以消费多个分区但是一个分区只能由一个消费者消费,所以建议消费者的数量应该小于分区的数量否则多余的消费者就只能当做备选。

如何消费:
基本的消费流程

最基本的消费流程就是指定好需要消费的broker的地址,消费组信息,序列化反序列化的配置之后创建一个消费者。然后消费者订阅主题之后消费。

指定分区消费

消费过程中可以指定分区消费,回溯消费指定偏移量消费或者指定时间点消费。

poll的流程

poll消息的流程大致是这样的我们可以根据消费者的消费能力指定每次poll的消息数量和一次长轮询时间,当poll的消息数量达到我们设置的值或者时间达到长轮询的时间之后poll1结束。在poll的过程当中消费者如果消费能力太差导致两次poll的时间超出设置的默认值就会发生rebalance,或者维持的心跳包没有按时发送给broker也会发生rebalance。

消费结束后offset提交方式

offset的提交方式分为自动提交和手动提交两种,自动提交指的是每次poll到消息之后直接提交offset,这回出现消息丢失的问题。手动提交又分为手动同步提交和自动异步提交,区别就在于是否阻塞。

如果有新的消费者组加入了怎么办

kafka的一些机制

controller机制:

根据zk中创建broker节点的时间,最快创建的一个broker会被选择作为controller,它的作用就是在leader挂掉的时候通过在isr中选择同步性能好的一个flower作为leader继续工作,还有就是在新增分区或者broker的时候在各个broker之间同步。

rebalance机制:

它讲述的是分区和消费之组中消费者的关系,当消费者没有指定分区消费的时候,或者消费者和分区的关系发生变化时会触发这个机制。它有range(根据公式来计算分配的分区个数),轮询分配以及sticky(在不改变原来分配的基础上重新分配)三种方式

HW和LEO

LEO(log_end_offset):最后消费位置。

HW指的是高水位,kafka需要维持一个高水位,就是说当所有的消息都到达高水位之后才能暴露给消费者消费,否则会出现重复消费的情况。

面试题

如何保证顺序消费

保证一个消费者,一个分区

如何保证消息不重复消费

两种方式,第一种是关闭重试机制,第二种是在消费者端设置一些幂等性的操作,比如说分布式锁,或者创建联合主键。

如何保证消息丢失

消息生产者采用同步发送的方式,1,-1.消息的消费者在消费的时候采用手动提交。

消息积压的问题

优化消费的速度,多线程,多消费者。消息转发。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

kafka基本知识 的相关文章

  • 为什么会出现此异常 FileItemStream$ItemSkippedException?

    在 gwt Web 应用程序中 我必须发送一个文件和附加的一些参数 在服务器端 try ServletFileUpload upload new ServletFileUpload FileItemIterator iterator upl
  • Maven 2:如何将当前项目版本打包在WAR文件中?

    我正在使用 Maven 2 构建我的 Java 项目 并且正在寻找一种向用户呈现 pom xml 当前版本号的方法 例如使用 Servlet 或 JSP 据我所知 最好的方法是 Maven 将版本号作为文本文件打包到 WAR 中 这使我能够
  • 具有默认值的 Java JAX-RS 自定义参数

    假设我有这个 这只是一个示例 GET Path value address Produces application json public Response getAddress QueryParam user User user 用户是
  • Jackson - 反序列化嵌套 JSON

    我有一个 JSON 字符串 其格式如下 response execution status ready report cache hit true created on 2013 07 29 08 42 42 fact cache erro
  • Java、Oracle 中索引处缺少 IN 或 OUT 参数:: 1 错误

    您好 我使用 Netbeans 8 0 2 和 Oracle 11g Express Edition 在 JSF 2 2 中编写了一个图书馆管理系统 我有几个名为 书籍 借阅者 等的页面 以及数据库中一些名为相同名称的表 我的问题是这样的
  • Spring3/Hibernate3/TestNG:有些测试给出 LazyInitializationException,有些则没有

    前言 我在单元测试中遇到了 LazyInitializationException 的问题 而且我很难理解它 正如你从我的问题中看到的那样Spring 中的数据库会话 https stackoverflow com questions 13
  • 将过滤器添加到 Eclipse 中的 Project Explorer

    我想向 Project Explorer 添加一个新的过滤器 以向用户隐藏一些在 Eclipse RCP 应用程序中自动创建的项目 到目前为止我已经找到了两个扩展点 org eclipse ui ide resourceFilters 允许
  • 正则表达式获取字符串中的第一个数字和其他字符

    我是正则表达式的新手 想知道如何才能只获取字符串中的第一个数字 例如100 2011 10 20 14 28 55 在这种情况下 我希望它返回100 但该数字也可以更短或更长 我在想类似的事情 0 9 但它单独获取每个数字 100 2001
  • 是否有任何API可以将Microsoft Exchange服务器与Java应用程序集成以进行任务同步?

    我正在尝试将 Java Web 应用程序与 Microsoft Exchange 服务器集成以实现双向日历 即任务 同步 是否有用于此集成的 Java 开源 商业 API 谢谢 文卡特 看一眼j 交易所 http sourceforge n
  • 如何拦截 REST 端点以接收所有标头?

    我当前的代码是 Path login RequestScoped public class LoginResource GET SecurityChecked public Response getUser HeaderParam AUTH
  • 字符串池可以包含两个具有相同值的字符串吗? [复制]

    这个问题在这里已经有答案了 字符串池可以包含两个具有相同值的字符串吗 String str abc String str1 new String abc Will the second statement with new operator
  • Java Microsoft Excel API [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • Jenkins 的代码覆盖率 [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 发生错误。请参阅日志文件 - eclipse juno

    每当我启动 Eclipse Juno 时 都会出现错误 发生错误 请查看日志文件 C Program Files eclipse configuration 1362989254411 log 有的网站说卸载jdk重新安装 我这样做了 但没
  • 我想在java中使用XQuery进行Xml处理

    我想用XQuery用于从 java 中的 Xml 获取数据 但我没有得到需要为此添加哪个 Jar 我在谷歌上搜索了很多 但没有得到任何有用的例子 例如我得到以下链接 https docs oracle com database 121 AD
  • 了解 Spark 中的 DAG

    问题是我有以下 DAG 我认为当需要洗牌时 火花将工作划分为不同的阶段 考虑阶段 0 和阶段 1 有些操作不需要洗牌 那么为什么 Spark 将它们分成不同的阶段呢 我认为跨分区的实际数据移动应该发生在第 2 阶段 因为这里我们需要cogr
  • 为什么我的代码会产生错误:该语句没有返回结果集[重复]

    这个问题在这里已经有答案了 我正在从 Microsoft SQL Server Studio 执行以下查询 该查询工作正常并显示结果 SELECT INTO temp table FROM md criteria join WHERE us
  • 为什么这个私人浮动字段变为零?

    我有一些奇怪的行为 我很难向自己解释 称为 textureScale 的浮点字段变为零 如果某些代码正在更改该值 则可以解释这一点 然而 我希望能够通过将其设置为 私有最终浮点 来导致构建失败 或者至少是运行时异常 那么无论更改该值都将失败
  • 无法使用 wget 在 CentOS 机器上安装 oracle jdk

    我想在CentOS上安装oracle java jdk 8 我无法安装 java jdk 因为当我尝试使用命令安装 java jdk 时 root ADARSH PROD1 wget no cookies no check certific
  • 摩尔斯电码 至 英语

    我现在的问题是让 摩尔斯电码转英语 正常工作 将英语转换为莫尔斯电码的第一部分工作正常 我知道以前已经有人问过这个问题 但我不知道我做错了什么 我知道我需要在某个地方进行拆分 但我只是不确定将其放在代码中的何处 现在 莫尔斯电码到英语的部分

随机推荐

  • CC2642 数据长度扩展(LE Data Length Extension)

    概要 数据长度扩展 LE Data Length Extension 功能允许LE控制器在连接状态下发送具有高达251字节的PDU 在连接期间的任何时刻 主从设备可以协商该PDU大小 这个和MTU不同 MTU是应用层的数据传输长度 这个是链
  • 安装centos与动态磁盘

    在动态磁盘压缩磁盘后 Linux是检测不到压缩后的磁盘空闲空间 需要用分区助手将动态磁盘变为基本磁盘
  • 【MES】工业4.0之MES系统方案

    人类自从250年前发生在英国的第一次工业革命开始 我们共经历了工业1 0 机械化 工业2 0 电气化自动化 工业3 0 信息化 之后迎来了工业4 0网络化 从工业革命的定义来看 仅仅是生产力的提升 那还只是量变 不能称为工业革命 必须有生产
  • CodeCounter

    package cn zzsxt io2 import java io BufferedReader import java io File import java io FileReader import java io IOExcept
  • 老猿学5G扫盲贴:3GPP中的5G计费架构

    专栏 Python基础教程目录 专栏 使用PyQt开发图形界面Python应用 专栏 PyQt入门学习 老猿Python博文目录 老猿学5G博文目录 一 计费逻辑架构和信息流 在32240子系列文档内定义了计费的逻辑架构和信息流 如图 上图
  • 使用HttpClient为GET/POST请求获取数据 -- 信任证书 SS连接

    使用HttpClient为GET POST请求获取数据 信任证书 SS连接 依赖部分 使用GET请求获取数据 使用POST 请求 Form 表单格式请求获取数据 使用POST 请求 Body 格式请求获取数据 公共部分 执行http请求 信
  • random 与 range

    random方法 random randint 1 5 会生成一个1 5之间的任一个整数 包括1和5 random random 会随机生成0 1的浮点数 random choice list 会从list中随机选出一个值 range方法
  • 使用Edge调试 安卓app应用或插件

    Edge调试 安卓手机 1 1 找到开发者模式 打开 USB 调试 1 2 找条数据线连上电脑 PC 打开 edge 2 1 打开 URL edge inspect devices 2 2 等待手机和 edge 响应 然后出现页面后 点击
  • SQL练习

    学生选课表的50个SQL语句 1 查询001课程比002课程成绩高的所有学生的学号 select a s id from select s id score from sc where c id 001 a select s id scor
  • python自动化办公(三十二)pyinstaller.exe打包成exe程序,运行后ModuleNotFoundError或FileNoFounderError:no such file or **

    目录 一 打包Tkinter 二 下载pyinstaller 三 pycharm终端运行pyinstaller 四 pyinstaller exe直接运行Cmd命令打包
  • Docker中web项目的部署以及访问

    1 将应用打包成demo war 2 编写Dockerfile 构建镜像 Dockerfile FROM 包含tomcat的基础镜像 COPY demo war usr local tomcat webapps COPY apple app
  • 八皇后[n皇后]问题 python 算法的理解

    八皇后 n皇后 问题表述为 在8 8格的国际象棋上摆放8个皇后 使其不能互相攻击 即任意两个皇后都不能处于同一行 同一列或同一斜线上 问有多少种摆法 答案是92种 可以看看遍历过程 方便理解 对于递归queen A cur 1 的理解 cu
  • vue根据路由隐藏侧边栏

    项目要求某模块显示侧边栏 某模块隐藏侧边栏 所有模块统一引用了一个layout组件 所以在路由里设置一个自定义属性 在layout组件里监听路由 判断自定义属性值来隐藏侧边栏 An highlighted block path equipm
  • macos配置vscode支持c++11/17标准

    目录 简介 需求 步骤 步骤1 步骤2 步骤3 步骤4 步骤5 结语 简介 Hello 非常感谢您阅读海轰的文章 倘若文中有错误的地方 欢迎您指出 昵称 海轰 标签 程序猿 C 选手 学生 简介 因C语言结识编程 随后转入计算机专业 获得过
  • Looper和Handler

    Looper用于在android线程中进行消息处理 默认情况下 一个线程并不和任何Looper绑定 当我们调用Looper prepare 时 如果当前线程还没有和任何Looper绑定 那么将创建一个Looper让它和当前线程绑定 当我们调
  • Mysql数据库简单配置

    1 将安装包下载到本地文件路径 按照自己的情况 2 配置ini文件 放在mysql安装目录 没有文件名 解决方法 3 终端切换目录到安装目录下的bin目录下 建议配置环境变量 后面直接通过命令开启服务 直接双击path也可以进入 然后点击新
  • module “**.vue“ has not default

    module vue has not default 这个问题造成的原因是因为你在vue config js中设置了happyPackMode选项 如下所示 config module rule ts use ts loader loade
  • 初识注解

    注解的英文单词 Annotation 3 有一个public修饰的 入口 4 且该public修饰的类名必须与文件名相同 5 并且一个源文件可以只有非public类 package com kuang Annotation 测试元注解 im
  • 用一个函数实现用选择法对5个整数按升序排序

    用一个函数实现用选择法对5个整数按升序排序 选择法思想 先选出5个数中最小的数 把它和score 0 交换 这样a 0 就是5个数中最小的数了 再在剩下4个数 score 1 到score 4 中选出最小的数 把它和score 1 交换 这
  • kafka基本知识

    kafka 消息队列是什么 解决什么样的问题 有什么常见的应用场景 MQ message queue 消息队列是本质上是队列 先进先出的数据结构 生产者将消息放到队列上 消费者通过 消息的消费者通过拉取或者订阅推送的机制来获取消息 解决的问