Kafka - a simple consumer demo - c++

2023-11-05

Kafka

如果有 kafka 基础的同学可以不用看前面的废话,可以从第五条 [配置] 开始看起~ 代码在第七条

前言:官网比我这标准多了~ 官网跳转,大家可以先完成quickStart部分kafka单机生产消费

一、概念简介

Kafka 是一个分布式流处理平台,常被用作消息队列中间件。具有横向扩展,容错,高速等优点。

Kafka中的概念
  • record: 由 key , value , timestamp 组成,Kafka 集群会保持所有的消息,直到它们过期,无论消息是否被消费。Kafka 的性能是和数据大小是无关的,所以可以长期保存数据。
  • producer: 生产者用于发布消息。
  • consumer: 消费者用于订阅消息。
  • consumer group: 相同的 group.id 的消费者将视为同一个消费者组。
  • topic: 消息的一种逻辑分组,用于对消息分类。相同主题的消息放在一个队列中。
  • partition: 消息的一种物理分组,一个主题被称为多个分区,每个分区就是一个顺序的、不可变的消息队列,并且可以持续添加。每个分区对应一个逻辑 log,有多个 segment 组成。
  • offset: 分区中的每个消息都有一个唯一的 id,称之为偏移量。它代表已经消费的位置。可以手动或自动提交偏移量。
  • broker: 一台 Kafka 服务器称之为一个 broker
  • replica: 副本只是一个分区的备份。副本从不读取或写入数据,它们用于防止数据丢失
  • leader: Learder 是负责给定分区的所有读取和写入的节点。每个分区都有一个服务器充当 Leader, producer 和 consumer 只跟 Leader 交互
  • follower: 跟随 Leader 指令的节点成为 Follower。如果 Leader 失败,一个 Follower 将自动成为 Leader。 Follower 作为正常的消费者,拉取消息并更新其自己的数据存储。副本中的一个角色,从 Leader 中复制数据。
  • zookeeper: Kafka 代理都是无状态的,所以使用 Zookeeper 管理集群状态。Zookeeper 用于管理和协调 Kafka 代理

二、适用场景

  • MQ - 构造实时流数据管道,它可以在系统或应用之间可靠的获取数据。
  • 流处理 - 构建实时流式应用程序,对这些流数据进行转换或者影响。

三、四个核心 API

  1. Producer API : 允许一个应用程序发布一串流式数据到一个或者多个 Kafka topic。
  2. Consumer API : 允许一个应用程序订阅一个或者多个 topic,并且对发布给他们的流式数据进行处理。
  3. Streams API : 允许一个应用程序作为一个流处理器,消费一个或者多个 topic 产生的输入流,然后生产一个输出流到一个或者多个 topic 中去,在输入输出流之间进行有效的转换。
  4. Connector API : 允许构建并运行可重用的生产者或者消费者,将 Kafka topics 连接到已存在的应用程序或者数据系统。eg:连接到一个 DB,捕捉表的所有

四、Topics 和 log

  1. topic 就是数据主题。kafka 采用多订阅者模式,一个 topic 可以拥有一个或者多个消费者订阅它的数据。
  2. 对于每一个 topic,Kafka 集群都会维持一个分区日志。每个分区都是有序且顺序不可变的记录集,并且不断地追加到结构化的 commit log 文件。分区中的每一个记录都会分配一个 id 号来表示顺序,也就是偏移量 offset,offset 用来唯一的标识分区中的每一条记录。
  3. Kafka 集群保留所有发布的记录 - 无论他们是否已被消费 - 并通过一个可配置的参数 – 保留期限 – 来控制。如果保留策略设置为2天,一条记录发布后两天内,可以随时被消费,两天过后这条记录会被抛弃并释放磁盘空间。 Kafka 的性能和数据大小无关,所以长时间存储数据没有什么问题。
  4. 消费者可以采用任何顺序来消费,生产者对 topic 内容进行增加并不会影响已存在的消费者消费数据。
  5. 日志中的 partition 有以下几个用途 – 当日志大小超过了单台服务器的限制,允许日志进行扩展。每个独立的分区都必须首先与主机的文件限制,不过一个主题可能有多个分区,所有可以处理无限量的数据 – 第二,可以作为并行的单元集。

