Kafka 简易教程 C++ 实例

2023-11-11

转自:https://www.cnblogs.com/kaishan1990/p/7228683.html,但是其中实例有细微修改。
Kafka 简易教程
1.初识概念

Apache Kafka是一个分布式消息发布订阅系统。

Topic
Kafka将消息种子(Feed)分门别类, 每一类的消息称之为话题(Topic).

Producer
发布消息的对象称之为话题生产者(Kafka topic producer)

Consumer
订阅消息并处理发布的消息的种子的对象称之为话题消费者(consumers)
Broker
已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker). 消费者可以订阅一个或多个话题,并从Broker拉数据,从而消费这些已发布的消息。

分区

一个topic可以有一个或多个分区,每一个分区都是一个顺序的、不可变的消息队列, 并且可以持续的添加。分区中的消息都被分配了一个序列号,称之为偏移量(offset),在每个分区中此偏移量都是唯一的。

每个分区有一个leader,零或多个follower。Leader处理此分区的所有的读写请求而follower被动的复制数据。如果leader当机,其它的一个follower会被推举为新的leader。

通过分区的概念,Kafka可以在多个consumer组并发的情况下提供较好的有序性和负载均衡。将每个分区分只分发给一个consumer组,这样一个分区就只被这个组的一个consumer消费,就可以顺序的消费这个分区的消息。因为有多个分区,依然可以在多个consumer组之间进行负载均衡。注意consumer组的数量不能多于分区的数量,也就是有多少分区就允许多少并发消费。

Kafka 只能保证一个分区之内消息的有序性,在不同的分区之间是不可以的,这已经可以满足大部分应用的需求。如果需要 topic 中所有消息的有序性,那就只能让这个 topic 只有一个分区,当然也就只有一个 consumer 组消费它。

2.安装使用

  1. 下载 Kafka

下载 wget http://apache.01link.hk/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz 或者
wget http://ftp.cuhk.edu.hk/pub/packages/apache.org/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz(看哪个源比较快)
解压 tar -xzf kafka_2.11-0.10.0.0.tgz
进入文件夹 cd kafka_2.11-0.10.0.0/
2. 启动服务

启动 ZooKeeper bin/zookeeper-server-start.sh config/zookeeper.properties &(利用 &放到后台方便继续操作)
启动 Kafka bin/kafka-server-start.sh config/server.properties &
3. 创建一个叫做 dawang 的 topic,它只有一个分区,一个副本

创建 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dawang
查看 bin/kafka-topics.sh --list --zookeeper localhost:2181
还可以配置 broker 让它自动创建 topic
4. 发送消息。Kafka 使用一个简单的命令行producer,从文件中或者从标准输入中读取消息并发送到服务端。默认的每条命令将发送一条消息。

发送消息 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dawang(然后可以随意输入内容,回车可以发送,ctrl+c 退出)
5. 启动 consumer。可以读取消息并输出到标准输出:

接收消息 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic dawang --from-beginning
在一个终端中运行 consumer 命令行,另一个终端中运行 producer 命令行,就可以在一个终端输入消息,另一个终端读取消息。这两个命令都有自己的可选参数,可以在运行的时候不加任何参数可以看到帮助信息。
6. 搭建一个多个 broker 的集群,启动有 3 个 broker 组成的集群,这些 broker 节点也都在本机

首先复制一下配置文件:cp config/server.properties config/server-1.properties 和 cp config/server.properties config/server-2.properties

两个文件需要改动的内容为:

config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1

config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
这里我们把 broker id, 端口号和日志地址配置成和之前不一样,然后我们启动这两个 broker:

bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
然后创建一个复制因子为 3 的 topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic oh3topic

可以使用 describe 命令来显示 topic 详情

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic oh3topic
Topic:oh3topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: oh3topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
我们也可以来看看之前的另一个 topic 的情况

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic dawang
Topic:dawang PartitionCount:1 ReplicationFactor:1 Configs:
Topic: dawang Partition: 0 Leader: 0 Replicas: 0 Isr: 0
最后我们可以按照同样的方法来生产和消费消息,例如
#生产
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic oh3topic

消费

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic oh3topic
开俩终端就可以一边生产消息,一边消费消息了。

测试一下容错. 干掉leader,也就是Broker 1:

ps -ef | grep server-1.properties

Leader被切换到一个follower上节, 点 1 不会被列在isr中了,因为它死了:

