join
定义在 RDD 上,即 RDD 类型RDD[(K,V)]
。
所需的第一步是将输入数据转换为正确的类型。
我们首先需要对原始数据进行类型转换String
成对(Key, Value)
:
val parse:String => (String, String) = s => {
val regex = "^\\('([^']+)',[\\W]*'([^']+)'\\)$".r
s match {
case regex(k,v) => (k,v)
case _ => ("","")
}
}
(请注意,我们不能使用简单的split(",")
表达式,因为键包含逗号)
然后我们使用该函数来解析文本输入数据:
val s1 = Seq("('abc,def', 'monkey(1)')","('df,gh', 'zebra')")
val s2 = Seq("('a,efg', 'apple')","('abc,def', 'banana(1)')")
val rdd1 = sparkContext.parallelize(s1)
val rdd2 = sparkContext.parallelize(s2)
val kvRdd1 = rdd1.map(parse)
val kvRdd2 = rdd2.map(parse)
最后,我们使用join
连接两个 RDD 的方法
val joined = kvRdd1.join(kvRdd2)
// 我们来看看结果
joined.collect
// res31: Array[(String, (String, String))] = Array((abc,def,(monkey(1),banana(1))))