Kafka基础—3、Kafka 消费者API

2023-12-19

一、Kafka消费者API

1、消息消费

当我们谈论 Kafka 消费者 API 中的消息消费时,我们指的是消费者如何从 Kafka 主题中拉取消息,并对这些消息进行处理的过程。

消费者是 Kafka 中的消息接收端,它从指定的主题中获取消息并执行相应的处理逻辑。

1.1 消费者组(Consumer Group)

Kafka 中的消费者可以组成一个消费者组。消费者组中的每个消费者负责消费主题的一个或多个分区。消费者组的好处在于可以实现负载均衡和水平扩展,多个消费者可以并行处理消息。

1.2 消费者订阅主题

在消费者开始消费消息之前,它需要订阅一个或多个主题。订阅主题的过程告诉 Kafka 消费者关心哪些消息。

1.3 示例

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"

	"github.com/Shopify/sarama"
)

func main() {
	// 创建配置
	config := sarama.NewConfig()

	// 设置消费者组名称
	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	// 创建消费者
	consumer, err := sarama.NewConsumer([]string{"kafka-broker1:9092", "kafka-broker2:9092"}, config)
	if err != nil {
		log.Fatalf("Error creating consumer: %s", err.Error())
	}
	defer func() {
		if err := consumer.Close(); err != nil {
			log.Fatalf("Error closing consumer: %s", err.Error())
		}
	}()

	// 订阅主题
	topics := []string{"your_topic"}
	consumerGroup, err := sarama.NewConsumerGroupFromClient("group-1", consumer)
	if err != nil {
		log.Fatalf("Error creating consumer group: %s", err.Error())
	}

	// 开启消费者组协程
	go func() {
		for {
			err := consumerGroup.Consume(topics, &ConsumerHandler{})
			if err != nil {
				log.Printf("Error consuming messages: %s", err.Error())
			}
		}
	}()

	// 处理退出信号
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, os.Interrupt)
	<-sigchan
}

// ConsumerHandler 消费者处理器
type ConsumerHandler struct{}

// Setup 实现 sarama.ConsumerGroupHandler 接口的 Setup 方法
func (h *ConsumerHandler) Setup(session sarama.ConsumerGroupSession) error {
	// 可以在这里进行一些初始化操作
	return nil
}

// Cleanup 实现 sarama.ConsumerGroupHandler 接口的 Cleanup 方法
func (h *ConsumerHandler) Cleanup(session sarama.ConsumerGroupSession) error {
	// 可以在这里进行一些清理操作
	return nil
}

// ConsumeClaim 实现 sarama.ConsumerGroupHandler 接口的 ConsumeClaim 方法
func (h *ConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		// 在这里处理消费的消息
		fmt.Printf("Received message: Topic=%s, Partition=%d, Offset=%d, Key=%s, Value=%s\n",
			message.Topic, message.Partition, message.Offset, string(message.Key), string(message.Value))

		// 标记消息为已处理
		session.MarkMessage(message, "")
	}

	return nil
}

1.4 解释代码

在这个示例中,我们首先创建了一个消费者配置 sarama.Config 。其中,我们设置了消费者组的重新平衡策略为 BalanceStrategyRange ,表示当消费者组中的消费者发生变化时,重新分配分区的策略是按照分区范围进行平衡。同时,设置了初始偏移量为最早的消息。

然后,我们创建了消费者 sarama.NewConsumer 和消费者组 sarama.NewConsumerGroupFromClient ,并订阅了一个或多个主题。

消费者组使用 Consume 方法从主题中拉取消息,然后调用 ConsumerHandler ConsumeClaim 方法处理消息。在这个方法中,你可以自定义消息的处理逻辑。在示例中,我们简单地打印了消息的一些信息。

最后,我们通过捕捉中断信号来优雅地关闭消费者。在 ConsumerHandler 中的 Setup Cleanup 方法中,你可以进行一些消费者组的初始化和清理操作。

2、消费者偏移量(Offset)

Kafka 消费者 API 中的消费者偏移量(Offset)是指消费者在一个特定分区中的下一个要读取的消息的位置。

消费者需要跟踪每个分区的偏移量,以确保它可以准确地从上次停止的位置继续消费消息,而不会重复消费或错过消息。

2.1 消费者偏移量的重要性

消费者偏移量是非常重要的,因为它决定了消费者应该从分区的哪个位置开始读取消息。如果消费者偏移量设置不正确,可能会导致消息的重复消费或者错过消息。

