从 Kafka 多次读取同一条消息

2023-11-24

I use 春季卡夫卡 API通过手动偏移量管理来实现 Kafka 消费者:

@KafkaListener(topics = "some_topic")
public void onMessage(@Payload Message message, Acknowledgment acknowledgment) {
    if (someCondition) {
        acknowledgment.acknowledge();
    }
}

在这里,我希望消费者仅在以下情况下提交偏移量:someCondition成立。否则消费者应该睡一会儿并阅读同一条消息 again.

卡夫卡配置:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig());
    factory.getContainerProperties().setAckMode(MANUAL);
    return factory;
}

private Map<String, Object> consumerConfig() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    ...
    return props;
}

在当前配置下,如果someCondition == false,消费者不提交偏移量,但仍然读取下一条消息。如果卡夫卡有没有办法让消费者重读消息acknowledgement没有执行?


正如@Gary 已经指出的,你的方向是正确的,seek()就是这样做的方法。今天,当我遇到这个问题时,我找不到它的代码示例。这是任何想要解决问题的人的代码。

public class Receiver implements AcknowledgingMessageListener<Integer, String>, ConsumerSeekAware {

    private ConsumerSeekCallback consumerSeekCallback;


    @Override
    public void onMessage(ConsumerRecord<Integer, String> record, Acknowledgment acknowledgment) {

        if (/*some condition*/) {
            //process
            acknowledgment.acknowledge(); //send ack
        } else {

            consumerSeekCallback.seek("your.topic", record.partition(), record.offset());

        }
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
        this.consumerSeekCallback = consumerSeekCallback;
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {

        // nothing is needed here for this program
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {

        // nothing is needed here for this program
    }

}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

从 Kafka 多次读取同一条消息 的相关文章

  • 删除 PriorityQueue 的顶部?

    假设我使用 Java util 中的 PriorityQueue 类 我想从 PriorityQueue pq 中删除最大的数字 我们假设它位于队列的头部 下面的工作会起作用吗 1 int head pq peek pq dequeue h
  • Spring MVC 应用程序可以是多线程的,即使它的 servlet 不是吗?

    当您谈论 Spring 应用程序是多线程时 您是否一定是指该应用程序中定义的 servlet 是否是多线程的 或者即使应用程序中的 servlet 不是多线程 Spring 应用程序也可以配置为多线程吗 不再支持单线程 servlet 它们
  • 继续使用 sketch.js 编辑草图图像 [关闭]

    Closed 这个问题需要细节或清晰度 help closed questions 目前不接受答案 我正在使用 sketch js 中的示例 http intridea github io sketch js http intridea g
  • Java程序验证signtool签名的数字签名

    我已经使用 SignTool 对文件 exe 或 dll 不是 jar 文件 进行了数字签名 Signtool还可以验证数字签名 但我的要求是使用java程序检查由signtool签名的文件的数字签名 我在互联网上搜索但没有找到任何信息 您
  • Hazelcast Jet 变更数据捕获

    我在我的应用程序中使用 Hazelcast 更改数据捕获 CDC 我使用 CDC 的原因是 如果使用 jdbc 或其他替代功能将数据加载到缓存中 会花费大量时间 所以CDC将在数据库和 Hazelcast Jet 之间进行数据同步 Stre
  • Java GUI,根据actionListener更改面板

    我在两个不同的面板中添加了两个按钮 如果单击第一个按钮 则需要转到下一个面板 其中包含第二个按钮 但是当我单击第一个按钮时 该按钮没有被替换 Java GUI import java awt event ActionEvent import
  • 如何知道 Solr Optimize 何时完成?

    我正在使用 Solr php client 通过 php 与 Solr 进行通信 这段代码触发solr优化命令 solr gt optimize 请问有没有什么方法可以确定优化完成了 这都是因为我的网站上有一个管理页面 我每天必须手动优化
  • Java 会话变量

    我听说有些人认为在会话中将信息存储在服务器上是一个坏主意 因为它不安全 因此 在多页面业务流程功能中 应用程序将数据写入数据库 然后在需要时检索信息 在会话中存储私人信息是否一定不安全 只要会话本身安全 在会话中存储属性就不存在安全风险劫持
  • 在 Scala 中创建 Java 对象

    我有一个 Java 类 Listings 我在 Java MapReduce 作业中使用它 如下所示 public void map Object key Text value Context context throws IOExcept
  • IntelliJ 对于 Java 项目使用的默认构建过程是什么?

    直接从 IntelliJ 中的 IDE 构建 Java 项目非常好 它速度很快 而且很有效 我无法找到任何有关 IntelliJ 如何进行这些默认构建的文档 我猜它使用Ant 我想做的是为下载我的项目的任何人自动化这个快速 轻松的构建过程
  • java代码的等效vb代码

    谁能告诉我这段Java代码到底做了什么 SecureRandom random SecureRandom getInstance SHA1PRNG byte bytes new byte 20 synchronized random ran
  • Android Studio安装JDK错误

    In Android Studio I am facing bellow error 当我按下时会显示此弹出窗口Alt Enter对于缺少的类 符号 当我点击 setup SDK 时 它显示两个选项 1 8 Java版本 1 8 0 60
  • spring-hibernate 花费更多时间的任何原因?

    目前 我正在春季和冬眠期间从事一个项目 我来到这里 获取记录并在 JSP 中显示这些记录需要更多时间 我在各处都保留了时间戳 以查看哪里花费了更多时间 Time HomeController start 2014 07 09 18 58 5
  • 选择活动时运行时崩溃

    首先我想说我几乎没有 Android 经验 这是我在 Android 中的第一个项目 而且我的老师不太擅长教学 所以我对任何过度的无知表示歉意 在进一步讨论之前先解释一下 我的应用程序的目标本质上是能够记录您在某些活动上花费了多少时间 记录
  • 如果使用 Maven,是否应该忽略 VCS 中 Eclipse 特定的文件?

    我知道为什么不将 Eclipse IDE 特定的文件提交到像 Git 我实际上正在使用的 这样的 VCS 中 这就是我使用 Maven 并让它为您生成这些文件的原因之一not将它们置于版本控制之下 但我想知道 是否应该在 gitignore
  • 从 Spring 启动运行 Java 类

    我使用的是Java8和Spring4 3 1 我有一个 Java Spring 应用程序托管由浏览器和移动应用程序客户端访问的 RESTfult 服务 其次 我编写了一个侦听事件的聊天服务器 socket io 来自客户 该聊天服务器正在从
  • Selenium 查看鼠标/指针

    有什么方法可以在运行测试时真正看到硒鼠标吗 要么是 Windows 光标图像 要么是某种点或十字线或任何东西 我正在尝试使用拖放功能selenium and java in an HTML5Web 应用程序 并且能够看到光标以了解它实际在做
  • 无法读取使用 DataOutputStream 发送的号码

    这是我的客户端代码 Random rand new Random int n rand nextInt 50 1 DataInputStream dis new DataInputStream socket getInputStream D
  • 尝试 Catch 性能 Java

    当捕获异常而不是进行检查时 try catch 需要多长时间 以纳秒为单位 假设消息具有用于查找的 HashMap 类型性能 try timestamp message getLongField MessageField TIMESTAMP
  • 这种说法是否恰当。 if (0 != 表达式或变量) {} 在java中? [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi

随机推荐