在 C++ 中将序列化的 Thrift 结构序列化到 Kafka

2024-02-23

我有一套structs定义于Thrift例如以下内容:

struct Foo {
  1: i32 a,
  2: i64 b
}

我需要执行以下操作C++:

(a) 序列化实例Foo转换为 Thrift 兼容字节(使用Binary or Compact节俭协议)

(b) 将字节序列化实例发送到Kafka topic

Question

我如何发送Thrift将实例序列化为Kafka簇?

提前致谢


找到了我自己问题的答案。

序列化

下面的代码片段说明了如何序列化一个实例Foo to Thrift- 兼容字节(使用 ThriftCompact协议)。为了使用Binary协议,替换TCompactProtocol with TBinaryProtocol.

#include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TCompactProtocol.h>

using apache::thrift::protocol::TCompactProtocol;
using apache::thrift::transport::TMemoryBuffer;

...
...
boost::shared_ptr<TMemoryBuffer> buffer(new TMemoryBuffer());
boost::shared_ptr<TCompactProtocol> protocol(new TCompactProtocol(buffer));
uint8_t **serialized_bytes = reinterpret_cast<uint8_t **>(malloc(sizeof(uint8_t *)));
uint32_t num_bytes = 0;

// 'foo' is an instance of Foo
foo->write(protocol.get());
buffer->getBuffer(serialized_bytes, &num_bytes);

发送到Kafka集群

以下代码片段说明了如何将 Thrift 兼容字节发送到 Kafka 集群。

NOTE:下面使用的kafka客户端库是自由卡夫卡 https://github.com/edenhill/librdkafka.

#include "rdkafkacpp.h"

std::string errstr;

// Create global configuration
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("metadata.broker.list", "localhost:9092", errstr);
conf->set("api.version.request", "true", errstr);

// Create kafka producer
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);

// Create topic-specific configuration
RdKafka::Topic *topic = RdKafka::Topic::create(producer, "topic_name", nullptr, errstr);

auto partition = 1;

// Sending the serialized bytes to Kafka cluster
auto res = producer->produce(
    topic, partition,
    RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
    serialized_bytes, num_bytes,
    NULL, NULL);

  if (res != RdKafka::ERR_NO_ERROR) {
    std::cerr << "Failed to publish message" << RdKafka::err2str(res) << std::endl;
  } else {
    std::cout << "Published message of " << num_bytes << " bytes" << std::endl;
  }

