From f5d6057cbb645b7dda2fb4e00dcbc06085444d61 Mon Sep 17 00:00:00 2001 From: Wang Yuan Date: Wed, 2 Dec 2020 19:56:11 +0800 Subject: [PATCH] Backup keys to slots map and restore when fail to sync if diskless-load type is swapdb in cluster mode (#8108) When the replica diskless-load type is swapdb in cluster mode, we didn't back up the keys to slots map, so we would lose the keys to slots map if the sync failed. Now we back up the keys to slots map first, and restore it properly on failure. This commit includes a refactoring/cleanup of the backup mechanism (moving it to db.c and re-structuring it a bit). Co-authored-by: Oran Agra (cherry picked from commit 10712afaf3e7f2ea859622fa5b27c96ee8f478c5) --- src/bio.c | 2 +- src/db.c | 218 +++++++++++++----- src/lazyfree.c | 20 +- src/replication.c | 69 ++---- src/server.h | 20 +- .../cluster/tests/17-diskless-load-swapdb.tcl | 77 +++++++ 6 files changed, 280 insertions(+), 126 deletions(-) create mode 100644 tests/cluster/tests/17-diskless-load-swapdb.tcl diff --git a/src/bio.c b/src/bio.c index ff1108799..5e5f6afd9 100644 --- a/src/bio.c +++ b/src/bio.c @@ -203,7 +203,7 @@ void *bioProcessBackgroundJobs(void *arg) { /* What we free changes depending on what arguments are set: * arg1 -> free the object at pointer. * arg2 & arg3 -> free two dictionaries (a Redis DB). - * only arg3 -> free the skiplist. */ + * only arg3 -> free the radix tree. */ if (job->arg1) lazyfreeFreeObjectFromBioThread(job->arg1); else if (job->arg2 && job->arg3) diff --git a/src/db.c b/src/db.c index 09ba38348..e3186dbd2 100644 --- a/src/db.c +++ b/src/db.c @@ -34,6 +34,13 @@ #include #include +/* Database backup. 
*/ +struct dbBackup { + redisDb *dbarray; + rax *slots_to_keys; + uint64_t slots_keys_count[CLUSTER_SLOTS]; +}; + /*----------------------------------------------------------------------------- * C-level DB API *----------------------------------------------------------------------------*/ @@ -359,48 +366,18 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) { return o; } -/* Remove all keys from all the databases in a Redis server. - * If callback is given the function is called from time to time to - * signal that work is in progress. +/* Remove all keys from the database(s) structure. The dbarray argument + * may not be the server main DBs (could be a backup). * - * The dbnum can be -1 if all the DBs should be flushed, or the specified - * DB number if we want to flush only a single Redis database number. - * - * Flags are be EMPTYDB_NO_FLAGS if no special flags are specified or - * 1. EMPTYDB_ASYNC if we want the memory to be freed in a different thread. - * 2. EMPTYDB_BACKUP if we want to empty the backup dictionaries created by - * disklessLoadMakeBackups. In that case we only free memory and avoid - * firing module events. - * and the function to return ASAP. - * - * On success the function returns the number of keys removed from the - * database(s). Otherwise -1 is returned in the specific case the - * DB number is out of range, and errno is set to EINVAL. */ -long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*)) { - int async = (flags & EMPTYDB_ASYNC); - int backup = (flags & EMPTYDB_BACKUP); /* Just free the memory, nothing else */ - RedisModuleFlushInfoV1 fi = {REDISMODULE_FLUSHINFO_VERSION,!async,dbnum}; + * The dbnum can be -1 if all the DBs should be emptied, or the specified + * DB index if we want to empty only a single database. + * The function returns the number of keys removed from the database(s). 
*/ +long long emptyDbStructure(redisDb *dbarray, int dbnum, int async, + void(callback)(void*)) +{ long long removed = 0; - - if (dbnum < -1 || dbnum >= server.dbnum) { - errno = EINVAL; - return -1; - } - - /* Pre-flush actions */ - if (!backup) { - /* Fire the flushdb modules event. */ - moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB, - REDISMODULE_SUBEVENT_FLUSHDB_START, - &fi); - - /* Make sure the WATCHed keys are affected by the FLUSH* commands. - * Note that we need to call the function while the keys are still - * there. */ - signalFlushedDb(dbnum); - } - int startdb, enddb; + if (dbnum == -1) { startdb = 0; enddb = server.dbnum-1; @@ -416,34 +393,139 @@ long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)( dictEmpty(dbarray[j].dict,callback); dictEmpty(dbarray[j].expires,callback); } - /* Because we will start a new database, reset average ttl. */ + /* Because all keys of database are removed, reset average ttl. */ dbarray[j].avg_ttl = 0; dbarray[j].expires_cursor = 0; } - /* Post-flush actions */ - if (!backup) { - if (server.cluster_enabled) { - if (async) { - slotToKeyFlushAsync(); - } else { - slotToKeyFlush(); - } - } - if (dbnum == -1) flushSlaveKeysWithExpireList(); - - /* Also fire the end event. Note that this event will fire almost - * immediately after the start event if the flush is asynchronous. */ - moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB, - REDISMODULE_SUBEVENT_FLUSHDB_END, - &fi); - } - return removed; } +/* Remove all keys from all the databases in a Redis server. + * If callback is given the function is called from time to time to + * signal that work is in progress. + * + * The dbnum can be -1 if all the DBs should be flushed, or the specified + * DB number if we want to flush only a single Redis database number. + * + * Flags are be EMPTYDB_NO_FLAGS if no special flags are specified or + * EMPTYDB_ASYNC if we want the memory to be freed in a different thread + * and the function to return ASAP. 
+ * + * On success the function returns the number of keys removed from the + * database(s). Otherwise -1 is returned in the specific case the + * DB number is out of range, and errno is set to EINVAL. */ long long emptyDb(int dbnum, int flags, void(callback)(void*)) { - return emptyDbGeneric(server.db, dbnum, flags, callback); + int async = (flags & EMPTYDB_ASYNC); + RedisModuleFlushInfoV1 fi = {REDISMODULE_FLUSHINFO_VERSION,!async,dbnum}; + long long removed = 0; + + if (dbnum < -1 || dbnum >= server.dbnum) { + errno = EINVAL; + return -1; + } + + /* Fire the flushdb modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB, + REDISMODULE_SUBEVENT_FLUSHDB_START, + &fi); + + /* Make sure the WATCHed keys are affected by the FLUSH* commands. + * Note that we need to call the function while the keys are still + * there. */ + signalFlushedDb(dbnum); + + /* Empty redis database structure. */ + removed = emptyDbStructure(server.db, dbnum, async, callback); + + /* Flush slots to keys map if enable cluster, we can flush entire + * slots to keys map whatever dbnum because only support one DB + * in cluster mode. */ + if (server.cluster_enabled) slotToKeyFlush(async); + + if (dbnum == -1) flushSlaveKeysWithExpireList(); + + /* Also fire the end event. Note that this event will fire almost + * immediately after the start event if the flush is asynchronous. */ + moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB, + REDISMODULE_SUBEVENT_FLUSHDB_END, + &fi); + + return removed; +} + +/* Store a backup of the database for later use, and put an empty one + * instead of it. */ +dbBackup *backupDb(void) { + dbBackup *backup = zmalloc(sizeof(dbBackup)); + + /* Backup main DBs. */ + backup->dbarray = zmalloc(sizeof(redisDb)*server.dbnum); + for (int i=0; idbarray[i] = server.db[i]; + server.db[i].dict = dictCreate(&dbDictType,NULL); + server.db[i].expires = dictCreate(&keyptrDictType,NULL); + } + + /* Backup cluster slots to keys map if enable cluster. 
*/ + if (server.cluster_enabled) { + backup->slots_to_keys = server.cluster->slots_to_keys; + memcpy(backup->slots_keys_count, server.cluster->slots_keys_count, + sizeof(server.cluster->slots_keys_count)); + server.cluster->slots_to_keys = raxNew(); + memset(server.cluster->slots_keys_count, 0, + sizeof(server.cluster->slots_keys_count)); + } + + return backup; +} + +/* Discard a previously created backup, this can be slow (similar to FLUSHALL) + * Arguments are similar to the ones of emptyDb, see EMPTYDB_ flags. */ +void discardDbBackup(dbBackup *buckup, int flags, void(callback)(void*)) { + int async = (flags & EMPTYDB_ASYNC); + + /* Release main DBs backup . */ + emptyDbStructure(buckup->dbarray, -1, async, callback); + for (int i=0; idbarray[i].dict); + dictRelease(buckup->dbarray[i].expires); + } + + /* Release slots to keys map backup if enable cluster. */ + if (server.cluster_enabled) freeSlotsToKeysMap(buckup->slots_to_keys, async); + + /* Release buckup. */ + zfree(buckup->dbarray); + zfree(buckup); +} + +/* Restore the previously created backup (discarding what currently resides + * in the db). + * This function should be called after the current contents of the database + * was emptied with a previous call to emptyDb (possibly using the async mode). */ +void restoreDbBackup(dbBackup *buckup) { + /* Restore main DBs. */ + for (int i=0; idbarray[i]; + } + + /* Restore slots to keys map backup if enable cluster. */ + if (server.cluster_enabled) { + serverAssert(server.cluster->slots_to_keys->numele == 0); + raxFree(server.cluster->slots_to_keys); + server.cluster->slots_to_keys = buckup->slots_to_keys; + memcpy(server.cluster->slots_keys_count, buckup->slots_keys_count, + sizeof(server.cluster->slots_keys_count)); + } + + /* Release buckup. 
*/ + zfree(buckup->dbarray); + zfree(buckup); } int selectDb(client *c, int id) { @@ -1705,11 +1787,25 @@ void slotToKeyDel(sds key) { slotToKeyUpdateKey(key,0); } -void slotToKeyFlush(void) { - raxFree(server.cluster->slots_to_keys); +/* Release the radix tree mapping Redis Cluster keys to slots. If 'async' + * is true, we release it asynchronously. */ +void freeSlotsToKeysMap(rax *rt, int async) { + if (async) { + freeSlotsToKeysMapAsync(rt); + } else { + raxFree(rt); + } +} + +/* Empty the slots-keys map of Redis CLuster by creating a new empty one and + * freeing the old one. */ +void slotToKeyFlush(int async) { + rax *old = server.cluster->slots_to_keys; + server.cluster->slots_to_keys = raxNew(); memset(server.cluster->slots_keys_count,0, sizeof(server.cluster->slots_keys_count)); + freeSlotsToKeysMap(old, async); } /* Pupulate the specified array of objects with keys in the specified slot. diff --git a/src/lazyfree.c b/src/lazyfree.c index 821dc50df..31197df2e 100644 --- a/src/lazyfree.c +++ b/src/lazyfree.c @@ -136,16 +136,10 @@ void emptyDbAsync(redisDb *db) { bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,oldht2); } -/* Empty the slots-keys map of Redis CLuster by creating a new empty one - * and scheduling the old for lazy freeing. */ -void slotToKeyFlushAsync(void) { - rax *old = server.cluster->slots_to_keys; - - server.cluster->slots_to_keys = raxNew(); - memset(server.cluster->slots_keys_count,0, - sizeof(server.cluster->slots_keys_count)); - atomicIncr(lazyfree_objects,old->numele); - bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,NULL,old); +/* Release the radix tree mapping Redis Cluster keys to slots asynchronously. */ +void freeSlotsToKeysMapAsync(rax *rt) { + atomicIncr(lazyfree_objects,rt->numele); + bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,NULL,rt); } /* Release objects from the lazyfree thread. It's just decrRefCount() @@ -157,9 +151,7 @@ void lazyfreeFreeObjectFromBioThread(robj *o) { /* Release a database from the lazyfree thread. 
The 'db' pointer is the * database which was substituted with a fresh one in the main thread - * when the database was logically deleted. 'sl' is a skiplist used by - * Redis Cluster in order to take the hash slots -> keys mapping. This - * may be NULL if Redis Cluster is disabled. */ + * when the database was logically deleted. */ void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2) { size_t numkeys = dictSize(ht1); dictRelease(ht1); @@ -167,7 +159,7 @@ void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2) { atomicDecr(lazyfree_objects,numkeys); } -/* Release the skiplist mapping Redis Cluster keys to slots in the +/* Release the radix tree mapping Redis Cluster keys to slots in the * lazyfree thread. */ void lazyfreeFreeSlotsMapFromBioThread(rax *rt) { size_t len = rt->numele; diff --git a/src/replication.c b/src/replication.c index 5f1868e84..5ce9f0208 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1434,16 +1434,10 @@ static int useDisklessLoad() { } /* Helper function for readSyncBulkPayload() to make backups of the current - * DBs before socket-loading the new ones. The backups may be restored later - * or freed by disklessLoadRestoreBackups(). */ -redisDb *disklessLoadMakeBackups(void) { - redisDb *backups = zmalloc(sizeof(redisDb)*server.dbnum); - for (int i=0; i