kettle向redis同步数据
网上kettle向redis同步数据的完整案例不是很多,本文将以案例形式对整个过程进行详细讲解。
一、案例描述
本文以最简单的案例描述,大家在应用过程中可根据实际情况进行调整。现有学生表和成绩表。如何将表中的数据按照如下要求同步至redis?
1、将学生表的数据同步至redis,学生表的id为redis的key
2、按照如下结构同步每个学生的信息以及所关联的成绩。redis的key使用学生id_学生姓名形式。
{
"学生详细信息":[],
"成绩信息":[],
"学生id":"",
"学生姓名":""
}
二、数据准备
1、student表
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for student
-- ----------------------------
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
`id` varchar(11) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`xh` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of student
-- ----------------------------
INSERT INTO `student` VALUES ('1', '张三', '001');
INSERT INTO `student` VALUES ('2', '李四', '002');
INSERT INTO `student` VALUES ('3', '王五', '003');
SET FOREIGN_KEY_CHECKS = 1;
2、grade表
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for grade
-- ----------------------------
DROP TABLE IF EXISTS `grade`;
CREATE TABLE `grade` (
`id` varchar(11) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`classname` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`score` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`stuid` varchar(11) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of grade
-- ----------------------------
INSERT INTO `grade` VALUES ('1', '语文', '90', '1');
INSERT INTO `grade` VALUES ('2', '数学', '100', '1');
INSERT INTO `grade` VALUES ('3', '英语', '80', '1');
INSERT INTO `grade` VALUES ('4', '语文', '85', '2');
INSERT INTO `grade` VALUES ('5', '数学', '100', '2');
INSERT INTO `grade` VALUES ('6', '英语', '95', '2');
INSERT INTO `grade` VALUES ('7', '语文', '98', '3');
INSERT INTO `grade` VALUES ('8', '数学', '87', '3');
INSERT INTO `grade` VALUES ('9', '英语', '65', '3');
SET FOREIGN_KEY_CHECKS = 1;
三、工具类准备
1、启动redis
2、使用redis管理工具进行连接
链接:https://pan.baidu.com/s/1a7a2GBV9SGThr0efx8EFSA
提取码:e8wj
3、下载kettle中连接redis的相关jar包(jedis-3.1.0.jar和fastjson-1.2.47.jar)
链接:https://pan.baidu.com/s/1q1jymAEo9mI7GDHNrGuRyw
提取码:4kjw
下载后将jar文件放在kettle根目录下的lib文件中。切记要重启,否则不生效
四、 案例实现
1、案例一:将学生表的数据同步至redis,学生表的id为redis的key
(1)kettle任务
作业:
为了后续方便,将redis相关设置提出。
设置抽取变量
(2)转换内容
转换:
表输入:
JSON output:
Java 代码-向redis写数据
可进行设置redis连接、向redis写数据以及设置过期时间等。
java代码如下:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Pipeline;
private Jedis jedis=null;
private JedisPool pool=null;
Pipeline pipe = null;
int cache_size=10000; // 批量提交大小
int cur_size=0; // 当前数据缓存量
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
if (first) {
first = false;
String redis_ip = getVariable("REDIS_IP", "");
String redis_port = getVariable("REDIS_PORT", "");
String redis_password = getVariable("REDIS_PWD", "");
int redis_db = Integer.parseInt(getVariable("REDIS_DB", ""));
cache_size = Integer.valueOf(getVariable("redis.cache_size", "10000"));
logBasic(redis_ip+":"+redis_port);
logBasic("redis_password:"+redis_password);
// 连接池方式
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxIdle(8);
config.setMaxTotal(18);
pool = new JedisPool(config, redis_ip, Integer.valueOf(redis_port), 2000, redis_password);
jedis = pool.getResource();
jedis.select(redis_db);// 切换数据库
pipe = jedis.pipelined(); // 创建pipeline 对象
logBasic("Server is running: " + jedis.ping());
}
Object[] r = getRow();
if (r == null) {
setOutputDone();
pipe.sync();
jedis.close();
pool.close();
return false;
}
r = createOutputRow(r, data.outputRowMeta.size());
String key = get(Fields.In, "id").getString(r);
String value = get(Fields.In, "JsonData").getString(r);
logDebug(key +"***"+ "\t" + value);
// 写入redis
pipe.set(key, value);
//设置redis过期时间:单位为秒
pipe.expire(key, 3600);
cur_size++;
if (cur_size % cache_size == 0 && cur_size > 0) {// 当达到缓存最大值时提交
pipe.sync(); // 同步
cur_size=0; // 复位
}
putRow(data.outputRowMeta, r);
return true;
}
(3)redis效果
使用json解析网址查看的结果如下。
https://www.json.cn/
{
"data":[
{
"xh":"001",
"name":"张三",
"id":"1"
}
]
}
2、案例二:按照如下结构同步每个学生的信息以及所关联的成绩。redis的key使用学生id_学生姓名形式。
(1)作业
基本与案例一类似,不在赘述
(2)获取关键参数
(3)同步业务数据到redis
多余的环节不在赘述,只展示关键点:
Java 代码-向redis写数据源码:
如果不对组装结果进行处理,输出的内容如下,比较杂乱。因此我在此处写了工具类对这个结果进行了处理组装。
未处理结果:
{
"data":[
{
"tagjson":"{\"student\":[{\"xh\":\"001\",\"name\":\"张三\",\"id\":\"1\"}]}"
},
{
"tagjson":"{\"grade\":[{\"score\":\"90\",\"classname\":\"语文\",\"stuid\":\"1\",\"id\":\"1\"},{\"score\":\"100\",\"classname\":\"数学\",\"stuid\":\"1\",\"id\":\"2\"},{\"score\":\"80\",\"classname\":\"英语\",\"stuid\":\"1\",\"id\":\"3\"}]}"
}
]
}
处理过程:Java 代码-向redis写数据源码
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Pipeline;
import unit.TestJsonUntil;
private Jedis jedis=null;
private JedisPool pool=null;
Pipeline pipe = null;
int cache_size=10000; // 批量提交大小
int cur_size=0; // 当前数据缓存量
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
if (first) {
first = false;
String redis_ip = getVariable("REDIS_IP", "");
String redis_port = getVariable("REDIS_PORT", "");
String redis_password = getVariable("REDIS_PWD", "");
int redis_db = Integer.parseInt(getVariable("REDIS_DB", ""));
cache_size = Integer.valueOf(getVariable("redis.cache_size", "10000"));
logBasic(redis_ip+":"+redis_port);
logBasic("redis_password:"+redis_password);
// 连接池方式
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxIdle(8);
config.setMaxTotal(18);
pool = new JedisPool(config, redis_ip, Integer.valueOf(redis_port), 2000, redis_password);
jedis = pool.getResource();
jedis.select(redis_db);// 切换数据库
pipe = jedis.pipelined(); // 创建pipeline 对象
logBasic("Server is running: " + jedis.ping());
}
Object[] r = getRow();
if (r == null) {
setOutputDone();
pipe.sync();
jedis.close();
pool.close();
return false;
}
r = createOutputRow(r, data.outputRowMeta.size());
String jsondata = get(Fields.In, "JsonData").getString(r);
String value = TestJsonUntil.formatTestJson(jsondata);
String key = TestJsonUntil.getID(value);
logDebug(key +"***"+ "\t" + value);
// 写入redis
pipe.set(key, value);
//设置redis过期时间:单位为秒
pipe.expire(key, 3600);
cur_size++;
if (cur_size % cache_size == 0 && cur_size > 0) {// 当达到缓存最大值时提交
pipe.sync(); // 同步
cur_size=0; // 复位
}
putRow(data.outputRowMeta, r);
return true;
}
所用到的工具类,此处是我写的java组件,打包后放入kettle根目录下的lib中即可。记得放入后需要重启。
package unit;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
/**
* Created with IntelliJ IDEA.
*
* @Author: qzc
* @Date: 2023/03/13/11:51
* @Description:
*/
public class TestJsonUntil {
public static String getID(String str){
String retstr="";
JSONObject json = JSONObject.parseObject(str);
if(null !=json){
retstr= (json.getString("stuid")+"-"+json.getString("name"));
}
return retstr;
}
public static String formatTestJson(String str){
JSONObject jsonObject = new JSONObject();
jsonObject.put("stuid",null);
jsonObject.put("name",null);
jsonObject.put("student",null);
jsonObject.put("grade",null);
try {
JSONObject json = JSONObject.parseObject(str);
if(null != json){
JSONArray jsonArray =JSONArray.parseArray(json.get("data").toString()) ;
if(null != jsonArray && jsonArray.size()>0){
//定义集合
JSONArray stuArr = new JSONArray();
JSONArray grdArr = new JSONArray();
for(int i =0;i<jsonArray.size();i++){
JSONObject jsondata = jsonArray.getJSONObject(i);
if(null != jsondata && null != jsondata.getJSONObject("tagjson") ){
JSONObject tagjson = JSONObject.parseObject(jsondata.getJSONObject("tagjson").toJSONString());
if(null != tagjson ){
JSONArray curjsonArr = new JSONArray();
String currtype=null;
if(null != tagjson.get("student")){
currtype="student";
curjsonArr = JSONArray.parseArray(tagjson.get("student").toString());
}else if(null != tagjson.get("grade")){
currtype="grade";
curjsonArr = JSONArray.parseArray(tagjson.get("grade").toString());
}
if(null != curjsonArr && curjsonArr.size()>0){
for(int j =0;j<curjsonArr.size();j++){
JSONObject tag = curjsonArr.getJSONObject(j);
if(null == jsonObject.get("name") && null != currtype && "student".equals(currtype)){
jsonObject.put("name",tag.get("name"));
}
if(null == jsonObject.get("stuid") && null != currtype && "student".equals(currtype)){
jsonObject.put("stuid",tag.get("id"));
}
if(null != currtype && "grade".equals(currtype)){
grdArr.add(tag);
}else if(null != currtype && "student".equals(currtype)){
stuArr.add(tag);
}
}
}
}
}
}
//将处理后的数据放入jsonobject
jsonObject.put("grade",grdArr);
jsonObject.put("student",stuArr);
}
}
}catch (Exception e){
e.printStackTrace();
}
return jsonObject.toJSONString();
}
//
// public static void main(String[] args) {
// String str ="{\"data\":[{\"tagjson\":\"{\\\"student\\\":[{\\\"xh\\\":\\\"001\\\",\\\"name\\\":\\\"张三\\\",\\\"id\\\":\\\"1\\\"}]}\"},{\"tagjson\":\"{\\\"grade\\\":[{\\\"score\\\":\\\"90\\\",\\\"classname\\\":\\\"语文\\\",\\\"stuid\\\":\\\"1\\\",\\\"id\\\":\\\"1\\\"},{\\\"score\\\":\\\"100\\\",\\\"classname\\\":\\\"数学\\\",\\\"stuid\\\":\\\"1\\\",\\\"id\\\":\\\"2\\\"},{\\\"score\\\":\\\"80\\\",\\\"classname\\\":\\\"英语\\\",\\\"stuid\\\":\\\"1\\\",\\\"id\\\":\\\"3\\\"}]}\"}]}";
// String strs = TestJsonUntil.formatTestJson(str);
// String str2 = TestJsonUntil.getID(strs);
// System.out.println(str2);
//
// }
}
处理完成后输出的结果如下:
这样输出显然结构很清晰。
{
"stuid":"1",
"student":[
{
"xh":"001",
"name":"张三",
"id":"1"
}
],
"grade":[
{
"score":"90",
"classname":"语文",
"stuid":"1",
"id":"1"
},
{
"score":"100",
"classname":"数学",
"stuid":"1",
"id":"2"
},
{
"score":"80",
"classname":"英语",
"stuid":"1",
"id":"3"
}
],
"name":"张三"
}
五、总结
总体来说整个过程不难,大家可以根据自己的实际需要进行调整。在正常的生成过程中业务往往及其复杂,单纯的靠kettle组件处理一些复杂的数据可能比较麻烦,这时候我觉得可以借助java组件处理业务逻辑,这样会使整个作业相对来说更加简洁方便。