后端研发Redis必知必会

2023-11-03

​ 本篇内容完全偏向于实践,也是后端开发常用到的知识,关于Redis原理与概念性的内容会另起一篇来说明。可以根据目录来选择自己所需要的内容来阅读。

文章目录

1. redis安装

​ 本章会介绍redis在ubuntu和centos上的安装, 建议使用centos上的源码安装方式,ubuntu上的安装不好确定版本,安装路径都不受自己控制,比如安装好了之后使用客户端连接出现问题解决都会麻烦一些。

1.1. ubuntu上安装redis

​ ubuntu安装redis直接就是sudo apt-get install redis-server服务是自启动的(不需要使用redis-server来进行服务启动),查看下服务是否启动了ps -aux|grep redis; 使用客户端访问redis服务端,命令直接输入redis-cli

[root@monkey ~]# ps -aux|grep redis
root       3974  0.0  0.0 140576  2356 ?        Ssl  Jan28   1:05 redis-server *:6379     
root      11381  0.0  0.0 103328   856 pts/0    S+   03:55   0:00 grep redis
[root@monkey ~]# redis-cli
127.0.0.1:6379> 

1.2. centos上安装redis

​ centos上使用源码方式来安装,按照以下命令即可。

1) wget http://download.redis.io/releases/redis-4.0.6.tar.gz
2) tar -zxvf redis-4.0.6.tar.gz
3) ln -s redis-cli redis-4.0.6 redis  
4) make 
5) make install 
  1. 下载安装包redis-4.0.6.tar.gz
  2. 解压安装包;
  3. 建立软链接指向解压后的redis目录``redis-4.0.6`,这一步可以忽略;
  4. 进行编译。redis是c语言编写的,需要有编译环境,即安装了gcc,如果没有则执行命令yum install gcc; 如果此时运行make命令报错:error: jemalloc/jemalloc.h: No such file or directory, 那么给make命令添加参数,使用命令make MALLOC=libc即可;
  5. 将redis相关运行文件放到/usr/local/bin下,那么可以再任意目录下去运行这些redis相关命令,而不需要进入安装目录;

1.3. redis的启动与停止

​ 安装完成后查看目录/usr/local/bin,里面存放了redis相关命令

-rwxr-xr-x. 1 root root  290446 1月  28 05:36 redis-benchmark #redis基准测试工具
-rwxr-xr-x. 1 root root 2977594 1月  28 05:36 redis-check-aof # redis aof 持久化文件检测修复工具
-rwxr-xr-x. 1 root root 2977594 1月  28 05:36 redis-check-rdb # redis rdb 持久化文件检测修复
-rwxr-xr-x. 1 root root  428967 1月  28 05:36 redis-cli # redis命令行客户端
lrwxrwxrwx. 1 root root      12 1月  28 05:36 redis-sentinel -> redis-server # 启动redis sentinel
-rwxr-xr-x. 1 root root 2977594 1月  28 05:36 redis-server # 启动redis

​ 首先需要启动redis服务,直接使用命令redis-server即可,该命令会使用redis安装包下的默认的redis-conf配置文件,通常我们可能会将该文复制到某个位置(比如/usr/local/redis-conf),修改该配置文件的内容,然后启动的时候指定该配置文件位置即可,如redis-server /usr/local/redis-conf来进行指定配置文件进行启动。

​ 接着我们可以启动命令行客户端来进行redis的访问,redis-cli -h ip地址 -p 端口号,默认ip为localhost,端口为6379,所以本地玩的时候直接使用redis-cli即可。

​ 需要注意的是服务端启动后,不能退出,必须使用 ctrl+c但是这样就推出了服务,这显然不够人性化,可以进行配置文件redis.conf的修改, daemonize no->daemonize yes`,使得redis进程作为守护进程运行在后台。

################################# GENERAL #####################################

# By default Redis does not run as a daemon. Use 'yes' if you need it.
# Note that Redis will write a pid file in /var/run/redis.pid when daemonized.
daemonize yes

​ 再次运行服务, 可以看到命令执行完后又回到了命令行,服务一直在后端运行着。

[root@monkey redis]# redis-server redis.conf 
11493:C 29 Jan 04:23:40.967 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
11493:C 29 Jan 04:23:40.967 # Redis version=4.0.6, bits=64, commit=00000000, modified=0, pid=11493, just started
11493:C 29 Jan 04:23:40.967 # Configuration loaded
[root@monkey redis]# ps -aux|grep redis
root       3974  0.0  0.0 140576  2356 ?        Ssl  Jan28   1:06 redis-server *:6379   
root      11502  0.0  0.0 103328   856 pts/0    S+   04:25   0:00 grep redis

​ 如果要停止redis服务直接命令行运行redis-cli shutdown

[root@monkey redis]# ps -aux|grep redis
root       3974  0.0  0.0 140576  2356 ?        Ssl  Jan28   1:06 redis-server *:6379  
root      11502  0.0  0.0 103328   856 pts/0    S+   04:25   0:00 grep redis
[root@monkey redis]# redis-cli shutdown
[root@monkey redis]# ps -aux|grep redis
Warning: bad syntax, perhaps a bogus '-'? See /usr/share/doc/procps-3.2.8/FAQ
root      11509  0.0  0.0 103328   856 pts/0    S+   04:28   0:00 grep redis

2. redis常用命令必知必会

​ 本章会对redis的五种数据结构(类型)string, hash,list,set,zset的常用命令分别进行介绍,也会对一些也许用的不是那么频繁,但是在特定场合很实用的命令进行介绍。为了看的更直观,这里就直接展示操作内容。

​ 需要明确一点,redis中的5种数据结构是针对value的,redis中的键全为字符串类型

2.1. string

​ 字符串类型是5种数据结构中最为常用的,为其他4种数据结构的基础。可以为简单字符串hello world, 复杂字符串json, 数字(int, double),甚至是二进制等。

存储: set key value
取值: get key
删除: del key, 对于其他的几种数据结构都适用
查看所有键: keys *
示例:

192.168.25.128:6379> set str1 123
OK
192.168.25.128:6379> set str2 abc
OK
192.168.25.128:6379> set str3 xixi
OK
192.168.25.128:6379> get str1
"123"
192.168.25.128:6379> get str2
"abc"
192.168.25.128:6379> get str3
"xixi"
192.168.25.128:6379> del str1
(integer) 1
192.168.25.128:6379> keys *
 1) "str2"
 2) "str3"

增1:incr key
减1:decr key
注意, 虽然redis存储形式都是字符串,但是自增减的时候要求value必须能解析成数值类型,比如你的value是”1ad”那就不行。

示例:

192.168.25.128:6379> set str1 3
OK
192.168.25.128:6379> incr str1
(integer) 4

2.2. hash

相当于一个key对应一个map,map中还有key-value
存储:hset key field value
取值: hget key field
查看某个键对应的map里面的所有key: hkeys key
查看某个键对应的map里面的所有的value: hvals key
查看某个键的map: hgetall key

删除: 删除hash中的某个kv,hdel hone field1

示例:

192.168.25.128:6379> hset hone field1 123
(integer) 1
192.168.25.128:6379> hset hone field2 abc
(integer) 1
192.168.25.128:6379> hset hone field3 haha
(integer) 1
192.168.25.128:6379> hget hone field1
"123"
192.168.25.128:6379> hget hone field2
"abc"
192.168.25.128:6379> hget hone field3
"haha"
192.168.25.128:6379> hkeys hone
1) "field1"
2) "field2"
3) "field3"
192.168.25.128:6379> hvals hone
1) "123"
2) "abc"
3) "haha"
192.168.25.128:6379> hgetall hone
1) "field1"
2) "123"
3) "field2"
4) "abc"
5) "field3"
6) "haha"

2.3. list

存储:push,分为lpush list v1 v2 v3 v4 …(左边添加),rpush list v1 v2 v3 v4 …(右边添加)
取值:pop,分为lpop list(左边取,移除list最左边的值) , rpop list(右边取,移除list最右边的值)
查看list: lrange key 0 -1(0 -1表示查看所有)
存储,取值操作跟栈的存储,取值方法是一样的,而不是add,get,存储的值有序可以重复。用pop取值取完后该值就从list中移除了。

