Diskless replica: refactoring of DBs backups.
This commit is contained in:
parent
3300e98ff0
commit
3bbb9a1413
@ -1119,6 +1119,49 @@ static int useDisklessLoad() {
|
|||||||
(server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY && dbTotalServerKeyCount()==0);
|
(server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY && dbTotalServerKeyCount()==0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Helper function for readSyncBulkPayload() to make backups of the current
|
||||||
|
* DBs before socket-loading the new ones. The backups may be restored later
|
||||||
|
* or freed by disklessLoadRestoreBackups(). */
|
||||||
|
redisDb *disklessLoadMakeBackups(void) {
|
||||||
|
redisDb *backups = zmalloc(sizeof(redisDb)*server.dbnum);
|
||||||
|
for (int i=0; i<server.dbnum; i++) {
|
||||||
|
backups[i] = server.db[i];
|
||||||
|
server.db[i].dict = dictCreate(&dbDictType,NULL);
|
||||||
|
server.db[i].expires = dictCreate(&keyptrDictType,NULL);
|
||||||
|
}
|
||||||
|
return backups;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Helper function for readSyncBulkPayload(): when replica-side diskless
|
||||||
|
* database loading is used, Redis makes a backup of the existing databases
|
||||||
|
* before loading the new ones from the socket.
|
||||||
|
*
|
||||||
|
* If the socket loading went wrong, we want to restore the old backups
|
||||||
|
* into the server databases. This function does just that in the case
|
||||||
|
* the 'count' argument (the number of DBs to replace) is non-zero.
|
||||||
|
*
|
||||||
|
* When instead the loading succeeded we want just to free our old backups,
|
||||||
|
* in that case the funciton will do just that when 'count' is 0. */
|
||||||
|
void disklessLoadRestoreBackups(redisDb *backup, int count, int empty_db_flags)
|
||||||
|
{
|
||||||
|
if (count) {
|
||||||
|
/* Restore. */
|
||||||
|
emptyDbGeneric(server.db,-1,empty_db_flags,replicationEmptyDbCallback);
|
||||||
|
for (int i=0; i<count; i++) {
|
||||||
|
dictRelease(server.db[i].dict);
|
||||||
|
dictRelease(server.db[i].expires);
|
||||||
|
server.db[i] = backup[i];
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
/* Delete. */
|
||||||
|
emptyDbGeneric(backup,-1,empty_db_flags,replicationEmptyDbCallback);
|
||||||
|
for (int i=0; i<count; i++) {
|
||||||
|
dictRelease(backup[i].dict);
|
||||||
|
dictRelease(backup[i].expires);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
zfree(backup);
|
||||||
|
}
|
||||||
|
|
||||||
/* Asynchronously read the SYNC payload we receive from a master */
|
/* Asynchronously read the SYNC payload we receive from a master */
|
||||||
#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
|
#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
|
||||||
@ -1129,7 +1172,6 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
redisDb *diskless_load_backup = NULL;
|
redisDb *diskless_load_backup = NULL;
|
||||||
int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC :
|
int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC :
|
||||||
EMPTYDB_NO_FLAGS;
|
EMPTYDB_NO_FLAGS;
|
||||||
int i;
|
|
||||||
off_t left;
|
off_t left;
|
||||||
UNUSED(el);
|
UNUSED(el);
|
||||||
UNUSED(privdata);
|
UNUSED(privdata);
|
||||||
@ -1308,12 +1350,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
if (use_diskless_load &&
|
if (use_diskless_load &&
|
||||||
server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB)
|
server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB)
|
||||||
{
|
{
|
||||||
diskless_load_backup = zmalloc(sizeof(redisDb)*server.dbnum);
|
diskless_load_backup = disklessLoadMakeBackups();
|
||||||
for (i=0; i<server.dbnum; i++) {
|
|
||||||
diskless_load_backup[i] = server.db[i];
|
|
||||||
server.db[i].dict = dictCreate(&dbDictType,NULL);
|
|
||||||
server.db[i].expires = dictCreate(&keyptrDictType,NULL);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
|
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
|
||||||
}
|
}
|
||||||
@ -1345,14 +1382,8 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
rioFreeFd(&rdb, NULL);
|
rioFreeFd(&rdb, NULL);
|
||||||
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
|
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
|
||||||
/* Restore the backed up databases. */
|
/* Restore the backed up databases. */
|
||||||
emptyDbGeneric(server.db,-1,empty_db_flags,
|
disklessLoadRestoreBackups(diskless_load_backup,server.dbnum,
|
||||||
replicationEmptyDbCallback);
|
empty_db_flags);
|
||||||
for (i=0; i<server.dbnum; i++) {
|
|
||||||
dictRelease(server.db[i].dict);
|
|
||||||
dictRelease(server.db[i].expires);
|
|
||||||
server.db[i] = diskless_load_backup[i];
|
|
||||||
}
|
|
||||||
zfree(diskless_load_backup);
|
|
||||||
} else {
|
} else {
|
||||||
/* Remove the half-loaded data in case we started with
|
/* Remove the half-loaded data in case we started with
|
||||||
* an empty replica. */
|
* an empty replica. */
|
||||||
@ -1371,13 +1402,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
/* Delete the backup databases we created before starting to load
|
/* Delete the backup databases we created before starting to load
|
||||||
* the new RDB. Now the RDB was loaded with success so the old
|
* the new RDB. Now the RDB was loaded with success so the old
|
||||||
* data is useless. */
|
* data is useless. */
|
||||||
emptyDbGeneric(diskless_load_backup,-1,empty_db_flags,
|
disklessLoadRestoreBackups(diskless_load_backup,0,empty_db_flags);
|
||||||
replicationEmptyDbCallback);
|
|
||||||
for (i=0; i<server.dbnum; i++) {
|
|
||||||
dictRelease(diskless_load_backup[i].dict);
|
|
||||||
dictRelease(diskless_load_backup[i].expires);
|
|
||||||
}
|
|
||||||
zfree(diskless_load_backup);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Verify the end mark is correct. */
|
/* Verify the end mark is correct. */
|
||||||
|
Loading…
x
Reference in New Issue
Block a user