收藏假期干货:Apache DolphinScheduler源码分析系列(超详细)

2023-05-16

0caf206bc6c980667c14a58f58bb54e9.jpeg

01

引言

Apache DolphinScheduler官方文档地址:

https://dolphinscheduler.apache.org/zh-cn/index.html

Apache DolphinScheduler是一个分布式去中心化,易扩展的可视化DAG工作流任务调度平台。致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用。

其原理图如下:

4063951980759c1f3f99913b52b2e670.png

接下来,本文一步一步详细地讲解其源码。

02

DolphinScheduler 项目结构

2.1 结构分析

DS

b27c390fd0e7799e17a5a2b48f47cd68.png

导入项目后,可以看到其主要核心模块如下:

模块描述
dolphinscheduler-alert告警模块,提供 AlertServer 服务。
dolphinscheduler-apiweb应用模块,提供 ApiServer 服务。
dolphinscheduler-common通用的常量枚举、工具类、数据结构或者基类
dolphinscheduler-dao提供数据库访问等操作。
dolphinscheduler-remote基于 netty 的客户端、服务端
dolphinscheduler-serverMasterServer 和 WorkerServer 服务
dolphinscheduler-serviceservice模块,包含Quartz、Zookeeper、日志客户端访问服务,便于server模块和api模块调用
dolphinscheduler-ui前端模块

2.2 表分析

DS

dolphinscheduler_ddl.sql及dolphinscheduler_dml.sql

26c23adc91bc6bbcc98c80a1b9c146ec.png

执行完后,可以在数据库里看到有如下表:

表名表信息
t_ds_access_token访问ds后端的token
t_ds_alert告警信息
t_ds_alertgroup告警组
t_ds_command执行命令
t_ds_datasource数据源
t_ds_error_command(核心表)错误命令
t_ds_process_definition(核心表)流程定义
t_ds_process_instance(核心表)流程实例
t_ds_project项目
t_ds_queue队列
t_ds_relation_datasource_user用户关联数据源
t_ds_relation_process_instance子流程
t_ds_relation_project_user用户关联项目
t_ds_relation_resources_user用户关联资源
t_ds_relation_udfs_user用户关联UDF函数
t_ds_relation_user_alertgroup用户关联告警组
t_ds_resources资源文件
t_ds_schedules(核心表)流程定时调度
t_ds_session用户登录的session
t_ds_task_instance(核心表)任务实例
t_ds_tenant租户
t_ds_udfsUDF资源
t_ds_user用户
t_ds_versionds版本信息

核心表可以直接看文末附录。

2.2.1 类关系图 (用户/队列/数据源)

DS

1f18aea09395eee32fa089098d95f33d.png

描述如下:

  • 一个租户下可以有多个用户;

  • t_ds_user中的queue字段存储的是队列表中的queue_name信息;

  • t_ds_tenant下存的是queue_id,在流程定义执行过程中,用户队列优先级最高,用户队列为空则采用租户队列;

  • t_ds_datasource表中的user_id字段表示创建该数据源的用户;

  • t_ds_relation_datasource_user中的user_id表示,对数据源有权限的用户。

2.2.2 类关系图 (项目/资源/告警)

DS

db6671f37d82312bde39dae91018d761.png

描述如下:

  • 一个用户可以有多个项目,用户项目授权通过t_ds_relation_project_user表完成project_id和user_id的关系绑定

  • t_ds_projcet表中的user_id表示创建该项目的用户;

  • t_ds_relation_project_user表中的user_id表示对项目有权限的用户;

  • t_ds_resources表中的user_id表示创建该资源的用户;

  • t_ds_relation_resources_user中的user_id表示对资源有权限的用户;

  • t_ds_udfs表中的user_id表示创建该UDF的用户;

  • t_ds_relation_udfs_user表中的user_id表示对UDF有权限的用户。

2.2.3 类关系图 ( 命令/流程/任务)

DS

dac0dac93d05206abac7334e92a9189f.png

9d1c0b92579fde457337835a7125a7fb.png

描述如下:

  • 一个项目有多个流程定义,一个流程定义可以生成多个流程实例,一个流程实例可以生成多个任务实例

  • t_ds_schedulers表存放流程定义的定时调度信息;

  • t_ds_relation_process_instance表存放的数据用于处理流程定义中含有子流程的情况,parent_process_instance_id表示含有子流程的主流程实例id,process_instance_id表示子流程实例的id,parent_task_instance_id表示子流程节点的任务实例id,流程实例表和任务实例表分别对应t_ds_process_instance表和t_ds_task_instance表

03

DolphinScheduler 源码分析

讲解源码前,先贴一份官网的启动流程图:

73c7f6362c6f1b5070cdcab1ecaf2e4f.png

3.1 ExecutorController

DS

org.apache.dolphinscheduler.api.controller.ExecutorController

ff67282a8989f2e68b14648032b433d4.png

以下是对各接口的描述:

接口描述
/start-process-instance执行流程实例
/batch-start-process-instance批量执行流程实例
/execute操作流程实例,如:暂停, 停止, 重跑, 从暂停恢复,从停止恢复
/batch-execute批量操作流程实例
/start-check检查流程定义或检查所有的子流程定义是否在线

