使用 ReactiveX for Java 进行 Http 调用

2023-11-23

我是 ReactiveX for Java 的新手,我有以下代码块可以进行外部 http 调用,但它不是异步的。我们使用 rxjava 1.2 和 Java 1.8

  private ResponseEntity<String> callExternalUrl(String url, String json, HttpMethod method) {

    RestTemplate restTemplate;
    HttpEntity request;

      request = new HttpEntity(jsonContent, httpHeaders);

    return restTemplate.exchange(url, httpMethod, request, String.class);

  }

我在网上找到了以下代码块,但我无法完全理解它以及如何将其应用到我的代码库中。

private RxClient<RxObservableInvoker> httpClient;
public <T> Observable<T> fetchResult(String url, Func1<Response, T> mapper) {

    return httpClient.target(url)
        .request()
        .rx()
        .get()
        .subscribeOn(Schedulers.io())
        .map(mapper);
  }

如果我理解正确的话,你需要这样的东西来包装你现有的callExternalUrl

static Observable<String> callExternalUrlAsync(String url, String json, HttpMethod method)
{
    return Observable.fromCallable(() -> callExternalUrl(url, json, method))
            .subscribeOn(Schedulers.io())
            .flatMap(re -> {
                         if (re.hasBody())
                             return Observable.just(re.getBody());
                         else
                             return Observable.error(new RuntimeException("Bad response status " + re.getStatusCode()));
                     },
                     e -> Observable.error(e),
                     (Func0<Observable<? extends String>>) (() -> Observable.empty())) // I need explicit cast or it won't compile :-(
            .observeOn(Schedulers.computation());
}

代码简短描述:

  1. 它安排现有的执行callExternalUrl on the Schedulers.io
  2. 是否进行最小变换ResponseEntity<T>进入成功T和错误情况。它发生在io调度程序也是如此,但这并不重要,因为它真的很短。 (如果里面有异常callExternalUrl,它按原样传递。)
  3. 使订阅者能够执行结果Schedulers.computation

Caveats:

  1. 您可能想对两者使用自定义调度程序subscribeOn and observeOn
  2. 您可能希望在传递给的第一个 lambda 中有一些更好的逻辑flatMap为了区分成功和错误,并且您肯定需要一些更具体的异常类型。

高阶魔法

如果您愿意使用高阶函数并牺牲一点性能来减少代码重复,您可以这样做:

// Universal wrapper method
static <T> Observable<T> wrapCallExternalAsAsync(Func3<String, String, HttpMethod, ResponseEntity<T>> externalCall, String url, String json, HttpMethod method)
{
    return Observable.fromCallable(() -> externalCall.call(url, json, method))
            .subscribeOn(Schedulers.io())
            .flatMap(re -> {
                         if (re.hasBody())
                             return Observable.just(re.getBody());
                         else
                             return Observable.error(new RuntimeException("Bad response status " + re.getStatusCode()));
                     },
                     e -> Observable.error(e),
                     (Func0<Observable<? extends T>>) (() -> Observable.empty())) // I need explicit cast or it won't compile :-(
            .observeOn(Schedulers.computation());
}

static Observable<String> callExternalUrlAsync_HigherOrder(String url, String json, HttpMethod method)
{
    return wrapCallExternalAsAsync(MyClass::callExternalUrl, url, json, method);
}

Where MyClass无论你在哪里callExternalUrl is.


Update(仅限异步调用)

私有静态 RxClient httpClient = Rx.newClient(RxObservableInvoker.class); // 这里你可以传递自定义的 ExecutorService

