03 xxl-job任务执行流程

2023-11-17

作业类型

xxl-job支持七种作业类型:BeanGLUE(Java)GLUE(Shell)GLUE(Python)GLUE(PHP)GLUE(Nodejs)GLUE(PowerShell)。其中,GLUE类型作业都是在admin管理端编辑业务代码,而Bean类型作业是将用户业务代码逻辑集成到xxl-job进行调度,源码位于用户项目中,而非xxl-jobadmin模块

xxl-job抽象IJobHandler组件,用于执行作业,其实现有三种(见下图):

MethodJobHandler:Bean类型作业处理器,Bean类型作业逻辑实际上封装在带有@XxlJob注解的Method中;

ScriptJobHandler:脚本类型作业处理器,如ShellPythonPHPNodejsPowerShell等都可以看出脚本类型作业,使用该处理器;

GlueJobHandler:该种作业处理器专门用于处理Glue(Java)类型作业,上节分析过Java类型作业会被GlueFactory编译、初始化成实例,然后封装到GlueJobHandler中进行执行;

执行流程

服务端流程

服务端作业执行触发入口见JobTriggerPoolHelper#addTrigger

public void addTrigger(final int jobId,
                      final TriggerTypeEnum triggerType,
                      final int failRetryCount,
                      final String executorShardingParam,
                      final String executorParam,
                      final String addressList) {

   // 这里根据一定规则将触发任务从两个线程池中选取一个进行投递
   // fastTriggerPool:默认投递线程池
   // slowTriggerPool:慢作业投递到该线程池
   // 慢作业定义:投递超过500ms,且累计一分钟超过10次(每分钟重置缓存重新计算),则该作业就是慢作业,后续执行时使用slowTriggerPool
   ThreadPoolExecutor triggerPool_ = fastTriggerPool;
   AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
   if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
       triggerPool_ = slowTriggerPool;
   }

   // trigger
   triggerPool_.execute(new Runnable() {
       @Override
       public void run() {

           long start = System.currentTimeMillis();

           try {
               // 触发作业
               XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
           } catch (Exception e) {
               logger.error(e.getMessage(), e);
           } finally {

               // 每分钟清空慢作业累计缓存
               long minTim_now = System.currentTimeMillis()/60000;
               if (minTim != minTim_now) {
                   minTim = minTim_now;
                   jobTimeoutCountMap.clear();
               }

               // 超过500ms则慢作业执行次数累计+1,
               // 执行端采用异步模式:作业下发到执行端放入到队列中即返回,所以,这个时间是不包括作业本身执行时间
               long cost = System.currentTimeMillis()-start;
               if (cost > 500) {       // ob-timeout threshold 500ms
                   AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                   if (timeoutCount != null) {
                       timeoutCount.incrementAndGet();
                   }
               }
           }

       }
   });
}

继续向下跟踪XxlJobTrigger#trigger:

private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){

   // 阻塞处理策略
   ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);
   // 路由策略
   ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
   // 分片参数
   String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;

   // 1、save log-id
   XxlJobLog jobLog = new XxlJobLog();
   jobLog.setJobGroup(jobInfo.getJobGroup());
   jobLog.setJobId(jobInfo.getId());
   jobLog.setTriggerTime(new Date());
   // xxl_job_log插入运行日志
   XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
   logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

   // 2、init trigger-param
   TriggerParam triggerParam = new TriggerParam();
   triggerParam.setJobId(jobInfo.getId());
   triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
   triggerParam.setExecutorParams(jobInfo.getExecutorParam());
   triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
   triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
   triggerParam.setLogId(jobLog.getId());
   triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
   triggerParam.setGlueType(jobInfo.getGlueType());
   triggerParam.setGlueSource(jobInfo.getGlueSource());
   triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
   triggerParam.setBroadcastIndex(index);
   triggerParam.setBroadcastTotal(total);

   // 初始化执行器地址
   String address = null;
   ReturnT<String> routeAddressResult = null;
   if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
       if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
           // 分片广播模式
           if (index < group.getRegistryList().size()) {
               address = group.getRegistryList().get(index);
           } else {
               address = group.getRegistryList().get(0);
           }
       } else {
           //路由策略选取执行器地址
           routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
           if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
               address = routeAddressResult.getContent();
           }
       }
   } else {
       routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
   }

   // 4、trigger remote executor
   ReturnT<String> triggerResult = null;
   if (address != null) {
       // 作业执行
       triggerResult = runExecutor(triggerParam, address);
   } else {
       triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
   }

   // 收集执行信息
   StringBuffer triggerMsgSb = new StringBuffer();
   triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
   triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
   triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
               .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
   triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
   triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
   if (shardingParam != null) {
       triggerMsgSb.append("("+shardingParam+")");
   }
   triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
   triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
   triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);

   triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
               .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");

   // 6、save log trigger-info
   jobLog.setExecutorAddress(address);
   jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
   jobLog.setExecutorParam(jobInfo.getExecutorParam());
   jobLog.setExecutorShardingParam(shardingParam);
   jobLog.setExecutorFailRetryCount(finalFailRetryCount);
   //jobLog.setTriggerTime();
   jobLog.setTriggerCode(triggerResult.getCode());
   jobLog.setTriggerMsg(triggerMsgSb.toString());

   // 将执行信息更新到xxl_job_log日志表中
   XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);

   logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}

