我正在使用第三方 REST 控制器,它接受 JSON 对象数组并返回单个对象响应。当我使用有限的 WebClient 进行 POST 时Flux
代码有效(我认为,因为Flux
完成)。
然而,当Flux
可能是无限的,我该怎么办;
- 以数组块形式发布?
- 根据 POSTed 数组捕获响应?
- 停止传输
Flux
?
这是我的豆子;
public class Car implements Serializable {
Long id;
public Car() {}
public Car(Long id) { this.id = id; }
public Long getId() {return id; }
public void setId(Long id) { this.id = id; }
}
这就是我假设第三方客户端的样子;
@RestController
public class ThirdPartyServer {
@PostMapping("/cars")
public CarResponse doCars(@RequestBody List<Car> cars) {
System.err.println("Got " + cars);
return new CarResponse("OK");
}
}
这是我的代码。当我发帖时flux2
,完成后发送 JSON 数组。但是,当我发布flux1
,第一个之后没有发送任何内容take(5)
。如何 POST 接下来的 5 个块?
@Component
public class MyCarClient {
public void sendCars() {
// Flux<Car> flux1 = Flux.interval(Duration.ofMillis(250)).map(i -> new Car(i));
Flux<Car> flux2 = Flux.range(1, 10).map(i -> new Car((long) i));
WebClient client = WebClient.create("http://localhost:8080");
client
.post()
.uri("/cars")
.contentType(MediaType.APPLICATION_JSON)
.body(flux2, Car.class)
// .body(flux1.take(5).collectList(), new ParameterizedTypeReference<List<Car>>() {})
.exchange()
.subscribe(r -> System.err.println(r.statusCode()));
}
}
- 如何在数组块中进行 POST?
使用其中一种变体Flux.window https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#window-int-将主通量拆分为窗口通量,然后使用窗口通量发送请求.flatMap
Flux<Car> flux1 = Flux.interval(Duration.ofMillis(250)).map(i -> new Car(i));
WebClient client = WebClient.create("http://localhost:8080");
Disposable disposable = flux1
// 1
.window(5)
.flatMap(windowedFlux -> client
.post()
.uri("/cars")
.contentType(MediaType.APPLICATION_JSON)
.body(windowedFlux, Car.class)
.exchange()
// 2
.doOnNext(response -> System.out.println(response.statusCode()))
.flatMap(response -> response.bodyToMono(...)))
.subscribe();
Thread.sleep(10000);
// 3
disposable.dispose();
- 如何捕获每个 POSTed 数组的响应?
您可以在之后通过运算符分析响应.exchange()
.
在我提供的示例中,可以在以下内容中看到响应doOnNext
运算符,但您可以使用任何可操作的运算符onNext
信号,例如map
or handle
.
请务必完整阅读响应正文,以确保连接返回到池中(请参阅note https://docs.spring.io/spring-framework/docs/current/spring-framework-reference/web-reactive.html#webflux-client-exchange)。在这里,我使用了.bodyToMono
,但任何.body
or .toEntity
方法会起作用。
- 停止通量的传输?
当使用subscribe
正如您所做的那样,您可以使用返回的方法停止流程disposable.dispose()
.
或者,您可以从以下位置返回 Flux:sendCars()
方法并将订阅和处置委托给调用者。
请注意,在我提供的示例中,我只是使用了Thread.sleep()
来模拟等待。在实际应用中,您应该使用更高级的东西,并避免Thread.sleep()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)