如何在scala Spark中按键连接两个数据集

2024-04-23

我有两个数据集,每个数据集都有两个元素。 以下是示例。

数据1:(名称,动物)

('abc,def', 'monkey(1)')
('df,gh', 'zebra')
...

数据2:(名称、水果)

('a,efg', 'apple')
('abc,def', 'banana(1)')
...

预期结果:(名称、动物、水果)

('abc,def', 'monkey(1)', 'banana(1)')
... 

我想通过使用第一列“名称”来加入这两个数据集。我已经尝试这样做了几个小时,但我无法弄清楚。谁能帮我?

val sparkConf = new SparkConf().setAppName("abc").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val text1 = sc.textFile(args(0))
val text2 = sc.textFile(args(1))

val joined = text1.join(text2)

上面的代码不起作用!


join定义在 RDD 上,即 RDD 类型RDD[(K,V)]。 所需的第一步是将输入数据转换为正确的类型。

我们首先需要对原始数据进行类型转换String成对(Key, Value):

val parse:String => (String, String) = s => {
  val regex = "^\\('([^']+)',[\\W]*'([^']+)'\\)$".r
  s match {
    case regex(k,v) => (k,v)
    case _ => ("","")
  }
}

(请注意,我们不能使用简单的split(",")表达式,因为键包含逗号)

然后我们使用该函数来解析文本输入数据:

val s1 = Seq("('abc,def', 'monkey(1)')","('df,gh', 'zebra')")
val s2 = Seq("('a,efg', 'apple')","('abc,def', 'banana(1)')")

val rdd1 = sparkContext.parallelize(s1)
val rdd2 = sparkContext.parallelize(s2)

val kvRdd1 = rdd1.map(parse)
val kvRdd2 = rdd2.map(parse)

最后,我们使用join连接两个 RDD 的方法

val joined = kvRdd1.join(kvRdd2)

// 我们来看看结果

joined.collect

// res31: Array[(String, (String, String))] = Array((abc,def,(monkey(1),banana(1))))
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在scala Spark中按键连接两个数据集 的相关文章

随机推荐