前面五个类,殊途同归,最终都会走到 CliDriver 类,它负责接收用户在命令行上输入的信息,然后准备执行并将执行的结果返回。
而真正在底层干事情的是 Driver,它将接收到的命令编译、优化为 MR(或 RDD)作业,真正地调度集群跑作业。
在 processLocalCmd 中有这样一句:ret = qp.run(cmd).getResponseCode();
这句中的 run 不是 CliDriver 的 run,而是 Driver 的 run。
org.apache.hadoop.hive.ql.Driver.run
org.apache.hadoop.hive.ql.Driver
----------
/**
 * Compiles and runs {@code command} in one shot (the plan has not been
 * compiled yet), delegating to {@link #run(String, boolean)} with
 * {@code alreadyCompiled = false}.
 */
public CommandProcessorResponse run(String command) throws CommandNeedRetryException {
    return run(command, false);
}
/**
 * Runs the plan this Driver already compiled, delegating to
 * {@link #run(String, boolean)} with a null command and
 * {@code alreadyCompiled = true}.
 */
public CommandProcessorResponse run() throws CommandNeedRetryException {
    return run((String) null, true);
}
/**
 * Runs the given command (or the pre-compiled plan when {@code alreadyCompiled}
 * is true) via {@link #runInternal}, and on failure replicates the error into
 * the session's metadata formatter — but only when that formatter is the JSON
 * one, so JSON-mode clients receive a machine-readable error record.
 *
 * @param command         the SQL text to compile and run, or null when a plan
 *                        was already compiled
 * @param alreadyCompiled whether compilation has already happened
 * @return the response produced by {@code runInternal}, returned unchanged
 *         regardless of whether the error could be JSON-encoded
 */
public CommandProcessorResponse run(String command, boolean alreadyCompiled) throws CommandNeedRetryException {
    CommandProcessorResponse response = this.runInternal(command, alreadyCompiled);
    if (response.getResponseCode() == 0) {
        return response; // success — nothing to report
    }
    SessionState session = SessionState.get();
    if (session == null) {
        return response; // no session, nowhere to write the error
    }
    MetaDataFormatter formatter = MetaDataFormatUtils.getFormatter(session.getConf());
    if (!(formatter instanceof JsonMetaDataFormatter)) {
        return response; // only the JSON formatter needs the error mirrored
    }
    try {
        if (this.downstreamError == null) {
            // Error originated in the Driver itself; report message + code as-is.
            formatter.error(session.out, this.errorMessage, response.getResponseCode(), this.SQLState);
        } else {
            ErrorMsg canonical = ErrorMsg.getErrorMsg(response.getResponseCode());
            if (canonical != null && canonical != ErrorMsg.GENERIC_ERROR) {
                // The response code maps to a specific canonical error — no stack trace needed.
                formatter.error(session.out, this.errorMessage, response.getResponseCode(), this.SQLState, (String) null);
            } else if (this.downstreamError instanceof HiveException) {
                HiveException hiveErr = (HiveException) this.downstreamError;
                // Only attach the stringified exception when the canonical message is generic.
                boolean generic = hiveErr.getCanonicalErrorMsg() == ErrorMsg.GENERIC_ERROR;
                formatter.error(session.out, this.errorMessage,
                        hiveErr.getCanonicalErrorMsg().getErrorCode(), this.SQLState,
                        generic ? StringUtils.stringifyException(hiveErr) : null);
            } else {
                // Non-Hive downstream error: derive a canonical code from its message
                // and always include the full stack trace.
                ErrorMsg fromMessage = ErrorMsg.getErrorMsg(this.downstreamError.getMessage());
                formatter.error(session.out, this.errorMessage, fromMessage.getErrorCode(),
                        this.SQLState, StringUtils.stringifyException(this.downstreamError));
            }
        }
    } catch (HiveException encodeFailure) {
        // Reporting must never mask the original failure; log and fall through.
        console.printError("Unable to JSON-encode the error", StringUtils.stringifyException(encodeFailure));
    }
    return response;
}
run 只是 Driver 的入口,处在 QL 阶段,后面还有更多处理步骤,所以只看关键部分。
org.apache.hadoop.hive.ql.Driver.runInternal
然后看runInternal
方法:
org.apache.hadoop.hive.ql.Driver
----------
// Orchestrates one query run: compile the command (unless the caller already
// compiled a plan), then execute it and wrap the result code in a response.
// NOTE(review): this is a decompiled excerpt — "......" marks elided lines, and
// var8/var10 are decompiler-generated temporaries (var10's declaration is in an
// elided section, so this snippet is not compilable as shown).
private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled) throws CommandNeedRetryException {
......
perfLogger = null;
PerfLogger perfLogger;
int ret;
// Compile the command text first unless the caller already built the plan.
if (!alreadyCompiled) {
ret = this.compileInternal(command, true);
perfLogger = SessionState.getPerfLogger();
if (ret != 0) {
// Compilation failed: wrap the error code and return before execution.
CommandProcessorResponse var8 = this.createProcessorResponse(ret);
return var8;
}
} else {
perfLogger = SessionState.getPerfLogger();
// Reuse the existing plan; stamp it with this run's start time for perf logging.
this.plan.setQueryStartTime(perfLogger.getStartTime("Driver.run"));
}
......
// Execute the compiled plan; a non-zero code means failure, so roll back.
ret = this.execute(true);
if (ret != 0) {
var10 = this.rollback(this.createProcessorResponse(ret));
return var10;
}
......
}
org.apache.hadoop.hive.ql.Driver.compileInternal
继续看compileInternal
方法
org.apache.hadoop.hive.ql.Driver
----------
/**
 * Compiles {@code command} under the global compile lock, releasing any
 * acquired locks on failure and recording compilation perf-log timings.
 *
 * @param command    the SQL text to compile
 * @param deferClose passed through to {@link #compile}
 * @return 0 on success, otherwise the error code from compilation (or
 *         COMPILE_LOCK_TIMED_OUT if the compile lock could not be acquired)
 */
