一、配置 Hadoop 读取 HBase 所需的依赖包
在 hadoop-env.sh 中添加 export HADOOP_CLASSPATH="$HBASE_HOME/lib/*"（建议加双引号，确保 * 原样传给 Java，由其按类路径通配符加载 lib 目录下的所有 jar）
如果尚未配置 HBASE_HOME，请先在 /etc/profile 中配置该环境变量（路径根据自己的安装位置修改），然后执行 source /etc/profile 使其生效
二、把需要读取的文件上传到 HDFS 的 /user 目录下
hdfs dfs -put /usr/local/software/hadoop/hadoop-3.3.0/stumer_in_out_details.txt /user/
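三、编写代码实现（运行前需先在 HBase 中建好表 identify_rmb_records 及列族 op_www，例如在 hbase shell 中执行 create 'identify_rmb_records','op_www'）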
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
/**
 * Map-only MapReduce job that reads a comma-separated text file from HDFS and writes each line
 * into an HBase table as a single {@link Put}.
 *
 * <p>Usage: {@code hadoop jar <jar> <class> [inputPath [tableName]]}. Without arguments the job
 * reads {@code /user/stumer_in_out_details.txt} and writes to {@code identify_rmb_records}, as
 * before. The input file must already be on HDFS and the target table (with column family
 * {@code op_www}) must already exist; this job does not create it.
 */
public class MapreduceFileToHbase {

    /** HDFS input file used when no input-path argument is given. */
    private static final String DEFAULT_INPUT = "/user/stumer_in_out_details.txt";

    /** HBase table used when no table-name argument is given. */
    private static final String DEFAULT_TABLE = "identify_rmb_records";

    /**
     * Configures and runs the import job, then exits the JVM with status 0 on success, 1 on failure.
     *
     * @param args optional {@code [inputPath [tableName]]}; missing entries fall back to the defaults
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Path inputDir = new Path(args.length > 0 ? args[0] : DEFAULT_INPUT);
        String tableName = args.length > 1 ? args[1] : DEFAULT_TABLE;

        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "mapreduce to hbase");
        job.setJarByClass(MapreduceFileToHbase.class);

        FileInputFormat.setInputPaths(job, inputDir);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(ImportMapper.class);

        // Configure the job's output to go to tableName. No reducer class is passed because the
        // job is map-only: mappers emit Puts that go straight to the table output.
        TableMapReduceUtil.initTableReducerJob(tableName, null, job);
        job.setNumReduceTasks(0);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Turns one input line {@code rowKey,exists,time,bank,uId} into a {@link Put} on column family
     * {@code op_www}, stamping every cell with the {@code time} field parsed as
     * {@code yyyy-MM-dd HH:mm} (in the JVM's default time zone).
     *
     * <p>Lines that cannot be imported are skipped and counted under {@link BadLine} rather than
     * failing the task (too few fields / empty row key) or being written with a bogus timestamp of
     * 0 (unparseable time).
     */
    public static class ImportMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

        /** Hadoop counters recording why input lines were skipped. */
        public enum BadLine { TOO_FEW_FIELDS, EMPTY_ROW_KEY, BAD_TIMESTAMP }

        /** Number of comma-separated fields each line must have (indices 0..4 are read). */
        private static final int FIELD_COUNT = 5;

        private static final byte[] FAMILY = "op_www".getBytes(StandardCharsets.UTF_8);
        private static final byte[] COL_EXISTS = "exists".getBytes(StandardCharsets.UTF_8);
        private static final byte[] COL_BANK = "Bank".getBytes(StandardCharsets.UTF_8);
        private static final byte[] COL_UID = "uId".getBytes(StandardCharsets.UTF_8);

        /** Output key, reused across map() calls. */
        private final ImmutableBytesWritable rowkey = new ImmutableBytesWritable();

        // SimpleDateFormat is not thread-safe. It is kept per Mapper instance, whose map() calls
        // are made one after another by Mapper.run().
        private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm");

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // limit -1 keeps trailing empty fields so the field count is reliable
            String[] values = value.toString().split(",", -1);
            if (values.length < FIELD_COUNT) {
                context.getCounter(BadLine.TOO_FEW_FIELDS).increment(1);
                return;
            }
            if (values[0].isEmpty()) {
                // new Put(...) rejects a zero-length row key
                context.getCounter(BadLine.EMPTY_ROW_KEY).increment(1);
                return;
            }

            long timeStamp;
            try {
                timeStamp = format.parse(values[2]).getTime();
            } catch (ParseException e) {
                // Skip instead of writing cells at timestamp 0.
                context.getCounter(BadLine.BAD_TIMESTAMP).increment(1);
                return;
            }

            byte[] row = values[0].getBytes(StandardCharsets.UTF_8);
            rowkey.set(row);
            Put put = new Put(row);
            put.addColumn(FAMILY, COL_EXISTS, timeStamp, values[1].getBytes(StandardCharsets.UTF_8));
            put.addColumn(FAMILY, COL_BANK, timeStamp, values[3].getBytes(StandardCharsets.UTF_8));
            put.addColumn(FAMILY, COL_UID, timeStamp, values[4].getBytes(StandardCharsets.UTF_8));
            context.write(rowkey, put);
        }
    }
}
四、使用 Maven 打包项目，上传到 Linux 并执行程序
把打包生成的 jar 文件（默认位于项目的 target 目录下）复制出来，上传到 Linux 服务器的某个目录下
在 Hadoop 的 bin 目录下执行 hadoop jar <jar包路径> <类的全限定名>；类名必须包含包名（如下例中的 com.example.usehbase），因此源码开头需要有对应的 package 声明
./hadoop jar ../wordcount-1.0-SNAPSHOT.jar com.example.usehbase.MapreduceFileToHbase