牛骨文教育服务平台(让学习变的简单)

      今天学习了Redis中比较高大上的名词,“发布订阅模式”,发布订阅模式这个词在我最开始接触听说的时候是在JMS(Java Message Service)java消息服务中听说的。这个名次用通俗的一点话说,就是我订阅了这类消息,当只有这类的消息进行广播发送的时候,我才会,其他的消息直接过滤,保证了一个高效的传输效率。下面切入正题,学习一下Redis是如何实现这个发布订阅模式的。先看看里面的简单的API构造;

/*-----------------------------------------------------------------------------
 * Pubsub low level API
 *----------------------------------------------------------------------------*/
void freePubsubPattern(void *p) /* 释放发布订阅的模式 */
int listMatchPubsubPattern(void *a, void *b) /* 发布订阅模式是否匹配 */
int clientSubscriptionsCount(redisClient *c) /* 返回客户端的所订阅的数量,包括channels + patterns管道和模式 */
int pubsubSubscribeChannel(redisClient *c, robj *channel) /* Client订阅一个Channel管道 */
int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) /* 取消订阅Client中的Channel */
int pubsubSubscribePattern(redisClient *c, robj *pattern) /* Client客户端订阅一种模式 */
int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) /* Client客户端取消订阅pattern模式 */
int pubsubUnsubscribeAllChannels(redisClient *c, int notify) /* 客户端取消自身订阅的所有Channel */
int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) /* 客户端取消订阅所有的pattern模式 */
int pubsubPublishMessage(robj *channel, robj *message) /* 为所有订阅了Channel的Client发送消息message */

/* ------------PUB/SUB API ---------------- */
void subscribeCommand(redisClient *c) /* 订阅Channel的命令 */
void unsubscribeCommand(redisClient *c) /* 取消订阅Channel的命令 */
void psubscribeCommand(redisClient *c) /* 订阅模式命令 */
void punsubscribeCommand(redisClient *c) /* 取消订阅模式命令 */
void publishCommand(redisClient *c) /* 发布消息命令 */
void pubsubCommand(redisClient *c) /* 发布订阅命令 */

在这里面出现了高频的词Pattern(模式)和Channel(频道,叫管道比较别扭),也就是说,后续所有的关于发布订阅的东东都是基于这2者展开进行的。现在大致讲解一下在Redis中是如何实现此中模式的:
1.在RedisClient 内部维护了一个pubsub_channels的Channel列表,记录了此客户端所订阅的频道

2.在Server服务端,同样维护着一个类似的变量叫做,pubsub_channels,这是一个dict字典变量,每一个Channel对应着一批订阅了此频道的Client,也就是Channel-->list of Clients

3.当一个Client publish一个message的时候,会先去服务端的pubsub_channels找相应的Channel,遍历里面的Client,然后发送通知,即完成了整个发布订阅模式。

  我们可以简单的看一下Redis订阅一个Channel的方法实现;

/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
 * 0 if the client was already subscribed to that channel. */
/* Client订阅一个Channel管道 */
int pubsubSubscribeChannel(redisClient *c, robj *channel) {
    struct dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    /* Add the channel to the client -> channels hash table */
    //在Client的字典pubsub_channels中添加Channel
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);
        /* Add the client to the channel -> list of clients hash table */
        //添加Clietn到server中的pubsub_channels,对应的列表中
        de = dictFind(server.pubsub_channels,channel);
        if (de == NULL) {
        	//如果此频道的Client列表为空,则创建新列表并添加
            clients = listCreate();
            dictAdd(server.pubsub_channels,channel,clients);
            incrRefCount(channel);
        } else {
        	//否则,获取这个频道的客户端列表,在尾部添加新的客户端
            clients = dictGetVal(de);
        }
        listAddNodeTail(clients,c);
    }
    /* Notify the client */
    //添加给回复客户端
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.subscribebulk);
    addReplyBulk(c,channel);
    addReplyLongLong(c,clientSubscriptionsCount(c));
    return retval;
}

