Fix cluster test failure

Former-commit-id: 230fde39882766426d892fd30d0c8349f5cca912
This commit is contained in:
John Sully 2021-02-07 23:38:25 +00:00
parent ea74c705a6
commit 4c9f912c3b
4 changed files with 52 additions and 20 deletions

View File

@ -37,7 +37,7 @@
/* Database backup. */ /* Database backup. */
struct dbBackup { struct dbBackup {
redisDb **dbarray; const redisDbPersistentDataSnapshot **dbarray;
rax *slots_to_keys; rax *slots_to_keys;
uint64_t slots_keys_count[CLUSTER_SLOTS]; uint64_t slots_keys_count[CLUSTER_SLOTS];
}; };
@ -580,34 +580,66 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
/* Store a backup of the database for later use, and put an empty one /* Store a backup of the database for later use, and put an empty one
* instead of it. */ * instead of it. */
const redisDbPersistentDataSnapshot **backupDb(void) { const dbBackup *backupDb(void) {
const redisDbPersistentDataSnapshot **backup = (const redisDbPersistentDataSnapshot**)zmalloc(sizeof(redisDbPersistentDataSnapshot*)*cserver.dbnum); dbBackup *backup = new dbBackup();
backup->dbarray = (const redisDbPersistentDataSnapshot**)zmalloc(sizeof(redisDbPersistentDataSnapshot*) * cserver.dbnum);
for (int i=0; i<cserver.dbnum; i++) { for (int i=0; i<cserver.dbnum; i++) {
backup[i] = g_pserver->db[i]->createSnapshot(LLONG_MAX, false); backup->dbarray[i] = g_pserver->db[i]->createSnapshot(LLONG_MAX, false);
} }
/* Backup cluster slots to keys map if enable cluster. */
if (g_pserver->cluster_enabled) {
backup->slots_to_keys = g_pserver->cluster->slots_to_keys;
memcpy(backup->slots_keys_count, g_pserver->cluster->slots_keys_count,
sizeof(g_pserver->cluster->slots_keys_count));
g_pserver->cluster->slots_to_keys = raxNew();
memset(g_pserver->cluster->slots_keys_count, 0,
sizeof(g_pserver->cluster->slots_keys_count));
}
return backup; return backup;
} }
/* Discard a previously created backup, this can be slow (similar to FLUSHALL) /* Discard a previously created backup, this can be slow (similar to FLUSHALL)
* Arguments are similar to the ones of emptyDb, see EMPTYDB_ flags. */ * Arguments are similar to the ones of emptyDb, see EMPTYDB_ flags. */
void discardDbBackup(const redisDbPersistentDataSnapshot **buckup, int flags, void(callback)(void*)) { void discardDbBackup(const dbBackup *backup, int flags, void(callback)(void*)) {
int async = (flags & EMPTYDB_ASYNC);
/* Release main DBs backup . */ /* Release main DBs backup . */
for (int i=0; i<cserver.dbnum; i++) { for (int i=0; i<cserver.dbnum; i++) {
g_pserver->db[i]->endSnapshot(buckup[i]); g_pserver->db[i]->endSnapshot(backup->dbarray[i]);
} }
zfree(buckup);
/* Release slots to keys map backup if enable cluster. */
if (g_pserver->cluster_enabled) freeSlotsToKeysMap(backup->slots_to_keys, async);
zfree(backup->dbarray);
delete backup;
} }
/* Restore the previously created backup (discarding what currently resides /* Restore the previously created backup (discarding what currently resides
* in the db). * in the db).
* This function should be called after the current contents of the database * This function should be called after the current contents of the database
* was emptied with a previous call to emptyDb (possibly using the async mode). */ * was emptied with a previous call to emptyDb (possibly using the async mode). */
void restoreDbBackup(const redisDbPersistentDataSnapshot **buckup) { void restoreDbBackup(const dbBackup *backup) {
/* Restore main DBs. */ /* Restore main DBs. */
for (int i=0; i<cserver.dbnum; i++) { for (int i=0; i<cserver.dbnum; i++) {
g_pserver->db[i]->restoreSnapshot(buckup[i]); g_pserver->db[i]->restoreSnapshot(backup->dbarray[i]);
} }
zfree(buckup);
/* Restore slots to keys map backup if enable cluster. */
if (g_pserver->cluster_enabled) {
serverAssert(g_pserver->cluster->slots_to_keys->numele == 0);
raxFree(g_pserver->cluster->slots_to_keys);
g_pserver->cluster->slots_to_keys = backup->slots_to_keys;
memcpy(g_pserver->cluster->slots_keys_count, backup->slots_keys_count,
sizeof(g_pserver->cluster->slots_keys_count));
}
/* Release buckup. */
zfree(backup->dbarray);
delete backup;
} }
int selectDb(client *c, int id) { int selectDb(client *c, int id) {
@ -2481,7 +2513,7 @@ void redisDbPersistentData::clear(void(callback)(void*))
m_setexpire = new (MALLOC_LOCAL) expireset(); m_setexpire = new (MALLOC_LOCAL) expireset();
if (m_spstorage != nullptr) if (m_spstorage != nullptr)
m_spstorage->clear(); m_spstorage->clear();
dictEmpty(m_pdictTombstone,nullptr); dictEmpty(m_pdictTombstone,callback);
m_pdbSnapshot = nullptr; m_pdbSnapshot = nullptr;
} }