示例:

192.168.25.128:6379> lpush list1 1 2 3 4 5 6
(integer) 6
192.168.25.128:6379> rpush list1 a b c d e 
(integer) 11
192.168.25.128:6379> lrange list1 0 -1
 1) "6"
 2) "5"
 3) "4"
 4) "3"
 5) "2"
 6) "1"
 7) "a"
 8) "b"
 9) "c"
10) "d"
11) "e"
192.168.25.128:6379> lpop list1
"6"
192.168.25.128:6379> lrange list1 0 -1
 1) "5"
 2) "4"
 3) "3"
 4) "2"
 5) "1"
 6) "a"
 7) "b"
 8) "c"
 9) "d"
10) "e"
192.168.25.128:6379> rpop list1
"e"
192.168.25.128:6379> lrange list1 0 -1
1) "5"
2) "4"
3) "3"
4) "2"
5) "1"
6) "a"
7) "b"
8) "c"
9) "d"

2.4. set

set中的元素是无序不重复的,出现重复会覆盖
存储:sadd key v1 v2 v3 …
移除:srem key v
查看set集合: smembers key
另外还提供了差集,交集,并集操作
差集:sdiff seta setb(seta中有setb中没有的元素)
交集:sinter seta setb
并集:sunion seta setb

示例:

192.168.25.128:6379> sadd set a b a b c d
(integer) 4
192.168.25.128:6379> srem set a
(integer) 1
192.168.25.128:6379> smembers set
1) "d"
2) "b"
3) "c"
192.168.25.128:6379> sadd seta a b c d e
(integer) 5
192.168.25.128:6379> sadd setb c d e f g 
(integer) 5
192.168.25.128:6379> sdiff seta setb
1) "a"
2) "b"
192.168.25.128:6379> sdiff setb seta 
1) "g"
2) "f"
192.168.25.128:6379> sinter seta setb
1) "d"
2) "e"
3) "c"
192.168.25.128:6379> sunion seta setb
1) "a"
2) "b"
3) "d"
4) "g"
5) "f"
6) "e"
7) "c"

2.5. zset

zset或者称sortedSet,有序集合。

存储:存储的时候要求对set进行排序,需要对存储的每个value值进行打分,默认排序是分数由低到高。zadd key 分数1 v1 分数2 v2 分数3 v3…
取值:取出指定的值 zrem key value
取所有的值(不包括分数):zrange key 0 -1,降序取值用zrevrange key 0 -1
取所有的值(带分数): zrange(zrevrange) key 0 -1 withscores

192.168.25.128:6379> zadd zset1 1 a 3 b 2 c 5 d(要求给定分数,从而达到排序效果,默认升序)
(integer) 4
192.168.25.128:6379> zrange zset1 0 -1
1) "a"
2) "c"
3) "b"
4) "d"
192.168.25.128:6379> zrem zset1 a 
(integer) 1
192.168.25.128:6379> zrange zset1 0 -1
1) "c"
2) "b"
3) "d"
192.168.25.128:6379> zrevrange zset1 0 -1(按分数降序排)
1) "d"
2) "b"
3) "c"
192.168.25.128:6379> zrevrange zset1 0 -1 withscores
1) "d"
2) "5"
3) "b"
4) "3"
5) "c"
6) "2"

2.6. key命令

由于redis是内存存储数据,所以不能够存储过大的数据量,所以存储在redis中的数据,在不再需要的时候应该清除掉。比如,用户买完东西下订单,生成的订单信息存储了在redis中,但是用户一直没支付那么存储在redis中的订单信息就应该清除掉,这个时候就可以通过设置redis的过期时间来完成,一旦达到了过期时间就清除该信息。
设置key的过期时间:expired key 过期时间(秒)
查看key的有效剩余时间:ttl key
清除key的过期时间,持久化该key:persist key
-1:表示持久化
-2: 表示该key不存在

示例:

192.168.25.128:6379> expire zone 60
(integer) 1
192.168.25.128:6379> ttl zone
(integer) 55
192.168.25.128:6379> ttl zone
(integer) 51
192.168.25.128:6379> ttl zone
(integer) 48
192.168.25.128:6379> ttl zone
(integer) 37
192.168.25.128:6379> ttl zone
(integer) 16
192.168.25.128:6379> ttl zone
(integer) -2  -->(该key已经不存在)
192.168.25.128:6379> expire sone 30
(integer) 1
192.168.25.128:6379> ttl sone
(integer) 22
192.168.25.128:6379> persist sone
(integer) 1
192.168.25.128:6379> ttl sone
(integer) -1  -->(该key是持久化的)

setex和setnx

当然更多的时候是指定kv的同时就设置了过期时间,使用setex key expiretime value;

setnx类似于set,但是当key已经存在的时候,set会进行value的覆盖(即修改);而setnx则会失败,利用这个特性可以自己设计一个分布式锁,这将在后续进行介绍。

127.0.0.1:6379> SETEX b 5 m
OK
127.0.0.1:6379> ttl b
(integer) 2
127.0.0.1:6379> ttl b
(integer) 1
127.0.0.1:6379> ttl b
(integer) -2
127.0.0.1:6379> set h world
OK
127.0.0.1:6379> set h WORLD
OK
127.0.0.1:6379> get h
"WORLD"
127.0.0.1:6379> setnx h w
(integer) 0
127.0.0.1:6379> setnx tom cat
(integer) 1

3. 使用Jedis操作redis

导入Jedis依赖

<dependency>
   <groupId>redis.clients</groupId>
   <artifactId>jedis</artifactId>
   <version>2.8.0</version>
</dependency>

3.1. string类型操作

Jedis jedis = new Jedis("192.168.25.128",6379); // 创建连接
jedis.close(); //  关闭连接

// 1.存储,获取,设置过期时间,key命令; 
jedis.set("name", "张三");
String name = jedis.get("name");
Long t = jedis.ttl("name"); // 查看有效期,-1表示持久化,-2表示不存在
jedis.expire("name", 5);

//2. 测试自增自减:前提,value值能解析为数字类型; 删除
jedis.set("age", "18");
String age1 = jedis.get("age");
Long age2 = jedis.decr("age"); // 17
Long age3 = jedis.incr("age"); // 18
Long del = jedis.del("age1"); 

//3. 批量存储和获取
jedis.mset("a1","mysql","a2","oracle","a3","sqlServer","a4",
                "redis","a5","mongodb","a6","hbase");
List<String> list = jedis.mget("a1","a2","a3","a4","a5","a6");

//4. 存储值的同时设置过期时间,判断key是否存在
jedis.setex("life", 5, "享受美好");
while(jedis.exists("life")){
    System.out.println(jedis.get("life"));
    Thread.sleep(2000);
}

3.2. hash类型操作

//1.存储值
jedis.hset("student", "name", "小李");
jedis.hset("student", "class", "小学生");
jedis.hset("student", "age", "10");
jedis.hset("student", "skill", "keng");
//2.获取指定值  获取名字
String name = jedis.hget("student", "name");

//3.获取存储的map
Map<String, String> all = jedis.hgetAll("student");

//4.获取map中全部key
Set<String> keySet = jedis.hkeys("student");
for (String key : keySet) {
    System.out.println(key);
}

//5.获取map中全部values
List<String> list = jedis.hvals("student");
for (String value : list) {
    System.out.println(value);
}

//6.删除指定的值 删除map中class,name两对键值对
Long long1 = jedis.hdel("student", "class","name");
Set<String> set2 = jedis.hkeys("student");
for (String key : set2) {
    System.out.println(key);
}

//7.判断map是否存在
Boolean e = jedis.hexists("student", "class");

//8.自增自减,可以指定增加减少的数值
jedis.hincrBy("student", "age", 2);

3.3. list类型操作

//1.存储值(左边开始)。当成栈(子弹匣),先进先出,入栈
jedis.lpush("scores", "100","90","80","70","60");
//右边开始存
jedis.rpush("scores", "50","40","30","20","10");
//2.取值(左边开始),可以说是同时移除了该值,出栈
String lv = jedis.lpop("scores");
//右边开始取
String rv= jedis.rpop("scores");