producer->flush(10000);
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 C++ 中将序列化的 Thrift 结构序列化到 Kafka 的相关文章

  • 请求的资源不支持 HTTP 方法“GET”

    我的路线配置正确 并且我的方法具有装饰标签 我仍然收到 请求的资源不支持 HTTP 方法 GET 消息 System Web Mvc AcceptVerbs GET POST System Web Mvc HttpGet public st
  • Ruby 解释器嵌入到 C 代码中

    我只是尝试书中的一个简单例子 我有一个 sum rb 文件 class Summer def sum max raise Invalid maximum max if max lt 0 max max max 2 end end 还有一个
  • Rx Framework:在超时时执行操作,而不中断原始可观察序列

    给定一个可观察的源 通过轮询低级设备的 变化 状态生成 observable source metacode IObservable
  • C++ 中的字符串到 LPCWSTR

    我正在尝试从字符串转换为 LPCWSTR 我使用多位 1 例如 LPCWSTR ToLPCWSTR string text LPCWSTR sw LPCWSTR text c str return sw 2 返回中文字符 LPCWSTR T
  • 将 Uploadify 与 Sharepoint 和 .net 结合使用

    我在共享点页面上有一些由 JQuery 生成的 html 我想在这个 html 中使用 uploadify 将文件上传到服务器 亚历山大 https stackoverflow com users 25427 alexander gyosh
  • 如何在Qt无框窗口中实现QSizeGrip?

    如何使用 Qt 无框窗口实现 QSizeGrip 代码会是什么样的 您只需在布局内窗口的一角添加 QSizeGrip 即可使其保持在该角落 QDialog dialog new QDialog 0 Qt FramelessWindowHin
  • string.empty 和 string[0] == '\0' 之间的区别

    假设我们有一个字符串 std string str some value is assigned 有什么区别str empty and str 0 0 C 11 及更高版本 string variable 0 如果字符串为空 则需要返回空字
  • 混合 VS2012 平台工具集

    我们正在从 VS2005 切换到 VS2012 update 2 我们正在构建大量 主要是控制台 本机 C 无 MFC ATL 可执行文件 它们使用几个常见的静态链接库 这些可执行文件主要在 Win7 计算机上运行 但有些也部署在较旧的 X
  • 我需要一个树转储选项,该选项在当前的 gcc 版本中不再存在

    旧版本的 GCC 例如 4 0 2 或 4 1 2 有该选项 df see 用于调试程序或 GCC 的选项对于4 1 2 http gcc gnu org onlinedocs gcc 4 1 2 gcc Debugging Options
  • C# 列表框 ObservableCollection

    我正在尝试使用 ListBox DataSource ObservableCollection 但是我不知道如何在 OC 更新时让列表框自动更新 我可以在 OC 上挂接 CollectionChanged 事件 但是我需要对列表框执行什么操
  • 如何解决素数函数的大O表示法?

    我正在尝试理解 Big O 表示法 很抱歉 如果我问的问题太明显了 但我似乎无法理解这一点 我有以下 C 代码函数 我正在尝试为其计算 Big O 表示法 for i 2 i lt 100 i for j 2 j lt i j j if i
  • tcmalloc/jemalloc 和内存池之间有什么区别(以及选择的理由)?

    tcmalloc jemalloc是改进的内存分配器 还引入了内存池以更好地分配内存 那么它们之间有什么区别以及在我的应用中如何选择它们呢 这取决于您的程序的要求 如果您的程序有更多的动态内存分配 那么您 需要从可用的分配器中选择一个内存分
  • 将 libpng 链接到 android 原生项目

    我在尝试在本机 Android 项目中加载 libpng 时遇到问题 编译器似乎无法识别 libpng 函数 但可以识别类型 如 png byte 它可以正常编译类型 但如果我添加函数 则会抛出错误 这是编译输出 Windows 7 cmd
  • C++ 静态工厂构造函数

    我正在进行模拟 它需要创建多个相当相似的模型 我的想法是有一个名为 Model 的类并使用静态工厂方法来构造模型 例如 模型 createTriangle or 模型 createFromFile 我从以前的 java 代码中汲取了这个想法
  • 是否可以从.NET Core中间件检索控制器的操作结果?

    public class UsersController APIControllerBase public UsersController public Client Get return new Client ClientID 1 Las
  • 第一个随机数始终小于其余随机数

    我碰巧注意到 在 C 中 使用 std rand 方法调用的第一个随机数大多数时候都明显小于第二个随机数 关于 Qt 实现 第一个几乎总是小几个数量级 qsrand QTime currentTime msec qDebug lt lt q
  • 更改预处理到文件后出现错误 1 ​​错误 LNK1104

    我必须使用预处理器 所以我改变了 配置属性 gt C gt 预处理器 gt 预处理为文件 gt 是 并得到错误 错误 1 错误 LNK1104 无法打开文件 Debug asnreal obj 这个问题的解决办法 我必须在 lib 文件的路
  • Python 中的 C 指针算术

    我正在尝试将一个简单的 C 程序转换为 Python 但由于我对 C 和 Python 都一无所知 这对我来说很困难 我被 C 指针困住了 有一个函数采用 unsigned long int 指针并将其值添加到 while 循环中的某些变量
  • Android NDK - 仅用 C/C++ 编写

    有没有一种可能的方法可以使用 C C 编写整个 NDK 应用程序 而无需像 hello jni 示例项目 HelloJni java 中那样的 Java 入门 类 以某种方式创建一个 HelloJni c 来执行相同的操作 从 Androi
  • 从哪里开始阅读 SQLite 源代码? [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我想了解sqlite是如何实现的 并且 想阅读源代码 我已经下载了源代码 我应该开始查看代码的哪一部分 SQLite文档页 http

随机推荐