如何正确读取 Flux 并将其转换为单个 inputStream

2024-03-13

我在用着WebClient和定制BodyExtractor我的 spring-boot 应用程序的类

WebClient webLCient = WebClient.create();
webClient.get()
   .uri(url, params)
   .accept(MediaType.APPLICATION.XML)
   .exchange()
   .flatMap(response -> {
     return response.body(new BodyExtractor());
   })

BodyExtractor.java

@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
  Flux<DataBuffer> body = response.getBody();
  body.map(dataBuffer -> {
    try {
      JaxBContext jc = JaxBContext.newInstance(SomeClass.class);
      Unmarshaller unmarshaller = jc.createUnmarshaller();

      return (T) unmarshaller.unmarshal(dataBuffer.asInputStream())
    } catch(Exception e){
       return null;
    }
  }).next();
}

上面的代码适用于小有效负载,但不适用于大有效负载,我认为这是因为我只读取单个通量值next我不知道如何组合并阅读所有内容dataBuffer.

我是reactor的新手,所以我不知道很多flux/mono的技巧。


这实际上并不像其他答案所暗示的那么复杂。

流式传输数据而不将其全部缓冲在内存中的唯一方法是使用管道,如 @jin-kwon 建议的那样。然而,使用 Spring 可以非常简单地完成身体提取器 https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/web/reactive/function/BodyExtractors.html and 数据缓冲区工具 https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/core/io/buffer/DataBufferUtils.html实用程序类。

Example:

private InputStream readAsInputStream(String url) throws IOException {
    PipedOutputStream osPipe = new PipedOutputStream();
    PipedInputStream isPipe = new PipedInputStream(osPipe);

    ClientResponse response = webClient.get().uri(url)
        .accept(MediaType.APPLICATION.XML)
        .exchange()
        .block();
    final int statusCode = response.rawStatusCode();
    // check HTTP status code, can throw exception if needed
    // ....

    Flux<DataBuffer> body = response.body(BodyExtractors.toDataBuffers())
        .doOnError(t -> {
            log.error("Error reading body.", t);
            // close pipe to force InputStream to error,
            // otherwise the returned InputStream will hang forever if an error occurs
            try(isPipe) {
              //no-op
            } catch (IOException ioe) {
                log.error("Error closing streams", ioe);
            }
        })
        .doFinally(s -> {
            try(osPipe) {
              //no-op
            } catch (IOException ioe) {
                log.error("Error closing streams", ioe);
            }
        });

    DataBufferUtils.write(body, osPipe)
        .subscribe(DataBufferUtils.releaseConsumer());

    return isPipe;
}

如果您不关心检查响应代码或引发失败状态代码的异常,则可以跳过block()调用和中间ClientResponse变量通过使用

flatMap(r -> r.body(BodyExtractors.toDataBuffers()))

instead.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何正确读取 Flux 并将其转换为单个 inputStream 的相关文章

