按 Spark 键写入多个输出 - 一个 Spark 作业

2024-01-02

如何在单个作业中使用 Spark 写入依赖于密钥的多个输出。

有关的:按键写入多个输出 Scalding Hadoop,一个 MapReduce 作业 https://stackoverflow.com/questions/23994383/write-to-multiple-outputs-by-key-scalding-hadoop-one-mapreduce-job/

E.g.

sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
.writeAsMultiple(prefix, compressionCodecOption)

将确保cat prefix/1 is

a
b

and cat prefix/2将会

c

编辑:我最近添加了一个新答案,其中包括完整导入、pimp 和压缩编解码器,请参阅https://stackoverflow.com/a/46118044/1586965 https://stackoverflow.com/a/46118044/1586965,除了之前的答案之外,这可能会有所帮助。


如果您使用 Spark 1.4+,这会变得非常非常容易,这要归功于数据帧API https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html。 (DataFrames 是在 Spark 1.3 中引入的,但是partitionBy(),我们需要的是1.4中引入 http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter.partitionBy.)

如果您开始使用 RDD,则首先需要将其转换为 DataFrame:

val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie")))
val people_df = people_rdd.toDF("number", "name")

在 Python 中,同样的代码是:

people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")])
people_df = people_rdd.toDF(["number", "name"])

一旦有了 DataFrame,根据特定键写入多个输出就很简单了。更重要的是——这就是 DataFrame API 的美妙之处——Python、Scala、Java 和 R 中的代码几乎相同:

people_df.write.partitionBy("number").text("people")

如果需要,您可以轻松使用其他输出格式:

people_df.write.partitionBy("number").json("people-json")
people_df.write.partitionBy("number").parquet("people-parquet")

在每个示例中,Spark 都会为我们对 DataFrame 进行分区的每个键创建一个子目录:

people/
  _SUCCESS
  number=1/
    part-abcd
    part-efgh
  number=2/
    part-abcd
    part-efgh
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

按 Spark 键写入多个输出 - 一个 Spark 作业 的相关文章

  • 过滤字符串上的 Spark DataFrame 包含

    我在用火花1 3 0 http spark apache org releases spark release 1 3 0 html and 火花阿夫罗1 0 0 https github com databricks spark avro
  • 减少/折叠幺半群列表,但减少器返回任一

    我发现自己遇到过几次这样的情况 我有一个减速器 组合 fn 如下所示 def combiner a String b String Either String String a b asRight String 它是一个虚拟实现 但 fn
  • 在 Akka 中配置嵌套 Router

    我有一些嵌套的路由器 应创建它FromConfig 我想要的是这样的 test akka actor deployment worker router round robin nr of instances 5 slave router b
  • 如何使用 PySpark 预处理图像?

    我有一个项目 需要为 1 设置大数据架构 AWS S3 SageMaker 的概念验证使用 PySpark 预处理图像 2 执行 PCA and 3 训练一些机器或深度学习模型 我的问题是了解如何使用 PySpark 操作图像数据 但无法在
  • Akka Stream Graph 恢复问题

    我创建了一个图表来并行化具有相同输入的两个流 这些流产生 Future Option Entity 如果 flowA 失败 我想返回 Future None 但恢复似乎没有被调用 val graph Flow Input Future Op
  • Spark SQL 失败,因为“常量池已超过 JVM 限制 0xFFFF”

    我在 EMR 4 6 0 Spark 1 6 1 上运行此代码 val sqlContext SQLContext getOrCreate sc val inputRDD sqlContext read json input try inp
  • 对两种类型之间的二元关系进行建模

    有企业 也有人 用户可以对某个企业点赞或发表评论 但效果是一样的can not发生在一个人身上 当用户发布有关某个企业的内容或对其点赞时 该企业就被称为target喜欢或帖子 trait TargetingRelation Targetin
  • 在 Spark MLlib 上使用 Java 中的 Breeze

    在尝试从Java使用MLlib时 使用微风矩阵运算的正确方法是什么 例如scala 中的乘法很简单 matrix vector 相应的功能在Java中是如何表达的 有一些方法 例如 colon times 可以通过正确的方式调用 breez
  • 如何使用 Spark 2 屏蔽列?

    我有一些表 我需要屏蔽其中的一些列 要屏蔽的列因表而异 我正在读取这些列application conf file 例如 对于员工表如下所示 id name age address 1 abcd 21 India 2 qazx 42 Ger
  • 理解 Scala FP 库

    只是为了让那些想要开始使用 Scala FP 库 在纯 FP 方面变得更好的人快速清晰地了解 有人能澄清猫和猫效应 猫效应 IO 之间的区别 关系吗 最重要的是 齐奥和莫尼克斯对此有何看法 最后 与 ScalaZ 7 8 有何关系 到目前为
  • 如何在映射中将字符串转换为 Seq[String]

    我有一个Map String String 以及需要的第三方功能Map String Seq String 有没有一种简单的方法来转换它 以便我可以将地图传递给函数 original mapValues Seq 注意mapValues返回地
  • 获取 int() 参数必须是字符串或数字,而不是“Column”- Apache Spark

    如果我使用以下代码 我会收到此异常 int argument must be a string or a number not Column df df withColumn FY F when df ID substr 5 2 isin
  • Scala中有类似Java Stream的“peek”操作吗?

    在Java中你可以调用peek x gt println x 在 Stream 上 它将对每个元素执行操作并返回原始流 这与 foreach 不同 foreach 是 Unit Scala 中是否有类似的东西 最好是适用于所有 Monady
  • 如何将 Pyspark Dataframe 标题设置到另一行?

    我有一个如下所示的数据框 col1 col2 col3 id name val 1 a01 X 2 a02 Y 我需要从中创建一个新的数据框 使用 row 1 作为新的列标题并忽略或删除 col1 col2 等行 新表应如下所示 id na
  • Spark DataFrame 序列化为无效 json

    TL DR 当我倾倒 Spark 时DataFrame作为 json 我总是得到类似的结果 key1 v11 key2 v21 key1 v12 key2 v22 key1 v13 key2 v23 这是无效的 json 我可以手动编辑转储
  • Java 8 Stream,获取头部和尾部

    Java 8 引入了Stream http download java net jdk8 docs api java util stream Stream html类似于 Scala 的类Stream http www scala lang
  • 解决“Show”类型类实例的隐式问题

    我正在努力使Gender实施Show类型类 scala gt trait Gender extends Show Gender defined trait Gender scala gt case object Male extends G
  • Scala Spark 包含与不包含

    我可以使用 contains 过滤 RDD 中的元组 如下所示 但是使用 不包含 来过滤 RDD 又如何呢 val rdd2 rdd1 filter x gt x 1 contains 我找不到这个的语法 假设这是可能的并且我没有使用Dat
  • Spark 中的 Distinct() 函数如何工作?

    我是 Apache Spark 的新手 正在学习基本功能 有一个小疑问 假设我有一个元组 键 值 的 RDD 并且想从中获取一些唯一的元组 我使用distinct 函数 我想知道该函数基于什么基础认为元组是不同的 是基于键 值还是两者 di
  • Spark 有没有办法捕获执行器终止异常?

    在执行我的 Spark 程序期间 有时 其原因对我来说仍然是个谜 yarn 会杀死容器 执行器 并给出超出内存限制的消息 我的程序确实恢复了 但 Spark 通过生成一个新容器重新执行任务 但是 在我的程序中 任务还会在磁盘上创建一些中间文

随机推荐