请读下面的这句绕口令:ResourceManager中的Resource Estimator框架介绍与算法剖析

2023-11-10

欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~

本文由宋超发表于云+社区专栏

本文首先介绍了Hadoop中的ResourceManager中的estimator service的框架与运行流程,然后对其中用到的资源估算算法进行了原理剖析。

一. Resource Estimator Service的出发点与目标

估计作业运行使用资源是大数据处理集群的一个重要且具有挑战性的问题。随着用户使用的集群资源越来越多,这一需求被逐渐放大。当前现有的解决方案一般是依赖于用户的经验来对作业资源需求进行估计,这样即繁琐又低效。根据对集群工作负载的分析,可以发现大部分工作(超过60%)是重复工作,这样我们便有机会根据作业历史资源使用情况来估计作业下一次的资源需求量。同时,在未来,希望能提出一种与框架无关的黑盒解决方案。这样,即使作业来自不同的计算框架,我们也能对重复性作业进行资源需求估算。

二. Resource Estimator Service的框架结构

img

Hadoop-resource estimator主要由三个模块组成:Translator,SkylineStore和Estimator。下面分别介绍这三部分。

1.ResourceSkyline用来表征作业在其生命周期中的资源利用率。它使用RLESparseResourceAllocation记录容器分配的信息。RecurrenceId用于标识重复pipeline的特定运行。pipeline可以包含多个作业,每个作业都有一个ResourceSkyline来表征其资源利用率。

2.Translator用来解析作业日志,提取他们的ResourceSkylines并将它们存储到SkylineStore。SingleLineParser解析日志流中的一行并提取ResourceSkyline。

3.SkylineStore充当Hadoop-resource estimator的存储层,由2部分组成。HistorySkylineStore存储由转换程序提取的ResourceSkylines。它支持四种操作:addHistory,deleteHistory,updateHistory和getHistory。addHistory将新的ResourceSkylines附加到定期pipeline,而updateHistory删除特定定期pipeline的所有ResourceSkyline,并重新插入新的ResourceSkylines。PredictionSkylineStore存储由Estimator生成的预测RLESparseResourceAllocation。它支持两个操作:addEstimation和getEstimation。

4.Estimator根据历史记录运行预测重复出现的pipeline资源需求,将预测存储到SkylineStore并在YARN上进行资源预留。Solver读取特定定期pipeline的所有历史ResourceSkylines,并预测其包含在RLESparseResourceAllocation中的新资源需求。目前,Hadoop-resource estimator提供了一个LPSOLVER来进行预测(其中用到的算法模型会在后面进行讲解)。

三.以示例demo演示其运行流程

Resource Estimator Service的URI是http://0.0.0.0,默认服务端口是9998

(在$ ResourceEstimatorServiceHome/conf/resourceestimator-config.xml” 中配置)。 在$ ResourceEstimatorServiceHome/data中,有一个示例日志文件resourceEstimatorService.txt,其中包含2次运行的tpch_q12查询作业的日志。进行资源预测主要有以下几个步骤:

1.解析作业日志:

POST http://URI:port/resourceestimator/translator/LOG_FILE_DIRECTORY

发送

POST http://0.0.0.0:9998/resourceestimator/translator/data/resourceEstimatorService.txt

underlying estimator将从日志文件中提取ResourceSkylines并将它们存储在jobHistory SkylineStore中。

2.查询作业的历史ResourceSkylines:

GET http://URI:port/resourceestimator/skylinestore/history/{pipelineId}/{runId}

发送

GET http://0.0.0.0:9998/resourceestimator/skylinestore/history/*/*

underlying estimator将返回历史SkylineStore中的所有记录。在示例文件中能够看到两次运行tpch_q12的ResourceSkylines:tpch_q12_0和tpch_q12_1。

3.预测作业的资源使用情况:

GET http://URI:port/resourceestimator/estimator/{pipelineId}

发送

http://0.0.0.0:9998/resourceestimator/estimator/tpch_q12

estimator将根据其历史ResourceSkylines预测新运行的作业资源需求,并将预测的资源需求存储到jobEstimation SkylineStore。

4.查询作业的预测资源情况:

GET http://URI:port/resourceestimator/skylinestore/estimate/{pipelineId}

发送

http://0.0.0.0:9998/resourceestimator/skylinestore/estimation/tpch_q12

估算器将返回tpch_q12作业资源预测情况。