这个方法代码比较多,但是逻辑都比较简单,核心逻辑:广播或路由策略选取执行器地址 -> 作业执行 -> 收集执行信息更新到xxl_job_log日志表中。

路由策略下节单独分析,接下里继续跟踪作业执行流程XxlJobTrigger#runExecutor:

public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
   ReturnT<String> runResult = null;
   try {
       // 根据address获取ExecutorBiz
       ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
       runResult = executorBiz.run(triggerParam);
   } catch (Exception e) {
       logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
       runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
   }

   // 结果解析
   StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
   runResultSB.append("<br>address:").append(address);
   runResultSB.append("<br>code:").append(runResult.getCode());
   runResultSB.append("<br>msg:").append(runResult.getMsg());

   runResult.setMsg(runResultSB.toString());
   return runResult;
}

根据address获取对应的执行器代理ExecutorBiz,然后调用其run方法将作业下发到执行器端运行。上节分析过执行器启动时使用netty初始化一个http serverweb容器,所以,这里的下发逻辑比较简单,就是调用http接口XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);

执行端流程

上节执行器启动流程分析过其在启动时会利用netty初始化一个http serverweb容器,用于接收admin下发指令,然后将接收到的指令转给EmbedHttpServerHandler#process处理:

private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
       
   // valid
   if (HttpMethod.POST != httpMethod) {
       return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
   }
   if (uri==null || uri.trim().length()==0) {
       return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
   }
   if (accessToken!=null
           && accessToken.trim().length()>0
           && !accessToken.equals(accessTokenReq)) {
       return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
   }

   // services mapping
   try {
       if ("/beat".equals(uri)) { //执行器是否正常(在线),对应路由策略:故障转移
           return executorBiz.beat();
       } else if ("/idleBeat".equals(uri)) {// 执行器是否空闲,对应路由策略:忙碌转移
           IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
           return executorBiz.idleBeat(idleBeatParam);
       } else if ("/run".equals(uri)) {
           TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
           return executorBiz.run(triggerParam);
       } else if ("/kill".equals(uri)) { // kill作业指令监听
           logger.info("receive kill, data:{}", requestData);
           KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
           return executorBiz.kill(killParam);
       } else if ("/log".equals(uri)) {// 查看执行器调度日志监听
           LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
           return executorBiz.log(logParam);
       } else {
           return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
       }
   } catch (Exception e) {
       logger.error(e.getMessage(), e);
       return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
   }
}

继续跟踪ExecutorBizImpl#run:

