load() 在 Spark 中做什么?

2023-12-19

火花很懒吧? 那么什么是load() do?

start = timeit.default_timer()

 df = sqlContext.read.option(
     "es.resource", indexes
 ).format("org.elasticsearch.spark.sql")
 end = timeit.default_timer()

 print('without load: ', end - start) # almost instant
 start = timeit.default_timer()

 df = df.load()
 end = timeit.default_timer()
 print('load: ', end - start) # takes 1sec

 start = timeit.default_timer()

 df.show()
 end = timeit.default_timer()
 print('show: ', end - start) # takes 4 sec

If show()我猜这是唯一的行动load不会花费太多时间,如 1 秒。所以我得出结论load()是一个动作(与 Spark 中的转换相对)

load 是否真的将整个数据加载到内存中?我不这么认为,但是它有什么作用呢?

我搜索并查看了文档https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html但这没有帮助..


tl;dr load()是一个 DataFrameReader api(org.apache.spark.sql.DataFrameReader#load)从下面的代码可以看出,它返回一个DataFrame,可以在其上应用 Spark 转换。

/**
   * Loads input in as a `DataFrame`, for data sources that support multiple paths.
   * Only works if the source is a HadoopFsRelationProvider.
   *
   * @since 1.6.0
   */
  @scala.annotation.varargs
  def load(paths: String*): DataFrame

需要创建一个 DataFrame 来执行转换。
要从路径(HDFS、S3 等)创建数据帧,用户可以使用spark.read.format("<format>").load().(还有特定于数据源的 API,可以自动加载文件,例如spark.read.parquet(<path>))

为什么需要整整1秒?

在基于文件的源中,这一次可以归因于文件列表。在 HDFS 中,这些列表并不昂贵,而在像 S3 这样的云存储中,这个列表非常昂贵,并且需要与文件数量成比例的时间。
在您的情况下,使用的数据源是elastic-search,时间可归因于连接建立、收集元数据以执行分布式扫描等,这取决于 Elastic Serach 连接器实现。我们可以启用调试日志并检查更多信息。如果elasticsearch有办法记录它收到的请求,我们可以检查elasticsearch日志中是否有在该时间之后发出的请求load()被解雇。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

load() 在 Spark 中做什么? 的相关文章

随机推荐