cd /opt/software
tar -zcvf hadoop313.tar.gz hadoop313
sz hadoop313.tar.gz
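If the sz command is missing on the server, it comes from the lrzsz package (on CentOS, for example):
yum install -y lrzsz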
Extract it on Windows (as Administrator) to D:\software\hadoop313
hadoop.dll------>C:\windows\system32
winutils.exe------>D:\software\hadoop313\bin
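As a quick check that winutils.exe can run on this machine (it needs the matching Microsoft VC++ runtime), execute it once from a cmd window; it should print its usage text:
D:\software\hadoop313\bin\winutils.exe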
Configure the Hadoop environment variables on Windows
HADOOP_HOME D:\software\hadoop313
Path %HADOOP_HOME%\bin;%HADOOP_HOME%\sbin;
HADOOP_USER_NAME root
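Open a new cmd window and verify the variables took effect:
hadoop version
It should print the Hadoop 3.1.3 build information.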
Map the Linux hostname to its IP in the Windows hosts file
C:\Windows\System32\drivers\etc\hosts
192.168.75.202 singlefang
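Verify the mapping from cmd:
ping singlefang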
Shell: leave HDFS safe mode
hadoop dfsadmin -safemode leave
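The current state can be checked first (hdfs dfsadmin is the non-deprecated form of hadoop dfsadmin):
hdfs dfsadmin -safemode get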
Create an empty Maven project and write the pom.xml by hand
<groupId>cn.kgc</groupId>
<artifactId>hd02</artifactId>
<version>1.0</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<hadoop.version>3.1.3</hadoop.version>
<log4j.version>1.2.17</log4j.version>
<slf4j.version>2.0.0-alpha1</slf4j.version>
</properties>
<dependencies>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.3</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest><!-- main class / entry point of the executable jar -->
<mainClass>cn.kgc.hd.bbb.ReduceJoinJob</mainClass><!-- set the main class -->
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase><!-- run during the package phase -->
<goals>
<goal>single</goal><!-- the assembly plugin's single goal builds the fat jar -->
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
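With the coordinates above (artifactId hd02, version 1.0), the package phase produces target/hd02-1.0-jar-with-dependencies.jar, which is the path job.setJar() uses below. The same fat jar can also be built from the command line:
mvn clean package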
Drag the four site files from hadoop313/etc/hadoop (core-site.xml, hdfs-site.xml, mapred-site.xml, yarn-site.xml) into src/main/resources, and add a log4j.properties file:
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=log/hd.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
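Note that rootLogger above only references stdout, so the logfile appender is defined but not active; to also write to log/hd.log, reference it in the root logger, for example:
log4j.rootLogger=INFO, stdout, logfile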
Upload the data files to HDFS ahead of time
hdfs dfs -put <local-file> /<category>/<sub-category>
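For the paths used by the job below (the local file names are assumed to match the HDFS names):
hdfs dfs -mkdir -p /test/class /test/score
hdfs dfs -put class.log /test/class
hdfs dfs -put score.log /test/score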
Write ReduceJoinJob
package cn.kgc.hd.bbb;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class ReduceJoinJob {
    public static void main(String[] args) {
        Configuration conf = new Configuration(true);
        // required when submitting from Windows to a Linux cluster
        conf.set("mapreduce.app-submission.cross-platform", "true");
        Job job = null;
        try {
            Path pathOut = new Path("/test/avgScore");
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(pathOut)) {
                // remove the old output directory, otherwise the job fails
                fs.delete(pathOut, true);
                System.out.println(pathOut.getName() + " removed");
            }
            job = Job.getInstance(conf, "reduceJoinJob");
            job.setJarByClass(ReduceJoinJob.class);
            // fat jar built by the assembly plugin
            job.setJar("target/hd02-1.0-jar-with-dependencies.jar");
            job.setMapperClass(ScoreMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setReducerClass(ScoreReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            // both files go through ScoreMapper and are joined on the class id in the reducer
            FileInputFormat.addInputPath(job, new Path("/test/class/class.log"));
            FileInputFormat.addInputPath(job, new Path("/test/score/score.log"));
            FileOutputFormat.setOutputPath(job, pathOut);
            job.waitForCompletion(true);
        } catch (IOException | InterruptedException | ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}
Write ScoreMapper
package cn.kgc.hd.bbb;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class ScoreMapper extends Mapper<LongWritable, Text, Text, Text> {
    Text keyOut = new Text();
    Text valueOut = new Text();
    // a third field that is all digits marks a score record
    final String NUM_REGEX = "^\\d+$";
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] ps = value.toString().trim().split(",");
        // the first field is the class id and becomes the join key
        keyOut.set(ps[0]);
        if (ps.length == 2) {
            // 2-field line: class record -> C_<className>
            valueOut.set("C_" + ps[1]);
        } else {
            // 3-field line: S_<studentId>_<score> or U_<studentId>_<studentName>
            valueOut.set((ps[2].matches(NUM_REGEX) ? "S" : "U") + "_" + ps[1] + "_" + ps[2]);
        }
        context.write(keyOut, valueOut);
    }
}
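A sketch of the input lines the mapper expects, with hypothetical values: the first field is the class id; 2-field lines are class records, 3-field lines are score records (numeric third field) or student-name records (non-numeric third field):
1,ClassOne          -> tagged C_ClassOne
1,1001,87           -> tagged S_1001_87
1,1001,zhangsan     -> tagged U_1001_zhangsan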
Write ScoreReducer
package cn.kgc.hd.bbb;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ScoreReducer extends Reducer<Text, Text, Text, Text> {
    Text valueOut = new Text();
    Text keyOut = new Text();
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        String className = null;
        // studentId -> studentName
        Map<String, String> idName = new HashMap<>();
        // studentId -> all scores of that student
        Map<String, List<Integer>> idScore = new HashMap<>();
        // split the tagged values from ScoreMapper back into the three record types
        for (Text value : values) {
            String[] ps = value.toString().trim().split("_");
            switch (ps[0]) {
                case "C":
                    className = ps[1];
                    break;
                case "S":
                    int score = Integer.parseInt(ps[2]);
                    if (idScore.containsKey(ps[1]))
                        idScore.get(ps[1]).add(score);
                    else {
                        List<Integer> list = new ArrayList<>();
                        list.add(score);
                        idScore.put(ps[1], list);
                    }
                    break;
                case "U":
                    idName.put(ps[1], ps[2]);
                    break;
            }
        }
        keyOut.set(className);
        if (idName.isEmpty()) {
            valueOut.set("NO_STUDENT");
            context.write(keyOut, valueOut);
            return;
        }
        if (idScore.isEmpty()) {
            valueOut.set("NO_SCORE");
            context.write(keyOut, valueOut);
            return;
        }
        // build "name:avg,name:avg,..." for every student of this class that has scores
        StringBuilder builder = new StringBuilder();
        int count;
        float avg;
        for (Map.Entry<String, String> e : idName.entrySet()) {
            String stuId = e.getKey();
            String stuName = e.getValue();
            count = 0;
            avg = 0;
            if (!idScore.containsKey(stuId)) continue;
            for (Integer score : idScore.get(stuId)) {
                avg += score;
                count++;
            }
            avg /= count;
            builder.append(stuName);
            builder.append(":");
            builder.append(avg);
            builder.append(",");
        }
        // drop the trailing comma (guarded in case no listed student has any score)
        if (builder.length() > 0) builder.deleteCharAt(builder.length() - 1);
        valueOut.set(builder.toString());
        context.write(keyOut, valueOut);
    }
}
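With the hypothetical records shown above, the reducer joins the three record types per class id and emits one tab-separated line per class, for example:
ClassOne    zhangsan:87.0
A class with student records but no scores emits NO_SCORE, and one with no student records emits NO_STUDENT.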
Update the main class in the pom:
<manifest><!-- main class / entry point of the executable jar -->
<mainClass>cn.kgc.hd.bbb.ReduceJoinJob</mainClass>
</manifest>
Build the fat jar
In the Maven panel run clean, then package; copy the fat jar's path and paste it into job.setJar():
job.setJar("target/hd02-1.0-jar-with-dependencies.jar");
Run the main class
Ctrl+Shift+F10
Check the result
hdfs dfs -cat /test/avgScore/*