redis系列,redis是如何执行命令(一)

2023-11-01


前言

上篇文章介绍了sds的结构,和sds的使用方法,这章我们在回到读取io数据的地方来看,redis是如何从io 读取数据最后转化成执行命令的过程。
本篇文章需要先熟悉前面两篇文章,没看的同学需要退回看一下。

redis系列,redis网络,你得知道的一些事.

一、从io读取数据

在网络这个章节我们知道,我们知道通过把客户端对应fd注册到epoll,当有数据可读的时候最后会调用到以下这个方法

//这里开始看怎么从client 客户端把参数读取出来
void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, readlen;
    size_t qblen;

    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
    // 延迟读,让主线程有更多空间处理别的事情,然后io 线程来帮助读取
    if (postponeClientRead(c)) return;

    /* Update total number of reads on server */
    //统计处理读的数目
    server.stat_total_reads_processed++;
    // 给每次读设置一个上限默认是  (1024*16)
    // 每次从io 最多读取16kB的数据
    readlen = PROTO_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
     //multi bulk request 跟inline buffer不同是
     //inline process 不支持字符串里包含空格符号和回车符号的情况
     //所以一般客户端协议不会使用inline这种格式
     // 而是选用multbulk这种处理方式
     // 但是我们这里只讲命令的传输过程
     // 我们先用简单的方式来看下命令的执行过程
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);

        /* Note that the 'remaining' variable may be zero in some edge case,
         * for example once we resume a blocked client after CLIENT PAUSE. */
        if (remaining > 0 && remaining < readlen) readlen = remaining;
    }
    //获取当前querybuf的长度
    qblen = sdslen(c->querybuf);
    //应该是一个统计querybuf 的峰值长度,如果小于则更新
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    //为querybuf 分配更多的长度
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    //这个部分是客户端连接里面读取到数据,读入到querybuf,然后返回长度
    //qblen 代表querybuf已用部分,所以这里+qblen 就是把指针指向 内存还未写入的地方
    // c-conn 是struct connection 里面会有客户端的fd
    nread = connRead(c->conn, c->querybuf+qblen, readlen);
    //这里就是看状态,通过nread 变量来决定后续处理
    // -1的时候说明连接状态出了问题
    if (nread == -1) {
        //这里判断连接是否未就绪
        if (connGetState(conn) == CONN_STATE_CONNECTED) {
            return;
        } else {
            //回收client对象
            serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
            freeClientAsync(c);
            return;
        }
    } else if (nread == 0) {
        //nread==0 表示client 已经关闭
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClientAsync(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
        //这里的意思是如果client 是master 同步过来的数据,
        // 那么将数据复制到pending_querybuf 里面,当最后一个命令被执行完
        //这个copy string 会被用到,但这里不是我们的主线暂时埋下一个坑
        /* Append the query buffer to the pending (not applied) buffer
         * of the master. We'll use this buffer later in order to have a
         * copy of the string applied by the last command executed. */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }
    
    //增加query buffer的长度
    //这里主要扩容,上篇文章sds 有讲到过
    sdsIncrLen(c->querybuf,nread);
    //更新最近迭代时间用于超时处理
    //这个值应该是用于后续命令超时会用到
    //到这里基本上io数据已经读取到主线程内存里面来了
    c->lastinteraction = server.unixtime;
    //如果客户端是master
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    //更新已经读取的长度
    server.stat_net_input_bytes += nread;
    //这里会限制客户端连接buf的长度
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        //ci 和byte 都是用于打印客户端的信息,就是当query buffer 大于1g的时候
        //总共超过1g的时候,会进入强制进入回收流程,停止处理
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
        //下面这个方法都是为了打印参数做了一些截断转义处理。
        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        //打印完就释放内存
        sdsfree(ci);
        sdsfree(bytes);
        // 释放客户端信息
        freeClientAsync(c);
        return;
    }

    /* There is more data in the client input buffer, continue parsing it
     * in case to check if there is a full command to execute. */
     //tcp 并不能保证一次性把报文全部传递到服务端
     //也就是说现在读到的buffer 不一定是一个完整的命令
     processInputBuffer(c);
}

上面代码主要就是对sds的方法使用,以及各种边界条件的判断,其中可以看到主流程就是redis 把数据从io里面读取到client->querybuf 里面。为下一步解析命令在做准备。

还有就是我们可以看到上文提到了两种解析协议一种是inline,一种是multibulk,两种协议也做了说明,inline 主要是像redis-cli 这种做简单的调试和处理,而multibulk主要针对于其客户度支持参数的多样性,multibulk多个一些占位符替换的方式,让原本不支持的空格符号和回车符号,也能在redis里面正确表达出来。但在这里我们不对这个协议单独分析,先从最简单inline串通整个命令的执行过程

二、解析buf数据

现在我们得到一个字符串,但这个结构并不是一个好执行命令的结构,所以我们继续要转化成一个好执行命令的结构
我们继续分析代码:
network.c

/* This function is called every time, in the client structure 'c', there is
 * more query buffer to process, because we read more data from the socket
 * or because a client was blocked and later reactivated, so there could be
 * pending query buffer, already representing a full command, to process. */