2.2 示例

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"

	"github.com/Shopify/sarama"
)

func main() {
	// 创建配置
	config := sarama.NewConfig()

	// 创建消费者
	consumer, err := sarama.NewConsumer([]string{"kafka-broker1:9092", "kafka-broker2:9092"}, config)
	if err != nil {
		log.Fatalf("Error creating consumer: %s", err.Error())
	}
	defer func() {
		if err := consumer.Close(); err != nil {
			log.Fatalf("Error closing consumer: %s", err.Error())
		}
	}()

	// 订阅主题
	topic := "your_topic"
	partition := int32(0) // 假设要消费的分区号
	offset := int64(0)     // 从分区的起始位置开始消费

	// 获取分区的最新偏移量
	newestOffset, err := consumer.GetOffset(topic, partition, sarama.OffsetNewest)
	if err != nil {
		log.Fatalf("Error getting newest offset: %s", err.Error())
	}

	// 如果要从最新位置开始消费,可以将 offset 设置为 newestOffset
	// 如果要从特定的偏移量开始消费,可以将 offset 设置为相应的值

	// 创建分区消费者
	partitionConsumer, err := consumer.ConsumePartition(topic, partition, offset)
	if err != nil {
		log.Fatalf("Error creating partition consumer: %s", err.Error())
	}
	defer func() {
		if err := partitionConsumer.Close(); err != nil {
			log.Fatalf("Error closing partition consumer: %s", err.Error())
		}
	}()

	// 消费消息
	for message := range partitionConsumer.Messages() {
		// 在这里处理消费的消息
		fmt.Printf("Received message: Topic=%s, Partition=%d, Offset=%d, Key=%s, Value=%s\n",
			message.Topic, message.Partition, message.Offset, string(message.Key), string(message.Value))
	}

	// 处理退出信号
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, os.Interrupt)
	<-sigchan
}

2.3 解释代码

在这个示例中,我们首先创建了一个消费者 sarama.NewConsumer ,然后订阅了一个特定的主题。接着,我们获取了指定分区的最新偏移量 newestOffset ,这个偏移量表示了分区中最新消息的位置。

然后,我们创建了分区消费者 consumer.ConsumePartition ,并指定了要从哪个偏移量开始消费消息。在示例中,我们将偏移量设置为 0,表示从分区的起始位置开始消费。你也可以将偏移量设置为 newestOffset ,表示从最新位置开始消费。

最后,我们通过消费者的 Messages 方法来获取分区中的消息,并处理这些消息。在实际应用中,你需要根据业务需求来处理消费的消息,比如进行数据处理、存储或者其他操作。

3、消费者配置

Kafka 消费者 API 中的消费者配置是指消费者在创建时可以设置的一系列参数,这些参数可以影响消费者的行为,包括消费者组的配置、偏移量的管理、消息处理等。

3.1 消费者配置参数

1. 消费者组配置
  • GroupID :指定消费者所属的消费者组的唯一标识符。
  • Rebalance.Strategy :指定消费者组的重新平衡策略,用于在消费者加入或离开时重新分配分区。
2. 偏移量管理
  • Offsets.Initial :指定消费者在初次订阅主题时,如果没有初始偏移量的情况下应该从哪里开始消费消息,可以选择最早的消息或者最新的消息。
  • Offsets.AutoCommit.Enable :指定是否开启自动提交偏移量的功能。
  • Offsets.AutoCommit.Interval :指定自动提交偏移量的时间间隔。
3. 消息处理
  • ChannelBufferSize :指定消费者内部通道的缓冲区大小,影响消费者处理消息的并发能力。
  • MaxProcessingTime :指定消费者处理消息的最大时间,超过这个时间将被认为是处理超时。

3.2 示例

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// 创建配置
	config := sarama.NewConfig()

	// 设置消费者组名称
	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	// 开启自动提交偏移量
	config.Consumer.Offsets.AutoCommit.Enable = true
	config.Consumer.Offsets.AutoCommit.Interval = 1

	// 设置消费者内部通道的缓冲区大小
	config.ChannelBufferSize = 256

	// 创建消费者
	consumer, err := sarama.NewConsumer([]string{"kafka-broker1:9092", "kafka-broker2:9092"}, config)
	if err != nil {
		log.Fatalf("Error creating consumer: %s", err.Error())
	}
	defer func() {
		if err := consumer.Close(); err != nil {
			log.Fatalf("Error closing consumer: %s", err.Error())
		}
	}

	// 自己的其他处理代码
}