@Override
public ReturnT<String> run(TriggerParam triggerParam) {
   // load old:jobHandler + jobThread
   // 根据jobId从缓存中加载JobThread和IJobHandler
   JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
   IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
   String removeOldReason = null;

   // 作业类型匹配 并进行IJobHandler校验
   // 比如作业IJobHandler发送变更、Glue类作业源码出现编辑等,则之前缓存的JobThread不能再继续使用,并使用最新IJobHandler创建JobThread
   GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
   if (GlueTypeEnum.BEAN == glueTypeEnum) {//Bean类型作业
 ......
   } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {//Java类型作业
 ......
   } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {//脚本类作业
 ......
   } else {
       return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
   }

   if (jobThread != null) {
       // 如果JobThread != null,则该JobThread可能存在正在运行作业,则根据阻塞策略处理
       ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
       if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
           // 丢弃后续调度:如果JobThread还正在执行作业或其triggerQueue中有排队作业,则当前作业丢弃
           if (jobThread.isRunningOrHasQueue()) {
               return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
           }
       } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
           // 覆盖之前调度:如果JobThread还正在执行作业或其triggerQueue中有排队作业,则destroy之前的JobThread,并重新创建JobThread运行当前作业
           if (jobThread.isRunningOrHasQueue()) {
               removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

               jobThread = null;
           }
       } else {
           // 单机串行则直接将作业发送到JobThread的triggerQueue中即可
       }
   }

   if (jobThread == null) {
       // 创建JobThread,并放入缓存,如果jobId缓存中已存在,则destroy
       jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
   }

   logger.debug("jobThread.pushTriggerQueue hash:{}, data:{}", System.identityHashCode(jobThread), GsonTool.toJson(triggerParam));
   // 将下发的作业放入到JobThread的triggerQueue中,JobThread处理线程从triggerQueue提取执行
   ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
   return pushResult;
}

下发的作业被投递到JobThreadtriggerQueue队列中,JobThread#run:

@Override
public void run() {

   try {
    // 调用IJobHandler.init方法,如@XxlJob(init=xxx)即在这里调用
 handler.init();
} catch (Throwable e) {
    logger.error(e.getMessage(), e);
}

while(!toStop){
       // running=false表示当前JobThread没有在处理作业
 // isRunningOrHasQueue()中判断JobThread是否运行用到该值以及triggerQueue
 running = false;
 // 空闲次数累加+1
 idleTimes++;

       TriggerParam triggerParam = null;
           ReturnT<String> executeResult = null;
           try {
   triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
   if (triggerParam!=null) {
                   // running=true表示当前JobThread正在处理作业
    running = true;
    // 重置空闲统计次数
    idleTimes = 0;
    triggerLogIdSet.remove(triggerParam.getLogId());

    // log filename, like "logPath/yyyy-MM-dd/9999.log"
    // 初始化日志文件
    String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
    XxlJobFileAppender.contextHolder.set(logFileName);
    // 将分片信息注入到线程上下文中:InheritableThreadLocal
    ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));

    // execute
    XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());

    // executorTimeout:作业执行超时控制
    // 正常执行作业是handler.execute(triggerParam.getExecutorParams()),
    // 如果带有超时控制,则封装FutureTask放入到线程中异步执行,超时则触发中断并返回超时异常
    if (triggerParam.getExecutorTimeout() > 0) {
     // limit timeout
     Thread futureThread = null;
     try {
      final TriggerParam triggerParamTmp = triggerParam;
      FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
       @Override
       public ReturnT<String> call() throws Exception {
        return handler.execute(triggerParamTmp.getExecutorParams());
       }
      });
      futureThread = new Thread(futureTask);
      futureThread.start();

      executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
     } catch (TimeoutException e) {

      XxlJobLogger.log("<br>----------- xxl-job job execute timeout");
      XxlJobLogger.log(e);

      executeResult = new ReturnT<String>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout ");
     } finally {
      futureThread.interrupt();
     }
    } else {
     // 调用对应的IJobHandler处理作业
     executeResult = handler.execute(triggerParam.getExecutorParams());
    }

    if (executeResult == null) {
     executeResult = IJobHandler.FAIL;
    } else {
     executeResult.setMsg(
       (executeResult!=null&&executeResult.getMsg()!=null&&executeResult.getMsg().length()>50000)
         ?executeResult.getMsg().substring(0, 50000).concat("...")
         :executeResult.getMsg());
     executeResult.setContent(null); // limit obj size
    }
    XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);

   } else {
    // 连续超时30次(每次3秒),即90秒内JobThread一直空闲,则销毁JobThread
    if (idleTimes > 30) {
     if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost
      XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
     }
    }
   }
  } catch (Throwable e) {
   if (toStop) {
    XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
   }

   StringWriter stringWriter = new StringWriter();
   e.printStackTrace(new PrintWriter(stringWriter));
   String errorMsg = stringWriter.toString();
   executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);

   // 作业执行异常,则将异常信息写入到日志中
   XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
  } finally {
               if(triggerParam != null) {
                   if (!toStop) {
     // JobThread未停止场景下,异步回调机制将执行结果推送到admin
                       TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult));
                   } else {
     // JobThread停止场景下,异步回调机制将kill异常推送到admin
                       ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running, killed]");
                       TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
                   }
               }
           }
       }

 // JobThread被kill,检查下triggerQueue是否还有等待触发作业,如果有则向admin推送异常信息
 while(triggerQueue !=null && triggerQueue.size()>0){
  TriggerParam triggerParam = triggerQueue.poll();
  if (triggerParam!=null) {
   // is killed
   ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
   TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
  }
 }

 // destroy
 try {
  // 销毁IJobHandler,调用IJobHandler.destroy方法,如@XxlJob(destroy=xxx)即在这里调用
  handler.destroy();
 } catch (Throwable e) {
  logger.error(e.getMessage(), e);
 }

 logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
}

