牛骨文教育服务平台(让学习变的简单)

        今天我是看完了t_list和t_string的代码,从名字知道,这也是和之前的t_hash非常类似的,无非就是各种形式,转化来转化去的。先讲讲t_list,对比于昨天的t_hash,t_hash是ziplist和dict之间的转换,t_list则是描述的是ziplist压缩表和linedlist普通链表,换句话说,当client这个robj作为参数传入的时候,都分为ENCODING_ZIIPLIST和ENCODING_LINKEDLIST 2类编码方式处理。还有一个在t_list出现的一个 比较新颖的东西是出现了锁的概念,操作系统的东西在这里也会碰到了,这里的代码也非常值得学习。下面,我们看看一般的t_list中包括的一些方法和命令:

/* List方法 */
/* list的操作分为2种,LINKEDLIST普通链表和ZIPLIST压缩列表的相关操作 */
void listTypeTryConversion(robj *subject, robj *value) /* 判断是否需要将ziplist转为linkedlist */
void listTypePush(robj *subject, robj *value, int where) /* 在头部或尾部插入value元素 */
robj *listTypePop(robj *subject, int where)  /* 在列表的头部或尾弹出元素 */
unsigned long listTypeLength(robj *subject) /* 列表的长度 */
listTypeIterator *listTypeInitIterator(robj *subject, long index, unsigned char direction) /* 返回列表迭代器,方向有头尾之分 */
void listTypeReleaseIterator(listTypeIterator *li) /* 释放列表迭代器 */
int listTypeNext(listTypeIterator *li, listTypeEntry *entry) /* 根据列表迭代器,获取下一个元素 */
robj *listTypeGet(listTypeEntry *entry) /* 获取listType元素,有ziplist和linkedlist */
void listTypeInsert(listTypeEntry *entry, robj *value, int where) /* listType了类型插入元素操作 */
int listTypeEqual(listTypeEntry *entry, robj *o) /* 判断2个元素是否相等 */
void listTypeDelete(listTypeEntry *entry) /* listType类型删除元素 */
void listTypeConvert(robj *subject, int enc) /* listType类型的转换操作,这里指的是往linkedList上转 */
	
/* List的相关命令 */
void pushGenericCommand(redisClient *c, int where) /* 插入操作命令的原始操作 */
void lpushCommand(redisClient *c) /* 左边插入元素命令 */
void rpushCommand(redisClient *c) /* 右边插入元素命令 */
void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) /* 有返回状态的插入操作命令,假设首先都是能够实现插入命令的 */
void lpushxCommand(redisClient *c) /* 左边插入元素有返回消息的命令 */
void rpushxCommand(redisClient *c) /* 右边插入元素有返回消息的命令 */
void linsertCommand(redisClient *c) /* 列表指定位置插入元素操作命令 */
void llenCommand(redisClient *c) /* 列表返回长度命令 */
void lindexCommand(redisClient *c) /* 获取index位置的上的元素 */
void lsetCommand(redisClient *c) /* listType类型设置value命令 */
void popGenericCommand(redisClient *c, int where) /* 实现弹出操作的原始命令 */
void lpopCommand(redisClient *c) /* 左边弹出元素命令 */
void rpopCommand(redisClient *c) /* 右边弹出操作命令 */
void lrangeCommand(redisClient *c) /* 移动listType位置操作 */
void ltrimCommand(redisClient *c) /* listType实现截取操作,把多余范围的元素删除 */
void lremCommand(redisClient *c) /* 移除在listType中出现的与指定的元素相等的元素 */
void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) /* 元素从一个obj右边弹出,在从左侧推入另一个obj列表操作 */
void rpoplpushCommand(redisClient *c) /* 右边弹出,左边推入元素的命令 */
void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) >/* 设置客户端为阻塞模式,并设置超时时间,当请求特定的key元素时 */
void unblockClientWaitingData(redisClient *c) /* 客户端解锁操作 */
void signalListAsReady(redisDb *db, robj *key) /* 将key存入server中,后续可以用于客户端的存取 */
int serveClientBlockedOnList(redisClient *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where) /* 根据server,Client的key,value情况,判断server此时能否服务于Client,否则Client将被阻塞 */
void handleClientsBlockedOnLists(void) /* 服务端解除阻塞住的Client */
int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) /* 获取超时时间 */ 
void blockingPopGenericCommand(redisClient *c, int where) /* 阻塞弹出命令的原始操作 */
void blpopCommand(redisClient *c) /* 左边弹出数据的阻塞式命令 */
void brpopCommand(redisClient *c) /* 右边弹出数据的阻塞式命令 */
void brpoplpushCommand(redisClient *c) /* 弹出推入阻塞式命令 */