void processInputBuffer(client *c) {
    /* Keep processing while there is something in the input buffer */
    // qb_pos 表示客户端buffer 已经读取到的位置,
    // 如果已读的pos比query buffer(客户端传过来的buf)总长度要小,那么执行下面流程。
    // 下面条件成立表示还有数据需要处理
    while(c->qb_pos < sdslen(c->querybuf)) {
        /* Return if clients are paused. */
        //如果客户端已经暂停则跳出,等待下一次执行,
        // 但是一种例外是client来自于从服务器
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;

        /* Immediately abort if the client is in the middle of something. */
        // 退出当客户端被其它情况阻塞住 比如rdb 
        if (c->flags & CLIENT_BLOCKED) break;

        /* Don't process more buffers from clients that have already pending
         * commands to execute in c->argv. */
        // 客户端等待执行命令阶段
        if (c->flags & CLIENT_PENDING_COMMAND) break;

        /* Don't process input from the master while there is a busy script
         * condition on the slave. We want just to accumulate the replication
         * stream (instead of replying -BUSY like we do with other clients) and
         * later resume the processing. */
        //slave服务器繁忙时也会暂时不处理从master收到的buffer
        if (server.lua_timedout && c->flags & CLIENT_MASTER) break;

        /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been set (i.e. don't process more commands).
         *
         * The same applies for clients we want to terminate ASAP. */

        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;

        /* Determine request type when unknown. */
        // req type 为空
        if (!c->reqtype) {
            //判断是否是一个批处理操作
            if (c->querybuf[c->qb_pos] == '*') {
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                // inline请求
                c->reqtype = PROTO_REQ_INLINE;
            }
        }

        if (c->reqtype == PROTO_REQ_INLINE) {
            //这个地方是把从网络io读到的报文转化为robj 的指针数组
            if (processInlineBuffer(c) != C_OK) break;
            /* If the Gopher mode and we got zero or one argument, process
             * the request in Gopher mode. */
            //gopher 协议
            if (server.gopher_enabled &&
                ((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||
                  c->argc == 0))
            {
                processGopherRequest(c);
                resetClient(c);
                c->flags |= CLIENT_CLOSE_AFTER_REPLY;
                break;
            }
        //这里批处理命令的的地方转化参数
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* If we are in the context of an I/O thread, we can't really
             * execute the command here. All we can do is to flag the client
             * as one that needs to process the command. */
            // 这里就是根据状态,让多线程止步于此。
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }

            /* We are finally ready to execute the command. */
            //如果处理为error直接跳过
            //这里开始是执行命令的地方。
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid exiting this
                 * loop and trimming the client buffer later. So we return
                 * ASAP in that case. */
                return;
            }
        }
    }

    /* Trim to pos */
    //
    if (c->qb_pos) {
        //将处理完的sds 在这里处理调,留下未处理完的String
        sdsrange(c->querybuf,c->qb_pos,-1);
        c->qb_pos = 0;
    }
}

//从这里开始把buffer 转化为参数,但是有可能现在还不能构成一个完整的命令,那么就会return error,
// 等待下一次执行。
int processInlineBuffer(client *c) {
    char *newline;
    int argc, j, linefeed_chars = 1;
    sds *argv, aux;
    size_t querylen;

    /* Search for end of line */
    //从上次结束的地方开始读,直到第一个回车符号结束
    //如果没有匹配到回车符号,则返回null,newline 是指向回车符号这个地址
    newline = strchr(c->querybuf+c->qb_pos,'\n');

    /* Nothing to do without a \r\n */
    if (newline == NULL) {
        //未读的数据不能超过1024*64 个字节, 即一行命令不能超过64kb,
        if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
            addReplyError(c,"Protocol error: too big inline request");
            setProtocolError("too big inline request",c);
        }
        return C_ERR;
    }

    /* Handle the \r\n case. */
    //处理不同的系统可能回车是\r\n的情况
    if (newline && newline != c->querybuf+c->qb_pos && *(newline-1) == '\r')
        newline--, linefeed_chars++;

    /* Split the input buffer up to the \r\n */
    //这里返回的是新读的string 的长度。
    querylen = newline-(c->querybuf+c->qb_pos);
    //生成一个新的sds ,然后value 就是我们新读的这一行命令
    aux = sdsnewlen(c->querybuf+c->qb_pos,querylen);
    //将参数分解出来
    //变成了一个指针数组
    argv = sdssplitargs(aux,&argc);
    //释放aux 空间
    sdsfree(aux);
    if (argv == NULL) {
        addReplyError(c,"Protocol error: unbalanced quotes in request");
        setProtocolError("unbalanced quotes in inline request",c);
        return C_ERR;
    }

    /* Newline from slaves can be used to refresh the last ACK time.
     * This is useful for a slave to ping back while loading a big
     * RDB file. */
    //这里跟主服务器和从服务器同步数据有关系,我们到时还会回到这里继续分析,埋个点
    if (querylen == 0 && getClientType(c) == CLIENT_TYPE_SLAVE)
        c->repl_ack_time = server.unixtime;

    /* Masters should never send us inline protocol to run actual
     * commands. If this happens, it is likely due to a bug in Redis where
     * we got some desynchronization in the protocol, for example
     * beause of a PSYNC gone bad.
     *
     * However the is an exception: masters may send us just a newline
     * to keep the connection active. */
    // 假如本服务器是slaver, slaver 是不会收到master inline格式的命令,只有一种情况就是保持连接活跃
    // 这个slave 会重新去连master,然后丢弃到现有的连接
    if (querylen != 0 && c->flags & CLIENT_MASTER) {
        serverLog(LL_WARNING,"WARNING: Receiving inline protocol from master, master stream corruption? Closing the master connection and discarding the cached master.");
        setProtocolError("Master using the inline protocol. Desync?",c);
        return C_ERR;
    }

    /* Move querybuffer position to the next query in the buffer. */
    //更新已读的字符串包括一些/r的一些情况
    c->qb_pos += querylen+linefeed_chars;

    /* Setup argv array on client structure */
    if (argc) {
        //如果argv 为空则释放空间
        if (c->argv) zfree(c->argv);
        //开始分配内存空间
        c->argv = zmalloc(sizeof(robj*)*argc);
    }

    /* Create redis objects for all arguments. */
    for (c->argc = 0, j = 0; j < argc; j++) {
        //所有的参数都转化为robj 这种结构,包括命令行
        c->argv[c->argc] = createObject(OBJ_STRING,argv[j]);
        c->argc++;
    }
    //回收空间
    zfree(argv);
    return C_OK;
}

