很明显,程序中的瓶颈是搜索candidates
。鉴于 Spark 架构,它严重限制了并行化的能力,并通过为每个用户启动 Spark 作业增加了大量开销。
假设典型场景,700 万用户 and 十亿种产品大多数时候,您会预测整个范围的产品,减去用户已经购买的少数产品。至少在我看来,重要的问题是为什么还要费心过滤。即使你推荐以前买过的产品,真的有害吗?
除非你有非常严格的要求,否则我会简单地忽略这个问题并使用MatrixFactorizationModel.recommendProductsForUsers https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.mllib.recommendation.MatrixFactorizationModel它几乎可以为您完成所有工作(不包括数据导出)。之后,您可以执行批量导出,一切顺利。
现在假设您有明确的无重复政策。假设典型用户仅购买相对少量的产品,您可以从为每个用户获取一组产品开始:
val userProdSet = buy_values
.map{case (user, product, _) => (user, product)}
.aggregateByKey(Set.empty[Int])((s, e) => s + e, (s1, s2) => s1 ++ s2)
接下来你可以简单地映射userProdSet
获得预测:
// Number of predictions for each user
val nPred = 30;
userProdSet.map{case (user, prodSet) => {
val recommended = model
// Find recommendations for user
.recommendProducts(_, nPred + prodSet.size))
// Filter to remove already purchased
.filter(rating => !prodSet.contains(rating.product))
// Sort and limit
.sortBy(_.rating)
.reverse
.take(nPred)
(user, recommended)
}}
您可以通过使用可变集进行聚合和广播模型来进一步改进,但这只是一般想法。
如果用户数量user_ids
小于整组用户数(buy_values
)你可以简单地过滤userProdSet
仅保留一部分用户。