基于Spark MLlib平台的协同过滤算法---电影推荐系统

2023-11-10

说到推荐系统,大家可能立马会想到协同过滤算法。本文基于Spark MLlib平台实现一个向用户推荐电影的简单应用。其中,主要包括三部分内容:


  • 协同过滤算法概述

  • 基于模型的协同过滤应---电影推荐

  • 实时推荐架构分析

    一、协同过滤算法概述

        本人对算法的研究,目前还不是很深入,这里简单的介绍下其工作原理。

        通常,协同过滤算法按照数据使用,可以分为:

        1)基于用户(UserCF)

       2)基于商品(ItemCF)

       3)基于模型(ModelCF)

        按照模型,可以分为:

        1)最近邻模型:基于距离的协同过滤算法

       2)Latent Factor Mode(SVD):基于矩阵分解的模型

       3)Graph:图模型,社会网络图模型

        文中,使用的协同过滤算法是基于矩阵分解的模型。

 1、基于用户(UserCF)---基于用户相似性

        基于用户的协同过滤,通过不同用户对物品的评分来评测用户之间的相似性,基于用户之间的相似性做出推荐。简单来讲,就是给用户推荐和他兴趣相似的其他用户喜欢的物品。

        举个例子:

        技术分享

        如图,有三个用户A、B、C,四个物品A、B、C、D,需要向用户A推荐物品。这里,由于用户A和用户C都买过物品A和物品C,所以,我们认为用户A和用户C非常相似,同时,用户C又买过物品D,那么就需要给A用户推荐物品D。

        基于UserCF的基本思想相当简单,基于用户对物品的偏好,找到相邻邻居用户,然后将邻居用户喜欢的商品推荐给当前用户。

        计算上,将一个用户对所有物品的偏好作为一个向量来计算用户之间的相似度,找到K邻居后,根据邻居的相似度权重以及他们对物品的偏好,预测当前用户没有偏好的未涉及物品,计算得到一个排序的物品列表作为推荐。


  2、基于商品(ItemCF)---基于商品相似性

      基于商品的协同过滤,通过用户对不同item的评分来评测item之间的相似性,基于item之间的相似性做出推荐。简单来将,就是给用户推荐和他之前喜欢的物品相似的物品。

       例如:

       技术分享

       如图,有三个用户A、B、C和三件物品A、B、C,需要向用户C推荐物品。这里,由于用户A买过物品A和C,用户B买过物品A、B、C,用户C买过物品A,从用户A和B可以看出,这两个用户都买过物品A和C,说明物品A和C非常相似,同时,用户C又买过物品A,所以,将物品C推荐给用户C。

       基于ItemCF的原理和基于UserCF类似,只是在计算邻居时采用物品本身,而不是从用户的角度,即基于用户对物品的偏好找到相似的物品,然后根据用户的历史偏好,推荐相似的物品给他。

       从计算角度,即将所有用户对某个物品的偏好作为一个向量来计算物品之间的相似度,得到物品的相似物品后,根据用户历史的偏好预测当前用户还没有表示偏好的物品,计算得到一个排序的物品列表作为推荐。


    3基于模型(ModelCF)

        基于模型的协同过滤推荐就是基于样本的用户喜好信息,训练一个推荐模型,然后根据实时的用户喜好的信息进行预测,计算推荐。

                本文使用的基于矩阵分解的模型,算法如图:

        技术分享

         Spark MLlib当前支持基于模型的协同过滤,其中用户和商品通过一小组隐性因子进行表达,并且这些因子也用于预测缺失的元素。MLlib使用交替最小二乘法(ALS)来学习这些隐性因子。

         如果有兴趣,可以阅读Spark的这部分源代码:

         技术分享

         

 二、基于模型的协同过滤应用---电影推荐

         本文实现对用户推荐电影的简单应用。

        1、测试数据描述

           本次测试数据主要包括四个数据文件:(详细的数据描述参见README文件)

           1)用户数据文件

              用户ID::性别::年龄::职业编号::邮编

              技术分享

          2)电影数据文件

             电影ID::电影名称::电影种类

             技术分享

         3)评分数据文件

            用户ID::电影ID::评分::时间

            技术分享

        4)测试数据

           用户ID::电影ID::评分::时间

           技术分享

        这里,前三个数据文件用于模型训练,第四个数据文件用于测试模型。


        2、实现代码:

           

