RPC框架的异步处理

2023-11-13

RPC异步调用(以tars rpc框架为示例说明)

引入工作线程池和io收发线程池将工作线程和io收发线程两者的同步关系解除。RPC中的上下文十分重要,因为请求包的发送,响应包的callback回调不在同一个工作线程中完成,需要一个context来记录一个请求的上下文,把请求-响应-回调等一些信息匹配起来。通过rpc框架的内部请求id作为key,来保存调用开始时间time,超时时间timeout,回调函数callback,超时回调timeout_callback等信息。注意:请求id由client端服务调用时生成,会序列化成字节流发送给server端,server端会返回该请求id。
在这里插入图片描述

servantProxy和Objproxyd的关系

ObjectProxy:一个网络线程上的某个服务实体A;ServantProxy:RPC服务句柄,所有网络线程上的某ObjectProxy(服务实体)的总代理; ObjectProxy类是一个服务实体,注意与ServantProxy类是一个服务代理相区别,前者表示一个网络线程上的某个服务实体A,后者表示对所有网络线程上的某服务实体A的总代理。

ServantProxy内含多个服务实体ObjectProxy,能够帮助用户在同一个服务代理内进行负载均衡。ObjectProxy对象的个数,其个数由客户端的网络线程数决定,每个网络线程有一个ObjectProxy。

举例有一个Demo.StringServer.StringServantObj的服务,提供一个RPC接口是append,传入两个string类型的变量,返回两个string类型变量的拼接结果。而且假设有两台服务器,socket标识分别是192.112.112.112:112与192.112.112.113:113,设置客户端的网络线程数为3,那么执行如下代码:

Communicator _comm;
StringServantPrx _proxy;
_comm.stringToProxy("Demo.StringServer.StringServantObj@tcp -h 192.112.112.113 -p 113:tcp -h 192.112.112.112 -p 112", _proxy);

在这里插入图片描述

//communicatorepoll客户端
class CommunicatorEpoll : public TC_Thread ,public TC_ThreadRecMutex{
	 /* * ObjectProxy的工厂类 */
    ObjectProxyFactory *   _objectProxyFactory; //用于创建objproxy
    /* * 异步线程数组*/
    AsyncProcThread *      _asyncThread[MAX_CLIENT_ASYNCTHREAD_NUM];

    /* * 异步线程数目  */
    size_t                 _asyncThreadNum;
	/* * 分发给异步线程的索引seq*/
    size_t                 _asyncSeq;
    /** 网络线程的id号*/
    size_t                 _netThreadSeq;
};
ObjectProxy * CommunicatorEpoll::getObjectProxy(const string & sObjectProxyName,const string& setName)
{
    return _objectProxyFactory->getObjectProxy(sObjectProxyName,setName);
}
---
class ServantProxy : public TC_HandleBase, public TC_ThreadMutex
{
	/** * 通信器 */
    Communicator *            _communicator;
    /** * 保存ObjectProxy对象的指针数组 */
    ObjectProxy **            _objectProxy;
}


//servant初始化
ServantPrx::element_type* ServantProxyFactory::getServantProxy(const string& name,const string& setName)
{
    TC_LockT<TC_ThreadRecMutex> lock(*this);
    string tmpObjName = name + ":" + setName;
    map<string, ServantPrx>::iterator it = _servantProxy.find(tmpObjName);
    if(it != _servantProxy.end())
    {
        return it->second.get();
    }
    ObjectProxy ** ppObjectProxy = new ObjectProxy * [_comm->getClientThreadNum()];
    assert(ppObjectProxy != NULL);
    for(size_t i = 0; i < _comm->getClientThreadNum(); ++i)
    {
        ppObjectProxy[i] = _comm->getCommunicatorEpoll(i)->getObjectProxy(name, setName);
    }
    ServantPrx sp = new ServantProxy(_comm, ppObjectProxy, _comm->getClientThreadNum());
	...
    _servantProxy[tmpObjName] = sp;
    return sp.get();
}

在这里插入图片描述

1、在每一个网络线程CommunicatorEpoll的初始化过程中,会创建_asyncThreadNum条异步线程,等待异步调用的时候处理响应数据。

