From fe3630634031bde9c2b8a9cc8a3ab4e94ad8f680 Mon Sep 17 00:00:00 2001 From: Binbin Date: Tue, 14 Nov 2023 20:28:46 +0800 Subject: [PATCH] Fix DB iterator not resetting pauserehash causing dict being unable to rehash (#12757) When using DB iterator, it will use dictInitSafeIterator to init a old safe dict iterator. When dbIteratorNext is used, it will jump to the next slot db dict when we are done a dict. During this process, we do not have any calls to dictResumeRehashing, which causes the dict's pauserehash to always be > 0. And at last, it will be returned directly in dictRehashMilliseconds, which causes us to have slot dict in a state where rehash cannot be completed. In the "expire scan should skip dictionaries with lot's of empty buckets" test, adding a `keys *` can reproduce the problem stably. `keys *` will call dbIteratorNext to trigger a traversal of all slot dicts. Added dbReleaseIterator and dbIteratorInitNextSafeIterator methods to call dictResetIterator. Issue was introduced in #11695. --- src/aof.c | 4 ++-- src/db.c | 27 +++++++++++++++++++++++---- src/debug.c | 2 +- src/rdb.c | 4 ++-- src/server.c | 3 +-- src/server.h | 3 ++- tests/unit/expire.tcl | 5 ++++- 7 files changed, 35 insertions(+), 13 deletions(-) diff --git a/src/aof.c b/src/aof.c index 099a0bc47..6ba4bcc7a 100644 --- a/src/aof.c +++ b/src/aof.c @@ -2328,12 +2328,12 @@ int rewriteAppendOnlyFileRio(rio *aof) { if (server.rdb_key_save_delay) debugDelay(server.rdb_key_save_delay); } - zfree(dbit); + dbReleaseIterator(dbit); } return C_OK; werr: - if (dbit) zfree(dbit); + if (dbit) dbReleaseIterator(dbit); return C_ERR; } diff --git a/src/db.c b/src/db.c index 77bcb8e95..8260676d4 100644 --- a/src/db.c +++ b/src/db.c @@ -87,6 +87,12 @@ dictEntry *dbIteratorNext(dbIterator *dbit) { if (!de) { /* No current dict or reached the end of the dictionary. */ dict *d = dbIteratorNextDict(dbit); if (!d) return NULL; + + if (dbit->di.d) { + /* Before we move to the next dict, reset the iter of the previous dict. */ + dictIterator *iter = &dbit->di; + dictResetIterator(iter); + } dictInitSafeIterator(&dbit->di, d); de = dictNext(&dbit->di); } @@ -95,7 +101,9 @@ dictEntry *dbIteratorNext(dbIterator *dbit) { /* Returns DB iterator that can be used to iterate through sub-dictionaries. * Primary database contains only one dictionary when node runs without cluster mode, - * or 16k dictionaries (one per slot) when node runs with cluster mode enabled. */ + * or 16k dictionaries (one per slot) when node runs with cluster mode enabled. + * + * The caller should free the resulting dbit with dbReleaseIterator. */ dbIterator *dbIteratorInit(redisDb *db, dbKeyType keyType) { dbIterator *dbit = zmalloc(sizeof(*dbit)); dbit->db = db; @@ -106,6 +114,9 @@ dbIterator *dbIteratorInit(redisDb *db, dbKeyType keyType) { return dbit; } +/* Returns DB iterator that can be used to iterate through sub-dictionaries. + * + * The caller should free the resulting dbit with dbReleaseIterator. */ dbIterator *dbIteratorInitFromSlot(redisDb *db, dbKeyType keyType, int slot) { dbIterator *dbit = zmalloc(sizeof(*dbit)); dbit->db = db; @@ -116,6 +127,14 @@ dbIterator *dbIteratorInitFromSlot(redisDb *db, dbKeyType keyType, int slot) { return dbit; } +/* Free the dbit returned by dbIteratorInit or dbIteratorInitFromSlot. */ +void dbReleaseIterator(dbIterator *dbit) { + dictIterator *iter = &dbit->di; + dictResetIterator(iter); + + zfree(dbit); +} + /* Returns next non-empty slot strictly after given one, or -1 if provided slot is the last one. */ int dbGetNextNonEmptySlot(redisDb *db, int slot, dbKeyType keyType) { unsigned long long next_key = cumulativeKeyCountRead(db, slot, keyType) + 1; @@ -999,7 +1018,7 @@ void keysCommand(client *c) { if (c->flags & CLIENT_CLOSE_ASAP) break; } - zfree(dbit); + dbReleaseIterator(dbit); setDeferredArrayLen(c,replylen,numkeys); } @@ -1388,7 +1407,7 @@ unsigned long long int dbSize(redisDb *db, dbKeyType keyType) { return db->sub_dict[keyType].key_count; } -/* This method proivdes the cumulative sum of all the dictionary buckets +/* This method provides the cumulative sum of all the dictionary buckets * across dictionaries in a database. */ unsigned long dbBuckets(redisDb *db, dbKeyType keyType) { if (server.cluster_enabled) { @@ -2976,7 +2995,7 @@ void dbGetStats(char *buf, size_t bufsize, redisDb *db, int full, dbKeyType keyT } } } - zfree(dbit); + dbReleaseIterator(dbit); l = dictGetStatsMsg(buf, bufsize, mainHtStats, full); dictFreeStats(mainHtStats); buf += l; diff --git a/src/debug.c b/src/debug.c index fd1cf21c2..e537126f8 100644 --- a/src/debug.c +++ b/src/debug.c @@ -315,7 +315,7 @@ void computeDatasetDigest(unsigned char *final) { xorDigest(final,digest,20); decrRefCount(keyobj); } - zfree(dbit); + dbReleaseIterator(dbit); } } diff --git a/src/rdb.c b/src/rdb.c index 7cb180642..d983a99f2 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1368,11 +1368,11 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) { } } } - zfree(dbit); + dbReleaseIterator(dbit); return written; werr: - if (dbit) zfree(dbit); + if (dbit) dbReleaseIterator(dbit); return -1; } diff --git a/src/server.c b/src/server.c index e8553984c..e63a2ffff 100644 --- a/src/server.c +++ b/src/server.c @@ -654,7 +654,7 @@ void tryResizeHashTables(int dbid) { } /* Save current iterator position in the resize_cursor. */ db->sub_dict[subdict].resize_cursor = slot; - zfree(dbit); + dbReleaseIterator(dbit); } } @@ -1511,7 +1511,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * call this function when needed. */ updateDictResizePolicy(); - /* AOF postponed flush: Try at every cron cycle if the slow fsync * completed. */ if ((server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) && diff --git a/src/server.h b/src/server.h index 4f20fe231..902050889 100644 --- a/src/server.h +++ b/src/server.h @@ -997,7 +997,7 @@ typedef struct redisDb { long long avg_ttl; /* Average TTL, just for stats */ unsigned long expires_cursor; /* Cursor of the active expire cycle. */ list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */ - int dict_count; /* Indicates total number of dictionaires owned by this DB, 1 dict per slot in cluster mode. */ + int dict_count; /* Indicates total number of dictionaries owned by this DB, 1 dict per slot in cluster mode. */ dbDictState sub_dict[2]; /* Metadata for main and expires dictionaries */ } redisDb; @@ -2436,6 +2436,7 @@ typedef struct dbIterator dbIterator; /* DB iterator specific functions */ dbIterator *dbIteratorInit(redisDb *db, dbKeyType keyType); dbIterator *dbIteratorInitFromSlot(redisDb *db, dbKeyType keyType, int slot); +void dbReleaseIterator(dbIterator *dbit); dict *dbIteratorNextDict(dbIterator *dbit); dict *dbGetDictFromIterator(dbIterator *dbit); int dbIteratorGetCurrentSlot(dbIterator *dbit); diff --git a/tests/unit/expire.tcl b/tests/unit/expire.tcl index 339a52a48..5aa763578 100644 --- a/tests/unit/expire.tcl +++ b/tests/unit/expire.tcl @@ -840,7 +840,7 @@ start_cluster 1 0 {tags {"expire external:skip cluster slow"}} { # Collect two slots to help determine the expiry scan logic is able # to go past certain slots which aren't valid for scanning at the given point of time. - # And the next non empyt slot after that still gets scanned and expiration happens. + # And the next non empty slot after that still gets scanned and expiration happens. # hashslot(alice) is 749 r psetex alice 500 val @@ -862,6 +862,9 @@ start_cluster 1 0 {tags {"expire external:skip cluster slow"}} { r del "{foo}$j" } + # Trigger a full traversal of all dictionaries. + r keys * + r debug set-active-expire 1 # Verify {foo}100 still exists and remaining got cleaned up