本篇内容完全偏向于实践,也是后端开发常用到的知识,关于Redis原理与概念性的内容会另起一篇来说明。可以根据目录来选择自己所需要的内容来阅读。
1. redis安装
本章会介绍redis在ubuntu和centos上的安装, 建议使用centos上的源码安装方式,ubuntu上的安装不好确定版本,安装路径都不受自己控制,比如安装好了之后使用客户端连接出现问题解决都会麻烦一些。
1.1. ubuntu上安装redis
ubuntu安装redis直接就是sudo apt-get install redis-server
服务是自启动的(不需要使用redis-server
来进行服务启动),查看下服务是否启动了ps -aux|grep redis
; 使用客户端访问redis服务端,命令直接输入redis-cli
。
[root@monkey ~]# ps -aux|grep redis
root 3974 0.0 0.0 140576 2356 ? Ssl Jan28 1:05 redis-server *:6379
root 11381 0.0 0.0 103328 856 pts/0 S+ 03:55 0:00 grep redis
[root@monkey ~]# redis-cli
127.0.0.1:6379>
1.2. centos上安装redis
centos上使用源码方式来安装,按照以下命令即可。
1) wget http://download.redis.io/releases/redis-4.0.6.tar.gz
2) tar -zxvf redis-4.0.6.tar.gz
3) ln -s redis-cli redis-4.0.6 redis
4) make
5) make install
- 下载安装包
redis-4.0.6.tar.gz
;
- 解压安装包;
- 建立软链接指向解压后的redis目录``redis-4.0.6`,这一步可以忽略;
- 进行编译。redis是c语言编写的,需要有编译环境,即安装了
gcc
,如果没有则执行命令yum install gcc
; 如果此时运行make命令报错:error: jemalloc/jemalloc.h: No such file or directory, 那么给make命令添加参数,使用命令make MALLOC=libc
即可;
- 将redis相关运行文件放到
/usr/local/bin
下,那么可以再任意目录下去运行这些redis相关命令,而不需要进入安装目录;
1.3. redis的启动与停止
安装完成后查看目录/usr/local/bin
,里面存放了redis相关命令
-rwxr-xr-x. 1 root root 290446 1月 28 05:36 redis-benchmark #redis基准测试工具
-rwxr-xr-x. 1 root root 2977594 1月 28 05:36 redis-check-aof # redis aof 持久化文件检测修复工具
-rwxr-xr-x. 1 root root 2977594 1月 28 05:36 redis-check-rdb # redis rdb 持久化文件检测修复
-rwxr-xr-x. 1 root root 428967 1月 28 05:36 redis-cli # redis命令行客户端
lrwxrwxrwx. 1 root root 12 1月 28 05:36 redis-sentinel -> redis-server # 启动redis sentinel
-rwxr-xr-x. 1 root root 2977594 1月 28 05:36 redis-server # 启动redis
首先需要启动redis服务,直接使用命令redis-server
即可,该命令会使用redis安装包下的默认的redis-conf
配置文件,通常我们可能会将该文复制到某个位置(比如/usr/local/redis-conf
),修改该配置文件的内容,然后启动的时候指定该配置文件位置即可,如redis-server /usr/local/redis-conf
来进行指定配置文件进行启动。
接着我们可以启动命令行客户端来进行redis的访问,redis-cli -h ip地址 -p 端口号
,默认ip为localhost,端口为6379,所以本地玩的时候直接使用redis-cli
即可。
需要注意的是服务端启动后,不能退出,必须使用 ctrl+c
但是这样就推出了服务,这显然不够人性化,可以进行配置文件redis.conf
的修改, daemonize no->daemonize yes`,使得redis进程作为守护进程运行在后台。
################################# GENERAL #####################################
# By default Redis does not run as a daemon. Use 'yes' if you need it.
# Note that Redis will write a pid file in /var/run/redis.pid when daemonized.
daemonize yes
再次运行服务, 可以看到命令执行完后又回到了命令行,服务一直在后端运行着。
[root@monkey redis]# redis-server redis.conf
11493:C 29 Jan 04:23:40.967 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
11493:C 29 Jan 04:23:40.967 # Redis version=4.0.6, bits=64, commit=00000000, modified=0, pid=11493, just started
11493:C 29 Jan 04:23:40.967 # Configuration loaded
[root@monkey redis]# ps -aux|grep redis
root 3974 0.0 0.0 140576 2356 ? Ssl Jan28 1:06 redis-server *:6379
root 11502 0.0 0.0 103328 856 pts/0 S+ 04:25 0:00 grep redis
如果要停止redis服务直接命令行运行redis-cli shutdown
[root@monkey redis]# ps -aux|grep redis
root 3974 0.0 0.0 140576 2356 ? Ssl Jan28 1:06 redis-server *:6379
root 11502 0.0 0.0 103328 856 pts/0 S+ 04:25 0:00 grep redis
[root@monkey redis]# redis-cli shutdown
[root@monkey redis]# ps -aux|grep redis
Warning: bad syntax, perhaps a bogus '-'? See /usr/share/doc/procps-3.2.8/FAQ
root 11509 0.0 0.0 103328 856 pts/0 S+ 04:28 0:00 grep redis
2. redis常用命令必知必会
本章会对redis的五种数据结构(类型)string, hash,list,set,zset
的常用命令分别进行介绍,也会对一些也许用的不是那么频繁,但是在特定场合很实用的命令进行介绍。为了看的更直观,这里就直接展示操作内容。
需要明确一点,redis中的5种数据结构是针对value的,redis中的键全为字符串类型。
2.1. string
字符串类型是5种数据结构中最为常用的,为其他4种数据结构的基础。可以为简单字符串hello world
, 复杂字符串json
, 数字(int, double
),甚至是二进制等。
存储: set key value
取值: get key
删除: del key
, 对于其他的几种数据结构都适用
查看所有键: keys *
示例:
192.168.25.128:6379> set str1 123
OK
192.168.25.128:6379> set str2 abc
OK
192.168.25.128:6379> set str3 xixi
OK
192.168.25.128:6379> get str1
"123"
192.168.25.128:6379> get str2
"abc"
192.168.25.128:6379> get str3
"xixi"
192.168.25.128:6379> del str1
(integer) 1
192.168.25.128:6379> keys *
1) "str2"
2) "str3"
增1:incr key
减1:decr key
注意, 虽然redis存储形式都是字符串,但是自增减的时候要求value必须能解析成数值类型,比如你的value是”1ad”那就不行。
示例:
192.168.25.128:6379> set str1 3
OK
192.168.25.128:6379> incr str1
(integer) 4
2.2. hash
相当于一个key对应一个map,map中还有key-value
存储:hset key field value
取值: hget key field
查看某个键对应的map里面的所有key: hkeys key
查看某个键对应的map里面的所有的value: hvals key
查看某个键的map: hgetall key
删除: 删除hash中的某个kv,hdel hone field1
示例:
192.168.25.128:6379> hset hone field1 123
(integer) 1
192.168.25.128:6379> hset hone field2 abc
(integer) 1
192.168.25.128:6379> hset hone field3 haha
(integer) 1
192.168.25.128:6379> hget hone field1
"123"
192.168.25.128:6379> hget hone field2
"abc"
192.168.25.128:6379> hget hone field3
"haha"
192.168.25.128:6379> hkeys hone
1) "field1"
2) "field2"
3) "field3"
192.168.25.128:6379> hvals hone
1) "123"
2) "abc"
3) "haha"
192.168.25.128:6379> hgetall hone
1) "field1"
2) "123"
3) "field2"
4) "abc"
5) "field3"
6) "haha"
2.3. list
存储:push,分为lpush list v1 v2 v3 v4 …
(左边添加),rpush list v1 v2 v3 v4 …
(右边添加)
取值:pop,分为lpop list
(左边取,移除list最左边的值) , rpop list
(右边取,移除list最右边的值)
查看list: lrange key 0 -1
(0 -1表示查看所有)
存储,取值操作跟栈的存储,取值方法是一样的,而不是add,get,存储的值有序可以重复。用pop取值取完后该值就从list中移除了。
示例:
192.168.25.128:6379> lpush list1 1 2 3 4 5 6
(integer) 6
192.168.25.128:6379> rpush list1 a b c d e
(integer) 11
192.168.25.128:6379> lrange list1 0 -1
1) "6"
2) "5"
3) "4"
4) "3"
5) "2"
6) "1"
7) "a"
8) "b"
9) "c"
10) "d"
11) "e"
192.168.25.128:6379> lpop list1
"6"
192.168.25.128:6379> lrange list1 0 -1
1) "5"
2) "4"
3) "3"
4) "2"
5) "1"
6) "a"
7) "b"
8) "c"
9) "d"
10) "e"
192.168.25.128:6379> rpop list1
"e"
192.168.25.128:6379> lrange list1 0 -1
1) "5"
2) "4"
3) "3"
4) "2"
5) "1"
6) "a"
7) "b"
8) "c"
9) "d"
2.4. set
set中的元素是无序不重复的,出现重复会覆盖
存储:sadd key v1 v2 v3 …
移除:srem key v
查看set集合: smembers key
另外还提供了差集,交集,并集操作
差集:sdiff seta setb(seta中有setb中没有的元素)
交集:sinter seta setb
并集:sunion seta setb
示例:
192.168.25.128:6379> sadd set a b a b c d
(integer) 4
192.168.25.128:6379> srem set a
(integer) 1
192.168.25.128:6379> smembers set
1) "d"
2) "b"
3) "c"
192.168.25.128:6379> sadd seta a b c d e
(integer) 5
192.168.25.128:6379> sadd setb c d e f g
(integer) 5
192.168.25.128:6379> sdiff seta setb
1) "a"
2) "b"
192.168.25.128:6379> sdiff setb seta
1) "g"
2) "f"
192.168.25.128:6379> sinter seta setb
1) "d"
2) "e"
3) "c"
192.168.25.128:6379> sunion seta setb
1) "a"
2) "b"
3) "d"
4) "g"
5) "f"
6) "e"
7) "c"
2.5. zset
zset或者称sortedSet,有序集合。
存储:存储的时候要求对set进行排序,需要对存储的每个value值进行打分,默认排序是分数由低到高。zadd key 分数1 v1 分数2 v2 分数3 v3…
取值:取出指定的值 zrem key value
取所有的值(不包括分数):zrange key 0 -1,降序取值用zrevrange key 0 -1
取所有的值(带分数): zrange(zrevrange) key 0 -1 withscores
192.168.25.128:6379> zadd zset1 1 a 3 b 2 c 5 d(要求给定分数,从而达到排序效果,默认升序)
(integer) 4
192.168.25.128:6379> zrange zset1 0 -1
1) "a"
2) "c"
3) "b"
4) "d"
192.168.25.128:6379> zrem zset1 a
(integer) 1
192.168.25.128:6379> zrange zset1 0 -1
1) "c"
2) "b"
3) "d"
192.168.25.128:6379> zrevrange zset1 0 -1(按分数降序排)
1) "d"
2) "b"
3) "c"
192.168.25.128:6379> zrevrange zset1 0 -1 withscores
1) "d"
2) "5"
3) "b"
4) "3"
5) "c"
6) "2"
2.6. key命令
由于redis是内存存储数据,所以不能够存储过大的数据量,所以存储在redis中的数据,在不再需要的时候应该清除掉。比如,用户买完东西下订单,生成的订单信息存储了在redis中,但是用户一直没支付那么存储在redis中的订单信息就应该清除掉,这个时候就可以通过设置redis的过期时间来完成,一旦达到了过期时间就清除该信息。
设置key的过期时间:expired key 过期时间(秒)
查看key的有效剩余时间:ttl key
清除key的过期时间,持久化该key:persist key
-1:表示持久化
-2: 表示该key不存在
示例:
192.168.25.128:6379> expire zone 60
(integer) 1
192.168.25.128:6379> ttl zone
(integer) 55
192.168.25.128:6379> ttl zone
(integer) 51
192.168.25.128:6379> ttl zone
(integer) 48
192.168.25.128:6379> ttl zone
(integer) 37
192.168.25.128:6379> ttl zone
(integer) 16
192.168.25.128:6379> ttl zone
(integer) -2 -->(该key已经不存在)
192.168.25.128:6379> expire sone 30
(integer) 1
192.168.25.128:6379> ttl sone
(integer) 22
192.168.25.128:6379> persist sone
(integer) 1
192.168.25.128:6379> ttl sone
(integer) -1 -->(该key是持久化的)
setex和setnx
当然更多的时候是指定kv的同时就设置了过期时间,使用setex key expiretime value
;
setnx类似于set,但是当key已经存在的时候,set会进行value的覆盖(即修改);而setnx则会失败,利用这个特性可以自己设计一个分布式锁,这将在后续进行介绍。
127.0.0.1:6379> SETEX b 5 m
OK
127.0.0.1:6379> ttl b
(integer) 2
127.0.0.1:6379> ttl b
(integer) 1
127.0.0.1:6379> ttl b
(integer) -2
127.0.0.1:6379> set h world
OK
127.0.0.1:6379> set h WORLD
OK
127.0.0.1:6379> get h
"WORLD"
127.0.0.1:6379> setnx h w
(integer) 0
127.0.0.1:6379> setnx tom cat
(integer) 1
3. 使用Jedis操作redis
导入Jedis依赖
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.0</version>
</dependency>
3.1. string类型操作
Jedis jedis = new Jedis("192.168.25.128",6379); // 创建连接
jedis.close(); // 关闭连接
// 1.存储,获取,设置过期时间,key命令;
jedis.set("name", "张三");
String name = jedis.get("name");
Long t = jedis.ttl("name"); // 查看有效期,-1表示持久化,-2表示不存在
jedis.expire("name", 5);
//2. 测试自增自减:前提,value值能解析为数字类型; 删除
jedis.set("age", "18");
String age1 = jedis.get("age");
Long age2 = jedis.decr("age"); // 17
Long age3 = jedis.incr("age"); // 18
Long del = jedis.del("age1");
//3. 批量存储和获取
jedis.mset("a1","mysql","a2","oracle","a3","sqlServer","a4",
"redis","a5","mongodb","a6","hbase");
List<String> list = jedis.mget("a1","a2","a3","a4","a5","a6");
//4. 存储值的同时设置过期时间,判断key是否存在
jedis.setex("life", 5, "享受美好");
while(jedis.exists("life")){
System.out.println(jedis.get("life"));
Thread.sleep(2000);
}
3.2. hash类型操作
//1.存储值
jedis.hset("student", "name", "小李");
jedis.hset("student", "class", "小学生");
jedis.hset("student", "age", "10");
jedis.hset("student", "skill", "keng");
//2.获取指定值 获取名字
String name = jedis.hget("student", "name");
//3.获取存储的map
Map<String, String> all = jedis.hgetAll("student");
//4.获取map中全部key
Set<String> keySet = jedis.hkeys("student");
for (String key : keySet) {
System.out.println(key);
}
//5.获取map中全部values
List<String> list = jedis.hvals("student");
for (String value : list) {
System.out.println(value);
}
//6.删除指定的值 删除map中class,name两对键值对
Long long1 = jedis.hdel("student", "class","name");
Set<String> set2 = jedis.hkeys("student");
for (String key : set2) {
System.out.println(key);
}
//7.判断map是否存在
Boolean e = jedis.hexists("student", "class");
//8.自增自减,可以指定增加减少的数值
jedis.hincrBy("student", "age", 2);
3.3. list类型操作
//1.存储值(左边开始)。当成栈(子弹匣),先进先出,入栈
jedis.lpush("scores", "100","90","80","70","60");
//右边开始存
jedis.rpush("scores", "50","40","30","20","10");
//2.取值(左边开始),可以说是同时移除了该值,出栈
String lv = jedis.lpop("scores");
//右边开始取
String rv= jedis.rpop("scores");
//3.取所有值(只有左边开始取)0 -1表示取所有位置,位置是[start,end]
List<String> list = jedis.lrange("scores", 0, -1);
//4.插队,44插入到100后面,注:没有什么rinsert()方法
jedis.linsert("scores",BinaryClient.LIST_POSITION.AFTER, "100", "44");
List<String> list3 = jedis.lrange("scores", 0, 3);
3.4. set类型操作
//1.存储
jedis.sadd("names", "Tom","Jack","Harry","Lucy","laowang");
//2.获取set中全部记录,取出来的跟存储的顺序不一样
Set<String> members = jedis.smembers("names");
//3.移除指定数据
jedis.srem("names","Tome","Jack");
//4.判断某值是否为set中成员
Boolean tom = jedis.sismember("names", "Tome");
jedis.sadd("set1", "a","b","c","d");
jedis.sadd("set2", "b","c","d","e");
//1.差集 set1中有set2中没有的
Set<String> sdiff = jedis.sdiff("set1","set2");
//2.交集
Set<String> sinter = jedis.sinter("set1","set2");
//3.并集
Set<String> sunion = jedis.sunion("set1","set2");
3.5. zset类型操作
//1.添加
jedis.zadd("table", 1, "a");
jedis.zadd("table", 3, "b");
jedis.zadd("table", 2, "c");
jedis.zadd("table", 5, "d");
jedis.zadd("table", 4, "e");
//2.取值 0 -1表示取所有,可以自己指定开始结束位置,跟list一样
//默认根据分数由低到高排
Set<String> table = jedis.zrange("table", 0, -1);
//3.排序,由高到低排
Set<String> table2 = jedis.zrevrange("table", 0, -1);
//4.修改某个值的分数
jedis.zincrby("table", 7, "a");
4. 使用Lettuce操作redis
第三章介绍的Jedis作为redis客户端实例存在一个问题,即不是线程安全的,不能多线程共享一个Jedis,当然我们也可以使用连接池。但是Lettuce则是线程安全的,而且支持异步操作(基于netty),现在spring boot中redis客户端的集成就是Lettuce。但是Lettuce的坑也不少,之前就导致了redis脏数据的问题弄了一个多月,使用Jedis还是Lettuce还是需要考虑清楚。项目使用的是Lettuce,所以这部分介绍会更详细一些。
4.1. 单机Lettuce操作5种数据结构
导入lettuce和单元测试依赖
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>5.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13-beta-3</version>
<scope>compile</scope>
</dependency>
api和Jedis基本是一样的(或者说更redis命令几乎一样),就不再多说直接看示例。
public class RedisApiDemo {
private RedisURI redisURI;
private RedisClient redisClient;
private StatefulRedisConnection<String, String> connect;
private RedisCommands<String, String> redisCommands;
@Before
public void init() {
redisURI = RedisURI.builder()
.withHost("192.168.19.129")
.withPort(6379).build();
redisClient = RedisClient.create(redisURI);
connect = redisClient.connect(); // 还可以传入codec
redisCommands = connect.sync();
}
@After
public void destroy() {
connect.close();
redisClient.shutdown();
}
@Test
public void stringApi(){
String ok = redisCommands.set("mysql", "3306"); // OK
String ans = redisCommands.get("mysql"); // 3306
HashMap<String, String> map = Maps.newHashMap();
map.put("play", "9001");
redisCommands.mset(map); // 批量插入 如果需要同时插入/获取多个kv,尽量使用该操作来减少多次操作带来的额外网络开销
List<KeyValue<String, String>> keyValues = redisCommands.mget("mysql", "play");
Long counter = redisCommands.incr("counter"); // 1
Long num = redisCommands.incrby("num", 2); // 2, 步长是2,相应的还有decr,decrby
Long del = redisCommands.del("mysql", "num"); // 2 删除的key个数
Long exists = redisCommands.exists("mysql"); // 0
String redis = redisCommands.setex("redis", 5, "6379"); // 过期时间5秒
}
@Test
public void hashApi(){
Boolean tom = redisCommands.hset("height", "tom", "180");
Boolean jack = redisCommands.hset("height", "jack", "170");
String tomAns = redisCommands.hget("height", "tom");
Map<String, String> height = redisCommands.hgetall("height");
Boolean hexists = redisCommands.hexists("height", "tom");
Long hdel = redisCommands.hdel("height", "tom", "jack");
}
@Test
public void listApi(){
redisCommands.del("list");
Long r1 = redisCommands.rpush("list", "a", "b", "c");
Long r2 = redisCommands.lpush("list", "1", "2", "3");
List<String> list = redisCommands.lrange("list", 0, -1); // [3, 2, 1, a, b, c]
String lp = redisCommands.lpop("list"); // 3
}
@Test
public void setApi(){
redisCommands.del("set");
Long addNum = redisCommands.sadd("set", "a", "b", "c", "a", "c"); // 3
Set<String> set = redisCommands.smembers("set"); // [a, c, b]
// 排序集合
Long zaddNum = redisCommands.zadd("zset", 3.0, "a", 2.0, "b", 4.0, "c", 1.0, "d");
// [ScoredValue[1.000000, d], ScoredValue[2.000000, b], ScoredValue[3.000000, a], ScoredValue[4.000000, c]]
List<ScoredValue<String>> zset1 = redisCommands.zrangeWithScores("zset", 0, -1);
}
}
4.2. 异步操作
connect.async()
便可以活动异步的Commands
, 之后的操作都会返回RedisFuture
,该类继承了Future<T>
,所以Future
的方法在这里都可以去使用,比如阻塞操作get
。
public class RedisAsyncDemo {
private static RedisURI redisURI = RedisURI.builder()
.withHost("192.168.19.129")
.withPort(6379)
.build();
private static StatefulRedisConnection<String, String> connect = RedisClient.create(redisURI).connect();
private static RedisAsyncCommands<String, String> asyncCommands = connect.async(); // 获得异步api
public static void main(String[] args) throws ExecutionException, InterruptedException {
RedisFuture<String> future = asyncCommands.set("play", "9000");
String ans = future.get(); // OK
RedisFuture<String> future2 = asyncCommands.get("play");
String ans2 = future2.get(); // 9000
}
}
4.3. RedisCodec介绍
注意到上面创建连接的代码
StatefulRedisConnection<String, String> connect = redisClient.connect();
直接调用了connect
方法,得到的kv类型就是(String, String)
,这是因为默认指定了编解码器,这个待会说。除了指定参数为string类型,Lettuce还提供了字节数组的参数,创建连接的时候传入字节数组的编解码器即可,使用如下方式创建连接。
StatefulRedisConnection<byte[], byte[]> connect = redisClient.connect(new ByteArrayCodec());
其实如果没有指定编解码器的时候,默认是使用String类型的编解码器, 截取了部分源代码如下
public StatefulRedisConnection<String, String> connect() {
return connect(newStringStringCodec());
}
protected RedisCodec<String, String> newStringStringCodec() {
return StringCodec.UTF8;
}
public class StringCodec implements RedisCodec<String, String>, ToByteBufEncoder<String, String> {
public static final StringCodec UTF8 = new StringCodec(LettuceCharsets.UTF8);
所以使用无参的connect的时候等价于connect(StringCodec.UTF8)
,该类继承自RedisCodec<String, String>
。 接口RedisCodec
主要的实现类就是StringCodec,ByteArrayCodec
, 还有个实现类Utf8StringCodec
基本可以使用StringCodec
替代,看实现就知道了。
public class Utf8StringCodec extends StringCodec implements RedisCodec<String, String> {
/**
* Initialize a new instance that encodes and decodes strings using the UTF-8 charset;
*/
public Utf8StringCodec() {
super(LettuceCharsets.UTF8);
}
}
使用StatefulRedisConnection<String, String> connect
那么操作的键和值都需要是string类型,显然如果使用了StatefulRedisConnection<byte[], byte[]> connect
则键和值都需要是byte[],像下面这样。
RedisCommands<byte[], byte[]> commands = connect.sync();
String ok = commands.set("hello".getBytes(Charset.forName("UTF-8")), "world".getBytes(Charset.forName("UTF-8")));
byte[] bytes = commands.get("hello".getBytes(Charset.forName("UTF-8")));
System.out.println(new String(bytes,Charset.forName("UTF-8"))); // world
4.4. 使用Thrift进行编解码
4.3介绍了kv为byte[]类型的时候,可以将string转换为byte[], 但是传递的是java对象呢?Lettuce并没有提供序列号的工具,也就是说开发者需要自己引入序列化工具。比如,json(可以理解为还是字符串),谷歌的Protobuf
,Facebook的Thrift
等。本节就使用Thrift作为序列化工具。
关于Thrift
的介绍可以参考 : Thrift快速入门
导入依赖:
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.10.0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>20.0</version>
</dependency>
这里顺便导入了谷歌的guava
包,里面有很多实用的工具类。
编写IDL文件data.thrift
, 并使用工具生陈java文件Person.java,生成的Person
类中是具有序列化和反序列方法的。
data.thrift:
namespace java thrift.generated
typedef i16 short
typedef i32 int
typedef i64 long
typedef bool boolean
typedef string String
struct Person {
1: optional String username,
2: optional int age,
3: optional boolean married
}
Person.java
public class Person implements org.apache.thrift.TBase<Person, Person._Fields>, java.io.Serializable, Cloneable, Comparable<Person> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("Person");
... 内容过多
最后来看如果进行序列化和反序列化,以及lettuce操作
public class RedisSerializableDemo {
public static <T extends TBase> byte[] writeThriftObject(T thrift) throws Exception {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
TTransport transport = new TIOStreamTransport(outputStream);
TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport);
thrift.write(binaryProtocol);
byte[] bytes = outputStream.toByteArray();
return bytes;
// ThriftCodecManager codecManager = new ThriftCodecManager();
// codecManager.write(thrift, outputStream, new TBinaryProtocol.Factory());
// return outputStream.toByteArray();
}
public static <T extends TBase> T readThriftObject(Class<T> clazz, byte[] bytes) throws Exception {
ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
TTransport transport = new TIOStreamTransport(inputStream);
TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport);
T instance = clazz.newInstance();
instance.read(binaryProtocol);
return instance;
// ThriftCodecManager codecManager = new ThriftCodecManager();
// return codecManager.read(bytes, clazz, new TBinaryProtocol.Factory());
}
public static void main(String[] args) throws Exception {
RedisURI uri = RedisURI.builder()
.withHost("192.168.19.129")
.withPort(6379)
.build();
RedisClient redisClient = RedisClient.create(uri);
// 不传则默认是 new StringCodec(LettuceCharsets.UTF8), UTF8 = Charset.forName("UTF-8"); 使用常量StringCodec.UTF8即可
// ByteArrayCodec提供了常量ByteArrayCodec.INSTANCE
StatefulRedisConnection<byte[], byte[]> connect = redisClient.connect(new ByteArrayCodec());
RedisCommands<byte[], byte[]> commands = connect.sync();
Person tom = new Person().setAge(18).setUsername("tom").setMarried(true);
byte[] buffer = writeThriftObject(tom);
commands.del("tom".getBytes(Charset.forName("UTF-8")));
String ok2 = commands.set("tom".getBytes(Charset.forName("UTF-8")), buffer);
byte[] bytes2 = commands.get("tom".getBytes(Charset.forName("UTF-8")));
Person person = readThriftObject(Person.class, bytes2); // Person(username:tom, age:18, married:true)
}
}
4.5. 自定义RedisCodec
使用ObjectInputStream,ObjectoutputStream
可以将对象写出和读入,前提是该对象实现了接口Serializable
, 基于此我们可以自定义编解码器来进行对象的序列化和反序列化。
如何实现可以参考StringCodec
,但是我们会设计的简单的多, 实现RedisCodec<K,V>
接口即可,主要是4个方法,分别是对K,V的编解码。其中K使用string类型,V则是泛型(对象类型)。
public class SerializedTCodec<T> implements RedisCodec<String, T> {
private Charset charset = Charset.forName("UTF-8");
@Override
public String decodeKey(ByteBuffer bytes) {
return charset.decode(bytes).toString();
}
@Override
public T decodeValue(ByteBuffer bytes) {
try {
byte[] array = new byte[bytes.remaining()];
bytes.get(array);
ObjectInputStream is = new ObjectInputStream(new ByteArrayInputStream(array));
return (T)is.readObject();
} catch (Exception e) {
return null;
}
}
@Override
public ByteBuffer encodeKey(String key) { // str -> bytes -> byteBuffer
return ByteBuffer.wrap(key.getBytes(charset));
}
@Override
public ByteBuffer encodeValue(T value) {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
ObjectOutputStream os = new ObjectOutputStream(bytes);
os.writeObject(value);
return ByteBuffer.wrap(bytes.toByteArray());
} catch (IOException e) {
return null;
}
}
}
来测试以下是否生效了
public class TestSerializedTCodec {
public static void main(String[] args) {
RedisURI uri = RedisURI.builder()
.withHost("192.168.19.129")
.withPort(6379)
.build();
// 注意这里,传入自定义的SerializedTCodec, V的泛型为Person
RedisCommands<String, Person> commands = RedisClient.create(uri).connect(new SerializedTCodec<Person>()).sync();
// Person实现了`Serializable`
Person tom = new Person().setAge(18).setUsername("jack").setMarried(true);
String ok = commands.set("tank", tom);
Person tank = commands.get("tank"); // Person(username:jack, age:18, married:true)
}
}
命令行查看
127.0.0.1:6379> get tank
"\xac\xed\x00\x05sr\x00\"com.scu.cache.redis.example.Persont\x86a\x04\xdb\xd5L\xe9\x03\x00\x04B\x00\x10__isset
4.6. RedisCluster
关于RedisCluster内容不是这里需要讲的,主要说明一些Lettuce为RedisCluster专门提供的API。
以下配置在生产环境下使用是OK的。
RedisURI node1 = RedisURI.create("192.168.19.129", 6379);
RedisURI node2 = RedisURI.create("192.168.19.128", 6379);
RedisURI node3 = RedisURI.create("192.168.19.127", 6379);
RedisClusterClient redisClusterClient = RedisClusterClient.create(Lists.newArrayList(node1, node2, node3));
// 拓扑刷新相关options
ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
.enablePeriodicRefresh(Duration.ofMinutes(10)) // 指定阶段性刷新间隔,默认60秒
.enableAllAdaptiveRefreshTriggers() // 使用所有触发器的拓扑自适应刷新MOVED_REDIRECT,ASK_REDIRECT,PERSISTENT_RECONNECTS,UNKNOWN_NODE
.adaptiveRefreshTriggersTimeout(Duration.ofMinutes(10)) // 自适应拓扑刷新的超时时间
.build();
// cluster客户端相关options
ClusterClientOptions clientOptions = ClusterClientOptions.builder()
.topologyRefreshOptions(topologyRefreshOptions)
.build();
redisClusterClient.setOptions(clientOptions);
StatefulRedisClusterConnection<String, String> redisClusterConnection = redisClusterClient.connect(StringCodec.UTF8);
RedisAdvancedClusterAsyncCommands<String, String> clusterAsyncCommands = redisClusterConnection.async();
// 像使用单机commands使用一样使用clusterAsyncCommands
redisClusterClient.shutdown();
5. Redis经典使用场景实践
5.1. 缓存
Redis本身就是缓存数据库,全内存操作,单线程qps达到10W自然是做缓存的首选。比如正在做的app, 首页流量肯定是最高的,所以在首页会大量使用缓存,避免每次去数据库查询,从而减轻数据库服务器压力,提高响应速度。
常规操作为: 访问首页, 去redis查询,如果有直接返回内容:如果没有,去数据库查询,将查询结果放入redis(根据需求看是否需要设置过期时间),最后返回查询结果。
下面写个简单的伪代码:
public class UserService {
...
public User findUserById(Long id) {
// 查询缓存
String redisKey = "user:info" + id;
String redisVal = redisDao.findUserById(redisKey);
User user = dec(redisVal); // String转User
if(StringUtils.isBlank(user)) {
// 缓存没有,查db
user = mySQLDao.findUserById(id);
// 存入缓存, 设置过期时间1h
if(StringUtils.isBlank(user))
redisDao.insertUser(redisKey,enc(user),3600);
}
return user;
}
}
5.2. 缓存穿透问题解决
5.1中缓存的使用方式是最常见的,我们的也是这样使用的,不过我们的app日活只有600多万,如果设计到的缓存操作都这样使用其实还是有点小问题的。首先就是可能存在缓存传统问题,即在查询数据库的时候user = mySQLDao.findUserById(id);
,可没有查到信息,导致所有的请求都会去数据库查询。
举个实际点的例子,过节的时候,在app一些板块的按钮,图标等可能会变的很靓,也就是后台配置了一些icon,native或者h5来给这些图标按钮装饰上去。 假设就装饰在首页,那么每次都会去获取首页活动信息,在有活动的时候使用5.1的方法是没问题的,但是活动是具有时效性的,比如活动时间过了,那么自然就从mysql中查询不到了,最终会导致每次都去数据库查询,导致出现缓存穿透现象。
解决方案:mysql没有查询到数据的时候,缓存默认值。
public ActivityModel fetchActivityById(Long id, String env) {
String redisKey = "activity:id:" + id + ":env:" + env;
String model = redisDao.fetchActivityById(redisKey);
if(StringUtils.isBlank(model)) {
ActivityModel activity = mySQLDao.fetchActivityById(id,env);
if(activity == null) {
redis.insertActivity(redisKey, "none", 3600); // mysql中没有数据,缓存默认值none
}else {
redis.insertActivity(redisKey, enc(activity), 3600);
}
return activity;
}
if(StringUtils.equals(model, "none")) {
return null;
}
return dec(model); // string -> ActivityModel
}
5.3. 缓存一致性问题解决
缓存和数据库内容不一致问题是必须要考虑到的,否则用户得到的只会是老数据(如果没有添加到缓存的时候没有设置过期时间,或者设置的过长那影响会更大)。解决这类问题简单点可以在添加或者更新数据库中数据的时候就修改缓存中的数据。或者麻烦点就使用消息队列,推送活动的时候发送个消息(服务1),服务2监听到后消费消息,进行缓存更新。
第一种方式: 更新数据库中数据的时候同时更新缓存
public void insertActivity(ActivityModel model) {
mySQLDao.insertActivity(model);
String redisKey = "activity:id:" + model.getId() + ":env:" + model.getEnv();
redisDao.insertActivity(redisKey, enc(model), 3600)
}
第二种方式:使用消息队列
参考: 消息队列的使用
5.4. 动手实现Redis分布式锁
还是考虑5.1的问题, 设置的缓存时间为1小时,如果在该时间点失效,仍然有大量请求会到mysql数据库,依旧可能导致穿透(当然是dau很高的app才考虑了,我们的项目完全不担心这个问题),那么可以使用分布式锁来解决这个问题,保证只有一个请求到数据库获取最新数据,放入缓存,其他请求仍然从刚才更新的缓存中获取数据。
5.4.1. 实现思路
记得之前说过一个命令setnx
,setnx
类似于set,但是当key已经存在的时候,set会进行value的覆盖(即修改),而setnx则会失败。 另外,由于redis是单线程的,所以多个客户端发送setnx
命令的时候,只会排队依次去执行,但第一个执行成功后,后面的肯定都会失败。基于此我们来实现自己的Redis分布式锁。
我们已经知道setnx lock value
可以进行加锁,即多个客户端同时执行这个命令的时候,只有一个会成功,那么认为这个客户端拿到了锁,接着执行业务代码;那么释放锁也就是delete lock
,特殊地考虑到万一业务代码执行时间过长,导致锁得不到释放,这个时候我们可以设置过期时间expire lock timeout
, 将获得锁和设置过期时间合为一个命令就是set lock value ex 50 nx
(如果直接使用两个命令那么不具有原子性, 当然我们可以使用lua来实现原子性操作)。对应的api在Lettuce
中也是提供了的。
超时问题:Redis 的分布式锁不能解决超时问题,如果在加锁和释放锁之间的逻辑执行的太长,以至于超出了锁的超时限制,就会出现问题。因为这时候锁过期了,第二个线程重新持有了这把锁,但是紧接着第一个线程执行完了业务逻辑,就把锁给释放了,第三个线程就会在第二个线程逻辑执行完之间拿到了锁。
#A命令行:
192.168.247.128:6379> set m true ex 5 nx
OK
#5秒过后去B命令行执行
192.168.247.128:6379> set m true ex 50 nx
OK
#然后此时在A命令行,按照前面写的逻辑,应该del m了
192.168.247.128:6379> del m
(integer) 1
发现del成功了(如果没有B命令行的操作则是0),所以释放掉了B的锁。为了避免这个问题,Redis 分布式锁不要用于较长时间的任务
但是还是有解决方案:为 set 指令的 value 参数设置为一个随机数,释放锁时先匹配随机数是否一致,然后再删除 key,这是为了确保当前线程占有的锁不会被其它线程释放,除非这个锁是过期了被服务器自动释放的。 但是匹配 value 和删除 key 不是一个原子操作,Redis 也没有提供类似于delifequals这样的指令,这就需要使用 Lua 脚本来处理了,因为 Lua 脚本可以保证连续多个指令的原子性执行。所以接下来像对lua有个基本的了解
5.4.2. Lua入门
首先是安装,linux执行以下命令即可
wget -c http://www.lua.org/ftp/lua-5.3.5.tar.gz
tar -zxvf lua-5.3.5.tar.gz
yum -y install readline-devel ncurses-devel
cd lua-5.3.5
make linux
make install
控制台输入Lua
则进入了Lua
的命令行交互界面
[root@monkey lua-5.3.5]# lua
Lua 5.3.5 Copyright (C) 1994-2018 Lua.org, PUC-Rio
> print("hello world")
> hello world
使用lua test.lua
则可以运行Lua
脚本test.lua
,下面对Lua
语法有个基本的认识
-- boolean, numbers, strings, tables(表格)类型
-- local表示局部变量,没有local表示全局变量
local strings val = "hello world"
print(val)
-- 数组: 使用tables lua的数组下表是从1开始而不是0
local tables arr = {"hello", 10, true}
print(arr[1])
-- 循环: for: 以end作为结尾, #nums 表示该数组长度; while同for
local tables nums = {1,2,3,4,5}
for i =1, #nums
do
print(i)
end
-- 条件判断: if else, if后紧跟then
-- 当数组元素=="hello"的时候打印true并退出
for i = 1, #arr
do
if arr[i] == "hello"
then
print(true)
break
else
-- do nothing
end
end
-- hash表,同样使用tables来获取hash相似的功能,写成k=v的形式
-- str..str2表示将两个字符串连接起来
local tables user = {name = "tom", age = 18}
print("user name:"..user["name"])
for k,v in pairs(user)
do
print(k..":"..v)
end
-- 函数定义: function开头, end结尾
function contact(s1,s2)
return s1..s2
end
print(contact("tom","jack"))
5.4.3. 在Redis中使用Lua
Redis中使用Lua主要有两种方法:
- eval: eval 脚本内容 key 个数 key列表 参数列表
- evalsha:
这里只介绍eval:
127.0.0.1:6379> eval 'return "hello"..KEYS[1]..ARGV[1]' 1 safe world
"hellosafeworld"
Lua可以使用redis.call
函数来实现对redis的访问,比如下面的代码就是Lua使用redis.call
调用了Redis
的get,set
方法。
127.0.0.1:6379> eval 'return redis.call("set",KEYS[1],ARGV[1])' 1 hello world
OK
127.0.0.1:6379> get hello
"world"
127.0.0.1:6379> eval 'return redis.call("get",KEYS[1])' 1 hello
"world"
5.4.4. 代码实现
加锁用到的方法: String set(K key, V value, SetArgs setArgs);
释放锁用到的方法:
`<T> T eval(String script, ScriptOutputType type, K[] keys, V... values);`// 带k,v
<T> T eval(String script, ScriptOutputType type, K... keys); // 只带k
来看下SetArgs 类,用来指定是否使用setnx,以及设置过期时间
public class SetArgs implements CompositeArgument {
private Long ex; // 过期时间,单位s
private Long px; // 过期时间,单位ms
private boolean nx = false; // 是否使用nx(为set方法的参数,有nx,ex),相当于setnx
看枚举ScriptOutputType
,指定了脚本Script的返回类型
public enum ScriptOutputType {
BOOLEAN, INTEGER, MULTI, STATUS, VALUE
}
做个简单测试
public static void main(String[] args) {
SetArgs setArgs = new SetArgs().px(10000).nx(); // 使用setnx key存在则设置失败,设置过期时间10000ms ex为秒
String s = redisCommands.set("lock", "true", setArgs); // OK或者null(key存在设置失败的情况)
System.out.println(s); // true
String script = "if " +
"redis.call('get', KEYS[1]) == ARGV[1] " +
"then return redis.call('del', KEYS[1]) " +
"else return -1 " +
"end";
// 如果key=lock,value=goods 那么删除该key同时获取返回结果(del会返回1),否则返回-1
Long eval = (Long)redisCommands.eval(script, ScriptOutputType.INTEGER, new String[]{"lock"}, new String[]{"goods"});
System.out.println(eval); // -1
System.out.println(redisCommands.get("lock")); // true
String key = "hello";
String sc = "return redis.call('get',KEYS[1])";
Object eval1 = redisCommands.eval(sc, ScriptOutputType.VALUE, key);
System.out.println(eval1);
}
了解了上面的测试代码,分布式锁的实现也就很简单了
UserService.java
public class UserService {
private static final long timeout = 5000;
private RedisDao redisDao = new RedisDao();
private boolean getDistributedLock(String lock, String randId) {
return redisDao.getDistributedLock(lock, randId, timeout);
}
private boolean delDistributedLock(String lock, String randId) {
return redisDao.delDistributedLock(lock, randId);
}
public String getUser(String id) throws InterruptedException {
String user = redisDao.getCacheUser(id);
if (Strings.isNullOrEmpty(user)) {
String randId = String.valueOf(new Random().nextInt(Integer.MAX_VALUE));
String lockKey = "user_service:get_user:" + id;
// 获取锁
boolean lock = getDistributedLock(lockKey, randId);
if(lock) {
System.out.println("从数据库中查询数据");
user = "jack"; // 数据库中查询
delDistributedLock(lockKey, randId); // 删除锁
redisDao.setCacheUser(id, user); // 放入缓存
}else {
Thread.sleep(10);
user = getUser(id);
}
}
return user;
}
// 运行程序,只会输出一条从数据库中查询数据
public static void main(String[] args) throws InterruptedException {
ExecutorService threadPool = null;
try{
threadPool = Executors.newFixedThreadPool(30);
UserService userService = new UserService();
CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < 30; i ++) {
threadPool.execute(() ->{
try {
latch.await();
String ans = userService.getUser("10086");
System.out.println(ans);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
Thread.sleep(100);
latch.countDown();
}finally {
threadPool.shutdown();
}
}
}
UserDao.java
public class RedisDao {
private static RedisURI redisURI = RedisURI.builder()
.withHost("192.168.19.129")
.withPort(6379)
.build();
private static RedisClient client = RedisClient.create(redisURI);
private static RedisCommands<String, String> redisCommands = client.connect(StringCodec.UTF8).sync();
public String getCacheUser(String key){
return redisCommands.get(key);
}
public String setCacheUser(String key, String value){
return redisCommands.setex(key, 10, value);
}
public boolean getDistributedLock(String lock, String randId, long timeout) {
SetArgs setArgs = new SetArgs().px(timeout).nx(); // 使用setnx key存在则设置失败,设置过期时间10000ms ex为秒
String ok = redisCommands.set(lock, randId, setArgs); // OK或者null
System.out.println(ok);
return !Strings.isNullOrEmpty(ok);
}
public boolean delDistributedLock(String lock, String randId) {
String script = "if " +
"redis.call('get', KEYS[1]) == ARGV[1] " +
"then return redis.call('del', KEYS[1]) " +
"else return -1 " +
"end";
Long eval = (Long)redisCommands.eval(script, ScriptOutputType.INTEGER, new String[]{lock}, new String[]{randId});
return eval != -1;
}
}
5.4.5. 官方分布式锁解决方案Redisson
官方实现的分布式锁解决方案即为Redisson,以下根据官方例子写了个简单的使用。
导包:
<dependency>--><!--netty包冲突,导致服务连接redis失败-->
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.12.0</version>
</dependency
public class Redisson4Java {
public static void main(String[] args) {
// 1. Create config object
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.19.129:6379"); // 这里使用了单机,Sentinel,Cluster都OK
// 2. Create Redisson instance
// Sync and Async API
RedissonClient redissonClient = Redisson.create(config);
// 3. Get Redis based Map
RMap<Object, Object> map = redissonClient.getMap("myLock");
// 4. Get Redis based Lock
RLock lock = redissonClient.getLock("myLock"); // 和jdk的Lock一样去使用
// 4. Get Redis based ExecutorService
RScheduledExecutorService executorService = redissonClient.getExecutorService("myExecutorService");
try{
lock.lock();
// doSomething
}finally {
lock.unlock();
}
}
}
5.5. 计数器的使用
redis提供了自增incr和自减decr的功能,可以方便作为计数器来使用。比如配置了n条消息,根据用户打开次数来进行消息的轮播(一次一条)。
public News getNews(String userId) {
String redisKey = "user:user_id:" + userId;
int time = Integer.valueOf(redisDao.getAccessTime(redisKey)); // 获取访问次数
int size = newsList.size();
int index = time % size;
redisDao.incrAccessTime(redisKey); // 访问次数 + 1
return newsList.get(index);
}
另外踩藏也可以使用redis去实现,可以使用两个key, upKey,downKey
,点击赞的时候upKey
自增,踩的时候downKey
自增。 如果只显示赞的情况,那么使用一个key即可, 赞的时候自增,踩的时候自减。
5.6. 分布式Session
在服务器集群环境下,如果仍然使用传统意义上的session来存储用户信息,那么可能会出现要求用户多次登陆的情况。 所以通常单独搞个服务专门来存储验证用户登录情况,简单点我们单独搞个服务,其中可以使用redis来存储用户信息。具体的参考: 单点登陆。
5.7. 限速
很多应用出于安全考虑,要求用户登录的时候输入短信验证码,但是需要获取验证码的接口不被频繁访问,比如限制一分钟不能超过2次,可以考虑使用redis来做该限制
public boolean checkAccessPhone(String phone) {
String redisKey = "access:limit:phone_num:" + phone;
SetArgs setArgs = new SetArgs().ex(60).nx();
String s = redisCommands.set(redisKey, "1", setArgs);
if(s == null || redis.incr(redisKey) <= 2) return true; // 通过
return false; // 需要限速
}
5.8. 列表的使用
使用的并不是很多,但是基于列表api特性,可以当队列/栈/消息队列来使用。
lpush+rpop = 队列
lpush+lpop = 栈
lpush+brpop = MQ消息队列
注意:redis的list提供了阻塞操作,比如brpop=rpop+block,即rpop的阻塞操作。
5.9. 排行榜系统
有序集合ZSET的典型使用场景就是排行榜,假设射频网站,用户上传视频后,根据获得赞数进行排名。
127.0.0.1:6379> zadd user:ranking:20200130 3 tom # tom上传赞3的视频
(integer) 1
127.0.0.1:6379> zadd user:ranking:20200130 5 jack # jack上传赞5的视频
(integer) 1
127.0.0.1:6379> zadd user:ranking:20200130 8 lucy # lucy上传赞8的视频
(integer) 1
127.0.0.1:6379> ZREVRANGE user:ranking:20200130 0 -1 # 获取排名
1) "lucy"
2) "jack"
3) "tom"
127.0.0.1:6379> ZINCRBY user:ranking:20200130 3 tom # tom的视频获得了三个赞
"6"
127.0.0.1:6379> ZREVRANGE user:ranking:20200130 0 -1 # tom视频排行到了jack前面
1) "lucy"
2) "tom"
3) "jack"
127.0.0.1:6379> zrem user:ranking:20200130 tom # tom删除掉了视频
(integer) 1
127.0.0.1:6379> ZREVRANGE user:ranking:20200130 0 -1
1) "lucy"
2) "jack"
127.0.0.1:6379> ZSCORE user:ranking:20200130 jack # 获取jack分数
"5"
#获取排名
127.0.0.1:6379> ZRANK user:ranking:20200130 jack
(integer) 0
127.0.0.1:6379> ZRANK user:ranking:20200130 lucy
(integer) 1
# 获取用户jack信息,存储在hash中
hgetall user:info:jack
下面使用lettuce实现几个简单的功能
public class VideoRankingService {
private static RedisURI redisURI = RedisURI.builder()
.withHost("192.168.19.129")
.withPort(6379)
.build();
private static RedisClient client = RedisClient.create(redisURI);
private static RedisCommands<String, String> redisCommands = client.connect(StringCodec.UTF8).sync();
private static final String REDIS_VIDEO_RANK_KEY = "user:video:ranking" ;
class Video{
String id;
String userId;
byte[] contents = null;
}
public boolean uploadVideo(String userId, Video video) { //上传视频
// 视频处理略去
return redisCommands.zadd(REDIS_VIDEO_RANK_KEY,0.0, userId) > 0;
}
public void addPraise(String userId) { //点赞+1
redisCommands.zincrby(REDIS_VIDEO_RANK_KEY, 1.0, userId);
}
public List<String> getRank(){ // 获取排行榜,不带分数
return redisCommands.zrevrange(REDIS_VIDEO_RANK_KEY, 0, -1);
}
public List<ScoredValue<String>> getRankWithScores(){ // 获取排行榜,带分数
return redisCommands.zrevrangeWithScores(REDIS_VIDEO_RANK_KEY, 0, -1);
}
public Long getUserVideoRank(String userId) { // 获取排行,数字越大越靠前,0最后
return redisCommands.zrank(REDIS_VIDEO_RANK_KEY, userId);
}
public Double getUserVideoScore(String userId) { // 获取得分
return redisCommands.zscore(REDIS_VIDEO_RANK_KEY, userId);
}
}
5.10. Redis的监听功能在订单系统中的使用
背景:用户点击付款,发送请求,后台接收到请求后,生成订单信息和商品销售信息,保存到数据库表中。同时把订单信息存入到redis中,key可以设为订单编号,同时设置过期时间。到了过期时间后,redis监听器监听到了过期的key,取出该key查询数据库订单表,如果发现支付状态不是成功(用户为付款,需要使订单失效),那么修改支付状态为失败(也就是用户下单后一直不付款,到了一定时间后,那么就应该让这个订单作废。如果用户付款了,在支付宝回调的接口里面会将支付状态修改为成功)。
参考: Redis的监听功能在订单系统中的使用