由于redis是内存数据库,其数据库状态保存在内存中。redis中提供了持久化功能,用以将redis内存中的数据库状态保存在磁盘中,避免意外的数据丢失。

Redis中提供了两种持久化方式,分别是RDB和AOF。

RDB

RDB持久化功能可以将某个时间点上的数据库状态保存到一个RDB文件中,所生成的RDB文件是一个经过压缩的二进制文件,通过该文件可以还原生成RDB文件时的数据库状态。

RDB文件载入

rdb文件的载入工作是在服务器启动时自动执行的。
因为aof文件的更新频率通常比rdb文件的更新频率高,所以如果服务器开启了AOF持久化功能,服务器将优先使用AOF文件来还原数据库状态。
只有在AOF持久化功能处于关闭时,服务器才会使用rdb文件来还原数据库状态。

另外,在服务器载入rdb文件期间,会一直处于阻塞状态,直到载入完毕。

RDB文件创建

redis中可以使用SAVE和BGSAVE两个命令生成RDB文件。
其中,save命令会阻塞服务器进程,直到RDB文件创建完毕为止,在此期间,服务器不能处理任何命令请求。而BGSAVE命令则会派生出一个子进程,由子进程负责创建RDB文件,服务器主进程可以继续处理命令请求。
由于BGSAVE可以在不阻塞服务器的情况下执行,因此redis提供了相应配置,条件触发BGSAVE命令。
redis.conf中有如下配置:

1
2
save <seconds> <changes> [<seconds> <changes> ...]
save 3600 1 300 100 60 10000

上述命令等价于如下三行:

1
2
3
save 3600 1
save 300 100
save 60 10000
  1. 服务器在3600s内,对数据库至少修改了1次;
  2. 服务器在300s内,对数据库至少修改了100次;
  3. 服务器在60s内,对数据库至少修改了10000次;
    上述任意条件满足时,服务器就会执行BGSAVE命令。

RDB持久化相关的配置体现在redisServer中:

1
2
3
4
5
6
7
8
9
10
/* RDB-related fields of the server global state (excerpt). */
struct redisServer {
    struct saveparam *saveparams; /* Array of "save <seconds> <changes>" rules that trigger an automatic BGSAVE. */
    long long dirty;              /* Number of database changes since the last successful save. */
    time_t lastsave;              /* Unix time of the last successful SAVE/BGSAVE. */
};

/* One automatic-save rule: trigger a BGSAVE when at least `changes`
 * modifications happened within `seconds` seconds. */
struct saveparam {
    time_t seconds; /* Time window in seconds. */
    int changes;    /* Minimum number of changes within the window. */
};

RDB文件结构

下图描述了一个RDB文件结构:

  1. 文件开头保存的是二进制的REDIS魔数字符,占五个字节;
  2. db_version: 占四个字节,是字符串表示的整数,用以记录rdb文件的版本号;
  3. databases:用以保存任意多个非空数据库状态;
  4. EOF: 占一个字节,用以标识rdb文件中数据库状态记录的结束;
  5. check_sum: 为8字节长度的无符号整数,保存对数据库状态的校验和;
    其中,databases中每个非空数据库在rdb文件中都可以保存为SELECTDB|db_number|key_value_pair三部分。

而每一个key_value_pair都保存了一个或多个键值对,如果键值对带有过期时间,则键值对中的过期时间也会被保存。
键值对的保存参见: redis设计与实现

RDB 源码实现

save 命令的实现

redis客户端中,键入save即可以触发rdb文件的生成,如果后台没有子进程在生成rdb 就会调用rdbSave生成文件,并保存在磁盘中。

saveCommand
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/* SAVE command: synchronously dump the whole dataset to disk as an RDB
 * file. The server is blocked for the entire duration of the save. */
void saveCommand(client *c) {
    /* Refuse if a background RDB save is already in progress. */
    if (server.child_type == CHILD_TYPE_RDB) {
        addReplyError(c,"Background save already in progress");
        return;
    }

    server.stat_rdb_saves++;

    rdbSaveInfo rsi, *rsiptr;
    rsiptr = rdbPopulateSaveInfo(&rsi);
    if (rdbSave(SLAVE_REQ_NONE,server.rdb_filename,rsiptr,RDBFLAGS_NONE) == C_OK) {
        addReply(c,shared.ok);
    } else {
        addReplyErrorObject(c,shared.err);
    }
}
rdbPopulateSaveInfo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/* Fill `rsi` with replication metadata to embed into the RDB file.
 * Returns `rsi` when the info is meaningful, NULL otherwise. */
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) {
    rdbSaveInfo rsi_init = RDB_SAVE_INFO_INIT;
    *rsi = rsi_init;

    /* If this node is a master and the replication backlog is not NULL we
     * can include replication info. If repl_backlog is NULL this instance
     * is not part of a replication chain and the info would be useless:
     * when a replica connects, the NULL backlog triggers a full sync and a
     * new backlog is created at that point. */
    if (!server.masterhost && server.repl_backlog) {
        /* server.slaveseldb == -1 means the master received no write command
         * since the last full sync; defaulting repl_stream_db to 0 lets
         * replicas reload the replication offset safely, because a SELECT
         * will be generated before the next write anyway. */
        rsi->repl_stream_db = server.slaveseldb == -1 ? 0 : server.slaveseldb;
        return rsi;
    }

    /* If this is a replica connected to a master, use the DB currently
     * selected on the master link. */
    if (server.master) {
        rsi->repl_stream_db = server.master->db->id;
        return rsi;
    }
    /* With a cached master we can use its selected DB info: the replica
     * keeps increasing master_repl_offset from the master's stream, so the
     * offset remains valid even after a disconnection. */
    if (server.cached_master) {
        rsi->repl_stream_db = server.cached_master->db->id;
        return rsi;
    }
    return NULL;
}
rdbSave

rdbSave命令:将当前redis的数据库状态保存在磁盘中,返回ok即成功,否则返回C_ERR。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/* Save the current dataset on disk as an RDB file named `filename`.
 * A temp file is written first and then rename(2)d into place, so readers
 * never observe a partially written RDB. Returns C_OK or C_ERR. */
