从projectreactor使用Flux时生成和处理异常的正确方法是什么

2023-12-19

我正在使用 io.projectreactor 3 (reactor-core 3.2.6.RELEASE),我注意到错误处理方面存在一些差异。不幸的是,官方文档没有提供足够的细节来解决我的问题。

我有以下 4 个片段。在某些情况下,异常将被忽略,而在其他情况下,它将进一步抛出。实际产生和消费异常的方式是怎样的呢?

片段1

在这种情况下,异常将被忽略,并且 main() 将完成而不会收到异常。

import reactor.core.publisher.Flux;

class Scratch {
    public static void main(String[] args) throws Throwable {
        Flux.push(sink -> {
            sink.next(1);
            sink.next(2);
        }).doOnNext(e -> {
            throw new RuntimeException("HELLO WORLD");
        }).subscribe(System.out::println, e -> {
            throw new RuntimeException(e);
        });
        System.out.println("DONE");
    }
}

Output:

DONE

片段2

与上面的示例类似,只是我们不使用 Flux.push 而是使用 Flux.just。 Main() 将收到异常。

import reactor.core.publisher.Flux;

class Scratch {
    public static void main(String[] args) throws Throwable {
        Flux.just(
                1
        ).doOnNext(e -> {
            throw new RuntimeException("HELLO WORLD");
        }).subscribe(System.out::println, e -> {
            throw new RuntimeException(e);
        });
        System.out.println("DONE");
    }
}

Output:

Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: HELLO WORLD
    at Scratch.lambda$main$1(scratch_15.java:10)
...

片段3

我们通过调用sink.error来发出异常信号。 Main() 不会收到异常。

import reactor.core.publisher.Flux;

class Scratch {
    public static void main(String[] args) throws Throwable {
        Flux.push(sink -> {
            sink.next(1);
            sink.next(2);
            sink.error(new RuntimeException("HELLO WORLD"));
        }).subscribe(System.out::println, e -> {
            throw new RuntimeException(e);
        });
        System.out.println("DONE");
    }
}

Output:

1
2
DONE

片段4

我们直接抛出异常。 Main() 将收到异常。

import reactor.core.publisher.Flux;

class Scratch {
    public static void main(String[] args) throws Throwable {
        Flux.push(sink -> {
            sink.next(1);
            sink.next(2);
            throw new RuntimeException("HELLO WORLD");
        }).subscribe(System.out::println, e -> {
            throw new RuntimeException(e);
        });
        System.out.println("DONE");
    }
}

Output

1
2
Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: HELLO WORLD
    at Scratch.lambda$main$1(scratch_15.java:10)
...

使用反应核心时处理异常的正确方法是什么?唯一可靠的方法似乎根本不使用错误回调,而是用 try/catch 包围 Flux.subscribe。但在那种情况下我总是收到UnsupportedOperationException而不是原来的异常,然后我需要使用Exceptions.isErrorCallbackNotImplemented要检查它是否来自反应式,请提取嵌套异常然后抛出它。

这当然可以做到,但需要在我们订阅 Flux 的每个地方一致地完成。这对我来说看起来不太好。我在这里缺少什么?


在你所有的例子中,问题都是从.subscribe(...)错误处理 lambda。

如果您希望在主块中引发异常,请使用block()变体。

如果您想测试错误是否在整个管道中传播,请使用StepVerifier.create(pipeline).expectError(...).verify().

.subscribe一般来说是为了获得“终端”状态的通知,而不是为了恢复或重新抛出错误(使用onError*上游运营商)。

The just基于 -的示例似乎正确地传播了异常,因为它们在订阅时不执行用户提供的代码,因此在执行期间没有 try/catchsubscribe(Consumer<Throwable>).

push, like generate/create/defer and compose,执行一些用户定义的逻辑(Consumer<FluxSink>),在订阅时。他们防范整个Consumer抛出异常并尝试传播它(作为onErrorsignal)而不是直接抛出它。

但如果失败了Consumer是在执行其中之一时引起的sink的方法,如果subscriber重新抛出:我们进入递归,其中调用接收器调用接收器。当我们检测到水槽的递归排水时,我们通过退出来防止这种无限情况。

