使用librdkafka的C++接口实现简单的生产者和消费者

2023-11-02

一.编译librdkafka

环境:Fedora 20,32位

依赖:pthreads(必选),zlib(可选),libssl-dev(可选),libsasl2-dev(可选)

先查看自己的linux上是否安装了pthreads,指令如下:

# locate libpthread

因为我之前安过了,所以可以直接编译librdkafka,没有安的下个pthreads的源码——configure、make、make install。

开始编译librdkafka,指令如下:

# ./configure
# make
# make install
lib库会被默认安装到/usr/local/lib目录

头文件被默认安装到/usr/local/include/librdkafka目录


二.生产者

新建Qt控制台工程KafkaProducer,Pro文件如下:

#-------------------------------------------------
#
# Project created by QtCreator 2018-03-27T19:45:09
#
#-------------------------------------------------
QT       -= gui core

TARGET = KafkaProducer
CONFIG   += console
CONFIG   -= app_bundle

TEMPLATE = app

SOURCES += main.cpp
INCLUDEPATH += /usr/local/include/librdkafka
LIBS += -L/usr/local/lib -lrdkafka
LIBS += -L/usr/local/lib -lrdkafka++
main.cpp文件如下:
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>

#include <getopt.h>

#include "rdkafkacpp.h"

static bool run = true;

static void sigterm (int sig) {
    run = false;
}

class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
    void dr_cb (RdKafka::Message &message) {
        std::cout << "Message delivery for (" << message.len() << " bytes): " <<
                     message.errstr() << std::endl;
        if (message.key())
            std::cout << "Key: " << *(message.key()) << ";" << std::endl;
    }
};

class ExampleEventCb : public RdKafka::EventCb {
public:
    void event_cb (RdKafka::Event &event) {
        switch (event.type())
        {
        case RdKafka::Event::EVENT_ERROR:
            std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
                         event.str() << std::endl;
            if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
                run = false;
            break;

        case RdKafka::Event::EVENT_STATS:
            std::cerr << "\"STATS\": " << event.str() << std::endl;
            break;

        case RdKafka::Event::EVENT_LOG:
            fprintf(stderr, "LOG-%i-%s: %s\n",
                    event.severity(), event.fac().c_str(), event.str().c_str());
            break;

        default:
            std::cerr << "EVENT " << event.type() <<
                         " (" << RdKafka::err2str(event.err()) << "): " <<
                         event.str() << std::endl;
            break;
        }
    }
};

int main ()
{
    std::string brokers = "localhost";
    std::string errstr;
    std::string topic_str="test";
    int32_t partition = RdKafka::Topic::PARTITION_UA;

    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    conf->set("bootstrap.servers", brokers, errstr);

    ExampleEventCb ex_event_cb;
    conf->set("event_cb", &ex_event_cb, errstr);

    signal(SIGINT, sigterm);
    signal(SIGTERM, sigterm);

    ExampleDeliveryReportCb ex_dr_cb;
    conf->set("dr_cb", &ex_dr_cb, errstr);

    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
    if (!producer) {
        std::cerr << "Failed to create producer: " << errstr << std::endl;
        exit(1);
    }
    std::cout << "% Created producer " << producer->name() << std::endl;

    RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str,
                                                   tconf, errstr);
    if (!topic) {
        std::cerr << "Failed to create topic: " << errstr << std::endl;
        exit(1);
    }

    for (std::string line; run && std::getline(std::cin, line);) {
        if (line.empty()) {
            producer->poll(0);
            continue;
        }

        RdKafka::ErrorCode resp =
                producer->produce(topic, partition,
                                  RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
                                  const_cast<char *>(line.c_str()), line.size(),
                                  NULL, NULL);
        if (resp != RdKafka::ERR_NO_ERROR)
            std::cerr << "% Produce failed: " <<
                         RdKafka::err2str(resp) << std::endl;
        else
            std::cerr << "% Produced message (" << line.size() << " bytes)" <<
                         std::endl;

        producer->poll(0);
    }

    run = true;
   // 退出前处理完输出队列中的消息
    while (run && producer->outq_len() > 0) {
        std::cerr << "Waiting for " << producer->outq_len() << std::endl;
        producer->poll(1000);
    }

    delete conf;
    delete tconf;
    delete topic;
    delete producer;

    RdKafka::wait_destroyed(5000);

    return 0;
}

三.消费者

新建Qt控制台工程KafkaConsumer,Pro文件如下:

#-------------------------------------------------
#
# Project created by QtCreator 2018-03-28T16:27:54
#
#-------------------------------------------------
QT       -= gui core

