redis的发布订阅功能由publish|subscribe|psubscribe等命令组成。

频道的订阅与退订

subscribe

当客户端执行subscribe 命令订阅一个或多个频道时,这个客户端与被订阅的频道建就建立了一种订阅关系。

redis中将所有频道的订阅关系存储在服务器状态pubsub_channels中,键为被订阅频道,值为记录了所有订阅此频道的客户端。:

1
2
3
4
5
6
7
8
struct redisServer {
/* Pubsub */
dict *pubsub_channels; /* Map channels to list of subscribed clients */
dict *pubsub_patterns; /* A dict of pubsub_patterns */
int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
xor of NOTIFY_... flags. */
dict *pubsubshard_channels; /* Map shard channels to list of subscribed clients */
}

同时,客户端上也会记录这一信息

1
2
3
4
5
typedef struct client {
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
dict *pubsub_patterns; /* patterns a client is interested in (PSUBSCRIBE) */
dict *pubsubshard_channels; /* shard level channels a client is interested in (SSUBSCRIBE) */
}

订阅过程的实现就是单链表的插入过程:

  1. 检查频道中是否有其他订阅者
  2. 如果没有,则创建一个键值对,值为空链表,然后将当前客户端加到链表中
  3. 如果有,则将当前客户端加入到链表为

unscbscribe

当一个客户端退订一个或多个频道时,将从pubsub_channels中解除客户端与该退订频道之间的关联。
操作过程即遍历pubsub_channels,对每个频道上的链表做删除操作。

模式的订阅与退订

与频道订阅类似,redis客户端通过psubscribe命令订阅模式,并将模式的订阅关系保存在服务器的pubsub_patterns状态中。在redis2.x中,这一结构保存在pubsubPattern结构中:

1
2
3
4
typedef struct pubsubPattern{
redisClient *client; // 记录模式订阅的客户端
robj *pattern; // 记录被订阅的模式
}

但是在当前版本中redis 6.x已经被修改为了dict。

psubscribe

在新版本中,其操作和subscribe一致,将模式名作为key,使用链表记录订阅模式的客户端。
具体执行逻辑即链表的插入操作

punsubscribe

客户端执行此命令时,服务器将会剔除掉pubsub_patterns中维护的模式订阅关系,具体操作即对链表进行删除操作。

发送消息

当客户端执行publish <channel> <message>命令将休息message发送给频道channel时,服务器需要执行如下两个动作:

  1. 将消息发送给 channel频道的所有订阅者。
  2. 检测是否频道所符合的模式,如果匹配,则将消息发送给这些订阅了这些模式的订阅者。

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/*
* Publish a message to all the subscribers.
*/
int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) {
int receivers = 0;
dictEntry *de;
dictIterator *di;
listNode *ln;
listIter li;

/* Send to clients listening for that channel */
de = dictFind(*type.serverPubSubChannels, channel);
if (de) {
list *list = dictGetVal(de);
listNode *ln;
listIter li;

listRewind(list,&li);
while ((ln = listNext(&li)) != NULL) {
client *c = ln->value;
addReplyPubsubMessage(c,channel,message,*type.messageBulk);
updateClientMemUsageAndBucket(c);
receivers++;
}
}

if (type.shard) {
/* Shard pubsub ignores patterns. */
return receivers;
}

/* Send to clients listening to matching channels */
di = dictGetIterator(server.pubsub_patterns);
if (di) {
channel = getDecodedObject(channel);
while((de = dictNext(di)) != NULL) {
robj *pattern = dictGetKey(de);
list *clients = dictGetVal(de);
if (!stringmatchlen((char*)pattern->ptr,
sdslen(pattern->ptr),
(char*)channel->ptr,
sdslen(channel->ptr),0)) continue;

listRewind(clients,&li);
while ((ln = listNext(&li)) != NULL) {
client *c = listNodeValue(ln);
addReplyPubsubPatMessage(c,pattern,channel,message);
updateClientMemUsageAndBucket(c);
receivers++;
}
}
decrRefCount(channel);
dictReleaseIterator(di);
}
return receivers;
}

消息发布