import org.apache.log4j.{Level, Logger}

import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}

import org.apache.spark.rdd._

import org.apache.spark.{SparkContext, SparkConf}

import org.apache.spark.SparkContext._


import Scala.io.Source


object MovieLensALS {

  def main(args:Array[String]) {


    //屏蔽不必要的日志显示在终端上

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

    Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)


    //设置运行环境

    val sparkConf = new SparkConf().setAppName("MovieLensALS").setMaster("local[5]")

    val sc = new SparkContext(sparkConf)


    //装载用户评分,该评分由评分器生成(即生成文件personalRatings.txt)

    val myRatings = loadRatings(args(1))

    val myRatingsRDD = sc.parallelize(myRatings, 1)


    //样本数据目录

    val movielensHomeDir = args(0)


    //装载样本评分数据,其中最后一列Timestamp取除10的余数作为key,Rating为值,即(Int,Rating)

    val ratings = sc.textFile(movielensHomeDir + "/ratings.dat").map {

      line =>

        val fields = line.split("::")

        // format: (timestamp % 10, Rating(userId, movieId, rating))

        (fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble))

    }


    //装载电影目录对照表(电影ID->电影标题)

    val movies = sc.textFile(movielensHomeDir + "/movies.dat").map {

      line =>

        val fields = line.split("::")

        // format: (movieId, movieName)

        (fields(0).toInt, fields(1))

    }.collect().toMap

    

    //统计有用户数量和电影数量以及用户对电影的评分数目

    val numRatings = ratings.count()

    val numUsers = ratings.map(_._2.user).distinct().count()

    val numMovies = ratings.map(_._2.product).distinct().count()

    println("Got " + numRatings + " ratings from " + numUsers + " users " + numMovies + " movies")


    //将样本评分表以key值切分成3个部分,分别用于训练 (60%,并加入用户评分), 校验 (20%), and 测试 (20%)

    //该数据在计算过程中要多次应用到,所以cache到内存

    val numPartitions = 4

    val training = ratings.filter(x => x._1 < 6).values.union(myRatingsRDD).repartition(numPartitions).persist()

    val validation = ratings.filter(x => x._1 >= 6 && x._1 < 8).values.repartition(numPartitions).persist()

    val test = ratings.filter(x => x._1 >= 8).values.persist()


    val numTraining = training.count()

    val numValidation = validation.count()

    val numTest = test.count()

    println("Training: " + numTraining + " validation: " + numValidation + " test: " + numTest)



    //训练不同参数下的模型,并在校验集中验证,获取最佳参数下的模型

    val ranks = List(8, 12)

    val lambdas = List(0.1, 10.0)

    val numIters = List(10, 20)

    var bestModel: Option[MatrixFactorizationModel] = None

    var bestValidationRmse = Double.MaxValue

    var bestRank = 0

    var bestLambda = -1.0

    var bestNumIter = -1


    for (rank <- ranks; lambda <- lambdas; numIter <- numIters) {

      val model = ALS.train(training, rank, numIter, lambda)

      val validationRmse = computeRmse(model, validation, numValidation)

      println("RMSE(validation) = " + validationRmse + " for the model trained with rank = "

        + rank + ",lambda = " + lambda + ",and numIter = " + numIter + ".")


      if (validationRmse < bestValidationRmse) {

        bestModel = Some(model)

        bestValidationRmse = validationRmse

        bestRank = rank

        bestLambda = lambda

        bestNumIter = numIter

      }

    }


    //用最佳模型预测测试集的评分,并计算和实际评分之间的均方根误差(RMSE)

    val testRmse = computeRmse(bestModel.get, test, numTest)

    println("The best model was trained with rank = " + bestRank + " and lambda = " + bestLambda

      + ", and numIter = " + bestNumIter + ", and its RMSE on the test set is " + testRmse + ".")


    //create a naive baseline and compare it with the best model

    val meanRating = training.union(validation).map(_.rating).mean

    val baselineRmse = math.sqrt(test.map(x => (meanRating - x.rating) * (meanRating - x.rating)).reduce(_ + _) / numTest)

    val improvement = (baselineRmse - testRmse) / baselineRmse * 100

    println("The best model improves the baseline by " + "%1.2f".format(improvement) + "%.")


    //推荐前十部最感兴趣的电影,注意要剔除用户已经评分的电影

    val myRatedMovieIds = myRatings.map(_.product).toSet

    val candidates = sc.parallelize(movies.keys.filter(!myRatedMovieIds.contains(_)).toSeq)

    val recommendations = bestModel.get

      .predict(candidates.map((0, _)))

      .collect

      .sortBy(-_.rating)

      .take(10)

    var i = 1

    println("Movies recommended for you:")

    recommendations.foreach { r =>

      println("%2d".format(i) + ": " + movies(r.product))

      i += 1

    }


    sc.stop()

  }



  /** 校验集预测数据和实际数据之间的均方根误差 **/

  def computeRmse(model:MatrixFactorizationModel,data:RDD[Rating],n:Long):Double = {


    val predictions:RDD[Rating] = model.predict((data.map(x => (x.user,x.product))))

    val predictionsAndRatings = predictions.map{ x =>((x.user,x.product),x.rating)}

                          .join(data.map(x => ((x.user,x.product),x.rating))).values

    math.sqrt(predictionsAndRatings.map( x => (x._1 - x._2) * (x._1 - x._2)).reduce(_+_)/n)

  }


  /** 装载用户评分文件 personalRatings.txt **/

  def loadRatings(path:String):Seq[Rating] = {

    val lines = Source.fromFile(path).getLines()

    val ratings = lines.map{

      line =>

        val fields = line.split("::")

        Rating(fields(0).toInt,fields(1).toInt,fields(2).toDouble)

    }.filter(_.rating > 0.0)

    if(ratings.isEmpty){

      sys.error("No ratings provided.")

    }else{

      ratings.toSeq

    }

  }

}


       3、运行程序

        1)设置参数,运行程序(两个参数:第一个数据文件目录,第二个测试数据)

             技术分享

         2)程序运行效果---模型训练过程

           技术分享

        3)程序运行效果---电影推荐结果

      技术分享

        

        4、总结

          这样,一个简单的基于模型的电影推荐应用就算OK了。


    三、实时推荐架构分析

        上面,实现了简单的推荐系统应用,但是,仅仅实现用户的定向推荐,在实际应用中价值不是非常大,如果体现价值,最好能够实现实时或者准实时推荐。

        下面,简单介绍下实时推荐的一个架构:

        技术分享

        

        该架构图取自淘宝Spark On Yarn的实时架构,这里,给出一些个人的观点:

        架构图分为三层:离线、近线和在线。

            离线部分:主要实现模型的建立。原始数据通过ETL加工清洗,得到目标数据,目标业务数据结合合适的算法,学习训练模型,得到最佳的模型。

            近线部分:主要使用Hbase存储用户行为信息,模型混合系统综合显性反馈和隐性反馈的模型处理结果,将最终的结果推荐给用户。

            在线部分:这里,主要有两种反馈,显性和隐性,个人理解,显性反馈理解为用户将商品加入购物车,用户购买商品这些用户行为;隐性反馈理解为用户在某个商品上停留的时间,用户点击哪些商品这些用户行为。这里,为了实现实时/准实时操作,使用到了Spark Streaming对数据进行实时处理。(有可能是Flume+Kafka+Spark Streaming架构)

        这里是个人的一些理解,不足之处,望各位指点。

