Back up the keys-to-slots map and restore it when sync fails if the diskless-load type is swapdb in cluster mode (#8108)
When the replica's diskless-load type is swapdb in cluster mode, we didn't back up the keys-to-slots map, so the map was lost if the sync failed. Now we back up the keys-to-slots map first and restore it properly on failure. This commit includes a refactoring/cleanup of the backup mechanism (moving it to db.c and restructuring it a bit). Co-authored-by: Oran Agra <oran@redislabs.com> (cherry picked from commit 10712afaf3e7f2ea859622fa5b27c96ee8f478c5)
This commit is contained in:
parent
ce6eee028d
commit
f5d6057cbb
@ -203,7 +203,7 @@ void *bioProcessBackgroundJobs(void *arg) {
|
|||||||
/* What we free changes depending on what arguments are set:
|
/* What we free changes depending on what arguments are set:
|
||||||
* arg1 -> free the object at pointer.
|
* arg1 -> free the object at pointer.
|
||||||
* arg2 & arg3 -> free two dictionaries (a Redis DB).
|
* arg2 & arg3 -> free two dictionaries (a Redis DB).
|
||||||
* only arg3 -> free the skiplist. */
|
* only arg3 -> free the radix tree. */
|
||||||
if (job->arg1)
|
if (job->arg1)
|
||||||
lazyfreeFreeObjectFromBioThread(job->arg1);
|
lazyfreeFreeObjectFromBioThread(job->arg1);
|
||||||
else if (job->arg2 && job->arg3)
|
else if (job->arg2 && job->arg3)
|
||||||
|
218
src/db.c
218
src/db.c
@ -34,6 +34,13 @@
|
|||||||
#include <signal.h>
|
#include <signal.h>
|
||||||
#include <ctype.h>
|
#include <ctype.h>
|
||||||
|
|
||||||
|
/* Database backup. */
|
||||||
|
struct dbBackup {
|
||||||
|
redisDb *dbarray;
|
||||||
|
rax *slots_to_keys;
|
||||||
|
uint64_t slots_keys_count[CLUSTER_SLOTS];
|
||||||
|
};
|
||||||
|
|
||||||
/*-----------------------------------------------------------------------------
|
/*-----------------------------------------------------------------------------
|
||||||
* C-level DB API
|
* C-level DB API
|
||||||
*----------------------------------------------------------------------------*/
|
*----------------------------------------------------------------------------*/
|
||||||
@ -359,48 +366,18 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
|
|||||||
return o;
|
return o;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Remove all keys from all the databases in a Redis server.
|
/* Remove all keys from the database(s) structure. The dbarray argument
|
||||||
* If callback is given the function is called from time to time to
|
* may not be the server main DBs (could be a backup).
|
||||||
* signal that work is in progress.
|
|
||||||
*
|
*
|
||||||
* The dbnum can be -1 if all the DBs should be flushed, or the specified
|
* The dbnum can be -1 if all the DBs should be emptied, or the specified
|
||||||
* DB number if we want to flush only a single Redis database number.
|
* DB index if we want to empty only a single database.
|
||||||
*
|
* The function returns the number of keys removed from the database(s). */
|
||||||
* Flags are be EMPTYDB_NO_FLAGS if no special flags are specified or
|
long long emptyDbStructure(redisDb *dbarray, int dbnum, int async,
|
||||||
* 1. EMPTYDB_ASYNC if we want the memory to be freed in a different thread.
|
void(callback)(void*))
|
||||||
* 2. EMPTYDB_BACKUP if we want to empty the backup dictionaries created by
|
{
|
||||||
* disklessLoadMakeBackups. In that case we only free memory and avoid
|
|
||||||
* firing module events.
|
|
||||||
* and the function to return ASAP.
|
|
||||||
*
|
|
||||||
* On success the function returns the number of keys removed from the
|
|
||||||
* database(s). Otherwise -1 is returned in the specific case the
|
|
||||||
* DB number is out of range, and errno is set to EINVAL. */
|
|
||||||
long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*)) {
|
|
||||||
int async = (flags & EMPTYDB_ASYNC);
|
|
||||||
int backup = (flags & EMPTYDB_BACKUP); /* Just free the memory, nothing else */
|
|
||||||
RedisModuleFlushInfoV1 fi = {REDISMODULE_FLUSHINFO_VERSION,!async,dbnum};
|
|
||||||
long long removed = 0;
|
long long removed = 0;
|
||||||
|
|
||||||
if (dbnum < -1 || dbnum >= server.dbnum) {
|
|
||||||
errno = EINVAL;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Pre-flush actions */
|
|
||||||
if (!backup) {
|
|
||||||
/* Fire the flushdb modules event. */
|
|
||||||
moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB,
|
|
||||||
REDISMODULE_SUBEVENT_FLUSHDB_START,
|
|
||||||
&fi);
|
|
||||||
|
|
||||||
/* Make sure the WATCHed keys are affected by the FLUSH* commands.
|
|
||||||
* Note that we need to call the function while the keys are still
|
|
||||||
* there. */
|
|
||||||
signalFlushedDb(dbnum);
|
|
||||||
}
|
|
||||||
|
|
||||||
int startdb, enddb;
|
int startdb, enddb;
|
||||||
|
|
||||||
if (dbnum == -1) {
|
if (dbnum == -1) {
|
||||||
startdb = 0;
|
startdb = 0;
|
||||||
enddb = server.dbnum-1;
|
enddb = server.dbnum-1;
|
||||||
@ -416,34 +393,139 @@ long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(
|
|||||||
dictEmpty(dbarray[j].dict,callback);
|
dictEmpty(dbarray[j].dict,callback);
|
||||||
dictEmpty(dbarray[j].expires,callback);
|
dictEmpty(dbarray[j].expires,callback);
|
||||||
}
|
}
|
||||||
/* Because we will start a new database, reset average ttl. */
|
/* Because all keys of database are removed, reset average ttl. */
|
||||||
dbarray[j].avg_ttl = 0;
|
dbarray[j].avg_ttl = 0;
|
||||||
dbarray[j].expires_cursor = 0;
|
dbarray[j].expires_cursor = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Post-flush actions */
|
|
||||||
if (!backup) {
|
|
||||||
if (server.cluster_enabled) {
|
|
||||||
if (async) {
|
|
||||||
slotToKeyFlushAsync();
|
|
||||||
} else {
|
|
||||||
slotToKeyFlush();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (dbnum == -1) flushSlaveKeysWithExpireList();
|
|
||||||
|
|
||||||
/* Also fire the end event. Note that this event will fire almost
|
|
||||||
* immediately after the start event if the flush is asynchronous. */
|
|
||||||
moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB,
|
|
||||||
REDISMODULE_SUBEVENT_FLUSHDB_END,
|
|
||||||
&fi);
|
|
||||||
}
|
|
||||||
|
|
||||||
return removed;
|
return removed;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Remove all keys from all the databases in a Redis server.
|
||||||
|
* If callback is given the function is called from time to time to
|
||||||
|
* signal that work is in progress.
|
||||||
|
*
|
||||||
|
* The dbnum can be -1 if all the DBs should be flushed, or the specified
|
||||||
|
* DB number if we want to flush only a single Redis database number.
|
||||||
|
*
|
||||||
|
* Flags are be EMPTYDB_NO_FLAGS if no special flags are specified or
|
||||||
|
* EMPTYDB_ASYNC if we want the memory to be freed in a different thread
|
||||||
|
* and the function to return ASAP.
|
||||||
|
*
|
||||||
|
* On success the function returns the number of keys removed from the
|
||||||
|
* database(s). Otherwise -1 is returned in the specific case the
|
||||||
|
* DB number is out of range, and errno is set to EINVAL. */
|
||||||
long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
|
long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
|
||||||
return emptyDbGeneric(server.db, dbnum, flags, callback);
|
int async = (flags & EMPTYDB_ASYNC);
|
||||||
|
RedisModuleFlushInfoV1 fi = {REDISMODULE_FLUSHINFO_VERSION,!async,dbnum};
|
||||||
|
long long removed = 0;
|
||||||
|
|
||||||
|
if (dbnum < -1 || dbnum >= server.dbnum) {
|
||||||
|
errno = EINVAL;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Fire the flushdb modules event. */
|
||||||
|
moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB,
|
||||||
|
REDISMODULE_SUBEVENT_FLUSHDB_START,
|
||||||
|
&fi);
|
||||||
|
|
||||||
|
/* Make sure the WATCHed keys are affected by the FLUSH* commands.
|
||||||
|
* Note that we need to call the function while the keys are still
|
||||||
|
* there. */
|
||||||
|
signalFlushedDb(dbnum);
|
||||||
|
|
||||||
|
/* Empty redis database structure. */
|
||||||
|
removed = emptyDbStructure(server.db, dbnum, async, callback);
|
||||||
|
|
||||||
|
/* Flush slots to keys map if enable cluster, we can flush entire
|
||||||
|
* slots to keys map whatever dbnum because only support one DB
|
||||||
|
* in cluster mode. */
|
||||||
|
if (server.cluster_enabled) slotToKeyFlush(async);
|
||||||
|
|
||||||
|
if (dbnum == -1) flushSlaveKeysWithExpireList();
|
||||||
|
|
||||||
|
/* Also fire the end event. Note that this event will fire almost
|
||||||
|
* immediately after the start event if the flush is asynchronous. */
|
||||||
|
moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB,
|
||||||
|
REDISMODULE_SUBEVENT_FLUSHDB_END,
|
||||||
|
&fi);
|
||||||
|
|
||||||
|
return removed;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Store a backup of the database for later use, and put an empty one
|
||||||
|
* instead of it. */
|
||||||
|
dbBackup *backupDb(void) {
|
||||||
|
dbBackup *backup = zmalloc(sizeof(dbBackup));
|
||||||
|
|
||||||
|
/* Backup main DBs. */
|
||||||
|
backup->dbarray = zmalloc(sizeof(redisDb)*server.dbnum);
|
||||||
|
for (int i=0; i<server.dbnum; i++) {
|
||||||
|
backup->dbarray[i] = server.db[i];
|
||||||
|
server.db[i].dict = dictCreate(&dbDictType,NULL);
|
||||||
|
server.db[i].expires = dictCreate(&keyptrDictType,NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Backup cluster slots to keys map if enable cluster. */
|
||||||
|
if (server.cluster_enabled) {
|
||||||
|
backup->slots_to_keys = server.cluster->slots_to_keys;
|
||||||
|
memcpy(backup->slots_keys_count, server.cluster->slots_keys_count,
|
||||||
|
sizeof(server.cluster->slots_keys_count));
|
||||||
|
server.cluster->slots_to_keys = raxNew();
|
||||||
|
memset(server.cluster->slots_keys_count, 0,
|
||||||
|
sizeof(server.cluster->slots_keys_count));
|
||||||
|
}
|
||||||
|
|
||||||
|
return backup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Discard a previously created backup, this can be slow (similar to FLUSHALL)
|
||||||
|
* Arguments are similar to the ones of emptyDb, see EMPTYDB_ flags. */
|
||||||
|
void discardDbBackup(dbBackup *buckup, int flags, void(callback)(void*)) {
|
||||||
|
int async = (flags & EMPTYDB_ASYNC);
|
||||||
|
|
||||||
|
/* Release main DBs backup . */
|
||||||
|
emptyDbStructure(buckup->dbarray, -1, async, callback);
|
||||||
|
for (int i=0; i<server.dbnum; i++) {
|
||||||
|
dictRelease(buckup->dbarray[i].dict);
|
||||||
|
dictRelease(buckup->dbarray[i].expires);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Release slots to keys map backup if enable cluster. */
|
||||||
|
if (server.cluster_enabled) freeSlotsToKeysMap(buckup->slots_to_keys, async);
|
||||||
|
|
||||||
|
/* Release buckup. */
|
||||||
|
zfree(buckup->dbarray);
|
||||||
|
zfree(buckup);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Restore the previously created backup (discarding what currently resides
|
||||||
|
* in the db).
|
||||||
|
* This function should be called after the current contents of the database
|
||||||
|
* was emptied with a previous call to emptyDb (possibly using the async mode). */
|
||||||
|
void restoreDbBackup(dbBackup *buckup) {
|
||||||
|
/* Restore main DBs. */
|
||||||
|
for (int i=0; i<server.dbnum; i++) {
|
||||||
|
serverAssert(dictSize(server.db[i].dict) == 0);
|
||||||
|
serverAssert(dictSize(server.db[i].expires) == 0);
|
||||||
|
dictRelease(server.db[i].dict);
|
||||||
|
dictRelease(server.db[i].expires);
|
||||||
|
server.db[i] = buckup->dbarray[i];
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Restore slots to keys map backup if enable cluster. */
|
||||||
|
if (server.cluster_enabled) {
|
||||||
|
serverAssert(server.cluster->slots_to_keys->numele == 0);
|
||||||
|
raxFree(server.cluster->slots_to_keys);
|
||||||
|
server.cluster->slots_to_keys = buckup->slots_to_keys;
|
||||||
|
memcpy(server.cluster->slots_keys_count, buckup->slots_keys_count,
|
||||||
|
sizeof(server.cluster->slots_keys_count));
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Release buckup. */
|
||||||
|
zfree(buckup->dbarray);
|
||||||
|
zfree(buckup);
|
||||||
}
|
}
|
||||||
|
|
||||||
int selectDb(client *c, int id) {
|
int selectDb(client *c, int id) {
|
||||||
@ -1705,11 +1787,25 @@ void slotToKeyDel(sds key) {
|
|||||||
slotToKeyUpdateKey(key,0);
|
slotToKeyUpdateKey(key,0);
|
||||||
}
|
}
|
||||||
|
|
||||||
void slotToKeyFlush(void) {
|
/* Release the radix tree mapping Redis Cluster keys to slots. If 'async'
|
||||||
raxFree(server.cluster->slots_to_keys);
|
* is true, we release it asynchronously. */
|
||||||
|
void freeSlotsToKeysMap(rax *rt, int async) {
    /* Synchronous path: free the radix tree right here. */
    if (!async) {
        raxFree(rt);
        return;
    }

    /* Asynchronous path: delegate to freeSlotsToKeysMapAsync(). */
    freeSlotsToKeysMapAsync(rt);
}
|
||||||
|
|
||||||
|
/* Empty the slots-keys map of Redis CLuster by creating a new empty one and
|
||||||
|
* freeing the old one. */
|
||||||
|
void slotToKeyFlush(int async) {
|
||||||
|
rax *old = server.cluster->slots_to_keys;
|
||||||
|
|
||||||
server.cluster->slots_to_keys = raxNew();
|
server.cluster->slots_to_keys = raxNew();
|
||||||
memset(server.cluster->slots_keys_count,0,
|
memset(server.cluster->slots_keys_count,0,
|
||||||
sizeof(server.cluster->slots_keys_count));
|
sizeof(server.cluster->slots_keys_count));
|
||||||
|
freeSlotsToKeysMap(old, async);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Pupulate the specified array of objects with keys in the specified slot.
|
/* Pupulate the specified array of objects with keys in the specified slot.
|
||||||
|
@ -136,16 +136,10 @@ void emptyDbAsync(redisDb *db) {
|
|||||||
bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,oldht2);
|
bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,oldht2);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Empty the slots-keys map of Redis CLuster by creating a new empty one
|
/* Release the radix tree mapping Redis Cluster keys to slots asynchronously. */
|
||||||
* and scheduling the old for lazy freeing. */
|
void freeSlotsToKeysMapAsync(rax *rt) {
|
||||||
void slotToKeyFlushAsync(void) {
|
atomicIncr(lazyfree_objects,rt->numele);
|
||||||
rax *old = server.cluster->slots_to_keys;
|
bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,NULL,rt);
|
||||||
|
|
||||||
server.cluster->slots_to_keys = raxNew();
|
|
||||||
memset(server.cluster->slots_keys_count,0,
|
|
||||||
sizeof(server.cluster->slots_keys_count));
|
|
||||||
atomicIncr(lazyfree_objects,old->numele);
|
|
||||||
bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,NULL,old);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Release objects from the lazyfree thread. It's just decrRefCount()
|
/* Release objects from the lazyfree thread. It's just decrRefCount()
|
||||||
@ -157,9 +151,7 @@ void lazyfreeFreeObjectFromBioThread(robj *o) {
|
|||||||
|
|
||||||
/* Release a database from the lazyfree thread. The 'db' pointer is the
|
/* Release a database from the lazyfree thread. The 'db' pointer is the
|
||||||
* database which was substituted with a fresh one in the main thread
|
* database which was substituted with a fresh one in the main thread
|
||||||
* when the database was logically deleted. 'sl' is a skiplist used by
|
* when the database was logically deleted. */
|
||||||
* Redis Cluster in order to take the hash slots -> keys mapping. This
|
|
||||||
* may be NULL if Redis Cluster is disabled. */
|
|
||||||
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2) {
|
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2) {
|
||||||
size_t numkeys = dictSize(ht1);
|
size_t numkeys = dictSize(ht1);
|
||||||
dictRelease(ht1);
|
dictRelease(ht1);
|
||||||
@ -167,7 +159,7 @@ void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2) {
|
|||||||
atomicDecr(lazyfree_objects,numkeys);
|
atomicDecr(lazyfree_objects,numkeys);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Release the skiplist mapping Redis Cluster keys to slots in the
|
/* Release the radix tree mapping Redis Cluster keys to slots in the
|
||||||
* lazyfree thread. */
|
* lazyfree thread. */
|
||||||
void lazyfreeFreeSlotsMapFromBioThread(rax *rt) {
|
void lazyfreeFreeSlotsMapFromBioThread(rax *rt) {
|
||||||
size_t len = rt->numele;
|
size_t len = rt->numele;
|
||||||
|
@ -1434,16 +1434,10 @@ static int useDisklessLoad() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Helper function for readSyncBulkPayload() to make backups of the current
|
/* Helper function for readSyncBulkPayload() to make backups of the current
|
||||||
* DBs before socket-loading the new ones. The backups may be restored later
|
* databases before socket-loading the new ones. The backups may be restored
|
||||||
* or freed by disklessLoadRestoreBackups(). */
|
* by disklessLoadRestoreBackup or freed by disklessLoadDiscardBackup later. */
|
||||||
redisDb *disklessLoadMakeBackups(void) {
|
dbBackup *disklessLoadMakeBackup(void) {
|
||||||
redisDb *backups = zmalloc(sizeof(redisDb)*server.dbnum);
|
return backupDb();
|
||||||
for (int i=0; i<server.dbnum; i++) {
|
|
||||||
backups[i] = server.db[i];
|
|
||||||
server.db[i].dict = dictCreate(&dbDictType,NULL);
|
|
||||||
server.db[i].expires = dictCreate(&keyptrDictType,NULL);
|
|
||||||
}
|
|
||||||
return backups;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Helper function for readSyncBulkPayload(): when replica-side diskless
|
/* Helper function for readSyncBulkPayload(): when replica-side diskless
|
||||||
@ -1451,30 +1445,15 @@ redisDb *disklessLoadMakeBackups(void) {
|
|||||||
* before loading the new ones from the socket.
|
* before loading the new ones from the socket.
|
||||||
*
|
*
|
||||||
* If the socket loading went wrong, we want to restore the old backups
|
* If the socket loading went wrong, we want to restore the old backups
|
||||||
* into the server databases. This function does just that in the case
|
* into the server databases. */
|
||||||
* the 'restore' argument (the number of DBs to replace) is non-zero.
|
void disklessLoadRestoreBackup(dbBackup *buckup) {
|
||||||
*
|
restoreDbBackup(buckup);
|
||||||
* When instead the loading succeeded we want just to free our old backups,
|
}
|
||||||
* in that case the function will do just that when 'restore' is 0. */
|
|
||||||
void disklessLoadRestoreBackups(redisDb *backup, int restore, int empty_db_flags)
|
/* Helper function for readSyncBulkPayload() to discard our old backups
|
||||||
{
|
* when the loading succeeded. */
|
||||||
if (restore) {
|
void disklessLoadDiscardBackup(dbBackup *buckup, int flag) {
|
||||||
/* Restore. */
|
discardDbBackup(buckup, flag, replicationEmptyDbCallback);
|
||||||
emptyDbGeneric(server.db,-1,empty_db_flags,replicationEmptyDbCallback);
|
|
||||||
for (int i=0; i<server.dbnum; i++) {
|
|
||||||
dictRelease(server.db[i].dict);
|
|
||||||
dictRelease(server.db[i].expires);
|
|
||||||
server.db[i] = backup[i];
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
/* Delete (Pass EMPTYDB_BACKUP in order to avoid firing module events) . */
|
|
||||||
emptyDbGeneric(backup,-1,empty_db_flags|EMPTYDB_BACKUP,replicationEmptyDbCallback);
|
|
||||||
for (int i=0; i<server.dbnum; i++) {
|
|
||||||
dictRelease(backup[i].dict);
|
|
||||||
dictRelease(backup[i].expires);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
zfree(backup);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Asynchronously read the SYNC payload we receive from a master */
|
/* Asynchronously read the SYNC payload we receive from a master */
|
||||||
@ -1483,7 +1462,7 @@ void readSyncBulkPayload(connection *conn) {
|
|||||||
char buf[PROTO_IOBUF_LEN];
|
char buf[PROTO_IOBUF_LEN];
|
||||||
ssize_t nread, readlen, nwritten;
|
ssize_t nread, readlen, nwritten;
|
||||||
int use_diskless_load = useDisklessLoad();
|
int use_diskless_load = useDisklessLoad();
|
||||||
redisDb *diskless_load_backup = NULL;
|
dbBackup *diskless_load_backup = NULL;
|
||||||
int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC :
|
int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC :
|
||||||
EMPTYDB_NO_FLAGS;
|
EMPTYDB_NO_FLAGS;
|
||||||
off_t left;
|
off_t left;
|
||||||
@ -1664,11 +1643,11 @@ void readSyncBulkPayload(connection *conn) {
|
|||||||
server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB)
|
server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB)
|
||||||
{
|
{
|
||||||
/* Create a backup of server.db[] and initialize to empty
|
/* Create a backup of server.db[] and initialize to empty
|
||||||
* dictionaries */
|
* dictionaries. */
|
||||||
diskless_load_backup = disklessLoadMakeBackups();
|
diskless_load_backup = disklessLoadMakeBackup();
|
||||||
}
|
}
|
||||||
/* We call to emptyDb even in case of REPL_DISKLESS_LOAD_SWAPDB
|
/* We call to emptyDb even in case of REPL_DISKLESS_LOAD_SWAPDB
|
||||||
* (Where disklessLoadMakeBackups left server.db empty) because we
|
* (Where disklessLoadMakeBackup left server.db empty) because we
|
||||||
* want to execute all the auxiliary logic of emptyDb (Namely,
|
* want to execute all the auxiliary logic of emptyDb (Namely,
|
||||||
* fire module events) */
|
* fire module events) */
|
||||||
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
|
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
|
||||||
@ -1698,14 +1677,14 @@ void readSyncBulkPayload(connection *conn) {
|
|||||||
"from socket");
|
"from socket");
|
||||||
cancelReplicationHandshake();
|
cancelReplicationHandshake();
|
||||||
rioFreeConn(&rdb, NULL);
|
rioFreeConn(&rdb, NULL);
|
||||||
|
|
||||||
|
/* Remove the half-loaded data in case we started with
|
||||||
|
* an empty replica. */
|
||||||
|
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
|
||||||
|
|
||||||
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
|
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
|
||||||
/* Restore the backed up databases. */
|
/* Restore the backed up databases. */
|
||||||
disklessLoadRestoreBackups(diskless_load_backup,1,
|
disklessLoadRestoreBackup(diskless_load_backup);
|
||||||
empty_db_flags);
|
|
||||||
} else {
|
|
||||||
/* Remove the half-loaded data in case we started with
|
|
||||||
* an empty replica. */
|
|
||||||
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Note that there's no point in restarting the AOF on SYNC
|
/* Note that there's no point in restarting the AOF on SYNC
|
||||||
@ -1720,7 +1699,7 @@ void readSyncBulkPayload(connection *conn) {
|
|||||||
/* Delete the backup databases we created before starting to load
|
/* Delete the backup databases we created before starting to load
|
||||||
* the new RDB. Now the RDB was loaded with success so the old
|
* the new RDB. Now the RDB was loaded with success so the old
|
||||||
* data is useless. */
|
* data is useless. */
|
||||||
disklessLoadRestoreBackups(diskless_load_backup,0,empty_db_flags);
|
disklessLoadDiscardBackup(diskless_load_backup, empty_db_flags);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Verify the end mark is correct. */
|
/* Verify the end mark is correct. */
|
||||||
|
20
src/server.h
20
src/server.h
@ -674,6 +674,11 @@ typedef struct redisDb {
|
|||||||
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
|
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
|
||||||
} redisDb;
|
} redisDb;
|
||||||
|
|
||||||
|
/* Declare database backup that include redis main DBs and slots to keys map.
|
||||||
|
* Definition is in db.c. We can't define it here since we define CLUSTER_SLOTS
|
||||||
|
* in cluster.h. */
|
||||||
|
typedef struct dbBackup dbBackup;
|
||||||
|
|
||||||
/* Client MULTI/EXEC state */
|
/* Client MULTI/EXEC state */
|
||||||
typedef struct multiCmd {
|
typedef struct multiCmd {
|
||||||
robj **argv;
|
robj **argv;
|
||||||
@ -2153,11 +2158,14 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o);
|
|||||||
|
|
||||||
#define EMPTYDB_NO_FLAGS 0 /* No flags. */
|
#define EMPTYDB_NO_FLAGS 0 /* No flags. */
|
||||||
#define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */
|
#define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */
|
||||||
#define EMPTYDB_BACKUP (1<<2) /* DB array is a backup for REPL_DISKLESS_LOAD_SWAPDB. */
|
|
||||||
long long emptyDb(int dbnum, int flags, void(callback)(void*));
|
long long emptyDb(int dbnum, int flags, void(callback)(void*));
|
||||||
long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*));
|
long long emptyDbStructure(redisDb *dbarray, int dbnum, int async, void(callback)(void*));
|
||||||
void flushAllDataAndResetRDB(int flags);
|
void flushAllDataAndResetRDB(int flags);
|
||||||
long long dbTotalServerKeyCount();
|
long long dbTotalServerKeyCount();
|
||||||
|
dbBackup *backupDb(void);
|
||||||
|
void restoreDbBackup(dbBackup *buckup);
|
||||||
|
void discardDbBackup(dbBackup *buckup, int flags, void(callback)(void*));
|
||||||
|
|
||||||
|
|
||||||
int selectDb(client *c, int id);
|
int selectDb(client *c, int id);
|
||||||
void signalModifiedKey(client *c, redisDb *db, robj *key);
|
void signalModifiedKey(client *c, redisDb *db, robj *key);
|
||||||
@ -2170,12 +2178,14 @@ void scanGenericCommand(client *c, robj *o, unsigned long cursor);
|
|||||||
int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor);
|
int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor);
|
||||||
void slotToKeyAdd(sds key);
|
void slotToKeyAdd(sds key);
|
||||||
void slotToKeyDel(sds key);
|
void slotToKeyDel(sds key);
|
||||||
void slotToKeyFlush(void);
|
|
||||||
int dbAsyncDelete(redisDb *db, robj *key);
|
int dbAsyncDelete(redisDb *db, robj *key);
|
||||||
void emptyDbAsync(redisDb *db);
|
void emptyDbAsync(redisDb *db);
|
||||||
void slotToKeyFlushAsync(void);
|
void slotToKeyFlush(int async);
|
||||||
size_t lazyfreeGetPendingObjectsCount(void);
|
size_t lazyfreeGetPendingObjectsCount(void);
|
||||||
void freeObjAsync(robj *o);
|
void freeObjAsync(robj *obj);
|
||||||
|
void freeSlotsToKeysMapAsync(rax *rt);
|
||||||
|
void freeSlotsToKeysMap(rax *rt, int async);
|
||||||
|
|
||||||
|
|
||||||
/* API to get key arguments from commands */
|
/* API to get key arguments from commands */
|
||||||
int *getKeysPrepareResult(getKeysResult *result, int numkeys);
|
int *getKeysPrepareResult(getKeysResult *result, int numkeys);
|
||||||
|
77
tests/cluster/tests/17-diskless-load-swapdb.tcl
Normal file
77
tests/cluster/tests/17-diskless-load-swapdb.tcl
Normal file
@ -0,0 +1,77 @@
|
|||||||
|
# Check that the replica can restore the database backup correctly if diskless load fails.
|
||||||
|
|
||||||
|
source "../tests/includes/init-tests.tcl"
|
||||||
|
|
||||||
|
test "Create a primary with a replica" {
|
||||||
|
create_cluster 1 1
|
||||||
|
}
|
||||||
|
|
||||||
|
test "Cluster should start ok" {
|
||||||
|
assert_cluster_state ok
|
||||||
|
}
|
||||||
|
|
||||||
|
test "Cluster is writable" {
|
||||||
|
cluster_write_test 0
|
||||||
|
}
|
||||||
|
|
||||||
|
test "Right to restore backups when fail to diskless load " {
|
||||||
|
set master [Rn 0]
|
||||||
|
set replica [Rn 1]
|
||||||
|
set master_id 0
|
||||||
|
set replica_id 1
|
||||||
|
|
||||||
|
$replica READONLY
|
||||||
|
$replica config set repl-diskless-load swapdb
|
||||||
|
$replica config rewrite
|
||||||
|
$master config set repl-backlog-size 1024
|
||||||
|
$master config set repl-diskless-sync yes
|
||||||
|
$master config set repl-diskless-sync-delay 0
|
||||||
|
$master config set rdb-key-save-delay 10000
|
||||||
|
$master config set rdbcompression no
|
||||||
|
$master config set appendonly no
|
||||||
|
$master config set save ""
|
||||||
|
|
||||||
|
# Write a key that belongs to slot 0
|
||||||
|
set slot0_key "06S"
|
||||||
|
$master set $slot0_key 1
|
||||||
|
after 100
|
||||||
|
assert_equal {1} [$replica get $slot0_key]
|
||||||
|
assert_equal $slot0_key [$replica CLUSTER GETKEYSINSLOT 0 1]
|
||||||
|
|
||||||
|
# Kill the replica
|
||||||
|
kill_instance redis $replica_id
|
||||||
|
|
||||||
|
# Delete the key from master
|
||||||
|
$master del $slot0_key
|
||||||
|
|
||||||
|
# Replica must full sync with master when start because replication
|
||||||
|
# backlog size is very small, and dumping rdb will cost several seconds.
|
||||||
|
set num 10000
|
||||||
|
set value [string repeat A 1024]
|
||||||
|
set rd [redis_deferring_client redis $master_id]
|
||||||
|
for {set j 0} {$j < $num} {incr j} {
|
||||||
|
$rd set $j $value
|
||||||
|
}
|
||||||
|
for {set j 0} {$j < $num} {incr j} {
|
||||||
|
$rd read
|
||||||
|
}
|
||||||
|
|
||||||
|
# Start the replica again
|
||||||
|
restart_instance redis $replica_id
|
||||||
|
$replica READONLY
|
||||||
|
|
||||||
|
# Start full sync
|
||||||
|
wait_for_condition 500 10 {
|
||||||
|
[string match "*sync*" [$replica role]]
|
||||||
|
} else {
|
||||||
|
fail "Fail to full sync"
|
||||||
|
}
|
||||||
|
after 100
|
||||||
|
|
||||||
|
# Kill master, abort full sync
|
||||||
|
kill_instance redis $master_id
|
||||||
|
|
||||||
|
# Replica keys and keys to slots map still both are right
|
||||||
|
assert_equal {1} [$replica get $slot0_key]
|
||||||
|
assert_equal $slot0_key [$replica CLUSTER GETKEYSINSLOT 0 1]
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user