Contents
About TableMapper and TableReducer
TableMapReduceUtil
initTableMapperJob
TableInputFormat
TableRecordReader
initTableReducerJob
TableOutputFormat
About TableMapper and TableReducer
When defining a custom TableMapper you specify only two generic types, namely the K2 and V2 output types. K1 is fixed as ImmutableBytesWritable, the row key (read from HBase as a byte array), and V1 is fixed as Result, which means a single row may yield multiple columns, in line with HBase's column-oriented storage model. The data a TableMapper receives can be thought of as similar to what a scan returns in the hbase shell. And whereas a plain Mapper invokes map() once per key/value pair, a TableMapper invokes map() once per row (k is the row key, v is the Result holding all cells of that row).
public abstract class TableMapper<KEYOUT, VALUEOUT>
extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT>
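A minimal custom TableMapper might look like the sketch below (the column family "cf" and qualifier "word" are assumptions made purely for illustration):
import java.io.IOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class MyMapper extends TableMapper<Text, IntWritable> {
  private static final IntWritable ONE = new IntWritable(1);

  @Override
  protected void map(ImmutableBytesWritable rowKey, Result row, Context context)
      throws IOException, InterruptedException {
    // map() runs once per row; the Result carries every cell of that row
    byte[] v = row.getValue(Bytes.toBytes("cf"), Bytes.toBytes("word"));
    if (v != null) {
      context.write(new Text(Bytes.toString(v)), ONE);
    }
  }
}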
When defining a custom TableReducer, three generic types must be specified: the types of K2, V2, and K3. V3 is fixed as org.apache.hadoop.hbase.client.Mutation, i.e. a concrete Put or Delete instance. The reduce() method has to write out K3 and V3, yet the actual content of K3 is irrelevant; as the source below shows, nothing downstream ever touches K3, because the Put or Delete instance already contains the row key that K3 would nominally supply. The Context used by TableReducer is org.apache.hadoop.mapreduce.TaskInputOutputContext, which only provides a two-argument write(); presumably the authors did not bother to write a dedicated Context and simply reused the generic TaskInputOutputContext, so the first argument to write() carries no meaning.
public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation> {
}
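Correspondingly, a minimal TableReducer could be sketched as follows (family "cf" and qualifier "count" are again illustrative); note that the row key travels inside the Put, not in K3:
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class MyReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable v : values) {
      sum += v.get();
    }
    // the Put itself carries the row key, which is why K3 goes unused downstream
    Put put = new Put(Bytes.toBytes(key.toString()));
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"),
        Bytes.toBytes(String.valueOf(sum)));
    context.write(new ImmutableBytesWritable(put.getRow()), put);
  }
}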
TableMapReduceUtil
The TableMapReduceUtil class is used to set up the Mapper and Reducer halves of a job.
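For orientation, a typical driver wires both halves together roughly like this (a sketch; the table names and the MyMapper/MyReducer classes are the sketches from above, not fixed names):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class Driver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "table-copy");
    job.setJarByClass(Driver.class);
    // read side: scan the source table into MyMapper
    TableMapReduceUtil.initTableMapperJob("wordcount", new Scan(), MyMapper.class,
        Text.class, IntWritable.class, job);
    // write side: MyReducer emits Mutations into the target table
    TableMapReduceUtil.initTableReducerJob("wordcount_copy", MyReducer.class, job);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}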
initTableMapperJob
In the previous post, the HBase table copy was kicked off with:
//Parameters: table name, scanner, Mapper class, K2 type, V2 type, the Job
TableMapReduceUtil.initTableMapperJob(oldTableName, scan, MyMapper.class, Text.class, Text.class, job);
That is six parameters in total, but under the hood the call is forwarded to a fuller overload that fills in a few more; the underlying method is:
public static void initTableMapperJob(String table, Scan scan,
Class<? extends TableMapper> mapper,
Class<?> outputKeyClass,
Class<?> outputValueClass, Job job,
boolean addDependencyJars, boolean initCredentials,
Class<? extends InputFormat> inputFormatClass)
throws IOException
Parameter breakdown:
- table: the table name, supplied by the caller
- scan: the scanner, supplied by the caller
- mapper: the Mapper class, supplied by the caller
- outputKeyClass: the type of the Mapper's output key (K2), supplied by the caller
- outputValueClass: the type of the Mapper's output value (V2), supplied by the caller
- job: a Job that may so far only have loaded the HDFS-related configuration files; the method has to make sure the HBase configuration gets loaded as well; supplied by the caller
- addDependencyJars: whether to ship the HBase-related jars, defaults to true
- initCredentials: whether to initialize the job's credentials for authenticating to HBase, defaults to true
- inputFormatClass: the input format, defaults to TableInputFormat
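Spelled out with the defaults, the six-argument call above is equivalent to the following sketch (reusing oldTableName, scan, MyMapper and job from the earlier example):
TableMapReduceUtil.initTableMapperJob(oldTableName, scan, MyMapper.class,
    Text.class, Text.class, job,
    true,                    // addDependencyJars
    true,                    // initCredentials
    TableInputFormat.class); // inputFormatClass
The method body, annotated: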
//Set the input format; here inputFormatClass is TableInputFormat
job.setInputFormatClass(inputFormatClass);
//Set the K2/V2 types; if null, they will be picked up later from setOutputKeyClass/setOutputValueClass
if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
//Set the Mapper class; mapper is MyMapper.class here
job.setMapperClass(mapper);
//If V2 is Put, install PutCombiner.class as the Combiner
if (Put.class.equals(outputValueClass)) {
job.setCombinerClass(PutCombiner.class);
}
Configuration conf = job.getConfiguration();
//Merge the job's existing conf with one created via HBaseConfiguration, ensuring the properties in hbase-site.xml have been read
HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
//Set the table to read from; think of it as the input path
conf.set(TableInputFormat.INPUT_TABLE, table);
//Hand the scanner to TableInputFormat, since the actual reading is done by TableRecordReader
conf.set(TableInputFormat.SCAN, convertScanToString(scan));
//Register the serialization classes
conf.setStrings("io.serializations", conf.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName(),
KeyValueSerialization.class.getName());
if (addDependencyJars) {
//Ship jars from the classpath, e.g. hbase-common-1.3.1.jar, hbase-client-1.3.1.jar,
//along with the other dependency jars
addDependencyJars(job);
}
if (initCredentials) {
//Initialize the job's credentials for talking to HBase
initCredentials(job);
}
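Incidentally, if you skip the helper, the essential subset can be set by hand; a sketch, assuming the "wordcount" table name and that convertScanToString (the same helper the utility calls internally) is accessible in your version:
Configuration conf = job.getConfiguration();
HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
conf.set(TableInputFormat.INPUT_TABLE, "wordcount");  // assumed table name
conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan));
job.setInputFormatClass(TableInputFormat.class);
job.setMapperClass(MyMapper.class);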
addDependencyJars ships not only the jars already recorded in conf, but also the jars containing the following classes:
public static void addDependencyJars(Job job) throws IOException {
addHBaseDependencyJars(job.getConfiguration());
try {
addDependencyJarsForClasses(job.getConfiguration(),
job.getMapOutputKeyClass(),
job.getMapOutputValueClass(),
job.getInputFormatClass(),
//defaults to LongWritable
job.getOutputKeyClass(),
//defaults to Text
job.getOutputValueClass(),
job.getOutputFormatClass(),
job.getPartitionerClass(),
job.getCombinerClass());
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
}
So, just as with other input formats, the part that actually reads data from the client side lives in the reader, TableRecordReader; initTableMapperJob merely records descriptive parameters on TableInputFormat for TableRecordReader to use.
TableInputFormat
TableInputFormat extends TableInputFormatBase.
The fields of TableInputFormatBase:
private Scan scan = null;
private Admin admin;
private Table table;
//locates the table's regions
private RegionLocator regionLocator;
//the reader instance
private TableRecordReader tableRecordReader = null;
private Connection connection;
An InputFormat really has only two jobs: creating the reader (TableRecordReader) and computing the input splits; how the splitting works is left aside here.
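For reference, these two responsibilities correspond exactly to the abstract API of org.apache.hadoop.mapreduce.InputFormat:
public abstract class InputFormat<K, V> {
  // how the input is sliced into InputSplits
  public abstract List<InputSplit> getSplits(JobContext context)
      throws IOException, InterruptedException;
  // how records are read out of a single split
  public abstract RecordReader<K, V> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException, InterruptedException;
}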
The reader is created in createRecordReader():
@Override
public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
InputSplit split, TaskAttemptContext context)
throws IOException {
TableSplit tSplit = (TableSplit) split;
final TableRecordReader trr =
this.tableRecordReader != null ? this.tableRecordReader : new TableRecordReader();
//Build the internal scanner from the attributes of the user-supplied Scan
Scan sc = new Scan(this.scan);
sc.setStartRow(tSplit.getStartRow());
sc.setStopRow(tSplit.getEndRow());
//Hand the scanner to the reader, TableRecordReader
trr.setScan(sc);
//Hand the table to the reader, TableRecordReader
trr.setTable(getTable());
//The reader returned below simply delegates every call to the TableRecordReader
return new RecordReader<ImmutableBytesWritable, Result>() {
@Override
public void close() throws IOException {
trr.close();
closeTable();
}
@Override
public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
return trr.getCurrentKey();
}
@Override
public Result getCurrentValue() throws IOException, InterruptedException {
return trr.getCurrentValue();
}
@Override
public float getProgress() throws IOException, InterruptedException {
return trr.getProgress();
}
@Override
public void initialize(InputSplit inputsplit, TaskAttemptContext context) throws IOException,
InterruptedException {
trr.initialize(inputsplit, context);
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return trr.nextKeyValue();
}
};
}
So the concrete implementation is, once again, provided by TableRecordReader.
TableRecordReader
Internally, TableRecordReader just creates the concrete implementation class, a TableRecordReaderImpl object.
The methods we need to care about are initialize() and nextKeyValue().
initialize():
public void initialize(InputSplit inputsplit,
TaskAttemptContext context) throws IOException,
InterruptedException {
if (context != null) {
this.context = context;
//counter bookkeeping
getCounter = retrieveGetCounterWithStringsParams(context);
}
//create an internal scanner
restart(scan.getStartRow());
}
The read is actually prepared in restart(), which obtains a ResultScanner instance:
public void restart(byte[] firstRow) throws IOException {
//The table is really scanned via the internal currentScan, which copies the attributes of the Scan passed in by TableInputFormat;
//this also allows the scanner to be recreated after a survivable exception
currentScan = new Scan(scan);
currentScan.setStartRow(firstRow);
currentScan.setScanMetricsEnabled(true);
if (this.scanner != null) {
this.scanner.close();
}
//Here the table data is fetched, as a ResultScanner
this.scanner = this.htable.getScanner(currentScan);
if (logScannerActivity) {
rowcount = 0;
}
}
nextKeyValue():
public boolean nextKeyValue() throws IOException, InterruptedException {
//construct K1 and V1
if (key == null) key = new ImmutableBytesWritable();
if (value == null) value = new Result();
try {
try {
//advance the ResultScanner
value = this.scanner.next();
//one full row has been read as V1 (a Result, not a single cell)
if (value != null && value.isStale()) numStale++;
} catch (IOException e) {
if (e instanceof DoNotRetryIOException) {
throw e;
}
//On an IOException, try to rebuild the scanner by calling restart(); if that also fails, the exception is rethrown
if (lastSuccessfulRow == null) {
//no row has been read successfully yet: rebuild the scanner from the start
restart(scan.getStartRow());
} else {
//rebuild the scanner starting at the last successfully read row,
restart(lastSuccessfulRow);
//then skip that row, since it has already been consumed
scanner.next();
}
//If the scan is healthy again, carry on as usual; note there is no recursion here, so the scanner may fail at most once
value = scanner.next();
if (value != null && value.isStale()) numStale++;
numRestarts++;
}
if (value != null && value.size() > 0) {
key.set(value.getRow());
lastSuccessfulRow = key.get();
return true;
}
updateCounters();
return false;
} catch (IOException ioe) {
//If the scanner fails a second time there is no saving it; just rethrow
throw ioe;
}
}
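Taken together, restart() plus nextKeyValue() amount to the familiar client-side scan loop, just wrapped in counters and one-shot retry logic; a bare sketch of that pattern (the table name "wordcount" is assumed):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Table table = conn.getTable(TableName.valueOf("wordcount"));
         ResultScanner scanner = table.getScanner(new Scan())) {
      // each iteration is a scanner.next(): one row per call, like nextKeyValue()
      for (Result row : scanner) {
        System.out.println(Bytes.toString(row.getRow()));
      }
    }
  }
}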
initTableReducerJob
The table copy in the previous post called:
//Parameters: table name (String), Reducer class, the Job
TableMapReduceUtil.initTableReducerJob("wordcount_copy", MyReducer.class, job);
Again, the underlying overload takes some extra parameters:
public static void initTableReducerJob(
String table,
Class<? extends TableReducer> reducer,
Job job,
Class partitioner,
String quorumAddress,
String serverClass,
String serverImpl,
boolean addDependencyJars) throws IOException
This closely resembles initTableMapperJob, just with extra parameters for the cluster address and the server-side classes, which makes sense given that we are writing into HBase.
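Spelled out, the fuller call might look like this (a sketch; HRegionPartitioner is chosen purely for illustration, and the null arguments keep the write on the local cluster):
TableMapReduceUtil.initTableReducerJob("wordcount_copy", MyReducer.class, job,
    HRegionPartitioner.class,  // partitioner: route each Mutation to its target region
    null,                      // quorumAddress: null means the local cluster
    null, null,                // serverClass / serverImpl
    true);                     // addDependencyJars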
With the TableMapper side already digested, reading the TableReducer side is considerably easier:
Configuration conf = job.getConfiguration();
HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
job.setOutputFormatClass(TableOutputFormat.class);
if (reducer != null) job.setReducerClass(reducer);
conf.set(TableOutputFormat.OUTPUT_TABLE, table);
conf.setStrings("io.serializations", conf.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName());
// set the ZooKeeper quorum address
if (quorumAddress != null) {
ZKConfig.validateClusterKey(quorumAddress);
conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress);
}
//provide serverClass and serverImpl for interacting with the HBase servers
if (serverClass != null && serverImpl != null) {
conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
}
//K3's type is ImmutableBytesWritable, even though K3 is effectively unused
job.setOutputKeyClass(ImmutableBytesWritable.class);
//V3's type is the parent class Writable; it can be overridden in the Driver class
job.setOutputValueClass(Writable.class);
if (partitioner == HRegionPartitioner.class) {
job.setPartitionerClass(HRegionPartitioner.class);
//cap the number of Reducers at the number of regions
int regions = MetaTableAccessor.getRegionCount(conf, TableName.valueOf(table));
if (job.getNumReduceTasks() > regions) {
job.setNumReduceTasks(regions);
}
} else if (partitioner != null) {
job.setPartitionerClass(partitioner);
}
if (addDependencyJars) {
//ship the dependency jars again, in case the configuration changed after the Mapper setup
addDependencyJars(job);
}
initCredentials(job);
TableOutputFormat
Here, again, a RecordWriter is created to carry out the writes to HBase; unlike with TableInputFormat, the RecordWriter is an inner class of TableOutputFormat.
The method to focus on is write() in TableRecordWriter:
public void write(KEY key, Mutation value)
throws IOException {
//V3 may only be a Put or a Delete instance
if (!(value instanceof Put) && !(value instanceof Delete)) {
throw new IOException("Pass a Delete or a Put");
}
mutator.mutate(value);
}
The actual write goes out through the mutate() method of a BufferedMutator instance:
public void mutate(List<? extends Mutation> ms){
//...
for (Mutation m : ms) {
if (m instanceof Put) {
validatePut((Put) m);
}
toAddSize += m.heapSize();
}
//...
}
public void validatePut(final Put put) throws IllegalArgumentException {
HTable.validatePut(put, maxKeyValueSize);
}
So validation ultimately falls to the static HTable.validatePut(), with maxKeyValueSize at 10485760, i.e. 10 MB (the writes themselves still go out through the BufferedMutator). Compared with a plain put(), the only addition on this path is a size limit on the data: no single KeyValue (cell) may exceed 10 MB.
public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
if (put.isEmpty()) {
throw new IllegalArgumentException("No columns to insert");
}
if (maxKeyValueSize > 0) {
for (List<Cell> list : put.getFamilyCellMap().values()) {
for (Cell cell : list) {
if (KeyValueUtil.length(cell) > maxKeyValueSize) {
throw new IllegalArgumentException("KeyValue size too large");
}
}
}
}
}
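The 10 MB ceiling is not hard-wired: maxKeyValueSize comes from the client configuration key hbase.client.keyvalue.maxsize, so it can be adjusted when a use case genuinely needs larger cells. A sketch:
Configuration conf = HBaseConfiguration.create();
// default is 10485760 (10 MB); a value <= 0 disables the check, as the code above shows
conf.setInt("hbase.client.keyvalue.maxsize", 20 * 1024 * 1024);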