Kafka —— java实现一生产者多消费者实例

2023-05-16

架构图:(网图,很通俗易懂了,就不自己画了,这里实现的是一个Producer 两个Consumer)

 

前提:已经开启zookeeper 和kafka ,具体可参考博客https://blog.csdn.net/DGH2430284817/article/details/90483089

 

步骤:

        1,:设置kafka 分区为2 :

        修改kafka 目录config 下的文件server.properties ,修改 num.partitions=2,保存,重启kafka。

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=2

        在修改后,生产者在发送消息到kafka 的broker 的时候就会保存到两个分区中的一个,具体是哪个要根据分区规则的写法,一般采取的都是生产者提供一个key ,对这个key 进行hashCode() 计算它的值,再对这个值除分区数取余,用这个余数来决定消息保存到哪个分区。(如现在分区是2,经过上面的计算后只会出现0和1的值,而且是散列的,两个分区的数据量会差不多)

        2:编写分区规则:

package com.kafka1;
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
 
public class SimplePartitioner implements Partitioner {
	
	public SimplePartitioner () { }
	
	public SimplePartitioner (VerifiableProperties props) { }
 
	public int partition(Object key, int numPartitions) {
		int partition = 0;
		String k = (String)key;
		partition = Math.abs(k.hashCode()) % numPartitions;//根据key的hashCode和分区数numPartitions,算出所在分区,如分区数为2,返回值只会是0和1
		return partition;
	}
} 

 

        3:编写生产者类PartitionerProducer.java:

package com.kafka1;

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.producer.Partitioner;

public class PartitionerProducer {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("serializer.class", "kafka.serializer.StringEncoder"); 
		props.put("metadata.broker.list", "127.0.0.1:9092");//连接 kafka
		props.put("partitioner.class", "com.kafka1.SimplePartitioner"); //指定分区规则的类
		Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
		String topic = "dgh-test";
		Partitioner Partitioner = new SimplePartitioner();
		for (int i = 0; i <= 10; i++) {
			String k = "key" + System.currentTimeMillis(); //key+当前毫秒,形成唯一key,也可以使用时间戳
			String v = k + ":value" + i; //消息内容
			int partition = Partitioner.partition(k, 2); //计算消息key所保存的分区号,2是分区数,实际生产这里可以不用,因为要方便观看所以输出分区
			System.out.println("topic:" + topic + ", partition:" + partition + ", key:" + k + ", value:" +v);
			producer.send(new KeyedMessage<String, String>(topic, k, v));
		}
		producer.close();
	}
}

        右键运行:

topic:dgh-test, partition:1, key:key1560695392884, value:key1560695392884:value0
topic:dgh-test, partition:1, key:key1560695393122, value:key1560695393122:value1
topic:dgh-test, partition:1, key:key1560695393124, value:key1560695393124:value2
topic:dgh-test, partition:0, key:key1560695393127, value:key1560695393127:value3
topic:dgh-test, partition:1, key:key1560695393128, value:key1560695393128:value4
topic:dgh-test, partition:0, key:key1560695393129, value:key1560695393129:value5
topic:dgh-test, partition:0, key:key1560695393130, value:key1560695393130:value6
topic:dgh-test, partition:0, key:key1560695393132, value:key1560695393132:value7
topic:dgh-test, partition:1, key:key1560695393133, value:key1560695393133:value8
topic:dgh-test, partition:1, key:key1560695393135, value:key1560695393135:value9

        根据控制台结果,可以看到一共生产了10条消息,其中根据分区规则,有6条消息保存在partition分区1,4条在分区0,如果数据量大,两个分区的数据是很平均的。(想要多个生产者的就多开几个线程去生产消息)

        4:编码消费者类DConsumer.java:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class DConsumer extends Thread{
	public static void main(String[] args) throws InterruptedException {
		//开两个线程,两个连接,相当于两个消费者
		DConsumer consumer1 = new DConsumer("consumer-1");//消费者consumer-1
		consumer1.start();
		DConsumer consumer2 = new DConsumer("consumer-2");//消费者consumer-2
		consumer2.start();
	}
	private String consumerName = null;
	public DConsumer ( String consumerName) {
		this.consumerName = consumerName;
	} 
    @Override
    public void run() {
    	System.out.println("消费者:" +consumerName + "开始处理消息");
		String topic = "dgh-test";
		Properties props = new Properties();
		props.put("group.id", "group1");
		//props.put("zookeeper.connect", "127.0.0.132:2181,127.0.0.133:2182,127.0.0.134:2183");
		props.put("zookeeper.connect", "127.0.0.1:2181");//连接 zookeeper
		props.put("zookeeper.session.timeout.ms", "400");
		props.put("zookeeper.sync.time.ms", "200");
		props.put("auto.commit.interval.ms", "1000");
		props.put("auto.offset.reset", "smallest"); 
		ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
		topicCountMap.put(topic, new Integer(1));
		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
		KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
		ConsumerIterator<byte[], byte[]> it = stream.iterator();
		while (it.hasNext()) {
			MessageAndMetadata<byte[], byte[]> mam = it.next();
			System.out.println("consumer: " + consumerName + ", Partition: " + mam.partition() + ", Message: " + new String(mam.message()) + "");
			try {
				sleep(3000);
			} catch (InterruptedException e) { 
				e.printStackTrace();
			}
		}
	}
}

        右键运行:

消费者:consumer-1开始处理消息
消费者:consumer-2开始处理消息
[2019-06-16 22:38:40,317] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2019-06-16 22:38:40,317] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
consumer: consumer-1, Partition: 0, Message: key1560695393127:value3
consumer: consumer-2, Partition: 1, Message: key1560695392884:value0
consumer: consumer-2, Partition: 1, Message: key1560695393122:value1
consumer: consumer-1, Partition: 0, Message: key1560695393129:value5
consumer: consumer-1, Partition: 0, Message: key1560695393130:value6
consumer: consumer-2, Partition: 1, Message: key1560695393124:value2
consumer: consumer-2, Partition: 1, Message: key1560695393128:value4
consumer: consumer-1, Partition: 0, Message: key1560695393132:value7
consumer: consumer-2, Partition: 1, Message: key1560695393133:value8
consumer: consumer-2, Partition: 1, Message: key1560695393135:value9

        在消费者连接进行消息消费的时候,如果只有一个消费者,那两个分区的消息都是会把消息发送给这个消费者,如果两个消费者的话,先连接的消费者获取的分区0的消息,后连接的消费者获取分区1的消息。如果三个消费者的话,第三个连的消费者会获取不了消息,所以消费者的数量最好不要超过分区数。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka —— java实现一生产者多消费者实例 的相关文章

  • Android ViewPager用法

    1 适配器PagerAdapter ViewPager使用适配器类将数据和view的处理分离 xff0c ViewPager的适配器叫PagerAdapter xff0c 这是一个抽象类 xff0c 不能实例化 xff0c 所以它有两个子类
  • Android Fragment★★

    1 Fragment fragment译为 碎片 xff0c 是Android 3 0 xff08 API 11 xff09 提出的 xff0c 最开始是为了适配大屏的平板 Fragment看起来和Activity一样 xff0c 是一个用
  • Android设计模式—适配器模式★★★

    1 适配器模式 适配器模式是指把一个类的接口变换成客户端所期待的另一种接口 xff0c 从而使原本因接口不匹配而无法在一起工作的两个类能够在一起工作 适配器模式是为了解决接口不兼容问题的 比如厂商给你的接口和你现有的接口对接不起来 旧的数据
  • Android 类加载机制

    nbsp 1 类加载机制 java文件不是可执行的文件 需要先编译成 class文件才可以被虚拟机执行 而类加载就是指通过类加载器把 class文件加载到虚拟机的内存空间 具体来说是方法区 类通常是按需加载 即第一次使用该类时才加载 Jav
  • Android Bitmap防止内存溢出

    1 Bitmap 在Android开发中经常会使用到Bitmap xff0c 而Bitmap使用不当很容易引发OOM Bitmap占用内存大小的计算公式为 xff1a 图片宽度 图片高度 一个像素点所占字节数 xff0c 因此减小这三个参数
  • Swift NSAttributedString的使用

    NSMutableAttributedString let testAttributes 61 NSAttributedStringKey foregroundColor UIColor blue NSAttributedStringKey
  • Android ViewStub

    1 ViewStub ViewStub是一个可用于性能优化的控件 xff0c 它是一个不可见的 零尺寸的View xff0c 可以在运行时进行延迟加载一个布局文件 xff0c 从而提高显示速率 viewstub和include比较像 xff
  • Android Jetpack—LiveData和数据倒灌

    1 LiveData LiveData是Android Jetpack包提供的一种可观察的数据存储器类 xff0c 它可以通过添加观察者被其他组件观察其变更 不同于普通的观察者 xff0c LiveData最重要的特征是它具有生命周期感知能
  • Gradle build 报错:Received status code 400 from server: Bad Request

    全部错误是这样的 xff1a Could not GET 39 https dl google com dl android maven2 com android tools build gradle 3 1 2 gradle 3 1 2
  • 排列组合详解

    在笔试题中看到的一个选择题 用1 3的瓷砖密铺3 20的地板有几种方式 xff1f 排列组合问题 排列和组合问题 xff0c 其实是两种问题 xff0c 区分它们的原则是是否需要考虑顺序的不同 排列问题 xff0c 考虑顺序 xff1b 组
  • SCKKRS-关键词、关键短语提取

    1 简介 SCKKRS Self supervised Contextual Keyword and Keyphrase Retrieval with Self Labelling 本文根据2019年 Self supervised Con
  • kali安装vnc

    一 安装x11vnc 1 经过N多次的实验 xff0c kali一直报错 xff0c tightvncserver一直报错 怎么配置都是黑屏 xff0c 奔溃 最后退而求其次 xff0c 安装x11vnc 2 很简单的命令 sudo apt
  • 计算机硬件技术基础第一章总结

    1 1 计算机发展概述 1 1 1 计算机的发展简史 第一台计算机 xff1a ENIAC 第一代 xff1a 电子管数字计算机 xff08 1946 1958 xff09 逻辑元件 xff1a 真空电子管体积大 xff0c 功耗高 xff
  • CentOS7安装Oracle JDK1.8

    JDK1 8下载地址 https www oracle com java technologies javase javase8 archive downloads html 需要登录之后才能下载文件 xff0c 下载jdk 8u202 l
  • Ubuntu 16.04 安装 rtl8812au系列 (DWA-182) wireless adapter driver

    Ubuntu 16 04 安装 rtl8812au系列 DWA 182 wireless adapter driver 刚刚开始使用Linux xff0c 一脸懵逼 xff0c 命令行搞得一愣一愣的 xff0c 不过熟悉了之后就好很多了 一
  • SpringBoot项目启动失败报错Annotation-specified bean name ‘xx‘ for bean class [xxx] conflicts with existing

    问题描述 xff1a 项目启动就会报 xff1a Annotation specified bean name xx for bean class xxx conflicts with existing non compatible bea
  • Visual Studio高效实用的扩展工具、插件

    说明 xff1a 对一个有想法的程序员来说 xff0c 善于使用一款高效的开发工具是很重要的 xff0c 今天给大家介绍的是宇宙第一IDE vs用起来很不错的开发工具 xff0c 假如大家觉得不错也可以尝试的用用 xff0c 毕竟对于我们这
  • java琐事

    并发编程 并发的意义 并发通常是提高运行在单处理器上的程序的性能 如果程序中的某个任务因为该程序控制范围之外的某些条件 I O 而导致不能继续执行 xff0c 那么这个任务或线程就阻塞了 如果没有并发 xff0c 整个程序都讲停下来 从性能
  • java类的初始化和实例化的初始化(类的初始化过程)

    Java类的加载顺序 父类静态代变量 父类静态代码块 子类静态变量 子类静态代码块 父类非静态变量 xff08 父类实例成员变量 xff09 父类构造函数 子类非静态变量 xff08 子类实例成员变量 xff09 子类构造函数 上面的说法也
  • 最优吞吐量和最短停顿时间

    在实践活动中 xff0c 我们通过最优吞吐量和最短停顿时间来评价jvm系统的性能 吞吐量越高算法越好 暂停时间越短算法越好 首先让我们来明确垃圾收集 GC 中的两个术语 吞吐量 throughput 和暂停时间 pause times JV

