封装C++风格的rdkafka库

2023-11-15

项目中用到了kafka,系统是C++开发的,没有现成的可集成API,查阅github,发现有rdkafka,这个C库,挺好用的,但是,他依然不够简洁,因此,对他做了一下封装.

#ifndef _KAFKAMQ_H_
#define _KAFKAMQ_H_

#include <vector>
#include <string>
#include <librdkafka/rdkafkacpp.h>

using namespace std;

class KafkaProducer
{
public:
    KafkaProducer();
    virtual ~KafkaProducer();

public:
    void setBrokers(const string& sBrokers) { m_sBrokers = sBrokers; }

public:
    const vector<string> &getTopicList() const;
    int32_t initialize(const string &sbroker);
    void addTopic(const string& sTopic);
    void pushData2Topic(const string &sTopic, const string sMsg);

private:
    string                          m_sBrokers;
	char*                           m_szBuf;

    RdKafka::Producer*              m_pProducer;
    RdKafka::Conf*                  m_pGlobalConf;
    RdKafka::Conf*                  m_pTopicConf;
    vector<string>                  m_vecTopics;
};


 
class KafkaConsumer
{
public:
    KafkaConsumer();
    virtual ~KafkaConsumer();

public:
    int32_t initializer(const string& sbroker,const string& sGroupId);
    int32_t addTopic(const string& sNewTopic);
    void    addPartitions(const string& sNewTopic);
    int32_t subscribe();

public:
    void work(int32_t iTimeoutMs = 1000);
    virtual void handleConsumeCb(RdKafka::Message *message, void *opaque) = 0;

private:
    string                      m_sBrokers;
    int32_t                     m_iPartition;
    int64_t                     m_iCurrentOffset;
    vector<string>              m_vecTopics;
    string                      m_sGroupId;
    bool                        m_bInit;
    bool                        m_bSub;

    RdKafka::Conf*              m_pGlobalConf;
    RdKafka::Conf*              m_pTopicConf;
    RdKafka::KafkaConsumer*     m_pConsumer;

    std::vector<RdKafka::TopicPartition *> m_vecTopicPartitions;
};

#endif // _KAFKAMQ_H_

上面是他的头文件定义,里面有一个纯虚函数"handleConsumeCb",这是kafka的消费者在接收到要用的数据的时候,按照需求自己去重载,对信息做处理.

#include "KafkaMq.h"
#define     HANDLE_NUM_PER_LOOP     100
#define     MAX_BUFFER_LENGTH       65535


KafkaProducer::KafkaProducer()
    : m_sBrokers()
    , m_pProducer(NULL)
    , m_pGlobalConf(NULL)
    , m_pTopicConf(NULL)
    , m_szBuf(NULL)
{

}

KafkaProducer::~KafkaProducer()
{
    if (m_pGlobalConf)
    {
        delete m_pGlobalConf;
        m_pGlobalConf = NULL;
    }

    if (m_pTopicConf)
    {
        delete m_pTopicConf;
        m_pTopicConf = NULL;
    }

    if (m_pProducer)
    {
        delete m_pProducer;
        m_pProducer = NULL;
    }

    if(m_szBuf)
    {
        delete [] m_szBuf;
        m_szBuf = NULL;
    }
}