CommunicatorEpoll::CommunicatorEpoll(Communicator * pCommunicator,size_t netThreadSeq)
{
 ......
   //异步线程数
    _asyncThreadNum = TC_Common::strto<size_t>(pCommunicator->getProperty("asyncthread", "3"));

    if(_asyncThreadNum == 0)
    {
        _asyncThreadNum = 3;
    }

    if(_asyncThreadNum > MAX_CLIENT_ASYNCTHREAD_NUM)
    {
        _asyncThreadNum = MAX_CLIENT_ASYNCTHREAD_NUM;
    }
 ......
    //异步队列的大小
    size_t iAsyncQueueCap = TC_Common::strto<size_t>(pCommunicator->getProperty("asyncqueuecap", "10000"));
    if(iAsyncQueueCap < 10000)
    {
        iAsyncQueueCap = 10000;
    }
 ......
    //创建异步线程
    for(size_t i = 0; i < _asyncThreadNum; ++i)
    {
        _asyncThread[i] = new AsyncProcThread(iAsyncQueueCap);
        _asyncThread[i]->start();
    }
 ......
}

2、tars2cpp的文件中定义了回调函数基类,要继承回调函数基类实现自己的回调函数。

class ConfigAdminPrxCallback: public tars::ServantProxyCallback
{
    public:
        virtual ~ConfigAdminPrxCallback(){}
        virtual void callback_AddConfig(tars::Int32 ret,  const std::string& result)
        { throw std::runtime_error("callback_AddConfig() override incorrect."); }
        virtual void callback_AddConfig_exception(tars::Int32 ret)
        { throw std::runtime_error("callback_AddConfig_exception() override incorrect."); }
 }

3、客户端桩函数代理调用异步请求函数,并传入实现的回调函数的类指针。

class ConfigAdminProxy : public tars::ServantProxy
    {
    public:
        //同步调用
        tars::Int32 AddConfig(const tars::AddConfigInfo & config,std::string &result,const map<string, string> &context = TARS_CONTEXT(),map<string, string> * pResponseContext = NULL)
        {}
		//异步调用
        void async_AddConfig(ConfigAdminPrxCallbackPtr callback,const tars::AddConfigInfo &config,const map<string, string>& context = TARS_CONTEXT())
        {}
};

4、客户端调用。 ServantProxy::invoke中进行负载均衡调用,并且实例化此次请求msg包,实例化该请求的objproxy。CommunicatorEpoll会中的CommunicatorEpoll::handle会调用msg->obj->invoke进行请求的发送。每个客户端线程中含有跟客户端网络线程通信的队列 ReqInfoQueue * _reqQueue[MAX_CLIENT_THREAD_NUM]; //队列数组 。请求包在invoke中push到相应的队列中,comunicatorEpoll解包出msg请求体进行数据传输。

_reqNo:每个主调线程会被分配一个唯一的空闲序列号,这个序列号对应到网络线程的epoll事件数据通知fd。SeqManager类负责分配该序列号。

void ServantProxy::invoke(ReqMessage * msg, bool bCoroAsync)
{
   ...
    ObjectProxy * pObjProxy = NULL;
    ReqInfoQueue * pReqQ    = NULL;
    //选择网络线程
    selectNetThreadInfo(pSptd,pObjProxy,pReqQ);
    //调用发起时间
    msg->iBeginTime   = TNOWMS;
    msg->pObjectProxy = pObjProxy;//实例化msg请求包中的obj实例
  ...
    //通知网络线程
    bool bEmpty = false;
    bool bSync  = (msg->eType == ReqMessage::SYNC_CALL);
    if(!pReqQ->push_back(msg,bEmpty))
    {
    ...
        pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);

        throw TarsClientQueueException("client queue full");
    }
    pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);
 ...
}

5、客户端负载均衡: 每个客户端调用线程跟客户端的网络线程池通信的队列 ,在业务客户端根据负载均衡选择网络线程池中的网络线程进行传输数据。每条caller线程与每条客户端网络线程CommunicatorEpoll进行信息交互的桥梁——通信队列ReqInfoQueue数组,数组中的每个ReqInfoQueue元素负责与一条网络线程进行交互。数组里面的ReqInfoQueue元素便是该数组对应的caller线程与两条网络线程的通信桥梁,一条网络线程对应着数组里面的一个元素,通过网络线程ID进行数组索引。整个关系有点像生产者消费者模型,生产者Caller线程向自己的线程私有数据ReqInfoQueue[]中的第N个元素ReqInfoQueue[N] push请求包,消费者客户端第N个网络线程就会从这个队列中pop请求包。

在这里插入图片描述