int rdbSave(int req, char *filename, rdbSaveInfo *rsi, int rdbflags) {
    char tmpfile[256];
    char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */

    startSaving(RDBFLAGS_NONE);
    /* Create a pid-based temp file name. */
    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());

    if (rdbSaveInternal(req,tmpfile,rsi,rdbflags) != C_OK) {
        stopSaving(0);
        return C_ERR;
    }

    /* Atomically move the temp file onto the final destination. */
    if (rename(tmpfile,filename) == -1) {
        char *str_err = strerror(errno);
        char *cwdp = getcwd(cwd,MAXPATHLEN);
        serverLog(LL_WARNING,
            "Error moving temp DB file %s on the final "
            "destination %s (in server root dir %s): %s",
            tmpfile,
            filename,
            cwdp ? cwdp : "unknown",
            str_err);
        unlink(tmpfile);
        stopSaving(0);
        return C_ERR;
    }
    /* fsync the containing directory too, so the rename itself is durable. */
    if (fsyncFileDir(filename) != 0) {
        serverLog(LL_WARNING,
            "Failed to fsync directory while saving DB: %s", strerror(errno));
        stopSaving(0);
        return C_ERR;
    }

    serverLog(LL_NOTICE,"DB saved on disk");
    /* Reset server.dirty: it counts changes since the last successful save. */
    server.dirty = 0;
    /* Remember the time of the last successful save. */
    server.lastsave = time(NULL);
    /* Record that the last save operation succeeded. */
    server.lastbgsave_status = C_OK;
    stopSaving(1);
    return C_OK;
}
rdbSaveInternal

上述过程调用了 rdbSaveInternal 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/* Write the dataset to `filename` (already a temp file chosen by the
 * caller): open it, stream the RDB payload through a rio, flush, fsync and
 * close. On any error the partial file is unlinked. Returns C_OK/C_ERR. */
static int rdbSaveInternal(int req, const char *filename, rdbSaveInfo *rsi, int rdbflags) {
    char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
    rio rdb;
    int error = 0;
    int saved_errno;
    char *err_op; /* For a detailed log */
    /* Open the target file for writing. */
    FILE *fp = fopen(filename,"w");
    if (!fp) {
        saved_errno = errno;
        char *str_err = strerror(errno);
        char *cwdp = getcwd(cwd,MAXPATHLEN);
        serverLog(LL_WARNING,
            "Failed opening the temp RDB file %s (in server root dir %s) "
            "for saving: %s",
            filename,
            cwdp ? cwdp : "unknown",
            str_err);
        errno = saved_errno;
        return C_ERR;
    }
    /* Initialize the rio abstraction over the stdio FILE. */
    rioInitWithFile(&rdb,fp);

    if (server.rdb_save_incremental_fsync) {
        rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
        if (!(rdbflags & RDBFLAGS_KEEP_CACHE)) rioSetReclaimCache(&rdb,1);
    }

    /* Stream the whole database state into the rio. */
    if (rdbSaveRio(req,&rdb,&error,rdbflags,rsi) == C_ERR) {
        errno = error;
        err_op = "rdbSaveRio";
        goto werr;
    }
    /* Flush stdio buffers so everything reaches the kernel. */
    if (fflush(fp)) { err_op = "fflush"; goto werr; }
    /* Force the file contents to stable storage. */
    if (fsync(fileno(fp))) { err_op = "fsync"; goto werr; }
    if (!(rdbflags & RDBFLAGS_KEEP_CACHE) && reclaimFilePageCache(fileno(fp), 0, 0) == -1) {
        serverLog(LL_NOTICE,"Unable to reclaim cache after saving RDB: %s", strerror(errno));
    }
    /* Close the file (fclose may itself fail on a write error). */
    if (fclose(fp)) { fp = NULL; err_op = "fclose"; goto werr; }

    return C_OK;

werr:
    saved_errno = errno;
    serverLog(LL_WARNING,"Write error while saving DB to the disk(%s): %s", err_op, strerror(errno));
    if (fp) fclose(fp);
    unlink(filename);
    errno = saved_errno;
    return C_ERR;
}
rdbSaveRio

写入文件的主要逻辑由rdbSaveRio完成:[rio是redis抽象的io层,面向缓冲区、文件IO和socket IO]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/* Produce the full RDB payload into the rio stream `rdb`: magic header,
 * aux fields, module/function data, every non-empty DB, the EOF opcode and
 * the trailing CRC64 checksum. On error, the errno value is stored into
 * `*error` (if not NULL) and C_ERR is returned. */
int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
    char magic[10];
    uint64_t cksum;
    long key_counter = 0;
    int j;

    if (server.rdb_checksum)
        rdb->update_cksum = rioGenericUpdateChecksum;
    snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
    if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
    if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr;
    if (!(req & SLAVE_REQ_RDB_EXCLUDE_DATA) && rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;

    /* save functions */
    if (!(req & SLAVE_REQ_RDB_EXCLUDE_FUNCTIONS) && rdbSaveFunctions(rdb) == -1) goto werr;

    /* Iterate over all databases of this server. */
    /* save all databases, skip this if we're in functions-only mode */
    if (!(req & SLAVE_REQ_RDB_EXCLUDE_DATA)) {
        for (j = 0; j < server.dbnum; j++) {
            if (rdbSaveDb(rdb, j, rdbflags, &key_counter) == -1) goto werr;
        }
    }

    if (!(req & SLAVE_REQ_RDB_EXCLUDE_DATA) && rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr;

    /* Write the EOF opcode marking the end of the database payload. */
    if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;

    /* CRC64 checksum. It will be zero if checksum computation is disabled, the
     * loading code skips the check in this case. */
    cksum = rdb->cksum;
    memrev64ifbe(&cksum);
    if (rioWrite(rdb,&cksum,8) == 0) goto werr;
    return C_OK;

werr:
    if (error) *error = errno;
    return C_ERR;
}

rdbSaveDb

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/* Write one logical database into the rio stream: SELECTDB + DB number,
 * RESIZEDB hints, then every key/value pair (with its expire, if any).
 * Returns the number of bytes written, or -1 on error. */
ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
    dictIterator *di;
    dictEntry *de;
    ssize_t written = 0;
    ssize_t res;
    static long long info_updated_time = 0;
    char *pname = (rdbflags & RDBFLAGS_AOF_PREAMBLE) ? "AOF rewrite" : "RDB";

    redisDb *db = server.db + dbid;
    dict *d = db->dict;
    if (dictSize(d) == 0) return 0; /* Skip this DB entirely if it holds no keys. */
    di = dictGetSafeIterator(d);

    /* Write the SELECTDB opcode, announcing that a DB number follows. */
    if ((res = rdbSaveType(rdb,RDB_OPCODE_SELECTDB)) < 0) goto werr;
    written += res;
    /* Write the database number itself, length-encoded. */
    if ((res = rdbSaveLen(rdb, dbid)) < 0) goto werr;
    written += res;

    /* Write the RESIZE DB opcode. */
    uint64_t db_size, expires_size;
    db_size = dictSize(db->dict);
    expires_size = dictSize(db->expires);
    if ((res = rdbSaveType(rdb,RDB_OPCODE_RESIZEDB)) < 0) goto werr;
    written += res;
    if ((res = rdbSaveLen(rdb,db_size)) < 0) goto werr;
    written += res;
    if ((res = rdbSaveLen(rdb,expires_size)) < 0) goto werr;
    written += res;

    /* Iterate over every entry of the current DB. */
    while((de = dictNext(di)) != NULL) {
        sds keystr = dictGetKey(de);
        robj key, *o = dictGetVal(de);
        long long expire;
        size_t rdb_bytes_before_key = rdb->processed_bytes;

        initStaticStringObject(key,keystr);
        expire = getExpire(db,&key);
        if ((res = rdbSaveKeyValuePair(rdb, &key, o, expire, dbid)) < 0) goto werr;
        written += res;

        /* In a forked child we can try to release memory back to the OS and
         * reduce copy-on-write as much as possible. */
        size_t dump_size = rdb->processed_bytes - rdb_bytes_before_key;
        if (server.in_fork_child) dismissObject(o, dump_size);

        /* Update child status info at most once per second, to avoid
         * calling mstime() on every iteration. */
        if (((*key_counter)++ & 1023) == 0) {
            long long now = mstime();
            if (now - info_updated_time >= 1000) {
                sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, *key_counter, pname);
                info_updated_time = now;
            }
        }
    }

    dictReleaseIterator(di);
    return written;