TARGET = KafkaConsumer
CONFIG   += console
CONFIG   -= app_bundle

TEMPLATE = app

SOURCES += main.cpp
INCLUDEPATH += /usr/local/include/librdkafka
LIBS += -L/usr/local/lib -lrdkafka
LIBS += -L/usr/local/lib -lrdkafka++
main.cpp文件如下:
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>

#include <sys/time.h>
#include <getopt.h>
#include <unistd.h>

#include "rdkafkacpp.h"

static bool run = true;
static bool exit_eof = true;
static int eof_cnt = 0;
static int partition_cnt = 0;
static int verbosity = 1;
static long msg_cnt = 0;
static int64_t msg_bytes = 0;

static void sigterm (int sig) {
    run = false;
}

class ExampleEventCb : public RdKafka::EventCb {
public:
    void event_cb (RdKafka::Event &event) {
        switch (event.type())
        {
        case RdKafka::Event::EVENT_ERROR:
            std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
                         event.str() << std::endl;
            if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
                run = false;
            break;

        case RdKafka::Event::EVENT_STATS:
            std::cerr << "\"STATS\": " << event.str() << std::endl;
            break;

        case RdKafka::Event::EVENT_LOG:
            fprintf(stderr, "LOG-%i-%s: %s\n",
                    event.severity(), event.fac().c_str(), event.str().c_str());
            break;

        case RdKafka::Event::EVENT_THROTTLE:
            std::cerr << "THROTTLED: " << event.throttle_time() << "ms by " <<
                         event.broker_name() << " id " << (int)event.broker_id() << std::endl;
            break;

        default:
            std::cerr << "EVENT " << event.type() <<
                         " (" << RdKafka::err2str(event.err()) << "): " <<
                         event.str() << std::endl;
            break;
        }
    }
};

void msg_consume(RdKafka::Message* message, void* opaque) {
    switch (message->err()) {
    case RdKafka::ERR__TIMED_OUT:
        //std::cerr << "RdKafka::ERR__TIMED_OUT"<<std::endl;
        break;

    case RdKafka::ERR_NO_ERROR:
        /* Real message */
        msg_cnt++;
        msg_bytes += message->len();
        if (verbosity >= 3)
            std::cerr << "Read msg at offset " << message->offset() << std::endl;
        RdKafka::MessageTimestamp ts;
        ts = message->timestamp();
        if (verbosity >= 2 &&
                ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE) {
            std::string tsname = "?";
            if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME)
                tsname = "create time";
            else if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_LOG_APPEND_TIME)
                tsname = "log append time";
            std::cout << "Timestamp: " << tsname << " " << ts.timestamp << std::endl;
        }
        if (verbosity >= 2 && message->key()) {
            std::cout << "Key: " << *message->key() << std::endl;
        }
        if (verbosity >= 1) {
            printf("%.*s\n",
                   static_cast<int>(message->len()),
                   static_cast<const char *>(message->payload()));
        }
        break;

    case RdKafka::ERR__PARTITION_EOF:
        /* Last message */
        if (exit_eof && ++eof_cnt == partition_cnt) {
            std::cerr << "%% EOF reached for all " << partition_cnt <<
                         " partition(s)" << std::endl;
            run = false;
        }
        break;

    case RdKafka::ERR__UNKNOWN_TOPIC:
    case RdKafka::ERR__UNKNOWN_PARTITION:
        std::cerr << "Consume failed: " << message->errstr() << std::endl;
        run = false;
        break;

    default:
        /* Errors */
        std::cerr << "Consume failed: " << message->errstr() << std::endl;
        run = false;
    }
}

class ExampleConsumeCb : public RdKafka::ConsumeCb {
public:
    void consume_cb (RdKafka::Message &msg, void *opaque) {
        msg_consume(&msg, opaque);
    }
};

