intrdbSave(int req, char *filename, rdbSaveInfo *rsi, int rdbflags) { char tmpfile[256]; char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
staticintrdbSaveInternal(int req, constchar *filename, rdbSaveInfo *rsi, int rdbflags) { char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */ rio rdb; int error = 0; int saved_errno; char *err_op; /* For a detailed log */ // 以写权限打开文件 FILE *fp = fopen(filename,"w"); if (!fp) { saved_errno = errno; char *str_err = strerror(errno); char *cwdp = getcwd(cwd,MAXPATHLEN); serverLog(LL_WARNING, "Failed opening the temp RDB file %s (in server root dir %s) " "for saving: %s", filename, cwdp ? cwdp : "unknown", str_err); errno = saved_errno; return C_ERR; } // 初始化 rio 对象 (文件对象io) rioInitWithFile(&rdb,fp);
if (server.rdb_save_incremental_fsync) { rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES); if (!(rdbflags & RDBFLAGS_KEEP_CACHE)) rioSetReclaimCache(&rdb,1); }
werr: saved_errno = errno; serverLog(LL_WARNING,"Write error while saving DB to the disk(%s): %s", err_op, strerror(errno)); if (fp) fclose(fp); unlink(filename); errno = saved_errno; return C_ERR; }
intrdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { char magic[10]; uint64_t cksum; long key_counter = 0; int j;
if (server.rdb_checksum) rdb->update_cksum = rioGenericUpdateChecksum; snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION); if (rdbWriteRaw(rdb,magic,9) == -1) goto werr; if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr; if (!(req & SLAVE_REQ_RDB_EXCLUDE_DATA) && rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
/* save functions */ if (!(req & SLAVE_REQ_RDB_EXCLUDE_FUNCTIONS) && rdbSaveFunctions(rdb) == -1) goto werr;
// 遍历服务器上的所有数据库 /* save all databases, skip this if we're in functions-only mode */ if (!(req & SLAVE_REQ_RDB_EXCLUDE_DATA)) { for (j = 0; j < server.dbnum; j++) { if (rdbSaveDb(rdb, j, rdbflags, &key_counter) == -1) goto werr; } }
// 向 rdb文件对象中写入 EOF if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
/* CRC64 checksum. It will be zero if checksum computation is disabled, the * loading code skips the check in this case. */ cksum = rdb->cksum; memrev64ifbe(&cksum); if (rioWrite(rdb,&cksum,8) == 0) goto werr; return C_OK;
/* The SCHEDULE option changes the behavior of BGSAVE when an AOF rewrite * is in progress. Instead of returning an error a BGSAVE gets scheduled. */ if (c->argc > 1) { if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"schedule")) { schedule = 1; } else { addReplyErrorObject(c,shared.syntaxerr); return; } }
/* purpose is one of CHILD_TYPE_ types */ intredisFork(int purpose) { if (isMutuallyExclusiveChildType(purpose)) { if (hasActiveChildProcess()) { errno = EEXIST; return-1; }
openChildInfoPipe(); }
int childpid; longlong start = ustime(); if ((childpid = fork()) == 0) { // 设置遵循如下原因 // 1. 由于信号随时可能触发,所以需要先设置信号处理程序 // 2. 随后调整OOM得分然后再做其他事情,以此帮助OOM killer在资源不足时进行回收 server.in_fork_child = purpose; setupChildSignalHandlers(); setOOMScoreAdj(CONFIG_OOM_BGCHILD); updateDictResizePolicy(); dismissMemoryInChild(); // fork结束后关闭子进程中未用到的资源 closeChildUnusedResourceAfterFork(); /* Close the reading part, so that if the parent crashes, the child will * get a write error and exit. */ if (server.child_info_pipe[0] != -1) close(server.child_info_pipe[0]); } else { /* Parent */ if (childpid == -1) { int fork_errno = errno; if (isMutuallyExclusiveChildType(purpose)) closeChildInfoPipe(); errno = fork_errno; return-1; }
/* The child_pid and child_type are only for mutually exclusive children. * other child types should handle and store their pid's in dedicated variables. * * Today, we allows CHILD_TYPE_LDB to run in parallel with the other fork types: * - it isn't used for production, so it will not make the server be less efficient * - used for debugging, and we don't want to block it from running while other * forks are running (like RDB and AOF) */ if (isMutuallyExclusiveChildType(purpose)) { server.child_pid = childpid; server.child_type = purpose; server.stat_current_cow_peak = 0; server.stat_current_cow_bytes = 0; server.stat_current_cow_updated = 0; server.stat_current_save_keys_processed = 0; server.stat_module_progress = 0; server.stat_current_save_keys_total = dbTotalServerKeyCount(); }
/* Check if we need to do fsync even the aof buffer is empty, * the reason is described in the previous AOF_FSYNC_EVERYSEC block, * and AOF_FSYNC_ALWAYS is also checked here to handle a case where * aof_fsync is changed from everysec to always. */ } elseif (server.aof_fsync == AOF_FSYNC_ALWAYS && server.aof_last_incr_fsync_offset != server.aof_last_incr_size) { goto try_fsync; } else { return; } }
if (server.aof_fsync == AOF_FSYNC_EVERYSEC) sync_in_progress = aofFsyncInProgress();
if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) { // EVERYSEC 模式下, 将在后台执行fsync // 如果fsync正在进行中,则在2秒后尝试进行 fsync if (sync_in_progress) { if (server.aof_flush_postponed_start == 0) { /* No previous write postponing, remember that we are * postponing the flush and return. */ server.aof_flush_postponed_start = server.unixtime; return; } elseif (server.unixtime - server.aof_flush_postponed_start < 2) { /* We were already waiting for fsync to finish, but for less * than two seconds this is still ok. Postpone again. */ return; } // 否则失败 /* Otherwise fall through, and go write since we can't wait * over two seconds. */ server.aof_delayed_fsync++; serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis."); } } /* We want to perform a single write. This should be guaranteed atomic * at least if the filesystem we are writing is a real physical one. * While this will save us against the server being killed I don't think * there is much to do about the whole server stopping for power problems * or alike */
if (server.aof_flush_sleep && sdslen(server.aof_buf)) { usleep(server.aof_flush_sleep); }
latencyStartMonitor(latency); nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf)); latencyEndMonitor(latency); /* We want to capture different events for delayed writes: * when the delay happens with a pending fsync, or with a saving child * active, and when the above two conditions are missing. * We also use an additional event name to save all samples which is * useful for graphing / monitoring purposes. */ if (sync_in_progress) { latencyAddSampleIfNeeded("aof-write-pending-fsync",latency); } elseif (hasActiveChildProcess()) { latencyAddSampleIfNeeded("aof-write-active-child",latency); } else { latencyAddSampleIfNeeded("aof-write-alone",latency); } latencyAddSampleIfNeeded("aof-write",latency);
/* We performed the write so reset the postponed flush sentinel to zero. */ server.aof_flush_postponed_start = 0;
if (nwritten != (ssize_t)sdslen(server.aof_buf)) { statictime_t last_write_error_log = 0; int can_log = 0;
/* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */ if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) { can_log = 1; last_write_error_log = server.unixtime; }
/* Log the AOF write error and record the error code. */ if (nwritten == -1) { if (can_log) { serverLog(LL_WARNING,"Error writing to the AOF file: %s", strerror(errno)); } server.aof_last_write_errno = errno; } else { if (can_log) { serverLog(LL_WARNING,"Short write while writing to " "the AOF file: (nwritten=%lld, " "expected=%lld)", (longlong)nwritten, (longlong)sdslen(server.aof_buf)); }
if (ftruncate(server.aof_fd, server.aof_last_incr_size) == -1) { if (can_log) { serverLog(LL_WARNING, "Could not remove short write " "from the append-only file. Redis may refuse " "to load the AOF the next time it starts. " "ftruncate: %s", strerror(errno)); } } else { /* If the ftruncate() succeeded we can set nwritten to * -1 since there is no longer partial data into the AOF. */ nwritten = -1; } server.aof_last_write_errno = ENOSPC; }
/* Handle the AOF write error. */ if (server.aof_fsync == AOF_FSYNC_ALWAYS) { /* We can't recover when the fsync policy is ALWAYS since the reply * for the client is already in the output buffers (both writes and * reads), and the changes to the db can't be rolled back. Since we * have a contract with the user that on acknowledged or observed * writes are is synced on disk, we must exit. */ serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting..."); exit(1); } else { /* Recover from failed write leaving data into the buffer. However * set an error to stop accepting writes as long as the error * condition is not cleared. */ server.aof_last_write_status = C_ERR;
/* Trim the sds buffer if there was a partial write, and there * was no way to undo it with ftruncate(2). */ if (nwritten > 0) { server.aof_current_size += nwritten; server.aof_last_incr_size += nwritten; sdsrange(server.aof_buf,nwritten,-1); } return; /* We'll try again on the next call... */ } } else { /* Successful write(2). If AOF was in error state, restore the * OK state and log the event. */ if (server.aof_last_write_status == C_ERR) { serverLog(LL_NOTICE, "AOF write error looks solved, Redis can write again."); server.aof_last_write_status = C_OK; } } server.aof_current_size += nwritten; server.aof_last_incr_size += nwritten;
/* Re-use AOF buffer when it is small enough. The maximum comes from the * arena size of 4k minus some overhead (but is otherwise arbitrary). */ if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) { sdsclear(server.aof_buf); } else { sdsfree(server.aof_buf); server.aof_buf = sdsempty(); }
try_fsync: /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are * children doing I/O in the background. */ if (server.aof_no_fsync_on_rewrite && hasActiveChildProcess()) return;
/* Perform the fsync if needed. */ if (server.aof_fsync == AOF_FSYNC_ALWAYS) { /* redis_fsync is defined as fdatasync() for Linux in order to avoid * flushing metadata. */ latencyStartMonitor(latency); /* Let's try to get this data on the disk. To guarantee data safe when * the AOF fsync policy is 'always', we should exit if failed to fsync * AOF (see comment next to the exit(1) after write error above). */ if (redis_fsync(server.aof_fd) == -1) { serverLog(LL_WARNING,"Can't persist AOF for fsync error when the " "AOF fsync policy is 'always': %s. Exiting...", strerror(errno)); exit(1); } latencyEndMonitor(latency); latencyAddSampleIfNeeded("aof-fsync-always",latency); server.aof_last_incr_fsync_offset = server.aof_last_incr_size; server.aof_last_fsync = server.unixtime; atomicSet(server.fsynced_reploff_pending, server.master_repl_offset); } elseif (server.aof_fsync == AOF_FSYNC_EVERYSEC && server.unixtime > server.aof_last_fsync) { if (!sync_in_progress) { aof_background_fsync(server.aof_fd); server.aof_last_incr_fsync_offset = server.aof_last_incr_size; } server.aof_last_fsync = server.unixtime; } }
/* This is how rewriting of the append only file in background works: * * 1) The user calls BGREWRITEAOF * 2) Redis calls this function, that forks(): * 2a) the child rewrite the append only file in a temp file. * 2b) the parent open a new INCR AOF file to continue writing. * 3) When the child finished '2a' exists. * 4) The parent will trap the exit code, if it's OK, it will: * 4a) get a new BASE file name and mark the previous (if we have) as the HISTORY type * 4b) rename(2) the temp file in new BASE file name * 4c) mark the rewritten INCR AOFs as history type * 4d) persist AOF manifest file * 4e) Delete the history files use bio */ intrewriteAppendOnlyFileBackground(void) { pid_t childpid;
if (hasActiveChildProcess()) return C_ERR; // appenddirname 默认值为"appendonlydir",如果不存在或者创建失败 if (dirCreateIfMissing(server.aof_dirname) == -1) { serverLog(LL_WARNING, "Can't open or create append-only dir %s: %s", server.aof_dirname, strerror(errno)); server.aof_lastbgrewrite_status = C_ERR; return C_ERR; }
/* We set aof_selected_db to -1 in order to force the next call to the * feedAppendOnlyFile() to issue a SELECT command. */ server.aof_selected_db = -1; // 无论后台是否有调用fsync,都执行落盘动作 flushAppendOnlyFile(1); if (openNewIncrAofForAppend() != C_OK) { server.aof_lastbgrewrite_status = C_ERR; return C_ERR; } server.stat_aof_rewrites++; if ((childpid = redisFork(CHILD_TYPE_AOF)) == 0) { char tmpfile[256];
/* Child */ redisSetProcTitle("redis-aof-rewrite"); redisSetCpuAffinity(server.aof_rewrite_cpulist); snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid()); if (rewriteAppendOnlyFile(tmpfile) == C_OK) { serverLog(LL_NOTICE, "Successfully created the temporary AOF base file %s", tmpfile); sendChildCowInfo(CHILD_INFO_TYPE_AOF_COW_SIZE, "AOF rewrite"); exitFromChild(0); } else { exitFromChild(1); } } else { /* Parent */ if (childpid == -1) { server.aof_lastbgrewrite_status = C_ERR; serverLog(LL_WARNING, "Can't rewrite append only file in background: fork: %s", strerror(errno)); return C_ERR; } serverLog(LL_NOTICE, "Background append only file rewriting started by pid %ld",(long) childpid); server.aof_rewrite_scheduled = 0; server.aof_rewrite_time_start = time(NULL); return C_OK; } return C_OK; /* unreached */ }
/* Called in `rewriteAppendOnlyFileBackground`. If `server.aof_state` * is 'AOF_ON', It will do two things: * 1. Open a new INCR type AOF for writing * 2. Synchronously update the manifest file to the disk * * The above two steps of modification are atomic, that is, if * any step fails, the entire operation will rollback and returns * C_ERR, and if all succeeds, it returns C_OK. * * If `server.aof_state` is 'AOF_WAIT_REWRITE', It will open a temporary INCR AOF * file to accumulate data during AOF_WAIT_REWRITE, and it will eventually be * renamed in the `backgroundRewriteDoneHandler` and written to the manifest file. * */ intopenNewIncrAofForAppend(void) { serverAssert(server.aof_manifest != NULL); int newfd = -1; aofManifest *temp_am = NULL; sds new_aof_name = NULL;
/* Only open new INCR AOF when AOF enabled. */ if (server.aof_state == AOF_OFF) return C_OK;
/* Open new AOF. */ if (server.aof_state == AOF_WAIT_REWRITE) { /* Use a temporary INCR AOF file to accumulate data during AOF_WAIT_REWRITE. */ new_aof_name = getTempIncrAofName(); } else { /* Dup a temp aof_manifest to modify. */ temp_am = aofManifestDup(server.aof_manifest); new_aof_name = sdsdup(getNewIncrAofName(temp_am)); } sds new_aof_filepath = makePath(server.aof_dirname, new_aof_name); newfd = open(new_aof_filepath, O_WRONLY|O_TRUNC|O_CREAT, 0644); sdsfree(new_aof_filepath); if (newfd == -1) { serverLog(LL_WARNING, "Can't open the append-only file %s: %s", new_aof_name, strerror(errno)); goto cleanup; }
if (temp_am) { /* Persist AOF Manifest. */ if (persistAofManifest(temp_am) == C_ERR) { goto cleanup; } }
/* If reaches here, we can safely modify the `server.aof_manifest` * and `server.aof_fd`. */
/* fsync and close old aof_fd if needed. In fsync everysec it's ok to delay * the fsync as long as we grantee it happens, and in fsync always the file * is already synced at this point so fsync doesn't matter. */ if (server.aof_fd != -1) { aof_background_fsync_and_close(server.aof_fd); server.aof_last_fsync = server.unixtime; } server.aof_fd = newfd;
/* Reset the aof_last_incr_size. */ server.aof_last_incr_size = 0; /* Reset the aof_last_incr_fsync_offset. */ server.aof_last_incr_fsync_offset = 0; /* Update `server.aof_manifest`. */ if (temp_am) aofManifestFreeAndUpdate(temp_am); return C_OK;
cleanup: if (new_aof_name) sdsfree(new_aof_name); if (newfd != -1) close(newfd); if (temp_am) aofManifestFree(temp_am); return C_ERR; }
/* Write a sequence of commands able to fully rebuild the dataset into * "filename". Used both by REWRITEAOF and BGREWRITEAOF. * * In order to minimize the number of commands needed in the rewritten * log Redis uses variadic commands when possible, such as RPUSH, SADD * and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time * are inserted using a single command. */ intrewriteAppendOnlyFile(char *filename) { rio aof; FILE *fp = NULL; char tmpfile[256];
/* Note that we have to use a different temp name here compared to the * one used by rewriteAppendOnlyFileBackground() function. */ snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid()); fp = fopen(tmpfile,"w"); if (!fp) { serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno)); return C_ERR; } // 初始化aof为 文件io对象 rioInitWithFile(&aof,fp); // 如果开启aof增量同步功能 防止在缓存中堆积太多命令,造成写入时的IO阻塞时长过长 if (server.aof_rewrite_incremental_fsync) { // 将自动同步的字节数限制为 REDIS_AUTOSYNC_BYTES, 默认4MB rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES); rioSetReclaimCache(&aof,1); } // 触发持久性模块启动事件 startSaving(RDBFLAGS_AOF_PREAMBLE);
// 在aof重写时,基础的aof重写使用rdb编码 if (server.aof_use_rdb_preamble) { int error;
/* Make sure data will not remain on the OS's output buffers */ if (fflush(fp)) goto werr; if (fsync(fileno(fp))) goto werr; if (reclaimFilePageCache(fileno(fp), 0, 0) == -1) { /* A minor error. Just log to know what happens */ serverLog(LL_NOTICE,"Unable to reclaim page cache: %s", strerror(errno)); } if (fclose(fp)) { fp = NULL; goto werr; } fp = NULL;
/* Use RENAME to make sure the DB file is changed atomically only * if the generate DB file is ok. */ if (rename(tmpfile,filename) == -1) { serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno)); unlink(tmpfile); // REDISMODULE_SUBEVENT_PERSISTENCE_FAILED 失败 stopSaving(0); return C_ERR; } // REDISMODULE_SUBEVENT_PERSISTENCE_ENDED 成功 stopSaving(1);
return C_OK;
werr: serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno)); if (fp) fclose(fp); unlink(tmpfile); stopSaving(0); return C_ERR; }
/* Write a sequence of commands able to rebuild the whole dataset into the
 * rio target `aof`. Output order:
 *   - a timestamp annotation, when server.aof_timestamp_enabled is set;
 *   - the functions, via rewriteFunctions();
 *   - for every non-empty DB, a SELECT <db> followed, for each key, by the
 *     command(s) that recreate its value (SET for strings, a type-specific
 *     rewrite*Object() helper for the other types) and a PEXPIREAT when the
 *     key has an expire.
 *
 * Progress (number of keys processed) is reported via sendChildInfo() at
 * most about once per second; the clock is only sampled every 1024 keys.
 *
 * Returns C_OK on success, C_ERR on the first failed write. */