private <T> Observable<String> executeHttpAsync(String url, String httpMethod, Entity<T> entity) {
    return httpClient.target(url)
            .request()
            .headers(httpHeaders) // assuming httpHeaders is something global as in your example
            .rx()
            .method(httpMethod, entity)
            .map(resp -> {
                if (200 != resp.getStatus()) {
                    throw new RuntimeException("Bad status code " + resp.getStatus());
                } else {
                    if (!resp.hasEntity()) {
                        // return null; // or error?
                        throw new RuntimeException("Empty response"); // or empty?
                    } else {
                        try {
                            return resp.readEntity(String.class);
                        } catch (Exception ex) {
                            throw new RuntimeException(ex); // wrap exception into unchecked
                        }
                    }
                }
            })
            .observeOn(Schedulers.computation());
}

private Observable<String> executeGetAsync(String url) {
    return executeHttpAsync(url, "GET", null);
}

private Observable<String> executePostAsync(String url, String json) {
    return executeHttpAsync(url, "POST", Entity.json(json));
}

又相似caveats apply:

  1. 您可能想对两者使用自定义调度程序newClient打电话和observeOn
  2. 您可能想要一些更好的错误处理逻辑,而不仅仅是检查它是否是 HTTP 200,并且您肯定需要一些更具体的异常类型。但这都是特定于业务逻辑的,因此取决于您。

此外,从您的示例中还不清楚请求的正文(HttpEntity)是构建以及你是否真的总是想要String作为响应,就像您原来的示例中一样。不过我只是按原样复制了你的逻辑。如果您需要更多内容,您可能应该参考以下位置的文档:https://jersey.java.net/documentation/2.25/media.html#json

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 ReactiveX for Java 进行 Http 调用 的相关文章

