A significant subset of the Spark API requires implicit ClassTags
(see Scala: What is a TypeTag and how do I use it? https://stackoverflow.com/q/12218641/6910411), and PairDStreamFunctions.mapWithState
is no different. Check the class definition https://github.com/apache/spark/blob/f830bb9170f6b853565d9dd30ca7418b93a54fe3/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala#L37-L38:
class PairDStreamFunctions[K, V](self: DStream[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K])
and https://github.com/apache/spark/blob/f830bb9170f6b853565d9dd30ca7418b93a54fe3/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala#L380-L381:
def mapWithState[StateType: ClassTag, MappedType: ClassTag](
spec: StateSpec[K, V, StateType, MappedType]
): MapWithStateDStream[K, V, StateType, MappedType] = {
...
}
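To see why such APIs demand ClassTags in the first place, here is a minimal sketch (plain Scala, no Spark, with a hypothetical function name): a generic method that builds an Array[T] needs a ClassTag[T] at runtime because arrays are not erased on the JVM, and the `[T: ClassTag]` context bound is just sugar for an extra implicit parameter, exactly like the ones in the constructor above.

```scala
import scala.reflect.ClassTag

// Hypothetical example: constructing an Array of a generic element type
// requires a ClassTag[T]; without the context bound this would not compile.
def firstOrElse[T: ClassTag](xs: Seq[T], default: T): Array[T] =
  if (xs.isEmpty) Array(default) else xs.toArray

// The context bound desugars to:
// def firstOrElse[T](xs: Seq[T], default: T)(implicit ct: ClassTag[T]): Array[T]
```

Methods like mapWithState propagate the same requirement to their callers, which is why generic wrappers around them must declare the bounds themselves.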
If you want to create a function that operates on a generic pair stream and uses mapWithState,
you should provide ClassTags at least for the key and value types:
def foo[T : ClassTag, U : ClassTag](
stream: DStream[(T, U)], f: StateSpec[T, U, Int, Int]) = stream.mapWithState(f)
If StateType and MappedType are parameterized as well, you'll need ClassTags for these too:
def bar[T : ClassTag, U : ClassTag, V : ClassTag, W : ClassTag](
stream: DStream[(T, U)], f: StateSpec[T, U, V, W]) = stream.mapWithState(f)