看到这么多API,是否有被吓到的感觉,但里面其实很多的方法都很简单,我只挑几个比较典型的方法做一下分析,向什么返回长度了,获取,设置键值的大家可以自己看具体的方法,不难的。一般的t_list的处理流程,比如获取迭代器的方法:

/* Stores pointer to current the entry in the provided entry structure
 * and advances the position of the iterator. Returns 1 when the current
 * entry is in fact an entry, 0 otherwise. */
int listTypeNext(listTypeIterator *li, listTypeEntry *entry) {
    /* Protect from converting when iterating */
    redisAssert(li->subject->encoding == li->encoding);

    entry->li = li;
    if (li->encoding == REDIS_ENCODING_ZIPLIST) {
        entry->zi = li->zi;
        if (entry->zi != NULL) {
            if (li->direction == REDIS_TAIL)
            	//根据方向调用pre或next的方法
                li->zi = ziplistNext(li->subject->ptr,li->zi);
            else
                li->zi = ziplistPrev(li->subject->ptr,li->zi);
            return 1;
        }
    } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
        entry->ln = li->ln;
        if (entry->ln != NULL) {
            if (li->direction == REDIS_TAIL)
            	//普通的列表的调用方式同上
                li->ln = li->ln->next;
            else
                li->ln = li->ln->prev;
            return 1;
        }
    } else {
        redisPanic("Unknown list encoding");
    }
    return 0;
}

很多API和t_hash的方法都是极其类似的,我就不多说了,我们将关注点,放在列表的锁控制上,先讲个前提,在redis客户端执行操作的时候,会有个请求超时时间,在这个请求的时间内,客户端如果没有找到key对应的数据,是会被阻塞的,什么意思呢,比如:

/* 设置客户端为阻塞模式,并设置超时时间,当请求特定的key元素时 */
/* 这个客户端阻塞的意思:当客户端请求list中某个特定key值时,如果key存在且列表非空,当然 */
/* 当然不会阻塞,正常返回数据,如果当客户端请求某个key不存在,或列表为empty的时候,客户端将被阻塞 */
/* 只有当有这个key被写入的时候,客户端才会被解锁 *

这个其实就是操作系统中的PV操作类似的原理,跟java里的wait(),signal()方法的模型是一样的,他的实现代码如下:

/* This is how the current blocking POP works, we use BLPOP as example:
 * - If the user calls BLPOP and the key exists and contains a non empty list
 *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
 *   if blocking is not required.
 * - If instead BLPOP is called and the key does not exists or the list is
 *   empty we need to block. In order to do so we remove the notification for
 *   new data to read in the client socket (so that we"ll not serve new
 *   requests if the blocking request is not served). Also we put the client
 *   in a dictionary (db->blocking_keys) mapping keys to a list of clients
 *   blocking for this keys.
 * - If a PUSH operation against a key with blocked clients waiting is
 *   performed, we mark this key as "ready", and after the current command,
 *   MULTI/EXEC block, or script, is executed, we serve all the clients waiting
 *   for this list, from the one that blocked first, to the last, accordingly
 *   to the number of elements we have in the ready list.
 */

/* Set a client in blocking mode for the specified key, with the specified
 * timeout */
/* 设置客户端为阻塞模式,并设置超时时间,当请求特定的key元素时 */
/* 这个客户端阻塞的意思:当客户端请求list中某个特定key值时,如果key存在且列表非空,当然 */
/* 当然不会阻塞,正常返回数据,如果当客户端请求某个key不存在,或列表为empty的时候,客户端将被阻塞 */
/* 只有当有这个key被写入的时候,客户端才会被解锁 */		
void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) {
    dictEntry *de;
    list *l;
    int j;

    //设置超时时间
    c->bpop.timeout = timeout;
    c->bpop.target = target;

    if (target != NULL) incrRefCount(target);

    for (j = 0; j < numkeys; j++) {
    	//下面为为找到的key上锁
        /* If the key already exists in the dict ignore it. */
        //如果此时,某些key已经存在了,直接忽略
        if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
        incrRefCount(keys[j]);

        /* And in the other "side", to map keys -> clients */
        //根据key找到对于请求该key的客户端,也就是将要被阻塞的Client
        de = dictFind(c->db->blocking_keys,keys[j]);
        if (de == NULL) {
            int retval;

            /* For every key we take a list of clients blocked for it */
            l = listCreate();
            //为c客户端添加一个阻塞的key
            retval = dictAdd(c->db->blocking_keys,keys[j],l);
            //增加key的引用次数
            incrRefCount(keys[j]);
            redisAssertWithInfo(c,keys[j],retval == DICT_OK);
        } else {
            l = dictGetVal(de);
        }
        listAddNodeTail(l,c);
    }

    /* Mark the client as a blocked client */
    /* 标记Client为阻塞的客户端 */
    c->flags |= REDIS_BLOCKED;
    //服务端的阻塞客户端计数递增
    server.bpop_blocked_clients++;
}