接下我们看看最核心的方法:

/**
     * do action to process instance: pause, stop, repeat, recover from pause, recover from stop
     *
     * @param loginUser login user
     * @param projectCode project code
     * @param processInstanceId process instance id
     * @param executeType execute type
     * @return execute result code
     */
    @ApiOperation(value = "execute", notes = "EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES")
    @ApiImplicitParams({
            @ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100"),
            @ApiImplicitParam(name = "executeType", value = "EXECUTE_TYPE", required = true, dataType = "ExecuteType")
    })
    @PostMapping(value = "/execute")
    @ResponseStatus(HttpStatus.OK)
    @ApiException(EXECUTE_PROCESS_INSTANCE_ERROR)
    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
    public Result execute(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                          @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
                          @RequestParam("processInstanceId") Integer processInstanceId,
                          @RequestParam("executeType") ExecuteType executeType
    ) {
        Map result = execService.execute(loginUser, projectCode, processInstanceId, executeType);
        return returnDataList(result);
    }

可以看到execute接口,是直接使用ExecService去执行了,下面分析下。

3.2 ExecService

DS

下面看看里面的execute方法,已经加好了注释:

/**
 * 操作工作流实例
 *
 * @param loginUser         登录用户
 * @param projectCode       项目编码
 * @param processInstanceId 流程实例ID
 * @param executeType       执行类型(repeat running、resume pause、resume failure、stop、pause)
 * @return 执行结果
 */
@Override
public Map<String, Object> execute(User loginUser, long projectCode, Integer processInstanceId, ExecuteType executeType) {


    /*** 查询项目信息 **/
    Project project = projectMapper.queryByCode(projectCode);
    //check user access for project


    /*** 判断当前用户是否有操作权限 **/
    Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, ApiFuncIdentificationConstant.map.get(executeType));
    if (result.get(Constants.STATUS) != Status.SUCCESS) {
        return result;
    }


    /*** 检查Master节点是否存在 **/
    if (!checkMasterExists(result)) {
        return result;
    }


    /*** 查询工作流实例详情 **/
    ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
    if (processInstance == null) {
        putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
        return result;
    }


    /*** 根据工作流实例绑定的流程定义ID查询流程定义 **/
    ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
            processInstance.getProcessDefinitionVersion());
    if (executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE) {
        /*** 校验工作流定义能否执行(工作流是否存在?是否上线状态?存在子工作流定义不是上线状态?) **/
        result = checkProcessDefinitionValid(projectCode, processDefinition, processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
        if (result.get(Constants.STATUS) != Status.SUCCESS) {
            return result;
        }
    }


    /*** 根据当前工作流实例的状态判断能否执行对应executeType类型的操作 **/
    result = checkExecuteType(processInstance, executeType);
    if (result.get(Constants.STATUS) != Status.SUCCESS) {
        return result;
    }


    /*** 判断是否已经选择了合适的租户 **/
    if (!checkTenantSuitable(processDefinition)) {
        logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ",
                processDefinition.getId(), processDefinition.getName());
        putMsg(result, Status.TENANT_NOT_SUITABLE);
    }


    /*** 在executeType为重跑的状态下,获取用户指定的启动参数 **/
    Map<String, Object> commandMap = JSONUtils.parseObject(processInstance.getCommandParam(), new TypeReference<Map<String, Object>>() {
    });
    String startParams = null;
    if (MapUtils.isNotEmpty(commandMap) && executeType == ExecuteType.REPEAT_RUNNING) {
        Object startParamsJson = commandMap.get(Constants.CMD_PARAM_START_PARAMS);
        if (startParamsJson != null) {
            startParams = startParamsJson.toString();
        }
    }


    /*** 根据不同的ExecuteType去执行相应的操作 **/
    switch (executeType) {
        case REPEAT_RUNNING: // 重跑
            result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams);
            break;
        case RECOVER_SUSPENDED_PROCESS: // 恢复挂载的工作流
            result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams);
            break;
        case START_FAILURE_TASK_PROCESS: // 启动失败的工作流
            result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams);
            break;
        case STOP: // 停止
            if (processInstance.getState() == ExecutionStatus.READY_STOP) {
                putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
            } else {
                result = updateProcessInstancePrepare(processInstance, CommandType.STOP, ExecutionStatus.READY_STOP);
            }
            break;
        case PAUSE: // 暂停
            if (processInstance.getState() == ExecutionStatus.READY_PAUSE) {
                putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
            } else {
                result = updateProcessInstancePrepare(processInstance, CommandType.PAUSE, ExecutionStatus.READY_PAUSE);
            }
            break;
        default:
            logger.error("unknown execute type : {}", executeType);
            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "unknown execute type");


            break;
    }
    return result;
}

可以看到,以上代码前半部分主要是做了校验的操作,后半部分是根据执行类型来做不同的操作,操作主要分为两部分:insertCommand以及updateProcessInstancePrepare

3.2.1 insertCommand

DS

