从主题读取后立即异步提交消息

2024-04-03

我正在尝试在阅读主题后立即提交一条消息。我已点击此链接(https://www.confluence.io/blog/apache-kafka-spring-boot-application https://www.confluent.io/blog/apache-kafka-spring-boot-application)用 spring 创建一个 Kafka 消费者。通常情况下,它工作得很好,消费者收到消息并等待,直到另一个人进入队列。但问题是,当我处理这些消息时,需要花费很多时间(大约10分钟),kafka队列认为该消息没有被消费(提交),并且消费者一次又一次地读取它。我不得不说,当我的处理时间少于 5 分钟时,它运行良好,但当它持续更长的时间时,它不会提交消息。

我已经寻找了一些答案,但它对我没有帮助,因为我没有使用相同的源代码(当然还有不同的结构)。我尝试发送异步方法并异步提交消息,但失败了。 一些来源是:

Spring Boot Kafka:由于组已经重新平衡,因此无法完成提交 https://stackoverflow.com/questions/51631894/spring-boot-kafka-commit-cannot-be-completed-since-the-group-has-already-rebala/51633247

https://www.confluence.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/ https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/

https://dzone.com/articles/kafka-clients-at-most-once-at-least-once-exactly-o https://dzone.com/articles/kafka-clients-at-most-once-at-least-once-exactly-o

Kafka 0.10 Java消费者不从主题读取消息 https://stackoverflow.com/questions/48826279/kafka-0-10-java-consumer-not-reading-message-from-topic

https://github.com/confluenceinc/confluence-kafka-dotnet/issues/470 https://github.com/confluentinc/confluent-kafka-dotnet/issues/470

主要课程在这里:


@SpringBootApplication
@EnableAsync
public class SpringBootKafkaApp {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootKafkaApp .class, args);
    }


消费者类(我需要在其中提交消息)

@Service
public class Consumer {

@Autowired
    AppPropert prop;

