Redis的学习
1、Nosql概述
为什么要用Nosql
1、单击MySQL的年代
90年代,一个基本的网站访问量一般不会太大,单个数据库完全足够!
那个时候,更多的去使用静态网页Html ~服务器根本没有太大的压力!
思考一下,这种情况下:整个网站的瓶颈是什么?
1、数据量如果太大、一个机器放不下了!
2、数据的索引( B+ Tree ) , 一个机器内存也放不下
3、访问量(读写混合) ,一个服务器承受不了~
只要你开始出现以上的三种情况之一, 那么你就必须要晋级!
2、Memcached(缓存)+MySQL+垂直拆分(读写分离)
网站80%的情况都是在读,每次都要去查询数据库的话就十分的麻烦!所以说我们希望减轻数据的压力,我们可以使用缓存来保证效率!
发展过程:优化数据结构和索引–>文件缓存(IO ) --> Memcached (当时最热门的技术! )
3、分库分表+水平拆分+MySQL集群
本质:数据库(读,写)
早些年MyISAM:表锁,十分影响效率!高并发下就会出现严重的锁问题
转战Innodb :行锁
慢慢的就开始使用分库分表来解决写的压力! MySQL 在哪个年代推出了表分区!
4、如今最近的年代
2010-2020十年之间,世界已经发生了翻天覆地的变化; ( 定位,也是一种数据, 音乐,热榜! )
MySQL等关系型数据库就不够用了!数据量很多,变化很快~ !
MySQL有的使用它来存储一些比较大的文件,博客,图片!数据库表很大,效率就低了!如果有一数据库来专门处理这种数据
MySQL压力就变得十分小(研究如何处理这些问题! ) 大数据的IO压力下,表几乎没法更大!
为什么要用NoSQL
用户的个人信息,社交网络,地理位置。用户自己产生的数据,用户日志等等爆发式增长!
这时候我们就需要使用NoSQL数据库的, Nosql可以很好的处理以上的情况!
什么是NoSQL
NoSQL
NoSQL = Not Only SQL(不仅仅是SQL)
泛指非关系型数据库的,随着web2.0互联网的诞生!传统的关系型数据库很难对付web2.0时代!尤其是超大规模的高并发的社区!暴露出来很多难以克服的问题, NoSQL在当今大数据环境下发展的十分迅速, Redis是发展最快的,而且是我们当下必须要掌握的一个技术!
很多的数据类型用户的个人信息,社交网络,地理位置。这些数据类型的存储不需要一个固定的格式!不需要多余的操作就可以横向扩展的! map<String,Object> 使用键值对来控制!
NoSQL特点
1、方便扩展(数据之间没有关系,很好扩展!)
2、大数据量高性能(Redis一秒写8万次,读取11万次,NoSQL的缓存记录级,是一种细粒度的缓存,性能会比较高!)
3、数据类型是多样型的! (不需要事先设计数据库!随取随用!如果是数据量十分大的表,很多人就无法设计了! )
4、传统RDBMS和NoSQL
传统的RDBMS
-结构化组织
-SQL
-数据和关系都存在单独的表中
-数据操作,数据定义语言
-严格的一致性
-基础的事务
-。。。。。
NoSQL
-不仅仅是数据
-没有固定的查询语言
-键值对存储,列存储,文档存储,图形数据库(社交关系)
-最终一致性
-CAP定理和BASE (异地多活)
-高性能,高可用,高可扩
-。。。。。
了解:3V+3高
大数据时代的3V:主要是描述问题的
1、 海量 Volume
2、多样Variety
3、实时Velocity
大数据时代的3高:主要是对程序的要求
1、高并发
2、高可扩
3、高性能
真正在公司中的实践: NoSQL + RDBMS 一起使用才是最强的
NoSQL的四大分类
KV键值对:
- 新浪:Redis
- 美团:Redis+Tair
- 阿里、百度:Redis + memecache
文档型数据库(bson格式 和 json 一样):
- MongoDB (一般必须要掌握 )
- MongoDB是一个基于分布式文件存储的数据库, C++编写,主要用来处理大量的文档!
- MongoDB是一个介于关系型数据库和非关系型数据中中间的产品! MongoDB是非关系型数据库中功能最丰富,最像关系型数据库的!
- ConthDB
列存储数据库
图关系数据库:
- 他不是存图形,放的是关系,比如:朋友圈社交网络,广告推荐!
- Neo4jr InfoGrid ;
2、Redis入门
概述
Redis 是什么?
Redis ( Remote Dictionary Server ),即远程字典服务!
是一个开源的使用ANSI
C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库 ,并提供多种语言的API。
redis
会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave
(主从)同步。
免费和开源!是当下最热门的NoSQL技术之一!也被人们称之为结构化数据库!
Redis 能干嘛?
1、内存存储、持久化,内存中是断电即失、所以说持久化很重要( rdb、 aof )
2、效率高,可以用于高速缓存
3、发布订阅系统.
4、地图信息分析
5、计时器、计数器(浏览量! )
6、……
特性
1、多样的数据类型
2、持久化
3、集群
4、事务
……
测试性能
redis-benchmark 是一个压力测试工具!
官方自带的性能测试工具!
redis-benchmark 命令参数
序号 | 选项 | 描述 | 默认值 |
---|
1 | -h | 指定服务器主机名 | 127.0.0.1 |
2 | -p | 指定服务器端口 | 6379 |
3 | -s | 指定服务器socket | |
4 | -c | 指定并发连接数 | 50 |
5 | -n | 指定请求数 | 10000 |
6 | -d | 以字节的形式指定 SET/GET 值的数据大小 | 2 |
7 | -k | 1=keep alive 0=reconnect | 1 |
8 | -r | SET/GET/INCR 使用随机 key, SADD 使用随机值 | |
9 | -p | 通过管道传输 | 1 |
10 | -q | 强制退出 redis。仅显示 query/sec 值 | |
11 | -csv | 以 CSV 格式输出 | |
12 | -l | 生成循环,永久执行测试 | |
13 | -t | 仅运行以逗号分隔的测试命令列表。 | |
14 | -l | Idle 模式。仅打开 N 个 idle 连接并等待。 | |
简单测试:
基础知识
redis默认有16个数据库
默认使用的是第0个
可以使用select进行切换
127.0.0.1:6379> select 3
OK
127.0.0.1:6379[3]> dbsize
(integer) 0
127.0.0.1:6379[3]>
127.0.0.1:6379> keys *
1) "\xac\xed\x00\x05t\x00\x1buushop-sms-code-13992794421"
2) "name"
3) "myset:{tag}"
4) "key:{tag}:__rand_int__"
5) "counter:{tag}:__rand_int__"
清空当前数据库flushdb
清空全部数据库的内容FLUSHALL
思考:为什么redis是6379!(了解一下即可!)
Redis 是单线程的
明白Redis是很快的,官方表示, Redis是基于内存操作, CPU不是Redis性能瓶颈, Redis的瓶颈是根据机器的内存和网络带宽,既然可以使用单线程来实现,就使用单线程了!所有就使用了单线程了!
Redis是C语言写的,官方提供的数据为100000+ 的QPS ,完全不比同样是使用key-value的Memecache差!
Redis为什么单线程还这么快?
1、误区1 :高性能的服务器一定是多线程的?
2、误区2:多线程( CPU上下文会切换! ) 一定比单线程效率高!
先对CPU>内存>硬盘的速度要有所了解!
核心: redis是将所有的数据全部放在内存中的,所以说使用单线程去操作效率就是最高的,多线程( CPU上下文会切换:耗时的操作! ! ! ) , 对于内存系统来说,如果没有上下文切换效率就是最高的!
3、五大数据类型
官网文档
Redis 是一个开源(BSD许可)的,内存中的数据结构存储系统,它可以用作数据库、缓存和消息中间件。 它支持多种类型的数据结构,如 字符串(strings), 散列(hashes), 列表(lists), 集合(sets), 有序集合(sorted sets) 与范围查询, bitmaps, hyperloglogs 和 地理空间(geospatial) 索引半径查询。 Redis 内置了 复制(replication),LUA脚本(Lua scripting), LRU驱动事件(LRU eviction),事务(transactions) 和不同级别的 磁盘持久化(persistence), 并通过 Redis哨兵(Sentinel)和自动 分区(Cluster)提供高可用性(high availability)。
Redis-Key
在redis中无论什么数据类型,在数据库中都是以key-value形式保存,通过进行对Redis-key的操作,来完成对数据库中数据的操作。
exists key
:判断键是否存在del key
:删除键值对move key db
:将键值对移动到指定数据库expire key second
:设置键值对的过期时间type key
:查看value的数据类型
127.0.0.1:6379> keys *
1) "name"
127.0.0.1:6379> set age 10
OK
127.0.0.1:6379> keys *
1) "age"
2) "name"
127.0.0.1:6379> exists name
(integer) 1
127.0.0.1:6379> exists aga
(integer) 0
127.0.0.1:6379> move name 1
(integer) 1
127.0.0.1:6379> keys *
1) "age"
127.0.0.1:6379> set name zhaotong
OK
127.0.0.1:6379> get name
"zhaotong"
127.0.0.1:6379> expire name 10
(integer) 1
127.0.0.1:6379> ttl name
(integer) 5
127.0.0.1:6379> ttl name
(integer) -2
127.0.0.1:6379> get name
(nil)
127.0.0.1:6379> type name
none
127.0.0.1:6379> type age
string
String(字符串)
命令 | 描述 |
---|
APPEND key value | 向指定的key的value后追加字符串 |
DECR/INCR key | 将指定key的value数值进行+1/-1(仅对于数字) |
INCRBY/DECRBY key n | 按指定的步长对数值进行加减 |
INCRBYFLOAT key n | 为数值加上浮点型数值 |
STRLEN key | 获取key保存值的字符串长度 |
GETRANGE key start end | 按起止位置获取字符串(闭区间,起止位置都取) |
SETRANGE key offset value | 用指定的value 替换key中 offset开始的值 |
GETSET key value | 将给定 key 的值设为 value ,并返回 key 的旧值(old value)。 |
SETNX key value | 仅当key不存在时进行set |
SETEX key seconds value | set 键值对并设置过期时间 |
MSET key1 value1 [key2 value2…] | 批量set键值对 |
MSETNX key1 value1 [key2 value2…] | 批量设置键值对,仅当参数中所有的key都不存在时执行 |
MGET key1 [key2…] | 批量获取多个key保存的值 |
PSETEX key milliseconds value | 和 SETEX 命令相似,但它以毫秒为单位设置 key 的生存时间, |
127.0.0.1:6379> set key1 zhao
OK
127.0.0.1:6379> get key1
"zhao"
127.0.0.1:6379> keys *
1) "age"
2) "key1"
127.0.0.1:6379> exists key1
(integer) 1
127.0.0.1:6379> append key1 "tong"
(integer) 8
127.0.0.1:6379> get key1
"zhaotong"
127.0.0.1:6379> strlen key1
(integer) 8
127.0.0.1:6379> append key1 "zuiniubi"
(integer) 16
127.0.0.1:6379> strlen key1
(integer) 16
127.0.0.1:6379> get key1
"zhaotongzuiniubi"
127.0.0.1:6379> set views 0
OK
127.0.0.1:6379> get views
"0"
127.0.0.1:6379> incr views
(integer) 1
127.0.0.1:6379> incr views
(integer) 2
127.0.0.1:6379> get views
"2"
127.0.0.1:6379> decr views
(integer) 1
127.0.0.1:6379> decr views
(integer) 0
127.0.0.1:6379> decr views
(integer) -1
127.0.0.1:6379> get views
"-1"
127.0.0.1:6379> incrby views 10
(integer) 9
127.0.0.1:6379> get views
"9"
127.0.0.1:6379> decrby views 5
(integer) 4
127.0.0.1:6379> set key1 "hello,world"
OK
127.0.0.1:6379> get key1
"hello,world"
127.0.0.1:6379> getrange key1 0 4
"hello"
127.0.0.1:6379> getrange key1 0 -1
"hello,world"
127.0.0.1:6379> set key2 abcdef
OK
127.0.0.1:6379> get key2
"abcdef"
127.0.0.1:6379> setrange key2 1 xxxxxxxx
(integer) 9
127.0.0.1:6379> get key2
"axxxxxxxx"
127.0.0.1:6379> setex key1 30 "zhao"
OK
127.0.0.1:6379> ttl key1
(integer) 25
127.0.0.1:6379> get key1
"zhao"
127.0.0.1:6379> setnx mykey "redis"
(integer) 1
127.0.0.1:6379> keys *
1) "mykey"
127.0.0.1:6379> ttl key1
(integer) -2
127.0.0.1:6379> setnx mykey "MongoDB"
(integer) 0
127.0.0.1:6379> get mykey
"redis"
mset
mget
127.0.0.1:6379> mset k1 v1 k2 v2 k3 v3
OK
127.0.0.1:6379> keys *
1) "k3"
2) "k1"
3) "k2"
127.0.0.1:6379> mget k1 k2 k3
1) "v1"
2) "v2"
3) "v3"
127.0.0.1:6379> msetnx k1 v1 k4 v4
(integer) 0
127.0.0.1:6379> get k4
(nil)
set user:1 {name:zhaotong,age:21}
127.0.0.1:6379> set user:1 {name:zhaotong,age:21}
OK
127.0.0.1:6379> get user:1
"{name:zhaotong,age:21}"
127.0.0.1:6379> mset user:2:name zhansan user:2:age 20
OK
127.0.0.1:6379> mget user:2:name user:2:age
1) "zhansan"
2) "20"
getset
127.0.0.1:6379> getset db redis
(nil)
127.0.0.1:6379> get db
"redis"
127.0.0.1:6379> getset db mysql
"redis"
127.0.0.1:6379> get db
"mysql"
String类似的使用场景:value除了是字符串还可以是数字,用途举例:
- 计数器
- 统计多单位的数量:uid:123666:follow 0
- 粉丝数
- 对象存储缓存
List(列表)
基本的数据类型,列表
在redis里面,我们可以把list玩成 栈 、队列、阻塞队列!
所有的list命令都是用L开头的
127.0.0.1:6379> lpush list one
(integer) 1
127.0.0.1:6379> lpush list two
(integer) 2
127.0.0.1:6379> lpush list three
(integer) 3
127.0.0.1:6379> lrange list 0 -1
1) "three"
2) "two"
3) "one"
127.0.0.1:6379> lrange list 0 1
1) "three"
2) "two"
127.0.0.1:6379> Rpush list 4
(integer) 4
127.0.0.1:6379> lrange list 0 -1
1) "three"
2) "two"
3) "one"
4) "4"
127.0.0.1:6379> lpop list
"three"
127.0.0.1:6379> rpop list
"4"
127.0.0.1:6379> lrange list 0 -1
1) "two"
2) "one"
Lindex
127.0.0.1:6379> lrange list 0 -1
1) "two"
2) "one"
127.0.0.1:6379> lindex list 0
"two"
127.0.0.1:6379> lindex list 1
"one"
Llen
127.0.0.1:6379> lrange list 0 -1
1) "three"
127.0.0.1:6379> lpush list one
(integer) 2
127.0.0.1:6379> lpush list two
(integer) 3
127.0.0.1:6379> llen list
(integer) 3
lrem 移除指定的值
127.0.0.1:6379> lrange list 0 -1
1) "three"
2) "one"
3) "two"
4) "one"
127.0.0.1:6379> lrem list 1 two
(integer) 1
127.0.0.1:6379> lrange list 0 -1
1) "three"
2) "one"
3) "one"
127.0.0.1:6379> lrem list 2 one
(integer) 2
127.0.0.1:6379> lrange list 0 -1
1) "three"
trim 修剪 :list 截断!
127.0.0.1:6379> rpush list1 "1111"
(integer) 1
127.0.0.1:6379> rpush list1 "2222"
(integer) 2
127.0.0.1:6379> rpush list1 3333
(integer) 3
127.0.0.1:6379> rpush list1 4444
(integer) 4
127.0.0.1:6379> lrange list1 0 -1
1) "1111"
2) "2222"
3) "3333"
4) "4444"
127.0.0.1:6379> ltrim list1 1 2
OK
127.0.0.1:6379> lrange list1 0 -1
1) "2222"
rpoplpush
127.0.0.1:6379> lrange list1 0 -1
1) "2222"
2) "3333"
127.0.0.1:6379> rpoplpush list1 mylist
"3333"
127.0.0.1:6379> lrange list 0 -1
1) "4444"
127.0.0.1:6379> lrange mylist 0 -1
1) "3333"
lset 将列表中指定下标的值替换为另外一个值,更新操作
127.0.0.1:6379> exists list
(integer) 0
127.0.0.1:6379> lset list 0 item
(error) ERR no such key
127.0.0.1:6379> lpush list value1
(integer) 1
127.0.0.1:6379> lrange list 0 0
1) "value1"
127.0.0.1:6379> lset list 0 item
OK
127.0.0.1:6379> lrange list 0 0
1) "item"
127.0.0.1:6379> lset list 1 other
(error) ERR index out of range
linsert
127.0.0.1:6379> rpush list hello
(integer) 1
127.0.0.1:6379> rpush list world
(integer) 2
127.0.0.1:6379> linsert list before "world" "other"
(integer) 3
127.0.0.1:6379> lrange list 0 -1
1) "hello"
2) "other"
3) "world"
127.0.0.1:6379> linsert list after world 00
(integer) 4
127.0.0.1:6379> lrange list 0 -1
1) "hello"
2) "other"
3) "world"
4) "00"
小结
- 他实际上是一个链表, before Node after ,left , right都可以插入值
- 如果key 不存在,创建新的链表
- 如果key存在,新增内容
- 如果移除了所有值,空链表,也代表不存在!
- 在两边插入或者改动值,效率最高!中间元素,相对来说效率会低一点
消息排队!消息队列( Lpush Rpop ),栈(Lpush Lpop)!
Set(集合)
127.0.0.1:6379> sadd myset hello
(integer) 1
127.0.0.1:6379> sadd myset zhao
(integer) 1
127.0.0.1:6379> sadd myset tong
(integer) 1
127.0.0.1:6379> smembers myset
1) "tong"
2) "hello"
3) "zhao"
127.0.0.1:6379> sismember myset hello
(integer) 1
127.0.0.1:6379> sismember myset zzzz
(integer) 0
127.0.0.1:6379> scard myset
(integer) 3
Srem
127.0.0.1:6379> srem myset hello
(integer) 1
127.0.0.1:6379> scard myset
(integer) 2
127.0.0.1:6379> smembers myset
1) "tong"
2) "zhao"
set
127.0.0.1:6379> smembers myset
1) "2"
2) "1"
3) "tong"
4) "zhao"
5) "3"
127.0.0.1:6379> srandmember myset
"zhao"
127.0.0.1:6379> srandmember myset
"3"
127.0.0.1:6379> srandmember myset
"zhao"
127.0.0.1:6379> srandmember myset
"tong"
127.0.0.1:6379> srandmember myset 2
1) "1"
2) "zhao"
127.0.0.1:6379> srandmember myset 2
1) "zhao"
2) "3"
删除指定的key,随即删除key
127.0.0.1:6379> smembers myset
1) "tong"
2) "1"
3) "zhao"
4) "3"
5) "2"
127.0.0.1:6379> spop myset
"tong"
127.0.0.1:6379> spop myset
"3"
127.0.0.1:6379> smembers myset
1) "1"
2) "zhao"
3) "2"
将一个指定的值,移动到另外一个集合
127.0.0.1:6379> sadd set 666
(integer) 1
127.0.0.1:6379> sadd set 888
(integer) 1
127.0.0.1:6379> smembers myset
1) "1"
2) "zhao"
3) "2"
127.0.0.1:6379> smove myset set zhao
(integer) 1
127.0.0.1:6379> smembers myset
1) "1"
2) "2"
127.0.0.1:6379> smembers set
1) "888"
2) "666"
3) "zhao"
数字集合类:
- 差集 SDIFF
- 交集 SINTER
- 并集 SUNION
127.0.0.1:6379> smembers set1
1) "c"
2) "b"
3) "a"
127.0.0.1:6379> smembers set2
1) "d"
2) "c"
3) "e"
127.0.0.1:6379> sdiff set1 set2
1) "b"
2) "a"
127.0.0.1:6379> sinter set1 set2
1) "c"
127.0.0.1:6379> sunion set1 set2
1) "b"
2) "c"
3) "a"
4) "d"
5) "e"
Hash(哈希)
Map集合,key-map!这个时候的值是一个map集合!本质和String类型没有太大的区别,还是一个简单的key - value!
127.0.0.1:6379> hset hash one zhao
(integer) 1
127.0.0.1:6379> hget hash one
"zhao"
127.0.0.1:6379> hmset hash one tong two tang
OK
127.0.0.1:6379> hmget hash one two
1) "tong"
2) "tang"
127.0.0.1:6379> hgetall hash
1) "one"
2) "tong"
3) "two"
4) "tang"
127.0.0.1:6379> hdel hash one
(integer) 1
127.0.0.1:6379> hgetall hash
1) "two"
2) "tang"
127.0.0.1:6379> hgetall hash
1) "two"
2) "tang"
3) "one"
4) "1"
5) "three"
6) "3"
127.0.0.1:6379> hlen hash
(integer) 3
127.0.0.1:6379> hexists hash one
(integer) 1
127.0.0.1:6379> hexists hash hello
(integer) 0
127.0.0.1:6379> hkeys hash
1) "two"
2) "one"
3) "three"
127.0.0.1:6379> hvals hash
1) "tang"
2) "1"
3) "3"
127.0.0.1:6379> hincrby hash one 3
(integer) 4
127.0.0.1:6379> hincrby hash one -1
(integer) 3
127.0.0.1:6379> hsetnx hash field1 hello
(integer) 1
127.0.0.1:6379> hsetnx hash field1 world
(integer) 0
hash变更的数据user name age,尤其是是用户信息之类的,经常变动的信息! hash 更适合于对象的存储, String更加适合字符串存储!
Zset(有序集合)
在set的基础上,增加了一个值, set k1 V1 zset k1 score1 V1
127.0.0.1:6379> zadd myset 1 one
(integer) 1
127.0.0.1:6379> zadd myset 2 two
(integer) 1
127.0.0.1:6379> zadd myset 3 three
(integer) 1
127.0.0.1:6379> zadd myset 4 hello 5 world
(integer) 2
127.0.0.1:6379> zrange myset 0 -1
1) "one"
2) "two"
3) "three"
4) "hello"
5) "world"
排序
127.0.0.1:6379> zadd salary 10000 zhaotong
(integer) 1
127.0.0.1:6379> zadd salary 5000 zhangsan
(integer) 1
127.0.0.1:6379> zadd salary 500 lisi
(integer) 1
127.0.0.1:6379> zrangebyscore salary -inf +inf
1) "lisi"
2) "zhangsan"
3) "zhaotong"
127.0.0.1:6379> zrangebyscore salary -inf +inf withscores
1) "lisi"
2) "500"
3) "zhangsan"
4) "5000"
5) "zhaotong"
6) "10000"
127.0.0.1:6379> zrangebyscore salary 5000 10000
1) "zhangsan"
2) "zhaotong"
127.0.0.1:6379> zrevrange salary 0 -1 withscores
1) "zhaotong"
2) "10000"
3) "zhangsan"
4) "5000"
5) "lisi"
6) "500"
zrem
127.0.0.1:6379> zrem salary lisi
(integer) 1
127.0.0.1:6379> zrange salary 0 -1
1) "zhangsan"
2) "zhaotong"
127.0.0.1:6379> zcard salary
(integer) 2
127.0.0.1:6379> zadd myset 1 hello 2 world 3 shijie
(integer) 3
127.0.0.1:6379> zcount myset 1 3
(integer) 3
127.0.0.1:6379> zcount myset 1 2
(integer) 2
案例思路: set排序存储班级成绩表,工资表排序!
普通消息,1,重要消息,2,带权重进行判断!
排行榜应用实现,取Top N测试!
4、三种特殊数据类型
Geospatial 地理位置
朋友的定位,附近的人,打车距离计算?
Redis的Geo在Redis3.2版本就推出了!这个功能可以推算地理位置的信息,两地之间的距离,方圆几里的人!
只有六个命令
GEOADD
(error) ERR invalid longitude,latitude pair 40.046000,106.408000
127.0.0.1:6379> geoadd china:city 121.445 31.213 shanghai 117.246 39.117 tianjin 101.778 36.623 xian
(integer) 3
127.0.0.1:6379> geoadd china:city 106.408 40.046 chongqing
(integer) 1
GEOPOS
获得当前定位:一定是一个坐标值!
127.0.0.1:6379> geopos china:city beijing
1) 1) "116.40800267457962036"
2) "39.90399988166036138"
127.0.0.1:6379> geopos china:city shanghai xian
1) 1) "121.44499808549880981"
2) "31.213001199663303"
2) 1) "101.77800089120864868"
2) "36.62300104914651655"
GEODIST
两人之间的距离!
单位:
- m 表示单位为米。
- km 表示单位为千米。
- mi 表示单位为英里。
- ft 表示单位为英尺。
127.0.0.1:6379> geodist china:city beijing shanghai km
"1068.2320"
127.0.0.1:6379> geodist china:city beijing xian km
"1326.9914"
127.0.0.1:6379> geodist china:city chongqing xian km
"554.9371"
GEORADUIS 已给定的经纬度为中心,找出某一半径内的元素
附近的人(获得所有附近的人的地址,定位!)通过半径来查询
127.0.0.1:6379> georadius china:city 110 35 1000 km
1) "xian"
2) "chongqing"
3) "tianjin"
4) "beijing"
127.0.0.1:6379> GEORADIUS china:city 120 30 1000 km withdist
1) 1) "shanghai"
2) "193.2264"
127.0.0.1:6379> GEORADIUS china:city 120 30 1000 km withcoord
1) 1) "shanghai"
2) 1) "121.44499808549880981"
2) "31.213001199663303"
127.0.0.1:6379> GEORADIUS china:city 110 35 1000 km withdist withcoord count 2
1) 1) "chongqing"
2) "644.3693"
3) 1) "106.40799790620803833"
2) "40.0460000304488446"
2) 1) "xian"
2) "762.9935"
3) 1) "101.77800089120864868"
2) "36.62300104914651655"
127.0.0.1:6379> GEORADIUS china:city 110 35 1000 km withdist withcoord count 1
1) 1) "chongqing"
2) "644.3693"
3) 1) "106.40799790620803833"
2) "40.0460000304488446"
GEORADIUSBYMEMBER
找出位于指定元素周围的其他元素
127.0.0.1:6379> georadiusbymember china:city beijing 1000 km
1) "tianjin"
2) "beijing"
3) "chongqing"
GEOHASH 返回一个或多个未知元素的Geohash表示
该命令返回11个字符的Geohash字符串!
127.0.0.1:6379> geohash china:city xian chongqing
1) "wq82k502rq0"
2) "wr5ev65uy50"
Hyperloglog(基数统计)
简介
Redis HyperLogLog 是用来做基数统计的算法,HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定的、并且是很小的。
花费 12 KB 内存,就可以计算接近 2^64 个不同元素的基数。
因为 HyperLogLog 只会根据输入元素来计算基数,而不会储存输入元素本身,所以 HyperLogLog 不能像集合那样,返回输入的各个元素。
什么是基数?
数据集中不重复的元素的个数。
测试使用
127.0.0.1:6379> pfadd num1 1 2 3 4 5 6 7 8 9 10
(integer) 1
127.0.0.1:6379> pfcount num1
(integer) 10
127.0.0.1:6379> pfadd num2 5 4 3 22 66 44 88
(integer) 1
127.0.0.1:6379> pfcount num2
(integer) 7
127.0.0.1:6379> pfmerge num num1 num2
OK
127.0.0.1:6379> pfcount num
(integer) 14
作用
网页的访问量(UV):一个用户多次访问,也只能算作一个人。
传统的方式,set 保存用户的id ,然后就可以统计set中的元素数量作为标准判断!
这个方式如果保存大量的用户id ,就会比较麻烦!我们的目的是为了计数,而不是保存用户id ;
0.81%错误率!统计UV任务,可以忽略不计的!
Bitmaps
位存储
统计用户信息,活跃,不活跃!登录、未登录!打卡, 365打卡!两个状态的,都可以使用Bitmaps !
Bitmaps位图,数据结构!都是操作二进制位来进行记录,就只有0和1两个状态!
365天= 365 bit 1字节=8bit 46 个字节左右!
测试
127.0.0.1:6379> setbit sign 1 1
(integer) 0
127.0.0.1:6379> setbit sign 2 0
(integer) 0
127.0.0.1:6379> setbit sign 3 1
(integer) 0
127.0.0.1:6379> setbit sign 4 0
(integer) 0
127.0.0.1:6379> setbit sign 5 0
(integer) 0
127.0.0.1:6379> setbit sign 6 1
(integer) 0
127.0.0.1:6379> setbit sign 7 0
(integer) 0
127.0.0.1:6379> getbit sign 1
(integer) 1
127.0.0.1:6379> getbit sign 2
(integer) 0
127.0.0.1:6379> bitcount sign
(integer) 3
5、事务
Redis事务本质:一组命令的集合 !
-------------------队列 set set set 执行---------------------
一个事务中的所有命令都会被序列化,在事务执行过程的中,会按照顺序执行!
Redis事务没有隔离级别的概念
所有的命令在事务中,并没有直接被执行!只有发起执行命令的时候才会执行! Exec
Redis的单条命令是保证原子性的,但是redis事务不能保证原子性
Redis事务操作过程
- 开启事务(
multi
) - 命令入队
- 执行事务(
exec
)
正常执行事务!
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set k1 1
QUEUED
127.0.0.1:6379> set k2 2
QUEUED
127.0.0.1:6379> get k1
QUEUED
127.0.0.1:6379> set k3 v3
QUEUED
127.0.0.1:6379> exec
1) OK
2) OK
3) "1"
4) OK
放弃事务!
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set k1 v1
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> get k2
QUEUED
127.0.0.1:6379> discard
OK
127.0.0.1:6379> get k1
(nil)
事务错误
编译型异常(代码有问题!命令有错!),事务中所有的命令都不会被执行!
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set k1 v1
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> set k3 v3
QUEUED
127.0.0.1:6379> getset k3
(error) ERR wrong number of arguments for 'getset' command
127.0.0.1:6379> set k4 v4
QUEUED
127.0.0.1:6379> exec
(error) EXECABORT Transaction discarded because of previous errors.
127.0.0.1:6379> get k4
(nil)
运行时异常(1/0),如果事务队列中存在语法性,那么执行命令的时候,其他命令是可以正常执行的,错误命令抛出异常!
127.0.0.1:6379> set k1 v1
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> incr k1
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> set k3 v3
QUEUED
127.0.0.1:6379> get k2
QUEUED
127.0.0.1:6379> exec
1) (error) ERR value is not an integer or out of range
2) OK
3) OK
4) "v2"
127.0.0.1:6379> get k3
"v3"
监控
悲观锁:
- 很悲观,认为什么时候都会出现问题,无论做什么都会加锁
乐观锁:
- 很乐观,认为什么时候都不会出现问题,所以不会上锁!更新数据的时候去判断一下,在此期间是否有人修改过这个数据
- 获取version
- 更新的时候比较version
使用watch key监控指定数据,相当于乐观锁加锁。
正常执行
127.0.0.1:6379> set money 100
OK
127.0.0.1:6379> set out 0
OK
127.0.0.1:6379> watch money
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> decrby money 20
QUEUED
127.0.0.1:6379> incrby out 20
QUEUED
127.0.0.1:6379> exec
1) (integer) 80
2) (integer) 20
失败执行
线程1:
127.0.0.1:6379> set in 100
OK
127.0.0.1:6379> set out 0
OK
127.0.0.1:6379> watch in
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> decrby in 20
QUEUED
127.0.0.1:6379> incrby out 20
QUEUED
127.0.0.1:6379>
模拟线程插队,线程2:
127.0.0.1:6379> get in
"100"
127.0.0.1:6379> incrby in 500
(integer) 600
回到线程1,执行事务:
127.0.0.1:6379> exec
(nil)
127.0.0.1:6379> get in
"600"
解锁获取最新值,然后再加锁进行事务。unwatch
进行解锁。
注意:每次提交执行exec后都会自动释放锁,不管是否成功
6、Jedis
什么是Jedis 是Redis官方推荐的Java连接开发工具!使用Java操作Redis中间件!如果你要使用Java操作Redis,那么对Redis要熟悉!
1、导入依赖
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.70</version>
</dependency>
2、编码测试
package jedis;
import redis.clients.jedis.Jedis;
public class TestPing {
public static void main(String[] args) {
Jedis jedis = new Jedis("192.168.203.128",6379);
String ping = jedis.ping();
System.out.println(ping);
String set = jedis.set("name", "辰东");
String s = jedis.get("name");
System.out.println(s);
jedis.close();
}
}
3、事务
package jedis;
import com.alibaba.fastjson.JSONObject;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;
public class TestTX {
public static void main(String[] args) {
Jedis jedis = new Jedis("192.168.203.128",6379);
JSONObject jsonObject = new JSONObject();
jsonObject.put("遮天","辰东");
jsonObject.put("盘龙","我吃西红柿");
String string = jsonObject.toJSONString();
Transaction multi = jedis.multi();
try {
multi.set("book1",string);
multi.set("book2",string);
multi.exec();
} catch (Exception e) {
multi.discard();
} finally {
System.out.println(jedis.get("book1"));
System.out.println(jedis.get("book2"));
jedis.close();
}
}
}
7、Spring Boot的整合
1、导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
说明:在SpringBoot2.x之后,原来使用的jedis被替换为了lettuce?
jedis :采用的直连,多个线程操作的话,是不安全的,如果想要避免不安全的,使用jedis pool 连接池!更像BIO模式
lettuce :采用netty ,实例可以再多个线程中进行共享,不存在线程不安全的情况!可以减少线程数据了,更像NIO模式
2、源码分析:
@Bean
@ConditionalOnMissingBean(name = {"redisTemplate"})
@ConditionalOnSingleCandidate(RedisConnectionFactory.class)
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> template = new RedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}
@Bean
@ConditionalOnMissingBean
@ConditionalOnSingleCandidate(RedisConnectionFactory.class)
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
StringRedisTemplate template = new StringRedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}
测试连接
package com.wdzl;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisTemplate;
@SpringBootTest
class RedisSpringbootApplicationTests {
@Autowired
private RedisTemplate redisTemplate;
@Test
void contextLoads() {
redisTemplate.opsForValue().set("hello","世界");
String hello = (String)redisTemplate.opsForValue().get("hello");
System.out.println(hello);
}
}
自定义Redis Tempalte
对象的存储要系列化之后才行
@Bean
@SuppressWarnings("all")
public RedisTemplate<String, Object> redis(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
template.setConnectionFactory(factory);
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
template.setKeySerializer(stringRedisSerializer);
template.setHashKeySerializer(stringRedisSerializer);
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
工具类
package com.wdzl.utils;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
@Component
public final class RedisUtil {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
public boolean expire(String key, long time) {
try {
if ( time > 0){
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public long getExpire(String key) {
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}
public boolean hasKey(String key) {
try {
return redisTemplate.hasKey(key);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public void delete(String key) {
redisTemplate.delete(key);
}
public void delete(Collection<String> keys) {
redisTemplate.delete(keys);
}
public Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}
public boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean set(String key, Object value, long time) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
} else{
set(key, value);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public long incr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递增因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, delta);
}
public long decr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递减因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, -delta);
}
public Object hget(String key, String item) {
return redisTemplate.opsForHash().get(key, item);
}
public Map<Object, Object> hmget(String key) {
return redisTemplate.opsForHash().entries(key);
}
public boolean hmset(String key, Map<String, Object> map) {
try {
redisTemplate.opsForHash().putAll(key, map);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean hmset(String key, Map<String, Object> map, long time) {
try {
redisTemplate.opsForHash().putAll(key, map);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean hset(String key, String item, Object value) {
try {
redisTemplate.opsForHash().put(key, item, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean hset(String key, String item, Object value, long time) {
try {
redisTemplate.opsForHash().put(key, item, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public void hdel(String key, Object... item) {
redisTemplate.opsForHash().delete(key, item);
}
public boolean hHasKey(String key, String item) {
return redisTemplate.opsForHash().hasKey(key, item);
}
public double hincr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, by);
}
public double hdecr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, -by);
}
public Set<Object> sGet(String key) {
try {
return redisTemplate.opsForSet().members(key);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public boolean sHasKey(String key, Object value) {
try {
return redisTemplate.opsForSet().isMember(key, value);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public long sSet(String key, Object... values) {
try {
return redisTemplate.opsForSet().add(key, values);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
public long sSetAndTime(String key, long time, Object... values) {
try {
Long count = redisTemplate.opsForSet().add(key, values);
if (time > 0)
expire(key, time);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
public long sGetSetSize(String key) {
try {
return redisTemplate.opsForSet().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
public long setRemove(String key, Object... values) {
try {
Long count = redisTemplate.opsForSet().remove(key, values);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
public List<Object> lGet(String key, long start, long end) {
try {
return redisTemplate.opsForList().range(key, start, end);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public long lGetListSize(String key) {
try {
return redisTemplate.opsForList().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
public Object lGetIndex(String key, long index) {
try {
return redisTemplate.opsForList().index(key, index);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public boolean lSet(String key, Object value) {
try {
redisTemplate.opsForList().rightPush(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean lSet(String key, Object value, long time) {
try {
redisTemplate.opsForList().rightPush(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean lSet(String key, List<Object> value) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean lSet(String key, List<Object> value, long time) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean lUpdateIndex(String key, long index, Object value) {
try {
redisTemplate.opsForList().set(key, index, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public long lRemove(String key, long count, Object value) {
try {
Long remove = redisTemplate.opsForList().remove(key, count, value);
return remove;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
}
8、Redis.conf 详解
单位
容量单位不区分大小写,G和GB有区别
可以使用 include 组合多个配置问题
网络配置
bind 127.0.0.1
protected-mode yes
port 6379
通用 GENERAL
daemonize yes
pidfile /var/run/redis_6379.pid
loglevel notice
logfile ""
databases 16
always-show-logo yes
快照
持久化,在规定的时间内,执行了多少次操作,则会持久化到文件.rdb. aof
redis是内存数据库,如果没有持久化,那么数据断电及失!
save 900 1
save 300 10
save 60 10000
stop-writes-on-bgsave-error yes
rdbcompression yes
rdbchecksum yes
dir ./
SECURITY 安全
可以在这里设置redis的密码,默认是没有密码!
127.0.0.1:6379> config get requirepass
1) "requirepass"
2) ""
127.0.0.1:6379> config set requirepass "123456"
OK
127.0.0.1:6379> ping
PONG
127.0.0.1:6379> config get requirepass
1) "requirepass"
2) "123456"
127.0.0.1:6379> auth 123456
OK
限制 CLIENTS
maxclients 10000
maxmemory <bytes>
maxmemory-policy noeviction
1、volatile-lru:只对设置了过期时间的key进行LRU(默认值)
2、allkeys-lru : 删除lru算法的key
3、volatile-random:随机删除即将过期key
4、allkeys-random:随机删除
5、volatile-ttl : 删除即将过期的
6、noeviction : 永不过期,返回错误
APPEND ONLY 模式 AOF配置
appendonly no
appendfilename "appendonly.aof"
appendfsync everysec
9、Redis持久化
Redis是内存数据库,如果不将内存中的数据库状态保存到磁盘,那么一旦服务器进程退出 ,服务器中的数据库状态也会消失。所以Redis提供了持久化功能!
RDB(Redis DateBase)
什么是RDB
在指定的时间间隔内将内存中的数据集快照写入磁盘,也就是行话讲的Snapshot
快照,它恢复时是将快照文件直接读到内存里。
Redis会单独创建( fork )一个子进程来进行持久化,会先将数据写入到一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。整个过程中,主进程是不进行任何IO
操作的。这就确保了极高的性能。如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化后的数据可能丢失。我们默认的就是RDB ,一般情况下不需要修改这个配置!
有时候在生产环境我们会将这个文件进行备份!
rdb保存的文件是dump.rdb 都是在我们的配置文件中快照中进行配置的!
触发机制
- save的规则满足的情况下,会自动触发rdb规则
- 执行flushall命令,也会触发我们的rdb规则!
- 退出redis,也会产生rdb文件!
备份就自动生成一个dump.rdb
使用 save
命令,会立刻对当前内存中的数据进行持久化 ,但是会阻塞,也就是不接受其他操作了;
bgsave和save对比
命令 | save | bgsave |
---|
IO类型 | 同步 | 异步 |
阻塞? | 是 | 是(阻塞发生在fock(),通常非常快) |
复杂度 | O(n) | O(n) |
优点 | 不会消耗额外的内存 | 不阻塞客户端命令 |
缺点 | 阻塞客户端命令 | 需要fock子进程,消耗内存 |
如何恢复rdb文件
1、只需要将rdb文件放在我们redis启动目录就可以, redis启动的时候会自动检查dump.rdb恢复其中的数据!
2、查看需要存在的位置
127.0.0.1:6379> config get dir
1) "dir"
2) "/"
优缺点
优点:
缺点:
- 需要一定的时间间隔进行操作,如果redis意外宕机了,这个最后一次修改的数据就没有了。
- fork进程的时候,会占用一定的内容空间。
AOF(Append Only File)
AOF 是什么?
以日志的形式来记录每个写操作,将Redis执行过的所有指令记录下来(读操作不记录) ,只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作.
Aof保存的是appendonly.aof文件
append
appendonly no yes则表示启用AOF
默认是不开启的,我们需要手动配置,然后重启redis,就可以生效了!
如果这个aof文件有错误,这时候redis是启动不起来的,我需要修改这个aof文件
redis给我们提供了一个工具redis-check-aof --fix
重写规则
aof默认就是文件的无限追加,文件会越来越大!
如果aof文件大于64m,太大了! fork一个新的进程来将我们的文件进行重写
优点和缺点
appendonly yes
appendfilename "appendonly.aof"
appendfsync everysec
优点
- 每一次修改都会同步,文件的完整性会更加好
- 没秒同步一次,可能会丢失一秒的数据
- 从不同步,效率最高
缺点
- 相对于数据文件来说,aof远远大于rdb,修复速度比rdb慢!
- Aof运行效率也要比rdb慢,所以我们redis默认的配置就是rdb持久化
10、Redis发布订阅
Redis 发布订阅(pub/sub)是一种消息通信模式:发布者(pub)发送消息,订阅者(sub)接受消息。微信、微博、关注系统!
Redis客户端可以订阅任意数量的频道。
订阅/发布消息图:
三个要素:1、消息发送者 2、频道 3、消息订阅者
命令
命令 | 描述 |
---|
PSUBSCRIBE pattern [pattern…] | 订阅一个或多个符合给定模式的频道。 |
PUNSUBSCRIBE pattern [pattern…] | 退订一个或多个符合给定模式的频道。 |
PUBSUB subcommand [argument[argument]] | 查看订阅与发布系统状态。 |
PUBLISH channel message | 向指定频道发布消息 |
SUBSCRIBE channel [channel…] | 订阅给定的一个或多个频道 |
SUBSCRIBE channel [channel…] | 退订一个或多个频道 |
实例
------------订阅端----------------------
127.0.0.1:6379> subscribe read
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "read"
3) (integer) 1
1) "message"
2) "read"
3) "hello,world"
1) "message"
2) "read"
3) "tong"
--------------消息发布端-------------------
127.0.0.1:6379> publish read "hello,world"
(integer) 1
127.0.0.1:6379> publish read "tong"
(integer) 1
-----------------查看活跃的频道------------
127.0.0.1:6379> pubsub channels
1) "read"
原理
Redis是使用C实现的,通过分析Redis源码里的pubsub.c文件,了解发布和订阅机制的底层实现,籍此加深对Redis的理解。
Redis通过PUBLISH
、SUBSCRIBE
和PSUBSCRIBE
等命令实现发布和订阅功能。
微信:
通过SUBSCRIBE命令订阅某频道后, redis-server里维护了一个字典,字典的键就是一个个频道, 而字典的值则是一个链表 ,链表中保存了所有订阅这个channel的客户端。SUBSCRIBE 命令的关键,就是将客户端添加到给定channel的订阅链表中。
通过PUBLISH命令向订阅者发送消息, redis-server 会使用给定的频道作为键,在它所维护的channel字典中查找记录了订阅这个频道的所有客户端的链表,遍历这个链表,将消息发布给所有订阅者。
Pub/Sub从字面上理解就是发布( Publish)与订阅( Subscribe ) , 在Redis中,你可以设定对某一个key值进行消 息发布及消息订阅,当一个key值上进行了消息发布后,所有订阅它的客户端都会收到相应的消息。这一功能最明显的用法就是用作实时消息系统,比如普通的即时聊天,群聊等功能。
缺点
- 如果一个客户端订阅了频道,但自己读取消息的速度却不够快的话,那么不断积压的消息会使redis输出缓冲区的体积变得越来越大,这可能使得redis本身的速度变慢,甚至直接崩溃。
- 这和数据传输可靠性有关,如果在订阅方断线,那么他将会丢失所有在断线期间发布者发布的消息。
应用
- 消息订阅:公众号订阅,微博关注等等(起始更多是使用消息队列来进行实现)
- 多人在线聊天室。
稍微复杂的场景,我们就会使用消息中间件MQ处理。
11、Redis主从复制
概念
主从复制,是指将一台Redis服务器的数据,复制到其他的Redis服务器。前者称为主节点(Master/Leader),后者称为从节点(Slave/Follower), 数据的复制是单向的!只能由主节点复制到从节点(主节点以写为主、从节点以读为主)。
默认情况下,每台Redis服务器都是主节点
一个主节点可以有0个或者多个从节点,但每个从节点只能由一个主节点。
作用
- 数据冗余:主从复制实现了数据的热备份,是持久化之外的一种数据冗余的方式。
- 故障恢复:当主节点故障时,从节点可以暂时替代主节点提供服务,是一种服务冗余的方式
- 负载均衡:在主从复制的基础上,配合读写分离,由主节点进行写操作,从节点进行读操作,分担服务器的负载;尤其是在多读少写的场景下,通过多个从节点分担负载,提高并发量。
- 高可用(集群)基石:主从复制还是哨兵和集群能够实施的基础。
为什么使用集群
1、单台服务器难以负载大量的请求
2、单台服务器故障率高,系统崩坏概率大
3、单台服务器内存容量有限。
单台Redis最大使用内存不应该超过20G。
环境配置
我们在讲解配置文件的时候,注意到有一个replication
模块
查看当前库的信息:info replication
127.0.0.1:6379> info replication
role:master
connected_slaves:0
master_replid:fd21a2e99b3dda5736a4d866ac7f70b68e53ef2e
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:0
second_repl_offset:-1
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0
既然需要启动多个服务,就需要多个配置文件。每个配置文件对应修改以下信息:
启动单机多服务集群:
一主二从配置
默认情况下,每台Redis服务器都是主节点我们一般情况下只用配置从机就好了!
认老大!一主(79)二从(80,81)
使用SLAVEOF host port
就可以为从机配置主机了
然后主机上也能看到从机的状态:
127.0.0.1:6379> info replication
role:master
connected_slaves:2
slave0:ip=127.0.0.1,port=6381,state=online,offset=1120,lag=0
slave1:ip=127.0.0.1,port=6380,state=online,offset=1120,lag=1
master_replid:f57f36ecf423d552d391896584ae580fd82dd548
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:1120
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:1120
我们这里是使用命令搭建,是暂时的,真实开发中应该在从机的配置文件中进行配置这样的话是永久的。
使用规则
127.0.0.1:6380> get name
"zhao"
127.0.0.1:6380> set k1 v1
(error) READONLY You can't write against a read only replica.
- 当主机断电宕机后,默认情况下从机的角色不会发生变化 ,集群中只是失去了写操作,当主机恢复以后,又会连接上从机恢复原状。
- 当从机断电宕机后,若不是使用配置文件配置的从机,再次启动后作为主机是无法获取之前主机的数据的,若此时重新配置称为从机,又可以获取到主机的所有数据。这里就要提到一个同步原理。
Slave启动成功连接到master后会发送一个sync同步命令
Master接到命令,启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令,在后台进程执行完毕之后, master将传送整个数据文件到slave ,并完成一次完全同步。
全量复制:而slave服务在接收到数据库文件数据后,将其存盘并加载到内存中。
增量复制: Master继续将新的所有收集到的修改命令依次传给slave ,完成同步
但是只要是重新连接master , 一次完全同步(全量复制)将被自动执行
- 第二条中提到,默认情况下,主机故障后,不会出现新的主机,有两种方式可以产生新的主机:
- 从机手动执行命令
slaveof no one
,这样执行以后从机会独立出来成为一个主机 - 使用哨兵模式(自动选举)
层层链路
上一个M链接下一个S!
如果主机宕机了,这个时候我们手动设置!
谋朝篡位
如果主机断开了连接,我们可以使用SLAVEOF no one
让自己变成主机!其他的节点就可以手动连接到最新的这个主节点
12、哨兵模式
概述
主从切换技术的方法是:当主服务器宕机后,需要手动把一台从服务器切换为主服务器,这就需要人工干预,费事费力,还会造成一段时间内服务不可用。这不是一种推荐的方式,更多时候,我们优先考虑哨兵模式。
单机单个哨兵
哨兵的作用:
- 通过发送命令,让Redis服务器返回监控其运行状态,包括主服务器和从服务器。
- 当哨兵监测到master宕机,会自动将slave切换成master,然后通过发布订阅模式通知其他的从服务器,修改配置文件,让它们切换主机。
多哨兵模式
哨兵的核心配置
sentinel monitor mymaster 127.0.0.1 6379 1
- 数字1表示 :当一个哨兵主观认为主机断开,就可以客观认为主机故障,然后开始选举新的主机。
测试
redis-sentinel xxx/sentinel.conf
成功启动哨兵模式
[root@localhost etc]
3867:X 01 Aug 2021 02:23:34.206
3867:X 01 Aug 2021 02:23:34.207
3867:X 01 Aug 2021 02:23:34.207
3867:X 01 Aug 2021 02:23:34.207 * Increased maximum number of open files to 10032 (it was originally set to 1024).
_._
_.-``__ ''-._
_.-`` `. `_. ''-._ Redis 6.0.6 (00000000/0) 64 bit
.-`` .-```. ```\/ _.,_ ''-._
( ' , .-` | `, ) Running in sentinel mode
|`-._`-...-` __...-.``-._|'` _.-'| Port: 26379
| `-._ `._ / _.-' | PID: 3867
`-._ `-._ `-./ _.-' _.-'
|`-._`-._ `-.__.-' _.-'_.-'|
| `-._`-._ _.-'_.-' | http://redis.io
`-._ `-._`-.__.-'_.-' _.-'
|`-._`-._ `-.__.-' _.-'_.-'|
| `-._`-._ _.-'_.-' |
`-._ `-._`-.__.-'_.-' _.-'
`-._ `-.__.-' _.-'
`-._ _.-'
`-.__.-'
3867:X 01 Aug 2021 02:23:34.208
3867:X 01 Aug 2021 02:23:34.209
3867:X 01 Aug 2021 02:23:34.209
3867:X 01 Aug 2021 02:23:34.210 * +slave slave 127.0.0.1:6380 127.0.0.1 6380 @ myredis 127.0.0.1 6379
3867:X 01 Aug 2021 02:23:34.211 * +slave slave 127.0.0.1:6381 127.0.0.1 6381 @ myredis 127.0.0.1 6379
此时哨兵监视着我们的主机6379,当我们断开主机后
哨兵模式优缺点
优点:
- 哨兵集群,基于主从复制模式,所有主从复制的优点,它都有
- 主从可以切换,故障可以转移,系统的可用性更好
- 哨兵模式是主从模式的升级,手动到自动,更加健壮
缺点:
- Redis不好在线扩容,集群容量一旦达到上限,在线扩容就十分麻烦
- 实现哨兵模式的配置其实是很麻烦的,里面有很多配置项
哨兵模式的全部配置
完整的哨兵模式配置文件 sentinel.conf
port 26379
dir /tmp
sentinel monitor mymaster 127.0.0.1 6379 1
sentinel auth-pass mymaster MySUPER--secret-0123passw0rd
sentinel down-after-milliseconds mymaster 30000
这个数字越小,完成failover所需的时间就越长,
但是如果这个数字越大,就意味着越 多的slave因为replication而不可用。
可以通过将这个值设为 1 来保证每次只有一个slave 处于不能处理命令请求的状态。
sentinel parallel-syncs mymaster 1
sentinel failover-timeout mymaster 180000
sentinel notification-script mymaster /var/redis/notify.sh
sentinel client-reconfig-script mymaster /var/redis/reconfig.sh
13、缓存穿透与雪崩
缓存穿透(查不到)
概念
在默认情况下,用户请求数据时,会先在缓存(Redis)中查找,若没找到即缓存未命中,再在数据库中进行查找,数量少可能问题不大,可是一旦大量的请求数据(例如秒杀场景)缓存都没有命中的话,就会全部转移到数据库上,造成数据库极大的压力,就有可能导致数据库崩溃。网络安全中也有人恶意使用这种手段进行攻击被称为洪水攻击。
解决方案
布隆过滤器
对所有可能查询的参数以Hash的形式存储,以便快速确定是否存在这个值,在控制层先进行拦截校验,校验不通过直接打回,减轻了存储系统的压力。
缓存空对象
一次请求若在缓存和数据库中都没找到,就在缓存中方一个空对象用于处理后续这个请求。
这样做有一个缺陷:存储空对象也需要空间,大量的空对象会耗费一定的空间,存储效率并不高。解决这个缺陷的方式就是设置较短过期时间
即使对空值设置了过期时间,还是会存在缓存层和存储层的数据会有一段时间窗口的不一致,这对于需要保持一致性的业务会有影响。
缓存击穿(量太大,缓存过期)
概念
相较于缓存穿透,缓存击穿的目的性更强,一个存在的key,在缓存过期的一刻,同时有大量的请求,这些请求都会击穿到DB,造成瞬时DB请求量大、压力骤增。这就是缓存被击穿,只是针对其中某个key的缓存不可用而导致击穿,但是其他的key依然可以使用缓存响应。
比如热搜排行上,一个热点新闻被同时大量访问就可能导致缓存击穿。
解决方案
设置热点数据永不过期
这样就不会出现热点数据过期的情况,但是当Redis内存空间满的时候也会清理部分数据,而且此种方案会占用空间,一旦热点数据多了起来,就会占用部分空间。
加互斥锁(分布式锁)
在访问key之前,采用SETNX(set if not exists)来设置另一个短期key来锁住当前key的访问,访问结束再删除该短期key。保证同时刻只有一个线程访问。这样对锁的要求就十分高。
缓存雪崩
概念
大量的key设置了相同的过期时间,导致在缓存在同一时刻全部失效,造成瞬时DB请求量大、压力骤增,引起雪崩。
解决方案
-
redis高可用
这个思想的含义是,既然redis有可能挂掉,那我多增设几台redis,这样一台挂掉之后其他的还可以继续工作,其实就是搭建的集群
-
限流降级
这个解决方案的思想是,在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对某个key只允许一个线程查询数据和写缓存,其他线程等待。
-
数据预热
数据加热的含义就是在正式部署之前,我先把可能的数据先预先访问一遍,这样部分可能大量访问的数据就会加载到缓存中。在即将发生大并发访问前手动触发加载缓存不同的key,设置不同的过期时间,让缓存失效的时间点尽量均匀。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)