方法代码如下,其实主要就是把生成命令并插入t_ds_command(执行命令表),插入已经添加好注释:

/**
 * 插入命令(re run, recovery (pause / failure) execution)
 *
 * @param loginUser             登录用户
 * @param instanceId            工作流实例id
 * @param processDefinitionCode 工作流定义id
 * @param processVersion        工作流版本
 * @param commandType           命令类型
 * @return 操作结果
 */
private Map<String, Object> insertCommand(User loginUser, Integer instanceId, long processDefinitionCode, int processVersion, CommandType commandType, String startParams) {
    Map<String, Object> result = new HashMap<>();


    /*** 封装启动参数 **/
    Map<String, Object> cmdParam = new HashMap<>();
    cmdParam.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, instanceId);
    if (!StringUtils.isEmpty(startParams)) {
        cmdParam.put(CMD_PARAM_START_PARAMS, startParams);
    }


    Command command = new Command();
    command.setCommandType(commandType);
    command.setProcessDefinitionCode(processDefinitionCode);
    command.setCommandParam(JSONUtils.toJsonString(cmdParam));
    command.setExecutorId(loginUser.getId());
    command.setProcessDefinitionVersion(processVersion);
    command.setProcessInstanceId(instanceId);


    /*** 判断工作流实例是否正在执行 **/
    if (!processService.verifyIsNeedCreateCommand(command)) {
        putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, String.valueOf(processDefinitionCode));
        return result;
    }


    /*** 保存命令 **/
    int create = processService.createCommand(command);


    if (create > 0) {
        putMsg(result, Status.SUCCESS);
    } else {
        putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
    }


    return result;
}

3.2.2 updateProcessInstancePrepare

DS

方法代码如下,已经添加注释

/**
 * 准备更新工作流实例的命令类型和状态
 *
 * @param processInstance 工作流实例
 * @param commandType     命令类型
 * @param executionStatus 执行状态
 * @return 更新结果
 */
private Map<String, Object> updateProcessInstancePrepare(ProcessInstance processInstance, CommandType commandType, ExecutionStatus executionStatus) {
    Map<String, Object> result = new HashMap<>();


    processInstance.setCommandType(commandType);
    processInstance.addHistoryCmd(commandType);
    processInstance.setState(executionStatus);
    int update = processService.updateProcessInstance(processInstance);


    // 判断流程是否正常
    if (update > 0) {
        StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
                processInstance.getId(), 0, processInstance.getState(), processInstance.getId(), 0
        );
        Host host = new Host(processInstance.getHost());
        stateEventCallbackService.sendResult(host, stateEventChangeCommand.convert2Command());
        putMsg(result, Status.SUCCESS);
    } else {
        putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
    }
    return result;
}

根据流程图,我们可以看到了已经执行了如下红框的代码,也就是把我们的command已经缓存到了DB。

接下来需要看看Master的代码。

ad3cc87159209e59e7c688a2b560a060.png

3.3 MasterServer

DS

3706cbf05d74732d663fc749d657b9c9.png

@SpringBootApplication
@ComponentScan("org.apache.dolphinscheduler")
@EnableTransactionManagement
@EnableCaching
public class MasterServer implements IStoppable {
    private static final Logger logger = LoggerFactory.getLogger(MasterServer.class);


    @Autowired
    private SpringApplicationContext springApplicationContext;


    @Autowired
    private MasterRegistryClient masterRegistryClient;


    @Autowired
    private TaskPluginManager taskPluginManager;


    @Autowired
    private MasterSchedulerService masterSchedulerService;


    @Autowired
    private SchedulerApi schedulerApi;


    @Autowired
    private EventExecuteService eventExecuteService;


    @Autowired
    private FailoverExecuteThread failoverExecuteThread;


    @Autowired
    private MasterRPCServer masterRPCServer;


    public static void main(String[] args) {
        Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
        SpringApplication.run(MasterServer.class);
    }


    /**
     * 启动 master server
     */
    @PostConstruct
    public void run() throws SchedulerException {


        // 初始化 RPC服务
        this.masterRPCServer.start();


        //安装任务插件
        this.taskPluginManager.installPlugin();


        /*** MasterServer 注册客户端,用于连接到注册表并传递注册表事件。
         * 当主节点启动时,它将在注册中心注册,并调度一个{@link HeartBeatTask}来更新注册表中的元数据**/
        this.masterRegistryClient.init();
        this.masterRegistryClient.start();
        this.masterRegistryClient.setRegistryStoppable(this);


        // 主调度程序线程,该线程将使用来自数据库的命令并触发执行的processInstance。
        this.masterSchedulerService.init();
        this.masterSchedulerService.start();


        this.eventExecuteService.start();
        this.failoverExecuteThread.start();


        //这是调度器的接口,包含操作调度任务的方法。
        this.schedulerApi.start();


        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (Stopper.isRunning()) {
                close("MasterServer shutdownHook");
            }
        }));
    }


    /**
     * 优雅的关闭方法
     *
     * @param cause 关闭的原因
     */
    public void close(String cause) {


        try {
            // set stop signal is true
            // execute only once
            if (!Stopper.stop()) {
                logger.warn("MasterServer is already stopped, current cause: {}", cause);
                return;
            }


            logger.info("Master server is stopping, current cause : {}", cause);


            // thread sleep 3 seconds for thread quietly stop
            ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
            // close
            this.schedulerApi.close();
            this.masterSchedulerService.close();
            this.masterRPCServer.close();
            this.masterRegistryClient.closeRegistry();
            // close spring Context and will invoke method with @PreDestroy annotation to destroy beans.
            // like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
            springApplicationContext.close();


            logger.info("MasterServer stopped, current cause: {}", cause);
        } catch (Exception e) {
            logger.error("MasterServer stop failed, current cause: {}", cause, e);
        }
    }


    @Override
    public void stop(String cause) {
        close(cause);
    }
}