随机推荐

  • 如何从android中的edittext中获取选定的文本?

    我有一个编辑文本 其中选择了一些文本 我只想通过单击按钮从 edittext 中获取选定的文本 请向我推荐可用的链接或示例代码 EditText et EditText findViewById R id edit int startSel
  • 如何在 JSF 中的表中动态添加行?

    在我的应用程序中 我需要单击按钮添加一行 并且该按钮将位于所有行中 需要帮助来做到这一点吗 项目类别 public class Item public Item private String value public Item String
  • 在java web应用程序中添加httponly和安全标志来设置cookie

    我想添加httponly and secureCookie 的标志 为了实现它 我正在使用Filters其配置在web xml 添加flag的代码如下 package com crisil dbconn import java io IOE
  • 无法在 IIS (Windows 10) 上运行 ASP.net 服务 - 错误 404(未找到)

    场景如下 Windows 10 VS2015 update 3 创建新的空白 空 asp net 项目 NET 4 5 2 并添加虚拟 index html 使用 IIS Express 默认 运行项目 运行良好 将服务器更改为 IIS 获
  • 在 Rails 中将时间从一个时区转换为另一时区

    My created at时间戳以 UTC 格式存储 gt gt Annotation last created at gt Sat 29 Aug 2009 23 30 09 UTC 00 00 如何将其中之一转换为 东部时间 美国和加拿大
  • Rails name_scope 与 has_and_belongs_to_many

    我有 3 个表 电影 film genres 用于连接 2 个表 和流派 在模型 film rb 中 has and belongs to many genres 在模型类型 rb has and belongs to many films
  • 在 TextMate 中将 RVM 与不同的 gemset 结合使用

    我已经设置了 RVM 并按照以下方式为我的项目制作了单独的宝石集RVM 最佳实践 http rvm beginrescueend com rvm best practices 现在在 TextMate 中运行测试文件不起作用 我已经在这里读
  • Laravel Mix 中的真实 ENV 值

    因此 在 Laravel Mix 中 文档中说我们可以向 env 文件添加以 MIX 为前缀的内容 然后我们可以在编译时在 JS 文件中访问它 我想我可能在这里遗漏了一些东西 因为这并没有真正提供与 env 文件类似的任何内容 因为在将资产
  • 在脚本中使用内置电子表格函数

    我是第一次使用 Google App Script 我在 Google Doc 电子表格上使用它 我正在尝试非常简单的功能 只是为了学习基础知识 例如这有效 function test hello return hello 但我对这个简单的
  • BEM CSS:相似块和样式共享

    阅读有关 BEM CSS 的内容并使用它编写了一些小型网站 我对它相当熟悉 但是 我仍然不确定如何处理非常相似但没有关系的块 假设我有很多无序列表块 它们的顶行都具有相同的样式 其他列表项可以以不同的方式布局 并且彼此完全无关 我发现自己给
  • 在 onPageFinished 事件之外将 Javascript 注入到 Web 视图中(使用 DatePicker 在 Web 视图的输入上设置日期)

    我有一个 Android 应用程序 运行一个加载特定页面的 WebView 也是该应用程序的一部分 我想使用 Android DatePicker 从用户那里获取日期 并在 WebView 页面内的输入上设置该值 当用户单击输入时 Date
  • UIWebView横向旋转不填充视图

    我的 UIWebView 遇到问题 当视图加载时 它会以任一方向加载 完美填充整个页面等 然而 如果我以纵向方式加载它 然后旋转设备 网络视图不会一直填充到右侧 我一生都无法弄清楚为什么 这是我的看法确实加载方法 void viewDidL
  • 创建 ViewModel:在模型数据可用之前还是之后进行?

    我有一个从数据源加载数据的 WPF 应用程序 在加载一些数据之前 没有任何内容可显示 我的问题是我是否 在任何数据可用之前创建V和VM 一旦可用 就在虚拟机中设置数据 一开始只创建V 等待数据可用 然后创建注入数据的虚拟机 仅在数据可用时创
  • 如何删除已删除的 git 子树文件夹的历史记录?

    我使用 git subtree 添加了一个 git 存储库 问题是我硬重置回使用 git subtree 添加存储库之前 现在提交历史记录仍在存储库中 但已与主服务器断开连接 知道如何删除它吗 我尝试了 git rm cached 但没有成
  • 跨多个表维护 Identity 值

    我们遇到的情况是 多个表中有一个名为 Customer Number 的列 该列是所有表中的标识列 但是有没有办法可以使该列在所有表中唯一 例如 如果我在 table one 中添加一行 并且标识列为其分配值 1 现在如果有人在 table
  • 从.ajax()调用加载knockout.js observableArray()

    这让我很困惑 这一定是我没看到的小事 我正在尝试加载一个非常简单的observableArray通过 ajax 调用进行淘汰赛 javascript we bind the array to the view model property
  • 抑制命令行输出

    我有一个像这样的简单批处理文件 echo off taskkill im test exe f gt nul pause 如果 test exe 未运行 我会收到以下消息 ERROR The process test exe not fou
  • 静态主类 - AS3

    有没有一种方法可以使主类 基于主 fla 的类 静态 所以我们可以像在java中一样使用它 能够从其他类引用它 因为我必须将main本身的实例作为参数传递给一个类 否则我会失去引用 我尝试添加静态前缀 但似乎 as3 不允许 AS3中没有静
  • 生成随机数一次

    我需要创建一组 0 到 800 之间的随机数 问题是目前我需要快速执行此操作 并且每个数字仅返回一次 我目前的做法是 创建一个std vector包含从 0 到 800 的数字 使用选择一个号码numberVector rand numbe
  • 如何正确读取 Flux 并将其转换为单个 inputStream

    我在用着WebClient和定制BodyExtractor我的 spring boot 应用程序的类 WebClient webLCient WebClient create webClient get uri url params acc