(1)第一层负载均衡:轮询选择ObjectProxy(CommunicatorEpoll)和与之相对应的ReqInfoQueue

ServantProxy::invoke中进行负载均衡调用,并且实例化此次请求msg包,实例化该请求的objproxy。CommunicatorEpoll中的CommunicatorEpoll::handle会调用obj->invoke进行请求的发送。

//第一层:选取一个网络线程对应的信息
void ServantProxy::selectNetThreadInfo(ServantProxyThreadData * pSptd, ObjectProxy * & pObjProxy, ReqInfoQueue * & pReqQ)
{
    //指针为空 就new一个
    if(!pSptd->_queueInit)
    {
        for(size_t i=0;i<_objectProxyNum;++i)
        {
            pSptd->_reqQueue[i] = new ReqInfoQueue(_queueSize);
        }
        pSptd->_objectProxyNum = _objectProxyNum;
        pSptd->_objectProxy    = _objectProxy;
        pSptd->_queueInit      = true;
    }
    if(_objectProxyNum == 1)
    {
        pObjProxy = *_objectProxy;
        pReqQ     = pSptd->_reqQueue[0];
    }
    else
    {
        if(pSptd->_netThreadSeq >= 0)
        {
            //网络线程发起的请求
            assert(pSptd->_netThreadSeq < static_cast<int>(_objectProxyNum));
            pObjProxy = *(_objectProxy + pSptd->_netThreadSeq);
            pReqQ     = pSptd->_reqQueue[pSptd->_netThreadSeq];
        }
        else
        {
            //用线程的私有数据来保存选到的seq
            pObjProxy = *(_objectProxy + pSptd->_netSeq);//选取obj
            pReqQ     = pSptd->_reqQueue[pSptd->_netSeq];
            pSptd->_netSeq++;

            if(pSptd->_netSeq == _objectProxyNum)
                pSptd->_netSeq = 0;
        }
    }
}

(2)第二层负载均衡通过EndpointManager选择AdapterProxy,负载均衡算法(Hash、权重、轮询)

objproxy中的invoke调用selectAdapterProxy。

bool EndpointManager::selectAdapterProxy(ReqMessage * msg,AdapterProxy * & pAdapterProxy)
{
    pAdapterProxy = NULL;
    //刷新主控
    refreshReg(E_DEFAULT,"");

    //无效的数据 返回true
    if(!_valid)
    {
        return true;
    }

    //如果有hash,则先使用hash策略
    if (msg->bHash)
    {
        pAdapterProxy = getHashProxy(msg->iHashCode, msg->bConHash);

        return false;
    }
    
    if(_weightType == E_STATIC_WEIGHT)
    {
        //权重模式
        bool bStaticWeighted = false;
        if(_weightType == E_STATIC_WEIGHT || msg->eType == ReqMessage::ONE_WAY)
            bStaticWeighted = true;

        pAdapterProxy = getWeightedProxy(bStaticWeighted);
    }
    else
    {
        //普通轮询模式
        pAdapterProxy = getNextValidProxy();
    }

    return false;
}

6、同步调用的话,创建完条件变量,通知网络线程进行请求包的发送。

//通知网络线程
    bool bEmpty = false;
    bool bSync  = (msg->eType == ReqMessage::SYNC_CALL);

    if(!pReqQ->push_back(msg,bEmpty))
    {
        TLOGERROR("[TARS][ServantProxy::invoke msgQueue push_back error num:" << pSptd->_netSeq << "]" << endl);

        delete msg;
        msg = NULL;

        pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);

        throw TarsClientQueueException("client queue full");
    }

    pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);

7、数据在客户端epoll中监听发送。 在CommunicatorEpoll::notify()中,caller线程往请求事件通知数组NotifyInfo _notify[]中添加请求事件,通知CommunicatorEpoll进行请求包的发送。注意了,这个函数的作用仅仅是通知网络线程准备发送数据,通过TC_Epoller::mod()或者TC_Epoller::add()触发一个EPOLLIN事件,从而促使阻塞在TC_Epoller::wait()(在CommunicatorEpoll::run()中阻塞)的网络线程CommunicatorEpoll被唤醒,并设置唤醒后的epoll_event中的联合体epoll_data变量为&_notify[iSeq].stFDInfo。
_iSeq为每个主调线程会被分配一个唯一的序列号,这个序列号对应到网络线程的epoll事件数据列表。SeqManager类负责分配该序列号。

