diff --git a/src/aof.c b/src/aof.c index fd0355e18..099a0bc47 100644 --- a/src/aof.c +++ b/src/aof.c @@ -2240,11 +2240,11 @@ werr: } int rewriteAppendOnlyFileRio(rio *aof) { - dictIterator *di = NULL; dictEntry *de; int j; long key_count = 0; long long updated_time = 0; + dbIterator *dbit = NULL; /* Record timestamp at the beginning of rewriting AOF. */ if (server.aof_timestamp_enabled) { @@ -2257,17 +2257,13 @@ int rewriteAppendOnlyFileRio(rio *aof) { for (j = 0; j < server.dbnum; j++) { char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n"; - redisDb *db = server.db+j; - dict *d = db->dict; - if (dictSize(d) == 0) continue; - di = dictGetSafeIterator(d); - /* SELECT the new DB */ if (rioWrite(aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr; if (rioWriteBulkLongLong(aof,j) == 0) goto werr; - + redisDb *db = server.db + j; + dbit = dbIteratorInit(db, DB_MAIN); /* Iterate this DB writing every entry */ - while((de = dictNext(di)) != NULL) { + while((de = dbIteratorNext(dbit)) != NULL) { sds keystr; robj key, *o; long long expiretime; @@ -2332,13 +2328,12 @@ int rewriteAppendOnlyFileRio(rio *aof) { if (server.rdb_key_save_delay) debugDelay(server.rdb_key_save_delay); } - dictReleaseIterator(di); - di = NULL; + zfree(dbit); } return C_OK; werr: - if (di) dictReleaseIterator(di); + if (dbit) zfree(dbit); return C_ERR; } diff --git a/src/cluster.c b/src/cluster.c index 332d76572..bd2b7c525 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1050,9 +1050,6 @@ void clusterInit(void) { exit(1); } - /* Initialize data for the Slot to key API. */ - slotToKeyInit(server.db); - /* The slots -> channels map is a radix tree. Initialize it here. */ server.cluster->slots_to_channels = raxNew(); @@ -5113,7 +5110,7 @@ int verifyClusterConfigWithData(void) { /* Make sure we only have keys in DB0. */ for (j = 1; j < server.dbnum; j++) { - if (dictSize(server.db[j].dict)) return C_ERR; + if (dbSize(&server.db[j], DB_MAIN)) return C_ERR; } /* Check that all the slots we see populated memory have a corresponding @@ -5986,7 +5983,7 @@ NULL clusterReplyShards(c); } else if (!strcasecmp(c->argv[1]->ptr,"flushslots") && c->argc == 2) { /* CLUSTER FLUSHSLOTS */ - if (dictSize(server.db[0].dict) != 0) { + if (dbSize(&server.db[0], DB_MAIN) != 0) { addReplyError(c,"DB must be empty to perform CLUSTER FLUSHSLOTS."); return; } @@ -6248,13 +6245,16 @@ NULL unsigned int keys_in_slot = countKeysInSlot(slot); unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys; addReplyArrayLen(c,numkeys); - dictEntry *de = (*server.db->slots_to_keys).by_slot[slot].head; - for (unsigned int j = 0; j < numkeys; j++) { + dictIterator *iter = NULL; + dictEntry *de = NULL; + iter = dictGetIterator(server.db->dict[slot]); + for (unsigned int i = 0; i < numkeys; i++) { + de = dictNext(iter); serverAssert(de != NULL); sds sdskey = dictGetKey(de); addReplyBulkCBuffer(c, sdskey, sdslen(sdskey)); - de = dictEntryNextInSlot(de); } + dictReleaseIterator(iter); } else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) { /* CLUSTER FORGET */ clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr)); @@ -6303,7 +6303,7 @@ NULL * slots nor keys to accept to replicate some other node. * Slaves can switch to another master without issues. 
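*
* Illustration: with the dbIterator API added in db.c further below, the per-database
* walk used by the AOF rewrite above reduces to this pattern (a sketch, shown here for reference):
*
*   dbIterator *dbit = dbIteratorInit(db, DB_MAIN);
*   dictEntry *de;
*   while ((de = dbIteratorNext(dbit)) != NULL) {
*       sds key = dictGetKey(de);
*       ... process key ...
*   }
*   zfree(dbit);   <- the embedded iterator is a safe iterator; a plain zfree releases it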
*/ if (nodeIsMaster(myself) && - (myself->numslots != 0 || dictSize(server.db[0].dict) != 0)) { + (myself->numslots != 0 || dbSize(&server.db[0], DB_MAIN) != 0)) { addReplyError(c, "To set a master the node must be empty and " "without assigned slots."); @@ -6462,7 +6462,7 @@ NULL /* Slaves can be reset while containing data, but not master nodes * that must be empty. */ - if (nodeIsMaster(myself) && dictSize(c->db->dict) != 0) { + if (nodeIsMaster(myself) && dbSize(c->db, DB_MAIN) != 0) { addReplyError(c,"CLUSTER RESET can't be called with " "master nodes containing keys"); return; @@ -7544,98 +7544,16 @@ int clusterRedirectBlockedClientIfNeeded(client *c) { return 0; } -/* Slot to Key API. This is used by Redis Cluster in order to obtain in - * a fast way a key that belongs to a specified hash slot. This is useful - * while rehashing the cluster and in other conditions when we need to - * understand if we have keys for a given hash slot. */ - -void slotToKeyAddEntry(dictEntry *entry, redisDb *db) { - sds key = dictGetKey(entry); - unsigned int hashslot = keyHashSlot(key, sdslen(key)); - slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot]; - slot_to_keys->count++; - - /* Insert entry before the first element in the list. */ - dictEntry *first = slot_to_keys->head; - dictEntryNextInSlot(entry) = first; - if (first != NULL) { - serverAssert(dictEntryPrevInSlot(first) == NULL); - dictEntryPrevInSlot(first) = entry; - } - serverAssert(dictEntryPrevInSlot(entry) == NULL); - slot_to_keys->head = entry; -} - -void slotToKeyDelEntry(dictEntry *entry, redisDb *db) { - sds key = dictGetKey(entry); - unsigned int hashslot = keyHashSlot(key, sdslen(key)); - slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot]; - slot_to_keys->count--; - - /* Connect previous and next entries to each other. */ - dictEntry *next = dictEntryNextInSlot(entry); - dictEntry *prev = dictEntryPrevInSlot(entry); - if (next != NULL) { - dictEntryPrevInSlot(next) = prev; - } - if (prev != NULL) { - dictEntryNextInSlot(prev) = next; - } else { - /* The removed entry was the first in the list. */ - serverAssert(slot_to_keys->head == entry); - slot_to_keys->head = next; - } -} - -/* Updates neighbour entries when an entry has been replaced (e.g. reallocated - * during active defrag). */ -void slotToKeyReplaceEntry(dict *d, dictEntry *entry) { - dictEntry *next = dictEntryNextInSlot(entry); - dictEntry *prev = dictEntryPrevInSlot(entry); - if (next != NULL) { - dictEntryPrevInSlot(next) = entry; - } - if (prev != NULL) { - dictEntryNextInSlot(prev) = entry; - } else { - /* The replaced entry was the first in the list. */ - sds key = dictGetKey(entry); - unsigned int hashslot = keyHashSlot(key, sdslen(key)); - clusterDictMetadata *dictmeta = dictMetadata(d); - redisDb *db = dictmeta->db; - slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot]; - slot_to_keys->head = entry; - } -} - -/* Initialize slots-keys map of given db. */ -void slotToKeyInit(redisDb *db) { - db->slots_to_keys = zcalloc(sizeof(clusterSlotToKeyMapping)); - clusterDictMetadata *dictmeta = dictMetadata(db->dict); - dictmeta->db = db; -} - -/* Empty slots-keys map of given db. */ -void slotToKeyFlush(redisDb *db) { - memset(db->slots_to_keys, 0, - sizeof(clusterSlotToKeyMapping)); -} - -/* Free slots-keys map of given db. */ -void slotToKeyDestroy(redisDb *db) { - zfree(db->slots_to_keys); - db->slots_to_keys = NULL; -} - /* Remove all the keys in the specified hash slot. * The number of removed items is returned. 
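*
* With one dictionary per slot, the slots_to_keys linked list removed above is no
* longer needed; a slot's keys are simply the entries of that slot's dictionary.
* Sketch of the new lookup path used by the function below:
*
*   dict *d = server.db->dict[hashslot];
*   unsigned long n = dictSize(d);               <- replaces slot_to_keys->count
*   dictIterator *iter = dictGetSafeIterator(d); <- safe, since we delete while iterating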
*/ unsigned int delKeysInSlot(unsigned int hashslot) { unsigned int j = 0; - dictEntry *de = (*server.db->slots_to_keys).by_slot[hashslot].head; - while (de != NULL) { + dictIterator *iter = NULL; + dictEntry *de = NULL; + iter = dictGetSafeIterator(server.db->dict[hashslot]); + while((de = dictNext(iter)) != NULL) { sds sdskey = dictGetKey(de); - de = dictEntryNextInSlot(de); robj *key = createStringObject(sdskey, sdslen(sdskey)); dbDelete(&server.db[0], key); propagateDeletion(&server.db[0], key, server.lazyfree_lazy_server_del); @@ -7646,12 +7564,13 @@ unsigned int delKeysInSlot(unsigned int hashslot) { j++; server.dirty++; } + dictReleaseIterator(iter); return j; } -unsigned int countKeysInSlot(unsigned int hashslot) { - return (*server.db->slots_to_keys).by_slot[hashslot].count; +unsigned int countKeysInSlot(unsigned int slot) { + return dictSize(server.db->dict[slot]); } /* ----------------------------------------------------------------------------- diff --git a/src/cluster.h b/src/cluster.h index 21c9c4499..210052ae3 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -5,7 +5,9 @@ * Redis cluster data structures, defines, exported API. *----------------------------------------------------------------------------*/ -#define CLUSTER_SLOTS 16384 +#define CLUSTER_SLOT_MASK_BITS 14 /* Number of bits used for slot id. */ +#define CLUSTER_SLOTS (1<<CLUSTER_SLOT_MASK_BITS) /* Total number of slots in the cluster. */ +#define CLUSTER_SLOT_MASK ((unsigned long long)(CLUSTER_SLOTS - 1)) /* Bit mask for slot id stored in LSB. */ diff --git a/src/db.c b/src/db.c --- a/src/db.c +++ b/src/db.c #include <signal.h> #include <ctype.h> +/* Structure for DB iterator that allows iterating across multiple slot-specific dictionaries in cluster mode. */ +struct dbIterator { + redisDb *db; + int slot; + int next_slot; + dictIterator di; + dbKeyType keyType; +}; + /*----------------------------------------------------------------------------- * C-level DB API *----------------------------------------------------------------------------*/ @@ -47,8 +56,73 @@ int expireIfNeeded(redisDb *db, robj *key, int flags); int keyIsExpired(redisDb *db, robj *key); +void cumulativeKeyCountAdd(redisDb *db, int idx, long delta, dbKeyType keyType); static void dbSetValue(redisDb *db, robj *key, robj *val, int overwrite, dictEntry *de); +dict *dbGetDictFromIterator(dbIterator *dbit) { + if (dbit->keyType == DB_MAIN) + return dbit->db->dict[dbit->slot]; + else if (dbit->keyType == DB_EXPIRES) + return dbit->db->expires[dbit->slot]; + else + serverAssert(0); +} + +/* Returns the next dictionary from the iterator, or NULL if iteration is complete. */ +dict *dbIteratorNextDict(dbIterator *dbit) { + if (dbit->next_slot == -1) return NULL; + dbit->slot = dbit->next_slot; + dbit->next_slot = dbGetNextNonEmptySlot(dbit->db, dbit->slot, dbit->keyType); + return dbGetDictFromIterator(dbit); +} + +int dbIteratorGetCurrentSlot(dbIterator *dbit) { + serverAssert(dbit->slot >= 0 && dbit->slot < CLUSTER_SLOTS); + return dbit->slot; +} + +/* Returns the next entry from the multi-slot db. */ +dictEntry *dbIteratorNext(dbIterator *dbit) { + dictEntry *de = dbit->di.d ? dictNext(&dbit->di) : NULL; + if (!de) { /* No current dict or reached the end of the dictionary. */ + dict *d = dbIteratorNextDict(dbit); + if (!d) return NULL; + dictInitSafeIterator(&dbit->di, d); + de = dictNext(&dbit->di); + } + return de; +} + +/* Returns a DB iterator that can be used to iterate through sub-dictionaries. + * The primary database contains a single dictionary when the node runs without cluster mode, + * or 16k dictionaries (one per slot) when cluster mode is enabled.
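+ *
+ * Sketch of the slot-resuming variant (dbIteratorInitFromSlot, defined just
+ * below): walk only the non-empty dictionaries strictly after a given slot:
+ *
+ *   dbIterator *dbit = dbIteratorInitFromSlot(db, DB_EXPIRES, slot);
+ *   dict *d;
+ *   while ((d = dbIteratorNextDict(dbit)) != NULL) {
+ *       ... operate on each remaining expires dictionary ...
+ *   }
+ *   zfree(dbit);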
*/ +dbIterator *dbIteratorInit(redisDb *db, dbKeyType keyType) { + dbIterator *dbit = zmalloc(sizeof(*dbit)); + dbit->db = db; + dbit->slot = -1; + dbit->keyType = keyType; + dbit->next_slot = findSlotByKeyIndex(dbit->db, 1, dbit->keyType); /* Finds the first non-empty slot. */ + dictInitSafeIterator(&dbit->di, NULL); + return dbit; +} + +dbIterator *dbIteratorInitFromSlot(redisDb *db, dbKeyType keyType, int slot) { + dbIterator *dbit = zmalloc(sizeof(*dbit)); + dbit->db = db; + dbit->slot = slot; + dbit->keyType = keyType; + dbit->next_slot = dbGetNextNonEmptySlot(dbit->db, dbit->slot, dbit->keyType); + dictInitSafeIterator(&dbit->di, NULL); + return dbit; +} + +/* Returns the next non-empty slot strictly after the given one, or -1 if the provided slot is the last one. */ +int dbGetNextNonEmptySlot(redisDb *db, int slot, dbKeyType keyType) { + unsigned long long next_key = cumulativeKeyCountRead(db, slot, keyType) + 1; + return next_key <= dbSize(db, keyType) ? findSlotByKeyIndex(db, next_key, keyType) : -1; +} + + /* Update LFU when an object is accessed. * Firstly, decrement the counter if the decrement time is reached. * Then logarithmically increment the counter, and update the access time. */ @@ -86,7 +160,7 @@ void updateLFU(robj *val) { * expired on replicas even if the master is lagging expiring our key via DELs * in the replication link. */ robj *lookupKey(redisDb *db, robj *key, int flags) { - dictEntry *de = dictFind(db->dict,key->ptr); + dictEntry *de = dbFind(db, key->ptr, DB_MAIN); robj *val = NULL; if (de) { val = dictGetVal(de); @@ -192,17 +266,20 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) { * if the key already exists, otherwise, it can fall back to dbOverwrite. */ static void dbAddInternal(redisDb *db, robj *key, robj *val, int update_if_existing) { dictEntry *existing; - dictEntry *de = dictAddRaw(db->dict, key->ptr, &existing); + int slot = getKeySlot(key->ptr); + dict *d = db->dict[slot]; + dictEntry *de = dictAddRaw(d, key->ptr, &existing); if (update_if_existing && existing) { dbSetValue(db, key, val, 1, existing); return; } serverAssertWithInfo(NULL, key, de != NULL); - dictSetKey(db->dict, de, sdsdup(key->ptr)); + dictSetKey(d, de, sdsdup(key->ptr)); initObjectLRUOrLFU(val); - dictSetVal(db->dict, de, val); + dictSetVal(d, de, val); + db->sub_dict[DB_MAIN].key_count++; + cumulativeKeyCountAdd(db, slot, 1, DB_MAIN); signalKeyAsReady(db, key, val->type); - if (server.cluster_enabled) slotToKeyAddEntry(de, db); notifyKeyspaceEvent(NOTIFY_NEW,"new",key,db->id); } @@ -210,6 +287,28 @@ void dbAdd(redisDb *db, robj *key, robj *val) { dbAddInternal(db, key, val, 0); } +/* Returns the key's hash slot when cluster mode is enabled, or 0 when disabled. + * The only difference between this function and getKeySlot is that it does not use the cached key slot from current_client + * and always calculates the CRC hash. + * This is useful when the slot needs to be calculated for a key the user didn't request, such as during eviction. */ +int calculateKeySlot(sds key) { + return server.cluster_enabled ? keyHashSlot(key, (int) sdslen(key)) : 0; +} + +/* Returns the key's hash slot (which selects the slot-specific dictionary) when cluster mode is enabled, else 0. */ +int getKeySlot(sds key) { + /* This is a performance optimization that uses the pre-set slot id from the current command, + * in order to avoid recalculating the key hash. + * This optimization is only used when the current_client flag `CLIENT_EXECUTING_COMMAND` is set. + * That flag is only set while a command executes inside the `call` method; other flows requesting + * the key slot fall back to calculateKeySlot. + */ + if (server.current_client && server.current_client->slot >= 0 && server.current_client->flags & CLIENT_EXECUTING_COMMAND) { + return server.current_client->slot; + } + return calculateKeySlot(key); +} + /* This is a special version of dbAdd() that is used only when loading * keys from the RDB file: the key is passed as an SDS string that is * retained by the function (and not freed by the caller). @@ -222,11 +321,14 @@ void dbAdd(redisDb *db, robj *key, robj *val) { * ownership of the SDS string, otherwise 0 is returned, and is up to the * caller to free the SDS string. */ int dbAddRDBLoad(redisDb *db, sds key, robj *val) { - dictEntry *de = dictAddRaw(db->dict, key, NULL); + int slot = getKeySlot(key); + dict *d = db->dict[slot]; + dictEntry *de = dictAddRaw(d, key, NULL); if (de == NULL) return 0; initObjectLRUOrLFU(val); - dictSetVal(db->dict, de, val); - if (server.cluster_enabled) slotToKeyAddEntry(de, db); + dictSetVal(d, de, val); + db->sub_dict[DB_MAIN].key_count++; + cumulativeKeyCountAdd(db, slot, 1, DB_MAIN); return 1; } @@ -243,7 +345,7 @@ int dbAddRDBLoad(redisDb *db, sds key, robj *val) { * * The program is aborted if the key was not already present. */ static void dbSetValue(redisDb *db, robj *key, robj *val, int overwrite, dictEntry *de) { - if (!de) de = dictFind(db->dict,key->ptr); + if (!de) de = dbFind(db, key->ptr, DB_MAIN); serverAssertWithInfo(NULL,key,de != NULL); robj *old = dictGetVal(de); @@ -263,13 +365,14 @@ static void dbSetValue(redisDb *db, robj *key, robj *val, int overwrite, dictEnt /* Because of RM_StringDMA, old may be changed, so we need to get old again */ old = dictGetVal(de); } - dictSetVal(db->dict, de, val); + dict *d = db->dict[getKeySlot(key->ptr)]; + dictSetVal(d, de, val); if (server.lazyfree_lazy_server_del) { freeObjAsync(key,old,db->id); } else { /* This is just decrRefCount(old); */ - db->dict->type->valDestructor(db->dict, old); + d->type->valDestructor(d, old); } } @@ -321,18 +424,18 @@ void setKey(client *c, redisDb *db, robj *key, robj *val, int flags) { robj *dbRandomKey(redisDb *db) { dictEntry *de; int maxtries = 100; - int allvolatile = dictSize(db->dict) == dictSize(db->expires); + int allvolatile = dbSize(db, DB_MAIN) == dbSize(db, DB_EXPIRES); while(1) { sds key; robj *keyobj; - - de = dictGetFairRandomKey(db->dict); + int randomSlot = getFairRandomSlot(db, DB_MAIN); + de = dictGetFairRandomKey(db->dict[randomSlot]); if (de == NULL) return NULL; key = dictGetKey(de); keyobj = createStringObject(key,sdslen(key)); - if (dictFind(db->expires,key)) { + if (dbFind(db, key, DB_EXPIRES)) { if (allvolatile && server.masterhost && --maxtries == 0) { /* If the DB is composed only of keys with an expire set, * it could happen that all the keys are already logically @@ -353,11 +456,96 @@ robj *dbRandomKey(redisDb *db) { } } +/* Updates the binary index tree (also known as Fenwick tree), increasing the key count for a given slot. + * You can read more about this data structure here https://en.wikipedia.org/wiki/Fenwick_tree + * Time complexity is O(log(CLUSTER_SLOTS)). */ +void cumulativeKeyCountAdd(redisDb *db, int slot, long delta, dbKeyType keyType) { + if (!server.cluster_enabled) return; + int idx = slot + 1; /* Unlike slots, the BIT is 1-based, so we need to add 1.
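+ *
+ * Worked example: adding a key to slot 5 starts at idx = 6 and bumps the
+ * counters at idx = 6, 8, 16, 32, ..., 16384 (each step adds the lowest set
+ * bit of idx: 6 -> 8 -> 16 -> ... -> 16384), i.e. O(log(CLUSTER_SLOTS))
+ * updates. The matching prefix read for slot 5 walks downward 6 -> 4 -> 0,
+ * summing slot_size_index[6] + slot_size_index[4].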
*/ + while (idx <= CLUSTER_SLOTS) { + if (delta < 0) { + serverAssert(db->sub_dict[keyType].slot_size_index[idx] >= (unsigned long long)labs(delta)); + } + db->sub_dict[keyType].slot_size_index[idx] += delta; + idx += (idx & -idx); + } +} + +/* Returns total (cumulative) number of keys up until given slot (inclusive). + * Time complexity is O(log(CLUSTER_SLOTS)). */ +unsigned long long cumulativeKeyCountRead(redisDb *db, int slot, dbKeyType keyType) { + if (!server.cluster_enabled) { + serverAssert(slot == 0); + return dbSize(db, keyType); + } + int idx = slot + 1; + unsigned long long sum = 0; + while (idx > 0) { + sum += db->sub_dict[keyType].slot_size_index[idx]; + idx -= (idx & -idx); + } + return sum; +} + +/* Returns fair random slot, probability of each slot being returned is proportional to the number of elements that slot dictionary holds. + * Implementation uses binary search on top of binary index tree. + * This function guarantees that it returns a slot whose dict is non-empty, unless the entire db is empty. + * Time complexity of this function is O(log^2(CLUSTER_SLOTS)). */ +int getFairRandomSlot(redisDb *db, dbKeyType keyType) { + unsigned long target = dbSize(db, keyType) ? (randomULong() % dbSize(db, keyType)) + 1 : 0; + int slot = findSlotByKeyIndex(db, target, keyType); + return slot; +} + +static inline unsigned long dictSizebySlot(redisDb *db, int slot, dbKeyType keyType) { + if (keyType == DB_MAIN) + return dictSize(db->dict[slot]); + else if (keyType == DB_EXPIRES) + return dictSize(db->expires[slot]); + else + serverAssert(0); +} + +/* Finds a slot containing target element in a key space ordered by slot id. + * Consider this example. Slots are represented by brackets and keys by dots: + * #0 #1 #2 #3 #4 + * [..][....][...][.......][.] + * ^ + * target + * + * In this case slot #3 contains key that we are trying to find. + * + * This function is 1 based and the range of the target is [1..dbSize], dbSize inclusive. + * Time complexity of this function is O(log^2(CLUSTER_SLOTS)) + * */ +int findSlotByKeyIndex(redisDb *db, unsigned long target, dbKeyType keyType) { + if (!server.cluster_enabled || dbSize(db, keyType) == 0) return 0; + serverAssert(target <= dbSize(db, keyType)); + int lo = 0, hi = CLUSTER_SLOTS - 1; + /* We use binary search to find a slot, we are allowed to do this, because we have a quick way to find a total number of keys + * up until certain slot, using binary index tree. */ + while (lo <= hi) { + int mid = lo + (hi - lo) / 2; + unsigned long keys_up_to_mid = cumulativeKeyCountRead(db, mid, keyType); /* Total number of keys up until a given slot (inclusive). */ + unsigned long keys_in_mid = dictSizebySlot(db, mid, keyType); + if (target > keys_up_to_mid) { /* Target is to the right from mid. */ + lo = mid + 1; + } else if (target <= keys_up_to_mid - keys_in_mid) { /* Target is to the left from mid. */ + hi = mid - 1; + } else { /* Located target. */ + return mid; + } + } + serverPanic("Unable to find a slot that contains target key."); +} + /* Helper for sync and async delete. 
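*
* Worked example for the findSlotByKeyIndex() picture above (pretend
* CLUSTER_SLOTS were 5; slot sizes 2, 4, 3, 7, 1 give cumulative counts
* 2, 6, 9, 16, 17) with target = 10:
*
*   mid = 2: keys_up_to_mid = 9 < 10                  -> go right, lo = 3
*   mid = 3: keys_up_to_mid = 16, keys_in_mid = 7,
*            10 > 16 - 7 and 10 <= 16                 -> found, slot #3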
*/ int dbGenericDelete(redisDb *db, robj *key, int async, int flags) { dictEntry **plink; int table; - dictEntry *de = dictTwoPhaseUnlinkFind(db->dict,key->ptr,&plink,&table); + int slot = getKeySlot(key->ptr); + dict *d = db->dict[slot]; + dictEntry *de = dictTwoPhaseUnlinkFind(d,key->ptr,&plink,&table); if (de) { robj *val = dictGetVal(de); /* RM_StringDMA may call dbUnshareStringValue which may free val, so we @@ -373,14 +561,19 @@ int dbGenericDelete(redisDb *db, robj *key, int async, int flags) { if (async) { /* Because of dbUnshareStringValue, the val in de may change. */ freeObjAsync(key, dictGetVal(de), db->id); - dictSetVal(db->dict, de, NULL); + dictSetVal(d, de, NULL); } - if (server.cluster_enabled) slotToKeyDelEntry(de, db); - /* Deleting an entry from the expires dict will not free the sds of * the key, because it is shared with the main dictionary. */ - if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr); - dictTwoPhaseUnlinkFree(db->dict,de,plink,table); + if (dictSize(db->expires[slot]) > 0) { + if (dictDelete(db->expires[slot],key->ptr) == DICT_OK) { + cumulativeKeyCountAdd(db, slot, -1, DB_EXPIRES); + db->sub_dict[DB_EXPIRES].key_count--; + } + } + dictTwoPhaseUnlinkFree(d,de,plink,table); + cumulativeKeyCountAdd(db, slot, -1, DB_MAIN); + db->sub_dict[DB_MAIN].key_count--; return 1; } else { return 0; @@ -462,16 +655,26 @@ long long emptyDbStructure(redisDb *dbarray, int dbnum, int async, } for (int j = startdb; j <= enddb; j++) { - removed += dictSize(dbarray[j].dict); + removed += dbSize(&dbarray[j], DB_MAIN); if (async) { emptyDbAsync(&dbarray[j]); } else { - dictEmpty(dbarray[j].dict,callback); - dictEmpty(dbarray[j].expires,callback); + for (int k = 0; k < dbarray[j].dict_count; k++) { + dictEmpty(dbarray[j].dict[k],callback); + dictEmpty(dbarray[j].expires[k],callback); + } } /* Because all keys of database are removed, reset average ttl. */ dbarray[j].avg_ttl = 0; dbarray[j].expires_cursor = 0; + for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) { + dbarray[j].sub_dict[subdict].key_count = 0; + dbarray[j].sub_dict[subdict].resize_cursor = 0; + if (server.cluster_enabled) { + unsigned long long *slot_size_index = dbarray[j].sub_dict[subdict].slot_size_index; + memset(slot_size_index, 0, sizeof(unsigned long long) * (CLUSTER_SLOTS + 1)); + } + } } return removed; @@ -516,11 +719,6 @@ long long emptyData(int dbnum, int flags, void(callback)(dict*)) { /* Empty redis database structure. */ removed = emptyDbStructure(server.db, dbnum, async, callback); - /* Flush slots to keys map if enable cluster, we can flush entire - * slots to keys map whatever dbnum because only support one DB - * in cluster mode. 
*/ - if (server.cluster_enabled) slotToKeyFlush(server.db); - if (dbnum == -1) flushSlaveKeysWithExpireList(); if (with_functions) { @@ -541,14 +739,12 @@ long long emptyData(int dbnum, int flags, void(callback)(dict*)) { redisDb *initTempDb(void) { redisDb *tempDb = zcalloc(sizeof(redisDb)*server.dbnum); for (int i=0; i<server.dbnum; i++) { ... } return tempDb; } void keysCommand(client *c) { dictEntry *de; sds pattern = c->argv[1]->ptr; int plen = sdslen(pattern), allkeys; - unsigned long numkeys = 0; + long numkeys = 0; void *replylen = addReplyDeferredLen(c); - - di = dictGetSafeIterator(c->db->dict); + dbIterator *dbit = dbIteratorInit(c->db, DB_MAIN); allkeys = (pattern[0] == '*' && plen == 1); robj keyobj; - while((de = dictNext(di)) != NULL) { + while ((de = dbIteratorNext(dbit)) != NULL) { sds key = dictGetKey(de); if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) { @@ -806,7 +1004,7 @@ void keysCommand(client *c) { if (c->flags & CLIENT_CLOSE_ASAP) break; } - dictReleaseIterator(di); + zfree(dbit); setDeferredArrayLen(c,replylen,numkeys); } @@ -886,15 +1084,8 @@ void scanCallback(void *privdata, const dictEntry *de) { * if the cursor is valid, store it as an unsigned integer into *cursor and * returns C_OK. Otherwise return C_ERR and send an error to the * client. */ -int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor) { - char *eptr; - - /* Use strtoul() because we need an *unsigned* long, so - * getLongLongFromObject() does not cover the whole cursor space. */ - errno = 0; - *cursor = strtoul(o->ptr, &eptr, 10); - if (isspace(((char*)o->ptr)[0]) || eptr[0] != '\0' || errno == ERANGE) - { +int parseScanCursorOrReply(client *c, robj *o, unsigned long long *cursor) { + if (!string2ull(o->ptr, cursor)) { addReplyError(c, "invalid cursor"); return C_ERR; } @@ -952,7 +1143,7 @@ char *getObjectTypeName(robj *o) { * * In the case of a Hash object the function returns both the field and value * of every element on the Hash. */ -void scanGenericCommand(client *c, robj *o, unsigned long cursor) { +void scanGenericCommand(client *c, robj *o, unsigned long long cursor) { int i, j; listNode *node; long count = 10; @@ -1022,7 +1213,7 @@ void scanGenericCommand(client *c, robj *o, unsigned long cursor) { /* Handle the case of a hash table. */ ht = NULL; if (o == NULL) { - ht = c->db->dict; + ht = NULL; } else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) { ht = o->ptr; } else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) { @@ -1045,7 +1236,8 @@ void scanGenericCommand(client *c, robj *o, unsigned long cursor) { listSetFreeMethod(keys, (void (*)(void*))sdsfree); } - if (ht) { + /* For a main dictionary scan, or a data structure backed by a hash table. */ + if (!o || ht) { /* We set the max number of iterations to ten times the specified * COUNT, so if the hash table is in a pathological state (very * sparsely populated) we avoid blocking for too much time at the cost @@ -1071,7 +1263,13 @@ void scanGenericCommand(client *c, robj *o, unsigned long cursor) { .sampled = 0, }; do { + /* In cluster mode there is a separate dictionary for each slot. + * If the cursor is empty, we should try exploring the next non-empty slot.
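+ *
+ * Example trace: if the only keys live in slots 3 and 9, a full SCAN walks
+ * slot 3's dictionary (returning cursors packed as (ht_cursor << 14) | 3),
+ * jumps straight to slot 9 once slot 3 is exhausted, and returns 0 when
+ * dbGetNextNonEmptySlot() reports no further slot. Keys never move between
+ * slot dictionaries, so the usual SCAN guarantee is preserved.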
*/ + if (o == NULL) { + cursor = dbScan(c->db, DB_MAIN, cursor, scanCallback, NULL, &data); + } else { + cursor = dictScan(ht, cursor, scanCallback, &data); + } } while (cursor && maxiterations-- && data.sampled < count); } else if (o->type == OBJ_SET) { char *str; @@ -1158,15 +1356,111 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) { listRelease(keys); } +void addSlotIdToCursor(int slot, unsigned long long *cursor) { + if (!server.cluster_enabled) return; + /* Slot id can be -1 when iteration is over and there are no more slots to visit. */ + if (slot < 0) return; + *cursor = (*cursor << CLUSTER_SLOT_MASK_BITS) | slot; +} + +int getAndClearSlotIdFromCursor(unsigned long long *cursor) { + if (!server.cluster_enabled) return 0; + int slot = (int) (*cursor & CLUSTER_SLOT_MASK); + *cursor = *cursor >> CLUSTER_SLOT_MASK_BITS; + return slot; +} + /* The SCAN command completely relies on scanGenericCommand. */ void scanCommand(client *c) { - unsigned long cursor; + unsigned long long cursor; if (parseScanCursorOrReply(c,c->argv[1],&cursor) == C_ERR) return; scanGenericCommand(c,NULL,cursor); } void dbsizeCommand(client *c) { - addReplyLongLong(c,dictSize(c->db->dict)); + redisDb *db = c->db; + unsigned long long int size = dbSize(db, DB_MAIN); + addReplyLongLong(c, size); +} + +unsigned long long int dbSize(redisDb *db, dbKeyType keyType) { + return db->sub_dict[keyType].key_count; +} + +/* This method provides the cumulative sum of all the dictionary buckets + * across dictionaries in a database. */ +unsigned long dbBuckets(redisDb *db, dbKeyType keyType) { + unsigned long buckets = 0; + dict *d; + dbIterator *dbit = dbIteratorInit(db, keyType); + while ((d = dbIteratorNextDict(dbit))) { + buckets += dictBuckets(d); + } + zfree(dbit); + return buckets; +} + +size_t dbMemUsage(redisDb *db, dbKeyType keyType) { + size_t mem = 0; + unsigned long long keys_count = dbSize(db, keyType); + mem += keys_count * dictEntryMemUsage() + + dbBuckets(db, keyType) * sizeof(dictEntry*) + + db->dict_count * sizeof(dict); + if (keyType == DB_MAIN) { + mem += keys_count * sizeof(robj); + } + return mem; +} + +dictEntry *dbFind(redisDb *db, void *key, dbKeyType keyType) { + int slot = getKeySlot(key); + if (keyType == DB_MAIN) + return dictFind(db->dict[slot], key); + else if (keyType == DB_EXPIRES) + return dictFind(db->expires[slot], key); + else + serverAssert(0); +} + +/* + * This method is used to iterate over the elements of the entire database, specifically across slots. + * It is a three-pronged approach: + * + * 1. It uses the provided cursor `v` to retrieve the slot from it. + * 2. If the dictionary is in a valid state, as checked through the provided callback `dictScanValidFunction`, + * it performs a dictScan over the appropriate `keyType` dictionary of `db`. + * 3. If the slot is entirely scanned, i.e. the cursor has reached 0, the next non-empty slot is discovered. + * The slot information is embedded into the cursor and returned. + */ +unsigned long long dbScan(redisDb *db, dbKeyType keyType, unsigned long long v, dictScanFunction *fn, int (dictScanValidFunction)(dict *d), void *privdata) { + dict *d; + unsigned long long cursor = 0; + /* During main dictionary traversal in cluster mode, the lower 48 bits of the cursor are used for positioning within the HT. + * The following 14 bits are used for the slot number, ranging from 0 to 2^14-1. + * Slot is always 0 at the start of iteration and can be incremented only in cluster mode.
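+ *
+ * Example: a dictionary-level cursor of 3 in slot 7 is packed by
+ * addSlotIdToCursor() as (3 << 14) | 7 = 49159; the next call unpacks
+ * slot = 49159 & CLUSTER_SLOT_MASK = 7 and ht cursor = 49159 >> 14 = 3.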
*/ + int slot = getAndClearSlotIdFromCursor(&v); + if (keyType == DB_MAIN) + d = db->dict[slot]; + else if (keyType == DB_EXPIRES) + d = db->expires[slot]; + else + serverAssert(0); + + int is_dict_valid = (dictScanValidFunction == NULL || dictScanValidFunction(d) == C_OK); + if (is_dict_valid) { + cursor = dictScan(d, v, fn, privdata); + } else { + serverLog(LL_DEBUG, "Slot [%d] not valid for scanning, skipping.", slot); + } + /* scanning done for the current dictionary or if the scanning wasn't possible, move to the next slot. */ + if (cursor == 0 || !is_dict_valid) { + slot = dbGetNextNonEmptySlot(db, slot, keyType); + } + if (slot == -1) { + return 0; + } + addSlotIdToCursor(slot, &cursor); + return cursor; } void lastsaveCommand(client *c) { @@ -1463,7 +1757,7 @@ void scanDatabaseForReadyKeys(redisDb *db) { dictIterator *di = dictGetSafeIterator(db->blocking_keys); while((de = dictNext(di)) != NULL) { robj *key = dictGetKey(de); - dictEntry *kde = dictFind(db->dict,key->ptr); + dictEntry *kde = dbFind(db, key->ptr, DB_MAIN); if (kde) { robj *value = dictGetVal(kde); signalKeyAsReady(db, key, value->type); @@ -1483,7 +1777,7 @@ void scanDatabaseForDeletedKeys(redisDb *emptied, redisDb *replaced_with) { int existed = 0, exists = 0; int original_type = -1, curr_type = -1; - dictEntry *kde = dictFind(emptied->dict, key->ptr); + dictEntry *kde = dbFind(emptied, key->ptr, DB_MAIN); if (kde) { robj *value = dictGetVal(kde); original_type = value->type; @@ -1491,7 +1785,7 @@ void scanDatabaseForDeletedKeys(redisDb *emptied, redisDb *replaced_with) { } if (replaced_with) { - dictEntry *kde = dictFind(replaced_with->dict, key->ptr); + kde = dbFind(replaced_with, key->ptr, DB_MAIN); if (kde) { robj *value = dictGetVal(kde); curr_type = value->type; @@ -1536,11 +1830,23 @@ int dbSwapDatabases(int id1, int id2) { db1->expires = db2->expires; db1->avg_ttl = db2->avg_ttl; db1->expires_cursor = db2->expires_cursor; + db1->dict_count = db2->dict_count; + for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) { + db1->sub_dict[subdict].key_count = db2->sub_dict[subdict].key_count; + db1->sub_dict[subdict].resize_cursor = db2->sub_dict[subdict].resize_cursor; + db1->sub_dict[subdict].slot_size_index = db2->sub_dict[subdict].slot_size_index; + } db2->dict = aux.dict; db2->expires = aux.expires; db2->avg_ttl = aux.avg_ttl; db2->expires_cursor = aux.expires_cursor; + db2->dict_count = aux.dict_count; + for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) { + db2->sub_dict[subdict].key_count = aux.sub_dict[subdict].key_count; + db2->sub_dict[subdict].resize_cursor = aux.sub_dict[subdict].resize_cursor; + db2->sub_dict[subdict].slot_size_index = aux.sub_dict[subdict].slot_size_index; + } /* Now we need to handle clients blocked on lists: as an effect * of swapping the two DBs, a client that was waiting for list @@ -1560,13 +1866,6 @@ int dbSwapDatabases(int id1, int id2) { * database (temp) as the main (active) database, the actual freeing of old database * (which will now be placed in the temp one) is done later. */ void swapMainDbWithTempDb(redisDb *tempDb) { - if (server.cluster_enabled) { - /* Swap slots_to_keys from tempdb just loaded with main db slots_to_keys. 
*/ - clusterSlotToKeyMapping *aux = server.db->slots_to_keys; - server.db->slots_to_keys = tempDb->slots_to_keys; - tempDb->slots_to_keys = aux; - } - for (int i=0; i<server.dbnum; i++) { redisDb aux = server.db[i]; redisDb *activedb = &server.db[i], *newdb = &tempDb[i]; activedb->dict = newdb->dict; activedb->expires = newdb->expires; activedb->avg_ttl = newdb->avg_ttl; activedb->expires_cursor = newdb->expires_cursor; + activedb->dict_count = newdb->dict_count; + for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) { + activedb->sub_dict[subdict].key_count = newdb->sub_dict[subdict].key_count; + activedb->sub_dict[subdict].resize_cursor = newdb->sub_dict[subdict].resize_cursor; + activedb->sub_dict[subdict].slot_size_index = newdb->sub_dict[subdict].slot_size_index; + } newdb->dict = aux.dict; newdb->expires = aux.expires; newdb->avg_ttl = aux.avg_ttl; newdb->expires_cursor = aux.expires_cursor; - + newdb->dict_count = aux.dict_count; + for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) { + newdb->sub_dict[subdict].key_count = aux.sub_dict[subdict].key_count; + newdb->sub_dict[subdict].resize_cursor = aux.sub_dict[subdict].resize_cursor; + newdb->sub_dict[subdict].slot_size_index = aux.sub_dict[subdict].slot_size_index; + } /* Now we need to handle clients blocked on lists: as an effect * of swapping the two DBs, a client that was waiting for list * X in a given DB, may now actually be unblocked if X happens @@ -1643,7 +1953,13 @@ void swapdbCommand(client *c) { *----------------------------------------------------------------------------*/ int removeExpire(redisDb *db, robj *key) { - return dictDelete(db->expires,key->ptr) == DICT_OK; + if (dictDelete(db->expires[getKeySlot(key->ptr)],key->ptr) == DICT_OK) { + db->sub_dict[DB_EXPIRES].key_count--; + cumulativeKeyCountAdd(db, getKeySlot(key->ptr), -1, DB_EXPIRES); + return 1; + } else { + return 0; + } } /* Set an expire to the specified key. If the expire is set in the context @@ -1651,13 +1967,20 @@ * to NULL. The 'when' parameter is the absolute unix time in milliseconds * after which the key will no longer be considered valid. */ void setExpire(client *c, redisDb *db, robj *key, long long when) { - dictEntry *kde, *de; + dictEntry *kde, *de, *existing; /* Reuse the sds from the main dict in the expire dict */ - kde = dictFind(db->dict,key->ptr); + kde = dbFind(db, key->ptr, DB_MAIN); serverAssertWithInfo(NULL,key,kde != NULL); - de = dictAddOrFind(db->expires,dictGetKey(kde)); - dictSetSignedIntegerVal(de,when); + int slot = getKeySlot(key->ptr); + de = dictAddRaw(db->expires[slot], dictGetKey(kde), &existing); + if (existing) { + dictSetSignedIntegerVal(existing, when); + } else { + dictSetSignedIntegerVal(de, when); + db->sub_dict[DB_EXPIRES].key_count++; + cumulativeKeyCountAdd(db, slot, 1, DB_EXPIRES); + } int writable_slave = server.masterhost && server.repl_slave_ro == 0; if (c && writable_slave && !(c->flags & CLIENT_MASTER)) @@ -1670,8 +1993,8 @@ long long getExpire(redisDb *db, robj *key) { dictEntry *de; /* No expire? return ASAP */ - if (dictSize(db->expires) == 0 || - (de = dictFind(db->expires,key->ptr)) == NULL) return -1; + if (dictSize(db->expires[getKeySlot(key->ptr)]) == 0 || + (de = dbFind(db,key->ptr, DB_EXPIRES)) == NULL) return -1; return dictGetSignedIntegerVal(de); } @@ -1817,6 +2140,54 @@ int expireIfNeeded(redisDb *db, robj *key, int flags) { return 1; } +/* + * This function increases the size of the main/expires dictionaries of the db to match the desired number. + * In cluster mode it resizes all the individual dictionaries for the slots that this node owns.
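+ *
+ * Usage sketch (mirrors the dbExpand() call added in debug.c below;
+ * expected_keys is a hypothetical count):
+ *
+ *   if (dbExpand(db, expected_keys, DB_MAIN, 1) == C_ERR) {
+ *       ... allocation failed inside dictTryExpand ...
+ *   }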
+ * + * Based on the parameter `try_expand`, the appropriate dict expand API is invoked: + * if `try_expand` is set to 1, `dictTryExpand` is used, else `dictExpand`. + * Both APIs return either `DICT_OK` or `DICT_ERR`. + * `DICT_OK` signals successful expansion; `DICT_ERR` signals an allocation failure in the + * `dictTryExpand` call, while for the `dictExpand` call it signals that no expansion was performed. + */ +int dbExpand(const redisDb *db, uint64_t db_size, dbKeyType keyType, int try_expand) { + dict *d; + if (server.cluster_enabled) { + for (int i = 0; i < CLUSTER_SLOTS; i++) { + if (clusterNodeGetSlotBit(server.cluster->myself, i)) { + /* We don't know the exact number of keys that would fall into each slot, but we can approximate it, assuming an even distribution. */ + if (keyType == DB_MAIN) { + d = db->dict[i]; + } else { + d = db->expires[i]; + } + int result = try_expand ? dictTryExpand(d, db_size) : dictExpand(d, db_size); + if (try_expand && result == DICT_ERR) { + serverLog(LL_WARNING, "Dict expansion failed for type: %s slot: %d", + keyType == DB_MAIN ? "main" : "expires", i); + return C_ERR; + } else if (result == DICT_ERR) { + serverLog(LL_DEBUG, "Dict expansion skipped for type: %s slot: %d", + keyType == DB_MAIN ? "main" : "expires", i); + } + } + } + } else { + if (keyType == DB_MAIN) { + d = db->dict[0]; + } else { + d = db->expires[0]; + } + int result = try_expand ? dictTryExpand(d, db_size) : dictExpand(d, db_size); + if (try_expand && result == DICT_ERR) { + serverLog(LL_WARNING, "Dict expansion failed for db type: %s", + keyType == DB_MAIN ? "main" : "expires"); + return C_ERR; + } + } + return C_OK; +} + /* ----------------------------------------------------------------------------- * API to get key arguments from commands * ---------------------------------------------------------------------------*/ @@ -2556,3 +2927,42 @@ int bitfieldGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResu } return 1; } + +void dbGetStats(char *buf, size_t bufsize, redisDb *db, int full, dbKeyType keyType) { + size_t l; + char *orig_buf = buf; + size_t orig_bufsize = bufsize; + dictStats *mainHtStats = NULL; + dictStats *rehashHtStats = NULL; + dict *d; + dbIterator *dbit = dbIteratorInit(db, keyType); + while ((d = dbIteratorNextDict(dbit))) { + dictStats *stats = dictGetStatsHt(d, 0, full); + if (!mainHtStats) { + mainHtStats = stats; + } else { + dictCombineStats(stats, mainHtStats); + dictFreeStats(stats); + } + if (dictIsRehashing(d)) { + stats = dictGetStatsHt(d, 1, full); + if (!rehashHtStats) { + rehashHtStats = stats; + } else { + dictCombineStats(stats, rehashHtStats); + dictFreeStats(stats); + } + } + } + zfree(dbit); + l = dictGetStatsMsg(buf, bufsize, mainHtStats, full); + dictFreeStats(mainHtStats); + buf += l; + bufsize -= l; + if (rehashHtStats && bufsize > 0) { + dictGetStatsMsg(buf, bufsize, rehashHtStats, full); + dictFreeStats(rehashHtStats); + } + /* Make sure there is a NULL term at the end.
*/ + if (orig_bufsize) orig_buf[orig_bufsize - 1] = '\0'; +} diff --git a/src/debug.c b/src/debug.c index 7948c0c9f..8bbb59b04 100644 --- a/src/debug.c +++ b/src/debug.c @@ -76,6 +76,7 @@ void bugReportStart(void); void printCrashReport(void); void bugReportEnd(int killViaSignal, int sig); void logStackTrace(void *eip, int uplevel); +void dbGetStats(char *buf, size_t bufsize, redisDb *db, int full, dbKeyType keyType); void sigalrmSignalHandler(int sig, siginfo_t *info, void *secret); /* ================================= Debugging ============================== */ @@ -281,7 +282,6 @@ void xorObjectDigest(redisDb *db, robj *keyobj, unsigned char *digest, robj *o) * a different digest. */ void computeDatasetDigest(unsigned char *final) { unsigned char digest[20]; - dictIterator *di = NULL; dictEntry *de; int j; uint32_t aux; @@ -290,17 +290,15 @@ void computeDatasetDigest(unsigned char *final) { for (j = 0; j < server.dbnum; j++) { redisDb *db = server.db+j; + if (dbSize(db, DB_MAIN) == 0) continue; + dbIterator *dbit = dbIteratorInit(db, DB_MAIN); - if (dictSize(db->dict) == 0) continue; - di = dictGetSafeIterator(db->dict); - - /* hash the DB id, so the same dataset moved in a different - * DB will lead to a different digest */ + /* hash the DB id, so the same dataset moved in a different DB will lead to a different digest */ aux = htonl(j); mixDigest(final,&aux,sizeof(aux)); /* Iterate this DB writing every entry */ - while((de = dictNext(di)) != NULL) { + while((de = dbIteratorNext(dbit)) != NULL) { sds key; robj *keyobj, *o; @@ -317,7 +315,7 @@ void computeDatasetDigest(unsigned char *final) { xorDigest(final,digest,20); decrRefCount(keyobj); } - dictReleaseIterator(di); + zfree(dbit); } } @@ -610,7 +608,7 @@ NULL robj *val; char *strenc; - if ((de = dictFind(c->db->dict,c->argv[2]->ptr)) == NULL) { + if ((de = dbFind(c->db, c->argv[2]->ptr, DB_MAIN)) == NULL) { addReplyErrorObject(c,shared.nokeyerr); return; } @@ -662,7 +660,7 @@ NULL robj *val; sds key; - if ((de = dictFind(c->db->dict,c->argv[2]->ptr)) == NULL) { + if ((de = dbFind(c->db, c->argv[2]->ptr, DB_MAIN)) == NULL) { addReplyErrorObject(c,shared.nokeyerr); return; } @@ -718,7 +716,7 @@ NULL if (getPositiveLongFromObjectOrReply(c, c->argv[2], &keys, NULL) != C_OK) return; - if (dictTryExpand(c->db->dict, keys) != DICT_OK) { + if (dbExpand(c->db, keys, DB_MAIN, 1) == C_ERR) { addReplyError(c, "OOM in dictTryExpand"); return; } @@ -766,7 +764,7 @@ NULL /* We don't use lookupKey because a debug command should * work on logically expired keys */ dictEntry *de; - robj *o = ((de = dictFind(c->db->dict,c->argv[j]->ptr)) == NULL) ? NULL : dictGetVal(de); + robj *o = ((de = dbFind(c->db, c->argv[j]->ptr, DB_MAIN)) == NULL) ? 
NULL : dictGetVal(de); if (o) xorObjectDigest(c->db,c->argv[j],digest,o); sds d = sdsempty(); @@ -910,11 +908,11 @@ NULL full = 1; stats = sdscatprintf(stats,"[Dictionary HT]\n"); - dictGetStats(buf,sizeof(buf),server.db[dbid].dict,full); + dbGetStats(buf, sizeof(buf), &server.db[dbid], full, DB_MAIN); stats = sdscat(stats,buf); stats = sdscatprintf(stats,"[Expires HT]\n"); - dictGetStats(buf,sizeof(buf),server.db[dbid].expires,full); + dbGetStats(buf, sizeof(buf), &server.db[dbid], full, DB_EXPIRES); stats = sdscat(stats,buf); addReplyVerbatim(c,stats,sdslen(stats),"txt"); @@ -2046,7 +2044,7 @@ void logCurrentClient(client *cc, const char *title) { dictEntry *de; key = getDecodedObject(cc->argv[1]); - de = dictFind(cc->db->dict, key->ptr); + de = dbFind(cc->db, key->ptr, DB_MAIN); if (de) { val = dictGetVal(de); serverLog(LL_WARNING,"key '%s' found in DB containing the following object:", (char*)key->ptr); diff --git a/src/defrag.c b/src/defrag.c index af403398a..92f335c2e 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -40,6 +40,11 @@ #ifdef HAVE_DEFRAG +typedef struct defragCtx { + redisDb *db; + int slot; +} defragCtx; + /* this method was added to jemalloc in order to help us understand which * pointers are worthwhile moving and which aren't */ int je_get_defrag_hint(void* ptr); @@ -669,30 +674,31 @@ void defragModule(redisDb *db, dictEntry *kde) { /* for each key we scan in the main dict, this function will attempt to defrag * all the various pointers it has. Returns a stat of how many pointers were * moved. */ -void defragKey(redisDb *db, dictEntry *de) { +void defragKey(defragCtx *ctx, dictEntry *de) { sds keysds = dictGetKey(de); robj *newob, *ob; unsigned char *newzl; sds newsds; - + redisDb *db = ctx->db; + int slot = ctx->slot; /* Try to defrag the key name. */ newsds = activeDefragSds(keysds); if (newsds) { - dictSetKey(db->dict, de, newsds); - if (dictSize(db->expires)) { + dictSetKey(db->dict[slot], de, newsds); + if (dbSize(db, DB_EXPIRES)) { /* We can't search in db->expires for that key after we've released * the pointer it holds, since it won't be able to do the string * compare, but we can find the entry using key hash and pointer. */ - uint64_t hash = dictGetHash(db->dict, newsds); - dictEntry *expire_de = dictFindEntryByPtrAndHash(db->expires, keysds, hash); - if (expire_de) dictSetKey(db->expires, expire_de, newsds); + uint64_t hash = dictGetHash(db->dict[slot], newsds); + dictEntry *expire_de = dictFindEntryByPtrAndHash(db->expires[slot], keysds, hash); + if (expire_de) dictSetKey(db->expires[slot], expire_de, newsds); } } /* Try to defrag robj and / or string value. */ ob = dictGetVal(de); if ((newob = activeDefragStringOb(ob))) { - dictSetVal(db->dict, de, newob); + dictSetVal(db->dict[slot], de, newob); ob = newob; } @@ -749,7 +755,7 @@ void defragKey(redisDb *db, dictEntry *de) { /* Defrag scan callback for the main db dictionary. */ void defragScanCallback(void *privdata, const dictEntry *de) { long long hits_before = server.stat_active_defrag_hits; - defragKey((redisDb*)privdata, (dictEntry*)de); + defragKey((defragCtx*)privdata, (dictEntry*)de); if (server.stat_active_defrag_hits != hits_before) server.stat_active_defrag_key_hits++; else @@ -820,7 +826,7 @@ static sds defrag_later_current_key = NULL; static unsigned long defrag_later_cursor = 0; /* returns 0 if no more work needs to be done, and 1 if time is up and more work is needed.
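*
* A condensed sketch of how the defragCtx above threads a slot through the
* scan (this mirrors activeDefragCycle further below):
*
*   defragCtx ctx = { .db = db, .slot = slot };
*   cursor = dictScanDefrag(db->dict[slot], cursor, defragScanCallback,
*                           &defragfns, &ctx);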
*/ -int defragLaterStep(redisDb *db, long long endtime) { +int defragLaterStep(redisDb *db, int slot, long long endtime) { unsigned int iterations = 0; unsigned long long prev_defragged = server.stat_active_defrag_hits; unsigned long long prev_scanned = server.stat_active_defrag_scanned; @@ -850,7 +856,7 @@ int defragLaterStep(redisDb *db, long long endtime) { } /* each time we enter this function we need to fetch the key from the dict again (if it still exists) */ - dictEntry *de = dictFind(db->dict, defrag_later_current_key); + dictEntry *de = dictFind(db->dict[slot], defrag_later_current_key); key_defragged = server.stat_active_defrag_hits; do { int quit = 0; @@ -918,7 +924,10 @@ void computeDefragCycles(void) { * This works in a similar way to activeExpireCycle, in the sense that * we do incremental work across calls. */ void activeDefragCycle(void) { + static defragCtx ctx; + static int slot = -1; static int current_db = -1; + static int defrag_later_item_in_progress = 0; static unsigned long cursor = 0; static unsigned long expires_cursor = 0; static redisDb *db = NULL; @@ -940,6 +949,9 @@ void activeDefragCycle(void) { defrag_later_cursor = 0; current_db = -1; cursor = 0; + expires_cursor = 0; + slot = -1; + defrag_later_item_in_progress = 0; db = NULL; goto update_metrics; } @@ -967,9 +979,9 @@ void activeDefragCycle(void) { dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc}; do { /* if we're not continuing a scan from the last call or loop, start a new one */ - if (!cursor && !expires_cursor) { + if (!cursor && !expires_cursor && (slot < 0)) { /* finish any leftovers from previous db before moving to the next one */ - if (db && defragLaterStep(db, endtime)) { + if (db && defragLaterStep(db, slot, endtime)) { quit = 1; /* time is up, we didn't finish all the work */ break; /* this will exit the function and we'll continue on the next cycle */ } @@ -989,7 +1001,11 @@ void activeDefragCycle(void) { start_scan = now; current_db = -1; cursor = 0; + expires_cursor = 0; + slot = -1; + defrag_later_item_in_progress = 0; db = NULL; + memset(&ctx, -1, sizeof(ctx)); server.active_defrag_running = 0; computeDefragCycles(); /* if another scan is needed, start it right away */ @@ -1005,32 +1021,47 @@ void activeDefragCycle(void) { db = &server.db[current_db]; cursor = 0; + expires_cursor = 0; + slot = findSlotByKeyIndex(db, 1, DB_MAIN); + defrag_later_item_in_progress = 0; + ctx.db = db; + ctx.slot = slot; } - do { + dict *d = db->dict[slot]; /* before scanning the next bucket, see if we have big keys left from the previous bucket to scan */ - if (defragLaterStep(db, endtime)) { + if (defragLaterStep(db, slot, endtime)) { quit = 1; /* time is up, we didn't finish all the work */ break; /* this will exit the function and we'll continue on the next cycle */ } - /* Scan the keyspace dict unless we're scanning the expire dict. */ - if (!expires_cursor) - cursor = dictScanDefrag(db->dict, cursor, defragScanCallback, - &defragfns, db); - - /* When done scanning the keyspace dict, we scan the expire dict. */ - if (!cursor) - expires_cursor = dictScanDefrag(db->expires, expires_cursor, - scanCallbackCountScanned, - &defragfns, NULL); - + if (!defrag_later_item_in_progress) { + /* Scan the keyspace dict unless we're scanning the expire dict. */ + if (!expires_cursor) + cursor = dictScanDefrag(d, cursor, defragScanCallback, + &defragfns, &ctx); + /* When done scanning the keyspace dict, we scan the expire dict. 
*/ + if (!cursor) + expires_cursor = dictScanDefrag(db->expires[slot], expires_cursor, + scanCallbackCountScanned, + &defragfns, NULL); + } + if (!(cursor || expires_cursor)) { + /* Move to the next slot only if regular and large item scanning has been completed. */ + if (listLength(db->defrag_later) > 0) { + defrag_later_item_in_progress = 1; + continue; + } + slot = dbGetNextNonEmptySlot(db, slot, DB_MAIN); + ctx.slot = slot; + } + /* Once in 16 scan iterations, 512 pointer reallocations. or 64 keys * (if we have a lot of pointers in one hash bucket or rehashing), * check if we reached the time limit. * But regardless, don't start a new db in this loop, this is because after * the last db we call defragOtherGlobals, which must be done in one cycle */ - if (!(cursor || expires_cursor) || + if ((!(cursor || expires_cursor) && slot == -1) || ++iterations > 16 || server.stat_active_defrag_hits - prev_defragged > 512 || server.stat_active_defrag_scanned - prev_scanned > 64) @@ -1043,7 +1074,7 @@ void activeDefragCycle(void) { prev_defragged = server.stat_active_defrag_hits; prev_scanned = server.stat_active_defrag_scanned; } - } while((cursor || expires_cursor) && !quit); + } while(((cursor || expires_cursor) || slot > 0) && !quit); } while(!quit); latencyEndMonitor(latency); diff --git a/src/dict.c b/src/dict.c index 2eb9aa41f..e5214ae37 100644 --- a/src/dict.c +++ b/src/dict.c @@ -46,6 +46,7 @@ #include "dict.h" #include "zmalloc.h" #include "redisassert.h" +#include "monotonic.h" /* Using dictEnableResize() / dictDisableResize() we make possible to disable * resizing and rehashing of the hash table as needed. This is very important @@ -59,7 +60,6 @@ static dictResizeEnable dict_can_resize = DICT_RESIZE_ENABLE; static unsigned int dict_force_resize_ratio = 5; /* -------------------------- types ----------------------------------------- */ - struct dictEntry { void *key; union { @@ -69,9 +69,6 @@ struct dictEntry { double d; } v; struct dictEntry *next; /* Next entry in the same hash bucket. */ - void *metadata[]; /* An arbitrary number of bytes (starting at a - * pointer-aligned address) of size as returned - * by dictType's dictEntryMetadataBytes(). */ }; typedef struct { @@ -184,16 +181,21 @@ static void _dictReset(dict *d, int htidx) /* Create a new hash table */ dict *dictCreate(dictType *type) { - size_t metasize = type->dictMetadataBytes ? type->dictMetadataBytes() : 0; - dict *d = zmalloc(sizeof(*d) + metasize); - if (metasize) { - memset(dictMetadata(d), 0, metasize); - } - + dict *d = zmalloc(sizeof(*d)); _dictInit(d,type); return d; } +/* Create an array of dictionaries */ +dict **dictCreateMultiple(dictType *type, int count) +{ + dict **d = zmalloc(sizeof(dict*) * count); + for (int i = 0; i < count; i++) { + d[i] = dictCreate(type); + } + return d; +} + /* Initialize the hash table */ int _dictInit(dict *d, dictType *type) { @@ -268,6 +270,7 @@ int _dictExpand(dict *d, unsigned long size, int* malloc_failed) d->ht_used[1] = new_ht_used; d->ht_table[1] = new_ht_table; d->rehashidx = 0; + if (d->type->rehashingStarted) d->type->rehashingStarted(d); return DICT_OK; } @@ -390,15 +393,16 @@ long long timeInMilliseconds(void) { /* Rehash in ms+"delta" milliseconds. The value of "delta" is larger * than 0, and is smaller than 1 in most cases. 
The exact upper bound * depends on the running time of dictRehash(d,100).*/ -int dictRehashMilliseconds(dict *d, int ms) { +int dictRehashMilliseconds(dict *d, unsigned int ms) { if (d->pauserehash > 0) return 0; - long long start = timeInMilliseconds(); + monotime timer; + elapsedStart(&timer); int rehashes = 0; while(dictRehash(d,100)) { rehashes += 100; - if (timeInMilliseconds()-start > ms) break; + if (elapsedMs(timer) >= ms) break; } return rehashes; } @@ -415,11 +419,6 @@ static void _dictRehashStep(dict *d) { if (d->pauserehash == 0) dictRehash(d,1); } -/* Return a pointer to the metadata section within the dict. */ -void *dictMetadata(dict *d) { - return &d->metadata; -} - /* Add an element to the target hash table */ int dictAdd(dict *d, void *key, void *val) { @@ -472,9 +471,7 @@ dictEntry *dictInsertAtPosition(dict *d, void *key, void *position) { int htidx = dictIsRehashing(d) ? 1 : 0; assert(bucket >= &d->ht_table[htidx][0] && bucket <= &d->ht_table[htidx][DICTHT_SIZE_MASK(d->ht_size_exp[htidx])]); - size_t metasize = dictEntryMetadataSize(d); if (d->type->no_value) { - assert(!metasize); /* Entry metadata + no value not supported. */ if (d->type->keys_are_odd && !*bucket) { /* We can store the key directly in the destination bucket without the * allocated entry. @@ -494,11 +491,8 @@ dictEntry *dictInsertAtPosition(dict *d, void *key, void *position) { * Insert the element in top, with the assumption that in a database * system it is more likely that recently added entries are accessed * more frequently. */ - entry = zmalloc(sizeof(*entry) + metasize); + entry = zmalloc(sizeof(*entry)); assert(entryIsNormal(entry)); /* Check alignment of allocation */ - if (metasize > 0) { - memset(dictEntryMetadata(entry), 0, metasize); - } entry->key = key; entry->next = *bucket; } @@ -788,12 +782,6 @@ double dictIncrDoubleVal(dictEntry *de, double val) { return de->v.d += val; } -/* A pointer to the metadata section within the dict entry. */ -void *dictEntryMetadata(dictEntry *de) { - assert(entryHasValue(de)); - return &de->metadata; -} - void *dictGetKey(const dictEntry *de) { if (entryIsKey(de)) return (void*)de; if (entryIsNoValue(de)) return decodeEntryNoValue(de)->key; @@ -856,7 +844,7 @@ static void dictSetNext(dictEntry *de, dictEntry *next) { * and values. */ size_t dictMemUsage(const dict *d) { return dictSize(d) * sizeof(dictEntry) + - dictSlots(d) * sizeof(dictEntry*); + dictBuckets(d) * sizeof(dictEntry*); } size_t dictEntryMemUsage(void) { @@ -1000,7 +988,7 @@ dictEntry *dictGetRandomKey(dict *d) do { /* We are sure there are no elements in indexes from 0 * to rehashidx-1 */ - h = d->rehashidx + (randomULong() % (dictSlots(d) - d->rehashidx)); + h = d->rehashidx + (randomULong() % (dictBuckets(d) - d->rehashidx)); he = (h >= s0) ? d->ht_table[1][h - s0] : d->ht_table[0][h]; } while(he == NULL); } else { @@ -1132,7 +1120,7 @@ end: /* Reallocate the dictEntry, key and value allocations in a bucket using the * provided allocation functions in order to defrag them. 
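*
* Related sketch: the rehashingStarted hook and dictCreateMultiple() added
* earlier in this dict.c diff can be wired together like this (the hash
* function and callback bodies are hypothetical):
*
*   static uint64_t exampleHash(const void *key) {
*       return dictGenHashFunction(key, strlen((const char *)key));
*   }
*   static void exampleRehashingStarted(dict *d) {
*       (void)d;   <- fired from _dictExpand when incremental rehashing begins
*   }
*   dictType exampleType = {
*       .hashFunction = exampleHash,
*       .rehashingStarted = exampleRehashingStarted,
*   };
*   dict **slots = dictCreateMultiple(&exampleType, CLUSTER_SLOTS);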
*/ -static void dictDefragBucket(dict *d, dictEntry **bucketref, dictDefragFunctions *defragfns) { +static void dictDefragBucket(dictEntry **bucketref, dictDefragFunctions *defragfns) { dictDefragAllocFunction *defragalloc = defragfns->defragAlloc; dictDefragAllocFunction *defragkey = defragfns->defragKey; dictDefragAllocFunction *defragval = defragfns->defragVal; @@ -1159,8 +1147,6 @@ static void dictDefragBucket(dict *d, dictEntry **bucketref, dictDefragFunctions } if (newde) { *bucketref = newde; - if (d->type->afterReplaceEntry) - d->type->afterReplaceEntry(d, newde); } bucketref = dictGetNextRef(*bucketref); } @@ -1323,7 +1309,7 @@ unsigned long dictScanDefrag(dict *d, /* Emit entries at cursor */ if (defragfns) { - dictDefragBucket(d, &d->ht_table[htidx0][v & m0], defragfns); + dictDefragBucket(&d->ht_table[htidx0][v & m0], defragfns); } de = d->ht_table[htidx0][v & m0]; while (de) { @@ -1356,7 +1342,7 @@ unsigned long dictScanDefrag(dict *d, /* Emit entries at cursor */ if (defragfns) { - dictDefragBucket(d, &d->ht_table[htidx0][v & m0], defragfns); + dictDefragBucket(&d->ht_table[htidx0][v & m0], defragfns); } de = d->ht_table[htidx0][v & m0]; while (de) { @@ -1370,7 +1356,7 @@ unsigned long dictScanDefrag(dict *d, do { /* Emit entries at cursor */ if (defragfns) { - dictDefragBucket(d, &d->ht_table[htidx1][v & m1], defragfns); + dictDefragBucket(&d->ht_table[htidx1][v & m1], defragfns); } de = d->ht_table[htidx1][v & m1]; while (de) { @@ -1519,77 +1505,87 @@ dictEntry *dictFindEntryByPtrAndHash(dict *d, const void *oldptr, uint64_t hash) } /* ------------------------------- Debugging ---------------------------------*/ - #define DICT_STATS_VECTLEN 50 -size_t _dictGetStatsHt(char *buf, size_t bufsize, dict *d, int htidx, int full) { - unsigned long i, slots = 0, chainlen, maxchainlen = 0; - unsigned long totchainlen = 0; - unsigned long clvector[DICT_STATS_VECTLEN]; - size_t l = 0; +void dictFreeStats(dictStats *stats) { + zfree(stats->clvector); + zfree(stats); +} - if (d->ht_used[htidx] == 0) { - return snprintf(buf,bufsize, - "Hash table %d stats (%s):\n" - "No stats available for empty dictionaries\n", - htidx, (htidx == 0) ? "main hash table" : "rehashing target"); - } - - if (!full) { - l += snprintf(buf+l,bufsize-l, - "Hash table %d stats (%s):\n" - " table size: %lu\n" - " number of elements: %lu\n", - htidx, (htidx == 0) ? "main hash table" : "rehashing target", - DICTHT_SIZE(d->ht_size_exp[htidx]), d->ht_used[htidx]); - - /* Make sure there is a NULL term at the end. */ - buf[bufsize-1] = '\0'; - /* Unlike snprintf(), return the number of characters actually written. */ - return strlen(buf); +void dictCombineStats(dictStats *from, dictStats *into) { + into->buckets += from->buckets; + into->maxChainLen = (from->maxChainLen > into->maxChainLen) ? from->maxChainLen : into->maxChainLen; + into->totalChainLen += from->totalChainLen; + into->htSize += from->htSize; + into->htUsed += from->htUsed; + for (int i = 0; i < DICT_STATS_VECTLEN; i++) { + into->clvector[i] += from->clvector[i]; } +} +dictStats *dictGetStatsHt(dict *d, int htidx, int full) { + unsigned long *clvector = zcalloc(sizeof(unsigned long) * DICT_STATS_VECTLEN); + dictStats *stats = zcalloc(sizeof(dictStats)); + stats->htidx = htidx; + stats->clvector = clvector; + stats->htSize = DICTHT_SIZE(d->ht_size_exp[htidx]); + stats->htUsed = d->ht_used[htidx]; + if (!full) return stats; /* Compute stats. 
*/ - for (i = 0; i < DICT_STATS_VECTLEN; i++) clvector[i] = 0; - for (i = 0; i < DICTHT_SIZE(d->ht_size_exp[htidx]); i++) { + for (unsigned long i = 0; i < DICTHT_SIZE(d->ht_size_exp[htidx]); i++) { dictEntry *he; if (d->ht_table[htidx][i] == NULL) { clvector[0]++; continue; } - slots++; + stats->buckets++; /* For each hash entry on this slot... */ - chainlen = 0; + unsigned long chainlen = 0; he = d->ht_table[htidx][i]; while(he) { chainlen++; he = dictGetNext(he); } clvector[(chainlen < DICT_STATS_VECTLEN) ? chainlen : (DICT_STATS_VECTLEN-1)]++; - if (chainlen > maxchainlen) maxchainlen = chainlen; - totchainlen += chainlen; + if (chainlen > stats->maxChainLen) stats->maxChainLen = chainlen; + stats->totalChainLen += chainlen; } - /* Generate human readable stats. */ - l += snprintf(buf+l,bufsize-l, - "Hash table %d stats (%s):\n" - " table size: %lu\n" - " number of elements: %lu\n" - " different slots: %lu\n" - " max chain length: %lu\n" - " avg chain length (counted): %.02f\n" - " avg chain length (computed): %.02f\n" - " Chain length distribution:\n", - htidx, (htidx == 0) ? "main hash table" : "rehashing target", - DICTHT_SIZE(d->ht_size_exp[htidx]), d->ht_used[htidx], slots, maxchainlen, - (float)totchainlen/slots, (float)d->ht_used[htidx]/slots); + return stats; +} - for (i = 0; i < DICT_STATS_VECTLEN-1; i++) { - if (clvector[i] == 0) continue; - if (l >= bufsize) break; - l += snprintf(buf+l,bufsize-l, - " %ld: %ld (%.02f%%)\n", - i, clvector[i], ((float)clvector[i]/DICTHT_SIZE(d->ht_size_exp[htidx]))*100); +/* Generates human readable stats. */ +size_t dictGetStatsMsg(char *buf, size_t bufsize, dictStats *stats, int full) { + if (stats->htUsed == 0) { + return snprintf(buf,bufsize, + "Hash table %d stats (%s):\n" + "No stats available for empty dictionaries\n", + stats->htidx, (stats->htidx == 0) ? "main hash table" : "rehashing target"); + } + size_t l = 0; + l += snprintf(buf + l, bufsize - l, + "Hash table %d stats (%s):\n" + " table size: %lu\n" + " number of elements: %lu\n", + stats->htidx, (stats->htidx == 0) ? "main hash table" : "rehashing target", + stats->htSize, stats->htUsed); + if (full) { + l += snprintf(buf + l, bufsize - l, + " different slots: %lu\n" + " max chain length: %lu\n" + " avg chain length (counted): %.02f\n" + " avg chain length (computed): %.02f\n" + " Chain length distribution:\n", + stats->buckets, stats->maxChainLen, + (float) stats->totalChainLen / stats->buckets, (float) stats->htUsed / stats->buckets); + + for (unsigned long i = 0; i < DICT_STATS_VECTLEN - 1; i++) { + if (stats->clvector[i] == 0) continue; + if (l >= bufsize) break; + l += snprintf(buf + l, bufsize - l, + " %ld: %ld (%.02f%%)\n", + i, stats->clvector[i], ((float) stats->clvector[i] / stats->htSize) * 100); + } } /* Make sure there is a NULL term at the end. */ @@ -1603,11 +1599,15 @@ void dictGetStats(char *buf, size_t bufsize, dict *d, int full) { char *orig_buf = buf; size_t orig_bufsize = bufsize; - l = _dictGetStatsHt(buf,bufsize,d,0,full); - if (dictIsRehashing(d) && bufsize > l) { - buf += l; - bufsize -= l; - _dictGetStatsHt(buf,bufsize,d,1,full); + dictStats *mainHtStats = dictGetStatsHt(d, 0, full); + l = dictGetStatsMsg(buf, bufsize, mainHtStats, full); + dictFreeStats(mainHtStats); + buf += l; + bufsize -= l; + if (dictIsRehashing(d) && bufsize > 0) { + dictStats *rehashHtStats = dictGetStatsHt(d, 1, full); + dictGetStatsMsg(buf, bufsize, rehashHtStats, full); + dictFreeStats(rehashHtStats); } /* Make sure there is a NULL term at the end. 
*/ orig_buf[orig_bufsize-1] = '\0'; diff --git a/src/dict.h b/src/dict.h index e96cd44eb..c016598fa 100644 --- a/src/dict.h +++ b/src/dict.h @@ -45,7 +45,6 @@ #define DICT_ERR 1 typedef struct dictEntry dictEntry; /* opaque */ - typedef struct dict dict; typedef struct dictType { @@ -56,6 +55,7 @@ typedef struct dictType { void (*keyDestructor)(dict *d, void *key); void (*valDestructor)(dict *d, void *obj); int (*expandAllowed)(size_t moreMem, double usedRatio); + void (*rehashingStarted)(dict *d); /* Flags */ /* The 'no_value' flag, if set, indicates that values are not used, i.e. the * dict is a set. When this flag is set, it's not possible to access the @@ -68,14 +68,6 @@ typedef struct dictType { unsigned int keys_are_odd:1; /* TODO: Add a 'keys_are_even' flag and use a similar optimization if that * flag is set. */ - - /* Allow each dict and dictEntry to carry extra caller-defined metadata. The - * extra memory is initialized to 0 when allocated. */ - size_t (*dictEntryMetadataBytes)(dict *d); - size_t (*dictMetadataBytes)(void); - /* Optional callback called after an entry has been reallocated (due to - * active defrag). Only called if the entry has metadata. */ - void (*afterReplaceEntry)(dict *d, dictEntry *entry); } dictType; #define DICTHT_SIZE(exp) ((exp) == -1 ? 0 : (unsigned long)1<<(exp)) @@ -92,10 +84,6 @@ struct dict { /* Keep small vars at end for optimal (minimal) struct padding */ int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */ signed char ht_size_exp[2]; /* exponent of size. (size = 1<<(exp)) */ - - /* Allow a dict to carry extra caller-defined metadata. The - * extra memory is initialized to 0 when a dict is allocated. */ - void *metadata[]; }; @@ #define dictCompareKeys(d, key1, key2) \ (((d)->type->keyCompare) ? \ (d)->type->keyCompare((d), key1, key2) : \ (key1) == (key2)) -#define dictEntryMetadataSize(d) ((d)->type->dictEntryMetadataBytes \ ? (d)->type->dictEntryMetadataBytes(d) : 0) -#define dictMetadataSize(d) ((d)->type->dictMetadataBytes \ ?
(d)->type->dictMetadataBytes() : 0) - #define dictHashKey(d, key) ((d)->type->hashFunction(key)) -#define dictSlots(d) (DICTHT_SIZE((d)->ht_size_exp[0])+DICTHT_SIZE((d)->ht_size_exp[1])) +#define dictBuckets(d) (DICTHT_SIZE((d)->ht_size_exp[0])+DICTHT_SIZE((d)->ht_size_exp[1])) #define dictSize(d) ((d)->ht_used[0]+(d)->ht_used[1]) +#define dictIsEmpty(d) ((d)->ht_used[0] == 0 && (d)->ht_used[1] == 0) #define dictIsRehashing(d) ((d)->rehashidx != -1) #define dictPauseRehashing(d) ((d)->pauserehash++) #define dictResumeRehashing(d) ((d)->pauserehash--) @@ -165,6 +159,7 @@ typedef enum { /* API */ dict *dictCreate(dictType *type); +dict **dictCreateMultiple(dictType *type, int count); int dictExpand(dict *d, unsigned long size); int dictTryExpand(dict *d, unsigned long size); void *dictMetadata(dict *d); @@ -216,7 +211,7 @@ uint64_t dictGenCaseHashFunction(const unsigned char *buf, size_t len); void dictEmpty(dict *d, void(callback)(dict*)); void dictSetResizeEnabled(dictResizeEnable enable); int dictRehash(dict *d, int n); -int dictRehashMilliseconds(dict *d, int ms); +int dictRehashMilliseconds(dict *d, unsigned int ms); void dictSetHashFunctionSeed(uint8_t *seed); uint8_t *dictGetHashFunctionSeed(void); unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *privdata); @@ -224,6 +219,11 @@ unsigned long dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dic uint64_t dictGetHash(dict *d, const void *key); dictEntry *dictFindEntryByPtrAndHash(dict *d, const void *oldptr, uint64_t hash); +size_t dictGetStatsMsg(char *buf, size_t bufsize, dictStats *stats, int full); +dictStats* dictGetStatsHt(dict *d, int htidx, int full); +void dictCombineStats(dictStats *from, dictStats *into); +void dictFreeStats(dictStats *stats); + #ifdef REDIS_TEST int dictTest(int argc, char *argv[], int flags); #endif diff --git a/src/evict.c b/src/evict.c index 909714b43..6d27c1380 100644 --- a/src/evict.c +++ b/src/evict.c @@ -58,6 +58,7 @@ struct evictionPoolEntry { sds key; /* Key name. */ sds cached; /* Cached SDS object for key name. */ int dbid; /* Key DB number. */ + int slot; /* Slot. */ }; static struct evictionPoolEntry *EvictionPoolLRU; @@ -143,7 +144,7 @@ void evictionPoolAlloc(void) { * idle time are on the left, and keys with the higher idle time on the * right. */ -void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) { +void evictionPoolPopulate(int dbid, int slot, dict *sampledict, redisDb *db, struct evictionPoolEntry *pool) { int j, k, count; dictEntry *samples[server.maxmemory_samples]; @@ -161,13 +162,13 @@ void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evic * dictionary (but the expires one) we need to lookup the key * again in the key dictionary to obtain the value object. */ if (server.maxmemory_policy != MAXMEMORY_VOLATILE_TTL) { - if (sampledict != keydict) de = dictFind(keydict, key); + if (!(server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS)) de = dictFind(db->dict[calculateKeySlot(key)], key); o = dictGetVal(de); } /* Calculate the idle time according to the policy. This is called * idle just because the code initially handled LRU, but is in fact - * just a score where an higher score means better candidate. */ + * just a score where a higher score means better candidate. 
*/ if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) { idle = estimateObjectIdleTime(o); } else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { @@ -237,6 +238,7 @@ void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evic } pool[k].idle = idle; pool[k].dbid = dbid; + pool[k].slot = slot; } } @@ -569,6 +571,7 @@ int performEvictions(void) { /* Try to smoke-out bugs (server.also_propagate should be empty here) */ serverAssert(server.also_propagate.numops == 0); + /* Evictions are performed on random keys that have nothing to do with the current command slot. */ while (mem_freed < (long long)mem_tofree) { int j, k, i; @@ -592,12 +595,24 @@ * every DB. */ for (i = 0; i < server.dbnum; i++) { db = server.db+i; - dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ? - db->dict : db->expires; - if ((keys = dictSize(dict)) != 0) { - evictionPoolPopulate(i, dict, db->dict, pool); - total_keys += keys; - } + do { + int slot = 0; + if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) { + slot = getFairRandomSlot(db, DB_MAIN); + dict = db->dict[slot]; + } else { + slot = getFairRandomSlot(db, DB_EXPIRES); + dict = db->expires[slot]; + } + if ((keys = dictSize(dict)) != 0) { + evictionPoolPopulate(i, slot, dict, db, pool); + total_keys += keys; + } + /* Since keys are distributed across smaller slot-specific dictionaries in cluster mode, we may need to + * visit more than one dictionary in order to populate the required number of samples into the eviction pool. */ + } while (server.cluster_enabled && keys != 0 && server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS && + total_keys < (unsigned long) server.maxmemory_samples + ); } if (!total_keys) break; /* No keys to evict. */ @@ -607,11 +622,11 @@ bestdbid = pool[k].dbid; if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) { - de = dictFind(server.db[bestdbid].dict, - pool[k].key); + de = dictFind(server.db[bestdbid].dict[pool[k].slot], + pool[k].key); } else { - de = dictFind(server.db[bestdbid].expires, - pool[k].key); + de = dictFind(server.db[bestdbid].expires[pool[k].slot], + pool[k].key); } /* Remove the entry from the pool. */ @@ -643,7 +658,7 @@ j = (++next_db) % server.dbnum; db = server.db+j; dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ? - db->dict : db->expires; + db->dict[getFairRandomSlot(db, DB_MAIN)] : db->expires[getFairRandomSlot(db, DB_EXPIRES)]; if (dictSize(dict) != 0) { de = dictGetRandomKey(dict); bestkey = dictGetKey(de); diff --git a/src/expire.c b/src/expire.c index 425491af6..79f10f2d6 100644 --- a/src/expire.c +++ b/src/expire.c @@ -137,6 +137,18 @@ void expireScanCallback(void *privdata, const dictEntry *const_de) { data->sampled++; } +static inline int isExpiryDictValidForSamplingCb(dict *d) { + long long numkeys = dictSize(d); + unsigned long buckets = dictBuckets(d); + /* When there are less than 1% filled buckets, sampling the key + * space is expensive, so stop here waiting for better times... + * The dictionary will be resized asap. */ + if (buckets > DICT_HT_INITIAL_SIZE && (numkeys * 100/buckets < 1)) { + return C_ERR; + } + return C_OK; +} + void activeExpireCycle(int type) { /* Adjust the running parameters according to the configured expire * effort. The default effort is 1, and the maximum configurable effort @@ -229,23 +241,16 @@ * we scanned. 
The percentage, stored in config_cycle_acceptable_stale * is not fixed, but depends on the Redis configured "expire effort". */ do { - unsigned long num, slots; + unsigned long num; iteration++; /* If there is nothing to expire try next DB ASAP. */ - if ((num = dictSize(db->expires)) == 0) { + if ((num = dbSize(db, DB_EXPIRES)) == 0) { db->avg_ttl = 0; break; } - slots = dictSlots(db->expires); data.now = mstime(); - /* When there are less than 1% filled slots, sampling the key - * space is expensive, so stop here waiting for better times... - * The dictionary will be resized asap. */ - if (slots > DICT_HT_INITIAL_SIZE && - (num*100/slots < 1)) break; - /* The main collection cycle. Scan through keys among keys * with an expire set, checking for expired ones. */ data.sampled = 0; @@ -270,8 +275,10 @@ void activeExpireCycle(int type) { long checked_buckets = 0; while (data.sampled < num && checked_buckets < max_buckets) { - db->expires_cursor = dictScan(db->expires, db->expires_cursor, - expireScanCallback, &data); + db->expires_cursor = dbScan(db, DB_EXPIRES, db->expires_cursor, expireScanCallback, isExpiryDictValidForSamplingCb, &data); + if (db->expires_cursor == 0) { + break; + } checked_buckets++; } total_expired += data.expired; @@ -378,7 +385,7 @@ void expireSlaveKeys(void) { while(dbids && dbid < server.dbnum) { if ((dbids & 1) != 0) { redisDb *db = server.db+dbid; - dictEntry *expire = dictFind(db->expires,keyname); + dictEntry *expire = dictFind(db->expires[getKeySlot(keyname)],keyname); int expired = 0; if (expire && diff --git a/src/lazyfree.c b/src/lazyfree.c index 8ac55f777..2a6d1b7e1 100644 --- a/src/lazyfree.c +++ b/src/lazyfree.c @@ -19,14 +19,19 @@ void lazyfreeFreeObject(void *args[]) { * database which was substituted with a fresh one in the main thread * when the database was logically deleted. */ void lazyfreeFreeDatabase(void *args[]) { - dict *ht1 = (dict *) args[0]; - dict *ht2 = (dict *) args[1]; - - size_t numkeys = dictSize(ht1); - dictRelease(ht1); - dictRelease(ht2); - atomicDecr(lazyfree_objects,numkeys); - atomicIncr(lazyfreed_objects,numkeys); + dict **ht1 = (dict **) args[0]; + dict **ht2 = (dict **) args[1]; + int *dictCount = (int *) args[2]; + for (int i=0; i<*dictCount; i++) { + size_t numkeys = dictSize(ht1[i]); + dictRelease(ht1[i]); + dictRelease(ht2[i]); + atomicDecr(lazyfree_objects,numkeys); + atomicIncr(lazyfreed_objects,numkeys); + } + zfree(ht1); + zfree(ht2); + zfree(dictCount); } /* Release the key tracking table. */ @@ -174,11 +179,14 @@ void freeObjAsync(robj *key, robj *obj, int dbid) { * create a new empty set of hash tables and scheduling the old ones for * lazy freeing. */ void emptyDbAsync(redisDb *db) { - dict *oldht1 = db->dict, *oldht2 = db->expires; - db->dict = dictCreate(&dbDictType); - db->expires = dictCreate(&dbExpiresDictType); - atomicIncr(lazyfree_objects,dictSize(oldht1)); - bioCreateLazyFreeJob(lazyfreeFreeDatabase,2,oldht1,oldht2); + dict **oldDict = db->dict; + dict **oldExpires = db->expires; + atomicIncr(lazyfree_objects,dbSize(db, DB_MAIN)); + db->dict = dictCreateMultiple(&dbDictType, db->dict_count); + db->expires = dictCreateMultiple(&dbExpiresDictType, db->dict_count); + int *count = zmalloc(sizeof(int)); + *count = db->dict_count; + bioCreateLazyFreeJob(lazyfreeFreeDatabase, 3, oldDict, oldExpires, count); } /* Free the key tracking table. 
diff --git a/src/module.c b/src/module.c index 093973261..c49e03bf0 100644 --- a/src/module.c +++ b/src/module.c @@ -4243,7 +4243,7 @@ void RM_ResetDataset(int restart_aof, int async) { /* Returns the number of keys in the current db. */ unsigned long long RM_DbSize(RedisModuleCtx *ctx) { - return dictSize(ctx->client->db->dict); + return dbSize(ctx->client->db, DB_MAIN); } /* Returns a name of a random key, or NULL if current db is empty. */ @@ -10879,7 +10879,7 @@ typedef struct { } ScanCBData; typedef struct RedisModuleScanCursor{ - unsigned long cursor; + unsigned long long cursor; int done; }RedisModuleScanCursor; @@ -10981,7 +10981,7 @@ int RM_Scan(RedisModuleCtx *ctx, RedisModuleScanCursor *cursor, RedisModuleScanC } int ret = 1; ScanCBData data = { ctx, privdata, fn }; - cursor->cursor = dictScan(ctx->client->db->dict, cursor->cursor, moduleScanCallback, &data); + cursor->cursor = dbScan(ctx->client->db, DB_MAIN, cursor->cursor, moduleScanCallback, NULL, &data); if (cursor->cursor == 0) { cursor->done = 1; ret = 0; diff --git a/src/multi.c b/src/multi.c index 65d502c25..5a0bd7ebf 100644 --- a/src/multi.c +++ b/src/multi.c @@ -394,7 +394,7 @@ void touchWatchedKey(redisDb *db, robj *key) { /* The key was already expired when WATCH was called. */ if (db == wk->db && equalStringObjects(key, wk->key) && - dictFind(db->dict, key->ptr) == NULL) + dictFind(db->dict[calculateKeySlot(key->ptr)], key->ptr) == NULL) { /* Already expired key is deleted, so logically no change. Clear * the flag. Deleted keys are not flagged as expired. */ @@ -432,9 +432,9 @@ void touchAllWatchedKeysInDb(redisDb *emptied, redisDb *replaced_with) { dictIterator *di = dictGetSafeIterator(emptied->watched_keys); while((de = dictNext(di)) != NULL) { robj *key = dictGetKey(de); - int exists_in_emptied = dictFind(emptied->dict, key->ptr) != NULL; + int exists_in_emptied = dictFind(emptied->dict[calculateKeySlot(key->ptr)], key->ptr) != NULL; if (exists_in_emptied || - (replaced_with && dictFind(replaced_with->dict, key->ptr))) + (replaced_with && dictFind(replaced_with->dict[calculateKeySlot(key->ptr)], key->ptr))) { list *clients = dictGetVal(de); if (!clients) continue; @@ -442,7 +442,7 @@ void touchAllWatchedKeysInDb(redisDb *emptied, redisDb *replaced_with) { while((ln = listNext(&li))) { watchedKey *wk = redis_member2struct(watchedKey, node, ln); if (wk->expired) { - if (!replaced_with || !dictFind(replaced_with->dict, key->ptr)) { + if (!replaced_with || !dictFind(replaced_with->dict[calculateKeySlot(key->ptr)], key->ptr)) { /* Expired key now deleted. No logical change. Clear the * flag. Deleted keys are not flagged as expired. 
*/ wk->expired = 0; diff --git a/src/object.c b/src/object.c index 4b3526a02..930dd1a1d 100644 --- a/src/object.c +++ b/src/object.c @@ -1035,7 +1035,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) { if (o->encoding == OBJ_ENCODING_HT) { d = o->ptr; di = dictGetIterator(d); - asize = sizeof(*o)+sizeof(dict)+(sizeof(struct dictEntry*)*dictSlots(d)); + asize = sizeof(*o)+sizeof(dict)+(sizeof(struct dictEntry*)*dictBuckets(d)); while((de = dictNext(di)) != NULL && samples < sample_size) { ele = dictGetKey(de); elesize += dictEntryMemUsage() + sdsZmallocSize(ele); @@ -1058,7 +1058,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) { zskiplist *zsl = ((zset*)o->ptr)->zsl; zskiplistNode *znode = zsl->header->level[0].forward; asize = sizeof(*o)+sizeof(zset)+sizeof(zskiplist)+sizeof(dict)+ - (sizeof(struct dictEntry*)*dictSlots(d))+ + (sizeof(struct dictEntry*)*dictBuckets(d))+ zmalloc_size(zsl->header); while(znode != NULL && samples < sample_size) { elesize += sdsZmallocSize(znode->ele); @@ -1076,7 +1076,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) { } else if (o->encoding == OBJ_ENCODING_HT) { d = o->ptr; di = dictGetIterator(d); - asize = sizeof(*o)+sizeof(dict)+(sizeof(struct dictEntry*)*dictSlots(d)); + asize = sizeof(*o)+sizeof(dict)+(sizeof(struct dictEntry*)*dictBuckets(d)); while((de = dictNext(di)) != NULL && samples < sample_size) { ele = dictGetKey(de); ele2 = dictGetVal(de); @@ -1246,28 +1246,21 @@ struct redisMemOverhead *getMemoryOverheadData(void) { for (j = 0; j < server.dbnum; j++) { redisDb *db = server.db+j; - long long keyscount = dictSize(db->dict); - if (keyscount==0) continue; + unsigned long long keyscount = dbSize(db, DB_MAIN); + if (keyscount == 0) continue; mh->total_keys += keyscount; mh->db = zrealloc(mh->db,sizeof(mh->db[0])*(mh->num_dbs+1)); mh->db[mh->num_dbs].dbid = j; - mem = dictMemUsage(db->dict) + - dictSize(db->dict) * sizeof(robj); + mem = dbMemUsage(db, DB_MAIN); mh->db[mh->num_dbs].overhead_ht_main = mem; mem_total+=mem; - mem = dictMemUsage(db->expires); + mem = dbMemUsage(db, DB_EXPIRES); mh->db[mh->num_dbs].overhead_ht_expires = mem; mem_total+=mem; - /* Account for the slot to keys map in cluster mode */ - mem = dictSize(db->dict) * dictEntryMetadataSize(db->dict) + - dictMetadataSize(db->dict); - mh->db[mh->num_dbs].overhead_ht_slot_to_keys = mem; - mem_total+=mem; - mh->num_dbs++; } @@ -1551,14 +1544,13 @@ NULL return; } } - if ((de = dictFind(c->db->dict,c->argv[2]->ptr)) == NULL) { + if ((de = dbFind(c->db, c->argv[2]->ptr, DB_MAIN)) == NULL) { addReplyNull(c); return; } size_t usage = objectComputeSize(c->argv[2],dictGetVal(de),samples,c->db->id); usage += sdsZmallocSize(dictGetKey(de)); usage += dictEntryMemUsage(); - usage += dictMetadataSize(c->db->dict); addReplyLongLong(c,usage); } else if (!strcasecmp(c->argv[1]->ptr,"stats") && c->argc == 2) { struct redisMemOverhead *mh = getMemoryOverheadData(); @@ -1599,16 +1591,13 @@ NULL char dbname[32]; snprintf(dbname,sizeof(dbname),"db.%zd",mh->db[j].dbid); addReplyBulkCString(c,dbname); - addReplyMapLen(c,3); + addReplyMapLen(c,2); addReplyBulkCString(c,"overhead.hashtable.main"); addReplyLongLong(c,mh->db[j].overhead_ht_main); addReplyBulkCString(c,"overhead.hashtable.expires"); addReplyLongLong(c,mh->db[j].overhead_ht_expires); - - addReplyBulkCString(c,"overhead.hashtable.slot-to-keys"); - addReplyLongLong(c,mh->db[j].overhead_ht_slot_to_keys); } diff --git a/src/rdb.c b/src/rdb.c index 
2c39df325..7cb180642 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1298,17 +1298,16 @@ werr: } ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) { - dictIterator *di; dictEntry *de; ssize_t written = 0; ssize_t res; + dbIterator *dbit = NULL; static long long info_updated_time = 0; char *pname = (rdbflags & RDBFLAGS_AOF_PREAMBLE) ? "AOF rewrite" : "RDB"; redisDb *db = server.db + dbid; - dict *d = db->dict; - if (dictSize(d) == 0) return 0; - di = dictGetSafeIterator(d); + unsigned long long int db_size = dbSize(db, DB_MAIN); + if (db_size == 0) return 0; /* Write the SELECT DB opcode */ if ((res = rdbSaveType(rdb,RDB_OPCODE_SELECTDB)) < 0) goto werr; @@ -1317,9 +1316,7 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) { written += res; /* Write the RESIZE DB opcode. */ - uint64_t db_size, expires_size; - db_size = dictSize(db->dict); - expires_size = dictSize(db->expires); + unsigned long long expires_size = dbSize(db, DB_EXPIRES); if ((res = rdbSaveType(rdb,RDB_OPCODE_RESIZEDB)) < 0) goto werr; written += res; if ((res = rdbSaveLen(rdb,db_size)) < 0) goto werr; @@ -1327,8 +1324,23 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) { if ((res = rdbSaveLen(rdb,expires_size)) < 0) goto werr; written += res; + dbit = dbIteratorInit(db, DB_MAIN); + int last_slot = -1; /* Iterate this DB writing every entry */ - while((de = dictNext(di)) != NULL) { + while ((de = dbIteratorNext(dbit)) != NULL) { + int curr_slot = dbIteratorGetCurrentSlot(dbit); + /* Save slot info. */ + if (server.cluster_enabled && curr_slot != last_slot) { + if ((res = rdbSaveType(rdb, RDB_OPCODE_SLOT_INFO)) < 0) goto werr; + written += res; + if ((res = rdbSaveLen(rdb, curr_slot)) < 0) goto werr; + written += res; + if ((res = rdbSaveLen(rdb, dictSize(db->dict[curr_slot]))) < 0) goto werr; + written += res; + if ((res = rdbSaveLen(rdb, dictSize(db->expires[curr_slot]))) < 0) goto werr; + written += res; + last_slot = curr_slot; + } sds keystr = dictGetKey(de); robj key, *o = dictGetVal(de); long long expire; @@ -1356,12 +1368,11 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) { } } } - - dictReleaseIterator(di); + zfree(dbit); return written; werr: - dictReleaseIterator(di); + if (dbit) zfree(dbit); return -1; } @@ -3023,6 +3034,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx) { uint64_t dbid = 0; int type, rdbver; + uint64_t db_size = 0, expires_size = 0; + int should_expand_db = 1; redisDb *db = rdb_loading_ctx->dbarray+0; char buf[1024]; int error; @@ -3098,13 +3111,26 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin } else if (type == RDB_OPCODE_RESIZEDB) { /* RESIZEDB: Hint about the size of the keys in the currently * selected data base, in order to avoid useless rehashing. */ - uint64_t db_size, expires_size; if ((db_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr; if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr; - dictExpand(db->dict,db_size); - dictExpand(db->expires,expires_size); + continue; /* Read next opcode. 
*/ + } else if (type == RDB_OPCODE_SLOT_INFO) { + uint64_t slot_id, slot_size, expires_slot_size; + if ((slot_id = rdbLoadLen(rdb,NULL)) == RDB_LENERR) + goto eoferr; + if ((slot_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR) + goto eoferr; + if ((expires_slot_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR) + goto eoferr; + if (!server.cluster_enabled) { + continue; /* Ignore gracefully. */ + } + /* In cluster mode we resize the individual slot-specific dictionaries based on the number of keys each slot holds. */ + dictExpand(db->dict[slot_id], slot_size); + dictExpand(db->expires[slot_id], expires_slot_size); + should_expand_db = 0; continue; /* Read next opcode. */ } else if (type == RDB_OPCODE_AUX) { /* AUX: generic string-string fields. Use to add state to RDB * which is backward compatible. Implementations of RDB loading @@ -3234,6 +3260,20 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin continue; } + /* If there is no slot info, it means that it's either not cluster mode or we are loading a legacy RDB file. + * In this case we want to estimate the number of keys per slot and resize accordingly. */ + if (should_expand_db) { + if (dbExpand(db, db_size, DB_MAIN, 0) == C_ERR) { + serverLog(LL_WARNING, "OOM in dict expand of main dict"); + return C_ERR; + } + if (dbExpand(db, expires_size, DB_EXPIRES, 0) == C_ERR) { + serverLog(LL_WARNING, "OOM in dict expand of expire dict"); + return C_ERR; + } + should_expand_db = 0; + } + /* Read key */ if ((key = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) goto eoferr; diff --git a/src/rdb.h b/src/rdb.h index df1095d88..cf94444eb 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -38,7 +38,7 @@ /* The current RDB version. When the format changes in a way that is no longer * backward compatible this number gets incremented. */ -#define RDB_VERSION 11 +#define RDB_VERSION 12 /* Defines related to the dump file format. To store 32 bits lengths for short * keys requires a lot of space, so we check the most significant 2 bits of @@ -103,6 +103,7 @@ #define rdbIsObjectType(t) (((t) >= 0 && (t) <= 7) || ((t) >= 9 && (t) <= 21)) /* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */ +#define RDB_OPCODE_SLOT_INFO 244 /* Individual slot info, such as slot id and size (cluster mode only). */ #define RDB_OPCODE_FUNCTION2 245 /* function library data */ #define RDB_OPCODE_FUNCTION_PRE_GA 246 /* old function library data for 7.0 rc1 and rc2 */ #define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */ diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c index 682135e55..c38284380 100644 --- a/src/redis-check-rdb.c +++ b/src/redis-check-rdb.c @@ -276,6 +276,15 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { if ((expires_size = rdbLoadLen(&rdb,NULL)) == RDB_LENERR) goto eoferr; continue; /* Read type again. */ + } else if (type == RDB_OPCODE_SLOT_INFO) { + uint64_t slot_id, slot_size, expires_slot_size; + if ((slot_id = rdbLoadLen(&rdb,NULL)) == RDB_LENERR) + goto eoferr; + if ((slot_size = rdbLoadLen(&rdb,NULL)) == RDB_LENERR) + goto eoferr; + if ((expires_slot_size = rdbLoadLen(&rdb,NULL)) == RDB_LENERR) + goto eoferr; + continue; /* Read type again. */ } else if (type == RDB_OPCODE_AUX) { /* AUX: generic string-string fields. Use to add state to RDB * which is backward compatible. Implementations of RDB loading diff --git a/src/server.c b/src/server.c index b164b8cbb..29e9c71fd 100644 --- a/src/server.c +++ b/src/server.c @@ -400,25 +400,17 @@ int dictExpandAllowed(size_t moreMem, double usedRatio) { } } -/* Returns the size of the DB dict entry metadata in bytes. 
In cluster mode, the - * metadata is used for constructing a doubly linked list of the dict entries - * belonging to the same cluster slot. See the Slot to Key API in cluster.c. */ -size_t dbDictEntryMetadataSize(dict *d) { - UNUSED(d); - /* NOTICE: this also affects overhead_ht_slot_to_keys in getMemoryOverheadData. - * If we ever add non-cluster related data here, that code must be modified too. */ - return server.cluster_enabled ? sizeof(clusterDictEntryMetadata) : 0; +/* Adds the dictionary to the rehashing list in cluster mode, which allows us + * to quickly find rehash targets during incremental rehashing. + * In non-cluster mode, we don't need this list as there is only one dictionary per DB. */ +void dictRehashingStarted(dict *d) { + if (!server.cluster_enabled || !server.activerehashing) return; + listAddNodeTail(server.db[0].sub_dict[DB_MAIN].rehashing, d); } -/* Returns the size of the DB dict metadata in bytes. In cluster mode, we store - * a pointer to the db in the main db dict, used for updating the slot-to-key - * mapping when a dictEntry is reallocated. */ -size_t dbDictMetadataSize(void) { - return server.cluster_enabled ? sizeof(clusterDictMetadata) : 0; -} - -void dbDictAfterReplaceEntry(dict *d, dictEntry *de) { - if (server.cluster_enabled) slotToKeyReplaceEntry(d, de); +void dictRehashingStartedForExpires(dict *d) { + if (!server.cluster_enabled || !server.activerehashing) return; + listAddNodeTail(server.db[0].sub_dict[DB_EXPIRES].rehashing, d); } /* Generic hash table type where keys are Redis Objects, Values @@ -476,9 +468,7 @@ dictType dbDictType = { dictSdsDestructor, /* key destructor */ dictObjectDestructor, /* val destructor */ dictExpandAllowed, /* allow to expand */ - .dictEntryMetadataBytes = dbDictEntryMetadataSize, - .dictMetadataBytes = dbDictMetadataSize, - .afterReplaceEntry = dbDictAfterReplaceEntry + dictRehashingStarted, }; /* Db->expires */ @@ -489,7 +479,8 @@ dictType dbExpiresDictType = { dictSdsKeyCompare, /* key compare */ NULL, /* key destructor */ NULL, /* val destructor */ - dictExpandAllowed, /* allow to expand */ + dictExpandAllowed, /* allow to expand */ + dictRehashingStartedForExpires, }; /* Command table. sds string -> command struct pointer. */ @@ -600,19 +591,33 @@ dictType sdsHashDictType = { int htNeedsResize(dict *dict) { long long size, used; - size = dictSlots(dict); + size = dictBuckets(dict); used = dictSize(dict); return (size > DICT_HT_INITIAL_SIZE && (used*100/size < HASHTABLE_MIN_FILL)); }
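The resize predicate above is unchanged apart from the dictSlots -> dictBuckets rename, but it is now evaluated per slot-specific dictionary rather than once per DB. A standalone sketch of the arithmetic it encodes, assuming the stock constants DICT_HT_INITIAL_SIZE = 4 and HASHTABLE_MIN_FILL = 10 (the helper name here is illustrative, not part of the patch):

```c
#include <stdio.h>

#define DICT_HT_INITIAL_SIZE 4
#define HASHTABLE_MIN_FILL   10 /* percent */

/* A table is worth shrinking only once it has grown past its initial
 * size and fewer than HASHTABLE_MIN_FILL percent of its buckets are
 * actually used. */
static int ht_needs_resize(long long buckets, long long used) {
    return buckets > DICT_HT_INITIAL_SIZE &&
           (used * 100 / buckets) < HASHTABLE_MIN_FILL;
}

int main(void) {
    printf("%d\n", ht_needs_resize(1024, 50));  /* sparse (under 5 percent): 1, shrink */
    printf("%d\n", ht_needs_resize(1024, 512)); /* half full: 0, keep */
    return 0;
}
```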
-/* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL - * we resize the hash table to save memory */ +/* In a cluster-enabled setup, this method traverses all main/expires dictionaries (CLUSTER_SLOTS of them) + * and triggers a resize when the percentage of used buckets in a HT drops below HASHTABLE_MIN_FILL, + * in order to save memory. + * + * In a non cluster-enabled setup, it resizes the single main/expires dictionary based on the same condition. */ void tryResizeHashTables(int dbid) { - if (htNeedsResize(server.db[dbid].dict)) - dictResize(server.db[dbid].dict); - if (htNeedsResize(server.db[dbid].expires)) - dictResize(server.db[dbid].expires); + redisDb *db = &server.db[dbid]; + int slot = 0; + for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) { + dbIterator *dbit = dbIteratorInitFromSlot(db, subdict, db->sub_dict[subdict].resize_cursor); + for (int i = 0; i < CRON_DBS_PER_CALL; i++) { + dict *d = dbIteratorNextDict(dbit); + slot = dbIteratorGetCurrentSlot(dbit); + if (!d) break; + if (htNeedsResize(d)) + dictResize(d); + } + /* Save current iterator position in the resize_cursor. */ + db->sub_dict[subdict].resize_cursor = slot; + zfree(dbit); + } } /* Our hash table implementation performs rehashing incrementally while @@ -623,15 +628,42 @@ void tryResizeHashTables(int dbid) { * The function returns 1 if some rehashing was performed, otherwise 0 * is returned. */ int incrementallyRehash(int dbid) { - /* Keys dictionary */ - if (dictIsRehashing(server.db[dbid].dict)) { - dictRehashMilliseconds(server.db[dbid].dict,1); - return 1; /* already used our millisecond for this loop... */ - } - /* Expires */ - if (dictIsRehashing(server.db[dbid].expires)) { - dictRehashMilliseconds(server.db[dbid].expires,1); - return 1; /* already used our millisecond for this loop... */ + /* Rehash the main and expire dictionaries. */ + if (server.cluster_enabled) { + listNode *node, *nextNode; + monotime timer; + elapsedStart(&timer); + /* Our goal is to rehash as many slot-specific dictionaries as we can before reaching the predefined time threshold, + * while removing those that have already finished rehashing from the queue. */ + for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) { + serverLog(LL_DEBUG,"Rehashing list length: %lu", listLength(server.db[dbid].sub_dict[subdict].rehashing)); + while ((node = listFirst(server.db[dbid].sub_dict[subdict].rehashing))) { + if (dictIsRehashing((dict *) listNodeValue(node))) { + dictRehashMilliseconds(listNodeValue(node), INCREMENTAL_REHASHING_THRESHOLD_MS); + if (elapsedMs(timer) >= INCREMENTAL_REHASHING_THRESHOLD_MS) { + return 1; /* Reached the time limit. */ + } + } else { /* It is possible that rehashing has already completed for this dictionary, simply remove it from the queue. */ + nextNode = listNextNode(node); + listDelNode(server.db[dbid].sub_dict[subdict].rehashing, node); + node = nextNode; + } + } + } + /* When cluster mode is disabled, only one dict is used for the entire DB and the rehashing list isn't populated. */ + } else { + /* Rehash main dict. */ + dict *main_dict = server.db[dbid].dict[0]; + if (dictIsRehashing(main_dict)) { + dictRehashMilliseconds(main_dict, INCREMENTAL_REHASHING_THRESHOLD_MS); + return 1; /* already used our millisecond for this loop... */ + } + /* Rehash expires. */ + dict *expires_dict = server.db[dbid].expires[0]; + if (dictIsRehashing(expires_dict)) { + dictRehashMilliseconds(expires_dict, INCREMENTAL_REHASHING_THRESHOLD_MS); + return 1; /* already used our millisecond for this loop... */ + } } return 0; }
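The cluster branch of incrementallyRehash above drains a per-DB queue of rehashing dictionaries under a 1 ms budget (INCREMENTAL_REHASHING_THRESHOLD_MS), popping entries that have already finished. A toy model of that queue-with-budget pattern, where rehash_step stands in for dictRehashMilliseconds/dictRehash and every name is illustrative:

```c
#include <stdio.h>
#include <stdint.h>
#include <time.h>

/* Each node models one dictionary with some rehash work left. */
typedef struct table { int remaining; struct table *next; } table;

/* Do a bounded chunk of work; return 0 once the table is fully
 * rehashed, mirroring dictRehash()'s return convention. */
static int rehash_step(table *t) {
    if (t->remaining == 0) return 0;
    t->remaining--;
    return 1;
}

static uint64_t now_ms(void) {
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts); /* like elapsedStart/elapsedMs */
    return (uint64_t)ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
}

/* Work through the queue until the millisecond budget is spent:
 * finished tables are popped, unfinished ones keep consuming budget. */
static void rehash_with_budget(table **head, uint64_t budget_ms) {
    uint64_t start = now_ms();
    while (*head) {
        if (rehash_step(*head)) {
            if (now_ms() - start >= budget_ms) return; /* out of time */
        } else {
            *head = (*head)->next; /* done, drop from the queue */
        }
    }
}

int main(void) {
    table c = {3, NULL}, b = {0, &c}, a = {5, &b};
    table *queue = &a;
    rehash_with_budget(&queue, 1);
    printf("queue drained: %s\n", queue == NULL ? "yes" : "no");
    return 0;
}
```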
@@ -1356,9 +1388,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { for (j = 0; j < server.dbnum; j++) { long long size, used, vkeys; - size = dictSlots(server.db[j].dict); - used = dictSize(server.db[j].dict); - vkeys = dictSize(server.db[j].expires); + size = dbBuckets(&server.db[j], DB_MAIN); + used = dbSize(&server.db[j], DB_MAIN); + vkeys = dbSize(&server.db[j], DB_EXPIRES); if (used || vkeys) { serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size); } @@ -2564,6 +2596,15 @@ void makeThreadKillable(void) { pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); } +void initDbState(redisDb *db) { + for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) { + db->sub_dict[subdict].rehashing = listCreate(); + db->sub_dict[subdict].key_count = 0; + db->sub_dict[subdict].resize_cursor = 0; + db->sub_dict[subdict].slot_size_index = server.cluster_enabled ? zcalloc(sizeof(unsigned long long) * (CLUSTER_SLOTS + 1)) : NULL; + } +} + void initServer(void) { int j; @@ -2640,8 +2681,9 @@ void initServer(void) { /* Create the Redis databases, and initialize other internal state. */ for (j = 0; j < server.dbnum; j++) { - server.db[j].dict = dictCreate(&dbDictType); - server.db[j].expires = dictCreate(&dbExpiresDictType); + int slotCount = (server.cluster_enabled) ? CLUSTER_SLOTS : 1; + server.db[j].dict = dictCreateMultiple(&dbDictType, slotCount); + server.db[j].expires = dictCreateMultiple(&dbExpiresDictType, slotCount); server.db[j].expires_cursor = 0; server.db[j].blocking_keys = dictCreate(&keylistDictType); server.db[j].blocking_keys_unblock_on_nokey = dictCreate(&objectKeyPointerValueDictType); @@ -2650,7 +2692,8 @@ server.db[j].id = j; server.db[j].avg_ttl = 0; server.db[j].defrag_later = listCreate(); - server.db[j].slots_to_keys = NULL; /* Set by clusterInit later on if necessary. */ + server.db[j].dict_count = slotCount; + initDbState(&server.db[j]); listSetFreeMethod(server.db[j].defrag_later,(void (*)(void*))sdsfree); } evictionPoolAlloc(); /* Initialize the LRU keys pool. */ @@ -4171,7 +4214,6 @@ int processCommand(client *c) { if (listLength(server.ready_keys) && !isInsideYieldingLongCommand()) handleClientsBlockedOnKeys(); } - return C_OK; } @@ -6013,8 +6055,8 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { for (j = 0; j < server.dbnum; j++) { long long keys, vkeys; - keys = dictSize(server.db[j].dict); - vkeys = dictSize(server.db[j].expires); + keys = dbSize(&server.db[j], DB_MAIN); + vkeys = dbSize(&server.db[j], DB_EXPIRES); if (keys || vkeys) { info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n", @@ -6805,6 +6847,7 @@ int main(int argc, char **argv) { char config_from_stdin = 0; #ifdef REDIS_TEST + monotonicInit(); /* Required for dict tests, which rely on monotime during dict rehashing. */ if (argc >= 3 && !strcasecmp(argv[1], "test")) { int flags = 0; for (j = 3; j < argc; j++) { diff --git a/src/server.h b/src/server.h index 27803573a..e6c45051d 100644 --- a/src/server.h +++ b/src/server.h @@ -137,6 +137,7 @@ struct hdr_histogram; #define CONFIG_BINDADDR_MAX 16 #define CONFIG_MIN_RESERVED_FDS 32 #define CONFIG_DEFAULT_PROC_TITLE_TEMPLATE "{title} {listen-addr} {server-mode}" +#define INCREMENTAL_REHASHING_THRESHOLD_MS 1 /* Bucket sizes for client eviction pools. Each bucket stores clients with * memory usage of up to twice the size of the bucket below it. 
*/ @@ -959,15 +960,24 @@ typedef struct replBufBlock { char buf[]; } replBufBlock; -/* Opaque type for the Slot to Key API. */ -typedef struct clusterSlotToKeyMapping clusterSlotToKeyMapping; +typedef struct dbDictState { + list *rehashing; /* List of dictionaries in this DB that are currently rehashing. */ + int resize_cursor; /* Cron job uses this cursor to gradually resize dictionaries. */ + unsigned long long key_count; /* Total number of keys in this DB. */ + unsigned long long *slot_size_index; /* Binary indexed tree (BIT) that describes cumulative key frequencies up to a given slot. */ +} dbDictState; + +typedef enum dbKeyType { + DB_MAIN, + DB_EXPIRES +} dbKeyType; /* Redis database representation. There are multiple databases identified * by integers from 0 (the default database) up to the max configured * database. The database number is the 'id' field in the structure. */ typedef struct redisDb { - dict *dict; /* The keyspace for this DB */ - dict *expires; /* Timeout of keys with a timeout set */ + dict **dict; /* The keyspace for this DB */ + dict **expires; /* Timeout of keys with a timeout set */ dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ dict *blocking_keys_unblock_on_nokey; /* Keys with clients waiting for * data, and should be unblocked if key is deleted (XREADEDGROUP). @@ -978,7 +988,8 @@ long long avg_ttl; /* Average TTL, just for stats */ unsigned long expires_cursor; /* Cursor of the active expire cycle. */ list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */ - clusterSlotToKeyMapping *slots_to_keys; /* Array of slots to keys. Only used in cluster mode (db 0). */ + int dict_count; /* Total number of dictionaries owned by this DB: 1 dict per slot in cluster mode. */ + dbDictState sub_dict[2]; /* Metadata for main and expires dictionaries */ } redisDb; /* forward declaration for functions ctx */ @@ -1418,7 +1429,6 @@ struct redisMemOverhead { size_t dbid; size_t overhead_ht_main; size_t overhead_ht_expires; - size_t overhead_ht_slot_to_keys; } *db; }; @@ -2412,6 +2422,19 @@ typedef struct { unsigned char *lpi; /* listpack iterator */ } setTypeIterator; +typedef struct dbIterator dbIterator; + +/* DB iterator specific functions */ +dbIterator *dbIteratorInit(redisDb *db, dbKeyType keyType); +dbIterator *dbIteratorInitFromSlot(redisDb *db, dbKeyType keyType, int slot); +dict *dbIteratorNextDict(dbIterator *dbit); +int dbIteratorGetCurrentSlot(dbIterator *dbit); +dictEntry *dbIteratorNext(dbIterator *iter); + +/* SCAN-specific helpers for easy cursor manipulation, shared between main code and modules. */ +int getAndClearSlotIdFromCursor(unsigned long long *cursor); +void addSlotIdToCursor(int slot, unsigned long long *cursor); + /* Structure to hold hash iteration abstraction. Note that iteration over * hashes involves both fields and values. Because it is possible that * not both are required, store pointers in the iterator to avoid @@ -3089,6 +3112,18 @@ void dismissMemoryInChild(void); #define RESTART_SERVER_GRACEFULLY (1<<0) /* Do proper shutdown. 
*/ #define RESTART_SERVER_CONFIG_REWRITE (1<<1) /* CONFIG REWRITE before restart.*/ int restartServer(int flags, mstime_t delay); +unsigned long long int dbSize(redisDb *db, dbKeyType keyType); +int getKeySlot(sds key); +int calculateKeySlot(sds key); +unsigned long dbBuckets(redisDb *db, dbKeyType keyType); +size_t dbMemUsage(redisDb *db, dbKeyType keyType); +dictEntry *dbFind(redisDb *db, void *key, dbKeyType keyType); +unsigned long long dbScan(redisDb *db, dbKeyType keyType, unsigned long long cursor, dictScanFunction *fn, int (dictScanValidFunction)(dict *d), void *privdata); +int dbExpand(const redisDb *db, uint64_t db_size, dbKeyType keyType, int try_expand); +unsigned long long cumulativeKeyCountRead(redisDb *db, int idx, dbKeyType keyType); +int getFairRandomSlot(redisDb *db, dbKeyType keyType); +int dbGetNextNonEmptySlot(redisDb *db, int slot, dbKeyType keyType); +int findSlotByKeyIndex(redisDb *db, unsigned long target, dbKeyType keyType); /* Set data type */ robj *setTypeCreate(sds value, size_t size_hint); @@ -3278,8 +3313,8 @@ void discardTempDb(redisDb *tempDb, void(callback)(dict*)); int selectDb(client *c, int id); void signalModifiedKey(client *c, redisDb *db, robj *key); void signalFlushedDb(int dbid, int async); -void scanGenericCommand(client *c, robj *o, unsigned long cursor); -int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor); +void scanGenericCommand(client *c, robj *o, unsigned long long cursor); +int parseScanCursorOrReply(client *c, robj *o, unsigned long long *cursor); int dbAsyncDelete(redisDb *db, robj *key); void emptyDbAsync(redisDb *db); size_t lazyfreeGetPendingObjectsCount(void); diff --git a/src/t_hash.c b/src/t_hash.c index b199d8c69..9242d27cc 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -888,7 +888,7 @@ void hexistsCommand(client *c) { void hscanCommand(client *c) { robj *o; - unsigned long cursor; + unsigned long long cursor; if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return; if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL || diff --git a/src/t_set.c b/src/t_set.c index ff7dc8ffc..c2729105d 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -1671,7 +1671,7 @@ void sdiffstoreCommand(client *c) { void sscanCommand(client *c) { robj *set; - unsigned long cursor; + unsigned long long cursor; if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return; if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL || diff --git a/src/t_zset.c b/src/t_zset.c index 1242732d1..e8fea7e47 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -3901,7 +3901,7 @@ void zrevrankCommand(client *c) { void zscanCommand(client *c) { robj *o; - unsigned long cursor; + unsigned long long cursor; if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return; if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL || diff --git a/tests/support/util.tcl b/tests/support/util.tcl index 8941d1ae8..4136fb17e 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -602,16 +602,24 @@ proc stop_bg_complex_data {handle} { # Write num keys with the given key prefix and value size (in bytes). If idx is # given, it's the index (AKA level) used with the srv procedure and it specifies # to which Redis instance to write the keys. 
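Stepping back to the server.h additions just above: slot_size_index is a Fenwick (binary indexed) tree over per-slot key counts, and the declared cumulativeKeyCountRead / findSlotByKeyIndex are its prefix-sum and find-by-rank queries, which is what lets getFairRandomSlot pick a key-count-weighted slot in O(log n). A minimal self-contained sketch of those two operations over 16384 slots (1-based internals; all names here are illustrative, not the patch's code):

```c
#include <stdio.h>

#define SLOTS 16384

/* bit[i] holds the key-count sum of a power-of-two range ending at i. */
static unsigned long long bit[SLOTS + 1];

/* Adjust the key count of a 0-based slot by delta. */
static void bit_update(int slot, long long delta) {
    for (int i = slot + 1; i <= SLOTS; i += i & -i)
        bit[i] += (unsigned long long)delta;
}

/* Cumulative number of keys in slots 0..slot, inclusive. */
static unsigned long long bit_prefix(int slot) {
    unsigned long long sum = 0;
    for (int i = slot + 1; i > 0; i -= i & -i)
        sum += bit[i];
    return sum;
}

/* Map a key index in [0, total_keys) to the slot containing it, in
 * O(log SLOTS) via binary lifting instead of scanning 16384 counters. */
static int bit_find_by_rank(unsigned long long target) {
    int pos = 0;
    for (int step = SLOTS; step > 0; step >>= 1) {
        if (pos + step <= SLOTS && bit[pos + step] <= target) {
            pos += step;
            target -= bit[pos];
        }
    }
    return pos; /* 0-based slot */
}

int main(void) {
    bit_update(100, 5);  /* 5 keys live in slot 100 */
    bit_update(7000, 2); /* 2 keys live in slot 7000 */
    printf("keys in slots 0..100: %llu\n", bit_prefix(100)); /* 5 */
    printf("key #0 is in slot %d\n", bit_find_by_rank(0));   /* 100 */
    printf("key #5 is in slot %d\n", bit_find_by_rank(5));   /* 7000 */
    return 0;
}
```

Drawing a uniformly random key index and mapping it through find-by-rank weights each slot by its key count, which is the fairness property the eviction sampling and random-key paths rely on.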
-proc populate {num {prefix key:} {size 3} {idx 0} {prints false}} { +proc populate {num {prefix key:} {size 3} {idx 0} {prints false} {expires 0}} { r $idx deferred 1 if {$num > 16} {set pipeline 16} else {set pipeline $num} set val [string repeat A $size] for {set j 0} {$j < $pipeline} {incr j} { - r $idx set $prefix$j $val + if {$expires > 0} { + r $idx set $prefix$j $val ex $expires + } else { + r $idx set $prefix$j $val + } if {$prints} {puts $j} } for {} {$j < $num} {incr j} { - r $idx set $prefix$j $val + if {$expires > 0} { + r $idx set $prefix$j $val ex $expires + } else { + r $idx set $prefix$j $val + } r $idx read if {$prints} {puts $j} } diff --git a/tests/unit/cluster/scripting.tcl b/tests/unit/cluster/scripting.tcl index 1ade36ea2..b60c1255b 100644 --- a/tests/unit/cluster/scripting.tcl +++ b/tests/unit/cluster/scripting.tcl @@ -30,6 +30,13 @@ start_cluster 1 0 {tags {external:skip cluster}} { redis.call('set', 'foo', 'bar'); redis.call('set', 'bar', 'foo') } 0 + # Retrieve data from a different slot to verify it was stored in the correct dictionary in a cluster-enabled setup + # during the cross-slot operation performed by the above Lua script. + assert_equal "bar" [r 0 get foo] + assert_equal "foo" [r 0 get bar] + r 0 del foo + r 0 del bar + # Functions with allow-cross-slot-keys flag are allowed r 0 function load REPLACE {#!lua name=crossslot local function test_cross_slot(keys, args) @@ -40,6 +47,11 @@ redis.register_function{function_name='test_cross_slot', callback=test_cross_slot, flags={ 'allow-cross-slot-keys' }}} r FCALL test_cross_slot 0 + + # Retrieve data from a different slot to verify it was stored in the correct dictionary in a cluster-enabled setup + # during the cross-slot operation performed by the above Lua function. + assert_equal "bar" [r 0 get foo] + assert_equal "foo" [r 0 get bar] } test {Cross slot commands are also blocked if they disagree with pre-declared keys} { diff --git a/tests/unit/expire.tcl b/tests/unit/expire.tcl index 15dae20e4..e429c1ccc 100644 --- a/tests/unit/expire.tcl +++ b/tests/unit/expire.tcl @@ -192,8 +192,8 @@ start_server {tags {"expire"}} { # two seconds. wait_for_condition 20 100 { [r dbsize] eq 0 - } fail { - "Keys did not actively expire." + } else { + fail "Keys did not actively expire." } } @@ -378,8 +378,8 @@ start_server {tags {"expire"}} { {set foo15 bar} {pexpireat foo15 *} {set foo16 bar} - {restore foo17 * {*} ABSTTL} - {restore foo18 * {*} absttl} + {restore foo17 * * ABSTTL} + {restore foo18 * * absttl} } # Remember the absolute TTLs of all the keys @@ -507,8 +507,8 @@ start_server {tags {"expire"}} { {pexpireat foo4 *} {pexpireat foo4 *} {set foo5 bar} - {restore foo6 * {*} ABSTTL} - {restore foo7 * {*} absttl} + {restore foo6 * * ABSTTL} + {restore foo7 * * absttl} } close_replication_stream $repl } {} {needs:repl} @@ -833,3 +833,75 @@ start_server {tags {"expire"}} { assert_equal [r debug set-active-expire 1] {OK} } {} {needs:debug} } + +start_cluster 1 0 {tags {"expire external:skip cluster"}} { + test "expire scan should skip dictionaries with lots of empty buckets" { + # Use two slots to verify that the expiry scan logic is able + # to skip slots which aren't valid for scanning at a given point in time, + # and that the next non-empty slot after them still gets scanned and its keys expire. 
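The slot numbers hard-coded in the test body below (alice -> 749, foo -> 12182, key -> 12539) come from Redis Cluster's key hashing, and the {foo} hashtag is what forces all one hundred "{foo}$j" keys into a single slot-specific dictionary. A self-contained sketch of that calculation, assuming the documented CRC16 (XMODEM) and 16384 slots:

```c
#include <stdio.h>

/* Bitwise CRC16 with polynomial 0x1021 (XMODEM), the variant the
 * Redis Cluster spec uses for key hashing. */
static unsigned short crc16(const char *buf, int len) {
    unsigned short crc = 0;
    for (int i = 0; i < len; i++) {
        crc ^= (unsigned short)((unsigned char)buf[i] << 8);
        for (int j = 0; j < 8; j++)
            crc = (crc & 0x8000) ? (unsigned short)((crc << 1) ^ 0x1021)
                                 : (unsigned short)(crc << 1);
    }
    return crc;
}

/* Hashtag rule, as in keyHashSlot(): if the key contains a non-empty
 * {...} section, only that section is hashed, so "{foo}1".."{foo}100"
 * all land in the slot of "foo". */
static int key_hash_slot(const char *key, int keylen) {
    int s, e;
    for (s = 0; s < keylen; s++)
        if (key[s] == '{') break;
    if (s == keylen) return crc16(key, keylen) & 16383;
    for (e = s + 1; e < keylen; e++)
        if (key[e] == '}') break;
    if (e == keylen || e == s + 1) return crc16(key, keylen) & 16383;
    return crc16(key + s + 1, e - s - 1) & 16383;
}

int main(void) {
    printf("alice   -> %d\n", key_hash_slot("alice", 5));
    printf("{foo}1  -> %d\n", key_hash_slot("{foo}1", 6));
    printf("{foo}99 -> %d\n", key_hash_slot("{foo}99", 7)); /* same as {foo}1 */
    printf("key     -> %d\n", key_hash_slot("key", 3));
    return 0;
}
```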
+ + # hashslot(alice) is 749 + r psetex alice 500 val + + # hashslot(foo) is 12182 + # fill data across different slots with expiration + for {set j 1} {$j <= 100} {incr j} { + r psetex "{foo}$j" 500 a + } + # hashslot(key) is 12539 + r psetex key 500 val + + assert_equal 102 [r dbsize] + + # disable resizing + r config set rdb-key-save-delay 10000000 + r bgsave + + # delete data to have lots (99%) of empty buckets (slot 12182 should be skipped) + for {set j 1} {$j <= 99} {incr j} { + r del "{foo}$j" + } + + # Verify {foo}100 still exists and the remaining keys got cleaned up + wait_for_condition 20 100 { + [r dbsize] eq 1 + } else { + if {[r dbsize] eq 0} { + fail "scan didn't handle slot skipping logic." + } else { + fail "scan didn't process all valid slots." + } + } + + # Enable resizing + r config set rdb-key-save-delay 0 + catch {exec kill -9 [get_child_pid 0]} + wait_for_condition 1000 10 { + [s rdb_bgsave_in_progress] eq 0 + } else { + fail "bgsave did not stop in time." + } + + # Verify dict is under rehashing + set htstats [r debug HTSTATS 0] + assert_match {*rehashing target*} $htstats + + # put some data into slot 12182 and trigger the resize + r psetex "{foo}0" 500 a + + # Verify dict rehashing has completed + # (re-read HTSTATS inside the condition so each retry sees fresh stats) + wait_for_condition 20 100 { + ![string match {*rehashing target*} [r debug HTSTATS 0]] + } else { + fail "rehashing didn't complete" + } + + # Verify all keys have expired + wait_for_condition 20 100 { + [r dbsize] eq 0 + } else { + fail "Keys did not actively expire." + } + } +} diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index 564f1e4b2..34ff83015 100644 --- a/tests/unit/memefficiency.tcl +++ b/tests/unit/memefficiency.tcl @@ -37,9 +37,9 @@ start_server {tags {"memefficiency external:skip"}} { } run_solo {defrag} { -start_server {tags {"defrag external:skip"} overrides {appendonly yes auto-aof-rewrite-percentage 0 save ""}} { + proc test_active_defrag {type} { if {[string match {*jemalloc*} [s mem_allocator]] && [r debug mallctl arenas.page] <= 8192} { - test "Active defrag" { + test "Active defrag main dictionary: $type" { r config set hz 100 r config set activedefrag no r config set active-defrag-threshold-lower 5 @@ -50,7 +50,11 @@ start_server {tags {"defrag external:skip"} overrides {appendonly yes auto-aof-r r config set maxmemory-policy allkeys-lru populate 700000 asdf1 150 + populate 100 asdf1 150 0 false 1000 populate 170000 asdf2 300 + populate 100 asdf2 300 0 false 1000 + + assert {[scan [regexp -inline {expires\=([\d]*)} [r info keyspace]] expires=%d] > 0} after 120 ;# serverCron only updates the info once in 100ms set frag [s allocator_frag_ratio] if {$::verbose} { @@ -115,7 +119,7 @@ start_server {tags {"defrag external:skip"} overrides {appendonly yes auto-aof-r r save ;# saving an rdb iterates over all the data / pointers # if defrag is supported, test AOF loading too - if {[r config get activedefrag] eq "activedefrag yes"} { + if {[r config get activedefrag] eq "activedefrag yes" && $type eq "standalone"} { test "Active defrag - AOF loading" { # reset stats and load the AOF file r config resetstat @@ -159,8 +163,8 @@ start_server {tags {"defrag external:skip"} overrides {appendonly yes auto-aof-r } r config set appendonly no r config set key-load-delay 0 - - test "Active defrag eval scripts" { + + test "Active defrag eval scripts: $type" { r flushdb r script flush sync r config resetstat @@ -171,7 +175,7 @@ start_server {tags {"defrag external:skip"} overrides {appendonly yes auto-aof-r r config set 
active-defrag-cycle-max 75 r config set active-defrag-ignore-bytes 1500kb r config set maxmemory 0 - + set n 50000 # Populate memory with interleaving script-key pattern of same size @@ -192,9 +196,9 @@ start_server {tags {"defrag external:skip"} overrides {appendonly yes auto-aof-r puts "rss [s allocator_active]" puts "frag [s allocator_frag_ratio]" puts "frag_bytes [s allocator_frag_bytes]" - } + } assert_lessthan [s allocator_frag_ratio] 1.05 - + # Delete all the keys to create fragmentation for {set j 0} {$j < $n} {incr j} { $rd del k$j } for {set j 0} {$j < $n} {incr j} { $rd read } ; # Discard del replies @@ -205,7 +209,7 @@ start_server {tags {"defrag external:skip"} overrides {appendonly yes auto-aof-r puts "rss [s allocator_active]" puts "frag [s allocator_frag_ratio]" puts "frag_bytes [s allocator_frag_bytes]" - } + } assert_morethan [s allocator_frag_ratio] 1.4 catch {r config set activedefrag yes} e @@ -235,14 +239,14 @@ start_server {tags {"defrag external:skip"} overrides {appendonly yes auto-aof-r puts "rss [s allocator_active]" puts "frag [s allocator_frag_ratio]" puts "frag_bytes [s allocator_frag_bytes]" - } + } assert_lessthan_equal [s allocator_frag_ratio] 1.05 - } + } # Flush all script to make sure we don't crash after defragging them r script flush sync } {OK} - test "Active defrag big keys" { + test "Active defrag big keys: $type" { r flushdb r config resetstat r config set hz 100 @@ -277,6 +281,14 @@ start_server {tags {"defrag external:skip"} overrides {appendonly yes auto-aof-r $rd read ; # Discard replies } + # create some small items (effective in cluster-enabled) + r set "{bighash}smallitem" val + r set "{biglist}smallitem" val + r set "{bigzset}smallitem" val + r set "{bigset}smallitem" val + r set "{bigstream}smallitem" val + + set expected_frag 1.7 if {$::accurate} { # scale the hash to 1m fields in order to have a measurable the latency @@ -297,7 +309,7 @@ start_server {tags {"defrag external:skip"} overrides {appendonly yes auto-aof-r for {set j 0} {$j < 500000} {incr j} { $rd read ; # Discard replies } - assert_equal [r dbsize] 500010 + assert_equal [r dbsize] 500015 # create some fragmentation for {set j 0} {$j < 500000} {incr j 2} { @@ -306,7 +318,7 @@ start_server {tags {"defrag external:skip"} overrides {appendonly yes auto-aof-r for {set j 0} {$j < 500000} {incr j 2} { $rd read ; # Discard replies } - assert_equal [r dbsize] 250010 + assert_equal [r dbsize] 250015 # start defrag after 120 ;# serverCron only updates the info once in 100ms @@ -371,7 +383,7 @@ start_server {tags {"defrag external:skip"} overrides {appendonly yes auto-aof-r r save ;# saving an rdb iterates over all the data / pointers } {OK} - test "Active defrag big list" { + test "Active defrag big list: $type" { r flushdb r config resetstat r config set hz 100 @@ -473,7 +485,7 @@ start_server {tags {"defrag external:skip"} overrides {appendonly yes auto-aof-r r del biglist1 ;# coverage for quicklistBookmarksClear } {1} - test "Active defrag edge case" { + test "Active defrag edge case: $type" { # there was an edge case in defrag where all the slabs of a certain bin are exact the same # % utilization, with the exception of the current slab from which new allocations are made # if the current slab is lower in utilization the defragger would have ended up in stagnation, @@ -576,5 +588,13 @@ start_server {tags {"defrag external:skip"} overrides {appendonly yes auto-aof-r } } } -} + } + + start_cluster 1 0 {tags {"defrag external:skip cluster"} overrides {appendonly yes 
auto-aof-rewrite-percentage 0 save ""}} { + test_active_defrag "cluster" + } + + start_server {tags {"defrag external:skip standalone"} overrides {appendonly yes auto-aof-rewrite-percentage 0 save ""}} { + test_active_defrag "standalone" + } } ;# run_solo diff --git a/tests/unit/scan.tcl b/tests/unit/scan.tcl index d688d7cda..1ec38ddbc 100644 --- a/tests/unit/scan.tcl +++ b/tests/unit/scan.tcl @@ -1,5 +1,5 @@ -start_server {tags {"scan network"}} { - test "SCAN basic" { +proc test_scan {type} { + test "{$type} SCAN basic" { r flushdb populate 1000 @@ -17,7 +17,7 @@ start_server {tags {"scan network"}} { assert_equal 1000 [llength $keys] } - test "SCAN COUNT" { + test "{$type} SCAN COUNT" { r flushdb populate 1000 @@ -35,7 +35,7 @@ start_server {tags {"scan network"}} { assert_equal 1000 [llength $keys] } - test "SCAN MATCH" { + test "{$type} SCAN MATCH" { r flushdb populate 1000 @@ -53,7 +53,7 @@ start_server {tags {"scan network"}} { assert_equal 100 [llength $keys] } - test "SCAN TYPE" { + test "{$type} SCAN TYPE" { r flushdb # populate only creates strings populate 1000 @@ -98,7 +98,7 @@ start_server {tags {"scan network"}} { assert_equal 1000 [llength $keys] } - test "SCAN unknown type" { + test "{$type} SCAN unknown type" { r flushdb # make sure that passive expiration is triggered by the scan r debug set-active-expire 0 @@ -131,7 +131,7 @@ start_server {tags {"scan network"}} { r debug set-active-expire 1 } {OK} {needs:debug} - test "SCAN with expired keys" { + test "{$type} SCAN with expired keys" { r flushdb # make sure that passive expiration is triggered by the scan r debug set-active-expire 0 @@ -164,7 +164,7 @@ start_server {tags {"scan network"}} { r debug set-active-expire 1 } {OK} {needs:debug} - test "SCAN with expired keys with TYPE filter" { + test "{$type} SCAN with expired keys with TYPE filter" { r flushdb # make sure that passive expiration is triggered by the scan r debug set-active-expire 0 @@ -201,7 +201,7 @@ start_server {tags {"scan network"}} { } {OK} {needs:debug} foreach enc {intset listpack hashtable} { - test "SSCAN with encoding $enc" { + test "{$type} SSCAN with encoding $enc" { # Create the Set r del set if {$enc eq {intset}} { @@ -236,7 +236,7 @@ start_server {tags {"scan network"}} { } foreach enc {listpack hashtable} { - test "HSCAN with encoding $enc" { + test "{$type} HSCAN with encoding $enc" { # Create the Hash r del hash if {$enc eq {listpack}} { @@ -276,7 +276,7 @@ start_server {tags {"scan network"}} { } foreach enc {listpack skiplist} { - test "ZSCAN with encoding $enc" { + test "{$type} ZSCAN with encoding $enc" { # Create the Sorted Set r del zset if {$enc eq {listpack}} { @@ -315,7 +315,7 @@ start_server {tags {"scan network"}} { } } - test "SCAN guarantees check under write load" { + test "{$type} SCAN guarantees check under write load" { r flushdb populate 100 @@ -344,7 +344,7 @@ start_server {tags {"scan network"}} { assert_equal 100 [llength $keys2] } - test "SSCAN with integer encoded object (issue #1345)" { + test "{$type} SSCAN with integer encoded object (issue #1345)" { set objects {1 a} r del set r sadd set {*}$objects @@ -354,28 +354,28 @@ start_server {tags {"scan network"}} { assert_equal [lsort -unique [lindex $res 1]] {1} } - test "SSCAN with PATTERN" { + test "{$type} SSCAN with PATTERN" { r del mykey r sadd mykey foo fab fiz foobar 1 2 3 4 set res [r sscan mykey 0 MATCH foo* COUNT 10000] lsort -unique [lindex $res 1] } {foo foobar} - test "HSCAN with PATTERN" { + test "{$type} HSCAN with PATTERN" { r del mykey r hmset mykey 
foo 1 fab 2 fiz 3 foobar 10 1 a 2 b 3 c 4 d set res [r hscan mykey 0 MATCH foo* COUNT 10000] lsort -unique [lindex $res 1] } {1 10 foo foobar} - test "ZSCAN with PATTERN" { + test "{$type} ZSCAN with PATTERN" { r del mykey r zadd mykey 1 foo 2 fab 3 fiz 10 foobar set res [r zscan mykey 0 MATCH foo* COUNT 10000] lsort -unique [lindex $res 1] } - test "ZSCAN scores: regression test for issue #2175" { + test "{$type} ZSCAN scores: regression test for issue #2175" { r del mykey for {set j 0} {$j < 500} {incr j} { r zadd mykey 9.8813129168249309e-323 $j @@ -385,7 +385,7 @@ start_server {tags {"scan network"}} { assert {$first_score != 0} } - test "SCAN regression test for issue #4906" { + test "{$type} SCAN regression test for issue #4906" { for {set k 0} {$k < 100} {incr k} { r del set r sadd set x; # Make sure it's not intset encoded @@ -431,3 +431,11 @@ start_server {tags {"scan network"}} { } } } + +start_server {tags {"scan network standalone"}} { + test_scan "standalone" +} + +start_cluster 1 0 {tags {"external:skip cluster scan"}} { + test_scan "cluster" +}
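
A closing note on why every SCAN-family cursor in these hunks widened from unsigned long to unsigned long long: with one dictionary per slot, a cluster cursor has to carry both the intra-dict scan position and the slot being scanned, which is what the addSlotIdToCursor / getAndClearSlotIdFromCursor helpers declared in server.h manage. One plausible encoding is sketched below; the patch's actual bit layout lives in db.c, which these hunks do not show, so treat the layout as an assumption:

```c
#include <assert.h>
#include <stdio.h>

/* 16384 slots fit in 14 bits; park the slot id in the low bits and
 * shift the dict-level scan cursor above it. Illustrative only. */
#define SLOT_BITS 14
#define SLOT_MASK ((1ULL << SLOT_BITS) - 1)

static void add_slot_to_cursor(int slot, unsigned long long *cursor) {
    *cursor = (*cursor << SLOT_BITS) | ((unsigned long long)slot & SLOT_MASK);
}

static int get_and_clear_slot(unsigned long long *cursor) {
    int slot = (int)(*cursor & SLOT_MASK);
    *cursor >>= SLOT_BITS;
    return slot;
}

int main(void) {
    unsigned long long cursor = 0xABCDULL; /* dict-level position */
    add_slot_to_cursor(12182, &cursor);    /* resume scanning slot 12182 */
    assert(get_and_clear_slot(&cursor) == 12182 && cursor == 0xABCDULL);
    printf("round-trip ok, cursor=%llx\n", cursor);
    return 0;
}
```

Folding the slot into the cursor keeps SCAN stateless for clients while letting the server resume in the right slot-specific dictionary, at the cost of some of the cursor's bits, which helps explain the move to a guaranteed 64-bit type.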