Kafka 2.0的简单Producer和Consumer实现

2023-10-26

系统环境

在kafka单节点运行环境下,尝试使用java创建Kafka的Producer和Consumer进行测试,具体的代码环境如下:

  • OS:Ubuntu 16.4
  • Kafka:2.11_2.0.0
  • Zookeeper:使用Kafka中自带的Zookeeper进行启动
  • JDK: 1.8

项目使用maven,其中pom.xml的相关内容如下:

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.0.0</version>
</dependency>
 
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
 
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
</dependency>
 
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
    <!--<scope>test</scope>-->
</dependency>

简单Producer的实现

简单Producer的代码如下:

package kafka;
 
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
 
import java.util.Properties;
 
public class TestProducer {
 
    private static Properties kafkaProps;
 
    private static void initKafka() {
        kafkaProps = new Properties();
        // broker url
        kafkaProps.put("bootstrap.servers", "localhost:9092"); //,192.168.216.139:9092,192.168.216.140:9092
        // request need to validate
        kafkaProps.put("acks", "all");
        // request failed to try
        kafkaProps.put("retries", 0);
        // memory cache size
        kafkaProps.put("batch.size", 16384);
        //
        kafkaProps.put("linger.ms", 1);
        kafkaProps.put("buffer.memory", 33554432);
        // define the way of key and value serializer
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
    }
 
    public static void main(String[] args) {
        initKafka();
        Producer<String, String> producer = new KafkaProducer<String, String>(kafkaProps);
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
        }
        System.out.println("Message sent successfully!");
        producer.close();
    }
}

代码中需要注意的是:bootstrap.servers的配置项,在默认kafka的单节点配置时,不能使用IP,而是使用localhost进行连接,否则会连接异常。

此处对代码中用到的几个参数进行解释:

  • bootstrap.servers:用于初始化时建立链接到kafka集群,以host:port形式,多个以逗号分隔host1:port1,host2:port2;
  • acks:生产者需要server端在接收到消息后,进行反馈确认的尺度,主要用于消息的可靠性传输;acks=0表示生产者不需要来自server的确认;acks=1表示server端将消息保存后即可发送ack,而不必等到其他follower角色的都收到了该消息;acks=all(or acks=-1)意味着server端将等待所有的副本都被接收后才发送确认。
  • retries:生产者发送失败后,重试的次数 batch.size:当多条消息发送到同一个partition时,该值控制生产者批量发送消息的大小,批量发送可以减少生产者到服务端的请求数,有助于提高客户端和服务端的性能。
  • linger.ms:默认情况下缓冲区的消息会被立即发送到服务端,即使缓冲区的空间并没有被用完。可以将该值设置为大于0的值,这样发送者将等待一段时间后,再向服务端发送请求,以实现每次请求可以尽可能多的发送批量消息。
  • batch.size和linger.ms是两种实现让客户端每次请求尽可能多的发送消息的机制,它们可以并存使用,并不冲突。
  • buffer.memory:生产者缓冲区的大小,保存的是还未来得及发送到server端的消息,如果生产者的发送速度大于消息被提交到server端的速度,该缓冲区将被耗尽。
  • key.serializer,value.serializer说明了使用何种序列化方式将用户提供的key和vaule值序列化成字节。

简单Consumer的实现

简单Consumer的代码如下:

package kafka;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
 
import java.util.Collections;
import java.util.Properties;
 
public class TestConsumer {
 
    private static Properties kafkaProps = new Properties();
 
    private static void kafkaInit() {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        // group id for each consumer
        kafkaProps.put("group.id", "test");
        // if value legal, auto add offset
        kafkaProps.put("enable.auto.commit", "true");
        // set how long time to udpate the offset value
        kafkaProps.put("auto.commit.interval.ms", "1000");
        // set session response time
        kafkaProps.put("session.timeout.ms", "30000");
        kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }
 
    public static void main(String[] args) {
        kafkaInit();
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(kafkaProps);
        kafkaConsumer.subscribe(Collections.singletonList("my-topic"));
        System.out.println("Subscribed to topic:" + "my-topic");
 
        int i = 0;
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100); // ?
            for (ConsumerRecord<String, String> record : records) {
                // print the offset, key and value for the consumer records
                System.out.printf("Offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
            }
        }
    }
}

