简介
elasticjob是基于quartz构建支持分片的分布式弹性可伸缩的job执行组件
zookeeper节点数据设计
job
leader
election
latch
instance //主节点的实例ID 临时节点 在节点选举成功后添加
sharding
necessary
processing //临时节点标记 分片是否正在进行
servers
10.2.123.152
123.254.26.23
instances
456 //临时节点 运行实例
235
sharding
0
instance = 10.7.1.2@-@456
running //标记该分片的状态正在运行
1
instance = 10.7.1.2@-@456
1 在线的实例节点设计为临时节点
public void persistOnline() {
jobNodeStorage.fillEphemeralJobNode(instanceNode.getLocalInstanceNode(), "");
}
2 标记分片正在进行中的标识
jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
public void shardingIfNecessary() {
List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
if (!isNeedSharding() || availableJobInstances.isEmpty()) {
return;
}
if (!leaderService.isLeaderUntilBlock()) {
blockUntilShardingCompleted();
return;
}
waitingOtherShardingItemCompleted();
LiteJobConfiguration liteJobConfig = configService.load(false);
int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
log.debug("Job '{}' sharding begin.", jobName);
jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
resetShardingInfo(shardingTotalCount);
JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass());
jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
log.debug("Job '{}' sharding complete.", jobName);
}
分片事务结束时候需要删除节点
curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.PROCESSING)).and();
3 实例正在运行的状态的标记
public void registerJobBegin(final ShardingContexts shardingContexts) {
JobRegistry.getInstance().setJobRunning(jobName, true);
if (!configService.load(true).isMonitorExecution()) {
return;
}
for (int each : shardingContexts.getShardingItemParameters().keySet()) {
jobNodeStorage.fillEphemeralJobNode(ShardingNode.getRunningNode(each), "");
}
}
任务开始执行时候即会注册相应的实例分片运行状态
private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
if (shardingContexts.getShardingItemParameters().isEmpty()) {
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobName));
}
return;
}
jobFacade.registerJobBegin(shardingContexts);
String taskId = shardingContexts.getTaskId();
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
}
try {
process(shardingContexts, executionSource);
} finally {
// TODO 考虑增加作业失败的状态,并且考虑如何处理作业失败的整体回路
jobFacade.registerJobCompleted(shardingContexts);
if (itemErrorMessages.isEmpty()) {
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
}
} else {
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
}
}
}
}
原理分析之初始化
使用时我们定义执行ElasticJob,但是ElasticJob底层执行的必然是quartz的Job,在源码中是 LiteJob
public final class LiteJob implements Job {
@Setter
private ElasticJob elasticJob;
@Setter
private JobFacade jobFacade;
@Override
public void execute(final JobExecutionContext context) throws JobExecutionException {
JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
}
}
那LiteJob 是如何被初始化创建并start的,实际上 elasticjob中具体的任务将会被封