   Consumer cons;
@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
    public void consume(String message) throws IOException {
        /*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */

        Properties  props=prope.startProp();//just getting my properties from my config-file
        ControllerPRO pro = new ControllerPRO();

        List<Future<String>> async= new ArrayList<Future<String>>();//call this method asynchronous, doesn't help me
        try {

            CompletableFuture<String> ret=pro.processLaunch(message,props);//here I call the process method 
            /*This works fine when the processLaunch method takes less than 5 minutes, 
            if it takes longer the consumer will get the same message from the topic and start again with this operation 
            */

        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("End of consumer method ");

    }

    }


如何在从队列中读取消息后立即提交该消息。

我想确保当我收到消息时我会立即提交该消息。现在,当我完成 (System.out.println) 之后的方法的执行时,消息就会被提交。那么有人可以告诉我该怎么做吗?

- - - 更新 - - - -

抱歉回复晚了,但正如 @GirishB 所建议的那样,我一直在查看 GirishB 的配置,但我不知道在哪里可以定义我想要从配置文件(applications.yml)中读取/侦听的主题。我看到的所有示例都使用与此类似的结构(http://tutorials.jenkov.com/java-util-concurrent/blockingqueue.html http://tutorials.jenkov.com/java-util-concurrent/blockingqueue.html)。是否有任何选项可以读取其他服务器中声明的主题?使用与此类似的 @KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")

=========== 解决方案 1 ======================================= ===

我遵循@victor gallet的建议并包含了消费者属性的声明,以便在消费方法中容纳“Acknowledgment”对象。我也关注了这个链接(https://www.programcreek.com/java-api-examples/?code=SpringOnePlatform2016/grussell-spring-kafka/grussell-spring-kafka-master/s1p-kafka/src/main/java/org/s1p/CommonConfiguration .java https://www.programcreek.com/java-api-examples/?code=SpringOnePlatform2016/grussell-spring-kafka/grussell-spring-kafka-master/s1p-kafka/src/main/java/org/s1p/CommonConfiguration.java)来获取我用来声明和设置所有属性的所有方法(consumerProperties、consumerFactory、kafkaListenerContainerFactory)。我发现的唯一问题是 “new SeekToCurrentErrorHandler() ”声明,因为我收到一个错误,目前我无法解决它(如果有人向我解释的话那就太好了)。


@Service
public class Consumer {

@Autowired
    AppPropert prop;

   Consumer cons;


   @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();

        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        //factory.setErrorHandler(new SeekToCurrentErrorHandler());//getting error here despite I've loaded the library
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProperties());
    }

     @Bean
    public Map<String, Object> consumerProperties() {
        Map<String, Object> props = new HashMap<>();
        Properties  propsManu=prop.startProperties();// here I'm getting my porperties file where I retrive the configuration from a remote server (you have to trust that this method works)
        //props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configProperties.getBrokerAddress());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsManu.getProperty("bootstrap-servers"));
        //props.put(ConsumerConfig.GROUP_ID_CONFIG, "s1pGroup");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, propsManu.getProperty("group-id"));
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        //props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, propsManu.getProperty("key-deserializer"));
        //props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, propsManu.getProperty("value-deserializer")); 
        return props;
    }




    @KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
    public void consume(String message) throws IOException {
        /*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */
        acknowledgment.acknowledge();// commit immediately
        Properties  props=prop.startProp();//just getting my properties from my config-file
        ControllerPRO pro = new ControllerPRO();

        List<Future<String>> async= new ArrayList<Future<String>>();//call this method asynchronous, doesn't help me
        try {

            CompletableFuture<String> ret=pro.processLaunch(message,props);//here I call the process method 
            /*This works fine when the processLaunch method takes less than 5 minutes, 
            if it takes longer the consumer will get the same message from the topic and start again with this operation 
            */

        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("End of consumer method ");

    }

    }

``````````````````````````````````````````````````````````

您必须使用属性修改您的消费者配置enable.auto.commit设置为 false :

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

然后,您必须修改 Spring Kafka Listener 工厂并将 ack-mode 设置为MANUAL_IMMEDIATE。这是一个例子ConcurrentKafkaListenerContainerFactory :

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    factory.setErrorHandler(new SeekToCurrentErrorHandler());
    return factory;
}

正如文档中所解释的,MANUAL_IMMEDIATE表示:当侦听器调用 Acknowledgment.acknowledge() 方法时立即提交偏移量。

你可以找到所有的提交方法here https://docs.spring.io/spring-kafka/reference/html/#committing-offsets.

然后,在侦听器代码中,您可以通过添加Acknowledgment对象,例如:

@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
public void consume(String message, Acknowledgment acknowledgment) {
   // commit immediately
    acknowledgment.acknowledge();
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

从主题读取后立即异步提交消息 的相关文章

随机推荐

  • Android - 在开发和生产 Web 服务之间切换

    我想让我的应用程序在开发和生产 Web 服务之间切换 而不需要对代码进行太多更改 并且相对简单 现在我的网络服务地址为static final String类中的变量执行实际的 HTTP 调用 并使用 a 来切换应用程序其余部分中的代码st
  • 如何执行作为 sp 参数传递的 sql 文本?

    我有一个带有 nvarchar 参数的存储过程 我希望调用者在使用此 SP 时提供 sql 命令的文本 如何从 SP 内执行提供的 sql 命令 这可能吗 我认为可以使用 EXEC 但以下内容 EXEC script 错误表明无法按给定名称
  • 带有 Base64 图像的 v-card-media

    我正在 ColdFusion 中创建验证码图像 并将其作为 Taffy 的 REST feed 返回 然后在 Vuetify 中显示 ColdFusion 太妃糖代码
  • 如何从Excel中获取工作表名称

    如何从 Excel 获取工作表名称并将其添加到我的组合框列表中 我似乎无法将其添加到我的代码中 因为它是public static public static DataTable ExcelToDataTable string fileNa
  • Consumer.endOffsets 在 Kafka 中如何工作?

    假设我有一个无限期运行的计时器任务 它会迭代 kafka 集群中的所有消费者组 并输出每个组的所有分区的延迟 提交偏移量和结束偏移量 与 Kafka 控制台消费者组脚本的工作方式类似 只不过它适用于所有组 就像是 单个消费者 不工作 不返回
  • 如何在Verilog中将二维数组中的所有位设置为0?

    我构建了一个 8 2bits 数组来表示 Verilog 中的一块内存 reg 1 0 m 0 7 该存储器有一个复位信号 如果复位为1 则该存储器中的所有位都应重置为0 但是我不知道如何以简洁的方式设置m的所有位 因为如果有数百个内存中有
  • 如何防止 WCF 服务进入故障状态?

    我有一个 WCF 服务不应进入故障状态 如果出现异常 则应将其记录下来 并且服务应继续不间断 该服务具有单向操作契约 并且正在从 MSMQ 读取消息 我的问题有两个 该服务似乎正在被吞噬 异常 错误 所以我无法 调试它 我如何获得服务 暴露
  • 从 Google 云端硬盘文件夹中删除旧文件

    我创建了这个脚本来删除 3 个多小时前发布的文件 而且即使最新文件超过 3 小时 也不会被删除 因此该文件夹永远不会为空 我启用了谷歌的高级服务 称为 DRIVE API V2 我激活了一个触发器 每 5 分钟分析一次文件夹 但文件通常不会
  • “此哈希方法尚未发现冲突”是什么意思?

    我的意思是我不需要寻找实际的碰撞 就知道它们的存在 如果没有冲突 那么如何获得固定长度的结果呢 这就是为什么我不明白人们声称 md5 不安全 是什么意思 有人发现了碰撞 或者类似的东西 我唯一能想到的是 碰撞搜索仅查找字典单词 例如 如果
  • 如何知道已保存模型中的输出和输入张量名称

    我知道如何加载已保存的 TensorFlow 模型 但如何知道输入和输出张量名称 我可以使用 tf import graph def 加载 protobuf 文件 然后使用函数 get tensor by name 加载张量 但我如何知道任
  • 如何在 CLion 中运行 SFML,错误未定义引用?

    我是 C 新手 尝试学习游戏编程 我选择 SFML 并在 Jetbrain 的 CLion 上运行并使用 Ubuntu 机器 我按照这个教程SFML 和 Linux http sfml dev org tutorials 2 0 start
  • Primefaces 嵌套对话框/'appendToBody' - 支持 bean 操作未触发

    我在两个嵌套表单中有一个支持 bean 方法 会话范围 该方法不会触发 我用一个展示问题的通用示例提出了这个问题 我希望了解如何 为何使用表单 对话框和appendToBody标签导致了问题 为了澄清 该行动contentsOfDialog
  • 如何阻止 Eclipse 每次重新启动时累积 Tomcat 实例?

    我在 Eclipse 2019 3 中运行 Tomcat 8 5 每次我通过单击绿色圆圈 三角形按钮重新启动 Tomcat 实例时 最终都会运行重复的实例 我怎样才能阻止这种情况发生 这是我已经尝试过的一些事情 我尝试升级到tomcat 9
  • PostgreSQL 9.3:将一列拆分为多列

    我想拆分一列colb在下面给出的示例中分为两列 喜欢column1 and column2 我有一个包含两列的表 Example create table t3 cola varchar colb varchar 插入 insert int
  • 在 Angular 中动态设置样式

    我有以下标记 tr style background color none tr 正如它所说 如果activity status字段待定 然后将背景颜色设置为红色 否则设置为绿色 但这不起作用 检查后我发现它呈现如下 tr style ba
  • C# - 编写 COM 服务器 - 映射到方法的属性

    我们正在尝试替换最初为 VB6 应用程序编写的 COM 服务器 我们无法访问源代码 由于某种原因 VB6 应用程序可以调用我们的构造函数 但随后它会得到 系统错误 H80004002 没有这样的 接口支持 我假设当它尝试使用 QueryIn
  • Magento - 致命错误:类名必须是有效的对象或字符串

    我在安装 Magento 时遇到问题 希望有人能帮助我解决 当我访问该网站时 我突然开始收到以下错误消息 Fatal error Class name must be a valid object or a string in app co
  • 科学记数法中的小“e”/Matlab中的Double是什么

    当我计算一个非常小的数字时 matlab给出 1 12345e 15这是什么 我可以将其解释为 1 12345 10 15 或其 1 12345 e 15 我很着急 抱歉问了这个愚蠢的问题 e 代表指数 它的科学计数法 http en wi
  • Crontab 格式化 - 每 15 分钟一次

    我试图让一个简单的 crontab 作业每 15 分钟运行一次 但在决定如何格式化计时时遇到困难 我一直在写的内容如下 15 我很确定这只运行每小时的前 15 分钟 我认为 crontab 允许用户指定确切的运行时间 即 0 15 30 4
  • 从主题读取后立即异步提交消息

    我正在尝试在阅读主题后立即提交一条消息 我已点击此链接 https www confluence io blog apache kafka spring boot application https www confluent io blo