运行情况

Producer的成功运行后,部分输出如下:

可以看到,在producer的initKafka的相关配置项的值出现在ProducerConfig values中。

image

Consumer成功运行后,可以看到在producer中send的相关key和value值,在consumer的输出中出现:

image

遇到的问题

在producer运行时,出现如下错误:

image

在提示的参考URL页面中,可以找到相关问题的说明:

image

具体的解决方法为,修改pom.xml文件:在pom.xml文件中加入slf4j的相关引用,并将slf4j-log4j12引用中:

<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
</dependency>
 
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
    <!--<scope>test</scope>-->
</dependency>

修改完成后,重新运行producer程序,可以正常运行。

资料参考

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka 2.0的简单Producer和Consumer实现 的相关文章

  • Kafka/Spark消费topic到写出到topic

    1 Kafka的工具类 1 1 从kafka消费数据的方法 消费者代码 def getKafkaDStream ssc StreamingContext topic String groupId String consumerConfigs
  • 《消息队列高手课》缓存策略:如何使用缓存来减少磁盘IO?

    现代的消息队列 都使用磁盘文件来存储消息 因为磁盘是一个持久化的存储 即使服务器掉电也不会丢失数据 绝大多数用于生产系统的服务器 都会使用多块儿磁盘组成磁盘阵列 这样不仅服务器掉电不会丢失数据 即使其中的一块儿磁盘发生故障 也可以把数据从其
  • python 自建kafka消息生成和消费小工具

    要将 Kafka 的消息生产和消费转换为 API 接口 我们可以使用 Python 的 Web 框架 其中 Flask 是一个轻量级且易于使用的选择 下面是一个简单的例子 使用 Flask 创建 API 来生成和消费 Kafka 消息 1
  • 如何更好地使用Kafka?

    引言 要确保Kafka在使用过程中的稳定性 需要从kafka在业务中的使用周期进行依次保障 主要可以分为 事先预防 通过规范的使用 开发 预防问题产生 运行时监控 保障集群稳定 出问题能及时发现 故障时解决 有完整的应急预案 这三阶段 事先
  • 第十四章 kafka专题之日志数据删除策略

    日志数据清理 为了控制磁盘的容量 需要对过去的消息进行清理 1 内部定时任务检测删除日志 默认是5分钟 2 日志清理参数配置 支持配置策略对数据进行清理 以segment为基本单位进行定期清理 当前正在使用的segment不会被清理 启用c
  • Kafka——集群

    文章目录 集群 1 搭建个集群 2 集群发送消息 3 集群消费 3 1 Procuder 3 2 Consumer 4 消费顺序 集群 对于kafka来说 一个单独的broker意味着kafka集群中只有一个节点 要想增加kafka集群中的
  • Flink设置Source数据源使用kafka获取数据

    流处理说明 有边界的流bounded stream 批数据 无边界的流unbounded stream 真正的流数据 Source 基于集合 package com pzb source import org apache flink ap
  • windows python kafka 初级使用

    今天花了点时间在这个kafka上 因为我们工作中也用到了kafka 我这边对于kafka的理解是能用或者知道基本原理就行 实现在自己的windows环境搭建一次kafka 然后使用python进行数据的生产和消费 如果之后工作中对于kafk
  • 仿kafka实现java版时间轮

    系统定时 超时 在我们平时的项目开发中 会设置系统的超时时间 比如在http接口中设置超时时间 在定时调度中也会用到 在jdk的开发的实现Timer和ScheduledThreadPoolExecutor DelayQueue定时调度中使用
  • springboot集成kafka实战项目,kafka生产者、消费者、创建topic,指定消费分区

    springboot集成kafka实战项目 kafka生产者 消费者 创建topic 指定消费分区 前言 本项目代码可直接集成到你现有的springboot项目中 功能包括 1 kafka生产者配置 2 kafka消费者配置 指定分区消费
  • 附录:kafka源码启动

    本文以源码2 8为例 准备如下 idea 2019 1 4 jdk 1 8 scala 2 12 8 gradle 6 8 1 zookeeper 3 4 10 kafka2 8源码 注意 以下安装都需要装在没有空格的路径上 比如D Pro
  • java版kafka producer实现

    需求 1 kafka server已经配置完全 且设定了访问限制 基于这一点 必须要设定认证 及预先分配的账号密码 2 由于项目开发环境是java 且不允许使用LogStash 基于这一点 必须实现一个java版的producer 先贴一份
  • [分布式] zookeeper集群与kafka集群

    目录 一 Zookeeper 概述 1 1 Zookeeper定义 1 2 Zookeeper 工作机制 1 3 Zookeeper 特点 1 4 Zookeeper 数据结构 1 5 Zookeeper 应用场景 1 6 Zookeepe
  • JMS 消耗多个主题

    我是 Java 新手 正在开发一个使用多个 不同 主题并将其发送到另一台服务器的项目 我想知道处理多个主题的最佳方法是什么 据我了解 每个消费者都与一个主题相关 因此 如果我必须使用多个主题 则每个不同的主题都需要一个消费者 由于消费者进行
  • MQ - KAFKA 基础篇

    1 KAFKA的核心组件 API Producer API 它允许应用程序向一个或多个 topics 上发送消息记录 Consumer API 允许应用程序订阅一个或多个 topics 并处理为其生成的记录流 Streams API 它允许
  • 从 MySQL 到 DolphinDB,Debezium + Kafka 数据同步实战

    Debezium 是一个开源的分布式平台 用于实时捕获和发布数据库更改事件 它可以将关系型数据库 如 MySQL PostgreSQL Oracle 等 的变更事件转化为可观察的流数据 以供其他应用程序实时消费和处理 本文中我们将采用 De
  • Java 8 java.util.function.Consumer<> 的 C# 等效项是什么?

    C 中是否有与此接口等效的接口 例子 Consumer
  • 阿里技术官亲笔力作:Kafka限量笔记,一本书助你掌握Kafka的精髓

    前言 分布式 堪称程序员江湖中的一把利器 无论面试还是职场 皆是不可或缺的技能 而Kafka 这款分布式发布订阅消息队列的璀璨明珠 其魅力之强大 无与伦比 对于Kafka的奥秘 我们仍需继续探索 要论对Kafka的熟悉程度 恐怕阿里的大佬们
  • Java线程生产者消费者算法无法正常工作

    我正在尝试学习线程 因此我编写了一个示例生产者消费者问题 其中生产者生成从 1 到 10 的数字 而消费者必须显示它们 但只有消费者显示数字 1 并停止 正如我所说 该程序写得不好 可能很荒谬 但我仍然想弄清楚为什么从 1 到 10 的所有
  • 使用 Kafka 和 NodeJS 进行实时通知

    在我的项目中 我必须设计一个实时通知系统 我就是这样做的 如下图所示 你可以看到我使用 Kafka 作为队列消息系统 并使用 NodeJS 来构建 Websocket Server 和 Kafka Consumers 生产者将收集通知数据并

