带有断路器的 Kafka Consumer,使用 Resilience4j 重试模式

2023-12-31

我需要一些帮助来了解如何使用 Spring boot、Kafka、Resilence4J 提出解决方案,以实现来自 Kafka Consumer 的微服务调用。假设如果微服务关闭,那么我需要使用断路器模式通知我的 Kafka 消费者停止获取消息/事件,直到微服务启动并运行。


借助 Spring Kafka,您可以使用pause and resume方法取决于 CircuitBreaker 状态转换。我发现最好的方法是将其定义为带有 @Configuration 注释的“supervisor”。还使用了 Resilience4j。

@Configuration
public class CircuitBreakerConsumerConfiguration {

public CircuitBreakerConsumerConfiguration(CircuitBreakerRegistry circuitBreakerRegistry, KafkaManager kafkaManager) {
    circuitBreakerRegistry.circuitBreaker("yourCBName").getEventPublisher().onStateTransition(event -> {
  
        switch (event.getStateTransition()) {
            case CLOSED_TO_OPEN:
            case CLOSED_TO_FORCED_OPEN:
            case HALF_OPEN_TO_OPEN:
                kafkaManager.pause();
                break;
            case OPEN_TO_HALF_OPEN:
            case HALF_OPEN_TO_CLOSED:
            case FORCED_OPEN_TO_CLOSED:
            case FORCED_OPEN_TO_HALF_OPEN:
                kafkaManager.resume();
                break;
            default:
                throw new IllegalStateException("Unknown transition state: " + event.getStateTransition());
        }
    });
   }
}

这是我与带有注释的 KafkaManager 结合使用的@Component.

@Component
public class KafkaManager {
  private final KafkaListenerEndpointRegistry registry;

  public KafkaManager(KafkaListenerEndpointRegistry registry) {
    this.registry = registry;
  }
  public void pause() {   
    registry.getListenerContainers().forEach(MessageListenerContainer::pause);
  }

  public void resume() {
    registry.getListenerContainers().forEach(MessageListenerContainer::resume);
  }
}

此外,我的消费者服务如下所示:

  @KafkaListener(topics = "#{'${topic.name}'}", concurrency = "1", id = "CBListener")
public void receive(final ConsumerRecord<String, ReplayData> replayData, Acknowledgment acknowledgment) throws
        Exception {

    try {
        httpClientServiceCB.receiveHandleCircuitBreaker(replayData);
        acknowledgement.acknowledge();
    } catch (Exception e) {
        acknowledgment.nack(1000);
    }
}

And the @CircuitBreaker注解:

@CircuitBreaker(name = "yourCBName")
public void receiveHandleCircuitBreaker(ConsumerRecord<String, ReplayData> replayData) throws
        Exception {
    try {
        String response = restTemplate.getForObject("http://localhost:8081/item", String.class);
    } catch (Exception e                                                                       ) {
       
        // throwing the exception is needed to trigger the Circuit Breaker state change
        throw new Exception();
    }
}

并且还补充了以下内容application.properties

  resilience4j.circuitbreaker.instances.yourCBName.failure-rate-threshold=80
  resilience4j.circuitbreaker.instances.yourCBName.sliding-window-type=COUNT_BASED
  resilience4j.circuitbreaker.instances.yourCBName.sliding-window-size=5
  resilience4j.circuitbreaker.instances.yourCBName.wait-duration-in-open-state=10000
  resilience4j.circuitbreaker.instances.yourCBName.automatic-transition-from-open-to-half-open-enabled=true
  spring.kafka.consumer.enable.auto.commit = false
  spring.kafka.listener.ack-mode = MANUAL_IMMEDIATE

还可以看看https://resilience4j.readme.io/docs/Circuitbreaker https://resilience4j.readme.io/docs/circuitbreaker

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