werr:
    dictReleaseIterator(di);
    return -1;
}

由于redis是单线程模型,执行save会阻塞服务器主进程执行其他请求,故而提供了另外一种实现bgsave,通过fork子进程的方式,使用子进程创建rdb文件,文件的保存过程中不影响服务器的正常读写。

bgsave 命令的实现

bgsaveCommand
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/* BGSAVE command: fork a child that dumps the dataset to an RDB file while
 * the parent keeps serving commands. BGSAVE SCHEDULE defers the save until
 * the currently active child (e.g. AOF rewrite) terminates. */
void bgsaveCommand(client *c) {
    int schedule = 0;

    /* The SCHEDULE option changes the behavior of BGSAVE when an AOF rewrite
     * is in progress. Instead of returning an error a BGSAVE gets scheduled. */
    if (c->argc > 1) {
        if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"schedule")) {
            schedule = 1;
        } else {
            addReplyErrorObject(c,shared.syntaxerr);
            return;
        }
    }

    rdbSaveInfo rsi, *rsiptr;
    rsiptr = rdbPopulateSaveInfo(&rsi);

    if (server.child_type == CHILD_TYPE_RDB) {
        addReplyError(c,"Background save already in progress");
    } else if (hasActiveChildProcess() || server.in_exec) {
        if (schedule || server.in_exec) {
            server.rdb_bgsave_scheduled = 1;
            addReplyStatus(c,"Background saving scheduled");
        } else {
            addReplyError(c,
                "Another child process is active (AOF?): can't BGSAVE right now. "
                "Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenever "
                "possible.");
        }
    } else if (rdbSaveBackground(SLAVE_REQ_NONE,server.rdb_filename,rsiptr,RDBFLAGS_NONE) == C_OK) {
        addReplyStatus(c,"Background saving started");
    } else {
        addReplyErrorObject(c,shared.err);
    }
}

rdbPopulateSaveInfo

前文已有描述,不再复述。

rdbSaveBackground

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/* Fork a child process that performs the RDB save (same path as SAVE, via
 * rdbSave), while the parent continues serving requests.
 * Returns C_OK if the child was started, C_ERR otherwise. */
int rdbSaveBackground(int req, char *filename, rdbSaveInfo *rsi, int rdbflags) {
    pid_t childpid;

    /* Refuse if any child (save, bgsave, AOF rewrite, or a module-forked
     * child) is already active. */
    if (hasActiveChildProcess()) return C_ERR;
    /* Count RDB save attempts performed by this server. */
    server.stat_rdb_saves++;

    server.dirty_before_bgsave = server.dirty;
    /* Record the unix time of this BGSAVE attempt. */
    server.lastbgsave_try = time(NULL);

    if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {
        int retval;

        /* Child: set a recognizable process title. */
        redisSetProcTitle("redis-rdb-bgsave");
        redisSetCpuAffinity(server.bgsave_cpulist);
        /* Perform the RDB dump; from here on it is identical to SAVE. */
        retval = rdbSave(req, filename,rsi,rdbflags);
        if (retval == C_OK) {
            sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB");
        }
        exitFromChild((retval == C_OK) ? 0 : 1);
    } else {
        /* Parent: record child bookkeeping (fork time etc.). */
        if (childpid == -1) {
            server.lastbgsave_status = C_ERR;
            serverLog(LL_WARNING,"Can't save in background: fork: %s",
                strerror(errno));
            return C_ERR;
        }
        serverLog(LL_NOTICE,"Background saving started by pid %ld",(long) childpid);
        /* Record when the child started the RDB save. */
        server.rdb_save_time_start = time(NULL);
        server.rdb_child_type = RDB_CHILD_TYPE_DISK;
        return C_OK;
    }
    return C_OK; /* unreached */
}

redisFork

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/* Fork a child process for the given purpose (one of the CHILD_TYPE_
 * constants). Returns the pid in the parent, 0 in the child, -1 on error
 * (errno set; EEXIST if a mutually exclusive child is already running). */
int redisFork(int purpose) {
    if (isMutuallyExclusiveChildType(purpose)) {
        if (hasActiveChildProcess()) {
            errno = EEXIST;
            return -1;
        }

        openChildInfoPipe();
    }

    int childpid;
    long long start = ustime();
    if ((childpid = fork()) == 0) {

        /* Setup order in the child matters:
         * 1. Install signal handlers first, since signals can fire at any time.
         * 2. Then adjust the OOM score before anything else, to help the OOM
         *    killer pick the right victim under memory pressure. */
        server.in_fork_child = purpose;
        setupChildSignalHandlers();
        setOOMScoreAdj(CONFIG_OOM_BGCHILD);
        updateDictResizePolicy();
        dismissMemoryInChild();
        /* Release resources the child does not need after the fork. */
        closeChildUnusedResourceAfterFork();
        /* Close the reading part, so that if the parent crashes, the child will
         * get a write error and exit. */
        if (server.child_info_pipe[0] != -1)
            close(server.child_info_pipe[0]);
    } else {
        /* Parent */
        if (childpid == -1) {
            int fork_errno = errno;
            if (isMutuallyExclusiveChildType(purpose)) closeChildInfoPipe();
            errno = fork_errno;
            return -1;
        }

        server.stat_total_forks++;
        server.stat_fork_time = ustime()-start;
        server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
        latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);

        /* The child_pid and child_type are only for mutually exclusive children.
         * other child types should handle and store their pid's in dedicated variables.
         *
         * Today, we allows CHILD_TYPE_LDB to run in parallel with the other fork types:
         * - it isn't used for production, so it will not make the server be less efficient
         * - used for debugging, and we don't want to block it from running while other
         *   forks are running (like RDB and AOF) */
        if (isMutuallyExclusiveChildType(purpose)) {
            server.child_pid = childpid;
            server.child_type = purpose;
            server.stat_current_cow_peak = 0;
            server.stat_current_cow_bytes = 0;
            server.stat_current_cow_updated = 0;
            server.stat_current_save_keys_processed = 0;
            server.stat_module_progress = 0;
            server.stat_current_save_keys_total = dbTotalServerKeyCount();
        }

        updateDictResizePolicy();
        moduleFireServerEvent(REDISMODULE_EVENT_FORK_CHILD,
                              REDISMODULE_SUBEVENT_FORK_CHILD_BORN,
                              NULL);
    }
    return childpid;
}

