我正在映射一张 HBase 表,为每个 HBase 行生成一个 RDD 元素。然而,有时该行有坏数据(在解析代码中抛出 NullPointerException),在这种情况下我只想跳过它。
我的初始映射器返回一个Option
指示它返回 0 或 1 个元素,然后过滤Some
,然后获取包含的值:
// myRDD is RDD[(ImmutableBytesWritable, Result)]
val output = myRDD.
map( tuple => getData(tuple._2) ).
filter( {case Some(y) => true; case None => false} ).
map( _.get ).
// ... more RDD operations with the good data
def getData(r: Result) = {
val key = r.getRow
var id = "(unk)"
var x = -1L
try {
id = Bytes.toString(key, 0, 11)
x = Long.MaxValue - Bytes.toLong(key, 11)
// ... more code that might throw exceptions
Some( ( id, ( List(x),
// more stuff ...
) ) )
} catch {
case e: NullPointerException => {
logWarning("Skipping id=" + id + ", x=" + x + "; \n" + e)
None
}
}
}
有没有更惯用的更短的方法?我觉得这看起来很混乱,无论是getData()
并在map.filter.map
我正在做的舞蹈。
也许是一个flatMap
可以工作(在 a 中生成 0 或 1 个项目Seq
),但我不希望它展平我在映射函数中创建的元组,只需消除空值即可。
另一种经常被忽视的方法是使用collect(PartialFunction pf)
,这意味着“选择”或“收集”RDD 中在偏函数中定义的特定元素。
代码如下所示:
val output = myRDD.collect{case Success(tuple) => tuple }
def getData(r: Result):Try[(String, List[X])] = Try {
val id = Bytes.toString(key, 0, 11)
val x = Long.MaxValue - Bytes.toLong(key, 11)
(id, List(x))
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)