带有断路器的 Kafka Consumer,使用 Resilience4j 重试模式 的相关文章

  • @SubscribeMapping 与 @MessageMapping

    当在 Spring Boot 中使用 websockets 时 我见过使用以下示例 Configuration EnableWebSocketMessageBroker public class WebSocketConfig extend
  • 从 SCDF 执行任务时,数据库凭证作为部分作业参数公开

    我有自定义构建的 SCDF 它在 Openshift 中构建为 docker 映像 并在 server deployment yaml 中称为 docker 映像 我使用 Oracle 数据库来存储任务元数据 并且是此处的外部源 我传递了
  • 在 Spring Boot 应用程序中启用 Spring 框架的日志记录

    我已经使用 spring boot 创建了简单的网络应用程序 我想为 springframework 包启用调试日志 我知道如何在普通 spring mvc 项目中启用日志记录 我在这里尝试了相同的操作 但它不起作用 有人可以帮我吗 我的
  • 了解Kafka流groupBy和window

    我无法理解 kafka 流中的 groupBy groupById 和窗口的概念 我的目标是聚合一段时间内 例如 5 秒 的流数据 我的流数据看起来像 value 0 time 1533875665509 value 10 time 153
  • 使用 Spring Embedded Kafka 测试 @KafkaListener

    我正在尝试为我正在使用 Spring Boot 2 x 开发的 Kafka 侦听器编写单元测试 作为一个单元测试 我不想启动一个完整的 Kafka 服务器作为 Zookeeper 的实例 所以 我决定使用 Spring Embedded K
  • 实体创建无用的 id 字段

    我有一个CrudRepository与两个实体 Problem 特征实体总是创建一个附加的id数据库中的字段但未选择正确的characteristic id要生成的字段JSON machine entity machine id name
  • Spring Boot 多部分文件始终为 null

    我正在使用 Spring Boot version 1 4 0 RC1 和 Spring Boot Stormpath 1 0 2 我正在尝试使用分段文件上传 但控制器中的 MultipartFile 始终为空 当我使用 RequestPa
  • 如何在不同的班级中启动和停止计时器?

    我想测量从传入 HTTP 请求开始到应用程序到达某个点的时间 这两个时间点都位于不同的类中 我将如何启动和停止这些不同类别的计时器 我没有看到使用 MeterRegistry 中的 命名 计时器的方法 我该怎么办呢 您可以使用 AOP 如下
  • 在测试中替换 OAuth2 WebClient

    我有一个小型 Spring Boot 2 2 批次 用于写入 OAuth2 REST API 我已经能够配置WebClient下列的https medium com asce4s oauth2 with spring webclient 7
  • Springboot:防止Resttemplate对%进行双重编码

    我们的代码使用Asyncresttemplate如下 String uri http api host com version test address 23 language en US format json getAysncRestT
  • Spring 框架 application.properties 与 logback.xml

    我正在使用 Spring 和 Spring boot 最近 在尝试使用 EhCache 时 我尝试为 EhCache 启用日志记录 在 application properties 中设置日志级别 logging level org spr
  • POST 请求“访问此资源需要完全身份验证”

    是否有人在尝试使用 POST 请求 oauth token 进行身份验证时遇到错误 访问此资源需要完全身份验证 卷曲命令 curl localhost 85 oauth token d grant type password d clien
  • Spring Boot MSSQL Kerberos 身份验证

    目前在我的春季靴子中application properties文件中 我指定以下行来连接到 MSSql 服务器 spring datasource url jdbc sqlserver localhost databaseName spr
  • 在spring data jpa中实现动态数据源

    我有 N 个服务器 N 个数据库和 N 个配置 看下面的场景 因此 对于每个请求 我都需要根据配置访问服务器和数据库 spring data jpa如何实现动态数据源 你可以试试抽象路由数据源 https docs spring io sp
  • 如何防止嵌入式netty服务器使用spring-boot-starter-webflux启动?

    我想使用 Springs 新的反应式在客户端和服务器应用程序之间建立通信webflux扩大 对于依赖管理我使用gradle 我在服务器和客户端上的 build gradle 文件基本上是 buildscript repositories m
  • 如何在 PySpark 中使用 foreach 或 foreachBatch 写入数据库?

    我想使用 Python PySpark 从 Kafka 源到 MariaDB 进行 Spark 结构化流处理 Spark 2 4 x 我想使用流式 Spark 数据帧 而不是静态数据帧或 Pandas 数据帧 看来必须要用foreach o
  • Spring Boot 中的 Spring Security 配置

    我正在努力转换Spring 3项目到Spring 4 Spring 启动 我还不知道这样做是否正确 我转换Spring 安全 XML配置到一个基于Java的配置如下 Configuration EnableWebSecurity publi
  • 嵌入式 Kafka 测试随机失败

    我使用 EmbededKafka 实现了一系列集成测试 以测试使用 spring kafka 框架运行的一个 Kafka 流应用程序 流应用程序正在从 Kafka 主题读取消息 将其存储到内部状态存储中 进行一些转换并将其发送到另一个微服务
  • Spring批量写入器限制

    我正在工作 Spring Batch 项目 从数据库读取记录然后写入rabbitmq 然后发送到HTTP消息网关 网关有150TPS我需要将我的应用程序限制为 150TPS 有没有办法带弹簧批的油门或者还有其他更好的方法吗 你能行的 在 S
  • Spring Boot - 如何在开发过程中禁用@Cacheable?

    我正在寻找两件事 如何在开发过程中使用 Spring boot dev 配置文件禁用所有缓存 application properties 中似乎没有通用设置可以将其全部关闭 最简单的方法是什么 如何禁用特定方法的缓存 我尝试像这样使用 S