int32_t KafkaProducer::initialize(const string &sBroker)
{
    if (sBroker.empty())
    {
        cout<<" KafkaProducer::initialize Parameter error" << endl;
        return -1;
    }

    m_szBuf = new char[MAX_BUFFER_LENGTH];
    m_sBrokers = sBroker;

    string  strError;
    m_pGlobalConf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    if (!m_pGlobalConf)
    {
        cout<<"RdKafka create global conf failed" << endl;
        return -1;
    }

    m_pTopicConf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    if (!m_pTopicConf)
    {
        cout<<"RdKafka create global conf failed" << endl;
        return -1;
    }

    if (m_pGlobalConf->set("message.max.bytes", "10240000", strError) != RdKafka::Conf::CONF_OK)
    {
        cout<<"RdKafka conf set message.max.bytes failed :" << strError << endl;
        return -1;
    }

    if (m_pGlobalConf->set("bootstrap.servers", m_sBrokers, strError) != RdKafka::Conf::CONF_OK)
    {
        cout<<"RdKafka conf set bootstrap.servers failed :" << strError << endl;
        return -1;
    }

    m_pProducer = RdKafka::Producer::create(m_pGlobalConf, strError);
    if (!m_pProducer)
    {
        cout<<"create topic  failed :" << strError << endl;
        return -1;
    }

    //从服务端获取已经创建的topic
    RdKafka::Metadata *stMetadataMap;
    RdKafka::ErrorCode errCode = m_pProducer->metadata(true, nullptr, &stMetadataMap, 5000);
    if (errCode != RdKafka::ERR_NO_ERROR)
    {
        cout<<"get topiclist failed :" << strError << endl;
        return -1;
    }

    const RdKafka::Metadata::TopicMetadataVector *topicList = stMetadataMap->topics();
    for (auto itr = topicList->begin(); itr != topicList->end(); itr++)
    {
        cout<<" find topic: " << (*itr)->topic() << endl;
        m_vecTopics.push_back((*itr)->topic());
    }

    if (stMetadataMap)
    {
        delete stMetadataMap;
        stMetadataMap = NULL;
    }

    return 0;
}

void KafkaProducer::addTopic(const string& sTopic)
{
    if (find(m_vecTopics.begin(), m_vecTopics.end(), sTopic) == m_vecTopics.end())
    {
        string sErrStr;
        RdKafka::Topic *topic = RdKafka::Topic::create(m_pProducer, sTopic, m_pTopicConf, sErrStr);
        if (!topic)
        {
            cout<<"Create Topic failed" << sErrStr << endl;
            return;
        }
        m_vecTopics.push_back(sTopic);
        delete topic;
    }
    else
    {
        cout<<"this topic is already created!"<< endl;
    }
}

const vector<string> &KafkaProducer::getTopicList() const
{
    return m_vecTopics;
}

void KafkaProducer::pushData2Topic(const string &sTopic, const string sMsg)
{
    if (sMsg.length() > MAX_BUFFER_LENGTH)
    {
        cout<<"msg length is out of range"<< endl;
        return;
    }

    int64_t iTimestamp = tars::TC_Common::now2ms();

    memset(m_szBuf, 0, MAX_BUFFER_LENGTH);
    memcpy(m_szBuf, sMsg.c_str(), sMsg.length());

    //发送消息
    RdKafka::ErrorCode resCode = m_pProducer->produce(sTopic, RdKafka::Topic::PARTITION_UA,
                                                     RdKafka::Producer::RK_MSG_COPY,
                                                     (void *)m_szBuf, strlen(m_szBuf),
                                                     NULL, 0,
                                                     iTimestamp++, NULL);
    usleep(10000);
    if (resCode != RdKafka::ERR_NO_ERROR)
    {
        cout<<"Push data failed! " << RdKafka::err2str(resCode) << endl;
    }
}


//


KafkaConsumer::KafkaConsumer()
    : m_sBrokers()
    , m_iPartition(0)
    , m_iCurrentOffset(RdKafka::Topic::OFFSET_BEGINNING)
    , m_sGroupId()
    , m_bInit(false)
    , m_bSub(false)
    , m_pGlobalConf(NULL)
    , m_pTopicConf(NULL)
    , m_pConsumer(NULL)
{
    m_pGlobalConf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    m_pTopicConf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
}

KafkaConsumer::~KafkaConsumer()
{
    if (m_pConsumer)
    {
        delete m_pConsumer;
        m_pConsumer = NULL;
    }

    if (m_pGlobalConf)
    {
        delete m_pGlobalConf;
        m_pGlobalConf = NULL;
    }

    if (m_pTopicConf)
    {
        delete m_pTopicConf;
        m_pTopicConf = NULL;
    }

    for (auto itr = m_vecTopicPartitions.begin(); itr != m_vecTopicPartitions.end(); ++itr)
    {
        if (NULL != *itr)
        {
            delete *itr;
            *itr = NULL;
        }
    }
    m_vecTopicPartitions.clear();

    RdKafka::wait_destroyed(5000);

    m_bInit = false;
}