所有的key未来都会存在于server.readykeys里面,客户端判断自己所请求的key是否存在于服务端中的readykeys中,如果不存在就会被阻塞了。将key存入服务单的数据库的操作:

/* If the specified key has clients blocked waiting for list pushes, this
 * function will put the key reference into the server.ready_keys list.
 * Note that db->ready_keys is a hash table that allows us to avoid putting
 * the same key again and again in the list in case of multiple pushes
 * made by a script or in the context of MULTI/EXEC.
 *
 * The list will be finally processed by handleClientsBlockedOnLists() */
/* 将key存入server中,后续可以用于客户端的存取 */
void signalListAsReady(redisDb *db, robj *key) {
    readyList *rl;

    /* No clients blocking for this key? No need to queue it. */
    /* 如果没有客户端为此key所阻塞的,直接不添加 */
    if (dictFind(db->blocking_keys,key) == NULL) return;

    /* Key was already signaled? No need to queue it again. */
    /* 如果key已经存在,不不要重复添加 */
    if (dictFind(db->ready_keys,key) != NULL) return;

    /* Ok, we need to queue this key into server.ready_keys. */
    rl = zmalloc(sizeof(*rl));
    rl->key = key;
    rl->db = db;
    incrRefCount(key);
    //添加到list列表尾部
    listAddNodeTail(server.ready_keys,rl);

    /* We also add the key in the db->ready_keys dictionary in order
     * to avoid adding it multiple times into a list with a simple O(1)
     * check. */
    incrRefCount(key);
    redisAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
}

其实这里我不免会有个小问题,为什么list出现Client被阻塞的操作,而在t_hash中不会出现类似的操作呢?略疑惑。

    下面说说t_string的用法,t_string里面没有涉及什么特定的结构体,就是最直接的键值对的设置,直接改变数据库中的键值,先列出里面的主要方法:

/* 方法 API */
static int checkStringLength(redisClient *c, long long size) /* 检查字符串长度是否超出限制最大长度,512*1024*1024个字节长度 */
void setGenericCommand(redisClient *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) /* 设置的泛型指令操作 */
void setCommand(redisClient *c) /* 设置的综合方法,设置都是根据Client中的参数而定 */
void setnxCommand(redisClient *c) /* key不存在的时候才设置值,flag为REDIS_SET_NX时 */
void setexCommand(redisClient *c) /* key存在的时候才设置值,flag为REDIS_SET_NO_FLAGS,命令到期时间单位为秒 */
void psetexCommand(redisClient *c) /* key存在的时候才设置值,flag为REDIS_SET_NO_FLAGS,命令到期时间单位为毫秒 */
int getGenericCommand(redisClient *c) /* 获取命令的泛型命令 */
void getCommand(redisClient *c) /* 获取命令的操作 */
void getsetCommand(redisClient *c) /* 获取set命令的操作 */
void setrangeCommand(redisClient *c) /* 设置rangge命令的操作 */
void getrangeCommand(redisClient *c) /* 获取range命令的操作 */
void mgetCommand(redisClient *c) /* 执行多次getCommand指令 */
void msetGenericCommand(redisClient *c, int nx) /* 一次运行多个设置操作泛型命令 */
void msetCommand(redisClient *c) /* 多次设置指令的非NX模式 */
void msetnxCommand(redisClient *c)  /* 多次设置指令的NX模式,即只在不存在的时候才设置 */
void incrDecrCommand(redisClient *c, long long incr) /* 增加或减少固定值的操作,减少其实就是增加-1 */
void incrCommand(redisClient *c) /* 递增1操作,incr递增设置为1 */
void decrCommand(redisClient *c) /* 递减1操作,incr递增设置为-1 */
void incrbyCommand(redisClient *c) /* 增加指令,从Client中获取递增值 */
void decrbyCommand(redisClient *c) /* 减少指令,从Client中获取减少值 */
void incrbyfloatCommand(redisClient *c) /* 增加float类型值的操作命令,在获取原始值的时候,获取的方法不一样,为getLongDoubleFromObjectOrReply*/
void appendCommand(redisClient *c) /* 追加命令操作,其实也是key:value的形式 */
void strlenCommand(redisClient *c) /* 获取命令长度 */

