这里的问题是你的reduce函数。对于每个键,reduceByKey
使用值对调用您的reduce 函数,并期望它生成相同类型的组合值。
例如,假设我想执行字数统计操作。首先,我可以将每个单词映射到一个(word, 1)
配对,然后我可以reduceByKey(lambda x, y: x + y)
总结每个单词的计数。最后,我留下了一个 RDD(word, count)
pairs.
这是一个来自PySpark API 文档:
>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKey(add).collect())
[('a', 2), ('b', 1)]
要理解为什么你的示例不起作用,你可以想象像这样应用reduce函数:
reduce(reduce(reduce(firstValue, secondValue), thirdValue), fourthValue) ...
根据您的减少功能,听起来您可能正在尝试实现内置groupByKey操作,它将每个键及其值列表分组。
另外,看看combineByKey,概括为reduceByKey()
允许reduce函数的输入和输出类型不同(reduceByKey
is 实施的按照combineByKey
)