我提交的第一个Flink commit - Flink 闭包检查

2023-11-11

为什么闭包

Flink中算子都是通过序列化分发到各节点上,所以要确保算子对象是可以被序列化的。算子的成员变量,代码中的匿名内部类都是检查的范围。

闭包检查入库

被调用的入口是 StreamExecutionEnvironment#clean()
而真正执行闭包检查的是ClosureCleaner#clean()代码不复杂。我们之间看代码来分析

private static void clean(Object func, ExecutionConfig.ClosureCleanerLevel level, boolean checkSerializable, Set<Object> visited) {
	if (func == null) {
		return;
	}
	if (!visited.add(func)) {
		return;
	}
	final Class<?> cls = func.getClass();
	if (ClassUtils.isPrimitiveOrWrapper(cls)) {
		return;
	}
	// 如果使用的自定义序列化方法 writeObject 或者 writeReplace
	if (usesCustomSerialization(cls)) {
		return;
	}
	// First find the field name of the "this$0" field, this can
	// be "this$x" depending on the nesting
	boolean closureAccessed = false;
	for (Field f: cls.getDeclaredFields()) {
		if (f.getName().startsWith("this$")) {
			// found a closure referencing field - now try to clean
			closureAccessed |= cleanThis0(func, cls, f.getName());
		} else {
			Object fieldObject;
			try {
				f.setAccessible(true);
				fieldObject = f.get(func);
			} catch (IllegalAccessException e) {
				throw new RuntimeException(String.format("Can not access to the %s field in Class %s", f.getName(), func.getClass()));
			}
			/*
			 * we should do a deep clean when we encounter an anonymous class, inner class and local class, but should
			 * skip the class with custom serialize method.
			 *
			 * There are five kinds of classes (or interfaces):
			 * a) Top level classes
			 * b) Nested classes (static member classes)
			 * c) Inner classes (non-static member classes)
			 * d) Local classes (named classes declared within a method)
			 * e) Anonymous classes
			 */
			if (level == ExecutionConfig.ClosureCleanerLevel.RECURSIVE && needsRecursion(f, fieldObject)) {
				if (LOG.isDebugEnabled()) {
					LOG.debug("Dig to clean the {}", fieldObject.getClass().getName());
				}
				clean(fieldObject, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true, visited);
			}
		}
	}
	if (checkSerializable) {
		try {
			InstantiationUtil.serializeObject(func);
		}
		catch (Exception e) {
			String functionType = getSuperClassOrInterfaceName(func.getClass());
			String msg = functionType == null ?
					(func + " is not serializable.") :
					("The implementation of the " + functionType + " is not serializable.");
			if (closureAccessed) {
				msg += " The implementation accesses fields of its enclosing class, which is " +
						"a common reason for non-serializability. " +
						"A common solution is to make the function a proper (non-inner) class, or " +
						"a static inner class.";
			} else {
				msg += " The object probably contains or references non serializable fields.";
			}
			throw new InvalidProgramException(msg, e);
		}
	}
}

第一步:查找闭包引用的成员变量,f.getName().startsWith("this$") 这块应该是scala里面才会存在这种"this$"开头,JAVA 的都会走else:

for (Field f: cls.getDeclaredFields()) {
	if (f.getName().startsWith("this$")) {
		// found a closure referencing field - now try to clean
		closureAccessed |= cleanThis0(func, cls, f.getName());
	} else {
	    // 省略了一些代码,在第二步中提到
		....
	}
}

第二步:深度清除除了成员变量的类。注释中提到了五种class

* a) Top level classes
* b) Nested classes (static member classes)
* c) Inner classes (non-static member classes)
* d) Local classes (named classes declared within a method)
* e) Anonymous classes
if (level == ExecutionConfig.ClosureCleanerLevel.RECURSIVE && needsRecursion(f, fieldObject)) {
	if (LOG.isDebugEnabled()) {
		LOG.debug("Dig to clean the {}", fieldObject.getClass().getName());
	}
    // 因为是对象,所以递归调用clean去闭包清除
	clean(fieldObject, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true, visited);
}

第三步:真正检查闭包的逻辑。逻辑简单也非常的暴力。直接给你来上一发序列化。不成功就报错。

try {
		InstantiationUtil.serializeObject(func);
	} catch (Exception e) {
	   String functionType = getSuperClassOrInterfaceName(func.getClass());
	   ....
	}
	

public static byte[] serializeObject(Object o) throws IOException {
	try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
				ObjectOutputStream oos = new ObjectOutputStream(baos)) {
		oos.writeObject(o);
		oos.flush();
		return baos.toByteArray();
	}
}

