引言

redis服务器是一个事件驱动程序,服务器需要处理两类事件:

  1. 文件事件处理器:redis基于reactor模式开发的网络事件处理器。
  2. 事件事件处理器: 比如serverCron函数,需要在给定的时间点执行。

文件事件处理器使用io多路复用来同时监听多个套接字,并根据套接字目前执行的任务为套接字关联不同的事件处理器。
当被监听的套接字准备好执行连接应答、读取、写入、关闭等操作时,与之操作相对应的文件事件就会产生,这些文件事件就会调用套接字关联的事件处理器处理这些事件。

事件处理流程

server.c main

redis服务端主线程中定义有如下代码:

1
2
3
4
5
6
7
8
9
10
int main(int argc, char **argv) {
...
initServer(); // 创建、初始化服务端数据结构,
...
initListeners(); // 初始化服务器后,初始化监听器
...
aeMain(server.el); // 启动 事件循环
aeDeleteEventLoop(server.el); // 服务器关闭时,停止事件循环
return 0;
}

初始化事件处理器

如上,redis服务器主线程启动时,会调用initServer,该函数调用aeCreateEventLoop初始化事件处理器。在创建时需要指定setsize,表示eventLoop可以监听的网络事件fd的个数,如果当前监听的fd的个数超过了setsize,eventLoop将不能继续注册。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void initServer(void) {

...
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
...
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
...
if (aeCreateFileEvent(server.el, server.module_pipe[0], AE_READABLE,
modulePipeReadable,NULL) == AE_ERR) {
serverPanic(
"Error registering the readable event for the module pipe.");
}

aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
...
}
初始化事件循环
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;

monotonicInit(); /* just in case the calling app didn't initialize */
// 创建事件循环数据结构
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
// 初始化 事件循环中的注册事件和已触发事件
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
// 设置数组大小
eventLoop->setsize = setsize;
// 初始化时间事件结构
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL;
eventLoop->flags = 0;
// 创建 io多路复用实例,赋值给apidata字段
if (aeApiCreate(eventLoop) == -1) goto err;
// 初始化监听事件
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;

err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}
创建时间事件

initServer中调用aeCreateTimeEvent创建时间事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc)
{
// 更新 时间计数器
long long id = eventLoop->timeEventNextId++;
aeTimeEvent *te;
// 创建时间事件结构
te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
// 设置id
te->id = id;
// 设定处理事件的时间
te->when = getMonotonicUs() + milliseconds * 1000;
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData;
te->prev = NULL;
te->next = eventLoop->timeEventHead;
te->refcount = 0;
if (te->next)
te->next->prev = te;
eventLoop->timeEventHead = te;
return id;
}
注册文件IO事件

initServer中调用aeCreateFileEvent创建文件IO事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
// 取出事件循环中的文件描述符对应的文件事件
aeFileEvent *fe = &eventLoop->events[fd];
// 监听该 文件描述符指定的 事件
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
// 设置 文件事件类型,以及事件的处理器
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}

监听事件

server.c main()中初始化服务器之后,调用initListeners()初始化监听器。
redis中为服务器提供了TCP/TLS/UNIX三种监听器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
void initListeners(void) {
/* Setup listeners from server config for TCP/TLS/Unix */
int conn_index;
connListener *listener;
if (server.port != 0) {
conn_index = connectionIndexByType(CONN_TYPE_SOCKET);
if (conn_index < 0)
serverPanic("Failed finding connection listener of %s", CONN_TYPE_SOCKET);
listener = &server.listeners[conn_index];
listener->bindaddr = server.bindaddr;
listener->bindaddr_count = server.bindaddr_count;
listener->port = server.port;
listener->ct = connectionByType(CONN_TYPE_SOCKET);
}

if (server.tls_port || server.tls_replication || server.tls_cluster) {
ConnectionType *ct_tls = connectionTypeTls();
if (!ct_tls) {
serverLog(LL_WARNING, "Failed finding TLS support.");
exit(1);
}
if (connTypeConfigure(ct_tls, &server.tls_ctx_config, 1) == C_ERR) {
serverLog(LL_WARNING, "Failed to configure TLS. Check logs for more info.");
exit(1);
}
}

if (server.tls_port != 0) {
conn_index = connectionIndexByType(CONN_TYPE_TLS);
if (conn_index < 0)
serverPanic("Failed finding connection listener of %s", CONN_TYPE_TLS);
listener = &server.listeners[conn_index];
listener->bindaddr = server.bindaddr;
listener->bindaddr_count = server.bindaddr_count;
listener->port = server.tls_port;
listener->ct = connectionByType(CONN_TYPE_TLS);
}
if (server.unixsocket != NULL) {
conn_index = connectionIndexByType(CONN_TYPE_UNIX);
if (conn_index < 0)
serverPanic("Failed finding connection listener of %s", CONN_TYPE_UNIX);
listener = &server.listeners[conn_index];
listener->bindaddr = &server.unixsocket;
listener->bindaddr_count = 1;
listener->ct = connectionByType(CONN_TYPE_UNIX);
listener->priv = &server.unixsocketperm; /* Unix socket specified */
}

/* create all the configured listener, and add handler to start to accept */
int listen_fds = 0;
for (int j = 0; j < CONN_TYPE_MAX; j++) {
listener = &server.listeners[j];
if (listener->ct == NULL)
continue;

if (connListen(listener) == C_ERR) {
serverLog(LL_WARNING, "Failed listening on port %u (%s), aborting.", listener->port, listener->ct->get_type(NULL));
exit(1);
}

if (createSocketAcceptHandler(listener, connAcceptHandler(listener->ct)) != C_OK)
serverPanic("Unrecoverable error creating %s listener accept handler.", listener->ct->get_type(NULL));

listen_fds += listener->count;
}

if (listen_fds == 0) {
serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");
exit(1);
}
}

