kafka介绍,安装以及简单的java调用kafka代码

2023-11-05


Producer :消息生产者,向broker发消息的客户端。
Consumer :消息消费者,向broker取消息的客户端
Topic :一个队列,主题。

Message:消息是kafka处理的对象,在kafka中,消息是被发布到brokertopic中。而consumer也是从相应的topic中拿数据。也就是说,message是按topic存储的
Consumer Group :将topic消息的广播发给consumer的手段。一个topic可以有多个CG。
Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。
Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka


安装过程

(1)下载解压。官网下载kafka,http://kafka.apache.org/ 

解压到安装目录下 tar -xcvf 

(2)修改配置文件/usr/local/kafka/config/server.properties,修改如下内容

broker.id=0

host.name=hadoop1

zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181,hadoop4:2181,hadoop5:2181,hadoop6:2181

(3)修改完配置文件即可将整个文件夹传输到其他节点  scp -r 。。。。

(4)传输完之后修改每个节点的broker.id的编号,递增。

(5)启动zookeeper。

这边可以使用kafka自带的zookeeper,也可以使用自己安装的zookeeper。

启动自己安装的zookeeper  :   /app/zookeeper-3.4.6/bin/zkServer.sh start

各个节点均启动完成之后,可以查看zk的状态  /app/zookeeper-3.4.6/bin/zkServer.sh status

(6)启动kafka

/app/kafka_2.9.2-0.8.2.1/bin/kafka-server-start.sh /app/kafka_2.9.2-0.8.2.1/config/server.properties &

尾部加上&的作用是可以启动完之后直接按回车退出,继续下一步操作。也可不加&

(7)创建topic

/app/kafka_2.9.2-0.8.2.1/bin/kafka-topics.sh --create --topic idoall_testTopic --replication-factor 6 --partitions 2 --zookeeper hadoop1:2181


可通过指令查看所有的topic     /app/kafka_2.9.2-0.8.2.1/bin/kafka-topics.sh --list --zookeeper hadoop2:2181

(8)发送消息
/app/kafka_2.9.2-0.8.2.1/bin/kafka-console-producer.sh --broker-list hadoop3:9092 --sync --topic idoall_testTopic

(9)消费消息。重新打开一个终端执行指令

/app/kafka_2.9.2-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper hadoop1:2181 --topic idoall_testTopic --from-beginning
在原终端中输入消息,新终端中会显示出输入的消息。

关闭kafka指令。  /app/kafka_2.9.2-0.8.2.1/bin/kafka-server-stop.sh /app/kafka_2.9.2-0.8.2.1/config/server.properties &

java调用kafka

首先创建一个java project,网上很多说需创建maven工程,本人经过测验,发现maven工程和普通的java project均可调用。

需要注意,工程所采用的jar包,可以在相应版本的kafka安装文件夹的lib目录下引用,不同版本的jar包可能不通用,

如果出现java.lang.NoClassDefFoundError: scala/reflect/ClassManifest的报错,可能是由于jar包不匹配引起的。


Producer端代码:

package com;


import java.util.Date;
import java.util.Properties;
import java.text.SimpleDateFormat;   

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;


public class Producertest {
     
     public static void main(String[] args) {
         Properties props = new Properties();
         props.put("zk.connect", "hadoop1:2181/kafka,hadoop2:2181/kafka,hadoop3:2181/kafka,hadoop4:2181/kafka,hadoop5:2181/kafka,hadoop6:2181/kafka");
         // serializer.class为消息的序列化类
         props.put("serializer.class", "kafka.serializer.StringEncoder");
         // 配置metadata.broker.list, 为了高可用, 最好配两个broker实例
         props.put("metadata.broker.list", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
         // 设置Partition类, 对队列进行合理的划分
         //props.put("partitioner.class", "idoall.testkafka.Partitionertest");
         // ACK机制, 消息发送需要kafka服务端确认
         props.put("request.required.acks", "1");

          props.put("num.partitions", "6");
         ProducerConfig config = new ProducerConfig(props);
         Producer<String, String> producer = new Producer<String, String>(config);
         for (int i = 0; i < 10; i++)
         {
           // KeyedMessage<K, V>
           //   K对应Partition Key的类型
           //   V对应消息本身的类型
//   topic: "test", key: "key", message: "message"
           SimpleDateFormat formatter = new SimpleDateFormat   ("yyyy年MM月dd日 HH:mm:ss SSS");      
           Date curDate = new Date(System.currentTimeMillis());//获取当前时间      
           String str = formatter.format(curDate);   
            
           String msg = "idoall.org" + i+"="+str;
           String key = i+"";
           producer.send(new KeyedMessage<String, String>("idoall_testTopic",key, msg));
         }
       }
}

Consumer端代码:

package com;
import java.util.HashMap;
import java.util.List;  
import java.util.Map;  
import java.util.Properties;  
   
import kafka.consumer.ConsumerConfig;  
import kafka.consumer.ConsumerIterator;  
import kafka.consumer.KafkaStream;  
import kafka.javaapi.consumer.ConsumerConnector;

public class Consumertest extends Thread{
      
