如何监听来自 Kafka 的正确 ACK 消息

2023-12-01

我正在做一个POC使用 Spring Boot 和 Kafka 进行事务性项目,我有以下疑问:

设想:一个微服务MSPUB1接收来自客户的请求。该请求发布有关主题的消息TRANSACTION_TOPIC1在 Kafka 上,但微服务可以并行接收多个请求。微服务监听主题TRANSACTION_RESULT1检查交易是否完成。

在流媒体平台的另一边,另一个微服务MSSUB1正在听主题TRANSACTION_TOPIC1并处理所有消息并将结果发布到:TRANSACTION_RESULT1

最好的方法是什么MSPUB1了解消息是否与主题相关TRANSACTION_RESULT1符合他原来的要求吗?微服务MSPUB1可以有一个在初始主题上发布的任何消息的 IDTRANSACTION_TOPIC1并被移至TRANSACTION_RESULT1

问题:当你在读分区时,你移动了指针,但是在多个请求的并发环境下,如何检查消息是否在主题上TRANSACTION_RESULT1是预期的吗?

提前谢谢了

胡安·安东尼奥


一种方法是使用弹簧集成BarrierMessageHandler.

这是一个示例应用程序。希望这是不言自明的。需要 Kafka 0.11 或更高版本...

@SpringBootApplication
@RestController
public class So48349993Application {

    private static final Logger logger = LoggerFactory.getLogger(So48349993Application.class);

    private static final String TRANSACTION_TOPIC1 = "TRANSACTION_TOPIC1";

    private static final String TRANSACTION_RESULT1 = "TRANSACTION_RESULT1";

    public static void main(String[] args) {
        SpringApplication.run(So48349993Application.class, args);
    }

    private final Exchanger exchanger;

    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public So48349993Application(Exchanger exchanger,
            KafkaTemplate<String, String> kafkaTemplate) {
        this.exchanger = exchanger;
        this.kafkaTemplate = kafkaTemplate;
        kafkaTemplate.setDefaultTopic(TRANSACTION_RESULT1);
    }

    @RequestMapping(path = "/foo/{id}/{other}", method = RequestMethod.GET)
    @ResponseBody
    public String foo(@PathVariable String id, @PathVariable String other) {
        logger.info("Controller received: " + other);
        String reply = this.exchanger.exchange(id, other);
        // if reply is null, we timed out
        logger.info("Controller replying: " + reply);
        return reply;
    }

    // Client side

    @MessagingGateway(defaultRequestChannel = "outbound", defaultReplyTimeout = "10000")
    public interface Exchanger {

        @Gateway
        String exchange(@Header(IntegrationMessageHeaderAccessor.CORRELATION_ID) String id,
                @Payload String out);

    }

    @Bean
    public IntegrationFlow router() {
        return IntegrationFlows.from("outbound")
                .routeToRecipients(r -> r
                        .recipient("toKafka")
                        .recipient("barrierChannel"))
                .get();
    }

    @Bean
    public IntegrationFlow outFlow(KafkaTemplate<String, String> kafkaTemplate) {
        return IntegrationFlows.from("toKafka")
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate).topic(TRANSACTION_TOPIC1))
                .get();
    }

    @Bean
    public IntegrationFlow barrierFlow(BarrierMessageHandler barrier) {
        return IntegrationFlows.from("barrierChannel")
                .handle(barrier)
                .transform("payload.get(1)") // payload is list with input/reply
                .get();
    }

    @Bean
    public BarrierMessageHandler barrier() {
        return new BarrierMessageHandler(10_000L);
    }

    @KafkaListener(id = "clientReply", topics = TRANSACTION_RESULT1)
    public void result(Message<?> reply) {
        logger.info("Received reply: " + reply.getPayload() + " for id "
                + reply.getHeaders().get(IntegrationMessageHeaderAccessor.CORRELATION_ID));
        barrier().trigger(reply);
    }

    // Server side

    @KafkaListener(id = "server", topics = TRANSACTION_TOPIC1)
    public void service(String in,
            @Header(IntegrationMessageHeaderAccessor.CORRELATION_ID) String id) throws InterruptedException {
        logger.info("Service Received " + in);
        Thread.sleep(5_000);
        logger.info("Service Replying to " + in);
        // with spring-kafka 2.0 (and Boot 2), you can return a String and use @SendTo instead of this.
        this.kafkaTemplate.send(new GenericMessage<>("reply for " + in,
                Collections.singletonMap(IntegrationMessageHeaderAccessor.CORRELATION_ID, id)));
    }

    // Provision topics if needed

    // provided by Boot in 2.0
    @Bean
    public KafkaAdmin admin() {
        Map<String, Object> config = new HashMap<>();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return new KafkaAdmin(config);
    }

    @Bean
    public NewTopic topic1() {
        return new NewTopic(TRANSACTION_TOPIC1, 10, (short) 1);
    }

    @Bean
    public NewTopic result1() {
        return new NewTopic(TRANSACTION_RESULT1, 10, (short) 1);
    }

}

