为什么消费者在使用 Java 客户端 API 在 DC/OS 上消费来自 Kafka 的消息时会挂起?

2024-02-24

我在 AWS 上的 DC/OS (Mesos) 集群上安装了 Kafka。启用三个代理并创建一个名为“topic1”的主题。

dcos kafka topic create topic1 --partitions 3 --replication 3

然后我编写了一个 Producer 类来发送消息,并编写了一个 Consumer 类来接收消息。

public class Producer {
    public static void sendMessage(String msg) throws InterruptedException, ExecutionException {
        Map<String, Object> producerConfig = new HashMap<>();
        System.out.println("setting Producerconfig.");
        producerConfig.put("bootstrap.servers", 
                "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");

        ByteArraySerializer serializer = new ByteArraySerializer();
        System.out.println("Creating KafkaProcuder");
        KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(producerConfig, serializer, serializer);
        for (int i = 0; i < 100; i++) {
            String msgstr = msg + i;
            byte[] message = msgstr.getBytes();
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic1", message);
            System.out.println("Sent:" + msgstr);
            kafkaProducer.send(record);
        }
        kafkaProducer.close();
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        sendMessage("Kafka test message 2/27 3:32");
    }

}

public class Consumer {
    public static String getMessage() {
        Map<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.put("bootstrap.servers", 
                "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");
        consumerConfig.put("group.id", "dj-group");
        consumerConfig.put("enable.auto.commit", "true");
        consumerConfig.put("auto.offset.reset", "earliest");
        ByteArrayDeserializer deserializer = new ByteArrayDeserializer();
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerConfig, deserializer, deserializer);

        kafkaConsumer.subscribe(Arrays.asList("topic1"));
        while (true) {
            ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(100);
            System.out.println(records.count() + " of records received.");
            for (ConsumerRecord<byte[], byte[]> record : records) {
                System.out.println(Arrays.toString(record.value()));
            }
        }
    }

    public static void main(String[] args) {
        getMessage();
    }
}

首先我跑了Producer在集群上将消息发送到topic1。然而当我跑的时候Consumer,它无法接收任何东西,只是挂起。

Producer正在工作,因为我能够通过运行 Kafka 安装附带的 shell 脚本来获取所有消息

./bin/kafka-console-consumer.sh --zookeeper master.mesos:2181/dcos-service-kafka --topic topic1 --from-beginning

但为什么我不能接收Consumer? This post https://github.com/SOHU-Co/kafka-node/issues/11建议具有旧偏移量的 group.id 可能是一个可能的原因。我只在消费者中而不是生产者中创建 group.id 。如何配置该组的偏移量?


事实证明,kafkaConsumer.subscribe(Arrays.asList("topic1"));正在引起poll()挂起来。根据Kafka Consumer收不到消息 https://stackoverflow.com/questions/34414308/kafka-consumer-does-not-receive-messages,有两种方法可以连接到主题,assign and subscribe。我更换后subscribe使用下面的几行,它开始工作。

    TopicPartition tp = new TopicPartition("topic1", 0);
    List<TopicPartition> tps = Arrays.asList(tp);
    kafkaConsumer.assign(tps);

然而,输出显示了预期之外的数字数组(生产者发送的字符串)。但我想这是一个单独的问题。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