上面代码很多,但是逻辑不太复杂,看注释很容易理解到,接下来再来看下执行流程中最后一个核心组件IJobHandler,调用作业执行逻辑被封装到该组件中,xxl-job内置提供了三种实现方式,分别对应调用BeanJava脚本类型作业,其实现不太复杂,这里就不再继续深入分析。

核心抽象组件

ExecutorRouter:路由组件,选取执行器地址;

ExecutorBizClient:路由组件选取任务执行器地址后,将其包装成ExecutorBizClientExecutorBizClient可以看成执行器在引擎端代理,屏蔽远程RPC网络通信底层细节;

EmbedHttpServerHandler:执行器通过netty实现http server容器,EmbedHttpServerHandler扩展组件用于处理接收指令;

ExecutorBizImplExecutorBizClient作为执行器在引擎端代理,主要将指令通过RPC转发给执行器,起到透传作用,ExecutorBizImpl则是执行器上真正实现逻辑封装,所以,ExecutorBizClientExecutorBizImpl都实现同一接口ExecutorBiz

JobThread:每个任务在执行器上执行都会对应一个JobThread,任务和任务间是互相独立的,JobThread控制任务在执行器上并发模型。

IJobHandlerIJobHandler则是封装怎么调用任务逻辑,xxl-job内置三种实现类分别用来调用不同类型任务。

总结

上面对xxl-job作业执行的核心关键代码进行了整体分析梳理,整体还是比较简单,可能比较枯燥,下面简要整理了作业执行的大概流程(见下图),可对xxl-job调度机制有个大致理解:

大致描述:

  • xxl-job整体架构采用中心化设计,分为调度中心Admin和执行器两部分;

  • 调度中心Admin模块提供trigger触发接口进行作业调度,然后根据作业历史统计下发耗时将作业分配到两个线程池中的一个进行执行;

  • 执行前将作业启动日志记录到xxl_job_log表中,然后利用路由组件选取执行器地址,并利用执行器代理ExecutorBiz将执行下发到路由的执行器上,执行器代理ExecutorBiz实现很简单:就是发送http请求;

  • 执行器在启动时会利用netty初始化一个内嵌http server容器,当接收到调度中心发送过来的指令后,将其转交给EmbedHttpServerHandler处理器进行处理;

  • EmbedHttpServerHandler处理器在处理作业运行指令时,会根据jobId从缓存中查找对应的JobThread,然后将作业执行指令投递到JobThread实例中triggerQueue队列中排队;

  • JobThread线程不停循环从triggerQueue队列中提取等待执行的作业信息,然后将其交由IJobHandler真正处理作业调用,JobThreadIJobHandler处理结果解析后投递给TriggerCallbackThread线程中callBackQueue队列中排队;

  • TriggerCallbackThread内部也是线程不停循环从callBackQueue提取回调任务,然后转交给doCallback方法,这个方法内部通过Admin代理类AdminBizClient叫结果回调发送给调用中心的回调接口,即完成作业完成通知。

