项目中用到了 Kafka。系统是 C++ 开发的,没有现成可直接集成的 API;查阅 GitHub 后发现了 rdkafka 这个 C 库,挺好用,但它依然不够简洁,因此对它做了一层封装。
#ifndef _KAFKAMQ_H_
#define _KAFKAMQ_H_
#include <vector>
#include <string>
#include <librdkafka/rdkafkacpp.h>
using namespace std;
// Thin producer-side wrapper around librdkafka's C++ API (RdKafka::Producer).
// Typical usage: initialize(brokers) -> addTopic(topic) -> pushData2Topic(topic, msg).
class KafkaProducer
{
public:
KafkaProducer();
virtual ~KafkaProducer();
public:
// Stores a broker list; NOTE: initialize() overwrites this with its own argument.
void setBrokers(const string& sBrokers) { m_sBrokers = sBrokers; }
public:
// Topics discovered from broker metadata in initialize() plus those added via addTopic().
const vector<string> &getTopicList() const;
// Creates the producer connected to sbroker and fetches the server's topic list.
// Returns 0 on success, -1 on any failure.
int32_t initialize(const string &sbroker);
// Registers sTopic locally (creating a topic handle) if not already known.
void addTopic(const string& sTopic);
// Sends sMsg to sTopic. sMsg is taken by value (kept for interface compatibility).
void pushData2Topic(const string &sTopic, const string sMsg);
private:
string m_sBrokers;              // broker list, e.g. "host1:9092,host2:9092"
char* m_szBuf;                  // staging buffer (MAX_BUFFER_LENGTH bytes), allocated in initialize()
RdKafka::Producer* m_pProducer; // owned; created in initialize(), deleted in dtor
RdKafka::Conf* m_pGlobalConf;   // owned global configuration
RdKafka::Conf* m_pTopicConf;    // owned topic configuration
vector<string> m_vecTopics;     // known topic names
};
// Abstract consumer-side wrapper around RdKafka::KafkaConsumer.
// Derive from it and implement handleConsumeCb() to process received messages.
// Typical usage: initializer(brokers, group) -> addTopic(...) -> subscribe() -> work() in a loop.
class KafkaConsumer
{
public:
KafkaConsumer();
virtual ~KafkaConsumer();
public:
// Creates the consumer for the given broker list and consumer group.
// Returns 0 on success, -1 on failure.
int32_t initializer(const string& sbroker,const string& sGroupId);
// Queues sNewTopic for consumption; call subscribe() afterwards to (re)activate.
int32_t addTopic(const string& sNewTopic);
// Collects sNewTopic's partitions from broker metadata into m_vecTopicPartitions.
void addPartitions(const string& sNewTopic);
// (Re)subscribes to every topic in m_vecTopics. Returns 0 on success, -1 on failure.
int32_t subscribe();
public:
// Polls one message (blocking up to iTimeoutMs) and passes it to handleConsumeCb().
void work(int32_t iTimeoutMs = 1000);
// Pure virtual message callback: implement in a subclass to process each message.
virtual void handleConsumeCb(RdKafka::Message *message, void *opaque) = 0;
private:
string m_sBrokers;        // broker list
int32_t m_iPartition;     // set in ctor; not otherwise used by the visible code
int64_t m_iCurrentOffset; // set in ctor; not otherwise used by the visible code
vector<string> m_vecTopics; // topics requested via addTopic()/addPartitions()
string m_sGroupId;        // consumer group id
bool m_bInit;             // true once initializer() succeeded
bool m_bSub;              // true once subscribe() succeeded
RdKafka::Conf* m_pGlobalConf;       // owned; created in ctor
RdKafka::Conf* m_pTopicConf;        // owned; created in ctor
RdKafka::KafkaConsumer* m_pConsumer; // owned; created in initializer()
std::vector<RdKafka::TopicPartition *> m_vecTopicPartitions; // owned partition handles
};
#endif // _KAFKAMQ_H_
上面是它的头文件定义。其中有一个纯虚函数 handleConsumeCb:当 Kafka 消费者接收到数据时会调用它,使用者按照自己的需求重载该函数,对消息做处理。
#include "KafkaMq.h"
#include <algorithm>
#include <cstring>
#include <iostream>
#include <unistd.h>
#define HANDLE_NUM_PER_LOOP 100
#define MAX_BUFFER_LENGTH 65535
// Constructs an empty producer; all resources are acquired in initialize().
// The initializer list follows the member declaration order in the header
// (m_sBrokers, m_szBuf, m_pProducer, m_pGlobalConf, m_pTopicConf) so the
// textual order matches the real initialization order (avoids -Wreorder).
KafkaProducer::KafkaProducer()
: m_sBrokers()
, m_szBuf(NULL)
, m_pProducer(NULL)
, m_pGlobalConf(NULL)
, m_pTopicConf(NULL)
{
}
// Releases all producer resources. Outstanding messages are flushed first:
// librdkafka queues produced messages internally, so deleting the producer
// without flushing can silently drop data that was never delivered.
KafkaProducer::~KafkaProducer()
{
    if (m_pProducer)
    {
        // Give librdkafka up to 5s to deliver anything still queued.
        m_pProducer->flush(5000);
        delete m_pProducer;
        m_pProducer = NULL;
    }
    if (m_pGlobalConf)
    {
        delete m_pGlobalConf;
        m_pGlobalConf = NULL;
    }
    if (m_pTopicConf)
    {
        delete m_pTopicConf;
        m_pTopicConf = NULL;
    }
    if (m_szBuf)
    {
        delete [] m_szBuf;
        m_szBuf = NULL;
    }
}
// Creates the RdKafka producer connected to sBroker and loads the topics that
// already exist on the server into m_vecTopics.
// Returns 0 on success, -1 on failure (conf objects allocated before a failure
// are released by the destructor).
int32_t KafkaProducer::initialize(const string &sBroker)
{
    if (sBroker.empty())
    {
        cout<<" KafkaProducer::initialize Parameter error" << endl;
        return -1;
    }
    if (!m_szBuf)
    {
        // Staging buffer reused by pushData2Topic(); guard avoids a leak
        // if initialize() is called more than once.
        m_szBuf = new char[MAX_BUFFER_LENGTH];
    }
    m_sBrokers = sBroker;
    string strError;
    m_pGlobalConf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    if (!m_pGlobalConf)
    {
        cout<<"RdKafka create global conf failed" << endl;
        return -1;
    }
    m_pTopicConf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    if (!m_pTopicConf)
    {
        // fixed copy-paste: this is the *topic* conf, not the global conf
        cout<<"RdKafka create topic conf failed" << endl;
        return -1;
    }
    // Allow messages up to ~10MB.
    if (m_pGlobalConf->set("message.max.bytes", "10240000", strError) != RdKafka::Conf::CONF_OK)
    {
        cout<<"RdKafka conf set message.max.bytes failed :" << strError << endl;
        return -1;
    }
    if (m_pGlobalConf->set("bootstrap.servers", m_sBrokers, strError) != RdKafka::Conf::CONF_OK)
    {
        cout<<"RdKafka conf set bootstrap.servers failed :" << strError << endl;
        return -1;
    }
    m_pProducer = RdKafka::Producer::create(m_pGlobalConf, strError);
    if (!m_pProducer)
    {
        cout<<"create producer failed :" << strError << endl;
        return -1;
    }
    // Fetch the topics already created on the server.
    // Initialized to NULL: metadata() may fail without setting the pointer,
    // and the old code would then have deleted an uninitialized pointer.
    RdKafka::Metadata *pMetadata = NULL;
    RdKafka::ErrorCode errCode = m_pProducer->metadata(true, nullptr, &pMetadata, 5000);
    if (errCode != RdKafka::ERR_NO_ERROR)
    {
        // report the actual error, not the (empty) conf error string
        cout<<"get topiclist failed :" << RdKafka::err2str(errCode) << endl;
        delete pMetadata;
        return -1;
    }
    const RdKafka::Metadata::TopicMetadataVector *topicList = pMetadata->topics();
    for (auto itr = topicList->begin(); itr != topicList->end(); ++itr)
    {
        cout<<" find topic: " << (*itr)->topic() << endl;
        m_vecTopics.push_back((*itr)->topic());
    }
    delete pMetadata;
    return 0;
}
// Registers sTopic with the producer if it is not already in m_vecTopics.
// A topic handle is created (and immediately released) via the producer;
// the name is recorded locally so getTopicList() reflects it.
void KafkaProducer::addTopic(const string& sTopic)
{
    if (!m_pProducer)
    {
        // Guard: calling this before a successful initialize() would
        // dereference a NULL producer inside RdKafka::Topic::create().
        cout<<"KafkaProducer::addTopic : call initialize() first" << endl;
        return;
    }
    if (find(m_vecTopics.begin(), m_vecTopics.end(), sTopic) == m_vecTopics.end())
    {
        string sErrStr;
        RdKafka::Topic *topic = RdKafka::Topic::create(m_pProducer, sTopic, m_pTopicConf, sErrStr);
        if (!topic)
        {
            // fixed: missing separator made the message run into the error text
            cout<<"Create Topic failed : " << sErrStr << endl;
            return;
        }
        m_vecTopics.push_back(sTopic);
        // The handle itself is not kept; produce() is later called by name.
        delete topic;
    }
    else
    {
        cout<<"this topic is already created!"<< endl;
    }
}
// Returns the topics known to this producer (server topics discovered in
// initialize() plus any added via addTopic()). The reference remains valid
// for the lifetime of the producer object.
const vector<string> &KafkaProducer::getTopicList() const
{
return m_vecTopics;
}
void KafkaProducer::pushData2Topic(const string &sTopic, const string sMsg)
{
if (sMsg.length() > MAX_BUFFER_LENGTH)
{
cout<<"msg length is out of range"<< endl;
return;
}
int64_t iTimestamp = tars::TC_Common::now2ms();
memset(m_szBuf, 0, MAX_BUFFER_LENGTH);
memcpy(m_szBuf, sMsg.c_str(), sMsg.length());
//发送消息
RdKafka::ErrorCode resCode = m_pProducer->produce(sTopic, RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
(void *)m_szBuf, strlen(m_szBuf),
NULL, 0,
iTimestamp++, NULL);
usleep(10000);
if (resCode != RdKafka::ERR_NO_ERROR)
{
cout<<"Push data failed! " << RdKafka::err2str(resCode) << endl;
}
}
//
// Constructs an unconfigured consumer. Both conf objects are created here;
// everything else (consumer handle, subscriptions) is set up by initializer()
// and subscribe().
KafkaConsumer::KafkaConsumer()
: m_sBrokers()
, m_iPartition(0)
, m_iCurrentOffset(RdKafka::Topic::OFFSET_BEGINNING)
, m_sGroupId()
, m_bInit(false)
, m_bSub(false)
, m_pGlobalConf(NULL)
, m_pTopicConf(NULL)
, m_pConsumer(NULL)
{
// NOTE(review): create() results are not checked here; initializer()
// dereferences these pointers, so a NULL return (out of memory) would
// crash there — consider validating.
m_pGlobalConf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
m_pTopicConf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
}
// Tears the consumer down: leaves the group, frees conf objects and the
// TopicPartition handles created by addPartitions(), then waits for
// librdkafka's background threads to terminate.
KafkaConsumer::~KafkaConsumer()
{
    if (m_pConsumer)
    {
        // close() commits offsets and leaves the consumer group cleanly
        // before the handle is destroyed, as recommended by librdkafka.
        m_pConsumer->close();
        delete m_pConsumer;
        m_pConsumer = NULL;
    }
    if (m_pGlobalConf)
    {
        delete m_pGlobalConf;
        m_pGlobalConf = NULL;
    }
    if (m_pTopicConf)
    {
        delete m_pTopicConf;
        m_pTopicConf = NULL;
    }
    // Partition handles were allocated with RdKafka::TopicPartition::create().
    for (auto itr = m_vecTopicPartitions.begin(); itr != m_vecTopicPartitions.end(); ++itr)
    {
        if (NULL != *itr)
        {
            delete *itr;
            *itr = NULL;
        }
    }
    m_vecTopicPartitions.clear();
    RdKafka::wait_destroyed(5000);
    m_bInit = false;
}
// Configures and creates the RdKafka::KafkaConsumer for the given broker list
// and consumer group. Returns 0 on success, -1 on failure.
int32_t KafkaConsumer::initializer(const string& sbroker,const string& sGroupId)
{
    if (sbroker.empty() || sGroupId.empty())
    {
        cout<<"KafkaConsumer::initializer : Invalid Parameter" << endl;
        return -1;
    }
    if (!m_pGlobalConf || !m_pTopicConf)
    {
        // Conf::create() in the constructor can return NULL; bail out instead
        // of dereferencing a NULL conf below.
        cout<<"KafkaConsumer::initializer : conf objects not created" << endl;
        return -1;
    }
    m_sBrokers = sbroker;
    m_sGroupId = sGroupId;
    string sErrStr;
    // Conf::set() returns CONF_OK (0) on success, so a non-zero result is an error.
    if (m_pGlobalConf->set("bootstrap.servers", m_sBrokers, sErrStr))
    {
        cout<<"KafkaConsumer::initializer : set broker failed : " << sErrStr << endl;
        return -1;
    }
    if (m_pGlobalConf->set("max.partition.fetch.bytes", "1024000", sErrStr))
    {
        cout<<"KafkaConsumer::initializer : set max.partition.fetch.bytes failed : " << sErrStr << endl;
        return -1;
    }
    if (m_pGlobalConf->set("group.id", m_sGroupId, sErrStr))
    {
        cout<<"KafkaConsumer::initializer : set group.id failed : " << sErrStr << endl;
        return -1;
    }
    // Set topic-level properties BEFORE installing the topic conf as the
    // default: Conf::set("default_topic_conf", ...) stores a *copy* of the
    // topic conf, so properties set on m_pTopicConf afterwards (as the old
    // code did with auto.offset.reset) never reach the consumer.
    if (m_pTopicConf->set("auto.offset.reset", "latest", sErrStr))
    {
        cout<<"KafkaConsumer::initializer : set auto.offset.reset failed : " << sErrStr << endl;
        return -1;
    }
    if (m_pGlobalConf->set("default_topic_conf", m_pTopicConf, sErrStr))
    {
        cout<<"KafkaConsumer::initializer : set default topic conf failed : " << sErrStr << endl;
        return -1;
    }
    m_pConsumer = RdKafka::KafkaConsumer::create(m_pGlobalConf, sErrStr);
    if (!m_pConsumer)
    {
        cout<<"KafkaConsumer::initializer : Failed to create consumer : " << sErrStr << endl;
        return -1;
    }
    m_bInit = true;
    return 0;
}
// Polls one message (blocking up to iTimeoutMs) and forwards it to the
// subclass's handleConsumeCb(). Call repeatedly from the application's
// consume loop after initializer() and subscribe() have succeeded.
void KafkaConsumer::work(int32_t iTimeoutMs)
{
    if (!m_bInit)
    {
        // fixed: original line ended with a stray ')' — a syntax error
        cout<<"need to init first!" << endl;
        return;
    }
    if (m_bSub)
    {
        RdKafka::Message *msg = m_pConsumer->consume(iTimeoutMs);
        if (msg)
        {
            // On timeout/errors the message carries an error code; the
            // callback decides how to handle it.
            handleConsumeCb(msg, NULL);
            delete msg;
        }
    }
}
// Queues sNewTopic for consumption. Duplicates are ignored. When the consumer
// is already subscribed, the subscription flag is dropped (so subscribe() must
// be called again) and the new topic's partitions are collected immediately.
// Always returns 0.
int32_t KafkaConsumer::addTopic(const string & sNewTopic)
{
    const bool bAlreadyKnown =
        find(m_vecTopics.begin(), m_vecTopics.end(), sNewTopic) != m_vecTopics.end();
    if (bAlreadyKnown)
    {
        cout<<sNewTopic << " is already subscribed!" << endl;
        return 0;
    }
    cout<<sNewTopic << " is now subscribing" << endl;
    m_vecTopics.push_back(sNewTopic);
    if (!m_bSub)
    {
        return 0;
    }
    // Active subscription: force a re-subscribe and gather the partitions.
    m_bSub = false;
    addPartitions(sNewTopic);
    return 0;
}
// (Re)subscribes the consumer to every topic in m_vecTopics.
// Returns 0 on success, -1 on failure.
int32_t KafkaConsumer::subscribe()
{
if (!m_bInit)
{
cout<<"need to init first!" << endl;
return -1;
}
if (m_vecTopics.size() == 0)
{
cout<<"KafkaConsumer::subscribe : Invalid param" << endl;
return -1;
}
// Clear any previous assignment/subscription before re-subscribing.
m_pConsumer->unassign();
m_pConsumer->unsubscribe();
string sErrStr;
RdKafka::ErrorCode errCode;
// NOTE(review): librdkafka treats manual assign() and group-managed
// subscribe() as mutually exclusive modes; calling both here looks
// suspect, and assign()'s return code is ignored — verify which mode
// is actually intended before changing this.
m_pConsumer->assign(m_vecTopicPartitions);
errCode = m_pConsumer->subscribe(m_vecTopics);
if (errCode != RdKafka::ERR_NO_ERROR)
{
cout<<"KafkaConsumer::subscribe : " << RdKafka::err2str(errCode) << endl;
return -1;
}
m_bSub = true;
return 0;
}
void KafkaConsumer::addPartitions(const string& sNewTopic)
{
if(find(m_vecTopics.begin(), m_vecTopics.end(), sNewTopic) == m_vecTopics.end())
{
cout<<sNewTopic << " is now subscribed!"<<endl;
m_vecTopics.push_back(sNewTopic);
}
RdKafka::Metadata *pMetadataMapPtr;
RdKafka::ErrorCode errCode = m_pConsumer->metadata(true, nullptr, &pMetadataMapPtr, 2000);
if (errCode != RdKafka::ERR_NO_ERROR)
{
cout<<"KafkaConsumer::subscribe : " << RdKafka::err2str(errCode) << endl;
return ;
}
const RdKafka::Metadata::TopicMetadataVector *topicList = pMetadataMapPtr->topics();
RdKafka::Metadata::TopicMetadataVector subTopicMetaVec;
for (auto itr = topicList->begin(); itr != topicList->end(); itr++)
{
auto sitr = find(m_vecTopics.begin(), m_vecTopics.end(), (*itr)->topic());
if(sitr != m_vecTopics.end())
{
subTopicMetaVec.push_back(*itr);
}
else
{
cout<<"Can not find this topic from server" << endl;
return;
}
}
for (auto itr = subTopicMetaVec.begin(); itr != subTopicMetaVec.end(); itr++)
{
const RdKafka::TopicMetadata *data = *itr;
auto parVec = data->partitions();
for(auto pItr = parVec->begin(); pItr != parVec->end(); pItr++)
{
m_vecTopicPartitions.push_back(RdKafka::TopicPartition::create(data->topic(), (*pItr)->id(), RdKafka::Topic::OFFSET_END));
}
}
if (pMetadataMapPtr)
{
delete pMetadataMapPtr;
pMetadataMapPtr = NULL;
}
}
以上是相关函数的实现代码.
测试程序如下
// 消息订阅者测试程序
int testConsumer()
{
KafkaConsumer stComsumer;
int iRet = stComsumer.initializer("127.0.0.1:9092", "demoGroup");
if (iRet)
{
return -1;
}
stComsumer.addTopic("my_first_topic");
stComsumer.addTopic("testTopic");
stComsumer.subscribe();
while (true)
{
stComsumer.work(5000);
}
RdKafka::wait_destroyed(5000);
return 0;
}
// 消息发布者测试
int testProducer()
{
KafkaProducer producer;
int iRet = producer.initialize("127.0.0.1:9092");
if(iRet)
{
cout<<"Initialize failed!"<<endl;
return -1;
}
producer.addTopic("Market.1");
producer.pushData2Topic("Market.1","test");
vector<string> vecTopics = producer.getTopicList();
for(auto itr = vecTopics.begin(); itr != vecTopics.end(); itr++)
{
producer.pushData2Topic(*itr,"test!");
}
//从新获取topic,然后推送数据
producer.addTopic("test090218");
usleep(60000);
vecTopics = producer.getTopicList();
for(auto itr = vecTopics.begin(); itr != vecTopics.end(); itr++)
{
producer.pushData2Topic(*itr,"testnew topic!");
}
return 0;
}
// Entry point: uncomment exactly one of the test drivers below to run it
// (both are left disabled so the file builds without a broker available).
int main()
{
//testConsumer();
//testProducer();
return 0;
}
上述代码的 main 函数无法直接运行:KafkaConsumer 含有纯虚函数,必须先派生子类并实现 handleConsumeCb 才能实例化(测试时,在该实现里直接打印消息内容即可)。
然后编译代码的时候,需要增加链接 rdkafka 的选项(-lrdkafka++ -lrdkafka)。