在run方法里面,可以看到,主要依次执行了:

  • ① MasterRPCServer.start():启动master的rpc服务;

  • ② TaskPluginManager.installPlugin():安装任务插件;

  • ③ MasterRegistryClient.start():向Zookeeper注册MasterServer;

  • ④ MasterSchedulerService.start():主调度程序线程,该线程将使用来自数据库的命令并触发执行的processInstance。

  • ⑤ EventExecuteService.start():工作流实例执行情况

  • ⑥ FailoverExecuteThread():故障转移检测

  • ⑦ SchedulerApi.start():scheduler接口去操作任务实例

3.1.1 MasterRPCServer

DS

Master RPC Server主要用来发送或接收请求给其它系统

初始化方法如下:

@PostConstruct
private void init() {
    // 初始化远程服务
    NettyServerConfig serverConfig = new NettyServerConfig();
    serverConfig.setListenPort(masterConfig.getListenPort());
    this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL, taskRecallProcessor);


    // 日志服务
    this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);


    this.nettyRemotingServer.start();
}

3.2.2 TaskPluginManager

DS

ff51798e603c33a5ba7154c6f13253a8.png

d311cb16ef8ed64713bf7f4cfce52c93.png

6ae435460ad241d78d2ed8acd0ae4832.png

3.3.3 MasterRegistryClient

DS

去中心化思想

所以MasterRegistryClient主要的作用是注册MasterServer客户端,用于连接到注册表并传递注册表事件。

当Master节点启动时,它将在注册中心注册,并调度一个HeartBeatTask来更新注册表中的元数据。

ebdc2c790b38ff94c1c052196d6570a8.png

a133576ce83be5c7b6a9eec66bfe2fa3.png

3.3.4 MasterSchedulerService

DS

其init和run方法如下,init主要就是初始化一个工作流实例的队列:

b410496f95ce3dc5d89f0a707f04f852.png

scheduleWorkflow()

3002e691a2eaf309f02fefe1fe859680.png

看看里面的scheduleWorkflow()方法,已写好注释:

/**
 * 从数据库中按槽位查询命令,转换为工作流实例,然后提交给workflowExecuteThreadPool。
 */
private void scheduleWorkflow() throws InterruptedException, MasterException {
    // 从数据库中按槽位查询命令
    List commands = findCommands();
    if (CollectionUtils.isEmpty(commands)) {
        // indicate that no command ,sleep for 1s
        Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        return;
    }


    // 转换为工作流实例
    List processInstances = command2ProcessInstance(commands);
    if (CollectionUtils.isEmpty(processInstances)) {
        // indicate that the command transform to processInstance error, sleep for 1s
        Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        return;
    }
    MasterServerMetrics.incMasterConsumeCommand(commands.size());


    for (ProcessInstance processInstance : processInstances) {
        //提交给workflowExecuteThreadPool
        submitProcessInstance(processInstance);
    }
}

提交工作流实例方法如下,注意提交到了workflowExecuteThreadPool

/**
 * 提交工作流实例给 workflowExecuteThreadPool
 *
 * @param processInstance 工作流实例
 */
private void submitProcessInstance(@NonNull ProcessInstance processInstance) {
    try {
        LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
        logger.info("Master schedule service starting workflow instance");


        // 封装工作流实例Runnable
        final WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
                processInstance
                , processService
                , nettyExecutorManager
                , processAlertManager
                , masterConfig
                , stateWheelExecuteThread
                , curingGlobalParamsService);


        this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
        if (processInstance.getTimeout() > 0) {
            stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
        }
        ProcessInstanceMetrics.incProcessInstanceSubmit();


        // 提交封装好的工作流实例Runnable给workflowExecuteThreadPool
        CompletableFuture workflowSubmitFuture = CompletableFuture.supplyAsync(
                workflowExecuteRunnable::call, workflowExecuteThreadPool);
        workflowSubmitFuture.thenAccept(workflowSubmitStatue -> {
            if (WorkflowSubmitStatue.FAILED == workflowSubmitStatue) {
                // submit failed
                processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
                stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
                submitFailedProcessInstances.add(processInstance);
            }
        });
        logger.info("Master schedule service started workflow instance");


    } catch (Exception ex) {
        processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
        stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
        logger.info("Master submit workflow to thread pool failed, will remove workflow runnable from cache manager", ex);
    } finally {
        LoggerUtils.removeWorkflowInstanceIdMDC();
    }
}