另外,redis中定义了五种子进程类型:

1
2
3
4
5
/* Background child process types; the mutually exclusive kinds
 * (RDB/AOF/MODULE) cannot run at the same time. */
#define CHILD_TYPE_NONE 0
#define CHILD_TYPE_RDB 1    /* Child saving an RDB file (BGSAVE). */
#define CHILD_TYPE_AOF 2    /* Child performing an AOF rewrite. */
#define CHILD_TYPE_LDB 3    /* Lua debugger child (may run in parallel). */
#define CHILD_TYPE_MODULE 4 /* Child forked by a loaded module. */

rdbsave 命令总结

AOF

除了rdb持久化外,redis还提供了aof, append only file持久化功能,将服务器执行的命令以redis的命令请求协议格式记录到aof文件中。
redis中默认不开启aof持久化功能,可通过修改如下配置实现:

1
appendonly no ==> appendonly yes

redis.conf中定义了aof持久化功能

AOF 命令追加

redisServeraof相关参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/* AOF-related fields of the server global state (excerpt). */
struct redisServer {
    long long stat_aof_rewrites;  /* number of aof file rewrites performed */
    /* AOF persistence */
    int aof_enabled;              /* AOF configuration */
    int aof_state;                /* AOF_(ON|OFF|WAIT_REWRITE) */
    int aof_fsync;                /* AOF fsync policy (no/always/everysec). */
    int aof_no_fsync_on_rewrite;  /* Don't fsync while a rewrite is in progress. */
    int aof_rewrite_perc;         /* Growth percentage that triggers a rewrite. */
    off_t aof_rewrite_min_size;   /* Minimum AOF size (bytes) before rewriting. */
    off_t aof_rewrite_base_size;  /* AOF size at the latest rewrite. */
    off_t aof_current_size;       /* Current size of the AOF file. */
    off_t aof_last_incr_size;     /* Size of the latest incr AOF file. */
    int aof_rewrite_scheduled;    /* Rewrite once BGSAVE terminates. */
    sds aof_buf;                  /* AOF buffer, written before entering the event loop. */
    int aof_fd;                   /* File descriptor of the currently open AOF file. */
    int aof_selected_db;          /* Currently selected DB in AOF */
};

客户端执行命令时,服务器在执行命令时,会将该值用协议进行封装,而后追加到aof_buf缓冲区中。

AOF文件写入与同步

redis服务器进程实际上是一个事件循环eventLoop,循环中包含两类事件:文件事件和时间事件。其中,文件事件负责接收客户端的请求、向客户端发送命令回复等;时间事件则执行像serverCron之类需要定时运行的函数。

服务器处理文件事件时,可能会执行写命令,这些内容将被加入到redisServer.aof_buf中。所以在结束一个事件循环后,都会调用flushAppendOnlyFile函数,考虑是否将aof_buf缓冲区中的内容写入和保存到AOF文件中。
系统提供了fsyncfdatasync两个同步函数,可以强制让操作系统立即将缓冲区中的数据写入到磁盘中,从而确保写入数据的安全性。

AOF持久化模式

redisServeraof_fsync定义了服务器的持久化行为,其取值如下:

1
2
3
/* AOF fsync policies (value of server.aof_fsync). */
#define AOF_FSYNC_NO 0       /* Write aof_buf, let the OS decide when to sync. */
#define AOF_FSYNC_ALWAYS 1   /* Write and fsync on every flush: safest, slowest. */
#define AOF_FSYNC_EVERYSEC 2 /* fsync at most once per second, in a background thread. */
  1. AOF_FSYNC_NO: 将aof_buf缓冲区中的所有内容写入到AOF文件中,但不对AOF文件进行同步,何时同步由操作系统决定。[aof不是在磁盘上么?为什么会有同步的差异]
  2. AOF_FSYNC_ALWAYS: 将aof_buf缓冲区中的所有内容写入并同步到AOF文件中。
  3. AOF_FSYNC_EVERYSEC: 将aof_buf缓冲区中的所有内容写入到AOF文件中,如果距上次同步AOF文件的时间超过一秒钟,则再次对AOF文件进行同步,此同步过程由一个子线程处理。
    默认为 AOF_FSYNC_EVERYSEC

    这里之所以有写入和同步的差异,是因为在操作系统中,用户调用write函数将数据写入到文件时,并不是立即将该数据落盘,而是先将数据保存在一个内存缓冲区中,待缓冲区中的文件被填满,或者超出指定时限后,才真正将缓冲区中的数据写入到磁盘中。

AOF持久化模式的对比

  1. 安全性角度:always> every_sec>no,即使出现停机故障,aof持久化也只会丢失一个事件循环中所产生的命令数据。而every_sec也只会丢失一秒钟的命令数据。
  2. 效率:always< every_sec < no,因为 服务器在每个事件循环中,既需要将aof_buf缓冲区中的所有内容写入到AOF文件中,也需要将页缓存中的数据同步到AOF文件中。而no模式下,无需执行同步操作,故而该模式下AOF文件的写入速度最快,但是同步时间反而是最长的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
/* Flush the AOF buffer (server.aof_buf) to the append-only file and fsync
 * it according to the configured policy. If force is 1 the write is
 * performed even when a background fsync is still in progress. */
