Fix incorrect cluster slot tracking (regression from merge)
Former-commit-id: 4705f29e2f62d90c374e072319c8cd486d32f807
This commit is contained in:
parent
f69c169c04
commit
2da48575e4
24
src/db.cpp
24
src/db.cpp
@ -204,8 +204,9 @@ int dbAddCore(redisDb *db, robj *key, robj *val) {
|
|||||||
if (retval == DICT_OK)
|
if (retval == DICT_OK)
|
||||||
{
|
{
|
||||||
if (val->type == OBJ_LIST ||
|
if (val->type == OBJ_LIST ||
|
||||||
val->type == OBJ_ZSET)
|
val->type == OBJ_ZSET ||
|
||||||
signalKeyAsReady(db, key);
|
val->type == OBJ_STREAM)
|
||||||
|
signalKeyAsReady(db, key);
|
||||||
if (g_pserver->cluster_enabled) slotToKeyAdd(key);
|
if (g_pserver->cluster_enabled) slotToKeyAdd(key);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -224,11 +225,6 @@ void dbAdd(redisDb *db, robj *key, robj *val)
|
|||||||
{
|
{
|
||||||
int retval = dbAddCore(db, key, val);
|
int retval = dbAddCore(db, key, val);
|
||||||
serverAssertWithInfo(NULL,key,retval == DICT_OK);
|
serverAssertWithInfo(NULL,key,retval == DICT_OK);
|
||||||
if (val->type == OBJ_LIST ||
|
|
||||||
val->type == OBJ_ZSET ||
|
|
||||||
val->type == OBJ_STREAM)
|
|
||||||
signalKeyAsReady(db, key);
|
|
||||||
if (g_pserver->cluster_enabled) slotToKeyAdd(key);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire)
|
void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire)
|
||||||
@ -1880,6 +1876,8 @@ int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys)
|
|||||||
* while rehashing the cluster and in other conditions when we need to
|
* while rehashing the cluster and in other conditions when we need to
|
||||||
* understand if we have keys for a given hash slot. */
|
* understand if we have keys for a given hash slot. */
|
||||||
void slotToKeyUpdateKey(robj *key, int add) {
|
void slotToKeyUpdateKey(robj *key, int add) {
|
||||||
|
serverAssert(GlobalLocksAcquired());
|
||||||
|
|
||||||
size_t keylen = sdslen(szFromObj(key));
|
size_t keylen = sdslen(szFromObj(key));
|
||||||
unsigned int hashslot = keyHashSlot(szFromObj(key),keylen);
|
unsigned int hashslot = keyHashSlot(szFromObj(key),keylen);
|
||||||
unsigned char buf[64];
|
unsigned char buf[64];
|
||||||
@ -1890,11 +1888,13 @@ void slotToKeyUpdateKey(robj *key, int add) {
|
|||||||
indexed[0] = (hashslot >> 8) & 0xff;
|
indexed[0] = (hashslot >> 8) & 0xff;
|
||||||
indexed[1] = hashslot & 0xff;
|
indexed[1] = hashslot & 0xff;
|
||||||
memcpy(indexed+2,ptrFromObj(key),keylen);
|
memcpy(indexed+2,ptrFromObj(key),keylen);
|
||||||
|
int fModified = false;
|
||||||
if (add) {
|
if (add) {
|
||||||
raxInsert(g_pserver->cluster->slots_to_keys,indexed,keylen+2,NULL,NULL);
|
fModified = raxInsert(g_pserver->cluster->slots_to_keys,indexed,keylen+2,NULL,NULL);
|
||||||
} else {
|
} else {
|
||||||
raxRemove(g_pserver->cluster->slots_to_keys,indexed,keylen+2,NULL);
|
fModified = raxRemove(g_pserver->cluster->slots_to_keys,indexed,keylen+2,NULL);
|
||||||
}
|
}
|
||||||
|
serverAssert(fModified);
|
||||||
if (indexed != buf) zfree(indexed);
|
if (indexed != buf) zfree(indexed);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1907,6 +1907,8 @@ void slotToKeyDel(robj *key) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void slotToKeyFlush(void) {
|
void slotToKeyFlush(void) {
|
||||||
|
serverAssert(GlobalLocksAcquired());
|
||||||
|
|
||||||
raxFree(g_pserver->cluster->slots_to_keys);
|
raxFree(g_pserver->cluster->slots_to_keys);
|
||||||
g_pserver->cluster->slots_to_keys = raxNew();
|
g_pserver->cluster->slots_to_keys = raxNew();
|
||||||
memset(g_pserver->cluster->slots_keys_count,0,
|
memset(g_pserver->cluster->slots_keys_count,0,
|
||||||
@ -1936,6 +1938,8 @@ unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int coun
|
|||||||
/* Remove all the keys in the specified hash slot.
|
/* Remove all the keys in the specified hash slot.
|
||||||
* The number of removed items is returned. */
|
* The number of removed items is returned. */
|
||||||
unsigned int delKeysInSlot(unsigned int hashslot) {
|
unsigned int delKeysInSlot(unsigned int hashslot) {
|
||||||
|
serverAssert(GlobalLocksAcquired());
|
||||||
|
|
||||||
raxIterator iter;
|
raxIterator iter;
|
||||||
int j = 0;
|
int j = 0;
|
||||||
unsigned char indexed[2];
|
unsigned char indexed[2];
|
||||||
@ -1947,8 +1951,10 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
|
|||||||
raxSeek(&iter,">=",indexed,2);
|
raxSeek(&iter,">=",indexed,2);
|
||||||
raxNext(&iter);
|
raxNext(&iter);
|
||||||
|
|
||||||
|
auto count = g_pserver->cluster->slots_keys_count[hashslot];
|
||||||
robj *key = createStringObject((char*)iter.key+2,iter.key_len-2);
|
robj *key = createStringObject((char*)iter.key+2,iter.key_len-2);
|
||||||
dbDelete(&g_pserver->db[0],key);
|
dbDelete(&g_pserver->db[0],key);
|
||||||
|
serverAssert(count > g_pserver->cluster->slots_keys_count[hashslot]); // we should have deleted something or we will be in an infinite loop
|
||||||
decrRefCount(key);
|
decrRefCount(key);
|
||||||
j++;
|
j++;
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user