Result

2018-01-20 17:27:54.668  INFO 98522 --- [   server-1-C-1] com.example.So48349993Application        : Service Received foo
2018-01-20 17:27:55.782  INFO 98522 --- [nio-8080-exec-2] com.example.So48349993Application        : Controller received: bar
2018-01-20 17:27:55.788  INFO 98522 --- [   server-0-C-1] com.example.So48349993Application        : Service Received bar
2018-01-20 17:27:59.673  INFO 98522 --- [   server-1-C-1] com.example.So48349993Application        : Service Replying to foo
2018-01-20 17:27:59.702  INFO 98522 --- [ientReply-1-C-1] com.example.So48349993Application        : Received reply: reply for foo for id 1
2018-01-20 17:27:59.705  INFO 98522 --- [nio-8080-exec-1] com.example.So48349993Application        : Controller replying: reply for foo
2018-01-20 17:28:00.792  INFO 98522 --- [   server-0-C-1] com.example.So48349993Application        : Service Replying to bar
2018-01-20 17:28:00.798  INFO 98522 --- [ientReply-0-C-1] com.example.So48349993Application        : Received reply: reply for bar for id 2
2018-01-20 17:28:00.800  INFO 98522 --- [nio-8080-exec-2] com.example.So48349993Application        : Controller replying: reply for bar

Pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>so48349993</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>so48349993</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.9.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>2.3.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.3.2.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


</project>

应用程序属性

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.concurrency=2

EDIT

这是一个在服务器端使用 Spring Integration 的版本,而不是@KafkaListener...

@SpringBootApplication
@RestController
public class So483499931Application {

    private static final Logger logger = LoggerFactory.getLogger(So483499931Application.class);

    private static final String TRANSACTION_TOPIC1 = "TRANSACTION_TOPIC3";

    private static final String TRANSACTION_RESULT1 = "TRANSACTION_RESULT3";

    public static void main(String[] args) {
        SpringApplication.run(So483499931Application.class, args);
    }

    private final Exchanger exchanger;

    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public So483499931Application(Exchanger exchanger,
            KafkaTemplate<String, String> kafkaTemplate) {
        this.exchanger = exchanger;
        this.kafkaTemplate = kafkaTemplate;
        kafkaTemplate.setDefaultTopic(TRANSACTION_RESULT1);
    }

    @RequestMapping(path = "/foo/{id}/{other}", method = RequestMethod.GET)
    @ResponseBody
    public String foo(@PathVariable String id, @PathVariable String other) {
        logger.info("Controller received: " + other);
        String reply = this.exchanger.exchange(id, other);
        logger.info("Controller replying: " + reply);
        return reply;
    }

    // Client side

    @MessagingGateway(defaultRequestChannel = "outbound", defaultReplyTimeout = "10000")
    public interface Exchanger {

        @Gateway
        String exchange(@Header(IntegrationMessageHeaderAccessor.CORRELATION_ID) String id,
                @Payload String out);

    }

    @Bean
    public IntegrationFlow router() {
        return IntegrationFlows.from("outbound")
                .routeToRecipients(r -> r
                        .recipient("toKafka")
                        .recipient("barrierChannel"))
                .get();
    }