//3.取所有值(只有左边开始取)0 -1表示取所有位置,位置是[start,end]
List<String> list = jedis.lrange("scores", 0, -1);

//4.插队,44插入到100后面,注:没有什么rinsert()方法
jedis.linsert("scores",BinaryClient.LIST_POSITION.AFTER, "100", "44");

List<String> list3 = jedis.lrange("scores", 0, 3);

3.4. set类型操作

//1.存储
jedis.sadd("names", "Tom","Jack","Harry","Lucy","laowang");

//2.获取set中全部记录,取出来的跟存储的顺序不一样
Set<String> members = jedis.smembers("names");

//3.移除指定数据
jedis.srem("names","Tome","Jack");

//4.判断某值是否为set中成员
Boolean tom = jedis.sismember("names", "Tome");


jedis.sadd("set1", "a","b","c","d");
jedis.sadd("set2", "b","c","d","e");
//1.差集 set1中有set2中没有的
Set<String> sdiff = jedis.sdiff("set1","set2");

//2.交集
Set<String> sinter = jedis.sinter("set1","set2");

//3.并集
Set<String> sunion = jedis.sunion("set1","set2");

3.5. zset类型操作

//1.添加
jedis.zadd("table", 1, "a");
jedis.zadd("table", 3, "b");
jedis.zadd("table", 2, "c");
jedis.zadd("table", 5, "d");
jedis.zadd("table", 4, "e");
//2.取值 0 -1表示取所有,可以自己指定开始结束位置,跟list一样
//默认根据分数由低到高排
Set<String> table = jedis.zrange("table", 0, -1);

//3.排序,由高到低排
Set<String> table2 = jedis.zrevrange("table", 0, -1);

//4.修改某个值的分数
jedis.zincrby("table", 7, "a");

4. 使用Lettuce操作redis

​ 第三章介绍的Jedis作为redis客户端实例存在一个问题,即不是线程安全的,不能多线程共享一个Jedis,当然我们也可以使用连接池。但是Lettuce则是线程安全的,而且支持异步操作(基于netty),现在spring boot中redis客户端的集成就是Lettuce。但是Lettuce的坑也不少,之前就导致了redis脏数据的问题弄了一个多月,使用Jedis还是Lettuce还是需要考虑清楚。项目使用的是Lettuce,所以这部分介绍会更详细一些。

4.1. 单机Lettuce操作5种数据结构

导入lettuce和单元测试依赖

<dependency>
    <groupId>io.lettuce</groupId>
    <artifactId>lettuce-core</artifactId>
    <version>5.1.8.RELEASE</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.13-beta-3</version>
    <scope>compile</scope>
</dependency>

​ api和Jedis基本是一样的(或者说更redis命令几乎一样),就不再多说直接看示例。

public class RedisApiDemo {
    private RedisURI redisURI;
    private RedisClient redisClient;
    private StatefulRedisConnection<String, String> connect;
    private RedisCommands<String, String> redisCommands;

    @Before
    public void init() {
        redisURI = RedisURI.builder()
                    .withHost("192.168.19.129")
                    .withPort(6379).build();
        redisClient = RedisClient.create(redisURI);
        connect = redisClient.connect(); // 还可以传入codec
        redisCommands = connect.sync();
    }

    @After
    public void destroy() {
        connect.close();
        redisClient.shutdown();
    }

    @Test
    public void stringApi(){
        String ok = redisCommands.set("mysql", "3306"); // OK
        String ans = redisCommands.get("mysql"); // 3306
        HashMap<String, String> map = Maps.newHashMap();
        map.put("play", "9001");
        redisCommands.mset(map); // 批量插入 如果需要同时插入/获取多个kv,尽量使用该操作来减少多次操作带来的额外网络开销
        List<KeyValue<String, String>> keyValues = redisCommands.mget("mysql", "play");
        Long counter = redisCommands.incr("counter"); // 1
        Long num = redisCommands.incrby("num", 2); // 2, 步长是2,相应的还有decr,decrby
        Long del = redisCommands.del("mysql", "num"); // 2  删除的key个数

        Long exists = redisCommands.exists("mysql"); // 0
        String redis = redisCommands.setex("redis", 5, "6379"); // 过期时间5秒
    }

    @Test
    public void hashApi(){
        Boolean tom = redisCommands.hset("height", "tom", "180");
        Boolean jack = redisCommands.hset("height", "jack", "170");
        String tomAns = redisCommands.hget("height", "tom");
        Map<String, String> height = redisCommands.hgetall("height");
        Boolean hexists = redisCommands.hexists("height", "tom");
        Long hdel = redisCommands.hdel("height", "tom", "jack");
    }

    @Test
    public void listApi(){
        redisCommands.del("list");
        Long r1 = redisCommands.rpush("list", "a", "b", "c");
        Long r2 = redisCommands.lpush("list", "1", "2", "3");
        List<String> list = redisCommands.lrange("list", 0, -1); // [3, 2, 1, a, b, c]
        String lp = redisCommands.lpop("list"); // 3
    }

    @Test
    public void setApi(){
        redisCommands.del("set");
        Long addNum = redisCommands.sadd("set", "a", "b", "c", "a", "c"); // 3
        Set<String> set = redisCommands.smembers("set"); // [a, c, b]

        // 排序集合
        Long zaddNum = redisCommands.zadd("zset", 3.0, "a", 2.0, "b", 4.0, "c", 1.0, "d");
        // [ScoredValue[1.000000, d], ScoredValue[2.000000, b], ScoredValue[3.000000, a], ScoredValue[4.000000, c]]
        List<ScoredValue<String>> zset1 = redisCommands.zrangeWithScores("zset", 0, -1);

    }
}

4.2. 异步操作

connect.async()便可以活动异步的Commands, 之后的操作都会返回RedisFuture,该类继承了Future<T>,所以Future的方法在这里都可以去使用,比如阻塞操作get

public class RedisAsyncDemo {

    private static RedisURI redisURI = RedisURI.builder()
            .withHost("192.168.19.129")
            .withPort(6379)
            .build();
    private static StatefulRedisConnection<String, String> connect = RedisClient.create(redisURI).connect();
    private static RedisAsyncCommands<String, String> asyncCommands = connect.async(); // 获得异步api

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        RedisFuture<String> future = asyncCommands.set("play", "9000");
        String ans = future.get(); // OK

        RedisFuture<String> future2 = asyncCommands.get("play");
        String ans2 = future2.get(); // 9000
    }
}

4.3. RedisCodec介绍

注意到上面创建连接的代码

StatefulRedisConnection<String, String> connect = redisClient.connect();

​ 直接调用了connect方法,得到的kv类型就是(String, String),这是因为默认指定了编解码器,这个待会说。除了指定参数为string类型,Lettuce还提供了字节数组的参数,创建连接的时候传入字节数组的编解码器即可,使用如下方式创建连接。

StatefulRedisConnection<byte[], byte[]> connect = redisClient.connect(new ByteArrayCodec());

​ 其实如果没有指定编解码器的时候,默认是使用String类型的编解码器, 截取了部分源代码如下

public StatefulRedisConnection<String, String> connect() {
        return connect(newStringStringCodec());
    }
protected RedisCodec<String, String> newStringStringCodec() {
        return StringCodec.UTF8;
    }
public class StringCodec implements RedisCodec<String, String>, ToByteBufEncoder<String, String> {

    public static final StringCodec UTF8 = new StringCodec(LettuceCharsets.UTF8);

所以使用无参的connect的时候等价于connect(StringCodec.UTF8),该类继承自RedisCodec<String, String>。 接口RedisCodec主要的实现类就是StringCodec,ByteArrayCodec, 还有个实现类Utf8StringCodec基本可以使用StringCodec替代,看实现就知道了。

public class Utf8StringCodec extends StringCodec implements RedisCodec<String, String> {