robj *createObject(int type, void *ptr) {
    //分配空间
    robj *o = zmalloc(sizeof(*o));
    //类型 1,String 类型, 2, set 类型 ,3, sorted set 类型, 4, 字典类型
    o->type = type;
    // ecoding 有10个类型,后面将11类型,这里指的就是一个普通的string 类型来用
    o->encoding = OBJ_ENCODING_RAW;
    // 指针指向数据
    o->ptr = ptr;
	//引用初始化1
    o->refcount = 1;

    /* Set the LRU to the current lruclock (minutes resolution), or
     * alternatively the LFU counter. */
    //根据策略放入lru的时间戳或者lfu类型的时间戳
    if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
        o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;
    } else {
        o->lru = LRU_CLOCK();
    }
    return o;
}

typedef struct redisObject {
    //类型 1,String 类型, 2, set 类型 ,3, sorted set 类型, 4, 字典类型 ,‘
    //一个只有4位的非负整数
    unsigned type:4;
    //encoding 有10个类型,具体见下面介绍,
    //后面会展开分析为什么要分type 和encoding
    unsigned encoding:4;
    //这个是24位的字段,选择lru 和lfu 的时候表达含义也不同,具体在lru和lfu 章节再回过头揭秘
    unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
                            * LFU data (least significant 8 bits frequency
                            * and most significant 16 bits access time). */
    //这个应该引用计数器 ,具体我们会回过头来再讨论
    int refcount;
    //指向具体的数据
    void *ptr;
} robj;
//原生string类型
#define OBJ_ENCODING_RAW 0     /* Raw representation */
//整型
#define OBJ_ENCODING_INT 1     /* Encoded as integer */
//hash 
#define OBJ_ENCODING_HT 2      /* Encoded as hash table */
//用ziplist 的map
#define OBJ_ENCODING_ZIPMAP 3  /* Encoded as zipmap */
// 链表类型已经没有被用到
#define OBJ_ENCODING_LINKEDLIST 4 /* No longer used: old list encoding. */
// zip list
#define OBJ_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
// 整型 set
#define OBJ_ENCODING_INTSET 6  /* Encoded as intset */
// 跳表
#define OBJ_ENCODING_SKIPLIST 7  /* Encoded as skiplist */
// 压缩的sds类型
#define OBJ_ENCODING_EMBSTR 8  /* Embedded sds string encoding */
// 链表类型的ziplist
#define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list of ziplists */
// stream 类型
#define OBJ_ENCODING_STREAM 10 /* Encoded as a radix tree of listpacks */

上面我们的源代码从一个raw sds 类型 最终转换成了我们redis object 类型,然后稍微提及了下redis object , 为我们后面讲数据结构会有所铺垫。

图解数据结构转变过程:

在这里插入图片描述

思考下为什么redis 作者不从阶段1直接解析到阶段3了,

  1. 因为从一个sds 字符串,转换成一个sds数组,其归属方法可以放在sds的里面的split的方法。这里就可以直接复用方法
  2. 第二个阶段也只要把单个sds,转换成redisObject结构即可。这样整体上是不耦合的。
    这里可以认为是职责分类,这样在代码处理方便也更为清晰一点,而且从性能来说只是临时多分配了一些内存,也不会影响太大。

三、解析命令流程

在进入执行命令的代码前,我们得首先来看下命令的结构,和命令的初始化。
server.h

struct redisCommand {
    //命令名字
    char *name;
    //命令处理的process
    redisCommandProc *proc;
    //对应命令arg的个数,正数表示查询,增加,修改等操作,负数的时候表示删除操作
    int arity;
    // 这个可以看做作这个命令的标签,
    // 每个标签通过空格进行分隔, 比如set 命令 ,
    // 这个值为 "write use-memory @string",
    // 表示这是一个写入,需要用到内存,格式为string的命令
    char *sflags;   /* Flags as string representation, one char per flag. */
    // 这个通过二进制的表现方式来表现出sflag的值
    uint64_t flags; /* The actual flags, obtained from the 'sflags' field. */
    /* Use a function to determine keys arguments in a command line.
     * Used for Redis Cluster redirect. */
    // 这个方法在redis cluster, 当不属于自己的键值请求到slot,
    // 怎么去重定向的方法。
    redisGetKeysProc *getkeys_proc;
    /* What keys should be loaded in background when calling this command? */
    // 表示第几个参数是 first key, 0 表示没有key
    int firstkey; /* The first argument that's a key (0 = no keys) */
    // 表示最后一个key是第几个参数,-1 的时候表示可以有n个key ,比如del命令
    // 1 表示firstkey 和lastkey 是同一个
    int lastkey;  /* The last argument that's a key */
    // 这个key step ,last key 我觉得要翻译成相邻key的距离
    int keystep;  /* The step between first and last key */
    // 这个应该是用于统计这个命令执行的时长,calls 是执行次数。
    long long microseconds, calls;
    //这个command 的id ,是一个自增的数字
    int id;     /* Command ID. This is a progressive ID starting from 0 that
                   is assigned at runtime, and is used in order to check
                   ACLs. A connection is able to execute a given command if
                   the user associated to the connection has this command
                   bit set in the bitmap of allowed commands. */
};