下面是我修改Bug的地方

当序列化失败的时候就会报错。报错的时候会取superclass 或者是 interface。具体看getSuperClassOrInterfaceName(func.getClass())

private static String getSuperClassOrInterfaceName(Class<?> cls) {
		Class<?> superclass = cls.getSuperclass();
		if (superclass.getName().startsWith("org.apache.flink")) {
			return superclass.getSimpleName();
		} else {
			for (Class<?> inFace : cls.getInterfaces()) {
				if (inFace.getName().startsWith("org.apache.flink")) {
					return inFace.getSimpleName();
				}
			}
			return null;
		}
	}

如果这方法传入 Object.class 的时候 cls.geetSuperClass() 是会抛出空指针异常的。所以很自然的这个地方加个判空即可。
if (superclass == null) { return null; }

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

我提交的第一个Flink commit - Flink 闭包检查 的相关文章

  • 53.Eight (15分待续)

    题目内容 在3 3的棋盘上 摆有八个棋子 每个棋子上标有1至8的某一数字 棋盘中留有一个空格 空格用0来表示 空格周围的棋子可以移到空格中 要求解的问题是 给出一种初始布局和目标布局 为了使题目简单 设目标状态为 1 2 3 8 0 4 7
  • 用SPM技术固定EBS标准功能的SQL执行计划

    Introduction介绍 本文是Oracle SPM技术的一个应用实例 分享给没了解过SPM或者没用过SPM的老铁们 通过本文 应该要了解什么是SPM 它的作用是什么 它的应用场景是什么 这个应用实例总结就是 通过使用SPM技术 固定S
  • 找错:ZdalRuleCalculateException: 规则引擎计算出错,拆分值=

    序言 这一段是我遇上问题的过程 可以直接跳转到下面的正文 在刚写的博客 Zdal分库分表介绍 超详细一步一步搭建简单的zdal框架 中 是通过向线程中存放数据库远程路由来指定操作哪个数据库 在mybatis执 行插引用块内容入操作时 会从数
  • JVM调优

    1 查看当前jvm使用详情 java XX PrintGCDetails或者 Xloggc data jvm gc log或者 verbose gc 2 JVM调优工具 jps l 查看java程序pid 3 jstat gc pid 50