3.3.5 EventExecuteService

DS

e30c480dd4a1f9088a5443c248d8cc64.png

3.3.6 FailoverExecuteThread

DS

FailoverExecuteThread为故障转移检测线程

b045568e1039db5a9e29b1ecf3f992f4.png

3.3.7 结构分析SchedulerApi

DS

SchedulerApi是操作调度任务实例的接口,其主要功能是启动调度程序、插入或更新调度任务、删除调度任务、关闭调度任务和释放资源

3.3.8 TaskPriorityQueueConsumer

DS

412e41a184061598c9b2b9dce296986d.png

b74dd6bf38e5d22697a757f1e0aba513.png

30b7f7a215de4ca7886b4c7ea3d3f724.png

4db8d8292f5682089927c0cd36ebcc7b.png

到这里,我们可以看到worker部分代码了。

3.4 WorkerServer

DS

@PostConstruct
public void run() {
  // worker rpc服务
    this.workerRpcServer.start();


  // 任务插件安装
    this.taskPluginManager.installPlugin();


    // 向Zookeeper注册客户端
    this.workerRegistryClient.registry();
    this.workerRegistryClient.setRegistryStoppable(this);
    Set workerZkPaths = this.workerRegistryClient.getWorkerZkPaths();
    this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);


   // 管理Worker线程
    this.workerManagerThread.start();
    
    // 报告状态线程
    this.retryReportTaskStatusThread.start();


    /*
     * registry hooks, which are called before the process exits
     */
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        if (Stopper.isRunning()) {
            close("WorkerServer shutdown hook");
        }
    }));
}

f2a78111e2ba8171227c8c5f6ebba054.png

3.4.1 TaskExecutePorcessor

DS

TaskExecuteProcessor

@Counted(value = "ds.task.execution.count", description = "task execute total count")
@Timed(value = "ds.task.execution.duration", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
@Override
public void process(Channel channel, Command command) {
 // code ...
}

3b3fa3381f99e289316cbce52fe6fe79.png

3.4.2 TaskExecuteThread

DS

TaskExecuteThread就是最终执行任务的代码了,里面的run方法如下,已加好注释:

@Override
public void run() {
    // dry run 预演模式
    if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.SUCCESS);
        taskExecutionContext.setStartTime(new Date());
        taskExecutionContext.setEndTime(new Date());
        TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
        logger.info("[WorkflowInstance-{}][TaskInstance-{}] Task dry run success",
            taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
        return;
    }
    try {
        LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
        logger.info("script path : {}", taskExecutionContext.getExecutePath());
        if (taskExecutionContext.getStartTime() == null) {
            taskExecutionContext.setStartTime(new Date());
        }
        logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId());


        //回调任务执行状态
        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
        taskCallbackService.sendTaskExecuteRunningCommand(taskExecutionContext);


        // 拷贝 hdfs/minio 文件到本地
        List<pair> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources());</pair
        if (!fileDownloads.isEmpty()) {
            downloadResource(taskExecutionContext.getExecutePath(), logger, fileDownloads);
        }


        taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());


        taskExecutionContext.setTaskAppId(String.format("%s_%s",
                taskExecutionContext.getProcessInstanceId(),
                taskExecutionContext.getTaskInstanceId()));


        TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
        if (null == taskChannel) {
            throw new ServiceException(String.format("%s Task Plugin Not Found,Please Check Config File.", taskExecutionContext.getTaskType()));
        }
        String taskLogName = LoggerUtils.buildTaskId(taskExecutionContext.getFirstSubmitTime(),
                taskExecutionContext.getProcessDefineCode(),
                taskExecutionContext.getProcessDefineVersion(),
                taskExecutionContext.getProcessInstanceId(),
                taskExecutionContext.getTaskInstanceId());
        taskExecutionContext.setTaskLogName(taskLogName);


        // 给当前线程设置名称
        Thread.currentThread().setName(taskLogName);


        task = taskChannel.createTask(taskExecutionContext);


        // 执行任务插件方法 - init
        this.task.init();


        //init varPool
        this.task.getParameters().setVarPool(taskExecutionContext.getVarPool());


        // 执行任务插件方法 -  handle
        this.task.handle();


        // 判断是否需要发送告警
        if (this.task.getNeedAlert()) {
            sendAlert(this.task.getTaskAlertInfo(), this.task.getExitStatus().getCode());
        }


        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.of(this.task.getExitStatus().getCode()));
        taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
        taskExecutionContext.setProcessId(this.task.getProcessId());
        taskExecutionContext.setAppIds(this.task.getAppIds());
        taskExecutionContext.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
        logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), this.task.getExitStatus());
    } catch (Throwable e) {
        logger.error("task scheduler failure", e);
        kill();
        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
        taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
        taskExecutionContext.setProcessId(this.task.getProcessId());
        taskExecutionContext.setAppIds(this.task.getAppIds());
    } finally {
        TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
        clearTaskExecPath();
        LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
    }
}

04

附录