五、server.properties

############################# Server Basics #############################

# 用于服务的 broker id。如果没设置,将生成一个唯一的 broker id。为了避免 zk 生成的 id 和用户配置的 id 相冲突,生成的 id 将在 
# reserved.broker.max.id 的值的基础上加1.
broker.id=0 # kafka 核心配置之一. DEFAULT: broker.id = -1.

############################# Socket Server Settings #############################

# 服务器用于从接受网络请求并发送网络响应的线程数
num.network.threads=3

# 服务器用于请求处理的线程数,可能包括磁盘I/O
num.io.threads=8

# 服务端用来处理 socket 链接的 SO_SNDBUFF 缓冲大小(int)。如果值为-1,则使用系统默认值 102400.
socket.send.buffer.bytes=102400

# 服务端用来处理 socket 链接的 SO_RCVBUFF 缓冲大小(int)。如果值为-1,则使用系统默认值 102400.
socket.receive.buffer.bytes=102400

# socket 请求的最大大小,为了防止OOM,不能大于 Java heap 的大小
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma seperated list of directories under which to store log files
# 一个以逗号分隔的目录列表,用于存储日志文件。
# 如果未设置将使用 log.dir 的配置
log.dirs=/tmp/kafka-logs # kafka 的核心配置之一 (string 类型)。DEFAULT: null
# log.dir = /tmp/kafka-logs 保存日志的目录,对 log.dirs 的补充。 string类型,DEFAULT: /tmp/kafka-logs

# 每个主题的默认日志分区数。更多的分区允许更大的并行,但也会导致跨代理的更多文件!
# DEFAULT: 1 (int 类型)
num.partitions=1

# 每个数据目录,用于启动时日志恢复和关闭时刷新的线程数。
# This value is recommended to be increased for installations with data dirs located in RAID array.
# 官方翻译 - 对于数据目录位于 RAID 阵列的安装,建议增加此值。
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# 对于开发测试以外的任何事,建议使用大于1的任何值来保证可用性,例如3

# offset topic 的副本数 (设置的越大,可用性越高)。内部 topic 将创建失败,直到集群大小满足此副本要求
# DEFAULT: 3 (short 类型)
offsets.topic.replication.factor=1

# 事务 topic 的副本数 (设置的越大,可用性越高)。内部 topic 将创建失败,直到集群大小满足此要求
# DEFAULT: 3 (short 类型)
transaction.state.log.replication.factor=1

# 覆盖事务 topic 的 min.insync.replicas 配置
# 至于 min.insync.replicas 到底是什么,我现在还不理解,而且默认配置文件没有,先暂且不管吧~
# DEFAULT: 2 (int 类型)
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################
# 日志刷新策略 - 默认配置文件没写就按照 kafka 的默认配置来吧~
# 消息会立即刷新到文件系统但默认情况下只有 fsync() 进行同步
# 下面是需要权衡的几点,下面的配置控制将数据刷新到磁盘:
# 1. 持久性 - 如果不使用副本,未刷新的数据可能会丢失
# 2. 延迟性 - 当有大量的数据要刷新时,非常大的刷新间隔可能会导致延迟峰值
# 3. 吞吐量 - flush通常是大开销的操作,一个小的flush间隔可能会导致过多的seek
# 在强制将数据刷新到磁盘之前要接受的消息数
# log.flush.interval.messages=10000
############################# Log Retention Policy #############################

# 以下配置控制 log segment 的处理。该策略可以设置为在一段时间后或者累积给定大小后删除段。
# 满足这些条件的任何一个,就会删除一个 segment。删除总是从日志的末尾开始。

# 日志删除的时间阈值 - 小时 为单位。 DEFAULT: 168 (int 类型)
log.retention.hours=168

