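Why check closures
In Flink, operators are serialized and shipped to the worker nodes, so every operator object must be serializable. The check covers the operator's member fields as well as any anonymous inner classes used in the code.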
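The problem can be reproduced without Flink at all. The sketch below is an illustration under stated assumptions: MapFn is a hypothetical stand-in for a Flink function interface (it is not a Flink API), and isSerializable is a helper written for this example. An anonymous class that reads a field of its enclosing instance drags that instance into the serialized graph, and serialization fails when the enclosing class is not Serializable.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a Flink function interface; it only needs to be Serializable.
interface MapFn extends Serializable {
    String map(String value);
}

// The enclosing class is deliberately NOT Serializable.
class ClosureCaptureDemo {
    private final String prefix;

    ClosureCaptureDemo(String prefix) {
        this.prefix = prefix;
    }

    // Anonymous class that reads an instance field of the enclosing object,
    // so it keeps a hidden reference to that ClosureCaptureDemo instance.
    MapFn capturing() {
        return new MapFn() {
            @Override
            public String map(String value) {
                return prefix + value;
            }
        };
    }

    // Static nested class: it copies the value it needs and has no enclosing instance.
    static class PrefixFn implements MapFn {
        private final String prefix;

        PrefixFn(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public String map(String value) {
            return prefix + value;
        }
    }

    MapFn standalone() {
        return new PrefixFn(prefix);
    }

    // Returns true if plain Java serialization accepts the object.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ClosureCaptureDemo demo = new ClosureCaptureDemo("job-");
        System.out.println("anonymous class serializable: " + isSerializable(demo.capturing()));
        System.out.println("static nested class serializable: " + isSerializable(demo.standalone()));
    }
}
```

The static nested variant is the same remedy that the error message in ClosureCleaner suggests: make the function a proper (non-inner) class or a static inner class.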
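Entry point of the closure check
The check is invoked from StreamExecutionEnvironment#clean().
The actual closure check is performed by ClosureCleaner#clean().
The code is not complicated, so let's analyze it by reading it directly.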
private static void clean(Object func, ExecutionConfig.ClosureCleanerLevel level, boolean checkSerializable, Set<Object> visited) {
    if (func == null) {
        return;
    }
    if (!visited.add(func)) {
        return;
    }
    final Class<?> cls = func.getClass();
    if (ClassUtils.isPrimitiveOrWrapper(cls)) {
        return;
    }
    // skip classes that use custom serialization via writeObject or writeReplace
    if (usesCustomSerialization(cls)) {
        return;
    }
    // First find the field name of the "this$0" field, this can
    // be "this$x" depending on the nesting
    boolean closureAccessed = false;
    for (Field f: cls.getDeclaredFields()) {
        if (f.getName().startsWith("this$")) {
            // found a closure referencing field - now try to clean
            closureAccessed |= cleanThis0(func, cls, f.getName());
        } else {
            Object fieldObject;
            try {
                f.setAccessible(true);
                fieldObject = f.get(func);
            } catch (IllegalAccessException e) {
                throw new RuntimeException(String.format("Can not access to the %s field in Class %s", f.getName(), func.getClass()));
            }
            /*
             * we should do a deep clean when we encounter an anonymous class, inner class and local class, but should
             * skip the class with custom serialize method.
             *
             * There are five kinds of classes (or interfaces):
             * a) Top level classes
             * b) Nested classes (static member classes)
             * c) Inner classes (non-static member classes)
             * d) Local classes (named classes declared within a method)
             * e) Anonymous classes
             */
            if (level == ExecutionConfig.ClosureCleanerLevel.RECURSIVE && needsRecursion(f, fieldObject)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Dig to clean the {}", fieldObject.getClass().getName());
                }
                clean(fieldObject, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true, visited);
            }
        }
    }
    if (checkSerializable) {
        try {
            InstantiationUtil.serializeObject(func);
        }
        catch (Exception e) {
            String functionType = getSuperClassOrInterfaceName(func.getClass());
            String msg = functionType == null ?
                (func + " is not serializable.") :
                ("The implementation of the " + functionType + " is not serializable.");
            if (closureAccessed) {
                msg += " The implementation accesses fields of its enclosing class, which is " +
                    "a common reason for non-serializability. " +
                    "A common solution is to make the function a proper (non-inner) class, or " +
                    "a static inner class.";
            } else {
                msg += " The object probably contains or references non serializable fields.";
            }
            throw new InvalidProgramException(msg, e);
        }
    }
}
Step 1: find the field that references the enclosing instance, using f.getName().startsWith("this$").
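The this$ prefix is not specific to Scala. javac gives a non-static inner class, and an anonymous or local class declared in an instance context, a synthetic this$0 field (this$1 and so on for deeper nesting) that references the enclosing instance, so Java code reaches this branch too. Top-level classes and static nested classes have no such field, and all their fields go through the else branch: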
for (Field f: cls.getDeclaredFields()) {
    if (f.getName().startsWith("this$")) {
        // found a closure referencing field - now try to clean
        closureAccessed |= cleanThis0(func, cls, f.getName());
    } else {
        // some code omitted here; it is covered in step 2
        ....
    }
}
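The field that this loop looks for can be observed with plain reflection. The following is a small sketch, not Flink code; OuterRefDemo and its members are names made up for the example. Inner reads a field of the outer instance on purpose, because some newer javac versions can drop the synthetic field when the enclosing instance is never used.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

class OuterRefDemo {
    private final int base;

    OuterRefDemo(int base) {
        this.base = base;
    }

    // Inner (non-static) class that uses the enclosing instance.
    class Inner {
        int plus(int x) {
            return base + x;
        }
    }

    // Static nested class: it has no enclosing instance.
    static class Nested {
        int plus(int x) {
            return x;
        }
    }

    // Collects the names of declared fields that start with "this$",
    // using the same name test as the loop in ClosureCleaner.
    static List<String> outerRefFields(Class<?> cls) {
        List<String> names = new ArrayList<>();
        for (Field f : cls.getDeclaredFields()) {
            if (f.getName().startsWith("this$")) {
                names.add(f.getName());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // With javac the inner class typically reports a single field named this$0.
        System.out.println("Inner:  " + outerRefFields(Inner.class));
        System.out.println("Nested: " + outerRefFields(Nested.class));
    }
}
```

The field name is a compiler convention rather than something the language specification guarantees, which is presumably why the check matches on the this$ prefix instead of one exact name.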
Step 2: deep-clean the objects held in the remaining fields. The comment lists five kinds of classes:
* a) Top level classes
* b) Nested classes (static member classes)
* c) Inner classes (non-static member classes)
* d) Local classes (named classes declared within a method)
* e) Anonymous classes
if (level == ExecutionConfig.ClosureCleanerLevel.RECURSIVE && needsRecursion(f, fieldObject)) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("Dig to clean the {}", fieldObject.getClass().getName());
    }
    // the field holds an object, so call clean recursively to clean its closure as well
    clean(fieldObject, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true, visited);
}
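The visited set is what keeps this recursion from looping forever when objects reference each other. Below is a minimal sketch of the same pattern, assuming an identity-based set. Node, walk and the "only follow Node-typed fields" filter are all invented for the illustration and are not Flink's needsRecursion logic.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

class VisitedWalkDemo {
    // Tiny object graph node used only for this example.
    static class Node {
        Node next;
        final String name;

        Node(String name) {
            this.name = name;
        }
    }

    // Counts the distinct Node objects reachable through Node-typed instance fields.
    static int walk(Object obj, Set<Object> visited) {
        // Same guard as in clean(): stop on null or on an object seen before.
        if (obj == null || !visited.add(obj)) {
            return 0;
        }
        int count = 1;
        for (Field f : obj.getClass().getDeclaredFields()) {
            // Hypothetical filter for the example: skip static fields and non-Node fields.
            if (Modifier.isStatic(f.getModifiers()) || f.getType() != Node.class) {
                continue;
            }
            try {
                f.setAccessible(true);
                count += walk(f.get(obj), visited);
            } catch (IllegalAccessException e) {
                throw new RuntimeException("Cannot access field " + f.getName(), e);
            }
        }
        return count;
    }

    // Identity semantics: two distinct objects that are equals() are still visited separately.
    static Set<Object> newVisitedSet() {
        return Collections.newSetFromMap(new IdentityHashMap<>());
    }

    public static void main(String[] args) {
        Node a = new Node("a");
        Node b = new Node("b");
        a.next = b;
        b.next = a; // cycle: without the visited set the walk would never terminate
        System.out.println("distinct nodes: " + walk(a, newVisitedSet()));
    }
}
```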
Step 3: the actual serializability check. The logic is simple and blunt: just try to serialize the object, and raise an error if that fails.
try {
    InstantiationUtil.serializeObject(func);
} catch (Exception e) {
    String functionType = getSuperClassOrInterfaceName(func.getClass());
    ....
}

public static byte[] serializeObject(Object o) throws IOException {
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(baos)) {
        oos.writeObject(o);
        oos.flush();
        return baos.toByteArray();
    }
}
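The same probe can be tried outside Flink. The sketch below copies the serializeObject body shown above into a standalone class and covers the second failure case named in the error message, an object that holds a non-serializable field. Resource, BadFn, GoodFn and probe are hypothetical names introduced only for this example.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializeProbeDemo {
    // Not Serializable: stands in for something like a client or a connection.
    static class Resource {
    }

    // Serializable in name only: the field below breaks serialization.
    static class BadFn implements Serializable {
        private final Resource resource = new Resource();
    }

    // The transient field is skipped by Java serialization, so it has to be
    // re-created after deserialization before it is used.
    static class GoodFn implements Serializable {
        private transient Resource resource = new Resource();
    }

    // Same body as the serializeObject method shown above.
    static byte[] serializeObject(Object o) throws IOException {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(o);
            oos.flush();
            return baos.toByteArray();
        }
    }

    // Returns "ok" on success, otherwise the simple name of the exception that was thrown.
    static String probe(Object o) {
        try {
            serializeObject(o);
            return "ok";
        } catch (IOException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println("BadFn:  " + probe(new BadFn()));
        System.out.println("GoodFn: " + probe(new GoodFn()));
    }
}
```

In a real Flink function the usual place to rebuild such a transient field would be the open() method of a rich function, although that is outside what the code in this post shows.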
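Below is the place where I fixed a bug.
When serialization fails, an error is raised. While building the error message, the code looks up the name of the superclass or of an implemented interface; see getSuperClassOrInterfaceName(func.getClass()):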
private static String getSuperClassOrInterfaceName(Class<?> cls) {
    Class<?> superclass = cls.getSuperclass();
    if (superclass.getName().startsWith("org.apache.flink")) {
        return superclass.getSimpleName();
    } else {
        for (Class<?> inFace : cls.getInterfaces()) {
            if (inFace.getName().startsWith("org.apache.flink")) {
                return inFace.getSimpleName();
            }
        }
        return null;
    }
}
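If Object.class is passed to this method, cls.getSuperclass() returns null (the same is true for interfaces and primitive types), and the following superclass.getName() call then throws a NullPointerException. The natural fix is to add a null check right after the getSuperclass() call: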
if (superclass == null) { return null; }
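Put together, the patched method could look like the sketch below. It is a standalone approximation and not the code as merged in Flink: the packagePrefix parameter is an assumption added here so the method can be exercised against JDK classes, whereas the original hard-codes "org.apache.flink".

```java
import java.util.ArrayList;

class SuperNameDemo {
    // Example class whose only interesting supertype is an interface.
    static class Task implements Runnable {
        @Override
        public void run() {
        }
    }

    // Null-safe variant of getSuperClassOrInterfaceName; packagePrefix is a
    // hypothetical parameter that replaces the hard-coded "org.apache.flink".
    static String superClassOrInterfaceName(Class<?> cls, String packagePrefix) {
        Class<?> superclass = cls.getSuperclass();
        // getSuperclass() returns null for Object, interfaces and primitive types.
        if (superclass == null) {
            return null;
        }
        if (superclass.getName().startsWith(packagePrefix)) {
            return superclass.getSimpleName();
        }
        for (Class<?> iface : cls.getInterfaces()) {
            if (iface.getName().startsWith(packagePrefix)) {
                return iface.getSimpleName();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(superClassOrInterfaceName(Object.class, "java.util"));
        System.out.println(superClassOrInterfaceName(ArrayList.class, "java.util"));
        System.out.println(superClassOrInterfaceName(Task.class, "java.lang.Run"));
    }
}
```

One consequence of returning early is that an interface passed as cls never has its super-interfaces examined, which only makes the error message less specific.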