如何从 Http 集成流程创建 Spring Reactor Flux?

2023-11-29

我有一个与此非常相似的问题如何从 ActiveMQ 队列创建 Spring Reactor Flux?

唯一的区别是消息来自 Http 端点而不是 JMS 队列。问题是消息通道由于某种原因没有被填充,或者它没有被 Flux.from() 拾取。日志条目显示 GenericMessage 是从 Http 集成流创建的,并以有效负载作为路径变量,但未排队/发布到通道?我试过.channel(MessageChannels.queue()) and .channel(MessageChannels.publishSubscribe())没有任何区别,事件流是空的。这是代码:

@Bean
public Publisher<Message<String>> httpReactiveSource() {
        return IntegrationFlows.
                from(Http.inboundChannelAdapter("/eventmessage/{id}")
                        .requestMapping(r -> r
                        .methods(HttpMethod.POST)                                                                                   
                        )
                        .payloadExpression("#pathVariables.id")
                        )                           
                        .channel(MessageChannels.queue())
                        .log(LoggingHandler.Level.DEBUG)                            
                        .log()  
                        .toReactivePublisher();
    }


@GetMapping(value="eventmessagechannel/{id}", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> eventMessages(@PathVariable String id){     
    return Flux.from(httpReactiveSource())              
            .map(Message::getPayload);

}

UPDATE1:

构建.gradle

buildscript {
    ext {
        springBootVersion = '2.0.0.M2'
    }
    repositories {
        mavenCentral()
        maven { url "https://repo.spring.io/snapshot" }
        maven { url "https://repo.spring.io/milestone" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
    maven { url "https://repo.spring.io/snapshot" }
    maven { url "https://repo.spring.io/milestone" }
}


dependencies {
    compile('org.springframework.boot:spring-boot-starter-freemarker')
    compile('org.springframework.boot:spring-boot-starter-integration') 
    compile('org.springframework.boot:spring-boot-starter-web')
    compile('org.springframework.boot:spring-boot-starter-webflux') 
    compile('org.springframework.integration:spring-integration-http')
    testCompile('org.springframework.boot:spring-boot-starter-test')
    testCompile('io.projectreactor:reactor-test')

}

UPDATE2

它起作用时@SpringBootApplication and @RestController在一个文件中定义,但在以下情况下停止工作@SpringBootApplication and @RestController位于单独的文件中。

测试应用程序.java

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


@SpringBootApplication
public class TestApp {
     public static void main(String[] args) {
            SpringApplication.run(TestApp.class, args);
    }
}

测试控制器.java

package com.example.controller;


import org.springframework.context.annotation.Bean;
import org.reactivestreams.Publisher;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.http.dsl.Http;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.RestController;



import org.springframework.web.bind.annotation.GetMapping;
import reactor.core.publisher.Flux;



@RestController
public class TestController {
     @Bean
        public Publisher<Message<String>> httpReactiveSource() {
            return IntegrationFlows.
                    from(Http.inboundChannelAdapter("/message/{id}")
                            .requestMapping(r -> r
                                    .methods(HttpMethod.POST)
                            )
                            .payloadExpression("#pathVariables.id")
                    )
                    .channel(MessageChannels.queue())
                    .toReactivePublisher();
        }

        @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
        public Flux<String> eventMessages() {
            return Flux.from(httpReactiveSource())
                    .map(Message::getPayload);
        }

}

这对我来说效果很好:

@SpringBootApplication
@RestController
public class SpringIntegrationSseDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringIntegrationSseDemoApplication.class, args);
    }

    @Bean
    public Publisher<Message<String>> httpReactiveSource() {
        return IntegrationFlows.
                from(Http.inboundChannelAdapter("/message/{id}")
                        .requestMapping(r -> r
                                .methods(HttpMethod.POST)
                        )
                        .payloadExpression("#pathVariables.id")
                )
                .channel(MessageChannels.queue())
                .toReactivePublisher();
    }

    @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> eventMessages() {
        return Flux.from(httpReactiveSource())
                .map(Message::getPayload);
    }

}

我在 POM 中有这个依赖项:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.0.BUILD-SNAPSHOT</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-http</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>

我运行该应用程序并有两个终端:

curl http://localhost:8080/events

听取上证所的意见。

在第二个中我执行此操作:

curl -X POST http://localhost:8080/message/foo

curl -X POST http://localhost:8080/message/bar

curl -X POST http://localhost:8080/message/666

因此,第一个终端的响应如下:

data:foo

data:bar

data:666

请注意,我们不需要spring-boot-starter-webflux依赖性。这FluxSSE 与 Servlet 容器上的常规 MVC 配合良好。

Spring Integration 也将很快支持 WebFlux:https://jira.spring.io/browse/INT-4300。因此,您将能够在那里进行如下配置:

   IntegrationFlows
    .from(Http.inboundReactiveGateway("/sse")
                        .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))

并且完全仅依赖 WebFlux,无需任何 Servlet 容器依赖项。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何从 Http 集成流程创建 Spring Reactor Flux? 的相关文章

  • 由于保存之前/之后的 CSV 差异而导致错误解析(Java w/ Apache Commons CSV)

    我有一个 37 列的 CSV 文件 我正在使用 Apache Commons CSV 1 2 在 Java 中解析该文件 我的设置代码如下 initialize FileReader object FileReader fileReader
  • 在 Java 构建过程中更改常量的最佳方法

    我继承了一个在 Tomcat 下运行的 Java 应用程序 servlet 由于历史原因 根据应用程序的部署位置 本质上是品牌问题 代码具有不同的 外观和感觉 选项 有几个常量控制这个品牌过程 它们具有不同的功能 不应压缩为单个常量 即 B
  • Spring Boot“没有可用消息”错误(状态 = 404),

    我正在使用带有嵌入式 Tomcat 的 Spring Boot 当它启动时 它会登录到控制台 s w s m m a RequestMappingHandlerMapping 将 home 映射到公共 java lang String co
  • 访问 java jigsaw 模块中的资源文件[重复]

    这个问题在这里已经有答案了 我正在尝试从项目中的类访问 Eclipse 项目中的文件 我需要将该项目声明为 jigsaw 模块才能从其他项目访问它 但是通过这样做 我无法再访问项目中的 example png 等文件 这是我的项目结构 pr
  • Android 在 ROOM 数据库中插入大量数据

    我有大约 10 个模型 每个模型都有超过 120K 行和 90 列的记录 其中包含双数组值 在 Room 中插入任何模型都需要超过 125 130 秒 任何人都可以建议我需要做什么才能使用一些批量插入技术来保存所有这些 120K 该技术大约
  • 使用 Morphia 配置 Spring Boot?

    我不想利用 Spring DATA MongoDB 支持 我想利用名为 Morphia 的 MongoDB ORM https github com mongodb morphia https github com mongodb morp
  • Android 中的 ImageView 拖动限制

    我在布局中有一个 ImageView 并在 ImageView 上设置 OnTouchListener 来拖动 ImageView 它工作得很好 我的问题是如何防止将 ImageView 移动到布局范围之外 这是我的代码 活动类别 publ
  • JAXB、Marshal 的问题 - 无法封送类型“java.lang.String”

    当我运行 marshal 操作时 出现以下错误 javax xml bind MarshalException with linked exception com sun istack internal SAXException2 unab
  • Simplify-Ja​​va (by hgoebl) 减少点列表大小始终为 2 的问题

    我正在尝试实现减少算法https github com hgoebl simplify java https github com hgoebl simplify java 我查看了他的测试代码 并试图找出我认为正确的逻辑 我正在列出一份清
  • 使用 python 中的 java 库

    我有一个 python 应用程序和 java 应用程序 python 应用程序为 java 应用程序生成输入并在命令行上调用它 我确信一定有一个更优雅的解决方案 就像使用 JNI 从 Java 调用 C 代码一样 有什么指点吗 仅供参考 我
  • spring启动时如何加载@Cache?

    我正在使用 spring cache 来改进数据库查询 其工作原理如下 Bean public CacheManager cacheManager return new ConcurrentMapCacheManager books Cac
  • 当派生类中重写该方法时,如何使用派生类 Object 调用基类方法?

    class A public void m1 System out println hi base class class B extends A public void m1 System out println hi derived p
  • 非法监控状态异常

    如何将轮询线程传递给另一个线程进行处理 程序执行在控制器类中 该类具有 main 方法和线程池 主类控制器 public static void main String args throws InterruptedException Ru
  • 与 Java 中的同步块相比,新的 Lock 接口有什么优势?

    与 Java 中的同步块相比 新的 Lock 接口有什么优势 您需要实现一个高性能缓存 允许多个读取器但单个写入器保持完整性 您将如何实现它 锁的优点是 让他们公平是可能的 可以使线程在等待 Lock 对象时响应中断 可以尝试获取锁 但如果
  • Cordova Android 应用程序中的网页不可用

    编辑 我一直在解决这个问题并回顾我的所有步骤 我很乐意缩小这个问题的规模 并在令人困惑的情况下获得更多确切的细节 目前 我觉得 Keycloak 似乎只想将我重定向到 https 据我所知 这应该是 Wildfly 服务器配置问题 编辑 我
  • 如何从 Java 类调用 Kotlin 类

    我需要将意图从 java 活动传递到 Kotlin 活动 Java活动ProfileActivity class Intent selectGameIntent new Intent ProfileActivity this kotlin
  • 定时器启动/停止参数

    自从加入这个社区以来 我在技能和进步方面取得了突飞猛进的进步 你们都是一个巨大的帮助 我无法提供一个计时器 该计时器已在启动和停止时实现了某些参数 我要么收到错误消息 局部变量计时器可能尚未初始化 要么没有收到错误消息 但什么也没有发生 也
  • 确定方法调用顺序的接口设计模式

    我想创建一个具有多种方法的 Java 接口 但我希望界面的用户只能按照我定义的顺序或顺序调用方法 例如buyTicket 不应在此之前调用reserveTicket 问 有没有设计模式或任何关于如何做到这一点的提示 我考虑过 A 接口被包装
  • 在同一项目上使用 Eclipse 和 NetBeans

    Eclipse 是一个非常棒的编辑器 我更喜欢使用它 但是缺少 Eclipse 的 GUI 设计工具 另一方面 NetBeans 非常适合 GUI 设计 在同一项目中使用 NetBeans 进行 GUI 设计和 Eclipse 进行其他所有
  • 我可以在方法体内使用注释吗?

    允许 Java 注释的语义将它们放置在某处在函数体内 例如注释特定的函数调用 语句或表达式 例如 class MyClass void theFunc Thing thing String s null Catching NullPoint

