Introduction

In the previous post on Redis master-replica replication, we noted that in the plain master-replica setup, the cluster cannot recover on its own when the master fails; manual intervention is required. Is there a way to make the cluster recover from failures automatically? There is: Sentinel mode.

The Sentinel Mechanism

Sentinel is Redis's high-availability solution: a Sentinel system made up of one or more Sentinel instances monitors any number of masters, as well as the replicas under those masters. When a monitored master goes offline, the system performs failover automatically: electing a new master, repointing the remaining replicas at it, and demoting the old master to a replica. After failover, the new master handles command requests.

On first reading this raised a question for me: Sentinel is an improvement over plain master-replica replication, so why can a Sentinel system monitor multiple masters? Is the Redis topology at that point several independent master-replica groups running side by side?

To sum up, the Sentinel mechanism must provide the following services:

  1. Detecting master failures
  2. Performing failover [who carries out the election, and under what conditions?]
    • Electing a leader Sentinel
    • Selecting the new master
    • Repointing the remaining replicas at the new master
    • Demoting the old master to a replica
      So how does Redis implement these capabilities?

How should we think about Sentinel?

For now, think of a Sentinel as a Redis server with special functionality: the set of commands it can execute differs from that of masters and replicas.

How is a master failure detected?

In short: subjective down + objective down.

Subjective down

sentinel.conf contains the following directive, which sets the policy a Sentinel uses to judge subjective down:

sentinel down-after-milliseconds <master-name> <milliseconds>

By default, a Sentinel sends a PING once per second to every instance it has a command connection to (masters, replicas, and other Sentinels), and uses the reply to decide whether the instance is online.
A reply to PING falls into one of two categories:

  1. Valid reply: the instance returns one of +PONG, -LOADING, or -MASTERDOWN.
  2. Invalid reply: anything other than the three above, or no reply within the configured time limit.
    If no valid reply arrives within down-after-milliseconds, the Sentinel sets the SRI_S_DOWN bit in the instance's flags.

The directive above is not only the criterion for judging masters: the same criterion is used to decide whether replicas and other Sentinels are subjectively down.

Also note that the Sentinels monitoring the same master may each configure a different down-after-milliseconds, so different Sentinels may use different time windows when judging subjective down.
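The timeout rule above can be sketched as follows. This is a simplified illustration under stated assumptions, not the actual Redis code: the struct, the flag value, and the function name are invented for the example.

```c
#include <assert.h>

/* Illustrative sketch of the subjective-down check; the struct, the
 * flag value, and the function name are made up for this example. */
typedef long long mstime_t;

#define SRI_S_DOWN (1<<3) /* illustrative flag value */

typedef struct {
    mstime_t last_avail_time;   /* last valid PING reply (+PONG, -LOADING, -MASTERDOWN) */
    mstime_t down_after_period; /* this Sentinel's down-after-milliseconds */
    int flags;
} instance;

/* Flag the instance subjectively down when no valid reply arrived
 * within down_after_period; clear the flag otherwise. Returns the
 * new subjective-down status. */
int checkSubjectivelyDown(instance *ri, mstime_t now) {
    if (now - ri->last_avail_time > ri->down_after_period)
        ri->flags |= SRI_S_DOWN;
    else
        ri->flags &= ~SRI_S_DOWN;
    return (ri->flags & SRI_S_DOWN) != 0;
}
```

Because each Sentinel applies its own down_after_period, the same silent master can be subjectively down for one Sentinel and still "up" for another.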

Objective down

After a Sentinel judges a master subjectively down, it confirms whether the master is really offline: it asks the other Sentinels monitoring that master whether they also consider it subjectively down. If the number of Sentinels that do reaches the configured value, the subsequent failover steps begin.

The threshold for objective down is specified when the Sentinel starts:

sentinel monitor <master-name> <ip> <redis-port> <quorum>

Since each Sentinel may be started with a different configuration, different Sentinels may apply different conditions when judging a master objectively down.
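The quorum test can be sketched like this. Again a simplified illustration (the function names are invented): each Sentinel counts itself plus the other Sentinels that agree, and compares the total against its own configured quorum.

```c
#include <assert.h>

/* Illustrative sketch, not the actual Redis code: count agreeing
 * Sentinels (this one included) and compare against this Sentinel's
 * configured quorum for the master. */
int countDownVotes(const int *agrees, int num_other_sentinels) {
    int votes = 1; /* this Sentinel already judged the master subjectively down */
    for (int i = 0; i < num_other_sentinels; i++)
        if (agrees[i]) votes++;
    return votes;
}

int isObjectivelyDown(int votes, int quorum) {
    return votes >= quorum;
}
```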

How failover is performed

Once a master is marked objectively down, the Sentinels monitoring it negotiate to elect a leader Sentinel, which then carries out the failover of the offline master.

How the leader Sentinel is elected

Every Sentinel can potentially become the leader. After each leader election, whether or not it succeeds, every Sentinel's configuration epoch is incremented.
Within one configuration epoch, every Sentinel has exactly one chance to set some Sentinel as its local leader, and once set, the local leader cannot be changed within that epoch.
Every Sentinel that detects the master as objectively down asks the other Sentinels to set it as their local leader, using a command of the form:

sentinel is-master-down-by-addr ip port epoch run_id

If run_id is not * but the source Sentinel's run ID, the source Sentinel is asking the target Sentinel to set it as the target's local leader.
Local leaders are assigned first come, first served: the first Sentinel to ask becomes the target's local leader, and all later requests in that epoch are rejected.

When a Sentinel receives the reply to the command above, it checks whether leader_epoch in the reply matches its own; if so, it compares run_id, and a match means the target Sentinel has set the source Sentinel as its local leader.
If some Sentinel is chosen as local leader by more than half of the Sentinels, it becomes the leader Sentinel.
A single configuration epoch may produce several local leaders, but only one leader Sentinel. If no leader emerges in this epoch, the election moves on to the next epoch, and so on until a leader Sentinel is elected.

A question: what if one of the Sentinels crashes? It seems leader election is unaffected, as long as some candidate still gathers at least sentinel_nums / 2 + 1 votes, where sentinel_nums is the number of Sentinels online.
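The majority rule discussed above can be sketched as follows. A simplified illustration (the function name is invented): the winner must collect the votes of a strict majority of all known Sentinels, and no fewer than the master's quorum.

```c
#include <assert.h>

/* Illustrative sketch: a candidate wins the leader election only with
 * a strict majority of all known Sentinels, and at least `quorum`
 * votes. Not the actual Redis code. */
int winsLeaderElection(int votes, int num_sentinels, int quorum) {
    int needed = num_sentinels / 2 + 1; /* strict majority */
    if (needed < quorum) needed = quorum;
    return votes >= needed;
}
```

With 3 Sentinels, 2 votes suffice; with one Sentinel crashed out of 4, a candidate still needs 3 of 4, which is why odd-sized Sentinel deployments are usually recommended.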

Once the leader Sentinel is elected, the failover proper begins.

Selecting the new master

The leader Sentinel picks, from the offline master's replicas, one that is healthy and has complete data, then sends it the slaveof no one command to convert it into a master.

The selection works by first collecting the offline master's replicas into a list, then filtering and ranking as follows:

  • Remove all replicas that are offline or disconnected.
  • Remove all replicas that have not replied to the leader Sentinel's INFO commands within the last 5 minutes, ensuring the remaining replicas are in normal communication with the leader Sentinel.
  • Remove all replicas whose link to the offline master has been down for more than down-after-milliseconds * 10, so that the remaining replicas' datasets are as close as possible to the offline master's.
  • Sort the remaining replicas by priority and pick the one with the highest priority.
  • If several share the highest priority, compare their replication offsets and pick the one with the largest offset.
  • If the offsets are also equal, sort by run_id and pick the replica with the smallest run_id.
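The final three ranking rules can be sketched as a qsort-style comparator. A simplified illustration (the struct and names are invented); note that in the Redis source a replica's priority is its slave_priority value, where a smaller number means a more preferred replica.

```c
#include <assert.h>
#include <string.h>

/* Illustrative sketch of the tie-breaking order used to pick the new
 * master: lower slave_priority first, then larger replication offset,
 * then lexicographically smaller run_id. Not the actual Redis code. */
typedef struct {
    int priority;          /* slave_priority: smaller = preferred */
    long long repl_offset; /* replication offset: larger = preferred */
    char runid[41];        /* smaller run_id wins the final tie */
} replica;

/* qsort-style comparator: the best candidate sorts first. */
int compareReplicas(const void *a, const void *b) {
    const replica *ra = a, *rb = b;
    if (ra->priority != rb->priority)
        return ra->priority - rb->priority;
    if (ra->repl_offset != rb->repl_offset)
        return ra->repl_offset > rb->repl_offset ? -1 : 1;
    return strcmp(ra->runid, rb->runid);
}
```

Sorting the filtered candidate list with this comparator puts the replica to promote at index 0.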

Repointing the remaining replicas at the new master

The leader Sentinel sends slaveof <new_master_ip> <new_master_port> to the remaining replicas.

This corresponds to the failover states in the source code.

Demoting the old master to a replica

Finally, the old master's instance structure is updated so that its master is set to <new_master>.

Source Code Walkthrough

Starting Sentinel in main

server.c contains the following code, which detects whether we are running in Sentinel mode:

int main(int argc, char **argv){
    ...
    server.sentinel_mode = checkForSentinelMode(argc,argv, exec_name);
    initServerConfig(); /* initialize the server configuration */
    ...
    if (server.sentinel_mode) {
        initSentinelConfig();
        initSentinel();
    }
    ...
    loadServerConfig(server.configfile, config_from_stdin, options);
    /* If a config file was given */
    if (argc >= 2){
        if (server.sentinel_mode) loadSentinelConfigFromQueue();
    }

    ...
    if (server.sentinel_mode) sentinelCheckConfigFile();
    ...
    initServer(); /* void initServer(void) */
    ...
    initListeners();
    ...
    if (!server.sentinel_mode){
        /* Normal (non-sentinel) mode work -- a good place to compare
         * sentinel mode against the other modes; not expanded here. */
        aofLoadManifestFromDisk();
        loadDataFromDisk();
        aofOpenIfNeededOnServerStart();
        aofDelHistoryFiles();
        ...
    }else{
        sentinelIsRunning();
        if (server.supervised_mode == SUPERVISED_SYSTEMD) {
            redisCommunicateSystemd("STATUS=Ready to accept connections\n");
            redisCommunicateSystemd("READY=1\n");
        }
    }
}

main -> checkForSentinelMode

Sentinel mode is detected from the two ways Sentinel can be started.
checkForSentinelMode's logic is:

  • the program was started via the redis-sentinel executable, or
  • the startup arguments contain the --sentinel option

These correspond to Sentinel's two launch commands:

redis-sentinel /path/to/sentinel.conf           // the redis-sentinel binary
redis-server /path/to/sentinel.conf --sentinel  // the redis-server binary

The implementation:

int checkForSentinelMode(int argc, char **argv, char *exec_name) {
    if (strstr(exec_name,"redis-sentinel") != NULL) return 1;

    for (int j = 1; j < argc; j++)
        if (!strcmp(argv[j],"--sentinel")) return 1;
    return 0;
}

main -> initSentinelConfig

If we are in Sentinel mode, initialization follows:
initSentinelConfig replaces the default port 6379 with 26379 and disables the server's protected mode.

/* This function overwrites a few normal Redis config default with Sentinel
 * specific defaults. */
void initSentinelConfig(void) {
    server.port = REDIS_SENTINEL_PORT; /* #define REDIS_SENTINEL_PORT 26379 */
    server.protected_mode = 0; /* Sentinel must be exposed. */
}

main -> initSentinel

/* Perform the Sentinel mode initialization. */
void initSentinel(void) {
    /* Initialize various data structures. */
    sentinel.current_epoch = 0;
    sentinel.masters = dictCreate(&instancesDictType);
    sentinel.tilt = 0;
    sentinel.tilt_start_time = 0;
    sentinel.previous_time = mstime();
    sentinel.running_scripts = 0;
    sentinel.scripts_queue = listCreate(); /* initialize an empty list */
    sentinel.announce_ip = NULL;
    sentinel.announce_port = 0;
    sentinel.simfailure_flags = SENTINEL_SIMFAILURE_NONE;
    sentinel.deny_scripts_reconfig = SENTINEL_DEFAULT_DENY_SCRIPTS_RECONFIG;
    sentinel.sentinel_auth_pass = NULL;
    sentinel.sentinel_auth_user = NULL;
    sentinel.resolve_hostnames = SENTINEL_DEFAULT_RESOLVE_HOSTNAMES;
    sentinel.announce_hostnames = SENTINEL_DEFAULT_ANNOUNCE_HOSTNAMES;
    memset(sentinel.myid,0,sizeof(sentinel.myid));
    server.sentinel_config = NULL;
}

initSentinel uses dict.h/dictCreate to build a hash table that maps sentinelRedisInstance names to sentinelRedisInstance pointers. (Below, sri is used as shorthand for sentinelRedisInstance.)

/* Instance name (sds) -> instance (sentinelRedisInstance pointer)
 *
 * also used for: sentinelRedisInstance->sentinels dictionary that maps
 * sentinels ip:port to last seen time in Pub/Sub hello message. */
dictType instancesDictType = {
    dictSdsHash,               /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictSdsKeyCompare,         /* key compare */
    NULL,                      /* key destructor */
    dictInstancesValDestructor,/* val destructor */
    NULL                       /* allow to expand */
};

Note that after initSentinel finishes, many fields such as sentinel_config are still empty.

main -> loadSentinelConfigFromQueue

loadSentinelConfigFromQueue walks the queued configuration entries; its core work is calling sentinelHandleConfiguration, whose many if-else branches each handle a different configuration directive.

/* This function is used for loading the sentinel configuration from
 * pre_monitor_cfg, monitor_cfg and post_monitor_cfg list */
void loadSentinelConfigFromQueue(void) {
    const char *err = NULL;
    listIter li;
    listNode *ln;
    int linenum = 0;
    sds line = NULL;
    unsigned int j;

    /* if there is no sentinel_config entry, we can return immediately */
    if (server.sentinel_config == NULL) return;

    list *sentinel_configs[3] = {
        server.sentinel_config->pre_monitor_cfg,
        server.sentinel_config->monitor_cfg,
        server.sentinel_config->post_monitor_cfg
    };
    /* loading from pre monitor config queue first to avoid dependency issues
     * loading from monitor config queue
     * loading from the post monitor config queue */
    for (j = 0; j < sizeof(sentinel_configs) / sizeof(sentinel_configs[0]); j++) {
        listRewind(sentinel_configs[j],&li);
        while((ln = listNext(&li))) {
            struct sentinelLoadQueueEntry *entry = ln->value;
            err = sentinelHandleConfiguration(entry->argv,entry->argc);
            if (err) {
                linenum = entry->linenum;
                line = entry->line;
                goto loaderr;
            }
        }
    }

    /* free sentinel_config when config loading is finished */
    freeSentinelConfig();
    return;

loaderr:
    fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR (Redis %s) ***\n",
        REDIS_VERSION);
    fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
    fprintf(stderr, ">>> '%s'\n", line);
    fprintf(stderr, "%s\n", err);
    exit(1);
}
loadSentinelConfigFromQueue -> sentinelHandleConfiguration
const char *sentinelHandleConfiguration(char **argv, int argc) {

    sentinelRedisInstance *ri;

    if (!strcasecmp(argv[0],"monitor") && argc == 5) {
        /* monitor <name> <host> <port> <quorum> */
        int quorum = atoi(argv[4]);

        if (quorum <= 0) return "Quorum must be 1 or greater.";
        if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
                                        atoi(argv[3]),quorum,NULL) == NULL)
        {
            return sentinelCheckCreateInstanceErrors(SRI_MASTER);
        }
    } else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) {
        /* down-after-milliseconds <name> <milliseconds> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        ri->down_after_period = atoi(argv[2]);
        if (ri->down_after_period <= 0)
            return "negative or zero time parameter.";
        sentinelPropagateDownAfterPeriod(ri);
    } else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) {
        /* failover-timeout <name> <milliseconds> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        ri->failover_timeout = atoi(argv[2]);
        if (ri->failover_timeout <= 0)
            return "negative or zero time parameter.";
    } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) {
        /* parallel-syncs <name> <milliseconds> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        ri->parallel_syncs = atoi(argv[2]);
    } else if (!strcasecmp(argv[0],"notification-script") && argc == 3) {
        /* notification-script <name> <path> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        if (access(argv[2],X_OK) == -1)
            return "Notification script seems non existing or non executable.";
        ri->notification_script = sdsnew(argv[2]);
    } else if (!strcasecmp(argv[0],"client-reconfig-script") && argc == 3) {
        /* client-reconfig-script <name> <path> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        if (access(argv[2],X_OK) == -1)
            return "Client reconfiguration script seems non existing or "
                   "non executable.";
        ri->client_reconfig_script = sdsnew(argv[2]);
    } else if (!strcasecmp(argv[0],"auth-pass") && argc == 3) {
        /* auth-pass <name> <password> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        ri->auth_pass = sdsnew(argv[2]);
    } else if (!strcasecmp(argv[0],"auth-user") && argc == 3) {
        /* auth-user <name> <username> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        ri->auth_user = sdsnew(argv[2]);
    } else if (!strcasecmp(argv[0],"current-epoch") && argc == 2) {
        /* current-epoch <epoch> */
        unsigned long long current_epoch = strtoull(argv[1],NULL,10);
        if (current_epoch > sentinel.current_epoch)
            sentinel.current_epoch = current_epoch;
    } else if (!strcasecmp(argv[0],"myid") && argc == 2) {
        if (strlen(argv[1]) != CONFIG_RUN_ID_SIZE)
            return "Malformed Sentinel id in myid option.";
        memcpy(sentinel.myid,argv[1],CONFIG_RUN_ID_SIZE);
    } else if (!strcasecmp(argv[0],"config-epoch") && argc == 3) {
        /* config-epoch <name> <epoch> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        ri->config_epoch = strtoull(argv[2],NULL,10);
        /* The following update of current_epoch is not really useful as
         * now the current epoch is persisted on the config file, but
         * we leave this check here for redundancy. */
        if (ri->config_epoch > sentinel.current_epoch)
            sentinel.current_epoch = ri->config_epoch;
    } else if (!strcasecmp(argv[0],"leader-epoch") && argc == 3) {
        /* leader-epoch <name> <epoch> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        ri->leader_epoch = strtoull(argv[2],NULL,10);
    } else if ((!strcasecmp(argv[0],"known-slave") ||
                !strcasecmp(argv[0],"known-replica")) && argc == 4)
    {
        sentinelRedisInstance *slave;

        /* known-replica <name> <ip> <port> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,argv[2],
                     atoi(argv[3]), ri->quorum, ri)) == NULL)
        {
            return sentinelCheckCreateInstanceErrors(SRI_SLAVE);
        }
    } else if (!strcasecmp(argv[0],"known-sentinel") &&
               (argc == 4 || argc == 5)) {
        sentinelRedisInstance *si;

        if (argc == 5) { /* Ignore the old form without runid. */
            /* known-sentinel <name> <ip> <port> [runid] */
            ri = sentinelGetMasterByName(argv[1]);
            if (!ri) return "No such master with specified name.";
            if ((si = createSentinelRedisInstance(argv[4],SRI_SENTINEL,argv[2],
                      atoi(argv[3]), ri->quorum, ri)) == NULL)
            {
                return sentinelCheckCreateInstanceErrors(SRI_SENTINEL);
            }
            si->runid = sdsnew(argv[4]);
            sentinelTryConnectionSharing(si);
        }
    } else if (!strcasecmp(argv[0],"rename-command") && argc == 4) {
        /* rename-command <name> <command> <renamed-command> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        sds oldcmd = sdsnew(argv[2]);
        sds newcmd = sdsnew(argv[3]);
        if (dictAdd(ri->renamed_commands,oldcmd,newcmd) != DICT_OK) {
            sdsfree(oldcmd);
            sdsfree(newcmd);
            return "Same command renamed multiple times with rename-command.";
        }
    } else if (!strcasecmp(argv[0],"announce-ip") && argc == 2) {
        /* announce-ip <ip-address> */
        if (strlen(argv[1]))
            sentinel.announce_ip = sdsnew(argv[1]);
    } else if (!strcasecmp(argv[0],"announce-port") && argc == 2) {
        /* announce-port <port> */
        sentinel.announce_port = atoi(argv[1]);
    } else if (!strcasecmp(argv[0],"deny-scripts-reconfig") && argc == 2) {
        /* deny-scripts-reconfig <yes|no> */
        if ((sentinel.deny_scripts_reconfig = yesnotoi(argv[1])) == -1) {
            return "Please specify yes or no for the "
                   "deny-scripts-reconfig options.";
        }
    } else if (!strcasecmp(argv[0],"sentinel-user") && argc == 2) {
        /* sentinel-user <user-name> */
        if (strlen(argv[1]))
            sentinel.sentinel_auth_user = sdsnew(argv[1]);
    } else if (!strcasecmp(argv[0],"sentinel-pass") && argc == 2) {
        /* sentinel-pass <password> */
        if (strlen(argv[1]))
            sentinel.sentinel_auth_pass = sdsnew(argv[1]);
    } else if (!strcasecmp(argv[0],"resolve-hostnames") && argc == 2) {
        /* resolve-hostnames <yes|no> */
        if ((sentinel.resolve_hostnames = yesnotoi(argv[1])) == -1) {
            return "Please specify yes or no for the resolve-hostnames option.";
        }
    } else if (!strcasecmp(argv[0],"announce-hostnames") && argc == 2) {
        /* announce-hostnames <yes|no> */
        if ((sentinel.announce_hostnames = yesnotoi(argv[1])) == -1) {
            return "Please specify yes or no for the announce-hostnames option.";
        }
    } else if (!strcasecmp(argv[0],"master-reboot-down-after-period") && argc == 3) {
        /* master-reboot-down-after-period <name> <milliseconds> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        ri->master_reboot_down_after_period = atoi(argv[2]);
        if (ri->master_reboot_down_after_period < 0)
            return "negative time parameter.";
    } else {
        return "Unrecognized sentinel configuration statement.";
    }
    return NULL;
}

The first if branch handles monitor <name> <host> <port> <quorum>: it reads the objective-down threshold quorum from the config and returns an error unless it is at least 1.
It then calls createSentinelRedisInstance to try to create the sri instance:

sentinelHandleConfiguration -> createSentinelRedisInstance
// the caller
createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],atoi(argv[3]),quorum,NULL)
sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
    sentinelRedisInstance *ri;
    sentinelAddr *addr;
    dict *table = NULL;
    sds sdsname;

    serverAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
    serverAssert((flags & SRI_MASTER) || master != NULL);

    /* Check address validity. */
    addr = createSentinelAddr(hostname,port,1);
    if (addr == NULL) return NULL;

    /* For slaves use ip/host:port as name. */
    if (flags & SRI_SLAVE)
        sdsname = announceSentinelAddrAndPort(addr);
    else
        sdsname = sdsnew(name);

    /* Check to prevent masters, slaves, or sentinels being added twice. */
    if (flags & SRI_MASTER) table = sentinel.masters;
    else if (flags & SRI_SLAVE) table = master->slaves;
    else if (flags & SRI_SENTINEL) table = master->sentinels;
    if (dictFind(table,sdsname)) {
        releaseSentinelAddr(addr);
        sdsfree(sdsname);
        errno = EBUSY;
        return NULL;
    }

    /* Create the sri instance and set initial values. */
    ri = zmalloc(sizeof(*ri));
    /* Note that all the instances are started in the disconnected state,
     * the event loop will take care of connecting them. */
    ri->flags = flags;
    ri->name = sdsname;
    ri->runid = NULL;
    ri->config_epoch = 0;
    ri->addr = addr;
    ri->link = createInstanceLink(); /* worth paying close attention to */
    ri->last_pub_time = mstime();
    ri->last_hello_time = mstime();
    ri->last_master_down_reply_time = mstime();
    ri->s_down_since_time = 0;
    ri->o_down_since_time = 0;
    ri->down_after_period = master ? master->down_after_period : sentinel_default_down_after;
    ri->master_reboot_down_after_period = 0;
    ri->master_link_down_time = 0;
    ri->auth_pass = NULL;
    ri->auth_user = NULL;
    ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY;
    ri->replica_announced = 1;
    ri->slave_reconf_sent_time = 0;
    ri->slave_master_host = NULL;
    ri->slave_master_port = 0;
    ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN;
    ri->slave_repl_offset = 0;
    ri->sentinels = dictCreate(&instancesDictType);
    ri->quorum = quorum;
    ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS;
    ri->master = master;
    ri->slaves = dictCreate(&instancesDictType);
    ri->info_refresh = 0;
    ri->renamed_commands = dictCreate(&renamedCommandsDictType);

    /* Initialize failover-related fields. */
    ri->leader = NULL;
    ri->leader_epoch = 0;
    ri->failover_epoch = 0;
    ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
    ri->failover_state_change_time = 0;
    ri->failover_start_time = 0;
    ri->failover_timeout = sentinel_default_failover_timeout;
    ri->failover_delay_logged = 0;
    ri->promoted_slave = NULL;
    ri->notification_script = NULL;
    ri->client_reconfig_script = NULL;
    ri->info = NULL;

    /* Record the instance's current role. */
    ri->role_reported = ri->flags & (SRI_MASTER|SRI_SLAVE);
    ri->role_reported_time = mstime();
    ri->slave_conf_change_time = mstime();

    /* Add into the right table. */
    dictAdd(table, ri->name, ri);
    return ri;
}

Notes on calling createSentinelRedisInstance:

  1. runid: set to NULL; it is filled in when the first INFO reply arrives.
  2. info_refresh: while 0, no INFO reply has been received yet.
  3. With SRI_MASTER, the instance is added to sentinel.masters; with SRI_SLAVE or SRI_SENTINEL, master != NULL and the instance is added to master->slaves or master->sentinels.
  4. With SRI_SLAVE, the name argument is ignored and the sri's name is generated automatically, in the form ip/hostname:port.
  5. createSentinelRedisInstance fails when:
    • the hostname cannot be resolved;
    • the port is out of range;
    • a master name, slave address, or sentinel ID is duplicated.

createSentinelRedisInstance's main logic:

  1. Validate the address.
  2. Assign a name (generated for slaves).
  3. Check the relevant table so instances are not added twice.
  4. Create the sri and set initial values:
    • sentinel configuration
    • failover configuration
    • role configuration
  5. Update the name-to-sri table.

The call stack then unwinds back to loadSentinelConfigFromQueue, which frees the sentinel_config configuration.

main -> sentinelCheckConfigFile

The program then continues into server.c/sentinelCheckConfigFile, which checks that a Sentinel config file was set and that we have write permission on it, exiting on failure.

/* This function is for checking whether sentinel config file has been set,
 * also checking whether we have write permissions. */
void sentinelCheckConfigFile(void) {
    /* server.configfile was already assigned in server.c */
    if (server.configfile == NULL) {
        serverLog(LL_WARNING,
            "Sentinel needs config file on disk to save state. Exiting...");
        exit(1);
    } else if (access(server.configfile,W_OK) == -1) {
        serverLog(LL_WARNING,
            "Sentinel config file %s is not writable: %s. Exiting...",
            server.configfile,strerror(errno));
        exit(1);
    }
}

main -> sentinelIsRunning

After the sri initialization and initServer / initListeners, the startup path branches on Sentinel mode once more: in Sentinel mode, sentinelIsRunning is called; otherwise the normal-mode startup work (AOF and RDB loading, etc.) runs.

void sentinelIsRunning(void) {
    int j;
    for (j = 0; j < CONFIG_RUN_ID_SIZE; j++)
        if (sentinel.myid[j] != 0) break;

    if (j == CONFIG_RUN_ID_SIZE) {
        /* Pick ID and persist the config. */
        getRandomHexChars(sentinel.myid,CONFIG_RUN_ID_SIZE);
        sentinelFlushConfig();
    }

    /* Log its ID to make debugging of issues simpler. */
    serverLog(LL_NOTICE,"Sentinel ID is %s", sentinel.myid);

    /* We want to generate a +monitor event for every configured master
     * at startup. */
    sentinelGenerateInitialMonitorEvents();
}

sentinelIsRunning checks whether this Sentinel already has an ID (myid); if not, it generates a random one and flushes it to the config file on disk. It then calls sentinelGenerateInitialMonitorEvents to emit the initial monitoring events.

sentinelIsRunning -> sentinelGenerateInitialMonitorEvents

sentinelGenerateInitialMonitorEvents iterates sentinel.masters and emits a +monitor event for every configured master.
The same events are also generated whenever a sentinel monitor <master-name> <ip> <port> <quorum> command adds a master at runtime.
The sentinel monitor command tells a Sentinel to start monitoring a master.

/* This function is called only at startup and is used to generate a
 * +monitor event for every configured master. The same events are also
 * generated when a master to monitor is added at runtime via the
 * SENTINEL MONITOR command. */
void sentinelGenerateInitialMonitorEvents(void) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetIterator(sentinel.masters);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        sentinelEvent(LL_WARNING,"+monitor",ri,"%@ quorum %d",ri->quorum);
    }
    dictReleaseIterator(di);
}

sentinelEvent works as follows:
if the log level is not LL_DEBUG, the event is published via pubsubPublishMessage, implemented in pubsub.c;
if the level is LL_WARNING, ri != NULL, and a notification script is configured, the event is also delivered via sentinelScheduleScriptExecution.

void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
                   const char *fmt, ...) {
    va_list ap;
    char msg[LOG_MAX_LEN];
    robj *channel, *payload;

    /* Handle %@ */
    if (fmt[0] == '%' && fmt[1] == '@') {
        sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
                                        NULL : ri->master;

        if (master) {
            snprintf(msg, sizeof(msg), "%s %s %s %d @ %s %s %d",
                sentinelRedisInstanceTypeStr(ri),
                ri->name, announceSentinelAddr(ri->addr), ri->addr->port,
                master->name, announceSentinelAddr(master->addr), master->addr->port);
        } else {
            snprintf(msg, sizeof(msg), "%s %s %s %d",
                sentinelRedisInstanceTypeStr(ri),
                ri->name, announceSentinelAddr(ri->addr), ri->addr->port);
        }
        fmt += 2;
    } else {
        msg[0] = '\0';
    }

    /* Use vsprintf for the rest of the formatting if any. */
    if (fmt[0] != '\0') {
        va_start(ap, fmt);
        vsnprintf(msg+strlen(msg), sizeof(msg)-strlen(msg), fmt, ap);
        va_end(ap);
    }

    /* Log the message if the log level allows it to be logged. */
    if (level >= server.verbosity)
        serverLog(level,"%s %s",type,msg);

    /* Publish the message via Pub/Sub if it's not a debugging one. */
    if (level != LL_DEBUG) {
        channel = createStringObject(type,strlen(type));
        payload = createStringObject(msg,strlen(msg));
        pubsubPublishMessage(channel,payload,0);
        decrRefCount(channel);
        decrRefCount(payload);
    }

    /* Call the notification script if applicable. */
    if (level == LL_WARNING && ri != NULL) {
        sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
                                        ri : ri->master;
        if (master && master->notification_script) {
            sentinelScheduleScriptExecution(master->notification_script,
                type,msg,NULL);
        }
    }
}

At this point the sris are fully created, but how do the instances communicate with each other? Look at the following code:

serverCron

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ...
    if (server.sentinel_mode) sentinelTimer();
    ...
}

serverCron -> sentinelTimer

At the end of server.c/main, the event loop is set up; aeCreateTimeEvent is called to register the timer event.
Let's see what sentinelTimer actually does:

void sentinelTimer(void) {
    sentinelCheckTiltCondition();
    sentinelHandleDictOfRedisInstances(sentinel.masters);
    sentinelRunPendingScripts();
    sentinelCollectTerminatedScripts();
    sentinelKillTimedoutScripts();

    /* We continuously change the frequency of the Redis "timer interrupt"
     * in order to desynchronize every Sentinel from every other.
     * This non-determinism avoids that Sentinels started at the same time
     * exactly continue to stay synchronized asking to be voted at the
     * same time again and again (resulting in nobody likely winning the
     * election because of split brain voting). */
    server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ;
}

sentinelTimer has three main parts:

  1. Check whether we are in TILT (quiet) mode.
  2. Process the sri instances.
  3. Process scripts.

sentinelTimer -> sentinelCheckTiltCondition

Taking these one at a time:

void sentinelCheckTiltCondition(void) {
    mstime_t now = mstime();
    mstime_t delta = now - sentinel.previous_time;

    if (delta < 0 || delta > sentinel_tilt_trigger) {
        sentinel.tilt = 1;
        sentinel.tilt_start_time = mstime();
        sentinelEvent(LL_WARNING,"+tilt",NULL,"#tilt mode entered");
    }
    sentinel.previous_time = mstime();
}

sentinelCheckTiltCondition checks whether this Sentinel should enter the TILT state: if the interval between two timer runs is negative or very large, TILT mode is entered. Once in TILT mode, the Sentinel waits SENTINEL_TILT_PERIOD before acting again.
This can happen because:

  1. The Sentinel blocked on some task, e.g. loading too much data, host I/O problems, or the process receiving a STOP signal.
  2. The system clock jumped significantly.

sentinelTimer -> sentinelHandleDictOfRedisInstances

/* Perform scheduled operations for all the instances in the dictionary.
 * Recursively call the function against dictionaries of slaves. */
void sentinelHandleDictOfRedisInstances(dict *instances) {
    dictIterator *di;
    dictEntry *de;
    sentinelRedisInstance *switch_to_promoted = NULL;

    /* There are a number of things we need to perform against every master. */
    di = dictGetIterator(instances);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);

        sentinelHandleRedisInstance(ri);
        if (ri->flags & SRI_MASTER) {
            sentinelHandleDictOfRedisInstances(ri->slaves);
            sentinelHandleDictOfRedisInstances(ri->sentinels);
            if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
                switch_to_promoted = ri;
            }
        }
    }
    if (switch_to_promoted)
        sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
    dictReleaseIterator(di);
}

sentinelHandleDictOfRedisInstances is called with sentinel.masters, which is initialized in initSentinel and populated in createSentinelRedisInstance.
It iterates all monitored masters and runs sentinelHandleRedisInstance on each.
If the instance being visited is a master, the same processing is applied recursively to its sentinels and slaves dictionaries.
So sentinelHandleDictOfRedisInstances recursively processes the sri instances, decides whether failover is needed, and completes the switch to the promoted master.

sentinelHandleDictOfRedisInstances -> sentinelHandleRedisInstance

sentinelHandleRedisInstance looks like this:

/* Perform scheduled operations for the specified Redis instance. */
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
    /* ========== MONITORING HALF ============ */
    /* Every kind of instance */
    sentinelReconnectInstance(ri);
    sentinelSendPeriodicCommands(ri);

    /* ============== ACTING HALF ============= */
    /* We don't proceed with the acting half if we are in TILT mode.
     * TILT happens when we find something odd with the time, like a
     * sudden change in the clock. */
    if (sentinel.tilt) {
        if (mstime()-sentinel.tilt_start_time < sentinel_tilt_period) return;
        sentinel.tilt = 0;
        sentinelEvent(LL_WARNING,"-tilt",NULL,"#tilt mode exited");
    }

    /* Every kind of instance */
    sentinelCheckSubjectivelyDown(ri);

    /* Masters and slaves */
    if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
        /* Nothing so far. */
    }

    /* Only masters */
    if (ri->flags & SRI_MASTER) {
        sentinelCheckObjectivelyDown(ri);
        if (sentinelStartFailoverIfNeeded(ri))
            sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
        sentinelFailoverStateMachine(ri);
        sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
    }
}

Its main steps:

  1. Reconnect to other instances.
  2. Send the periodic commands to other instances.
  3. Check whether we are in TILT mode; once the TILT period has passed, exit it and emit the -tilt event.
  4. For every kind of instance, check subjective down.
  5. For masters only, check objective down:
    • if failover is needed, ask the other Sentinels for the master's state;
    • run the failover state machine (including electing the leader Sentinel that performs the failover);
    • ask the other Sentinels for the master's state.

When a master's failover state reaches SENTINEL_FAILOVER_STATE_UPDATE_CONFIG, the caller runs sentinelFailoverSwitchToPromotedSlave.

That code path is not expanded here; it is covered later in the concrete scenarios.

sentinelHandleRedisInstance -> sentinelReconnectInstance
/* Create the async connections for the instance link if the link
 * is disconnected. Note that link->disconnected is true even if just
 * one of the two links (commands and pub/sub) is missing. */
void sentinelReconnectInstance(sentinelRedisInstance *ri) {

    if (ri->link->disconnected == 0) return;
    if (ri->addr->port == 0) return; /* port == 0 means invalid address. */
    instanceLink *link = ri->link;
    mstime_t now = mstime();

    if (now - ri->link->last_reconn_time < sentinel_ping_period) return;
    ri->link->last_reconn_time = now;

    /* Commands connection. */
    if (link->cc == NULL) {

        /* It might be that the instance is disconnected because it wasn't available earlier when the instance
         * allocated, say during failover, and therefore we failed to resolve its ip.
         * Another scenario is that the instance restarted with new ip, and we should resolve its new ip based on
         * its hostname */
        if (sentinel.resolve_hostnames) {
            sentinelAddr *tryResolveAddr = createSentinelAddr(ri->addr->hostname, ri->addr->port, 0);
            if (tryResolveAddr != NULL) {
                releaseSentinelAddr(ri->addr);
                ri->addr = tryResolveAddr;
            }
        }

        link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,server.bind_source_addr);

        if (link->cc && !link->cc->err) anetCloexec(link->cc->c.fd);
        if (!link->cc) {
            sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #Failed to establish connection");
        } else if (!link->cc->err && server.tls_replication &&
                   (instanceLinkNegotiateTLS(link->cc) == C_ERR)) {
            sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #Failed to initialize TLS");
            instanceLinkCloseConnection(link,link->cc);
        } else if (link->cc->err) {
            sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
                link->cc->errstr);
            instanceLinkCloseConnection(link,link->cc);
        } else {
            link->pending_commands = 0;
            link->cc_conn_time = mstime();
            link->cc->data = link;
            redisAeAttach(server.el,link->cc);
            redisAsyncSetConnectCallback(link->cc,
                sentinelLinkEstablishedCallback);
            redisAsyncSetDisconnectCallback(link->cc,
                sentinelDisconnectCallback);
            sentinelSendAuthIfNeeded(ri,link->cc);
            sentinelSetClientName(ri,link->cc,"cmd");

            /* Send a PING ASAP when reconnecting. */
            sentinelSendPing(ri);
        }
    }
    /* Pub / Sub */
    if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
        link->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,server.bind_source_addr);
        if (link->pc && !link->pc->err) anetCloexec(link->pc->c.fd);
        if (!link->pc) {
            sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #Failed to establish connection");
        } else if (!link->pc->err && server.tls_replication &&
                   (instanceLinkNegotiateTLS(link->pc) == C_ERR)) {
            sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #Failed to initialize TLS");
        } else if (link->pc->err) {
            sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
                link->pc->errstr);
            instanceLinkCloseConnection(link,link->pc);
        } else {
            int retval;
            link->pc_conn_time = mstime();
            link->pc->data = link;
            redisAeAttach(server.el,link->pc);
            redisAsyncSetConnectCallback(link->pc,
                sentinelLinkEstablishedCallback);
            redisAsyncSetDisconnectCallback(link->pc,
                sentinelDisconnectCallback);
            sentinelSendAuthIfNeeded(ri,link->pc);
            sentinelSetClientName(ri,link->pc,"pubsub");
            /* Now we subscribe to the Sentinels "Hello" channel. */
            retval = redisAsyncCommand(link->pc,
                sentinelReceiveHelloMessages, ri, "%s %s",
                sentinelInstanceMapCommand(ri,"SUBSCRIBE"),
                SENTINEL_HELLO_CHANNEL);
            if (retval != C_OK) {
                /* If we can't subscribe, the Pub/Sub connection is useless
                 * and we can simply disconnect it and try again. */
                instanceLinkCloseConnection(link,link->pc);
                return;
            }
        }
    }
    /* Clear the disconnected status only if we have both the connections
     * (or just the commands connection if this is a sentinel instance). */
    if (link->cc && (ri->flags & SRI_SENTINEL || link->pc))
        link->disconnected = 0;
}
sentinelHandleRedisInstance -> sentinelSendPeriodicCommands
/* Send periodic PING, INFO, and PUBLISH to the Hello channel to
* the specified master or slave instance. */
void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
mstime_t now = mstime();
mstime_t info_period, ping_period;
int retval;

/* Return ASAP if we have already a PING or INFO already pending, or
* in the case the instance is not properly connected. */
if (ri->link->disconnected) return;

/* For INFO, PING, PUBLISH that are not critical commands to send we
* also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
* want to use a lot of memory just because a link is not working
* properly (note that anyway there is a redundant protection about this,
* that is, the link will be disconnected and reconnected if a long
* timeout condition is detected. */
if (ri->link->pending_commands >=
SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return;

/* If this is a slave of a master in O_DOWN condition we start sending
* it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
* period. In this state we want to closely monitor slaves in case they
* are turned into masters by another Sentinel, or by the sysadmin.
*
* Similarly we monitor the INFO output more often if the slave reports
* to be disconnected from the master, so that we can have a fresh
* disconnection time figure. */
if ((ri->flags & SRI_SLAVE) &&
((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) ||
(ri->master_link_down_time != 0)))
{
info_period = 1000;
} else {
info_period = sentinel_info_period;
}

/* We ping instances every time the last received pong is older than
* the configured 'down-after-milliseconds' time, but every second
* anyway if 'down-after-milliseconds' is greater than 1 second. */
ping_period = ri->down_after_period;
if (ping_period > sentinel_ping_period) ping_period = sentinel_ping_period;

/* Send INFO to masters and slaves, not sentinels. */
if ((ri->flags & SRI_SENTINEL) == 0 &&
(ri->info_refresh == 0 ||
(now - ri->info_refresh) > info_period))
{
retval = redisAsyncCommand(ri->link->cc,
sentinelInfoReplyCallback, ri, "%s",
sentinelInstanceMapCommand(ri,"INFO"));
if (retval == C_OK) ri->link->pending_commands++;
}

/* Send PING to all the three kinds of instances. */
if ((now - ri->link->last_pong_time) > ping_period &&
(now - ri->link->last_ping_time) > ping_period/2) {
sentinelSendPing(ri);
}

/* PUBLISH hello messages to all the three kinds of instances. */
if ((now - ri->last_pub_time) > sentinel_publish_period) {
sentinelSendHello(ri);
}
}
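The period selection above can be condensed into a small sketch. This is a simplified model, not the Redis implementation: the constants mirror the millisecond defaults (`sentinel_ping_period` = 1000, `sentinel_info_period` = 10000), and the INFO case collapses the slave-of-O_DOWN-master and link-down conditions into a single flag.

```c
#include <assert.h>

/* Millisecond defaults, matching the values used in sentinel.c. */
#define SENTINEL_PING_PERIOD 1000
#define SENTINEL_INFO_PERIOD 10000

/* PING whenever the last pong is older than down-after-milliseconds,
 * but never less often than once per second. */
long long ping_period_for(long long down_after_period) {
    return down_after_period > SENTINEL_PING_PERIOD ?
           SENTINEL_PING_PERIOD : down_after_period;
}

/* INFO every second while the slave's master is in O_DOWN/failover
 * (or the slave reports a broken master link), otherwise every
 * SENTINEL_INFO_PERIOD. */
long long info_period_for(int needs_close_monitoring) {
    return needs_close_monitoring ? 1000 : SENTINEL_INFO_PERIOD;
}
```

So with `down-after-milliseconds 30000` the instance is still pinged once per second; only values below one second shorten the ping interval.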

sentinelTimer -> sentinelRunPendingScripts

void sentinelRunPendingScripts(void) {
listNode *ln;
listIter li;
mstime_t now = mstime();

/* Find jobs that are not running and run them, from the top to the
* tail of the queue, so we run older jobs first. */
listRewind(sentinel.scripts_queue,&li);
while (sentinel.running_scripts < SENTINEL_SCRIPT_MAX_RUNNING &&
(ln = listNext(&li)) != NULL)
{
sentinelScriptJob *sj = ln->value;
pid_t pid;

/* Skip if already running. */
if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;

/* Skip if it's a retry, but not enough time has elapsed. */
if (sj->start_time && sj->start_time > now) continue;

sj->flags |= SENTINEL_SCRIPT_RUNNING;
sj->start_time = mstime();
sj->retry_num++;
pid = fork();

if (pid == -1) {
/* Parent (fork error).
* We report fork errors as signal 99, in order to unify the
* reporting with other kind of errors. */
sentinelEvent(LL_WARNING,"-script-error",NULL,
"%s %d %d", sj->argv[0], 99, 0);
sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
sj->pid = 0;
} else if (pid == 0) {
/* Child */
connTypeCleanupAll();
execve(sj->argv[0],sj->argv,environ);
/* If we are here an error occurred. */
_exit(2); /* Don't retry execution. */
} else {
sentinel.running_scripts++;
sj->pid = pid;
sentinelEvent(LL_DEBUG,"+script-child",NULL,"%ld",(long)pid);
}
}
}
sentinelTimer -> sentinelCollectTerminatedScripts
/* Check for scripts that terminated, and remove them from the queue if the
* script terminated successfully. If instead the script was terminated by
* a signal, or returned exit code "1", it is scheduled to run again if
* the max number of retries did not already elapsed. */
void sentinelCollectTerminatedScripts(void) {
int statloc;
pid_t pid;

while ((pid = waitpid(-1, &statloc, WNOHANG)) > 0) {
int exitcode = WEXITSTATUS(statloc);
int bysignal = 0;
listNode *ln;
sentinelScriptJob *sj;

if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
sentinelEvent(LL_DEBUG,"-script-child",NULL,"%ld %d %d",
(long)pid, exitcode, bysignal);

ln = sentinelGetScriptListNodeByPid(pid);
if (ln == NULL) {
serverLog(LL_WARNING,"waitpid() returned a pid (%ld) we can't find in our scripts execution queue!", (long)pid);
continue;
}
sj = ln->value;

/* If the script was terminated by a signal or returns an
* exit code of "1" (that means: please retry), we reschedule it
* if the max number of retries is not already reached. */
if ((bysignal || exitcode == 1) &&
sj->retry_num != SENTINEL_SCRIPT_MAX_RETRY)
{
sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
sj->pid = 0;
sj->start_time = mstime() +
sentinelScriptRetryDelay(sj->retry_num);
} else {
/* Otherwise let's remove the script, but log the event if the
* execution did not terminated in the best of the ways. */
if (bysignal || exitcode != 0) {
sentinelEvent(LL_WARNING,"-script-error",NULL,
"%s %d %d", sj->argv[0], bysignal, exitcode);
}
listDelNode(sentinel.scripts_queue,ln);
sentinelReleaseScriptJob(sj);
}
sentinel.running_scripts--;
}
}
sentinelTimer -> sentinelKillTimedoutScripts
/* Kill scripts in timeout, they'll be collected by the
* sentinelCollectTerminatedScripts() function. */
void sentinelKillTimedoutScripts(void) {
listNode *ln;
listIter li;
mstime_t now = mstime();

listRewind(sentinel.scripts_queue,&li);
while ((ln = listNext(&li)) != NULL) {
sentinelScriptJob *sj = ln->value;

if (sj->flags & SENTINEL_SCRIPT_RUNNING &&
(now - sj->start_time) > sentinel_script_max_runtime)
{
sentinelEvent(LL_WARNING,"-script-timeout",NULL,"%s %ld",
sj->argv[0], (long)sj->pid);
kill(sj->pid,SIGKILL);
}
}
}

The sentinel creation process

Starting and initializing a sentinel

Initializing the server

Initializing the sentinel state

Initializing the sentinel state and its masters attribute

Creating the network connections to the master servers

How do sentinels and servers communicate?

Which commands are used during communication?

How does sentinel cope with split-brain?

The sentinelSendPeriodicCommands function defines the commands used during communication.

What is TILT mode?

Failure recovery

/* Only masters */
if (ri->flags & SRI_MASTER) {
sentinelCheckObjectivelyDown(ri);
if (sentinelStartFailoverIfNeeded(ri))
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
sentinelFailoverStateMachine(ri);
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
}

How subjective down (SDOWN) is detected

/* Is this instance down from our point of view? */
void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
mstime_t elapsed = 0;

if (ri->link->act_ping_time)
elapsed = mstime() - ri->link->act_ping_time;
else if (ri->link->disconnected)
elapsed = mstime() - ri->link->last_avail_time;

/* Check if we are in need for a reconnection of one of the
* links, because we are detecting low activity.
*
* 1) Check if the command link seems connected, was connected not less
* than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have a
* pending ping for more than half the timeout. */
if (ri->link->cc &&
(mstime() - ri->link->cc_conn_time) >
sentinel_min_link_reconnect_period &&
ri->link->act_ping_time != 0 && /* There is a pending ping... */
/* The pending ping is delayed, and we did not receive
* error replies as well. */
(mstime() - ri->link->act_ping_time) > (ri->down_after_period/2) &&
(mstime() - ri->link->last_pong_time) > (ri->down_after_period/2))
{
instanceLinkCloseConnection(ri->link,ri->link->cc);
}

/* 2) Check if the pubsub link seems connected, was connected not less
* than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no
* activity in the Pub/Sub channel for more than
* SENTINEL_PUBLISH_PERIOD * 3.
*/
if (ri->link->pc &&
(mstime() - ri->link->pc_conn_time) >
sentinel_min_link_reconnect_period &&
(mstime() - ri->link->pc_last_activity) > (sentinel_publish_period*3))
{
instanceLinkCloseConnection(ri->link,ri->link->pc);
}

/* Update the SDOWN flag. We believe the instance is SDOWN if:
*
* 1) It is not replying.
* 2) We believe it is a master, it reports to be a slave for enough time
* to meet the down_after_period, plus enough time to get two times
* INFO report from the instance. */
if (elapsed > ri->down_after_period ||
(ri->flags & SRI_MASTER &&
ri->role_reported == SRI_SLAVE &&
mstime() - ri->role_reported_time >
(ri->down_after_period+sentinel_info_period*2)) ||
(ri->flags & SRI_MASTER_REBOOT &&
mstime()-ri->master_reboot_since_time > ri->master_reboot_down_after_period))
{
/* Is subjectively down */
if ((ri->flags & SRI_S_DOWN) == 0) {
sentinelEvent(LL_WARNING,"+sdown",ri,"%@");
ri->s_down_since_time = mstime();
ri->flags |= SRI_S_DOWN;
}
} else {
/* Is subjectively up */
if (ri->flags & SRI_S_DOWN) {
sentinelEvent(LL_WARNING,"-sdown",ri,"%@");
ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
}
}
}

How objective down (ODOWN) is detected

void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
dictIterator *di;
dictEntry *de;
unsigned int quorum = 0, odown = 0;

if (master->flags & SRI_S_DOWN) {
/* Is down for enough sentinels? */
quorum = 1; /* the current sentinel. */
/* Count all the other sentinels. */
di = dictGetIterator(master->sentinels);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);

if (ri->flags & SRI_MASTER_DOWN) quorum++;
}
dictReleaseIterator(di);
if (quorum >= master->quorum) odown = 1;
}

/* Set the flag accordingly to the outcome. */
if (odown) {
if ((master->flags & SRI_O_DOWN) == 0) {
sentinelEvent(LL_WARNING,"+odown",master,"%@ #quorum %d/%d",
quorum, master->quorum);
master->flags |= SRI_O_DOWN;
master->o_down_since_time = mstime();
}
} else {
if (master->flags & SRI_O_DOWN) {
sentinelEvent(LL_WARNING,"-odown",master,"%@");
master->flags &= ~SRI_O_DOWN;
}
}
}
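The quorum check in sentinelCheckObjectivelyDown() reduces to a simple count: this sentinel's own SDOWN vote plus every peer that reported SRI_MASTER_DOWN. A minimal sketch, using a hypothetical flags array in place of the `master->sentinels` dict:

```c
#include <assert.h>

#define SRI_MASTER_DOWN (1<<0) /* illustrative flag value */

/* Return 1 if enough sentinels (including this one) consider the
 * master down to reach the configured quorum. */
int is_objectively_down(const int *peer_flags, int npeers, int quorum_cfg) {
    int quorum = 1; /* the current sentinel, which already sees SDOWN */
    for (int i = 0; i < npeers; i++)
        if (peer_flags[i] & SRI_MASTER_DOWN) quorum++;
    return quorum >= quorum_cfg;
}
```

Note that the count only runs at all when the local sentinel already has SRI_S_DOWN set: ODOWN is always built on top of a local SDOWN judgment.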
sentinelFailoverStateMachine

Depending on ri's current failover state, a different handler is invoked.

void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
serverAssert(ri->flags & SRI_MASTER);

if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;

switch(ri->failover_state) {
case SENTINEL_FAILOVER_STATE_WAIT_START:
sentinelFailoverWaitStart(ri);
break;
case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
sentinelFailoverSelectSlave(ri);
break;
case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
sentinelFailoverSendSlaveOfNoOne(ri);
break;
case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
sentinelFailoverWaitPromotion(ri);
break;
case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
sentinelFailoverReconfNextSlave(ri);
break;
}
}
sentinelAskMasterStateToOtherSentinels

Ask the other sentinels for their view of the master's state.

/* If we think the master is down, we start sending
* SENTINEL IS-MASTER-DOWN-BY-ADDR requests to other sentinels
* in order to get the replies that allow to reach the quorum
* needed to mark the master in ODOWN state and trigger a failover. */
#define SENTINEL_ASK_FORCED (1<<0)
void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
dictIterator *di;
dictEntry *de;

di = dictGetIterator(master->sentinels);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
char port[32];
int retval;

/* If the master state from other sentinel is too old, we clear it. */
if (elapsed > sentinel_ask_period*5) {
ri->flags &= ~SRI_MASTER_DOWN;
sdsfree(ri->leader);
ri->leader = NULL;
}

/* Only ask if master is down to other sentinels if:
*
* 1) We believe it is down, or there is a failover in progress.
* 2) Sentinel is connected.
* 3) We did not receive the info within SENTINEL_ASK_PERIOD ms. */
if ((master->flags & SRI_S_DOWN) == 0) continue;
if (ri->link->disconnected) continue;
if (!(flags & SENTINEL_ASK_FORCED) &&
mstime() - ri->last_master_down_reply_time < sentinel_ask_period)
continue;

/* Ask */
ll2string(port,sizeof(port),master->addr->port);
retval = redisAsyncCommand(ri->link->cc,
sentinelReceiveIsMasterDownReply, ri,
"%s is-master-down-by-addr %s %s %llu %s",
sentinelInstanceMapCommand(ri,"SENTINEL"),
announceSentinelAddr(master->addr), port,
sentinel.current_epoch,
(master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
sentinel.myid : "*");
if (retval == C_OK) ri->link->pending_commands++;
}
dictReleaseIterator(di);
}
sentinelHandleDictOfRedisInstances -> sentinelFailoverSwitchToPromotedSlave
/* This function is called when the slave is in
* SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state. In this state we need
* to remove it from the master table and add the promoted slave instead. */
void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
sentinelRedisInstance *ref = master->promoted_slave ?
master->promoted_slave : master;

sentinelEvent(LL_WARNING,"+switch-master",master,"%s %s %d %s %d",
master->name, announceSentinelAddr(master->addr), master->addr->port,
announceSentinelAddr(ref->addr), ref->addr->port);

sentinelResetMasterAndChangeAddress(master,ref->addr->hostname,ref->addr->port);
}

The sentinelFailoverSwitchToPromotedSlave function first emits a +switch-master event to notify all sentinels, then calls sentinelResetMasterAndChangeAddress to replace the old master's address with that of the promoted slave.
