Kafka 0.10 Java消费者不从主题读取消息

2023-12-30

我有一个简单的 java 生产者,如下所示

public class Producer 
{
    private final static String TOPIC = "my-example-topi8";
    private final static String BOOTSTRAP_SERVERS = "localhost:8092";

    public static void main( String[] args ) throws Exception {
        Producer<String, byte[]> producer = createProducer();
        for(int i=0;i<3000;i++) {
            String msg = "Test Message-" + i;
            final ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>(TOPIC, "key" + i, msg.getBytes());
            producer.send(record).get();
            System.out.println("Sent message " + msg);
        }
        producer.close();
    }

    private static Producer<String, byte[]> createProducer() {
        Properties props = new Properties();
        props.put("metadata.broker.list", BOOTSTRAP_SERVERS);
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("client.id", "AppFromJava");
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("compression.codec", "snappy");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return new KafkaProducer<String, byte[]>(props);
    }
}

我正在尝试读取如下数据

public class Consumer 
{
    private final static String TOPIC = "my-example-topi8";
    private final static String BOOTSTRAP_SERVERS = "localhost:8092";

    public static void main( String[] args ) throws Exception {
        Consumer<String, byte[]> consumer = createConsumer();
        start(consumer);
    }

    static void start(Consumer<String, byte[]> consumer) throws InterruptedException {
        final int giveUp = 10;   
        int noRecordsCount = 0;
        int stopCount = 1000;

        while (true) {
            final ConsumerRecords<String, byte[]> consumerRecords = consumer.poll(1000);
            if (consumerRecords.count()==0) {
                noRecordsCount++;
                if (noRecordsCount > giveUp) break;
                else continue;
            }


            consumerRecords.forEach(record -> {
               // Process the record System.out.printf("\nConsumer Record:(%s, %s, %s)", record.key(), new String(record.value()), record.topic());
            });

            consumer.commitSync();
            break;
        }
        consumer.close();
        System.out.println("DONE");
    }

    private static Consumer<String, byte[]> createConsumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                                    BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,
                                    "KafkaExampleConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1234");
        props.put("enable.auto.commit", "false");

        // Create the consumer using props.
        final Consumer<String, byte[]> consumer = new KafkaConsumer(props);
        consumer.subscribe(Collections.singletonList(TOPIC));
        return consumer;
    }
}

但消费者没有阅读来自 kafka 的任何消息。如果我在最开始添加以下内容start()

consumer.poll(0);
consumer.seekToBeginning(consumer.assignment());

然后消费者开始阅读该主题。但是每次消费者重新启动时,它都会从我不想要的主题开头读取消息。如果我在启动 Consumer 时添加以下配置

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

然后它从主题读取消息,但如果消费者在处理所有消息之前重新启动,那么它不会读取未处理的消息。

有人可以让我知道出了什么问题以及如何解决这个问题吗?

Kafka Broker 和 Zookeeper 使用默认配置运行。


您对 commitSync() 的调用是确认来自最后一次 poll() 的批次中的所有消息,而不仅仅是您正在处理的每一条消息,这就是我认为您正在尝试做的事情。

从文档中

“上面的示例使用 commitSync 将所有收到的记录标记为已提交。在某些情况下,您可能希望通过显式指定偏移量来更好地控制已提交的记录。在下面的示例中,我们在处理完每个分区中的记录后提交偏移量。

 try {
     while(running) {
         ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
         for (TopicPartition partition : records.partitions()) {
             List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
             for (ConsumerRecord<String, String> record : partitionRecords) {
                 System.out.println(record.offset() + ": " + record.value());
             }
             long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
             consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
         }
     }
 } finally {
   consumer.close();
 }

注意:提交的偏移量应始终是应用程序将读取的下一条消息的偏移量。因此,当调用 commitSync(offsets) 时,您应该在最后处理的消息的偏移量上加一。 ”

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka 0.10 Java消费者不从主题读取消息 的相关文章