3.3 解释代码

在这个示例中,我们创建了一个消费者配置 sarama.NewConfig() ,然后设置了一系列消费者参数。首先,我们设置了消费者组的重新平衡策略为 BalanceStrategyRange ,表示在消费者加入或离开时重新分配分区的策略是按照分区范围进行平衡。

接着,我们设置了初始偏移量为最早的消息,开启了自动提交偏移量的功能,并设置了自动提交的时间间隔为 1 秒。我们还设置了消费者内部通道的缓冲区大小为 256,以影响消费者处理消息的并发能力。

4、分区分配策略

Kafka 消费者 API 中的分区分配策略是指在消费者组中的消费者加入或离开时,Kafka 如何重新分配分区给消费者的策略。

这个策略决定了每个消费者将负责消费哪些分区,以实现负载均衡和高可用性。

4.1 分区分配策略

Kafka 提供了几种分区分配策略,其中一些常见的策略包括:

  1. Range 分配策略 :这种策略会尽量让每个消费者负责一系列连续的分区,以实现负载均衡。

  2. RoundRobin 分配策略 :这种策略会按照轮询的方式将分区分配给每个消费者,以实现均匀分配。

  3. Sticky 分配策略 :这种策略会尽量让每个消费者在重新分配分区时保持之前分配的分区,以最大程度地减少重新分配的影响。

4.2 示例

package main

import (
	"log"
	"os"
	"os/signal"

	"github.com/Shopify/sarama"
)

func main() {
	// 创建配置
	config := sarama.NewConfig()

	// 设置消费者组名称
	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange

	// 创建消费者
	consumer, err := sarama.NewConsumerGroup([]string{"kafka-broker1:9092", "kafka-broker2:9092"}, "group-1", config)
	if err != nil {
		log.Fatalf("Error creating consumer group: %s", err.Error())
	}
	defer func() {
		if err := consumer.Close(); err != nil {
			log.Fatalf("Error closing consumer group: %s", err.Error())
		}
	}()

	// 订阅主题
	topics := []string{"your_topic"}

	// 开启消费者组协程
	go func() {
		for {
			err := consumer.Consume(topics, &ConsumerHandler{})
			if err != nil {
				log.Printf("Error consuming messages: %s", err.Error())
			}
		}
	}()

	// 处理退出信号
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, os.Interrupt)
	<-sigchan
}

// ConsumerHandler 消费者处理器
type ConsumerHandler struct{}

// 实现 sarama.ConsumerGroupHandler 接口的方法...

4.3 解释代码

在这个示例中,我们创建了一个消费者配置 sarama.NewConfig() ,然后设置了消费者组的重新平衡策略为 BalanceStrategyRange ,表示在消费者加入或离开时重新分配分区的策略是按照分区范围进行平衡。

然后,我们创建了消费者组 sarama.NewConsumerGroup ,并订阅了一个或多个主题。在示例中,我们创建了一个消费者组协程,用于处理消费者组的消息消费。

5、反序列化(Deserializer)

Kafka 消费者 API 中的反序列化(Deserializer)是指将从 Kafka 消费的消息的字节流转换为应用程序能够理解的数据格式的过程。

在消费者端,你需要将从 Kafka 中获取的原始消息数据转换为你的应用程序所需的对象或数据类型。

5.1 反序列化的重要性

Kafka 中的消息通常是以字节流的形式存在的,而在应用程序中,我们通常希望能够使用更高级别的数据结构来表示这些消息,比如结构体、JSON 对象等。因此,反序列化是将原始字节流转换为这些数据结构的过程,使得应用程序能够更方便地处理和理解消息内容。

5.2 示例

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/signal"

	"github.com/Shopify/sarama"
)

// Message 结构体用于存储反序列化后的消息数据
type Message struct {
	ID   int    `json:"id"`
	Data string `json:"data"`
}