随机推荐

  • 基于51单片机的红外解码器

    1 简介 本红外解码器是以MCS 51系列AT89C512片机为核心 将红外传感器接收的信号解析出来 LCD1602显示屏将解码数据显示出来 2 总体原理图 硬件组成 单片机最小系统 LCD1602显示屏 IR红外接收器 系统电源 3 程序
  • 数据结构---n皇后问题

    n皇后问题 题目描述 JAVA实现 力扣提交代码 n皇后问题 题目描述 对于四皇后问题的解 放置一个皇后 棋盘被占据的解为 很明显 每行只能放置 且只能放置一个皇后 代码中 queen row 1 来实现的 一个完整的示例 放第一个皇后 放
  • 抖音将会输给快手?时间会证明一切

    为什么快手产品的主界面上没有设置频道分类 这样带来的用户体验真的好吗 在快手一个月之前的员工大会上 入职不久的新员工赵波提出了这个疑问 这个问题不只是他一个人的 之前官方的解答是 不意给用户设置标签是为了保持界面简洁 不对内容做过多的评判
  • i.MX6ULL - 从零开始移植uboot-imx_v2020.04_5.4.70_2.3.0

    i MX6ULL 从零开始移植uboot imx v2020 04 5 4 70 2 3 0 目录 i MX6ULL 从零开始移植uboot imx v2020 04 5 4 70 2 3 0 前言 1 环境搭建 2 NXP官方原版UBOO
  • 一文解决VS Code安装、C++环境配置、OpenCV配置

    前言 本文包括VScode安装 C 环境配置以及OpenCV配置全过程 VS Studio配置OpenCV比较简单 可以直接使用OpenCV官网已有的用VS Studio编译器编译好的OpenCV库 但VS Code不能直接利用VS Stu
  • Android:基本程序单元 Activity

    Activity 概述 在 Android 应用中 提供了 4 大基本组件 分别是 Activity Service BroadcastReceiver 和 ContentProvider 而 Activity 是 Android 应用最常
  • QApplication与QCoreApplication

    QApplication GUI 程序中 有且仅有一个 QApplication 类 管理GUI程序的控制流和主设置 QApplication 包含主事件循环 所有来自窗口系统和其他源的事件将被处理和分配 它也处理程序的初始化 析构和提供会
  • **vue.esm.js?efeb:591 [Vue warn]: Invalid prop: type check failed for prop "data". Expected Array

    vue esm js efeb 591 Vue warn Invalid prop type check failed for prop data Expected Array got String 有可能是这几种情况
  • CyclicBarrier底层源码解析

    一 概述 前面我们讲解了ReentrantLock CountDownLatch Semaphore的源码 他们都是由AQS来实现的 而CyclicBarrier则是通过ReentrantLock Condition实现的 CyclicBa
  • 达夫设备简单介绍

    前言 想到肯哥每天的Open话题 总能学到一些知识 怕忘记 所以我就当成博客记录一下了 今天要记录的是2023年6月5日 肯哥的技术交流群里面的一个代码 肯哥话题 肯哥的原话 hello 又到了每天的open话题时刻 今天我们聊点技术的东西
  • MATLAB学习——Matlab系统环境介绍

    本篇文章并不涉及Matlab的具体使用方法和相关函数 仅仅是和大家一起熟悉Matlab的操作界面 祝大家小年快乐 记得吃糖瓜 总体来说 Matlab的使用界面和office的使用界面具有很高的相似性 因此 对于要熟悉Matlab使用的初学者
  • JVM内存分配机制

    学习了JVM的内存分配机制为大家分享一下 现在把学习笔记总结记录一下 如果记录有些错误 还望指出 一 对象的创建 对象创建的主要流程 1 类加载检查 虚拟机遇到一条new指令时 首先将去检查这个指令的参数是否能在常量池中定位到一个类的符号引
  • 【 OpenCV】——图像缩放

    OpenCV 图像缩放 前言 本文介绍了图像放大 图像放小基础内容 使用步骤 1 引入库 include
  • 第五章 运输层 ---概述,端口号,复用,分用

    第五章 运输层 概述 端口号 复用 分用 5 1 运输层概述 作用范围与简介 总结 5 2 运输层端口号 复用 分用的概念 运输层端口号 发送方的复用与接收方的分用 TCP IP体系的应用层常用协议所使用的运输层熟知端口号 5 1 运输层概
  • python 定时器

    1 BlockingScheduler库 缺点 会阻塞代码 优点 调用定时函数时方便灵活定义定时 比如间隔多长时间调用一次 比如那几个月调用一次 代码 from apscheduler schedulers blocking import
  • torch.tensor拼接与list(tensors)

    tensor list tensors Construct list tensors To stack list tensors To concatenate list tensors Construct list tensors 创建一个
  • Linux操作笔记

    1 关闭死程序 root node3 ps aux grep fire root 2105 0 0 0 0 112660 964 pts 0 S 15 10 0 00 grep color auto fire root 10620 0 0
  • 最简单的Flutter权限管理插件

    文章目录 用法 配置权限 Android iOS 检查权限 请求权限 处理回调 例子 插件开发 欢迎关注公众号 编程之路从0到1 这是Flutter上的一个动态权限处理的插件库 可以让Flutter应用层的开发者以非常简单的API统一处理原
  • Springboot整合redis

    Springboot整合redis 原文链接 https www kuangstudy com bbs 1534913977346584577 为方便自己整合redis 特记录一下redisTemplate和redisUtil代码 1 自定
  • Kafka 2.0的简单Producer和Consumer实现

    系统环境 在kafka单节点运行环境下 尝试使用java创建Kafka的Producer和Consumer进行测试 具体的代码环境如下 OS Ubuntu 16 4 Kafka 2 11 2 0 0 Zookeeper 使用Kafka中自带