Update a static source based on a Kafka stream using Spark Streaming?

2024-02-02

I am using Spark SQL 2.4.1 with Java 8.

I have a scenario where dataset1 holds some metadata, loaded from a Parquet file on HDFS.

I also have another dataset, dataset2, which is read from a Kafka stream.

For each record of dataset2, I need to check whether its columnX value is present in dataset1.

If it is present in dataset1, I need to replace the columnX value with the matching column1 value from dataset1.
Else, I need to increment max(column1) from dataset1 by one and store that new mapping in dataset1.
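
To make the question concrete: since a static DataFrame cannot be modified from inside a running streaming query, I assume this lookup/replace/increment step would have to run per micro-batch, e.g. via foreachBatch (available since Spark 2.4). A minimal sketch of the shape I have in mind, where the parquet path, broker address, and topic name are placeholders:

import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("kafka-lookup").getOrCreate()

// dataset1: static metadata loaded from a Parquet file on HDFS (path is a placeholder)
val dataset1 = spark.read.parquet("hdfs:///path/to/metadata")

// dataset2: streamed from Kafka (broker and topic are placeholders)
val dataset2 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(value AS STRING) AS columnX") // parsing simplified here

// each micro-batch arrives as a plain DataFrame, so the
// lookup/replace/increment logic can be expressed as ordinary joins inside
val query = dataset2.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // 1. join batchDF with dataset1 on columnX
    // 2. replace matched values; generate new ids for the rest
    // 3. persist the new ids so later batches can see them
  }
  .start()

query.awaitTermination()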

You can see some sample data here:

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1165111237342523/3447405230020171/7035720262824085/latest.html

How can this be done in Spark?

Example:

val df1 = Seq(
  ("20359045", "2263"),
  ("8476349", "3280"),
  ("60886923", "2860"),
  ("204831453", "50330"),
  ("6487533", "48236"),
  ("583633", "46067")
).toDF("company_id_external", "company_id")

val df2 = Seq(
  ("60886923", "Chengdu Fuma Food Co,.Ltd"),          // company_id_external match found in df1
  ("608815923", "Australia Deloraine Dairy Pty Ltd"),
  ("59322769", "Consalac B.V."),
  ("583633", "Boso oil and fat Co., Ltd.      ")      // company_id_external match found in df1
).toDF("company_id_external", "companyName")

If a match is found in df1:

Only two df2 records have a "company_id_external" match in df1, i.e. 60886923 & 583633 (the first and last records).

For these df2 records:
    ("60886923","Chengdu Fuma Food Co,.Ltd")            becomes ==> ("2860","Chengdu Fuma Food Co,.Ltd")
    ("583633","Boso oil and fat Co., Ltd.      ")       becomes ==> ("46067","Boso oil and fat Co., Ltd.      ")

Else, if no match is found in df1:

For the other two df2 records there is no "company_id_external" match in df1, so a company_id needs to be generated and the mapping added to df1, i.e. for ("608815923","Australia Deloraine Dairy Pty Ltd") and ("59322769","Consalac B.V.").

company_id generation logic: new company_id = max(company_id) of df1 + 1. From the data above the max is 50330, so 50330 + 1 => 50331; this record is added to df1, i.e. ("608815923","50331"). The same is done for the other record, i.e. ("59322769","50332") is added to df1.
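
A sketch of just this generation step, assuming the records without a match have already been separated into a DataFrame I'm calling unmatched (that name and the ordering choice are mine):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// current maximum company_id in df1 (the column is a string, so cast for a numeric max)
val maxId = df1.agg(max(col("company_id").cast("long"))).head().getLong(0)

// hand out maxId + 1, maxId + 2, ... ; a global window like this pulls all rows
// to one partition, which is only acceptable for a small lookup table
val w = Window.orderBy("company_id_external")
val newMappings = unmatched
  .withColumn("company_id", (lit(maxId) + row_number().over(w)).cast("string"))
  .select("company_id_external", "company_id")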

So now df1 becomes:

df1 = Seq(
  ("20359045", "2263"),
  ("8476349", "3280"),
  ("60886923", "2860"),
  ("204831453", "50330"),
  ("6487533", "48236"),
  ("583633", "46067"),
  ("608815923", "50331"),
  ("59322769", "50332")
).toDF("company_id_external", "company_id")
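
For completeness, a batch-mode sketch over the sample data that produces both the updated df1 and the resolved df2 (variable names like df1Updated are mine; the foreachBatch wiring from above is omitted):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// a left join keeps every df2 record and brings in company_id where it exists
val joined = df2.join(df1, Seq("company_id_external"), "left")
val matched = joined.filter(col("company_id").isNotNull)
val unmatched = joined.filter(col("company_id").isNull)

// new ids for the unmatched records: max(company_id) + 1, + 2, ...
// (the ordering only fixes which record gets which new id)
val maxId = df1.agg(max(col("company_id").cast("long"))).head().getLong(0)
val w = Window.orderBy("company_id_external")
val newMappings = unmatched
  .withColumn("company_id", (lit(maxId) + row_number().over(w)).cast("string"))
  .select("company_id_external", "company_id")

// df1 grows by the newly generated mappings
val df1Updated = df1.union(newMappings)

// every df2 record can now be resolved to an internal company_id
val df2Resolved = df2.join(df1Updated, Seq("company_id_external"))
  .select("company_id", "companyName")

In the streaming version this block would run inside foreachBatch, and df1Updated would have to be written back to the Parquet location (and re-read, or kept in a mutable reference) so that subsequent batches see the ids generated here.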