4.1 核心表

DS

① t_ds_process_definition(流程定义表)

字段类型注释
idint主键
namevarchar流程定义名称
versionint流程定义版本
release_statetinyint流程定义的发布状态:0 未上线 1已上线
project_idint项目id
user_idint流程定义所属用户id
process_definition_jsonlongtext流程定义json串
descriptiontext流程定义描述
global_paramstext全局参数
flagtinyint流程是否可用:0 不可用,1 可用
locationstext节点坐标信息
connectstext节点连线信息
receiverstext收件人
receivers_cctext抄送人
create_timedatetime创建时间
timeoutint超时时间
tenant_idint租户id
update_timedatetime更新时间
modify_byvarchar修改用户
resource_idsvarchar资源id集

② t_ds_process_instance(流程实例表)

字段类型注释
idint主键
namevarchar流程实例名称
process_definition_idint流程定义id
statetinyint流程实例状态:0 提交成功,1 正在运行,2 准备暂停,3 暂停,4 准备停止,5 停止,6 失败,7 成功,8 需要容错,9 kill,10 等待线程,11 等待依赖完成
recoverytinyint流程实例容错标识:0 正常,1 需要被容错重启
start_timedatetime流程实例开始时间
end_timedatetime流程实例结束时间
run_timesint流程实例运行次数
hostvarchar流程实例所在的机器
command_typetinyint命令类型:0 启动工作流,1 从当前节点开始执行,2 恢复被容错的工作流,3 恢复暂停流程,4 从失败节点开始执行,5 补数,6 调度,7 重跑,8 暂停,9 停止,10 恢复等待线程
command_paramtext命令的参数(json格式)
task_depend_typetinyint节点依赖类型:0 当前节点,1 向前执行,2 向后执行
max_try_timestinyint最大重试次数
failure_strategytinyint失败策略 0 失败后结束,1 失败后继续
warning_typetinyint告警类型:0 不发,1 流程成功发,2 流程失败发,3 成功失败都发
warning_group_idint告警组id
schedule_timedatetime预期运行时间
command_start_timedatetime开始命令时间
global_paramstext全局参数(固化流程定义的参数)
process_instance_jsonlongtext流程实例json(copy的流程定义的json)
flagtinyint是否可用,1 可用,0不可用
update_timetimestamp更新时间
is_sub_processint是否是子工作流 1 是,0 不是
executor_idint命令执行用户
locationstext节点坐标信息
connectstext节点连线信息
history_cmdtext历史命令,记录所有对流程实例的操作
dependence_schedule_timestext依赖节点的预估时间
process_instance_priorityint流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest
worker_groupvarchar任务指定运行的worker分组
timeoutint超时时间
tenant_idint租户id

③ t_ds_task_instance(任务实例表)

字段类型注释
idint主键
namevarchar任务名称
task_typevarchar任务类型
process_definition_idint流程定义id
process_instance_idint流程实例id
task_jsonlongtext任务节点json
statetinyint任务实例状态:0 提交成功,1 正在运行,2 准备暂停,3 暂停,4 准备停止,5 停止,6 失败,7 成功,8 需要容错,9 kill,10 等待线程,11 等待依赖完成
submit_timedatetime任务提交时间
start_timedatetime任务开始时间
end_timedatetime任务结束时间
hostvarchar执行任务的机器
execute_pathvarchar任务执行路径
log_pathvarchar任务日志路径
alert_flagtinyint是否告警
retry_timesint重试次数
pidint进程pid
app_linkvarcharyarn app id
flagtinyint是否可用:0 不可用,1 可用
retry_intervalint重试间隔
max_retry_timesint最大重试次数
task_instance_priorityint任务实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest
worker_groupvarchar任务指定运行的worker分组

④ t_ds_schedules(流程定时调度表):

字段类型注释
idint主键
process_definition_idint流程定义id
start_timedatetime调度开始时间
end_timedatetime调度结束时间
crontabvarcharcrontab 表达式
failure_strategytinyint失败策略:0 结束,1 继续
user_idint用户id
release_statetinyint状态:0 未上线,1 上线
warning_typetinyint告警类型:0 不发,1 流程成功发,2 流程失败发,3 成功失败都发
warning_group_idint告警组id
process_instance_priorityint流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest
worker_groupvarchar任务指定运行的worker分组
create_timedatetime创建时间
update_timedatetime更新时间

⑤ t_ds_command(执行命令表)

字段类型注释
idint主键
command_typetinyint命令类型:0 启动工作流,1 从当前节点开始执行,2 恢复被容错的工作流,3 恢复暂停流程,4 从失败节点开始执行,5 补数,6 调度,7 重跑,8 暂停,9 停止,10 恢复等待线程
process_definition_idint流程定义id
command_paramtext命令的参数(json格式)
task_depend_typetinyint节点依赖类型:0 当前节点,1 向前执行,2 向后执行
failure_strategytinyint失败策略:0结束,1继续
warning_typetinyint告警类型:0 不发,1 流程成功发,2 流程失败发,3 成功失败都发
warning_group_idint告警组
schedule_timedatetime预期运行时间
start_timedatetime开始时间
executor_idint执行用户id
dependencevarchar依赖字段
update_timedatetime更新时间
process_instance_priorityint流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest
worker_groupvarchar任务指定运行的worker分组

