发送大量消息 Kafka Producer

2023-12-23

我正在使用卡夫卡。

我有一个包含 10k json 的列表。

目前我发送的 Json 如下:

for(int i=0 ;i< jsonList.size(); i++){
     ProducerRecord<K,V> record = new ProducerRecord(topic, jsonList[i]);
     producer.send(record);
}

发送每条消息。

我想将列表发送到 kafka 并让 kafka 在 json 后发送 json (不是一条包含所有 json 字符串的消息),如下所示:

ProducerRecord<K,V> record = new ProducerRecord(topic, jsonList);
producer.send(record);

我该怎么做?

Thanks


正式通过使用KafkaProducer and producerRecord你不能这样做,但你可以通过配置一些属性来做到这一点ProducerConfig

批量大小从文档生成器将记录批量发送到发送到同一分区的请求并立即发送它们

每当多个记录发送到同一分区时,生产者将尝试将记录一起批处理为更少的请求。这有助于提高客户端和服务器的性能。此配置控制默认批量大小(以字节为单位)。 不会尝试批量记录大于此大小的记录。

徘徊者该设置用于生产者的延迟时间,让生产者保持一段时间,以便同时所有请求都将被批量发送,但是批量大小是上限,如果生产者获得足够的批量大小,它将忽略此属性并向 kafka 发送批量消息

生产者将请求传输之间到达的所有记录分组到单个批处理请求中。此设置通过添加少量人为延迟来实现此目的 - 也就是说,生产者不会立即发送记录,而是等待给定的延迟以允许发送其他记录,以便可以将发送一起批处理。此设置给出了批处理延迟的上限:一旦我们得到了batch.size的值无论此设置如何,都会立即发送分区的记录。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