事件循环

redis主程序中,初始化事件循环、监听器后将调用aeMain进入事件循环中。

1
2
3
4
5
6
7
8
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
}

aeProcessEvents先处理每个挂起的时间事件,再处理每个挂起的文件事件。
如果没有特殊标志,该函数将休眠,直到某个文件事件触发,或者下一次事件发生。

flag的取值含义如下:
0: 什么也不做
AE_ALL_EVENTS:处理所有事件

先来看eventLoop的定义,位于ae.h文件中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; // 当前注册的事件中,最大的文件描述符
int setsize; // 追踪的文件描述符的最大值
long long timeEventNextId;
aeFileEvent *events; // 已注册的文件事件
aeFiredEvent *fired; // 已触发的事件
aeTimeEvent *timeEventHead; // 时间事件的头指针
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
int flags;
} aeEventLoop;

文件事件的定义如下:

1
2
3
4
5
6
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc; // 回调方法的指针,类似于接口
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;

其中,mask的取值如下,用以指定事件循环中注册的文件的触发方式:

1
2
3
4
#define AE_NONE 0      // 没有事件注册
#define AE_READABLE 1 // 文件描述符为可读时,触发该事件
#define AE_WRITABLE 2 // 文件描述符为可写时,触发触发此事件
#define AE_BARRIER 4 // AE_BARRIER时,不会再可读事件后继续触发可写事件;此情形下,会导致读写顺序,使写先发生,再执行读

触发的事件定义如下:

1
2
3
4
typedef struct aeFiredEvent {
int fd; // 文件描述符
int mask; // 触发事件标志位
} aeFiredEvent;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
/* 取值如下:
AE_FILE_EVENTS: 0000 0001
AE_TIME_EVENTS: 0000 0010
AE_ALL_EVENTS: 0000 0011
AE_DONT_WAIT: 0000 0100
AE_CALL_BEFORE_SLEEP: 0000 1000
AE_CALL_AFTER_SLEEP: 0001 0000
*/
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;

if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

/* Note that we want to call aeApiPoll() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
struct timeval tv, *tvp = NULL; /* NULL means infinite wait. */
int64_t usUntilTimer;

if (eventLoop->beforesleep != NULL && (flags & AE_CALL_BEFORE_SLEEP))
eventLoop->beforesleep(eventLoop);
// 在 beforesleep 中,eventLoop->flags可能会变化,两者存在时,flags的优先级更高
// 如果 flags = AE_DONT_WAIT,无需处理 eventLoop->flags
if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else if (flags & AE_TIME_EVENTS) {
usUntilTimer = usUntilEarliestTimer(eventLoop);
if (usUntilTimer >= 0) {
tv.tv_sec = usUntilTimer / 1000000;
tv.tv_usec = usUntilTimer % 1000000;
tvp = &tv;
}
}
// 调用多路复用的api,仅在超时或者某些事件触发时返回
numevents = aeApiPoll(eventLoop, tvp);

// 如果flag不包含AE_FILE_EVENTS,则 待处理事件数为0[包含情况仅为file_event和 all_event]
if (!(flags & AE_FILE_EVENTS)) {
numevents = 0;
}
// 如果不为 after_sleep 不为空, 且flags为after_sleep时,设置后置处理
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);