int32_t KafkaConsumer::initializer(const string& sbroker,const string& sGroupId)
{
    if(sbroker.empty() || sGroupId.empty())
    {
        cout<<"KafkaConsumer::initializer : Invalid Parameter" << endl;
        return -1;
    }

    m_sBrokers = sbroker;
    m_sGroupId = sGroupId;

    string sErrStr;
    if (m_pGlobalConf->set("bootstrap.servers", m_sBrokers, sErrStr))
    {
        cout<<"KafkaConsumer::initializer : set broker failed" << endl;
        return -1;
    }

    if(m_pGlobalConf->set("max.partition.fetch.bytes", "1024000", sErrStr))
    {
        cout<<"KafkaConsumer::initializer : set max.partition.fetch.bytes failed" << endl;
        return -1;
    }

    if(m_pGlobalConf->set("group.id", m_sGroupId, sErrStr))
    {
        cout<<"KafkaConsumer::initializer : set group.id failed" << endl;
        return -1;
    }

    if(m_pGlobalConf->set("default_topic_conf", m_pTopicConf, sErrStr))
    {
        cout<<"KafkaConsumer::initializer : set default topic conf failed" << endl;
        return -1;
    }

    if(m_pTopicConf->set("auto.offset.reset", "latest", sErrStr))
    {
        cout<<"KafkaConsumer::initializer : set auto.offset.reset failed" << endl;
        return -1;
    }
    
    m_pConsumer = RdKafka::KafkaConsumer::create(m_pGlobalConf, sErrStr);
    if (!m_pConsumer)
    {
        cout<<"KafkaConsumer::initializer : Failed to create consumer : " << sErrStr << endl;
        return -1;
    }

    m_bInit = true;

    return 0;
}

void KafkaConsumer::work(int32_t iTimeoutMs)
{
    if(!m_bInit)
    {
        cout<<"need to init first!" << endl);
        return;
    }

    if(m_bSub)
    {
        RdKafka::Message *msg = m_pConsumer->consume(iTimeoutMs);
        handleConsumeCb(msg, NULL);
        delete msg;
    }
}

int32_t KafkaConsumer::addTopic(const string & sNewTopic)
{
    if(find(m_vecTopics.begin(), m_vecTopics.end(), sNewTopic) != m_vecTopics.end())
    {
        cout<<sNewTopic << " is already subscribed!" << endl;
        return 0;
    }

    cout<<sNewTopic << " is now subscribing" << endl;

    m_vecTopics.push_back(sNewTopic);

    if(m_bSub)
    {
        m_bSub = false;
        addPartitions(sNewTopic);
    }

    return 0;
}

int32_t KafkaConsumer::subscribe()
{
    if (!m_bInit)
    {
        cout<<"need to init first!" << endl;
        return -1;
    }

    if (m_vecTopics.size() == 0)
    {
        cout<<"KafkaConsumer::subscribe : Invalid param" << endl;
        return -1;
    }

    m_pConsumer->unassign();
    m_pConsumer->unsubscribe();

    string sErrStr;
    RdKafka::ErrorCode errCode;
    m_pConsumer->assign(m_vecTopicPartitions);
    errCode = m_pConsumer->subscribe(m_vecTopics);
    if (errCode != RdKafka::ERR_NO_ERROR)
    {
        cout<<"KafkaConsumer::subscribe : " << RdKafka::err2str(errCode) << endl;
        return -1;
    }

    m_bSub = true;

    return 0;
}

void KafkaConsumer::addPartitions(const string& sNewTopic)
{
    if(find(m_vecTopics.begin(), m_vecTopics.end(), sNewTopic) == m_vecTopics.end())
    {
        cout<<sNewTopic << " is now subscribed!"<<endl;
        m_vecTopics.push_back(sNewTopic);
    }

    RdKafka::Metadata *pMetadataMapPtr;
    RdKafka::ErrorCode errCode = m_pConsumer->metadata(true, nullptr, &pMetadataMapPtr, 2000);
    if (errCode != RdKafka::ERR_NO_ERROR)
    {
        cout<<"KafkaConsumer::subscribe : " << RdKafka::err2str(errCode) << endl;
        return ;
    }

    const RdKafka::Metadata::TopicMetadataVector *topicList = pMetadataMapPtr->topics();

    RdKafka::Metadata::TopicMetadataVector subTopicMetaVec;
    for (auto itr = topicList->begin(); itr != topicList->end(); itr++)
    {
        auto sitr = find(m_vecTopics.begin(), m_vecTopics.end(), (*itr)->topic());
        if(sitr != m_vecTopics.end())
        {
            subTopicMetaVec.push_back(*itr);
        }
        else
        {
            cout<<"Can not find this topic from server" << endl;
            return;
        }
    }

    for (auto itr = subTopicMetaVec.begin(); itr != subTopicMetaVec.end(); itr++)
    {
        const RdKafka::TopicMetadata *data = *itr;
        auto parVec = data->partitions();
        for(auto pItr = parVec->begin(); pItr != parVec->end(); pItr++)
        {
            m_vecTopicPartitions.push_back(RdKafka::TopicPartition::create(data->topic(), (*pItr)->id(), RdKafka::Topic::OFFSET_END));
        }
    }

    if (pMetadataMapPtr)
    {
        delete pMetadataMapPtr;
        pMetadataMapPtr = NULL;
    }
}

