引言 redis
服务器是一个事件驱动程序,服务器需要处理两类事件:
文件事件处理器:redis基于reactor
模式开发的网络事件处理器。
时间事件处理器: 比如serverCron
函数,需要在给定的时间点执行。
文件事件处理器使用io多路复用
来同时监听多个套接字,并根据套接字目前执行的任务为套接字关联不同的事件处理器。 当被监听的套接字准备好执行连接应答、读取、写入、关闭等操作时,与操作相对应的文件事件就会产生,这时文件事件处理器就会调用套接字之前关联好的事件处理器来处理这些事件。
事件处理流程
server.c main redis服务端主线程中定义有如下代码:
1 2 3 4 5 6 7 8 9 10 int main (int argc, char **argv) { ... initServer(); ... initListeners(); ... aeMain(server.el); aeDeleteEventLoop(server.el); return 0 ; }
初始化事件处理器 如上,redis
服务器主线程启动时,会调用initServer
,该函数调用aeCreateEventLoop
初始化事件处理器。在创建时需要指定setsize
,表示eventLoop
可以监听的网络事件fd的上限(events数组以fd为下标),如果要注册的fd的值大于等于setsize
,eventLoop
将不能继续注册。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 void initServer (void ) { ... server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); ... if (aeCreateTimeEvent(server.el, 1 , serverCron, NULL , NULL ) == AE_ERR) { serverPanic("Can't create event loop timers." ); exit (1 ); } ... if (aeCreateFileEvent(server.el, server.module_pipe[0 ], AE_READABLE, modulePipeReadable,NULL ) == AE_ERR) { serverPanic( "Error registering the readable event for the module pipe." ); } aeSetBeforeSleepProc(server.el,beforeSleep); aeSetAfterSleepProc(server.el,afterSleep); ... }
初始化事件循环 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 aeEventLoop *aeCreateEventLoop (int setsize) { aeEventLoop *eventLoop; int i; monotonicInit(); if ((eventLoop = zmalloc(sizeof (*eventLoop))) == NULL ) goto err; eventLoop->events = zmalloc(sizeof (aeFileEvent)*setsize); eventLoop->fired = zmalloc(sizeof (aeFiredEvent)*setsize); if (eventLoop->events == NULL || eventLoop->fired == NULL ) goto err; eventLoop->setsize = setsize; eventLoop->timeEventHead = NULL ; eventLoop->timeEventNextId = 0 ; eventLoop->stop = 0 ; eventLoop->maxfd = -1 ; eventLoop->beforesleep = NULL ; eventLoop->aftersleep = NULL ; eventLoop->flags = 0 ; if (aeApiCreate(eventLoop) == -1 ) goto err; for (i = 0 ; i < setsize; i++) eventLoop->events[i].mask = AE_NONE; return eventLoop; err: if (eventLoop) { zfree(eventLoop->events); zfree(eventLoop->fired); zfree(eventLoop); } return NULL ; }
创建时间事件 initServer
中调用aeCreateTimeEvent
创建时间事件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 long long aeCreateTimeEvent (aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc) { long long id = eventLoop->timeEventNextId++; aeTimeEvent *te; te = zmalloc(sizeof (*te)); if (te == NULL ) return AE_ERR; te->id = id; te->when = getMonotonicUs() + milliseconds * 1000 ; te->timeProc = proc; te->finalizerProc = finalizerProc; te->clientData = clientData; te->prev = NULL ; te->next = eventLoop->timeEventHead; te->refcount = 0 ; if (te->next) te->next->prev = te; eventLoop->timeEventHead = te; return id; }
注册文件IO事件 initServer
中调用aeCreateFileEvent
创建文件IO事件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 int aeCreateFileEvent (aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { if (fd >= eventLoop->setsize) { errno = ERANGE; return AE_ERR; } aeFileEvent *fe = &eventLoop->events[fd]; if (aeApiAddEvent(eventLoop, fd, mask) == -1 ) return AE_ERR; fe->mask |= mask; if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; fe->clientData = clientData; if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; return AE_OK; }
监听事件 在server.c main()
中初始化服务器之后,调用initListeners()
初始化监听器。redis
中为服务器提供了TCP/TLS/UNIX三种监听器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 void initListeners (void ) { int conn_index; connListener *listener; if (server.port != 0 ) { conn_index = connectionIndexByType(CONN_TYPE_SOCKET); if (conn_index < 0 ) serverPanic("Failed finding connection listener of %s" , CONN_TYPE_SOCKET); listener = &server.listeners[conn_index]; listener->bindaddr = server.bindaddr; listener->bindaddr_count = server.bindaddr_count; listener->port = server.port; listener->ct = connectionByType(CONN_TYPE_SOCKET); } if (server.tls_port || server.tls_replication || server.tls_cluster) { ConnectionType *ct_tls = connectionTypeTls(); if (!ct_tls) { serverLog(LL_WARNING, "Failed finding TLS support." ); exit (1 ); } if (connTypeConfigure(ct_tls, &server.tls_ctx_config, 1 ) == C_ERR) { serverLog(LL_WARNING, "Failed to configure TLS. Check logs for more info." 
); exit (1 ); } } if (server.tls_port != 0 ) { conn_index = connectionIndexByType(CONN_TYPE_TLS); if (conn_index < 0 ) serverPanic("Failed finding connection listener of %s" , CONN_TYPE_TLS); listener = &server.listeners[conn_index]; listener->bindaddr = server.bindaddr; listener->bindaddr_count = server.bindaddr_count; listener->port = server.tls_port; listener->ct = connectionByType(CONN_TYPE_TLS); } if (server.unixsocket != NULL ) { conn_index = connectionIndexByType(CONN_TYPE_UNIX); if (conn_index < 0 ) serverPanic("Failed finding connection listener of %s" , CONN_TYPE_UNIX); listener = &server.listeners[conn_index]; listener->bindaddr = &server.unixsocket; listener->bindaddr_count = 1 ; listener->ct = connectionByType(CONN_TYPE_UNIX); listener->priv = &server.unixsocketperm; } int listen_fds = 0 ; for (int j = 0 ; j < CONN_TYPE_MAX; j++) { listener = &server.listeners[j]; if (listener->ct == NULL ) continue ; if (connListen(listener) == C_ERR) { serverLog(LL_WARNING, "Failed listening on port %u (%s), aborting." , listener->port, listener->ct->get_type(NULL )); exit (1 ); } if (createSocketAcceptHandler(listener, connAcceptHandler(listener->ct)) != C_OK) serverPanic("Unrecoverable error creating %s listener accept handler." , listener->ct->get_type(NULL )); listen_fds += listener->count; } if (listen_fds == 0 ) { serverLog(LL_WARNING, "Configured to not listen anywhere, exiting." ); exit (1 ); } }
事件循环 redis
主程序中,初始化事件循环、监听器后将调用aeMain
进入事件循环中。
1 2 3 4 5 6 7 8 void aeMain (aeEventLoop *eventLoop) { eventLoop->stop = 0 ; while (!eventLoop->stop) { aeProcessEvents(eventLoop, AE_ALL_EVENTS| AE_CALL_BEFORE_SLEEP| AE_CALL_AFTER_SLEEP); } }
aeProcessEvents
先处理每个挂起的文件事件,再处理每个挂起的时间事件(从下文代码可以看到,processTimeEvents 在文件事件处理完之后才被调用)。 如果没有特殊标志,该函数将休眠,直到某个文件事件触发,或者下一次时间事件发生(如果有的话)。
flag
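的取值含义如下: 0: 什么也不做,直接返回 AE_ALL_EVENTS:处理所有事件 AE_FILE_EVENTS:处理文件事件 AE_TIME_EVENTS:处理时间事件 AE_DONT_WAIT:不阻塞等待,处理完当前已就绪的事件后立即返回 AE_CALL_BEFORE_SLEEP:在进入等待之前调用beforesleep回调 AE_CALL_AFTER_SLEEP:在等待返回之后调用aftersleep回调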
先来看eventLoop
的定义,位于ae.h
文件中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 typedef struct aeEventLoop { int maxfd; int setsize; long long timeEventNextId; aeFileEvent *events; aeFiredEvent *fired; aeTimeEvent *timeEventHead; int stop; void *apidata; aeBeforeSleepProc *beforesleep; aeBeforeSleepProc *aftersleep; int flags; } aeEventLoop;
文件事件的定义如下:
1 2 3 4 5 6 typedef struct aeFileEvent { int mask; aeFileProc *rfileProc; aeFileProc *wfileProc; void *clientData; } aeFileEvent;
其中,mask
的取值如下,用以指定在事件循环中为该文件描述符注册的事件类型(可读、可写);其中AE_BARRIER用于颠倒同一轮循环中读写回调的执行顺序,即先执行写回调、再执行读回调:
1 2 3 4 #define AE_NONE 0 #define AE_READABLE 1 #define AE_WRITABLE 2 #define AE_BARRIER 4
触发的事件定义如下:
1 2 3 4 typedef struct aeFiredEvent { int fd; int mask; } aeFiredEvent;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 int aeProcessEvents (aeEventLoop *eventLoop, int flags) { int processed = 0 , numevents; if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0 ; if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int j; struct timeval tv , *tvp = NULL ; int64_t usUntilTimer; if (eventLoop->beforesleep != NULL && (flags & AE_CALL_BEFORE_SLEEP)) eventLoop->beforesleep(eventLoop); if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) { tv.tv_sec = tv.tv_usec = 0 ; tvp = &tv; } else if (flags & AE_TIME_EVENTS) { usUntilTimer = usUntilEarliestTimer(eventLoop); if (usUntilTimer >= 0 ) { tv.tv_sec = usUntilTimer / 1000000 ; tv.tv_usec = usUntilTimer % 1000000 ; tvp = &tv; } } numevents = aeApiPoll(eventLoop, tvp); if (!(flags & AE_FILE_EVENTS)) { numevents = 0 ; } if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) eventLoop->aftersleep(eventLoop); for (j = 0 ; j < numevents; j++) { int fd = eventLoop->fired[j].fd; aeFileEvent *fe = &eventLoop->events[fd]; int mask = eventLoop->fired[j].mask; int fired = 0 ; int invert = fe->mask & AE_BARRIER; if (!invert && fe->mask & mask & AE_READABLE) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; fe = &eventLoop->events[fd]; } if (fe->mask & mask & AE_WRITABLE) { if (!fired || fe->wfileProc != fe->rfileProc) { fe->wfileProc(eventLoop,fd,fe->clientData,mask); fired++; } } if (invert) { fe = &eventLoop->events[fd]; if ((fe->mask & mask & AE_READABLE) && (!fired || fe->wfileProc != fe->rfileProc)) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; } } processed++; } } if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); return processed; }
如上aeProcessEvents
的代码中,调用了如下两个方法:
aeApiPoll
: 调用 多路复用 api,监控多个文件描述符的可读可写状态。
processTimeEvents
: 处理时间事件
监听文件描述符 aeApiPoll
实际上是 IO多路复用接口,redis
中提供了四种实现方式:evport、epoll、kqueue、select
。
因为 Redis 需要在多个平台上运行,同时为了最大化执行的效率与性能,所以会根据编译平台的不同选择不同的 I/O 多路复用函数作为子模块,提供给上层统一的接口;在 Redis 中,通过宏定义的使用,合理的选择不同的子模块:
1 2 3 4 5 6 7 8 9 10 11 12 13 #ifdef HAVE_EVPORT #include "ae_evport.c" #else #ifdef HAVE_EPOLL #include "ae_epoll.c" #else #ifdef HAVE_KQUEUE #include "ae_kqueue.c" #else #include "ae_select.c" #endif #endif #endif
处理文件读 1 2 3 4 5 6 if (!invert && fe->mask & mask & AE_READABLE) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; fe = &eventLoop->events[fd]; }
处理文件写 1 2 3 4 5 6 7 if (fe->mask & mask & AE_WRITABLE) { if (!fired || fe->wfileProc != fe->rfileProc) { fe->wfileProc(eventLoop,fd,fe->clientData,mask); fired++; } }
处理时间事件 processTimeEvents
方法用以处理时间事件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 static int processTimeEvents (aeEventLoop *eventLoop) { int processed = 0 ; aeTimeEvent *te; long long maxId; te = eventLoop->timeEventHead; maxId = eventLoop->timeEventNextId-1 ; monotime now = getMonotonicUs(); while (te) { long long id; if (te->id == AE_DELETED_EVENT_ID) { aeTimeEvent *next = te->next; if (te->refcount) { te = next; continue ; } if (te->prev) te->prev->next = te->next; else eventLoop->timeEventHead = te->next; if (te->next) te->next->prev = te->prev; if (te->finalizerProc) { te->finalizerProc(eventLoop, te->clientData); now = getMonotonicUs(); } zfree(te); te = next; continue ; } if (te->id > maxId) { te = te->next; continue ; } if (te->when <= now) { int retval; id = te->id; te->refcount++; retval = te->timeProc(eventLoop, id, te->clientData); te->refcount--; processed++; now = getMonotonicUs(); if (retval != AE_NOMORE) { te->when = now + retval * 1000 ; } else { te->id = AE_DELETED_EVENT_ID; } } te = te->next; } return processed; }
IO多路复用 在IO多路复用模型
中,最重要的就是select
函数,用以监听多个文件描述符的可读可写状态,当某些文件描述符可读或者可写时,该方法就会返回可读或可写的文件描述符个数。
缘起:
1 2 3 4 5 6 7 8 9 10 11 12 13 void startSaving (int rdbflags) { int subevent; if (rdbflags & RDBFLAGS_AOF_PREAMBLE && getpid() != server.pid) subevent = REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START; else if (rdbflags & RDBFLAGS_AOF_PREAMBLE) subevent = REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_AOF_START; else if (getpid()!=server.pid) subevent = REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START; else subevent = REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START; moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE,subevent,NULL ); }
1 2 3 4 5 6 7 void stopSaving (int success) { moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE, success? REDISMODULE_SUBEVENT_PERSISTENCE_ENDED: REDISMODULE_SUBEVENT_PERSISTENCE_FAILED, NULL );
引用 1. 事件驱动 2. redis源码学习_事件 3. 浅析Redis与IO多路复用器原理 4. Redis 事件驱动详解 5. server.c 6. ae.c 7. Redis 事件循环器 (AE) 实现剖析