DATAX_HOOK,怎么实现的
JobContainer 类Datax的job执行类。
// JobContainer 类
// An highlighted block
// JobContainer 类关于
finally {
if (!isDryRun) {
this.destroy();
this.endTimeStamp = System.currentTimeMillis();
if (!hasException) {
//最后打印cpu的平均消耗,GC的统计
VMInfo vmInfo = VMInfo.getVmInfo();
if (vmInfo != null) {
vmInfo.getDelta(false);
LOG.info(vmInfo.totalString());
} LOG.info(PerfTrace.getInstance().summarizeNoException());
this.logStatistics(jobStatistics);
DirtyRecordContext.clear();
//这里就是调用Hook
this.invokeHooks();
}
}
}
invokeHooks 方法。
// invokeHooks 方法
// An highlighted block
/**
* 调用外部hook
*/
private void invokeHooks() {
Communication comm = super.getContainerCommunicator().collect();
//这里就是存放实现hook接口的class类
HookInvoker invoker = new HookInvoker(CoreConstant.DATAX_HOME + "/hook", configuration, comm.getCounter(), jobStatistics);
invoker.invokeAll();
}
invokeAll方法 。
//invokeAll方法 。
// An highlighted block
public void invokeAll() {
if (!baseDir.exists() || baseDir.isFile()) {//这里就是判断/hook文件夹下面有没有class文件,这class文件就是Hook接口的实现类
LOG.info("No hook invoked, because base dir not exists or is a file: " + baseDir.getAbsolutePath());
return;
}
String[] subDirs = baseDir.list(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return new File(dir, name).isDirectory();
}
});
if (subDirs == null) {
throw DataXException.asDataXException(FrameworkErrorCode.HOOK_LOAD_ERROR, "获取HOOK子目录返回null");
}
for (String subDir : subDirs) {
//实际上就是这里调用
doInvoke(new File(baseDir, subDir).getAbsolutePath());
}
}
doInvoke方法。
doInvoke方法
Class文件实现 HooK接口里面的 invoke 方法
/**
* 遍历path路径下的所有jar文件,解析。这些jar包里的class都是hook的实现类,然后执行invoke实现方法
* 好处:可以无限制的添加hook的实现类
* @param path
*/
private void doInvoke(String path) {
ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
try {
//根据path(文件夹里面的jar,包含hook实现类的class,以及相关的依赖的jar包),获取实现类的class
JarLoader jarLoader = new JarLoader(new String[]{path});
//JarLoader 是继承与 URLClassLoader,这个类的作用就是根据路径读取jar包
//这一步是为了把读取的jar包,加入到jvm里面
Thread.currentThread().setContextClassLoader(jarLoader);
Iterator<Hook> hookIt = ServiceLoader.load(Hook.class).iterator();
if (!hookIt.hasNext()) {
LOG.warn("No hook defined under path: {}", path);
} else {
while (hookIt.hasNext()) {
Hook hook = hookIt.next();
LOG.info("Invoke hook [{}], path: {}", hook.getName(), path);
hook.invoke(conf, msg, jobStatistics);
}
}
} catch (Exception e) {
LOG.error("Exception when invoke hook");
throw DataXException.asDataXException(
CommonErrorCode.HOOK_INTERNAL_ERROR, "Exception when invoke hook", e);
} finally {
Thread.currentThread().setContextClassLoader(oldClassLoader);
}
}
Hook的接口
// Hook的接口
public interface Hook {
/**
* 返回名字
*
* @return
*/
public String getName();
/**
* TODO 文档
*
* @param jobConf
* @param msg
* @param jobStatistics Datax job监控信息
*/
void invoke(Configuration jobConf, Map<String, Number> msg, JobStatistics jobStatistics);
}