如果我理解正确的话,你需要这样的东西来包装你现有的callExternalUrl
static Observable<String> callExternalUrlAsync(String url, String json, HttpMethod method)
{
return Observable.fromCallable(() -> callExternalUrl(url, json, method))
.subscribeOn(Schedulers.io())
.flatMap(re -> {
if (re.hasBody())
return Observable.just(re.getBody());
else
return Observable.error(new RuntimeException("Bad response status " + re.getStatusCode()));
},
e -> Observable.error(e),
(Func0<Observable<? extends String>>) (() -> Observable.empty())) // I need explicit cast or it won't compile :-(
.observeOn(Schedulers.computation());
}
代码简短描述:
- 它安排现有的执行
callExternalUrl
on the Schedulers.io
- 是否进行最小变换
ResponseEntity<T>
进入成功T
和错误情况。它发生在io
调度程序也是如此,但这并不重要,因为它真的很短。 (如果里面有异常callExternalUrl
,它按原样传递。)
- 使订阅者能够执行结果
Schedulers.computation
Caveats:
- 您可能想对两者使用自定义调度程序
subscribeOn
and observeOn
- 您可能希望在传递给的第一个 lambda 中有一些更好的逻辑
flatMap
为了区分成功和错误,并且您肯定需要一些更具体的异常类型。
高阶魔法
如果您愿意使用高阶函数并牺牲一点性能来减少代码重复,您可以这样做:
// Universal wrapper method
static <T> Observable<T> wrapCallExternalAsAsync(Func3<String, String, HttpMethod, ResponseEntity<T>> externalCall, String url, String json, HttpMethod method)
{
return Observable.fromCallable(() -> externalCall.call(url, json, method))
.subscribeOn(Schedulers.io())
.flatMap(re -> {
if (re.hasBody())
return Observable.just(re.getBody());
else
return Observable.error(new RuntimeException("Bad response status " + re.getStatusCode()));
},
e -> Observable.error(e),
(Func0<Observable<? extends T>>) (() -> Observable.empty())) // I need explicit cast or it won't compile :-(
.observeOn(Schedulers.computation());
}
static Observable<String> callExternalUrlAsync_HigherOrder(String url, String json, HttpMethod method)
{
return wrapCallExternalAsAsync(MyClass::callExternalUrl, url, json, method);
}
Where MyClass
无论你在哪里callExternalUrl
is.
Update(仅限异步调用)
私有静态 RxClient httpClient = Rx.newClient(RxObservableInvoker.class); // 这里你可以传递自定义的 ExecutorService
private <T> Observable<String> executeHttpAsync(String url, String httpMethod, Entity<T> entity) {
return httpClient.target(url)
.request()
.headers(httpHeaders) // assuming httpHeaders is something global as in your example
.rx()
.method(httpMethod, entity)
.map(resp -> {
if (200 != resp.getStatus()) {
throw new RuntimeException("Bad status code " + resp.getStatus());
} else {
if (!resp.hasEntity()) {
// return null; // or error?
throw new RuntimeException("Empty response"); // or empty?
} else {
try {
return resp.readEntity(String.class);
} catch (Exception ex) {
throw new RuntimeException(ex); // wrap exception into unchecked
}
}
}
})
.observeOn(Schedulers.computation());
}
private Observable<String> executeGetAsync(String url) {
return executeHttpAsync(url, "GET", null);
}
private Observable<String> executePostAsync(String url, String json) {
return executeHttpAsync(url, "POST", Entity.json(json));
}
又相似caveats apply:
- 您可能想对两者使用自定义调度程序
newClient
打电话和observeOn
- 您可能想要一些更好的错误处理逻辑,而不仅仅是检查它是否是 HTTP 200,并且您肯定需要一些更具体的异常类型。但这都是特定于业务逻辑的,因此取决于您。
此外,从您的示例中还不清楚请求的正文(HttpEntity
)是构建以及你是否真的总是想要String
作为响应,就像您原来的示例中一样。不过我只是按原样复制了你的逻辑。如果您需要更多内容,您可能应该参考以下位置的文档:https://jersey.java.net/documentation/2.25/media.html#json