diff --git a/redis.conf b/redis.conf index bcb5fa022..04e8bb117 100644 --- a/redis.conf +++ b/redis.conf @@ -605,9 +605,13 @@ repl-diskless-sync-delay 5 # # "disabled" - Don't use diskless load (store the rdb file to the disk first) # "on-empty-db" - Use diskless load only when it is completely safe. -# "swapdb" - Keep a copy of the current db contents in RAM while parsing -# the data directly from the socket. note that this requires -# sufficient memory, if you don't have it, you risk an OOM kill. +# "swapdb" - Keep current db contents in RAM while parsing the data directly +# from the socket. Replicas in this mode can keep serving the current +# data set while replication is in progress, except when +# they can't recognize the master as having a data set from the same +# replication history. +# Note that this requires sufficient memory; if you don't have it, +# you risk an OOM kill. repl-diskless-load disabled # Replicas send PINGs to server in a predefined interval. It's possible to diff --git a/src/aof.c b/src/aof.c index 4c334e228..b345c2175 100644 --- a/src/aof.c +++ b/src/aof.c @@ -748,7 +748,7 @@ int loadAppendOnlyFile(char *filename) { serverLog(LL_NOTICE,"Reading RDB preamble from AOF file..."); if (fseek(fp,0,SEEK_SET) == -1) goto readerr; rioInitWithFile(&rdb,fp); - if (rdbLoadRio(&rdb,RDBFLAGS_AOF_PREAMBLE,NULL) != C_OK) { + if (rdbLoadRio(&rdb,RDBFLAGS_AOF_PREAMBLE,NULL,server.db) != C_OK) { serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted"); goto readerr; } else { diff --git a/src/cluster.c b/src/cluster.c index 2cd98cd02..35f3a76be 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -593,8 +593,8 @@ void clusterInit(void) { serverPanic("Unrecoverable error creating Redis Cluster socket accept handler."); } - /* Reset data for the Slot to key API. */ - slotToKeyFlush(); + /* Initialize data for the Slot to key API. 
*/ + slotToKeyInit(server.db); /* Set myself->port/cport/pport to my listening ports, we'll just need to * discover the IP address via MEET messages. */ @@ -4954,7 +4954,7 @@ NULL unsigned int keys_in_slot = countKeysInSlot(slot); unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys; addReplyArrayLen(c,numkeys); - dictEntry *de = server.cluster->slots_to_keys[slot].head; + dictEntry *de = (*server.db->slots_to_keys).by_slot[slot].head; for (unsigned int j = 0; j < numkeys; j++) { serverAssert(de != NULL); sds sdskey = dictGetKey(de); @@ -6201,26 +6201,28 @@ int clusterRedirectBlockedClientIfNeeded(client *c) { * while rehashing the cluster and in other conditions when we need to * understand if we have keys for a given hash slot. */ -void slotToKeyAddEntry(dictEntry *entry) { +void slotToKeyAddEntry(dictEntry *entry, redisDb *db) { sds key = entry->key; unsigned int hashslot = keyHashSlot(key, sdslen(key)); - server.cluster->slots_to_keys[hashslot].count++; + slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot]; + slot_to_keys->count++; /* Insert entry before the first element in the list. */ - dictEntry *first = server.cluster->slots_to_keys[hashslot].head; + dictEntry *first = slot_to_keys->head; dictEntryNextInSlot(entry) = first; if (first != NULL) { serverAssert(dictEntryPrevInSlot(first) == NULL); dictEntryPrevInSlot(first) = entry; } serverAssert(dictEntryPrevInSlot(entry) == NULL); - server.cluster->slots_to_keys[hashslot].head = entry; + slot_to_keys->head = entry; } -void slotToKeyDelEntry(dictEntry *entry) { +void slotToKeyDelEntry(dictEntry *entry, redisDb *db) { sds key = entry->key; unsigned int hashslot = keyHashSlot(key, sdslen(key)); - server.cluster->slots_to_keys[hashslot].count--; + slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot]; + slot_to_keys->count--; /* Connect previous and next entries to each other. 
*/ dictEntry *next = dictEntryNextInSlot(entry); @@ -6232,14 +6234,14 @@ void slotToKeyDelEntry(dictEntry *entry) { dictEntryNextInSlot(prev) = next; } else { /* The removed entry was the first in the list. */ - serverAssert(server.cluster->slots_to_keys[hashslot].head == entry); - server.cluster->slots_to_keys[hashslot].head = next; + serverAssert(slot_to_keys->head == entry); + slot_to_keys->head = next; } } /* Updates neighbour entries when an entry has been replaced (e.g. reallocated * during active defrag). */ -void slotToKeyReplaceEntry(dictEntry *entry) { +void slotToKeyReplaceEntry(dictEntry *entry, redisDb *db) { dictEntry *next = dictEntryNextInSlot(entry); dictEntry *prev = dictEntryPrevInSlot(entry); if (next != NULL) { @@ -6251,33 +6253,33 @@ void slotToKeyReplaceEntry(dictEntry *entry) { /* The replaced entry was the first in the list. */ sds key = entry->key; unsigned int hashslot = keyHashSlot(key, sdslen(key)); - server.cluster->slots_to_keys[hashslot].head = entry; + slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot]; + slot_to_keys->head = entry; } } -/* Copies the slots-keys map to the specified backup structure. */ -void slotToKeyCopyToBackup(clusterSlotsToKeysData *backup) { - memcpy(backup, server.cluster->slots_to_keys, - sizeof(server.cluster->slots_to_keys)); +/* Initialize slots-keys map of given db. */ +void slotToKeyInit(redisDb *db) { + db->slots_to_keys = zcalloc(sizeof(clusterSlotToKeyMapping)); } -/* Overwrites the slots-keys map by copying the provided backup structure. */ -void slotToKeyRestoreBackup(clusterSlotsToKeysData *backup) { - memcpy(server.cluster->slots_to_keys, backup, - sizeof(server.cluster->slots_to_keys)); +/* Empty slots-keys map of given db. */ +void slotToKeyFlush(redisDb *db) { + memset(db->slots_to_keys, 0, + sizeof(clusterSlotToKeyMapping)); } -/* Empty the slots-keys map of Redis Cluster. 
*/ -void slotToKeyFlush(void) { - memset(&server.cluster->slots_to_keys, 0, - sizeof(server.cluster->slots_to_keys)); +/* Free slots-keys map of given db. */ +void slotToKeyDestroy(redisDb *db) { + zfree(db->slots_to_keys); + db->slots_to_keys = NULL; } /* Remove all the keys in the specified hash slot. * The number of removed items is returned. */ unsigned int delKeysInSlot(unsigned int hashslot) { unsigned int j = 0; - dictEntry *de = server.cluster->slots_to_keys[hashslot].head; + dictEntry *de = (*server.db->slots_to_keys).by_slot[hashslot].head; while (de != NULL) { sds sdskey = dictGetKey(de); de = dictEntryNextInSlot(de); @@ -6290,5 +6292,5 @@ unsigned int delKeysInSlot(unsigned int hashslot) { } unsigned int countKeysInSlot(unsigned int hashslot) { - return server.cluster->slots_to_keys[hashslot].count; + return (*server.db->slots_to_keys).by_slot[hashslot].count; } diff --git a/src/cluster.h b/src/cluster.h index f97814775..e6cdc1897 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -141,14 +141,17 @@ typedef struct clusterNode { list *fail_reports; /* List of nodes signaling this as failing */ } clusterNode; -/* State for the Slot to Key API, for a single slot. The keys in the same slot - * are linked together using dictEntry metadata. See also "Slot to Key API" in - * cluster.c. */ -struct clusterSlotToKeys { +/* Slot to keys for a single slot. The keys in the same slot are linked together + * using dictEntry metadata. */ +typedef struct slotToKeys { uint64_t count; /* Number of keys in the slot. */ dictEntry *head; /* The first key-value entry in the slot. */ +} slotToKeys; + +/* Slot to keys mapping for all slots, opaque outside this file. */ +struct clusterSlotToKeyMapping { + slotToKeys by_slot[CLUSTER_SLOTS]; }; -typedef struct clusterSlotToKeys clusterSlotsToKeysData[CLUSTER_SLOTS]; /* Dict entry metadata for cluster mode, used for the Slot to Key API to form a * linked list of the entries belonging to the same slot. 
*/ @@ -168,7 +171,6 @@ typedef struct clusterState { clusterNode *migrating_slots_to[CLUSTER_SLOTS]; clusterNode *importing_slots_from[CLUSTER_SLOTS]; clusterNode *slots[CLUSTER_SLOTS]; - clusterSlotsToKeysData slots_to_keys; /* The following fields are used to take the slave state on elections. */ mstime_t failover_auth_time; /* Time of previous or next election. */ int failover_auth_count; /* Number of votes received so far. */ @@ -315,11 +317,11 @@ unsigned long getClusterConnectionsCount(void); int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, unsigned char *payload, uint32_t len); void clusterPropagatePublish(robj *channel, robj *message); unsigned int keyHashSlot(char *key, int keylen); -void slotToKeyAddEntry(dictEntry *entry); -void slotToKeyDelEntry(dictEntry *entry); -void slotToKeyReplaceEntry(dictEntry *entry); -void slotToKeyCopyToBackup(clusterSlotsToKeysData *backup); -void slotToKeyRestoreBackup(clusterSlotsToKeysData *backup); -void slotToKeyFlush(void); +void slotToKeyAddEntry(dictEntry *entry, redisDb *db); +void slotToKeyDelEntry(dictEntry *entry, redisDb *db); +void slotToKeyReplaceEntry(dictEntry *entry, redisDb *db); +void slotToKeyInit(redisDb *db); +void slotToKeyFlush(redisDb *db); +void slotToKeyDestroy(redisDb *db); #endif /* __CLUSTER_H */ diff --git a/src/db.c b/src/db.c index 8a90d3215..5d44a9ab9 100644 --- a/src/db.c +++ b/src/db.c @@ -35,12 +35,6 @@ #include #include -/* Database backup. 
*/ -struct dbBackup { - redisDb *dbarray; - clusterSlotsToKeysData slots_to_keys; -}; - /*----------------------------------------------------------------------------- * C-level DB API *----------------------------------------------------------------------------*/ @@ -187,7 +181,7 @@ void dbAdd(redisDb *db, robj *key, robj *val) { serverAssertWithInfo(NULL, key, de != NULL); dictSetVal(db->dict, de, val); signalKeyAsReady(db, key, val->type); - if (server.cluster_enabled) slotToKeyAddEntry(de); + if (server.cluster_enabled) slotToKeyAddEntry(de, db); } /* This is a special version of dbAdd() that is used only when loading @@ -205,7 +199,7 @@ int dbAddRDBLoad(redisDb *db, sds key, robj *val) { dictEntry *de = dictAddRaw(db->dict, key, NULL); if (de == NULL) return 0; dictSetVal(db->dict, de, val); - if (server.cluster_enabled) slotToKeyAddEntry(de); + if (server.cluster_enabled) slotToKeyAddEntry(de, db); return 1; } @@ -321,7 +315,7 @@ static int dbGenericDelete(redisDb *db, robj *key, int async) { freeObjAsync(key, val, db->id); dictSetVal(db->dict, de, NULL); } - if (server.cluster_enabled) slotToKeyDelEntry(de); + if (server.cluster_enabled) slotToKeyDelEntry(de, db); dictFreeUnlinkedEntry(db->dict,de); return 1; } else { @@ -385,7 +379,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) { } /* Remove all keys from the database(s) structure. The dbarray argument - * may not be the server main DBs (could be a backup). + * may not be the server main DBs (could be a temporary DB). * * The dbnum can be -1 if all the DBs should be emptied, or the specified * DB index if we want to empty only a single database. @@ -459,7 +453,7 @@ long long emptyDb(int dbnum, int flags, void(callback)(dict*)) { /* Flush slots to keys map if enable cluster, we can flush entire * slots to keys map whatever dbnum because only support one DB * in cluster mode. 
*/ - if (server.cluster_enabled) slotToKeyFlush(); + if (server.cluster_enabled) slotToKeyFlush(server.db); if (dbnum == -1) flushSlaveKeysWithExpireList(); @@ -472,77 +466,40 @@ long long emptyDb(int dbnum, int flags, void(callback)(dict*)) { return removed; } -/* Store a backup of the database for later use, and put an empty one - * instead of it. */ -dbBackup *backupDb(void) { - dbBackup *backup = zmalloc(sizeof(dbBackup)); - - /* Backup main DBs. */ - backup->dbarray = zmalloc(sizeof(redisDb)*server.dbnum); +/* Initialize temporary db on replica for use during diskless replication. */ +redisDb *initTempDb(void) { + redisDb *tempDb = zcalloc(sizeof(redisDb)*server.dbnum); for (int i=0; idbarray[i] = server.db[i]; - server.db[i].dict = dictCreate(&dbDictType); - server.db[i].expires = dictCreate(&dbExpiresDictType); + tempDb[i].dict = dictCreate(&dbDictType); + tempDb[i].expires = dictCreate(&dbExpiresDictType); + tempDb[i].slots_to_keys = NULL; } - /* Backup cluster slots to keys map if enable cluster. */ if (server.cluster_enabled) { - slotToKeyCopyToBackup(&backup->slots_to_keys); - slotToKeyFlush(); + /* Prepare temp slot to key map to be written during async diskless replication. */ + slotToKeyInit(tempDb); } - moduleFireServerEvent(REDISMODULE_EVENT_REPL_BACKUP, - REDISMODULE_SUBEVENT_REPL_BACKUP_CREATE, - NULL); - - return backup; + return tempDb; } -/* Discard a previously created backup, this can be slow (similar to FLUSHALL) - * Arguments are similar to the ones of emptyDb, see EMPTYDB_ flags. */ -void discardDbBackup(dbBackup *backup, int flags, void(callback)(dict*)) { - int async = (flags & EMPTYDB_ASYNC); +/* Discard tempDb, this can be slow (similar to FLUSHALL), but it's always async. */ +void discardTempDb(redisDb *tempDb, void(callback)(dict*)) { + int async = 1; - /* Release main DBs backup . */ - emptyDbStructure(backup->dbarray, -1, async, callback); + /* Release temp DBs. 
*/ + emptyDbStructure(tempDb, -1, async, callback); for (int i=0; idbarray[i].dict); - dictRelease(backup->dbarray[i].expires); + dictRelease(tempDb[i].dict); + dictRelease(tempDb[i].expires); } - /* Release backup. */ - zfree(backup->dbarray); - zfree(backup); - - moduleFireServerEvent(REDISMODULE_EVENT_REPL_BACKUP, - REDISMODULE_SUBEVENT_REPL_BACKUP_DISCARD, - NULL); -} - -/* Restore the previously created backup (discarding what currently resides - * in the db). - * This function should be called after the current contents of the database - * was emptied with a previous call to emptyDb (possibly using the async mode). */ -void restoreDbBackup(dbBackup *backup) { - /* Restore main DBs. */ - for (int i=0; idbarray[i]; + if (server.cluster_enabled) { + /* Release temp slot to key map. */ + slotToKeyDestroy(tempDb); } - /* Restore slots to keys map backup if enable cluster. */ - if (server.cluster_enabled) slotToKeyRestoreBackup(&backup->slots_to_keys); - - /* Release backup. */ - zfree(backup->dbarray); - zfree(backup); - - moduleFireServerEvent(REDISMODULE_EVENT_REPL_BACKUP, - REDISMODULE_SUBEVENT_REPL_BACKUP_RESTORE, - NULL); + zfree(tempDb); } int selectDb(client *c, int id) { @@ -591,6 +548,10 @@ void signalFlushedDb(int dbid, int async) { } trackingInvalidateKeysOnFlush(async); + + /* Changes in this method may take place in swapMainDbWithTempDb as well, + * where we execute similar calls, but with subtle differences as it's + * not simply flushing db. */ } /*----------------------------------------------------------------------------- @@ -1358,6 +1319,54 @@ int dbSwapDatabases(int id1, int id2) { return C_OK; } +/* Logically, this discards (flushes) the old main database, and apply the newly loaded + * database (temp) as the main (active) database, the actual freeing of old database + * (which will now be placed in the temp one) is done later. 
*/ +void swapMainDbWithTempDb(redisDb *tempDb) { + if (server.cluster_enabled) { + /* Swap slots_to_keys from tempdb just loaded with main db slots_to_keys. */ + clusterSlotToKeyMapping *aux = server.db->slots_to_keys; + server.db->slots_to_keys = tempDb->slots_to_keys; + tempDb->slots_to_keys = aux; + } + + for (int i=0; idict = newdb->dict; + activedb->expires = newdb->expires; + activedb->avg_ttl = newdb->avg_ttl; + activedb->expires_cursor = newdb->expires_cursor; + + newdb->dict = aux.dict; + newdb->expires = aux.expires; + newdb->avg_ttl = aux.avg_ttl; + newdb->expires_cursor = aux.expires_cursor; + + /* Now we need to handle clients blocked on lists: as an effect + * of swapping the two DBs, a client that was waiting for list + * X in a given DB, may now actually be unblocked if X happens + * to exist in the new version of the DB, after the swap. + * + * However normally we only do this check for efficiency reasons + * in dbAdd() when a list is created. So here we need to rescan + * the list of clients blocked on lists and signal lists as ready + * if needed. + * + * Also the swapdb should make transaction fail if there is any + * client watching keys. */ + scanDatabaseForReadyLists(activedb); + touchAllWatchedKeysInDb(activedb, newdb); + } + + trackingInvalidateKeysOnFlush(1); + flushSlaveKeysWithExpireList(); +} + /* SWAPDB db1 db2 */ void swapdbCommand(client *c) { int id1, id2; diff --git a/src/defrag.c b/src/defrag.c index ad5e35bb5..734174c3a 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -903,7 +903,7 @@ void defragDictBucketCallback(dict *d, dictEntry **bucketref) { *bucketref = newde; if (server.cluster_enabled && d == server.db[0].dict) { /* Cluster keyspace dict. Update slot-to-entries mapping. 
*/ - slotToKeyReplaceEntry(newde); + slotToKeyReplaceEntry(newde, server.db); } } bucketref = &(*bucketref)->next; diff --git a/src/module.c b/src/module.c index 7fe5a39a5..16f67671c 100644 --- a/src/module.c +++ b/src/module.c @@ -1344,6 +1344,10 @@ int RM_BlockedClientMeasureTimeEnd(RedisModuleBlockedClient *bc) { * * REDISMODULE_OPTION_NO_IMPLICIT_SIGNAL_MODIFIED: * See RM_SignalModifiedKey(). + * + * REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD: + * Setting this flag indicates module awareness of diskless async replication (repl-diskless-load=swapdb) + * and that redis could be serving reads during replication instead of blocking with LOADING status. */ void RM_SetModuleOptions(RedisModuleCtx *ctx, int options) { ctx->module->options = options; @@ -2714,7 +2718,9 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) { if (server.cluster_enabled) flags |= REDISMODULE_CTX_FLAGS_CLUSTER; - if (server.loading) + if (server.async_loading) + flags |= REDISMODULE_CTX_FLAGS_ASYNC_LOADING; + else if (server.loading) flags |= REDISMODULE_CTX_FLAGS_LOADING; /* Maxmemory and eviction policy */ @@ -5515,6 +5521,24 @@ int moduleAllDatatypesHandleErrors() { return 1; } +/* Returns 0 if module did not declare REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD, in which case + * diskless async loading should be avoided because module doesn't know there can be traffic during + * database full resynchronization. */ +int moduleAllModulesHandleReplAsyncLoad() { + dictIterator *di = dictGetIterator(modules); + dictEntry *de; + + while ((de = dictNext(di)) != NULL) { + struct RedisModule *module = dictGetVal(de); + if (!(module->options & REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD)) { + dictReleaseIterator(di); + return 0; + } + } + dictReleaseIterator(di); + return 1; +} + /* Returns true if any previous IO API failed. * for `Load*` APIs the REDISMODULE_OPTIONS_HANDLE_IO_ERRORS flag must be set with * RedisModule_SetModuleOptions first. 
*/ @@ -9165,8 +9189,12 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * int32_t dbnum_second; // Swap Db second dbnum * * * RedisModuleEvent_ReplBackup + * + * WARNING: Replication Backup events are deprecated since Redis 7.0 and are never fired. + * See RedisModuleEvent_ReplAsyncLoad for understanding how Async Replication Loading events + * are now triggered when repl-diskless-load is set to swapdb. * - * Called when diskless-repl-load config is set to swapdb, + * Called when repl-diskless-load config is set to swapdb, * And redis needs to backup the the current database for the * possibility to be restored later. A module with global data and * maybe with aux_load and aux_save callbacks may need to use this @@ -9176,6 +9204,19 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * * `REDISMODULE_SUBEVENT_REPL_BACKUP_CREATE` * * `REDISMODULE_SUBEVENT_REPL_BACKUP_RESTORE` * * `REDISMODULE_SUBEVENT_REPL_BACKUP_DISCARD` + * + * * RedisModuleEvent_ReplAsyncLoad + * + * Called when repl-diskless-load config is set to swapdb and a replication with a master of same + * data set history (matching replication ID) occurs. + * In which case redis serves current data set while loading new database in memory from socket. + * Modules must have declared they support this mechanism in order to activate it, through + * REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD flag. 
+ * The following sub events are available: + * + * * `REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED` + * * `REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_ABORTED` + * * `REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_COMPLETED` * * * RedisModuleEvent_ForkChild * @@ -9255,8 +9296,8 @@ int RM_IsSubEventSupported(RedisModuleEvent event, int64_t subevent) { return subevent < _REDISMODULE_SUBEVENT_LOADING_PROGRESS_NEXT; case REDISMODULE_EVENT_SWAPDB: return subevent < _REDISMODULE_SUBEVENT_SWAPDB_NEXT; - case REDISMODULE_EVENT_REPL_BACKUP: - return subevent < _REDISMODULE_SUBEVENT_REPL_BACKUP_NEXT; + case REDISMODULE_EVENT_REPL_ASYNC_LOAD: + return subevent < _REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_NEXT; case REDISMODULE_EVENT_FORK_CHILD: return subevent < _REDISMODULE_SUBEVENT_FORK_CHILD_NEXT; default: @@ -9794,6 +9835,8 @@ sds genModulesInfoStringRenderModuleOptions(struct RedisModule *module) { sds output = sdsnew("["); if (module->options & REDISMODULE_OPTIONS_HANDLE_IO_ERRORS) output = sdscat(output,"handle-io-errors|"); + if (module->options & REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD) + output = sdscat(output,"handle-repl-async-load|"); output = sdstrim(output,"|"); output = sdscat(output,"]"); return output; diff --git a/src/multi.c b/src/multi.c index e1a1df93f..1b708e859 100644 --- a/src/multi.c +++ b/src/multi.c @@ -244,7 +244,11 @@ void execCommand(client *c) { "This command is no longer allowed for the " "following reason: %s", reason); } else { - call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL); + if (c->id == CLIENT_ID_AOF) + call(c,CMD_CALL_NONE); + else + call(c,CMD_CALL_FULL); + serverAssert((c->flags & CLIENT_BLOCKED) == 0); } @@ -398,7 +402,7 @@ void touchWatchedKey(redisDb *db, robj *key) { /* Set CLIENT_DIRTY_CAS to all clients of DB when DB is dirty. * It may happen in the following situations: - * FLUSHDB, FLUSHALL, SWAPDB + * FLUSHDB, FLUSHALL, SWAPDB, end of successful diskless replication. 
* * replaced_with: for SWAPDB, the WATCH should be invalidated if * the key exists in either of them, and skipped only if it diff --git a/src/rdb.c b/src/rdb.c index a56a3133c..9ccf3d0f5 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -48,6 +48,10 @@ /* This macro is called when RDB read failed (possibly a short read) */ #define rdbReportReadError(...) rdbReportError(0, __LINE__,__VA_ARGS__) +/* This macro tells if we are in the context of a RESTORE command, and not loading an RDB or AOF. */ +#define isRestoreContext() \ + (server.current_client == NULL || server.current_client->id == CLIENT_ID_AOF) ? 0 : 1 + char* rdbFileBeingLoaded = NULL; /* used for rdb checking on read error */ extern int rdbCheckMode; void rdbCheckError(const char *fmt, ...); @@ -68,7 +72,7 @@ void rdbReportError(int corruption_error, int linenum, char *reason, ...) { vsnprintf(msg+len,sizeof(msg)-len,reason,ap); va_end(ap); - if (!server.loading) { + if (isRestoreContext()) { /* If we're in the context of a RESTORE command, just propagate the error. */ /* log in VERBOSE, and return (don't exit). */ serverLog(LL_VERBOSE, "%s", msg); @@ -381,7 +385,7 @@ void *rdbLoadLzfStringObject(rio *rdb, int flags, size_t *lenptr) { if ((clen = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL; if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL; if ((c = ztrymalloc(clen)) == NULL) { - serverLog(server.loading? LL_WARNING: LL_VERBOSE, "rdbLoadLzfStringObject failed allocating %llu bytes", (unsigned long long)clen); + serverLog(isRestoreContext()? LL_VERBOSE: LL_WARNING, "rdbLoadLzfStringObject failed allocating %llu bytes", (unsigned long long)clen); goto err; } @@ -392,7 +396,7 @@ void *rdbLoadLzfStringObject(rio *rdb, int flags, size_t *lenptr) { val = sdstrynewlen(SDS_NOINIT,len); } if (!val) { - serverLog(server.loading? LL_WARNING: LL_VERBOSE, "rdbLoadLzfStringObject failed allocating %llu bytes", (unsigned long long)len); + serverLog(isRestoreContext()? 
LL_VERBOSE: LL_WARNING, "rdbLoadLzfStringObject failed allocating %llu bytes", (unsigned long long)len); goto err; } @@ -525,7 +529,7 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) { if (plain || sds) { void *buf = plain ? ztrymalloc(len) : sdstrynewlen(SDS_NOINIT,len); if (!buf) { - serverLog(server.loading? LL_WARNING: LL_VERBOSE, "rdbGenericLoadStringObject failed allocating %llu bytes", len); + serverLog(isRestoreContext()? LL_VERBOSE: LL_WARNING, "rdbGenericLoadStringObject failed allocating %llu bytes", len); return NULL; } if (lenptr) *lenptr = len; @@ -541,7 +545,7 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) { robj *o = encode ? tryCreateStringObject(SDS_NOINIT,len) : tryCreateRawStringObject(SDS_NOINIT,len); if (!o) { - serverLog(server.loading? LL_WARNING: LL_VERBOSE, "rdbGenericLoadStringObject failed allocating %llu bytes", len); + serverLog(isRestoreContext()? LL_VERBOSE: LL_WARNING, "rdbGenericLoadStringObject failed allocating %llu bytes", len); return NULL; } if (len && rioRead(rdb,o->ptr,len) == 0) { @@ -2517,9 +2521,10 @@ emptykey: /* Mark that we are loading in the global state and setup the fields * needed to provide loading stats. 
*/ -void startLoading(size_t size, int rdbflags) { +void startLoading(size_t size, int rdbflags, int async) { /* Load the DB */ server.loading = 1; + if (async == 1) server.async_loading = 1; server.loading_start_time = time(NULL); server.loading_loaded_bytes = 0; server.loading_total_bytes = size; @@ -2547,7 +2552,7 @@ void startLoadingFile(FILE *fp, char* filename, int rdbflags) { if (fstat(fileno(fp), &sb) == -1) sb.st_size = 0; rdbFileBeingLoaded = filename; - startLoading(sb.st_size, rdbflags); + startLoading(sb.st_size, rdbflags, 0); } /* Refresh the loading progress info */ @@ -2560,6 +2565,7 @@ void loadingProgress(off_t pos) { /* Loading finished */ void stopLoading(int success) { server.loading = 0; + server.async_loading = 0; blockingOperationEnds(); rdbFileBeingLoaded = NULL; @@ -2610,10 +2616,10 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, * otherwise C_ERR is returned and 'errno' is set accordingly. */ -int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { +int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi, redisDb *dbarray) { uint64_t dbid = 0; int type, rdbver; - redisDb *db = server.db+0; + redisDb *db = dbarray+0; char buf[1024]; int error; long long empty_keys_skipped = 0; @@ -2685,7 +2691,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { "databases. Exiting\n", server.dbnum); exit(1); } - db = server.db+dbid; + db = dbarray+dbid; continue; /* Read next opcode. 
*/ } else if (type == RDB_OPCODE_RESIZEDB) { /* RESIZEDB: Hint about the size of the keys in the currently @@ -2962,7 +2968,7 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) { if ((fp = fopen(filename,"r")) == NULL) return C_ERR; startLoadingFile(fp, filename,rdbflags); rioInitWithFile(&rdb,fp); - retval = rdbLoadRio(&rdb,rdbflags,rsi); + retval = rdbLoadRio(&rdb,rdbflags,rsi,server.db); fclose(fp); stopLoading(retval==C_OK); return retval; diff --git a/src/rdb.h b/src/rdb.h index 1188f1032..f150bcb0d 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -166,7 +166,7 @@ int rdbSaveBinaryDoubleValue(rio *rdb, double val); int rdbLoadBinaryDoubleValue(rio *rdb, double *val); int rdbSaveBinaryFloatValue(rio *rdb, float val); int rdbLoadBinaryFloatValue(rio *rdb, float *val); -int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi); +int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi, redisDb *db); int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi); rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi); diff --git a/src/redismodule.h b/src/redismodule.h index 1bc86750e..317d51356 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -150,11 +150,13 @@ typedef struct RedisModuleStreamID { #define REDISMODULE_CTX_FLAGS_DENY_BLOCKING (1<<21) /* The current client uses RESP3 protocol */ #define REDISMODULE_CTX_FLAGS_RESP3 (1<<22) +/* Redis is currently async loading database for diskless replication. */ +#define REDISMODULE_CTX_FLAGS_ASYNC_LOADING (1<<23) /* Next context flag, must be updated when adding new flags above! This flag should not be used directly by the module. * Use RedisModule_GetContextFlagsAll instead. */ -#define _REDISMODULE_CTX_FLAGS_NEXT (1<<23) +#define _REDISMODULE_CTX_FLAGS_NEXT (1<<24) /* Keyspace changes notification classes. Every class is associated with a * character for configuration purposes. 
@@ -229,6 +231,10 @@ typedef uint64_t RedisModuleTimerID; /* Declare that the module can handle errors with RedisModule_SetModuleOptions. */ #define REDISMODULE_OPTIONS_HANDLE_IO_ERRORS (1<<0) + +/* Declare that the module can handle diskless async replication with RedisModule_SetModuleOptions. */ +#define REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD (1<<1) + /* When set, Redis will not call RedisModule_SignalModifiedKey(), implicitly in * RedisModule_CloseKey, and the module needs to do that when manually when keys * are modified from the user's sperspective, to invalidate WATCH. */ @@ -249,9 +255,10 @@ typedef uint64_t RedisModuleTimerID; #define REDISMODULE_EVENT_MODULE_CHANGE 9 #define REDISMODULE_EVENT_LOADING_PROGRESS 10 #define REDISMODULE_EVENT_SWAPDB 11 -#define REDISMODULE_EVENT_REPL_BACKUP 12 +#define REDISMODULE_EVENT_REPL_BACKUP 12 /* Deprecated since Redis 7.0, not used anymore. */ #define REDISMODULE_EVENT_FORK_CHILD 13 -#define _REDISMODULE_EVENT_NEXT 14 /* Next event flag, should be updated if a new event added. */ +#define REDISMODULE_EVENT_REPL_ASYNC_LOAD 14 +#define _REDISMODULE_EVENT_NEXT 15 /* Next event flag, should be updated if a new event added. */ typedef struct RedisModuleEvent { uint64_t id; /* REDISMODULE_EVENT_... defines. */ @@ -311,8 +318,14 @@ static const RedisModuleEvent REDISMODULE_EVENT_SWAPDB, 1 }, + /* Deprecated since Redis 7.0, not used anymore. */ + __attribute__ ((deprecated)) RedisModuleEvent_ReplBackup = { - REDISMODULE_EVENT_REPL_BACKUP, + REDISMODULE_EVENT_REPL_BACKUP, + 1 + }, + RedisModuleEvent_ReplAsyncLoad = { + REDISMODULE_EVENT_REPL_ASYNC_LOAD, 1 }, RedisModuleEvent_ForkChild = { @@ -363,11 +376,17 @@ static const RedisModuleEvent #define REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF 1 #define _REDISMODULE_SUBEVENT_LOADING_PROGRESS_NEXT 2 +/* Replication Backup events are deprecated since Redis 7.0 and are never fired. 
*/ #define REDISMODULE_SUBEVENT_REPL_BACKUP_CREATE 0 #define REDISMODULE_SUBEVENT_REPL_BACKUP_RESTORE 1 #define REDISMODULE_SUBEVENT_REPL_BACKUP_DISCARD 2 #define _REDISMODULE_SUBEVENT_REPL_BACKUP_NEXT 3 +#define REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED 0 +#define REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_ABORTED 1 +#define REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_COMPLETED 2 +#define _REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_NEXT 3 + #define REDISMODULE_SUBEVENT_FORK_CHILD_BORN 0 #define REDISMODULE_SUBEVENT_FORK_CHILD_DIED 1 #define _REDISMODULE_SUBEVENT_FORK_CHILD_NEXT 2 diff --git a/src/replication.c b/src/replication.c index babf0a022..6c79d72d8 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1623,7 +1623,8 @@ void replicationSendNewlineToMaster(void) { } /* Callback used by emptyDb() while flushing away old data to load - * the new dataset received by the master. */ + * the new dataset received by the master and by discardTempDb() + * after loading succeeded or failed. */ void replicationEmptyDbCallback(dict *d) { UNUSED(d); if (server.repl_state == REPL_STATE_TRANSFER) @@ -1689,36 +1690,49 @@ static int useDisklessLoad() { /* compute boolean decision to use diskless load */ int enabled = server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB || (server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY && dbTotalServerKeyCount()==0); - /* Check all modules handle read errors, otherwise it's not safe to use diskless load. */ - if (enabled && !moduleAllDatatypesHandleErrors()) { - serverLog(LL_WARNING, - "Skipping diskless-load because there are modules that don't handle read errors."); - enabled = 0; + + if (enabled) { + /* Check all modules handle read errors, otherwise it's not safe to use diskless load. 
*/ + if (!moduleAllDatatypesHandleErrors()) { + serverLog(LL_WARNING, + "Skipping diskless-load because there are modules that don't handle read errors."); + enabled = 0; + } + /* Check all modules handle async replication, otherwise it's not safe to use diskless load. */ + else if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB && !moduleAllModulesHandleReplAsyncLoad()) { + serverLog(LL_WARNING, + "Skipping diskless-load because there are modules that are not aware of async replication."); + enabled = 0; + } } return enabled; } -/* Helper function for readSyncBulkPayload() to make backups of the current - * databases before socket-loading the new ones. The backups may be restored - * by disklessLoadRestoreBackup or freed by disklessLoadDiscardBackup later. */ -dbBackup *disklessLoadMakeBackup(void) { - return backupDb(); +/* Helper function for readSyncBulkPayload() to initialize tempDb + * before socket-loading the new db from master. The tempDb may be populated + * by swapMainDbWithTempDb or freed by disklessLoadDiscardTempDb later. */ +redisDb *disklessLoadInitTempDb(void) { + return initTempDb(); } -/* Helper function for readSyncBulkPayload(): when replica-side diskless - * database loading is used, Redis makes a backup of the existing databases - * before loading the new ones from the socket. - * - * If the socket loading went wrong, we want to restore the old backups - * into the server databases. */ -void disklessLoadRestoreBackup(dbBackup *backup) { - restoreDbBackup(backup); +/* Helper function for readSyncBulkPayload() to discard our tempDb + * when the loading succeeded or failed. */ +void disklessLoadDiscardTempDb(redisDb *tempDb) { + discardTempDb(tempDb, replicationEmptyDbCallback); } -/* Helper function for readSyncBulkPayload() to discard our old backups - * when the loading succeeded. 
*/ -void disklessLoadDiscardBackup(dbBackup *backup, int flag) { - discardDbBackup(backup, flag, replicationEmptyDbCallback); +/* If we know we got an entirely different data set from our master + * we have no way to incrementally feed our replicas after that. + * We want our replicas to resync with us as well, if we have any sub-replicas. + * This is useful on readSyncBulkPayload in places where we just finished transferring db. */ +void replicationAttachToNewMaster() { + /* Replica starts to apply data from new master, we must discard the cached + * master structure. */ + serverAssert(server.master == NULL); + replicationDiscardCachedMaster(); + + disconnectSlaves(); /* Force our replicas to resync with us as well. */ + freeReplicationBacklog(); /* Don't allow our chained replicas to PSYNC. */ } /* Asynchronously read the SYNC payload we receive from a master */ @@ -1727,7 +1741,7 @@ void readSyncBulkPayload(connection *conn) { char buf[PROTO_IOBUF_LEN]; ssize_t nread, readlen, nwritten; int use_diskless_load = useDisklessLoad(); - dbBackup *diskless_load_backup = NULL; + redisDb *diskless_load_tempDb = NULL; int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS; off_t left; @@ -1895,58 +1909,61 @@ void readSyncBulkPayload(connection *conn) { * * 2. Or when we are done reading from the socket to the RDB file, in * such case we want just to read the RDB file in memory. */ - serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data"); /* We need to stop any AOF rewriting child before flushing and parsing * the RDB, otherwise we'll create a copy-on-write disaster. */ if (server.aof_state != AOF_OFF) stopAppendOnly(); - /* When diskless RDB loading is used by replicas, it may be configured - * in order to save the current DB instead of throwing it away, - * so that we can restore it in case of failed transfer. 
*/ - if (use_diskless_load && - server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) - { - /* Create a backup of server.db[] and initialize to empty - * dictionaries. */ - diskless_load_backup = disklessLoadMakeBackup(); + if (use_diskless_load && server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { + /* Initialize empty tempDb dictionaries. */ + diskless_load_tempDb = disklessLoadInitTempDb(); + + moduleFireServerEvent(REDISMODULE_EVENT_REPL_ASYNC_LOAD, + REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED, + NULL); + } else { + replicationAttachToNewMaster(); + + serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data"); + emptyDb(-1,empty_db_flags,replicationEmptyDbCallback); } - /* Replica starts to apply data from new master, we must discard the cached - * master structure. */ - serverAssert(server.master == NULL); - replicationDiscardCachedMaster(); - - /* We want our slaves to resync with us as well, if we have any sub-slaves. - * The master already transferred us an entirely different data set and we - * have no way to incrementally feed our slaves after that. */ - disconnectSlaves(); /* Force our slaves to resync with us as well. */ - freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */ - - /* We call to emptyDb even in case of REPL_DISKLESS_LOAD_SWAPDB - * (Where disklessLoadMakeBackup left server.db empty) because we - * want to execute all the auxiliary logic of emptyDb (Namely, - * fire module events) */ - emptyDb(-1,empty_db_flags,replicationEmptyDbCallback); - /* Before loading the DB into memory we need to delete the readable * handler, otherwise it will get called recursively since * rdbLoad() will call the event loop to process events from time to * time for non blocking loading. 
*/ connSetReadHandler(conn, NULL); + serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory"); rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; if (use_diskless_load) { rio rdb; + redisDb *dbarray; + int asyncLoading = 0; + + if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { + /* Async loading means we continue serving read commands during full resync, and + * "swap" the new db with the old db only when loading is done. + * It is enabled only on SWAPDB diskless replication when master replication ID hasn't changed, + * because in that state the old content of the db represents a different point in time of the same + * data set we're currently receiving from the master. */ + if (memcmp(server.replid, server.master_replid, CONFIG_RUN_ID_SIZE) == 0) { + asyncLoading = 1; + } + dbarray = diskless_load_tempDb; + } else { + dbarray = server.db; + } + rioInitWithConn(&rdb,conn,server.repl_transfer_size); /* Put the socket in blocking mode to simplify RDB transfer. * We'll restore it when the RDB is received. */ connBlock(conn); connRecvTimeout(conn, server.repl_timeout*1000); - startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION); + startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION, asyncLoading); - if (rdbLoadRio(&rdb,RDBFLAGS_REPLICATION,&rsi) != C_OK) { + if (rdbLoadRio(&rdb,RDBFLAGS_REPLICATION,&rsi,dbarray) != C_OK) { /* RDB loading failed. */ serverLog(LL_WARNING, "Failed trying to load the MASTER synchronization DB " @@ -1955,13 +1972,17 @@ void readSyncBulkPayload(connection *conn) { cancelReplicationHandshake(1); rioFreeConn(&rdb, NULL); - /* Remove the half-loaded data in case we started with - * an empty replica. */ - emptyDb(-1,empty_db_flags,replicationEmptyDbCallback); - if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { - /* Restore the backed up databases. */ - disklessLoadRestoreBackup(diskless_load_backup); + /* Discard potentially partially loaded tempDb. 
*/ + moduleFireServerEvent(REDISMODULE_EVENT_REPL_ASYNC_LOAD, + REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_ABORTED, + NULL); + + disklessLoadDiscardTempDb(diskless_load_tempDb); + serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Discarding temporary DB in background"); + } else { + /* Remove the half-loaded data in case we started with an empty replica. */ + emptyDb(-1,empty_db_flags,replicationEmptyDbCallback); } /* Note that there's no point in restarting the AOF on SYNC @@ -1972,12 +1993,26 @@ void readSyncBulkPayload(connection *conn) { /* RDB loading succeeded if we reach this point. */ if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { - /* Delete the backup databases we created before starting to load - * the new RDB. Now the RDB was loaded with success so the old - * data is useless. */ - disklessLoadDiscardBackup(diskless_load_backup, empty_db_flags); + /* We will soon swap main db with tempDb and replicas will start + * to apply data from new master, we must discard the cached + * master structure and force resync of sub-replicas. */ + replicationAttachToNewMaster(); + + serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Swapping active DB with loaded DB"); + swapMainDbWithTempDb(diskless_load_tempDb); + + moduleFireServerEvent(REDISMODULE_EVENT_REPL_ASYNC_LOAD, + REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_COMPLETED, + NULL); + + /* Delete the old db as it's useless now. */ + disklessLoadDiscardTempDb(diskless_load_tempDb); + serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Discarding old DB in background"); } + /* Inform about db change, as replication was diskless and didn't cause a save. */ + server.dirty++; + /* Verify the end mark is correct. 
*/ if (usemark) { if (!rioRead(&rdb,buf,CONFIG_RUN_ID_SIZE) || diff --git a/src/scripting.c b/src/scripting.c index 85504bdb0..7880d1b0e 100644 --- a/src/scripting.c +++ b/src/scripting.c @@ -882,7 +882,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { "Write commands not allowed after non deterministic commands. Call redis.replicate_commands() at the start of your script in order to switch to single commands replication mode."); goto cleanup; } else if (server.masterhost && server.repl_slave_ro && - !server.loading && + server.lua_caller->id != CLIENT_ID_AOF && !(server.lua_caller->flags & CLIENT_MASTER)) { luaPushError(lua, shared.roslaveerr->ptr); @@ -905,11 +905,11 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { * could enlarge the memory usage are not allowed, but only if this is the * first write in the context of this script, otherwise we can't stop * in the middle. */ - if (server.maxmemory && /* Maxmemory is actually enabled. */ - !server.loading && /* Don't care about mem if loading. */ - !server.masterhost && /* Slave must execute the script. */ - server.lua_write_dirty == 0 && /* Script had no side effects so far. */ - server.lua_oom && /* Detected OOM when script start. */ + if (server.maxmemory && /* Maxmemory is actually enabled. */ + server.lua_caller->id != CLIENT_ID_AOF && /* Don't care about mem if loading from AOF. */ + !server.masterhost && /* Slave must execute the script. */ + server.lua_write_dirty == 0 && /* Script had no side effects so far. */ + server.lua_oom && /* Detected OOM when script start. */ (cmd->flags & CMD_DENYOOM)) { luaPushError(lua, shared.oomerr->ptr); @@ -922,7 +922,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { /* If this is a Redis Cluster node, we need to make sure Lua is not * trying to access non-local keys, with the exception of commands * received from our master or when loading the AOF back in memory. 
*/ - if (server.cluster_enabled && !server.loading && + if (server.cluster_enabled && server.lua_caller->id != CLIENT_ID_AOF && !(server.lua_caller->flags & CLIENT_MASTER)) { int error_code; diff --git a/src/server.c b/src/server.c index 9740475ca..0773113ff 100644 --- a/src/server.c +++ b/src/server.c @@ -3680,6 +3680,7 @@ void initServerConfig(void) { server.skip_checksum_validation = 0; server.saveparams = NULL; server.loading = 0; + server.async_loading = 0; server.loading_rdb_used_mem = 0; server.aof_state = AOF_OFF; server.aof_rewrite_base_size = 0; @@ -4252,6 +4253,7 @@ void initServer(void) { server.db[j].id = j; server.db[j].avg_ttl = 0; server.db[j].defrag_later = listCreate(); + server.db[j].slots_to_keys = NULL; /* Set by clusterInit later on if necessary. */ listSetFreeMethod(server.db[j].defrag_later,(void (*)(void*))sdsfree); } evictionPoolAlloc(); /* Initialize the LRU keys pool. */ @@ -5432,7 +5434,7 @@ int processCommand(client *c) { /* Loading DB? Return an error if the command has not the * CMD_LOADING flag. */ - if (server.loading && is_denyloading_command) { + if (server.loading && !server.async_loading && is_denyloading_command) { rejectCommand(c, shared.loadingerr); return C_OK; } @@ -6420,6 +6422,7 @@ sds genRedisInfoString(const char *section) { info = sdscatprintf(info, "# Persistence\r\n" "loading:%d\r\n" + "async_loading:%d\r\n" "current_cow_peak:%zu\r\n" "current_cow_size:%zu\r\n" "current_cow_size_age:%lu\r\n" @@ -6445,7 +6448,8 @@ sds genRedisInfoString(const char *section) { "aof_last_cow_size:%zu\r\n" "module_fork_in_progress:%d\r\n" "module_fork_last_cow_size:%zu\r\n", - (int)server.loading, + (int)(server.loading && !server.async_loading), + (int)server.async_loading, server.stat_current_cow_peak, server.stat_current_cow_bytes, server.stat_current_cow_updated ? 
(unsigned long) elapsedMs(server.stat_current_cow_updated) / 1000 : 0, diff --git a/src/server.h b/src/server.h index f5b00e8b9..5eaa1521b 100644 --- a/src/server.h +++ b/src/server.h @@ -803,6 +803,9 @@ typedef struct replBufBlock { char buf[]; } replBufBlock; +/* Opaque type for the Slot to Key API. */ +typedef struct clusterSlotToKeyMapping clusterSlotToKeyMapping; + /* Redis database representation. There are multiple databases identified * by integers from 0 (the default database) up to the max configured * database. The database number is the 'id' field in the structure. */ @@ -816,13 +819,9 @@ typedef struct redisDb { long long avg_ttl; /* Average TTL, just for stats */ unsigned long expires_cursor; /* Cursor of the active expire cycle. */ list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */ + clusterSlotToKeyMapping *slots_to_keys; /* Array of slots to keys. Only used in cluster mode (db 0). */ } redisDb; -/* Declare database backup that include redis main DBs and slots to keys map. - * Definition is in db.c. We can't define it here since we define CLUSTER_SLOTS - * in cluster.h. 
*/ -typedef struct dbBackup dbBackup; - /* Client MULTI/EXEC state */ typedef struct multiCmd { robj **argv; @@ -1394,6 +1393,7 @@ struct redisServer { /* RDB / AOF loading information */ volatile sig_atomic_t loading; /* We are loading data from disk if true */ + volatile sig_atomic_t async_loading; /* We are loading data without blocking the db being served */ off_t loading_total_bytes; off_t loading_rdb_used_mem; off_t loading_loaded_bytes; @@ -2035,6 +2035,7 @@ void ModuleForkDoneHandler(int exitcode, int bysignal); int TerminateModuleForkChild(int child_pid, int wait); ssize_t rdbSaveModulesAux(rio *rdb, int when); int moduleAllDatatypesHandleErrors(); +int moduleAllModulesHandleReplAsyncLoad(); sds modulesCollectInfo(sds info, const char *section, int for_crash_report, int sections); void moduleFireServerEvent(uint64_t eid, int subid, void *data); void processModuleLoadingProgressEvent(int is_aof); @@ -2338,7 +2339,7 @@ const char *getFailoverStateString(); /* Generic persistence functions */ void startLoadingFile(FILE* fp, char* filename, int rdbflags); -void startLoading(size_t size, int rdbflags); +void startLoading(size_t size, int rdbflags, int async); void loadingProgress(off_t pos); void stopLoading(int success); void startSaving(int rdbflags); @@ -2663,9 +2664,8 @@ long long emptyDb(int dbnum, int flags, void(callback)(dict*)); long long emptyDbStructure(redisDb *dbarray, int dbnum, int async, void(callback)(dict*)); void flushAllDataAndResetRDB(int flags); long long dbTotalServerKeyCount(); -dbBackup *backupDb(void); -void restoreDbBackup(dbBackup *backup); -void discardDbBackup(dbBackup *backup, int flags, void(callback)(dict*)); +redisDb *initTempDb(void); +void discardTempDb(redisDb *tempDb, void(callback)(dict*)); int selectDb(client *c, int id); @@ -3054,6 +3054,7 @@ void debugDelay(int usec); void killIOThreads(void); void killThreads(void); void makeThreadKillable(void); +void swapMainDbWithTempDb(redisDb *tempDb); /* Use macro for checking 
log level to avoid evaluating arguments in cases log * should be ignored due to low level. */ diff --git a/tests/cluster/tests/17-diskless-load-swapdb.tcl b/tests/cluster/tests/17-diskless-load-swapdb.tcl index b9bf01ecd..7a56ec783 100644 --- a/tests/cluster/tests/17-diskless-load-swapdb.tcl +++ b/tests/cluster/tests/17-diskless-load-swapdb.tcl @@ -1,4 +1,4 @@ -# Check replica can restore database backup correctly if fail to diskless load. +# Check that replica keys and keys to slots map are right after failing to diskless load using SWAPDB. source "../tests/includes/init-tests.tcl" @@ -14,7 +14,7 @@ test "Cluster is writable" { cluster_write_test 0 } -test "Right to restore backups when fail to diskless load " { +test "Main db not affected when fail to diskless load" { set master [Rn 0] set replica [Rn 1] set master_id 0 @@ -63,9 +63,9 @@ test "Right to restore backups when fail to diskless load " { restart_instance redis $replica_id $replica READONLY - # Start full sync, wait till after db is flushed (backed up) + # Start full sync, wait till after db started loading in background wait_for_condition 500 10 { - [s $replica_id loading] eq 1 + [s $replica_id async_loading] eq 1 } else { fail "Fail to full sync" } @@ -75,7 +75,7 @@ test "Right to restore backups when fail to diskless load " { # Start full sync, wait till the replica detects the disconnection wait_for_condition 500 10 { - [s $replica_id loading] eq 0 + [s $replica_id async_loading] eq 0 } else { fail "Fail to full sync" } diff --git a/tests/cluster/tests/includes/init-tests.tcl b/tests/cluster/tests/includes/init-tests.tcl index caedc13e2..b787962a8 100644 --- a/tests/cluster/tests/includes/init-tests.tcl +++ b/tests/cluster/tests/includes/init-tests.tcl @@ -41,6 +41,7 @@ test "Cluster nodes hard reset" { R $id config set cluster-slave-validity-factor 10 R $id config set loading-process-events-interval-bytes 2097152 R $id config set key-load-delay 0 + R $id config set repl-diskless-load disabled R $id 
config rewrite } } diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index 13206d634..06f079d34 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -383,72 +383,226 @@ start_server {tags {"repl external:skip"}} { } } -test {slave fails full sync and diskless load swapdb recovers it} { - start_server {tags {"repl"}} { - set slave [srv 0 client] - set slave_host [srv 0 host] - set slave_port [srv 0 port] - set slave_log [srv 0 stdout] +# Diskless load swapdb when NOT async_loading (different master replid) +foreach testType {Successful Aborted} { + start_server {tags {"repl external:skip"}} { + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + set replica_log [srv 0 stdout] start_server {} { set master [srv 0 client] set master_host [srv 0 host] set master_port [srv 0 port] - # Put different data sets on the master and slave - # we need to put large keys on the master since the slave replies to info only once in 2mb - $slave debug populate 2000 slave 10 - $master debug populate 800 master 100000 - $master config set rdbcompression no - - # Set master and slave to use diskless replication + # Set master and replica to use diskless replication on swapdb mode $master config set repl-diskless-sync yes $master config set repl-diskless-sync-delay 0 - $slave config set repl-diskless-load swapdb + $master config set save "" + $replica config set repl-diskless-load swapdb + $replica config set save "" - # Set master with a slow rdb generation, so that we can easily disconnect it mid sync - # 10ms per key, with 800 keys is 8 seconds - $master config set rdb-key-save-delay 10000 + # Put different data sets on the master and replica + # We need to put large keys on the master since the replica replies to info only once in 2mb + $replica debug populate 200 slave 10 + $master debug populate 1000 master 100000 + $master config set rdbcompression no - # Start the 
replication process... - $slave slaveof $master_host $master_port + # Set a key value on replica to check status on failure and after swapping db + $replica set mykey myvalue - # wait for the slave to start reading the rdb - wait_for_condition 100 100 { - [s -1 loading] eq 1 - } else { - fail "Replica didn't get into loading mode" + switch $testType { + "Aborted" { + # Set master with a slow rdb generation, so that we can easily intercept loading + # 10ms per key, with 1000 keys is 10 seconds + $master config set rdb-key-save-delay 10000 + + # Start the replication process + $replica replicaof $master_host $master_port + + test {Diskless load swapdb (different replid): replica enter loading} { + # Wait for the replica to start reading the rdb + wait_for_condition 100 100 { + [s -1 loading] eq 1 + } else { + fail "Replica didn't get into loading mode" + } + + assert_equal [s -1 async_loading] 0 + } + + # Make sure that next sync will not start immediately so that we can catch the replica in between syncs + $master config set repl-diskless-sync-delay 5 + + # Kill the replica connection on the master + set killed [$master client kill type replica] + + # Wait for loading to stop (fail) + wait_for_condition 100 100 { + [s -1 loading] eq 0 + } else { + fail "Replica didn't disconnect" + } + + test {Diskless load swapdb (different replid): old database is exposed after replication fails} { + # Ensure we see old values from replica + assert_equal [$replica get mykey] "myvalue" + + # Make sure amount of replica keys didn't change + assert_equal [$replica dbsize] 201 + } + + # Speed up shutdown + $master config set rdb-key-save-delay 0 + } + "Successful" { + # Start the replication process + $replica replicaof $master_host $master_port + + # Let replica finish sync with master + wait_for_condition 100 100 { + [s -1 master_link_status] eq "up" + } else { + fail "Master <-> Replica didn't finish sync" + } + + test {Diskless load swapdb (different replid): new database is 
exposed after swapping} { + # Ensure we don't see anymore the key that was stored only to replica and also that we don't get LOADING status + assert_equal [$replica GET mykey] "" + + # Make sure amount of keys matches master + assert_equal [$replica dbsize] 1000 + } + } } - - # make sure that next sync will not start immediately so that we can catch the slave in between syncs - $master config set repl-diskless-sync-delay 5 - # for faster server shutdown, make rdb saving fast again (the fork is already uses the slow one) - $master config set rdb-key-save-delay 0 - - # waiting slave to do flushdb (key count drop) - wait_for_condition 50 100 { - 2000 != [scan [regexp -inline {keys\=([\d]*)} [$slave info keyspace]] keys=%d] - } else { - fail "Replica didn't flush" - } - - # make sure we're still loading - assert_equal [s -1 loading] 1 - - # kill the slave connection on the master - set killed [$master client kill type slave] - - # wait for loading to stop (fail) - wait_for_condition 50 100 { - [s -1 loading] eq 0 - } else { - fail "Replica didn't disconnect" - } - - # make sure the original keys were restored - assert_equal [$slave dbsize] 2000 } } -} {} {external:skip} +} + +# Diskless load swapdb when async_loading (matching master replid) +foreach testType {Successful Aborted} { + start_server {tags {"repl external:skip"}} { + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + set replica_log [srv 0 stdout] + start_server {} { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + + # Set master and replica to use diskless replication on swapdb mode + $master config set repl-diskless-sync yes + $master config set repl-diskless-sync-delay 0 + $master config set save "" + $replica config set repl-diskless-load swapdb + $replica config set save "" + + # Set replica writable so we can check that a key we manually added is served + # during replication and after failure, but disappears on 
success + $replica config set replica-read-only no + + # Initial sync to have matching replids between master and replica + $replica replicaof $master_host $master_port + + # Let replica finish initial sync with master + wait_for_condition 100 100 { + [s -1 master_link_status] eq "up" + } else { + fail "Master <-> Replica didn't finish sync" + } + + # Put different data sets on the master and replica + # We need to put large keys on the master since the replica replies to info only once in 2mb + $replica debug populate 2000 slave 10 + $master debug populate 1000 master 100000 + $master config set rdbcompression no + + # Set a key value on replica to check status during loading, on failure and after swapping db + $replica set mykey myvalue + + # Force the replica to try another full sync (this time it will have matching master replid) + $master multi + $master client kill type replica + # Fill replication backlog with new content + $master config set repl-backlog-size 16384 + for {set keyid 0} {$keyid < 10} {incr keyid} { + $master set "$keyid string_$keyid" [string repeat A 16384] + } + $master exec + + switch $testType { + "Aborted" { + # Set master with a slow rdb generation, so that we can easily intercept loading + # 10ms per key, with 1000 keys is 10 seconds + $master config set rdb-key-save-delay 10000 + + test {Diskless load swapdb (async_loading): replica enter async_loading} { + # Wait for the replica to start reading the rdb + wait_for_condition 100 100 { + [s -1 async_loading] eq 1 + } else { + fail "Replica didn't get into async_loading mode" + } + + assert_equal [s -1 loading] 0 + } + + test {Diskless load swapdb (async_loading): old database is exposed while async replication is in progress} { + # Ensure we still see old values while async_loading is in progress and also not LOADING status + assert_equal [$replica get mykey] "myvalue" + + # Make sure we're still async_loading to validate previous assertion + assert_equal [s -1 async_loading] 1 + + # 
Make sure amount of replica keys didn't change + assert_equal [$replica dbsize] 2001 + } + + # Make sure that next sync will not start immediately so that we can catch the replica in between syncs + $master config set repl-diskless-sync-delay 5 + + # Kill the replica connection on the master + set killed [$master client kill type replica] + + # Wait for loading to stop (fail) + wait_for_condition 100 100 { + [s -1 async_loading] eq 0 + } else { + fail "Replica didn't disconnect" + } + + test {Diskless load swapdb (async_loading): old database is exposed after async replication fails} { + # Ensure we see old values from replica + assert_equal [$replica get mykey] "myvalue" + + # Make sure amount of replica keys didn't change + assert_equal [$replica dbsize] 2001 + } + + # Speed up shutdown + $master config set rdb-key-save-delay 0 + } + "Successful" { + # Let replica finish sync with master + wait_for_condition 100 100 { + [s -1 master_link_status] eq "up" + } else { + fail "Master <-> Replica didn't finish sync" + } + + test {Diskless load swapdb (async_loading): new database is exposed after swapping} { + # Ensure we don't see anymore the key that was stored only to replica and also that we don't get LOADING status + assert_equal [$replica GET mykey] "" + + # Make sure amount of keys matches master + assert_equal [$replica dbsize] 1010 + } + } + } + } + } +} test {diskless loading short read} { start_server {tags {"repl"}} { diff --git a/tests/modules/testrdb.c b/tests/modules/testrdb.c index 5d9382a3a..6786fdf2e 100644 --- a/tests/modules/testrdb.c +++ b/tests/modules/testrdb.c @@ -13,39 +13,47 @@ RedisModuleType *testrdb_type = NULL; RedisModuleString *before_str = NULL; RedisModuleString *after_str = NULL; -void replBackupCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +/* Global values used to keep aux from db being loaded (in case of async_loading) */ +RedisModuleString *before_str_temp = NULL; +RedisModuleString *after_str_temp = 
NULL; + +/* Indicates whether there is an async replication in progress. + * We control this value from RedisModuleEvent_ReplAsyncLoad events. */ +int async_loading = 0; + +void replAsyncLoadCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) { REDISMODULE_NOT_USED(e); REDISMODULE_NOT_USED(data); - static RedisModuleString *before_str_backup = NULL; - static RedisModuleString *after_str_backup = NULL; switch (sub) { - case REDISMODULE_SUBEVENT_REPL_BACKUP_CREATE: - assert(before_str_backup == NULL); - assert(after_str_backup == NULL); - before_str_backup = before_str; - after_str_backup = after_str; - before_str = NULL; - after_str = NULL; + case REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED: + assert(async_loading == 0); + async_loading = 1; break; - case REDISMODULE_SUBEVENT_REPL_BACKUP_RESTORE: + case REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_ABORTED: + /* Discard temp aux */ + if (before_str_temp) + RedisModule_FreeString(ctx, before_str_temp); + if (after_str_temp) + RedisModule_FreeString(ctx, after_str_temp); + before_str_temp = NULL; + after_str_temp = NULL; + + async_loading = 0; + break; + case REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_COMPLETED: if (before_str) RedisModule_FreeString(ctx, before_str); if (after_str) RedisModule_FreeString(ctx, after_str); - before_str = before_str_backup; - after_str = after_str_backup; - before_str_backup = NULL; - after_str_backup = NULL; - break; - case REDISMODULE_SUBEVENT_REPL_BACKUP_DISCARD: - if (before_str_backup) - RedisModule_FreeString(ctx, before_str_backup); - if (after_str_backup) - RedisModule_FreeString(ctx, after_str_backup); - before_str_backup = NULL; - after_str_backup = NULL; + before_str = before_str_temp; + after_str = after_str_temp; + + before_str_temp = NULL; + after_str_temp = NULL; + + async_loading = 0; break; default: assert(0); @@ -105,24 +113,47 @@ int testrdb_aux_load(RedisModuleIO *rdb, int encver, int when) { if (conf_aux_count==0) assert(0); RedisModuleCtx *ctx = 
RedisModule_GetContextFromIO(rdb); if (when == REDISMODULE_AUX_BEFORE_RDB) { - if (before_str) - RedisModule_FreeString(ctx, before_str); - before_str = NULL; - int count = RedisModule_LoadSigned(rdb); - if (RedisModule_IsIOError(rdb)) - return REDISMODULE_ERR; - if (count) - before_str = RedisModule_LoadString(rdb); + if (async_loading == 0) { + if (before_str) + RedisModule_FreeString(ctx, before_str); + before_str = NULL; + int count = RedisModule_LoadSigned(rdb); + if (RedisModule_IsIOError(rdb)) + return REDISMODULE_ERR; + if (count) + before_str = RedisModule_LoadString(rdb); + } else { + if (before_str_temp) + RedisModule_FreeString(ctx, before_str_temp); + before_str_temp = NULL; + int count = RedisModule_LoadSigned(rdb); + if (RedisModule_IsIOError(rdb)) + return REDISMODULE_ERR; + if (count) + before_str_temp = RedisModule_LoadString(rdb); + } } else { - if (after_str) - RedisModule_FreeString(ctx, after_str); - after_str = NULL; - int count = RedisModule_LoadSigned(rdb); - if (RedisModule_IsIOError(rdb)) - return REDISMODULE_ERR; - if (count) - after_str = RedisModule_LoadString(rdb); + if (async_loading == 0) { + if (after_str) + RedisModule_FreeString(ctx, after_str); + after_str = NULL; + int count = RedisModule_LoadSigned(rdb); + if (RedisModule_IsIOError(rdb)) + return REDISMODULE_ERR; + if (count) + after_str = RedisModule_LoadString(rdb); + } else { + if (after_str_temp) + RedisModule_FreeString(ctx, after_str_temp); + after_str_temp = NULL; + int count = RedisModule_LoadSigned(rdb); + if (RedisModule_IsIOError(rdb)) + return REDISMODULE_ERR; + if (count) + after_str_temp = RedisModule_LoadString(rdb); + } } + if (RedisModule_IsIOError(rdb)) return REDISMODULE_ERR; return REDISMODULE_OK; @@ -162,6 +193,21 @@ int testrdb_get_before(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_OK; } +/* For purpose of testing module events, expose variable state during async_loading. 
*/ +int testrdb_async_loading_get_before(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + REDISMODULE_NOT_USED(argv); + if (argc != 1){ + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + if (before_str_temp) + RedisModule_ReplyWithString(ctx, before_str_temp); + else + RedisModule_ReplyWithStringBuffer(ctx, "", 0); + return REDISMODULE_OK; +} + int testrdb_set_after(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (argc != 2){ @@ -226,11 +272,10 @@ int testrdb_get_key(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); - if (RedisModule_Init(ctx,"testrdb",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR) return REDISMODULE_ERR; - RedisModule_SetModuleOptions(ctx, REDISMODULE_OPTIONS_HANDLE_IO_ERRORS); + RedisModule_SetModuleOptions(ctx, REDISMODULE_OPTIONS_HANDLE_IO_ERRORS | REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD); if (argc > 0) RedisModule_StringToLongLong(argv[0], &conf_aux_count); @@ -274,6 +319,9 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) if (RedisModule_CreateCommand(ctx,"testrdb.get.before", testrdb_get_before,"",0,0,0) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"testrdb.async_loading.get.before", testrdb_async_loading_get_before,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"testrdb.set.after", testrdb_set_after,"deny-oom",0,0,0) == REDISMODULE_ERR) return REDISMODULE_ERR; @@ -287,7 +335,7 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_ERR; RedisModule_SubscribeToServerEvent(ctx, - RedisModuleEvent_ReplBackup, replBackupCallback); + RedisModuleEvent_ReplAsyncLoad, replAsyncLoadCallback); return REDISMODULE_OK; } @@ -297,5 +345,9 @@ int RedisModule_OnUnload(RedisModuleCtx *ctx) { RedisModule_FreeString(ctx, 
before_str); if (after_str) RedisModule_FreeString(ctx, after_str); + if (before_str_temp) + RedisModule_FreeString(ctx, before_str_temp); + if (after_str_temp) + RedisModule_FreeString(ctx, after_str_temp); return REDISMODULE_OK; } diff --git a/tests/unit/moduleapi/testrdb.tcl b/tests/unit/moduleapi/testrdb.tcl index 8bc0f7cd4..12d6a2a77 100644 --- a/tests/unit/moduleapi/testrdb.tcl +++ b/tests/unit/moduleapi/testrdb.tcl @@ -131,5 +131,120 @@ tags "modules" { } } } + + # Module events for diskless load swapdb when async_loading (matching master replid) + foreach testType {Successful Aborted} { + start_server [list overrides [list loadmodule "$testmodule 2"] tags [list external:skip]] { + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + set replica_log [srv 0 stdout] + start_server [list overrides [list loadmodule "$testmodule 2"]] { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + + set start [clock clicks -milliseconds] + + # Set master and replica to use diskless replication on swapdb mode + $master config set repl-diskless-sync yes + $master config set repl-diskless-sync-delay 0 + $master config set save "" + $replica config set repl-diskless-load swapdb + $replica config set save "" + + # Initial sync to have matching replids between master and replica + $replica replicaof $master_host $master_port + + # Let replica finish initial sync with master + wait_for_condition 100 100 { + [s -1 master_link_status] eq "up" + } else { + fail "Master <-> Replica didn't finish sync" + } + + # Set global values on module so we can check if module event callbacks will pick it up correctly + $master testrdb.set.before value1_master + $replica testrdb.set.before value1_replica + + # Put different data sets on the master and replica + # We need to put large keys on the master since the replica replies to info only once in 2mb + $replica debug populate 200 slave 10 + $master debug populate 
1000 master 100000 + $master config set rdbcompression no + + # Force the replica to try another full sync (this time it will have matching master replid) + $master multi + $master client kill type replica + # Fill replication backlog with new content + $master config set repl-backlog-size 16384 + for {set keyid 0} {$keyid < 10} {incr keyid} { + $master set "$keyid string_$keyid" [string repeat A 16384] + } + $master exec + + switch $testType { + "Aborted" { + # Set master with a slow rdb generation, so that we can easily intercept loading + # 10ms per key, with 1000 keys is 10 seconds + $master config set rdb-key-save-delay 10000 + + test {Diskless load swapdb RedisModuleEvent_ReplAsyncLoad handling: during loading, can keep module variable same as before} { + # Wait for the replica to start reading the rdb and module for acknowledgement + # We wanna abort only after the temp db was populated by REDISMODULE_AUX_BEFORE_RDB + wait_for_condition 100 100 { + [s -1 async_loading] eq 1 && [$replica testrdb.async_loading.get.before] eq "value1_master" + } else { + fail "Module didn't receive or react to REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED" + } + + assert_equal [$replica dbsize] 200 + assert_equal value1_replica [$replica testrdb.get.before] + } + + # Make sure that next sync will not start immediately so that we can catch the replica in between syncs + $master config set repl-diskless-sync-delay 5 + + # Kill the replica connection on the master + set killed [$master client kill type replica] + + test {Diskless load swapdb RedisModuleEvent_ReplAsyncLoad handling: when loading aborted, can keep module variable same as before} { + # Wait for loading to stop (fail) and module for acknowledgement + wait_for_condition 100 100 { + [s -1 async_loading] eq 0 && [$replica testrdb.async_loading.get.before] eq "" + } else { + fail "Module didn't receive or react to REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_ABORTED" + } + + assert_equal [$replica dbsize] 200 + assert_equal 
value1_replica [$replica testrdb.get.before] + } + + # Speed up shutdown + $master config set rdb-key-save-delay 0 + } + "Successful" { + # Let replica finish sync with master + wait_for_condition 100 100 { + [s -1 master_link_status] eq "up" + } else { + fail "Master <-> Replica didn't finish sync" + } + + test {Diskless load swapdb RedisModuleEvent_ReplAsyncLoad handling: after db loaded, can set module variable with new value} { + assert_equal [$replica dbsize] 1010 + assert_equal value1_master [$replica testrdb.get.before] + } + } + } + + if {$::verbose} { + set end [clock clicks -milliseconds] + set duration [expr $end - $start] + puts "test took $duration ms" + } + } + } + } } }