Fix DB iterator not resetting pauserehash causing dict being unable to rehash (#12757)
When using DB iterator, it will use dictInitSafeIterator to init a old safe dict iterator. When dbIteratorNext is used, it will jump to the next slot db dict when we are done a dict. During this process, we do not have any calls to dictResumeRehashing, which causes the dict's pauserehash to always be > 0. And at last, it will be returned directly in dictRehashMilliseconds, which causes us to have slot dict in a state where rehash cannot be completed. In the "expire scan should skip dictionaries with lot's of empty buckets" test, adding a `keys *` can reproduce the problem stably. `keys *` will call dbIteratorNext to trigger a traversal of all slot dicts. Added dbReleaseIterator and dbIteratorInitNextSafeIterator methods to call dictResetIterator. Issue was introduced in #11695.
This commit is contained in:
parent
a9e73c00bc
commit
fe36306340
@ -2328,12 +2328,12 @@ int rewriteAppendOnlyFileRio(rio *aof) {
|
|||||||
if (server.rdb_key_save_delay)
|
if (server.rdb_key_save_delay)
|
||||||
debugDelay(server.rdb_key_save_delay);
|
debugDelay(server.rdb_key_save_delay);
|
||||||
}
|
}
|
||||||
zfree(dbit);
|
dbReleaseIterator(dbit);
|
||||||
}
|
}
|
||||||
return C_OK;
|
return C_OK;
|
||||||
|
|
||||||
werr:
|
werr:
|
||||||
if (dbit) zfree(dbit);
|
if (dbit) dbReleaseIterator(dbit);
|
||||||
return C_ERR;
|
return C_ERR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
27
src/db.c
27
src/db.c
@ -87,6 +87,12 @@ dictEntry *dbIteratorNext(dbIterator *dbit) {
|
|||||||
if (!de) { /* No current dict or reached the end of the dictionary. */
|
if (!de) { /* No current dict or reached the end of the dictionary. */
|
||||||
dict *d = dbIteratorNextDict(dbit);
|
dict *d = dbIteratorNextDict(dbit);
|
||||||
if (!d) return NULL;
|
if (!d) return NULL;
|
||||||
|
|
||||||
|
if (dbit->di.d) {
|
||||||
|
/* Before we move to the next dict, reset the iter of the previous dict. */
|
||||||
|
dictIterator *iter = &dbit->di;
|
||||||
|
dictResetIterator(iter);
|
||||||
|
}
|
||||||
dictInitSafeIterator(&dbit->di, d);
|
dictInitSafeIterator(&dbit->di, d);
|
||||||
de = dictNext(&dbit->di);
|
de = dictNext(&dbit->di);
|
||||||
}
|
}
|
||||||
@ -95,7 +101,9 @@ dictEntry *dbIteratorNext(dbIterator *dbit) {
|
|||||||
|
|
||||||
/* Returns DB iterator that can be used to iterate through sub-dictionaries.
|
/* Returns DB iterator that can be used to iterate through sub-dictionaries.
|
||||||
* Primary database contains only one dictionary when node runs without cluster mode,
|
* Primary database contains only one dictionary when node runs without cluster mode,
|
||||||
* or 16k dictionaries (one per slot) when node runs with cluster mode enabled. */
|
* or 16k dictionaries (one per slot) when node runs with cluster mode enabled.
|
||||||
|
*
|
||||||
|
* The caller should free the resulting dbit with dbReleaseIterator. */
|
||||||
dbIterator *dbIteratorInit(redisDb *db, dbKeyType keyType) {
|
dbIterator *dbIteratorInit(redisDb *db, dbKeyType keyType) {
|
||||||
dbIterator *dbit = zmalloc(sizeof(*dbit));
|
dbIterator *dbit = zmalloc(sizeof(*dbit));
|
||||||
dbit->db = db;
|
dbit->db = db;
|
||||||
@ -106,6 +114,9 @@ dbIterator *dbIteratorInit(redisDb *db, dbKeyType keyType) {
|
|||||||
return dbit;
|
return dbit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Returns DB iterator that can be used to iterate through sub-dictionaries.
|
||||||
|
*
|
||||||
|
* The caller should free the resulting dbit with dbReleaseIterator. */
|
||||||
dbIterator *dbIteratorInitFromSlot(redisDb *db, dbKeyType keyType, int slot) {
|
dbIterator *dbIteratorInitFromSlot(redisDb *db, dbKeyType keyType, int slot) {
|
||||||
dbIterator *dbit = zmalloc(sizeof(*dbit));
|
dbIterator *dbit = zmalloc(sizeof(*dbit));
|
||||||
dbit->db = db;
|
dbit->db = db;
|
||||||
@ -116,6 +127,14 @@ dbIterator *dbIteratorInitFromSlot(redisDb *db, dbKeyType keyType, int slot) {
|
|||||||
return dbit;
|
return dbit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Free the dbit returned by dbIteratorInit or dbIteratorInitFromSlot. */
|
||||||
|
void dbReleaseIterator(dbIterator *dbit) {
|
||||||
|
dictIterator *iter = &dbit->di;
|
||||||
|
dictResetIterator(iter);
|
||||||
|
|
||||||
|
zfree(dbit);
|
||||||
|
}
|
||||||
|
|
||||||
/* Returns next non-empty slot strictly after given one, or -1 if provided slot is the last one. */
|
/* Returns next non-empty slot strictly after given one, or -1 if provided slot is the last one. */
|
||||||
int dbGetNextNonEmptySlot(redisDb *db, int slot, dbKeyType keyType) {
|
int dbGetNextNonEmptySlot(redisDb *db, int slot, dbKeyType keyType) {
|
||||||
unsigned long long next_key = cumulativeKeyCountRead(db, slot, keyType) + 1;
|
unsigned long long next_key = cumulativeKeyCountRead(db, slot, keyType) + 1;
|
||||||
@ -999,7 +1018,7 @@ void keysCommand(client *c) {
|
|||||||
if (c->flags & CLIENT_CLOSE_ASAP)
|
if (c->flags & CLIENT_CLOSE_ASAP)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
zfree(dbit);
|
dbReleaseIterator(dbit);
|
||||||
setDeferredArrayLen(c,replylen,numkeys);
|
setDeferredArrayLen(c,replylen,numkeys);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1388,7 +1407,7 @@ unsigned long long int dbSize(redisDb *db, dbKeyType keyType) {
|
|||||||
return db->sub_dict[keyType].key_count;
|
return db->sub_dict[keyType].key_count;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This method proivdes the cumulative sum of all the dictionary buckets
|
/* This method provides the cumulative sum of all the dictionary buckets
|
||||||
* across dictionaries in a database. */
|
* across dictionaries in a database. */
|
||||||
unsigned long dbBuckets(redisDb *db, dbKeyType keyType) {
|
unsigned long dbBuckets(redisDb *db, dbKeyType keyType) {
|
||||||
if (server.cluster_enabled) {
|
if (server.cluster_enabled) {
|
||||||
@ -2976,7 +2995,7 @@ void dbGetStats(char *buf, size_t bufsize, redisDb *db, int full, dbKeyType keyT
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
zfree(dbit);
|
dbReleaseIterator(dbit);
|
||||||
l = dictGetStatsMsg(buf, bufsize, mainHtStats, full);
|
l = dictGetStatsMsg(buf, bufsize, mainHtStats, full);
|
||||||
dictFreeStats(mainHtStats);
|
dictFreeStats(mainHtStats);
|
||||||
buf += l;
|
buf += l;
|
||||||
|
@ -315,7 +315,7 @@ void computeDatasetDigest(unsigned char *final) {
|
|||||||
xorDigest(final,digest,20);
|
xorDigest(final,digest,20);
|
||||||
decrRefCount(keyobj);
|
decrRefCount(keyobj);
|
||||||
}
|
}
|
||||||
zfree(dbit);
|
dbReleaseIterator(dbit);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1368,11 +1368,11 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
zfree(dbit);
|
dbReleaseIterator(dbit);
|
||||||
return written;
|
return written;
|
||||||
|
|
||||||
werr:
|
werr:
|
||||||
if (dbit) zfree(dbit);
|
if (dbit) dbReleaseIterator(dbit);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -654,7 +654,7 @@ void tryResizeHashTables(int dbid) {
|
|||||||
}
|
}
|
||||||
/* Save current iterator position in the resize_cursor. */
|
/* Save current iterator position in the resize_cursor. */
|
||||||
db->sub_dict[subdict].resize_cursor = slot;
|
db->sub_dict[subdict].resize_cursor = slot;
|
||||||
zfree(dbit);
|
dbReleaseIterator(dbit);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1511,7 +1511,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
|||||||
* call this function when needed. */
|
* call this function when needed. */
|
||||||
updateDictResizePolicy();
|
updateDictResizePolicy();
|
||||||
|
|
||||||
|
|
||||||
/* AOF postponed flush: Try at every cron cycle if the slow fsync
|
/* AOF postponed flush: Try at every cron cycle if the slow fsync
|
||||||
* completed. */
|
* completed. */
|
||||||
if ((server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) &&
|
if ((server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) &&
|
||||||
|
@ -997,7 +997,7 @@ typedef struct redisDb {
|
|||||||
long long avg_ttl; /* Average TTL, just for stats */
|
long long avg_ttl; /* Average TTL, just for stats */
|
||||||
unsigned long expires_cursor; /* Cursor of the active expire cycle. */
|
unsigned long expires_cursor; /* Cursor of the active expire cycle. */
|
||||||
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
|
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
|
||||||
int dict_count; /* Indicates total number of dictionaires owned by this DB, 1 dict per slot in cluster mode. */
|
int dict_count; /* Indicates total number of dictionaries owned by this DB, 1 dict per slot in cluster mode. */
|
||||||
dbDictState sub_dict[2]; /* Metadata for main and expires dictionaries */
|
dbDictState sub_dict[2]; /* Metadata for main and expires dictionaries */
|
||||||
} redisDb;
|
} redisDb;
|
||||||
|
|
||||||
@ -2436,6 +2436,7 @@ typedef struct dbIterator dbIterator;
|
|||||||
/* DB iterator specific functions */
|
/* DB iterator specific functions */
|
||||||
dbIterator *dbIteratorInit(redisDb *db, dbKeyType keyType);
|
dbIterator *dbIteratorInit(redisDb *db, dbKeyType keyType);
|
||||||
dbIterator *dbIteratorInitFromSlot(redisDb *db, dbKeyType keyType, int slot);
|
dbIterator *dbIteratorInitFromSlot(redisDb *db, dbKeyType keyType, int slot);
|
||||||
|
void dbReleaseIterator(dbIterator *dbit);
|
||||||
dict *dbIteratorNextDict(dbIterator *dbit);
|
dict *dbIteratorNextDict(dbIterator *dbit);
|
||||||
dict *dbGetDictFromIterator(dbIterator *dbit);
|
dict *dbGetDictFromIterator(dbIterator *dbit);
|
||||||
int dbIteratorGetCurrentSlot(dbIterator *dbit);
|
int dbIteratorGetCurrentSlot(dbIterator *dbit);
|
||||||
|
@ -840,7 +840,7 @@ start_cluster 1 0 {tags {"expire external:skip cluster slow"}} {
|
|||||||
|
|
||||||
# Collect two slots to help determine the expiry scan logic is able
|
# Collect two slots to help determine the expiry scan logic is able
|
||||||
# to go past certain slots which aren't valid for scanning at the given point of time.
|
# to go past certain slots which aren't valid for scanning at the given point of time.
|
||||||
# And the next non empyt slot after that still gets scanned and expiration happens.
|
# And the next non empty slot after that still gets scanned and expiration happens.
|
||||||
|
|
||||||
# hashslot(alice) is 749
|
# hashslot(alice) is 749
|
||||||
r psetex alice 500 val
|
r psetex alice 500 val
|
||||||
@ -862,6 +862,9 @@ start_cluster 1 0 {tags {"expire external:skip cluster slow"}} {
|
|||||||
r del "{foo}$j"
|
r del "{foo}$j"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Trigger a full traversal of all dictionaries.
|
||||||
|
r keys *
|
||||||
|
|
||||||
r debug set-active-expire 1
|
r debug set-active-expire 1
|
||||||
|
|
||||||
# Verify {foo}100 still exists and remaining got cleaned up
|
# Verify {foo}100 still exists and remaining got cleaned up
|
||||||
|
Loading…
x
Reference in New Issue
Block a user