redisCommandTable 在java 里面类似一个枚举类,里面对redis命令进行了一些初始化的设值,下面省略了比较多的命令代码,具体的常用命令我们再接下来的文章再分析。
server.c

struct redisCommand redisCommandTable[] = {
    {"module",moduleCommand,-2,
     "admin no-script",
     0,NULL,0,0,0,0,0,0},

    {"get",getCommand,2,
     "read-only fast @string",
     0,NULL,1,1,1,0,0,0},
     ........

下面这是命令行的初始化,用字典的形势存着命令结构,好让我们去通过客户端传过来的参数来进行匹配
server.c

void initServerConfig(void) {
......
   // 这个type 是什么我会在字典环节做讲解,你可以认为,redis作者对不同类型
   // 的字典给别的地方留下的接口
   // 已方便个性化的处理
    server.commands = dictCreate(&commandTableDictType,NULL);
    server.orig_commands = dictCreate(&commandTableDictType,NULL);
    populateCommandTable();
......

void populateCommandTable(void) {
    int j;
    int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);

    for (j = 0; j < numcommands; j++) {
        //循环遍历枚举table
        struct redisCommand *c = redisCommandTable+j;
        int retval1, retval2;

        /* Translate the command string flags description into an actual
         * set of flags. */
        // 将sflags 的标签用二进制表示出来
        if (populateCommandTableParseFlags(c,c->sflags) == C_ERR)
            serverPanic("Unsupported command flag");
        // 这里是跟权限相关
        // 后续在权限环节我们继续分析
        c->id = ACLGetCommandID(c->name); /* Assign the ID used for ACL. */
        // 用字典建立了映射关系
        retval1 = dictAdd(server.commands, sdsnew(c->name), c);
        /* Populate an additional dictionary that will be unaffected
         * by rename-command statements in redis.conf. */
        //这里应该是支持一些别名的操作
        //不过别名操作是推荐不使用的
        retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
        serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);
    }
}


有了以上的基础后我们继续接着看命令是如何执行的环节

network.c

/* This function calls processCommand(), but also performs a few sub tasks
 * for the client that are useful in that context:
 *
 * 1. It sets the current client to the client 'c'.
 * 2. calls commandProcessed() if the command was handled.
 *
 * The function returns C_ERR in case the client was freed as a side effect
 * of processing the command, otherwise C_OK is returned. */
int processCommandAndResetClient(client *c) {
    int deadclient = 0;
    //因为是单线程所以通过赋值给current_client 能够知道当前正在处理的client
    server.current_client = c;
    //processCommand 是执行命令的主要方法,
    // 所有执行命令的入口都会调用它
    if (processCommand(c) == C_OK) {
        // 这里是命令执行完的后续操作
        // 这里会涉及到比较多的主从同步的知识,后续会继续从主从同步环节来讲
        commandProcessed(c);
    }
    //如果client 被释放 就会出现deadclient 
    if (server.current_client == NULL) deadclient = 1;
    server.current_client = NULL;
    /* freeMemoryIfNeeded may flush slave output buffers. This may
     * result into a slave, that may be the active client, to be
     * freed. */
    
    return deadclient ? C_ERR : C_OK;
}

处理命令主要在processCommand 这个方法
server.c

/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * processCommand() execute the command or prepare the
 * server for a bulk read from the client.
 *
 * If C_OK is returned the client is still alive and valid and
 * other operations can be performed by the caller. Otherwise
 * if C_ERR is returned the client was destroyed (i.e. after QUIT). */