05

文末

本文是个人阅读DolphinScheduler一些见解,欢迎大家跟我交流~如有错误,请批评指正!

cfa237e76bb51dbbf949f4be0d8f1aa0.png

最后非常欢迎大家加入 DolphinScheduler 大家庭,融入开源世界!

我们鼓励任何形式的参与社区,最终成为 Committer 或 PPMC,如:

  • 将遇到的问题通过 GitHub 上 issue 的形式反馈出来。

  • 回答别人遇到的 issue 问题。

  • 帮助完善文档。

  • 帮助项目增加测试用例。

  • 为代码添加注释。

  • 提交修复 Bug 或者 Feature 的 PR。

  • 发表应用案例实践、调度流程分析或者与调度相关的技术文章。

  • 帮助推广 DolphinScheduler,参与技术大会或者 meetup 的分享等。

欢迎加入贡献的队伍,加入开源从提交第一个 PR 开始。

  • 比如添加代码注释或找到带有 ”easy to fix” 标记或一些非常简单的 issue(拼写错误等) 等等,先通过第一个简单的 PR 熟悉提交流程。

注:贡献不仅仅限于 PR 哈,对促进项目发展的都是贡献。

相信参与 DolphinScheduler,一定会让您从开源中受益!

参与贡献

随着国内开源的迅猛崛起,Apache DolphinScheduler 社区迎来蓬勃发展,为了做更好用、易用的调度,真诚欢迎热爱开源的伙伴加入到开源社区中来,为中国开源崛起献上一份自己的力量,让本土开源走向全球。

da425780cbe0ba20e8448332272924b7.png

参与 DolphinScheduler 社区有非常多的参与贡献的方式,包括:

3301cc0d3667b294bcb519cb6858a989.png

贡献第一个PR(文档、代码) 我们也希望是简单的,第一个PR用于熟悉提交的流程和社区协作以及感受社区的友好度。

社区汇总了以下适合新手的问题列表:https://github.com/apache/dolphinscheduler/issues/5689

非新手问题列表:https://github.com/apache/dolphinscheduler/issues?q=is%3Aopen+is%3Aissue+label%3A%22volunteer+wanted%22

如何参与贡献链接:https://dolphinscheduler.apache.org/zh-cn/community/development/contribute.html

来吧,DolphinScheduler开源社区需要您的参与,为中国开源崛起添砖加瓦吧,哪怕只是小小的一块瓦,汇聚起来的力量也是巨大的。

参与开源可以近距离与各路高手切磋,迅速提升自己的技能,如果您想参与贡献,我们有个贡献者种子孵化群,可以添加社区小助手微信(Leonard-ds) ,手把手教会您( 贡献者不分水平高低,有问必答,关键是有一颗愿意贡献的心 )。

添加小助手微信时请说明想参与贡献。

来吧,开源社区非常期待您的参与。

< 🐬🐬 >

更多精彩推荐

☞DophineSheduler上下游任务之间动态传参案例及易错点总结

☞ApacheCon Asia 2022 精彩回顾 | 如何让更多人从大数据中获益?

☞一文读懂,硬核 Apache DolphinScheduler3.0 源码解析

☞7W+任务实例,80+台任务节点,联通数科基于 DolphinScheduler 的差异化改造和升级

☞ApacheCon精彩回顾|思科网讯DolphinScheduler与k8S整合实践,提高大数据处理效率!

☞Apache DolphinScheduler PMC:我在社区里如何玩转开源?

☞ApacheCon Asia 2022 精彩回顾 | DolphinScheduler 在联想作为统一调度中心的落地实践

☞国民乳业巨头伊利如何基于 DolphinScheduler 开辟企业数字化转型“蹊径”?

☞示例讲解 | Apache DolphinScheduler 简单任务定义及复杂的跨节点传参

我知道你在看哟!

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