# 基于大小的日志保留策略。除非剩余的段低于 log.retention.bytes,否则将从日志中修剪段。功能独立于 log.retention.hours。
#log.retention.bytes=1073741824

# 单个日志文件的最大大小
log.segment.bytes=1073741824

# 日志检查时间间隔,查看是否可以根据保留策略进行保留 - 毫秒 为单位。
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# 每一个都会对应一个zk,是一个逗号分隔的字符串。
# e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# 将可选的 chroot 字符串附加到 url 以制定所有 kafka znode 的根目录。
zookeeper.connect=localhost:2181  # kafka的核心配置之一。

# 与 ZK server 建立连接的超时时间,没有配置就使用 zookeeper.session.timeout.ms
# zookeeper.session.timeout.ms = 6000 (int 类型)
zookeeper.connection.timeout.ms=6000

############################# Group Coordinator Settings #############################

# 以下配置指定 GroupCoordinator 延迟初始消费者重新平衡的时间,以毫秒为单位。
# 随着新成员加入组,重新平衡将进一步延迟 group.initial.rebalance.delay.ms 的值,最多可达 max.poll.interval.ms
# 默认值是3秒
# 这里使用0的原因是,它可以为开发和测试提供更好的开箱即用体验。但是在生产环境中,默认值3秒更为合适!
# 这有助于避免在应用程序启动期间进行不必要的且可能代价高昂的重新平衡。
group.initial.rebalance.delay.ms=0

六、关于 consumer.properties, log4j.properties, producer.properties

这里参考 kafka 官网的配置文档,默认配置都比较简单,需要什么取什么~

七、一个简单的 c++ - consumer 消费者实例

血与泪的教训:别在 mac 本地开发!clang环境需要装12G的Xcode!

# 使用 gcc 也行
yum install clang
# 有版本输出就说明安装ok
clang -v 
# 如果报错:/usr/bin/ld: cannot find -ldstdc++ 就装上
yum install -y libstdc++*

# 安装 kafka c++ 客户端
yum install librdkafka-devel

现在环境就ok了。开始编写代码。

#include <iostream>
#include "librdkafka/rdkafkacpp.h"
#include <string>
#include <cstdio>
#include <list>
using namespace std;

void msg_consume(RdKafka::Message* msg) {
        std::cout << "msg::topic_name: " << msg->topic_name().c_str() << endl;
        if (msg->err() == RdKafka::ERR_NO_ERROR) {
                std::cout << "Read msg at offset " << msg->offset() << std::endl;
                if (msg->key()) {
                        std::cout << "Key: " << *msg->key() << std::endl;
                }
                printf("%.*s\n", static_cast<int>(msg->len()), static_cast<const char*>(msg->payload()));
        } else if (msg->err() == RdKafka::ERR__TIMED_OUT) {
                printf("error[%s]\n", "ERROR__TIMED_OUT");
        } else {
                printf("error[%s]\n", "other");
        }
}

int main() {
        std::string err_string;
        int32_t partition = RdKafka::Topic::PARTITION_UA;
  			// 分区 0
        partition = 0;

  			// 代理地址,也就是 kafka 运行端口
        std::string broker_list = "localhost:9092";
        RdKafka::Conf* global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
        RdKafka::Conf* topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

        int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
  			// 这里配置字段都是约定好的,配置文件我会贴在附录里
        global_conf->set("metadata.broker.list", broker_list, err_string);

  			// 创建消费者
        RdKafka::Consumer* consumer = RdKafka::Consumer::create(global_conf, err_string);
        if (!consumer) {
                printf("failed to create consumer, %s\n", err_string.c_str());
                return -1;
        }
        printf("create consumer %s \n", consumer->name().c_str());

  			// 创建topic
        std::string topic_name = "kafka-test-wayne";
        RdKafka::Topic* topic = RdKafka::Topic::create(consumer, topic_name, topic_conf, err_string);
  			if (!topic) {
                printf("try create topic[%s] failed, %s\n", topic_name.c_str(), err_string.c_str());
                return -1;
        }
        printf("create topic[%s] successd. \n", topic_name.c_str());
				
  			// 在 topic 下 partition 区 start offset 处 开始消费
        RdKafka::ErrorCode resp = consumer->start(topic, partition, start_offset);
        if (resp != RdKafka::ERR_NO_ERROR) {
                printf("Failed to start consumer: %s\n", RdKafka::err2str(resp).c_str());
                return -1;
        }

  			// 一直消费。control + z 结束
        while (true) {
                RdKafka::Message* msg = consumer->consume(topic, partition, 2000);
                printf("topic[%s], partition[%d], start consume\n", topic_name.c_str(), partition);
                msg_consume(msg);
                delete msg;
        }

        consumer->stop(topic, partition);
        consumer->poll(1000);

        delete topic;
        delete consumer;

        return 0;
}