随机推荐

  • IMT-2030(6G)推进组发布《6G总体愿景与潜在关键技术》白皮书

    来源 中国信通院CATCT 编辑 蒲蒲 当前 新一轮科技革命和产业变革突飞猛进 随着5G商用的大规模部署 全球业界已开启对下一代移动通信 6G 的探索研究 日前 IMT 2030 6G 推进组 以下简称 推进组 正式发布 6G总体愿景与潜在
  • 编译原理四元式实验_记一次编译原理实验——词法+语法&语义分析(Python实现)...

    记一次编译原理实验 词法 语法 语义分析 Python实现 编译原理是每个CS学子的必修课 编译原理的实验课对提高我们对编译过程的理解很重要 本文章写于实验结束后 所有代码已上传至Github https github com kabu12
  • Netty入门(六) WebSocket协议开发

    一 WebSocket协议介绍 1 WebSocket协议介绍 WebSocket协议是和Http协议同地位的应用层协议 都是基于TCP协议之上 但是其是以Http协议为基础的 未了解决Http协议半双工通信模式且数据冗杂的缺点 HTML5
  • 【三维目标分类】PointNet++详解(一)

    本文为博主原创文章 未经博主允许不得转载 本文为专栏 python三维点云从基础到深度学习 系列文章 地址为 https blog csdn net suiyingy article details 124017716 上一节主要介绍了Po
  • 2023最新如何轻松升级、安装和试用Navicat Premium 16.2.10 教程详解

    博主猫头虎 带您 Go to New World 博客首页 猫头虎的博客 面试题大全专栏 文章图文并茂 生动形象 简单易学 欢迎大家来踩踩 IDEA开发秘籍专栏 学会IDEA常用操作 工作效率翻倍 100天精通Golang 基础入门篇 学会
  • 数组的初识

    目录 1 初识数组 2 数组的简单操作 2 1读取元素 2 2更新元素 2 3插入元素 2 4删除元素 1 初识数组 数组是有限个相同类型的变量所组成的有序集合 数组中的每一个变量被称为元素 数组是最简单 最为常用的数据结构 以整型数组为例
  • HTML5全栈工程师好就业吗

    2017突然流行起来的一个新职位 全栈工程师 大概在很多人眼里 全栈工程师是一个全能人才 事实的确如此 以web前端为主 需求 后台 前台 用户 设计等内容为辅 全栈工程师拥有更广阔的视野和更广泛的学识 全栈工程师可以从更高的角度去看待问题
  • 基于FPGA的卷积神经网络实现(一)简介

    目录 简介 框架 资源分配 1 资源分配 2 数据量化 1 数据量化 2 数据读写 卷积模块 池化 全连接与输出 事先声明 仅用于记录和讨论 有任何问题欢迎批评指正 只是觉得菜的大佬们请绕路 就不用在这里说大实话了 因为本身就是一个粗糙的d
  • 2021-11-16尤破金11.16黄金原油今日行情涨跌趋势分析及周二多空操作建议布局

    黄金最新行情分析 黄金消息面解析 周一现货黄金持稳于1863附近 上周五金价探底回升并实现七连涨 受到美国消费信心大幅下滑和美元走软的支撑 自11月以来 金价已上涨高达110美元 这得益于对通胀的担忧加深 以及主要央行保证将暂时把利率保持在
  • jsp页面获取参数的方法(url解析、el表达式赋值、session取值)【原创】

    最近使用myEclispse做网站 使用jsp js css做页面 网站中常用到从列表进入详情页面的跳转 下面对详情页面的值填充方式做一个简单总结 1 url中使用 request获取参数 jsp上方添加type参数
  • 23个CVPR 2020收录的新数据集,都在这里了!

    编辑 Amusi Date 2020 06 20 来源 CVer微信公众号 链接 23个CVPR 2020收录的新数据集 都在这里了 前言 Amusi 之前整理了1467篇CVPR 2020所有论文PDF下载资源 以及300篇 CVPR 2
  • HBuilder html 乱码解决,java eclipse等应当同理

    1 网上下载了一套html代码 出现乱码 一般就是编码格式问题 2 解决方式 更改编码 3 剪切所有内容 然后右下角修改编码为utf 8 4 再粘贴内容 5 访问页面
  • (必行方案)PPT快捷键复制一次粘贴两次问题

    问题 复制的时候复制一次 粘贴的时候在Word Excel都正常但是PPT里面不正常 只有使用PPT的时候出现粘贴两次的问题 Word和Excel中Ctrl C后Ctrl V粘贴一次 没有问题 PPT中Ctrl V时粘贴2次 而用右键粘贴方
  • 系数矩阵与系统稳定性的关系

    对连续时间线性定常控制系统而言 系统内部渐近稳定的充分必要条件是其系数矩阵A的特征值都在复平面的左半开平面内 对连续时间线性定常控制系统而言 系统输入 输出稳定的充分必要条件是其特征方程的根 传递函数的极点 全都在复平面的左半平面内 离散时
  • Linux的简单介绍

    LINUX操作系统是一种免费使用和自由传播的类UNIX操作系统 其内核由林纳斯 托瓦兹于1991年10月5日首次发布 是一个基于POSIX的多用户 多任务 支持多线程和多CPU的操作系统 它能运行主要的Unix工具软件 应用程序和网络协议
  • 03-03 周五 镜像安装sshd和jupyter以及修改密码

    03 03 周五 镜像安装sshd和jupyter以及修改密码 时间 版本 修改人 描述 2023年3月3日15 34 49 V0 1 宋全恒 新建文档 简介 由于在镜像中需要进行jupyter和sshd的安装 并且需要进行密码的修改 因此
  • 在 Windows 10下安装Flutter+Dart+Android Studio 配置Flutter开发环境

    在 Windows 10下安装Flutter Dart Android Studio 配置Flutter开发环境 文章首发地址 配置环境变量 由于部分网站被墙的原因 我们需要先配置Flutter国内镜像地址 这两个地址是由Flutter官方
  • html css开关按钮样式,纯CSS实现开关按钮

    上面这种开关按钮在现代网页UI设计中经常出现 代替了以前丑陋的checkbox 在很多UI框架中如elementUI都有组件可以直接使用 但是画出这样一个开关是十分简单的 不需要借助JS代码就可以实现 核心思路就是将原有input框进行隐藏
  • github响应时间过长且修改hosts无效【解决办法】

    打开ipaddress com 分别键入 github global ssl fastly net 和 github com 查询到对应的IP地址 打开hosts文件 并新增 hosts文件地址 win下 C Windows System3
  • 我提交的第一个Flink commit - Flink 闭包检查

    为什么闭包 Flink中算子都是通过序列化分发到各节点上 所以要确保算子对象是可以被序列化的 算子的成员变量 代码中的匿名内部类都是检查的范围 闭包检查入库 被调用的入口是 StreamExecutionEnvironment clean