添加操作主要分2部,Client自身的内部维护的pubsub_channels的添加,是一个dict字典对象,然后,是server端维护的pubsub_channels中的client列表的添加。在进行Channel频道的删除的时候,也是执行的这2步骤操作:

/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
 * 0 if the client was not subscribed to the specified channel. */
/* 取消订阅Client中的Channel */
int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
    struct dictEntry *de;
    list *clients;
    listNode *ln;
    int retval = 0;

    /* Remove the channel from the client -> channels hash table */
    incrRefCount(channel); /* channel may be just a pointer to the same object
                            we have in the hash tables. Protect it... */
    //字典删除Client中pubsub_channels中的Channel
    if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
        retval = 1;
        /* Remove the client from the channel -> clients list hash table */
        //再移除Channel对应的Client列表
        de = dictFind(server.pubsub_channels,channel);
        redisAssertWithInfo(c,NULL,de != NULL);
        clients = dictGetVal(de);
        ln = listSearchKey(clients,c);
        redisAssertWithInfo(c,NULL,ln != NULL);
        listDelNode(clients,ln);
        if (listLength(clients) == 0) {
            /* Free the list and associated hash entry at all if this was
             * the latest client, so that it will be possible to abuse
             * Redis PUBSUB creating millions of channels. */
            dictDelete(server.pubsub_channels,channel);
        }
    }
    /* Notify the client */
    if (notify) {
        addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.unsubscribebulk);
        addReplyBulk(c,channel);
        addReplyLongLong(c,dictSize(c->pubsub_channels)+
                       listLength(c->pubsub_patterns));

    }
    decrRefCount(channel); /* it is finally safe to release it */
    return retval;
}

里面还有对应的模式的订阅和取消订阅的操作,原理和channel完全一致,二者的区别在于,pattern是用来匹配的Channel的,这个是什么意思呢。在后面会做出答案,接着看。最后看一个最最核心的方法,客户端发步消息方法:

/* Publish a message */
/* 为所有订阅了Channel的Client发送消息message */
int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    struct dictEntry *de;
    listNode *ln;
    listIter li;

    /* Send to clients listening for that channel */
    //找到Channel所对应的dictEntry
    de = dictFind(server.pubsub_channels,channel);
    if (de) {
    	//获取此Channel对应的客户单列表
        list *list = dictGetVal(de);
        listNode *ln;
        listIter li;

        listRewind(list,&li);
        while ((ln = listNext(&li)) != NULL) {
        	//依次取出List中的客户单,添加消息回复
            redisClient *c = ln->value;

            addReply(c,shared.mbulkhdr[3]);
            addReply(c,shared.messagebulk);
            addReplyBulk(c,channel);
            //添加消息回复
            addReplyBulk(c,message);
            receivers++;
        }
    }
    /* Send to clients listening to matching channels */
    /* 发送给尝试匹配该Channel的客户端消息 */
    if (listLength(server.pubsub_patterns)) {
        listRewind(server.pubsub_patterns,&li);
        channel = getDecodedObject(channel);
        while ((ln = listNext(&li)) != NULL) {
            pubsubPattern *pat = ln->value;
			
			//客户端的模式如果匹配了Channel,也会发送消息
            if (stringmatchlen((char*)pat->pattern->ptr,
                                sdslen(pat->pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) {
                addReply(pat->client,shared.mbulkhdr[4]);
                addReply(pat->client,shared.pmessagebulk);
                addReplyBulk(pat->client,pat->pattern);
                addReplyBulk(pat->client,channel);
                addReplyBulk(pat->client,message);
                receivers++;
            }
        }
        decrRefCount(channel);
    }
    return receivers;
}

pattern的作用就在上面体现了,如果某种pattern匹配了Channel频道,则模式的客户端也会接收消息。在server->pubsub_patterns中,pubsub_patterns是一个list列表,里面的每一个pattern只对应一个Client,就是上面的pat->client,这一点和Channel还是有本质的区别的。讲完发布订阅模式的基本操作后,顺便把与此相关的notify通知类也稍稍讲讲,通知只有3个方法,

/* ----------------- API ------------------- */
int keyspaceEventsStringToFlags(char *classes) /* 键值字符类型转为对应的Class类型 */
sds keyspaceEventsFlagsToString(int flags) /* 通过输入的flag值类,转为字符类型*/
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) /* 发布通知方法,分为2类,keySpace的通知,keyEvent的通知 */

