1. 实例介绍
好友推荐算法在实际的社交环境中应用较多,比如QQ中的“你可能认识的好友”,或者Facebook中的好友推荐。简单地说,好友推荐功能的需求是:预测某两个人是否认识,并把他们互相推荐为好友;对于两个尚不是好友的用户,他们的共同好友越多,就越可能认识。
2. 数据流程
3. 具体实现
3.1 上传数据qq.txt
tom cat hadoop hello
hello mr tom world hive
cat tom hive
hive cat hadoop world hello mr
mr hive hello
hadoop tom hive world
world hadoop hive hello
每个名字之间用“\t”(制表符)分隔,每行的第一个名字是用户,其后是该用户的好友列表。
3.2 代码编写
3.2.1 类型定义
由于A1:A2与A2:A1表示的是同一对潜在好友,为了方便统计,统一按字典序排列,输出A1:A2格式(代码中实际用“\t”连接两个名字)。
/**
 * Key type representing an unordered pair of user names.
 *
 * <p>The pair is stored in the underlying {@code Text} as a single
 * tab-separated string with the lexicographically smaller name first, so
 * {@code (a, b)} and {@code (b, a)} yield the same key content.
 */
public class FoF extends Text {

    /**
     * Creates an empty key.
     * NOTE(review): presumably needed so the framework can instantiate the key
     * before deserializing into it - confirm.
     */
    public FoF() {
        super();
    }

    /**
     * Creates the key for the given pair; argument order does not matter.
     *
     * @param friend01 one user name
     * @param friend02 the other user name
     */
    public FoF(String friend01, String friend02) {
        set(getof(friend01, friend02));
    }

    /**
     * Joins the two names with a tab, smaller name (by {@code String.compareTo}) first.
     * Equal names keep the given order.
     */
    private String getof(String friend01, String friend02) {
        boolean inOrder = friend01.compareTo(friend02) <= 0;
        String first = inOrder ? friend01 : friend02;
        String second = inOrder ? friend02 : friend01;
        return first + "\t" + second;
    }
}
3.2.2 定义Map01
/**
 * First-stage mapper.
 *
 * <p>Each input line is split on tabs; token 0 is the user and the remaining
 * tokens are that user's friends. For every friend the mapper emits the pair
 * (user, friend) with value 0, and for every two distinct positions in the
 * friend list it emits that pair with value 1.
 */
public class Map01 extends Mapper<LongWritable, Text, FoF, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] names = StringUtils.split(value.toString(), '\t');
        int total = names.length;

        for (int a = 1; a < total; a++) {
            String current = names[a];

            // Value 0: the line's owner and this friend already know each other.
            context.write(new FoF(names[0], current), new IntWritable(0));

            // Value 1: this friend and every later friend share the line's owner.
            int b = a + 1;
            while (b < total) {
                context.write(new FoF(current, names[b]), new IntWritable(1));
                b++;
            }
        }
    }
}
3.2.2 定义Reduce01
/**
 * First-stage reducer.
 *
 * <p>For one pair of users it adds up the received values. If any value is 0
 * the pair is dropped; otherwise one line {@code "<name1> <name2> <sum>"} is
 * written as the output key with a {@code NullWritable} value.
 */
public class Reduce01 extends Reducer<FoF, IntWritable, Text, NullWritable> {

    @Override
    protected void reduce(FoF key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int total = 0;
        boolean sawZero = false;

        for (IntWritable marker : values) {
            int v = marker.get();
            if (v != 0) {
                total += v;
                continue;
            }
            // A 0 was emitted by Map01 for a (user, friend) pair taken directly from a line.
            sawZero = true;
            break;
        }

        System.out.println("******************Reduce01*******************");
        if (sawZero) {
            return;
        }

        String[] pair = StringUtils.split(key.toString(), '\t');
        String msg = pair[0] + " " + pair[1] + " " + total;
        System.out.println(msg);
        context.write(new Text(msg), NullWritable.get());
    }
}
3.2.3 定义 Job类
/**
 * Entry point for the first stage only: runs {@code jobOne()} and ignores its result.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    jobOne();
}
/**
 * Configures and runs the first job: {@code Map01} + {@code Reduce01} over
 * {@code /friend/input/qq.txt}, writing to {@code /friend/output/01}.
 * An existing output directory is deleted first.
 *
 * <p>Requires {@code org.apache.hadoop.io.Text} and
 * {@code org.apache.hadoop.io.NullWritable} to be imported by the enclosing file.
 *
 * @return {@code true} if the job completed successfully; {@code false} if it
 *         failed or an exception was thrown while setting it up or running it
 */