View File

@ -1874,7 +1874,7 @@ static int useDisklessLoad() {
/* Helper function for readSyncBulkPayload() to make backups of the current /* Helper function for readSyncBulkPayload() to make backups of the current
* databases before socket-loading the new ones. The backups may be restored * databases before socket-loading the new ones. The backups may be restored
* by disklessLoadRestoreBackup or freed by disklessLoadDiscardBackup later. */ * by disklessLoadRestoreBackup or freed by disklessLoadDiscardBackup later. */
const redisDbPersistentDataSnapshot **disklessLoadMakeBackup(void) { const dbBackup *disklessLoadMakeBackup(void) {
return backupDb(); return backupDb();
} }
@ -1884,13 +1884,13 @@ const redisDbPersistentDataSnapshot **disklessLoadMakeBackup(void) {
* *
* If the socket loading went wrong, we want to restore the old backups * If the socket loading went wrong, we want to restore the old backups
* into the server databases. */ * into the server databases. */
void disklessLoadRestoreBackup(const redisDbPersistentDataSnapshot **buckup) { void disklessLoadRestoreBackup(const dbBackup *buckup) {
restoreDbBackup(buckup); restoreDbBackup(buckup);
} }
/* Helper function for readSyncBulkPayload() to discard our old backups /* Helper function for readSyncBulkPayload() to discard our old backups
* when the loading succeeded. */ * when the loading succeeded. */
void disklessLoadDiscardBackup(const redisDbPersistentDataSnapshot **buckup, int flag) { void disklessLoadDiscardBackup(const dbBackup *buckup, int flag) {
discardDbBackup(buckup, flag, replicationEmptyDbCallback); discardDbBackup(buckup, flag, replicationEmptyDbCallback);
} }
@ -1900,7 +1900,7 @@ void readSyncBulkPayload(connection *conn) {
char buf[PROTO_IOBUF_LEN]; char buf[PROTO_IOBUF_LEN];
ssize_t nread, readlen, nwritten; ssize_t nread, readlen, nwritten;
int use_diskless_load = useDisklessLoad(); int use_diskless_load = useDisklessLoad();
const redisDbPersistentDataSnapshot **diskless_load_backup = NULL; const dbBackup *diskless_load_backup = NULL;
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
int empty_db_flags = g_pserver->repl_slave_lazy_flush ? EMPTYDB_ASYNC : int empty_db_flags = g_pserver->repl_slave_lazy_flush ? EMPTYDB_ASYNC :
EMPTYDB_NO_FLAGS; EMPTYDB_NO_FLAGS;

View File

@ -3025,9 +3025,9 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*));
long long emptyDbStructure(redisDb **dbarray, int dbnum, int flags, void(callback)(void*)); long long emptyDbStructure(redisDb **dbarray, int dbnum, int flags, void(callback)(void*));
void flushAllDataAndResetRDB(int flags); void flushAllDataAndResetRDB(int flags);
long long dbTotalServerKeyCount(); long long dbTotalServerKeyCount();
const redisDbPersistentDataSnapshot **backupDb(void); const dbBackup *backupDb(void);
void restoreDbBackup(const redisDbPersistentDataSnapshot **buckup); void restoreDbBackup(const dbBackup *buckup);
void discardDbBackup(const redisDbPersistentDataSnapshot **buckup, int flags, void(callback)(void*)); void discardDbBackup(const dbBackup *buckup, int flags, void(callback)(void*));
int selectDb(client *c, int id); int selectDb(client *c, int id);

View File

@ -38,7 +38,7 @@ test "Right to restore backups when fail to diskless load " {
$master set $slot0_key 1 $master set $slot0_key 1
after 100 after 100
assert_equal {1} [$replica get $slot0_key] assert_equal {1} [$replica get $slot0_key]
assert_equal $slot0_key [$replica CLUSTER GETKEYSINSLOT 0 1] assert_equal $slot0_key [$replica CLUSTER GETKEYSINSLOT 0 1] "THIS ONE"
# Save an RDB and kill the replica # Save an RDB and kill the replica
$replica save $replica save
@ -75,5 +75,5 @@ test "Right to restore backups when fail to diskless load " {
# Replica keys and keys to slots map still both are right # Replica keys and keys to slots map still both are right
assert_equal {1} [$replica get $slot0_key] assert_equal {1} [$replica get $slot0_key]
assert_equal $slot0_key [$replica CLUSTER GETKEYSINSLOT 0 1] assert_equal $slot0_key [$replica CLUSTER GETKEYSINSLOT 0 1] "POST RUN"
} }