/* Override a few generic Redis configuration defaults with the values
 * Sentinel requires. Called early at startup, before the config file
 * is parsed. */
void initSentinelConfig(void) {
    /* Sentinel listens on its own well-known port (26379). */
    server.port = REDIS_SENTINEL_PORT;
    /* Sentinel must be reachable by other Sentinels and by clients,
     * so protected mode is always disabled. */
    server.protected_mode = 0;
}
/* This function is used for loading the sentinel configuration from * pre_monitor_cfg, monitor_cfg and post_monitor_cfg list */ voidloadSentinelConfigFromQueue(void) { constchar *err = NULL; listIter li; listNode *ln; int linenum = 0; sds line = NULL; unsignedint j;
/* if there is no sentinel_config entry, we can return immediately */ if (server.sentinel_config == NULL) return;
list *sentinel_configs[3] = { server.sentinel_config->pre_monitor_cfg, server.sentinel_config->monitor_cfg, server.sentinel_config->post_monitor_cfg }; /* loading from pre monitor config queue first to avoid dependency issues * loading from monitor config queue * loading from the post monitor config queue */ for (j = 0; j < sizeof(sentinel_configs) / sizeof(sentinel_configs[0]); j++) { listRewind(sentinel_configs[j],&li); while((ln = listNext(&li))) { structsentinelLoadQueueEntry *entry = ln->value; err = sentinelHandleConfiguration(entry->argv,entry->argc); if (err) { linenum = entry->linenum; line = entry->line; goto loaderr; } } }
/* free sentinel_config when config loading is finished */ freeSentinelConfig(); return;
loaderr: fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR (Redis %s) ***\n", REDIS_VERSION); fprintf(stderr, "Reading the configuration file, at line %d\n", linenum); fprintf(stderr, ">>> '%s'\n", line); fprintf(stderr, "%s\n", err); exit(1); }
/* This function is for checking whether sentinel config file has been set, * also checking whether we have write permissions. */ voidsentinelCheckConfigFile(void) { // server.configfile 在 server.c 中已被赋值 if (server.configfile == NULL) { serverLog(LL_WARNING, "Sentinel needs config file on disk to save state. Exiting..."); exit(1); } elseif (access(server.configfile,W_OK) == -1) { serverLog(LL_WARNING, "Sentinel config file %s is not writable: %s. Exiting...", server.configfile,strerror(errno)); exit(1); } }
sentinelGenerateInitialMonitorEvents函数遍历sentinel.masters，为每一个已配置的被监控master创建+monitor事件。该方法只在sentinel启动时调用一次；当运行时通过sentinel monitor master ip port quorum命令新增监控的master时，也会产生同样的+monitor事件。sentinel monitor命令的作用是让sentinel监听主节点配置。
/* Emit a "+monitor" event for every master configured at startup.
 * The same event is also generated when a master to monitor is added
 * at runtime via the SENTINEL MONITOR command. Called only at startup. */
void sentinelGenerateInitialMonitorEvents(void) {
    dictIterator *it = dictGetIterator(sentinel.masters);
    dictEntry *entry;

    while ((entry = dictNext(it)) != NULL) {
        sentinelRedisInstance *master = dictGetVal(entry);
        sentinelEvent(LL_WARNING,"+monitor",master,"%@ quorum %d",
                      master->quorum);
    }
    dictReleaseIterator(it);
}
/* Use vsprintf for the rest of the formatting if any. */ if (fmt[0] != '\0') { va_start(ap, fmt); vsnprintf(msg+strlen(msg), sizeof(msg)-strlen(msg), fmt, ap); va_end(ap); }
/* Log the message if the log level allows it to be logged. */ if (level >= server.verbosity) serverLog(level,"%s %s",type,msg);
/* Publish the message via Pub/Sub if it's not a debugging one. */ if (level != LL_DEBUG) { channel = createStringObject(type,strlen(type)); payload = createStringObject(msg,strlen(msg)); pubsubPublishMessage(channel,payload,0); decrRefCount(channel); decrRefCount(payload); }
/* Call the notification script if applicable. */ if (level == LL_WARNING && ri != NULL) { sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? ri : ri->master; if (master && master->notification_script) { sentinelScheduleScriptExecution(master->notification_script, type,msg,NULL); } } }
/* We continuously change the frequency of the Redis "timer interrupt" * in order to desynchronize every Sentinel from every other. * This non-determinism avoids that Sentinels started at the same time * exactly continue to stay synchronized asking to be voted at the * same time again and again (resulting in nobody likely winning the * election because of split brain voting). */ server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ; }
可以看到sentinelTimer中主要分为三部分:
检查是否处于tilt静默模式中
处理sentinelRedisInstance (ri)实例
处理脚本
sentinelTimer -> sentinelCheckTiltCondition
下面逐一来看:
voidsentinelCheckTiltCondition(void) { mstime_t now = mstime(); mstime_t delta = now - sentinel.previous_time;
/* Run the periodic housekeeping against every instance contained in
 * 'instances', recursing into the slaves and sentinels dictionaries of
 * each master. If some master completed a failover and reached the
 * UPDATE_CONFIG state, the switch to the promoted slave is performed
 * once, after the whole iteration. */
void sentinelHandleDictOfRedisInstances(dict *instances) {
    dictIterator *it;
    dictEntry *entry;
    sentinelRedisInstance *promoted_master = NULL;

    /* Each master needs a number of periodic duties performed. */
    it = dictGetIterator(instances);
    while ((entry = dictNext(it)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(entry);

        sentinelHandleRedisInstance(ri);
        if (ri->flags & SRI_MASTER) {
            /* Recurse into the instances attached to this master. */
            sentinelHandleDictOfRedisInstances(ri->slaves);
            sentinelHandleDictOfRedisInstances(ri->sentinels);
            if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG)
                promoted_master = ri;
        }
    }
    /* Deferred: swapping master/slave mutates the dictionary we were
     * iterating, so it is done only after the loop completes. */
    if (promoted_master)
        sentinelFailoverSwitchToPromotedSlave(promoted_master);
    dictReleaseIterator(it);
}
/* Perform the scheduled operations for a single Redis instance.
 * The function has two halves: the monitoring half always runs, the
 * acting half is suppressed while Sentinel is in TILT mode. */
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
    /* ========== MONITORING HALF ============ */
    /* Done for every kind of instance: keep the links up and collect
     * fresh state via periodic commands. */
    sentinelReconnectInstance(ri);
    sentinelSendPeriodicCommands(ri);

    /* ============== ACTING HALF ============= */
    /* TILT mode is entered when something odd is detected with the
     * time, like a sudden clock jump. While it lasts we only monitor,
     * never act. */
    if (sentinel.tilt) {
        if (mstime()-sentinel.tilt_start_time < sentinel_tilt_period)
            return;
        sentinel.tilt = 0;
        sentinelEvent(LL_WARNING,"-tilt",NULL,"#tilt mode exited");
    }

    /* Done for every kind of instance. */
    sentinelCheckSubjectivelyDown(ri);

    /* Masters and slaves. */
    if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
        /* Nothing so far. */
    }

    /* Master-only duties: objective-down detection, failover start and
     * state machine stepping, plus querying other Sentinels. */
    if (ri->flags & SRI_MASTER) {
        sentinelCheckObjectivelyDown(ri);
        if (sentinelStartFailoverIfNeeded(ri))
            sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
        sentinelFailoverStateMachine(ri);
        sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
    }
}
/* Create the async connections for the instance link if the link
 * is disconnected. Note that link->disconnected is true even if just
 * one of the two links (commands and pub/sub) is missing.
 *
 * BUG FIX: in the previous version the "Commands connection" branch
 * resolved the hostname and sent a PING, but never actually created the
 * connection, so link->cc stayed NULL and the link could never leave
 * the disconnected state. The redisAsyncConnectBind() call and its
 * error handling (mirroring the pub/sub branch below) are restored. */