以上是相关函数的实现代码.

测试程序如下

// 消息订阅者测试程序
int testConsumer()
{
    KafkaConsumer stComsumer;
    int iRet = stComsumer.initializer("127.0.0.1:9092", "demoGroup");
    if (iRet)
    {
        return -1;
    }
    stComsumer.addTopic("my_first_topic");
    stComsumer.addTopic("testTopic");
    stComsumer.subscribe();
    while (true)
    {
        stComsumer.work(5000);
    }
    RdKafka::wait_destroyed(5000);

    return 0;
}

// 消息发布者测试
int testProducer()
{
    KafkaProducer producer;
    int iRet = producer.initialize("127.0.0.1:9092");
    if(iRet)
    {
        cout<<"Initialize failed!"<<endl;
        return -1;
    }

    producer.addTopic("Market.1");
    producer.pushData2Topic("Market.1","test");

    vector<string> vecTopics = producer.getTopicList();
    for(auto itr = vecTopics.begin(); itr != vecTopics.end(); itr++)
    {
        producer.pushData2Topic(*itr,"test!");
    }

    //从新获取topic,然后推送数据
    producer.addTopic("test090218");
    usleep(60000);
    vecTopics = producer.getTopicList();
    for(auto itr = vecTopics.begin(); itr != vecTopics.end(); itr++)
    {
        producer.pushData2Topic(*itr,"testnew topic!");
    }

    return 0;
}

int main()
{
	//testConsumer();
	//testProducer();
	return 0;
}

上述代码,main函数无法直接运行,因为实际中,除非在测试的时候,将纯虚函数实现(测试的时候,可以直接打印里面内容即可).

然后编译代码的时候,需要增加链接rdkafka的选项(rdkafka++)

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

