上一篇博客Redis源码剖析–快速列表 带大家一起剖析了quicklist这个底层数据结构的实现原理。Redis对外开放的列表list结构就是采用quicklist作为底层实现(在新版本的Redis源码中,不再采用ziplist和sdlist两种结构,而是统一采用quicklist)。有关列表键的实现源码在t_list.c文件中,大家可以边看源码边看这篇博客,一起来理解。
List概述
其实在[Redis源码剖析—对象Object]一文中有一个错误,list数据类型的底层编码并没有采用ziplist和sdlist,而是统一采用quicklist作为底层数据结构,这点需要提前说明一下。Redis的新版本中,list的底层编码类型只有OBJ_ENCODING_QUICKLIST,那么原先关于合适进行编码类型转换的代码都省略了。
列表没有其特有的数据结构,而是采用RedisObject作为其泛型数据结构,当RedisObject的编码类型为OBJ_LIST时,该对象被认为是一个列表。
Redis为列表提供了迭代器结构,本质就是quicklist迭代器的基本上做了一层封装。
typedef struct {
robj *subject;
unsigned char encoding;
unsigned char direction;
quicklistIter *iter;
} listTypeIterator;
typedef struct {
listTypeIterator *li;
quicklistEntry entry;
} listTypeEntry;
List主要接口
列表定义了基本的接口函数,包括push,pop,insert,find等等,基本上都是在quicklist上做了一次封装。我们先来看看主要有哪些接口。
void listTypePush(robj *subject, robj *value, int where);
robj *listTypePop(robj *subject, int where);
unsigned long listTypeLength(robj *subject);
listTypeIterator *listTypeInitIterator(robj *subject, long index,
unsigned char direction);
void listTypeReleaseIterator(listTypeIterator *li);
int listTypeNext(listTypeIterator *li, listTypeEntry *entry);
robj *listTypeGet(listTypeEntry *entry) ;
void listTypeInsert(listTypeEntry *entry, robj *value, int where);
int listTypeEqual(listTypeEntry *entry, robj *o);
void listTypeDelete(listTypeIterator *iter, listTypeEntry *entry);
void listTypeConvert(robj *subject, int enc);
其中,我们以push和pop操作来简要看看这些函数的实现源码。
void listTypePush(robj *subject, robj *value, int where) {
if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
value = getDecodedObject(value);
size_t len = sdslen(value->ptr);
quicklistPush(subject->ptr, value->ptr, len, pos);
decrRefCount(value);
} else {
serverPanic("Unknown list encoding");
}
}
robj *listTypePop(robj *subject, int where) {
long long vlong;
robj *value = NULL;
int ql_where = where == LIST_HEAD ? QUICKLIST_HEAD : QUICKLIST_TAIL;
if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
if (quicklistPopCustom(subject->ptr, ql_where, (unsigned char **)&value,
NULL, &vlong, listPopSaver)) {
if (!value)
value = createStringObjectFromLongLong(vlong);
}
} else {
serverPanic("Unknown list encoding");
}
return value;
}
其他的一些接口函数均是调用quicklist提供的底层接口函数来实现,大家有空可以对照源码来看看。
List命令
与string一样,list也提供了很多命令以供用户使用。按照惯例,先列一张表给大家(包含部分重要指令)。
命令形式 | 命令描述 |
---|
LPUSH key value1 [value2….] | 将一个或多个值插入到列表头部 |
LPOP key | 移除并获取列表的头部第一个元素 |
RPUSH key value1 [value2….] | 将一个或者多个值插入到列表尾部 |
RPOP key | 移除并获取列表的尾部第一个元素 |
LPUSHX key value | 为已存在的列表头部添加值 |
RPUSHX key value | 为已存在的列表尾部添加值 |
BLPOP key1 [key2…] timeout | 移出并获取列表的第一个元素(阻塞模式) |
BRPOP key1 [key2…] timeout | 移出并获取列表的最后一个元素(阻塞模式) |
BRPOPLPUSH source destination timeout | 从列表中弹出一个值,将弹出的元素插入到另外一个列表中并返回它(阻塞模式) |
LLEN key | 获取列表长度 |
LINDEX | 通过索引获取列表中的元素 |
同样,博主仅仅贴出部分源码来供大家理解这些命令的简要实现过程,我们来看看LPUSH和RPUSH命令的实现。
void lpushCommand(client *c) {
c->argv[2] = tryObjectEncoding(c->argv[2]);
pushxGenericCommand(c,NULL,c->argv[2],LIST_HEAD);
}
void rpushCommand(client *c) {
c->argv[2] = tryObjectEncoding(c->argv[2]);
pushxGenericCommand(c,NULL,c->argv[2],LIST_TAIL);
}
void pushGenericCommand(client *c, int where) {
int j, waiting = 0, pushed = 0;
robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
if (lobj && lobj->type != OBJ_LIST) {
addReply(c,shared.wrongtypeerr);
return;
}
for (j = 2; j < c->argc; j++) {
c->argv[j] = tryObjectEncoding(c->argv[j]);
if (!lobj) {
lobj = createQuicklistObject();
quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
server.list_compress_depth);
dbAdd(c->db,c->argv[1],lobj);
}
listTypePush(lobj,c->argv[j],where);
pushed++;
}
addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0));
if (pushed) {
char *event = (where == LIST_HEAD) ? "lpush" : "rpush";
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
}
server.dirty += pushed;
}
这些命令的源码实现基本上大同小异,不过相对于其他数据类型,list提供了带有阻塞的命令,包括BLPOP,BRPOP,BLPOPRPUSH,这些命令可能会造成客户端被阻塞。这属于list的一大特色,也是需要着重理解的地方。
阻塞命令
前面提到,list为用户提供了三个带有阻塞模式的命令,分别是BLPOP,BRPOP,BLPOPRPUSH。那么,到底这些命令是如何执行,如何进行阻塞和解阻塞的呢?首先,我们来看看BLPOP,BRPOP的源码。
void blpopCommand(client *c) {
blockingPopGenericCommand(c,LIST_HEAD);
}
void brpopCommand(client *c) {
blockingPopGenericCommand(c,LIST_TAIL);
}
void blockingPopGenericCommand(client *c, int where) {
robj *o;
mstime_t timeout;
int j;
if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
!= C_OK) return;
for (j = 1; j < c->argc-1; j++) {
o = lookupKeyWrite(c->db,c->argv[j]);
if (o != NULL) {
if (o->type != OBJ_LIST) {
addReply(c,shared.wrongtypeerr);
return;
} else {
if (listTypeLength(o) != 0) {
char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
robj *value = listTypePop(o,where);
serverAssert(value != NULL);
addReplyMultiBulkLen(c,2);
addReplyBulk(c,c->argv[j]);
addReplyBulk(c,value);
decrRefCount(value);
notifyKeyspaceEvent(NOTIFY_LIST,event,
c->argv[j],c->db->id);
if (listTypeLength(o) == 0) {
dbDelete(c->db,c->argv[j]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
c->argv[j],c->db->id);
}
signalModifiedKey(c->db,c->argv[j]);
server.dirty++;
rewriteClientCommandVector(c,2,
(where == LIST_HEAD) ? shared.lpop : shared.rpop,
c->argv[j]);
return;
}
}
}
}
if (c->flags & CLIENT_MULTI) {
addReply(c,shared.nullmultibulk);
return;
}
blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
}
从这段代码中,我们可以看出,当执行带有阻塞的pop命令时,有如下两种情况。
- 如果指定的list存在于当前数据库中且list不为空,则转而执行普通的pop操作
- 如果指定的list键不存在,或者该list为空,则执行阻塞操作
那么阻塞的过程是如下进行的呢?别急,我们去查看以下blockForKeys函数,看看它干了些什么。
void blockForKeys(client *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
dictEntry *de;
list *l;
int j;
c->bpop.timeout = timeout;
c->bpop.target = target;
if (target != NULL) incrRefCount(target);
for (j = 0; j < numkeys; j++) {
if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
incrRefCount(keys[j]);
de = dictFind(c->db->blocking_keys,keys[j]);
if (de == NULL) {
int retval;
l = listCreate();
retval = dictAdd(c->db->blocking_keys,keys[j],l);
incrRefCount(keys[j]);
serverAssertWithInfo(c,keys[j],retval == DICT_OK);
} else {
l = dictGetVal(de);
}
listAddNodeTail(l,c);
}
blockClient(c,BLOCKED_LIST);
}
在上述代码中,设计到server.c中的一些数据结构。这里我简要的罗列一下。
typedef struct client {
redisDb *db;
blockingState bpop;
}
typedef struct blockingState {
mstime_t timeout;
dict *keys;
robj *target;
int numreplicas;
long long reploffset;
} blockingState;
typedef struct redisDb {
dict *blocking_keys;
} redisDb;
Redis采用了一个字典结构blocking_keys,其将所有造成阻塞的键,以及阻塞于该键的所有客户端的信息存放起来。执行完这些以后,就调用blockClient函数,真正的对该客户端进行阻塞。
那么,接下来要考虑的问题就是如何解阻塞,客户端不可能一直阻塞在那吧,是不是?由我们之前设定的参数,可以推测出来,有两种情况会对客户端进行解阻塞操作。
- 执行阻塞的时候,设置了超时参数,如果阻塞时长超过了该参数设定的时间,则自动对该客户端进行解阻塞
- 执行阻塞的时候,记录了所有造成客户端阻塞的键,那么如果有其他客户端执行命令,往造成阻塞的键里面添加了新值,这个时候Redis检查到该键中有值了,就会处理pop命令,也就是说,Redis采用先阻塞,后执行的策略来执行阻塞命令。
有了这些推测之后,我们就去push命令中找关于解阻塞的操作,一番查找之后,锁定了signalListAsReady函数,该函数在dbadd函数中执行。于是,跳转到signalListAsReady函数的源码。
void signalListAsReady(redisDb *db, robj *key) {
readyList *rl;
if (dictFind(db->blocking_keys,key) == NULL) return;
if (dictFind(db->ready_keys,key) != NULL) return;
rl = zmalloc(sizeof(*rl));
rl->key = key;
rl->db = db;
incrRefCount(key);
listAddNodeTail(server.ready_keys,rl);
incrRefCount(key);
serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
}
此代码中有一点小小的疑惑,db->ready_keys
和server.ready_keys
这不重复了吗?为什么要设计这两个同样的结构。于是我们来查看以下它们的定义。
typedef struct redisDb {
dict *ready_keys;
} redisDb;
struct redisServer {
list *ready_keys;
}
typedef struct readyList {
redisDb *db;
robj *key;
} readyList;
Redis采用了一个链表和一个字典结构存放同一个key,想了想,这似乎也有道理。假设我们往一个key中添加多个新值时,Redis只需要在server.ready_keys
中为该key保存一个readyList节点即可,这样可以避免在一个事务或者脚本中将同一个key一次又一次的添加到server.ready_keys
中,为了避免不重复添加,Redis又采用一个链表结构db->ready_keys
来快速判断server.ready_keys
中是否存在该键。这样一来,既保证了不重复添加,又保证了哈希结构带来的查找效率。
好了,理解了这一点,我们继续往下剖析,在push操作的时候,只是回收了push进来的造成阻塞的键,如何利用这些信息对已经阻塞的客户端进行解阻塞呢?Redis在运行的过程中,会一直查看server.ready_keys
里是否有值,如果有则需要对存放的值对应的客户端进行接阻塞,此操作由handleClientsBlockedOnLists函数执行。
void handleClientsBlockedOnLists(void) {
while(listLength(server.ready_keys) != 0) {
list *l;
l = server.ready_keys;
server.ready_keys = listCreate();
while(listLength(l) != 0) {
listNode *ln = listFirst(l);
readyList *rl = ln->value;
dictDelete(rl->db->ready_keys,rl->key);
robj *o = lookupKeyWrite(rl->db,rl->key);
if (o != NULL && o->type == OBJ_LIST) {
dictEntry *de;
de = dictFind(rl->db->blocking_keys,rl->key);
if (de) {
list *clients = dictGetVal(de);
int numclients = listLength(clients);
while(numclients--) {
listNode *clientnode = listFirst(clients);
client *receiver = clientnode->value;
robj *dstkey = receiver->bpop.target;
int where = (receiver->lastcmd &&
receiver->lastcmd->proc == blpopCommand) ?
LIST_HEAD : LIST_TAIL;
robj *value = listTypePop(o,where);
if (value) {
if (dstkey) incrRefCount(dstkey);
unblockClient(receiver);
if (serveClientBlockedOnList(receiver,
rl->key,dstkey,rl->db,value,
where) == C_ERR)
{
listTypePush(o,value,where);
}
if (dstkey) decrRefCount(dstkey);
decrRefCount(value);
} else {
break;
}
}
}
if (listTypeLength(o) == 0) {
dbDelete(rl->db,rl->key);
}
}
decrRefCount(rl->key);
zfree(rl);
listDelNode(l,ln);
}
listRelease(l);
}
}
剖析到此,整个阻塞操作的流程就都清晰明了了。如有疑惑,可以在留言区留言,咋们继续讨论。
List小结
本篇博客剖析list的主要接口,以及所有命令的实现,值得大家注意的是带阻塞的pop命令,这个在上文中有详细的实现过程,分析源码的过程就向探索迷宫一样,一步一步的把它藏在深处的秘密挖出来,坚持下去总会有收获。keep moving!明天继续按照预定的步骤分析!
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)