随机推荐

  • sql执行慢的原因有哪些,如何进行sql优化?

    一 导致SQL执行慢的原因 1 硬件问题 如网络速度慢 xff0c 内存不足 xff0c I O吞吐量小 xff0c 磁盘空间满了等 2 没有索引或者索引失效 xff08 一般在互联网公司 xff0c DBA会在半夜把表锁了 xff0c 重
  • 阿里java开发手册2019年最新版619(华山版)PDF下载

    链接 https pan baidu com s 1ANvBu1hidnvRCZILDGXuQA 密码 ugq8
  • Mockito:org.mockito.exceptions.misusing.InvalidUseOfMatchersException

    org span class token punctuation span mockito span class token punctuation span exceptions span class token punctuation
  • 一个简单通用的基于java反射实现pojo转为fastjson对象的方法

    最近在公司工作需要实现一个工具实现一个pojo转为fastjson对象的通用工具 xff0c 直接上源码 span class token comment 通用的pojo转为Json对象的方法 64 author ZFX 64 date20
  • Java魔法类:Unsafe应用解析

    这个美团大神对于Unsafe的分析很全面 https tech meituan com 2019 02 14 talk about java magic class unsafe html
  • Linux X-Window Error: Can‘t open display: :0

    问题过程描述 许多经常部署Oracle数据的管理员经常需要对数据库软件进行部署 xff0c 但大多数都是通过远程部署的方式进行部署 xff0c 使用远程部署有两种方式 xff0c 一种是通过脚本部署 xff0c 另一种就是通过图形化进行部署
  • maven打包生成war跳过单元测试

    maven将项目打包成war包的命令是 mvn install 或mvn package 每次生成war包时会进行所以的单元测试 xff0c 如果想跳过单元测试直接生成war包有以下3种方式 方法1 xff1a 在pom xml中加入如下代
  • 程序员每天工作多少个小时_程序员每天实际工作几个小时?

    程序员每天工作多少个小时 您如何看待 xff0c 程序员每天实际工作多长时间 xff1f 大多数人会说答案是8到9个小时 有人说他们每天工作12个小时或更长时间 尽管这是正确的 xff0c 但它并不是大多数程序员实际工作的数量 xff0c
  • ubuntu 显示缺少库文件 libcom_err.so.2 解决办法

    运行任何代码都显示 xff1a error while loading shared libraries libcom err so 2 cannot open shared object file No such file or dire
  • 记CVTE第一次面试

    首先说明一下博主是一个大三的学生 xff0c 专业计算机科学与技术 xff0c 主学的方向是Web后台开发 xff0c 主语言是Java 前几天看到CVTE有校园招聘实习生 xff0c 就报名参加了 xff0c 做了CVTE的笔试题 xff
  • Java Socket 编程那些事(1)

    前言 最近在准备面试和笔试的一些东西 xff0c 回去翻看了Java关于IO的基础 xff0c 发现很多基础还是没有记牢固 xff0c 现在回头重新学习 xff0c 就从socket通讯开始吧 xff0c 虽然说现在企业很少直接编写sock
  • Redis集群的原理和搭建

    前言 Redis 是我们目前大规模使用的缓存中间件 xff0c 由于它强大高效而又便捷的功能 xff0c 得到了广泛的使用 单节点的Redis已经就达到了很高的性能 xff0c 为了提高可用性我们可以使用Redis集群 本文参考了Rdis的
  • Java多线程爬虫爬取京东商品信息

    前言 网络爬虫 xff0c 是一种按照一定的规则 xff0c 自动地抓取万维网信息的程序或者脚本 爬虫可以通过模拟浏览器访问网页 xff0c 从而获取数据 xff0c 一般网页里会有很多个URL 爬虫可以访问这些URL到达其他网页 xff0
  • 关于js中的“Uncaught SyntaxError: Unexpected token

    我在js中为一个已经定义的数组重新定义新的一个维度的数组时 xff0c 调试器这样报错 只说结果 xff1a 肯定是在给已经定义的数组中的元素重新定义下一维度时 xff0c 多在前面加了一个 var 就像下面的这样 xff1a var gr
  • 学成在线--day03 CMS页面管理开发

    学成在线 第3天 讲义 CMS页面管理开发 1 自定义条件 1 1 需求分析 在页面输入查询条件 xff0c 查询符合条件的页面信息 查询条件如下 xff1a 站点Id xff1a 精确匹配 模板Id xff1a 精确匹配 页面别名 xff
  • Ubuntu下安装Inode后双击InodeClient无反映解决方法

    由于比较喜好linux编程环境 xff0c 所以准本一直使用linux 学校无线有时有有时没很不爽 xff0c 所以准本安装Inode xff0c 但是安装完Inode后双击是一直没反映 最后求助万能的百度 xff0c 谁知道百度的搜索不得
  • 使用GitHub托管网站,自定义域名

    1 如何使用GitHub托管 官网链接 xff1a 点击跳转 官网首页就有详细的搭建步骤 xff0c 总共5步便可搭建成功 访问 github用户名 github io 便可看到自己的网站 2 自定义域名的方法 1 申请一个域名 xff0c
  • ElasticsearchRestTemplate 基本使用

    随着数据量的增加和数据结构的复杂化 xff0c 传统的关系型数据库已经不能满足用户的需求 xff0c 而搜索引擎则成为了一种更加高效 可扩展的数据检索方案 而 Elasticsearch 则是一个流行的搜索引擎 xff0c 在 Java 生
  • Navcat无法连接mysql报错1449

    把mysql从5升级成8后第二次连接mysql就报错1449 不清楚什么原因 xff0c 反正肯定是升级数据库之后mysql用户被动了 xff0c 看了很多博客都没有用 xff0c 什么在navcat里新建用户 xff0c 数据库都连不上怎
  • Kafka —— java实现一生产者多消费者实例

    架构图 xff1a xff08 网图 xff0c 很通俗易懂了 xff0c 就不自己画了 xff0c 这里实现的是一个Producer 两个Consumer xff09 前提 xff1a 已经开启zookeeper 和kafka xff0c