    /**
     * Initialize a new instance that encodes and decodes strings using the UTF-8 charset;
     */
    public Utf8StringCodec() {
        super(LettuceCharsets.UTF8);
    }
}

​ 使用StatefulRedisConnection<String, String> connect那么操作的键和值都需要是string类型,显然如果使用了StatefulRedisConnection<byte[], byte[]> connect则键和值都需要是byte[],像下面这样。

RedisCommands<byte[], byte[]> commands = connect.sync();

String ok = commands.set("hello".getBytes(Charset.forName("UTF-8")), "world".getBytes(Charset.forName("UTF-8")));
byte[] bytes = commands.get("hello".getBytes(Charset.forName("UTF-8")));
System.out.println(new String(bytes,Charset.forName("UTF-8"))); // world

4.4. 使用Thrift进行编解码

​ 4.3介绍了kv为byte[]类型的时候,可以将string转换为byte[], 但是传递的是java对象呢?Lettuce并没有提供序列号的工具,也就是说开发者需要自己引入序列化工具。比如,json(可以理解为还是字符串),谷歌的Protobuf,Facebook的Thrift等。本节就使用Thrift作为序列化工具。

​ 关于Thrift的介绍可以参考 : Thrift快速入门

​ 导入依赖:

<dependency>
    <groupId>org.apache.thrift</groupId>
    <artifactId>libthrift</artifactId>
    <version>0.10.0</version>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>20.0</version>
</dependency>

​ 这里顺便导入了谷歌的guava包,里面有很多实用的工具类。

​ 编写IDL文件data.thrift, 并使用工具生陈java文件Person.java,生成的Person类中是具有序列化和反序列方法的。

data.thrift:

namespace java thrift.generated

typedef i16 short
typedef i32 int
typedef i64 long
typedef bool boolean
typedef string String

struct Person {
    1: optional String username, 
    2: optional int age,
    3: optional boolean married
}

Person.java

public class Person implements org.apache.thrift.TBase<Person, Person._Fields>, java.io.Serializable, Cloneable, Comparable<Person> {
  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("Person");
... 内容过多

最后来看如果进行序列化和反序列化,以及lettuce操作

public class RedisSerializableDemo {

    public static <T extends TBase> byte[] writeThriftObject(T thrift) throws Exception {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        TTransport transport = new TIOStreamTransport(outputStream);
        TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport);
        thrift.write(binaryProtocol);
        byte[] bytes = outputStream.toByteArray();
        return bytes;
//        ThriftCodecManager codecManager = new ThriftCodecManager();
//        codecManager.write(thrift, outputStream, new TBinaryProtocol.Factory());
//        return outputStream.toByteArray();
    }

    public static <T extends TBase> T readThriftObject(Class<T> clazz, byte[] bytes) throws Exception {

        ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
        TTransport transport = new TIOStreamTransport(inputStream);
        TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport);

        T instance = clazz.newInstance();
        instance.read(binaryProtocol);
        return instance;
//        ThriftCodecManager codecManager = new ThriftCodecManager();
//        return codecManager.read(bytes, clazz, new TBinaryProtocol.Factory());

    }

    public static void main(String[] args) throws Exception {
        RedisURI uri = RedisURI.builder()
                .withHost("192.168.19.129")
                .withPort(6379)
                .build();
        RedisClient redisClient = RedisClient.create(uri);
        // 不传则默认是 new StringCodec(LettuceCharsets.UTF8), UTF8 = Charset.forName("UTF-8"); 使用常量StringCodec.UTF8即可
        // ByteArrayCodec提供了常量ByteArrayCodec.INSTANCE
        StatefulRedisConnection<byte[], byte[]> connect = redisClient.connect(new ByteArrayCodec());
        RedisCommands<byte[], byte[]> commands = connect.sync();

        Person tom = new Person().setAge(18).setUsername("tom").setMarried(true);
        byte[] buffer = writeThriftObject(tom);
        commands.del("tom".getBytes(Charset.forName("UTF-8")));
        String ok2 = commands.set("tom".getBytes(Charset.forName("UTF-8")), buffer);
        byte[] bytes2 = commands.get("tom".getBytes(Charset.forName("UTF-8")));
        Person person = readThriftObject(Person.class, bytes2); // Person(username:tom, age:18, married:true)
    }
}

4.5. 自定义RedisCodec

​ 使用ObjectInputStream,ObjectoutputStream可以将对象写出和读入,前提是该对象实现了接口Serializable, 基于此我们可以自定义编解码器来进行对象的序列化和反序列化。

​ 如何实现可以参考StringCodec,但是我们会设计的简单的多, 实现RedisCodec<K,V>接口即可,主要是4个方法,分别是对K,V的编解码。其中K使用string类型,V则是泛型(对象类型)。

public class SerializedTCodec<T> implements RedisCodec<String, T> {
    private Charset charset = Charset.forName("UTF-8");

    @Override
    public String decodeKey(ByteBuffer bytes) {
        return charset.decode(bytes).toString();
    }

    @Override
    public T decodeValue(ByteBuffer bytes) {
        try {
            byte[] array = new byte[bytes.remaining()];
            bytes.get(array);
            ObjectInputStream is = new ObjectInputStream(new ByteArrayInputStream(array));
            return (T)is.readObject();
        } catch (Exception e) {
            return null;
        }
    }

    @Override
    public ByteBuffer encodeKey(String key) { // str -> bytes -> byteBuffer
        return ByteBuffer.wrap(key.getBytes(charset));
    }

    @Override
    public ByteBuffer encodeValue(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream os = new ObjectOutputStream(bytes);
            os.writeObject(value);
            return ByteBuffer.wrap(bytes.toByteArray());
        } catch (IOException e) {
            return null;
        }
    }
}

​ 来测试以下是否生效了

public class TestSerializedTCodec {
    public static void main(String[] args) {
        RedisURI uri = RedisURI.builder()
                .withHost("192.168.19.129")
                .withPort(6379)
                .build();
        // 注意这里,传入自定义的SerializedTCodec, V的泛型为Person
        RedisCommands<String, Person> commands = RedisClient.create(uri).connect(new SerializedTCodec<Person>()).sync();

        // Person实现了`Serializable`
        Person tom = new Person().setAge(18).setUsername("jack").setMarried(true);
        
        String ok = commands.set("tank", tom);

        Person tank = commands.get("tank"); // Person(username:jack, age:18, married:true)

    }
}

命令行查看

127.0.0.1:6379> get tank
        "\xac\xed\x00\x05sr\x00\"com.scu.cache.redis.example.Persont\x86a\x04\xdb\xd5L\xe9\x03\x00\x04B\x00\x10__isset

4.6. RedisCluster

​ 关于RedisCluster内容不是这里需要讲的,主要说明一些Lettuce为RedisCluster专门提供的API。

以下配置在生产环境下使用是OK的。

RedisURI node1 = RedisURI.create("192.168.19.129", 6379);
        RedisURI node2 = RedisURI.create("192.168.19.128", 6379);
        RedisURI node3 = RedisURI.create("192.168.19.127", 6379);

        RedisClusterClient redisClusterClient = RedisClusterClient.create(Lists.newArrayList(node1, node2, node3));

        // 拓扑刷新相关options
        ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
                .enablePeriodicRefresh(Duration.ofMinutes(10)) // 指定阶段性刷新间隔,默认60秒
                .enableAllAdaptiveRefreshTriggers() // 使用所有触发器的拓扑自适应刷新MOVED_REDIRECT,ASK_REDIRECT,PERSISTENT_RECONNECTS,UNKNOWN_NODE
                .adaptiveRefreshTriggersTimeout(Duration.ofMinutes(10)) // 自适应拓扑刷新的超时时间
                .build();

        // cluster客户端相关options
        ClusterClientOptions clientOptions = ClusterClientOptions.builder()
                .topologyRefreshOptions(topologyRefreshOptions)
                .build();

        redisClusterClient.setOptions(clientOptions);

        StatefulRedisClusterConnection<String, String> redisClusterConnection = redisClusterClient.connect(StringCodec.UTF8);
        RedisAdvancedClusterAsyncCommands<String, String> clusterAsyncCommands = redisClusterConnection.async();

		// 像使用单机commands使用一样使用clusterAsyncCommands
        redisClusterClient.shutdown();

5. Redis经典使用场景实践

5.1. 缓存

​ Redis本身就是缓存数据库,全内存操作,单线程qps达到10W自然是做缓存的首选。比如正在做的app, 首页流量肯定是最高的,所以在首页会大量使用缓存,避免每次去数据库查询,从而减轻数据库服务器压力,提高响应速度。

