前言
在异步编程中,各种回调将让人眼花缭乱,代码分散,维护起来十分困难。boost和C++11 的 future/promise 提供了一个很好的解决方案,使得代码更加漂亮、易维护。
在工作中,我也用过几次future/promise,但是还是十分生疏,所以决定学习下它的原理,用起来才更加顺畅。
查了很多资料,发现很多语言都有这个机制,但是关于C++的promise的资料却很少,只有一些使用的教程,而没有找到原理方面的。
“源码之前,了无秘密。”
所以还是决定从源码入手学习!
本文针对的源码是借鉴boost实现的版本,由于copyright的原因,就不贴出完整的源码了,需要学习的朋友可以参见boost的实现。
关于future/promise的简介,参见我之前写的一篇博文:
http://blog.csdn.net/jiange_zh/article/details/51602938
一、基于Future/Promise的异步同步化编程依赖的组件
- bind和callback,类似与boost库的bind和function;
- shared_ptr、scoped_ptr、tuple、exception,参考boost库中的shared_ptr、scoped_ptr、tuple、exception实现;
- Future和Promise,借鉴boost库的future设计思想;
- when_all,通过Future、Promise、tuple来实现,针对异步并行的同步化。
二、Function和Bind的用法
参见我之前写的一篇博文:
http://blog.csdn.net/jiange_zh/article/details/51598580
下面提到的bind类似于boost的bind,而Callback类似于Function,一般跟bind搭配使用。
三、shared_ptr、scoped_ptr、tuple
shared_ptr:引用计数。
scoped_ptr:不可转移所有权。
Tuple:很多的时候我们需要为函数返回多个值,我们可以使用class或struct来封装要返回的多个值,然后返回封装struct或class,但是使用这种方法的弊端就是增加的程序的代码量,最好是能通过一种匿名的struct或class来解决这个问题。Boost::tuple就为我们提供了一种类似于匿名struct的方法为我们解决函数的多个返回值的问题。既增强了代码的可读性又不增加代码量。其实std::pair就是boost::tuple的2个参数的特例,对boost::tuple你可以绑定更多的参数,或者你可以迭代实现无限多参数的情况。
四、EnableSharedFromThis
使用boost库时,经常会看到如下的类:
class A:public enable_share_from_this<A>
在什么情况下要使类A继承enable_share_from_this?
使用场合 :当类A被share_ptr管理,且在类A的成员函数里需要把当前类对象作为参数传给其他函数时,就需要传递一个指向自身的share_ptr。
我们就使类A继承enable_share_from_this,然后通过其成员函数 share_from_this()返回当指向自身的share_ptr。
以上有2个疑惑:
1.把当前类对象作为参数传给其他函数时,为什么要传递share_ptr呢?直接传递this指针不可以吗?
一个裸指针传递给调用者,谁也不知道调用者会干什么?假如调用者delete了该对象,而share_tr此时还指向该对象。
2.这样传递share_ptr可以吗?share_ptr< this >
这样会造成2个非共享的share_ptr指向一个对象,最后造成2次析构该对象。
boost官方文档中一个非常典型的例子:
http://www.boost.org/doc/libs/1_55_0/doc/html/boost_asio/tutorial/tutdaytime3/src.html
部分代码:
1 class tcp_connection
2 : public boost::enable_shared_from_this<tcp_connection>
3 {
4 public:
5 typedef boost::shared_ptr<tcp_connection> pointer;
6
7 static pointer create(boost::asio::io_service& io_service)
8 {
9 return pointer(new tcp_connection(io_service));
10 }
11
12 tcp::socket& socket()
13 {
14 return socket_;
15 }
16
17 void start()
18 {
19 message_ = make_daytime_string();
20
21 boost::asio::async_write(socket_, boost::asio::buffer(message_),
22 boost::bind(&tcp_connection::handle_write, shared_from_this(),
23 boost::asio::placeholders::error,
24 boost::asio::placeholders::bytes_transferred));
25 }
26
27 private:
28 tcp_connection(boost::asio::io_service& io_service)
29 : socket_(io_service)
30 {
31 }
32
33 void handle_write(const boost::system::error_code& ,
34 size_t )
35 {
36 }
37
38 tcp::socket socket_;
39 std::string message_;
40 };
类tcp_connection继承enable_share_from_this,在22行里,它的成员函数start(),通过share_from_this返回指向自身的share_ptr。
五、Future和Promise
5.1 简介
Promise对象可保存T类型的值,该值可被future对象读取(可能在另一个线程中),这是promise提供的同步的一种手段。
在构造promise时,promise对象可以与共享状态关联起来,这个共享状态可以存储一个T类型或者一个由std::exception派生出的类的值,并可以通过get_future来获取与promise对象关联的对象,调用该函数之后,两个对象共享相同的共享状态(shared state)。
Promise对象是异步provider,它可以在某一时刻设置共享状态的值。
Future对象可以返回共享状态的值,或者在必要的情况下阻塞调用者并等待共享状态标识变为ready,然后才能获取共享状态的值。
5.2 关系图总览
5.3从使用入手学习
下面是使用future/promise的一个例子(目前仅讨论串行调用的情况):
taf::Int32
AServantImp::queryResultSerial(const std::string& sIn, std::string &sOut, taf::JceCurrentPtr current)
{
current->setResponse(false);
promise::Future<std::string> f = sendBReq(_pPrxB, sIn, current);
f.then(promise::bind(&handleBRspAndSendCReq,_pPrxC,current))
.then(promise::bind(&handleCRspAndReturnClient, current));
return 0;
}
promise::Future<std::string>
sendBReq(BServantPrx prx, const std::string& sIn, taf::JceCurrentPtr current)
{
promise::Promise<std::string> promise;
Test::BServantPrxCallbackPtr cb = new BServantCallback(current, promise);
prx->async_queryResult(cb, sIn);
return promise.getFuture();
}
promise::Future<std::string>
sendCReq(CServantPrx prx, const std::string& sIn, taf::JceCurrentPtr current)
{
}
promise::Future<std::string>
handleBRspAndSendCReq(CServantPrx prx, JceCurrentPtr current, const promise::Future<std::string>& future)
{
std::string sResult("");
std::string sException("");
try
{
sResult = future.get();
return sendCReq(prx, sResult, current);
}
catch (exception& e)
{
TLOGDEBUG("Exception:" << e.what() << endl);
sException = e.what();
}
promise::Promise<std::string> promise;
promise.setValue(sException);
return promise.getFuture();
}
int
handleCRspAndReturnClient(JceCurrentPtr current, const promise::Future<std::string>& future)
{
int ret = 0;
std::string sResult("");
try
{
sResult = future.get();
}
catch (exception& e)
{
ret = -1;
sResult = e.what();
TLOGDEBUG("Exception:" << e.what() << endl);
}
AServant::async_response_queryResultSerial(current, ret, sResult);
return 0;
}
我们一步步看看发生了什么吧~
5.4定义并初始化一个Future类型的变量
promise::Future<std::string> f = sendBReq(_pPrxB, sIn, current);
sendBReq通过promise.getFuture()返回了一个Future,用它来初始化f。
两个问题:
1.promise.getFuture()是怎么来的?
2.f是如何初始化的?
1.promise.getFuture()是怎么来的?
promise::Promise<std::string> promise;
Test::BServantPrxCallbackPtr cb = new BServantCallback(current, promise);
prx->async_queryResult(cb, sIn);
return promise.getFuture();
promise内部有一个数据成员:
SharedPtr<detail::FutureObjectInterface<T> > m_future;
该成员的默认构造:
Promise()
: m_future(SharedPtr<detail::FutureObject<T> >(new detail::FutureObject<T>()))
{}
它使用了FutureObject来作为FutureObjectInterface的具体实现。
Promise的getFuture()方法用m_future构造了一个临时对象Future< T >(该构造函数为private,因此需要Future将Promise设为友元)并返回,因此promise.getFuture()临时对象中的m_future和promise中的m_future指向同一个FutureObject对象。
Future<T> getFuture() const
{
return Future<T> (m_future);
}
2.f是如何初始化的?
Future< T >继承自FutureBase< T >,继承的数据成员:
typedef SharedPtr<detail::FutureObjectInterface<T> > FuturePtr;
FuturePtr m_future;
我们的目的就是用promise.getFuture()(一个匿名的临时对象)的m_future来初始化f的m_future,使promise、promise.getFuture()和f的m_future均指向同一个对象,之后promise.getFuture()临时对象析构,只剩下promise和f的m_future指向同一个对象,有了这个共享,我们就可以在promise中进行赋值(在BServantCallback中调用setValue进行赋值),而在f中进行读取(通过f.get())!
Future< T >的3个public构造函数:
Future() {}
explicit Future(typename detail::FutureTraits<T>::rvalue_source_type t)
: detail::FutureBase<T>( SharedPtr<detail::PromptFutureObject<T> >
(new detail::PromptFutureObject<T>(t)) )
{}
Future(const ExceptionPtr& e)
: detail::FutureBase<T>(e)
{}
对于第二个,由于T为string,detail::FutureTraits< T >::rvalue_source_type实际上就是 const std::string&。
从上面看,并没有匹配的构造函数可用,且其父类也没有拷贝构造函数,因此编译器会进行成员逐个拷贝,最终将sendBReq中的m_future成员拷贝过来,该成员由shared_ptr进行管理,因此promise和f的m_future指向同一个对象的目的达到。
5.5绑定回调函数进行处理,处理完毕之后链式调用
当promise承诺的值设置好之后,需要回调函数进行处理。因此我们需要通过then来绑定回调函数。另外,为支持链式调用,then应该返回一个future,这个future一般是回调函数的返回值,在回调函数中通过promise.getFuture()来获取。
Future< T >从FutureBase< T >继承下来的成员函数:
get();
isDone();
hasValue();
hasException();
operator unspecified_bool_type() const;
以上函数都是转调用m_future的相关函数,因此整个future的细节封装在了FutureObject和PromptFutureObject当中,这里先不深究。
Future< T >另外自己实现了一个函数then:
/**
* Register a callback which will be called once the future is satisfied. If an
* exception is thrown the callback will not be registered and then will not be called.
*
* \throws std::bad_alloc if memory is unavailable.
*/
template <typename R>
Future<typename detail::resolved_type<R>::type>
then(const Callback<R(const Future&)>& callback) const
{
typedef typename detail::resolved_type<R>::type value_type;
if (!this->m_future)
{
throwException(FutureUninitializedException(__FILE__, __LINE__));
}
Promise<value_type> promise;
this->m_future->registerCallback(
bind(&detail::SequentialCallback<R, T>::template run<R>,
owned(new detail::SequentialCallback<R, T>(callback, promise))));
return promise.getFuture();
}
该函数接受一个Callback对象作为参数,Callback封装了一个函数,其返回值为R,参数为Future。
比如以下调用:
f.then(promise::bind(&handleBRspAndSendCReq,_pPrxC,current));
其中handleBRspAndSendCReq的签名如下:
promise::Future<std::string> handleBRspAndSendCReq(CServantPrx prx, JceCurrentPtr current, const promise::Future<std::string>& future);
bind绑定了函数handleBRspAndSendCReq的前两个参数prx和current,剩下第三个参数future。
5.5.1 then的返回值类型
看完参数,我们来看看then的返回值类型:
Future<typename detail::resolved_type<R>::type>
在上面例子中,handleBRspAndSendCReq的返回类型为promise::Future< std::string >,它被用来具现化then的模板参数R。
为什么这里的返回值类型不直接使用R,而要通过resolved_type来决议呢?
我们先看一下resolved_type的定义:
template <typename T>
struct resolved_type
{
typedef T type;
};
template <typename T>
struct resolved_type<Future<T> >
{
typedef T type;
};
resolved_type< T >的type为T;
resolved_type< Future< T > >的type也为T。
无论是普通的T,还是Future< T >,通过resolved_type决议出的type成员都为T。
我们看以下另一个then的调用:
f.then(promise::bind(&handleCRspAndReturnClient, current));
其中handleCRspAndReturnClient的签名如下:
int handleCRspAndReturnClient(JceCurrentPtr current, const promise::Future<std::string>& future)
此时将用int来具现化模板参数R。
为了可进行链式调用,我们应该保证then返回的是一个Future,因此这时不能直接用R,而需要用Future< R >。如果R本身就是Future< T >,我们则可以通过resolved_type将T萃取出来。
5.5.2 then的函数体
首先明确下then的使命:
1.注册一个回调函数,来处理就绪的future;
2.将回调函数的返回值带回来,返回一个future给用户做链式调用。
函数体内的操作:
1.首先保证m_future已经初始化。
2.定义了一个Promise变量:
Promise<value_type> promise;
3.调用this->m_future->registerCallback()来注册我们传进来的callback函数,这里使用了bind、SequentialCallback进行了包装:
this->m_future->registerCallback(
bind(&detail::SequentialCallback<R, T>::template run<R>,
owned(new detail::SequentialCallback<R, T>(callback, promise))));
4.返回promise的future:
return promise.getFuture();
具体的细节这里暂不探讨,目前只需要明白,这里的promise承诺返回一个Future< value_type >而value_type是跟函数callback(比如handleBRspAndSendCReq)的返回值息息相关的(比如handleBRspAndSendCReq返回一个Future< std::string >,其value_type为string,handleCRspAndReturnClient返回一个int,其value_type为int)。
Promise和SequentialCallback的作用就是把callback的返回值给带回来,最终返回给用户来做链式调用。
2016/9/15更新:
带回callback的返回值
现在简单说下在then中,如何将callback的返回值带回来并生成一个future返回:
Promise<value_type> promise;
this->m_future->registerCallback(bind(
&detail::SequentialCallback<R, T>::template run<R>,
owned(new detail::SequentialCallback<R, T>(callback, promise))));
return promise.getFuture();
这里多引入了一层promise,该promise**承诺带回函数callback()的返回值**(如果该返回值是一个future,则还是带回一个future,如果不是future,比如int,则带回一个future< int >)。
可以看到,bind绑定的是SequentialCallback的成员函数run(template表示这是一个模板)。
通过第二个参数的SequentialCallback对象进行调用,该对象封装了callback和一个promise。
当上一层的future就绪时,会调用回调函数,此时调用的是SequentialCallback 的run函数,而不是真正的callback。在run函数中,再调用真正的callback进行处理,并将callback的返回值设置到SequentialCallback对象的promise当中,而then返回的正是这个promise关联的future。
因此,通过一层间接的future/promise,then成功地返回了callback的返回值的一个future。
Run有多个重载版本:
template <typename U>
typename enable_if<is_void<U> >::type run(const FuturePtr& future)
{
try
{
m_callback(future);
m_promise.set();
}
catch (...)
{
m_promise.setException(currentException());
}
}
当callback返回值为void时,无需返回值,所以promise调用空的set。
template <typename U>
typename enable_if_c<!is_void<U>::value && !is_future_type<U>::value>::type
run(const FuturePtr& future)
{
try
{
m_promise.setValue(m_callback(future));
}
catch (...)
{
m_promise.setException(currentException());
}
}
当callback返回值为非void且非future时,调用m_callback并将返回值设置到promise中,于是该值可以通过promise.getFuture().get()来获取。
template <typename U>
typename enable_if<is_future_type<U> >::type run(const FuturePtr& future)
{
try
{
m_callback(future).then(
bind(&ForwardValue<value_type>::template run<value_type>,
owned(new ForwardValue<value_type>(m_promise))));
}
catch (...)
{
m_promise.setException(currentException());
}
}
当callback返回值为future时,则对该future使用then绑定到ForwardValue的run函数当中(类似的手法),run函数中再通过get()方法把future内的值取出来,并设置到then中定义的promise当中,于是then的返回值便以一个future的形式存放着callback的返回值。
至于如何对类型进行判断,当然是利器traits技法,这里不再展开。
5.5.3 then函数总结
我们重新看一下下面这个例子,理清这两句代码包含的内容:
promise::Future<std::string> f = sendBReq(_pPrxB, sIn, current);
f.then(promise::bind(&handleBRspAndSendCReq,_pPrxC,current))
.then(promise::bind(&handleCRspAndReturnClient, current));
首先,我们在sendBReq中定义了一个Promise,并把它绑定到BServantCallback中。
当异步调用回包时,将回调BServantCallback,在里面调用Promise的setValue进行赋值。
赋值完毕之后,将调用通过then绑定的回调handleBRspAndSendCReq来处理。
由于我们通过promise.getFuture()使得f和promise的m_future**指向了同一个对象**,所以我们在回调handleBRspAndSendCReq中可以通过f.get()来读取该值。
f.get()只是完成了handleBRsp部分,在SendCReq的时候,类似于sendBReq,我又定义了一个Promise,我们需要把与之关联的future(通过promise.getFuture()获取)作为handleBRspAndSendCReq的返回值,并通过then中的Promise和SequentialCallback将这个future返回给用户,从而用户可以继续调用then来指定handle。
总结:then帮我们把handle回调函数注册到future当中,当future可读时,将调用该handle进行处理,then还为我们把handle的返回值带回来,以供链式调用。
5.6 future和promise的关系梳理
两者都有一个m_future成员,其类型为
SharedPtr<detail::FutureObjectInterface<T> >
由于为Future< T >需要针对void进行特化,为避免过多重复的代码,把与特化无关的部分抽离出来形成FutureBase作为基类。
从上图可以看出,Future和Promise均持有m_future,两者正是通过该对象进行共享、关联的(通过promise.getFuture()实现)。其中Promise对外提供了对m_future的set(写)接口,而Future对外提供了m_future的get(读)接口。
5.7 FutureObjectInterface的实现
下图展示了FutureObjectInterface的具体实现:
可以看到,FutureObjectInterface有FutureObject和PromptFutureObject两种实现。
Promise的FutureObjectInterface 是一个FutureObject。
Future的FutureObjectInterface有两种情况:直接用一个值来构造Future时(比如调用makeFuture来获取一个future)用的是PromptFutureObject,而其他情况(比如通过Promise获得的future)用的是FutureObject。
那么,两者有何区别呢?
对于第一个应用场景,future不是通过promise来获取的,而是直接用一个立即数构造:
explicit Future(typename detail::FutureTraits<T>::rvalue_source_type t)
: detail::FutureBase<T>(SharedPtr<detail::PromptFutureObject<T> >
(new detail::PromptFutureObject<T>(t)))
{}
比如下面这个应用场景:
Future< int > hsF = makeFuture<int>(-1); //使用立即数构造
if (openSwitch)
{
// sendBReq中进行异步调用,并通过promise.getFuture()返回一个future
hsF = sendBReq();
}
// …
在handle中,我们可以通过以下判断来决定是否处理:
int result = hsF.get();
if (result != -1)
{
}
立即数构造这种用法,由于其值已经设定了,不需要等待promise填值进去,因此该future内部的PromptFutureObject是只读的,也就不需要加锁。如果还是使用FutureObject这个版本,将会在加锁解锁上做无用功。因此PromptFutureObject是针对这种场景进行优化的。
而当Future 与Promise共享同一个m_future时,由于Future和Promise可能在不同线程中,因此可能同时读写,这里存在race condition,因此需要加锁。FutureObject正是一个加锁的版本。
关于FutureObject,有几个需要注意的点:
1.在一开始介绍的时候我们说过,Future可以获取共享状态的值(通过get()方法),在必要的情况下阻塞调用者并等待共享状态标识变为ready,然后才能获取共享状态的值。
2.setValue只能被调用一次,即共享状态的值只能设置一次,如果试图设置第二次,将抛出异常。
3.在registerCallback时,根据m_is_done来判断是否已经setValue,如果m_is_done为true,则直接调用callback(this->sharedFromThis())来处理,否则将callback加入m_pending_callbacks列表中,等待setValue之后调用。在setValue中,除了设置值之外,还会调用doPendingCallbacks()函数,在该函数中逐个调用m_pending_callbacks列表中的callback。
最后,关于when_all的实现(并行异步调用),后续有时间再补充~
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)