I am using Spark SQL 2.4.1 with Java 8.
I have a scenario where dataset1 holds some metadata loaded from a Parquet file on HDFS.
I also have a second dataset, dataset2, which is read from a Kafka stream.
For each record in dataset2, I need to check whether its columnX value exists in dataset1.
If it does, I need to replace the columnX value with the column1 value from dataset1.
Otherwise,
I need to generate a new value, max(column1) from dataset1 plus one, and store it in dataset1.
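To pin down the intended semantics, here is a minimal plain-Scala sketch (no Spark) of the per-record rule above. It assumes company_id values can be parsed as Long, and that an unmatched key appearing twice in the same batch should reuse the id generated the first time. `resolveRecords` is a hypothetical helper name, not an existing API.

```scala
// Hypothetical helper: plain Scala model of the lookup-or-generate rule (no Spark).
// existing: dataset1 as external key -> company_id (assumed numeric)
// incoming: dataset2 rows as (external key, company name)
// Returns the rewritten dataset2 rows and the updated dataset1 lookup.
def resolveRecords(
    existing: Map[String, Long],
    incoming: Seq[(String, String)]
): (Seq[(Long, String)], Map[String, Long]) = {
  var lookup = existing
  // New ids start at max(company_id) + 1; an empty dataset1 starts at 1.
  var nextId = if (existing.isEmpty) 1L else existing.values.max + 1
  val resolved = incoming.map { case (key, name) =>
    lookup.get(key) match {
      case Some(id) =>
        (id, name) // match found: replace the key with dataset1's company_id
      case None =>
        val id = nextId // no match: assign the next id and remember it
        lookup = lookup + (key -> id)
        nextId += 1
        (id, name)
    }
  }
  (resolved, lookup)
}

// Sample data from the question.
val df1Lookup = Map(
  "20359045" -> 2263L, "8476349" -> 3280L, "60886923" -> 2860L,
  "204831453" -> 50330L, "6487533" -> 48236L, "583633" -> 46067L
)
val (resolvedRows, updatedLookup) = resolveRecords(df1Lookup, Seq(
  ("60886923", "Chengdu Fuma Food Co,.Ltd"),
  ("608815923", "Australia Deloraine Dairy Pty Ltd"),
  ("59322769", "Consalac B.V."),
  ("583633", "Boso oil and fat Co., Ltd. ")
))
// resolvedRows ids: 2860, 50331, 50332, 46067
```

This is only a reference model; a Spark version has to express the same lookup with joins instead of a mutable map.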
You can see some sample data here:
https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1165111237342523/3447405230020171/7035720262824085/latest.html
How can this be done in Spark?
Example:
import spark.implicits._ // already in scope in spark-shell

val df1 = Seq(
("20359045","2263"),
("8476349","3280"),
("60886923","2860"),
("204831453","50330"),
("6487533","48236"),
("583633","46067")
).toDF("company_id_external","company_id")
val df2 = Seq(
("60886923","Chengdu Fuma Food Co,.Ltd"), // company_id_external match found in df1
("608815923","Australia Deloraine Dairy Pty Ltd"),
("59322769","Consalac B.V."),
("583633","Boso oil and fat Co., Ltd. ") // company_id_external match found in df1
).toDF("company_id_external","companyName")
If a match is found in df1:
Only two df2 records have a company_id_external that matches one in df1,
i.e. 60886923 and 583633 (the first and last records).
These df2 records are rewritten as follows:
("60886923","Chengdu Fuma Food Co,.Ltd") becomes ==> ("2860","Chengdu Fuma Food Co,.Ltd")
("583633","Boso oil and fat Co., Ltd. ") becomes ==> ("46067","Boso oil and fat Co., Ltd. ")
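The matched case maps naturally onto an inner join on company_id_external followed by a projection. A minimal sketch, assuming company_id_external is unique in df1 (duplicates there would duplicate df2 rows) and a local SparkSession; the df2 rows without a match are simply dropped here, since they need a new id instead.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").appName("match-sketch").getOrCreate()
import spark.implicits._

val df1 = Seq(
  ("20359045", "2263"), ("8476349", "3280"), ("60886923", "2860"),
  ("204831453", "50330"), ("6487533", "48236"), ("583633", "46067")
).toDF("company_id_external", "company_id")

val df2 = Seq(
  ("60886923", "Chengdu Fuma Food Co,.Ltd"),
  ("608815923", "Australia Deloraine Dairy Pty Ltd"),
  ("59322769", "Consalac B.V."),
  ("583633", "Boso oil and fat Co., Ltd. ")
).toDF("company_id_external", "companyName")

// Inner join keeps only df2 rows whose key exists in df1; the external key
// is then replaced by df1's company_id.
val matched = df2
  .join(df1, Seq("company_id_external"), "inner")
  .select(col("company_id"), col("companyName"))

matched.show(false)
```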
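Otherwise (no match found in df1):
For the other two df2 records there is no matching company_id_external in df1, so a company_id must be generated and added to df1,
i.e. ("608815923","Australia Deloraine Dairy Pty Ltd"),
("59322769","Consalac B.V.")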
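The records that need a new id are exactly the df2 rows whose key does not occur in df1, which is what a left anti join returns. A minimal sketch using the sample data, with a local SparkSession assumed:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("anti-join-sketch").getOrCreate()
import spark.implicits._

val df1 = Seq(
  ("20359045", "2263"), ("8476349", "3280"), ("60886923", "2860"),
  ("204831453", "50330"), ("6487533", "48236"), ("583633", "46067")
).toDF("company_id_external", "company_id")

val df2 = Seq(
  ("60886923", "Chengdu Fuma Food Co,.Ltd"),
  ("608815923", "Australia Deloraine Dairy Pty Ltd"),
  ("59322769", "Consalac B.V."),
  ("583633", "Boso oil and fat Co., Ltd. ")
).toDF("company_id_external", "companyName")

// left_anti keeps df2 rows with no matching key in df1 and returns only df2's columns.
val unmatched = df2.join(df1, Seq("company_id_external"), "left_anti")

unmatched.show(false)
```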
company_id generation logic:
new company_id = max(company_id) of df1 + 1
From the data above, the max is 50330, so 50330 + 1 => 50331; add this record to df1, i.e. ("608815923","50331").
Do the same for the other record, i.e. add ("59322769","50332") to df1.
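The "max + 1, then max + 2, ..." rule can be sketched with `row_number()` over a window added to the current max. This assumes company_id is numeric text (cast to long, because a string `max` compares lexicographically), that an empty df1 should start from 0, and that arrival order decides which record gets the smaller id; ordering by `monotonically_increasing_id()` is one way to express that, not something the question specifies.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, col, lit, max, monotonically_increasing_id, row_number}

val spark = SparkSession.builder().master("local[*]").appName("id-generation-sketch").getOrCreate()
import spark.implicits._

val df1 = Seq(
  ("20359045", "2263"), ("8476349", "3280"), ("60886923", "2860"),
  ("204831453", "50330"), ("6487533", "48236"), ("583633", "46067")
).toDF("company_id_external", "company_id")

// The two df2 records with no match in df1, in arrival order.
val unmatched = Seq(
  ("608815923", "Australia Deloraine Dairy Pty Ltd"),
  ("59322769", "Consalac B.V.")
).toDF("company_id_external", "companyName")

// Current max company_id as a number; falls back to 0 when df1 is empty.
val maxId: Long = df1
  .agg(coalesce(max(col("company_id").cast("long")), lit(0L)))
  .head()
  .getLong(0)

// row_number() yields 1, 2, ... over a single global ordering, so ids become
// maxId + 1, maxId + 2, ... in the order the records arrived.
val w = Window.orderBy(col("seq"))
val newIds = unmatched
  .withColumn("seq", monotonically_increasing_id())
  .withColumn("company_id", (lit(maxId) + row_number().over(w)).cast("string"))
  .select(col("company_id_external"), col("company_id"))

newIds.show(false)
```

A window without `partitionBy` moves all rows to one partition; that is fine for a handful of new keys per batch but worth keeping in mind if many new keys arrive at once.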
**So now df1 should become:**
val updatedDf1 = Seq(
("20359045","2263"),
("8476349","3280"),
("60886923","2860"),
("204831453","50330"),
("6487533","48236"),
("583633","46067"),
("608815923","50331"),
("59322769","50332")
).toDF("company_id_external","company_id")
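Putting the steps together, one possible batch-level sketch: find the unmatched keys with a left anti join, give them ids starting at max(company_id) + 1, append them to df1, and then rewrite every df2 row with an inner join. `resolveCompanyIds` is a hypothetical helper; it assumes company_id is numeric text, company_id_external is unique in df1, and that a key repeated within df2 should get a single new id.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, col, lit, max, min, monotonically_increasing_id, row_number}

val spark = SparkSession.builder().master("local[*]").appName("resolve-ids-sketch").getOrCreate()
import spark.implicits._

// Hypothetical helper: returns (df1 with new ids appended, df2 with keys replaced by company_id).
def resolveCompanyIds(df1: DataFrame, df2: DataFrame): (DataFrame, DataFrame) = {
  val key = Seq("company_id_external")

  // Current max id; cast because a string max would compare lexicographically.
  val maxId = df1.agg(coalesce(max(col("company_id").cast("long")), lit(0L))).head().getLong(0)

  // Distinct unmatched keys, each with the first position it appeared at in df2.
  val unmatchedKeys = df2
    .withColumn("seq", monotonically_increasing_id())
    .join(df1, key, "left_anti")
    .groupBy("company_id_external")
    .agg(min("seq").as("seq"))

  // maxId + 1, maxId + 2, ... in arrival order.
  val newIds = unmatchedKeys
    .withColumn("company_id", (lit(maxId) + row_number().over(Window.orderBy("seq"))).cast("string"))
    .select("company_id_external", "company_id")

  val updatedDf1 = df1.unionByName(newIds)

  // Every df2 key now exists in updatedDf1, so the inner join rewrites all rows.
  val resolvedDf2 = df2.join(updatedDf1, key, "inner").select("company_id", "companyName")
  (updatedDf1, resolvedDf2)
}

val df1 = Seq(
  ("20359045", "2263"), ("8476349", "3280"), ("60886923", "2860"),
  ("204831453", "50330"), ("6487533", "48236"), ("583633", "46067")
).toDF("company_id_external", "company_id")

val df2 = Seq(
  ("60886923", "Chengdu Fuma Food Co,.Ltd"),
  ("608815923", "Australia Deloraine Dairy Pty Ltd"),
  ("59322769", "Consalac B.V."),
  ("583633", "Boso oil and fat Co., Ltd. ")
).toDF("company_id_external", "companyName")

val (updatedDf1, resolvedDf2) = resolveCompanyIds(df1, df2)
updatedDf1.show(false)
resolvedDf2.show(false)
```

For the Kafka side, one option in Spark 2.4 is to call a function like this once per micro-batch from `foreachBatch` and persist `updatedDf1` back to storage. That persistence step, and what happens if two batches or jobs assign ids concurrently, is not covered by this sketch.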