上面就是xxl-job作业执行的整体大致流程,将其抽象出来的几个核心组件串联起来看清其脉络,则整个逻辑就比较清晰了。这里理解关键点是JobThread组件,每个作业在每个执行器中会对应一个JobThread实例,当作业下发到执行器上时,找到对应的JobThread进行处理。JobThread采用懒加载和缓存模式设计,只有作业下发执行器未找到对应的JobThread才会创建并返回起来,待下次同一个作业过来执行时直接使用该JobThread即可。

什么场景下执行器找不到JobThread

  • 作业第一次下发到该执行器;

  • JobThread内部线程循环不停从triggerQueue提取作业进行处理,且每个作业在执行器上对应一个JobThread,如果某个作业在执行器上执行一次后面不再执行、或者执行频率很低,可能会导致大量线程浪费,所以JobThread设计上有空闲超时自动销毁机制。当30 * 3 = 90秒没有执行作业,则判断JobThread空闲超时,进入销毁流程,后面又接收到该作业下发来的指令,则会重新创建JobThread

长按二维码识别关注

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

03 xxl-job任务执行流程 的相关文章

  • Android 无法解析日期异常

    当尝试解析发送到我的 Android 客户端的日期字符串时 我得到一个无法解析的日期 这是例外 java text ParseException 无法解析的日期 2018 09 18T00 00 00Z 位于 偏移量 19 在 java t
  • Android:canvas.drawBitmap() 方法无法正常工作

    我已经发布了两个与此相关的问题 请参考此自定义饼图 1 https stackoverflow com questions 28343600 customize pie chart in quarter shape at the botto
  • 如何仅从 Firestore 获取最新更新的数据?

    在 Firestore 上发现任何更改时始终获取整个文档 如何只获取最近更新的数据 这是我的数据 我需要在第一次加载时在聊天中按对象顺序 例如 2018 09 17 30 40 msg和sendby 并且如果数据更新则仅获取新的msg和se
  • 带有 backstack Resume 的嵌套片段

    在我的应用程序中有几个fragments in an activity我正在维护一个backStack对于这些fragment 一切都很好 但其中有一个嵌套的片段 当我把它放入backStack然后再次按后退按钮恢复 该片段看起来与先前的内
  • 如何在 ant 中为 junit 测试设置 file.encoding?

    我还没有完全完成file encoding 和 ant https stackoverflow com questions 1339352 how do i set dfile encoding within ants build xml
  • 在 Xamarin 中隐藏软键盘

    如何隐藏软键盘以便在聚焦时显示Entry在 Xamarin forms 便携式表单项目中 我假设我们必须为此编写特定于平台的渲染器 但以下内容不起作用 我创建自己的条目子类 public class MyExtendedEntry Entr
  • react-native android fontFamily 不生效

    问题一 我在index android js的欢迎样式中添加了fontFamily 但没有效果 fontFamily 真的可以在 Android 上使用吗 欢迎 字体大小 20 fontFamily roboto thin 文本对齐 居中
  • Akka 与现有 java 项目集成的示例

    如果我已经有现有的javaWeb 应用程序使用spring and servlet容器 将 Akka 集成到其中的正确方法是什么 就像我将会有Actor1 and Actor2互相沟通的 开始使用这些演员的切入点是什么 例如 1 把它放在那
  • Jetpack Compose 中复选框中的透明复选标记

    在我的 Compose 应用程序中 我需要创建一个圆形复选框 我已经通过下面的代码实现了这一点 Composable fun CircleCheckBox isChecked Boolean modifier Modifier Modifi
  • JDBC 时间戳和日期 GMT 问题

    我有一个 JDBC 日期列 如果我使用 getDate 则会得到 date 仅部分2009 年 10 月 2 日但如果我使用 getTimestamp 我会得到完整的 date 2009 年 10 月 2 日 13 56 78 890 这正
  • 不可变的最终变量应该始终是静态的吗? [复制]

    这个问题在这里已经有答案了 在java中 如果一个变量是不可变的并且是final的 那么它应该是一个静态类变量吗 我问这个问题是因为每次类的实例使用它时创建一个新对象似乎很浪费 因为无论如何它总是相同的 Example 每次调用方法时都会创
  • 为什么 Google 建议将库复制到您的树中?

    谷歌的Play 服务 API 的使用说明 http developer android com google play services setup html 例如 说 将 extras google google play service
  • Android 4.4 Kitkat 自定义视图操作栏未填充整个宽度

    我试图拥有一个带有自定义视图的简单操作栏 但我得到以下结果 为了演示 我创建了一个带有黄色背景颜色的简单 xml 它应该占据整个宽度 这是 XML
  • Android Studio代理设置构建错误

    每当我尝试在 Android Studio 中构建应用程序时 都会收到以下错误 Error 169 254 16 169 254 16 Will ignore proxy settings for these hosts 我收到错误 5 次
  • Spring @Cacheable 和 @Async 注解

    我需要缓存一些异步计算的结果 具体来说 为了克服这个问题 我尝试使用 Spring 4 3 缓存和异步计算功能 作为示例 我们采用以下代码 Service class AsyncService Async Cacheable users C
  • 将 JScrollPane 添加到 JFrame

    我有一个关于向 Java 框架添加组件的问题 我有一个带有两个按钮的 JPanel 和一个添加了 JTable 的 JScrollPane 我想将这两个添加到 JFrame 中 我可以将 JPanel 添加到 JFrame 或将 JScro
  • 如何解决 greenDAO 在执行 InsertOrReplace 时“不存在这样的表错误”?

    我正在使用 greenDAO 并且已成功生成所有必需的类和实体 并且我可以看到我的表已创建 但是在要替换的行上放置断点后 我收到一条错误消息 告诉我 不存在这样的表错误 try appTimeUsageDao insertOrReplace
  • 如何更改 Spring OAuth2 上的response_type

    这是我使用 Instagram 进行 OAuth2 登录的配置 instagram client clientId clientId clientSecret clientSeret accessTokenUri https api ins
  • Spring RESTful控制器方法改进建议

    我是 Spring REST 和 Hibernate 的新手 也就是说 我尝试组合一个企业级控制器方法 我计划将其用作未来开发的模式 您认为可以通过哪些方法来改进 我确信有很多 RequestMapping value user metho
  • Android Espresso 单击按钮时出现错误

    我正在尝试使用 espresso 框架为 Android 应用程序编写一些 UI 测试 现在我只是检查启动屏幕上是否存在所有元素 然后尝试单击登录按钮 单击按钮时 测试由于错误而失败 我似乎无法理解为什么会发生这种情况 我的测试代码是 Ru

