我有一个包含一些 SQL 表达式(合并、case/when 等)的 DF。
我后来尝试映射/平面映射这个 DF,在那里我得到了Task not serializable
由于字段包含 SQL 表达式而导致错误。
(为什么我需要 map/flatMap 这个 DF 是一个单独的问题)
当我将此 DF 保存到 Parquet 文件并随后加载时,错误消失了,我可以转换为 RDD 并进行转换,没有问题!
保存前和加载后的 DF 有什么不同?以某种方式,SQL 表达式必须已被评估并持久化。我怎样才能实现同样的目标without保存/加载? (df.perists() 没有成功;(
)
这是测试代码:
val data = Seq( (1, "sku1", "EUR", 99.0, 89.0), (2, "sku2", "USD", 89.0, 79.0), (3, "sku3", "USD", 49.0, 39.9) )
val aditionalStuffForCertainSKUsMap = Map("sku1" -> List(10, 20, 30))
val listedPrice = coalesce(
List("EUR", "USD").map(c => when($"currency" === c, col(c)).otherwise(lit(null))): _*)
val df = (sc.parallelize(data)
.toDF("id", "sku", "currency", "EUR", "USD")
.withColumn("price_in_given_currency", when($"currency" === "EUR", $"EUR"*2).otherwise(1))
// .withColumn("fails_price_in_given_currency", listedPrice)
)
df.show
df.write.mode(SaveMode.Overwrite).parquet("test_df")
数据包含一个sku
有些 SKU 代表捆绑包,例如 sku1,我想为其添加一些其他字段到 DF。仅当我尝试访问此 Map[String, List[Int]] 时在 map() 中我收到了投诉fails_price_in_given_currency
列,但情况并非如此price_in_given_currency
:
// If I load the df first, the map() works even when using `fails_price_in_given_currency`
//val df = sqlContext.read.parquet("test_df")
val out = df.map(d => {
val key = d.getAs[String]("sku")
aditionalStuffForCertainSKUsMap.getOrElse(key, None)
})
当我使用时抛出错误fails_price_in_given_currency
反而。如果我加载df
之前的地图,它会再次运行!