使用项目反应器 mergeWith() 运算符来实现“if/elseif/else”分支逻辑

2024-03-17

我正在尝试使用项目反应堆 mergeWith运算符以实现if/elseif/else分支逻辑如下所述:RxJS,If-Else 运算符在哪里 https://rangle.io/blog/rxjs-where-is-the-if-else-operator/.

提供的示例是用 RxJS 编写的,但基本思想保持不变。

基本上这个想法是使用filter3 号操作员monos/publishers(因此有 3 个不同的谓词)并合并 3 个monos如下(这里是RxJSObservables当然):

const somethings$ = source$
  .filter(isSomething)
  .do(something);

const betterThings$ = source$
  .filter(isBetterThings)
  .do(betterThings);

const defaultThings$ = source$
  .filter((val) => !isSomething(val) && !isBetterThings(val))
  .do(defaultThing);

// merge them together
const onlyTheRightThings$ = somethings$
  .merge(
    betterThings$,
    defaultThings$,
  )
  .do(correctThings);

我复制并粘贴了上述文章中的相关示例。

考虑一下something$, betterThings$ and defaultThings$是我们的单声道isSomething & isBetterThings是谓词。

现在这是我的 3 个真实的monos/publishers(用java编写):

private Mono<ServerResponse> validateUser(User user) {
    return Mono.just(new BeanPropertyBindingResult(user, User.class.getName()))
        .doOnNext(err -> userValidator.validate(user, err))
        .filter(AbstractBindingResult::hasErrors)
        .flatMap(err ->
            status(BAD_REQUEST)
                .contentType(APPLICATION_JSON)
                .body(BodyInserters.fromObject(err.getAllErrors()))
        );
}

private Mono<ServerResponse> validateEmailNotExists(User user) {
    return userRepository.findByEmail(user.getEmail())
        .flatMap(existingUser ->
            status(BAD_REQUEST)
                .contentType(APPLICATION_JSON)
                .body(BodyInserters.fromObject("User already exists."))
        );
}

private Mono<ServerResponse> saveUser(User user) {
    return userRepository.save(user)
        .flatMap(newUser -> status(CREATED)
            .contentType(APPLICATION_JSON)
            .body(BodyInserters.fromObject(newUser))
        );
}

这是需要合并三个的顶级方法publishers:

public Mono<ServerResponse> signUpUser(ServerRequest serverRequest) {
    return serverRequest.bodyToMono(User.class)
        .mergeWith(...)

}

我不知道如何使用mergeWith()运算符...我已经尝试过Mono.when()静态运算符需要多个发布者(对我来说很好)但返回一个Mono<void>(对我来说不好)。

有人可以帮忙吗?

附:我相信您会原谅 RxJS (js) 和 Reactor 代码 (java) 之间的混合。我打算利用 RxJS 中的知识在我的 Reactor 应用程序中实现类似的目标。 :-)

edit 1: 我已经尝试过这个:

public Mono<ServerResponse> signUpUser(ServerRequest serverRequest) {
    return serverRequest
        .bodyToMono(User.class)
        .flatMap(user -> validateUser(user).or(validateEmailNotExists(user)).or(saveUser(user))).single();
}

但我收到这个错误:NoSuchElementException: Source was empty

edit 2:与(注意括号)相同:

public Mono<ServerResponse> signUpUser(ServerRequest serverRequest) {
    return serverRequest
        .bodyToMono(User.class)
        .flatMap(user -> validateUser(user).or(validateEmailNotExists(user)).or(saveUser(user)).single());
}

edit 3:同样的错误Mono<User>:

public Mono<ServerResponse> signUpUser(ServerRequest serverRequest) {
    Mono<User> userMono = serverRequest.bodyToMono(User.class);
    return validateUser(userMono)
        .or(validateEmailNotExists(userMono))
        .or(saveUser(userMono))
        .single();
}

edit 4:我可以确认三个单声道中至少有一个会始终发光。这是当我使用or()操作员表示出了问题...

如果我使用它,我的所有测试都会通过:

public Mono<ServerResponse> signUpUser(ServerRequest serverRequest) {
    return serverRequest.bodyToMono(User.class)
        .flatMap(user -> Flux.concat(validateUser(user), validateEmailNotExists(user), saveUser(user)).next().single());
}

我已经用过concat()此处的运算符保留运算顺序。

你知道我做错了什么吗or()操作员?

edit 5: 我已经尝试过cache()运算符如下无济于事:

public Mono<ServerResponse> signUpUser(ServerRequest serverRequest) {
    return serverRequest
        .bodyToMono(User.class)
        .cache()
        .flatMap(user -> validateUser(user)
            .or(validateEmailNotExists(user))
            .or(saveUser(user))
            .single()
        );
}

您当前的代码示例意味着您的 3 个方法返回Mono<ServerResponse>应该采取Mono<User>而不是一个User,所以你可能需要在那里改变一些东西。

然而,我离题了——这似乎不是这里的主要问题。

根据我对该链接中描述的模式的理解,您正在创建 3 个单独的Mono对象,其中只有一个会返回结果 - 并且您需要一个Mono无论您原来的 3 个中的哪一个Mono对象返回。

在这种情况下,我会推荐如下内容:

Mono<ServerResult> result = Flux.merge(validateUser(user), validateEmailNotExists(user), saveUser(user)).next().single();

分解一下:

  • 静态的Flux.merge()方法需要你 3Mono对象并将它们合并成一个Flux;
  • next()返回第一个可用结果作为Mono;
  • single()将确保Mono发出一个值,而不是什么都不发出,否则抛出异常。 (可选,但只是一点安全网。)

你也可以直接链接Mono.or()像这样:

Mono<ServerResult> result = validateUser(user).or(validateEmailNotExists(user)).or(saveUser(user)).single();

这种方法的优点是:

  • 在某些情况下,它可以说更具可读性;
  • 如果您可能拥有多个Mono在你的链中返回一个结果,这允许你设置一个选择的优先顺序(与上面的例子相反,你只会得到任何Mono首先发出一个值。)

缺点之一是性能方面。如果saveUser()上面的代码中先返回一个值,然后还得等待另外两个Mono合并之前要完成的对象Mono将完成。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用项目反应器 mergeWith() 运算符来实现“if/elseif/else”分支逻辑 的相关文章

随机推荐