Accelerate hash table iterator with value prefetching (#1568)
This PR builds upon the [previous entry prefetching optimization](https://github.com/valkey-io/valkey/pull/1501) to further enhance performance by implementing value prefetching for hashtable iterators. ## Implementation Modified `hashtableInitIterator` to accept a new flags parameter, allowing control over iterator behavior. Implemented conditional value prefetching within `hashtableNext` based on the new `HASHTABLE_ITER_PREFETCH_VALUES` flag. When the flag is set, hashtableNext now calls `prefetchBucketValues` at the start of each new bucket, preemptively loading the values of filled entries into the CPU cache. The actual prefetching of values is performed using type-specific callback functions implemented in `server.c`: - For `robj` the `hashtableObjectPrefetchValue` callback is used to prefetch the value if not embeded. This implementation is specifically focused on main database iterations at this stage. Applying it to hashtables that hold other object types should not be problematic, but its performance benefits for those cases will need to be proven through testing and benchmarking. ## Performance ### Setup: - 64cores Graviton 3 Amazon EC2 instance. - 50 mil keys with different value sizes. - Running valkey server over RAM file system. - crc checksum and comperssion off. ### Action - save command. ### Results The results regarding the duration of “save” command was taken from “info all” command. ``` +--------------------+------------------+------------------+ | Prefetching | Value size (byte)| Time (seconds) | +--------------------+------------------+------------------+ | No | 100 | 20.112279 | | Yes | 100 | 12.758519 | | No | 40 | 16.945366 | | Yes | 40 | 10.902022 | | No | 20 | 9.817000 | | Yes | 20 | 9.626821 | | No | 10 | 9.71510 | | Yes | 10 | 9.510565 | +--------------------+------------------+------------------+ ``` The results largely align with our expectations, showing significant improvements for larger values (100 bytes and 40 bytes) that are stored outside the robj. For smaller values (20 bytes and 10 bytes) that are embedded within the robj, we see almost no improvement, which is as expected. However, the small improvement observed even for these embedded values is somewhat surprising. Given that we are not actively prefetching these embedded values, this minor performance gain was not anticipated. perf record on save command **without** value prefetching: ``` --99.98%--rdbSaveDb | |--91.38%--rdbSaveKeyValuePair | | | |--42.72%--rdbSaveRawString | | | | | |--26.69%--rdbWriteRaw | | | | | | | --25.75%--rioFileWrite.lto_priv.0 | | | | | --15.41%--rdbSaveLen | | | | | |--7.58%--rdbWriteRaw | | | | | | | --7.08%--rioFileWrite.lto_priv.0 | | | | | | | --6.54%--_IO_fwrite | | | | | | | | --7.42%--rdbWriteRaw.constprop.1 | | | | | --7.18%--rioFileWrite.lto_priv.0 | | | | | --6.73%--_IO_fwrite | | | | | |--40.44%--rdbSaveStringObject | | | --7.62%--rdbSaveObjectType | | | --7.39%--rdbWriteRaw.constprop.1 | | | --7.04%--rioFileWrite.lto_priv.0 | | | --6.59%--_IO_fwrite | | --7.33%--hashtableNext.constprop.1 | --6.28%--prefetchNextBucketEntries.lto_priv.0 ``` perf record on save command **with** value prefetching: ``` rdbSaveRio | --99.93%--rdbSaveDb | |--79.81%--rdbSaveKeyValuePair | | | |--66.79%--rdbSaveRawString | | | | | |--42.31%--rdbWriteRaw | | | | | | | --40.74%--rioFileWrite.lto_priv.0 | | | | | --23.37%--rdbSaveLen | | | | | |--11.78%--rdbWriteRaw | | | | | | | --11.03%--rioFileWrite.lto_priv.0 | | | | | | | --10.30%--_IO_fwrite | | | | | | | | | --10.98%--rdbWriteRaw.constprop.1 | | | | | --10.44%--rioFileWrite.lto_priv.0 | | | | | --9.74%--_IO_fwrite | | | | | | |--11.33%--rdbSaveObjectType | | | | | --10.96%--rdbWriteRaw.constprop.1 | | | | | --10.51%--rioFileWrite.lto_priv.0 | | | | | --9.75%--_IO_fwrite | | | | | | --0.77%--rdbSaveStringObject | --18.39%--hashtableNext | |--10.04%--hashtableObjectPrefetchValue | --6.06%--prefetchNextBucketEntries ``` Conclusions: The prefetching strategy appears to be working as intended, shifting the performance bottleneck from data access to I/O operations. The significant reduction in rdbSaveStringObject time suggests that string objects(which are the values) are being accessed more efficiently. Signed-off-by: NadavGigi <nadavgigi102@gmail.com>
This commit is contained in:
parent
99ed308817
commit
f2510783f9
@ -655,7 +655,7 @@ void ACLChangeSelectorPerm(aclSelector *selector, struct serverCommand *cmd, int
|
|||||||
ACLResetFirstArgsForCommand(selector, id);
|
ACLResetFirstArgsForCommand(selector, id);
|
||||||
if (cmd->subcommands_ht) {
|
if (cmd->subcommands_ht) {
|
||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
hashtableInitSafeIterator(&iter, cmd->subcommands_ht);
|
hashtableInitIterator(&iter, cmd->subcommands_ht, HASHTABLE_ITER_SAFE);
|
||||||
void *next;
|
void *next;
|
||||||
while (hashtableNext(&iter, &next)) {
|
while (hashtableNext(&iter, &next)) {
|
||||||
struct serverCommand *sub = next;
|
struct serverCommand *sub = next;
|
||||||
@ -673,7 +673,7 @@ void ACLChangeSelectorPerm(aclSelector *selector, struct serverCommand *cmd, int
|
|||||||
* found and the operation was performed. */
|
* found and the operation was performed. */
|
||||||
void ACLSetSelectorCommandBitsForCategory(hashtable *commands, aclSelector *selector, uint64_t cflag, int value) {
|
void ACLSetSelectorCommandBitsForCategory(hashtable *commands, aclSelector *selector, uint64_t cflag, int value) {
|
||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
hashtableInitIterator(&iter, commands);
|
hashtableInitIterator(&iter, commands, 0);
|
||||||
void *next;
|
void *next;
|
||||||
while (hashtableNext(&iter, &next)) {
|
while (hashtableNext(&iter, &next)) {
|
||||||
struct serverCommand *cmd = next;
|
struct serverCommand *cmd = next;
|
||||||
@ -741,7 +741,7 @@ void ACLCountCategoryBitsForCommands(hashtable *commands,
|
|||||||
unsigned long *off,
|
unsigned long *off,
|
||||||
uint64_t cflag) {
|
uint64_t cflag) {
|
||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
hashtableInitIterator(&iter, commands);
|
hashtableInitIterator(&iter, commands, 0);
|
||||||
void *next;
|
void *next;
|
||||||
while (hashtableNext(&iter, &next)) {
|
while (hashtableNext(&iter, &next)) {
|
||||||
struct serverCommand *cmd = next;
|
struct serverCommand *cmd = next;
|
||||||
@ -2765,7 +2765,7 @@ sds getAclErrorMessage(int acl_res, user *user, struct serverCommand *cmd, sds e
|
|||||||
/* ACL CAT category */
|
/* ACL CAT category */
|
||||||
void aclCatWithFlags(client *c, hashtable *commands, uint64_t cflag, int *arraylen) {
|
void aclCatWithFlags(client *c, hashtable *commands, uint64_t cflag, int *arraylen) {
|
||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
hashtableInitIterator(&iter, commands);
|
hashtableInitIterator(&iter, commands, 0);
|
||||||
void *next;
|
void *next;
|
||||||
while (hashtableNext(&iter, &next)) {
|
while (hashtableNext(&iter, &next)) {
|
||||||
struct serverCommand *cmd = next;
|
struct serverCommand *cmd = next;
|
||||||
|
@ -1891,7 +1891,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
|
|||||||
} else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
|
} else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
|
||||||
zset *zs = o->ptr;
|
zset *zs = o->ptr;
|
||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
hashtableInitIterator(&iter, zs->ht);
|
hashtableInitIterator(&iter, zs->ht, 0);
|
||||||
void *next;
|
void *next;
|
||||||
while (hashtableNext(&iter, &next)) {
|
while (hashtableNext(&iter, &next)) {
|
||||||
zskiplistNode *node = next;
|
zskiplistNode *node = next;
|
||||||
@ -2217,7 +2217,7 @@ int rewriteAppendOnlyFileRio(rio *aof) {
|
|||||||
if (rioWrite(aof, selectcmd, sizeof(selectcmd) - 1) == 0) goto werr;
|
if (rioWrite(aof, selectcmd, sizeof(selectcmd) - 1) == 0) goto werr;
|
||||||
if (rioWriteBulkLongLong(aof, j) == 0) goto werr;
|
if (rioWriteBulkLongLong(aof, j) == 0) goto werr;
|
||||||
|
|
||||||
kvs_it = kvstoreIteratorInit(db->keys);
|
kvs_it = kvstoreIteratorInit(db->keys, HASHTABLE_ITER_SAFE | HASHTABLE_ITER_PREFETCH_VALUES);
|
||||||
/* Iterate this DB writing every entry */
|
/* Iterate this DB writing every entry */
|
||||||
void *next;
|
void *next;
|
||||||
while (kvstoreIteratorNext(kvs_it, &next)) {
|
while (kvstoreIteratorNext(kvs_it, &next)) {
|
||||||
|
@ -910,7 +910,7 @@ void clusterCommand(client *c) {
|
|||||||
unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys;
|
unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys;
|
||||||
addReplyArrayLen(c, numkeys);
|
addReplyArrayLen(c, numkeys);
|
||||||
kvstoreHashtableIterator *kvs_di = NULL;
|
kvstoreHashtableIterator *kvs_di = NULL;
|
||||||
kvs_di = kvstoreGetHashtableIterator(server.db->keys, slot);
|
kvs_di = kvstoreGetHashtableIterator(server.db->keys, slot, 0);
|
||||||
for (unsigned int i = 0; i < numkeys; i++) {
|
for (unsigned int i = 0; i < numkeys; i++) {
|
||||||
void *next;
|
void *next;
|
||||||
serverAssert(kvstoreHashtableIteratorNext(kvs_di, &next));
|
serverAssert(kvstoreHashtableIteratorNext(kvs_di, &next));
|
||||||
|
@ -6347,7 +6347,7 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
|
|||||||
|
|
||||||
kvstoreHashtableIterator *kvs_di = NULL;
|
kvstoreHashtableIterator *kvs_di = NULL;
|
||||||
void *next;
|
void *next;
|
||||||
kvs_di = kvstoreGetHashtableSafeIterator(server.db->keys, hashslot);
|
kvs_di = kvstoreGetHashtableIterator(server.db->keys, hashslot, HASHTABLE_ITER_SAFE);
|
||||||
while (kvstoreHashtableIteratorNext(kvs_di, &next)) {
|
while (kvstoreHashtableIteratorNext(kvs_di, &next)) {
|
||||||
robj *valkey = next;
|
robj *valkey = next;
|
||||||
enterExecutionUnit(1, 0);
|
enterExecutionUnit(1, 0);
|
||||||
|
4
src/db.c
4
src/db.c
@ -895,9 +895,9 @@ void keysCommand(client *c) {
|
|||||||
kvstoreHashtableIterator *kvs_di = NULL;
|
kvstoreHashtableIterator *kvs_di = NULL;
|
||||||
kvstoreIterator *kvs_it = NULL;
|
kvstoreIterator *kvs_it = NULL;
|
||||||
if (pslot != -1) {
|
if (pslot != -1) {
|
||||||
kvs_di = kvstoreGetHashtableSafeIterator(c->db->keys, pslot);
|
kvs_di = kvstoreGetHashtableIterator(c->db->keys, pslot, HASHTABLE_ITER_SAFE);
|
||||||
} else {
|
} else {
|
||||||
kvs_it = kvstoreIteratorInit(c->db->keys);
|
kvs_it = kvstoreIteratorInit(c->db->keys, HASHTABLE_ITER_SAFE);
|
||||||
}
|
}
|
||||||
void *next;
|
void *next;
|
||||||
while (kvs_di ? kvstoreHashtableIteratorNext(kvs_di, &next) : kvstoreIteratorNext(kvs_it, &next)) {
|
while (kvs_di ? kvstoreHashtableIteratorNext(kvs_di, &next) : kvstoreIteratorNext(kvs_it, &next)) {
|
||||||
|
@ -207,7 +207,7 @@ void xorObjectDigest(serverDb *db, robj *keyobj, unsigned char *digest, robj *o)
|
|||||||
} else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
|
} else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
|
||||||
zset *zs = o->ptr;
|
zset *zs = o->ptr;
|
||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
hashtableInitIterator(&iter, zs->ht);
|
hashtableInitIterator(&iter, zs->ht, 0);
|
||||||
|
|
||||||
void *next;
|
void *next;
|
||||||
while (hashtableNext(&iter, &next)) {
|
while (hashtableNext(&iter, &next)) {
|
||||||
@ -291,7 +291,7 @@ void computeDatasetDigest(unsigned char *final) {
|
|||||||
for (int j = 0; j < server.dbnum; j++) {
|
for (int j = 0; j < server.dbnum; j++) {
|
||||||
serverDb *db = server.db + j;
|
serverDb *db = server.db + j;
|
||||||
if (kvstoreSize(db->keys) == 0) continue;
|
if (kvstoreSize(db->keys) == 0) continue;
|
||||||
kvstoreIterator *kvs_it = kvstoreIteratorInit(db->keys);
|
kvstoreIterator *kvs_it = kvstoreIteratorInit(db->keys, HASHTABLE_ITER_SAFE | HASHTABLE_ITER_PREFETCH_VALUES);
|
||||||
|
|
||||||
/* hash the DB id, so the same dataset moved in a different DB will lead to a different digest */
|
/* hash the DB id, so the same dataset moved in a different DB will lead to a different digest */
|
||||||
aux = htonl(j);
|
aux = htonl(j);
|
||||||
|
110
src/hashtable.c
110
src/hashtable.c
@ -300,7 +300,7 @@ typedef struct {
|
|||||||
long index;
|
long index;
|
||||||
uint16_t pos_in_bucket;
|
uint16_t pos_in_bucket;
|
||||||
uint8_t table;
|
uint8_t table;
|
||||||
uint8_t safe;
|
uint8_t flags;
|
||||||
union {
|
union {
|
||||||
/* Unsafe iterator fingerprint for misuse detection. */
|
/* Unsafe iterator fingerprint for misuse detection. */
|
||||||
uint64_t fingerprint;
|
uint64_t fingerprint;
|
||||||
@ -936,6 +936,7 @@ static inline incrementalFind *incrementalFindFromOpaque(hashtableIncrementalFin
|
|||||||
|
|
||||||
/* Prefetches all filled entries in the given bucket to optimize future memory access. */
|
/* Prefetches all filled entries in the given bucket to optimize future memory access. */
|
||||||
static void prefetchBucketEntries(bucket *b) {
|
static void prefetchBucketEntries(bucket *b) {
|
||||||
|
if (!b->presence) return;
|
||||||
for (int pos = 0; pos < numBucketPositions(b); pos++) {
|
for (int pos = 0; pos < numBucketPositions(b); pos++) {
|
||||||
if (isPositionFilled(b, pos)) {
|
if (isPositionFilled(b, pos)) {
|
||||||
valkey_prefetch(b->entries[pos]);
|
valkey_prefetch(b->entries[pos]);
|
||||||
@ -979,6 +980,26 @@ static void prefetchNextBucketEntries(iter *iter, bucket *current_bucket) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Prefetches the values associated with the entries in the given bucket by
|
||||||
|
* calling the entryPrefetchValue callback in the hashtableType */
|
||||||
|
static void prefetchBucketValues(bucket *b, hashtable *ht) {
|
||||||
|
if (!b->presence) return;
|
||||||
|
assert(ht->type->entryPrefetchValue != NULL);
|
||||||
|
for (int pos = 0; pos < numBucketPositions(b); pos++) {
|
||||||
|
if (isPositionFilled(b, pos)) {
|
||||||
|
ht->type->entryPrefetchValue(b->entries[pos]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline int isSafe(iter *iter) {
|
||||||
|
return (iter->flags & HASHTABLE_ITER_SAFE);
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline int shouldPrefetchValues(iter *iter) {
|
||||||
|
return (iter->flags & HASHTABLE_ITER_PREFETCH_VALUES);
|
||||||
|
}
|
||||||
|
|
||||||
/* --- API functions --- */
|
/* --- API functions --- */
|
||||||
|
|
||||||
/* Allocates and initializes a new hashtable specified by the given type. */
|
/* Allocates and initializes a new hashtable specified by the given type. */
|
||||||
@ -1792,31 +1813,32 @@ size_t hashtableScanDefrag(hashtable *ht, size_t cursor, hashtableScanFunction f
|
|||||||
|
|
||||||
/* --- Iterator --- */
|
/* --- Iterator --- */
|
||||||
|
|
||||||
/* Initialize a iterator, that is not allowed to insert, delete or even lookup
|
/* Initialize an iterator for a hashtable.
|
||||||
* entries in the hashtable, because such operations can trigger incremental
|
|
||||||
* rehashing which moves entries around and confuses the iterator. Only
|
|
||||||
* hashtableNext is allowed. Each entry is returned exactly once. Call
|
|
||||||
* hashtableResetIterator when you are done. See also
|
|
||||||
* hashtableInitSafeIterator. */
|
|
||||||
void hashtableInitIterator(hashtableIterator *iterator, hashtable *ht) {
|
|
||||||
iter *iter;
|
|
||||||
iter = iteratorFromOpaque(iterator);
|
|
||||||
iter->hashtable = ht;
|
|
||||||
iter->table = 0;
|
|
||||||
iter->index = -1;
|
|
||||||
iter->safe = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Initialize a safe iterator, which is allowed to modify the hash table while
|
|
||||||
* iterating. It pauses incremental rehashing to prevent entries from moving
|
|
||||||
* around. Call hashtableNext to fetch each entry. You must call
|
|
||||||
* hashtableResetIterator when you are done with a safe iterator.
|
|
||||||
*
|
*
|
||||||
* It's allowed to insert and replace entries. Deleting entries is only allowed
|
* The 'flags' argument can be used to tweak the behaviour. It's a bitwise-or
|
||||||
* for the entry that was just returned by hashtableNext. Deleting other entries
|
* (zero means no flags) of the following:
|
||||||
* is possible, but doing so can cause internal fragmentation, so don't.
|
|
||||||
*
|
*
|
||||||
* Guarantees:
|
* - HASHTABLE_ITER_SAFE: Use a safe iterator that can handle
|
||||||
|
* modifications to the hash table while iterating.
|
||||||
|
* - HASHTABLE_ITER_PREFETCH_VALUES: Enables prefetching of entries values,
|
||||||
|
* which can improve performance in some scenarios. Because the hashtable is generic and
|
||||||
|
* doesn't care which object we store, the callback entryPrefetchValue must be set to help
|
||||||
|
* us prefetch necessary fields of specific object types stored in the hashtable.
|
||||||
|
*
|
||||||
|
* For a non-safe iterator (default, when HASHTABLE_ITER_SAFE is not set):
|
||||||
|
* It is not allowed to insert, delete or even lookup entries in the hashtable,
|
||||||
|
* because such operations can trigger incremental rehashing which moves entries
|
||||||
|
* around and confuses the iterator. Only hashtableNext is allowed. Each entry
|
||||||
|
* is returned exactly once.
|
||||||
|
*
|
||||||
|
* For a safe iterator (when HASHTABLE_ITER_SAFE is set):
|
||||||
|
* It is allowed to modify the hash table while iterating. It pauses incremental
|
||||||
|
* rehashing to prevent entries from moving around. It's allowed to insert and
|
||||||
|
* replace entries. Deleting entries is only allowed for the entry that was just
|
||||||
|
* returned by hashtableNext. Deleting other entries is possible, but doing so
|
||||||
|
* can cause internal fragmentation, so don't.
|
||||||
|
*
|
||||||
|
* Guarantees for safe iterators:
|
||||||
*
|
*
|
||||||
* - Entries that are in the hash table for the entire iteration are returned
|
* - Entries that are in the hash table for the entire iteration are returned
|
||||||
* exactly once.
|
* exactly once.
|
||||||
@ -1829,18 +1851,31 @@ void hashtableInitIterator(hashtableIterator *iterator, hashtable *ht) {
|
|||||||
*
|
*
|
||||||
* - Entries that are inserted during the iteration may or may not be returned
|
* - Entries that are inserted during the iteration may or may not be returned
|
||||||
* by the iterator.
|
* by the iterator.
|
||||||
|
*
|
||||||
|
* Call hashtableNext to fetch each entry. You must call hashtableResetIterator
|
||||||
|
* when you are done with the iterator.
|
||||||
*/
|
*/
|
||||||
void hashtableInitSafeIterator(hashtableIterator *iterator, hashtable *ht) {
|
void hashtableInitIterator(hashtableIterator *iterator, hashtable *ht, uint8_t flags) {
|
||||||
hashtableInitIterator(iterator, ht);
|
iter *iter;
|
||||||
|
iter = iteratorFromOpaque(iterator);
|
||||||
|
iter->hashtable = ht;
|
||||||
|
iter->table = 0;
|
||||||
|
iter->index = -1;
|
||||||
|
iter->flags = flags;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Reinitializes the iterator for the provided hashtable while
|
||||||
|
* preserving the flags from its previous initialization. */
|
||||||
|
void hashtableReinitIterator(hashtableIterator *iterator, hashtable *ht) {
|
||||||
iter *iter = iteratorFromOpaque(iterator);
|
iter *iter = iteratorFromOpaque(iterator);
|
||||||
iter->safe = 1;
|
hashtableInitIterator(iterator, ht, iter->flags);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Resets a stack-allocated iterator. */
|
/* Resets a stack-allocated iterator. */
|
||||||
void hashtableResetIterator(hashtableIterator *iterator) {
|
void hashtableResetIterator(hashtableIterator *iterator) {
|
||||||
iter *iter = iteratorFromOpaque(iterator);
|
iter *iter = iteratorFromOpaque(iterator);
|
||||||
if (!(iter->index == -1 && iter->table == 0)) {
|
if (!(iter->index == -1 && iter->table == 0)) {
|
||||||
if (iter->safe) {
|
if (isSafe(iter)) {
|
||||||
hashtableResumeRehashing(iter->hashtable);
|
hashtableResumeRehashing(iter->hashtable);
|
||||||
assert(iter->hashtable->pause_rehash >= 0);
|
assert(iter->hashtable->pause_rehash >= 0);
|
||||||
} else {
|
} else {
|
||||||
@ -1850,21 +1885,13 @@ void hashtableResetIterator(hashtableIterator *iterator) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Allocates and initializes an iterator. */
|
/* Allocates and initializes an iterator. */
|
||||||
hashtableIterator *hashtableCreateIterator(hashtable *ht) {
|
hashtableIterator *hashtableCreateIterator(hashtable *ht, uint8_t flags) {
|
||||||
iter *iter = zmalloc(sizeof(*iter));
|
iter *iter = zmalloc(sizeof(*iter));
|
||||||
hashtableIterator *opaque = iteratorToOpaque(iter);
|
hashtableIterator *opaque = iteratorToOpaque(iter);
|
||||||
hashtableInitIterator(opaque, ht);
|
hashtableInitIterator(opaque, ht, flags);
|
||||||
return opaque;
|
return opaque;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Allocates and initializes a safe iterator. */
|
|
||||||
hashtableIterator *hashtableCreateSafeIterator(hashtable *ht) {
|
|
||||||
hashtableIterator *iterator = hashtableCreateIterator(ht);
|
|
||||||
iter *iter = iteratorFromOpaque(iterator);
|
|
||||||
iter->safe = 1;
|
|
||||||
return iterator;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Resets and frees the memory of an allocated iterator, i.e. one created using
|
/* Resets and frees the memory of an allocated iterator, i.e. one created using
|
||||||
* hashtableCreate(Safe)Iterator. */
|
* hashtableCreate(Safe)Iterator. */
|
||||||
void hashtableReleaseIterator(hashtableIterator *iterator) {
|
void hashtableReleaseIterator(hashtableIterator *iterator) {
|
||||||
@ -1880,7 +1907,7 @@ int hashtableNext(hashtableIterator *iterator, void **elemptr) {
|
|||||||
while (1) {
|
while (1) {
|
||||||
if (iter->index == -1 && iter->table == 0) {
|
if (iter->index == -1 && iter->table == 0) {
|
||||||
/* It's the first call to next. */
|
/* It's the first call to next. */
|
||||||
if (iter->safe) {
|
if (isSafe(iter)) {
|
||||||
hashtablePauseRehashing(iter->hashtable);
|
hashtablePauseRehashing(iter->hashtable);
|
||||||
iter->last_seen_size = iter->hashtable->used[iter->table];
|
iter->last_seen_size = iter->hashtable->used[iter->table];
|
||||||
} else {
|
} else {
|
||||||
@ -1907,7 +1934,7 @@ int hashtableNext(hashtableIterator *iterator, void **elemptr) {
|
|||||||
iter->bucket = getChildBucket(iter->bucket);
|
iter->bucket = getChildBucket(iter->bucket);
|
||||||
} else if (iter->pos_in_bucket >= ENTRIES_PER_BUCKET) {
|
} else if (iter->pos_in_bucket >= ENTRIES_PER_BUCKET) {
|
||||||
/* Bucket index done. */
|
/* Bucket index done. */
|
||||||
if (iter->safe) {
|
if (isSafe(iter)) {
|
||||||
/* If entries in this bucket chain have been deleted,
|
/* If entries in this bucket chain have been deleted,
|
||||||
* they've left empty spaces in the buckets. The chain is
|
* they've left empty spaces in the buckets. The chain is
|
||||||
* not automatically compacted when rehashing is paused. If
|
* not automatically compacted when rehashing is paused. If
|
||||||
@ -1936,6 +1963,9 @@ int hashtableNext(hashtableIterator *iterator, void **elemptr) {
|
|||||||
}
|
}
|
||||||
bucket *b = iter->bucket;
|
bucket *b = iter->bucket;
|
||||||
if (iter->pos_in_bucket == 0) {
|
if (iter->pos_in_bucket == 0) {
|
||||||
|
if (shouldPrefetchValues(iter)) {
|
||||||
|
prefetchBucketValues(b, iter->hashtable);
|
||||||
|
}
|
||||||
prefetchNextBucketEntries(iter, b);
|
prefetchNextBucketEntries(iter, b);
|
||||||
}
|
}
|
||||||
if (!isPositionFilled(b, iter->pos_in_bucket)) {
|
if (!isPositionFilled(b, iter->pos_in_bucket)) {
|
||||||
|
@ -60,6 +60,8 @@ typedef struct {
|
|||||||
/* Callback to free an entry when it's overwritten or deleted.
|
/* Callback to free an entry when it's overwritten or deleted.
|
||||||
* Optional. */
|
* Optional. */
|
||||||
void (*entryDestructor)(void *entry);
|
void (*entryDestructor)(void *entry);
|
||||||
|
/* Callback to prefetch the value associated with a hashtable entry. */
|
||||||
|
void (*entryPrefetchValue)(const void *entry);
|
||||||
/* Callback to control when resizing should be allowed. */
|
/* Callback to control when resizing should be allowed. */
|
||||||
int (*resizeAllowed)(size_t moreMem, double usedRatio);
|
int (*resizeAllowed)(size_t moreMem, double usedRatio);
|
||||||
/* Invoked at the start of rehashing. */
|
/* Invoked at the start of rehashing. */
|
||||||
@ -91,6 +93,10 @@ typedef void (*hashtableScanFunction)(void *privdata, void *entry);
|
|||||||
/* Scan flags */
|
/* Scan flags */
|
||||||
#define HASHTABLE_SCAN_EMIT_REF (1 << 0)
|
#define HASHTABLE_SCAN_EMIT_REF (1 << 0)
|
||||||
|
|
||||||
|
/* Iterator flags */
|
||||||
|
#define HASHTABLE_ITER_SAFE (1 << 0)
|
||||||
|
#define HASHTABLE_ITER_PREFETCH_VALUES (1 << 1)
|
||||||
|
|
||||||
/* --- Prototypes --- */
|
/* --- Prototypes --- */
|
||||||
|
|
||||||
/* Hash function (global seed) */
|
/* Hash function (global seed) */
|
||||||
@ -144,11 +150,10 @@ int hashtableIncrementalFindGetResult(hashtableIncrementalFindState *state, void
|
|||||||
/* Iteration & scan */
|
/* Iteration & scan */
|
||||||
size_t hashtableScan(hashtable *ht, size_t cursor, hashtableScanFunction fn, void *privdata);
|
size_t hashtableScan(hashtable *ht, size_t cursor, hashtableScanFunction fn, void *privdata);
|
||||||
size_t hashtableScanDefrag(hashtable *ht, size_t cursor, hashtableScanFunction fn, void *privdata, void *(*defragfn)(void *), int flags);
|
size_t hashtableScanDefrag(hashtable *ht, size_t cursor, hashtableScanFunction fn, void *privdata, void *(*defragfn)(void *), int flags);
|
||||||
void hashtableInitIterator(hashtableIterator *iter, hashtable *ht);
|
void hashtableInitIterator(hashtableIterator *iter, hashtable *ht, uint8_t flags);
|
||||||
void hashtableInitSafeIterator(hashtableIterator *iter, hashtable *ht);
|
void hashtableReinitIterator(hashtableIterator *iterator, hashtable *ht);
|
||||||
void hashtableResetIterator(hashtableIterator *iter);
|
void hashtableResetIterator(hashtableIterator *iter);
|
||||||
hashtableIterator *hashtableCreateIterator(hashtable *ht);
|
hashtableIterator *hashtableCreateIterator(hashtable *ht, uint8_t flags);
|
||||||
hashtableIterator *hashtableCreateSafeIterator(hashtable *ht);
|
|
||||||
void hashtableReleaseIterator(hashtableIterator *iter);
|
void hashtableReleaseIterator(hashtableIterator *iter);
|
||||||
int hashtableNext(hashtableIterator *iter, void **elemptr);
|
int hashtableNext(hashtableIterator *iter, void **elemptr);
|
||||||
|
|
||||||
|
@ -467,7 +467,7 @@ void kvstoreGetStats(kvstore *kvs, char *buf, size_t bufsize, int full) {
|
|||||||
hashtableStats *mainHtStats = NULL;
|
hashtableStats *mainHtStats = NULL;
|
||||||
hashtableStats *rehashHtStats = NULL;
|
hashtableStats *rehashHtStats = NULL;
|
||||||
hashtable *ht;
|
hashtable *ht;
|
||||||
kvstoreIterator *kvs_it = kvstoreIteratorInit(kvs);
|
kvstoreIterator *kvs_it = kvstoreIteratorInit(kvs, HASHTABLE_ITER_SAFE);
|
||||||
while ((ht = kvstoreIteratorNextHashtable(kvs_it))) {
|
while ((ht = kvstoreIteratorNextHashtable(kvs_it))) {
|
||||||
hashtableStats *stats = hashtableGetStatsHt(ht, 0, full);
|
hashtableStats *stats = hashtableGetStatsHt(ht, 0, full);
|
||||||
if (!mainHtStats) {
|
if (!mainHtStats) {
|
||||||
@ -576,12 +576,12 @@ int kvstoreNumHashtables(kvstore *kvs) {
|
|||||||
/* Returns kvstore iterator that can be used to iterate through sub-hash tables.
|
/* Returns kvstore iterator that can be used to iterate through sub-hash tables.
|
||||||
*
|
*
|
||||||
* The caller should free the resulting kvs_it with kvstoreIteratorRelease. */
|
* The caller should free the resulting kvs_it with kvstoreIteratorRelease. */
|
||||||
kvstoreIterator *kvstoreIteratorInit(kvstore *kvs) {
|
kvstoreIterator *kvstoreIteratorInit(kvstore *kvs, uint8_t flags) {
|
||||||
kvstoreIterator *kvs_it = zmalloc(sizeof(*kvs_it));
|
kvstoreIterator *kvs_it = zmalloc(sizeof(*kvs_it));
|
||||||
kvs_it->kvs = kvs;
|
kvs_it->kvs = kvs;
|
||||||
kvs_it->didx = -1;
|
kvs_it->didx = -1;
|
||||||
kvs_it->next_didx = kvstoreGetFirstNonEmptyHashtableIndex(kvs_it->kvs); /* Finds first non-empty hashtable index. */
|
kvs_it->next_didx = kvstoreGetFirstNonEmptyHashtableIndex(kvs_it->kvs); /* Finds first non-empty hashtable index. */
|
||||||
hashtableInitSafeIterator(&kvs_it->di, NULL);
|
hashtableInitIterator(&kvs_it->di, NULL, flags);
|
||||||
return kvs_it;
|
return kvs_it;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -625,7 +625,7 @@ int kvstoreIteratorNext(kvstoreIterator *kvs_it, void **next) {
|
|||||||
/* No current hashtable or reached the end of the hash table. */
|
/* No current hashtable or reached the end of the hash table. */
|
||||||
hashtable *ht = kvstoreIteratorNextHashtable(kvs_it);
|
hashtable *ht = kvstoreIteratorNextHashtable(kvs_it);
|
||||||
if (!ht) return 0;
|
if (!ht) return 0;
|
||||||
hashtableInitSafeIterator(&kvs_it->di, ht);
|
hashtableReinitIterator(&kvs_it->di, ht);
|
||||||
return hashtableNext(&kvs_it->di, next);
|
return hashtableNext(&kvs_it->di, next);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -691,23 +691,15 @@ unsigned long kvstoreHashtableSize(kvstore *kvs, int didx) {
|
|||||||
return hashtableSize(ht);
|
return hashtableSize(ht);
|
||||||
}
|
}
|
||||||
|
|
||||||
kvstoreHashtableIterator *kvstoreGetHashtableIterator(kvstore *kvs, int didx) {
|
kvstoreHashtableIterator *kvstoreGetHashtableIterator(kvstore *kvs, int didx, uint8_t flags) {
|
||||||
kvstoreHashtableIterator *kvs_di = zmalloc(sizeof(*kvs_di));
|
kvstoreHashtableIterator *kvs_di = zmalloc(sizeof(*kvs_di));
|
||||||
kvs_di->kvs = kvs;
|
kvs_di->kvs = kvs;
|
||||||
kvs_di->didx = didx;
|
kvs_di->didx = didx;
|
||||||
hashtableInitIterator(&kvs_di->di, kvstoreGetHashtable(kvs, didx));
|
hashtableInitIterator(&kvs_di->di, kvstoreGetHashtable(kvs, didx), flags);
|
||||||
return kvs_di;
|
return kvs_di;
|
||||||
}
|
}
|
||||||
|
|
||||||
kvstoreHashtableIterator *kvstoreGetHashtableSafeIterator(kvstore *kvs, int didx) {
|
/* Free the kvs_di returned by kvstoreGetHashtableIterator. */
|
||||||
kvstoreHashtableIterator *kvs_di = zmalloc(sizeof(*kvs_di));
|
|
||||||
kvs_di->kvs = kvs;
|
|
||||||
kvs_di->didx = didx;
|
|
||||||
hashtableInitSafeIterator(&kvs_di->di, kvstoreGetHashtable(kvs, didx));
|
|
||||||
return kvs_di;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Free the kvs_di returned by kvstoreGetHashtableIterator and kvstoreGetHashtableSafeIterator. */
|
|
||||||
void kvstoreReleaseHashtableIterator(kvstoreHashtableIterator *kvs_di) {
|
void kvstoreReleaseHashtableIterator(kvstoreHashtableIterator *kvs_di) {
|
||||||
/* The hashtable may be deleted during the iteration process, so here need to check for NULL. */
|
/* The hashtable may be deleted during the iteration process, so here need to check for NULL. */
|
||||||
if (kvstoreGetHashtable(kvs_di->kvs, kvs_di->didx)) {
|
if (kvstoreGetHashtable(kvs_di->kvs, kvs_di->didx)) {
|
||||||
|
@ -43,7 +43,7 @@ void kvstoreHashtableTrackMemUsage(hashtable *s, ssize_t delta);
|
|||||||
size_t kvstoreHashtableMetadataSize(void);
|
size_t kvstoreHashtableMetadataSize(void);
|
||||||
|
|
||||||
/* kvstore iterator specific functions */
|
/* kvstore iterator specific functions */
|
||||||
kvstoreIterator *kvstoreIteratorInit(kvstore *kvs);
|
kvstoreIterator *kvstoreIteratorInit(kvstore *kvs, uint8_t flags);
|
||||||
void kvstoreIteratorRelease(kvstoreIterator *kvs_it);
|
void kvstoreIteratorRelease(kvstoreIterator *kvs_it);
|
||||||
int kvstoreIteratorGetCurrentHashtableIndex(kvstoreIterator *kvs_it);
|
int kvstoreIteratorGetCurrentHashtableIndex(kvstoreIterator *kvs_it);
|
||||||
int kvstoreIteratorNext(kvstoreIterator *kvs_it, void **next);
|
int kvstoreIteratorNext(kvstoreIterator *kvs_it, void **next);
|
||||||
@ -57,8 +57,7 @@ unsigned long kvstoreHashtableRehashingCount(kvstore *kvs);
|
|||||||
|
|
||||||
/* Specific hashtable access by hashtable-index */
|
/* Specific hashtable access by hashtable-index */
|
||||||
unsigned long kvstoreHashtableSize(kvstore *kvs, int didx);
|
unsigned long kvstoreHashtableSize(kvstore *kvs, int didx);
|
||||||
kvstoreHashtableIterator *kvstoreGetHashtableIterator(kvstore *kvs, int didx);
|
kvstoreHashtableIterator *kvstoreGetHashtableIterator(kvstore *kvs, int didx, uint8_t flags);
|
||||||
kvstoreHashtableIterator *kvstoreGetHashtableSafeIterator(kvstore *kvs, int didx);
|
|
||||||
void kvstoreReleaseHashtableIterator(kvstoreHashtableIterator *kvs_id);
|
void kvstoreReleaseHashtableIterator(kvstoreHashtableIterator *kvs_id);
|
||||||
int kvstoreHashtableIteratorNext(kvstoreHashtableIterator *kvs_di, void **next);
|
int kvstoreHashtableIteratorNext(kvstoreHashtableIterator *kvs_di, void **next);
|
||||||
int kvstoreHashtableRandomEntry(kvstore *kvs, int didx, void **found);
|
int kvstoreHashtableRandomEntry(kvstore *kvs, int didx, void **found);
|
||||||
|
@ -528,7 +528,7 @@ void fillCommandCDF(client *c, struct hdr_histogram *histogram) {
|
|||||||
* a per command cumulative distribution of latencies. */
|
* a per command cumulative distribution of latencies. */
|
||||||
void latencyAllCommandsFillCDF(client *c, hashtable *commands, int *command_with_data) {
|
void latencyAllCommandsFillCDF(client *c, hashtable *commands, int *command_with_data) {
|
||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
hashtableInitSafeIterator(&iter, commands);
|
hashtableInitIterator(&iter, commands, HASHTABLE_ITER_SAFE);
|
||||||
void *next;
|
void *next;
|
||||||
while (hashtableNext(&iter, &next)) {
|
while (hashtableNext(&iter, &next)) {
|
||||||
struct serverCommand *cmd = next;
|
struct serverCommand *cmd = next;
|
||||||
@ -565,7 +565,7 @@ void latencySpecificCommandsFillCDF(client *c) {
|
|||||||
|
|
||||||
if (cmd->subcommands_ht) {
|
if (cmd->subcommands_ht) {
|
||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
hashtableInitSafeIterator(&iter, cmd->subcommands_ht);
|
hashtableInitIterator(&iter, cmd->subcommands_ht, HASHTABLE_ITER_SAFE);
|
||||||
void *next;
|
void *next;
|
||||||
while (hashtableNext(&iter, &next)) {
|
while (hashtableNext(&iter, &next)) {
|
||||||
struct serverCommand *sub = next;
|
struct serverCommand *sub = next;
|
||||||
|
@ -12162,7 +12162,7 @@ int moduleFreeCommand(struct ValkeyModule *module, struct serverCommand *cmd) {
|
|||||||
if (cmd->subcommands_ht) {
|
if (cmd->subcommands_ht) {
|
||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
void *next;
|
void *next;
|
||||||
hashtableInitSafeIterator(&iter, cmd->subcommands_ht);
|
hashtableInitIterator(&iter, cmd->subcommands_ht, HASHTABLE_ITER_SAFE);
|
||||||
while (hashtableNext(&iter, &next)) {
|
while (hashtableNext(&iter, &next)) {
|
||||||
struct serverCommand *sub = next;
|
struct serverCommand *sub = next;
|
||||||
if (moduleFreeCommand(module, sub) != C_OK) continue;
|
if (moduleFreeCommand(module, sub) != C_OK) continue;
|
||||||
@ -12185,7 +12185,7 @@ void moduleUnregisterCommands(struct ValkeyModule *module) {
|
|||||||
/* Unregister all the commands registered by this module. */
|
/* Unregister all the commands registered by this module. */
|
||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
void *next;
|
void *next;
|
||||||
hashtableInitSafeIterator(&iter, server.commands);
|
hashtableInitIterator(&iter, server.commands, HASHTABLE_ITER_SAFE);
|
||||||
while (hashtableNext(&iter, &next)) {
|
while (hashtableNext(&iter, &next)) {
|
||||||
struct serverCommand *cmd = next;
|
struct serverCommand *cmd = next;
|
||||||
if (moduleFreeCommand(module, cmd) != C_OK) continue;
|
if (moduleFreeCommand(module, cmd) != C_OK) continue;
|
||||||
|
@ -630,7 +630,7 @@ void dismissSetObject(robj *o, size_t size_hint) {
|
|||||||
* page size, and there's a high chance we'll actually dismiss something. */
|
* page size, and there's a high chance we'll actually dismiss something. */
|
||||||
if (size_hint / hashtableSize(ht) >= server.page_size) {
|
if (size_hint / hashtableSize(ht) >= server.page_size) {
|
||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
hashtableInitIterator(&iter, ht);
|
hashtableInitIterator(&iter, ht, 0);
|
||||||
void *next;
|
void *next;
|
||||||
while (hashtableNext(&iter, &next)) {
|
while (hashtableNext(&iter, &next)) {
|
||||||
sds item = next;
|
sds item = next;
|
||||||
@ -682,7 +682,7 @@ void dismissHashObject(robj *o, size_t size_hint) {
|
|||||||
* a page size, and there's a high chance we'll actually dismiss something. */
|
* a page size, and there's a high chance we'll actually dismiss something. */
|
||||||
if (size_hint / hashtableSize(ht) >= server.page_size) {
|
if (size_hint / hashtableSize(ht) >= server.page_size) {
|
||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
hashtableInitIterator(&iter, ht);
|
hashtableInitIterator(&iter, ht, 0);
|
||||||
void *next;
|
void *next;
|
||||||
while (hashtableNext(&iter, &next)) {
|
while (hashtableNext(&iter, &next)) {
|
||||||
dismissHashTypeEntry(next);
|
dismissHashTypeEntry(next);
|
||||||
@ -1156,7 +1156,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) {
|
|||||||
asize = sizeof(*o) + hashtableMemUsage(ht);
|
asize = sizeof(*o) + hashtableMemUsage(ht);
|
||||||
|
|
||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
hashtableInitIterator(&iter, ht);
|
hashtableInitIterator(&iter, ht, 0);
|
||||||
void *next;
|
void *next;
|
||||||
while (hashtableNext(&iter, &next) && samples < sample_size) {
|
while (hashtableNext(&iter, &next) && samples < sample_size) {
|
||||||
sds element = next;
|
sds element = next;
|
||||||
@ -1197,7 +1197,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) {
|
|||||||
} else if (o->encoding == OBJ_ENCODING_HASHTABLE) {
|
} else if (o->encoding == OBJ_ENCODING_HASHTABLE) {
|
||||||
hashtable *ht = o->ptr;
|
hashtable *ht = o->ptr;
|
||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
hashtableInitIterator(&iter, ht);
|
hashtableInitIterator(&iter, ht, 0);
|
||||||
void *next;
|
void *next;
|
||||||
|
|
||||||
asize = sizeof(*o) + hashtableMemUsage(ht);
|
asize = sizeof(*o) + hashtableMemUsage(ht);
|
||||||
|
@ -366,7 +366,7 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify, pubsubtype ty
|
|||||||
void pubsubShardUnsubscribeAllChannelsInSlot(unsigned int slot) {
|
void pubsubShardUnsubscribeAllChannelsInSlot(unsigned int slot) {
|
||||||
if (!kvstoreHashtableSize(server.pubsubshard_channels, slot)) return;
|
if (!kvstoreHashtableSize(server.pubsubshard_channels, slot)) return;
|
||||||
|
|
||||||
kvstoreHashtableIterator *kvs_di = kvstoreGetHashtableSafeIterator(server.pubsubshard_channels, slot);
|
kvstoreHashtableIterator *kvs_di = kvstoreGetHashtableIterator(server.pubsubshard_channels, slot, HASHTABLE_ITER_SAFE);
|
||||||
void *element;
|
void *element;
|
||||||
while (kvstoreHashtableIteratorNext(kvs_di, &element)) {
|
while (kvstoreHashtableIteratorNext(kvs_di, &element)) {
|
||||||
dict *clients = element;
|
dict *clients = element;
|
||||||
@ -730,7 +730,7 @@ void channelList(client *c, sds pat, kvstore *pubsub_channels) {
|
|||||||
replylen = addReplyDeferredLen(c);
|
replylen = addReplyDeferredLen(c);
|
||||||
for (unsigned int i = 0; i < slot_cnt; i++) {
|
for (unsigned int i = 0; i < slot_cnt; i++) {
|
||||||
if (!kvstoreHashtableSize(pubsub_channels, i)) continue;
|
if (!kvstoreHashtableSize(pubsub_channels, i)) continue;
|
||||||
kvstoreHashtableIterator *kvs_di = kvstoreGetHashtableIterator(pubsub_channels, i);
|
kvstoreHashtableIterator *kvs_di = kvstoreGetHashtableIterator(pubsub_channels, i, 0);
|
||||||
void *next;
|
void *next;
|
||||||
while (kvstoreHashtableIteratorNext(kvs_di, &next)) {
|
while (kvstoreHashtableIteratorNext(kvs_di, &next)) {
|
||||||
dict *clients = next;
|
dict *clients = next;
|
||||||
|
@ -887,7 +887,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
|
|||||||
nwritten += n;
|
nwritten += n;
|
||||||
|
|
||||||
hashtableIterator iterator;
|
hashtableIterator iterator;
|
||||||
hashtableInitIterator(&iterator, set);
|
hashtableInitIterator(&iterator, set, 0);
|
||||||
void *next;
|
void *next;
|
||||||
while (hashtableNext(&iterator, &next)) {
|
while (hashtableNext(&iterator, &next)) {
|
||||||
sds ele = next;
|
sds ele = next;
|
||||||
@ -959,7 +959,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
|
|||||||
nwritten += n;
|
nwritten += n;
|
||||||
|
|
||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
hashtableInitIterator(&iter, ht);
|
hashtableInitIterator(&iter, ht, 0);
|
||||||
void *next;
|
void *next;
|
||||||
while (hashtableNext(&iter, &next)) {
|
while (hashtableNext(&iter, &next)) {
|
||||||
sds field = hashTypeEntryGetField(next);
|
sds field = hashTypeEntryGetField(next);
|
||||||
@ -1349,7 +1349,7 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
|
|||||||
if ((res = rdbSaveLen(rdb, expires_size)) < 0) goto werr;
|
if ((res = rdbSaveLen(rdb, expires_size)) < 0) goto werr;
|
||||||
written += res;
|
written += res;
|
||||||
|
|
||||||
kvs_it = kvstoreIteratorInit(db->keys);
|
kvs_it = kvstoreIteratorInit(db->keys, HASHTABLE_ITER_SAFE | HASHTABLE_ITER_PREFETCH_VALUES);
|
||||||
int last_slot = -1;
|
int last_slot = -1;
|
||||||
/* Iterate this DB writing every entry */
|
/* Iterate this DB writing every entry */
|
||||||
void *next;
|
void *next;
|
||||||
|
29
src/server.c
29
src/server.c
@ -577,6 +577,15 @@ const void *hashtableObjectGetKey(const void *entry) {
|
|||||||
return objectGetKey(entry);
|
return objectGetKey(entry);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Prefetch the value if it's not embedded. */
|
||||||
|
void hashtableObjectPrefetchValue(const void *entry) {
|
||||||
|
const robj *obj = entry;
|
||||||
|
if (obj->encoding != OBJ_ENCODING_EMBSTR &&
|
||||||
|
obj->encoding != OBJ_ENCODING_INT) {
|
||||||
|
valkey_prefetch(obj->ptr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int hashtableObjKeyCompare(const void *key1, const void *key2) {
|
int hashtableObjKeyCompare(const void *key1, const void *key2) {
|
||||||
const robj *o1 = key1, *o2 = key2;
|
const robj *o1 = key1, *o2 = key2;
|
||||||
return hashtableSdsKeyCompare(o1->ptr, o2->ptr);
|
return hashtableSdsKeyCompare(o1->ptr, o2->ptr);
|
||||||
@ -589,6 +598,7 @@ void hashtableObjectDestructor(void *val) {
|
|||||||
|
|
||||||
/* Kvstore->keys, keys are sds strings, vals are Objects. */
|
/* Kvstore->keys, keys are sds strings, vals are Objects. */
|
||||||
hashtableType kvstoreKeysHashtableType = {
|
hashtableType kvstoreKeysHashtableType = {
|
||||||
|
.entryPrefetchValue = hashtableObjectPrefetchValue,
|
||||||
.entryGetKey = hashtableObjectGetKey,
|
.entryGetKey = hashtableObjectGetKey,
|
||||||
.hashFunction = hashtableSdsHash,
|
.hashFunction = hashtableSdsHash,
|
||||||
.keyCompare = hashtableSdsKeyCompare,
|
.keyCompare = hashtableSdsKeyCompare,
|
||||||
@ -602,6 +612,7 @@ hashtableType kvstoreKeysHashtableType = {
|
|||||||
|
|
||||||
/* Kvstore->expires */
|
/* Kvstore->expires */
|
||||||
hashtableType kvstoreExpiresHashtableType = {
|
hashtableType kvstoreExpiresHashtableType = {
|
||||||
|
.entryPrefetchValue = hashtableObjectPrefetchValue,
|
||||||
.entryGetKey = hashtableObjectGetKey,
|
.entryGetKey = hashtableObjectGetKey,
|
||||||
.hashFunction = hashtableSdsHash,
|
.hashFunction = hashtableSdsHash,
|
||||||
.keyCompare = hashtableSdsKeyCompare,
|
.keyCompare = hashtableSdsKeyCompare,
|
||||||
@ -3205,7 +3216,7 @@ void populateCommandTable(void) {
|
|||||||
void resetCommandTableStats(hashtable *commands) {
|
void resetCommandTableStats(hashtable *commands) {
|
||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
void *next;
|
void *next;
|
||||||
hashtableInitSafeIterator(&iter, commands);
|
hashtableInitIterator(&iter, commands, HASHTABLE_ITER_SAFE);
|
||||||
while (hashtableNext(&iter, &next)) {
|
while (hashtableNext(&iter, &next)) {
|
||||||
struct serverCommand *c = next;
|
struct serverCommand *c = next;
|
||||||
c->microseconds = 0;
|
c->microseconds = 0;
|
||||||
@ -4988,7 +4999,7 @@ void addReplyCommandSubCommands(client *c,
|
|||||||
|
|
||||||
void *next;
|
void *next;
|
||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
hashtableInitSafeIterator(&iter, cmd->subcommands_ht);
|
hashtableInitIterator(&iter, cmd->subcommands_ht, HASHTABLE_ITER_SAFE);
|
||||||
while (hashtableNext(&iter, &next)) {
|
while (hashtableNext(&iter, &next)) {
|
||||||
struct serverCommand *sub = next;
|
struct serverCommand *sub = next;
|
||||||
if (use_map) addReplyBulkCBuffer(c, sub->fullname, sdslen(sub->fullname));
|
if (use_map) addReplyBulkCBuffer(c, sub->fullname, sdslen(sub->fullname));
|
||||||
@ -5150,7 +5161,7 @@ void commandCommand(client *c) {
|
|||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
void *next;
|
void *next;
|
||||||
addReplyArrayLen(c, hashtableSize(server.commands));
|
addReplyArrayLen(c, hashtableSize(server.commands));
|
||||||
hashtableInitIterator(&iter, server.commands);
|
hashtableInitIterator(&iter, server.commands, 0);
|
||||||
while (hashtableNext(&iter, &next)) {
|
while (hashtableNext(&iter, &next)) {
|
||||||
struct serverCommand *cmd = next;
|
struct serverCommand *cmd = next;
|
||||||
addReplyCommandInfo(c, cmd);
|
addReplyCommandInfo(c, cmd);
|
||||||
@ -5209,7 +5220,7 @@ int shouldFilterFromCommandList(struct serverCommand *cmd, commandListFilter *fi
|
|||||||
void commandListWithFilter(client *c, hashtable *commands, commandListFilter filter, int *numcmds) {
|
void commandListWithFilter(client *c, hashtable *commands, commandListFilter filter, int *numcmds) {
|
||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
void *next;
|
void *next;
|
||||||
hashtableInitIterator(&iter, commands);
|
hashtableInitIterator(&iter, commands, 0);
|
||||||
while (hashtableNext(&iter, &next)) {
|
while (hashtableNext(&iter, &next)) {
|
||||||
struct serverCommand *cmd = next;
|
struct serverCommand *cmd = next;
|
||||||
if (!shouldFilterFromCommandList(cmd, &filter)) {
|
if (!shouldFilterFromCommandList(cmd, &filter)) {
|
||||||
@ -5228,7 +5239,7 @@ void commandListWithFilter(client *c, hashtable *commands, commandListFilter fil
|
|||||||
void commandListWithoutFilter(client *c, hashtable *commands, int *numcmds) {
|
void commandListWithoutFilter(client *c, hashtable *commands, int *numcmds) {
|
||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
void *next;
|
void *next;
|
||||||
hashtableInitIterator(&iter, commands);
|
hashtableInitIterator(&iter, commands, 0);
|
||||||
while (hashtableNext(&iter, &next)) {
|
while (hashtableNext(&iter, &next)) {
|
||||||
struct serverCommand *cmd = next;
|
struct serverCommand *cmd = next;
|
||||||
addReplyBulkCBuffer(c, cmd->fullname, sdslen(cmd->fullname));
|
addReplyBulkCBuffer(c, cmd->fullname, sdslen(cmd->fullname));
|
||||||
@ -5290,7 +5301,7 @@ void commandInfoCommand(client *c) {
|
|||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
void *next;
|
void *next;
|
||||||
addReplyArrayLen(c, hashtableSize(server.commands));
|
addReplyArrayLen(c, hashtableSize(server.commands));
|
||||||
hashtableInitIterator(&iter, server.commands);
|
hashtableInitIterator(&iter, server.commands, 0);
|
||||||
while (hashtableNext(&iter, &next)) {
|
while (hashtableNext(&iter, &next)) {
|
||||||
struct serverCommand *cmd = next;
|
struct serverCommand *cmd = next;
|
||||||
addReplyCommandInfo(c, cmd);
|
addReplyCommandInfo(c, cmd);
|
||||||
@ -5312,7 +5323,7 @@ void commandDocsCommand(client *c) {
|
|||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
void *next;
|
void *next;
|
||||||
addReplyMapLen(c, hashtableSize(server.commands));
|
addReplyMapLen(c, hashtableSize(server.commands));
|
||||||
hashtableInitIterator(&iter, server.commands);
|
hashtableInitIterator(&iter, server.commands, 0);
|
||||||
while (hashtableNext(&iter, &next)) {
|
while (hashtableNext(&iter, &next)) {
|
||||||
struct serverCommand *cmd = next;
|
struct serverCommand *cmd = next;
|
||||||
addReplyBulkCBuffer(c, cmd->fullname, sdslen(cmd->fullname));
|
addReplyBulkCBuffer(c, cmd->fullname, sdslen(cmd->fullname));
|
||||||
@ -5441,7 +5452,7 @@ const char *getSafeInfoString(const char *s, size_t len, char **tmp) {
|
|||||||
sds genValkeyInfoStringCommandStats(sds info, hashtable *commands) {
|
sds genValkeyInfoStringCommandStats(sds info, hashtable *commands) {
|
||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
void *next;
|
void *next;
|
||||||
hashtableInitSafeIterator(&iter, commands);
|
hashtableInitIterator(&iter, commands, HASHTABLE_ITER_SAFE);
|
||||||
while (hashtableNext(&iter, &next)) {
|
while (hashtableNext(&iter, &next)) {
|
||||||
struct serverCommand *c = next;
|
struct serverCommand *c = next;
|
||||||
char *tmpsafe;
|
char *tmpsafe;
|
||||||
@ -5478,7 +5489,7 @@ sds genValkeyInfoStringACLStats(sds info) {
|
|||||||
sds genValkeyInfoStringLatencyStats(sds info, hashtable *commands) {
|
sds genValkeyInfoStringLatencyStats(sds info, hashtable *commands) {
|
||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
void *next;
|
void *next;
|
||||||
hashtableInitSafeIterator(&iter, commands);
|
hashtableInitIterator(&iter, commands, HASHTABLE_ITER_SAFE);
|
||||||
while (hashtableNext(&iter, &next)) {
|
while (hashtableNext(&iter, &next)) {
|
||||||
struct serverCommand *c = next;
|
struct serverCommand *c = next;
|
||||||
char *tmpsafe;
|
char *tmpsafe;
|
||||||
|
@ -447,7 +447,7 @@ void sortCommandGeneric(client *c, int readonly) {
|
|||||||
} else if (sortval->type == OBJ_ZSET) {
|
} else if (sortval->type == OBJ_ZSET) {
|
||||||
hashtable *ht = ((zset *)sortval->ptr)->ht;
|
hashtable *ht = ((zset *)sortval->ptr)->ht;
|
||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
hashtableInitIterator(&iter, ht);
|
hashtableInitIterator(&iter, ht, 0);
|
||||||
void *next;
|
void *next;
|
||||||
while (hashtableNext(&iter, &next)) {
|
while (hashtableNext(&iter, &next)) {
|
||||||
zskiplistNode *node = next;
|
zskiplistNode *node = next;
|
||||||
|
@ -382,7 +382,7 @@ void hashTypeInitIterator(robj *subject, hashTypeIterator *hi) {
|
|||||||
hi->fptr = NULL;
|
hi->fptr = NULL;
|
||||||
hi->vptr = NULL;
|
hi->vptr = NULL;
|
||||||
} else if (hi->encoding == OBJ_ENCODING_HASHTABLE) {
|
} else if (hi->encoding == OBJ_ENCODING_HASHTABLE) {
|
||||||
hashtableInitIterator(&hi->iter, subject->ptr);
|
hashtableInitIterator(&hi->iter, subject->ptr, 0);
|
||||||
} else {
|
} else {
|
||||||
serverPanic("Unknown hash encoding");
|
serverPanic("Unknown hash encoding");
|
||||||
}
|
}
|
||||||
|
@ -317,7 +317,7 @@ setTypeIterator *setTypeInitIterator(robj *subject) {
|
|||||||
si->subject = subject;
|
si->subject = subject;
|
||||||
si->encoding = subject->encoding;
|
si->encoding = subject->encoding;
|
||||||
if (si->encoding == OBJ_ENCODING_HASHTABLE) {
|
if (si->encoding == OBJ_ENCODING_HASHTABLE) {
|
||||||
si->hashtable_iterator = hashtableCreateIterator(subject->ptr);
|
si->hashtable_iterator = hashtableCreateIterator(subject->ptr, 0);
|
||||||
} else if (si->encoding == OBJ_ENCODING_INTSET) {
|
} else if (si->encoding == OBJ_ENCODING_INTSET) {
|
||||||
si->ii = 0;
|
si->ii = 0;
|
||||||
} else if (si->encoding == OBJ_ENCODING_LISTPACK) {
|
} else if (si->encoding == OBJ_ENCODING_LISTPACK) {
|
||||||
@ -1179,7 +1179,7 @@ void srandmemberWithCountCommand(client *c) {
|
|||||||
/* CASE 3 & 4: send the result to the user. */
|
/* CASE 3 & 4: send the result to the user. */
|
||||||
{
|
{
|
||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
hashtableInitIterator(&iter, ht);
|
hashtableInitIterator(&iter, ht, 0);
|
||||||
|
|
||||||
addReplyArrayLen(c, count);
|
addReplyArrayLen(c, count);
|
||||||
serverAssert(count == hashtableSize(ht));
|
serverAssert(count == hashtableSize(ht));
|
||||||
|
@ -2092,7 +2092,7 @@ static void zuiInitIterator(zsetopsrc *op) {
|
|||||||
it->is.is = op->subject->ptr;
|
it->is.is = op->subject->ptr;
|
||||||
it->is.ii = 0;
|
it->is.ii = 0;
|
||||||
} else if (op->encoding == OBJ_ENCODING_HASHTABLE) {
|
} else if (op->encoding == OBJ_ENCODING_HASHTABLE) {
|
||||||
it->ht.iter = hashtableCreateIterator(op->subject->ptr);
|
it->ht.iter = hashtableCreateIterator(op->subject->ptr, 0);
|
||||||
} else if (op->encoding == OBJ_ENCODING_LISTPACK) {
|
} else if (op->encoding == OBJ_ENCODING_LISTPACK) {
|
||||||
it->lp.lp = op->subject->ptr;
|
it->lp.lp = op->subject->ptr;
|
||||||
it->lp.p = lpFirst(it->lp.lp);
|
it->lp.p = lpFirst(it->lp.lp);
|
||||||
@ -2349,7 +2349,7 @@ static size_t zsetHashtableGetMaxElementLength(hashtable *ht, size_t *totallen)
|
|||||||
size_t maxelelen = 0;
|
size_t maxelelen = 0;
|
||||||
|
|
||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
hashtableInitIterator(&iter, ht);
|
hashtableInitIterator(&iter, ht, 0);
|
||||||
void *next;
|
void *next;
|
||||||
while (hashtableNext(&iter, &next)) {
|
while (hashtableNext(&iter, &next)) {
|
||||||
zskiplistNode *node = next;
|
zskiplistNode *node = next;
|
||||||
@ -2749,7 +2749,7 @@ static void zunionInterDiffGenericCommand(client *c, robj *dstkey, int numkeysIn
|
|||||||
|
|
||||||
/* Step 2: Create the skiplist using final score ordering */
|
/* Step 2: Create the skiplist using final score ordering */
|
||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
hashtableInitIterator(&iter, dstzset->ht);
|
hashtableInitIterator(&iter, dstzset->ht, 0);
|
||||||
|
|
||||||
void *next;
|
void *next;
|
||||||
while (hashtableNext(&iter, &next)) {
|
while (hashtableNext(&iter, &next)) {
|
||||||
|
@ -547,7 +547,7 @@ int test_iterator(int argc, char **argv, int flags) {
|
|||||||
size_t num_returned = 0;
|
size_t num_returned = 0;
|
||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
void *next;
|
void *next;
|
||||||
hashtableInitIterator(&iter, ht);
|
hashtableInitIterator(&iter, ht, 0);
|
||||||
while (hashtableNext(&iter, &next)) {
|
while (hashtableNext(&iter, &next)) {
|
||||||
uint8_t *entry = next;
|
uint8_t *entry = next;
|
||||||
num_returned++;
|
num_returned++;
|
||||||
@ -592,7 +592,7 @@ int test_safe_iterator(int argc, char **argv, int flags) {
|
|||||||
size_t num_returned = 0;
|
size_t num_returned = 0;
|
||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
void *next;
|
void *next;
|
||||||
hashtableInitSafeIterator(&iter, ht);
|
hashtableInitIterator(&iter, ht, HASHTABLE_ITER_SAFE);
|
||||||
while (hashtableNext(&iter, &next)) {
|
while (hashtableNext(&iter, &next)) {
|
||||||
uint8_t *entry = next;
|
uint8_t *entry = next;
|
||||||
size_t index = entry - entry_counts;
|
size_t index = entry - entry_counts;
|
||||||
@ -657,7 +657,7 @@ int test_compact_bucket_chain(int argc, char **argv, int flags) {
|
|||||||
size_t num_chained_buckets = hashtableChainedBuckets(ht, 0);
|
size_t num_chained_buckets = hashtableChainedBuckets(ht, 0);
|
||||||
size_t num_returned = 0;
|
size_t num_returned = 0;
|
||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
hashtableInitSafeIterator(&iter, ht);
|
hashtableInitIterator(&iter, ht, HASHTABLE_ITER_SAFE);
|
||||||
void *entry;
|
void *entry;
|
||||||
while (hashtableNext(&iter, &entry)) {
|
while (hashtableNext(&iter, &entry)) {
|
||||||
/* As long as the iterator is still returning entries from the same
|
/* As long as the iterator is still returning entries from the same
|
||||||
|
@ -77,7 +77,7 @@ int test_kvstoreIteratorRemoveAllKeysNoDeleteEmptyHashtable(int argc, char **arg
|
|||||||
TEST_ASSERT(kvstoreHashtableAdd(kvs1, didx, stringFromInt(i)));
|
TEST_ASSERT(kvstoreHashtableAdd(kvs1, didx, stringFromInt(i)));
|
||||||
}
|
}
|
||||||
|
|
||||||
kvs_it = kvstoreIteratorInit(kvs1);
|
kvs_it = kvstoreIteratorInit(kvs1, HASHTABLE_ITER_SAFE);
|
||||||
while (kvstoreIteratorNext(kvs_it, &key)) {
|
while (kvstoreIteratorNext(kvs_it, &key)) {
|
||||||
curr_slot = kvstoreIteratorGetCurrentHashtableIndex(kvs_it);
|
curr_slot = kvstoreIteratorGetCurrentHashtableIndex(kvs_it);
|
||||||
TEST_ASSERT(kvstoreHashtableDelete(kvs1, curr_slot, key));
|
TEST_ASSERT(kvstoreHashtableDelete(kvs1, curr_slot, key));
|
||||||
@ -110,7 +110,7 @@ int test_kvstoreIteratorRemoveAllKeysDeleteEmptyHashtable(int argc, char **argv,
|
|||||||
TEST_ASSERT(kvstoreHashtableAdd(kvs2, didx, stringFromInt(i)));
|
TEST_ASSERT(kvstoreHashtableAdd(kvs2, didx, stringFromInt(i)));
|
||||||
}
|
}
|
||||||
|
|
||||||
kvs_it = kvstoreIteratorInit(kvs2);
|
kvs_it = kvstoreIteratorInit(kvs2, HASHTABLE_ITER_SAFE);
|
||||||
while (kvstoreIteratorNext(kvs_it, &key)) {
|
while (kvstoreIteratorNext(kvs_it, &key)) {
|
||||||
curr_slot = kvstoreIteratorGetCurrentHashtableIndex(kvs_it);
|
curr_slot = kvstoreIteratorGetCurrentHashtableIndex(kvs_it);
|
||||||
TEST_ASSERT(kvstoreHashtableDelete(kvs2, curr_slot, key));
|
TEST_ASSERT(kvstoreHashtableDelete(kvs2, curr_slot, key));
|
||||||
@ -146,7 +146,7 @@ int test_kvstoreHashtableIteratorRemoveAllKeysNoDeleteEmptyHashtable(int argc, c
|
|||||||
TEST_ASSERT(kvstoreHashtableAdd(kvs1, didx, stringFromInt(i)));
|
TEST_ASSERT(kvstoreHashtableAdd(kvs1, didx, stringFromInt(i)));
|
||||||
}
|
}
|
||||||
|
|
||||||
kvs_di = kvstoreGetHashtableSafeIterator(kvs1, didx);
|
kvs_di = kvstoreGetHashtableIterator(kvs1, didx, HASHTABLE_ITER_SAFE);
|
||||||
while (kvstoreHashtableIteratorNext(kvs_di, &key)) {
|
while (kvstoreHashtableIteratorNext(kvs_di, &key)) {
|
||||||
TEST_ASSERT(kvstoreHashtableDelete(kvs1, didx, key));
|
TEST_ASSERT(kvstoreHashtableDelete(kvs1, didx, key));
|
||||||
}
|
}
|
||||||
@ -177,7 +177,7 @@ int test_kvstoreHashtableIteratorRemoveAllKeysDeleteEmptyHashtable(int argc, cha
|
|||||||
TEST_ASSERT(kvstoreHashtableAdd(kvs2, didx, stringFromInt(i)));
|
TEST_ASSERT(kvstoreHashtableAdd(kvs2, didx, stringFromInt(i)));
|
||||||
}
|
}
|
||||||
|
|
||||||
kvs_di = kvstoreGetHashtableSafeIterator(kvs2, didx);
|
kvs_di = kvstoreGetHashtableIterator(kvs2, didx, HASHTABLE_ITER_SAFE);
|
||||||
while (kvstoreHashtableIteratorNext(kvs_di, &key)) {
|
while (kvstoreHashtableIteratorNext(kvs_di, &key)) {
|
||||||
TEST_ASSERT(kvstoreHashtableDelete(kvs2, didx, key));
|
TEST_ASSERT(kvstoreHashtableDelete(kvs2, didx, key));
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user