这就是为什么push基于示例之后触发错误sink.next or in sink.error(示例 1 和 3)无法在 main 中产生异常:

  1. Consumer被申请;被应用
  2. sink.next被调用并且下一个运算符创建异常 1,或者sink.error叫做
  3. 异常1到达subscribe并作为异常 2 重新抛出
  4. 这会短路Consumer.apply,异常 2 被传递给sink.error
  5. 接收器已经被调用,所以我们突破以避免无限递归
  6. 例外2从未见过

另一方面,在示例 4 中,我们不再调用接收器的方法,并且原始异常不会首先到达订阅者:

  1. Consumer被申请;被应用
  2. 直接抛出异常1
  3. 这会短路Consumer.apply异常 1 被传递给sink.error
  4. 传播到订阅者
  5. 将其重新抛出为异常 2
  6. 异常2出现在main方法中
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

从projectreactor使用Flux时生成和处理异常的正确方法是什么 的相关文章

  • 是否可以在 Spring Batch 中结合分区和并行步骤?

    我只是想知道它在 Spring Batch 中可行吗 Step1Step2 流程 gt 流程1 流程2 流程3 Step3 其中每个flow1 gt 划分为 5 个 GridSizeflow2 gt 划分为 5 个 GridSizeflow
  • 检查双精度值的等于和不等于条件

    我在比较两者时遇到困难double values using and 我创建了 6 个双变量并尝试进行比较If健康 状况 double a b c d e f if a b c d e f My code here in case of t
  • 通过 InjectMocks Spy 注入对象

    我需要对一个类运行一系列单元测试 该类具有 Autowired Logger 实现 实现的基本思想是 Mock Logger logger InjectMocks TestedClass tested 但我想保存日志输出功能 Mockito
  • 方法断点可能会大大减慢调试速度

    每当向方法声明行添加断点 在 Intellij IDEA 或 Android Studio 中 时 都会出现一个弹出窗口 方法断点可能会大大减慢调试速度 为什么会这样戏剧性地减慢调试速度 是我的问题吗 将断点放在函数的第一行有什么不同 Th
  • PropertySources 中各种源的优先级

    Spring引入了新的注释 PropertySources对于所有标记为的类 Configuration since 4 0 需要不同的 PropertySource作为论证 PropertySources PropertySource c
  • 如何将 Spotlight for Help 插入本地化的 macOS 应用程序?

    我正在 macOS 上使用 Swing GUI 框架实现 Java 应用程序 当使用system外观和感觉以及screen菜单栏 Swing 自动插入一个搜索栏 called 聚光灯寻求帮助 https developer apple co
  • Java 变量的作用域

    我不明白为什么这段代码的输出是10 package uno public class A int x 10 A int x 12 new B public static void main String args int x 11 new
  • Spring Stomp over Websocket:流式传输大文件

    我的SockJs客户端在网页中 发送帧大小为16K的消息 消息大小限制决定了我可以传输的文件的最大大小 以下是我在文档中找到的内容 Configure the maximum size for an incoming sub protoco
  • @EnableTransactionManagement 的范围是什么?

    我试图了解正确的放置位置 EnableTransactionManagement多个 JavaConfig 上下文的情况下的注释 考虑以下场景 我在 JPAConfig java 和 AppConfig java 中有 JPA 配置以及一组
  • 具有多种值类型的 Java 枚举

    基本上我所做的是为国家编写一个枚举 我希望不仅能够像国家一样访问它们 而且还能够访问它们的缩写以及它们是否是原始殖民地 public enum States MASSACHUSETTS Massachusetts MA true MICHI
  • tomcat 过滤所有 web 应用程序

    问题 我想对所有网络应用程序进行过滤 我创建了一个过滤器来监视对 apache tomcat 服务器的请求 举例来说 它称为 MyFilter 我在 netbeans 中创建了它 它创建了 2 个独立的目录 webpages contain
  • 如何使用 Mockito 和 Junit 模拟 ZonedDateTime

    我需要模拟一个ZonedDateTime ofInstant 方法 我知道SO中有很多建议 但对于我的具体问题 到目前为止我还没有找到任何简单的解决办法 这是我的代码 public ZonedDateTime myMethodToTest
  • ExceptionHandler 不适用于 Throwable

    我们的应用程序是基于 Spring MVC 的 REST 应用程序 我正在尝试使用 ExceptionHandler 注释来处理所有错误和异常 I have ExceptionHandler Throwable class public R
  • 使用 Java 从 S3 上的文件在 S3 上创建 zip 文件

    我在 S3 上有很多文件 需要对其进行压缩 然后通过 S3 提供压缩文件 目前 我将它们从流压缩到本地文件 然后再次上传该文件 这会占用大量磁盘空间 因为每个文件大约有 3 10MB 而且我必须压缩多达 100 000 个文件 所以一个 z
  • 如何在android sdk上使用PowerMock

    我想为我的 android 项目编写一些单元测试和仪器测试 然而 我遇到了一个困扰我一段时间的问题 我需要模拟静态方法并伪造返回值来测试项目 经过一些论坛的调查 唯一的方法是使用PowerMock来模拟静态方法 这是我的 gradle 的一
  • 来自客户端的超时 Web 服务调用

    我正在使用 RestEasy 客户端调用网络服务 一项要求是 如果调用运行时间超过 5 秒 则中止 超时调用 我如何使用 RestEasy 客户端实现这一目标 我只看到服务器端超时 即如果在一定时间内未完成请求 Rest Easy 网络服务
  • Spock模拟inputStream导致无限循环

    我有一个代码 gridFSFile inputStream bytes 当我尝试这样测试时 given def inputStream Mock InputStream def gridFSDBFile Mock GridFSDBFile
  • Java中获取集合的幂集

    的幂集为 1 2 3 is 2 3 2 3 1 2 1 3 1 2 3 1 假设我有一个Set在爪哇中 Set
  • Spring Boot MSSQL Kerberos 身份验证

    目前在我的春季靴子中application properties文件中 我指定以下行来连接到 MSSql 服务器 spring datasource url jdbc sqlserver localhost databaseName spr
  • Java中有类似分支/跳转表的东西吗?

    Java有类似分支表或跳转表的东西吗 分支表或跳转表是 根据维基百科 http en wikipedia org wiki Branch table 用于描述使用分支指令表将程序控制 分支 转移到程序的另一部分 或可能已动态加载的不同程序