void sentinelReconnectInstance(sentinelRedisInstance *ri) {

    if (ri->link->disconnected == 0) return;
    if (ri->addr->port == 0) return; /* port == 0 means invalid address. */
    instanceLink *link = ri->link;
    mstime_t now = mstime();

    /* Rate-limit reconnection attempts to one per ping period. */
    if (now - ri->link->last_reconn_time < sentinel_ping_period) return;
    ri->link->last_reconn_time = now;

    /* Commands connection. */
    if (link->cc == NULL) {

        /* It might be that the instance is disconnected because it wasn't available earlier when the instance
         * allocated, say during failover, and therefore we failed to resolve its ip.
         * Another scenario is that the instance restarted with new ip, and we should resolve its new ip based on
         * its hostname */
        if (sentinel.resolve_hostnames) {
            sentinelAddr *tryResolveAddr = createSentinelAddr(ri->addr->hostname, ri->addr->port, 0);
            if (tryResolveAddr != NULL) {
                releaseSentinelAddr(ri->addr);
                ri->addr = tryResolveAddr;
            }
        }

        link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,server.bind_source_addr);

        if (link->cc && !link->cc->err) anetCloexec(link->cc->c.fd);
        if (!link->cc) {
            sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #Failed to establish connection");
        } else if (!link->cc->err && server.tls_replication &&
                (instanceLinkNegotiateTLS(link->cc) == C_ERR)) {
            sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #Failed to initialize TLS");
            instanceLinkCloseConnection(link,link->cc);
        } else if (link->cc->err) {
            sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
                link->cc->errstr);
            instanceLinkCloseConnection(link,link->cc);
        } else {
            /* Connection established: attach to the event loop and
             * install the link callbacks. */
            link->pending_commands = 0;
            link->cc_conn_time = mstime();
            link->cc->data = link;
            redisAeAttach(server.el,link->cc);
            redisAsyncSetConnectCallback(link->cc,
                    sentinelLinkEstablishedCallback);
            redisAsyncSetDisconnectCallback(link->cc,
                    sentinelDisconnectCallback);
            sentinelSendAuthIfNeeded(ri,link->cc);
            sentinelSetClientName(ri,link->cc,"cmd");

            /* Send a PING ASAP when reconnecting. */
            sentinelSendPing(ri);
        }
    }
    /* Pub / Sub */
    if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
        link->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,server.bind_source_addr);
        if (link->pc && !link->pc->err) anetCloexec(link->pc->c.fd);
        if (!link->pc) {
            sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #Failed to establish connection");
        } else if (!link->pc->err && server.tls_replication &&
                (instanceLinkNegotiateTLS(link->pc) == C_ERR)) {
            sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #Failed to initialize TLS");
        } else if (link->pc->err) {
            sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
                link->pc->errstr);
            instanceLinkCloseConnection(link,link->pc);
        } else {
            int retval;
            link->pc_conn_time = mstime();
            link->pc->data = link;
            redisAeAttach(server.el,link->pc);
            redisAsyncSetConnectCallback(link->pc,
                    sentinelLinkEstablishedCallback);
            redisAsyncSetDisconnectCallback(link->pc,
                    sentinelDisconnectCallback);
            sentinelSendAuthIfNeeded(ri,link->pc);
            sentinelSetClientName(ri,link->pc,"pubsub");
            /* Now we subscribe to the Sentinels "Hello" channel. */
            retval = redisAsyncCommand(link->pc,
                sentinelReceiveHelloMessages, ri, "%s %s",
                sentinelInstanceMapCommand(ri,"SUBSCRIBE"),
                SENTINEL_HELLO_CHANNEL);
            if (retval != C_OK) {
                /* If we can't subscribe, the Pub/Sub connection is useless
                 * and we can simply disconnect it and try again. */
                instanceLinkCloseConnection(link,link->pc);
                return;
            }
        }
    }
    /* Clear the disconnected status only if we have both the connections
     * (or just the commands connection if this is a sentinel instance). */
    if (link->cc && (ri->flags & SRI_SENTINEL || link->pc))
        link->disconnected = 0;
}
/* Send the periodic PING, INFO and hello-channel PUBLISH commands to the
 * specified master or slave instance. */