5.删除作业的历史资源情况数据:

DELETE http://URI:port/resourceestimator/skylinestore/history/{pipelineId}/{runId}

发送

http://0.0.0.0:9998/resourceestimator/skylinestore/history/tpch_q12/tpch_q12_0

underlying estimator将删除tpch_q12_0的ResourceSkyline记录。重新发送

GET http://0.0.0.0:9998/resourceestimator/skylinestore/history/*/*

underlying estimator只返回tpch_q12_1的ResourceSkyline。

四.资源预测算法中用到的数据介绍

Hadoop-resource estimator的Translator组件会解析日志并将其按照一定规范的格式进行拼接,下面给出了示例中的资源历史使用数据和预测资源数据,可以看到作业的历史资源使用数据是同一个job的两次run,分别为tpch_q12_0和tpch_q12_1,其主要给出了随时间变化的memory和cpu的使用情况。其中第0时间单位表示的是container规格,为memory:1024,vcores:1,第25时间单位为作业结束时刻,memory和cpu皆为0。可以看到预测数据根据历史数据给出了10~25时间单位的资源预测数据。

历史资源使用数据:

[[{"pipelineId":"tpch\_q12","runId":"tpch\_q12\_0"},

[{"jobId":"tpch\_q12\_0","jobInputDataSize":0.0,"jobSubmissionTime":0,"jobFinishTime":25,"containerSpec":{"memory":1024,"vcores":1},

"skylineList":

{"resourceAllocation":{

"0":{"memory":1024,"vcores":1},

"10":{"memory":1099776,"vcores":1074},

"15":{"memory":2598912,"vcores":2538},

"20":{"memory":2527232,"vcores":2468},

"25":{"memory":0,"vcores":0}}}}]],

[{"pipelineId":"tpch\_q12","runId":"tpch\_q12\_1"},

[{"jobId":"tpch\_q12\_1","jobInputDataSize":0.0,"jobSubmissionTime":0,"jobFinishTime":25,"containerSpec":{"memory":1024,"vcores":1},

"skylineList":

{"resourceAllocation":{

"0":{"memory":1024,"vcores":1},

"10":{"memory":813056,"vcores":794},

"15":{"memory":2577408,"vcores":2517},

"20":{"memory":2543616,"vcores":2484},

"25":{"memory":0,"vcores":0}}}}]]]

预测数据:

{"resourceAllocation":

"10":{"memory":1083392,"vcores":1058},

"15":{"memory":2598912,"vcores":2538},

"20":{"memory":2543616,"vcores":2484},

"25":{"memory":0,"vcores":0}}}
五.Resource Estimator Service算法框架与原理

在本部分将重点介绍一下estimator中用到的资源预测算法原理。此算法由微软提出,其链接在文末参考资料中给出。下图是estimator的运行框架,可以看到其主要由三部分组成,下面分别介绍这三部分。

imgimage

  1. Automatic interence,提取出作业的运行时间和历史资源使用情况。 (a) Extractor of target,能提取出作业的运行开始与结束时间。 (b) Job resource model,能提取出作业的资源使用情况,例如作业资源随时间运行的变化情况和资源使用总量。
  2. Recurring Reservation,此部分包括有Job Resource Model,可以根据作业历史运行时间与作业历史资源使用情况给出下一任务的资源使用情况。 (a) 通过改变参数α,可以控制estimator在分配资源的时候是侧重过分配还是侧重欠分配。 (b) 根据作业资源预测模型给出的预测值为作业在原来分配的资源的基础上添加资源添加agenda。此job下一个run就运行在此资源分配的基础上。
  3. Dynamic Reprovisioning,此部分根据前面给出的资源agenda,动态调整作业的每个运行阶段的资源分配。
六.算法原理剖析

微软提出的此资源分配算法本质上是一种最优化算法,其优化的目标函数是由两部分组成的线性组合,下文中stage的概念是指每个job的运行期间按照一定规则划分成多个时间片,每个时间片称之为一个stage,下面分步骤阐述其算法原理。

