1.编写DBWritable类
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
public class MYDBWritable implements DBWritable, Writable {
private int id=0;
private String txt="";
private String name="";
private String word="";
private int wordcount=0;
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public int getWordcount() {
return wordcount;
}
public void setWordcount(int wordcount) {
this.wordcount = wordcount;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getTxt() {
return txt;
}
public void setTxt(String txt) {
this.txt = txt;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(id);
dataOutput.writeUTF(txt);
dataOutput.writeUTF(name);
dataOutput.writeInt(wordcount);
dataOutput.writeUTF(word);
}
public void readFields(DataInput dataInput) throws IOException {
id=dataInput.readInt();
txt=dataInput.readUTF();
name=dataInput.readUTF();
word=dataInput.readUTF();
wordcount=dataInput.readInt();
}
//id txt name 是从需要从第一个数据库中读取的数据所以只需要读操作 而word 和wordcount是处理后的数据 只需要写入
public void write(PreparedStatement preparedStatement) throws SQLException {
// preparedStatement.setInt(1,id);
// preparedStatement.setString(2,txt);
// preparedStatement.setString(3,name);
preparedStatement.setInt(2,wordcount);
preparedStatement.setString(1,word);
}
public void readFields(ResultSet resultSet) throws SQLException {
id=resultSet.getInt(1);
txt=resultSet.getString(2);
name=resultSet.getString(3);
//wordcount=resultSet.getInt(1);
//word=resultSet.getString(2);
}
}
map的输入kv为 LongWritable, MYDBWritable
public class mysqlmap extends Mapper<LongWritable, MYDBWritable,Text, IntWritable> {
protected void map(LongWritable key, MYDBWritable value, Context context) throws IOException, InterruptedException {
String line = value.getTxt();
String arr[]=line.split(" ");
for(String s : arr) {
context.write(new Text(s), new IntWritable(1));
}
}
}
reduce负责往数据库写入数据：输出的 k 必须为 DBWritable 的子类；而 v 不起作用，数据主要由 MYDBWritable 携带进行写入，所以 v 为 NullWritable。
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class mysqlreduce extends Reducer<Text, IntWritable, MYDBWritable, NullWritable> {
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count=0;
for(IntWritable w : values){
count = count + w.get();
}
MYDBWritable keyout= new MYDBWritable();
keyout.setWord(key.toString());
keyout.setWordcount(count);
context.write(keyout,NullWritable.get());
}
}
在job中设置
// Read job input from a JDBC source instead of HDFS.
job.setInputFormatClass(DBInputFormat.class);
// MySQL Connector/J 8.x driver class; serverTimezone is required by 8.x.
String driverclass= "com.mysql.cj.jdbc.Driver";
String url = "jdbc:mysql://localhost:3306/big4?serverTimezone=UTC";
// NOTE(review): hardcoded credentials — move to a config file / env var
// before committing; never ship passwords in source.
String name = "root";
String password = "liu10010";
// Configure the JDBC connection shared by input and output formats.
DBConfiguration.configureDB(job.getConfiguration(),driverclass,url,name,password);
// Input: row query + count query (the count drives split computation).
// Column order (id, txt, name) must match MYDBWritable.readFields(ResultSet).
DBInputFormat.setInput(job,MYDBWritable.class,"select id,txt,name from words","select count(*) from words");
// Output: table "stats", columns ("word", "c") — bound positionally by
// MYDBWritable.write(PreparedStatement) as (1=word, 2=wordcount).
// NOTE(review): confirm the second column of `stats` really is named "c";
// if it is "wordcount" in the schema, this INSERT will fail at runtime.
DBOutputFormat.setOutput(job,"stats","word","c");