封装C++风格的rdkafka库 的相关文章

  • C++:头文件中全局函数的多重定义错误

    该函数是全局的 在头文件中定义 暂时地我想把它留在那里 头文件还构成一个具有内联函数的特定类 其中一个函数调用this全局函数 源文件不包含任何有问题的全局函数 有关错误原因的任何提示吗 如果有人感兴趣的话我可以发布代码 mainwindo
  • 使用内部构造函数实例化类

    我有一个类 其构造函数被定义为内部 这意味着我无法实例化它 虽然这可能有道理 但出于调试和研究目的 我仍然愿意做一次 是否可以通过反射来做到这一点 我知道我可以访问私有 内部成员 但是我可以调用内部构造函数吗 或者 由于构造函数没有做任何重
  • 实体框架中的重复键异常?

    我试图捕获当我将具有给定用户名的现有用户插入数据库时 引发的异常 正如标题所说 我正在使用 EF 当我尝试将用户插入数据库时 引发的唯一异常是 UpdateException 如何提取此异常以识别其是否是重复异常或其他异常 catch Up
  • 为什么 LinkedList 通常比 List 慢?

    我开始在我的一些 C 算法中使用一些 LinkedList 而不是列表 希望能够加快速度 然而 我注意到他们只是感觉更慢 像任何优秀的开发人员一样 我认为我应该尽职调查并验证我的感受 所以我决定对一些简单的循环进行基准测试 我认为用一些随机
  • C# ConfigurationManager 从 app.config 检索错误的连接字符串

    我有一个简单的 WinForms 应用程序 它最终将成为一个游戏 现在 我正在研究它的数据访问层 但遇到了障碍 我创建了一个单独的项目 名为DataAccess在其中 我创建了一个本地 mdfSQL Server 数据库文件 我还创建了一个
  • 禁用除滚动之外的 DataGridView

    我如何配置 datagridview 以便用户只能在行中移动并使用滚动 而没有其他 如果我禁用网格不允许我使用滚动 将您的 datagridview 设置为只读 这将禁用任何编辑 dataGridView1 ReadOnly true 在你
  • 将占位符文本添加到文本框

    我正在寻找一种将占位符文本添加到文本框的方法 就像在 html5 中使用文本框一样 IE 如果文本框没有文本 则会添加文本Enter some text here 当用户单击它时 占位符文本消失并允许用户输入自己的文本 如果文本框失去焦点并
  • 如何在 C++ 的子目录中创建文件?

    这是我的代码 如何在子目录联系人中创建文件 每次创建该文件时 它都会出现在与我的程序相同的目录中 int main ofstream myfile contacts myfile open a myfile close 在构造函数中指定完整
  • Windows 程序如何临时更改其时区?

    我写了一个函数来返回time t与给定日期的午夜相对应的值 当给定日期没有午夜时 它返回最早可用的时间 例如 当埃及进入夏令时时 这种情况就可能发生 今年 时间更改于 4 月 29 日晚上午夜生效 因此时钟直接从 23 59 转到 01 0
  • Type_traits *_v 变量模板实用程序顺序无法编译

    看过了这个答案 https stackoverflow com a 31763111 7151494 我试图想出一个变量模板从中获取代码的实用程序 template
  • 基于 C++ 范围的 for 循环

    尝试使用基于范围的 for 循环执行某些操作 可以使用常规的 for 循环来完成 如下所示 vector
  • 如何使用eclipse构建C++应用程序

    我已经从以下位置下载了 Eclipse Juno for C here http www eclipse org downloads download php file technology epp downloads release ju
  • 停止 TcpListener 的正确方法

    我目前正在使用 TcpListener 来处理传入连接 每个连接都有一个线程用于处理通信 然后关闭该单个连接 代码如下 TcpListener listener new TcpListener IPAddress Any Port Syst
  • 统一;随机物体移动[关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我正在制作一款机器人战斗游戏 我希望敌人随机移动 然后有时会向敌人移动 我希望运动包含在其中的代码 else if avoid fal
  • 无效的模板相关成员函数模板推导 - 认为我正在尝试使用 std::set

    我有一个继承自基类模板的类模板 基类模板有一个数据成员和一个成员函数模板 我想从我的超类中调用它 我知道为了消除对成员函数模板的调用的歧义 我必须使用template关键字 我必须明确引用this在超级班里 this gt base mem
  • C 中的静态和动态绑定(严格来说是 C,而不是 C++)是什么?

    我最初对发布这个问题感到担忧 以免它重复 但即使在谷歌搜索了许多关键字之后 我在 StackOverflow 上找不到任何解释 C 的静态和动态绑定的链接 尽管有 C 的问题和答案 但是都涉及classes以及显然不适合 C 的东西 Sta
  • 计算两个日期之间的工作日数?

    在C 中 如何计算business 或工作日 两个日期之间的天数 我以前曾经遇到过这样的任务 并且我已经找到了解决方案 当可以避免的时候 我会避免列举其间的所有日子 这里就是这种情况 正如我在上面的一个答案中看到的那样 我什至没有提到创建一
  • 网页执行回发时如何停止在注册表单上?

    我正在做我的最后一年的项目 其中 我在一页上有登录和注册表单 WebForm 当用户点击锚点时Sign Up下拉菜单ddlType 隐藏 和文本框 txtCustName txtEmail and txtConfirmPassword 显示
  • XmlDocument Save 使文件保持打开状态

    我有一个简单的 C 函数 可以创建一个基本的 XML 文件并保存 private void CreateXMlFile string Filename string Name string Company XmlDocument doc n
  • 如何获取通过网络驱动器访问的文件的 UNC 路径?

    我正在 VC 中开发一个应用程序 其中网络驱动器用于访问文件 驱动器由用户手动分配 然后在应用程序中选择驱动器 这会导致驱动器并不总是映射到相同的服务器 我该如何获取此类文件的 UNC 路径 这主要是为了识别目的 这是我用来将普通路径转换为

随机推荐