void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;
    mstime_t latency;

    if (sdslen(server.aof_buf) == 0) {
        /* Even with an empty buffer an fsync may be pending: in EVERYSEC
         * mode previously written data may still sit in the page cache. */
        if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
            server.aof_last_incr_fsync_offset != server.aof_last_incr_size &&
            server.unixtime > server.aof_last_fsync &&
            !(sync_in_progress = aofFsyncInProgress())) {
            goto try_fsync;

        /* Check if we need to do fsync even the aof buffer is empty,
         * the reason is described in the previous AOF_FSYNC_EVERYSEC block,
         * and AOF_FSYNC_ALWAYS is also checked here to handle a case where
         * aof_fsync is changed from everysec to always. */
        } else if (server.aof_fsync == AOF_FSYNC_ALWAYS &&
                   server.aof_last_incr_fsync_offset != server.aof_last_incr_size)
        {
            goto try_fsync;
        } else {
            return;
        }
    }

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        sync_in_progress = aofFsyncInProgress();

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        /* In EVERYSEC mode the fsync runs in a background thread.
         * If one is already in flight, postpone the write for up to 2s. */
        if (sync_in_progress) {
            if (server.aof_flush_postponed_start == 0) {
                /* No previous write postponing, remember that we are
                 * postponing the flush and return. */
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                /* We were already waiting for fsync to finish, but for less
                 * than two seconds this is still ok. Postpone again. */
                return;
            }
            /* Otherwise fall through, and go write since we can't wait
             * over two seconds. */
            server.aof_delayed_fsync++;
            serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }
    /* We want to perform a single write. This should be guaranteed atomic
     * at least if the filesystem we are writing is a real physical one.
     * While this will save us against the server being killed I don't think
     * there is much to do about the whole server stopping for power problems
     * or alike */

    if (server.aof_flush_sleep && sdslen(server.aof_buf)) {
        usleep(server.aof_flush_sleep);
    }

    latencyStartMonitor(latency);
    nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    latencyEndMonitor(latency);
    /* We want to capture different events for delayed writes:
     * when the delay happens with a pending fsync, or with a saving child
     * active, and when the above two conditions are missing.
     * We also use an additional event name to save all samples which is
     * useful for graphing / monitoring purposes. */
    if (sync_in_progress) {
        latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
    } else if (hasActiveChildProcess()) {
        latencyAddSampleIfNeeded("aof-write-active-child",latency);
    } else {
        latencyAddSampleIfNeeded("aof-write-alone",latency);
    }
    latencyAddSampleIfNeeded("aof-write",latency);

    /* We performed the write so reset the postponed flush sentinel to zero. */
    server.aof_flush_postponed_start = 0;

    if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
        static time_t last_write_error_log = 0;
        int can_log = 0;

        /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
        if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
            can_log = 1;
            last_write_error_log = server.unixtime;
        }

        /* Log the AOF write error and record the error code. */
        if (nwritten == -1) {
            if (can_log) {
                serverLog(LL_WARNING,"Error writing to the AOF file: %s",
                    strerror(errno));
            }
            server.aof_last_write_errno = errno;
        } else {
            if (can_log) {
                serverLog(LL_WARNING,"Short write while writing to "
                                     "the AOF file: (nwritten=%lld, "
                                     "expected=%lld)",
                                     (long long)nwritten,
                                     (long long)sdslen(server.aof_buf));
            }

            if (ftruncate(server.aof_fd, server.aof_last_incr_size) == -1) {
                if (can_log) {
                    serverLog(LL_WARNING, "Could not remove short write "
                             "from the append-only file. Redis may refuse "
                             "to load the AOF the next time it starts. "
                             "ftruncate: %s", strerror(errno));
                }
            } else {
                /* If the ftruncate() succeeded we can set nwritten to
                 * -1 since there is no longer partial data into the AOF. */
                nwritten = -1;
            }
            server.aof_last_write_errno = ENOSPC;
        }

        /* Handle the AOF write error. */
        if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
            /* We can't recover when the fsync policy is ALWAYS since the reply
             * for the client is already in the output buffers (both writes and
             * reads), and the changes to the db can't be rolled back. Since we
             * have a contract with the user that on acknowledged or observed
             * writes are is synced on disk, we must exit. */
            serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
            exit(1);
        } else {
            /* Recover from failed write leaving data into the buffer. However
             * set an error to stop accepting writes as long as the error
             * condition is not cleared. */
            server.aof_last_write_status = C_ERR;

            /* Trim the sds buffer if there was a partial write, and there
             * was no way to undo it with ftruncate(2). */
            if (nwritten > 0) {
                server.aof_current_size += nwritten;
                server.aof_last_incr_size += nwritten;
                sdsrange(server.aof_buf,nwritten,-1);
            }
            return; /* We'll try again on the next call... */
        }
    } else {
        /* Successful write(2). If AOF was in error state, restore the
         * OK state and log the event. */
        if (server.aof_last_write_status == C_ERR) {
            serverLog(LL_NOTICE,
                "AOF write error looks solved, Redis can write again.");
            server.aof_last_write_status = C_OK;
        }
    }
    server.aof_current_size += nwritten;
    server.aof_last_incr_size += nwritten;

    /* Re-use AOF buffer when it is small enough. The maximum comes from the
     * arena size of 4k minus some overhead (but is otherwise arbitrary). */
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        sdsclear(server.aof_buf);
    } else {
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }

try_fsync:
    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. */
    if (server.aof_no_fsync_on_rewrite && hasActiveChildProcess())
        return;

    /* Perform the fsync if needed. */
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        /* redis_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        latencyStartMonitor(latency);
        /* Let's try to get this data on the disk. To guarantee data safe when
         * the AOF fsync policy is 'always', we should exit if failed to fsync
         * AOF (see comment next to the exit(1) after write error above). */
        if (redis_fsync(server.aof_fd) == -1) {
            serverLog(LL_WARNING,"Can't persist AOF for fsync error when the "
              "AOF fsync policy is 'always': %s. Exiting...", strerror(errno));
            exit(1);
        }
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-fsync-always",latency);
        server.aof_last_incr_fsync_offset = server.aof_last_incr_size;
        server.aof_last_fsync = server.unixtime;
        atomicSet(server.fsynced_reploff_pending, server.master_repl_offset);
    } else if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
               server.unixtime > server.aof_last_fsync) {
        if (!sync_in_progress) {
            aof_background_fsync(server.aof_fd);
            server.aof_last_incr_fsync_offset = server.aof_last_incr_size;
        }
        server.aof_last_fsync = server.unixtime;
    }
}
  1. 判断aof_buf==0,检查我们是否需要做fsync,即使aof缓冲区是空的,因为以前在EVERYSEC模式下,fsync只有在aof缓冲区不是空的时候才会被调用,所以如果用户在fsync调用前一秒停止写命令,那么页面缓存中的数据就不能及时刷新。这里就能解释写入和同步的差异: 同步是指,将用户输入的内容从页缓存中同步到aof文件中。而写入则是指将aof_buf中的数据写入到aof文件中
  2. AOF_FSYNC_ALWAYS 模式下也需要检查,以防止持久化模式是从EVERYSEC被修改为AOF_FSYNC_ALWAYS

