假设我有一个充满任务的队列,我需要将其提交给执行器服务。我希望一次处理一个。我能想到的最简单的方法是:
- 从队列中取出一个任务
- 提交给执行人
- 对返回的 Future 调用 .get 并阻塞,直到有结果可用
- 从队列中取出另一个任务...
但是,我试图完全避免阻塞。如果我有 10,000 个这样的队列,它们需要一次处理一个任务,那么我将耗尽堆栈空间,因为它们中的大多数将保留阻塞的线程。
我想要的是提交任务并提供任务完成时调用的回调。我将使用该回调通知作为发送下一个任务的标志。 (functionjava和jetlang显然使用了这种非阻塞算法,但我无法理解他们的代码)
如果不编写自己的执行器服务,如何使用 JDK 的 java.util.concurrent 来做到这一点?
(为我提供这些任务的队列本身可能会阻塞,但这是一个稍后要解决的问题)
定义一个回调接口来接收您想要在完成通知中传递的任何参数。然后在任务结束时调用它。
您甚至可以为可运行任务编写通用包装器,并将它们提交给ExecutorService
。或者,请参阅下文了解 Java 8 中内置的机制。
class CallbackTask implements Runnable {
private final Runnable task;
private final Callback callback;
CallbackTask(Runnable task, Callback callback) {
this.task = task;
this.callback = callback;
}
public void run() {
task.run();
callback.complete();
}
}
With CompletableFuture http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html,Java 8 包含了一种更复杂的方法来组成管道,其中流程可以异步且有条件地完成。这是一个人为但完整的通知示例。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
public class GetTaskNotificationWithoutBlocking {
public static void main(String... argv) throws Exception {
ExampleService svc = new ExampleService();
GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
f.thenAccept(listener::notify);
System.out.println("Exiting main()");
}
void notify(String msg) {
System.out.println("Received message: " + msg);
}
}
class ExampleService {
String work() {
sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
char[] str = new char[5];
ThreadLocalRandom current = ThreadLocalRandom.current();
for (int idx = 0; idx < str.length; ++idx)
str[idx] = (char) ('A' + current.nextInt(26));
String msg = new String(str);
System.out.println("Generated message: " + msg);
return msg;
}
public static void sleep(long average, TimeUnit unit) {
String name = Thread.currentThread().getName();
long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
try {
unit.sleep(timeout);
System.out.println(name + " awoke.");
} catch (InterruptedException abort) {
Thread.currentThread().interrupt();
System.out.println(name + " interrupted.");
}
}
public static long exponential(long avg) {
return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)