CompletableFuture 是异步的。但它是非阻塞的吗?
CompletableFuture 的一个事实是它是真正的异步,它允许您从调用者线程和 API 异步运行任务,例如thenXXX
允许您在结果可用时对其进行处理。另一方面,CompletableFuture
并不总是非阻塞的。例如,当您运行以下代码时,它将在默认情况下异步执行ForkJoinPool
:
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
}
return 1;
});
很明显,Thread
in ForkJoinPool
执行任务的函数最终会被阻塞,这意味着我们不能保证调用是非阻塞的。
另一方面,CompletableFuture
公开 API,使您能够真正做到非阻塞。
例如,您始终可以执行以下操作:
public CompletableFuture myNonBlockingHttpCall(Object someData) {
var uncompletedFuture = new CompletableFuture(); // creates uncompleted future
myAsyncHttpClient.execute(someData, (result, exception -> {
if(exception != null) {
uncompletedFuture.completeExceptionally(exception);
return;
}
uncompletedFuture.complete(result);
})
return uncompletedFuture;
}
如您所见,APICompletableFuture
未来为您提供complete
and completeExceptionally
方法可以在需要时完成执行,而不会阻塞任何线程。
Mono 与 CompletableFuture
在上一节中,我们概述了 CF 的行为,但是 CompletableFuture 和 Mono 之间的主要区别是什么?
值得一提的是,我们也可以阻塞 Mono。没有人阻止我们写以下内容:
Mono.fromCallable(() -> {
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
}
return 1;
})
当然,一旦我们订阅了 future,调用者线程就会被阻塞。但我们总是可以通过提供额外的解决方案来解决这个问题subscribeOn
操作员。尽管如此,更广泛的 APIMono
不是关键功能。
为了了解之间的主要区别CompletableFuture
and Mono
,让我们回到前面提到的myNonBlockingHttpCall
方法实施。
public CompletableFuture myUpperLevelBusinessLogic() {
var future = myNonBlockingHttpCall();
// ... some code
if (something) {
// oh we don't really need anything, let's just throw an exception
var errorFuture = new CompletableFuture();
errorFuture.completeExceptionally(new RuntimeException());
return errorFuture;
}
return future;
}
如果是CompletableFuture
,一旦调用该方法,它将急切地执行对另一个服务/资源的 HTTP 调用。即使在验证某些前置/后置条件后我们并不真正需要执行结果,它也会开始执行,并且将为这项工作分配额外的 CPU/DB-Connections/What-Ever-Machine-Resources。
相比之下,Mono
根据定义,类型是惰性的:
public Mono myNonBlockingHttpCallWithMono(Object someData) {
return Mono.create(sink -> {
myAsyncHttpClient.execute(someData, (result, exception -> {
if(exception != null) {
sink.error(exception);
return;
}
sink.success(result);
})
});
}
public Mono myUpperLevelBusinessLogic() {
var mono = myNonBlockingHttpCallWithMono();
// ... some code
if (something) {
// oh we don't really need anything, let's just throw an exception
return Mono.error(new RuntimeException());
}
return mono;
}
在这种情况下,直到最后一刻都不会发生任何事情mono
已订阅。因此,只有当Mono
由返回myNonBlockingHttpCallWithMono
方法,将被订阅,逻辑提供给Mono.create(Consumer)
将被执行。
我们还可以走得更远。我们可以让我们的执行更加懒惰。你可能知道,Mono
延伸Publisher
来自反应流规范。 Reactive Streams 的一大特色是背压支持。因此,使用Mono
仅当确实需要数据并且我们的订阅者准备好使用它们时,我们才能执行 API:
Mono.create(sink -> {
AtomicBoolean once = new AtomicBoolean();
sink.onRequest(__ -> {
if(!once.get() && once.compareAndSet(false, true) {
myAsyncHttpClient.execute(someData, (result, exception -> {
if(exception != null) {
sink.error(exception);
return;
}
sink.success(result);
});
}
});
});
在此示例中,我们仅在订阅者调用时才执行数据Subscription#request
因此,它通过这样做来声明已准备好接收数据。
Summary
-
CompletableFuture
是异步的并且可以是非阻塞的
-
CompletableFuture
很渴望。你不能推迟执行。但你可以取消它们(这比什么都没有好)
-
Mono
是异步/非阻塞的,可以轻松地执行不同的调用Thread
通过编写主要Mono
与不同的运营商。
-
Mono
确实是懒惰的,并且允许通过订阅者的存在及其消费数据的准备情况来推迟执行启动。