大数据Hadoop学习之————基于物品的协同过滤算法实现物品推荐

2023-11-11

一、基础概念

协同过滤算法一般分为两种实现:

  1. 基于用户的协同过滤算法(userCF):通过寻找相似兴趣的其他用户,为指定用户推荐物品。比如用户A喜欢商品A、B,用户B也喜欢商品A和B,则可以认为用户A和B兴趣相似,这时候就可以像用户A推荐用户B喜欢的商品C。
                                                       用户A对物品Y的喜好 = 用户B对物品Y的喜好 * 用户A和用户B的相似度
  2. 基于物品的协同过滤算法(itemCF):通过寻找用户感兴趣的物品的相似物品进行推荐。这里物品的相似度根据对它感兴趣的用户决定,即两个物品出现在同一个用户的次数。比如用户A和B都同时喜欢商品A和B,则可以认为物品A和物品B比较相似,这时候就可以像对物品A感兴趣的用户C推荐物品B。
                                                       用户A对物品Y的喜好 = 用户A对物品X的喜好 * 物品X和物品Y的相似度

第一种基于用户的协同过滤算法适用于公司初期,用户数量不多的情况下;这里物品推荐实现采用的是第二种,基于物品的协同过滤算法。

二、实现算法

      1、用户对物品的推荐列表(用户为列,物品为行) = 用户对物品的评分矩阵 × 物品同现矩阵

      2、用户对物品的评分矩阵:用户对物品的点击、收藏、加购物车和购买等行为都是对物品的不同评分,矩阵如下:

             并且,矩阵可以进行行列转换

 

      3、物品同现矩阵:两个物品出现在同一个用户的次数,即item1和item2都出现在user1和user3,所以item1:item2的同现值为2,当然item2:item1的同现值也为2,所以物品同现矩阵是一个对称矩阵,对称矩阵的行列转换不会改变数据形态,如下:

     4、矩阵相乘(行*列)计算可以得到推荐列表:

       可以看到,两种计算方法得到的结果都是一样的,只是矩阵展示形式不一样,user1的优先推荐物品都是item2,评分为9,其次是item1,评分是8,最后是item3,评分是5。

 

三、MapReduce分析

一、使用itemId作为key,来进行矩阵的行列计算,因为:

        1、reduce一次只计算一组相同key的数据;

        2、行列计算需要用户评分矩阵数据和物品同现矩阵树;

        3、物品同现矩阵的行和列都是itemId,所以只能用itemId。

二、因为是以itemId作为key来计算,所以要给用户对物品评分矩阵做行列转换,如上A行列转换变成B,这样在进行redice计算是就可以输入所有用户对物品itemId的评分。

三、MapReduce使用的计算方式应该是第一种  物品对用户的评分矩阵A * 物品同现矩阵C

四、行列计算分析,等号前面是行列计算各乘积算和,就可以得到对应用户对物品的推荐值:

         user1:item1 * item1:item1 + user1:item2 * item2:item1 + user1:item3 * item3:item1 = user1:item1

         user1:item1 * item1:item2 + user1:item2 * item2:item2 + user1:item3 * item3:item2 = user1:item2

         user1:item1 * item1:item3 + user1:item2 * item2:item3 + user1:item3 * item3:item3 = user1:item3

       注意,这里每一个乘积的因子都有一个共同的itemId,且用户对物品的推荐值与乘积因子的其他两个元素相对应,如user1:item1,因此矩阵相乘计算可以分以下两步reduce完成:

               1、第一次reduce对同一物品id计算所有的 评分 * 同现值,获得一个乘积列表,包括如:user1:item1 * item1:item1user1:item1 * item1:item2,输出已userId为key,item1、item2拼上乘积作为key为vaue;

               2、第二次reduce求和,以user1:item1为一类,计算user1对item1的推荐总值。所有用户对所有物品都计算求和,就可以得到用户对物品的推荐列表。

 

四、MapReduce实现

 测试数据

user2,item3,click
user1,item1,click
user1,item1,collect
user1,item2,click
user2,item2,collect
user2,item3,click
user3,item1,collect
user3,item2,click
user1,item2,click
user3,item1,collect
user1,item2,click
user3,item3,click