为什么消费者在使用 Java 客户端 API 在 DC/OS 上消费来自 Kafka 的消息时会挂起? 的相关文章

  • 如何自定义BlockingQueue的阻塞行为

    我想创建一个阻塞队列 它根据自定义规则而不是队列中的项目数量来阻止生产者 例如 生产者生成一些文件并放入队列中 消费者经过一番分析后将它们转移到特定位置 对于上述场景 如果队列中的总文件大小达到某个阈值 我希望生产者等待生成新文件 如果总大
  • 如何使用 SLF4J 和 Log4j2 记录 FATAL(或任何自定义日志级别)

    我有那些具体的要求 需要能够登录FATAL level 需要使用SLF4J 需要使用Log4j2 现在 这是我的执行 final Logger logger LoggerFactory getLogger HelloWorld class
  • Hibernate OneToMany 关系是 PersistentBag 而不是 List

    我正在 javafx 中开发一个应用程序 它通过 RMI 与 EAR 连接 该 EAR 连接到 SQLServer DB 并使用 hibernate 映射 POJOS 这些 POJOS 包含双向 OneToMany 和 ManyToOne
  • V8 如何管理它的堆?

    我知道V8的垃圾收集在工作时 会从GC的root开始追踪 这样无法到达的对象就会被标记然后被清除 我的问题是GC是如何遍历那些对象的 必须有一个数据结构来存储所有可达或不可达的对象 位图 链接表 顺便说一句 JVM 也做同样的事情吗 艾伦秀
  • 有效地查找正则表达式的所有重叠匹配项

    这是后续与 java 正则表达式匹配的所有重叠子字符串 https stackoverflow com q 11303309 244526 有没有办法让这段代码更快 public static void allMatches String
  • 如何在Spring的applicationContext.xml中指定默认范围来请求范围?

    我想让所有 bean 请求默认作用域 但是 Spring 文档说默认作用域是 Singleton 第 3 4 1 和 3 4 2 节http static springsource org spring docs 2 5 x referen
  • 如何正确配置Tomcat SSLHostConfig?

    我正在按照本教程在 tomcat 中启用 ssl https medium com raupach how to install lets encrypt with tomcat 3db8a469e3d2 https medium com
  • Java:检查给定日期是否在当前月份内

    我需要检查给定的日期是否在当前月份 我编写了以下代码 但 IDE 提醒我getMonth https docs oracle com javase 7 docs api java util Date html getMonth and ge
  • 业务代表与服务定位器

    Business Delegate 和 Service Locator 之间有什么区别 两者都负责封装查找和创建机制 如果 Business Delegate 使用 Service Locator 来隐藏查找和创建机制 那么 Busines
  • 如何消除警告:使用“$”而不是“.”对于 Eclipse 中的内部类

    我是 Android 开发新手 当我将 eclipse 和 Android SDK 更新到最新版本后 我收到警告 Use instead of for inner classes or use only lowercase letters
  • 独占锁定ConcurrentHashMap

    我知道不可能锁定 ConcurrentHashMap 进行独占访问 但是 我找不到原因 是因为构成CHM的 Segment 没有被api公开吗 据推测 如果是的话 客户端代码可以执行 交接 锁定 Cheers 我知道不可能锁定 Concur
  • 在 Spring 中设置 WS https 调用超时 (HttpsUrlConnectionMessageSender)

    我正在尝试为 WS 调用设置超时 我延长了WebServiceGatewaySupport并尝试将发送者超时设置为如下 public Object marshalSendAndReceive Object requestPayload We
  • getClassLoader().getResource() 返回 null

    我有这个测试应用程序 import java applet import java awt import java net URL public class Test extends Applet public void init URL
  • 会话 bean 中的 EntityManager 异常处理

    我有一个托管无状态会话 bean 其中注入了 EntityManager em 我想做的是拥有一个具有唯一列的数据库表 然后我运行一些尝试插入实体的算法 但是 如果实体存在 它将更新它或跳过它 我想要这样的东西 try em persist
  • java JFileChooser 文件大小过滤器

    我知道我可以按文件类型进行过滤 但是可以按文件大小进行过滤吗 例如 JFileChooser 仅显示 3 MB 以内的图片 简短的回答应该是 你尝试过什么 长答案是肯定的 JFileChooser fc new JFileChooser f
  • 如何在将数据发送到 Firebase 数据库之前对其进行加密?

    我正在使用 Firebase 实时数据库制作聊天应用程序 我知道 Firebase 非常安全 只要您的规则正确 但我自己可以阅读使用我的应用程序的人的所有聊天记录 我想阻止这种情况 为此我需要一种解密和加密方法 我尝试使用凯撒解密 但失败了
  • 使用外部硬盘写入和存储 mysql 数据库

    我已经设置了 mysql 数据库在我的 Mac 上使用 java 和 eclipse 运行 它运行得很好 但现在我将生成大约 43 亿行数据 这将占用大约 64GB 的数据 我存储了大量的密钥和加密值 我有一个 1TB 外部我想用作存储位置
  • 哪种 Java DOM 包装器是最好或最受欢迎的? [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 为什么在尝试使用 Java 连接到 RDS PostgreSQL 数据库时会收到 SocketTimeoutException?

    我有一个 Spring 应用程序 我试图在 AWS 上托管 几天来我一直在努力配置 我有一个 EC2 实例 并且能够通过 SSH 连接到它 我还在 AWS 中设置了 Postgres RDS 数据库 但我无法使用 IDE 中的代码连接到它
  • 如何使用 Spring AOP 建议静态方法?

    在执行类的静态方法之前和之后需要完成一些日志记录 我尝试使用 Spring AOP 来实现这一点 但它不起作用 而对于正常方法来说它起作用 请帮助我理解如何实现这一点 如果可以使用注释来完成 那就太好了 也许您应该在使用 Spring AO

随机推荐

  • 为 UIButton 添加背景阴影

    如何给图片添加浅灰色阴影UIButton 我现在不想要一个方法来做到这一点 它应该是这样的 UIButton button1 button1 layer shadowOpacity 0 8 等等 但这不起作用 它只会在按钮内部添加阴影 但我
  • JUnit 5 - 为整个测试套件提供设置和拆卸

    我的要求是为一组测试完成一些初始化 并在所有测试完成后将其清除 这些测试涉及一些测试类 因为它们并不密切相关 但需要共同的初始化 我在用 SelectClasses形成套件并尝试利用 ExtendWith使用应该处理预处理和后处理的自定义扩
  • 如何使用 Ruby on Rails 操作 DOM

    正如标题所说 我有一些 DOM 操作任务 例如 我想 找到所有具有蓝色的 H1 元素 查找所有大小为 12px 的文本 ETC 我怎样才能用 Rails 做到这一点 谢谢 Update 我一直在根据这篇论文做一些关于提取网页内容的研究 gt
  • 科尔多瓦人行横道不建造

    我相信我下载过的每个版本的 Crosswalk Cordovahttps download 01 org crosswalk releases crosswalk android https download 01 org crosswal
  • 带有 Express.js 虚拟主机的子域上的 Socket.IO

    我的服务器上运行着两个 Express js 应用程序 一个普通的应用程序称为 main app 另一个使用 Socket IO 的应用程序称为 socket app 我在 mydomain com 上运行 main app 在 socke
  • 构建 VLC 时需要 NDKv8b 或更高版本

    我已经在 android 中构建了 VLC 并使用代码实现 jack export ANDROID SDK android sdk jack export ANDROID NDK android ndk r9d jack export PA
  • 设置 UITableViewCell 的动态高度,其中仅包含图像(可变高度)

    我有一个 UITableView 其单元格仅包含可变高度的图像 我根据图像动态设置行高 它不能完美工作 滚动时的图像有时会重叠 这是代码 NSInteger numberOfSectionsInTableView UITableView t
  • Symfony2 中的路由

    如何在 Symfony2 中设置默认路由 在 Symfony1 中它看起来像这样 homepage url param module default action index default symfony url symfony acti
  • 是什么阻止我在我的开发盒上专门使用 IIS Express?

    我只阅读了一些有关 IIS Express 的内容 并且正在阅读并立即安装 看来我应该能够正确卸载 IIS 并在 Visual Studio 2010 SP1 中开发 调试基于 Web 服务器的技术时使用 IIS Express 这是一个明
  • Facebook 不从 og:image 加载图像

    它已经可以工作了 但今天我很惊讶 当我尝试分享链接时 Facebook 没有显示来自 og image 元标记的图像 from 脸书调试器 http developers facebook com tools debug og object
  • 使用 Google 脚本永久删除我的电子邮件的脚本

    Gmail Users Messages remove userId id 是如何工作的 电子邮件的 ID 是什么 永久删除电子邮件的正确功能是什么 就我而言 我想立即明确地删除所有已发送的电子邮件 这是我从某人那里获取的一些代码 只是带有
  • Python 和“re”

    我在 python 中关于 Regex 的教程解释了如何在 python 中使用 re 模块 我想从 A 标签中获取 URL 因此知道 Regex 我编写了正确的表达式 并在我选择的正则表达式测试应用程序中对其进行了测试并确保它工作了 当放
  • 如何在 OSX Catalina (10.15) 中使用 Wireshark 捕获 USB 流量

    我正在尝试让 Wireshark USB 捕获工作 显然 Catalina 之前的技巧就是调出界面 以便 Wireshark 可以看到它 https forums developer apple com thread 95380 https
  • ggplot2 具有多个参数的绘图函数

    我得到了这个功能 change lt function score d k p k score 1 1 k d p 我想在一个图中绘制该函数对于一系列参数 d 和 p 的所有结果 在 r 基数中 就是这样 parameters lt c 1
  • Django:如何重定向到表单起源的页面

    在我的 Django 应用程序中 我有多个页面显示一个链接 该链接加载一个显示表单的新页面 提交表单后 重定向到访问该表单的原始页面的最简洁方法是什么 原始页面 gt 表单页面 gt 原始页面 使用下一个变量似乎不太优雅 因为我必须将其设置
  • 证书基本约束的路径长度

    对于 CA 类型的基本约束 路径长度为 0 和 None 是否相同 澄清一下 路径长度为 0 是否意味着 CA 不能颁发证书 而路径长度为 none 是否意味着它可以颁发无限数量的证书 取自RFC 5280 https www rfc ed
  • 为什么人们在 PHP 框架中使用单例

    好吧 伙计们 我很难理解为什么需要单例 让我们举一个真实的例子 我有一个 CMS 框架我需要一个记录一些信息的类 让我们继续使用 PHP Example class Logger private logs array public func
  • 插入特定行数

    我正在尝试插入特定数量的行 现在我正在使用下面的代码插入 4 行 我正在尝试编写一行代码 该代码将根据特定单元格中的数字插入特定数字或行 例如 如果我想插入 4 行 并且单元格 A2 是我可以更改要添加的行数的单元格 那么我将使用什么代码根
  • ld:找不到 -lgcc_s.10.5 Xcode 的库

    ld library not found for lgcc s 10 5 collect2 ld returned 1 exit status Command Developer Platforms iPhoneSimulator plat
  • 为什么消费者在使用 Java 客户端 API 在 DC/OS 上消费来自 Kafka 的消息时会挂起?

    我在 AWS 上的 DC OS Mesos 集群上安装了 Kafka 启用三个代理并创建一个名为 topic1 的主题 dcos kafka topic create topic1 partitions 3 replication 3 然后