// 遍历事件
for (j = 0; j < numevents; j++) {
// 获取当前事件的文件描述符
int fd = eventLoop->fired[j].fd;
// 根据文件描述符,定位到注册的文件事件
aeFileEvent *fe = &eventLoop->events[fd];
// 获取该文件事件的标志位
int mask = eventLoop->fired[j].mask;
// 计量 当前文件描述符 调用的事件数量
int fired = 0;

// 是否需要倒置 写读 事件
int invert = fe->mask & AE_BARRIER;

// 防止该文件事件已经被触发且已经从fireevents中移除
if (!invert && fe->mask & mask & AE_READABLE) {
// 调用该文件事件的 读操作 , 触发事件++
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}

// 触发 写操作
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
// 调用该文件事件的 写方法, 触发事件++
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}

// 如果需要 倒置写读 操作,则在上述 写操作完毕后,再判断是否需要执行 读操作
if (invert) {
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
if ((fe->mask & mask & AE_READABLE) &&
(!fired || fe->wfileProc != fe->rfileProc))
{
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
// 已处理事件 ++
processed++;
}
}
// 检查是否需要处理时间事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
// 返回处理过的时间/文件事件
return processed;
}

如上aeProcessEvents的代码中,调用了如下两个方法:

  1. aeApiPoll: 调用 多路复用 api,监控多个文件描述符的可读可写状态。
  2. processTimeEvents: 处理时间事件
监听文件描述符

aeApiPoll实际上是 IO多路复用接口,redis中提供了四种实现方式:
evport、epoll、kqueue、select

因为 Redis 需要在多个平台上运行,同时为了最大化执行的效率与性能,所以会根据编译平台的不同选择不同的 I/O 多路复用函数作为子模块,提供给上层统一的接口;在 Redis 中,通过宏定义的使用,合理的选择不同的子模块:

1
2
3
4
5
6
7
8
9
10
11
12
13
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
处理文件读
1
2
3
4
5
6
if (!invert && fe->mask & mask & AE_READABLE) {
// 调用该文件事件的 读操作 , 触发事件++
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}
处理文件写
1
2
3
4
5
6
7
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
// 调用该文件事件的 写方法, 触发事件++
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
处理时间事件

processTimeEvents方法用以处理时间事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;

te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
monotime now = getMonotonicUs();
while(te) {
long long id;

// 删除计划删除的事件, AE_DELETED_EVENT_ID = -1
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
/* If a reference exists for this timer event,
* don't free it. This is currently incremented
* for recursive timerProc calls */
if (te->refcount) {
te = next;
continue;
}
if (te->prev)
te->prev->next = te->next;
else
eventLoop->timeEventHead = te->next;
if (te->next)
te->next->prev = te->prev;
if (te->finalizerProc) {
te->finalizerProc(eventLoop, te->clientData);
now = getMonotonicUs();
}
zfree(te);
te = next;
continue;
}
// 在这次的事件循环中,不处理由被时间事件创建的时间事件。目前已无用
if (te->id > maxId) {
te = te->next;
continue;
}

if (te->when <= now) {
int retval;

id = te->id;
te->refcount++;
retval = te->timeProc(eventLoop, id, te->clientData);
te->refcount--;
processed++;
now = getMonotonicUs();
if (retval != AE_NOMORE) {
te->when = now + retval * 1000;
} else {
te->id = AE_DELETED_EVENT_ID;
}
}
te = te->next;
}
return processed;
}

IO多路复用

IO多路复用模型中,最重要的就是select函数,用以监听多个文件描述符的可读可写状态,当某些文件描述符可读或者可写时,该方法就会返回可读或可写的文件描述符个数。

缘起:

1
2
3
4
5
6
7
8
9
10
11
12
13
void startSaving(int rdbflags) {
/* Fire the persistence modules start event. */
int subevent;
if (rdbflags & RDBFLAGS_AOF_PREAMBLE && getpid() != server.pid)
subevent = REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START;
else if (rdbflags & RDBFLAGS_AOF_PREAMBLE)
subevent = REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_AOF_START;
else if (getpid()!=server.pid)
subevent = REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START;
else
subevent = REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START;
moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE,subevent,NULL);
}
1
2
3
4
5
6
7
void stopSaving(int success) {
/* Fire the persistence modules end event. */
moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE,
success?
REDISMODULE_SUBEVENT_PERSISTENCE_ENDED:
REDISMODULE_SUBEVENT_PERSISTENCE_FAILED,
NULL);

引用

1. 事件驱动
2. redis源码学习_事件
3. 浅析Redis与IO多路复用器原理
4. Redis 事件驱动详解
5. server.c
6. ae.c
7. Redis 事件循环器 (AE) 实现剖析