涉及到string To flag 和flag To String 的转换,也不知道这个会在哪里用到;

/* Turn a string representing notification classes into an integer
 * representing notification classes flags xored.
 *
 * The function returns -1 if the input contains characters not mapping to
 * any class. */
/* 键值字符类型转为对应的Class类型 */
int keyspaceEventsStringToFlags(char *classes) {
    char *p = classes;
    int c, flags = 0;

    while((c = *p++) != "") {
        switch(c) {
        case "A": flags |= REDIS_NOTIFY_ALL; break;
        case "g": flags |= REDIS_NOTIFY_GENERIC; break;
        case "$": flags |= REDIS_NOTIFY_STRING; break;
        case "l": flags |= REDIS_NOTIFY_LIST; break;
        case "s": flags |= REDIS_NOTIFY_SET; break;
        case "h": flags |= REDIS_NOTIFY_HASH; break;
        case "z": flags |= REDIS_NOTIFY_ZSET; break;
        case "x": flags |= REDIS_NOTIFY_EXPIRED; break;
        case "e": flags |= REDIS_NOTIFY_EVICTED; break;
        case "K": flags |= REDIS_NOTIFY_KEYSPACE; break;
        case "E": flags |= REDIS_NOTIFY_KEYEVENT; break;
        default: return -1;
        }
    }
    return flags;
}

应该是响应键盘输入的类型和Redis类型之间的转换。在notify的方法还有一个event事件的通知方法:

/* The API provided to the rest of the Redis core is a simple function:
 *
 * notifyKeyspaceEvent(char *event, robj *key, int dbid);
 *
 * "event" is a C string representing the event name.
 * "key" is a Redis object representing the key name.
 * "dbid" is the database ID where the key lives.  */
/* 发布通知方法,分为2类,keySpace的通知,keyEvent的通知 */ 
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) {
    sds chan;
    robj *chanobj, *eventobj;
    int len = -1;
    char buf[24];

    /* If notifications for this class of events are off, return ASAP. */
    if (!(server.notify_keyspace_events & type)) return;

    eventobj = createStringObject(event,strlen(event));
    
    //2种的通知形式,略有差别
    /* __keyspace@<db>__:<key> <event> notifications. */
    if (server.notify_keyspace_events & REDIS_NOTIFY_KEYSPACE) {
        chan = sdsnewlen("__keyspace@",11);
        len = ll2string(buf,sizeof(buf),dbid);
        chan = sdscatlen(chan, buf, len);
        chan = sdscatlen(chan, "__:", 3);
        chan = sdscatsds(chan, key->ptr);
        chanobj = createObject(REDIS_STRING, chan);
        //上述几步操作,组件格式字符串,最后发布消息,下面keyEvent的通知同理
        pubsubPublishMessage(chanobj, eventobj);
        decrRefCount(chanobj);
    }

    /* __keyevente@<db>__:<event> <key> notifications. */
    if (server.notify_keyspace_events & REDIS_NOTIFY_KEYEVENT) {
        chan = sdsnewlen("__keyevent@",11);
        if (len == -1) len = ll2string(buf,sizeof(buf),dbid);
        chan = sdscatlen(chan, buf, len);
        chan = sdscatlen(chan, "__:", 3);
        chan = sdscatsds(chan, eventobj->ptr);
        chanobj = createObject(REDIS_STRING, chan);
        pubsubPublishMessage(chanobj, key);
        decrRefCount(chanobj);
    }
    decrRefCount(eventobj);
}

有keySpace和keyEvent的2种事件通知。具体怎么用,等后面碰到的时候在看看。