参考资料

http://blog.selfup.cn/1001.html

http://www.zybang.com/question/d2bcbb91f8085edb719b7c4f942d82cf.html

https://msdn.microsoft.com/library/azure/fa4aa69d-2f1c-4ba4-ad5f-90ea3a515b4c

https://studio.azureml.NET/



本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

基于Spark MLlib平台的协同过滤算法---电影推荐系统 的相关文章

  • Hudi 0.12.0 搭建——集成 Hive3.1 与 Spark3.2

    Hudi 搭建 https blog csdn net weixin 46389691 article details 128276527 环境准备 一 安装 Maven 1 解压 2 配置环境变量 3 修改 Maven 下载源 二 安装
  • Spark课程设计——电影推荐系统

    题目所需数据集及相应信息描述 数据集 1 用户评分数据集ratings dat 包含了大量用户的历史评分数据 2 样本评分数据集personalRatings dat 包含了少数几个用户的个性化评分数据 这些数据反映了某个用户的个性化观影喜
  • Compressed Sparse Column format(CSC)

    CSR Compressed Sparse Row format 和CSC Compressed Spare Column format 都是一种稀疏矩阵的存储格式 这里分别给出实例 假设有如下矩阵 1360
  • python推荐系统学习笔记(5)——基于图的模型推荐算法

    python推荐系统学习笔记 5 基于图的模型推荐算法 2 1 用户行为数据的二分图表示 为可以把基于邻域的模型看作基于图的模型的简单形式 用户物品二分图模型 对于数据集中每一个二元组 u i 图中都有一套对应的边e vu vi 其中vu属
  • spark算子执行位置研究,driver端?executor端?

    参考资料 https cloud tencent com developer article 1545723 前言 spark算子的执行位置 driver端 还是executor端 这些之前其实没有注意过 最近在学流处理 发现这个还是很重要
  • 推荐系统(四)——因果效应uplift model系列模型S-Learner,T-Learner,X-Learner

    在之前的文章中我们介绍了使用因果推断中的去除混杂和反事实的相关理论来纠正推荐系统中的偏差问题 在这篇文章中主要和大家分享uplift model相关知识和方法 例子 小夏的商铺在上次请了明星代言后 销量有所上升 但是他不清楚是不是每个人都对
  • 直播预告

    点击蓝字 关注我们 AI TIME欢迎每一位AI爱好者的加入 6月9日晚7 30 9 00 AI TIME特别邀请了三位优秀的讲者跟大家共同开启ICLR专场六 哔哩哔哩直播通道 扫码关注AITIME哔哩哔哩官方账号 观看直播 链接 http
  • 大数据spark开发入门教程

    大数据是互联网发展的方向 大数据人才是未来的高薪贵族 随着大数据人才的供不应求 大数据人才的薪资待遇也在不断提升 如果你也想进入大数据行业 也想学习大数据技术 大数据讲师认为 可以先从spark技术开始 一 Spark是什么 Spark是一
  • spark-submit 报错 Initial job has not accepted any resources

    spark submit 报这样的错误 WARN scheduler TaskSchedulerImpl Initial job has not accepted any resources check your cluster UI to
  • Hudi和Kudu的比较

    与Kudu相比 Kudu是一个支持OLTP workload的数据存储系统 而Hudi的设计目标是基于Hadoop兼容的文件系统 如HDFS S3等 重度依赖Spark的数据处理能力来实现增量处理和丰富的查询能力 Hudi支持Increme
  • spark内存模型

    Spark 1 6 开始使用了统一内存管理模块 UnifiedMemoryManager 并引入了堆外内存 Off heap memory 1 6之前的内存管理就不进行介绍了 spark堆内和堆外内存模型的示意图 注意 堆外内存是依赖于wo
  • 使用Flink1.16.0的SQLGateway迁移Hive SQL任务

    使用Flink的SQL Gateway迁移Hive SQL任务 前言 我们有数万个离线任务 主要还是默认的DataPhin调度CDP集群的Hive On Tez这种低成本任务 当然也有PySpark 打Jar包的Spark和打Jar包的Fl
  • spark groupByKey和groupBy,groupByKey和reduceByKey的区别

    1 groupByKey Vs groupBy 用于对pairRDD按照key进行排序 author starxhong object Test def main args Array String Unit val sparkConf n
  • Spark学习(文件读取路径)

    在不同的启动模式下 加载文件时的路径写法是不一样的 对于local模式下 默认就是读取本地文件 而在standlone或者yarn client 或者cluster模式下 默认读的都是hdfs文件系统 这几种模式下很难读取本地文件 这是很显
  • Spark 任务调度机制

    1 Spark任务提交流程 Spark YARN Cluster模式下的任务提交流程 如下图所示 图YARN Cluster任务提交流程 下面的时序图清晰地说明了一个Spark应用程序从提交到运行的完整流程 图Spark任务提交时序图 提交
  • 【硬刚大数据之学习路线篇】2021年从零到大数据专家的学习指南(全面升级版)

    欢迎关注博客主页 https blog csdn net u013411339 本文由 王知无 原创 首发于 CSDN博客 本文首发CSDN论坛 未经过官方和本人允许 严禁转载 欢迎点赞 收藏 留言 欢迎留言交流 声明 本篇博客在我之前发表
  • 通过yarn提交作业到spark,运行一段时间后报错。

    加粗样式
  • spark hadoop环境及运行

    hadoop配置 在Ubuntu20 04里安装Hadoop详细步骤 图文 亲测成功 ubuntu20 04安装hadoop 菜鸡的学习之路的博客 CSDN博客 启动hadoop root ubuntu usr local hadoop s
  • spark SQL基础教程

    1 sparkSQL入门 sparksql专门用于处理结构化的数据 而RDD还可以处理非结构化的数据 sparksql的优点之一是sparkfsql使用统一的api读取不同的数据 第二个优点是可以在语言中使用其他语言 例如python 另外
  • spark相关

    提示 文章写完后 目录可以自动生成 如何生成可参考右边的帮助文档 文章目录 前言 一 pandas是什么 二 使用步骤 1 引入库 2 读入数据 总结 前言 提示 这里可以添加本文要记录的大概内容 例如 随着人工智能的不断发展 机器学习这门