void CommunicatorEpoll::notify(size_t iSeq,ReqInfoQueue * msgQueue)
{
    assert(iSeq < MAX_CLIENT_NOTIFYEVENT_NUM);

    if(_notify[iSeq].bValid)
    {
        _ep.mod(_notify[iSeq].notify.getfd(),(long long)&_notify[iSeq].stFDInfo, EPOLLIN);
        assert(_notify[iSeq].stFDInfo.p == (void*)msgQueue);
    }
    else
    {
        _notify[iSeq].stFDInfo.iType   = FDInfo::ET_C_NOTIFY;
        _notify[iSeq].stFDInfo.p       = (void*)msgQueue;
        _notify[iSeq].stFDInfo.fd      = _notify[iSeq].eventFd;
        _notify[iSeq].stFDInfo.iSeq    = iSeq;
        _notify[iSeq].notify.createSocket();
        _notify[iSeq].bValid           = true;
		//主调线程占用第iSeq个通知事件fd, 用于向communicatorEpoll注册发送事件。
        _ep.add(_notify[iSeq].notify.getfd(),(long long)&_notify[iSeq].stFDInfo, EPOLLIN);
    }
}

8、adapterProxy发送数据
Client发起请求时,如果发送缓冲区没有数据,就直接从连接发送出去,如果发送缓冲区有数据,则将发送请求先放入超时队列,网络线程从超时队列拉取请求进行发送。每个AdapterProxy有一个 超时队列 _timeoutQueue,存储了原始的msg结构体信息(含同步异步调用等信息)。**

 //交给连接发送数据,连接连上,buffer不为空,直接发送数据成功
    if(_timeoutQueue->sendListEmpty() && _trans->sendRequest(msg->sReqData.c_str(),msg->sReqData.size()) != Transceiver::eRetError)
    {
        //请求发送成功了,单向调用直接返回
        if(msg->eType == ReqMessage::ONE_WAY)
        {
       ...
            return 0;
        }

        bool bFlag = _timeoutQueue->push(msg, msg->request.iRequestId, msg->request.iTimeout + msg->iBeginTime);
      ...
    }
    else
    {
 //请求发送失败了,放入超时队列等待重发
        bool bFlag = _timeoutQueue->push(msg,msg->request.iRequestId, msg->request.iTimeout+msg->iBeginTime, false);
       ...
        }
    }

adapterproxy调用transceiver向sever发送数据,并在communicatorEpoll中注册客户端socket,监听可读可写事件。

void Transceiver::connect()
{
    if(isValid())  {  return; }
    if(_connStatus == eConnecting || _connStatus == eConnected) {return;}
    int fd = -1;
    if (_ep.type() == EndpointInfo::UDP)
    {
        fd = NetworkUtil::createSocket(true, false, _ep.isIPv6());
        NetworkUtil::setBlock(fd, false);
        _connStatus = eConnected;
    }
    else
    {
        fd = NetworkUtil::createSocket(false, false, _ep.isIPv6());
        NetworkUtil::setBlock(fd, false);

        socklen_t len = _ep.isIPv6() ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in);
        bool bConnected = NetworkUtil::doConnect(fd, _ep.addrPtr(), len);
        if(bConnected)
        {
            setConnected();
        }
        else
        {
            _connStatus     = Transceiver::eConnecting;
            _conTimeoutTime = TNOWMS + _adapterProxy->getConTimeout();
        }
    }
    _fd = fd;
...
    //设置套接口选项
    vector<SocketOpt> &socketOpts = _adapterProxy->getObjProxy()->getSocketOpt();
    for(size_t i=0; i<socketOpts.size(); ++i)
    {
        if(setsockopt(_fd,socketOpts[i].level,socketOpts[i].optname,socketOpts[i].optval,socketOpts[i].optlen) == -1)
        {
            ...
    }

    _adapterProxy->getObjProxy()->getCommunicatorEpoll()->addFd(fd, &_fdInfo, EPOLLIN|EPOLLOUT);
}

9、发送和响应的数据包中含有请求ID字段,iRequestId是一个自增的id,用来关联请求和响应包,另外ReqMessage结构体中,有调用的类型字段用于区分异步同步等。

struct ReqMessage : public TC_HandleBase
{
    //调用类型
    enum CallType
    {
        SYNC_CALL = 1, //同步
        ASYNC_CALL,    //异步
        ONE_WAY,       //单向
        THREAD_EXIT    //线程退出的标识
    };
}