​ 常规操作为: 访问首页, 去redis查询,如果有直接返回内容:如果没有,去数据库查询,将查询结果放入redis(根据需求看是否需要设置过期时间),最后返回查询结果。

​ 下面写个简单的伪代码:

public class UserService {
    ...
	public User findUserById(Long id) {
        // 查询缓存
        String redisKey = "user:info" + id;
        String redisVal = redisDao.findUserById(redisKey);
        User user = dec(redisVal); // String转User
        if(StringUtils.isBlank(user)) {
            // 缓存没有,查db
            user = mySQLDao.findUserById(id);
            // 存入缓存, 设置过期时间1h
            if(StringUtils.isBlank(user))
            	redisDao.insertUser(redisKey,enc(user),3600);
        }
        return user;
    }
}

5.2. 缓存穿透问题解决

5.1中缓存的使用方式是最常见的,我们的也是这样使用的,不过我们的app日活只有600多万,如果设计到的缓存操作都这样使用其实还是有点小问题的。首先就是可能存在缓存传统问题,即在查询数据库的时候user = mySQLDao.findUserById(id);,可没有查到信息,导致所有的请求都会去数据库查询。

​ 举个实际点的例子,过节的时候,在app一些板块的按钮,图标等可能会变的很靓,也就是后台配置了一些icon,native或者h5来给这些图标按钮装饰上去。 假设就装饰在首页,那么每次都会去获取首页活动信息,在有活动的时候使用5.1的方法是没问题的,但是活动是具有时效性的,比如活动时间过了,那么自然就从mysql中查询不到了,最终会导致每次都去数据库查询,导致出现缓存穿透现象。

​ 解决方案:mysql没有查询到数据的时候,缓存默认值。

public ActivityModel fetchActivityById(Long id, String env) {
    String redisKey = "activity:id:" + id + ":env:" + env;
    String model = redisDao.fetchActivityById(redisKey); 
    if(StringUtils.isBlank(model)) {
        ActivityModel activity = mySQLDao.fetchActivityById(id,env);
        if(activity == null) {
            redis.insertActivity(redisKey, "none", 3600); // mysql中没有数据,缓存默认值none
        }else {
            redis.insertActivity(redisKey, enc(activity), 3600);
        }
        return activity;
    }
    if(StringUtils.equals(model, "none")) {
        return null;
    }
    return dec(model); // string -> ActivityModel
}

5.3. 缓存一致性问题解决

​ 缓存和数据库内容不一致问题是必须要考虑到的,否则用户得到的只会是老数据(如果没有添加到缓存的时候没有设置过期时间,或者设置的过长那影响会更大)。解决这类问题简单点可以在添加或者更新数据库中数据的时候就修改缓存中的数据。或者麻烦点就使用消息队列,推送活动的时候发送个消息(服务1),服务2监听到后消费消息,进行缓存更新。

第一种方式: 更新数据库中数据的时候同时更新缓存

public void insertActivity(ActivityModel model) {
    mySQLDao.insertActivity(model);
    String redisKey = "activity:id:" + model.getId() + ":env:" + model.getEnv();
    redisDao.insertActivity(redisKey, enc(model), 3600)
}

第二种方式:使用消息队列

参考: 消息队列的使用

5.4. 动手实现Redis分布式锁

​ 还是考虑5.1的问题, 设置的缓存时间为1小时,如果在该时间点失效,仍然有大量请求会到mysql数据库,依旧可能导致穿透(当然是dau很高的app才考虑了,我们的项目完全不担心这个问题),那么可以使用分布式锁来解决这个问题,保证只有一个请求到数据库获取最新数据,放入缓存,其他请求仍然从刚才更新的缓存中获取数据。

5.4.1. 实现思路

​ 记得之前说过一个命令setnx,setnx类似于set,但是当key已经存在的时候,set会进行value的覆盖(即修改),而setnx则会失败。 另外,由于redis是单线程的,所以多个客户端发送setnx命令的时候,只会排队依次去执行,但第一个执行成功后,后面的肯定都会失败。基于此我们来实现自己的Redis分布式锁。

​ 我们已经知道setnx lock value可以进行加锁,即多个客户端同时执行这个命令的时候,只有一个会成功,那么认为这个客户端拿到了锁,接着执行业务代码;那么释放锁也就是delete lock,特殊地考虑到万一业务代码执行时间过长,导致锁得不到释放,这个时候我们可以设置过期时间expire lock timeout, 将获得锁和设置过期时间合为一个命令就是set lock value ex 50 nx(如果直接使用两个命令那么不具有原子性, 当然我们可以使用lua来实现原子性操作)。对应的api在Lettuce中也是提供了的。

​ 超时问题:Redis 的分布式锁不能解决超时问题,如果在加锁和释放锁之间的逻辑执行的太长,以至于超出了锁的超时限制,就会出现问题。因为这时候锁过期了,第二个线程重新持有了这把锁,但是紧接着第一个线程执行完了业务逻辑,就把锁给释放了,第三个线程就会在第二个线程逻辑执行完之间拿到了锁。

#A命令行:
192.168.247.128:6379> set m true ex 5 nx
OK

#5秒过后去B命令行执行
192.168.247.128:6379> set m true ex 50 nx
OK

#然后此时在A命令行,按照前面写的逻辑,应该del m了
192.168.247.128:6379> del m
(integer) 1

​ 发现del成功了(如果没有B命令行的操作则是0),所以释放掉了B的锁。为了避免这个问题,Redis 分布式锁不要用于较长时间的任务

​ 但是还是有解决方案:为 set 指令的 value 参数设置为一个随机数,释放锁时先匹配随机数是否一致,然后再删除 key,这是为了确保当前线程占有的锁不会被其它线程释放,除非这个锁是过期了被服务器自动释放的。 但是匹配 value 和删除 key 不是一个原子操作,Redis 也没有提供类似于delifequals这样的指令,这就需要使用 Lua 脚本来处理了,因为 Lua 脚本可以保证连续多个指令的原子性执行。所以接下来像对lua有个基本的了解

5.4.2. Lua入门

首先是安装,linux执行以下命令即可

wget -c http://www.lua.org/ftp/lua-5.3.5.tar.gz

tar -zxvf lua-5.3.5.tar.gz

yum -y install readline-devel ncurses-devel

cd lua-5.3.5

make linux

make install

​ 控制台输入Lua则进入了Lua的命令行交互界面

[root@monkey lua-5.3.5]# lua  
Lua 5.3.5  Copyright (C) 1994-2018 Lua.org, PUC-Rio

> print("hello world")
> hello world

​ 使用lua test.lua 则可以运行Lua脚本test.lua,下面对Lua语法有个基本的认识

-- boolean, numbers, strings, tables(表格)类型
-- local表示局部变量,没有local表示全局变量
local strings val = "hello world"
print(val)

-- 数组: 使用tables  lua的数组下表是从1开始而不是0
local tables arr = {"hello", 10, true}
print(arr[1])

-- 循环: for: 以end作为结尾, #nums 表示该数组长度; while同for
local tables nums = {1,2,3,4,5}
for i =1, #nums
do
    print(i)
end

-- 条件判断: if else, if后紧跟then
-- 当数组元素=="hello"的时候打印true并退出
for i = 1, #arr
do
    if arr[i] == "hello"
    then
        print(true)
        break
    else
        -- do nothing
    end
end

-- hash表,同样使用tables来获取hash相似的功能,写成k=v的形式
-- str..str2表示将两个字符串连接起来
local tables user = {name = "tom", age = 18}
print("user name:"..user["name"])

for k,v in pairs(user)
do
    print(k..":"..v)
end

-- 函数定义: function开头, end结尾
function contact(s1,s2)
    return s1..s2
end
print(contact("tom","jack"))

5.4.3. 在Redis中使用Lua

Redis中使用Lua主要有两种方法:

  • eval: eval 脚本内容 key 个数 key列表 参数列表
  • evalsha:

这里只介绍eval:

127.0.0.1:6379> eval 'return "hello"..KEYS[1]..ARGV[1]' 1 safe world
"hellosafeworld"

​ Lua可以使用redis.call函数来实现对redis的访问,比如下面的代码就是Lua使用redis.call调用了Redisget,set方法。

