Hadoop中的MapReduce库支持集中不同的格式的输入数据。例如,文本模式的输入数据的每一行被视为一个key/value键值对。key是文件的偏移量,value是那一行的内容。另一种常见的格式是以key进行排序来存储key/value键值对的序列。每种输入类型的实现都必须能够把数据分割成数据片段,并能够由单独的Map任务来对数据片段进行后续处理。
1. 输入格式-InputFormat
当运行一个MapReduce作业的时候,我们需要为作业制定它的输入格式。InputFormat作为Hadoop作业的所有数入格式的抽象基类,它描述了作业的输入需要满足的规范细节。
1.1 InputFormate抽象类
InputFormat所在的包为org.apache.hadoop.mapredce,在该抽象类中定义了连个抽象类:
public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;
public abstract
RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException, nterruptedException;
InputFormat有三个直接子类:FIleInputFormat、DBInputFormat、DelegatingInputFormat。而FileInputFormat又包括TextInputFormat、KeyValueInputFormat、CombineFileInputFormat、NineInputFormat和SequenceFIleInputFormat五个子类。
1.2 FIleInputFormat文件输入格式
文件是Hadoop作业最为常用的格式,FIleInputFormat可以根据文件的总大小来将输入文件分割成若干个输入切片。为了保证整个记录不被截断,我们必须采取相应的处理策略,如通过创建RecordReader来保证记录的完整性,从而为Map提供一个面向记录的逻辑分块的试图。
1.2.1 成员属性
public static final String INPUT_DIR =
"mapreduce.input.fileinputformat.inputdir";
public static final String SPLIT_MAXSIZE =
"mapreduce.input.fileinputformat.split.maxsize";
public static final String SPLIT_MINSIZE =
"mapreduce.input.fileinputformat.split.minsize";
public static final String PATHFILTER_CLASS =
"mapreduce.input.pathFilter.class";
public static final String NUM_INPUT_FILES =
"mapreduce.input.fileinputformat.numinputfiles";
public static final String INPUT_DIR_RECURSIVE =
"mapreduce.input.fileinputformat.input.dir.recursive";
public static final String LIST_STATUS_NUM_THREADS =
"mapreduce.input.fileinputformat.list-status.num-threads";
public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;
private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
private static final double SPLIT_SLOP = 1.1;
1.2.2 成员方法
protected List<FileStatus> listStatus(JobContext job) throws IOException
public List<InputSplit> getSplits(JobContext job) throws IOException {
Stopwatch sw = new Stopwatch().start();
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);
for (FileStatus file: files) {
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
if (isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else {
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.elapsedMillis());
}
return splits;
}
1.3 TextInputFormat文本文件输入格式
TextInputFormat是FIleInputFormat抽象类的默认实现。该输入格式主要针对的是文本类型的文件,文件被分割成许多的行,而且每一行使用换行符后者【Enter】键作为每一行结束标识。该类主要重写了createRecordReader和isSplitable方法:
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
String delimiter = context.getConfiguration().get("textinputformat.record.delimiter");
byte[] recordDelimiterBytes = null;
if (null != delimiter)
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
return new LineRecordReader(recordDelimiterBytes);
}
@Override
protected boolean isSplitable(JobContext context, Path file) {
final CompressionCodec codec =
new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
if (null == codec) {
return true;
}
return codec instanceof SplittableCompressionCodec;
}
1.4 KeyValueTextInputFormat键值对文本输入格式
该类是与TextInputFormat具有类似性质的针对文本类型文件,文件被分割成许多的行,而且每一行使用换行符后者【Enter】键作为每一行结束标识。只不过KeyValueTextInputFormat所对应的文件中的每一行是由一个特殊的分割符所割成的键值对。
1.5 CombineFileInputFormat组合文件输入格式
1.6 SequenceFileInputFormat序列文件输入格式
2. 输入切片-InputSplit
输入切片InputSplit是一个单独的Map要处理的数据单元。输入切片的数据类型一般都是字节类型。输入切片经过相应的RecordReader处理之后,转化成记录视图的形式,然后交给Map处理。一般一条记录一一个键值对的形式来展现。
InputSplitt所在的包为org.apache.hadoop.mapredce,该抽象类定义了两个抽象方法:
public abstract long getLength() throws IOException, InterruptedException;
public abstract String[] getLocations() throws IOException, InterruptedException;
InputSplit的实现类包括:FileSplit、CombineFielSplit和DBInputFormat.DBInputSplit。
2.1 FileSplit文件输入切片
FileSplit是默认的InputSplit,这一点可以从FileInputFormat的创建输入切片的方法中体现出来。
2.1.1 成员变量
private Path file;
private long start;
private long length;
private String[] hosts;
private SplitLocationInfo[] hostInfos;
2.1.2 成员方法
该类主要的成员方法就是上面成员变量所对应的get方法,以及Wrtiable接口的实现方法。FileSplit只对它所在的文件、起始位置、和切片大小属性进行序列化:
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, file.toString());
out.writeLong(start);
out.writeLong(length);
}
同理,FileSplit也只对它所在的文件、起始位置、和切片大小属性进行反序列化,主机列表属性会被默认为0:
@Override
public void readFields(DataInput in) throws IOException {
file = new Path(Text.readString(in));
start = in.readLong();
length = in.readLong();
hosts = null;
}
2.2 CombineFileSplit多文件输入切片
CombineFielSplit是与前面介绍的CombineFielInputFormat输入格式相对应的输入切片类型。FileSpilt代表一个文件的一个输入切片,而CombinFileSplit切片代表将来自多个文件的输入切片的一个输入切片。虽然每个CombineFIleSplit切片一般会包含来自不同文件的数据块,但是在同一个切片中的所有数据块一般都是在同一个机架上的。
2.2.1 成员变量
private Path[] paths;
private long[] startoffset;
private long[] lengths;
private String[] locations;
private long totLength;
2.2.2 成员方法
该类主要的成员方法就是上面成员变量所对应的get方法,以及Writable接口的实现方法。
3. 记录读取器-RecordReader
FileInputFormat的一些子类中都实现了createRecordReader方法,并返回了用于处理输入切片的RecordReader。该类包含6个抽象方法:
public abstract void initialize(InputSplit split,
TaskAttemptContext context
) throws IOException, InterruptedException;
public abstract boolean nextKeyValue() throws IOException, InterruptedException;
public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
public abstract float getProgress() throws IOException, InterruptedException;
public abstract void close() throws IOException;
4. 输出格式-OutputFormat
4.1 OutputFormat抽象类
OutputFormat抽象类描述了MapReduce作业的输出规范,它决定了MapReduce的作业输出结果保存到哪里,以及如何对输出结果进行持久化操作。主要工作有:
- 检查作业的输出是否有效,比如检查输出目录是否存在;
- 提供一个具体的RecordWriter实现类。Hadoop依靠该实现类将MapReduce作业的处理结果保存到制定文件系统的文件中,一般写到HDFS文件系统中。
public abstract void checkOutputSpecs(JobContext context
) throws IOException,
InterruptedException;
public abstract RecordWriter<K, V>
getRecordWriter(TaskAttemptContext context
) throws IOException, InterruptedException;
public abstract
OutputCommitter getOutputCommitter(TaskAttemptContext context
) throws IOException, InterruptedException;
4.2 FileOutputFormat 文件输出格式
该类在工作的过程中利用大量配置对象中所包含的配置项,如是否压缩等。
4.2.1 枚举类
public static enum Counter {
BYTES_WRITTEN
}
4.2.2 成员变量
private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
protected static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
protected static final String PART = "part";
private FileOutputCommitter committer = null;
4.2.3 静态代码块
static {
NUMBER_FORMAT.setMinimumIntegerDigits(5);
NUMBER_FORMAT.setGroupingUsed(false);
}
4.2.4 成员方法
public static void setCompressOutput(Job job, boolean compress) {
job.getConfiguration().setBoolean(FileOutputFormat.COMPRESS, compress);
}
public static boolean getCompressOutput(JobContext job) {
return job.getConfiguration().getBoolean(
FileOutputFormat.COMPRESS, false);
}
public static void setOutputPath(Job job, Path outputDir) {
...
}
public synchronized static String getUniqueFile(TaskAttemptContext context,
String name,
String extension) {
...
}
4.3 TextOutFormat 文本格式的文件输出格式
4.4 SequenceFileOutputFormat 普通序列文件输出格式
4.5 SequenceFileAsBinaryOutputFormat 二进制序列文件输出格式
4.6 FilterOutputFormat 过滤器输出格式
4.6 DBOutputFormat 数据库输出格式
4.8 MultipleOutputs 多种输出格式
5. 记录写入器-RecordWriter
RecordWriter用于将MapReduce作业的键值对结果写入到制定的输出中。
public abstract void write(K key, V value
) throws IOException, InterruptedException;
public abstract void close(TaskAttemptContext context
) throws IOException, InterruptedException;
6. 输出提交器-OutputCommitter
OutputCommitter主要用于控制MapReduce作业的输出环境。主要一下工作:
- 在OutputCommitter初始化时启动jon。比如会创建job的临时输出目录;
- 在作业完成之后清除job申请的资源。比如会删除job的临时目录;
- 为Mapper或者Reducer任务创建临时的输出目录;
- 检查Mapper或者Reducer任务是否需要提交;
- 提交或者丢弃MapReduce任务的输出。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)