//请求包体
    struct RequestPacket
    {
        1  require short        iVersion;		//版本号
        2  require byte         cPacketType;	//包类型
        3  require int          iMessageType;//消息类型
        4  require int          iRequestId;	//请求ID
        5  require string       sServantName;	//servant名字
        6  require string       sFuncName;	//函数名称
        7  require vector<byte> sBuffer;		//二进制buffer
        8  require int          iTimeout;		//超时时间(毫秒)
        9  require map<string, string> context;	//业务上下文
        10 require map<string, string> status; 	//框架协议上下文
    };

    struct ResponsePacket
    {
        1 require short         iVersion;		//版本号
        2 require byte          cPacketType;	//包类型
        3 require int           iRequestId;		//请求ID
        4 require int           iMessageType;	//消息类型
        5 require int           iRet;			//返回值
        6 require vector<byte>  sBuffer;		//二进制流
        7 require map<string, string> status; 	//协议上下文
        8 optional string        sResultDesc;   //描述
        9 optional map<string, string> context;   //业务上下文
    };
    

10、客户端接收服务端发来的信息包,并根据响应包的请求ID(iRequestId),从超时队列中取出原发送的msg结构体,查验调用类型等。

void AdapterProxy::finishInvoke(ResponsePacket & rsp)
{
    ReqMessage * msg = NULL;

    //requestid 为0 是push消息
    if(rsp.iRequestId == 0)
    {
        msg               = new ReqMessage();
        msg->eStatus      = ReqMessage::REQ_RSP;
        msg->eType        = ReqMessage::ASYNC_CALL;
        msg->bFromRpc     = true;
        msg->bPush        = true;
        msg->proxy        = _objectProxy->getServantProxy();
        msg->pObjectProxy = _objectProxy;
        msg->adapter      = this;
        msg->callback     = _objectProxy->getPushCallback();
    }
    else
    {
        //这里的队列中的发送链表中的数据可能已经在timeout的时候删除了
        bool retErase = _timeoutQueue->erase(rsp.iRequestId, msg);

        //找不到此请求id信息
        if (!retErase)
        {
            if(_timeoutLogFlag)
            {
                TLOGERROR("[TARS][AdapterProxy::finishInvoke(ResponsePacket) objname:"<< _objectProxy->name() << ",get req-ptr NULL,may be timeout,id:" << rsp.iRequestId 
                    << ",desc:" << _endpoint.desc() << endl);
            }
            return ;
        }

        assert(msg->eStatus == ReqMessage::REQ_REQ);

        msg->eStatus = ReqMessage::REQ_RSP;
    }

    msg->response = rsp;//完善msg的响应包

    finishInvoke(msg);
}

11、在函数AdapterProxy::finishInvoke(ReqMessage)中,程序在CommunicatorEpoll::pushAsyncThreadQueue()中,通过轮询的方式选择异步回调处理线程处理接收到的响应包,每个异步处理线程有一个任务处理队列,通过以下代码将信息包msg(带响应信息)放到异步回调处理线程中的队列中。异步处理线程数默认是3,最大是1024。

//异步回调,放入回调处理线程中
_objectProxy->getCommunicatorEpoll()->pushAsyncThreadQueue(msg);

void CommunicatorEpoll::pushAsyncThreadQueue(ReqMessage * msg)
{
    //轮询的方式选择异步处理线程。
    _asyncThread[_asyncSeq]->push_back(msg);
    _asyncSeq ++;

    if(_asyncSeq == _asyncThreadNum)
    {
        _asyncSeq = 0;
    }
}

12、选取之后,通过AsyncProcThread::push_back(),将msg包放在响应包队列AsyncProcThread::_msgQueue中,然后通过AsyncProcThread:: notify()函数通知本异步回调处理线程进行处理,AsyncProcThread:: notify()函数可以令阻塞在AsyncProcThread:: run()中的AsyncProcThread::timedWait()的异步处理线程被唤醒。在AsyncProcThread::run()中,主要执行下面的程序进行函数回调:

if (_msgQueue->pop_front(msg))
{
 ......

    try
    {
        ReqMessagePtr msgPtr = msg;
        msg->callback->onDispatch(msgPtr);
    }
    catch (exception& e)
    {
        TLOGERROR("[TARS][AsyncProcThread exception]:" << e.what() << endl);
    }
    catch (...)
    {
        TLOGERROR("[TARS][AsyncProcThread exception.]" << endl);
    }
}