127.0.0.1:6379> eval 'return redis.call("set",KEYS[1],ARGV[1])' 1 hello world
OK
127.0.0.1:6379> get hello
"world"
127.0.0.1:6379> eval 'return redis.call("get",KEYS[1])' 1 hello
"world"

5.4.4. 代码实现

加锁用到的方法: String set(K key, V value, SetArgs setArgs);

释放锁用到的方法:

`<T> T eval(String script, ScriptOutputType type, K[] keys, V... values);`// 带k,v
<T> T eval(String script, ScriptOutputType type, K... keys); // 只带k

来看下SetArgs 类,用来指定是否使用setnx,以及设置过期时间

public class SetArgs implements CompositeArgument {

    private Long ex; // 过期时间,单位s
    private Long px; // 过期时间,单位ms
    private boolean nx = false; // 是否使用nx(为set方法的参数,有nx,ex),相当于setnx

看枚举ScriptOutputType,指定了脚本Script的返回类型

public enum ScriptOutputType {
    BOOLEAN, INTEGER, MULTI, STATUS, VALUE
}

做个简单测试

public static void main(String[] args) {
        SetArgs setArgs = new SetArgs().px(10000).nx(); // 使用setnx key存在则设置失败,设置过期时间10000ms ex为秒
        String s = redisCommands.set("lock", "true", setArgs); // OK或者null(key存在设置失败的情况)
        System.out.println(s); // true

        String script = "if " +
                            "redis.call('get', KEYS[1]) == ARGV[1] " +
                        "then return redis.call('del', KEYS[1]) " +
                        "else return -1 " +
                        "end";
        // 如果key=lock,value=goods  那么删除该key同时获取返回结果(del会返回1),否则返回-1
        Long eval = (Long)redisCommands.eval(script, ScriptOutputType.INTEGER, new String[]{"lock"}, new String[]{"goods"});
        System.out.println(eval); // -1
        System.out.println(redisCommands.get("lock")); // true

        String key = "hello";
        String sc = "return redis.call('get',KEYS[1])";
        Object eval1 = redisCommands.eval(sc, ScriptOutputType.VALUE, key); 
        System.out.println(eval1);
    }

了解了上面的测试代码,分布式锁的实现也就很简单了

UserService.java

public class UserService {

    private static final long timeout = 5000;
    private RedisDao redisDao = new RedisDao();

    private boolean getDistributedLock(String lock, String randId) {
        return redisDao.getDistributedLock(lock, randId, timeout);
    }

    private boolean delDistributedLock(String lock, String randId) {
        return redisDao.delDistributedLock(lock, randId);
    }

    public String getUser(String id) throws InterruptedException {
        String user = redisDao.getCacheUser(id);
        if (Strings.isNullOrEmpty(user)) {
            String randId = String.valueOf(new Random().nextInt(Integer.MAX_VALUE));
            String lockKey = "user_service:get_user:" + id;
            // 获取锁
            boolean lock = getDistributedLock(lockKey, randId);
            if(lock) {
                System.out.println("从数据库中查询数据");
                user = "jack"; // 数据库中查询
                delDistributedLock(lockKey, randId); // 删除锁
                redisDao.setCacheUser(id, user); // 放入缓存
            }else {
                Thread.sleep(10);
                user = getUser(id);
            }
        }
        return user;
    }

    // 运行程序,只会输出一条从数据库中查询数据
    public static void main(String[] args) throws InterruptedException {
        ExecutorService threadPool = null;
        try{
            threadPool = Executors.newFixedThreadPool(30);
            UserService userService = new UserService();
            CountDownLatch latch = new CountDownLatch(1);
            for (int i = 0; i < 30; i ++) {
                threadPool.execute(() ->{
                    try {
                        latch.await();
                        String ans = userService.getUser("10086");
                        System.out.println(ans);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
            Thread.sleep(100);
            latch.countDown();
        }finally {
            threadPool.shutdown();
        }
    }
}

UserDao.java

public class RedisDao {
    private static RedisURI redisURI = RedisURI.builder()
            .withHost("192.168.19.129")
            .withPort(6379)
            .build();
    private static RedisClient client = RedisClient.create(redisURI);
    private static RedisCommands<String, String> redisCommands = client.connect(StringCodec.UTF8).sync();

    public String getCacheUser(String key){
        return redisCommands.get(key);
    }
    public String setCacheUser(String key, String value){
        return redisCommands.setex(key, 10, value);
    }

    public boolean getDistributedLock(String lock, String randId, long timeout) {
        SetArgs setArgs = new SetArgs().px(timeout).nx(); // 使用setnx key存在则设置失败,设置过期时间10000ms ex为秒
        String ok = redisCommands.set(lock, randId, setArgs); // OK或者null
        System.out.println(ok);
        return !Strings.isNullOrEmpty(ok);
    }

    public boolean delDistributedLock(String lock, String randId) {
        String script = "if " +
                "redis.call('get', KEYS[1]) == ARGV[1] " +
                "then return redis.call('del', KEYS[1]) " +
                "else return -1 " +
                "end";
        Long eval = (Long)redisCommands.eval(script, ScriptOutputType.INTEGER, new String[]{lock}, new String[]{randId});
        return eval != -1;
    }
   
}

5.4.5. 官方分布式锁解决方案Redisson

​ 官方实现的分布式锁解决方案即为Redisson,以下根据官方例子写了个简单的使用。

导包:

<dependency>--><!--netty包冲突,导致服务连接redis失败-->
     <groupId>org.redisson</groupId>
     <artifactId>redisson</artifactId>
     <version>3.12.0</version>
</dependency
public class Redisson4Java {
    public static void main(String[] args) {
        // 1. Create config object
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.19.129:6379"); // 这里使用了单机,Sentinel,Cluster都OK

        // 2. Create Redisson instance
        // Sync and Async API
        RedissonClient redissonClient = Redisson.create(config);

        // 3. Get Redis based Map
        RMap<Object, Object> map = redissonClient.getMap("myLock");

        // 4. Get Redis based Lock
        RLock lock = redissonClient.getLock("myLock"); // 和jdk的Lock一样去使用

        // 4. Get Redis based ExecutorService
        RScheduledExecutorService executorService = redissonClient.getExecutorService("myExecutorService");

        try{
            lock.lock();

            // doSomething
        }finally {
            lock.unlock();
        }
    }

}

5.5. 计数器的使用

​ redis提供了自增incr和自减decr的功能,可以方便作为计数器来使用。比如配置了n条消息,根据用户打开次数来进行消息的轮播(一次一条)。

public News getNews(String userId) {
    String redisKey = "user:user_id:" + userId;
    int time = Integer.valueOf(redisDao.getAccessTime(redisKey)); // 获取访问次数
    int size = newsList.size();
    int index = time % size;
    redisDao.incrAccessTime(redisKey); //  访问次数 + 1
    return newsList.get(index);
}

​ 另外踩藏也可以使用redis去实现,可以使用两个key, upKey,downKey,点击赞的时候upKey 自增,踩的时候downKey自增。 如果只显示赞的情况,那么使用一个key即可, 赞的时候自增,踩的时候自减。

5.6. 分布式Session

​ 在服务器集群环境下,如果仍然使用传统意义上的session来存储用户信息,那么可能会出现要求用户多次登陆的情况。 所以通常单独搞个服务专门来存储验证用户登录情况,简单点我们单独搞个服务,其中可以使用redis来存储用户信息。具体的参考: 单点登陆

5.7. 限速

​ 很多应用出于安全考虑,要求用户登录的时候输入短信验证码,但是需要获取验证码的接口不被频繁访问,比如限制一分钟不能超过2次,可以考虑使用redis来做该限制

public boolean checkAccessPhone(String phone) {
	String redisKey = "access:limit:phone_num:" + phone;
    SetArgs setArgs = new SetArgs().ex(60).nx(); 
    String s = redisCommands.set(redisKey, "1", setArgs); 
    if(s == null || redis.incr(redisKey) <= 2) return true; // 通过
    return false; // 需要限速
}

5.8. 列表的使用

使用的并不是很多,但是基于列表api特性,可以当队列/栈/消息队列来使用。

lpush+rpop = 队列
lpush+lpop = 栈
lpush+brpop = MQ消息队列

注意:redis的list提供了阻塞操作,比如brpop=rpop+block,即rpop的阻塞操作。

5.9. 排行榜系统

​ 有序集合ZSET的典型使用场景就是排行榜,假设射频网站,用户上传视频后,根据获得赞数进行排名。

127.0.0.1:6379> zadd user:ranking:20200130 3 tom  #  tom上传赞3的视频
(integer) 1
127.0.0.1:6379> zadd user:ranking:20200130 5 jack #  jack上传赞5的视频
(integer) 1
127.0.0.1:6379> zadd user:ranking:20200130 8 lucy #  lucy上传赞8的视频
(integer) 1
127.0.0.1:6379> ZREVRANGE user:ranking:20200130 0 -1 # 获取排名
1) "lucy"
2) "jack"
3) "tom"

127.0.0.1:6379> ZINCRBY user:ranking:20200130 3 tom # tom的视频获得了三个赞
"6"
127.0.0.1:6379> ZREVRANGE user:ranking:20200130 0 -1 # tom视频排行到了jack前面
1) "lucy"
2) "tom"
3) "jack"