     private final ConsumerConnector consumer;  
    private final String topic;  

    public static void main(String[] args) {  
      Consumertest consumerThread = new Consumertest("idoall_testTopic");  
        consumerThread.start();  
    }  
    public Consumertest(String topic) {  
        consumer =kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());  
        this.topic =topic;  
    }  

private static ConsumerConfig createConsumerConfig() {  
    Properties props = new Properties();  
    // 设置zookeeper的链接地址
    props.put("zookeeper.connect","hadoop1,hadoop2,hadoop3,hadoop4,hadoop5,hadoop6:2181");  
    // 设置group id
    props.put("group.id", "1");  
    // kafka的group 消费记录是保存在zookeeper上的, 但这个信息在zookeeper上不是实时更新的, 需要有个间隔时间更新
    props.put("auto.commit.interval.ms", "1000");
    props.put("zookeeper.session.timeout.ms","10000");  
    return new ConsumerConfig(props);  
}  

public void run(){  
     //设置Topic=>Thread Num映射关系, 构建具体的流
    Map<String,Integer> topickMap = new HashMap<String, Integer>();  
    topickMap.put(topic, 1);  
    Map<String, List<KafkaStream<byte[],byte[]>>>  streamMap=consumer.createMessageStreams(topickMap);  
 
    KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0);  
    ConsumerIterator<byte[],byte[]> it =stream.iterator();  
    System.out.println("*********Results********");  
    while(it.hasNext()){  
        System.err.println("get data:" +new String(it.next().message()));  
        try {  
            Thread.sleep(1000);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
}  
}


之后运行代码。

运行Producer端代码,会发现之前的客户端中会接收到代码中发送的消息。

运行consumer端代码,在终端中输入消息,eclipse中会读取到发送的消息,打印出来。

至此,简单的java调用kafka操作完成。

如果将Kafka在zookeeper的默认目录,修改为自定义目录时,在运行过程中会报出java.lang.IllegalArgumentException: Path length must be > 0”错误

网上找了好久,发现别人说这是一个Bug,由于initZk()方法没有对路径进行处理导致

原代码:

private def initZk(): ZkClient = {
    info("Connecting to zookeeper on " + config.zkConnect)
    val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
    ZkUtils.setupCommonPaths(zkClient)
    zkClient
}

 

解决代码:

private def initZk(): ZkClient = {
    info("Connecting to zookeeper on " + config.zkConnect)
    val chroot = {
      if (config.zkConnect.indexOf("/") > 0)
        config.zkConnect.substring(config.zkConnect.indexOf("/"))
      else
        ""
    }
    if (chroot.length > 1) {
      val zkConnForChrootCreation = config.zkConnect.substring(0, config.zkConnect.indexOf("/"))
      val zkClientForChrootCreation = new ZkClient(zkConnForChrootCreation, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
      ZkUtils.makeSurePersistentPathExists(zkClientForChrootCreation, chroot)
      info("Created zookeeper path " + chroot)
      zkClientForChrootCreation.close()
    }
    val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
    ZkUtils.setupCommonPaths(zkClient)
    zkClient
}

如果无法修改,那么可以将自定义目录修改成原来的默认目录,则不会报错。



附录网上找到的关于server.properties中所有配置参数的说明 http://www.inter12.org/archives/842




本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

kafka介绍,安装以及简单的java调用kafka代码 的相关文章

  • UCOS2的文件目录

    想着闲着也是闲着 把之前学习ucos2源码的笔记整理一下 复盘一次 总结内容将其写为博客作为学习的输出 一 为什么要学RTOS或者IOTOS 我在大一时 开始进入实验室接触单片机 摸爬滚打的参加了几次比赛 也因此入了嵌入式的坑 大三时开始思

随机推荐

  • 一位年薪40W的测试被开除,回怼的一番话,令人沉思

    一位年薪40W测试工程师被开除回怼道 反正我有技术 在哪不一样 一技傍身 万事不愁 当我们掌握了一技之长后 在职场上说话就硬气了许多 不用担心被炒 反过来还可以炒了老板 这一点在码农界特别明显 许多测试人在辞职时 都有一种心态 烂公司 烂领
  • 学习第一天const

    constant 指针与const const char a 指向const对象的指针或者说指向常量的指针 char const a 同上 char const a 指向类型对象的const指针 或者说常指针 const指针 const c
  • ORACLE集群管理-19c RAC ipv6+IPV4双栈配置实战

    关于IPV6支持问题 单实例环境要支持IPV6 数据库版本至少11 2 0 4版本 其实从linux7开始系统默认开启ipv6 怎么确认ipv6是否开启呢 下面介绍两种常见的方法 1 通过查看网卡属性确定 ifconfig a 命令输出有
  • Vue计算两个datetime共多少天

    假如starttime和endtime都是YYYY MM DD HH mm ss类型 将选择器的默认时间格式 object 转换成时间戳 开始时间减去结束时间 时间戳的形式进行运算 s然后转换成天数 通过toFixed函数保留两位小数 th
  • c语言中swap的意思,C语言中swap的作用和用法?

    慕村225694 swap函数一般是一个程序员自定义函数 通常是实现两个变量数值的交换 比如123int a 2 int b 3 swap a b 一般用到变量数值交换 交换后a 3 b 2 实现的方法多种多样 比如下面几种写法 1 通过使
  • Python——类的方法重写、property、运算符重载

    1 super 函数 主要是用来调用父类的方法 在子类中调用父类的方法时进行使用 2 私有方法 私有属性 1 定义方法 在类的内部 使用def关键字可以为类定义一个方法 与一般函数定义不同 类方法必须包含参数self 且为第一个参数 2 私
  • ubuntu搭建vpn步骤

    1 搭建环境 系统 Ubuntu 18 04 4 LTS Bionic Beaver 位置 轻量应用云服务器 2 安装软件 Sudo apt get update Sudo apt get install pptpd Sudo apt ge
  • 【ESP8266】关于调试fatal exception/自动重启的一些经验分享

    本人小白一枚 最近在捣鼓ESP8266的NONOS SDK开发 本来已经写好了一个工程测试基本功能也没什么问题了 但是发现了一个很严重的问题 就是每次一跑上40来分钟的时候 就会宕机重启 自动重启 真是奇了个怪了 本来这也没啥 但出于对稳定
  • 关闭 Ubuntu 中的关机/重启确认的小技巧

    导读 对于 Ubuntu 新手来说 有很多新东西要学 但是网上很多教程不是针对新手的 在这里 我们不走寻常路 不能说全部的教程都是为初学者准备 但至少大部分是 关闭 Ubuntu 中的关机 重启确认 这篇文章也是一篇新手教程 并且展示如何在
  • Android常用控件之悬浮窗

    悬浮窗可以显示在所有应用程序之上 不管在PC机还是Android设备上都有这个 最常见的是360的 加速球 来看下在Android设备上的效果 程序的目录结构如下图 创建Activity后启动Service就关闭 java view pla
  • 基于cordova打包RPGMAKERMV 安卓app

    基于cordova打包RPGMAKERMV 安卓app 1 RPGMakerMV部分 部署出网页项目 2 node部分 https nodejs org en 上下载node左边稳定版 右边是包含最新特性的版本 这是目前的版本可能不一样 设
  • 刷题之图像渲染

    有一幅以二维整数数组表示的图画 每一个整数表示该图画的像素值大小 数值在 0 到 65535 之间 给你一个坐标 sr sc 表示图像渲染开始的像素值 行 列 和一个新的颜色值 newColor 让你重新上色这幅图像 为了完成上色工作 从初
  • Verilog基本语法之循环语句(六)

    循环语句分为以下4种 for语句 通过三个步骤来决定语句的循环执行 1 给控制循环次数的变量赋初值 2 判定循环执行条件 若为假则跳出循环 若为真 则执行指定语句后 转到第三步 3 修改循环变量的值 返回第二步 repeat 连续执行一条语
  • qt 调节win声音版本大小

    QT4 情况下 运行的 会出错 目前暂时没有办法解决在 win下调节音量大小问题 在这里插入代码片 参考资料 QT 对window系统下音量的设置和获取 还有个很好贴子 没有找到
  • vim入门了

    自从上次搞定代码折叠之后 仿佛vim真的入门了 今天又看了一些内容 会复制 粘贴 查找了 更加的感觉入门了 值得庆贺 2012 5 3
  • JZOJ 幽幽子与森林

    题目大意 迷途竹林可以看成是一个n个点的森林 幽幽子定义dis u v 为u到v路径上的边的数量 若u和v不连通则为m 她定义整个森林的危险度为 为了去拜访永琳师匠 幽幽子需要提前知道迷途竹林的危险度 但迷途竹林的形态是时刻变化着的 所以幽
  • 栈系列之 最小栈的实现

    算法专题导航页面 算法专题 栈 栈系列之 栈排序 栈系列之 最小栈的实现 栈系列之 用栈实现队列 栈系列之 递归实现一个栈的逆序 题目 设计一个栈 其拥有常规的入栈 出栈操作外 需要额外具备获取最小元素的功能 其他限制 获取最小元素功能的时
  • 25. TCP协议之TCP中MSS与MTU

    MSS MSS英文全称为Maximum Segment Size 表示最大TCP报文段数据长度 并且MSS只会出现在对端发送SYN段时才会夹带的信息 在三次握手的过程中可以看到这个对端期望能够收到最大的数据段长度 如下 可以看到现在对端的M
  • 阿里云服务器部署node服务(一)

    万事开头难 尝试通过阿里云服务器部署node服务 中间踩了一些坑 借此给自己一个总结 1 远程服务器安装node 1 安装node wget https npmmirror com mirrors node v16 16 0 node v1
  • kafka介绍,安装以及简单的java调用kafka代码

    Producer 消息生产者 向broker发消息的客户端 Consumer 消息消费者 向broker取消息的客户端 Topic 一个队列 主题 Message 消息是kafka处理的对象 在kafka中 消息是被发布到broker的to