13、通过msg->callback,程序可以调用回调函数基类ServantPrxCallback里面的onDispatch()函数。在ServantPrxCallback:: onDispatch()中,分析此次响应所对应的RPC方法名,获取响应结果,并通过动态多态,执行用户所定义好的派生类的虚函数。通过ReqMessagePtr的引用计数,还可以将ReqNessage msg删除掉,与同步调用不同,同步调用的msg的新建与删除都在caller线程中,而异步调用的msg在caller线程上构造,在异步回调处理线程中析构。

virtual int onDispatch(tars::ReqMessagePtr msg)
        {
            static ::std::string __ConfigAdmin_all[]=
            {
                "AddConfig",
                 ...
            };
            pair<string*, string*> r = equal_range(__ConfigAdmin_all, __ConfigAdmin_all+14, string(msg->request.sFuncName));
            if(r.first == r.second) return tars::TARSSERVERNOFUNCERR;
            switch(r.first - __ConfigAdmin_all)
            {
                case 0:
                {
                    if (msg->response.iRet != tars::TARSSERVERSUCCESS)
                    {
                        callback_AddConfig_exception(msg->response.iRet);

                        return msg->response.iRet;
                    }
                    tars::TarsInputStream<tars::BufferReader> _is;

                    _is.setBuffer(msg->response.sBuffer);
                    tars::Int32 _ret;
                    _is.read(_ret, 0, true);

                    std::string result;
                    _is.read(result, 2, true);
                    CallbackThreadData * pCbtd = CallbackThreadData::getData();
                    assert(pCbtd != NULL);

                    pCbtd->setResponseContext(msg->response.context);

                    callback_AddConfig(_ret, result);

                    pCbtd->delResponseContext();

                    return tars::TARSSERVERSUCCESS;

        }
  }

二、异步并发模式 promise和future

Future & Promise 模式,属于一种实现异步调用的并发模式。 普通的异步调用是基于回调的异步,当服务规模和业务逻辑比较简单的时候,基于回调的异步调用能简单就搞定了,但是随着业务服务的规模扩大,业务逻辑慢慢变得复杂,一些需要进行多次异步串行调用和异步并行调用的业务需求慢慢显现出来,虽然基于回调的异步调用能够满足这些需求,但是使得异步调用的逻辑流程逻辑分散在不同地方,这点造成了对业务开发提出了很大的挑战,编码代码十分不便,代码逻辑难于理解和维护。代码采用同步模式编写异步的业务逻辑。

Future表示一个调用结果,而这个结果可能不会立即给出,代表了未来某个时刻会得到结果,而这个结果是由Promise来保证的,可以通过get()得到这个结果,其成员函数then,表示注册一个对get()得到的结果进行处理的函数。Promise表示给调用其成员函数getFuture()的Future的一个承诺,通过setValue设置承诺的结果。

异步串行化:

//服务对外接口,串行调用
taf::Int32 
AServantImp::queryResultSerial(const std::string& sIn, std::string &sOut, taf::JceCurrentPtr current)
{
    //设置异步回包
    current->setResponse(false);

    // 向服务B发送异步请求,返回值的类型是
    // promise::Future<std::string>,
    // 意思就是服务B未来会返回一个string类型的数据
    promise::Future<std::string> f = sendBReq(_pPrxB, sIn, current);

    // f调用其成员函数then,给未来要到达的string类型的
    // 返回结果设置一个处理函数
    // 在handleBRspAndSendCReq中获取返回结果,
    // 并return sendCReq(),即f2,然后f2通过链式法则调用then
    f.then(promise::bind(&handleBRspAndSendCReq,_pPrxC,current))
    .then(promise::bind(&handleCRspAndReturnClient, current));

    return 0;
}
promise::Future<std::string> sendBReq(BServantPrx prx, const std::string& sIn, taf::JceCurrentPtr current)

{

//定义一个promise::Promise<std::string>类型的变量promise,

//其目的是承诺会在promise里面存放一个string类型的数据,

//然后把这个变量传到BServantCallback对象中,

//然后发起异步调用

//最后返回promise.getFuture(),意思是promise承诺的string类型数据

//可以通过promise::Future<std::string>类型的promise.getFuture()来获得

       promise::Promise<std::string> promise;

 

//其实这个的current可以不用传递给BServantCallback,当然传也没有影响

    Test::BServantPrxCallbackPtr cb = new BServantCallback(current, promise);

 

       prx->async_queryResult(cb, sIn);

 

    return promise.getFuture();

}

异步并行化:

taf::Int32 AServantImp::queryResultParallel(const std::string& sIn, std::string &sOut, taf::JceCurrentPtr current)

{
       current->setResponse(false);
       promise::Future<std::string> f1 = sendBReq(_pPrxB, sIn, current);
       promise::Future<std::string> f2 = sendCReq(_pPrxC, sIn, current); 

       promise::Future<promise::Tuple<promise::Future<std::string>, promise::Future<std::string> > > f_all = promise::whenAll(f1, f2);

       f_all.then(promise::bind(&handleBCRspAndReturnClient, current));
       return 0;

}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RPC框架的异步处理 的相关文章

  • Redis持久化的原理及优化

    更多内容 欢迎关注微信公众号 全菜工程师小辉 Redis提供了将数据定期自动持久化至硬盘的能力 包括RDB和AOF两种方案 两种方案分别有其长处和短板 可以配合起来同时运行 确保数据的稳定性 RDB 保存数据快照至一个RDB文件中 用于持久
  • An HTTP error occurred when trying to retrieve this URL.

    问题背景 conda install xxx 提示 An HTTP error occurred when trying to retrieve this URL Fetching package metadata CondaHTTPErr
  • 【Leetcode】863. 二叉树中所有距离为 K 的结点

    题目描述 题解 用map记录每个结点的父结点 然后让dfs从target结点开始 假设target就是根结点 然后递归时纪录深度 只要深度等于k 就是和target的距离等于k 就可以存入list 执行用时 14 ms 在所有 Java 提
  • LeetCode-1304. Find N Unique Integers Sum up to Zero

    Given an integer n return any array containing n unique integers such that they add up to 0 Example 1 Input n 5 Output 7
  • 毕业遭失业,前途一片黑暗...不得已转行软件测试,太多心酸和无助...

    大家好 我叫小涵 一名应届毕业生 目前已经成功转行互联网 写这篇文章的目的是因为很多人不喜欢自己的现状 想通过学习改变 奈何没有出路 所以想为这部分人提供一些思路 其次文章会总结我自己转行前后的经历和思考 提供一些我亲测有效的面试资源 找不
  • 解决Vmware Unbuntu 22虚拟机网络故障问题

    上午启动Vmware Unbuntu 22虚拟机 发现不能上网 屏幕右上侧的网络图标没有显示 怀疑是昨天在虚拟机上做路由器功能设置的时候某个操作产生的问题 于是进行排障 先尝试重启网络服务 service NetworkManager re

随机推荐

  • elasticsearch安装 及 启动异常解决

    虚拟机使用net连接模式 1 Download and unzip the latest Elasticsearch distribution 2 Run bin elasticsearch on Unix or bin elasticse
  • 第14课 右值引用(1)_基本概念

    1 左值和右值 1 两者区别 左值 能对表达式取地址 或具名对象 变量 一般指表达式结束后依然存在的持久对象 右值 不能对表达式取地址 或匿名对象 一般指表达式结束就不再存在的临时对象 2 右值的分类 将亡值 xvalue eXpiring
  • 数据中台-让数据用起来-7

    文章目录 第七章 数据体系建设 7 1 数体系规划 7 2 贴源数据层建设 全域数据统一存储 7 2 1 相关概念 7 2 2 贴源数据表设计 7 2 3 贴源数据表实现 7 3统一数仓层建设 标准化的数据底座 7 3 1 相关概念 7 3
  • Java高阶面试问答-通用基础

    线程和进程区别 进程是资源分配的最小单位 线程是程序执行的最小单位 进程有自己的独立地址空间 每启动一个进程 系统就会为它分配地址空间 建立数据表来维护代码段 堆栈段和数据段 线程是共享进程的数据空间 因此CPU切换一个线程的花费远比进程要
  • LeetCode-1775. 通过最少操作次数使数组的和相等【贪心,数组,计数】

    LeetCode 1775 通过最少操作次数使数组的和相等 贪心 数组 计数 题目描述 解题思路一 让sum1
  • kubernetes pv回收策略

    本文最近更新于2021 9 11 kubernetes pv回收策略 当用户不再使用其存储卷时 他们可以从 API 中将 PVC 对象删除 从而允许该资源被回收再利用 PersistentVolume 对象的回收策略告诉集群 当其被从申领中
  • 冈萨雷斯《数字图像处理》学习总结及感悟:第一章 绪论 百闻不如一见

    前往老猿Python博文目录 https blog csdn net LaoYuanPython 一 引言 好几月前开始自学OpenCV Python 但老猿以前没接触过图像基础知识 数学知识基本上也都忘光了 因此在自学OpenCV Pyt
  • Python处理txt数据实例

    现在有一个具体的案例是这样的 CST电磁仿真软件得到一些txt数据在origin data文件夹中 需要其中的一些数据来通过origin软件绘制曲线分析一些问题 而且需要里面的所有数据曲线显示在同一个图形中 如果通过手动将txt数据一一复制
  • LeetCode第55题解析

    给定一个非负整数数组 你最初位于数组的第一个位置 数组中的每个元素代表你在该位置可以跳跃的最大长度 判断你是否能够到达最后一个位置 示例 1 输入 2 3 1 1 4 输出 true 解释 我们可以先跳 1 步 从位置 0 到达 位置 1
  • js的闭包的理解

    js的变量的作用域分为全局变量和局部变量 函数内部的变量称为局部变量 在函数的内部可以访问到全局变量 但是函数外部无法访问函数内部的变量 闭包可以解决无法访问函数内部的变量的问题 且可以隐藏这个变量 不被外部直接访问 闭包 函数内部的子函数
  • JavaScript 搜索引擎 lunr.js

    lunr js 实现了在网页上的搜索引擎 类似 Solr 示例代码 view source print 01 定义索引 02 var idx lunr function 03 this field title boost 10 04 thi
  • flask需求文件requirements.txt的创建和使用

    flask需求文件requirements txt的创建及使用 简介 flask项目中包含一个requirements txt 文件 用于记录所有依赖包及其精确的版本号用以新环境部署 创建 生成需求文件 在命令行输入 pip freeze
  • 服务器一直被攻击怎么办?

    有很多人问说 网站一直被攻击 什么被挂马 什么被黑 每天一早打开网站 总是会出现各种各样的问题 这着实让站长们揪心 从修改服务器管理账号开始 到修改远程端口 什么措施都做了 还是会被攻击挂马 服务器一直被攻击时 要怎么做 1 切断网络 对服
  • 秋招-算法-动态规划篇

    秋招 算法 动态规划篇 只求秋招笔试能过 所以本文更多是怎么使用模板来解动态规划题 能过就好 对时间和空间的考虑不会太多 介绍 动态规划通过组合子问题的解得到原问题的解 适合动态规划解决的问题具有重叠子问题和最优子结构两大特征 通常使用空间
  • caffe 学习率设置问题

    solver算是caffe的核心的核心 它协调着整个模型的运作 caffe程序运行必带的一个参数就是solver配置文件 运行代码一般为 caffe train solver slover prototxt 在Deep Learning中
  • Unity和VS2019下载及配置流程

    https www jianshu com p 6fe2dc4de4c3
  • 可编程手机蓝牙App控制直流电机速度

    我们将通过 Android 应用程序 app 介绍直流电机速度控制 对于该项目 该应用程序安装在智能手机设备中 该设备使用蓝牙向控制直流电机速度的电路发送命令 此 Android 应用程序具有引人注目且易于使用的图形用户界面 GUI 我们将
  • IBM区块链负责人Jesse Lund的“你问我答”

    点击上方 蓝色字 可关注我们 编辑 铅笔盒 IBM区块链部门负责人Jesse Lund抽空参与了 你问我答 活动 Ask Me Anything AMA 回答了关于加密货币及IBM与Stellar合作关系的一些问题 具体内容如下 IBM现在
  • ue4 DerivedDataCache报错

    启动ue4时报错 报错堆栈如下 从堆栈可以看出是DerivedDataCache报错 从堆栈的CachedDataProbablyExists函数更可以看出是在判断cache是否存在 故而想到删除项目目录下DerivedDataCache中
  • RPC框架的异步处理

    RPC异步调用 以tars rpc框架为示例说明 引入工作线程池和io收发线程池将工作线程和io收发线程两者的同步关系解除 RPC中的上下文十分重要 因为请求包的发送 响应包的callback回调不在同一个工作线程中完成 需要一个conte