第一步——原始数据去重

       如果原始数据记录有重复,则需要一个MapReduce进行去重,即用户对物品的同一行为的多条数据去重。可以设置输入格式化类为KeyValueTextInputFormat,然后Map只做透传,可以直接用框架的Mapper类,reduce只输出第一条记录,主要代码如下:

//设置输入格式化类,是的整条记录作为key
job.setInputFormatClass(KeyValueTextInputFormat.class);
public class ItemCFStep1Reducer extends Reducer<Text, NullWritable, Text, NullWritable> {

	@Override
	protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
		//只输出一个
		context.write(key, NullWritable.get());
	}
}

第二步——获得用户对物品的评分矩阵

    一、mpper的输入数据是上一步的结果集,输出用户对应物品的评分记录,如user1 item1:1。代码如下:

public class ItemCFStep2Mapper extends Mapper<Text, Text, Text, Text> {
	private final Text mkey = new Text();
	private final Text mval = new Text();

	@Override
	protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
		//样本数据:key i1890		value u10218,collect
		StringTokenizer st = new StringTokenizer(value.toString(), ",");
		String userId = st.nextToken();
		//用户对物品评分
		int recode = UserAction.getRecord(st.nextToken());
		mkey.set(userId);
		mval.set(key.toString() + ":" + recode);
		context.write(mkey, mval);
	}

}

    注意,这里创建job时需要先设置输入格式化类,以及分割符,如下:

job.getConfiguration().set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
job.setInputFormatClass(KeyValueTextInputFormat.class);

     二、reducer需要完成下面两步工作:

            1、统计各用户对个物品的评分; 2、合并同个用户的所有物品及其评分;

public class ItemCFStep2Reducer extends Reducer<Text, Text, Text, Text> {

	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
		//样本数据:
		//u10218	i1890:1
		HashMap<String, Integer> map = new HashMap<>();
		for (Text value : values) {
			//对相同用户的相同商品累计分数
			StringTokenizer st = new StringTokenizer(value.toString(), ":");
			String itemId = st.nextToken();
			int tmpRecord = Integer.parseInt(st.nextToken());
			Integer record = map.get(itemId);
			map.put(itemId, record == null ? tmpRecord : tmpRecord + record);
		}

		StringBuffer sb = new StringBuffer();
		for (Map.Entry<String, Integer> entry : map.entrySet()) {
			sb.append(entry.getKey()).append(":").append(entry.getValue()).append(",");
		}
		//输出的同一个用户的物品不会重复
		context.write(key, new Text(sb.substring(0, sb.length() - 1)));
	}

}

      三、最终获得的结果集如下:

user1	item1:3,item2:1
user2	item2:2,item3:1
user3	item1:2,item2:1,item3:1

第三步——从用户评分矩阵获得物品同现矩阵数据

     一、mapper的输入数据是上一步的结果集,然对各个用户的物品评分列表里的物品两两输出,一条记录计数为1。注意,物品自己对自己也要输出一条记录。

public class ItemCFStep3Mapper extends Mapper<Text, Text, Text, IntWritable> {
	private final Text mkey = new Text();
	private final IntWritable mval = new IntWritable(1);

	@Override
	protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
		//数据样本:u10224	i1500:3,i1748:2,i1627:4,i1966:3
		String[] userReocers = StringUtils.split(value.toString(), ',');
			for (int i = 0; i < userReocers.length; i++) {
				String item1 = userReocers[i].split(":")[0];
				//输出自己
				mkey.set(item1 + ":" + item1);
				context.write(mkey, mval);
				for (int j = i + 1; j < userReocers.length; j++) {
					mkey.set(new Text(item1 + ":" + userReocers[j].split(":")[0]));
					context.write(mkey, mval);
				}
			}
	}
}

    二、reducer对相同的两个物品统计同现次数,注意!!!输出时还要多输出一条镜像记录,如记录item1:item2还要输出一条item2:item1,不然得到的同现矩阵只有一半的数据。