随机推荐

  • 算法:两个有序数组合并成一个有序数组 java语言

    题目 有两个有序数组a 和b 将它们合并成数组c 需要c 也是有序数组 思路 新建一个以两个集合长度之和为长度的新数组 从两数组最左边开始比起 把小的放入新集合 并用变量标记后一位置 每次比较都是比较的最左边未比较过的元素 通过变量 循环比
  • 分享一个可交互的小场景(二)

    先看效果 可互动的小场景 再看代码 JS部分
  • 正点原子I.MX6ULL开发板车牌识别项目实战 1

    1 项目总体概述 下图为 车牌识别项目 的系统框图 借助这个框图 简要介绍项目的总体思路和所需要做的准备工作 1 1 总体思路 通过摄像头采集图像信息 并将图像信息传递开发板 这里使用的是OpenCv 开发板收到图像信息之后 通过定时器 周
  • Python解决ModuleNotFoundError: No module named 'Queue'的问题

    我们知道Python2和Python3两个版本之间 有些不兼容的地方 Python3中引入Queue会报出这个问题 Python3中要这样引入 1 import queue Python2中要这样引入 1 import Queue 为了兼容
  • 第十六课,面剔除

    使用OpenGL的面剔除选项 它默认是禁用状态 glEnable GL CULL FACE 直接运行后 我们发现正方体的部分面确实被剔除了 但是却不是背向面 这是因为我们定义的正方体并不是严格遵循逆时针顺序定义的 原理详见教程 这里就不过多
  • python输出文本 去掉引号,如何从导出的python列表中删除逗号,引号和括号?

    You guys were super helpful with my last newbie question so I figured I would give it another shot Right now my Python 3
  • 基于范围的for循环

    一 基于范围的for循环 C 11 1 范围for的语法 2 范围for的使用条件 二 指针空值nullptr 一 基于范围的for循环 C 11 1 范围for的语法 对于一个有范围的集合而言 由程序员来说明循环的范围是多余的 有时候还会
  • 智能聊天机器人实现(源码+解析)

    前言 之前写了一篇 美女图片采集器 源码 解析 得到了众多朋友的支持 发现这样系列的教程还是挺受欢迎的 也激励我继续写下去 也在那一篇文章中提过 美女图片采集只是我先前那个完整APP中的一个功能罢了 还有其他几个比较好玩的尚未开源 之后有时
  • QWidgetAction实现鼠标滑过菜单项图标高亮显示

    需求是鼠标滑过菜单项时 菜单项的文字 icon以及子菜单的小箭头都要高亮显示 qss中只能设置item背景色 文字颜色以及子菜单小箭头的样式 icon的图片不能切换 另外曾经想过用indicator 对action setCheckable
  • Ubuntu18.04安装QT5

    提示 文章写完后 目录可以自动生成 如何生成可参考右边的帮助文档 文章目录 前言 一 QT5是什么 二 安装包安装 1 下载安装包 2 安装QT5 3 运行 4 其他方式 总结 前言 最近在学习QT5 在Windows上的安装自然不必多说
  • 爬虫 — 反爬

    目录 一 UA 反爬 二 Cookie 验证与反爬 1 Cookie 简介 2 使用 Cookie 原因 3 Cookie 作用 3 1 模拟登录 3 2 反反爬 三 Referer 反爬 一 UA 反爬 UA User Agent 用户代
  • [机械]“重工业面临两大危机”——向文波(三一重工股份有限公司执行总裁)

    向文波 三一重工股份有限公司执行总裁 向文波是三一重工的掌门人 但深受徐工事件影响 他以业内的视角 适时地向中国重工业的改革发出一个警示信号 提出一个超越 抓大放小 国进民退 等传统国企改革的新命题 产业安全 引起了舆论与政府的重视 中国重
  • 2021.11.13-15总结

    将C语言文件相关的内容学完了 了解了文件相关的函数
  • linux网络管理

    一 网络接口 1 在Linux系统中 主机的网络接口卡通常称为网络接口 使用ifconfig命令来查看网络 2 eth0 是Linux系统中第一块以太网卡的名称 3 lo 是Linux系统中的 环回 网络接口 lo 并不代表真正的网络接口
  • 用户访问session分析-按session粒度进行数据聚合

    思路 之前模拟创建了两张表 user visit action 和 user info 对于user visit action表 1 通过用户传过来的指定日期范围内 从user visit action中查询出指定的用户访问数据 变成 ac
  • nginx根据url参数动态代理

    nginx根据url参数动态代理 请求url格式 其中参数proxy后面的url就是需要访问的真实地址 http localhost 9388 proxy http localhost 8038 Content layui font ico
  • 腾讯滑块识别-通用滑块识别

    遇到滑块问题 在写爬虫的时候 经常会遇到滑块问题 很多次都想过尝试如何攻破滑块 但是每次都没成功 除了最开始的极验滑块 当时通过原图和滑块图的对比 能够得出缺口坐标 但是随着极验 网易 腾讯滑块的更新 已经不能够找到原图了 下面给出滑块通杀
  • python的gui神器——gooey

    python的gui神器 gooey python自带的gui库 tkinter库 最近研究的gui库 gooey tkinter教程 tkinter GUI编程 gooey地址和教程 gooey 入门教程 python使用tkinter库
  • Android基础知识 - 内置SQLite数据库

    文章目录 SQLite数据库简单介绍 创建数据库 SQLiteOpenHelper类 简单概述 DatabaseTest项目 升级数据库 对表中的数据进行操作 添加数据 更新数据 删除数据 查询数据 使用SQL操作数据库 SQLite数据库
  • 03 xxl-job任务执行流程

    作业类型 xxl job支持七种作业类型 Bean GLUE Java GLUE Shell GLUE Python GLUE PHP GLUE Nodejs GLUE PowerShell 其中 GLUE类型作业都是在admin管理端编辑