AOF重写

由于AOF持久化是通过保存服务器执行的命令来记录数据库状态的,故而AOF文件中的内容会越来越多,文件也会越来越大,体积过大时,可能会对redis服务器造成影响,还原数据库状态所耗费的时间也越多。
为了解决AOF文件膨胀的问题,redis提供了AOF rewrite功能,使用该功能时,redis服务器将创建一个新的AOF文件来代替现有文件。

AOF文件重写的实现

实际上AOF rewrite并不需要对现有的AOF文件做读取、分析、写入操作,而是通过读取服务器当前的数据库状态实现的,其完整流程如下:

  1. 创建新的AOF文件
  2. 遍历所有数据库[忽略空数据库]
  3. 遍历数据库中的所有键[忽略已过期的键]
  4. 根据键的类型进行重写
  5. 写入完毕,关闭文件

为了避免在执行命令时,造成客户端输入缓冲区溢出,重写时会先检查键所包含的元素数量,如果元素数量超过了AOF_REWRITE_ITEMS_PER_CMD,则将分作多个命令写入。默认值为 64。

1
#define AOF_REWRITE_ITEMS_PER_CMD 64

AOF后台重写

上述的AOF rewrite过程包含大量的写入操作,调用时将长时间阻塞服务器主线程,故而在redis中通常将aof rewrite操作放到子进程中执行。这样做有两个好处:

  1. 子进程在aof rewrite期间,服务器主进程可以继续处理命令请求。
  2. 子进程带有服务器进程的数据副本,可以在避免使用锁的情况下,保证数据的安全性。

使用子进程执行AOF rewrite有一个问题,即主进程中仍能处理命令请求,会对现有数据库状态进行修改,故而导致服务器当前状态和子进程重写AOF文件所保存的数据库状态不一致的情况。
为此,redis设置了一个AOF重写缓冲区,该缓冲区在fork出子进程后开始使用,主进程执行完写命令后,会同时将这个写命令写入到aof_buf和AOF重写缓冲区。
当子进程完成重写之后,向父进程发送信号,执行如下任务:

  1. AOF 重写缓冲区中的所有内容写入到新的AOF文件中,以此保证新的aof文件保存的服务器状态和当前服务器的状态一致。
  2. 对新的AOF文件进行改名,原子的覆盖现有AOF文件。
    整个过程中,服务器只有处理 信号处理函数时会被阻塞。
    以上即是BGREWRITEAOF命令的实现原理。

AOF后台重写的触发条件

redisServer中有如下三个和AOF rewrite相关的变量,用以控制aof 后台重写的执行。

1
2
3
4
5
6
/* Fields controlling when serverCron triggers an automatic AOF rewrite
 * (excerpt of the server global state). */
struct redisServer {
    off_t aof_current_size;      /* Current size of the AOF file. */
    off_t aof_rewrite_base_size; /* AOF size after the most recent rewrite. */
    int aof_rewrite_perc;        /* Growth percentage that triggers a rewrite. */
    off_t aof_rewrite_min_size;  /* Minimum AOF size before a rewrite may run. */
};

每当serverCron函数执行时,如果满足如下条件时,则会自动触发aof rewrite

  1. 没有bgsave命令在执行。
  2. 没有bgrewriteaof命令在执行。
  3. 当前AOF文件的大小大于redisServer.aof_rewrite_min_size
  4. (aof_current_size - aof_base_size) / aof_base_size > auto-aof-rewrite-percentage
    上述默认参数值为:
    1
    2
    auto-aof-rewrite-percentage 100
    auto-aof-rewrite-min-size 64mb