public class ItemCFStep3Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
	private final Text rkey = new Text();
	private final IntWritable rval = new IntWritable();

	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
		//样本数据:item1:item2	1
		int count = 0;
		for (IntWritable value : values) {
			count += value.get();
		}
		rval.set(count);
		String[] ss = StringUtils.split(key.toString(), ':');
		if (!ss[0].equals(ss[1])) {
			//注意!!!这里要对物品同现值做一个镜像输出(物品对自己的不需要),不然得到的物品同现矩阵只有一半的数据
			rkey.set(ss[1] + ":" + ss[0]);
			context.write(rkey, rval);
		}
		context.write(key,rval);
	}
}

    三、输出结果集

item1:item1	2
item2:item1	2
item1:item2	2
item3:item1	1
item1:item3	1
item2:item2	3
item3:item2	2
item2:item3	2
item3:item3	2

第四步——两个矩阵的行列各元素相乘

   一、这里mapper需要处理两个数据集,第二步得到的用户评分矩阵和第三步得到的同现矩阵。两个数据集的输出都以itemId做为key,第一个数据集以userId:3作为value,第二个数据集以itemId:2作为value,所以为了在reduce中可以区分这两种数据,输入时要改value加上相应的标识。

public class ItemCFStep4Mapper extends Mapper<Text, Text, Text, Text> {
	private String file = "";
	private final Text mkey = new Text();
	private final Text mval = new Text();

	@Override
	protected void setup(Context context) throws IOException, InterruptedException {
		FileSplit fs = (FileSplit) context.getInputSplit();
		file = fs.getPath().getParent().getName();
	}

	@Override
	protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
		//两个数据集样本
		//i1000:i1000	40
		//u10224	i1500:3,i1748:2,i1627:4,i1966:3
		if (file.equals("output3")) {
			String[] items = StringUtils.split(key.toString(), ':');
			mkey.set(items[0]);
			mval.set("A:" + items[1] + "," + value.toString());
			context.write(mkey, mval);
		} else {
			String[] itemRecords = StringUtils.split(value.toString(), ',');
			for (String itemRecord : itemRecords) {
				String[] ss = StringUtils.split(itemRecord, ':');
				mkey.set(ss[0]);
				mval.set("B:" + key.toString() + "," + ss[1]);
				context.write(mkey, mval);
			}
		}
	}
}

    二、reduce以itemId为维度,对所有用户对该物品的评分和所有物品与该物品的同现值两两相乘,并将乘积一一输出,userId作为key,物品的itemId(注意不是作为reduce输入key的itemId)拼上乘积作为value。这样我们计算矩阵行列相乘里需要累加的乘积的数据集就得到了。主要代码逻辑:

      1、mapA保存所有物品和该物品同现数次,mapB保存所有用户对该物品的评分

      2、对mapA和mapB做嵌套迭代,每次内层迭代输出键值对,key为用户id,value为物品i拼上两项乘积。              

                            两项乘积 = i物品对item1的同现次数 * j用户对item1的评分

public class ItemCFStep4Reducer extends Reducer<Text, Text, Text, Text> {
	/**正则表达式可以预编译,所以最好现在外面创建表达式对象*/
	public static final Pattern PATTERN = Pattern.compile("[:,]");
	private final Text rkey = new Text();
	private final Text rval = new Text();

	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
		//样本数据:
		//物品同现矩阵数据
		//item1	A:item2,1
		//item1	A:item1,1
		//物品对用户评分数据
		//item1	B:user1,2
		//item1	B:user2,1

		//保存同现矩阵数据,item1 1
		Map<String, Integer> mapA = new HashMap<>();
		//保存用户评分数据,user1 1
		Map<String, Integer> mapB = new HashMap<>();
		for (Text value : values) {
			String[] ss = PATTERN.split(value.toString());
			if ("A".equals(ss[0])) {
				mapA.put(ss[1], Integer.parseInt(ss[2]));
			} else if ("B".equals(ss[0])) {
				mapB.put(ss[1], Integer.parseInt(ss[2]));
			}
		}

