cloudstackapi调度流程
我们发往cloudstack的api命令由management端的ApiServlet的processRequest(req,resp)进性处理,该函数开启一个线程进行处理。对于login和logout命令单独处理,其它命令发往ApiServer组件进行处理
finalStringresponse=_apiServer.handleRequest(params,responseType,auditTrailSb);
在ApiServer组件的handleRequest函数中,通过解析参数params,构建出cmdObj对象,剩余参数存入paramMap,之后进入命令的调度入口函数queueCommand:
//This is where the command is either serialized, or directlydispatched
response= queueCommand(cmdObj,paramMap);
在ApiServer组件的queueCommand函数中,会依据cmdObj的类型进行不同的调度路径
//Queue command based onCmdsuper class:
//BaseCmd:cmdis dispatched to ApiDispatcher, executed, serialized and returned.
//BaseAsyncCmd:cmdis processed and submitted as an AsyncJob, job related info isserialized and returned.
//BaseAsyncCreateCmd:cmdparamsare processed and create() is called, then same workflowas BaseAsyncCmd.
下面给出DeployVMCmd创建VM命令的类继承结构
在queueCommand函数中,调用逻辑关键代码如下:
if(cmdObjinstanceofBaseAsyncCmd) {
if(cmdObjinstanceofBaseAsyncCreateCmd) {
_dispatcher.dispatchCreateCmd(createCmd,params);
}
else{
dispatchChainFactory.getStandardDispatchChain().dispatch(newDispatchTask(cmdObj,params));
}
AsyncJobVOjob=newAsyncJobVO(...);
job.setDispatcher(_asyncDispatcher.getName());
finallongjobId=_asyncMgr.submitAsyncJob(job);
returngetBaseAsyncResponse(jobId,asyncCmd);
}
else{
_dispatcher.dispatch(cmdObj,params,false);
returnApiResponseSerializer.toSerializedString(...);
}
上面的调度逻辑涉及了5个类:
ApiDispatcher_dispatcher,
DispatchChainFactorydispatchChainFactory,
ApiAsyncJobDispatcher_asyncDispatcher,
AsyncJobVOjob,
AsyncJobManager_asyncMgr,
前2个类是用于直接调度cmd对象。
下面是ApiDispatcher的类图
ApiDispatcher的dispatchCreateCmd(createCmd,params)会执行Cmd对象createCmd的create()方法。
ApiDispatcher的dispatch(cmdObj,params,false)会执行Cmd对象cmdObj的execute()
方法。此外,在执行cmd对象方法execute()前,会对cmdObj进行判断
if(cmd instanceof BaseAsyncCmd) {
.....
_asyncMgr.syncAsyncJobExecution((AsyncJob)asyncCmd.getJob(),asyncCmd.getSyncObjType(),asyncCmd.getSyncObjId().longValue(),queueSizeLimit);
.....
}
cmd.execute();
异步调度框架的ApiAsyncJobDispatcher采用了ApiDispatcher的dispatch方法来调度cmd对象。
后3个类属于异步调度,job管理。
下面是AsyncJob执行流程的顺序图
由_asyncMgr.submitAsyncJob(job)开始,调用submitAsyncJob(job,false)函数,该函数将job信息存入到db表async_job中,之后
向messageBus发行submit消息
publishOnEventBus(job,"submit");
调度执行job
scheduleExecution(job,scheduleJobExecutionInContext);
该函数体如下:
privatevoidscheduleExecution(final AsyncJobjob,boolean executeInContext){
Runnablerunnable= getExecutorRunnable(job);
if(executeInContext){
runnable.run();
}else{
if(job.getDispatcher()==null||job.getDispatcher().equalsIgnoreCase("ApiAsyncJobDispatcher"))
_apiJobExecutor.submit(runnable);
else
_workerJobExecutor.submit(runnable);
}
}
传入的scheduleJobExecutionInContext为false,并且job的Dispatcher为ApiAsyncJobDispatcher,最终会把runnable放入_apiJobExecutor线程池中运行。
跟入getExecutorRunnable(job)会跟踪job的执行流程
关键代码
_jobMonitor.registerActiveTask(runNumber,job.getId());
对job执行情况进行监视。
关键代码
if((getAndResetPendingSignals(job)& AsyncJob.Constants.SIGNAL_MASK_WAKEUP)!= 0) {
AsyncJobDispatcherjobDispatcher= getWakeupDispatcher(job);
if(jobDispatcher!=null){
jobDispatcher.runJob(job);
}
...
}else{
AsyncJobDispatcherjobDispatcher= getDispatcher(job.getDispatcher());
if(jobDispatcher!=null){
jobDispatcher.runJob(job);
}
}
job无论是通过Signals被激活还是正常的执行,都会先获取job的jobDispatcher调度器,执行调度器
jobDispatcher.runJob(job)的runJob(job)方法。
Api命令的job调度器为ApiAsyncJobDispatcher,进入ApiAsyncJobDispatcher的runJob(job)方法:
在ApiAsyncJobDispatcher的runJob(finalAsyncJobjob)方法中,关键代码为
.....
Class<?>cmdClass= Class.forName(job.getCmd());
cmdObj= (BaseAsyncCmd)cmdClass.newInstance();
cmdObj= ComponentContext.inject(cmdObj);
cmdObj.configure();
cmdObj.setJob(job);
.....
//dispatch could ultimately queue the job
_dispatcher.dispatch(cmdObj,params,true);
//serialize this to the asyncjob table
_asyncJobMgr.completeAsyncJob(job.getId(),JobInfo.Status.SUCCEEDED,0,ApiSerializerHelper.toSerializedString(cmdObj.getResponseObject()));
......
由job构建cmdObj对象,通过ApiDispatcher的dispatch方法来调度cmd对象,调度执行完成后对dbasyncjob相关表进行更改。前面提到过,ApiDispatcher的dispatch方法在执行cmd对象方法execute()前,如果cmd对象为BaseAsyncCmd的子类,会调用AsyncJobManagerImpl的
syncAsyncJobExecution(AsyncJobjob,StringsyncObjType,longsyncObjId,longqueueSizeLimit)方法。
该方法将待执行job的信息插入db的表sync_queue和sync_queue_item中。
之后执行cmd对象的execute()方法。
调度完后,通过AsyncJobManagerImpl的completeAsyncJob(...)方法对db表async_job,sync_queue_item和sync_queue修改,完成的job从sync_queue_item和sync_queue删除。