发送大量消息 Kafka Producer 的相关文章

  • 无法对 @KafkaListener 带注释的方法进行单元测试

    我正在尝试在 Spring 中对 kafka 消费者类进行单元测试 我想知道如果 kafka 消息发送到它的主题 则侦听器方法被正确调用 我的消费者类注释如下 KafkaListener topics kafka topics myTopi
  • Kafka 消费者通过 JMX 滞后

    我正在尝试监控 Kafka 0 10 中消费者组的滞后情况 我们的消费者在 Kafka 而不是 ZooKeper 中跟踪他们的偏移量 这意味着我可以使用以下方式获取数据 bin kafka consumer groups sh bootst
  • Spark:将 bytearray 转换为 bigint

    尝试使用 pyspark 和 Spark sql 将 kafka 键 二进制 字节数组 转换为 long bigint 会导致数据类型不匹配 无法将二进制转换为 bigint 环境详情 Python 3 6 8 Anaconda custo
  • KafkaStreams 同一应用程序中的多个流

    我正在尝试根据 KafkaStreams 的惯例和合理性做出实用的设计决策 假设我想将两个不同的事件放入其中KTables 我有一个制作人将这些消息发送给KStream那就是听那个话题 据我所知 我不能对消息使用条件转发KafkaStrea
  • 安装 confluence-kafka 时“文件名或扩展名太长”?

    我在使用 pip install confluence kafka 安装 confluence kafka 时遇到一些问题 但我收到此错误 文件名或扩展名太长 详细信息如下 Collecting confluent kafka Using
  • 如何在kafka消费组中动态添加消费者

    我应该如何知道何时必须扩展消费者组中的消费者 当存在快速生产者时 消费者扩大规模的触发因素是什么 一种直接的方法是获取消费者延迟 这可以计算为提交的偏移量和开始偏移量之间的差值 如果最后 n 次计算的延迟正在增加 您可以扩大规模 反之亦然
  • 无法向 kafka 主题发送消息

    我正在使用 Kafka Play 以及 Scala 这是我的代码 我想在其中发送消息到kafka服务器 主题名称是 测试主题 尽管我没有在主题中看到我发送的消息 但我没有收到任何错误 这里有什么问题吗 import kafka produc
  • Apache Kafka 与 Apache Storm

    Apache Kafka 分布式消息系统Apache Storm 实时消息处理 我们如何在实时数据管道中使用这两种技术来处理事件数据 就实时数据管道而言 在我看来 两者的工作都是相同的 我们如何在数据管道上使用这两种技术 您可以使用 Apa
  • 找不到 io.confluence:kafka-protobuf-serializer:6.0.0

    直接的问题是 为什么 Gradle 没有解决我添加的这个依赖关系 dependencies kafka protobuf serializer implementation io confluent kafka protobuf seria
  • kafka Avro 多个主题的消息反序列化器

    我正在尝试以 avro 格式反序列化 kafka 消息 我使用以下代码 https github com ivangfr springboot kafka debezium ksql blob master kafka research c
  • kafka消费端Offsets的一致性

    我有复制因子为 3 的卡夫卡主题min insync replicas 2 一个向该主题发送 X 条消息的生产者acks all 一段时间后 1 分钟内 在所有消息发送到主题后 将使用 java kafka 客户端为此主题创建新的消费者 使
  • 带有安全 Kafka 抛出的 Spark 结构化流:无权访问组异常

    为了在我的项目中使用结构化流 我正在 hortonworks 2 6 3 环境上测试 Spark 2 2 0 和 Kafka 0 10 1 与 Kerberos 的集成 我正在运行下面的示例代码来检查集成 我能够在 Spark 本地模式下的
  • Kafka - 如何同时使用过滤器和过滤器?

    我有一个 Kafka 流 它从一个主题获取数据 并且需要将该信息过滤到两个不同的主题 KStream
  • Kafka Streams - 跳跃窗口 - 去重键

    我正在 4 小时窗口上进行跳跃窗口聚合 每 5 分钟前进一次 由于跳跃窗口重叠 我得到了具有不同聚合值的重复键 TimeWindows of 240 60 1000L advanceBy 5 60 1000L 如何消除具有重复数据的重复键或
  • Kafka 分区键无法正常工作

    我正在努力解决如何正确使用分区键机制的问题 我的逻辑是设置分区号为3 然后创建三个分区键为 0 1 2 然后使用分区键创建三个KeyedMessage 例如 KeyedMessage 主题 0 消息 KeyedMessage 主题 1 消息
  • kafka 连接 s3 源无法与 Minio 一起使用

    我已经验证了与 minio 的连接 确保凭据工作正常并且可以访问 minio 另外 如果我尝试任何其他值store url http minio 9000我无法保存配置 所以我猜想在可见性方面不存在问题卡夫卡连接容器和minio容器 我不确
  • 在spark-kafka中使用schema将ConsumerRecord值转换为Dataframe

    我正在使用 Spark 2 0 2 和 Kafka 0 11 0 并且 我正在尝试在火花流中使用来自卡夫卡的消息 以下是代码 val topics notes val kafkaParams Map String Object bootst
  • 使用表白名单选项更新 Debezium MySQL 连接器

    我正在使用 Debezium 0 7 5 MySQL 连接器 并且我试图了解如果我想使用以下选项更新此配置 最好的方法是什么table whitelist 假设我创建了一个连接器 如下所示 curl i X POST H Accept ap
  • Kafka Connect Confluence S3 Sink 连接器:找不到类 io.confluence.connect.avro.AvroConverter

    使用此 Kafka Connect 连接器 https www confluence io hub confluenceinc kafka connect s3 https www confluent io hub confluentinc
  • 使用 offsets_for_times 从时间戳消费

    尝试使用 confluence kafka AvroConsumer 来消费给定时间戳的消息 if flag creating a list topic partitons to search list map lambda p Topic