    @Bean
    public IntegrationFlow outFlow(KafkaTemplate<String, String> kafkaTemplate) {
        return IntegrationFlows.from("toKafka")
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate).topic(TRANSACTION_TOPIC1))
                .get();
    }

    @Bean
    public IntegrationFlow barrierFlow(BarrierMessageHandler barrier) {
        return IntegrationFlows.from("barrierChannel")
                .handle(barrier)
                .transform("payload.get(1)") // payload is list with input/reply
                .get();
    }

    @Bean
    public BarrierMessageHandler barrier() {
        return new BarrierMessageHandler(10_000L);
    }

    @KafkaListener(id = "clientReply", topics = TRANSACTION_RESULT1)
    public void result(Message<?> reply) {
        logger.info("Received reply: " + reply.getPayload() + " for id "
                + reply.getHeaders().get(IntegrationMessageHeaderAccessor.CORRELATION_ID));
        barrier().trigger(reply);
    }

    // Server side

    @Bean
    public IntegrationFlow server(ConsumerFactory<String, String> consumerFactory,
            KafkaTemplate<String, String> kafkaTemplate) {
        return IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(consumerFactory, TRANSACTION_TOPIC1))
            .handle("so483499931Application", "service")
            .handle(Kafka.outboundChannelAdapter(kafkaTemplate).topic(TRANSACTION_RESULT1))
            .get();
    }

    public String service(String in) throws InterruptedException {
        logger.info("Service Received " + in);
        Thread.sleep(5_000);
        logger.info("Service Replying to " + in);
        return "reply for " + in;
    }

    // Provision topics if needed

    // provided by Boot in 2.0
    @Bean
    public KafkaAdmin admin() {
        Map<String, Object> config = new HashMap<>();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return new KafkaAdmin(config);
    }

    @Bean
    public NewTopic topic1() {
        return new NewTopic(TRANSACTION_TOPIC1, 10, (short) 1);
    }

    @Bean
    public NewTopic result1() {
        return new NewTopic(TRANSACTION_RESULT1, 10, (short) 1);
    }

}

and

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.concurrency=2
spring.kafka.consumer.group-id=server
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何监听来自 Kafka 的正确 ACK 消息 的相关文章

  • 尝试使用 Spring boot CLI 加密时出现错误“‘encypt’不是有效命令”

    我正在尝试使用 Spring boot CLI 1 5 7 加密密码 spring encrypt mysecret key ashish 我收到此命令的以下错误 encypt is not a valid command See help
  • 在 Spring Boot 中重新加载/刷新缓存

    我正在使用 Spring Boot 对于缓存 我使用 Ehcache 到目前为止一切正常 但现在我必须重新加载 刷新 那么我该如何执行此操作 以便我的应用程序不会出现任何停机时间 我在Spring Ehcache中尝试了很多方法 但它不起作
  • 在应用程序启动时将实例注册为“单例”bean

    我正在使用 Spring Boot 我正在尝试构建一个实例ServiceImpl时要解决Service是必须的 目前我将实现注释为 Component但这并没有给我机会构建我想要的实例 The ServiceImpl应使用包含磁盘上文件路径
  • Spring boot 404错误自定义错误响应ReST

    我正在使用 Spring boot 来托管 REST API 即使浏览器正在访问 URL 以及自定义数据结构 我也希望始终发送 JSON 响应 而不是使用标准错误响应 我可以使用 ControllerAdvice 和 ExceptionHa
  • Spring-Security + Angular 2 + CORS。无法重定向 Facebook 登录

    我正在构建一个在前端使用 Angular 2 在后端使用 Spring Boot 的应用程序 我正在使用 Spring Security 和 Spring Social 来允许通过 Facebook 登录 我以为我已正确配置 CORS 但收
  • Spring Data Redis 覆盖默认序列化器

    我正在尝试创建一个RedisTemplatebean 将具有更新的值序列化器来序列化对象JSONredis 中的格式 Configuration class RedisConfig Bean name redisTemplate Prima
  • 从 SCDF 执行任务时,数据库凭证作为部分作业参数公开

    我有自定义构建的 SCDF 它在 Openshift 中构建为 docker 映像 并在 server deployment yaml 中称为 docker 映像 我使用 Oracle 数据库来存储任务元数据 并且是此处的外部源 我传递了
  • Spring @ComponentScan 不适用于 @Repository

    我的存储库与配置类位于不同的包中 因此我使用 Repostiory 将其注释为以下内容 package test Repository public interface UserTest extends JpaRepository
  • Spring引导@Configurable

    我正在尝试配置Autowired在 Spring Boot 应用程序下的非 Spring 托管类中 我在部署在 tomcat 服务器下的 Web 应用程序下成功运行了这个 但是当我想在 Spring Boot 下运行它时 没有任何效果 我制
  • Spring Kafka - 如何使用 @KafkaListener 重试

    来自推特的问题 只是想找到一个使用 spring kafka 2 1 7 的简单示例 该示例与 KafkaListener 和 AckMode MANUAL IMMEDIATE 一起使用 以重试上次失败的消息 https twitter c
  • Spring Boot 多部分文件始终为 null

    我正在使用 Spring Boot version 1 4 0 RC1 和 Spring Boot Stormpath 1 0 2 我正在尝试使用分段文件上传 但控制器中的 MultipartFile 始终为空 当我使用 RequestPa
  • 卡夫卡流:RocksDB TTL

    据我了解 默认 TTL 设置为无穷大 非正数 但是 如果我们需要在存储中保留数据最多 2 天 我们可以使用 RocksDBConfigSetter 接口实现 即 options setWalTtlSeconds 172800 进行覆盖吗 或
  • Jackson 的 ObjectMapper 和 SQL 中的 RowMapper

    我们正在使用对象映射器 当将 ObjectMapper 与 RowMapper 一起使用时 是否应该在每个 mapRow 内部 如下所示 声明它 还是在 mapRow 外部声明为类公共成员 我认为根据本文 它应该作为公共类成员在外部 我应该
  • Springboot:防止Resttemplate对%进行双重编码

    我们的代码使用Asyncresttemplate如下 String uri http api host com version test address 23 language en US format json getAysncRestT
  • Spring Kafka - 为任何主题的分区消耗最后 N 条消息

    我正在尝试读取请求的卡夫卡消息数 对于非事务性消息 我们将从 endoffset N 对于 M 个分区 开始轮询并收集当前偏移量小于每个分区的结束偏移量的消息 对于幂等 事务消息 我们必须考虑事务标记 重复消息 这意味着偏移量将不连续 在这
  • Kafka不启动空白输出

    我正在努力安装 Kafka 和 Zookeeper 我已经运行了 Zookeeper 并且它当前正在运行 我将所有内容设置为 https dzone com articles running apache kafka on windows
  • POST 请求“访问此资源需要完全身份验证”

    是否有人在尝试使用 POST 请求 oauth token 进行身份验证时遇到错误 访问此资源需要完全身份验证 卷曲命令 curl localhost 85 oauth token d grant type password d clien
  • 为什么我无法从外部连接到 Kafka?

    我在 ec2 实例上运行 kafka 所以amazon ec2实例有两个ip 一个是内部ip 第二个是外部使用的 我从本地计算机创建了生产者 但它重定向到内部 IP 并给我连接不成功的错误 任何人都可以帮助我在 ec2 实例上配置 kafk
  • 如何配置Spring boot分页从第1页开始,而不是从0开始

    boot 1 4 0 可分页 用于分页 它工作正常 没有任何问题 但默认情况下 页面值从 0 开始 但在前端 页面值从 1 开始 那么是否有任何标准方法来增加值而不是手动增加代码内的页码 public Page
  • HTTPS 请求仅在 iOS、Ionic 2 上失败

    我有一个Ionic 2调用一个应用程序Spring Boot用于向其他设备发送推送通知的 API API 配置为 HTTPS The API POST请求适用于一切except iOS 我在服务器上的 SSL 证书是自签名的 也许就是这样