127.0.0.1:6379> zrem user:ranking:20200130 tom  # tom删除掉了视频
(integer) 1
127.0.0.1:6379> ZREVRANGE user:ranking:20200130 0 -1
1) "lucy"
2) "jack"

127.0.0.1:6379> ZSCORE user:ranking:20200130 jack # 获取jack分数
"5"
#获取排名
127.0.0.1:6379> ZRANK user:ranking:20200130 jack
(integer) 0
127.0.0.1:6379> ZRANK user:ranking:20200130 lucy
(integer) 1
# 获取用户jack信息,存储在hash中
hgetall user:info:jack

下面使用lettuce实现几个简单的功能

public class VideoRankingService {
    private static RedisURI redisURI = RedisURI.builder()
            .withHost("192.168.19.129")
            .withPort(6379)
            .build();
    private static RedisClient client = RedisClient.create(redisURI);
    private static RedisCommands<String, String> redisCommands = client.connect(StringCodec.UTF8).sync();
    private static final String REDIS_VIDEO_RANK_KEY = "user:video:ranking" ;
    class Video{
        String id;
        String userId;
        byte[] contents = null;
    }
    public boolean uploadVideo(String userId, Video video) { //上传视频
        // 视频处理略去
        return redisCommands.zadd(REDIS_VIDEO_RANK_KEY,0.0, userId) > 0;
    }

    public void addPraise(String userId) { //点赞+1
        redisCommands.zincrby(REDIS_VIDEO_RANK_KEY, 1.0, userId);
    }

    public List<String> getRank(){ // 获取排行榜,不带分数
        return redisCommands.zrevrange(REDIS_VIDEO_RANK_KEY, 0, -1);
    }

    public List<ScoredValue<String>> getRankWithScores(){ // 获取排行榜,带分数
        return redisCommands.zrevrangeWithScores(REDIS_VIDEO_RANK_KEY, 0, -1);
    }

    public Long getUserVideoRank(String userId) { // 获取排行,数字越大越靠前,0最后
        return redisCommands.zrank(REDIS_VIDEO_RANK_KEY, userId);
    }

    public Double getUserVideoScore(String userId) { // 获取得分
        return redisCommands.zscore(REDIS_VIDEO_RANK_KEY, userId);
    }
}

5.10. Redis的监听功能在订单系统中的使用

​ 背景:用户点击付款,发送请求,后台接收到请求后,生成订单信息和商品销售信息,保存到数据库表中。同时把订单信息存入到redis中,key可以设为订单编号,同时设置过期时间。到了过期时间后,redis监听器监听到了过期的key,取出该key查询数据库订单表,如果发现支付状态不是成功(用户为付款,需要使订单失效),那么修改支付状态为失败(也就是用户下单后一直不付款,到了一定时间后,那么就应该让这个订单作废。如果用户付款了,在支付宝回调的接口里面会将支付状态修改为成功)。

参考: Redis的监听功能在订单系统中的使用

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

后端研发Redis必知必会 的相关文章

  • $redis 全局变量与 ruby​​ on Rails

    我使用 redis 作为读取缓存 我创建了一个初始化程序 配置 初始化程序 redis rb redis Redis new host gt ENV REDIS HOST port gt ENV REDIS PORT 每当创建新工作人员时
  • 即使增加超时后,stackexchange.redis 也会抛出超时?

    尝试从缓存中删除时出现以下错误 Timeout performing DEL test com inst 0 mgr ExecuteSelect err never queue 0 qu 0 qs 0 qc 0 wr 0 wq 0 in 0
  • 如何在node.js中为相同的两个应用程序分离redis数据库

    我有两个相同的应用程序 一个用于演示 一个用于开发 并且我使用 redis 数据库来存储键值 我如何为这两个不同的应用程序分离 redis 数据库 我使用 Node js 作为 Redis 客户端 我用这个https github com
  • 是否可以使用带有 FUSE 文件系统的 Linux VFS 缓存?

    默认情况下 Linux VFS 缓存似乎不适用于 FUSE 文件系统 例如 read 调用似乎被系统地转发到 FUSE 文件系统 我在 FUSE 特定的远程文件系统上工作 我需要一个非常积极的缓存 我需要实现自己的页面缓存吗 或者是否可以为
  • 找不到模块“socket.io/node_modules/redis”

    当尝试做的时候 var redis require socket io node modules redis 我收到错误 找不到模块 socket io node modules redis 我不明白为什么 我正在运行 Windows 并运
  • 使用 sidekiq 处理两个独立的 Redis 实例?

    下午好 我有两个独立但相关的应用程序 他们都应该有自己的后台队列 阅读 单独的 Sidekiq 和 Redis 进程 然而 我希望偶尔能够将工作推给app2的队列来自app1 从简单的队列 推送的角度来看 如果app1没有现有的 Sidek
  • 如何在redis中创建自己的数据库?

    There are 0 to 15 databases in redis 我想使用 redis cli 创建自己的数据库 有什么命令可以实现吗 Redis 数据库并不等同于 MySQL 等 DBMS 中的数据库名称 这是一种为键创建隔离和命
  • 使用brew在MacOSx上安装Redis JSON

    如何使用brew 在 macOSx 上安装 RedisJSON 如何在不编译redis的情况下启用redis上的模块 我不想使用 docker 客户端 Redis Stack 可能是最简单的方法 它不仅仅是 RedisJSON 还包括 Re
  • Spring RedisTemplate:8次调用后方法键挂起

    我使用 Spring RedisTemplate spring data redis 1 7 1 与 Redis 进行通信 我需要通过正则表达式获取然后删除键 例如 context user1 我用的方法 RedisTemplate key
  • connect-redis - 如何保护会话对象免受竞争条件影响

    我使用 nodejs 和 connect redis 来存储会话数据 我将用户数据保存在会话中 并在会话生命周期中使用它 我注意到两个更改会话数据的请求之间可能存在竞争条件 我尝试过使用 redis lock 来锁定会话 但这对我来说有点问
  • Node.js 上通过套接字连接 Redis

    由于共享托管 目标主机上的我的 redis 服务器不在端口上运行 而是在非常特定的套接字上运行 可以通过套接字文件连接到该套接字 只有我的用户可以访问 但是 我还没有找到如何通过套接字指定连接node redis and connect r
  • 如何使用Spring Cache处理redis异常?

    我目前正在开发一个包含 Spring Data Redis 和 Spring Cache 的项目 在spring data redis中 我使用redis模板调用redis 我在 try catch 块中处理 redis 模板抛出的所有异常
  • 如何设置 Celery 以通过 ssl 与 Azure Redis 实例对话

    使用 的伟大答案 如何在microsoft azure上的django项目中配置celery redis https stackoverflow com questions 39616701 how to configure celery
  • Redis INCRBY 有限制

    我想知道是否有一种方法可以通过我的应用程序的单次往返在 Redis 中执行此操作 对于给定的键K 其可能值V是范围内的任意整数 A B 基本上 它有上限和下限 When an INCRBY or DECRBY发出命令 例如INCRBY ke
  • Spring Data Redis JedisConnectionException:流意外结束

    雷迪斯3 0 5Spring数据Redis 1 3 6绝地武士2 6 3 我们的 Web 应用程序通过 pub sub 从 Redis 接收数据 还以键 值对的形式在 Redis 上执行数据读 写 读 写发生在监听线程 独立监控线程和htt
  • 从redis中检索大数据集

    一台服务器上的应用程序查询另一台服务器上运行的 Redis 查询的结果数据集约为 250kzrangebyscore objects locations inf inf这在应用程序服务器上似乎需要 40 秒 当使用命令执行时redis cl
  • 如何将 ActionController::Live 与 Resque + Redis 一起使用(用于聊天应用程序)

    我正在尝试为我的 Rails 应用程序构建聊天功能 我在用ActionController Live Puma Resque Redis为了这 所以基本上在这种情况下 redissubscribe方法正在后台运行 使用resque 到目前为
  • Caffeine Expiry 中如何设置多个过期标准?

    我正在使用 Caffeine v2 8 5 我想创建一个具有可变到期时间的缓存 基于 值的创建 更新以及 该值的最后一次访问 读取 无论先发生什么都应该触发该条目的删除 缓存将成为三层值解析的一部分 The key is present i
  • 通过 StackExchange.Redis 连接到 Redis Servier

    我尝试使用以下方法制作一个测试项目Redis https redis io服务器 通过 Virtual Box 安装在 Linux Ubuntu 虚拟机上 Linux 机器通过 Virtual Box 的桥接适配器与本地网络连接 Virtu
  • 如何在Redis中进行持久化存储?

    关闭redis服务器后 使用set存储的值被破坏 在这里我找到了使用持久性存储的方法 有人帮助我 如何使用javascript实现这一点 我想将客户端的一些值存储在 redis 数据库中 并且必须在其他客户端中使用该值 您需要配置 Redi