int main () {
    std::string brokers = "localhost";
    std::string errstr;
    std::string topic_str="test";
    std::vector<std::string> topics;
    std::string group_id="101";

    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    //group.id必须设置
    if (conf->set("group.id", group_id, errstr) != RdKafka::Conf::CONF_OK) {
           std::cerr << errstr << std::endl;
           exit(1);
         }

    topics.push_back(topic_str);
   //bootstrap.servers可以替换为metadata.broker.list
    conf->set("bootstrap.servers", brokers, errstr);

    ExampleConsumeCb ex_consume_cb;
    conf->set("consume_cb", &ex_consume_cb, errstr);

    ExampleEventCb ex_event_cb;
    conf->set("event_cb", &ex_event_cb, errstr);
    conf->set("default_topic_conf", tconf, errstr);

    signal(SIGINT, sigterm);
    signal(SIGTERM, sigterm);

    RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);
    if (!consumer) {
        std::cerr << "Failed to create consumer: " << errstr << std::endl;
        exit(1);
    }
    std::cout << "% Created consumer " << consumer->name() << std::endl;

    RdKafka::ErrorCode err = consumer->subscribe(topics);
    if (err) {
        std::cerr << "Failed to subscribe to " << topics.size() << " topics: "
                  << RdKafka::err2str(err) << std::endl;
        exit(1);
    }

    while (run) {
        //5000毫秒未订阅到消息,触发RdKafka::ERR__TIMED_OUT
        RdKafka::Message *msg = consumer->consume(5000);
        msg_consume(msg, NULL);
        delete msg;
    }

    consumer->close();

    delete conf;
    delete tconf;
    delete consumer;

    std::cerr << "% Consumed " << msg_cnt << " messages ("
              << msg_bytes << " bytes)" << std::endl;

    //应用退出之前等待rdkafka清理资源
    RdKafka::wait_destroyed(5000);

    return 0;
}
四.测试

先启动zookeeper服务和kafka服务,详见:kafka的编译和使用,然后再启动生产者和消费者。

生产者循环等待用户输入,输入后回车,消息就发布出去了,此时消费者显示订阅到的内容。


本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用librdkafka的C++接口实现简单的生产者和消费者 的相关文章

  • kafka详解及集群环境搭建

    一 kafka详解 安装包下载地址 https download csdn net download weixin 45894220 87020758 1 1Kafka是什么 1 Kafka是一个开源消息系统 由Scala写成 是由Apac
  • kafka中partition数量与消费者对应关系

    kafka是由Apache软件基金会开发的一个开源流处理平台 kafka是一种高吞吐量的分布式发布订阅消息系统 它可以处理消费者在网站中的所有动作流数据 kafka中partition类似数据库中的分表数据 可以起到水平扩展数据的目的 比如
  • springboot本机启动elasticjob抛出异常HostException(ip is null)

    1 使用的elasticjob版本为3 0 1 2 本机的IPV4在校验isReachable 返回false 可能是使用无线网 导致ip验证问题 3 最后引入Groovy解决 引入包
  • Kafka剖析(一):Kafka背景及架构介绍

    转载自 http www infoq com cn articles kafka analysis part 1 Kafka 是由 LinkedIn 开发的一个分布式的消息系统 使用 Scala 编写 它以可水平扩展和高吞吐率而被广泛使用
  • Kafka一文懂

    初识 Kafka 什么是 Kafka Kafka 是由 Linkedin 公司开发的 它是一个分布式的 支持多分区 多副本 基于 Zookeeper 的分布式消息流平台 它同时也是一款开源的基于发布订阅模式的消息引擎系统 Kafka 的基本
  • 大数据技术之Kafka——Kafka入门

    目录 一 概述 1 1 为什么要有Kafka 1 2 定义 1 3 消息队列 1 消息队列的应用场景 2 消息队列的两种模式 1 4 基础架构 二 Producer生产者 2 1 生产者消息发送流程 2 1 1 发送原理 2 2 异步发送A
  • 黑马头条 热点文章实时计算、kafkaStream

    热点文章 实时计算 1 今日内容 1 1 定时计算与实时计算 1 2 今日内容 kafkaStream 什么是流式计算 kafkaStream概述 kafkaStream入门案例 Springboot集成kafkaStream 实时计算 用
  • win10系统下安装Kafka 的详细步骤

    Win10 系统下要使用Kafka需要经过以下三个步骤 1 安装JDK 需要安装依赖java JDK 2 安装zookeeper 资源协调 分配管理 3 安装Kafka 一 安装 Java SE Development Kit 13 0 1
  • kafka问题解决:org.apache.kafka.common.errors.TimeoutException

    记录使用kafka遇到的问题 1 Caused by java nio channels UnresolvedAddressException null 2 org apache kafka common errors TimeoutExc
  • kafka(三)重平衡

    历史文章 kafka 一 kafka的基础与常用配置 文章目录 一 kafka消费者组 二 重平衡 Rebalance 2 1 重平衡触发条件 2 2 重平衡策略 2 2 1 Range 平均分配 2 2 2 RoundRobin 轮询分配
  • WebSocket + kafka实时推送数据(springboot纯后台)

    逻辑 kafka订阅消费者主题 消费后通过webSocket推送到前端 kafka vue financial webSocket 学习引用 SpringBoot2 0集成WebSocket 实现后台向前端推送信息 World Of Mos
  • 公司实战 ElasticSearch+Kafka+Redis+MySQL

    一 需求 前一段时间公司要进行数据转移 将我们ES数据库中的数据转移到客户的服务器上 并且使用定时将新增的数据同步 在这过程中学到了很多 在此记录一下 二 技术栈 Mysql Redis ElasticSearch Kafka 三 方案 为
  • JMS 消耗多个主题

    我是 Java 新手 正在开发一个使用多个 不同 主题并将其发送到另一台服务器的项目 我想知道处理多个主题的最佳方法是什么 据我了解 每个消费者都与一个主题相关 因此 如果我必须使用多个主题 则每个不同的主题都需要一个消费者 由于消费者进行
  • MQ - KAFKA 基础篇

    1 KAFKA的核心组件 API Producer API 它允许应用程序向一个或多个 topics 上发送消息记录 Consumer API 允许应用程序订阅一个或多个 topics 并处理为其生成的记录流 Streams API 它允许
  • 如何使用 python asyncio 编写 Consumer.Producer 代码?

    我的Python版本是3 6 1 我写了一些东西来使用 Python asyncio 实现消费者 生产者模型 但它并没有按预期工作 四个活动均已创建 但没有任何打印导出 async def consumer queue id while T
  • 一文弄懂事件Event与Kafka的区别

    事件 Event 和 Apache Kafka 是两个概念层面上有所不同的东西 它们在应用程序中的作用和使用场景也有很大的差异 1 概念和定义 事件 Event 事件是 系统内发生 的特定事情或状态变化的表示 在编程和软件设计中 事件通常被
  • 消息队列选型:Kafka 如何实现高性能?

    在分布式消息模块中 我将对消息队列中应用最广泛的 Kafka 和 RocketMQ 进行梳理 以便于你在应用中可以更好地进行消息队列选型 另外 这两款消息队列也是面试的高频考点 所以 本文我们就一起来看一下 Kafka 是如何实现高性能的
  • Java 8 java.util.function.Consumer<> 的 C# 等效项是什么?

    C 中是否有与此接口等效的接口 例子 Consumer
  • 【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版

    Flink 系列文章 一 Flink 专栏 Flink 专栏 系统介绍某一知识点 并辅以具体的示例进行说明 1 Flink 部署系列 本部分介绍Flink的部署 配置相关基础内容 2 Flink基础系列 本部分介绍Flink 的基础部分 比
  • java并发:多生产者一消费者

    我遇到一种情况 不同的线程填充一个队列 生产者 并且一个消费者从该队列中检索元素 我的问题是 当从队列中检索这些元素之一时 某些元素会丢失 丢失信号 生产者代码是 class Producer implements Runnable pri

