java实现kafka消息发送和接收

2023-11-12

之前写了一篇关于kafka集群搭建的点击打开链接。想了解的可以看下。

今天这个实现是和前面集群对应的。使用的是新版的API。属性如果想定制自己的,需要到官方网址上面去查看一下对应的值。

推介大家多去看看官方的介绍和demo。网上有些翻译过来的例子并不完善,最好是知己知彼,才能百战不殆

maven:

	<dependency>
    		<groupId>org.apache.kafka</groupId>
    		<artifactId>kafka-clients</artifactId>
    		<version>0.11.0.0</version>
    	</dependency>
   	    <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>0.11.0.0</version>
    	</dependency>

生产者Producer:

package com.roncoo.example.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {

    private final KafkaProducer<String, String> producer;

    public final static String TOPIC = "test5";

    private ProducerDemo() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "xxx:9092,1xxx:9092,xxx:9092");//xxx服务器ip
        props.put("acks", "all");//所有follower都响应了才认为消息提交成功,即"committed"
        props.put("retries", 0);//retries = MAX 无限重试,直到你意识到出现了问题:)
        props.put("batch.size", 16384);//producer将试图批处理消息记录,以减少请求次数.默认的批量处理消息字节数
        //batch.size当批量的数据大小达到设定值后,就会立即发送,不顾下面的linger.ms
        props.put("linger.ms", 1);//延迟1ms发送,这项设置将通过增加小的延迟来完成--即,不是立即发送一条记录,producer将会等待给定的延迟时间以允许其他消息记录发送,这些消息记录可以批量处理
        props.put("buffer.memory", 33554432);//producer可以用来缓存数据的内存大小。
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer",
              "org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<String, String>(props);
    }

    public void produce() {
        int messageNo = 1;
        final int COUNT = 5;

        while(messageNo < COUNT) {
            String key = String.valueOf(messageNo);
            String data = String.format("hello KafkaProducer message %s from hubo 06291018 ", key);
            
            try {
                producer.send(new ProducerRecord<String, String>(TOPIC, data));
            } catch (Exception e) {
                e.printStackTrace();
            }

            messageNo++;
        }
        
        producer.close();
    }

    public static void main(String[] args) {
        new ProducerDemo().produce();
    }
}

消费者Consumer:

package com.roncoo.example.kafka;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;


public class UserKafkaConsumer extends Thread {

        public static void main(String[] args){
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "xxx:9092,xxx:9092,xxx:9092");//xxx是服务器集群的ip
            properties.put("group.id", "jd-group");
            properties.put("enable.auto.commit", "true");
            properties.put("auto.commit.interval.ms", "1000");
            properties.put("auto.offset.reset", "latest");
            properties.put("session.timeout.ms", "30000");
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
            kafkaConsumer.subscribe(Arrays.asList("test5"));
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("-----------------");
                    System.out.printf("offset = %d, value = %s", record.offset(), record.value());
                    System.out.println();
                }
            }

        }
}


本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

java实现kafka消息发送和接收 的相关文章