再次使用 describe 命令来显示 topic 详情

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic oh3topic

但是,消息没丢啊,不信你试试:

彻底删除kafka中的topic

1、删除kafka存储目录(server.properties文件log.dirs配置,默认为"/tmp/kafka-logs")相关topic目录
2、Kafka 删除topic的命令是:

如果kafaka启动时加载的配置文件中server.properties没有配置delete.topic.enable=true,那么此时的删除并不是真正的删除,而是把topic标记为:marked for deletion

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic oh3topic

3.代码实例

需要自行安装librdkafka库

https://github.com/edenhill/librdkafka

produce

#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <list>
#include <memory>
#include <string>
#include <string.h>
#include "librdkafka/rdkafkacpp.h"
//#include "librdkafka/rdkafka.h"
using namespace std;

bool run = true;

class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb
{
 public:
  void dr_cb (RdKafka::Message &message) {
    std::cout << "Message delivery for (" << message.len() << " bytes): " <<
        message.errstr() << std::endl;
    if (message.key())
      std::cout << "Key: " << *(message.key()) << ";" << std::endl;
  }
};


class ExampleEventCb : public RdKafka::EventCb {
 public:
  void event_cb (RdKafka::Event &event) {
    switch (event.type())
    {
      case RdKafka::Event::EVENT_ERROR:
        std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
            event.str() << std::endl;
        if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
          run = false;
        break;

      case RdKafka::Event::EVENT_STATS:
        std::cerr << "\"STATS\": " << event.str() << std::endl;
        break;

      case RdKafka::Event::EVENT_LOG:
        fprintf(stderr, "LOG-%i-%s: %s\n",
                event.severity(), event.fac().c_str(), event.str().c_str());
        break;

      default:
        std::cerr << "EVENT " << event.type() <<
            " (" << RdKafka::err2str(event.err()) << "): " <<
            event.str() << std::endl;
        break;
    }
  }
};

/* Use of this partitioner is pretty pointless since no key is provided * in the produce() call.so when you need input your key */
class MyHashPartitionerCb : public RdKafka::PartitionerCb {
    public:
        int32_t partitioner_cb (const RdKafka::Topic *topic, const std::string *key,int32_t partition_cnt, void *msg_opaque)
        {
            std::cout<<"partition_cnt="<<partition_cnt<<std::endl;
            return djb_hash(key->c_str(), key->size()) % partition_cnt;
        }
    private:
        static inline unsigned int djb_hash (const char *str, size_t len)
        {
        unsigned int hash = 5381;
        for (size_t i = 0 ; i < len ; i++)
            hash = ((hash << 5) + hash) + str[i];
        std::cout<<"hash1="<<hash<<std::endl;

        return hash;
        }
};

void TestProducer()
{
    std::string brokers = "localhost";
    std::string errstr;
    std::string topic_str="helloworld_kugou_test";//自行制定主题topic
    MyHashPartitionerCb hash_partitioner;
    int32_t partition = RdKafka::Topic::PARTITION_UA;
    int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
    bool do_conf_dump = false;
    int opt;

    int use_ccb = 0;

    //Create configuration objects
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    if (tconf->set("partitioner_cb", &hash_partitioner, errstr) != RdKafka::Conf::CONF_OK)
     {
          std::cerr << errstr << std::endl;
          exit(1);
     }

    /* * Set configuration properties */
    conf->set("metadata.broker.list", brokers, errstr);
    ExampleEventCb ex_event_cb;
    conf->set("event_cb", &ex_event_cb, errstr);

    ExampleDeliveryReportCb ex_dr_cb;

    /* Set delivery report callback */
    conf->set("dr_cb", &ex_dr_cb, errstr);

    /* * Create producer using accumulated global configuration. */
    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
    if (!producer)
    {
        std::cerr << "Failed to create producer: " << errstr << std::endl;
        exit(1);
    }

    std::cout << "% Created producer " << producer->name() << std::endl;

    /* * Create topic handle. */
    RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr);
    if (!topic) {
      std::cerr << "Failed to create topic: " << errstr << std::endl;
      exit(1);
    }

    /* * Read messages from stdin and produce to broker. */
    for (std::string line; run && std::getline(std::cin, line);)
    {
        if (line.empty())
        {
            producer->poll(0);
            continue;
        }

      /* * Produce message // 1. topic // 2. partition // 3. flags // 4. payload // 5. payload len // 6. std::string key // 7. msg_opaque? NULL */
      std::string key=line.substr(0,5);//根据line前5个字符串作为key值
      // int a = MyHashPartitionerCb::djb_hash(key.c_str(),key.size());
      // std::cout<<"hash="<<a<<std::endl;
      RdKafka::ErrorCode resp = producer->produce(topic, partition,
          RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
          const_cast<char *>(line.c_str()), line.size(),
          key.c_str(), key.size(), NULL);//这里可以设计key值,因为会根据key值放在对应的partition
        if (resp != RdKafka::ERR_NO_ERROR)
            std::cerr << "% Produce failed: " <<RdKafka::err2str(resp) << std::endl;
        else
            std::cerr << "% Produced message (" << line.size() << " bytes)" <<std::endl;
        producer->poll(0);//对于socket进行读写操作。poll方法才是做实际的IO操作的。return the number of events served
    }
    //
    run = true;

    while (run && producer->outq_len() > 0) {
      std::cerr << "Waiting for " << producer->outq_len() << std::endl;
      producer->poll(1000);
    }

    delete topic;
    delete producer;
}
 