		for (Map.Entry<String, Integer> entryA : mapA.entrySet()) {
			for (Map.Entry<String, Integer> entryB : mapB.entrySet()) {
				rkey.set(entryB.getKey());
				rval.set(entryA.getKey() + "," + entryA.getValue() * entryB.getValue());
				context.write(rkey, rval);
			}
		}
	}

}

    三、输出结果集

user1	item2,6
user3	item2,4
user1	item1,6
user3	item1,4
user1	item3,3
user3	item3,2
user1	item2,3
user2	item2,6
user3	item2,3
user1	item3,2
user2	item3,4
user3	item3,2
user2	item3,2
user3	item3,2

 

第五步——计算用户对每个物品的物品的推荐评分

   一、mapper的输入数据是上一步的结果集,直接输出key和value,可以用默认的Mapper。

   二、reduce根据用户id计算对各物品的评分,同一物品需要累计求和,即根据userId和ItemId确定一组求和,即可得到用户对某物品的推荐值。

public class ItemCFStep5Reducer extends Reducer<Text, Text, Text, Text> {
	private final Text rval = new Text();

	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
		//样本数据
		//user1	item2,6
		//user1	item1,6
		//user1	item2,3
		Map<String, Integer> map = new HashMap<>();
		for (Text text : values) {
			String[] ss = StringUtils.split(text.toString(), ',');
			Integer record = map.get(ss[0]);
			map.put(ss[0], record == null ? Integer.parseInt(ss[1]) : record + Integer.parseInt(ss[1]));
		}
		// StringBuffer sb = new StringBuffer();
		for (Map.Entry<String, Integer> entry : map.entrySet()) {
			rval.set(entry.getKey() + ":" + entry.getValue());
			context.write(key, rval);
			// sb.append(entry.getKey()).append(":").append(entry.getValue()).append(",");
		}
		// rval.set(sb.substring(0, sb.length() - 1));
	}
}

    三、输出结果集:

user1:item2	9
user1:item1	8
user1:item3	5
user2:item1	5
user2:item2	8
user2:item3	6
user3:item1	7
user3:item2	9
user3:item3	6

第六步——获得用户推荐列表

       1、mapper对记录转换成 user1:8    item1

       2、排序比较器根据用户id和评分排序,评分是倒排序

       3、分组比较器按照用户id分组

       4、reducer对同一个用户的推荐物品及评分合并

   一、mapper的输入数据是上一步的结果集,转换数据形态。

public class ItemCFStep6Mapper extends Mapper<Text, Text, Text, Text> {
	private final Text mkev = new Text();
	private final Text mval = new Text();

	@Override
	protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
		//样本数据:u10004:i1090 253
		String[] ss = StringUtils.split(value.toString(), ':');
		mkev.set(key.toString() + ":" + ss[1]);
		mval.set(ss[0]);
		context.write(mkev, mval);
	}
}

 二、排序比较器

public class ItemCFStep6Comparator extends WritableComparator {

	public ItemCFStep6Comparator() {
		super(Text.class, true);
	}

	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		String[] as = StringUtils.split(a.toString(), ":");
		String[] bs = StringUtils.split(b.toString(), ":");
		int i = as[0].compareTo(bs[0]);
		if (i == 0) {
			//这里按分值倒序
			return Integer.compare(Integer.parseInt(bs[1]), Integer.parseInt(as[1]));
		}
		return i;
	}
}

三、分组比较器

public class ItemCFStep6GroupComparator extends WritableComparator {

	public ItemCFStep6GroupComparator() {
		super(Text.class, true);
	}

	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		String[] as = StringUtils.split(a.toString(), ":");
		String[] bs = StringUtils.split(b.toString(), ":");
		return as[0].compareTo(bs[0]);
	}
}

四、reducer拼接排好序的所有商品推荐列表