随机推荐

  • Xshell连接时显示“服务器发送了一个意外的数据包。received:3,expected:20“问题的解决方法

    一 问题描述 最近在大数据服务器上安装了openbsd6 7版本 安装完后通过xshell连接 弹出一个错误对话框 提示 服务器发送了一个意外的数据包 received 3 expected 20 的错误信息 检查sshd服务是正常开启的
  • OSI七层模型及对应的数据包格式

    我接触网络协议也比较久了 不过一直都只懂个皮毛 最近比较深入研究之后终于有点豁然开朗的感觉 也因为网络上各种协议的资料太多但是都比较分散杂乱 所以在这里做点总结 给大家提供一些资料也备自己以后查阅 鉴于有些朋友没有耐心完全看完整篇文章 所以
  • 如何快速将WPS表格或者excel数据将表格转化为json

    目录 简介 一 在表格数据的前后插入列 加上双引号 分号 逗号 二 利用表格的公式合并内容 1 在表格合并的项行后面选择或插入新的一列或一行 然后在第一个空格输入 号 2 然后用鼠标点击要合并的第一行的第一个内容格 即相对应等号的那一列 在
  • 程序员必须知道机器学习与数据挖掘十大经典算法:PageRank算法篇

    由于公司架构调整和业务方向的转变 我所在的项目组即将接手一个机器学习和数据挖掘的项目 为了后续更好地开展工作 也为了能提高自己的专业技能 我决定开始学习机器和数据挖掘方面的知识 那么 问题就来了 到底应该从哪里开始学起呢 最开始我也买了一些
  • 【基于hadoop+spark的短视频大数据分析平台-哔哩哔哩】 https://b23.tv/JoObZaH

    基于hadoop spark的短视频大数据分析平台 哔哩哔哩 https b23 tv JoObZaH https b23 tv JoObZaH
  • 心理学的166个现象---之九

    161 增减效应 人们最喜欢那些对自己的喜欢显得不断增加的人 最不喜欢那些对自己的喜欢显得不断减少的人 心理学家们将人际交往中的这种现象称为 增减效应 162 植物心理学和巴克斯特效应BACKSTER 我出生在新泽西州的LAFAYETTE市
  • 51单片机---DS18B20温度采集

    51单片机 DS18B20温度采集 实验目标 51单片机读取DS18B20温度显示在液晶显示屏上 实验步骤 在Proteus里画出原理图 在Keil里用C语言编写程序 在Proteus中导入HEX文件 启动仿真 DS18B20简介 DS18
  • pwnstack-攻防世界

    pwnstack 攻防世界 text 0000000000400762 giantbranch ubuntu Desktop file pwn2 pwn2 ELF 64 bit LSB executable x86 64 version 1
  • new String(value.getBytes (“iso8859-1“),“utf-8“)

    tomcat容器默认采用了iso 8859 1的编码方法 通过本为UTF 8编码却被tomcat用iso 8859 1解码的字进行恢复 其将解码后的字通过iso 8859 1反解码成二进制数组 再将该字节数组用UTF 8解码 最终被new
  • nodes are available: 1 node(s) had taints that the pod didn‘t tolerate

    记录最近玩k8s创建pod部署服务老是出现pending 并且查看详情describe的时候出现 nodes are available 1 node s had taints that the pod didn t tolerate 经过
  • Matlab 指针函数,MATLAB / Simulink - C MEX S函数:总线信号和涉及指针的传统C结构之间的转换...

    目前我正在MATLAB Simulink R2017b 中将遗留C代码实现为C MEX S函数 我已经在Entry at MathWorks MATLAB Answers上放了一个重复的条目 遗留函数作为一般规则指针指向涉及指向其他结构的指
  • vue3使用百度地图(详)

    前情提要 提示 该博客vue采用vue3 使用百度地图通过组件vue baidu map 3x 组件官网 https map heifahaizei com doc baidu map html 下面会从头开始介绍如何使用百度地图以及常用组
  • Java学习笔记 --- 成员方法

    一 成员方法 基本介绍 在某些情况下 我们需要定义成员方法 简称方法 比如人类 除了有一些属性外 年龄 姓名 我们人类还有一些行为比如 可以说花 跑步 通过学习 还可以做算术题 这时就需要成员方法才能完成 案例演示 public class
  • 1.3 C++ 关键字升级

    实用性增强 遍历数组的方式 关键字的升级 register vs volatile 仅能修饰局部变量 不能修饰全局变量和函数 修饰的变量不能通过 取地址 C 当对register变量取地址时 会将该变量重新保存到内存中 寄存器变量 提高运行
  • HashMap和HashSet

    一 什么是HashSet HashSet实现了Set接口 它不允许集合中有重复的元素 而且集合中的元素都是无序的 在将对象储存在HashSet之前 要确保对象重写了equals 方法和hashCode 方法 这样才能比较对象是否相等 以确保
  • 服务器级的kvm维修,企业级KVM解决方案 Altusen KM0932评测

    IT168评测中心 KVM多电脑切换器是一种可以通过由单一键盘 Keyboard 显示器 Video 及鼠标 Mouse 组成的控制端集中管理多台电脑的硬件装置 是现代数据中心的基础设备之一 使用KVM可以有效地节约机房空间 并有效地提升机
  • Linux 查看CPU架构及内核版本

    涉及arch命令和 proc version文件 1 查看CPU架构 有些软件的安装需要和CPU架构相匹配 如JDK等等 所以需要确定主机的CPU架构类型 可使用命令arch查看Linux系统的CPU架构 如下 arch 2 查看内核版本
  • 国家网络安全宣传周知识竞赛活动小程序界面分享

    国家网络安全宣传周知识竞赛活动小程序界面分享
  • el-menu多级动态菜单渲染

    使用el menu时往往会遇到动态渲染使用场景 安装和引用element暂不赘叙 对其使用方法不是很熟悉的小伙伴可以去官网按照指引完成使用 先简单叙述一下渲染思路 首先我们要知道要渲染一个多级菜单的过程实际上就是对后端给我们的多层数据格式进
  • 后端研发Redis必知必会

    本篇内容完全偏向于实践 也是后端开发常用到的知识 关于Redis原理与概念性的内容会另起一篇来说明 可以根据目录来选择自己所需要的内容来阅读 文章目录 1 redis安装 1 1 ubuntu上安装redis 1 2 centos上安装re