重放上一条成功消息中的 Kafka 主题

2023-12-29

使用 Spring Cloud Stream 上通道的标准配置,消息会重试 3 次,然后被跳过。如果以下消息处理成功完成,则提交偏移量。 这意味着在短暂的异常情况下消息可能会丢失。

是否可以更改此行为,以便通道卡在失败消息上,直到瞬态条件得到修复?

我尝试过配置重试模板,但是您必须指定重试次数,这看起来像是一个无用的参数,因为所需的行为是永远重试。

有人陷入过这些烦恼吗?谢谢。

我也有点怀疑这会如何干扰max.poll.interval.ms财产。


在活页夹中禁用重试并使用ListenerContainerCustomizer添加一个SeekToCurrentErrorHandler无限重试...

@SpringBootApplication
public class So63193500Application {

    public static void main(String[] args) {
        SpringApplication.run(So63193500Application.class, args);
    }

    @Bean
    Consumer<String> input() {
        return str -> {
            System.out.println(str);
            throw new RuntimeException("test");
        };
    }

    @Bean
    ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
        return (container, dest, group) -> {
            if (group.equals("so63193500")) {
                container.setErrorHandler(new SeekToCurrentErrorHandler(
                        new FixedBackOff(5_000, FixedBackOff.UNLIMITED_ATTEMPTS)));
            }
        };
    }

}
spring.cloud.stream.bindings.input-in-0.consumer.max-attempts=1
spring.cloud.stream.bindings.input-in-0.group=so63193500

这会导致查找,并且只要退避间隔不太长,就不会影响轮询间隔。

您还可以使用ExponentialBackOff.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

重放上一条成功消息中的 Kafka 主题 的相关文章

随机推荐

  • 为什么创建空数组时要添加()

    我们可以像这样创建一个空数组 var newArray1 Int 我的意思是 当我们创建一个新数组时 在这里意味着什么 为什么我们不通过编写以下内容来创建一个新的空数组 var newArray2 Int 我知道它的语法 但是有人可以解释一
  • 返回 const std::string& 的方法应该返回 const std::string_view 吗?

    假设我们在一个类中有一个简单的 getter 方法 它返回一个const参考一个std string member const std string getString const noexcept return someString 随着
  • 访问受密码保护的 MS Access 文件

    我已通过 文件 gt 信息 下的加密选项对 Access 文件进行了加密 在 Excel 中 当我尝试从 Access 文件检索表时 它会弹出一个密码框 但不接受我的密码 我可以毫无问题地从 Access 文件链接到 Access 文件 有
  • 内置帮助程序将 User.Identity.Name 解析为 Domain\Username

    是否有任何内置实用程序或帮助程序可以解析HttpContext Current User Identity Name e g domain user单独获取域名 如果存在 和用户 或者还有其他类可以这样做吗 我知道打电话很容易String
  • Vue:处理多个 API 调用的最佳实践

    我发现自己在 vuex 操作中进行了多个 API 调用 这让我想知道处理这种情况的最佳方法是什么 多个 API 调用的最佳实践 让我们从我的代码开始 我有一个操作 从不同的 API 端点 后端为 Laravel 收集所有帖子和所有帖子类别
  • Spring MVC 5 中的默认消息转换器

    我试图理解为什么我的spring v 5 0 4 RELEASE无法正确加载默认消息转换器 我从 servlet xml 中删除了所有声明 我希望找到正确加载的所有默认转换器AbstractMessageConverterMethodPro
  • GORM 中有“not in”的对应词吗?

    这可以在 createCriteria 中转换吗 SELECT FROM node WHERE node type act AND nid NOT IN SELECT nid FROM snbr act community LIMIT 10
  • 将自定义 SQL 函数(类似于 date_trunc)注释为 Django ORM 查询集

    我在用timescaledb这基本上只是一个扩展postgres 它带有一个名为的 SQL 函数time bucket https docs timescale com latest api api timescaledb time buc
  • 系统通知 Phonegap (Android)

    我最近刚刚开始做一些 Android 应用程序开发 基于 Phonegap 由于我基于 Web 的背景 PHP MySQL 和 jQuery mobile 第一次测试很有希望 但下一个级别对我来说有点太多了 我正在尝试集成这个插件 http
  • MemFree 和 MemAvailable 之间的区别

    使用 Ubuntu 14 02 并运行命令 cat proc meminfo 我得到以下信息 MemTotal 1007796 kB MemFree 64248 kB MemAvailable 64876 kB 我想知道 MemFree 和
  • 在实体框架中播种多对多数据

    我首先使用代码 并且书名和类别之间存在多对多关系 在开发过程中播种数据的最佳方法是什么 如果我在同一类别中添加两本书 种子逻辑会将该类别两次添加到类别表中 我可以将类别单独添加到类别表中 但是如何指定图书关键字集合中的现有类别记录 我相信这
  • 如何使用 React-native-element 复选框和 FlatList React Native 处理从 json 获取的复选框?

    我正在尝试创建动态复选框 其名称从 json 获取 这个问题 https github com react native training react native elements issues 603看起来与我需要的相同 但如果没有代码
  • 使用@IdClass存储具有复合主键的实体,但无法持久化

    我的 id 类如下 public class EmployeeId implements Serializable public EmployeeId public EmployeeId Integer id String country
  • 未知的 $rootElementProvider:Qunit + angularjs 集成 [已关闭]

    Closed 这个问题需要调试细节 help minimal reproducible example 目前不接受答案 当我尝试在单元测试 qunit 中注入 location 服务时 出现错误 未知 rootElementProvider
  • 如何获取复制文件进度

    我有代码 但不知道如何找到复制文件进度 我应该怎么做才能接收文件复制进度 public sealed class FileRoutines public static void CopyFile FileInfo source FileIn
  • 用户界面中的文本大写

    例如 我想问您是否有理由将菜单中的所有项目等大写 文件 gt 页面设置 编辑 gt 全选 帮助 gt 技术支持 为什么我不应该将这些项目标记为 文件 gt 页面设置 等 这种大写对我来说似乎是错误的 但我不是以英语为母语的人 所以我可能不会
  • 为什么 python 解码会替换编码字符串中的无效字节?

    尝试解码无效编码的 utf 8 html 页面会产生不同的结果 蟒蛇 火狐和铬 测试页面中的无效编码片段看起来像 PREFIX xe3 xabSUFFIX gt gt gt fragment PREFIX xe3 xabSUFFIX gt
  • Svelte 3、async onMount 还是有效的替代方案?

    我需要的是使用async await在斯韦尔特onMount 或者也许你可以建议我哪里出了问题以及我可以使用什么替代方案 重现 到这里 https svelte dev repl 000ae69c0fe14d9483678d4ace8747
  • GPS android 卡尔曼滤波器 [关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 为了从 GPS 获得更准确的数据 建议使用卡尔曼滤波器 但我找不到任何关于如何为 GPS android 实现卡尔曼滤波器的教程 GPS
  • 重放上一条成功消息中的 Kafka 主题

    使用 Spring Cloud Stream 上通道的标准配置 消息会重试 3 次 然后被跳过 如果以下消息处理成功完成 则提交偏移量 这意味着在短暂的异常情况下消息可能会丢失 是否可以更改此行为 以便通道卡在失败消息上 直到瞬态条件得到修