将 Streaming Flux 的 WebClient Post 拆分为 JSON 数组

2024-03-25

我正在使用第三方 REST 控制器,它接受 JSON 对象数组并返回单个对象响应。当我使用有限的 WebClient 进行 POST 时Flux代码有效(我认为,因为Flux完成)。

然而,当Flux可能是无限的,我该怎么办;

  1. 以数组块形式发布?
  2. 根据 POSTed 数组捕获响应?
  3. 停止传输Flux?

这是我的豆子;

public class Car implements Serializable {

    Long id;

    public Car() {}
    public Car(Long id) { this.id = id; }
    public Long getId() {return id; }
    public void setId(Long id) { this.id = id; }
}

这就是我假设第三方客户端的样子;

@RestController
public class ThirdPartyServer {

    @PostMapping("/cars")
    public CarResponse doCars(@RequestBody List<Car> cars) {
        System.err.println("Got " + cars);
        return new CarResponse("OK");
    }
}

这是我的代码。当我发帖时flux2,完成后发送 JSON 数组。但是,当我发布flux1,第一个之后没有发送任何内容take(5)。如何 POST 接下来的 5 个块?

@Component
public class MyCarClient {

    public void sendCars() {

//      Flux<Car> flux1 = Flux.interval(Duration.ofMillis(250)).map(i -> new Car(i));
        Flux<Car> flux2 = Flux.range(1, 10).map(i -> new Car((long) i));

        WebClient client = WebClient.create("http://localhost:8080");
        client
            .post()
            .uri("/cars")
            .contentType(MediaType.APPLICATION_JSON)
            .body(flux2, Car.class) 
//          .body(flux1.take(5).collectList(), new ParameterizedTypeReference<List<Car>>() {})
            .exchange()
            .subscribe(r -> System.err.println(r.statusCode()));
    }
}

  1. 如何在数组块中进行 POST?

使用其中一种变体Flux.window https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#window-int-将主通量拆分为窗口通量,然后使用窗口通量发送请求.flatMap

        Flux<Car> flux1 = Flux.interval(Duration.ofMillis(250)).map(i -> new Car(i));

        WebClient client = WebClient.create("http://localhost:8080");
        Disposable disposable = flux1
                // 1
                .window(5)
                .flatMap(windowedFlux -> client
                        .post()
                        .uri("/cars")
                        .contentType(MediaType.APPLICATION_JSON)
                        .body(windowedFlux, Car.class)
                        .exchange()
                        // 2
                        .doOnNext(response -> System.out.println(response.statusCode()))
                        .flatMap(response -> response.bodyToMono(...)))
                .subscribe();

        Thread.sleep(10000);

        // 3
        disposable.dispose();

  1. 如何捕获每个 POSTed 数组的响应?

您可以在之后通过运算符分析响应.exchange().

在我提供的示例中,可以在以下内容中看到响应doOnNext运算符,但您可以使用任何可操作的运算符onNext信号,例如map or handle.

请务必完整阅读响应正文,以确保连接返回到池中(请参阅note https://docs.spring.io/spring-framework/docs/current/spring-framework-reference/web-reactive.html#webflux-client-exchange)。在这里,我使用了.bodyToMono,但任何.body or .toEntity方法会起作用。

  1. 停止通量的传输?

当使用subscribe正如您所做的那样,您可以使用返回的方法停止流程disposable.dispose().

或者,您可以从以下位置返回 Flux:sendCars()方法并将订阅和处置委托给调用者。

请注意,在我提供的示例中,我只是使用了Thread.sleep()来模拟等待。在实际应用中,您应该使用更高级的东西,并避免Thread.sleep()

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

将 Streaming Flux 的 WebClient Post 拆分为 JSON 数组 的相关文章

随机推荐