随机推荐

  • STM32学习——FATFS文件系统

    目录 什么是文件系统 常用的文件系统 FATFS的特点 FATFS层次结构 移植步骤 相关配置宏 FATFS文件系统移植实验 FATFS程序结构图 FATFS底层设备驱动函数 宏定义 设备状态获取 设备初始化 读取扇区 扇区写入 什么是文件
  • 代码质量检测工具 QAPLug

    代码质量检测工具 情景 写完代码一定要别人review才发现bug或不好的语法或多余的变量是一件多么尴尬的事情 如果想在写代码时或者写代码后自己能发现问题 那么代码QA工具无疑是你必备的工具 工具 QAPlug就是一款实用十分方便的代码质量
  • [游戏] chrome 的小彩蛋

    在电脑上不了网时 chrome 显示无法显示此网页的同时 还会有一个小游戏可以玩 用户可以操作空格键来控制一只小恐龙让它跳过灌木丛
  • Python 实现逐步回归

    常用评价指标简介 当前统计学以计算机科学作为支撑 机器于人工的优势是计算速度 但机器无法自行判断运算何时退出 因此需要定量指标作为运算退出的标志 对于预测类的统计模型来说 常见的指标有赤池信息准则 AIC 贝叶斯信息准则 BIC R方 RO
  • 冒泡排序、选择排序、插入排序 原理及Java代码实现

    1 冒泡排序 冒泡排序 Bubble Sort 是一种计算机科学领域的较简单的排序算法 冒泡排序算法的原理如下 1 比较相邻的元素 如果第一个比第二个大 就交换他们两个 2 对每一对相邻元素做同样的工作 从开始第一对到结尾的最后一对 在这一
  • Cpp学习——动态内存管理

    目录 一 new 1 malloc realloc calloc的使用不便之处 2 new的好处 3 opreator new 二 delete 1 为什么要有delete 2 为什么要匹配使用 一 new 1 malloc realloc
  • 【论文精读】NeRF详解

    最近阅读了开启三维重建新纪元的经典文章 NeRF Representing Scenes as Neural Radiance Fields for View Synthesis 接下来会 更新NeRF系列的论文精读 代码详解 力求做到全网
  • SpringBoot2.x 集成 Swagger3(springdoc-openapi)

    Swagger是一款RESTFUL接口的文档在线自动生成加功能测试的软件 提供描述 生产 消费和可视化RESTful Web Service Swagger也是一个api文档维护组织 后来成为了OpenAPI 一个业界的api文档标准 标准
  • 赛联区块链培训课程介绍

    基础版
  • 谷歌V8引擎运行机制概览

    最近学习了极客时间上李兵大佬的谷歌V8引擎课程 总结了一下 在公司内部小组分享了一波 在此也分享一下 原理图直接用的专栏的图 由于时间有限 总结略显粗糙 注 解释执行 编译执行各有优缺点 解释执行 不需要做过多的编译 所以启动快 执行时不时
  • 如何让footer始终在页面底部固定

    footer height 50px position fixed bottom 0px left 0px right 0px 参考 https www cnblogs com lk kk p 4654832 html
  • 已 树莓派4b ros 系统 网盘_树莓派4B初次使用--系统安装

    准备 硬件 树莓派本体 读卡器 TF卡 电源线 HDMI连接线 可选 显示器 可选 软件 SDFormatter格式化工具 Win32DiskImager烧录工具 Finalshell Cellular Z 技术规格 首先 来看看树莓派4的
  • 四川对口高职计算机录取分数,四川录取分数线 四川对口高职录取分数线出炉!!!...

    四川对口高职录取分数线出炉 你在线吗 快来看看 成都纺织学院5141 四川中医学院5146 成都航空空职业技术学院5151 四川交通职业技术学院5152 达州职业技术学院5153 四川工程技术学院5155 绵阳职业技术学院5157 四川建筑
  • 【Unity-学习-014】EasyAR4.0稀疏空间地图 扫描场景功能

    本帖主要描写扫描场景的功能实现 以及一些需要注意的问题 跟上层贴有所关联 想要更多了解请移步链接 场景中有几个重要的预设需要添加 目录如下 其中 SparseSpatialMap 用于扫描空间成成点云信息 点云可以将空间数据以点的信息保存下
  • 跟踪路由 Tracert

    跟踪路由 Tracert 2007年04月20日 09 03 A M Tracert 跟踪路由 是路由跟踪实用程序 用于确定 IP 数据报访问目标所采取的路径 Tracert 命令用 IP 生存时间 TTL 字段和 ICMP 错误消息来确定
  • TMX瓦片地图无法加载问题

    1 cocos2d x 加载tmx图片失败 重新编译运行时加载出错 提示 TMX Only 1 tilset per layer is supported 原来是同一个图层上只能使用同一图块资源的元素 必须要新建一个图层 将新添加的元素布局
  • 虚拟机克隆两网卡冲突

    常见网卡设置 vim etc sysconfig network scripts ifcfg ens33 TYPE Ethernet BOOTPROTO static DEFROUTE yes NAME ens33 UUID 025f788
  • 云计算虚拟化技术与开发-------虚拟化技术应用第一章内容(虚拟化技术概念、虚拟化特征、虚拟化目的、半虚拟化和全虚拟化特点和区别、虚拟化实现的三种结构的特点和区别)

    目录 虚拟化技术第一章主要内容 虚拟化技术的概念 虚拟化的特征 虚拟化的目的 虚拟化与云计算的关系 半虚拟化和全虚拟化的特点和区别 虚拟化实现的三种结构的特点和区别 虚拟化技术第一章主要内容 虚拟化技术的概念 虚拟化 Virtualizat
  • pgsql遇到小问题及小功能记录

    一 问题 1 ERROR invalid input syntax for integer 使用COALESCE a delete flag 0 int as delete flag报错 因为delete flag里面包含空值 改为 COA
  • 使用librdkafka的C++接口实现简单的生产者和消费者

    一 编译librdkafka 环境 Fedora 20 32位 依赖 pthreads 必选 zlib 可选 libssl dev 可选 libsasl2 dev 可选 先查看自己的linux上是否安装了pthreads 指令如下 loca