int rewriteAppendOnlyFileRio(rio *aof) {
    dictIterator *di = NULL;
    dictEntry *de;
    int j;
    long key_count = 0;
    long long updated_time = 0;

    /* Record timestamp at the beginning of rewriting AOF. */
    if (server.aof_timestamp_enabled) {
        sds ts = genAofTimestampAnnotationIfNeeded(1);
        if (rioWrite(aof,ts,sdslen(ts)) == 0) { sdsfree(ts); goto werr; }
        sdsfree(ts);
    }

    if (rewriteFunctions(aof) == 0) goto werr;

    for (j = 0; j < server.dbnum; j++) {
        /* RESP header of a 2-argument SELECT; the DB index is appended
         * below as a bulk string. */
        char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;
        di = dictGetSafeIterator(d);

        /* SELECT the new DB */
        if (rioWrite(aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
        if (rioWriteBulkLongLong(aof,j) == 0) goto werr;

        /* Iterate this DB writing every entry */
        while((de = dictNext(di)) != NULL) {
            sds keystr;
            robj key, *o;
            long long expiretime;
            size_t aof_bytes_before_key = aof->processed_bytes;

            keystr = dictGetKey(de);
            o = dictGetVal(de);
            initStaticStringObject(key,keystr);

            expiretime = getExpire(db,&key);

            /* Save the key and associated value */
            if (o->type == OBJ_STRING) {
                /* Emit a SET command */
                char cmd[]="*3\r\n$3\r\nSET\r\n";
                if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                /* Key and value */
                if (rioWriteBulkObject(aof,&key) == 0) goto werr;
                if (rioWriteBulkObject(aof,o) == 0) goto werr;
            } else if (o->type == OBJ_LIST) {
                if (rewriteListObject(aof,&key,o) == 0) goto werr;
            } else if (o->type == OBJ_SET) {
                if (rewriteSetObject(aof,&key,o) == 0) goto werr;
            } else if (o->type == OBJ_ZSET) {
                if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr;
            } else if (o->type == OBJ_HASH) {
                if (rewriteHashObject(aof,&key,o) == 0) goto werr;
            } else if (o->type == OBJ_STREAM) {
                if (rewriteStreamObject(aof,&key,o) == 0) goto werr;
            } else if (o->type == OBJ_MODULE) {
                if (rewriteModuleObject(aof,&key,o,j) == 0) goto werr;
            } else {
                serverPanic("Unknown object type");
            }

            /* In fork child process, we can try to release memory back to the
             * OS and possibly avoid or decrease COW. We give the dismiss
             * mechanism a hint about an estimated size of the object we stored. */
            size_t dump_size = aof->processed_bytes - aof_bytes_before_key;
            if (server.in_fork_child) dismissObject(o, dump_size);

            /* Save the expire time */
            if (expiretime != -1) {
                char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
                if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                if (rioWriteBulkObject(aof,&key) == 0) goto werr;
                if (rioWriteBulkLongLong(aof,expiretime) == 0) goto werr;
            }

            /* Update info every 1 second (approximately).
             * in order to avoid calling mstime() on each iteration, we will
             * check the diff every 1024 keys */
            if ((key_count++ & 1023) == 0) {
                long long now = mstime();
                if (now - updated_time >= 1000) {
                    sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, key_count, "AOF rewrite");
                    updated_time = now;
                }
            }

            /* Delay before next key if required (for testing) */
            if (server.rdb_key_save_delay)
                debugDelay(server.rdb_key_save_delay);
        }
        dictReleaseIterator(di);
        di = NULL;
    }
    return C_OK;

werr:
    /* di is non-NULL only if we failed in the middle of a DB. */
    if (di) dictReleaseIterator(di);
    return C_ERR;
}