void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
    mstime_t now = mstime();
    mstime_t info_every, ping_every;
    int rc;

    /* Nothing to do while the link is not properly connected. */
    if (ri->link->disconnected) return;

    /* INFO, PING and PUBLISH are not critical, so cap the amount of
     * memory a misbehaving link may consume: once the link has
     * SENTINEL_MAX_PENDING_COMMANDS pending commands per user we stop
     * queueing. (There is a redundant protection anyway: the link is
     * dropped and recreated when a long timeout condition is detected.) */
    if (ri->link->pending_commands >=
        SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return;

    /* Slaves of a master in O_DOWN state or with a failover in progress,
     * and slaves reporting a broken master link, get INFO every second
     * instead of the default period: we want to closely track instances
     * that may be turned into masters by another Sentinel or by the
     * sysadmin, and keep the disconnection time figure fresh. */
    if ((ri->flags & SRI_SLAVE) &&
        ((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) ||
         (ri->master_link_down_time != 0)))
    {
        info_every = 1000;
    } else {
        info_every = sentinel_info_period;
    }

    /* PING at the 'down-after-milliseconds' rate, but never less often
     * than once per sentinel_ping_period. */
    ping_every = ri->down_after_period;
    if (ping_every > sentinel_ping_period) ping_every = sentinel_ping_period;

    /* INFO goes to masters and slaves only, never to other Sentinels. */
    if ((ri->flags & SRI_SENTINEL) == 0 &&
        (ri->info_refresh == 0 ||
         (now - ri->info_refresh) > info_every))
    {
        rc = redisAsyncCommand(ri->link->cc,
            sentinelInfoReplyCallback, ri, "%s",
            sentinelInstanceMapCommand(ri,"INFO"));
        if (rc == C_OK) ri->link->pending_commands++;
    }

    /* PING all three kinds of instances. */
    if ((now - ri->link->last_pong_time) > ping_every &&
        (now - ri->link->last_ping_time) > ping_every/2)
    {
        sentinelSendPing(ri);
    }

    /* PUBLISH hello messages to all three kinds of instances. */
    if ((now - ri->last_pub_time) > sentinel_publish_period) {
        sentinelSendHello(ri);
    }
}
voidsentinelRunPendingScripts(void) { listNode *ln; listIter li; mstime_t now = mstime();
/* Find jobs that are not running and run them, from the top to the * tail of the queue, so we run older jobs first. */ listRewind(sentinel.scripts_queue,&li); while (sentinel.running_scripts < SENTINEL_SCRIPT_MAX_RUNNING && (ln = listNext(&li)) != NULL) { sentinelScriptJob *sj = ln->value; pid_t pid;
/* Skip if already running. */ if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;
/* Skip if it's a retry, but not enough time has elapsed. */ if (sj->start_time && sj->start_time > now) continue;
if (pid == -1) { /* Parent (fork error). * We report fork errors as signal 99, in order to unify the * reporting with other kind of errors. */ sentinelEvent(LL_WARNING,"-script-error",NULL, "%s %d %d", sj->argv[0], 99, 0); sj->flags &= ~SENTINEL_SCRIPT_RUNNING; sj->pid = 0; } elseif (pid == 0) { /* Child */ connTypeCleanupAll(); execve(sj->argv[0],sj->argv,environ); /* If we are here an error occurred. */ _exit(2); /* Don't retry execution. */ } else { sentinel.running_scripts++; sj->pid = pid; sentinelEvent(LL_DEBUG,"+script-child",NULL,"%ld",(long)pid); } } }
/* Check for scripts that terminated, and remove them from the queue if the * script terminated successfully. If instead the script was terminated by * a signal, or returned exit code "1", it is scheduled to run again if * the max number of retries did not already elapsed. */ voidsentinelCollectTerminatedScripts(void) { int statloc; pid_t pid;
while ((pid = waitpid(-1, &statloc, WNOHANG)) > 0) { int exitcode = WEXITSTATUS(statloc); int bysignal = 0; listNode *ln; sentinelScriptJob *sj;
ln = sentinelGetScriptListNodeByPid(pid); if (ln == NULL) { serverLog(LL_WARNING,"waitpid() returned a pid (%ld) we can't find in our scripts execution queue!", (long)pid); continue; } sj = ln->value;
/* If the script was terminated by a signal or returns an * exit code of "1" (that means: please retry), we reschedule it * if the max number of retries is not already reached. */ if ((bysignal || exitcode == 1) && sj->retry_num != SENTINEL_SCRIPT_MAX_RETRY) { sj->flags &= ~SENTINEL_SCRIPT_RUNNING; sj->pid = 0; sj->start_time = mstime() + sentinelScriptRetryDelay(sj->retry_num); } else { /* Otherwise let's remove the script, but log the event if the * execution did not terminated in the best of the ways. */ if (bysignal || exitcode != 0) { sentinelEvent(LL_WARNING,"-script-error",NULL, "%s %d %d", sj->argv[0], bysignal, exitcode); } listDelNode(sentinel.scripts_queue,ln); sentinelReleaseScriptJob(sj); } sentinel.running_scripts--; } }
/* Kill scripts in timeout, they'll be collected by the * sentinelCollectTerminatedScripts() function. */ voidsentinelKillTimedoutScripts(void) { listNode *ln; listIter li; mstime_t now = mstime();
/* Check if we are in need for a reconnection of one of the * links, because we are detecting low activity. * * 1) Check if the command link seems connected, was connected not less * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have a * pending ping for more than half the timeout. */ if (ri->link->cc && (mstime() - ri->link->cc_conn_time) > sentinel_min_link_reconnect_period && ri->link->act_ping_time != 0 && /* There is a pending ping... */ /* The pending ping is delayed, and we did not receive * error replies as well. */ (mstime() - ri->link->act_ping_time) > (ri->down_after_period/2) && (mstime() - ri->link->last_pong_time) > (ri->down_after_period/2)) { instanceLinkCloseConnection(ri->link,ri->link->cc); }
/* 2) Check if the pubsub link seems connected, was connected not less * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no * activity in the Pub/Sub channel for more than * SENTINEL_PUBLISH_PERIOD * 3. */ if (ri->link->pc && (mstime() - ri->link->pc_conn_time) > sentinel_min_link_reconnect_period && (mstime() - ri->link->pc_last_activity) > (sentinel_publish_period*3)) { instanceLinkCloseConnection(ri->link,ri->link->pc); }
/* Update the SDOWN flag. We believe the instance is SDOWN if: * * 1) It is not replying. * 2) We believe it is a master, it reports to be a slave for enough time * to meet the down_after_period, plus enough time to get two times * INFO report from the instance. */ if (elapsed > ri->down_after_period || (ri->flags & SRI_MASTER && ri->role_reported == SRI_SLAVE && mstime() - ri->role_reported_time > (ri->down_after_period+sentinel_info_period*2)) || (ri->flags & SRI_MASTER_REBOOT && mstime()-ri->master_reboot_since_time > ri->master_reboot_down_after_period)) { /* Is subjectively down */ if ((ri->flags & SRI_S_DOWN) == 0) { sentinelEvent(LL_WARNING,"+sdown",ri,"%@"); ri->s_down_since_time = mstime(); ri->flags |= SRI_S_DOWN; } } else { /* Is subjectively up */ if (ri->flags & SRI_S_DOWN) { sentinelEvent(LL_WARNING,"-sdown",ri,"%@"); ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT); } } }
if (master->flags & SRI_S_DOWN) { /* Is down for enough sentinels? */ quorum = 1; /* the current sentinel. */ /* Count all the other sentinels. */ di = dictGetIterator(master->sentinels); while((de = dictNext(di)) != NULL) { sentinelRedisInstance *ri = dictGetVal(de);
if (ri->flags & SRI_MASTER_DOWN) quorum++; } dictReleaseIterator(di); if (quorum >= master->quorum) odown = 1; }
/* Set the flag accordingly to the outcome. */ if (odown) { if ((master->flags & SRI_O_DOWN) == 0) { sentinelEvent(LL_WARNING,"+odown",master,"%@ #quorum %d/%d", quorum, master->quorum); master->flags |= SRI_O_DOWN; master->o_down_since_time = mstime(); } } else { if (master->flags & SRI_O_DOWN) { sentinelEvent(LL_WARNING,"-odown",master,"%@"); master->flags &= ~SRI_O_DOWN; } } }
if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
switch(ri->failover_state) { case SENTINEL_FAILOVER_STATE_WAIT_START: sentinelFailoverWaitStart(ri); break; case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: sentinelFailoverSelectSlave(ri); break; case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: sentinelFailoverSendSlaveOfNoOne(ri); break; case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: sentinelFailoverWaitPromotion(ri); break; case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: sentinelFailoverReconfNextSlave(ri); break; } }
/* If we think the master is down, we start sending * SENTINEL IS-MASTER-DOWN-BY-ADDR requests to other sentinels * in order to get the replies that allow to reach the quorum * needed to mark the master in ODOWN state and trigger a failover. */ #define SENTINEL_ASK_FORCED (1<<0) voidsentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) { dictIterator *di; dictEntry *de;
/* If the master state from other sentinel is too old, we clear it. */ if (elapsed > sentinel_ask_period*5) { ri->flags &= ~SRI_MASTER_DOWN; sdsfree(ri->leader); ri->leader = NULL; }
/* Only ask if master is down to other sentinels if: * * 1) We believe it is down, or there is a failover in progress. * 2) Sentinel is connected. * 3) We did not receive the info within SENTINEL_ASK_PERIOD ms. */ if ((master->flags & SRI_S_DOWN) == 0) continue; if (ri->link->disconnected) continue; if (!(flags & SENTINEL_ASK_FORCED) && mstime() - ri->last_master_down_reply_time < sentinel_ask_period) continue;
/* This function is called when the slave is in * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state. In this state we need * to remove it from the master table and add the promoted slave instead. */ voidsentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) { sentinelRedisInstance *ref = master->promoted_slave ? master->promoted_slave : master;
/* This function is called when the slave is in * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state. In this state we need * to remove it from the master table and add the promoted slave instead. */ voidsentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) { sentinelRedisInstance *ref = master->promoted_slave ? master->promoted_slave : master;
if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
switch(ri->failover_state) { case SENTINEL_FAILOVER_STATE_WAIT_START: sentinelFailoverWaitStart(ri); break; case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: sentinelFailoverSelectSlave(ri); break; case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: sentinelFailoverSendSlaveOfNoOne(ri); break; case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: sentinelFailoverWaitPromotion(ri); break; case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: sentinelFailoverReconfNextSlave(ri); break; } }