随机推荐

  • python连接clickhouse使用方法

    前沿 clickhouse现在作为分布式存储成熟的解决方案 在python开发中经常会用到clickhouse的连接方案 下面所列一个简单的连接clickhouse的写法 正文 from clickhouse driver import C
  • 焉建伟:3.31黄金走势看涨看跌?黄金原油今日如何操作? 实时策略

    消息面 美东时间周二 美国媒体援引两名知情人士的话报道 拜登周三预计将在匹兹堡宣布2 25万亿美元的一揽子基础设施和就业支持计划 具体而言 大约6500亿美元会被用于重建美国基础设施 如道路 桥梁 高速公路和港口 4000亿美元用于老年人和
  • 蓝桥杯单片机前言(经验分享)

    本人在今年省赛的获得了省一等奖 这是战利品hhh 国赛由于没有好好准备 所以没有取得好名次 已经后悔了555 个人经历 当时是寒假开始学习蓝桥杯单片机开发板的 本人情况是有模电 电路理论基础 c语言基础 当时差不多忘记完了 没有51和模电基
  • 六轴融合算法

    先说什么叫六轴融合 在3Dof姿态追踪功能中 最主要的传感器就是陀螺仪 Gyroscope 它可以提供3个轴的角加速度 对时间进行积分 就可以得出物体旋转的方向角度 但是因为硬件精度等各方面原因 会产生误差 随着时间的累积 计算得到的角度误
  • 活动回顾|解锁 AIGC 密码,探寻企业发展新商机

    5月24日 Google Cloud 与 Cloud Ace 联合主办的线下活动顺利落下帷幕 本次活动 有近 40 位企业精英到场支持 三位 Google Cloud 演讲嘉宾就本次活动主题 为大家带来了比较深度的演讲内容 干货满满 以下的
  • Lua中的协同程序 coroutine

    int running 1 int lua finish lua State L running 0 printf lua finish called n return 0 int lua sleep lua State L printf
  • 装X指南之Xposed安装与配置

    一 前言 Xposed 能干嘛 我可以告诉你 Root Xposed 真的可以为所欲为 而 Android 开源 为 搞机 带了更多的乐趣的同时 当然也引入安全性问题 部分流氓软件在 Root 下 会盗取用户私密信息 例如 号码 照片 短信
  • 惊为天人,普林斯顿博士NumPy手写全部主流机器学习模型,代码超3万行

    点击上方 高级农民工 选择 星标 公众号 第一时间速享重磅干货 本文转自 机器之心 禁止二次转载 用 NumPy 手写所有主流 ML 模型 普林斯顿博士后 David Bourgin 最近开源了一个非常剽悍的项目 超过 3 万行代码 30
  • Golang 字符串(string)与字节数组([]byte)一行代码互转

    Golang 字符串 string 与字节数组 byte 一行代码互转 Golang轻松学习 文章目录 Golang 字符串 string 与字节数组 byte 一行代码互转 一 字符串与字节数组 二 详细代码 1 简单的方式字节转字符串
  • 【Redis】《Redis 开发与运维》笔记-汇总

    一 初识Redis 1 Redis提供了两种持久化方式 RDB和AOF 即可以用这两种策略将内存的数据保存到硬盘中 2 复制功能是分布式Redis的基础 3 一般推荐使用的安装方式 源码的方式进行安装 下面以3 0 7版本为例 只需6步 w
  • xshell传输文件到服务器(ubuntu)(上传下载)

    一 利用xshell上传下载内容 点击xftp按钮 绿色按钮 出现一个对话框 对话框左边是本地的文件预览 对话框右边是服务器当前路径的文件预览 上传下载文件 直接拖拉内容 二 其他方式 1 xshell连接服务器 本地虚拟机 2 首先在服务
  • python变量进阶(可变不可变,局部变量和全局变量)

    变量进阶 理解 目标 变量的引用 可变和不可变类型 局部变量和全局变量 01 变量的引用 变量 和 数据 都是保存在 内存 中的 在 Python 中 函数 的 参数传递 以及 返回值 都是靠 引用 传递的 1 1 引用的概念 在 Pyth
  • Mongodb常用语句总结

    1 查询所有用户信息 db system users find pretty 2 查询mongo启动进程 ps ef grep mongo 3 mongo常用操作命令 1 打开数据库 mongo 10 1 1 30 20000 admin
  • C++中promise和future详解

    Promise和Future 原理 C 11中promise和future机制是用于并发编程的一种解决方案 用于在不同线程完成数据传递 异步操作 传统方式通过回调函数处理异步返回的结果 导致代码逻辑分散且难以维护 Promise和Futur
  • js如何截取某个字符串前面或者后面的内容

    截取某个字符串后面的内容 var str http abc def ghi com var index str lastIndexOf str str substring index 1 str length 返回最后一个 后面的内容 截取
  • JavaScript -- 数组去重

    文章目录 方法一 利用indexOf 去重 思路 代码 方法二 利用splice方法去重 思路 代码实现 方法三 利用ES6的Set去重 思路 代码实现 方法一 利用indexOf 去重 indexOf 方法可返回某个指定的字符串值在字符串
  • REST API URI 设计的七准则

    在了解 REST API URI 设计的规则之前 让我们快速过一下我们将要讨论的一些术语 URI REST API 使用统一资源标识符 URI 来寻址资源 在今天的网站上 URI 设计范围从可以清楚地传达API的资源模型 如 http ap
  • 汇编指令与Intrinsics指令的对应关系汇总

    汇编指令与Intrinsics指令的对应关系汇总 参考网址 https software intel com sites landingpage IntrinsicsGuide 1 赋值指令 movq 使用方法 movq xmm m64 功
  • vs编译与停止调试时卡顿、无响应的问题

    这是由于VS运行太久参数大量的缓存导致 1 单击 开始 选择 运行 或者win r快捷键 2 键入 devenv exe resetuserdata 此命令会运行几分钟时间 Visual Studio 清除设置并将其自身重置到其最初的状态
  • java实现kafka消息发送和接收

    之前写了一篇关于kafka集群搭建的点击打开链接 想了解的可以看下 今天这个实现是和前面集群对应的 使用的是新版的API 属性如果想定制自己的 需要到官方网址上面去查看一下对应的值 推介大家多去看看官方的介绍和demo 网上有些翻译过来的例