I am using spark-sql 2.4.x with the datastax-spark-cassandra-connector for Cassandra 3.x, together with Kafka.
I have exchange-rate metadata for currencies, for example:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DateType, DoubleType}
import spark.implicits._

val ratesMetaDataDf = Seq(
  ("EUR","5/10/2019","1.130657","USD"),
  ("EUR","5/9/2019","1.13088","USD")
).toDF("base_code", "rate_date","rate_value","target_code")
 .withColumn("rate_date", to_date($"rate_date", "MM/dd/yyyy").cast(DateType))
 .withColumn("rate_value", $"rate_value".cast(DoubleType))
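For reference, Spark 2.x parses date patterns with java.text.SimpleDateFormat semantics, whose lenient parsing accepts the single-digit month/day in "5/9/2019" against the "MM/dd/yyyy" pattern. A standalone sketch (plain JVM, no Spark):

```scala
import java.text.SimpleDateFormat

// Lenient SimpleDateFormat parsing, as used by Spark 2.4's to_date:
// "5/10/2019" matches "MM/dd/yyyy" even though the month is one digit.
val in  = new SimpleDateFormat("MM/dd/yyyy")
val out = new SimpleDateFormat("yyyy-MM-dd")
val iso = out.format(in.parse("5/10/2019"))
// iso is "2019-05-10"
```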
The sales records I receive from a Kafka topic look like this (sample):
val kafkaDf = Seq((15,2016, 4, 100.5,"USD","2021-01-20","EUR",221.4)
).toDF("companyId", "year","quarter","sales","code","calc_date","c_code","prev_sales")
To calculate "prev_sales", I need to look up the "rate_value" for the record's "c_code" whose "rate_date" is closest to (i.e., the latest date before) "calc_date".
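In other words, the lookup rule is: among rates for the matching currency whose rate_date falls before calc_date, take the one with the latest rate_date. A plain-Scala sketch of that rule on the sample values (in-memory stand-in, not the actual DataFrames):

```scala
// Sample EUR->USD rates as (rate_date ISO string, rate_value); ISO date
// strings compare in chronological order, so string comparison suffices here.
val rates = Seq(("2019-05-09", 1.13088), ("2019-05-10", 1.130657))
val calcDate = "2021-01-20"

// Keep only rates dated before calc_date, then take the latest one.
val (rateDate, rateValue) = rates.filter(_._1 < calcDate).maxBy(_._1)

// prev_sales is then the incoming prev_sales amount times that rate.
val prevSales = 221.4 * rateValue
```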
I am doing it as follows:
import org.apache.spark.sql.expressions.Window

val w2 = Window.orderBy(col("rate_date").desc)
val rateJoinResultDf = kafkaDf.as("k").join(ratesMetaDataDf.as("e"))
  .where(($"k.c_code" === $"e.base_code") &&
         ($"rate_date" < $"calc_date"))
  .orderBy($"rate_date".desc)
  .withColumn("row", row_number().over(w2))
  .where($"row" === 1).drop("row")
  .withColumn("prev_sales", (col("prev_sales") * col("rate_value")).cast(DoubleType))
  .select("companyId", "year","quarter","sales","code","calc_date","prev_sales")
In the above, to pick the record nearest to the given "rate_date" (i.e. "5/10/2019" from ratesMetaDataDf), I use the window and row_number functions and sort the records in "desc" order.
But in a spark-sql streaming query this raises the following error:

"Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode;;"
So how can I fetch that first record for the join above in a streaming query?
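One direction worth noting (a sketch of the idea only, not a confirmed fix for this pipeline): the sort here exists only to pick a single extreme row, and picking an extreme row can be expressed as an aggregation instead (e.g. max over a struct(rate_date, rate_value) in Spark), which avoids orderBy entirely. The equivalence in plain Scala:

```scala
// Candidate rates joined to one sale, as (rate_date ISO string, rate_value).
val candidates = Seq(("2019-05-09", 1.13088), ("2019-05-10", 1.130657))

// What Window + row_number + desc sort computes: first row after sorting.
val viaSort = candidates.sortBy(_._1).reverse.head

// The same row via an aggregation: tuples compare field by field, so the
// max pair is the one with the latest rate_date -- no sort required.
val viaMax = candidates.max
// viaSort == viaMax == ("2019-05-10", 1.130657)
```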