里面都是一些键值对的简单操作,有一个不同点是,里面的设置操作分为3种:

/* SET操作的一些FLAG */
#define REDIS_SET_NO_FLAGS 0
#define REDIS_SET_NX (1<<0)     /* Set if key not exists. */
#define REDIS_SET_XX (1<<1)     /* Set if key exists. */

如上面所说,nx模式指的是key不存在的时候才设置值,xx为存在的时候设置,这种模式设置有命令到期时间的限制,分为单位为秒级和毫秒级,而nx模式下,没有时间上的限制,调用的泛型方法:

void setGenericCommand(redisClient *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply)

注意其中的expire,和时间单位unit:

/* key不存在的时候才设置值,flag为REDIS_SET_NX时 */
void setnxCommand(redisClient *c) {
    c->argv[2] = tryObjectEncoding(c->argv[2]);
    setGenericCommand(c,REDIS_SET_NX,c->argv[1],c->argv[2],NULL,0,shared.cone,shared.czero);
}

/* key存在的时候才设置值,flag为REDIS_SET_NO_FLAGS,命令到期时间单位为秒 */
void setexCommand(redisClient *c) {
    c->argv[3] = tryObjectEncoding(c->argv[3]);
    setGenericCommand(c,REDIS_SET_NO_FLAGS,c->argv[1],c->argv[3],c->argv[2],UNIT_SECONDS,NULL,NULL);
}

/* key存在的时候才设置值,flag为REDIS_SET_NO_FLAGS,命令到期时间单位为毫秒 */
void psetexCommand(redisClient *c) {
    c->argv[3] = tryObjectEncoding(c->argv[3]);
    setGenericCommand(c,REDIS_SET_NO_FLAGS,c->argv[1],c->argv[3],c->argv[2],UNIT_MILLISECONDS,NULL,NULL);
}

nx模式时expire和unit参数分别为NULL,0,所以我做出了如下猜想,可能是因为,当键值对存在的时候,考虑多并发设置的情况,有些Client可能被阻塞,所以有超时时间的存在,而key不存在的时候,就看谁更快了吧,就是直接设置。下面还有一个这个文件比较有意思的方法,有点批处理的味道,换句话说,一个方法里执行多次set操作:

/* 一次运行多个设置操作泛型命令 */
void msetGenericCommand(redisClient *c, int nx) {
    int j, busykeys = 0;

    /* 设置操作一定是成对出现的 */
    if ((c->argc % 2) == 0) {
        addReplyError(c,"wrong number of arguments for MSET");
        return;
    }
    /* Handle the NX flag. The MSETNX semantic is to return zero and don"t
     * set nothing at all if at least one already key exists. */
    if (nx) {
        for (j = 1; j < c->argc; j += 2) {
            if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {
                busykeys++;
            }
        }
        
        //如果key存在,返回0
        if (busykeys) {
            addReply(c, shared.czero);
            return;
        }
    }

	//隔2个执行设置key操作指令一次
    for (j = 1; j < c->argc; j += 2) {
        c->argv[j+1] = tryObjectEncoding(c->argv[j+1]);
        setKey(c->db,c->argv[j],c->argv[j+1]);
        notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"set",c->argv[j],c->db->id);
    }
    server.dirty += (c->argc-1)/2;
    addReply(c, nx ? shared.cone : shared.ok);
}

也不是什么特别的操作,应该redis编写者为了方便效率的提高吧。其他的一些方法,请读者可以学习t_string.c的文件,还有提醒一点,在redis数据库中,存储的形式都是K-V形式的,所有的获取,设置都是通过KEY的形式操作,你会看到很多这样的方法:

dbAdd(c->db,c->argv[1],new);
setKey(c->db,c->argv[j],c->argv[j+1]);