java连接kafka测试

2023-11-04

①进入到kafka文件夹中修改配置文件:vim config/server.properties

 

②启动zookeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties 

端口2181是ZooKeeper的默认端口,可以通过编辑文件config/zookeeper.properties 中的clientPort来修改监听端口。

③ 启动Kafka Broker

bin/kafka-server-start.sh config/server.properties 

④ 创建一个Topic 名称为HelloWorld

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 \
> --partitions 1 --topic HelloWorld

校验Topic是否创建成功

bin/kafka-topics.sh --list --zookeeper localhost:2181 

⑥配置pom文件:

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

⑦编写一个producer文件:

package com.winter.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {

    public static void main(String[] args){
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "47.91.214.23:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = null;
        try {
            producer = new KafkaProducer<String, String>(properties);
//            producer.send(new ProducerRecord<String, String>("HelloWorld","aaaaaaaaaaaa"));
            for (int i = 0; i < 100; i++) {
                String msg = "This is Message " + i;
                producer.send(new ProducerRecord<String, String>("HelloWorld", msg));
                System.out.println("Sent:" + msg);
            }
        } catch (Exception e) {
            e.printStackTrace();

        } finally {
            producer.close();
        }

    }
}

⑧配置一个customer文件:

package com.winter.kafka;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerDemo {

    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "47.91.214.23:9092");
        properties.put("group.id", "group-1");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList("HelloWorld"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, value = %s", record.offset(), record.value());
                System.out.println("=====================>");
            }
        }

    }
}

 

先动customer 文件,然后启动producer文件,然后观察控制台:

 

 

 

 

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

java连接kafka测试 的相关文章

  • 在 catch 块中重新抛出异常是否有意义?

    从 catch 块中抛出异常只是为了记录消息以便我们确定导致异常的原因是否有意义 Code public void saveLogs Logs logs throws RemoteException try LogsOps saveLogs
  • 如何使用 Java 以独立于平台的方式读取 Windows 共享驱动器?

    如何使用 Java 从 Windows 共享驱动器中读取数据 以便执行读取的 Java 代码可以在任何平台上同样正确地运行 您可以使用JCIFS http jcifs samba org 使用纯 Java 代码访问 SMB CIFS 共享
  • 如何在 Java 中验证从 Azure AD B2C 生成的 JWT 令牌?

    我正在寻找 Java 代码示例来验证 Azure AD B2C 令牌 我们可以使用哪些依赖项 所有 JWT 令牌的 JWT 令牌验证步骤或代码是否相同还是会有所不同 我们的项目中没有使用 Spring Security 有大量的图书馆her
  • 如何修复安装 maven jar 插件依赖项时出现的错误?

    我正在将应用程序制作成 maven 中的 jar 文件 但是 当我从 Maven 中提取 jar 插件存储库并在终端中运行这三个命令时 mvn clean mvn compile mvn package 在 mvn package 中 我收
  • cygwin有java sdk吗?

    cygwin有java sdk吗 如果有一个使用 cygwin 文件系统和 X windows 进行显示的本机 cygwin 实现 那就太好了 不幸的是我不知道这样的版本 我认为移植 OpenJDK 也需要付出很大的努力 但我还没有尝试过
  • Java 8 中异常类型推断的一个独特功能

    在为该网站上的另一个答案编写代码时 我遇到了这个特性 static void testSneaky final Exception e new Exception sneakyThrow e no problems here nonSnea
  • 枚举内的枚举

    这不是我被卡住的问题 而是我正在寻找一种简洁的方式来编写我的代码 本质上 我正在编写一个事件驱动的应用程序 用户触发事件 事件被发送到适当的对象 然后对象处理事件 现在我正在编写偶数处理程序方法 我希望使用 switch 语句来确定如何处理
  • Logback 配置在单行上有异常吗?

    我的日志被提取 传输并合并到 elasticsearch 中 多行事件很难跟踪和诊断 有没有办法使用收集器和正则表达式将异常行分组到单个记录中登录配置 https logback qos ch manual layouts html xTh
  • 在Java中读取制表符分隔的文件

    我有以下代码来读取 Java 中的制表符分隔文件 while str in readLine null if str trim length 0 continue String values str split t System out p
  • 如何在 Java 中读取/转换 InputStream 为字符串?

    如果你有一个java io InputStream对象 您应该如何处理该对象并生成一个String 假设我有一个InputStream包含文本数据 我想将其转换为String 例如我可以将其写入日志文件 最简单的方法是什么InputStre
  • 如何使 JFileChooser 仅显示具有某些特定名称 Java 的文件夹

    有什么方法可以让 JFileChooser 加载时仅显示名称为 Hello 的文件夹 这是我的代码 它显示所有文件夹以及扩展名为 py 和 java 的文件 我想添加文件夹名称限制 FileNameExtensionFilter filte
  • oracle.jdbc.driver.OracleDriver ClassNotFoundException

    这是我收到错误的代码 我的classes12 jar已作为外部 jar 导入 import java io IOException import java io PrintWriter import java sql Connection
  • Java如何区分这些具有相同名称/签名的多个方法?

    今天我在追踪一个错误 我注意到我们的一个班级中有一些奇怪的事情 我删除了尽可能多的代码并发布在这里 class A static int obtainNumber return 42 static int obtainNumber retu
  • 如何将 .txt 文件的最后 5 行读入 java

    我有一个包含多个条目的文本文件 例如 hello there my name is JoeBloggs 我如何按降序阅读最后五个条目 即来自 JoeBloggs 那里 我目前有代码只能读取最后一行 public class TestLast
  • 如何强制 Spark 执行代码?

    我如何强制 Spark 执行对 map 的调用 即使它认为由于其惰性求值而不需要执行它 我试过把cache 与地图调用 但这仍然没有解决问题 我的地图方法实际上将结果上传到 HDFS 所以 它并非无用 但 Spark 认为它是无用的 简短回
  • 如何为用户的活动设置计时器?

    如果用户在 5 小时内停止工作 我需要执行特定的方法 假设用户已登录 但他在 5 小时内没有向数据库的特定表添加任何记录 任何时候用户将记录添加到指定的表中 该特定用户的计时器都应该重置 否则它将继续运行 如果达到 5 小时 应用程序应显示
  • 带有客户端认证连接的 HTTP 客户端的 SOAP 请求超时异常

    我正在尝试点击具有客户端认证的网址并生成密钥 keytool genkey alias server keyalg RSA keystore example jks validity 10950 和密钥存储 keytool import t
  • 将Json字符串映射到java中的map或hashmap字段

    假设我从服务器返回了以下 JSON 字符串 response imageInstances one id 1 url ONE two id 2 url TWO 杰克逊代码大厦 JsonProperty 我怎样才能得到HashMap对象出来了
  • 每次修改代码时都必须 mvn clean install

    我不是来自 Java 世界 但我必须为我的一个项目深入研究它 我不明白为什么每次修改或更新代码时 都必须 mvn clean install 来调试代码的最新版本 你知道为什么吗 尝试按Ctrl Shift F9 热插拔 有时会有所帮助
  • 仅当用户开始输入时清除 JavaFX TextField 中的提示文本

    默认行为是当字段获得焦点时 字段中的提示文本将被删除 那是标记在场上的时候 是否可以配置文本字段 以便仅在用户开始输入时删除提示文本 否则 我需要在每个文本字段旁边 上方添加一个标签 以描述其中的值 我知道它有点旧 但我自己也需要它 这仍然

随机推荐