int main(int argc, char *argv[])
{
    TestProducer();
    return EXIT_SUCCESS;
}

consumer

#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <list>
#include <memory>
#include <string>
#include <string.h>
#include "librdkafka/rdkafkacpp.h"
using namespace std;

bool run = true;
bool exit_eof = true;
class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb
{
 public:
  void dr_cb (RdKafka::Message &message) {
    std::cout << "Message delivery for (" << message.len() << " bytes): " <<
        message.errstr() << std::endl;
    if (message.key())
      std::cout << "Key: " << *(message.key()) << ";" << std::endl;
  }
};


class ExampleEventCb : public RdKafka::EventCb {
 public:
  void event_cb (RdKafka::Event &event) {
    switch (event.type())
    {
      case RdKafka::Event::EVENT_ERROR:
        std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
            event.str() << std::endl;
        if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
          run = false;
        break;

      case RdKafka::Event::EVENT_STATS:
        std::cerr << "\"STATS\": " << event.str() << std::endl;
        break;

      case RdKafka::Event::EVENT_LOG:
        fprintf(stderr, "LOG-%i-%s: %s\n",
                event.severity(), event.fac().c_str(), event.str().c_str());
        break;

      default:
        std::cerr << "EVENT " << event.type() <<
            " (" << RdKafka::err2str(event.err()) << "): " <<
            event.str() << std::endl;
        break;
    }
  }
};

/* Use of this partitioner is pretty pointless since no key is provided * in the produce() call.so when you need input your key */
class MyHashPartitionerCb : public RdKafka::PartitionerCb {
    public:
        int32_t partitioner_cb (const RdKafka::Topic *topic, const std::string *key,int32_t partition_cnt, void *msg_opaque)
        {
            std::cout<<"partition_cnt="<<partition_cnt<<std::endl;
            return djb_hash(key->c_str(), key->size()) % partition_cnt;
        }
    private:
        static inline unsigned int djb_hash (const char *str, size_t len)
        {
        unsigned int hash = 5381;
        for (size_t i = 0 ; i < len ; i++)
            hash = ((hash << 5) + hash) + str[i];
        std::cout<<"hash1="<<hash<<std::endl;

        return hash;
        }
};