private static Boolean jobOne() {
    Configuration config = new Configuration();
    config.set("fs.defaultFS", "hdfs://node01:8020");
    // This property takes a bare host name; the RM addresses are derived from it
    // as "<hostname>:<port>", so a value that already contains a port is invalid.
    config.set("yarn.resourcemanager.hostname", "node02");

    boolean flag = false;
    try {
        Job job = Job.getInstance(config);
        job.setJarByClass(JobFriends.class);
        job.setJobName("fof one job");

        job.setMapperClass(Map01.class);
        job.setReducerClass(Reduce01.class);

        // Intermediate (map output) types, as declared by Map01.
        job.setMapOutputKeyClass(FoF.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Final output types, as declared by Reduce01.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path("/friend/input/qq.txt"));

        // The job refuses to start if the output directory already exists.
        Path output = new Path("/friend/output/01");
        FileSystem fs = FileSystem.get(config);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);

        flag = job.waitForCompletion(true);
        if (flag) {
            System.out.println("job1 success...");
        }
    } catch (InterruptedException e) {
        // Keep the interrupt visible to callers instead of silently clearing it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }
    return flag;
}
输出结果:
cat hadoop 2
cat hello 2
cat mr 1
cat world 1
hadoop hello 3
hadoop mr 1
hive tom 3
mr tom 1
mr world 2
tom world 2
3.2.4 好友推荐计算,类型定义
由于在MapReduce中,key能够自动排序,而value往往不可以。所以为了让每一个用户的推荐列表按照共同好友个数从高到低排序,不仅需要将用户名作为key,还需要将该用户与被推荐用户的共同好友个数作为key的一部分,因此需要重新定义一个类。
/**
 * Composite key for the second job: a user name plus the number of friends
 * that user shares with one recommendation candidate.
 *
 * <p>Ordering: ascending by {@code friend}, then descending by {@code hot}.
 *
 * <p>{@link #hashCode()} is derived from {@code friend} only. Hash-based
 * partitioning therefore sends every record of one user to the same
 * partition regardless of {@code hot}, which grouping by user name relies on.
 */
public class FriendSort implements WritableComparable<FriendSort> {

    /** User name this record belongs to. */
    private String friend;

    /** Number of common friends carried with this record. */
    private int hot;

    /** Creates an empty key; fields are filled in by {@link #readFields(DataInput)} or setters. */
    public FriendSort() {
        super();
    }

    /**
     * @param friend user name
     * @param hot    number of common friends
     */
    public FriendSort(String friend, int hot) {
        this.friend = friend;
        this.hot = hot;
    }

    public String getFriend() {
        return friend;
    }

    public void setFriend(String friend) {
        this.friend = friend;
    }

    public int getHot() {
        return hot;
    }

    public void setHot(int hot) {
        this.hot = hot;
    }

    /** Reads the fields in the order {@link #write(DataOutput)} wrote them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.friend = in.readUTF();
        this.hot = in.readInt();
    }

    /** Writes the name as a UTF string followed by the count as an int. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(friend);
        out.writeInt(hot);
    }

    /**
     * Orders by name ascending; for equal names, by {@code hot} descending.
     */
    @Override
    public int compareTo(FriendSort newFriend) {
        int byName = friend.compareTo(newFriend.getFriend());
        if (byName != 0) {
            return byName;
        }
        // Arguments swapped so that the larger count sorts first.
        return Integer.compare(newFriend.getHot(), hot);
    }

    /** Two keys are equal when both the name and the count match (consistent with compareTo). */
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof FriendSort)) {
            return false;
        }
        FriendSort that = (FriendSort) other;
        boolean sameName = (friend == null) ? that.friend == null : friend.equals(that.friend);
        return sameName && hot == that.hot;
    }

    /**
     * Hash of the user name only, so it is stable across instances and JVMs
     * and identical for all records of the same user.
     */
    @Override
    public int hashCode() {
        return (friend == null) ? 0 : friend.hashCode();
    }
}
3.2.5 定义Map02
/**
 * Second-stage mapper.
 *
 * <p>Each input line is split on spaces into {@code name1 name2 count}. The
 * mapper emits the relation in both directions: key (name1, count) with value
 * {@code "name2:count"}, and key (name2, count) with value {@code "name1:count"}.
 */