随机推荐

  • Unity5中叹为观止的实时GI效果

    原地址 今天为大家分享 unity与Alex Lovett共同使用 unity5制作的Shrine Arch viz Demo 其中充分利用了Unity5的实时全局光照功能 实在是太过惊艳 随便一帧都可以直接拿来当做屏保 上面的Demo使用
  • java trim 空指针_trim()空指针异常问题!

    该楼层疑似违规已被系统折叠 隐藏此楼查看此楼 先上程序 import java io BufferedReader import java io File import java io FileReader public class Dis
  • springboot实现文件的上传下载

    SpringBoot文件上传与下载 文件的上传与下载 在springmvc阶段要实现文件的上传下载 需要的依赖 gt gt
  • gdb调试命令的使用及总结

    gdb是一个在UNIX环境下的命令行调试工具 如果需要使用gdb调试程序 请在gcc时加上 g选项 下面的命令部分是简化版 比如使用l代替list等等 1 基本命令 命令 描述 backtrace 或bt 查看各级函数调用及参数 finis
  • Python字符编码及转换

    Python字符编码及转换 在Python开发中 我们经常需要处理不同的字符编码问题 因为在不同的系统 平台 语言之间 字符编码的表示方式是不同的 本文将介绍Python中常见的字符编码以及它们之间的转换方法 ASCII字符集 ASCII
  • Linux保护文件实现,Linux完整性保护机制模块实现分析(1)

    原标题 Linux完整性保护机制模块实现分析 1 2 详细分析2 1 模块功能描述 文件系统完整性模块包含四种机制 监控磁盘机制 同步机制 检查修复文件系统机制 监视文件系统机制 1 监控磁盘机制主要由statfs by dentry vf
  • Maven安装(超详解)

    2 4 1 下载 下载地址 Maven Download Apache Maven 在提供的资料中 已经提供了下载好的安装包 如下 2 4 2 安装步骤 Maven安装配置步骤 解压安装 配置仓库 配置Maven环境变量 1 解压 apac
  • 生成csv

    package com study csv import java io File import java io FileNotFoundException import java io FileOutputStream import ja
  • python 行转列与气泡图,echarts玫瑰图画图

    原数据 stack data stack to frame head 10 unstack data pd read excel Users bella Desktop 考研 xlsx dropna data data set index
  • 经典坐标变换案例代码剖析

    题目 设有小萝卜一号和小萝卜二号位于世界坐标系中 记世界坐标系为W 小萝卜们的坐标系为R1和 R2 小萝卜一号的位姿为q2 0 35 0 2 0 3 0 1 T t1 0 3 0 1 0 1 T 小萝卜二号的位姿为q2 0 5 0 4 0
  • 镜像下载网站(全网最全)

    几家企业提供的镜像站 阿里云开源镜像 http mirrors aliyun com 搜狐开源镜像 http mirrors sohu com 网易开源镜像 http mirrors 163 com LUPA http mirror lup
  • Vue的详细教程--基础语法【上】

    Welcome Huihui s Code World 接下来看看由辉辉所写的关于Vue的相关操作吧 目录 Welcome Huihui s Code World 一 插值 1 文本 2 html 3 属性 class绑定 style绑定
  • 西门子200SMART(四) 程序块

    程序块是显示当前项目包含的程序列表 一般初始状态共有三个 主程序 子程序和中断程序 如下图 当然 鼠标右键选择某一个程序块 可以重命名 支持中文 然而一个项目中 只能有一个主程序 也必须有一个主程序 所以主程序是无法删除的 在初始状态下 想
  • 用pm2在本地部署服务器node项目,全栈实用技能,pm2部署node应用到服务器

    好东西就要拿来分享 不管你的目标是前端还是全栈 都值得一看 背景介绍 一般的 我们开发一个前端项目通常是在本地通过Node js搭一个服务器 所有的开发测试过程基本上都是在本地搞定 有时候 我们需要把我们的作品上线 好让更多的人能够访问到
  • 某网站hexin-v的解决方法,hexin-v的解密方法,hexin-v的生成方法

    前段时间做的项目 一直运行比较稳定 最近几天运行过程中异常退出 结果发现是某网站接口有变 使用谷歌chrome 对原网站进行调试 结果发现某个js页面有变化 以原有的hexin v生成方法 已经不能适用于最新的算法 本想偷偷懒从网络上找一下
  • Map和Set

    Map和Set是集合中的两个接口 Set实现了Collection接口 而Map没有实现 Map下面又有很多子类 我们主要研究HashMap和TreeMap Set同样有很多子类 主要研究HashSet和TreeSet 在理解掌握它们之前
  • FTP命令使用实例

    ftp命令是标准的文件传输协议的用户接口 ftp是在TCP IP网络上的计算机之间传输文件的简单有效的方法 它允许用户传输ASCII文件和二进制文件 在ftp会话过程中 用户可以通过使用ftp客户程序连接到另一台计算机上 从此 用户可以在目
  • LR.net敏捷软件开发平台核心功能详解

    软件开发 程序员就是不断地跟变量 方法 类 接口这些东西打交道 随着开发经验的积累 很多程序员会发现 虽然最终开发出来的软件每个都不一样 但是在开发过程中用到的很多东西却又是相通的 例如 每个软件的底层差不多都需要进行增删改查 文件操作 权
  • State 模式

    有限状态机 FSM Finite state machine 例子 1 若状态机在Locked状态收到了一个coin事件 则迁移到Unlocked状态并执行unlock动作 2 若状态机在UnLocked状态收到了一个pass事件 则迁移到
  • 基于Spark MLlib平台的协同过滤算法---电影推荐系统

    说到推荐系统 大家可能立马会想到协同过滤算法 本文基于Spark MLlib平台实现一个向用户推荐电影的简单应用 其中 主要包括三部分内容 协同过滤算法概述 基于模型的协同过滤应用 电影推荐 实时推荐架构分析 一 协同过滤算法概述 本人对算