随机推荐

  • Java SDK 的 Couchbase 连接超时

    我按照 couchbase 教程连接到远程 couchbase 服务器 但在我尝试打开默认存储桶后 连接超时失败 我已检查是否可以在我的计算机上打开 couchbase 服务器页面 192 xx xx xx 8091 这是我的Java代码
  • 如何使用 css 滤镜获得图像:模糊和锐利边缘?

    我想在悬停时模糊图像 问题是图像的边缘也模糊得令人不快 在 Fiddle 中 您可以在绿色背景下清楚地看到它 如果我缩放图像 即 1 2 它最终会解决问题 但在过渡过程中 模糊的边缘仍然出现 有什么想法如何使边缘具有这种效果 http js
  • HTML5 上传前预先调整图像大小

    这是一个面条刮刀 请记住 我们有 HTML5 本地存储和 xhr v2 等等 我想知道是否有人可以找到一个可行的示例 甚至只是对这个问题给出 是 或 否 是否可以使用新的本地存储 或其他 预先调整图像大小 以便不知道如何调整图像大小的用户可
  • Swift:@objc(...) 属性

    在 Apple 生成的代码中 Core DataNSManagedObject子类 例如 我看到这个 objc LPFile public class LPFile NSManagedObject 我的问题是 为什么是 objc声明如上所述
  • 我需要与多个远程参与者系统进行通信

    我正在使用 akka Net 开发一种插件架构 其中包含一个或多个插件的每个 dll 都被加载到自己的中AppDomain并且新的参与者系统被初始化 准备接收来自 主机 的消息 我试图让它与多个插件一起工作 但我却陷入困境 所以主机配置如下
  • 条形图中的渐变填充

    我正在观察不同人群的行为 称为Clusters在此数据集中 以及他们对所使用的浏览器类型的偏好 我想创建一个条形图 显示使用每种类型浏览器的每个集群的百分比 下面是一些生成类似数据集的代码 请忽略每个簇的百分比加起来不会等于 1 brows
  • Clang++-3.7 CRTP 编译错误“父级模板参数中没有命名成员”

    在下面的代码中 我尝试使用 CRTP 来使用父类中子类的静态成员 值 当使用带有 pedantic 标志的 g 5 2 1 编译代码时 我能够按预期编译 并且在执行时c print value and Child
  • 从包含数百万个文件的目录中精确匹配地高效查找数千个文件 (bash/python/perl)

    我在 Linux 上 试图从包含数百万个文件的目录 SOURCE DIR 中查找数千个文件 我有一个需要查找的文件名列表 存储在单个文本文件 FILE LIST 中 该文件的每一行都包含与 SOURCE DIR 中的文件相对应的单个名称 并
  • Visual Studio 卡在生成代码中

    当我使用 Visual Studio 构建 C 项目时 进程陷入困境生成代码 1 gt Rebuild All started Project myWrapper Configuration Release Win32 1 gt funzi
  • iOS 重新安装应用程序不会清除徽章

    我已通过本地通知将应用程序的徽章编号设置为 1 然后我卸载该应用程序 当我重新安装它时 徽章仍然存在 这是 iOS 错误还是有办法在卸载时清除徽章 thanks 徽章计数由操作系统维护 独立于应用程序 卸载 删除 应用程序时 操作系统会保留
  • 为什么 PyQt 执行我的操作三次?

    我对 PyQt 还是个新手 但我真的不知道为什么会发生这种情况 我有一个像这样创建的主窗口 class MainWindow QtGui QMainWindow initialize def init self Call parent co
  • shopkick 应用程序 UI 小部件

    请参阅下面的链接 http itunes apple com app id383298204 mt 8 http itunes apple com app id383298204 mt 8 有 showpick 应用程序的屏幕截图 其中有一
  • React中删除项目后如何刷新页面?

    我的问题是 当我单击删除按钮时 我可以从表中删除该项目 但我需要手动刷新页面 如何解决自动刷新 有人能解决我的问题吗 我还在下面附上了我的代码部分 这是我的代码部分 还有 Home 组件和 RowCreator 组件 import Reac
  • Puppeteer:Chromium 实例在 browser.disconnect 后在后台保持活动状态

    我的环境 傀儡师版本 3 1 0 平台 操作系统版本 Windows 10 Node js 版本 12 16 1 我的问题是 我有一个for of使用 puppeteer 循环访问 3000 多个网址 我用puppeteer connect
  • MSAA 的抗锯齿问题、深度绘制 CSG 和 FBO

    我已经重新实现了OpenCSG http www opencsg org适用于现代 OpenGL 版本 PixelFormat属性 NSOpenGLPFAColorSize 24 NSOpenGLPFAAlphaSize 8 NSOpenG
  • React.js 破坏了dispatchEvent

    例如 我在反应中添加事件处理程序 div 然后我调度事件 let clickEvt new MouseEvent click bubbles false cancelable true elm dispatchEvent clickEvt
  • C++ 汇编器输出 - 如何实现引用

    C 和汇编工具链 GNU 我有以下 C 代码 int main void int i 33 j 66 swap i j cout lt lt i lt lt lt lt j lt lt endl return 0 如果我现在检查生成的汇编代
  • 您会将 SQLite 数据库文件放置在 iPhone 应用程序中的什么位置?

    当我最初为我的应用程序创建一个带有预先插入的数据集的 SQLite 数据库文件时 我必须将该文件放置在我的 Xcode 项目中的某个位置 以便它可以访问我的 iPhone 应用程序 我想 资源 是正确的地方 在 iPhone 应用程序中部署
  • 如果 std::numeric_limits::is_iec559 为 true,这是否意味着我可以以明确定义的方式提取指数和尾数?

    我已经构建了一个自定义版本frexp https en cppreference com w cpp numeric math frexp auto frexp float f noexcept static assert std nume
  • 从projectreactor使用Flux时生成和处理异常的正确方法是什么

    我正在使用 io projectreactor 3 reactor core 3 2 6 RELEASE 我注意到错误处理方面存在一些差异 不幸的是 官方文档没有提供足够的细节来解决我的问题 我有以下 4 个片段 在某些情况下 异常将被忽略