如何利用producer向Kafka发送信息(基于java客户端(Kafka-clients))

2023-11-07

继上一篇Kafka安装以及环境准备文章后,这一小节是讲解如何利用Java客户端kafka-clients库进行消息的发送。

工程结构(maven工程)

maven依赖

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.2.0</version>
        </dependency>

producer代码

public class MyProducer {

    public static void main(String[] args) {
        Properties kafkaProperties = new Properties();
        kafkaProperties.put("bootstrap.servers", "192.168.8.135:9092");
        kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer producer = new KafkaProducer(kafkaProperties);
        ProducerRecord record = new ProducerRecord("test", "name", "water");
        try {
            Future result = producer.send(record);
            System.out.println(result.get());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

代码解析:

  1. 主要是两个对象:producer和record,record组装想要发送的数据,然后通过producer进行发送。
  2. bootstrap.servers是表示Kafka服务器broker的地址,一般建议提供至少两个broker的地址信息,通过逗号分隔,一旦其中一个死机了,生产者仍然能连接到集群上,我这里的本地环境是单机,所以只填写了一个地址
  3. key.serializer是表示消息中的键的序列化器,网络传输需要进行序列化成字节数组
  4. value.serializer是表示消息中值的序列化器

可能出现的错误

  1. java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic test not present in metadata after 60000 ms. 造成该错误的原因有可能是防火墙问题,请把虚拟机的防火墙关闭就ok
  2. org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-0: 30534 ms has passed since batch creation plus linger time 这种错误一开始让我百思不得其解,明明Kafka服务器broker的地址没有问题,为什么就是发送数据失败呢?最后通过debug发现了获取到的cluster的地址居然是bogon:9092(下面的图一),这个bogon是什么东西,为什么不是我在代码中的IP地址呢?个人推测是不是host name的问题,于是在kafka中的server.properties添加了图二中的host.name配置,重启Kafka就OK了



简单的producer发送消息的例子到此结束,大部分时间都花在了解决异常问题上,不过收获不少。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何利用producer向Kafka发送信息(基于java客户端(Kafka-clients)) 的相关文章

  • 【CentOS7离线ansible-playbook自动化安装CDH5.16(内附离线安装包地址,及自动化脚本)】

    CentOS7 离线环境 使用ansible自动部署CDH5 16 前言 本文介绍如何使用作者开发的自动化脚本 离线部署cdh集群 只需要简单的配置下yum源和cdh集群节点IP等几个参数 就可实现一键部署cdh集群 省去配置mysql n
  • Zookeeper的常见面试题

    1 Zookeeper 1 1 Zookeeper基本概念 Zookeeper作为一个优秀高效且可靠的分布式协调框架 ZooKeeper 在解决分布式数据一致性问题时并没有直接使用Paxos算法 而是专门定制了一致性协议叫做 ZAB Zoo
  • Spring Boot Kafka - 序列化和反序列化JSON

    文章目录 Spring Boot Kafka 序列化和反序列化JSON 前言 配置JsonSerializer和JsonDeserializer 定义一个Model类 Producer类 Consumer类 Controller类 测试 小
  • Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作

    一 案例说明 现有一电商网站数据文件 名为buyer favorite1 记录了用户对商品的收藏数据 数据以 t 键分割 数据内容及数据格式如下 二 前置准备工作 项目环境说明 Linux Ubuntu 16 04 jdk 7u75 lin
  • Redis Streams做股票行情MQ?

    redis作为内存数据库 大多时候都是作为缓存来使用 但是因为有pub sub的存在 所以也可以做MQ来使用 做为MQ 它有两个严重的问题 1 无法持久化 2 没有ack机制 redis pub sub是一个要即时消费的MQ 如果消费晚了
  • 《从Paxos到ZooKeeper》读书笔记之第一章(二)

    从Paxos到ZooKeeper 读书笔记之第一章 二 1 2从ACID到CAP BASE 这一节由三小节 从大家数值的数据库事务的四个特性 引出来分布式事务的概念 通过对ACID模型的讨论 提出如何构建一个兼顾可用性和一致性的分布式系统方
  • kafka处理快速的原因

    生产者分析 生产者 producer 是负责向Kafka提交数据的 我们先分析这一部分 Kafka会把收到的消息都写入到硬盘中 它绝对不会丢失数据 为了优化写入速度Kafka采用了两个技术 顺序写入 和 MMFile 顺序写入 因为硬盘是机
  • Kafka/Spark消费topic到写出到topic

    1 Kafka的工具类 1 1 从kafka消费数据的方法 消费者代码 def getKafkaDStream ssc StreamingContext topic String groupId String consumerConfigs
  • Kafka原理分析

    在基础篇中我们介绍MQ的一些基础原理 这篇文章 我们针对kafka进行较深入的分析 上篇文章中我们提到了kafka中一个名词broker 其实broker可以理解成为一台kafa服务器 kafka的特性和功能 在kafka设计之初是为了实时
  • springboot本机启动elasticjob抛出异常HostException(ip is null)

    1 使用的elasticjob版本为3 0 1 2 本机的IPV4在校验isReachable 返回false 可能是使用无线网 导致ip验证问题 3 最后引入Groovy解决 引入包
  • Linux 下搭建 Kafka 环境

    安装步骤 准备软件目录 mkdir datalake 上传之前下载好的安装包到 datalake 目录下 jdk 8u181 linux x64 gz kafka 2 11 2 1 0 tgz zookeeper 3 4 5 tar gz
  • Kafka一致性

    一 存在的一致性问题 1 生产者和Kafka存储一致性的问题 即生产了多少条消息 就要成功保存多少条消息 不能丢失 不能重复 更重要的是不丢失 其实就是要确保消息写入成功 这可以通过acks 1来保证 保证所有ISR的副本都是一致的 即一条
  • Kafka一文懂

    初识 Kafka 什么是 Kafka Kafka 是由 Linkedin 公司开发的 它是一个分布式的 支持多分区 多副本 基于 Zookeeper 的分布式消息流平台 它同时也是一款开源的基于发布订阅模式的消息引擎系统 Kafka 的基本
  • windows python kafka 初级使用

    今天花了点时间在这个kafka上 因为我们工作中也用到了kafka 我这边对于kafka的理解是能用或者知道基本原理就行 实现在自己的windows环境搭建一次kafka 然后使用python进行数据的生产和消费 如果之后工作中对于kafk
  • 附录:kafka源码启动

    本文以源码2 8为例 准备如下 idea 2019 1 4 jdk 1 8 scala 2 12 8 gradle 6 8 1 zookeeper 3 4 10 kafka2 8源码 注意 以下安装都需要装在没有空格的路径上 比如D Pro
  • kafka配置内外网访问

    listeners 学名叫监听器 其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务 advertised listeners 和 listeners 相比多了个 advertised Advertised 的
  • [Docker]使用Docker部署Kafka

    Kafka 是一个分布式流处理平台 它依赖于 ZooKeeper 作为其协调服务 在 Kafka 集群中 ZooKeeper 负责管理和协调 Kafka 的各个节点 因此 要在 Docker 容器中启动 Kafka 通常需要同时启动一个 Z
  • WebSocket + kafka实时推送数据(springboot纯后台)

    逻辑 kafka订阅消费者主题 消费后通过webSocket推送到前端 kafka vue financial webSocket 学习引用 SpringBoot2 0集成WebSocket 实现后台向前端推送信息 World Of Mos
  • Kafka——Mac搭建kafka环境

    1 下载Kafka安装包 下载地址 将压缩包移动到 usr local mv kafka 2 12 3 1 0 tgz usr local 解压 tar zxvf kafka 2 12 3 1 0 tgz 2 启动 启动zookeeper
  • 从 MySQL 到 DolphinDB,Debezium + Kafka 数据同步实战

    Debezium 是一个开源的分布式平台 用于实时捕获和发布数据库更改事件 它可以将关系型数据库 如 MySQL PostgreSQL Oracle 等 的变更事件转化为可观察的流数据 以供其他应用程序实时消费和处理 本文中我们将采用 De

随机推荐

  • 计算机二级excel评选投票,计算机二级-31-2.电子表格题(小刘-评选投票)

    1 首先查看本节课的课程题目 要求如图所示 2 然后打开 考试素材 具体目录如图所示 3 然后是 第一小题 修改文件名为Excel 进行另存 默认一般跟素材文件夹目录相同 4 第二小题 题目要求如图所示 5 首先检查 代码地址是否对应 然后
  • 正则表达式攻略

    第一章 正则表达式字符匹配攻略 正则表达式是匹配模式 要么匹配字符 要么匹配位置 请记住这句话 然而关于正则如何匹配字符的学习 大部分人都觉得这块比较杂乱 毕竟元字符太多了 看起来没有系统性 不好记 本章就解决这个问题 内容包括 两种模糊匹
  • Java用File完成硬盘文件的增删查改操作

    我的项目环境是eclipse 如果用的是idea的话最好创建好一个同类名的 class文件 因为字符编码不同 我用的是GBK 如果报错或者出现中文注释不出现的话 最好搜一下idea怎么改字符编码 package com bili demo3
  • siebel escript入门——基础知识一

    1 检测escript变量是否已定义 var test if typeof test undefined TheApplication RaiseErrorText test is undefined 2 escript的基本数据类型 es
  • 立式oled拼接屏有哪些产品优点?

    葫芦岛oled拼接屏是一种高清晰度的显示屏 由多个oled屏幕拼接而成 它可以用于广告牌 展览 演示 会议等场合 具有高亮度 高对比度 高色彩饱和度 高刷新率等优点 能够吸引人们的眼球 提高信息传递效果 葫芦岛oled拼接屏的优点主要有以下
  • 02node.js-◆ 模块化的基本概念 ◆ Node.js 中模块的分类 ◆ npm与包 ◆ 模块的加载机制

    学习内容 模块化的基本概念 Node js 中模块的分类 npm与包 模块的加载机制 1 模块化的基本概念 1 1 什么是模块化 模块化是指解决一个复杂问题时 自顶向下逐层把系统划分成若干模块的过程 对于整个系统来说 模块是可组合 分解和更
  • linux中,如何使用tar进行解压缩

    linux中 如何使用tar进行解压缩 环境 windows 7 virtualboax fedora 15 kde 可以使用tar命令解压缩 tar gz文件 下面以解压缩qt源文件举例 1 在windows中将qt源文件拷贝到共享文件夹
  • Python基础语法【4】—— 结构数据类型之列表

    文章目录 一 创建列表 1 使用 直接创建列表 2 使用list 函数创建列表 二 访问列表元素 1 使用索引方式访问列表元素 2 使用切片方式访问列表元素 三 添加元素到列表 1 使用append 方法添加元素 2 使用extend 方法
  • nvidia 专业显卡解码能力

    专业显卡问题 p620 解码 有时我们经常遇到专业显卡 专业显卡和非专业显卡在使用opengl 等底层调用时表现不一样 值得注意的是 专业显卡解码能力到了p400 以上才有显著的提升 p620 家族为开始又有提升 p620 不支持vp8 解
  • xml 入门 dtd

    xml dtd的格式 相
  • 入门文献复现——Murphy C K——Combining belief functions when evidence conflicts

    作者Murphy提出了综合平均法来组合多个BOE 大体的步骤如下 1 将给定的BOE进行平均 获得各个BPA的平均质量averageMass 2 利用Dempster的组合规则将 1 求得的平均质量进行组合 并且组合 n 1 次 n为BOE
  • Easyexcel 导出数据 一对多关系导出数据集合

    客户要求 要求导出的表格如图 实现这样表格 很多人会想到动态表头 easypoi可以直接实现 但是我用的是easyexcel 而easyexcel自身并没有提供自动合并的功能所以还是需要自己来合并 代码如下 首先我们来看下将嵌套数据平铺 不
  • 爬虫访问中,如何解决网站限制IP的问题?

    爬虫访问中 如何解决网站限制IP的问题 多年爬虫经验的人告诉你 国内ADSL是王道 多申请些线路 分布在多个不同的电信区局 能跨省跨市 IP覆盖面越多越好 九州动态IP是一款动态IP转换器客户端 该提供国内26个省共百万数据级动态ip切换
  • 数学建模模型_数学建模模型、算法、资料必备

    提到数学建模 首先想到的是国赛 美赛 其实不然 国内目前举办了很多类似的数学建模竞赛 比如五一数学建模竞赛 中青杯等比赛 这些竞赛都可以积极参与 积累实战经验 我曾参加过数次数学建模竞赛 作为过来人 针对往年数学建模竞赛的参赛经历 我提出两
  • Python Selenium 基础入门

    本内容主要介绍 Python Selenium 的基础使用方法 1 Python Selenium 简介和环境配置 1 1 Selenium 简介 Selenium 是一个 Web 的自动化测试工具 最初是为网站自动化测试而开发的 Sele
  • matlab如何读取一个文件夹下所有文件,Matlab获取一个文件夹下所有文件

    使用Matlab可以使用dir函数获取指定文件夹下的所有文件名 具体操作如下 首先进入指定的文件夹 例如 folder C Users example Documents 替换为你自己的文件夹路径 cd folder 进入指定文件夹中 然后
  • 10亿级数据量的系统性能优化设计,被惊艳到了!

    V xin ruyuanhadeng获得600 页原创精品文章汇总PDF 这篇文章 我们来聊一聊在十亿级的大数据量技术挑战下 世界上最优秀的大数据系统之一的Hadoop是如何将系统性能提升数十倍的 首先一起来画个图 回顾一下Hadoop H
  • 水面无人艇局部危险避障算法研究 参考文献

    水面无人艇局部危险避障算法研究 Local Risk Obstacle Avoidance Algorithm of USV 博主 的硕士毕业论文 参考文献 1 Manley J E Unmanned surface vehicles 15
  • Android关于手机和模拟器的区分

    在网上试了很多区分方法感觉都被模拟器破了 只能说一句模拟器太强大了 当我快要放弃的时候 突然想到个方法 方法分为获取手机的短信 获取手机的图片 获取手机的通讯录 一个正常的用户不可能没有这些信息 当然这些需要用户同意授权 所以当这些没用没授
  • 如何利用producer向Kafka发送信息(基于java客户端(Kafka-clients))

    继上一篇Kafka安装以及环境准备文章后 这一小节是讲解如何利用Java客户端kafka clients库进行消息的发送 工程结构 maven工程 maven依赖