随机推荐

  • 什么是好的Jsp IDE

    我是一名 C 开发人员 但有一份涉及一些 jsp 代码的工作 我一直在使用记事本 它在突出显示方面做得很好 但我缺少智能感知 并且发现很难找到我需要的方法 对于喜欢 C 并具有智能感知功能的人来说 什么是好的 jsp IDE Update
  • 在 bash 无限循环期间禁用用户输入

    我有这个 bash 脚本 它基本上启动带有进度指示器的 Web 和 selenium 服务器 由于硒服务器启动需要一些时间 我正在无限循环中检查状态 问题是 在等待它启动时 我不小心按下了按键 它会显示在屏幕上 如果循环结束 超时 它也会显
  • C# 中的 HTTP 代理服务器

    我的公司正在尝试使用 NET Fx 3 5 和 C 编写代理服务器 来自我们的research我读到 HttpListener 不是代理服务器的良好候选者 尽管我不确定为什么 我们目前正在与Mentalis代理示例源代码尽管这将涉及到实现我
  • 如何在屏幕之间切换?

    我是 Android 开发世界的新手 我创建了简单的应用程序 并通过一个按钮创建了一个简单的 GUI 如果用户按下此按钮 我想更改屏幕以显示其他 GUI 我怎样才能做到这一点 你可以这样做 import android view View
  • Golang - 如何从代码内部显示模块版本

    我正在编写两个二进制文件 它们都使用两个库 我们可以称它们为 libA 和 libB 每个库都位于专用的 git 存储库中 并使用 git 标签来声明版本 例如 libA 的版本为 v1 0 9 libB 的版本为 v0 0 12 两个二进
  • 所有小提琴的面积相同,与 ggplot2 中的面无关

    我想为三个不同的因素创建一个图 其中所有小提琴都具有相同的面积 但使用facet grid C 似乎迫使每个方面内的小提琴 即仅在因子 C 水平内的小提琴 具有相同的面积 我怎样才能克服这个问题 library ggplot2 d lt d
  • 在 Gradle 中,如何生成具有解析为实际使用版本的动态依赖项的 POM 文件?

    在 Gradle 中 如何生成具有解析为实际使用版本的动态依赖项的 POM 文件 dependencies testCompile group junit name junit version 4 这是从上面的依赖关系生成的
  • 如何在 ASP.NET 运行时更改 FormsCookieName

    我们希望根据应用程序实例更改 FormsCookiePath 的 FormsCookieName 我们有一个应用程序 在 1 个服务器 域名上有多个实例 因此 我们只能同时在 1 个应用程序中工作 因为 cookie 会互相覆盖 顺便说一句
  • 无法使用 Vue CLI 3 要求“fs”

    我正在使用 Vue CLI 3 开发适用于 Windows 10 的调度软件 该应用程序需要使用 fs 模块 但是 我找不到方法 任何地方都没有 webpack 配置文件 我该如何解决这个问题 Vue CLI 3 是如此不同 以至于我无法使
  • jQuery .load() 不加载脚本

    我有 jQuery load 功能类似于 load to html 页面 targetID load load from html bodyPart script 但是 这似乎没有从 load from html 页面加载 javascri
  • 如何将系统命令输出存储在变量中?

    我正在执行一个 system 函数 它返回一个文件名 现在我不想在屏幕上显示输出 即文件名 或通过管道传输到新文件 我只想将其存储在变量中 那可能吗 如果是这样 怎么办 谢谢 单个文件名 是的 这当然是可能的 但不使用system Use
  • 批量删除文件名中的字符

    我在 Windows 资源管理器中有 3 个主文件夹 其中包含命名类似于 ALB 01 00000 intsect d kml 或 Baxters Creek AL intsect d kml 的文件 尽管第一个名称发生了变化 但我想从所有
  • HTML5 Canvas:缩放

    有没有简单的方法可以在画布 JavaScript 中放大和缩小 基本上我有一个 400x400px 的画布 我希望能够使用 mousedown 2x 放大并使用 mouseup 返回 过去两天用谷歌搜索 但到目前为止还没有运气 基于使用 d
  • 等待背景图像(css)加载完毕

    假设我们有一个图片幻灯片 这些图片的缩略图显示在带有滑块的 div 包装器中 我用 Jquery 创建的 并且每个图像都包含在 li 带有 CSS 背景集 它当然代表图像 我选择使用背景图像来进行布局 因为它们的大小和纵横比都不同 图像来自
  • 有哪些方法可以在 AngularJS 中显示“分块”响应?

    目前 我在显示从 Web 服务 Node js 服务器 localhost 3000 发送到 Node js 服务器 localhost 3001 上运行的模拟客户端的响应 块 时遇到问题 编辑 当前实现仅使用 Angular 的 http
  • 我可以从 popen() 流打开 bash 吗?

    根据 popen 的手册页 我正在打开 bin sh 有没有办法可以重载此行为以打开 bin bash shell 并与 BASH shell 脚本交互 或者我是否需要打开 pty 风格的连接才能做到这一点 如果您想在传递给的代码片段中使用
  • Android 警报对话框并设置肯定按钮

    这是用于滑块拼图的 我想在拼图完成时显示一个带有 确定 按钮的对话框 当按下 确定 按钮时 我使用Intent通过 Android 浏览器加载网站 唯一的问题是 使用当前代码 当拼图完成时 它不会加载盒子 当我使用null 它没有任何作用
  • 如何将 istream 与字符串一起使用

    我想将文件读入字符串 我正在寻找不同的方法来有效地做到这一点 使用固定大小的 char 缓冲区 我收到了answer来自 Tony 的内容创建了一个 16 kb 缓冲区并读取该缓冲区并追加该缓冲区 直到没有更多内容可读取 我了解它是如何工作
  • 在 XSLT 中使用 fn:sum 以及包含空值的节点集

    我正在尝试使用 XSLT 和 XPath 函数 fn sum 对 XML 中的一组值求和 只要值非空 这种方法就可以正常工作 但事实并非如此 为了说明我的问题 我举了一个例子
  • 使用 ReactiveX for Java 进行 Http 调用

    我是 ReactiveX for Java 的新手 我有以下代码块可以进行外部 http 调用 但它不是异步的 我们使用 rxjava 1 2 和 Java 1 8 private ResponseEntity