From 2da48575e43070c993eada3e3e0674a7d91eecc0 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 15 Apr 2020 16:34:07 -0400 Subject: [PATCH] Fix incorrect cluster slot tracking (regression from merge) Former-commit-id: 4705f29e2f62d90c374e072319c8cd486d32f807 --- src/db.cpp | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index db95113d8..666812982 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -204,8 +204,9 @@ int dbAddCore(redisDb *db, robj *key, robj *val) { if (retval == DICT_OK) { if (val->type == OBJ_LIST || - val->type == OBJ_ZSET) - signalKeyAsReady(db, key); + val->type == OBJ_ZSET || + val->type == OBJ_STREAM) + signalKeyAsReady(db, key); if (g_pserver->cluster_enabled) slotToKeyAdd(key); } else @@ -224,11 +225,6 @@ void dbAdd(redisDb *db, robj *key, robj *val) { int retval = dbAddCore(db, key, val); serverAssertWithInfo(NULL,key,retval == DICT_OK); - if (val->type == OBJ_LIST || - val->type == OBJ_ZSET || - val->type == OBJ_STREAM) - signalKeyAsReady(db, key); - if (g_pserver->cluster_enabled) slotToKeyAdd(key); } void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire) @@ -1880,6 +1876,8 @@ int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) * while rehashing the cluster and in other conditions when we need to * understand if we have keys for a given hash slot. */ void slotToKeyUpdateKey(robj *key, int add) { + serverAssert(GlobalLocksAcquired()); + size_t keylen = sdslen(szFromObj(key)); unsigned int hashslot = keyHashSlot(szFromObj(key),keylen); unsigned char buf[64]; @@ -1890,11 +1888,13 @@ void slotToKeyUpdateKey(robj *key, int add) { indexed[0] = (hashslot >> 8) & 0xff; indexed[1] = hashslot & 0xff; memcpy(indexed+2,ptrFromObj(key),keylen); + int fModified = false; if (add) { - raxInsert(g_pserver->cluster->slots_to_keys,indexed,keylen+2,NULL,NULL); + fModified = raxInsert(g_pserver->cluster->slots_to_keys,indexed,keylen+2,NULL,NULL); } else { - raxRemove(g_pserver->cluster->slots_to_keys,indexed,keylen+2,NULL); + fModified = raxRemove(g_pserver->cluster->slots_to_keys,indexed,keylen+2,NULL); } + serverAssert(fModified); if (indexed != buf) zfree(indexed); } @@ -1907,6 +1907,8 @@ void slotToKeyDel(robj *key) { } void slotToKeyFlush(void) { + serverAssert(GlobalLocksAcquired()); + raxFree(g_pserver->cluster->slots_to_keys); g_pserver->cluster->slots_to_keys = raxNew(); memset(g_pserver->cluster->slots_keys_count,0, @@ -1936,6 +1938,8 @@ unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int coun /* Remove all the keys in the specified hash slot. * The number of removed items is returned. */ unsigned int delKeysInSlot(unsigned int hashslot) { + serverAssert(GlobalLocksAcquired()); + raxIterator iter; int j = 0; unsigned char indexed[2]; @@ -1947,8 +1951,10 @@ unsigned int delKeysInSlot(unsigned int hashslot) { raxSeek(&iter,">=",indexed,2); raxNext(&iter); + auto count = g_pserver->cluster->slots_keys_count[hashslot]; robj *key = createStringObject((char*)iter.key+2,iter.key_len-2); dbDelete(&g_pserver->db[0],key); + serverAssert(count > g_pserver->cluster->slots_keys_count[hashslot]); // we should have deleted something or we will be in an infinite loop decrRefCount(key); j++; }