//
int processCommand(client *c) {
    //这里是一些加载的模块的filter ,暂时先跳过这里
    moduleCallCommandFilters(c);

    /* The QUIT command is handled separately. Normal command procs will
     * go through checking for replication and QUIT will cause trouble
     * when FORCE_REPLICATION is enabled and would be implemented in
     * a regular command proc. */
    //第一个参数一般就是命令关键字
    //quit 关键字比较特殊,会进入检查同步机制,造成一些问题,
    // 但这里注释也没说会出现啥问题,
    // 后面会回过头来再继续分析等整个
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        addReply(c,shared.ok);
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        return C_ERR;
    }

    /* Now lookup the command and check ASAP about trivial error conditions
     * such as wrong arity, bad command name and so forth. */
    //转换成命令的结构
    //这里转换命令的地方
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    if (!c->cmd) {
        //这里就是当找不到对应的命令
        //返回错误信息给上一级处理
        sds args = sdsempty();
        int i;
        for (i=1; i < c->argc && sdslen(args) < 128; i++)
            args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
        rejectCommandFormat(c,"unknown command `%s`, with args beginning with: %s",
            (char*)c->argv[0]->ptr, args);
        sdsfree(args);
        return C_OK;
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               (c->argc < -c->cmd->arity)) {
         //参数个数不匹配          
        rejectCommandFormat(c,"wrong number of arguments for '%s' command",
            c->cmd->name);
        return C_OK;
    }
    /* "write" flag */
    //是否是一个写操作
    int is_write_command = (c->cmd->flags & CMD_WRITE) ||
                           (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
       /* "use-memory" flag */  
    //是否拒绝用到内存                    
    int is_denyoom_command = (c->cmd->flags & CMD_DENYOOM) ||
                             (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_DENYOOM));
    // 表示这个命令是能访问到脏数据的
    int is_denystale_command = !(c->cmd->flags & CMD_STALE) ||
                               (c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_STALE));
   // 是否是拒绝在redis loading 状态执行的命令
    int is_denyloading_command = !(c->cmd->flags & CMD_LOADING) ||
                                 (c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_LOADING));

    /* Check if the user is authenticated. This check is skipped in case
     * the default user is flagged as "nopass" and is active. */
    // 权限相关的后续再讨论
    int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) ||
                          (DefaultUser->flags & USER_FLAG_DISABLED)) &&
                        !c->authenticated;
    if (auth_required) {
        /* AUTH and HELLO and no auth modules are valid even in
         * non-authenticated state. */
        if (!(c->cmd->flags & CMD_NO_AUTH)) {
            rejectCommand(c,shared.noautherr);
            return C_OK;
        }
    }

    /* Check if the user can run this command according to the current
     * ACLs. */
    //权限相关的逻辑
    int acl_keypos;
    int acl_retval = ACLCheckCommandPerm(c,&acl_keypos);
    if (acl_retval != ACL_OK) {
        addACLLogEntry(c,acl_retval,acl_keypos,NULL);
        if (acl_retval == ACL_DENIED_CMD)
            rejectCommandFormat(c,
                "-NOPERM this user has no permissions to run "
                "the '%s' command or its subcommand", c->cmd->name);
        else
            rejectCommandFormat(c,
                "-NOPERM this user has no permissions to access "
                "one of the keys used as arguments");
        return C_OK;
    }

    /* If cluster is enabled perform the cluster redirection here.
     * However we don't perform the redirection if:
     * 1) The sender of this command is our master.
     * 2) The command has no key arguments. */
    //cluster 相关
    if (server.cluster_enabled &&
        !(c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_LUA &&
          server.lua_caller->flags & CLIENT_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
          c->cmd->proc != execCommand))
    {
        int hashslot;
        int error_code;
        clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
                                        &hashslot,&error_code);
        if (n == NULL || n != server.cluster->myself) {
            if (c->cmd->proc == execCommand) {
                discardTransaction(c);
            } else {
                flagTransaction(c);
            }
            clusterRedirectClient(c,n,hashslot,error_code);
            return C_OK;
        }
    }

    /* Handle the maxmemory directive.
     *
     * Note that we do not want to reclaim memory if we are here re-entering
     * the event loop since there is a busy Lua script running in timeout
     * condition, to avoid mixing the propagation of scripts with the
     * propagation of DELs due to eviction. */
    //这里会检查内存溢出的时候的操作,继续埋坑,
    // 每次再执行命令前我们都会判断是否有足够的内存。
    // 这里也是执行lfu,lru的入口
    if (server.maxmemory && !server.lua_timedout) {
        int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR;
        /* freeMemoryIfNeeded may flush slave output buffers. This may result
         * into a slave, that may be the active client, to be freed. */
        if (server.current_client == NULL) return C_ERR;

        int reject_cmd_on_oom = is_denyoom_command;
        /* If client is in MULTI/EXEC context, queuing may consume an unlimited
         * amount of memory, so we want to stop that.
         * However, we never want to reject DISCARD, or even EXEC (unless it
         * contains denied commands, in which case is_denyoom_command is already
         * set. */
        if (c->flags & CLIENT_MULTI &&
            c->cmd->proc != execCommand &&
            c->cmd->proc != discardCommand) {
            reject_cmd_on_oom = 1;
        }

        if (out_of_memory && reject_cmd_on_oom) {
            rejectCommand(c, shared.oomerr);
            return C_OK;
        }

        /* Save out_of_memory result at script start, otherwise if we check OOM
         * untill first write within script, memory used by lua stack and
         * arguments might interfere. */
        if (c->cmd->proc == evalCommand || c->cmd->proc == evalShaCommand) {
            server.lua_oom = out_of_memory;
        }
    }

    /* Make sure to use a reasonable amount of memory for client side
     * caching metadata. */
    // 这里应该是跟客户端缓存的关系,后续再来看这一块
    if (server.tracking_clients) trackingLimitUsedSlots();

    /* Don't accept write commands if there are problems persisting on disk
     * and if this is a master instance. */
    // 如果写入文件出现问题,比如rdb 错误 aof 错误都会阻止写入命令的操作
    int deny_write_type = writeCommandsDeniedByDiskError();
    if (deny_write_type != DISK_ERROR_TYPE_NONE &&
        server.masterhost == NULL &&
        (is_write_command ||c->cmd->proc == pingCommand))
    {
        if (deny_write_type == DISK_ERROR_TYPE_RDB)
            rejectCommand(c, shared.bgsaveerr);
        else
            rejectCommandFormat(c,
                "-MISCONF Errors writing to the AOF file: %s",
                strerror(server.aof_last_write_errno));
        return C_OK;
    }

    /* Don't accept write commands if there are not enough good slaves and
     * user configured the min-slaves-to-write option. */
    //这里可以到从服务没准备好是不能执行写入类型的命令
    if (server.masterhost == NULL &&
        server.repl_min_slaves_to_write &&
        server.repl_min_slaves_max_lag &&
        is_write_command &&
        server.repl_good_slaves_count < server.repl_min_slaves_to_write)
    {
        rejectCommand(c, shared.noreplicaserr);
        return C_OK;
    }

    /* Don't accept write commands if this is a read only slave. But
     * accept write commands if this is our master. */
    // 如果从服务器是一个只读slave,那么写入命令就不能被执行
    if (server.masterhost && server.repl_slave_ro &&
        !(c->flags & CLIENT_MASTER) &&
        is_write_command)
    {
        rejectCommand(c, shared.roslaveerr);
        return C_OK;
    }

    /* Only allow a subset of commands in the context of Pub/Sub if the
     * connection is in RESP2 mode. With RESP3 there are no limits. */
    // client 处于pub sub 状态。那就只能执行pub sub相关的命令
    if ((c->flags & CLIENT_PUBSUB && c->resp == 2) &&
        c->cmd->proc != pingCommand &&
        c->cmd->proc != subscribeCommand &&
        c->cmd->proc != unsubscribeCommand &&
        c->cmd->proc != psubscribeCommand &&
        c->cmd->proc != punsubscribeCommand) {
        rejectCommandFormat(c,
            "Can't execute '%s': only (P)SUBSCRIBE / "
            "(P)UNSUBSCRIBE / PING / QUIT are allowed in this context",
            c->cmd->name);
        return C_OK;
    }

    /* Only allow commands with flag "t", such as INFO, SLAVEOF and so on,
     * when slave-serve-stale-data is no and we are a slave with a broken
     * link with master. */
    // 如果是从服务器的话,且与主服务器断连状态,且命令类型(不属于脏数据允许)
    // 如info 这种类型命令就可以执行,set 就不允许执行
    if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
        server.repl_serve_stale_data == 0 &&
        is_denystale_command)
    {
        rejectCommand(c, shared.masterdownerr);
        return C_OK;
    }

    /* Loading DB? Return an error if the command has not the
     * CMD_LOADING flag. */
    // 当db处于loadding 状态 ,只有特定命令能执行
    if (server.loading && is_denyloading_command) {
        rejectCommand(c, shared.loadingerr);
        return C_OK;
    }

    /* Lua script too slow? Only allow a limited number of commands.
     * Note that we need to allow the transactions commands, otherwise clients
     * sending a transaction with pipelining without error checking, may have
     * the MULTI plus a few initial commands refused, then the timeout
     * condition resolves, and the bottom-half of the transaction gets
     * executed, see Github PR #7022. */
    //执行lua script 太慢的情况下,只允许以下几种命令操作。
    if (server.lua_timedout &&
          c->cmd->proc != authCommand &&
          c->cmd->proc != helloCommand &&
          c->cmd->proc != replconfCommand &&
          c->cmd->proc != multiCommand &&
          c->cmd->proc != discardCommand &&
          c->cmd->proc != watchCommand &&
          c->cmd->proc != unwatchCommand &&
        !(c->cmd->proc == shutdownCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
        !(c->cmd->proc == scriptCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
    {
        rejectCommand(c, shared.slowscripterr);
        return C_OK;
    }

    /* Exec the command */
    //执行了multi 命令那么 client状态就变成了
    //client multi, 这个除开exec,discard,multi,watch这个命令
    //剩下的命令都会放入queue
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        //在这里执行multi操作
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        //这里是整个执行命令地方的逻辑
        call(c,CMD_CALL_FULL);
        // 这里是和同步数据有关系
        c->woff = server.master_repl_offset;
        //BLPOP 相关操作会执行的事情
        if (listLength(server.ready_keys))
            handleClientsBlockedOnKeys();
    }
    return C_OK;
}
//call 方法是整个执行方法的核心方法,我们后续会回过头继续讲解。
void call(client *c, int flags) {
    long long dirty;
    ustime_t start, duration;
    int client_old_flags = c->flags;
    struct redisCommand *real_cmd = c->cmd;

    server.fixed_time_expire++;

    /* Send the command to clients in MONITOR mode if applicable.
     * Administrative commands are considered too dangerous to be shown. */
    if (listLength(server.monitors) &&
        !server.loading &&
        !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
    {
        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
    }

    /* Initialization: clear the flags that must be set by the command on
     * demand, and initialize the array for additional commands propagation. */
    c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
    redisOpArray prev_also_propagate = server.also_propagate;
    redisOpArrayInit(&server.also_propagate);

    /* Call the command. */
    dirty = server.dirty;
    updateCachedTime(0);
    start = server.ustime;
    // 这里会调用每个命令的process
    c->cmd->proc(c);
   .......

可以看到不同的命令在服务不同状态下,其能否执行都会有不同的判断,我们看到命令对应的标签在这里被使用到了。当然这里展开有非常多的分支。要想看懂这边全部的代码,我们需要从一些常用的命令开始分析。

总结

这篇文章主要给出了从io读取到buffer之后到如何执行命令的逻辑,但是还有一些细节没有完全诠释,因为本篇博文主要的目的是io处理的后续,到执行方法的一个过程。其具体执行逻辑我们会在后续的文章在每个模块细讲。
我们从这篇博文能看到redis 作者对于模版模式,命令模式的完美理解,使得代码非常有层次,也便于后续的一些扩展。
此篇文章主要是为后续各种命令操作做铺垫,以及主从同步里面涉及到的逻辑,都会反复会到这里来验证。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

redis系列,redis是如何执行命令(一) 的相关文章

  • 在哈希图中存储字符和二进制数

    我正在尝试存储字母到二进制数的映射 这是我的映射 h 001 i 010 k 011 l 100 r 101 s 110 t 111 为此 我创建了一个哈希映射并存储了键值对 我现在想显示给定句子的相应二进制值 这是我的代码 package
  • JBoss AS 5 中的共享库应该放在哪里?

    我是 Jboss 新手 但我有多个 Web 应用程序 每个应用程序都使用 spring hibernate 和其他开源库和 portlet 所以基本上现在每个 war 文件都包含这些 jar 文件 如何将这些 jar 移动到一个公共位置 以
  • org.postgresql.util.PSQLException:协议错误。会话设置失败

    我知道这些类型的问题已经存在 但提供的解决方案对我不起作用 在我的应用程序中 没有版本不匹配的黑白驱动程序和 PostgreSQL 服务器 我还没有找到任何其他解决方案 我正在使用 PostgreSQL 服务器 9 4 和 postgres
  • BigDecimal 的 JPA @Size 注释

    我该如何使用 SizeMySQL 的注释DECIMAL x y 列 我在用着BigDecimal 但是当我尝试包括 Size max它不起作用 这是我的代码 Size max 7 2 Column name weight private B
  • 通过 JNI 从 Applet 调用 DLL

    我有一个 概念验证 的作品 它跨越了一些不熟悉的领域 我的任务是将 EFTPOS 机器连接到在内联网浏览器中作为小程序运行的应用程序 我暂时忽略了 EFTPOS dll 并用我选择的语言 Delphi 创建了一个简单的 JNI 修饰的 DL
  • 带有面板的 Java Swing JToolbar:外观和感觉

    我有一个JToolbar其中包含多个JPanels 需要 因为我希望每个都有特定的边界 不幸的是 外观管理器无法识别JPanels属于工具栏和JButtons因此 渲染器与普通按钮一样 即没有工具栏上的特殊鼠标悬停效果 更换JPanels
  • 如何在Gradle中支持多种语言(Java和Scala)的多个项目?

    我正在尝试将过时的 Ant 构建转换为 Gradle 该项目包含约50个Java子项目和10个Scala子项目 Java 项目仅包含 Java Scala 项目仅包含 Scala 每个项目都是由 Java 和 Scala 构建的 这大大减慢
  • JavaFx 中装饰且不可移动的舞台

    我想在 JavaFx 中创建一个装饰舞台 它也将不可移动 我正在从另一个控制器类创建这个阶段 我能够创造和展示舞台 但它是自由移动的 我怎样才能创建这个 非常感谢帮助和建议 我把打开新关卡的方法贴出来 private void addRec
  • 如何使用 Spring MVC 和 Thymeleaf 添加静态文件

    我的问题是如何添加 CSS 和图像文件等静态文件 以便我可以使用它们 我正在使用 Spring MVC 和 Thymeleaf 我查看了有关此主题的各种帖子 但它们对我没有帮助 所以我才来问 根据这些帖子 我将 CSS 和图像文件放在res
  • 中间件 API 的最佳实践是什么? [关闭]

    Closed 此问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我们正在开发一个中间件 SDK 采用 C 和 Java 语言 供游戏开发人员 动画软件开发人员 阿凡达开
  • Scala(或 Java)中泛型函数的特化

    是否可以在 Scala 中专门化泛型函数 或类 例如 我想编写一个将数据写入 ByteBuffer 的通用函数 def writeData T buffer ByteBuffer data T buffer put data 但由于 put
  • 如何在命令提示符中检查 JAVA_OPTS 值?

    我们的应用程序部署 JBoss 服务器然后抛出错误 PermGen space 然后在 jboss bat 和配置文件中设置 permgen 变量中的 java OPTS JAVA OPTs 中是否有值 assige 如何检查 如何在命令提
  • MessageDigest MD5 算法未返回我期望的结果

    我脑后的某个东西告诉我 我在这里遗漏了一些明显的东西 我正在将现有的 java 项目与第三方 api 集成 该第三方 api 使用 api 密钥的 md5 哈希进行身份验证 它对我不起作用 在调试过程中我意识到我生成的哈希值与他们提供的示例
  • 膨胀类片段 InflateException 二进制 XML 文件时出错

    我正在使用 Material Design 和 NavigationDrawer 布局等设计我的第一个应用程序 但我遇到了一个问题 该应用程序非常简单 它只显示文本 并且基于 Android Studio 中提供的模板 尝试启动我的应用程序
  • C++ 中的 Java ArrayList [重复]

    这个问题在这里已经有答案了 在Java中我可以做 List
  • 在 netBeans 中运行程序时,字体看起来非常奇怪

    我在我的新 MacBook M1 上设置了 netBeans 和 SceneBuilder 除了运行程序时的字体外 一切正常 它看起来像这样 我不知道为什么 按钮应显示 Click me 标签应显示 Hello 我收到的错误消息是 M rz
  • 无法仅在控制台中启动 androidstudio

    你好 我的问题是下一个 我下载了Android Studio如果我去 路径 android studio bin 我执行studio sh 我收到以下错误 No JDK found Please validate either STUDIO
  • 如何使用 SAX Java 解析器读取注释文本

    我只想使用 Java 中的 SAX 解析器读取 XML 文件中对象标记的注释 这是我的文件的摘要
  • 当我在 Java 中输入 IP 时无法连接到我的服务器

    好的 我正在尝试学习 Java 客户端 服务器的内容 并且正在浏览教程代码 如下所示 当我将 localhost 更改为我的 IP 时 它会停止工作 请帮忙 编辑 127 0 0 1 似乎也可以工作 但不是我的真实IP Copyright
  • 条件查询:按计数排序

    我正在尝试执行一个标准查询 该查询返回 stackoverflow 中回答最多的问题 例如常见问题解答 一个问题包含多个答案 我正在尝试使用标准查询返回按每个问题的答案数排序的回答最多的问题 任何人都知道我应该在 hibernate cri

随机推荐

  • (4)各个属性角色分析显示-4

    将折线图 数据集 散点图集合在一个html文件中 1 将折线图 数据集 散点图设置为函数a b c 2 再调用page add 函数 将三个图片组合在一起 3 运行page render x html 函数 将该页面命名 打开html文件
  • Java线程中处理运行时异常(UncaughtExceptionHandler)

    线程在执行单元中不允许抛出checked异常 而且线程运行在自己的上下文中 派生它的线程无法直接获得它运行中出现的异常信息 对此 Java为我们提供了UncaughtExceptionHandler接口 当线程在运行过程中出现异常时 会回调
  • ElasticSearch多字段查询best_fields、most_fields和cross_fields理解

    基于elasticsearch7 6 1 和 kibana7 6 1 本文通过案例进行讲解 希望读者耐心阅读 一 介绍 字段中心查询式 就是以字段为中心 代表就是 best fields和most fields 把所有的字段全都散列 然后从
  • 详解曼哈顿距离&欧式距离&切比雪夫距离

    详解曼哈顿 欧式距离 切比雪夫距离 曼哈顿距离 基本概念 出租车几何或曼哈顿距离 Manhattan Distance 是由十九世纪的赫尔曼 闵可夫斯基所创词汇 是种使用在几何度量空间的几何学用语 用以标明两个点在标准坐标系上的绝对轴距总和
  • 基于32单片机的16通道ADC的数据采集

    基于32单片机的16通道ADC的数据采集 这个部分的内容 是作为外部模拟量部分的采集工作 按照任务要求 所设计的方案 需要完成以下指标 ADC必须能采集16通道的模拟量 ADC的分辨率是16bit 采样率不小于20khz 由此分析可以得出
  • 请使用正确的入口登录面板—解决方案

    错误信息提示 解决方案 一 找回安全登录地址 宝塔登录地址 http 你的服务器ip 8888 但是这种格式是不安全的 目前新安装的宝塔面板默认都开启了安全目录登录 所以如果使用这种不带有8位字符随机码的登录地址就会提示 请使用正确的入口登
  • 【AI】《动手学-深度学习-PyTorch版》笔记(二十一):目标检测

    AI学习目录汇总 1 简述 通过前面的学习 已经了解了图像分类模型的原理及实现 图像分类是假定图像中只有一个目标 算法上是对整个图像做的分类 下面我们来学习 目标检测 即从一张图像中找出需要的目标 并标记出位置 2 边界框 边界框 boun
  • xctf攻防世界—Web新手练习区robots单题思路

    xctf攻防世界 Web新手练习区robots单题思路 邱邱邱自强 前言 随着互联网的发展 互联网界的行为也越来越被重视 那么国际互联网界通行的道德规范是什么呢 它就是Robots协议 一 Robots协议是什么 robots协议也叫rob
  • 点击Path环境变量编辑不展开的问题

    分析 将 MYSQL HOME bin移动到 JAVA HOME bin的上面 点击确定 再次点击Path环境变量就会出现编辑不展开的问题 如图所示 分析原因 因为我把两个 MYSQL HOME bin和 JAVA HOME bin一起放在
  • zip、unzip命令使用

    1 zip压缩命令 1 压缩文件 zip test test txt 将text txt文件压缩到test zip文件中 2 压缩文件夹 r zip r attack zip attack 将当前路径下attack文件夹中的文件进行压缩 压
  • assert在debug 和 release版本中的区别

    转自 https blog csdn net panfengsoftware article details 8910468 debug版本与release的不同点 debug版本中含有调试信息 不会对程序进行优化 assert相应的宏会被
  • python-一些坑点

    一些python使用中遇到的坑点 记录一下 同样的问题也可能只是对当前我的环境下有作用 AttributeError module urllib has no attribute splittype 使用urllib中的一些工具时 提示这个
  • FPGA中task语法基本使用

    1 task定义为任务 完成的是某个具体功能 可以在initial语句和always语句中使用 不过initial语句使用较多 2 task如何使用 1 定义任务 task 任务名 端口及数据类型声明语句 语句1 语句2 语句n endta
  • Qt 3D的研究(三):显示3D模型

    Qt 3D的研究 三 显示3D模型 上一篇文章介绍了如何使用最少的代码创建一个Qt 3D的应用 和大家最初接触的glut一样 对于3D应用来说 需要做的准备工作还真不少 不过呢 Qt 3D把一些窗口相关的琐碎事情解决了 剩下的 该由我们完成
  • 修复nanopi2的SPI无法使用50MHZ传输的问题(S5P4418)

    关于S5P4418使用SPI DMA传输时出现的超时问题 一 问题背景 二 启用SPI的DMA传输 2 1 修改cfg main h 文件 2 2 make menuconfig 配置SPI 2 3 修改SPI主机驱动代码 2 4 增加设备
  • vue cmd 创建新项目在指定文件夹

    1 cmd 进入 后 转到指定目录 cd D 2 创建vue新项目 vue create test
  • 基于minikube搭建的SpringBoot实战

    现在比较多的互联网公司都在尝试将微服务迁到云上 这样的能够通过一些成熟的云容器管理平台更为方便地管理微服务集群 从而提高微服务的稳定性 同时也能较好地提升团队开发效率 但是迁云存在一定的技术难点 今天这篇文章主要介绍如何从0开始搭建一套基于
  • 我的世界服务器修改空岛范围,我的世界空岛指令权限大全

    发布时间 2016 08 07 我的世界ess指令是什么 我的世界ess指令在ess插件运行中十分重要的一部分 那么今天小编就为大家带来了我的世界ess指令用法大全 一起看看吧 我的世界ess指令 Essentials插件 用户组权限管理插
  • OTA:目标检测的最优运输分配

    引言 该论文主要是关于目标检测中的标签分配问题 作者创新性地从全局的角度重新审视了该问题 并提出将标签分配问题看成是一个最优运输问题 要知道最优传输问题是当前最优化理论和GAN理论研究领域中的一个很火的研究课题 论文的实验效果俱佳 而且作者
  • redis系列,redis是如何执行命令(一)

    文章目录 前言 一 从io读取数据 二 解析buf数据 三 解析命令流程 总结 前言 上篇文章介绍了sds的结构 和sds的使用方法 这章我们在回到读取io数据的地方来看 redis是如何从io 读取数据最后转化成执行命令的过程 本篇文章需