随机推荐

  • Networkx 副本说明

    根据doc http networkx lanl gov reference generated networkx Graph copy html 看来networkx copy方法对图进行深度复制 我最关心的是声明 这将生成图的完整副本
  • 限制可以创建 PHP 类的内容

    我有两个班级 A 和 B 在应用程序逻辑中 除了 A 类之外 任何人都不允许创建 B 类的对象 但是 由于我不想将这两个类放在同一个文件中 因此我无法使用 私有 属性来限制它 是否有可能创建这种限制 如果 A 之外的其他人尝试创建 B 类的
  • AutoMapper 的替代品 [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 除了 AutoMapper 之外 NET 中的对象到对象映射还有哪些不同的替代框架 目前我们计划使用
  • Android 选项卡文本颜色[重复]

    这个问题在这里已经有答案了 可能的重复 Android 以编程方式更改选项卡文本颜色 https stackoverflow com questions 5577688 android change tab text color progr
  • excel中正则表达式的匹配函数?

    我的工作表中有几个单元格 其中包含ISIN http en wikipedia org wiki International Securities Identification Number 以下是 ISIN 的示例 DE000623100
  • 熊猫测量条件为真时经过的时间

    我有以下数据框 dt binary 2016 01 01 00 00 00 False 2016 01 01 00 00 01 False 2016 01 01 00 00 02 False 2016 01 01 00 00 03 Fals
  • C# 中 & 和 && 运算符有什么区别

    我试图理解之间的区别 and C 中的运算符 我在网上搜索没有成功 有人可以举例说明吗 是按位与运算符 对于整数类型的操作数 它将计算操作数的按位与 结果将是整数类型 对于布尔操作数 它将计算操作数的逻辑与 是逻辑 AND 运算符 不适用于
  • 在 Javascript 中引用 Go 数组

    我有一个 Golang 数组 正在传递到前端的 html 文件 我知道 index Array 0 工作并从数组中提取第一个元素 但我想做一个 Javascript for 循环并打印数组中的每个元素 如下所示
  • 如何在 ksh 中 grep 精确匹配带点的字符串

    在尝试 grep 查找其中包含点的字符串时 我无法获得精确匹配的字符串作为输出 eg grep APPLICATION REFERENCE LOCAL
  • 使用 S3 全球域名的 CloudFront 源的性能是否比区域域名的性能更好?

    我有一个 CloudFront 发行版 其中包含 S3eu west 1作为原点 我知道S3区域域名 bucket name s3 region amazonaws com为我提供即时初始 CloudFront 初始化 无需停机 全球的 b
  • Ionic 4 (Angular 7) - 共享组件问题

    我正在尝试为 Angular 这样的框架做一件极其平常的事情 目标是使用相同的 标头组件 通过共享模块多次组件 我的shared module ts import CommonModule from angular common impor
  • 如何使用 PowerShell 和 CSV 导入更新 AD 用户?

    我正在尝试使用此 powershell 脚本来更新 AD 用户 理想情况下 我将更新一堆属性 但现在我只是试图让它更新部门 以便我知道它是否有效 Import Module ActiveDirectory dataSource import
  • 无需客户端身份验证即可访问 dropbox api?

    我正在尝试创建一个流畅的解决方案 用于通过自定义文件呈现 徽标字体等 来共享文件 我的理想是将文件放在保管箱文件夹中 然后在单独的服务器上创建一个网页 通过 JavaScript 访问这些文件并显示链接到实际文件的列表 要求是最终用户不必使
  • BNlearn R 错误“变量 Variable1 必须至少有两个级别。”

    尝试使用 BNlearn 创建 BN 但我不断收到错误 Error in check data data allowed types discrete data types variable Variable1 must have at l
  • 在 JavaScript 中使用 eval() 的主要好处是什么?

    我知道这可能是一个新手问题 但我很好奇它的主要好处eval 它在哪里使用最好 我很感激任何信息 The eval最好使用的功能 从不 它的目的是将字符串作为 Javascript 表达式进行计算 例子 eval x 42 它以前已经被使用过
  • InstallShield LE(使用 Visual Studio 2012)完全无法检测依赖项

    我正在经历一场噩梦 试图让一个简单的安装程序在 InstallShield LE VS 2012 附带的那个 中工作 我可以解决各种各样的问题 例如我不能再执行 全部重建 而不搞乱一切 我需要在开发过程中简单地卸载 I nstallShie
  • Xcode 需要很长时间才能打印调试结果。

    当我在 Xcode 上调试时 需要大约 30 秒或更长时间才能打印结果po在 Xcode 控制台上 不幸的是 这只是我关于这个问题的很少的信息 然而 还有一点需要考虑 这个问题对于一个项目来说是非常具体的 这是因为当我使用po对于同一台 M
  • 有没有办法使用 cloudsql_proxy 可执行文件模拟服务帐户?

    https github com GoogleCloudPlatform cloudsql proxy https github com GoogleCloudPlatform cloudsql proxy 我发现可以通过使用以下命令设置模
  • 如何在类库项目中使用 Server.MapPath

    我有一个包含许多类库项目的 Web 应用程序 下面是一些示例代码 public static class LenderBL static string LenderXml get return MyPathHere public stati
  • 发送大量消息 Kafka Producer

    我正在使用卡夫卡 我有一个包含 10k json 的列表 目前我发送的 Json 如下 for int i 0 i lt jsonList size i ProducerRecord