随机推荐

  • YouTube Android API:YouTubePlayerFragment 加载微调器

    我正在使用 Android YouTube API 示例在我的应用程序中创建一个 chromeless YouTube 播放器 我遇到的问题是 即使视频已加载并开始播放 缓冲 加载进度条仍会继续显示在视频上 我可以在FragmentDemo
  • 测试 LDAP 与 Active Directory 服务器的连接

    我正在编写一个访问 LDAP 服务器 当前为 OpenLDAP 的 Linux 应用程序 我还想使用 Active Directory 服务器测试该应用程序 因为它可能会在生产中使用 Microsoft 是否有任何免费演示版或试用版可以让我
  • 快速 C++ 单生产者单消费者实现

    我正在寻找一个单生产者 单消费者的 FIFO 实现 它的执行速度比普通的锁定 写入 解锁 信号 waitForSignal 锁定 读取 解锁的东西更快 我正在寻找大多数 POSIX 操作系统 x86 特定的很好 支持的用 C 或 C 编写的
  • 如何以 DRY 方式定义变量

    假设我有一个写入 S3 存储桶的函数 因此 存储桶名称显然是一个 变量 不应硬编码到函数中 例如 开发与生产可能是不同的存储桶 现在 如果我希望函数能够访问它 我需要在至少两个地方使用存储桶名称 在该功能的 IAM 策略中 允许访问存储桶
  • 如何从函数内部确定函数名称

    如果我有一个像这样的 Bash 脚本 bin bash f echo function name f in this case 有什么办法可以做到这一点吗 这可以用在帮助消息中 例如 printf Usage s blah blah bla
  • 当 Facebook 用户在应用程序内发送“发送”对话框时,应用程序可以跟踪收件人吗?

    当用户打开 发送 对话框时 例如 他们可以在 收件人 行中添加或删除人员 如果用户实际按下 发送 应用程序是否有办法跟踪 收件人 行中的人员 不 至少目前不 如果你看一下官方文档 https developers facebook com
  • 在下划线中使用 debounce 函数

    我使用 underscore js 来运行任务 debounce task 100 如何停止执行 debounce debounce不执行任何操作 因此您无法阻止它 它returns一个负责去抖的新函数 如果不想再使用 就使用原来的功能即可
  • 使用 jquery 仅选择按钮前带有某个类的

    我有这个 html 标记 我想使用 jQuery 选择一些行 当按下 cerrar 类的按钮时 我想选择3 tr 在同一个按钮之前有类 hidden tr class main td class table sub title Test t
  • OO 设计原则适用于 Python 吗?

    似乎许多 OO 讨论都使用 Java 或 C 作为示例 例如 Head First 设计模式 这些模式同样适用于 Python 吗 或者如果我遵循设计模式 我最终会用 Python 编写 Java 这显然是一件非常糟糕的事情 最大的区别是
  • 如何设置 JFrame 的最小大小,以阻止用户将其大小调整为更小?

    我有一个JFrame不能小于特定尺寸 否则元素无法正确布局 我尝试过了setMinimumSize 并覆盖getMinimumSize 该框架的方法 但我仍然可以将框架调整为更小 那么 我必须倾听我的界限的变化吗 componentList
  • 使用IDEA的内容设计器时contentPane不能为null

    因此 我正在尝试使用 IntelliJ IDEA 的内容设计器来创建一个简单的 GUI 并且我遵循了使用它的所有指南 但是当从 IDEA 运行时 尚未编译成 JAR 它会返回以下错误 Exception in thread main jav
  • 如何保持 JButton 的透明度 (java)

    我正在制作一款坦克游戏 在我的菜单中 我想使用图片作为 jbuttons 它们是部分透明的 当它们出现在屏幕上时 透明部分会变成白色 i tried using setOpaque but this doesn t work i can t
  • Numpy:array1 中同时也是 array2 元素的元素的掩码

    我想知道是否有一种 numpy 自然的方法可以为 array2 中的元素在 array1 上创建二进制掩码 另一种说法是 数组 1 上的二进制掩码用于数组 1 和 2 的交集 这有效 def bin mask a b return sum
  • MongoDB 删除所有数据库

    我想知道是否有命令可以从我的 MongoDB 中删除所有数据库 我知道如果我只想删除一个数据表 我只需要输入数据库的名称 如下面的代码 但我不想指定它 mongo DB NAME eval db dropDatabase 您可以创建一个 j
  • 为什么 Socket.AcceptAsync 不触发 SocketAsyncEventArgs Completed 事件?

    我正在开发一个服务器应用程序 它将接收消息并做出响应 没什么新鲜的 所以 实际上我正在关注这个答案 https stackoverflow com questions 869744 how to write a scalable tcp i
  • 如何在javafx WebView中启用HTML5本地存储

    有没有办法在 javafx 2 2 WebView 上启用 HTML5 本地存储 无论我做什么 本地存储似乎都被禁用 不可用于 javafx 提供的 WebView 我什至使用它进行了测试http html5test com http ht
  • 如何获取html5音频的持续时间

    我有一个html5
  • 如何从 fopen FILE 结构中获取文件句柄?

    The fopen http msdn microsoft com en us library yeby3zcb aspx函数返回一个指向a的指针FILE结构 应将其视为不透明的值 而不处理其内容或含义 在 Windows 上 C 运行时是
  • C++ 继承。更改对象数据类型

    我在强制更改我自己的对象上的数据类型时遇到问题 我有一个基类说A和两个派生自的类A called B and C 我传递物体B and C到一个检查它是什么类型的对象的函数 B or C 下面是一些示例代码以及我的问题的问题 enum Cl
  • Kafka 0.10 Java消费者不从主题读取消息

    我有一个简单的 java 生产者 如下所示 public class Producer private final static String TOPIC my example topi8 private final static Stri