public class Map02 extends Mapper<LongWritable, Text, FriendSort, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = StringUtils.split(value.toString(), ' ');
        String left = fields[0];
        String right = fields[1];
        int hot = Integer.parseInt(fields[2]);

        System.out.println("**************Map02******************");
        System.out.println(left + " " + right + " " + hot);
        System.out.println(right + " " + left + " " + hot);

        // Each user of the pair gets the other one as a candidate.
        Text rightAsCandidate = new Text(right + ":" + hot);
        Text leftAsCandidate = new Text(left + ":" + hot);
        context.write(new FriendSort(left, hot), rightAsCandidate);
        context.write(new FriendSort(right, hot), leftAsCandidate);
    }
}
3.2.6 定义sort类
public class NumSort extends WritableComparator{
public NumSort(){
super(FriendSort.class,true);
}
public int compare(WritableComparable a, WritableComparable b) {
FriendSort o1 =(FriendSort) a;
FriendSort o2 =(FriendSort) b;
int r =o1.getFriend().compareTo(o2.getFriend());
if(r==0){
return -Integer.compare(o1.getHot(), o2.getHot());
}
return r;
}
}
3.2.7 定义group类
public class UserGroup extends WritableComparator{
public UserGroup(){
super(FriendSort.class,true);
}
public int compare(WritableComparable a, WritableComparable b) {
FriendSort o1 =(FriendSort) a;
FriendSort o2 =(FriendSort) b;
return o1.getFriend().compareTo(o2.getFriend());
}
}
3.2.8 定义Reduce02
/**
 * Second-stage reducer.
 *
 * <p>Concatenates all values received for one user, each followed by a comma,
 * in the order they are iterated, and writes (user name, concatenated list).
 */
public class Reduce02 extends Reducer<FriendSort, Text, Text, Text> {

    @Override
    protected void reduce(FriendSort user, Iterable<Text> friends, Context context)
            throws IOException, InterruptedException {
        StringBuilder joined = new StringBuilder();
        for (Text candidate : friends) {
            System.out.println("***************Reduce02*****************");
            joined.append(candidate.toString()).append(",");
            // Prints the list accumulated so far.
            System.out.println(joined.toString());
        }
        Text owner = new Text(user.getFriend());
        context.write(owner, new Text(joined.toString()));
    }
}
3.2.9 定义 Job类
/**
 * Driver for the two-stage friend recommendation pipeline.
 *
 * <p>{@code jobOne()} (the first-stage driver method listed earlier in this
 * article) must be part of this class; only the second stage is shown here.
 */
public class JobFriends {

    /**
     * Runs job one and, only if it succeeded, job two.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Boolean flag = jobOne();
        if (flag) {
            jobTwo();
        }
    }

    /**
     * Configures and runs the second job: {@code Map02} + {@code Reduce02}
     * over {@code /friend/output/01/}, sorted with {@code NumSort} and grouped
     * with {@code UserGroup}, writing to {@code /friend/output/02/}.
     * An existing output directory is deleted first.
     *
     * @return {@code true} if the job completed successfully; {@code false} if
     *         it failed or an exception was thrown while setting it up or running it
     */
    private static Boolean jobTwo() {
        Configuration config = new Configuration();
        config.set("fs.defaultFS", "hdfs://node01:8020");
        // This property takes a bare host name; the RM addresses are derived from it
        // as "<hostname>:<port>", so a value that already contains a port is invalid.
        config.set("yarn.resourcemanager.hostname", "node02");

        Boolean flag = false;
        try {
            Job job = Job.getInstance(config);
            job.setJarByClass(JobFriends.class);
            job.setJobName("fof two job");

            job.setMapperClass(Map02.class);
            job.setReducerClass(Reduce02.class);

            // Sort by (name asc, hot desc) but group reducer calls by name only.
            job.setSortComparatorClass(NumSort.class);
            job.setGroupingComparatorClass(UserGroup.class);

            // Intermediate (map output) types, as declared by Map02.
            job.setMapOutputKeyClass(FriendSort.class);
            job.setMapOutputValueClass(Text.class);
            // Final output types, as declared by Reduce02.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileInputFormat.addInputPath(job, new Path("/friend/output/01/"));

            // The job refuses to start if the output directory already exists.
            Path output = new Path("/friend/output/02/");
            FileSystem fs = FileSystem.get(config);
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);

            flag = job.waitForCompletion(true);
            if (flag) {
                System.out.println("job2 success...");
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers instead of silently clearing it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return flag;
    }
}
输出结果,完成好友推荐列表:
cat hello:2,hadoop:2,mr:1,world:1,
hadoop hello:3,cat:2,mr:1,
hello hadoop:3,cat:2,
hive tom:3,
mr world:2,hadoop:1,tom:1,cat:1,
tom hive:3,world:2,mr:1,
world mr:2,tom:2,cat:1,
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)