private int compileInternal(String command, boolean deferClose) {
    ReentrantLock compileLock = tryAcquireCompileLock(isParallelEnabled, command);
    if (compileLock == null) {
        // Could not get the compile lock in time — report without compiling.
        return ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode();
    }
    int result;
    try {
        result = compile(command, true, deferClose);
    } finally {
        // Always release the compile lock, even if compile() throws.
        compileLock.unlock();
    }
    if (result != 0) {
        // Compilation failed: best-effort release of any locks taken so far.
        try {
            releaseLocksAndCommitOrRollback(false, (HiveTxnManager) null);
        } catch (LockException lockErr) {
            LOG.warn("Exception in releasing locks. " + StringUtils.stringifyException(lockErr));
        }
    }
    // Record compilation-phase timings whether compilation succeeded or not.
    PerfLogger perfLogger = SessionState.getPerfLogger();
    queryDisplay.setPerfLogStarts(Phase.COMPILATION, perfLogger.getStartTimes());
    queryDisplay.setPerfLogEnds(Phase.COMPILATION, perfLogger.getEndTimes());
    return result;
}
org.apache.hadoop.hive.ql.Driver.compile
org.apache.hadoop.hive.ql.Driver
----------
// Compiles a SQL string into a QueryPlan: parse -> AST -> semantic analysis -> plan.
// NOTE(review): decompiled excerpt — "......" marks elided lines, so several
// locals (ctx, perfLogger, queryStr, queryId, ...) are declared in elided code.
public int compile(String command, boolean resetTaskIds, boolean deferClose) {
......
// Parse the raw SQL text into an abstract syntax tree.
ParseDriver pd = new ParseDriver();
ASTNode tree = pd.parse(command, ctx);
tree = ParseUtils.findRootNonNullToken(tree);
......
// Pick the analyzer matching the statement type, then run semantic analysis.
BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
......
sem.analyze(tree, ctx);
......
acidSinks = sem.getAcidFileSinks();
LOG.info("Semantic Analysis Completed");
sem.validate();
acidInQuery = sem.hasAcidInQuery();
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);
// Stop early if the query was cancelled while analyzing.
if (isInterrupted()) {
return handleInterruption("after analyzing query.");
}
// Build the query plan from the analyzer's output; the Driver stores it
// in the `plan` field for execute() to run later.
schema = getSchema(sem, conf);
plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId,queryState.getHiveOperation(), schema);
......
}
Driver 主要进行 SQL 的语法解析和语义分析:它调用 parse 将语句转换为 ASTNode,调用 analyze 将 ASTNode 转换为 Task,然后使用 Task 构建 QueryPlan。
构建的 QueryPlan 会被保存在 Driver 中,然后交由 execute 进行执行。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)