随机推荐

  • Haskell:let 语句,将数据类型复制到自身(带/不带修改)不起作用

    我想通过更改一个字段来更新记录语法 所以我做了类似的事情 let rec rec field 1 但我发现我无法打印rec不再 意味着当我尝试时编译器似乎进入无限循环 所以我尝试这样做 let a 1 prints OK let a a n
  • UITextView - 禁用垂直滚动

    如何禁用 UITextView 中的垂直滚动 我希望它基本上只是水平滚动 在某些情况下 当试图限制不需要的 UITextView 滚动时 我发现向 UITextView 委托添加类似以下内容很有帮助 这是一个 UIScrollView 委托
  • 在 Powershell 中更新 VSTS WorkItem 的正确 Invoke-RestMethod 语法是什么 - 构造包含 Windows 路径的 JSON 字符串 [重复]

    这个问题在这里已经有答案了 当尝试使用 Invoke RestMethod 更新现有 VSTS 工作项时 我不断收到 您必须在请求正文中传递有效的补丁文档 这是我要传递的内容 Body op test path rev value 1 op
  • 使用 PHP 读取 MIME 数据

    我有一个第三方程序 它基本上允许用户发送电子邮件 然后将其显示在系统中 但问题是它生成这样的输出 我只想获取这些数据并将其格式化为合适的格式 我想避免正则表达式 是否有任何选项或标准方法可以以更美观的方式显示以下内容 基本上我会将下面的所有
  • 细胞景观。从 URL 加载依赖项

    这是一个非常简单的问题 如何通过 url 将 cytoscape 指示为来源 我知道与D3这很简单 cytoscape 的等效地址是什么 Thanks 您要查找的术语是 CDN 托管的代码副本 严格来说 即使您指定了本地副本 您仍然在使用
  • 在android中制作一个超链接textview

    我想为文本视图文本创建一个链接 例如Google 有没有办法建立这样的链接 即 当点击 Google 一词时 它应该打开相应的链接 欢迎任何想法 试试这个 让我知道会发生什么 使用java代码 TextView textView TextV
  • 使用nodejs从文件中删除最后n行

    我正在尝试使用 fs 作为 nodejs 的一部分从文件中删除最后 3 行 我目前正在将文件读入内存 然后在没有这 3 行的情况下再次写入它 但我确信有一种更有效的方法 不需要将整个文件读入内存 我现在的代码 fs readFile fil
  • 如何在 on_member_join()discord.py 中向成员发送私人消息?

    这就是我所拥有的 client command pass context True client event async def on member join ctx member print f member has joined a s
  • 如何在不使用 SMIL 的情况下制作路径形状动画?

    下面的代码片段显示了我想做的事情 但也存在一些问题 不使用SMIL如何制作这个动画 SMIL 已被弃用 并且浏览器支持较差 如何使黑色路径穿过蓝色路径的右端 红色 蓝色和绿色路径的长度相同 动画应该重复向下 向上 向下 向上 依此类推 第一
  • Fluent nHibernate:需要多对多自引用映射的帮助

    我有一个名为 User 的实体 它可以有一个名为 Friends 的其他用户列表 有点像 Facebook 在我的 User 实体中 我声明了一个公共虚拟 IList Friends get private set 属性 并在构造函数中创建
  • 使用 ng-options 时如何将所选选项的文本分配给另一个模型?

    我可以创建一个选择标签并将数据放入其中 我可以将该选择的选定选项分配给模型 但无法将其文本分配给另一个模型 这是下面的例子jsFiddle链接 html div div div div
  • 在 VS 2010 Express 版本中发布构建事件?

    到目前为止 我一直在 VS 2008 中使用后期构建事件 并下载了 2010 Express 版本来查看新功能 2008 年 构建事件位于项目属性 gt 编译 gt 构建事件中 我在 2010 Express 版本中没有看到该按钮 这是因为
  • pandas 到 csv TypeError: get_handle() 有一个意外的关键字参数“错误”

    我有一张大桌子 我根据日期将其切成许多小桌子 dfs for fecha in fechas dfs fecha df df date fecha set index Hour now I can acess the tables like
  • 如何从 moment.js 获取月份的整数值

    我正在使用 Moment js 将日期添加到我正在制作的选项列表中 以便我可以使用这些日期来显示可用的约会 例如 某人可以从选项列表中选择 2 月 3 日星期五 然后选择可用时间列表将于2月3日出现 我遇到的问题是我正在使用一个在线调度 A
  • 为什么我需要 IoC 容器而不是简单的 DI 代码? [关闭]

    Closed 这个问题是基于意见的 目前不接受答案 Locked 这个问题及其答案是locked因为这个问题是题外话 但却具有历史意义 目前不接受新的答案或互动 我一直在使用依赖注入 DI 一段时间 注入构造函数 属性或方法 我从来没有觉得
  • 如何使用非法名称访问此对象属性?

    我正在使用某人编写的 PHP 类来与 BaseCamp API 进行交互 我正在执行的特定调用是检索待办事项列表中的项目 效果很好 我的问题是 我不知道如何访问todo items返回的对象的属性 这是返回对象的 var dump obje
  • 将变量从 Shell 脚本传递到 Fortran 90 程序

    我被这个小问题困住了 我想知道是否可以将 bash shell 脚本的变量传递给 f90 代码 我很确定之前已经在这里讨论过 但我找不到完全相同的副本 您可以直接将参数作为参数传递给程序 program arg1 arg2 您可以使用子例程
  • 什么是胖接口?

    你好 我在电影行业工作 模拟和应用工作室效果 我可以问一下什么是胖接口吗 我听到网上有人说它是什么 编辑 是的here尼可波拉斯说 我相信非常好的指示 fat interface an interface with more member
  • 如何更改PHP中数组的索引[重复]

    这个问题在这里已经有答案了 我想在执行一些操作后更改数组的索引 我的实际输出是 Array 0 gt Array 0 gt 4 1 gt 6 2 gt Array 0 gt 1 1 gt 7 5 gt Array 0 gt 1 1 gt 7
  • 如何从 Http 集成流程创建 Spring Reactor Flux?

    我有一个与此非常相似的问题如何从 ActiveMQ 队列创建 Spring Reactor Flux 唯一的区别是消息来自 Http 端点而不是 JMS 队列 问题是消息通道由于某种原因没有被填充 或者它没有被 Flux from 拾取 日