HBaseCell类
package com.vic.flink.entity;
import lombok.Data;
import java.util.HashMap;
@Data
public class HBaseCell {
private String tableName;
private String rowKey;
private String cf;
private HashMap<String,String> kv; // column,value
public HBaseCell(String tableName,String rowKey,String cf,HashMap<String,String> kv){
this.tableName = tableName;
this.rowKey = rowKey;
this.cf = cf;
this.kv = kv;
}
}
HBaseSink类
package com.vic.flink.sink;
import com.vic.flink.entity.HBaseCell;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.HashMap;
/**
* 自定义HbaseSink类
*/
public class HbaseSink extends RichSinkFunction<HBaseCell> {
private static org.apache.hadoop.conf.Configuration configuration;
private static Connection connection = null;
private static BufferedMutator mutator;
@Override
public void open(Configuration parameters) throws Exception {
org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum","127.0.0.1,127.0.0.2,127.0.0.3");
configuration.set("hbase.zookeeper.property.clientPort","2181");
configuration.setInt("hbase.client.operation.timeout",30000);
configuration.setInt("hbase.client.scanner.timeout.period",200000);
connection = ConnectionFactory.createConnection(configuration);
}
@Override
public void close() throws Exception {
if (connection != null) connection.close();
}
@Override
public void invoke(HBaseCell value, Context context) throws Exception {
BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(value.getTableName()));
params.writeBufferSize(1024 * 1024);
mutator = connection.getBufferedMutator(params);
Put put = new Put(Bytes.toBytes(value.getRowKey()));
HashMap<String, String> kv = value.getKv();
for(String key:kv.keySet()){
put.addColumn(Bytes.toBytes(value.getCf()), Bytes.toBytes(key), Bytes.toBytes(kv.get(key)));
}
mutator.mutate(put);
mutator.flush();
}
}