当您使用阻塞来获取结果时,您只会遇到超时行为Future
。如果你想使用非阻塞回调onComplete
, onSuccess
or onFailure
,那么你就必须自己进行超时处理。 Akka 内置了请求/响应的超时处理(?
)参与者之间的消息传递,但不确定您是否要开始使用 Akka。 FWIW,在Akka中,为了超时处理,他们组成了两个Futures
一起通过Future.firstCompletedOf
,一个代表实际的异步任务,一个代表超时。如果超时计时器(通过HashedWheelTimer
) 首先弹出,异步回调失败。
自己动手的一个非常简单的例子可能是这样的。首先,一个用于调度超时的对象:
import org.jboss.netty.util.{HashedWheelTimer, TimerTask, Timeout}
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import scala.concurrent.Promise
import java.util.concurrent.TimeoutException
object TimeoutScheduler{
val timer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS)
def scheduleTimeout(promise:Promise[_], after:Duration) = {
timer.newTimeout(new TimerTask{
def run(timeout:Timeout){
promise.failure(new TimeoutException("Operation timed out after " + after.toMillis + " millis"))
}
}, after.toNanos, TimeUnit.NANOSECONDS)
}
}
然后是一个接受 Future 并向其添加超时行为的函数:
import scala.concurrent.{Future, ExecutionContext, Promise}
import scala.concurrent.duration.Duration
def withTimeout[T](fut:Future[T])(implicit ec:ExecutionContext, after:Duration) = {
val prom = Promise[T]()
val timeout = TimeoutScheduler.scheduleTimeout(prom, after)
val combinedFut = Future.firstCompletedOf(List(fut, prom.future))
fut onComplete{case result => timeout.cancel()}
combinedFut
}
请注意,HashedWheelTimer
我这里使用的是来自Netty的。