随机推荐

  • 如何停止Spring的默认输出?

    我是春天的新手 当我运行 Spring 批处理应用程序时 我期望只看到 Hello World 但相反 我得到以下附加详细信息 May 03 2012 12 28 42 PM org springframework context supp
  • 如何将关闭按钮添加到 UIModalPresentationFormSheet 中呈现的模态视图角?

    我想在某个角落添加一个浮动关闭 x 按钮UIModalPresentationPageSheet看法 效果如下 但是将其添加到父视图会使其显示在页面表后面 并且也无法点击 并将其添加到页面表将使其部分隐藏 因为它超出了视图区域 还有更好的解
  • Javascript:如何找到第一个重复值并返回其索引?

    我必须在数组中找到第一个重复值 然后在变量firstIndex 中返回其索引 这必须通过 for 循环来完成 for 循环应该在找到第一个重复项后停止 我知道这可能很简单 但我被困住了 到目前为止我已经有了这个 但它似乎不起作用 var n
  • SSL密钥交换加密技术[关闭]

    Closed 这个问题是无关 目前不接受答案 匿名 DH 临时 DH 和固定 DH 三种密钥交换算法有何区别 From 思科的文档 固定迪菲 赫尔曼 这是 Diffie Hellman 密钥交换 其中 服务器的证书包含 Diffie Hel
  • 从 UILabel Swift 中获取 Int

    我遇到的问题是 有大量以数字作为标签的按钮 所以我想我可以将标签视为整数 而不是为每个按钮创建一个操作 IBAction func NumberInput sender UIButton var input Int sender title
  • 使用 Foundation 6 的中间人

    我想将 Middleman 与出色的 Foundation 6 一起使用 Here是我在 Middleman 网站上找到的一个存储库 我安装了 Middleman 以及 Xcode 开发人员工具 当我跑步时middleman init T
  • 如何在android中的服务中调用振动器

    我正在尝试运行vibrator如果从我的应用程序调用服务 我将从Fragment但我不知道为什么振动器在服务内不起作用 我什至无法打印Toast 我的代码 从片段调用 Intent buzz new Intent getActivity L
  • Entity Framework Core 中动态更改架构

    UPD here这是我解决问题的方法 尽管它可能不是最好的 但它对我有用 我在使用 EF Core 时遇到问题 我想通过模式机制在项目数据库中分离不同公司的数据 我的问题是如何在运行时更改架构名称 我找到了类似的问题关于这个问题 但它仍然没
  • Google 图表:自定义刻度

    我想在 Google 图表的水平轴上设置自己的刻度 我包括了 hAxis ticks 根据文档 在我下面的尝试中 但它拒绝工作 因为刻度仍然不是整数 即使我指定它们是整数 我究竟做错了什么
  • 是否可以在未计算的上下文中从 STD 形成指向不可寻址函数的指针?

    如中所述命名空间 std 6 让 F 表示标准库函数 除非 F 被指定为可寻址函数 否则如果 C 程序显式或隐式尝试形成指向 F 的指针 则其行为是未指定的 可能是格式错误的 这对于以下程序意味着 include
  • 字符 Å Ö 没有显示在我的 DDL 中,我如何告诉 Restclient 使用特定的字符集?

    在我开始之前 这里是一个问题 应该是这样的 Bj rn Nilsson 相反 它显示奇怪的特殊字符 所有包含字符 和 的值都变成这样 我用 XML 格式的 API 中的值填充我的 DDL 其中包含所有值 并且我们还使用 Linq2Rest
  • 如何显示带有动态创建的姓名首字母的头像图标

    我有一个要求 通过传递名称 它应该返回头像 图标 其中包含该名称中包含的单词的第一个字母 例如 如果我通过 John Abraham 它应该返回一个带有 JA 的图标 我需要在 SAPUI5 控件中使用该图标 我对此没有任何想法 如何实施
  • Java 一行 if 不适用于打印[重复]

    这个问题在这里已经有答案了 如果你写这样的东西 boolean condition String out condition true false System out println out 有用 但如果你写 condition Syst
  • 正则表达式将所有字符都放在第一个空格的右侧?

    我正在尝试创建一个正则表达式 该表达式将匹配字符串中第一个空格之后 但不包括 的所有字符 输入文本 foo bar bacon 期望的比赛 bar bacon 到目前为止我发现的最接近的是 s 然而 这与 bar bacon 之外的第一个空
  • Eclipse 插件更新错误日志在哪里?

    当我尝试更新一个 Eclipse 插件时 它显示以下错误 但我在 Eclipse 文件夹下没有找到任何内容 日志路径在哪里 an error has occurred see the error log for details Thx 该日
  • NSURLCredentialStorage 和客户端证书认证

    从我在 MPMoviewPlayerController 的文档中看到的 NSURLCredentialStorage 可以设置为 NSURLConnection 身份验证挑战的替代方案 这对于从 URL 加载资源但抽象 NSURLConn
  • iOS:如何获取两个坐标之间的路线路径

    在我的项目中 我必须借助纬度和经度找出两个位置之间的路线路径 我正在使用以下代码 void viewDidLoad super viewDidLoad Do any additional setup after loading the vi
  • .net MVC:如何仅向授权用户提供静态文件

    我的网站中有静态文件 例如 http myurl com Content a html http myurl com Content b html etc 我想在访问时验证用户的身份验证 身份验证应通过数据库数据进行检查 我的框架 ASP
  • Swift 多关卡场景

    我正在尝试开发一个新的游戏项目 其中将包含多个级别 我正在读这个问题 Sprite Kit 定义多个场景的变量 关于使用尽可能少的重复代码来完成多个场景的最佳方法 答案当然是子类化 假设我创建了 baseScene 它是 SKScene 的
  • 如何监听来自 Kafka 的正确 ACK 消息

    我正在做一个POC使用 Spring Boot 和 Kafka 进行事务性项目 我有以下疑问 设想 一个微服务MSPUB1接收来自客户的请求 该请求发布有关主题的消息TRANSACTION TOPIC1在 Kafka 上 但微服务可以并行接