void msg_consume(RdKafka::Message* message, void* opaque)
{
    switch (message->err())
    {
        case RdKafka::ERR__TIMED_OUT:
            break;

        case RdKafka::ERR_NO_ERROR:
          /* Real message */
            std::cout << "Read msg at offset " << message->offset() << std::endl;
            if (message->key())
            {
                std::cout << "Key: " << *message->key() << std::endl;
            }
            printf("%.*s\n", static_cast<int>(message->len()),static_cast<const char *>(message->payload()));
            break;
        case RdKafka::ERR__PARTITION_EOF:
              /* Last message */
              if (exit_eof)
              {
                  run = false;
                  cout << "ERR__PARTITION_EOF" << endl;
              }
              break;
        case RdKafka::ERR__UNKNOWN_TOPIC:
        case RdKafka::ERR__UNKNOWN_PARTITION:
            std::cerr << "Consume failed: " << message->errstr() << std::endl;
            run = false;
            break;
    default:
        /* Errors */
        std::cerr << "Consume failed: " << message->errstr() << std::endl;
        run = false;
    }
}
class ExampleConsumeCb : public RdKafka::ConsumeCb {
    public:
        void consume_cb (RdKafka::Message &msg, void *opaque)
        {
            msg_consume(&msg, opaque);
        }
};
void TestConsumer()
{
    std::string brokers = "localhost";
    std::string errstr;
    std::string topic_str="helloworld_kugou_test";//helloworld_kugou
    MyHashPartitionerCb hash_partitioner;
    int32_t partition = RdKafka::Topic::PARTITION_UA;//为何不能用??在Consumer这里只能写0???无法自动吗???
    partition = 0;
    int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
    bool do_conf_dump = false;
    int opt;

    int use_ccb = 0;

    //Create configuration objects
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    if (tconf->set("partitioner_cb", &hash_partitioner, errstr) != RdKafka::Conf::CONF_OK)
    {
        std::cerr << errstr << std::endl;
        exit(1);
    }

    /* * Set configuration properties */
    conf->set("metadata.broker.list", brokers, errstr);
    ExampleEventCb ex_event_cb;
    conf->set("event_cb", &ex_event_cb, errstr);

    ExampleDeliveryReportCb ex_dr_cb;

    /* Set delivery report callback */
    conf->set("dr_cb", &ex_dr_cb, errstr);
    /* * Create consumer using accumulated global configuration. */
    RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr);
    if (!consumer)
    {
      std::cerr << "Failed to create consumer: " << errstr << std::endl;
      exit(1);
    }

    std::cout << "% Created consumer " << consumer->name() << std::endl;

    /* * Create topic handle. */
    RdKafka::Topic *topic = RdKafka::Topic::create(consumer, topic_str, tconf, errstr);
    if (!topic)
    {
      std::cerr << "Failed to create topic: " << errstr << std::endl;
      exit(1);
    }

    /* * Start consumer for topic+partition at start offset */
    RdKafka::ErrorCode resp = consumer->start(topic, partition, start_offset);
    if (resp != RdKafka::ERR_NO_ERROR) {
      std::cerr << "Failed to start consumer: " << RdKafka::err2str(resp) << std::endl;
      exit(1);
    }

    ExampleConsumeCb ex_consume_cb;

    /* * Consume messages */
    while (run)
    {
        if (use_ccb)
        {
            consumer->consume_callback(topic, partition, 1000, &ex_consume_cb, &use_ccb);
      }
      else
      {
          RdKafka::Message *msg = consumer->consume(topic, partition, 1000);
          msg_consume(msg, NULL);
          delete msg;
      }
      consumer->poll(0);
    }

    /* * Stop consumer */
    consumer->stop(topic, partition);

    consumer->poll(1000);

    delete topic;
    delete consumer;
}
 