1.首先定义一个目标函数,也可以称之为损失函数,即我们优化的目标。在此算法中由过分配和欠分配组成的线性组合组成损失函数costfunction。目标就是minimize(cost=αA0(s)+(1−α)Au(s))。其中A0(s)表示在当前stage的资源过分配值,其是由当前stage的分配值减去此stage的历史资源使用均值然后取平均得到,其公式表示为A0(s)=1N∑Ni=1∑k(sk−si,k)+,sk即为当前的资源分配值,si,k即为第i次run的历史资源使用值;Au(s)表示当前stage的欠分配值,其是由上一stage的欠分配值加上当前stage的欠分配值得到,公式表示如下:Di,k(s1,…,sk)=(Di,k+si,k−sk)+,Au(s)=1N∑Ni=1Di,k(s),下图比较直观的显示了estimator在预测资源时的一种过分配与欠分配的情况。

img

2.针对每个stage,此算法的策略就是选择可以使得costfunction最小的资源分配方式,即选择一个值使得costfunction最小,即得到Sk,即每一个stage上的资源分配值。 因为分配值是固定规格的倍数,所以在实现时可以通过简单的for循环或者一些最优化算法比如爬山法或者蚁群算法就可以快速得到最小值。

3.总结:算法中的做法是针对一个job,根据其历史运行时间拿到其作业开始和结束时间,在这时间段内按照一定规则划分时间片,每一个时间片为一个stage,根据同一job多次run的历史资源使用情况来预测下一run的资源使用情况。其每次配置的策略是使得costfunction最小。costfunction是过分配与欠分配的一个线性组合。

七.算法的测试效果

在本次测试中运行tpch_q12作业9次,并在每次运行中收集作业的资源skylines。然后,在Resource Estimator Service中运行日志解析器,从日志中提取ResourceSkylines并将它们存储在SkylineStore中。下面绘制了作业的ResourceSkylines以进行演示。

img

在Resource Estimator Service中运行估算器来预测新运行的资源需求,下面绘制了预测的资源需求数据。可以看到预测数据根据历史资源使用情况较好地表征了下一次运行的资源使用数据,有一定的参考意义。另外在实际场景业务上的测试效果还有待考证。

img

八.参考

1.Resourcemanager Estimator Service

2.微软算法文章

相关阅读
简单聊聊py的高性能编程
Prometheus 初体验
IF函数——放松工作,享受生活!
【每日课程推荐】机器学习实战!快速入门在线广告业务及CTR相应知识

此文已由作者授权腾讯云+社区发布,更多原文请点击

搜索关注公众号「云加社区」,第一时间获取技术干货,关注后回复1024 送你一份技术课程大礼包!

海量技术实践经验,尽在云加社区

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