public class ItemCFStep6Reducer extends Reducer<Text, Text, Text, Text> {
	private final Text rkey = new Text();
	private final Text rval = new Text();

	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
		String[] ss = StringUtils.split(key.toString(), ':');
		Iterator<Text> iterator = values.iterator();
		StringBuffer sb = new StringBuffer(iterator.next().toString() + ":" + ss[1]);
		while (iterator.hasNext()) {
			ss = StringUtils.split(key.toString(), ':');
			sb.append(",").append(iterator.next().toString() + ":" + ss[1]);
		}
		rkey.set(ss[0]);
		rval.set(sb.toString());
		context.write(rkey, rval);
	}
}

五、最后输出排好序的用户物品推荐列表

user1	item2:9,item1:8,item3:5
user2	item2:8,item3:6,item1:5
user3	item2:9,item1:7,item3:6

六、完整代码详见码云:hadoop-test传送门

 

五、通过Spark算子实现

/**
 * 通过Spark算子实现协同过滤算法的物品推荐
 */
public class ItemUserSpark {

	public static void main(String[] args) {
		SparkSession ss = SparkSession.builder().master("local")
				.appName("item_user")
				.getOrCreate();
		JavaSparkContext jsc = new JavaSparkContext(ss.sparkContext());
		JavaRDD<String> textRdd = jsc.textFile("file:/bigdata/hadoop-test/input/itemCF/test/input/test_data.txt");
		/*@step1 加载测试数据转换为二元组rdd:<itemId,<userId,record>>*/
		JavaPairRDD<String, Tuple2<String, Integer>> userItemRecordRdd = textRdd.distinct().mapToPair(line -> new Tuple2<>(line.split(",")[0],
				new Tuple2<>(line.split(",")[1], UserAction.getRecord(line.split(",")[2]))));
		/*@step2 计算各个用户对物品的评分:<itemId,Map<userId,record>>*/
		JavaPairRDD<String, HashMap<String, Integer>> userItemRecordCountRdd = userItemRecordRdd.aggregateByKey(new HashMap<>(), new Function2<HashMap<String, Integer>, Tuple2<String, Integer>, HashMap<String, Integer>>() {
			private static final long serialVersionUID = 4896422919000738408L;

			@Override
			public HashMap<String, Integer> call(HashMap<String, Integer> v1, Tuple2<String, Integer> v2) throws Exception {
				v1.merge(v2._1, v2._2, Integer::sum);
				return v1;
			}
		}, new Function2<HashMap<String, Integer>, HashMap<String, Integer>, HashMap<String
				, Integer>>() {
			private static final long serialVersionUID = 2135196741108911099L;

			@Override
			public HashMap<String, Integer> call(HashMap<String, Integer> v1, HashMap<String, Integer> v2) throws Exception {
				for (Map.Entry<String, Integer> entry : v2.entrySet()) {
					v1.merge(entry.getKey(), entry.getValue(), Integer::sum);
				}
				return v1;
			}
		});
		userItemRecordCountRdd.cache();
		/*@step3 转换为统计后的用户对物品评分rdd:<itemId,<userId,record>>*/
		JavaPairRDD<String, Tuple2<String, Integer>> itemUserRecordRdd = userItemRecordCountRdd.mapPartitionsToPair(new PairFlatMapFunction<Iterator<Tuple2<String, HashMap<String, Integer>>>, String, Tuple2<String, Integer>>() {
			private static final long serialVersionUID = -2318624350497226532L;

			@Override
			public Iterator<Tuple2<String, Tuple2<String, Integer>>> call(Iterator<Tuple2<String, HashMap<String, Integer>>> tuple2Iterator) throws Exception {
				List<Tuple2<String, Tuple2<String, Integer>>> ret = new ArrayList<>();
				while (tuple2Iterator.hasNext()) {
					Tuple2<String, HashMap<String, Integer>> next = tuple2Iterator.next();
					for (Map.Entry<String, Integer> entry : next._2.entrySet()) {
						ret.add(new Tuple2<>(entry.getKey(), new Tuple2<>(next._1, entry.getValue())));
					}
				}
				return ret.iterator();
			}
		});
		/*@step4 生成物品对物品同现次数的二元组rdd:<<itemId,itemId>, 1>*/
		JavaPairRDD<Tuple2<String, String>, Integer> itemItemTmpRdd = userItemRecordCountRdd.mapPartitionsToPair(new PairFlatMapFunction<Iterator<Tuple2<String, HashMap<String, Integer>>>, Tuple2<String, String>, Integer>() {
			private static final long serialVersionUID = -2318624350497226532L;

			@Override
			public Iterator<Tuple2<Tuple2<String, String>, Integer>> call(Iterator<Tuple2<String, HashMap<String, Integer>>> tuple2Iterator) throws Exception {
				List<Tuple2<Tuple2<String, String>, Integer>> ret = new ArrayList<>();
				while (tuple2Iterator.hasNext()) {
					Set<String> itemSet = tuple2Iterator.next()._2.keySet();
					Object[] os = itemSet.toArray();
					for (int i = 0; i < os.length; i++) {
						ret.add(new Tuple2<>(new Tuple2<>(os[i].toString(), os[i].toString()), 1));
						for (int j = i + 1; j < os.length; j++) {
							ret.add(new Tuple2<>(new Tuple2<>(os[i].toString(), os[j].toString()), 1));
						}
					}
				}
				return ret.iterator();
			}
		});
		/*@step5 统计得到物品对物品同现矩阵rdd:<itemId,<itemId, count>>*/
		JavaPairRDD<String, Tuple2<String, Integer>> itemItemRdd = itemItemTmpRdd.reduceByKey(Integer::sum)
				.mapPartitionsToPair(new PairFlatMapFunction<Iterator<Tuple2<Tuple2<String, String>, Integer>>, String, Tuple2<String, Integer>>() {
					private static final long serialVersionUID = 8193629530163757444L;

					@Override
					public Iterator<Tuple2<String, Tuple2<String, Integer>>> call(Iterator<Tuple2<Tuple2<String, String>, Integer>> tuple2Iterator) throws Exception {
						List<Tuple2<String, Tuple2<String, Integer>>> ret = new ArrayList<>();
						while (tuple2Iterator.hasNext()) {
							Tuple2<Tuple2<String, String>, Integer> mapTuple2 = tuple2Iterator.next();
							ret.add(new Tuple2<>(mapTuple2._1._1, new Tuple2<>(mapTuple2._1._2, mapTuple2._2)));
							if (!mapTuple2._1._1.equals(mapTuple2._1._2)) {
								//如果两个物品不相等,需要再输出一条镜像记录
								ret.add(new Tuple2<>(mapTuple2._1._2, new Tuple2<>(mapTuple2._1._1, mapTuple2._2)));
							}
						}
						return ret.iterator();
					}
				});
		itemItemRdd.cache();
		System.out.println("**********物品同现矩阵");
		itemItemRdd.foreach(t -> System.out.println(t));
		/*@step7 用户对物品评分矩阵乘以物品同现矩阵:<itemId, <userId, record>, <itemId, count>>*/
		JavaPairRDD<String, Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> join = itemUserRecordRdd.join(itemItemRdd);
		/*@step8 得到矩阵行列乘积rdd:<<userId, itemId>, value>*/
		JavaPairRDD<Tuple2<String, String>, Integer> userItemMultiRdd = join.mapToPair(new PairFunction<Tuple2<String, Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>>, Tuple2<String, String>, Integer>() {
			private static final long serialVersionUID = -2951379396437189294L;

			@Override
			public Tuple2<Tuple2<String, String>, Integer> call(Tuple2<String, Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> t) throws Exception {
				return new Tuple2<>(new Tuple2<>(t._2._1._1, t._2._2._1), t._2._1._2 * t._2._2._2);
			}
		});
		/*@step9 求和得到用户对每个物品的物品的推荐评分矩阵:<<userId, itemId>, value>*/
		JavaPairRDD<Tuple2<String, String>, Integer> userItemMultiRecordRdd = userItemMultiRdd.reduceByKey(Integer::sum);
		System.out.println("**********用户对物品推荐矩阵");
		userItemMultiRecordRdd.foreach(t -> System.out.println(t));
		/*@step10 转换成二元组rdd:<<userId, value>, itemId>*/
		JavaPairRDD<Tuple2<String, Integer>, String> userRecordItemRdd = userItemMultiRecordRdd.mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Integer>, Tuple2<String, Integer>, String>() {
			private static final long serialVersionUID = 3046037895848145693L;

			@Override
			public Tuple2<Tuple2<String, Integer>, String> call(Tuple2<Tuple2<String, String>, Integer> t) throws Exception {
				return new Tuple2<>(new Tuple2<>(t._1._1, t._2), t._1._2);
			}
		});
		//排序聚合后的推荐列表rdd:<userId, string>
		JavaPairRDD<String, String> userItemsRdd = userRecordItemRdd.sortByKey(new UserRecordCompare())
				/*@step11 自定义比较排序器,先按照userId自然排序,然后按照评分倒序排序*/
				.mapToPair(new PairFunction<Tuple2<Tuple2<String, Integer>, String>, String, Tuple2<String, Integer>>() {
					private static final long serialVersionUID = 2210128857753846055L;

					@Override
					public Tuple2<String, Tuple2<String, Integer>> call(Tuple2<Tuple2<String, Integer>, String> t) throws Exception {
						return new Tuple2<>(t._1._1, new Tuple2<>(t._2, t._1._2));
					}
					/*@step12 聚合排序后的每个用户的推荐列表*/
				}).aggregateByKey("", new Function2<String, Tuple2<String, Integer>, String>() {
					private static final long serialVersionUID = -2801745541613060736L;

					@Override
					public String call(String v1, Tuple2<String, Integer> v2) throws Exception {
						return v1 + "," + v2._1 + ":" + v2._2;
					}
				}, new Function2<String, String, String>() {
					private static final long serialVersionUID = -81211973327382009L;

					@Override
					public String call(String v1, String v2) throws Exception {
						return v1 + v2;
					}
				});
		System.out.println("*****用户物品推荐列表");
		userItemsRdd.foreach(t -> System.out.println(t));
	}

	public static class UserRecordCompare implements Serializable, Comparator<Tuple2<String, Integer>> {

		private static final long serialVersionUID = 1254251593402832337L;

		@Override
		public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
			int i = o1._1.compareTo(o2._1);
			if (i == 0) {
				i = o2._2.compareTo(o1._2);
			}
			return i;
		}
	}

}

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