int main(int argc, char *argv[])
{
    TestConsumer();
    return EXIT_SUCCESS;
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka 简易教程 C++ 实例 的相关文章

  • java正则表达式-案例代码

    转载请注明出处 http blog csdn net droyon article details 8635735 jdk在线api http docs oracle com javase 7 docs api index html 选择j
  • 使用 Element 实现首页

    完成首页 使用 Container 布局容器 结构
  • 背景渐变,多层背景图

    背景渐变 多层背景图 一 线性渐变 linear gradient 二 径向渐变 三 背景大小 一 线性渐变 linear gradient 为了创建一个线性渐变 你必须至少定义两种颜色结点 颜色结点即你想要呈现平稳过渡的颜色 同时 你也可
  • Relying upon circular references is discouraged and they are prohibited by default.循环依赖bug解决

    Relying upon circular references is discouraged and they are prohibited by default 循环依赖bug解决 出现的bug大概意思是 不鼓励依赖循环引用 默认情况下
  • 肌电信号采集电路分析

    最近在开发肌电信号的采集 表面肌电信号是非常微弱的生物信号 正常人体表面肌电信号赋值为0 1 5mV 主要能量频段集中在10 150Hz 电路主要是根据原始信号 设计相应的放大电路 滤波电路 下面直接放原理图说明 一级放大电路 一级放大电路
  • Spring MVC使用注解实现账户注册与登陆

    涉及到的注解 Controller 控制器类 RequestMapping 映射路径 RequestParam 用于将指定的请求参数赋值给方法中的形参 定义域对象 User1 用来接受并封装前台传递的数据 package com wen d
  • Java基础(七)——eclipse、集合(容器)

    集合 1 集合框架 l 所谓的框架就是一个类库的集合 集合框架就是一个用来表示和操作集合的统一的架构 它包含了实现集合的接口和类 l 集合框架中不同的集合类有各自不同的数据结构 所以在使用中要根据应用的性能要求来选择不用的集合类 l 集合类
  • C语言指针初阶

    思维导图 目录 编辑 1 指针与内存 1 1内存中的门牌号 地址 是如何编号的 1 2一个指针的大小 在32位平台上 1 3内存总空间大小 2 指针的使用 2 1指针的构成 2 2指针类型的作用 1 决定指针访问的权限的大小 2 决定指针走
  • 在家带娃有没有什么副业可以做?在家带娃的副业有哪些?

    很多朋友在有了自己的孩子之后 为了给孩子一个良好的成长环境 就会辞去工作专心在家带孩子 不过 考虑生活开销的支出问题 还是会想要做点副业赚钱 那在家带孩子可以做什么副业呢 1 数据录入 这是一个大行业 几乎所有人都可以做 需要的仅仅是一台能
  • Oracle TNS简述

    什么是TNS TNS是Oracle Net的一部分 专门用来管理和配置Oracle数据库和客户端连接的一个工具 在大多数情况下客户端和数据库要通讯 必须配置TNS 当然在少数情况下 不用配置TNS也可以连接Oracle数据库 比如通过JDB
  • java使用xfire创建和调用webservices

    1 创建工程 File gt New gt Web Service Project 弹出Web Service Project窗口 需要填写Project Name 例子是Demo 选择XFire 然后一路next 直到完成 创建完成后 打
  • 如何让Element UI的Message消息提示每次只弹出一个

    Element UI的Message消息提示是点击一次触发一次的 在开发的时候经常会作为一些校验提示 但是公司的测试人员在进行测试时会一直点 然后就会出现如下图的情况 虽然客户使用的时候一般来说不会出现这种情况 毕竟客户不会闲着没事一直点点
  • 用 visio 2013反转图形或镜像图形

    软件版本 visio 2013 选择待反转图形 依次点击 开始 位置 旋转形状 即可根据需求进行图形反转 若需要进行图形镜像操作 可提前将图形复制出一份 将复制出来的图形进行垂直反转或水平反转
  • 2020年tensorflow定制训练模型笔记(3)——开始训练

    现在 我们开始训练文件 这里 我将用一些训练好的模型来做迁移训练 这里简单介绍一下迁移训练 迁移训练就是在别人训练的模型基础上继续训练 这样我们的模型训练就不是从零开始 加快速度以及提高准确率 我讲的比较简单 参考网址在这 https bl
  • 华为OD题目:快递投放问题

    华为OD题目 快递投放问题 有N个快递站点用字符串标识 某些站点之间有道路连接 每个站点有一些包裹要运输 每个站点间的包裹不重复 路上有检查站会导致部分货物无法通行 计算哪些货物无法正常投递 输入描述 1 第一行输入M N M个包裹N个道路
  • python中的random和range

    random import random print random randint 1 10 产生 1 到 10 的一个整数型随机数 包括1和10 print random random 产生 0 到 1 之间的随机浮点数 print ra
  • 靶场 : upload-labs1-10

    搭建 用phpstudy搭建的 搭建很简单 下载源码 放置在phpstudy的根目录下 在phpstudy中创建 步骤 这里上传的文件内容是一句话木马 pass 1 上传一个php文件试一试 传不上的 使用的白名单过滤的 我们考虑一下是什么
  • 12个超好用的配色网站

    每次做海报做ppt的时候总是为配色发愁 到底怎样才能调出好看的配色方案呢 调着调着感觉自己已经是个色盲了 今天小编就给你们带来了福利 12个超好用的配色网站推荐 01 Material Palette 网站地址 http www mater
  • 【SpringCloud】四、Spring Cloud Config

    Spring Cloud Config 前言 一 什么是配置中心 1 为什么需要分布式配置中心 2 常用分布式配置中心框架 二 什么是Spring Cloud Config 1 Springcloud config 的工作原理 2 构建 S
  • truffle教程

    直接在geth的控制台通过solc进行编译部署的示例已经很多了 比如这篇博客 此处不再赘述 本文主要演示怎样通过truffle部署以太坊智能合约 truffle是一个以太坊智能合约开发框架 它会帮你做很多琐碎的事情 安装使用都很简单 1 安

随机推荐