随机推荐

  • 上下文相关的标记化是否需要词汇语法中的多个目标符号?

    根据ECMAScript 规范 https tc39 es ecma262 sec ecmascript language lexical grammar 词法输入的识别有几种情况 元素对句法语法上下文敏感 即 消耗输入元素 这需要多个目标
  • 如何解析包含 javascript 代码的 html

    如何解析大量使用 javascript 的 html 文档 我知道python中有一些库可以解析静态xml html文件 我基本上正在寻找一个程序或库 甚至是firefox插件 来读取html javascript 执行javascript
  • 添加文本框值并使用 javascript 显示它

    我正在尝试使用 javascript 添加几个文本框的输入值并在下面显示总数 如何添加并保留计算后显示的总和 我不是 JavaScript 专家 下面是一个向您展示如何执行此操作的示例
  • Angular js 对本地化的支持 [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我尝试在 AngularJS 中查找支持多种语言的文档 但没有成功 支持本地化吗 看看角度翻译 htt
  • 如何在python中使用scrapy获取直接父节点?

    我是新来的scrapy 我想从网络上抓取一些数据 我得到了如下所示的html文档 dom style1 div class user info p class user name something in p tag p text data
  • 实体框架:多对多关系中的重复记录

    我有以下实体框架代码第一代码 创建表并插入数据 但是 Club 表中有重复的记录 我的操作是 使用俱乐部创建应用程序创建俱乐部 使用人员应用程序创建人员 如何避免重复录入 static void Main string args Datab
  • Linq Select 语句 - 不在的地方

    我正在尝试编写相当于以下内容的 LINQ 语句 select e EmployeeID EmployeeName e FirstName e LastName from Employees e where e EmployeeID not
  • 对 `search_as_you_type` ngram 子字段感到困惑

    我正在尝试将 键入时搜索 功能添加到 Elasticsearch 中名为email address 我的理解从文档 https www elastic co guide en elasticsearch reference 7 7 sear
  • 从本地到 Heroku 服务器的 SCP 文件

    我想将 config yml 文件从本地 django 应用程序目录复制到我的 heroku 服务器 但我不知道如何获取 电子邮件受保护 cdn cgi l email protectionHeroku 的格式 我尝试过运行 heroku
  • Android Room 按别名排序

    我想根据我创建的自定义别名来订购数据集 我尝试过 但它会导致语法错误 我究竟做错了什么 Code Query SELECT a b as ratio FROM dataset where my status myStatus order b
  • WKWebview注入cookie头导致重定向循环

    我试图将我单独获取的会话cookie注入到WKWebview请求中 结果证明这是相当痛苦的 我设法使用注入会话cookie这个解决方案 https stackoverflow com questions 26573137 can i set
  • PCM -> AAC(编码器) -> PCM(解码器)实时且正确优化

    我正在尝试实施 AudioRecord MIC gt PCM gt AAC Encoder AAC gt PCM Decode gt AudioTrack SPEAKER with MediaCodec在 Android 4 1 API16
  • 如何在MySQL中进行批量插入

    我有 1 多条记录需要输入到表中 在查询中执行此操作的最佳方法是什么 我应该创建一个循环并每次迭代插入一条记录吗 或者 还有更好的方法 来自MySQL手册 http dev mysql com doc refman 5 7 en inser
  • Azure 管理 REST API - “身份验证失败。‘授权’标头以无效格式提供。”

    我拼命尝试将 2 个经典存储帐户从旧的 MSDN 订阅移动到 MPN 订阅 但我一直遇到困难 因为仅通过 REST API 支持这些帐户的移动 我已按照此处的说明启用了 API https azure microsoft com en us
  • Eclipse 是否有排列类文件的功能?

    Eclipse 有很多功能 我想知道这个功能是否存在 或者是否存在任何捷径 我想将我的类数据排列到该流程中的变量 构造函数 方法中 从上到下 进一步细化我想按访问级别 pub private protected 和类型 void 或返回的方
  • 使用 GSON 获取 JSON 键名

    我有一个 JSON 数组 其中包含如下对象 bjones fname Betty lname Jones password ababab level manager 我的 User 类有一个用户名 需要使用 JSON 对象的密钥 我如何获取
  • 添加不属于模型一部分的自定义表单字段 (Django)

    我在管理网站上注册了一个模型 它的字段之一是长字符串表达式 我想将自定义表单字段添加到管理员中此模型的添加 更新页面 根据这些字段的值 我将构建长字符串表达式并将其保存在相关的模型字段中 我怎样才能做到这一点 我正在从符号构建数学或字符串表
  • 在elasticbeanstalk中设置NODE_ENV变量

    我创建了一个名为 elasticbeanstalk environment config其中包含以下内容 option settings option name NODE ENV value development 我还将 process
  • 具有多个可选参数的 Spring Data MongoDB AND/OR 查询

    我正在尝试执行具有两个以上可选参数的查询 但没有得到任何结果 对于2个参数我遵循了这个问题的答案spring data mongo 可选查询参数 https stackoverflow com questions 11613464 spri
  • 带有断路器的 Kafka Consumer,使用 Resilience4j 重试模式

    我需要一些帮助来了解如何使用 Spring boot Kafka Resilence4J 提出解决方案 以实现来自 Kafka Consumer 的微服务调用 假设如果微服务关闭 那么我需要使用断路器模式通知我的 Kafka 消费者停止获取