structredisServer { /* Pubsub */ dict *pubsub_channels; /* Map channels to list of subscribed clients */ dict *pubsub_patterns; /* A dict of pubsub_patterns */ int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an xor of NOTIFY_... flags. */ dict *pubsubshard_channels; /* Map shard channels to list of subscribed clients */ }
同时，每个客户端自身也会记录它所订阅的频道、模式和分片频道：
1 2 3 4 5
typedefstructclient { dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ dict *pubsub_patterns; /* patterns a client is interested in (PSUBSCRIBE) */ dict *pubsubshard_channels; /* shard level channels a client is interested in (SSUBSCRIBE) */ }
/* * Publish a message to all the subscribers. */ intpubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) { int receivers = 0; dictEntry *de; dictIterator *di; listNode *ln; listIter li;
/* Send to clients listening for that channel */ de = dictFind(*type.serverPubSubChannels, channel); if (de) { list *list = dictGetVal(de); listNode *ln; listIter li;
/* Send a pubsub message of type "message" to the client.
 *
 * Normally 'msg' is a Redis object containing the string to send as
 * message. However if the caller sets 'msg' as NULL, it will be able
 * to send a special message (for instance an Array type) by using the
 * addReply*() API family.
 *
 * c            - client that receives the message.
 * channel      - channel name, emitted as the second element.
 * msg          - payload, emitted as the third element; skipped when NULL
 *                (the 3-element header is still emitted in that case).
 * message_bulk - object emitted as the first element, right after the header.
 */
void addReplyPubsubMessage(client *c, robj *channel, robj *msg, robj *message_bulk) {
    /* Remember the flags on entry so CLIENT_PUSHING is only cleared below
     * if this call was the one that set it. */
    uint64_t old_flags = c->flags;
    c->flags |= CLIENT_PUSHING;

    /* c->resp is the RESP version, i.e. the version of the client/server
     * communication protocol: version 2 gets the shared 3-element header,
     * any other version gets a push header of length 3. */
    if (c->resp == 2) {
        addReply(c,shared.mbulkhdr[3]);
    } else {
        addReplyPushLen(c,3);
    }

    addReply(c,message_bulk);
    addReplyBulk(c,channel);
    if (msg) addReplyBulk(c,msg);

    if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/* Send a pubsub message of type "pmessage" to the client. The difference
 * with the "message" type delivered by addReplyPubsubMessage() is that
 * this message format also includes the pattern that matched the message.
 *
 * c       - client that receives the message.
 * pat     - pattern, emitted as the second element.
 * channel - channel name, emitted as the third element.
 * msg     - payload, emitted as the fourth element.
 */
void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) {
    /* Remember the flags on entry so CLIENT_PUSHING is only cleared below
     * if this call was the one that set it. */
    uint64_t old_flags = c->flags;
    c->flags |= CLIENT_PUSHING;

    /* RESP version 2 gets the shared 4-element header, any other version
     * gets a push header of length 4. */
    if (c->resp == 2) {
        addReply(c,shared.mbulkhdr[4]);
    } else {
        addReplyPushLen(c,4);
    }

    addReply(c,shared.pmessagebulk);
    addReplyBulk(c,pat);
    addReplyBulk(c,channel);
    addReplyBulk(c,msg);

    if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}
/* ----------------------------------------------------------------------------- * Higher level functions to queue data on the client output buffer. * The following functions are the ones that commands implementations will call. * -------------------------------------------------------------------------- */
/* Add the object 'obj' string representation to the client output buffer.
 *
 * Nothing is queued when prepareClientToWrite() does not return C_OK.
 * An object that is neither sds-encoded nor integer-encoded triggers
 * serverPanic(). */
void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
        _addReplyToBufferOrList(c,obj->ptr,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        /* For integer encoded strings we just convert it into a string
         * using our optimized function, and attach the resulting string
         * to the output buffer. */
        char buf[32];
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        _addReplyToBufferOrList(c,buf,len);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}
/* Replicas should normally not cause any writes to the reply buffer. In case a rogue replica sent a command on the * replication link that caused a reply to be generated we'll simply disconnect it. * Note this is the simplest way to check a command added a response. Replication links are used to write data but * not for responses, so we should normally never get here on a replica client. */ if (getClientType(c) == CLIENT_TYPE_SLAVE) { sds cmdname = c->lastcmd ? c->lastcmd->fullname : NULL; logInvalidUseAndFreeClientAsync(c, "Replica generated a reply to command '%s'", cmdname ? cmdname : "<unknown>"); return; }
/* We call it here because this function may affect the reply * buffer offset (see function comment) */ reqresSaveClientReplyOffset(c);
/* If we're processing a push message into the current client (i.e. executing PUBLISH * to a channel which we are subscribed to, then we wanna postpone that message to be added * after the command's reply (specifically important during multi-exec). the exception is * the SUBSCRIBE command family, which (currently) have a push message instead of a proper reply. * The check for executing_client also avoids affecting push messages that are part of eviction. */ if (c == server.current_client && (c->flags & CLIENT_PUSHING) && server.executing_client && !cmdHasPushAsReply(server.executing_client->cmd)) { _addReplyProtoToList(c,server.pending_push_messages,s,len); return; }
size_t reply_len = _addReplyToBuffer(c,s,len); if (len > reply_len) _addReplyProtoToList(c,c->reply,s+reply_len,len-reply_len); }