大数据Hadoop学习之————基于物品的协同过滤算法实现物品推荐 的相关文章

随机推荐

  • 目标人脸检测与识别(计算机视觉)

    一 实验目的 通过python 语言编程设计人脸检测算法 以此人脸作为训练样本 训练目标人脸模型 进一步实现目标人脸的识别 通过上述编程促进学生理解并掌握人脸检测及识别的相关原理 同时培养学生的编程能力 二 实验硬 软件环境 笔记本电脑 w
  • hex码与float在线相互转换链接

    hex转gloat链接 float转hex链接
  • centOS yacc lex

    yacc command not found configure error Your operating system s lex is insufficient to compile libpcap flex is a lex repl
  • 指针数组(例题详解)

    include
  • js代码让iframe窗口全屏

    最近在工作中使用novnc远程连接电脑 在ifrmae中嵌入novnc页面 点击全屏按钮 能够让iframe页面全屏 我的思路是 1 点击全屏按钮 让当前页面全屏 2 将iframe重新fixed定位 高宽设置100 left top为0
  • jenkins介绍部署及三种构建方式配置

    1 前言 1 1 jenkins介绍 jenkins是基于java开发的一种持续集成工具 用于监控持续重复的工作 功能包括 1 持续的软件版本发布 测试 2 监控外部调用执行项目 Jenkins其实很早之前就有了 最近火起来的原因是 大家都
  • secureCRT 查看日志常用命令

    cd 进入根目录 cd 回到自己的目录 用户不同则目录也不同 root为 root xxt为 home xxt cd 回到上级目录 pwd 显示当前所在的目录 ls 显示当前目录下的所有文件 grep catalina out 在日志中查找
  • 【1G-6G】移动通信技术发展

    移动通信技术发展 1G 早在1947年 贝尔实验室的科学家就提出了蜂窝通信的概念 在20世纪60年代对此进行了系统的实验 20世纪60年代末 70年代初开始出现了第一个蜂窝 Cellular 系统 蜂窝的意思是将一个大区域划分为若干个相邻的
  • ATT&CK - T1546.003

    事件触发的执行 WMI事件订阅 目的 出现场景 ATT CK T1546 003 https attack mitre org techniques T1546 003 检查方式 复现方式 目的 建立持久性 出现场景 比如一些病毒的启动方式
  • excel 两列模糊匹配给出结果_EXCEL快速对比两列数据的不同

    作者 Miss 蜗牛 链接 https www jianshu com p 68b867d4558a 在工作中 我们经常需要对比两列数据或文本是否相同 如果是比较简单并且比较少的时候 我们可以肉眼一个一个的核对 或者都是数字的时候 可以用减
  • MySQL数据库和Oracle数据库的区别

    由于SQL Server不常用 所以这里只针对MySQL数据库和Oracle数据库的区别 1 对事务的提交 MySQL默认是自动提交 而Oracle默认不自动提交 需要用户手动提交 需要在写commit 指令或者点击commit按钮 2 分
  • TensorFlow Lite 入门样例,亲测有效

    参考链接 tensorflow 物体检测模型相关资料 https github com tensorflow models tree master research object detection java api接口 https ten
  • Java设计模式-状态模式

    状态模式 在软件开发过程中 应用程序中的有些对象可能会根据不同的情况做出不同的行为 我们把这种对象称为有状态的对象 而把影响对象行为的一个或多个动态变化的属性称为状态 当有状态的对象与外部事件产生互动时 其内部状态会发生改变 从而使得其行为
  • 【转】机器学习--- 分类算法详解

    原文链接 http blog csdn net china1000 article details 48597469 感觉狼厂有些把机器学习和数据挖掘神话了 机器学习 数据挖掘的能力其实是有边界的 机器学习 数据挖掘永远是给大公司的业务锦上
  • 电阻并联计算_电工必备10套计算公式,收藏了!

    1 串联电路电流和电压有以下几个规律 如 R1 R2串联 电流 I I1 I2 串联电路中各处的电流相等 电压 U U1 U2 总电压等于各处电压之和 电阻 R R1 R2 总电阻等于各电阻之和 如果n个阻值相同的电阻串联 则有R总 nR
  • Linux如何删除服务器上以问号开头的文件

    编辑配置文件的时候由于写入了中文 保存的时候乱码了 生成了问号文件 正常的删除命令是无法删除的 rm rf vimrc 那如何删除呢 使用文件的inode号 获取inode号 ls i 删除文件 find inum 34164153 exe
  • E: Could not get lock /var/lib/dpkg/lock-frontend - open (11: Resource temporarly unavailable)

    这篇文章阅读量最高 就借下楼 下面是我的视频主页 有数字图像处理 深度学习相关的一些视频分享 欢迎围观 MYVision MY视界的个人空间 哔哩哔哩 bilibili 数字图像处理通俗教程 冈萨雷斯 哔哩哔哩 bilibili 0 Pyt
  • opencv人脸检测--detectMultiScale函数

    opencv人脸检测 detectMultiScale函数 转载请注明出处 http blog csdn net itismelzp article details 50379359 首先上两张图 现在要对上面两张图进行人脸检测 一 Haa
  • sqli-labs 21-40关

    21关 进行基本尝试发现用户名与密码都被过滤 那么先登陆试试吧 登陆之后页面变为 发现注入点可能为ip cookie agent 使用抓包软件试试 看这个东西显然被加密了 尝试过之后发现cookie是注入点 不加密时 报错 查看一下题目co
  • 大数据Hadoop学习之————基于物品的协同过滤算法实现物品推荐

    一 基础概念 协同过滤算法一般分为两种实现 基于用户的协同过滤算法 userCF 通过寻找相似兴趣的其他用户 为指定用户推荐物品 比如用户A喜欢商品A B 用户B也喜欢商品A和B 则可以认为用户A和B兴趣相似 这时候就可以像用户A推荐用户B