按照 quickStart 中的方法,先开启 zk 和 kafka。
服务端要配置服务器监听端口,在 server.properties 中修改

# 加上这个配置 这样就能监听到9092端口
listeners=PLAINTEXT://:9092

然后开启 kafka 的生产者控制台
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-test-wayne 生产几条数据
在这里插入图片描述
编译c++代码,开始消费
clang++ consumer.cpp -std=c++11 -lrdkafka++ 默认会生成 a.out 文件,执行这个文件 ./a.out
在这里插入图片描述

可以看到已经成功消费~。

八、附录

  1. c++ kafka 客户端:https://github.com/edenhill/librdkafka
  2. kafka 官网:https://kafka.apachecn.org/
  3. librdkafka配置文件:https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
  4. 后续代码我会放在 https://github.com/shishc9/kafka-demo
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka - a simple consumer demo - c++ 的相关文章

随机推荐

  • CCF/CSP 201409-3 字符串匹配(满分题解Java版)

    此题虽然放在了第三题 但是如果对Java的API了解的比较好的同学 解这道题一点都不难 比前几题都要简单一些 题目描述 官方题目地址 读题请点击 Java满分题解 import java util Scanner next 与 nextLi
  • Java 图形用户界面 复习题

    题目 编写一个包含主方法main的公共类 访问权限为public的类 该类继承自窗体类JFrame 并且 该类实现了接口ActionListener 实现接口ActionListener的方法actionPerformed 需要实现的界面
  • 工程师如何提高写作修养

    昨天非常有幸参加了电子工业出版社博文视点专业出版高峰论坛 在 写作精进 分论坛上 我受邀做了主题为 工程师如何提高写作修养 的分享 昨天现场的同学的不多 而这个主题估计很多都会有兴趣 发布在这里 供大家参考 在此 再次感谢电子工业出版社博文
  • 字符串前面补零的简单写法

    include
  • 代理模式--静态代理

    明确AOP之前首先要对代理模式进行深刻的学习 代理模式分为静态代理 和动态代理 动态代理又包括JDK代理和Cglib 本文主要学习静态代理 代理模式 从生活出发 我是一个要租房子的人 我要租房子 要找房屋中介 房源多 我不会去找房东 因为很
  • 2022亚太E题——How Many Nuclear Bombs can Destroy the Earth?(思路)

    欢迎来到本博客 博主优势 博客内容尽量做到思维缜密 逻辑清晰 为了方便读者 座右铭 行百里者 半于九十 本文目录如下 目录 1 概述 2 2022亚太E题 How Many Nuclear Bombs can Destroy the Ear
  • 使用Flutter开发俄罗斯方块小游戏

    一 本篇文章主要是来讲解下俄罗斯方块游戏的开发思路 当然可能不是最好的思路 博客文章顶部有代码 仅供参考 二 效果图 视频效果图地址 三 UI页面思路拆解 游戏的主界面两部分组成 上面为15 10的格子用来放置方块 下面为操作按钮和显示当前
  • MP2481DH背光IC过压保护设置问题

    现象 夏普的屏可以点亮 但是君创的屏点不亮 分析 过压保护了 具体还没分析 以后再看数据手册 解决 如下电阻改为300K 可同时兼容两款屏
  • 基于FPGA的频率计

    1 简介 频率计又称为频率计数器 是一种专门对被测信号频率进行测量的电子测量仪器 2 传统测量法 传统测量法有两种 周期测量法 和 频率测量法 2 1 周期测量法 原理 先测出被测信号的周期 T T T 然后根据频率 f
  • 宏包algorithm与algorithmic引发的Undefined control sequence问题

    背景 自己是在texlive vs code环境下写小论文 在写算法的时候 一直出现输入控制语句全部都是没有定义的 如下 Undefined control sequence REQUIRE Undefined control sequen
  • Typora--图片上传方案Typora+PicGo+Gitee

    目录 一 简介 二 步骤 1 Gitee的部署 2 PicGo的设置 3 Typora的设置 三 其他 一 简介 当使用Typora的MarkDown编辑软件来做学习记录 上传图片时 都要创建一个文件夹来存放图片 这样子一来很不方便 有没有
  • 使用JDBC数据迁移把Mysql数据到另一个库中

    数据迁移 简介 整体思路链接2个需要迁移的数据库 根据sql 进行查询 判断什么样的数据需要迁移 什么样的数据需要过滤掉 数据重复或者出错的情况输出到某个文件中 如果数据可以整体迁移而且不出格式差异的情况也可以直接导出sql文件进行迁移 此
  • Binder 连接池的学习

    利用AIDL方式能很方便地进行客户端和服务端的跨进程通信 但是 我们想一下 如果按照我们之前的使用方法 必须满足一个AIDL接口对应一个service 那么问题来了 假如我们的应用 有很多业务场景 而每一个业务场景都需要和服务端通讯 那么我
  • MyBatis-Plus深入 —— 条件构造器与插件管理

    前言 在前面的文章中 荔枝梳理了一个MyBatis Plus的基本使用 配置和通用Service接口 我们发现在MyBatis Plus的辅助增强下我们不再需要通过配置xml文件中的sql语句来实现基本的sql操作了 不愧是最佳搭档 在这篇
  • keil5编译出现Error: L6411E:的解决办法

    出现这个问题很大的可能是keil5与ads产生冲突 此时只需要删除ads的环境变量即可 如下图 将其值含有ads的系统变量和用户变量全部删除 然后新建一个用户变量 变量名为ARMCC5LIB 其值要看你keil的安装路径 我的值是Y sof
  • android查看app是platform_app,system_app还是priv_app

    untrusted app 第三方app 没有Android平台签名 没有system权限 platform app 有android平台签名 没有system权限 system app 有android平台签名和system权限 从上面划
  • Windows10下载安装openjdk11及配置环境变量

    Windows10下载安装openjdk11及配置环境变量 下载JDK 首先我们需要下载java开发工具包JDK 下载地址 https cn azul com downloads zulu community version java 11
  • UE4外包团队:更新一下UE4和Unity3D案例

    全部的贴图都是用出的法线贴图构建的话只用了阳光和天光 都是静态光源 视角是第一人称模板最后的效果嘛就是全4K 120帧 0错误0警告 场景小是小了点但是效果还不错 工作活有时间更新 欢迎有UE4和Unity项目外包的联系我们 谢谢 转载于
  • CNN卷积核

    接着呢 我们需要处理我们的xs 把xs的形状变成 1 28 28 1 1代表先不考虑输入的图片例子多少这个维度 后面的1是channel的数量 因为我们输入的图片是黑白的 因此channel是1 例如如果是RGB图像 那么channel就是
  • Kafka - a simple consumer demo - c++

    Kafka 如果有 kafka 基础的同学可以不用看前面的废话 可以从第五条 配置 开始看起 代码在第七条 前言 官网比我这标准多了 官网跳转 大家可以先完成quickStart部分kafka单机生产消费 一 概念简介 Kafka 是一个分