AOF后台重写 源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/* BGREWRITEAOF command: start (or schedule) a background AOF rewrite. */
void bgrewriteaofCommand(client *c) {
    /* Refuse if an AOF rewrite child is already running. */
    if (server.child_type == CHILD_TYPE_AOF) {
        addReplyError(c,"Background append only file rewriting already in progress");
    } else if (hasActiveChildProcess() || server.in_exec) {
        server.aof_rewrite_scheduled = 1; /* Run the rewrite once the active child (e.g. BGSAVE) ends. */
        server.stat_aofrw_consecutive_failures = 0; /* Reset the consecutive AOFRW failure counter. */
        addReplyStatus(c,"Background append only file rewriting scheduled");
    } else if (rewriteAppendOnlyFileBackground() == C_OK) {
        addReplyStatus(c,"Background append only file rewriting started");
    } else {
        addReplyError(c,"Can't execute an AOF background rewriting. "
                        "Please check the server logs for more information.");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/* ----------------------------------------------------------------------------
* AOF background rewrite
* ------------------------------------------------------------------------- */

/* This is how rewriting of the append only file in background works:
*
* 1) The user calls BGREWRITEAOF
* 2) Redis calls this function, that forks():
* 2a) the child rewrite the append only file in a temp file.
* 2b) the parent open a new INCR AOF file to continue writing.
* 3) When the child finished '2a' exists.
* 4) The parent will trap the exit code, if it's OK, it will:
* 4a) get a new BASE file name and mark the previous (if we have) as the HISTORY type
* 4b) rename(2) the temp file in new BASE file name
* 4c) mark the rewritten INCR AOFs as history type
* 4d) persist AOF manifest file
* 4e) Delete the history files use bio
*/
/* This is how rewriting of the append only file in background works:
 *
 * 1) The user calls BGREWRITEAOF
 * 2) Redis calls this function, that forks():
 *    2a) the child rewrites the append only file in a temp file.
 *    2b) the parent opens a new INCR AOF file to continue writing.
 * 3) When the child finishes '2a' it exits.
 * 4) The parent will trap the exit code, if it's OK, it will:
 *    4a) get a new BASE file name and mark the previous (if we have) as the HISTORY type
 *    4b) rename(2) the temp file in new BASE file name
 *    4c) mark the rewritten INCR AOFs as history type
 *    4d) persist AOF manifest file
 *    4e) Delete the history files use bio
 */
int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;

    if (hasActiveChildProcess()) return C_ERR;
    /* server.aof_dirname defaults to "appendonlydir"; bail out if the
     * directory does not exist and cannot be created. */
    if (dirCreateIfMissing(server.aof_dirname) == -1) {
        serverLog(LL_WARNING, "Can't open or create append-only dir %s: %s",
            server.aof_dirname, strerror(errno));
        server.aof_lastbgrewrite_status = C_ERR;
        return C_ERR;
    }

    /* We set aof_selected_db to -1 in order to force the next call to the
     * feedAppendOnlyFile() to issue a SELECT command. */
    server.aof_selected_db = -1;
    /* Force the flush regardless of any background fsync in progress. */
    flushAppendOnlyFile(1);
    if (openNewIncrAofForAppend() != C_OK) {
        server.aof_lastbgrewrite_status = C_ERR;
        return C_ERR;
    }
    server.stat_aof_rewrites++;
    if ((childpid = redisFork(CHILD_TYPE_AOF)) == 0) {
        char tmpfile[256];

        /* Child */
        redisSetProcTitle("redis-aof-rewrite");
        redisSetCpuAffinity(server.aof_rewrite_cpulist);
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
            serverLog(LL_NOTICE,
                "Successfully created the temporary AOF base file %s", tmpfile);
            sendChildCowInfo(CHILD_INFO_TYPE_AOF_COW_SIZE, "AOF rewrite");
            exitFromChild(0);
        } else {
            exitFromChild(1);
        }
    } else {
        /* Parent */
        if (childpid == -1) {
            server.aof_lastbgrewrite_status = C_ERR;
            serverLog(LL_WARNING,
                "Can't rewrite append only file in background: fork: %s",
                strerror(errno));
            return C_ERR;
        }
        serverLog(LL_NOTICE,
            "Background append only file rewriting started by pid %ld",(long) childpid);
        server.aof_rewrite_scheduled = 0;
        server.aof_rewrite_time_start = time(NULL);
        return C_OK;
    }
    return C_OK; /* unreached */
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/* Called in `rewriteAppendOnlyFileBackground`. If `server.aof_state`
* is 'AOF_ON', It will do two things:
* 1. Open a new INCR type AOF for writing
* 2. Synchronously update the manifest file to the disk
*
* The above two steps of modification are atomic, that is, if
* any step fails, the entire operation will rollback and returns
* C_ERR, and if all succeeds, it returns C_OK.
*
* If `server.aof_state` is 'AOF_WAIT_REWRITE', It will open a temporary INCR AOF
* file to accumulate data during AOF_WAIT_REWRITE, and it will eventually be
* renamed in the `backgroundRewriteDoneHandler` and written to the manifest file.
* */
/* Called in `rewriteAppendOnlyFileBackground`. If `server.aof_state`
 * is 'AOF_ON', It will do two things:
 * 1. Open a new INCR type AOF for writing
 * 2. Synchronously update the manifest file to the disk
 *
 * The above two steps of modification are atomic, that is, if
 * any step fails, the entire operation will rollback and returns
 * C_ERR, and if all succeeds, it returns C_OK.
 *
 * If `server.aof_state` is 'AOF_WAIT_REWRITE', It will open a temporary INCR AOF
 * file to accumulate data during AOF_WAIT_REWRITE, and it will eventually be
 * renamed in the `backgroundRewriteDoneHandler` and written to the manifest file.
 * */
int openNewIncrAofForAppend(void) {
    serverAssert(server.aof_manifest != NULL);
    int newfd = -1;
    aofManifest *temp_am = NULL;
    sds new_aof_name = NULL;

    /* Only open new INCR AOF when AOF enabled. */
    if (server.aof_state == AOF_OFF) return C_OK;

    /* Open new AOF. */
    if (server.aof_state == AOF_WAIT_REWRITE) {
        /* Use a temporary INCR AOF file to accumulate data during AOF_WAIT_REWRITE. */
        new_aof_name = getTempIncrAofName();
    } else {
        /* Dup a temp aof_manifest to modify. */
        temp_am = aofManifestDup(server.aof_manifest);
        new_aof_name = sdsdup(getNewIncrAofName(temp_am));
    }
    sds new_aof_filepath = makePath(server.aof_dirname, new_aof_name);
    newfd = open(new_aof_filepath, O_WRONLY|O_TRUNC|O_CREAT, 0644);
    sdsfree(new_aof_filepath);
    if (newfd == -1) {
        serverLog(LL_WARNING, "Can't open the append-only file %s: %s",
            new_aof_name, strerror(errno));
        goto cleanup;
    }

    if (temp_am) {
        /* Persist AOF Manifest. */
        if (persistAofManifest(temp_am) == C_ERR) {
            goto cleanup;
        }
    }

    serverLog(LL_NOTICE, "Creating AOF incr file %s on background rewrite",
            new_aof_name);
    sdsfree(new_aof_name);

    /* If reaches here, we can safely modify the `server.aof_manifest`
     * and `server.aof_fd`. */

    /* fsync and close old aof_fd if needed. In fsync everysec it's ok to delay
     * the fsync as long as we grantee it happens, and in fsync always the file
     * is already synced at this point so fsync doesn't matter. */
    if (server.aof_fd != -1) {
        aof_background_fsync_and_close(server.aof_fd);
        server.aof_last_fsync = server.unixtime;
    }
    server.aof_fd = newfd;

    /* Reset the aof_last_incr_size. */
    server.aof_last_incr_size = 0;
    /* Reset the aof_last_incr_fsync_offset. */
    server.aof_last_incr_fsync_offset = 0;
    /* Update `server.aof_manifest`. */
    if (temp_am) aofManifestFreeAndUpdate(temp_am);
    return C_OK;

cleanup:
    if (new_aof_name) sdsfree(new_aof_name);
    if (newfd != -1) close(newfd);
    if (temp_am) aofManifestFree(temp_am);
    return C_ERR;
}

AOF重写的主逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/* Write a sequence of commands able to fully rebuild the dataset into
 * "filename". Used both by REWRITEAOF and BGREWRITEAOF.
 *
 * In order to minimize the number of commands needed in the rewritten
 * log Redis uses variadic commands when possible, such as RPUSH, SADD
 * and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time
 * are inserted using a single command.
 *
 * Returns C_OK on success, C_ERR on any I/O error (temp file is removed). */
int rewriteAppendOnlyFile(char *filename) {
    rio aof;
    FILE *fp = NULL;
    char tmpfile[256];

    /* Note that we have to use a different temp name here compared to the
     * one used by rewriteAppendOnlyFileBackground() function. */
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
        return C_ERR;
    }
    /* Initialize `aof` as a rio stream backed by the temp file. */
    rioInitWithFile(&aof,fp);
    /* If incremental fsync is enabled, flush to disk periodically while
     * rewriting, so dirty pages don't pile up in the OS cache and cause
     * one long blocking flush at the end. */
    if (server.aof_rewrite_incremental_fsync) {
        /* Auto-sync after every REDIS_AUTOSYNC_BYTES written (4MB by default). */
        rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);
        rioSetReclaimCache(&aof,1);
    }
    /* Fire the persistence-started event for modules. */
    startSaving(RDBFLAGS_AOF_PREAMBLE);

    /* If configured, the base of the rewritten AOF uses RDB encoding
     * instead of a sequence of commands. */
    if (server.aof_use_rdb_preamble) {
        int error;

        if (rdbSaveRio(SLAVE_REQ_NONE,&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) {
            errno = error;
            goto werr;
        }
    } else {
        if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
    }

    /* Make sure data will not remain on the OS's output buffers */
    if (fflush(fp)) goto werr;
    if (fsync(fileno(fp))) goto werr;
    if (reclaimFilePageCache(fileno(fp), 0, 0) == -1) {
        /* A minor error. Just log to know what happens */
        serverLog(LL_NOTICE,"Unable to reclaim page cache: %s", strerror(errno));
    }
    if (fclose(fp)) { fp = NULL; goto werr; }
    fp = NULL;

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. */
    if (rename(tmpfile,filename) == -1) {
        serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
        unlink(tmpfile);
        /* Notify modules with REDISMODULE_SUBEVENT_PERSISTENCE_FAILED. */
        stopSaving(0);
        return C_ERR;
    }
    /* Notify modules with REDISMODULE_SUBEVENT_PERSISTENCE_ENDED. */
    stopSaving(1);

    return C_OK;

werr:
    serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
    if (fp) fclose(fp);
    unlink(tmpfile);
    stopSaving(0);
    return C_ERR;
}