可以看到,主体逻辑即是遍历订阅该频道的客户端,调用addReplyPubsubMessage发送消息;找到匹配的模式的客户端,调用addReplyPubsubPatMessage方法发送消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/* Send a pubsub message of type "message" to the client.
* Normally 'msg' is a Redis object containing the string to send as
* message. However if the caller sets 'msg' as NULL, it will be able
* to send a special message (for instance an Array type) by using the
* addReply*() API family. */
void addReplyPubsubMessage(client *c, robj *channel, robj *msg, robj *message_bulk) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
if (c->resp == 2) // resp版本,客户端服务器通信协议版本
addReply(c,shared.mbulkhdr[3]);
else
addReplyPushLen(c,3);
addReply(c,message_bulk);
addReplyBulk(c,channel);
if (msg) addReplyBulk(c,msg);
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* Send a pubsub message of type "pmessage" to the client. The difference
* with the "message" type delivered by addReplyPubsubMessage() is that
* this message format also includes the pattern that matched the message. */
void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
if (c->resp == 2)
addReply(c,shared.mbulkhdr[4]);
else
addReplyPushLen(c,4);
addReply(c,shared.pmessagebulk);
addReplyBulk(c,pat);
addReplyBulk(c,channel);
addReplyBulk(c,msg);
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}

可以看到上述函数的实现中,在判断客户端服务器通信协议版本之后,都是通过调用addReply函数实现的消息发送。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/* -----------------------------------------------------------------------------
* Higher level functions to queue data on the client output buffer.
* The following functions are the ones that commands implementations will call.
* -------------------------------------------------------------------------- */

/* Add the object 'obj' string representation to the client output buffer. */
void addReply(client *c, robj *obj) {
if (prepareClientToWrite(c) != C_OK) return;

if (sdsEncodedObject(obj)) {
_addReplyToBufferOrList(c,obj->ptr,sdslen(obj->ptr));
} else if (obj->encoding == OBJ_ENCODING_INT) {
/* For integer encoded strings we just convert it into a string
* using our optimized function, and attach the resulting string
* to the output buffer. */
char buf[32];
size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
_addReplyToBufferOrList(c,buf,len);
} else {
serverPanic("Wrong obj->encoding in addReply()");
}
}

message的字符串表征后调用_addReplyToBufferOrList发送给客户端输出缓冲区.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
void _addReplyToBufferOrList(client *c, const char *s, size_t len) {
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;

/* Replicas should normally not cause any writes to the reply buffer. In case a rogue replica sent a command on the
* replication link that caused a reply to be generated we'll simply disconnect it.
* Note this is the simplest way to check a command added a response. Replication links are used to write data but
* not for responses, so we should normally never get here on a replica client. */
if (getClientType(c) == CLIENT_TYPE_SLAVE) {
sds cmdname = c->lastcmd ? c->lastcmd->fullname : NULL;
logInvalidUseAndFreeClientAsync(c, "Replica generated a reply to command '%s'",
cmdname ? cmdname : "<unknown>");
return;
}

/* We call it here because this function may affect the reply
* buffer offset (see function comment) */
reqresSaveClientReplyOffset(c);

/* If we're processing a push message into the current client (i.e. executing PUBLISH
* to a channel which we are subscribed to, then we wanna postpone that message to be added
* after the command's reply (specifically important during multi-exec). the exception is
* the SUBSCRIBE command family, which (currently) have a push message instead of a proper reply.
* The check for executing_client also avoids affecting push messages that are part of eviction. */
if (c == server.current_client && (c->flags & CLIENT_PUSHING) &&
server.executing_client && !cmdHasPushAsReply(server.executing_client->cmd))
{
_addReplyProtoToList(c,server.pending_push_messages,s,len);
return;
}

size_t reply_len = _addReplyToBuffer(c,s,len);
if (len > reply_len) _addReplyProtoToList(c,c->reply,s+reply_len,len-reply_len);
}

查看订阅信息

PUBSUB是redis 2.8新增命令之一,客户端以此查看频道或者模式的相关信息。

pubsub channels

pubsub channels [patterns]用于返回服务器当前被订阅的频道,其中pattern可选。

  1. 指定pattern,返回服务器当前被订阅的频道中那些与pattern模式相匹配的频道。
  2. 不指定pattern参数,则命令返回服务器当前被订阅的所有频道。

pubsub numsub

pubsub numsub [channel1 channel2]接收多个频道作为输入,返回这些频道的订阅者数量。这里是统计pubsub_channels的长度来实现的。

pubsub numpat

pubsub numpat用于返回服务器当前被订阅模式的数量。实现原理是返回pubsub_patterns的长度实现。

引用

1. pubsub.c
2. networking.c