func main() {
	// 创建配置
	config := sarama.NewConfig()

	// 创建消费者
	consumer, err := sarama.NewConsumer([]string{"kafka-broker1:9092", "kafka-broker2:9092"}, config)
	if err != nil {
		log.Fatalf("Error creating consumer: %s", err.Error())
	}
	defer func() {
		if err := consumer.Close(); err != nil {
			log.Fatalf("Error closing consumer: %s", err.Error())
		}
	}()

	// 订阅主题
	topic := "your_topic"
	partition := int32(0)
	offset := int64(0)

	// 创建分区消费者
	partitionConsumer, err := consumer.ConsumePartition(topic, partition, offset)
	if err != nil {
		log.Fatalf("Error creating partition consumer: %s", err.Error())
	}
	defer func() {
		if err := partitionConsumer.Close(); err != nil {
			log.Fatalf("Error closing partition consumer: %s", err.Error())
		}
	}()

	// 消费消息
	for message := range partitionConsumer.Messages() {
		// 反序列化消息
		deserializedMessage, err := deserializeMessage(message.Value)
		if err != nil {
			log.Printf("Error deserializing message: %s", err.Error())
			continue
		}

		// 在这里处理反序列化后的消息
		fmt.Printf("Received message: ID=%d, Data=%s\n", deserializedMessage.ID, deserializedMessage.Data)
	}

	// 处理退出信号
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, os.Interrupt)
	<-sigchan
}

// deserializeMessage 函数用于反序列化消息
func deserializeMessage(data []byte) (*Message, error) {
	var message Message
	err := json.Unmarshal(data, &message)
	if err != nil {
		return nil, err
	}
	return &message, nil
}

5.3 解释代码

在这个示例中,我们定义了一个简单的 Message 结构体,用于存储反序列化后的消息数据。然后,我们创建了一个消费者,订阅了一个主题,并创建了分区消费者。

在消费消息的循环中,我们调用了 deserializeMessage 函数,该函数使用 Go 的 encoding/json 包将消息的字节流反序列化为 Message 结构体。这个函数返回了反序列化后的消息对象,我们可以在处理消息的地方直接使用这个对象了。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka基础—3、Kafka 消费者API 的相关文章