注意到,redis 5.0之后,服务器配置中可以设置`aof-use-rdb-preamble yes`,指定在AOF重写时基础AOF部分使用RDB编码写入(调用`rdbSaveRio`);否则调用`rewriteAppendOnlyFileRio`以纯命令形式完成文件写入。

对比一下差异:https://blog.csdn.net/Aquester/article/details/88550655

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
/* Write, in AOF (RESP command) format, the content of every non-empty DB
 * into the rio stream `aof`, such that replaying the commands rebuilds the
 * current dataset. Expire times are emitted as PEXPIREAT after each key.
 * Returns C_OK on success, C_ERR on any write error. */
int rewriteAppendOnlyFileRio(rio *aof) {
    dictIterator *di = NULL;
    dictEntry *de;
    int j;
    long key_count = 0;
    long long updated_time = 0;

    /* Record timestamp at the beginning of rewriting AOF. */
    if (server.aof_timestamp_enabled) {
        sds ts = genAofTimestampAnnotationIfNeeded(1);
        if (rioWrite(aof,ts,sdslen(ts)) == 0) { sdsfree(ts); goto werr; }
        sdsfree(ts);
    }

    if (rewriteFunctions(aof) == 0) goto werr;

    for (j = 0; j < server.dbnum; j++) {
        char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;
        di = dictGetSafeIterator(d);

        /* SELECT the new DB */
        if (rioWrite(aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
        if (rioWriteBulkLongLong(aof,j) == 0) goto werr;

        /* Iterate this DB writing every entry */
        while((de = dictNext(di)) != NULL) {
            sds keystr;
            robj key, *o;
            long long expiretime;
            size_t aof_bytes_before_key = aof->processed_bytes;

            keystr = dictGetKey(de);
            o = dictGetVal(de);
            /* Wrap the raw sds key into a stack-allocated robj (no heap alloc). */
            initStaticStringObject(key,keystr);

            expiretime = getExpire(db,&key);

            /* Save the key and associated value */
            if (o->type == OBJ_STRING) {
                /* Emit a SET command */
                char cmd[]="*3\r\n$3\r\nSET\r\n";
                if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                /* Key and value */
                if (rioWriteBulkObject(aof,&key) == 0) goto werr;
                if (rioWriteBulkObject(aof,o) == 0) goto werr;
            } else if (o->type == OBJ_LIST) {
                if (rewriteListObject(aof,&key,o) == 0) goto werr;
            } else if (o->type == OBJ_SET) {
                if (rewriteSetObject(aof,&key,o) == 0) goto werr;
            } else if (o->type == OBJ_ZSET) {
                if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr;
            } else if (o->type == OBJ_HASH) {
                if (rewriteHashObject(aof,&key,o) == 0) goto werr;
            } else if (o->type == OBJ_STREAM) {
                if (rewriteStreamObject(aof,&key,o) == 0) goto werr;
            } else if (o->type == OBJ_MODULE) {
                if (rewriteModuleObject(aof,&key,o,j) == 0) goto werr;
            } else {
                serverPanic("Unknown object type");
            }

            /* In fork child process, we can try to release memory back to the
             * OS and possibly avoid or decrease COW. We give the dismiss
             * mechanism a hint about an estimated size of the object we stored. */
            size_t dump_size = aof->processed_bytes - aof_bytes_before_key;
            if (server.in_fork_child) dismissObject(o, dump_size);

            /* Save the expire time */
            if (expiretime != -1) {
                char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
                if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                if (rioWriteBulkObject(aof,&key) == 0) goto werr;
                if (rioWriteBulkLongLong(aof,expiretime) == 0) goto werr;
            }

            /* Update info every 1 second (approximately).
             * in order to avoid calling mstime() on each iteration, we will
             * check the diff every 1024 keys */
            if ((key_count++ & 1023) == 0) {
                long long now = mstime();
                if (now - updated_time >= 1000) {
                    sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, key_count, "AOF rewrite");
                    updated_time = now;
                }
            }

            /* Delay before next key if required (for testing) */
            if (server.rdb_key_save_delay)
                debugDelay(server.rdb_key_save_delay);
        }
        dictReleaseIterator(di);
        di = NULL;
    }
    return C_OK;

werr:
    /* Write error: release the safe iterator (if still held) and fail. */
    if (di) dictReleaseIterator(di);
    return C_ERR;
}

上述有几个点需要注意:

  1. sendChildInfo:重写子进程会向父进程上报当前进度信息;为避免每个key都调用`mstime()`,代码每处理1024个key才检查一次时间,约每1秒上报一次。
1
2
3
4
5
6
7
/* Turn `r` into a file-backed rio stream that reads/writes through `fp`.
 * Optional features (autosync, page-cache reclaim) start disabled and the
 * buffered byte counter starts at zero. */
void rioInitWithFile(rio *r, FILE *fp) {
    /* Copy the static file-rio template (method table and defaults)... */
    *r = rioFileIO;
    /* ...then attach the caller's stdio stream and reset per-stream state. */
    r->io.file.fp = fp;
    r->io.file.reclaim_cache = 0;
    r->io.file.autosync = 0;
    r->io.file.buffered = 0;
}

引用

1. rdb.c
2. redis rdb源码阅读
3. redis aof实现
4. aof源码
5. aof流程
6. aof源码分析