您好,我是集群计算的新手,目前我只在独立集群上玩(sc <- spark_connect(master = "local", version = '2.0.2')
)。我有一个巨大的 csv 文件(15GB),我想将其转换为 parquet 文件(第三块代码解释了原因)。这个 15GB 文件已经是 60GB 文件的样本,当我停止播放时,我需要使用/查询完整的 60GB 文件。目前我所做的是:
> system.time({FILE<-spark_read_csv(sc,"FILE",file.path("DATA/FILE.csv"),memory = FALSE)})
user system elapsed
0.16 0.04 1017.11
> system.time({spark_write_parquet(FILE, file.path("DATA/FILE.parquet"),mode='overwrite')})
user system elapsed
0.92 1.48 1267.72
> system.time({FILE<-spark_read_parquet(sc,"FILE", file.path("DATA/FILE.parquet"),memory = FALSE)})
user system elapsed
0.00 0.00 0.26
如您所见,这需要相当长的时间。我想知道第一行代码中发生了什么(spark_read_csv
) with memory = FALSE
?它在哪里读取/保存它?当我断开并再次重新连接会话时,我可以访问该位置吗?
另外,有没有办法以更有效的方式结合步骤 1 和 2?
我并不羞于尝试使用 API 中尚未提供的较低级别的函数,因为它很简单并且可以在很大程度上实现自动化。
当以下情况时不保存数据spark_read_csv
被调用memory = FALSE
。您的延迟与数据加载本身无关,而是与模式推断过程有关,该过程需要单独的数据扫描。
尽管使用模式推断很方便,但显式提供模式(如命名向量)从列名映射到输入简单的字符串 https://stackoverflow.com/a/32286450。例如,如果您要将 iris 数据集加载到local
mode:
path <- tempfile()
readr::write_csv(iris, path)
你会用
spark_read_csv(
sc, "iris", path, infer_schema=FALSE, memory = FALSE,
columns = c(
Sepal_Length = "double", Sepal_Width = "double",
Petal_Length = "double", Petal_Width = "double",
Species = "string"))
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)