Redis 6 Internals (Part 2): Master-Slave Synchronization

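This is the second article in the Redis 6 internals series. It looks at how Redis performs master-slave synchronization. The code excerpts are trimmed for brevity.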

SLAVE

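To enable replication, it is usually enough to run SLAVEOF HOST PORT on the slave server, which ends up in replicaofCommand. Because synchronization is initiated by the slave, we start the analysis from the Slave side.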

[Figure: Redis6-Replication]

repl_state

Redis drives master-slave synchronization with a state machine, so it is worth looking at the available states before going any further.

typedef enum {
    REPL_STATE_NONE = 0,            /* No active replication */
    REPL_STATE_CONNECT,             /* Must connect to master */
    REPL_STATE_CONNECTING,          /* Connecting to master */
    /* --- Handshake states, must be ordered --- */
    REPL_STATE_RECEIVE_PING_REPLY,  /* Wait for PING reply */
    REPL_STATE_SEND_HANDSHAKE,      /* Send handshake sequence to master */
    REPL_STATE_RECEIVE_AUTH_REPLY,  /* Wait for AUTH reply */
    REPL_STATE_RECEIVE_PORT_REPLY,  /* Wait for REPLCONF reply */
    REPL_STATE_RECEIVE_IP_REPLY,    /* Wait for REPLCONF reply */
    REPL_STATE_RECEIVE_CAPA_REPLY,  /* Wait for REPLCONF reply */
    REPL_STATE_SEND_PSYNC,          /* Send PSYNC */
    REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */
    /* --- End of handshake states --- */
    REPL_STATE_TRANSFER,            /* Receiving .rdb from master */
    REPL_STATE_CONNECTED,           /* Connected to master */
} repl_state;
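
  • REPL_STATE_NONE: replication is not active.
  • REPL_STATE_CONNECT: a connection to the Master must be established.
  • REPL_STATE_CONNECTING: the connection to the Master is in progress.
  • REPL_STATE_RECEIVE_PING_REPLY: waiting for the reply to PING.
  • REPL_STATE_SEND_HANDSHAKE: send the handshake sequence (AUTH and the REPLCONF commands).
  • REPL_STATE_RECEIVE_AUTH_REPLY: waiting for the reply to AUTH.
  • REPL_STATE_RECEIVE_PORT_REPLY: waiting for the reply to REPLCONF listening-port.
  • REPL_STATE_RECEIVE_IP_REPLY: waiting for the reply to REPLCONF ip-address.
  • REPL_STATE_RECEIVE_CAPA_REPLY: waiting for the reply to REPLCONF capa (the "capabilities", i.e. the supported features).
  • REPL_STATE_SEND_PSYNC: send PSYNC.
  • REPL_STATE_RECEIVE_PSYNC_REPLY: waiting for the reply to PSYNC.
  • REPL_STATE_TRANSFER: the snapshot (.rdb) is being transferred.
  • REPL_STATE_CONNECTED: synchronization is complete and the Slave is connected to the Master.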
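
The "must be ordered" comment in the enum matters because an ordered, contiguous block of handshake states can be tested with a simple range comparison. The replicationCron excerpt later in this article calls slaveIsInHandshakeState() for this purpose. The sketch below shows one way such a check can be written; in_handshake is a hypothetical name chosen for illustration, not the Redis implementation.

```c
#include <assert.h>

/* Copy of the enum from the excerpt; the handshake states are contiguous. */
typedef enum {
    REPL_STATE_NONE = 0,
    REPL_STATE_CONNECT,
    REPL_STATE_CONNECTING,
    REPL_STATE_RECEIVE_PING_REPLY,   /* first handshake state */
    REPL_STATE_SEND_HANDSHAKE,
    REPL_STATE_RECEIVE_AUTH_REPLY,
    REPL_STATE_RECEIVE_PORT_REPLY,
    REPL_STATE_RECEIVE_IP_REPLY,
    REPL_STATE_RECEIVE_CAPA_REPLY,
    REPL_STATE_SEND_PSYNC,
    REPL_STATE_RECEIVE_PSYNC_REPLY,  /* last handshake state */
    REPL_STATE_TRANSFER,
    REPL_STATE_CONNECTED,
} repl_state;

/* Hypothetical helper: returns 1 if s lies inside the handshake range.
 * This only works because the enum keeps those states in order. */
int in_handshake(repl_state s) {
    return s >= REPL_STATE_RECEIVE_PING_REPLY &&
           s <= REPL_STATE_RECEIVE_PSYNC_REPLY;
}
```

Calling in_handshake(REPL_STATE_SEND_PSYNC) yields 1, while REPL_STATE_CONNECTING and REPL_STATE_TRANSFER fall outside the range and yield 0.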

replicaofCommand

Obtain the Master's IP and port.

void replicaofCommand(client *c) {
    ....
    long port;
    replicationSetMaster(c->argv[1]->ptr, port);
    addReply(c,shared.ok);
}

replicationSetMaster

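Disconnect all Slaves, then cancel the existing replication handshake (if there is one). If this instance was a master before, create a cached master from its own state so that PSYNC can be reused later (keeping the current progress instead of doing a full resync).

  • Set the state to REPL_STATE_CONNECT, meaning a connection to the Master must be established.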
void replicationSetMaster(char *ip, int port) {
    server.masterhost = sdsnew(ip);
    server.masterport = port;

    /* Force our slaves to resync with us as well. They may hopefully be able
     * to partially resync with us, but we can notify the replid change. */
    disconnectSlaves();
    cancelReplicationHandshake(0);
    /* Before destroying our master state, create a cached master using
     * our own parameters, to later PSYNC with the new master. */
    if (was_master) {
        replicationDiscardCachedMaster();
        replicationCacheMasterUsingMyself();
    }

    server.repl_state = REPL_STATE_CONNECT;
    connectWithMaster();
}

connectWithMaster

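Redis 6 supports TLS. To keep the analysis simple, we assume a non-TLS connection here.

  • server.repl_transfer_lastio: the time of the last I/O, used for timeout handling.
  • Set the state to REPL_STATE_CONNECTING, meaning the non-blocking connect to the Master is in progress.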
int connectWithMaster(void) {
    server.repl_transfer_s = server.tls_replication ? connCreateTLS() : connCreateSocket();
    if (connConnect(server.repl_transfer_s, server.masterhost, server.masterport,
                NET_FIRST_BIND_ADDR, syncWithMaster) == C_ERR) {
        serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
                connGetLastError(server.repl_transfer_s));
        connClose(server.repl_transfer_s);
        server.repl_transfer_s = NULL;
        return C_ERR;
    }

    server.repl_transfer_lastio = server.unixtime;
    server.repl_state = REPL_STATE_CONNECTING;
    serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
    return C_OK;
}
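
The repl_transfer_lastio timestamp recorded here is what replicationCron later compares against server.repl_timeout. A minimal sketch of that comparison is shown below; repl_io_timed_out is a hypothetical helper name, and the arguments stand in for the server fields.

```c
#include <assert.h>
#include <time.h>

/* Hypothetical helper mirroring the comparison in replicationCron():
 * returns 1 when more than `timeout` seconds passed since the last I/O. */
int repl_io_timed_out(time_t now, time_t lastio, time_t timeout) {
    return (now - lastio) > timeout;
}
```

Note that the comparison is strict: with a 60 second timeout, exactly 60 seconds of silence is still tolerated.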

syncWithMaster

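Once the Slave → Master connection is established, the syncWithMaster callback fires. The function is more than 300 lines long, so it is explained in several parts.

  1. If the state machine is in REPL_STATE_NONE, return immediately.
  2. Check that the connection is healthy.

The first case mainly occurs when the client changes its mind (for example with SLAVEOF NO ONE) after the Slave has already started connecting to the Master.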

void syncWithMaster(connection *conn) {
char tmpfile[256], *err = NULL;
int dfd = -1, maxtries = 5;
int psync_result;

/* If this event fired after the user turned the instance into a master
* with SLAVEOF NO ONE we must just return ASAP. */
if (server.repl_state == REPL_STATE_NONE) {
connClose(conn);
return;
}

/* Check for errors in the socket: after a non blocking connect() we
* may find that the socket is in error state. */
if (connGetState(conn) != CONN_STATE_CONNECTED) {
serverLog(LL_WARNING,"Error condition on socket for SYNC: %s",
connGetLastError(conn));
goto error;
}
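
  1. In state REPL_STATE_CONNECTING, install the current function as the Read Handler.

  2. Send the PING command to the Master.

  3. Set the state to REPL_STATE_RECEIVE_PING_REPLY, meaning the Slave waits for the Master to reply with PONG.

The Read Handler is needed because the Connect Handler runs only once, while the rest of the state machine is processed in this same function, so the function has to be entered again.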

/* Send a PING to check the master is able to reply without errors. */
if (server.repl_state == REPL_STATE_CONNECTING) {
serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event.");
/* Delete the writable event so that the readable event remains
* registered and we can wait for the PONG reply. */
connSetReadHandler(conn, syncWithMaster);
connSetWriteHandler(conn, NULL);
server.repl_state = REPL_STATE_RECEIVE_PING_REPLY;
/* Send the PING, don't check for errors at all, we have the timeout
* that will take care about this. */
err = sendCommand(conn,"PING",NULL);
if (err) goto write_error;
return;
}
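
To make the wire format concrete, the sketch below encodes a command as a RESP array of bulk strings, which is the form commands such as PING take on the replication link. It assumes the multi-bulk encoding; resp_encode is a hypothetical helper written for this article, not the sendCommand implementation.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: encode argv as a RESP array of bulk strings.
 * Returns the number of bytes written (excluding the trailing NUL),
 * or -1 if the output buffer is too small. */
int resp_encode(char *out, size_t cap, int argc, const char **argv) {
    size_t used;
    int n = snprintf(out, cap, "*%d\r\n", argc);      /* array header */
    if (n < 0 || (size_t)n >= cap) return -1;
    used = (size_t)n;
    for (int i = 0; i < argc; i++) {
        size_t len = strlen(argv[i]);
        /* each argument: $<length>\r\n<bytes>\r\n */
        n = snprintf(out + used, cap - used, "$%zu\r\n%s\r\n", len, argv[i]);
        if (n < 0 || (size_t)n >= cap - used) return -1;
        used += (size_t)n;
    }
    return (int)used;
}
```

Under this encoding, PING becomes the 14 bytes `*1\r\n$4\r\nPING\r\n`.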
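
  1. Synchronously read the Master's reply to PING. Any reply starting with + is accepted, and so are authentication errors (-NOAUTH, -NOPERM, or the "operation not permitted" error sent by older versions). Any other reply aborts the handshake.

  2. Set the state to REPL_STATE_SEND_HANDSHAKE, meaning the handshake must now be performed.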

/* Receive the PONG command. */
if (server.repl_state == REPL_STATE_RECEIVE_PING_REPLY) {
err = receiveSynchronousResponse(conn);

/* We accept only two replies as valid, a positive +PONG reply
* (we just check for "+") or an authentication error.
* Note that older versions of Redis replied with "operation not
* permitted" instead of using a proper error code, so we test
* both. */
if (err[0] != '+' &&
strncmp(err,"-NOAUTH",7) != 0 &&
strncmp(err,"-NOPERM",7) != 0 &&
strncmp(err,"-ERR operation not permitted",28) != 0)
{
serverLog(LL_WARNING,"Error reply to PING from master: '%s'",err);
sdsfree(err);
goto error;
} else {
serverLog(LL_NOTICE,
"Master replied to PING, replication can continue...");
}
sdsfree(err);
err = NULL;
server.repl_state = REPL_STATE_SEND_HANDSHAKE;
}
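
The acceptance rule for the PING reply can be isolated into a small predicate. The sketch below restates the condition from the excerpt in positive form; ping_reply_acceptable is a hypothetical name used only for illustration.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical helper: returns 1 if the reply to PING lets the handshake
 * continue, i.e. a positive reply or one of the authentication errors. */
int ping_reply_acceptable(const char *reply) {
    return reply[0] == '+' ||
           strncmp(reply, "-NOAUTH", 7) == 0 ||
           strncmp(reply, "-NOPERM", 7) == 0 ||
           strncmp(reply, "-ERR operation not permitted", 28) == 0;
}
```

Authentication errors are tolerated at this point because AUTH has not been sent yet; it follows in the handshake step.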
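
  1. The handshake mainly sends the password for authentication and passes the Slave's IP and port to the Master so that they can be looked up there (for example via INFO). It also tells the Master what the Slave is capable of: eof means the Slave supports EOF-style RDB transfer for diskless replication, and psync2 means it supports PSYNC v2, i.e. it understands +CONTINUE <new repl ID>.

  2. Set the state to REPL_STATE_RECEIVE_AUTH_REPLY, meaning the Slave waits for the authentication reply.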

if (server.repl_state == REPL_STATE_SEND_HANDSHAKE) {
/* AUTH with the master if required. */
if (server.masterauth) {
char *args[3] = {"AUTH",NULL,NULL};
size_t lens[3] = {4,0,0};
int argc = 1;
if (server.masteruser) {
args[argc] = server.masteruser;
lens[argc] = strlen(server.masteruser);
argc++;
}
args[argc] = server.masterauth;
lens[argc] = sdslen(server.masterauth);
argc++;
err = sendCommandArgv(conn, argc, args, lens);
if (err) goto write_error;
}

/* Set the slave port, so that Master's INFO command can list the
* slave listening port correctly. */
{
int port;
if (server.slave_announce_port)
port = server.slave_announce_port;
else if (server.tls_replication && server.tls_port)
port = server.tls_port;
else
port = server.port;
sds portstr = sdsfromlonglong(port);
err = sendCommand(conn,"REPLCONF",
"listening-port",portstr, NULL);
sdsfree(portstr);
if (err) goto write_error;
}

/* Set the slave ip, so that Master's INFO command can list the
* slave IP address port correctly in case of port forwarding or NAT.
* Skip REPLCONF ip-address if there is no slave-announce-ip option set. */
if (server.slave_announce_ip) {
err = sendCommand(conn,"REPLCONF",
"ip-address",server.slave_announce_ip, NULL);
if (err) goto write_error;
}

/* Inform the master of our (slave) capabilities.
*
* EOF: supports EOF-style RDB transfer for diskless replication.
* PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
*
* The master will ignore capabilities it does not understand. */
err = sendCommand(conn,"REPLCONF",
"capa","eof","capa","psync2",NULL);
if (err) goto write_error;

server.repl_state = REPL_STATE_RECEIVE_AUTH_REPLY;
return;
}
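
  1. Check the authentication result (this step is skipped when masterauth is not configured).

  2. Set the state to REPL_STATE_RECEIVE_PORT_REPLY, meaning the Slave waits for the Master to acknowledge the listening-port setting.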

if (server.repl_state == REPL_STATE_RECEIVE_AUTH_REPLY && !server.masterauth)
server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY;

/* Receive AUTH reply. */
if (server.repl_state == REPL_STATE_RECEIVE_AUTH_REPLY) {
err = receiveSynchronousResponse(conn);
if (err[0] == '-') {
serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err);
sdsfree(err);
goto error;
}
sdsfree(err);
err = NULL;
server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY;
return;
}
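
  1. Check the replies to the port and IP settings. The excerpt below shows the ip-address reply, which is skipped when slave-announce-ip is not set; the listening-port reply is handled in the same way and is omitted here.

  2. Set the state to REPL_STATE_RECEIVE_CAPA_REPLY, meaning the Slave waits for the Master's reply to the capability announcement.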

if (server.repl_state == REPL_STATE_RECEIVE_IP_REPLY && !server.slave_announce_ip)
server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY;

/* Receive REPLCONF ip-address reply. */
if (server.repl_state == REPL_STATE_RECEIVE_IP_REPLY) {
err = receiveSynchronousResponse(conn);
/* Ignore the error if any, not all the Redis versions support
* REPLCONF listening-port. */
if (err[0] == '-') {
serverLog(LL_NOTICE,"(Non critical) Master does not understand "
"REPLCONF ip-address: %s", err);
}
sdsfree(err);
server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY;
return;
}
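
  1. Check the reply to the capability announcement. An error here is not critical, because not every Redis version understands REPLCONF capa.

  2. Set the state to REPL_STATE_SEND_PSYNC, meaning synchronization can begin.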

/* Receive CAPA reply. */
if (server.repl_state == REPL_STATE_RECEIVE_CAPA_REPLY) {
err = receiveSynchronousResponse(conn);
/* Ignore the error if any, not all the Redis versions support
* REPLCONF capa. */
if (err[0] == '-') {
serverLog(LL_NOTICE,"(Non critical) Master does not understand "
"REPLCONF capa: %s", err);
}
sdsfree(err);
err = NULL;
server.repl_state = REPL_STATE_SEND_PSYNC;
}
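
  1. slaveTryPartialResynchronization(conn, 0) sends PSYNC to the Master. Without a cached master the command is PSYNC ? -1, where ? stands in for the Master's replication ID (run ID) and -1 for the offset. With a cached master, the cached replid and reploff+1 are sent instead.

  2. Set the state to REPL_STATE_RECEIVE_PSYNC_REPLY, meaning the Slave waits for the Master's reply to PSYNC.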

/* Try a partial resynchonization. If we don't have a cached master
* slaveTryPartialResynchronization() will at least try to use PSYNC
* to start a full resynchronization so that we get the master replid
* and the global offset, to try a partial resync at the next
* reconnection attempt. */
if (server.repl_state == REPL_STATE_SEND_PSYNC) {
if (slaveTryPartialResynchronization(conn,0) == PSYNC_WRITE_ERROR) {
err = sdsnew("Write error sending the PSYNC command.");
abortFailover("Write error to failover target");
goto write_error;
}
server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY;
return;
}
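
The two forms of the PSYNC request can be sketched as follows. format_psync is a hypothetical helper, a NULL replid models "no cached master", and the output uses a space-separated form for readability rather than the actual wire encoding.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: build the PSYNC request text.
 * replid == NULL means there is no cached master, so "? -1" is sent.
 * Otherwise the Slave asks for the byte right after its cached offset.
 * Returns the length written, or -1 if the buffer is too small. */
int format_psync(char *out, size_t cap, const char *replid, long long reploff) {
    int n;
    if (replid != NULL)
        n = snprintf(out, cap, "PSYNC %s %lld", replid, reploff + 1);
    else
        n = snprintf(out, cap, "PSYNC ? -1");
    return (n < 0 || (size_t)n >= cap) ? -1 : n;
}
```

A Slave whose cached master had processed offset 99 therefore requests offset 100, the first byte it has not seen yet.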
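
  1. slaveTryPartialResynchronization(conn,1) synchronously reads the Master's reply to PSYNC and determines whether a full resync or a partial (incremental) resync is required. If the Master does not support PSYNC, a full sync is performed.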
psync_result = slaveTryPartialResynchronization(conn,1);
if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */

/* Check the status of the planned failover. We expect PSYNC_CONTINUE,
* but there is nothing technically wrong with a full resync which
* could happen in edge cases. */
if (server.failover_state == FAILOVER_IN_PROGRESS) {
if (psync_result == PSYNC_CONTINUE || psync_result == PSYNC_FULLRESYNC) {
clearFailoverState();
} else {
abortFailover("Failover target rejected psync request");
return;
}
}
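
  1. If a partial resync is possible, the state has already been set to REPL_STATE_CONNECTED inside slaveTryPartialResynchronization (the connection is established), so the function simply returns.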
/* If the master is in an transient error, we should try to PSYNC
* from scratch later, so go to the error path. This happens when
* the server is loading the dataset or is not connected with its
* master and so forth. */
if (psync_result == PSYNC_TRY_LATER) goto error;

/* Note: if PSYNC does not return WAIT_REPLY, it will take care of
* uninstalling the read handler from the file descriptor. */

if (psync_result == PSYNC_CONTINUE) {
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization.");
if (server.supervised_mode == SUPERVISED_SYSTEMD) {
redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Partial Resynchronization accepted. Ready to accept connections in read-write mode.\n");
}
return;
}
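
  1. If a partial resync is not possible, disconnect all Slaves and free the replication backlog, since everything has to start from scratch. If the Master does not support PSYNC at all, fall back to SYNC.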
/* PSYNC failed or is not supported: we want our slaves to resync with us
* as well, if we have any sub-slaves. The master may transfer us an
* entirely different data set and we have no way to incrementally feed
* our slaves after that. */
disconnectSlaves(); /* Force our slaves to resync with us as well. */
freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */

/* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
* and the server.master_replid and master_initial_offset are
* already populated. */
if (psync_result == PSYNC_NOT_SUPPORTED) {
serverLog(LL_NOTICE,"Retrying with SYNC...");
if (connSyncWrite(conn,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
serverLog(LL_WARNING,"I/O error writing to MASTER: %s",
strerror(errno));
goto error;
}
}

  1. If the RDB is received via a file (that is, diskless load is not used), create a temporary file first.
/* Prepare a suitable temp file for bulk transfer */
if (!useDisklessLoad()) {
while(maxtries--) {
snprintf(tmpfile,256,
"temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
if (dfd != -1) break;
sleep(1);
}
if (dfd == -1) {
serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno));
goto error;
}
server.repl_transfer_tmpfile = zstrdup(tmpfile);
server.repl_transfer_fd = dfd;
}
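
The temporary file name combines a timestamp with the process ID, and O_EXCL makes open fail if a file with that name already exists. The sketch below only reproduces the name formatting from the excerpt; repl_tmpfile_name is a hypothetical helper and the arguments stand in for server.unixtime and getpid().

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: format the temp file name like the excerpt does,
 * "temp-<unixtime>.<pid>.rdb". Returns the length, or -1 on truncation. */
int repl_tmpfile_name(char *out, size_t cap, long unixtime, long pid) {
    int n = snprintf(out, cap, "temp-%d.%ld.rdb", (int)unixtime, pid);
    return (n < 0 || (size_t)n >= cap) ? -1 : n;
}
```

For a timestamp of 1600000000 and a PID of 4242 this produces temp-1600000000.4242.rdb.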

  1. Install readSyncBulkPayload as the Read Handler to receive the payload, and set the state to REPL_STATE_TRANSFER, meaning the transfer is in progress.
/* Setup the non blocking download of the bulk file. */
if (connSetReadHandler(conn, readSyncBulkPayload)
== C_ERR)
{
char conninfo[CONN_INFO_LEN];
serverLog(LL_WARNING,
"Can't create readable event for SYNC: %s (%s)",
strerror(errno), connGetInfo(conn, conninfo, sizeof(conninfo)));
goto error;
}

server.repl_state = REPL_STATE_TRANSFER;
server.repl_transfer_size = -1;
server.repl_transfer_read = 0;
server.repl_transfer_last_fsync_off = 0;
server.repl_transfer_lastio = server.unixtime;
return;

slaveTryPartialResynchronization

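slaveTryPartialResynchronization talks to the Master to find out whether a partial resync is possible.

The first half sends the PSYNC command. cached_master holds the information of the Master whose connection was lost unexpectedly earlier.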

#define PSYNC_WRITE_ERROR 0
#define PSYNC_WAIT_REPLY 1
#define PSYNC_CONTINUE 2
#define PSYNC_FULLRESYNC 3
#define PSYNC_NOT_SUPPORTED 4
#define PSYNC_TRY_LATER 5

int slaveTryPartialResynchronization(connection *conn, int read_reply) {
char *psync_replid;
char psync_offset[32];
sds reply;

/* Writing half */
if (!read_reply) {
/* Initially set master_initial_offset to -1 to mark the current
* master replid and offset as not valid. Later if we'll be able to do
* a FULL resync using the PSYNC command we'll set the offset at the
* right value, so that this information will be propagated to the
* client structure representing the master into server.master. */
server.master_initial_offset = -1;

if (server.cached_master) {
psync_replid = server.cached_master->replid;
snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
} else {
serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
psync_replid = "?";
memcpy(psync_offset,"-1",3);
}

/* Issue the PSYNC command, if this is a master with a failover in
* progress then send the failover argument to the replica to cause it
* to become a master */
if (server.failover_state == FAILOVER_IN_PROGRESS) {
reply = sendCommand(conn,"PSYNC",psync_replid,psync_offset,"FAILOVER",NULL);
} else {
reply = sendCommand(conn,"PSYNC",psync_replid,psync_offset,NULL);
}

if (reply != NULL) {
serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
sdsfree(reply);
connSetReadHandler(conn, NULL);
return PSYNC_WRITE_ERROR;
}
return PSYNC_WAIT_REPLY;
}

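The second half reads the Master's reply and determines whether it announces a full resync (+FULLRESYNC) or a partial resync (+CONTINUE).

The RUN_ID (replication ID) is a random 40-character value generated every time the instance starts. offset is the replication offset, which is used for incremental synchronization once the RDB has been loaded.

replid2 exists because when a slave server is promoted to master, the other slaves that connect to the new master would always fail if only replid were checked. This variable keeps the ID of the master that was synced with previously.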

    /* Reading half */
reply = receiveSynchronousResponse(conn);
if (sdslen(reply) == 0) {
/* The master may send empty newlines after it receives PSYNC
* and before to reply, just to keep the connection alive. */
sdsfree(reply);
return PSYNC_WAIT_REPLY;
}

connSetReadHandler(conn, NULL);

if (!strncmp(reply,"+FULLRESYNC",11)) {
char *replid = NULL, *offset = NULL;

/* FULL RESYNC, parse the reply in order to extract the replid
* and the replication offset. */
replid = strchr(reply,' ');
if (replid) {
replid++;
offset = strchr(replid,' ');
if (offset) offset++;
}
if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) {
serverLog(LL_WARNING,
"Master replied with wrong +FULLRESYNC syntax.");
/* This is an unexpected condition, actually the +FULLRESYNC
* reply means that the master supports PSYNC, but the reply
* format seems wrong. To stay safe we blank the master
* replid to make sure next PSYNCs will fail. */
memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1);
} else {
memcpy(server.master_replid, replid, offset-replid-1);
server.master_replid[CONFIG_RUN_ID_SIZE] = '\0';
server.master_initial_offset = strtoll(offset,NULL,10);
serverLog(LL_NOTICE,"Full resync from master: %s:%lld",
server.master_replid,
server.master_initial_offset);
}
/* We are going to full resync, discard the cached master structure. */
replicationDiscardCachedMaster();
sdsfree(reply);
return PSYNC_FULLRESYNC;
}

if (!strncmp(reply,"+CONTINUE",9)) {
/* Partial resync was accepted. */
serverLog(LL_NOTICE,
"Successful partial resynchronization with master.");

/* Check the new replication ID advertised by the master. If it
* changed, we need to set the new ID as primary ID, and set or
* secondary ID as the old master ID up to the current offset, so
* that our sub-slaves will be able to PSYNC with us after a
* disconnection. */
char *start = reply+10;
char *end = reply+9;
while(end[0] != '\r' && end[0] != '\n' && end[0] != '\0') end++;
if (end-start == CONFIG_RUN_ID_SIZE) {
char new[CONFIG_RUN_ID_SIZE+1];
memcpy(new,start,CONFIG_RUN_ID_SIZE);
new[CONFIG_RUN_ID_SIZE] = '\0';

if (strcmp(new,server.cached_master->replid)) {
/* Master ID changed. */
serverLog(LL_WARNING,"Master replication ID changed to %s",new);

/* Set the old ID as our ID2, up to the current offset+1. */
memcpy(server.replid2,server.cached_master->replid,
sizeof(server.replid2));
server.second_replid_offset = server.master_repl_offset+1;

/* Update the cached master ID and our own primary ID to the
* new one. */
memcpy(server.replid,new,sizeof(server.replid));
memcpy(server.cached_master->replid,new,sizeof(server.replid));

/* Disconnect all the sub-slaves: they need to be notified. */
disconnectSlaves();
}
}

/* Setup the replication to continue. */
sdsfree(reply);
replicationResurrectCachedMaster(conn);

/* If this instance was restarted and we read the metadata to
* PSYNC from the persistence file, our replication backlog could
* be still not initialized. Create it. */
if (server.repl_backlog == NULL) createReplicationBacklog();
return PSYNC_CONTINUE;
}

/* If we reach this point we received either an error (since the master does
* not understand PSYNC or because it is in a special state and cannot
* serve our request), or an unexpected reply from the master.
*
* Return PSYNC_NOT_SUPPORTED on errors we don't understand, otherwise
* return PSYNC_TRY_LATER if we believe this is a transient error. */

if (!strncmp(reply,"-NOMASTERLINK",13) ||
!strncmp(reply,"-LOADING",8))
{
serverLog(LL_NOTICE,
"Master is currently unable to PSYNC "
"but should be in the future: %s", reply);
sdsfree(reply);
return PSYNC_TRY_LATER;
}

if (strncmp(reply,"-ERR",4)) {
/* If it's not an error, log the unexpected event. */
serverLog(LL_WARNING,
"Unexpected reply to PSYNC from master: %s", reply);
} else {
serverLog(LL_NOTICE,
"Master does not support PSYNC or is in "
"error state (reply: %s)", reply);
}
sdsfree(reply);
replicationDiscardCachedMaster();
return PSYNC_NOT_SUPPORTED;
}
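
The reply handling above boils down to classifying the first token and, for +FULLRESYNC, extracting a 40-character replication ID and an offset. The sketch below restates this as two standalone functions. The names psync_kind, classify_psync_reply and parse_fullresync are hypothetical, and RUN_ID_SIZE is assumed to be 40, matching the 40-character ID described earlier.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define RUN_ID_SIZE 40 /* assumed to match the 40-character replication ID */

typedef enum { R_FULLRESYNC, R_CONTINUE, R_TRY_LATER, R_NOT_SUPPORTED } psync_kind;

/* Hypothetical helper: map the Master's reply to one of four outcomes. */
psync_kind classify_psync_reply(const char *reply) {
    if (!strncmp(reply, "+FULLRESYNC", 11)) return R_FULLRESYNC;
    if (!strncmp(reply, "+CONTINUE", 9)) return R_CONTINUE;
    if (!strncmp(reply, "-NOMASTERLINK", 13) || !strncmp(reply, "-LOADING", 8))
        return R_TRY_LATER;   /* transient error, retry PSYNC later */
    return R_NOT_SUPPORTED;   /* anything else: fall back to SYNC */
}

/* Hypothetical helper: parse "+FULLRESYNC <replid> <offset>".
 * Returns 0 on success, -1 if the reply is malformed. */
int parse_fullresync(const char *reply, char replid[RUN_ID_SIZE + 1],
                     long long *offset) {
    if (strncmp(reply, "+FULLRESYNC", 11) != 0) return -1;
    const char *id = strchr(reply, ' ');
    if (id == NULL) return -1;
    id++;                                  /* skip the space before the ID */
    const char *off = strchr(id, ' ');
    if (off == NULL || (off - id) != RUN_ID_SIZE) return -1;
    memcpy(replid, id, RUN_ID_SIZE);
    replid[RUN_ID_SIZE] = '\0';
    *offset = strtoll(off + 1, NULL, 10);
    return 0;
}
```

Unlike the Redis code, which blanks master_replid on a malformed +FULLRESYNC but still proceeds with the full resync, this sketch simply reports the parse failure to the caller.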

readSyncBulkPayload

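readSyncBulkPayload is responsible for reading the RDB payload sent by the Master (which may also arrive via diskless transfer).

  1. At the start of the transfer (detected by server.repl_transfer_size == -1), check the protocol first and determine whether this is a file transfer or a diskless transfer. For a file transfer the size is known up front ($<count>). Otherwise the $EOF: prefix indicates a diskless transfer, and eofmark is the delimiter that marks the end of the stream.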
#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
void readSyncBulkPayload(connection *conn) {
char buf[PROTO_IOBUF_LEN];
ssize_t nread, readlen, nwritten;
int use_diskless_load = useDisklessLoad();
dbBackup *diskless_load_backup = NULL;
int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC :
EMPTYDB_NO_FLAGS;
off_t left;

/* Static vars used to hold the EOF mark, and the last bytes received
* from the server: when they match, we reached the end of the transfer. */
static char eofmark[CONFIG_RUN_ID_SIZE];
static char lastbytes[CONFIG_RUN_ID_SIZE];
static int usemark = 0;

/* If repl_transfer_size == -1 we still have to read the bulk length
* from the master reply. */
if (server.repl_transfer_size == -1) {
if (connSyncReadLine(conn,buf,1024,server.repl_syncio_timeout*1000) == -1) {
serverLog(LL_WARNING,
"I/O error reading bulk count from MASTER: %s",
strerror(errno));
goto error;
}

if (buf[0] == '-') {
serverLog(LL_WARNING,
"MASTER aborted replication with an error: %s",
buf+1);
goto error;
} else if (buf[0] == '\0') {
/* At this stage just a newline works as a PING in order to take
* the connection live. So we refresh our last interaction
* timestamp. */
server.repl_transfer_lastio = server.unixtime;
return;
} else if (buf[0] != '$') {
serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
goto error;
}

/* There are two possible forms for the bulk payload. One is the
* usual $<count> bulk format. The other is used for diskless transfers
* when the master does not know beforehand the size of the file to
* transfer. In the latter case, the following format is used:
*
* $EOF:<40 bytes delimiter>
*
* At the end of the file the announced delimiter is transmitted. The
* delimiter is long and random enough that the probability of a
* collision with the actual file content can be ignored. */
if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= CONFIG_RUN_ID_SIZE) {
usemark = 1;
memcpy(eofmark,buf+5,CONFIG_RUN_ID_SIZE);
memset(lastbytes,0,CONFIG_RUN_ID_SIZE);
/* Set any repl_transfer_size to avoid entering this code path
* at the next call. */
server.repl_transfer_size = 0;
serverLog(LL_NOTICE,
"MASTER <-> REPLICA sync: receiving streamed RDB from master with EOF %s",
use_diskless_load? "to parser":"to disk");
} else {
usemark = 0;
server.repl_transfer_size = strtol(buf+1,NULL,10);
serverLog(LL_NOTICE,
"MASTER <-> REPLICA sync: receiving %lld bytes from master %s",
(long long) server.repl_transfer_size,
use_diskless_load? "to parser":"to disk");
}
return;
}
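
The two header forms can be told apart with a few string comparisons. The sketch below mirrors the check from the excerpt in a standalone function; parse_bulk_header is a hypothetical name, and RUN_ID_SIZE is assumed to be 40 as in the "$EOF:<40 bytes delimiter>" comment.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define RUN_ID_SIZE 40 /* assumed length of the EOF delimiter */

/* Hypothetical helper: parse the first line of the bulk payload.
 * Returns 1 for "$EOF:<mark>" (mark copied into eofmark),
 *         0 for "$<count>"    (count stored in *size),
 *        -1 if the line does not start with '$'. */
int parse_bulk_header(const char *line, char eofmark[RUN_ID_SIZE],
                      long long *size) {
    if (line[0] != '$') return -1;
    if (strncmp(line + 1, "EOF:", 4) == 0 && strlen(line + 5) >= RUN_ID_SIZE) {
        memcpy(eofmark, line + 5, RUN_ID_SIZE);
        return 1;
    }
    *size = strtoll(line + 1, NULL, 10);
    return 0;
}
```

With the $<count> form the receiver knows exactly how many bytes to expect; with the $EOF: form it has to watch the stream for the delimiter instead.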
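
  1. Without diskless load, the payload is first written to a file and then loaded from it, regardless of whether the Master uses file transfer or diskless transfer.

For a diskless transfer, completion is detected by comparing eofmark with lastbytes.

The Redis source splits the diskless-load and disk-based load code into separate pieces. For ease of analysis they are merged here.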

if (!use_diskless_load) {
/* Read the data from the socket, store it to a file and search
* for the EOF. */
if (usemark) {
readlen = sizeof(buf);
} else {
left = server.repl_transfer_size - server.repl_transfer_read;
readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
}

nread = connRead(conn,buf,readlen);
if (nread <= 0) {
if (connGetState(conn) == CONN_STATE_CONNECTED) {
/* equivalent to EAGAIN */
return;
}
serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
(nread == -1) ? strerror(errno) : "connection lost");
cancelReplicationHandshake(1);
return;
}
atomicIncr(server.stat_net_input_bytes, nread);

/* When a mark is used, we want to detect EOF asap in order to avoid
* writing the EOF mark into the file... */
int eof_reached = 0;

if (usemark) {
/* Update the last bytes array, and check if it matches our
* delimiter. */
if (nread >= CONFIG_RUN_ID_SIZE) {
memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,
CONFIG_RUN_ID_SIZE);
} else {
int rem = CONFIG_RUN_ID_SIZE-nread;
memmove(lastbytes,lastbytes+nread,rem);
memcpy(lastbytes+rem,buf,nread);
}
if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0)
eof_reached = 1;
}

/* Update the last I/O time for the replication transfer (used in
* order to detect timeouts during replication), and write what we
* got from the socket to the dump file on disk. */
server.repl_transfer_lastio = server.unixtime;
if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) {
serverLog(LL_WARNING,
"Write error or short write writing to the DB dump file "
"needed for MASTER <-> REPLICA synchronization: %s",
(nwritten == -1) ? strerror(errno) : "short write");
goto error;
}
server.repl_transfer_read += nread;

/* Delete the last 40 bytes from the file if we reached EOF. */
if (usemark && eof_reached) {
if (ftruncate(server.repl_transfer_fd,
server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1)
{
serverLog(LL_WARNING,
"Error truncating the RDB file received from the master "
"for SYNC: %s", strerror(errno));
goto error;
}
}

/* Sync data on disk from time to time, otherwise at the end of the
* transfer we may suffer a big delay as the memory buffers are copied
* into the actual disk. */
if (server.repl_transfer_read >=
server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
{
off_t sync_size = server.repl_transfer_read -
server.repl_transfer_last_fsync_off;
rdb_fsync_range(server.repl_transfer_fd,
server.repl_transfer_last_fsync_off, sync_size);
server.repl_transfer_last_fsync_off += sync_size;
}

/* Check if the transfer is now complete */
if (!usemark) {
if (server.repl_transfer_read == server.repl_transfer_size)
eof_reached = 1;
}

/* If the transfer is yet not complete, we need to read more, so
* return ASAP and wait for the handler to be called again. */
if (!eof_reached) return;
}

/* We reach this point in one of the following cases:
*
* 1. The replica is using diskless replication, that is, it reads data
* directly from the socket to the Redis memory, without using
* a temporary RDB file on disk. In that case we just block and
* read everything from the socket.
*
* 2. Or when we are done reading from the socket to the RDB file, in
* such case we want just to read the RDB file in memory. */
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");

/* We need to stop any AOF rewriting child before flusing and parsing
* the RDB, otherwise we'll create a copy-on-write disaster. */
if (server.aof_state != AOF_OFF) stopAppendOnly();

/* When diskless RDB loading is used by replicas, it may be configured
* in order to save the current DB instead of throwing it away,
* so that we can restore it in case of failed transfer. */

/* Ensure background save doesn't overwrite synced data */
if (server.child_type == CHILD_TYPE_RDB) {
serverLog(LL_NOTICE,
"Replica is about to load the RDB file received from the "
"master, but there is a pending RDB child running. "
"Killing process %ld and removing its temp file to avoid "
"any race",
(long) server.child_pid);
killRDBChild();
}

/* Make sure the new file (also used for persistence) is fully synced
* (not covered by earlier calls to rdb_fsync_range). */
if (fsync(server.repl_transfer_fd) == -1) {
serverLog(LL_WARNING,
"Failed trying to sync the temp DB to disk in "
"MASTER <-> REPLICA synchronization: %s",
strerror(errno));
cancelReplicationHandshake(1);
return;
}

/* Rename rdb like renaming rewrite aof asynchronously. */
int old_rdb_fd = open(server.rdb_filename,O_RDONLY|O_NONBLOCK);
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
serverLog(LL_WARNING,
"Failed trying to rename the temp DB into %s in "
"MASTER <-> REPLICA synchronization: %s",
server.rdb_filename, strerror(errno));
cancelReplicationHandshake(1);
if (old_rdb_fd != -1) close(old_rdb_fd);
return;
}
/* Close old rdb asynchronously. */
if (old_rdb_fd != -1) bioCreateCloseJob(old_rdb_fd);

if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) {
serverLog(LL_WARNING,
"Failed trying to load the MASTER synchronization "
"DB from disk");
cancelReplicationHandshake(1);
if (server.rdb_del_sync_files && allPersistenceDisabled()) {
serverLog(LL_NOTICE,"Removing the RDB file obtained from "
"the master. This replica has persistence "
"disabled");
bg_unlink(server.rdb_filename);
}
/* Note that there's no point in restarting the AOF on sync failure,
it'll be restarted when sync succeeds or replica promoted. */
return;
}

/* Cleanup. */
if (server.rdb_del_sync_files && allPersistenceDisabled()) {
serverLog(LL_NOTICE,"Removing the RDB file obtained from "
"the master. This replica has persistence "
"disabled");
bg_unlink(server.rdb_filename);
}

zfree(server.repl_transfer_tmpfile);
close(server.repl_transfer_fd);
server.repl_transfer_fd = -1;
server.repl_transfer_tmpfile = NULL;
}
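
The lastbytes buffer acts as a sliding window over the final 40 bytes received, so the delimiter is found even when it is split across several reads. The sketch below isolates that logic. update_lastbytes follows the excerpt, while stream_ends_with_mark is a hypothetical driver that feeds a buffer in fixed-size chunks to imitate successive socket reads.

```c
#include <assert.h>
#include <string.h>

#define MARK_SIZE 40 /* assumed delimiter length, as in the excerpt */

/* Keep lastbytes equal to the final MARK_SIZE bytes seen so far. */
void update_lastbytes(char lastbytes[MARK_SIZE], const char *buf, size_t nread) {
    if (nread >= MARK_SIZE) {
        /* The chunk alone covers the window: take its tail. */
        memcpy(lastbytes, buf + nread - MARK_SIZE, MARK_SIZE);
    } else {
        /* Shift the old bytes left and append the new chunk. */
        size_t rem = MARK_SIZE - nread;
        memmove(lastbytes, lastbytes + nread, rem);
        memcpy(lastbytes + rem, buf, nread);
    }
}

/* Hypothetical driver: read `stream` in chunks of `chunk` bytes and
 * return 1 if the window equals the mark after the last chunk. */
int stream_ends_with_mark(const char *stream, size_t len, size_t chunk,
                          const char *mark) {
    char lastbytes[MARK_SIZE];
    int eof = 0;
    assert(chunk > 0);
    memset(lastbytes, 0, MARK_SIZE);
    for (size_t off = 0; off < len; off += chunk) {
        size_t n = (len - off < chunk) ? len - off : chunk;
        update_lastbytes(lastbytes, stream + off, n);
        eof = (memcmp(lastbytes, mark, MARK_SIZE) == 0);
    }
    return eof;
}
```

Because the window always holds the most recent 40 bytes, the result does not depend on how the stream happens to be chunked.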

  1. Diskless load

Remove the socket's Read Handler, because the subsequent loading goes through RIO, which reads the TCP stream and loads the data at the same time.

if (use_diskless_load &&
server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB)
{
/* Create a backup of server.db[] and initialize to empty
* dictionaries. */
diskless_load_backup = disklessLoadMakeBackup();
}
/* We call to emptyDb even in case of REPL_DISKLESS_LOAD_SWAPDB
* (Where disklessLoadMakeBackup left server.db empty) because we
* want to execute all the auxiliary logic of emptyDb (Namely,
* fire module events) */
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);

/* Before loading the DB into memory we need to delete the readable
* handler, otherwise it will get called recursively since
* rdbLoad() will call the event loop to process events from time to
* time for non blocking loading. */
connSetReadHandler(conn, NULL);
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (use_diskless_load) {
rio rdb;
rioInitWithConn(&rdb,conn,server.repl_transfer_size);

/* Put the socket in blocking mode to simplify RDB transfer.
* We'll restore it when the RDB is received. */
connBlock(conn);
connRecvTimeout(conn, server.repl_timeout*1000);
startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION);

if (rdbLoadRio(&rdb,RDBFLAGS_REPLICATION,&rsi) != C_OK) {
/* RDB loading failed. */
stopLoading(0);
serverLog(LL_WARNING,
"Failed trying to load the MASTER synchronization DB "
"from socket");
cancelReplicationHandshake(1);
rioFreeConn(&rdb, NULL);

/* Remove the half-loaded data in case we started with
* an empty replica. */
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);

if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
/* Restore the backed up databases. */
disklessLoadRestoreBackup(diskless_load_backup);
}

/* Note that there's no point in restarting the AOF on SYNC
* failure, it'll be restarted when sync succeeds or the replica
* gets promoted. */
return;
}

/* RDB loading succeeded if we reach this point. */
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
/* Delete the backup databases we created before starting to load
* the new RDB. Now the RDB was loaded with success so the old
* data is useless. */
disklessLoadDiscardBackup(diskless_load_backup, empty_db_flags);
}

/* Verify the end mark is correct. */
if (usemark) {
if (!rioRead(&rdb,buf,CONFIG_RUN_ID_SIZE) ||
memcmp(buf,eofmark,CONFIG_RUN_ID_SIZE) != 0)
{
stopLoading(0);
serverLog(LL_WARNING,"Replication stream EOF marker is broken");
cancelReplicationHandshake(1);
rioFreeConn(&rdb, NULL);
return;
}
}

stopLoading(1);

/* Cleanup and restore the socket to the original state to continue
* with the normal replication. */
rioFreeConn(&rdb, NULL);
connNonBlock(conn);
connRecvTimeout(conn,0);
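
The end-mark check near the tail of the diskless path can be looked at in isolation. The sketch below assumes the diskless payload is announced by a `$EOF:<mark>` preamble carrying a 40-byte mark and is terminated by the same 40 bytes. The helper names `parse_eof_preamble` and `trailer_matches` and the constant `MARK_SIZE` are made up for this example; they are not Redis functions.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MARK_SIZE 40 /* hypothetical constant, same length as a run ID */

/* Parse a "$EOF:<mark>" preamble line. On success copy the mark and
 * return 1; return 0 if the line does not announce an EOF-marked stream. */
int parse_eof_preamble(const char *line, char mark[MARK_SIZE]) {
    assert(line != NULL);
    if (strncmp(line, "$EOF:", 5) != 0) return 0;
    if (strlen(line + 5) < MARK_SIZE) return 0;
    memcpy(mark, line + 5, MARK_SIZE);
    return 1;
}

/* Return 1 if the last MARK_SIZE bytes of the payload equal the mark,
 * which is the same comparison the excerpt does with memcmp(). */
int trailer_matches(const char *payload, size_t len,
                    const char mark[MARK_SIZE]) {
    if (len < MARK_SIZE) return 0;
    return memcmp(payload + len - MARK_SIZE, mark, MARK_SIZE) == 0;
}
```

A receiver that gets a plain `$<length>` preamble instead would fall back to counting bytes, which is why the check in the excerpt is guarded by `usemark`.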

replicationCron

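Both Master and Slave run replicationCron. The Master sends heartbeats to the Slave, while the Slave sends its current replication progress (offset) back to the Master, which the Master uses for display.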

void replicationCron(void) {
/* Non blocking connection timeout? */
if (server.masterhost &&
(server.repl_state == REPL_STATE_CONNECTING ||
slaveIsInHandshakeState()) &&
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{
serverLog(LL_WARNING,"Timeout connecting to the MASTER...");
cancelReplicationHandshake(1);
}

/* Bulk transfer I/O timeout? */
if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER &&
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{
serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
cancelReplicationHandshake(1);
}

/* Timed out master when we are an already connected slave? */
if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED &&
(time(NULL)-server.master->lastinteraction) > server.repl_timeout)
{
serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");
freeClient(server.master);
}

/* Check if we should connect to a MASTER */
if (server.repl_state == REPL_STATE_CONNECT) {
serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
server.masterhost, server.masterport);
connectWithMaster();
}

/* Send ACK to master from time to time.
* Note that we do not send periodic acks to masters that don't
* support PSYNC and replication offsets. */
if (server.masterhost && server.master &&
!(server.master->flags & CLIENT_PRE_PSYNC))
replicationSendAck();
}
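
The Slave-side checks in this excerpt boil down to a small decision table keyed on the replication state and two timestamps. The sketch below is a simplification under stated assumptions: the handshake states are collapsed into one `ST_HANDSHAKE` value, the function returns only the first action that applies (the real function runs all checks in one pass), and every name in it is hypothetical.

```c
#include <assert.h>
#include <time.h>

typedef enum { ST_NONE, ST_CONNECT, ST_CONNECTING, ST_HANDSHAKE,
               ST_TRANSFER, ST_CONNECTED } sketch_state;

typedef enum { ACT_NOTHING, ACT_CANCEL_HANDSHAKE, ACT_DROP_MASTER,
               ACT_CONNECT, ACT_SEND_ACK } sketch_action;

/* Decide what a replica's cron tick should do.
 * last_io:          last time bytes arrived during connect/handshake/transfer
 * last_interaction: last time the connected master sent anything */
sketch_action cron_decide(sketch_state st, time_t now, time_t last_io,
                          time_t last_interaction, time_t timeout) {
    assert(timeout > 0);
    /* Connect, handshake and bulk transfer share the same I/O timeout. */
    if ((st == ST_CONNECTING || st == ST_HANDSHAKE || st == ST_TRANSFER) &&
        now - last_io > timeout)
        return ACT_CANCEL_HANDSHAKE;
    /* A connected master that stays silent for too long is dropped. */
    if (st == ST_CONNECTED && now - last_interaction > timeout)
        return ACT_DROP_MASTER;
    /* A pending connection request is started from the cron. */
    if (st == ST_CONNECT) return ACT_CONNECT;
    /* A healthy link reports its offset back to the master. */
    if (st == ST_CONNECTED) return ACT_SEND_ACK;
    return ACT_NOTHING;
}
```

Dropping the master before the ACK check mirrors the excerpt: once `freeClient(server.master)` has run there is no master client left to acknowledge to.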

MASTER

When the Master receives PSYNC or SYNC, it calls syncCommand.

syncCommand

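  1. For PSYNC, it calls masterTryPartialResynchronization to decide whether a partial resync is possible (by looking the requested offset up in the repl_backlog buffer); otherwise it falls back to a full resync.
  2. For SYNC, it sets CLIENT_PRE_PSYNC in the client's flags, meaning this Slave will never send ACKs, so the Master must not conclude that the Slave is down just because no ACK arrives.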
void syncCommand(client *c) {
....
/* Try a partial resynchronization if this is a PSYNC command.
* If it fails, we continue with usual full resynchronization, however
* when this happens masterTryPartialResynchronization() already
* replied with:
*
* +FULLRESYNC <replid> <offset>
*
* So the slave knows the new replid and offset to try a PSYNC later
* if the connection with the master is lost. */
if (!strcasecmp(c->argv[0]->ptr,"psync")) {
if (masterTryPartialResynchronization(c) == C_OK) {
server.stat_sync_partial_ok++;
return; /* No full resync needed, return. */
}
} else {
/* If a slave uses SYNC, we are dealing with an old implementation
* of the replication protocol (like redis-cli --slave). Flag the client
* so that we don't expect to receive REPLCONF ACK feedbacks. */
c->flags |= CLIENT_PRE_PSYNC;
}
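
The PSYNC branch above hinges on whether the requested offset is still covered by the backlog. The following is a minimal sketch of that test, assuming a single replication ID (the secondary ID that a real Master also accepts is left out) and a simplified `sketch_master` struct invented for this example.

```c
#include <assert.h>
#include <string.h>

typedef struct {
    char replid[41];       /* current replication ID, NUL-terminated */
    int has_backlog;       /* 0 if no backlog buffer exists yet */
    long long backlog_off; /* replication offset of the oldest backlog byte */
    long long backlog_len; /* number of valid bytes held in the backlog */
} sketch_master;

/* Return 1 if a replica asking for `replid` + `offset` can continue with
 * a partial resync, 0 if it has to go through a full resync. */
int can_partial_resync(const sketch_master *m, const char *replid,
                       long long offset) {
    assert(m != NULL && replid != NULL);
    /* A different history means the offsets are not comparable. */
    if (strcmp(m->replid, replid) != 0) return 0;
    if (!m->has_backlog) return 0;
    /* The requested byte was already overwritten in the backlog. */
    if (offset < m->backlog_off) return 0;
    /* The replica claims to be ahead of what the master has produced. */
    if (offset > m->backlog_off + m->backlog_len) return 0;
    return 1;
}
```

An offset exactly equal to `backlog_off + backlog_len` is accepted: the replica is fully caught up and simply resumes the stream with nothing to replay.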
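  3. Everything from here on is full resync. If a disk-based BGSAVE is already running, the Master tries to reuse the RDB it is producing and copies another Slave's output buffer to the current Slave, so both catch up from the same snapshot.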
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
if (server.repl_disable_tcp_nodelay)
connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */
c->repldbfd = -1;
c->flags |= CLIENT_SLAVE;
listAddNodeTail(server.slaves,c);

/* Create the replication backlog if needed. */
if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {
/* When we create the backlog from scratch, we always use a new
* replication ID and clear the ID2, since there is no valid
* past history. */
changeReplicationId();
clearReplicationId2();
createReplicationBacklog();
serverLog(LL_NOTICE,"Replication backlog created, my new "
"replication IDs are '%s' and '%s'",
server.replid, server.replid2);
}

/* CASE 1: BGSAVE is in progress, with disk target. */
if (server.child_type == CHILD_TYPE_RDB &&
server.rdb_child_type == RDB_CHILD_TYPE_DISK)
{
/* Ok a background save is in progress. Let's check if it is a good
* one for replication, i.e. if there is another slave that is
* registering differences since the server forked to save. */
client *slave;
listNode *ln;
listIter li;

listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
slave = ln->value;
/* If the client needs a buffer of commands, we can't use
* a replica without replication buffer. */
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
(!(slave->flags & CLIENT_REPL_RDBONLY) ||
(c->flags & CLIENT_REPL_RDBONLY)))
break;
}
/* To attach this slave, we check that it has at least all the
* capabilities of the slave that triggered the current BGSAVE. */
if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {
/* Perfect, the server is already registering differences for
* another slave. Set the right state, and copy the buffer.
* We don't copy buffer if clients don't want. */
if (!(c->flags & CLIENT_REPL_RDBONLY)) copyClientOutputBuffer(c,slave);
replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
} else {
/* No way, we need to wait for the next BGSAVE in order to
* register differences. */
serverLog(LL_NOTICE,"Can't attach the replica to the current BGSAVE. Waiting for next BGSAVE for SYNC");
}
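  4. If a diskless transfer is in progress, a child process is already serializing the dataset and handing it to the parent through an anonymous pipe, and the parent forwards it to the Slaves over their sockets. A new Slave cannot join that stream midway, so it has to wait for the next BGSAVE.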
/* CASE 2: BGSAVE is in progress, with socket target. */
} else if (server.child_type == CHILD_TYPE_RDB &&
server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
{
/* There is an RDB child process but it is writing directly to
* children sockets. We need to wait for the next BGSAVE
* in order to synchronize. */
serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");
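  5. If no background process is running, both diskless sync and disk-based (RDB file) sync end up in startBgsaveForReplication. When a diskless sync delay is configured, the call is deferred to replicationCron so that more Slaves can arrive first.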
/* CASE 3: There is no BGSAVE in progress. */
} else {
if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF) &&
server.repl_diskless_sync_delay)
{
/* Diskless replication RDB child is created inside
* replicationCron() since we want to delay its start a
* few seconds to wait for more slaves to arrive. */
serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
} else {
/* We don't have a BGSAVE in progress, let's start one. Diskless
* or disk-based mode is determined by replica's capacity. */
if (!hasActiveChildProcess()) {
startBgsaveForReplication(c->slave_capa);
} else {
serverLog(LL_NOTICE,
"No BGSAVE in progress, but another BG operation is active. "
"BGSAVE for replication delayed");
}
}
}
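
The three cases walked through above can be condensed into one decision function. This is only a sketch: the `CLIENT_REPL_RDBONLY` handling is omitted, the capabilities of the Slave already waiting on the running BGSAVE are passed in as a plain integer (`-1` when there is none), and all type, constant, and function names are hypothetical.

```c
#include <assert.h>

#define CAPA_EOF    (1 << 0) /* replica understands the EOF-marked stream */
#define CAPA_PSYNC2 (1 << 1) /* replica understands the newer PSYNC reply */

typedef enum { CHILD_NONE, CHILD_RDB_DISK, CHILD_RDB_SOCKET,
               CHILD_OTHER } sketch_child;

typedef enum { PLAN_ATTACH, PLAN_WAIT_NEXT_BGSAVE, PLAN_DELAYED_DISKLESS,
               PLAN_START_BGSAVE } sketch_plan;

/* Decide how a new replica gets its full resync.
 * waiting_capa: capabilities of a replica already waiting for the running
 *               BGSAVE to end, or -1 if there is no such replica. */
sketch_plan plan_full_resync(sketch_child child, int waiting_capa,
                             int new_capa, int diskless_sync,
                             int diskless_delay) {
    assert(new_capa >= 0);
    if (child == CHILD_RDB_DISK) {
        /* CASE 1: share the running BGSAVE only if the new replica has at
         * least every capability of the replica that triggered it. */
        if (waiting_capa >= 0 &&
            (new_capa & waiting_capa) == waiting_capa)
            return PLAN_ATTACH;
        return PLAN_WAIT_NEXT_BGSAVE;
    }
    if (child == CHILD_RDB_SOCKET) {
        /* CASE 2: a socket-target transfer cannot be joined midway. */
        return PLAN_WAIT_NEXT_BGSAVE;
    }
    /* CASE 3: no RDB child is running. */
    if (diskless_sync && (new_capa & CAPA_EOF) && diskless_delay)
        return PLAN_DELAYED_DISKLESS; /* started later from the cron */
    if (child == CHILD_NONE) return PLAN_START_BGSAVE;
    return PLAN_WAIT_NEXT_BGSAVE;     /* another child is active */
}
```

The subset test `(new_capa & waiting_capa) == waiting_capa` is the same expression the excerpt uses, and it is why a less capable Slave cannot piggyback on a snapshot started for a more capable one.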

startBgsaveForReplication

This function chooses between diskless sync and disk-based (RDB file) sync; the names rdbSaveToSlavesSockets and rdbSaveBackground speak for themselves.

int startBgsaveForReplication(int mincapa) {
int retval;
int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
listIter li;
listNode *ln;

serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s",
socket_target ? "replicas sockets" : "disk");

rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
/* Only do rdbSave* when rsiptr is not NULL,
* otherwise slave will miss repl-stream-db. */
if (rsiptr) {
if (socket_target)
retval = rdbSaveToSlavesSockets(rsiptr);
else
retval = rdbSaveBackground(server.rdb_filename,rsiptr);
} else {
serverLog(LL_WARNING,"BGSAVE for replication: replication information not available, can't generate the RDB file right now. Try later.");
retval = C_ERR;
}

/* If the target is socket, rdbSaveToSlavesSockets() already setup
* the slaves for a full resync. Otherwise for disk target do it now.*/
if (!socket_target) {
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;

if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
replicationSetupSlaveForFullResync(slave,
getPsyncInitialOffset());
}
}
}
return retval;
}

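One point deserves special attention: diskless transfer also relies on a child process, but the child never sends anything to the Slaves itself. It serializes the dataset and writes it into an anonymous pipe; the parent reads from the pipe and forwards the data to the Slaves.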
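
The child-to-parent hand-off can be tried out in a few lines of POSIX C. The sketch below is not the Redis implementation: `relay_through_pipe` is a hypothetical helper, the "dataset" is a plain string, and the parent collects the bytes into a buffer where a real server would forward each chunk to the replicas' sockets.

```c
#include <assert.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Fork a child that writes `payload` into a pipe while the parent reads it
 * back into `out`. Returns the number of bytes received, or -1 on error. */
ssize_t relay_through_pipe(const char *payload, char *out, size_t outcap) {
    int fds[2];
    assert(payload != NULL && out != NULL);
    if (pipe(fds) == -1) return -1;

    pid_t pid = fork();
    if (pid == -1) {
        close(fds[0]);
        close(fds[1]);
        return -1;
    }

    if (pid == 0) {
        /* Child: serialize into the pipe, never touch a socket. */
        size_t len = strlen(payload), done = 0;
        close(fds[0]);
        while (done < len) {
            ssize_t n = write(fds[1], payload + done, len - done);
            if (n <= 0) _exit(1);
            done += (size_t)n;
        }
        close(fds[1]);
        _exit(0);
    }

    /* Parent: close the write end so read() returns 0 once the child is
     * done, then drain the pipe. */
    size_t total = 0;
    int failed = 0;
    close(fds[1]);
    while (total < outcap) {
        ssize_t n = read(fds[0], out + total, outcap - total);
        if (n < 0) { failed = 1; break; }
        if (n == 0) break; /* EOF: the child closed its end */
        total += (size_t)n;
    }
    close(fds[0]);

    int status;
    if (waitpid(pid, &status, 0) == -1) return -1;
    if (failed || !WIFEXITED(status) || WEXITSTATUS(status) != 0) return -1;
    return (ssize_t)total;
}
```

Keeping the sockets in the parent means connection state never has to be shared with a process that is about to exit.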

rdbSaveToSlavesSockets

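This function creates an anonymous pipe. The child serializes the in-memory dataset through RIO and writes it into the pipe; the parent reads it out of the pipe and sends it to the Slaves.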

/* Spawn an RDB child that writes the RDB to the sockets of the slaves
* that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
listNode *ln;
listIter li;
pid_t childpid;
int pipefds[2], rdb_pipe_write, safe_to_exit_pipe;

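/* Pipe that carries the serialized RDB bytes from the child to the parent. */
if (pipe(pipefds) == -1) return C_ERR;
server.rdb_pipe_read = pipefds[0]; /* read end */
rdb_pipe_write = pipefds[1]; /* write end */
anetNonBlock(NULL, server.rdb_pipe_read);

/* Second pipe, used by the parent to tell the child it is safe to exit. */
if (pipe(pipefds) == -1) {
close(rdb_pipe_write);
close(server.rdb_pipe_read);
return C_ERR;
}
safe_to_exit_pipe = pipefds[0]; /* read end */
server.rdb_child_exit_pipe = pipefds[1]; /* write end */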

/* Collect the connections of the replicas we want to transfer
* the RDB to, which are in WAIT_BGSAVE_START state. */
server.rdb_pipe_conns = zmalloc(sizeof(connection *)*listLength(server.slaves));
server.rdb_pipe_numconns = 0;
server.rdb_pipe_numconns_writing = 0;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
server.rdb_pipe_conns[server.rdb_pipe_numconns++] = slave->conn;
replicationSetupSlaveForFullResync(slave,getPsyncInitialOffset());
}
}

/* Create the child process. */
if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {
/* Child */
int retval, dummy;
rio rdb;

rioInitWithFd(&rdb,rdb_pipe_write);

retval = rdbSaveRioWithEOFMark(&rdb,NULL,rsi);
if (retval == C_OK && rioFlush(&rdb) == 0)
retval = C_ERR;

if (retval == C_OK) {
sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB");
}

rioFreeFd(&rdb);
/* wake up the reader, tell it we're done. */
close(rdb_pipe_write);
close(server.rdb_child_exit_pipe); /* close write end so that we can detect the close on the parent. */
/* hold exit until the parent tells us it's safe. we're not expecting
* to read anything, just get the error when the pipe is closed. */
dummy = read(safe_to_exit_pipe, pipefds, 1);
UNUSED(dummy);
exitFromChild((retval == C_OK) ? 0 : 1);

The parent registers a readable event on the pipe and reads the data in rdbPipeReadHandler.

    } else {
/* Parent */
close(safe_to_exit_pipe);
if (childpid == -1) {
serverLog(LL_WARNING,"Can't save in background: fork: %s",
strerror(errno));

/* Undo the state change. The caller will perform cleanup on
* all the slaves in BGSAVE_START state, but an early call to
* replicationSetupSlaveForFullResync() turned it into BGSAVE_END */
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
}
}
close(rdb_pipe_write);
close(server.rdb_pipe_read);
zfree(server.rdb_pipe_conns);
server.rdb_pipe_conns = NULL;
server.rdb_pipe_numconns = 0;
server.rdb_pipe_numconns_writing = 0;
} else {
serverLog(LL_NOTICE,"Background RDB transfer started by pid %ld",
(long) childpid);
server.rdb_save_time_start = time(NULL);
server.rdb_child_type = RDB_CHILD_TYPE_SOCKET;
close(rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */
if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) {
serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
}
}
return (childpid == -1) ? C_ERR : C_OK;
}
return C_OK; /* Unreached. */
}
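
The second pipe in this function (the "safe to exit" pipe) works as a gate: the child blocks in `read()` until the parent closes the write end, and only then exits. Below is a minimal stand-alone sketch of that trick, with a hypothetical helper name and no RDB data involved.

```c
#include <assert.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Fork a child that refuses to exit until the parent closes the write end
 * of a gate pipe. Returns the child's exit status, or -1 on error. */
int run_child_with_exit_gate(int child_status) {
    int gate[2];
    assert(child_status >= 0 && child_status <= 255);
    if (pipe(gate) == -1) return -1;

    pid_t pid = fork();
    if (pid == -1) {
        close(gate[0]);
        close(gate[1]);
        return -1;
    }

    if (pid == 0) {
        char dummy;
        /* Drop the inherited copy of the write end; otherwise read()
         * below would never see end-of-file. */
        close(gate[1]);
        /* Nothing is ever written: this returns 0 only when the parent
         * has closed its write end. */
        ssize_t n = read(gate[0], &dummy, 1);
        (void)n;
        _exit(child_status);
    }

    /* Parent: a real server would finish draining the data pipe here. */
    close(gate[0]);
    close(gate[1]); /* open the gate: the child may exit now */

    int status;
    if (waitpid(pid, &status, 0) == -1) return -1;
    return WIFEXITED(status) ? WEXITSTATUS(status) : -1;
}
```

Holding the child back this way lets the parent decide when the child's exit is observed, instead of racing with data still sitting in the pipe.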

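That completes the walkthrough of the synchronization itself. From here on, commands are passed along to the Slaves through the propagate function.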
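
As an illustration of what a propagated command looks like on the wire, the sketch below encodes an argument vector as a RESP array of bulk strings, the format Redis clients use to send commands. It assumes NUL-terminated arguments (so it is not binary-safe), and `encode_resp_command` is a hypothetical helper, not a Redis function.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Encode argv as "*<argc>\r\n" followed by "$<len>\r\n<arg>\r\n" per
 * argument. Returns the number of bytes written (excluding the trailing
 * NUL), or -1 if the output buffer is too small. */
int encode_resp_command(int argc, const char **argv, char *out, size_t cap) {
    assert(argc > 0 && argv != NULL && out != NULL);
    int n = snprintf(out, cap, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= cap) return -1;
    size_t used = (size_t)n;

    for (int i = 0; i < argc; i++) {
        n = snprintf(out + used, cap - used, "$%zu\r\n%s\r\n",
                     strlen(argv[i]), argv[i]);
        if (n < 0 || (size_t)n >= cap - used) return -1;
        used += (size_t)n;
    }
    return (int)used;
}
```

Encoding `SET k v` this way produces `*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n`.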

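Replication pitfalls

Master and Slave data out of sync

  • Replication is asynchronous by design, so some lag between Master and Slave is unavoidable; this has to be addressed at the deployment level.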

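Reading expired data

  • A command that sets a relative expiry (such as EXPIRE) is already delayed by the time it reaches the Slave, so it is better to use an absolute expiry time (such as EXPIREAT).
  • In older Redis versions a Slave returns an expired key as-is when it is read (newer versions return an empty reply).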
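
The first point is plain arithmetic. The sketch below assumes Master and Slave clocks are synchronized and that a relative TTL starts counting when the command is applied on the Slave; both function names are hypothetical.

```c
#include <assert.h>

/* Deadline on the replica when a relative TTL is applied on arrival:
 * the replication delay is silently added to the key's lifetime. */
long long deadline_relative_ms(long long master_now_ms, long long delay_ms,
                               long long ttl_ms) {
    assert(delay_ms >= 0 && ttl_ms >= 0);
    return master_now_ms + delay_ms + ttl_ms;
}

/* Deadline when the master computes an absolute timestamp once and the
 * replica stores it unchanged: the delay no longer matters. */
long long deadline_absolute_ms(long long master_now_ms, long long ttl_ms) {
    assert(ttl_ms >= 0);
    return master_now_ms + ttl_ms;
}
```

With a 5000 ms TTL set at time 1000 and a 250 ms replication delay, the relative form expires at 6250 on the Slave while the absolute form expires at 6000 on both sides.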