请读下面的这句绕口令:ResourceManager中的Resource Estimator框架介绍与算法剖析 的相关文章

  • Hive如何存储数据,什么是SerDe?

    当查询表时 SerDe 将将文件中的字节中的一行数据反序列化为 Hive 内部使用的对象来操作该行数据 执行 INSERT 或 CTAS 时 请参阅第 441 页上的 导入数据 表的 SerDe 将将 Hive 的一行数据的内部表示序列化为
  • 错误:java.io.IOException:错误值类:类 org.apache.hadoop.io.Text 不是类 Myclass

    我的映射器和减速器如下 但我遇到了某种奇怪的异常 我不明白为什么它会抛出这种异常 public static class MyMapper implements Mapper
  • 将 hadoop fs 路径转换为 ​​EMR 上的 hdfs:// 路径

    我想知道如何将数据从 EMR 集群的 HDFS 文件系统移动到 S3 存储桶 我认识到我可以直接在 Spark 中写入 S3 但原则上 之后执行它也应该很简单 到目前为止 我还没有发现在实践中这是正确的 AWS 文档建议s3 dist cp
  • Hive ParseException - 无法识别“结束”“字符串”附近的输入

    尝试从现有 DynamoDB 表创建 Hive 表时出现以下错误 NoViableAltException 88 at org apache hadoop hive ql parse HiveParser IdentifiersParser
  • Hive查询快速查找表大小(行数)

    是否有 Hive 查询可以快速查找表大小 即行数 而无需启动耗时的 MapReduce 作业 这就是为什么我想避免COUNT I tried DESCRIBE EXTENDED 但这产生了numRows 0这显然是不正确的 对新手问题表示歉
  • 为什么组合器输入记录的数量比映射的输出数量多?

    Combiner 在 Mapper 之后 Reducer 之前运行 它将接收给定节点上的 Mapper 实例发出的所有数据作为输入 然后它将输出发送到Reducers 因此组合器输入的记录应小于映射输出的记录 12 08 29 13 38
  • 使用 Hadoop 映射两个数据集

    假设我有两个键值数据集 数据集A和B 我们称它们为数据集A和B 我想用 B 组的数据更新 A 组中的所有数据 其中两者在键上匹配 因为我要处理如此大量的数据 所以我使用 Hadoop 进行 MapReduce 我担心的是 为了在 A 和 B
  • 如果 HBase 不是运行在分布式环境中,它还有意义吗?

    我正在构建数据索引 这将需要以形式存储大量三元组 document term weight 我将存储多达几百万个这样的行 目前我正在 MySQL 中将其作为一个简单的表来执行 我将文档和术语标识符存储为字符串值 而不是其他表的外键 我正在重
  • 猪如何过滤不同的对(对)

    我是猪的新手 我有一个 Pig 脚本 它在两个元素之间生成制表符分隔的对 每行一对 例如 John Paul Tom Nik Mark Bill Tom Nik Paul John 我需要过滤掉重复的组合 如果我使用 DISTINCT 我会
  • http://localhost:50070/ 的 hadoop Web UI 不起作用

    命令 jps 显示以下详细信息 第5144章 5464 节点管理器 5307 资源管理器 5800 Jps 显然namenode和datanode丢失了 网络用户界面位于http 本地主机 50070 http localhost 5007
  • 将日期字符串转换为“MM/DD/YY”格式

    我刚刚看到这个例子 我该如何解决这个问题 Hive 元存储包含一个名为 Problem1 的数据库 其中包含一个名为 customer 的表 customer 表包含 9000 万条客户记录 90 000 000 每条记录都有一个生日字段
  • hive创建表的多个转义字符

    我正在尝试将带有管道分隔符的 csv 加载到配置单元外部表 数据值包含单引号 双引号 括号等 使用 Open CSV 版本 2 3 测试文件 csv id name phone 1 Rahul 123 2 Kumar s 456 3 Nee
  • 如何在 Hadoop 中将 String 对象转换为 IntWritable 对象

    我想转换String反对IntWritableHadoop 中的对象 任何过程都可以进行转换 IntWritable value new IntWritable Integer parseInt someString 并处理以下可能性par
  • 在 Amazon EMR 上使用 java 中的 hbase 时遇到问题

    因此 我尝试使用作为 MapReduce 步骤启动的自定义 jar 来查询 Amazon ec2 上的 hbase 集群 我的 jar 在地图函数内 我这样调用 Hbase public void map Text key BytesWri
  • 猪参考

    我正在学习 Hadoop Pig 并且我总是坚持引用元素 请查找下面的示例 groupwordcount group chararray words bag of tokenTuples from line token chararray
  • Hive“添加分区”并发

    我们有一个外部 Hive 表 用于处理原始日志文件数据 这些文件每小时一次 并按日期和源主机名分区 目前 我们正在使用简单的 python 脚本导入文件 这些脚本每小时触发几次 该脚本根据需要在 HDFS 上创建子文件夹 从临时本地存储复制
  • YARN UNHEALTHY 节点

    在我们的 YARN 集群已满 80 的情况下 我们看到一些纱线节点管理器被标记为不健康 在深入研究日志后 我发现这是因为数据目录的磁盘空间已满 90 出现以下错误 2015 02 21 08 33 51 590 INFO org apach
  • 将 CSV 转换为序列文件

    我有一个 CSV 文件 我想将其转换为 SequenceFile 我最终将使用它来创建 NamedVectors 以在聚类作业中使用 我一直在使用 seqdirectory 命令尝试创建 SequenceFile 然后使用 nv 选项将该输
  • 如何将SQL数据加载到Hortonworks中?

    我已在我的电脑中安装了 Hortonworks SandBox 还尝试使用 CSV 文件 并以表结构的方式获取它 这是可以的 Hive Hadoop nw 我想将当前的 SQL 数据库迁移到沙箱 MS SQL 2008 r2 中 我将如何做
  • InvalidRequestException(为什么:empid 如果包含 Equal,则不能被多个关系限制)

    这是关于我从 Apache Spark 查询 Cassandra 时遇到的问题 Spark 的正常查询工作正常 没有任何问题 但是当我使用关键条件进行查询时 出现以下错误 最初 我尝试查询复合键列族 它也给出了与下面相同的问题 由以下原因引

随机推荐