首先,您尝试使用的 JSON 对象不可用。为了解决这个问题:
- 转到此处并下载 zip 文件:https://github.com/douglascrockford/JSON-java https://github.com/douglascrockford/JSON-java
- 解压到子目录 org/json/* 中的源文件夹中
接下来,代码的第一行创建一个包“org.json”,这是不正确的,您应该创建一个单独的包,例如“my.books”。
第三,这里使用combiner是没有用的。
这是我最终得到的代码,它可以工作并解决您的问题:
package my.books;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.json.*;
import javax.security.auth.callback.TextInputCallback;
public class CombineBooks {
public static class Map extends Mapper<LongWritable, Text, Text, Text>{
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
String author;
String book;
String line = value.toString();
String[] tuple = line.split("\\n");
try{
for(int i=0;i<tuple.length; i++){
JSONObject obj = new JSONObject(tuple[i]);
author = obj.getString("author");
book = obj.getString("book");
context.write(new Text(author), new Text(book));
}
}catch(JSONException e){
e.printStackTrace();
}
}
}
public static class Reduce extends Reducer<Text,Text,NullWritable,Text>{
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{
try{
JSONObject obj = new JSONObject();
JSONArray ja = new JSONArray();
for(Text val : values){
JSONObject jo = new JSONObject().put("book", val.toString());
ja.put(jo);
}
obj.put("books", ja);
obj.put("author", key.toString());
context.write(NullWritable.get(), new Text(obj.toString()));
}catch(JSONException e){
e.printStackTrace();
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
if (args.length != 2) {
System.err.println("Usage: CombineBooks <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "CombineBooks");
job.setJarByClass(CombineBooks.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
这是我的项目的文件夹结构:
src
src/my
src/my/books
src/my/books/CombineBooks.java
src/org
src/org/json
src/org/json/zip
src/org/json/zip/BitReader.java
...
src/org/json/zip/None.java
src/org/json/JSONStringer.java
src/org/json/JSONML.java
...
src/org/json/JSONException.java
这是输入
[localhost:CombineBooks]$ hdfs dfs -cat /example.txt
{"author":"author1", "book":"book1"}
{"author":"author1", "book":"book2"}
{"author":"author1", "book":"book3"}
{"author":"author2", "book":"book4"}
{"author":"author2", "book":"book5"}
{"author":"author3", "book":"book6"}
要运行的命令:
hadoop jar ./bookparse.jar my.books.CombineBooks /example.txt /test_output
这是输出:
[pivhdsne:CombineBooks]$ hdfs dfs -cat /test_output/part-r-00000
{"books":[{"book":"book3"},{"book":"book2"},{"book":"book1"}],"author":"author1"}
{"books":[{"book":"book5"},{"book":"book4"}],"author":"author2"}
{"books":[{"book":"book6"}],"author":"author3"}
您可以使用这三个选项中的一个来放置org.json.*
类到您的集群中:
- 打包
org.json.*
类到您的 jar 文件中(可以使用 GUI IDE 轻松完成)。这是我在答案中使用的选项
- 将包含的 jar 文件放入
org.json.*
将每个集群节点上的类放入 CLASSPATH 目录之一(请参阅yarn.application.classpath)
- 将包含的 jar 文件放入
org.json.*
进入HDFS(hdfs dfs -put <org.json jar> <hdfs path>
)并使用job.addFileToClassPath
要求此 jar 文件可用于在集群上执行作业的所有任务。在我的回答中你应该添加job.addFileToClassPath(new Path("<jar_file_on_hdfs_location>"));
to the main