收藏假期干货:Apache DolphinScheduler源码分析系列(超详细) 的相关文章

  • 在 htaccess 文件中使用 RewriteMap

    如何在 htaccess 文件中使用 RewriteMap 指令 当我把它放在那里时 我收到 此处不允许 RewriteMap 错误 我知道当将其放入 httpd conf 或虚拟主机配置文件中时 此错误将会消失 但我想知道是否可以将其放入
  • htaccess - XAMPP 上的互联网服务器错误 500 [已关闭]

    Closed 这个问题是无关 help closed questions 目前不接受答案 我尝试在 XAMPP 上运行脚本 但收到 互联网服务器错误 500 在实时服务器上运行时效果很好 我认为这是一个 htaccess 错误 htacce
  • mod_rewrite 有例外

    为了将我的服务器上的每个请求重定向到我使用的安全连接 RewriteCond SERVER PORT 80 RewriteRule https mywebsite com 1 R L 效果完美 但是我需要两条路径不被重定向 说我什么时候访问
  • 创建动态子域

    自从我考虑一些网站正在实施的此功能以来已经有一段时间了 它看起来非常成功 类似的网站tumblr com blogger com wordpress com允许用户使用简单的 HTML PHP 表单从网站内注册新的子域名 以我目前对 PHP
  • Apache:重定向用户,但保持相同的路径?

    我希望能够将用户重定向到不同的 TLD 但保持相同的路径 例如 如果用户访问 example com cars 10 使用 apache 如何将用户重定向到类似以下内容 my new site com cars 10 如果您的服务器启用了
  • 将 Node.js(用于实时通知)添加到现有 PHP 应用程序

    我有一个现有的 PHP 应用程序 我需要向其中添加实时通知 为了实现这一点 我安装了node js 打算添加socket io以实现所有实时功能 然而 尽管在过去的三个小时里研究并试图弄清楚如何将两者结合起来 但我发现自己并没有更接近于获得
  • 使用 Apache 允许 Glassfish 和 PHP 在同一服务器中协同工作

    是否可以建立从 Java 到 php 文件的桥梁 我有一个用 Java 编写的应用程序 我需要执行http piwik org http piwik org 这是用 PHP 编写的 在服务器中 我正在运行 PHP 但无法从浏览器访问 php
  • Apache 反向代理的基本身份验证问题

    我想为在 Ubuntu 服务器 12 04 1 上运行的 Apache 反向代理站点添加基本身份验证 网络应用程序是Jenkins http jenkins ci org运行在 Java EE 容器上 我在中添加了以下配置httpd con
  • 如果文件名减去扩展名,.htaccess url 重写行为将被覆盖。与网址相同

    我正在尝试整理 URL 并从中删除 php 扩展名等 我位于网站的基本文件夹中 因此没有可以优先处理的父 htaccess 文件或其他文件 这是我的 htaccess 代码 RewriteEngine On RewriteRule give
  • apache htaccess 将第一个段映射为参数而不干扰其他参数

    这可能是一个经典的 htaccess 问题 但我仍然找不到适合我的具体情况的问题 这是closest https stackoverflow com questions 9299793 apache httaccess rewriting我
  • phpinfo 说 php.ini 路径是 C:\Windows 但那里没有 php.ini

    我们正在尝试从 PHP5 切换到 PHP7 现在我们已经安装了 Apache 并且 PHP 可以运行了 然而 我们在php ini文件没有任何作用 Via phpinfo 我们意识到原因是Configuration File php ini
  • 仅第一个加载的 Django 站点有效

    我最近向 stackoverflow 提交了一个问题 标题为使用mod wsgi在apache上多次请求后Django无限加载 https stackoverflow com questions 71705909 django infini
  • 让 Rails 生产在端口 80 上运行

    我正在尝试让我的 Rails 应用程序在生产模式下运行 但遇到了一些困难 我正在使用 Passenger 和 apache 并运行 Ubuntu 12 04 我已经配置和创建了生产数据库 并设置了乘客 状态如下 rvmsudo passen
  • 使用代理时,React 应用程序正在不同位置查找静态文件

    我用过npx create react app my app创建一个反应应用程序 我用过的npm run build构建应用程序并使用它进行部署serve s build 我正在使用代理服务器来公开我的应用程序 我的 httpd 配置如下所
  • Apache 未发送 304 响应(如果启用了 mod_deflate 和 AddOutputFilterByType)

    我在 Apache httpd conf 中添加了以下行 AddOutputFilterByType DEFLATE text html text css application javascript application x javas
  • 当我使用 session_start() 时,Xampp 7.0.1 Apache 崩溃

    当我在 PHP 中使用 session start 启动会话时 我的 Apache 服务器停止工作 我正在使用 Windows 版 Xampp 7 0 1 我的配置文件如下所示 即使我把它放在文件的第一行 它也不起作用 有人知道如何解决这个
  • 如何授予 apache 使用 NTFS 分区上的目录的权限?

    我在一台带有 20GB 硬盘的旧机器上运行 Linux Lubutu 12 10 我有一个 1 TB 外部硬盘 上面有一个 NTFS 分区 在该分区上 有一个 www 目录 用于保存我的网页内容 它在启动时自动安装为 media t515
  • DBus 是我要找的吗?

    我需要一个Linux上的IPC系统 我的要求是 面向数据包 消息 能够处理点对点和一对多通信 没有层次结构 没有服务器和客户端 如果一个端点崩溃 必须通知其他端点 现有 Linux 发行版的良好支持 Apache 存在 绑定 用于创建动态页
  • 如何让 mod_wsgi 在 Mac 上运行?

    几个小时以来 我一直在尝试在 Mac 上安装最新版本的 mod wsgi 3 3 我使用的是 Snow Leopard 并且有系统附带的 Apache Apache 2 2 15 和 Python 2 6 1 r261 67515 版本 我
  • php隐藏所有错误[重复]

    这个问题在这里已经有答案了 隐藏的最佳做法是什么allPHP 错误 因为我不想向用户显示错误 我尝试过使用 htacess通过输入代码php flag display errors off在那里 但它返回给我一个500 error 还有其他

随机推荐