随机推荐

  • AttributeError: ‘module‘ object has no attribute ‘RAW_OPT‘解决方案

    大家好 我是爱编程的喵喵 双985硕士毕业 现担任全栈工程师一职 热衷于将数据思维应用到工作与生活中 从事机器学习以及相关的前后端开发工作 曾在阿里云 科大讯飞 CCF等比赛获得多次Top名次 现为CSDN博客专家 人工智能领域优质创作者
  • P2P应用

    目录 一 P2P的简介 二 P2P的工作方式 1 具有集中目录服务器的P2P工作方式 2 具有全分布式结构的P2P文件共享程序 一 P2P的简介 P2P 对等连接 是指两台主机在通信时 并不区分哪一个是服务请求方和哪一个是服务提供方 只要两
  • 11.1 Linux 设备树

    一 什么是设备树 设备树 Device Tree 描述设备树的文件叫做 DTS DeviceTree Source 这个 DTS 文件采用树形结构描述板级设备 也就是开发板上的设备信息 树的主干就是系统总线 IIC 控制器 GPIO 控制器
  • matlab实现逻辑算法优化最小二乘支持向量机ILA-lssvm实现数据回归预测

    作者简介 热爱科研的Matlab仿真开发者 修心和技术同步精进 代码获取 论文复现及科研仿真合作可私信 个人主页 Matlab科研工作室 个人信条 格物致知 更多Matlab完整代码及仿真定制内容点击 智能优化算法 神经网络预测 雷达通信
  • 其他配置相关安装

    consul安装和配置 docker run d p 8500 8500 p 8300 8300 p 8301 8301 p 8302 8302 p 8600 8600 udp consul consul agent dev client
  • Programming Abstractions in C阅读笔记:p235-p241

    Programming Abstractions in C 学习第66天 p235 p241总结 一 技术总结 1 backtracking algorithm 回溯算法 1 定义 p236 For many real world prob
  • I.MX RT1170双核学习(4):FreeRTOS之消息缓冲区(Message Buffer)双核通信详解

    对于RT1170来说 它有两个内核 那两个内核如何通信呢 我们可以通过 MU消息单元详解 来实现这些功能 但它一次只能传输32位的数据 我们知道CM7和CM4有一些公共的内存可以访问 那我们可不可以借助这些公共的内存来实现数据的交互呢 答案
  • Guitar Pro8.1最新2024中文免激活版下载(附教程)

    Guitar Pro 8 是一款功能强大的指法阅读器和编辑器 它允许您编辑吉他 贝斯和尤克里里的乐谱和指法谱 并为鼓或钢琴创建背景音轨 轻松创建 播放和共享您的标签 快速的进行乐谱播放并进行练习 也可以进行编辑操作 允许所有音乐家阅读 编写
  • PHP使用symfony/process来实现多进程请求url或执行多个php文件

    1 什么是symfony process Symfony Process是Symfony框架中的一个组件 用于处理和管理子进程 它提供了一个简单易用的API 可以执行外部命令 并与子进程进行交互 Symfony Process可以执行各种操
  • Docker与微服务:构建和部署微服务架构的完整指南

    微服务架构已经成为现代应用开发的主要范式之一 而Docker容器技术则为微服务的构建 部署和管理提供了理想的解决方案 本文将深入探讨如何使用Docker构建和部署微服务架构 提供更多示例代码和细致的指南 以帮助大家更全面地理解和运用这些关键
  • [杂谈] 乙方甲方交互的另一个例子

    之前有讨论 电信公司与设备供应商之间的一个甲乙关系 杂谈 甲方乙方的一个交互例子 https mzhan017 blog csdn net article details 135004416 其实作为供应商自己来说 其内部也有自己的乙方 比
  • 基于springboot+vue的露营地管理系统

    博主介绍 全网个人号和企业号 粉丝40W 每年辅导几千名大学生较好的完成毕业设计 专注计算机软件领域的项目研发 不断的进行新技术的项目实战 热门专栏 推荐订阅 订阅收藏起来 防止下次找不到 千套JAVA项目实战持续更新中 百套小程序APP项
  • 鉴赏 tcp vegas

    优秀的 vegas 之后 再鉴赏一下迄今唯一像那么回事的拥塞控制算法 vegas 从下图可看出所有的 对 所有的 aimd 都毫无伸缩性 z 吞吐 x rtt y 丢包率 由 buffer size 直接决定 一下就可看出 rtt 和 bu
  • 华为OD机试 Java 【最大载货量】

    描述 在火车站旁的货运站 小明负责调度2K辆中转车 其中K辆用于干货 K辆用于湿货 每批到站的货物来自不同的供货商 需要按照顺序装入中转车 注意 一个供货商的货物只能装在一辆车上 不能分开 但是 一辆车可以放多个供货商的货物 问题是 要让所
  • 【lssvm回归预测】基于逻辑算法优化最小二乘支持向量机ILA-lssvm实现PM2.5浓度预测附matlab代码

    作者简介 热爱科研的Matlab仿真开发者 修心和技术同步精进 代码获取 论文复现及科研仿真合作可私信 个人主页 Matlab科研工作室 个人信条 格物致知 更多Matlab完整代码及仿真定制内容点击 智能优化算法 神经网络预测 雷达通信
  • 《系统架构设计师教程(第2版)》第2章-计算机系统基础知识-07-系统性能

    文章目录 1 性能指标 1 1 计算机的性能指标 1 2 路由器的性能指标 了解即可 1 3 交换机的性能指标 了解即可 1 4 网络的性能指标 1 5 操作系统的性能指标 1 6 数据库管理系统的性能指标
  • AttributeError: module ‘tarfile‘ has no attribute ‘LinkOutsideDestinationError‘解决方案

    大家好 我是爱编程的喵喵 双985硕士毕业 现担任全栈工程师一职 热衷于将数据思维应用到工作与生活中 从事机器学习以及相关的前后端开发工作 曾在阿里云 科大讯飞 CCF等比赛获得多次Top名次 现为CSDN博客专家 人工智能领域优质创作者
  • WPF用ScottPlot动态绘制图像

    文章目录 单击移动 多线程 scott系列 绘图初步 多个图像 单击移动 在了解ScottPlot的绘图逻辑之后 在WPF中生成动态图像简直轻而易举 只需不断地删除旧图而绘制新图即可 新建一个按钮 绑定下面的函数 ScatterPlot d
  • 【华为数据之道学习笔记】5-10标签设计

    标签是根据业务场景的需求 通过对目标对象 含静态 动态特 性 运用抽象 归纳 推理等算法得到的高度精练的特征标识 用于差异化管理与决策 标签由标签和标签值组成 打在目标对象上 标签由互联网领域逐步推广到其他领域 打标签的对象也由用 户 产品
  • Kafka基础—3、Kafka 消费者API

    一 Kafka消费者API 1 消息消费 当我们谈论 Kafka 消费者 API 中的消息消费时 我们指的是消费者如何从 Kafka 主题中拉取消息 并对这些消息进行处理的过程 消费者是 Kafka 中的消息接收端 它从指定的主题中获取消息