diff --git a/src/bio.c b/src/bio.c index ff1108799..5e5f6afd9 100644 --- a/src/bio.c +++ b/src/bio.c @@ -203,7 +203,7 @@ void *bioProcessBackgroundJobs(void *arg) { /* What we free changes depending on what arguments are set: * arg1 -> free the object at pointer. * arg2 & arg3 -> free two dictionaries (a Redis DB). - * only arg3 -> free the skiplist. */ + * only arg3 -> free the radix tree. */ if (job->arg1) lazyfreeFreeObjectFromBioThread(job->arg1); else if (job->arg2 && job->arg3) diff --git a/src/db.c b/src/db.c index 09ba38348..e3186dbd2 100644 --- a/src/db.c +++ b/src/db.c @@ -34,6 +34,13 @@ #include #include +/* Database backup. */ +struct dbBackup { + redisDb *dbarray; + rax *slots_to_keys; + uint64_t slots_keys_count[CLUSTER_SLOTS]; +}; + /*----------------------------------------------------------------------------- * C-level DB API *----------------------------------------------------------------------------*/ @@ -359,48 +366,18 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) { return o; } -/* Remove all keys from all the databases in a Redis server. - * If callback is given the function is called from time to time to - * signal that work is in progress. +/* Remove all keys from the database(s) structure. The dbarray argument + * may not be the server main DBs (could be a backup). * - * The dbnum can be -1 if all the DBs should be flushed, or the specified - * DB number if we want to flush only a single Redis database number. - * - * Flags are be EMPTYDB_NO_FLAGS if no special flags are specified or - * 1. EMPTYDB_ASYNC if we want the memory to be freed in a different thread. - * 2. EMPTYDB_BACKUP if we want to empty the backup dictionaries created by - * disklessLoadMakeBackups. In that case we only free memory and avoid - * firing module events. - * and the function to return ASAP. - * - * On success the function returns the number of keys removed from the - * database(s). Otherwise -1 is returned in the specific case the - * DB number is out of range, and errno is set to EINVAL. */ -long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*)) { - int async = (flags & EMPTYDB_ASYNC); - int backup = (flags & EMPTYDB_BACKUP); /* Just free the memory, nothing else */ - RedisModuleFlushInfoV1 fi = {REDISMODULE_FLUSHINFO_VERSION,!async,dbnum}; + * The dbnum can be -1 if all the DBs should be emptied, or the specified + * DB index if we want to empty only a single database. + * The function returns the number of keys removed from the database(s). */ +long long emptyDbStructure(redisDb *dbarray, int dbnum, int async, + void(callback)(void*)) +{ long long removed = 0; - - if (dbnum < -1 || dbnum >= server.dbnum) { - errno = EINVAL; - return -1; - } - - /* Pre-flush actions */ - if (!backup) { - /* Fire the flushdb modules event. */ - moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB, - REDISMODULE_SUBEVENT_FLUSHDB_START, - &fi); - - /* Make sure the WATCHed keys are affected by the FLUSH* commands. - * Note that we need to call the function while the keys are still - * there. */ - signalFlushedDb(dbnum); - } - int startdb, enddb; + if (dbnum == -1) { startdb = 0; enddb = server.dbnum-1; @@ -416,34 +393,139 @@ long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)( dictEmpty(dbarray[j].dict,callback); dictEmpty(dbarray[j].expires,callback); } - /* Because we will start a new database, reset average ttl. */ + /* Because all keys of database are removed, reset average ttl. */ dbarray[j].avg_ttl = 0; dbarray[j].expires_cursor = 0; } - /* Post-flush actions */ - if (!backup) { - if (server.cluster_enabled) { - if (async) { - slotToKeyFlushAsync(); - } else { - slotToKeyFlush(); - } - } - if (dbnum == -1) flushSlaveKeysWithExpireList(); - - /* Also fire the end event. Note that this event will fire almost - * immediately after the start event if the flush is asynchronous. */ - moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB, - REDISMODULE_SUBEVENT_FLUSHDB_END, - &fi); - } - return removed; } +/* Remove all keys from all the databases in a Redis server. + * If callback is given the function is called from time to time to + * signal that work is in progress. + * + * The dbnum can be -1 if all the DBs should be flushed, or the specified + * DB number if we want to flush only a single Redis database number. + * + * Flags are be EMPTYDB_NO_FLAGS if no special flags are specified or + * EMPTYDB_ASYNC if we want the memory to be freed in a different thread + * and the function to return ASAP. + * + * On success the function returns the number of keys removed from the + * database(s). Otherwise -1 is returned in the specific case the + * DB number is out of range, and errno is set to EINVAL. */ long long emptyDb(int dbnum, int flags, void(callback)(void*)) { - return emptyDbGeneric(server.db, dbnum, flags, callback); + int async = (flags & EMPTYDB_ASYNC); + RedisModuleFlushInfoV1 fi = {REDISMODULE_FLUSHINFO_VERSION,!async,dbnum}; + long long removed = 0; + + if (dbnum < -1 || dbnum >= server.dbnum) { + errno = EINVAL; + return -1; + } + + /* Fire the flushdb modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB, + REDISMODULE_SUBEVENT_FLUSHDB_START, + &fi); + + /* Make sure the WATCHed keys are affected by the FLUSH* commands. + * Note that we need to call the function while the keys are still + * there. */ + signalFlushedDb(dbnum); + + /* Empty redis database structure. */ + removed = emptyDbStructure(server.db, dbnum, async, callback); + + /* Flush slots to keys map if enable cluster, we can flush entire + * slots to keys map whatever dbnum because only support one DB + * in cluster mode. */ + if (server.cluster_enabled) slotToKeyFlush(async); + + if (dbnum == -1) flushSlaveKeysWithExpireList(); + + /* Also fire the end event. Note that this event will fire almost + * immediately after the start event if the flush is asynchronous. */ + moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB, + REDISMODULE_SUBEVENT_FLUSHDB_END, + &fi); + + return removed; +} + +/* Store a backup of the database for later use, and put an empty one + * instead of it. */ +dbBackup *backupDb(void) { + dbBackup *backup = zmalloc(sizeof(dbBackup)); + + /* Backup main DBs. */ + backup->dbarray = zmalloc(sizeof(redisDb)*server.dbnum); + for (int i=0; idbarray[i] = server.db[i]; + server.db[i].dict = dictCreate(&dbDictType,NULL); + server.db[i].expires = dictCreate(&keyptrDictType,NULL); + } + + /* Backup cluster slots to keys map if enable cluster. */ + if (server.cluster_enabled) { + backup->slots_to_keys = server.cluster->slots_to_keys; + memcpy(backup->slots_keys_count, server.cluster->slots_keys_count, + sizeof(server.cluster->slots_keys_count)); + server.cluster->slots_to_keys = raxNew(); + memset(server.cluster->slots_keys_count, 0, + sizeof(server.cluster->slots_keys_count)); + } + + return backup; +} + +/* Discard a previously created backup, this can be slow (similar to FLUSHALL) + * Arguments are similar to the ones of emptyDb, see EMPTYDB_ flags. */ +void discardDbBackup(dbBackup *buckup, int flags, void(callback)(void*)) { + int async = (flags & EMPTYDB_ASYNC); + + /* Release main DBs backup . */ + emptyDbStructure(buckup->dbarray, -1, async, callback); + for (int i=0; idbarray[i].dict); + dictRelease(buckup->dbarray[i].expires); + } + + /* Release slots to keys map backup if enable cluster. */ + if (server.cluster_enabled) freeSlotsToKeysMap(buckup->slots_to_keys, async); + + /* Release buckup. */ + zfree(buckup->dbarray); + zfree(buckup); +} + +/* Restore the previously created backup (discarding what currently resides + * in the db). + * This function should be called after the current contents of the database + * was emptied with a previous call to emptyDb (possibly using the async mode). */ +void restoreDbBackup(dbBackup *buckup) { + /* Restore main DBs. */ + for (int i=0; idbarray[i]; + } + + /* Restore slots to keys map backup if enable cluster. */ + if (server.cluster_enabled) { + serverAssert(server.cluster->slots_to_keys->numele == 0); + raxFree(server.cluster->slots_to_keys); + server.cluster->slots_to_keys = buckup->slots_to_keys; + memcpy(server.cluster->slots_keys_count, buckup->slots_keys_count, + sizeof(server.cluster->slots_keys_count)); + } + + /* Release buckup. */ + zfree(buckup->dbarray); + zfree(buckup); } int selectDb(client *c, int id) { @@ -1705,11 +1787,25 @@ void slotToKeyDel(sds key) { slotToKeyUpdateKey(key,0); } -void slotToKeyFlush(void) { - raxFree(server.cluster->slots_to_keys); +/* Release the radix tree mapping Redis Cluster keys to slots. If 'async' + * is true, we release it asynchronously. */ +void freeSlotsToKeysMap(rax *rt, int async) { + if (async) { + freeSlotsToKeysMapAsync(rt); + } else { + raxFree(rt); + } +} + +/* Empty the slots-keys map of Redis CLuster by creating a new empty one and + * freeing the old one. */ +void slotToKeyFlush(int async) { + rax *old = server.cluster->slots_to_keys; + server.cluster->slots_to_keys = raxNew(); memset(server.cluster->slots_keys_count,0, sizeof(server.cluster->slots_keys_count)); + freeSlotsToKeysMap(old, async); } /* Pupulate the specified array of objects with keys in the specified slot. diff --git a/src/lazyfree.c b/src/lazyfree.c index 821dc50df..31197df2e 100644 --- a/src/lazyfree.c +++ b/src/lazyfree.c @@ -136,16 +136,10 @@ void emptyDbAsync(redisDb *db) { bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,oldht2); } -/* Empty the slots-keys map of Redis CLuster by creating a new empty one - * and scheduling the old for lazy freeing. */ -void slotToKeyFlushAsync(void) { - rax *old = server.cluster->slots_to_keys; - - server.cluster->slots_to_keys = raxNew(); - memset(server.cluster->slots_keys_count,0, - sizeof(server.cluster->slots_keys_count)); - atomicIncr(lazyfree_objects,old->numele); - bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,NULL,old); +/* Release the radix tree mapping Redis Cluster keys to slots asynchronously. */ +void freeSlotsToKeysMapAsync(rax *rt) { + atomicIncr(lazyfree_objects,rt->numele); + bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,NULL,rt); } /* Release objects from the lazyfree thread. It's just decrRefCount() @@ -157,9 +151,7 @@ void lazyfreeFreeObjectFromBioThread(robj *o) { /* Release a database from the lazyfree thread. The 'db' pointer is the * database which was substituted with a fresh one in the main thread - * when the database was logically deleted. 'sl' is a skiplist used by - * Redis Cluster in order to take the hash slots -> keys mapping. This - * may be NULL if Redis Cluster is disabled. */ + * when the database was logically deleted. */ void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2) { size_t numkeys = dictSize(ht1); dictRelease(ht1); @@ -167,7 +159,7 @@ void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2) { atomicDecr(lazyfree_objects,numkeys); } -/* Release the skiplist mapping Redis Cluster keys to slots in the +/* Release the radix tree mapping Redis Cluster keys to slots in the * lazyfree thread. */ void lazyfreeFreeSlotsMapFromBioThread(rax *rt) { size_t len = rt->numele; diff --git a/src/replication.c b/src/replication.c index 5f1868e84..5ce9f0208 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1434,16 +1434,10 @@ static int useDisklessLoad() { } /* Helper function for readSyncBulkPayload() to make backups of the current - * DBs before socket-loading the new ones. The backups may be restored later - * or freed by disklessLoadRestoreBackups(). */ -redisDb *disklessLoadMakeBackups(void) { - redisDb *backups = zmalloc(sizeof(redisDb)*server.dbnum); - for (int i=0; i