Add 'WithDictIndex' expiry API and update RANDOMKEY command (#1155)

https://github.com/valkey-io/valkey/issues/1145

First part of a two-step effort to add `WithSlot` API for expiry. This
PR is to fix a crash that occurs when a RANDOMKEY uses a different slot
than the cached slot of a client during a multi-exec.

The next part will be to utilize the new API as an optimization to
prevent duplicate work when calculating the slot for a key.

---------

Signed-off-by: Nadav Levanoni <nadavl@amazon.com>
Signed-off-by: Madelyn Olson <madelyneolson@gmail.com>
Co-authored-by: Nadav Levanoni <nadavl@amazon.com>
Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
This commit is contained in:
Nadav Levanoni 2024-10-16 17:40:11 -07:00 committed by Madelyn Olson
parent 54aa97c67d
commit 3ec2bb551b
2 changed files with 100 additions and 53 deletions

141
src/db.c
View File

@ -52,10 +52,13 @@ typedef enum {
KEY_DELETED /* The key was deleted now. */ KEY_DELETED /* The key was deleted now. */
} keyStatus; } keyStatus;
keyStatus expireIfNeededWithDictIndex(serverDb *db, robj *key, int flags, int dict_index);
keyStatus expireIfNeeded(serverDb *db, robj *key, int flags); keyStatus expireIfNeeded(serverDb *db, robj *key, int flags);
int keyIsExpiredWithDictIndex(serverDb *db, robj *key, int dict_index);
int keyIsExpired(serverDb *db, robj *key); int keyIsExpired(serverDb *db, robj *key);
static void dbSetValue(serverDb *db, robj *key, robj *val, int overwrite, dictEntry *de); static void dbSetValue(serverDb *db, robj *key, robj *val, int overwrite, dictEntry *de);
static int getKVStoreIndexForKey(sds key); static int getKVStoreIndexForKey(sds key);
dictEntry *dbFindExpiresWithDictIndex(serverDb *db, void *key, int dict_index);
/* Update LFU when an object is accessed. /* Update LFU when an object is accessed.
* Firstly, decrement the counter if the decrement time is reached. * Firstly, decrement the counter if the decrement time is reached.
@ -269,7 +272,7 @@ int getKeySlot(sds key) {
* In this case a copy of `key` is copied in kvstore, the caller must ensure the `key` is properly freed. * In this case a copy of `key` is copied in kvstore, the caller must ensure the `key` is properly freed.
*/ */
int dbAddRDBLoad(serverDb *db, sds key, robj *val) { int dbAddRDBLoad(serverDb *db, sds key, robj *val) {
int dict_index = server.cluster_enabled ? getKeySlot(key) : 0; int dict_index = getKVStoreIndexForKey(key);
dictEntry *de = kvstoreDictAddRaw(db->keys, dict_index, key, NULL); dictEntry *de = kvstoreDictAddRaw(db->keys, dict_index, key, NULL);
if (de == NULL) return 0; if (de == NULL) return 0;
initObjectLRUOrLFU(val); initObjectLRUOrLFU(val);
@ -375,13 +378,13 @@ robj *dbRandomKey(serverDb *db) {
while (1) { while (1) {
sds key; sds key;
robj *keyobj; robj *keyobj;
int randomSlot = kvstoreGetFairRandomDictIndex(db->keys); int randomDictIndex = kvstoreGetFairRandomDictIndex(db->keys);
de = kvstoreDictGetFairRandomKey(db->keys, randomSlot); de = kvstoreDictGetFairRandomKey(db->keys, randomDictIndex);
if (de == NULL) return NULL; if (de == NULL) return NULL;
key = dictGetKey(de); key = dictGetKey(de);
keyobj = createStringObject(key, sdslen(key)); keyobj = createStringObject(key, sdslen(key));
if (dbFindExpires(db, key)) { if (dbFindExpiresWithDictIndex(db, key, randomDictIndex)) {
if (allvolatile && server.primary_host && --maxtries == 0) { if (allvolatile && server.primary_host && --maxtries == 0) {
/* If the DB is composed only of keys with an expire set, /* If the DB is composed only of keys with an expire set,
* it could happen that all the keys are already logically * it could happen that all the keys are already logically
@ -393,7 +396,7 @@ robj *dbRandomKey(serverDb *db) {
* return a key name that may be already expired. */ * return a key name that may be already expired. */
return keyobj; return keyobj;
} }
if (expireIfNeeded(db, keyobj, 0) != KEY_VALID) { if (expireIfNeededWithDictIndex(db, keyobj, 0, randomDictIndex) != KEY_VALID) {
decrRefCount(keyobj); decrRefCount(keyobj);
continue; /* search for another key. This expired. */ continue; /* search for another key. This expired. */
} }
@ -402,11 +405,9 @@ robj *dbRandomKey(serverDb *db) {
} }
} }
/* Helper for sync and async delete. */ int dbGenericDeleteWithDictIndex(serverDb *db, robj *key, int async, int flags, int dict_index) {
int dbGenericDelete(serverDb *db, robj *key, int async, int flags) {
dictEntry **plink; dictEntry **plink;
int table; int table;
int dict_index = getKVStoreIndexForKey(key->ptr);
dictEntry *de = kvstoreDictTwoPhaseUnlinkFind(db->keys, dict_index, key->ptr, &plink, &table); dictEntry *de = kvstoreDictTwoPhaseUnlinkFind(db->keys, dict_index, key->ptr, &plink, &table);
if (de) { if (de) {
robj *val = dictGetVal(de); robj *val = dictGetVal(de);
@ -436,6 +437,12 @@ int dbGenericDelete(serverDb *db, robj *key, int async, int flags) {
} }
} }
/* Helper for sync and async delete. */
int dbGenericDelete(serverDb *db, robj *key, int async, int flags) {
int dict_index = getKVStoreIndexForKey(key->ptr);
return dbGenericDeleteWithDictIndex(db, key, async, flags, dict_index);
}
/* Delete a key, value, and associated expiration entry if any, from the DB */ /* Delete a key, value, and associated expiration entry if any, from the DB */
int dbSyncDelete(serverDb *db, robj *key) { int dbSyncDelete(serverDb *db, robj *key) {
return dbGenericDelete(db, key, 0, DB_FLAG_KEY_DELETED); return dbGenericDelete(db, key, 0, DB_FLAG_KEY_DELETED);
@ -1693,19 +1700,25 @@ void setExpire(client *c, serverDb *db, robj *key, long long when) {
/* Return the expire time of the specified key, or -1 if no expire /* Return the expire time of the specified key, or -1 if no expire
* is associated with this key (i.e. the key is non volatile) */ * is associated with this key (i.e. the key is non volatile) */
long long getExpire(serverDb *db, robj *key) { long long getExpireWithDictIndex(serverDb *db, robj *key, int dict_index) {
dictEntry *de; dictEntry *de;
if ((de = dbFindExpires(db, key->ptr)) == NULL) return -1; if ((de = dbFindExpiresWithDictIndex(db, key->ptr, dict_index)) == NULL) return -1;
return dictGetSignedIntegerVal(de); return dictGetSignedIntegerVal(de);
} }
/* Delete the specified expired key and propagate expire. */ /* Return the expire time of the specified key, or -1 if no expire
void deleteExpiredKeyAndPropagate(serverDb *db, robj *keyobj) { * is associated with this key (i.e. the key is non volatile) */
long long getExpire(serverDb *db, robj *key) {
int dict_index = getKVStoreIndexForKey(key->ptr);
return getExpireWithDictIndex(db, key, dict_index);
}
void deleteExpiredKeyAndPropagateWithDictIndex(serverDb *db, robj *keyobj, int dict_index) {
mstime_t expire_latency; mstime_t expire_latency;
latencyStartMonitor(expire_latency); latencyStartMonitor(expire_latency);
dbGenericDelete(db, keyobj, server.lazyfree_lazy_expire, DB_FLAG_KEY_EXPIRED); dbGenericDeleteWithDictIndex(db, keyobj, server.lazyfree_lazy_expire, DB_FLAG_KEY_EXPIRED, dict_index);
latencyEndMonitor(expire_latency); latencyEndMonitor(expire_latency);
latencyAddSampleIfNeeded("expire-del", expire_latency); latencyAddSampleIfNeeded("expire-del", expire_latency);
notifyKeyspaceEvent(NOTIFY_EXPIRED, "expired", keyobj, db->id); notifyKeyspaceEvent(NOTIFY_EXPIRED, "expired", keyobj, db->id);
@ -1714,6 +1727,12 @@ void deleteExpiredKeyAndPropagate(serverDb *db, robj *keyobj) {
server.stat_expiredkeys++; server.stat_expiredkeys++;
} }
/* Delete the specified expired key and propagate expire. */
void deleteExpiredKeyAndPropagate(serverDb *db, robj *keyobj) {
int dict_index = getKVStoreIndexForKey(keyobj->ptr);
deleteExpiredKeyAndPropagateWithDictIndex(db, keyobj, dict_index);
}
/* Delete the specified expired key from overwriting and propagate the DEL or UNLINK. */ /* Delete the specified expired key from overwriting and propagate the DEL or UNLINK. */
void deleteExpiredKeyFromOverwriteAndPropagate(client *c, robj *keyobj) { void deleteExpiredKeyFromOverwriteAndPropagate(client *c, robj *keyobj) {
int deleted = dbGenericDelete(c->db, keyobj, server.lazyfree_lazy_expire, DB_FLAG_KEY_EXPIRED); int deleted = dbGenericDelete(c->db, keyobj, server.lazyfree_lazy_expire, DB_FLAG_KEY_EXPIRED);
@ -1765,12 +1784,11 @@ void propagateDeletion(serverDb *db, robj *key, int lazy) {
decrRefCount(argv[1]); decrRefCount(argv[1]);
} }
/* Check if the key is expired. */ int keyIsExpiredWithDictIndex(serverDb *db, robj *key, int dict_index) {
int keyIsExpired(serverDb *db, robj *key) {
/* Don't expire anything while loading. It will be done later. */ /* Don't expire anything while loading. It will be done later. */
if (server.loading) return 0; if (server.loading) return 0;
mstime_t when = getExpire(db, key); mstime_t when = getExpireWithDictIndex(db, key, dict_index);
mstime_t now; mstime_t now;
if (when < 0) return 0; /* No expire for this key */ if (when < 0) return 0; /* No expire for this key */
@ -1782,39 +1800,15 @@ int keyIsExpired(serverDb *db, robj *key) {
return now > when; return now > when;
} }
/* This function is called when we are going to perform some operation /* Check if the key is expired. */
* in a given key, but such key may be already logically expired even if int keyIsExpired(serverDb *db, robj *key) {
* it still exists in the database. The main way this function is called int dict_index = getKVStoreIndexForKey(key->ptr);
* is via lookupKey*() family of functions. return keyIsExpiredWithDictIndex(db, key, dict_index);
* }
* The behavior of the function depends on the replication role of the
* instance, because by default replicas do not delete expired keys. They keyStatus expireIfNeededWithDictIndex(serverDb *db, robj *key, int flags, int dict_index) {
* wait for DELs from the primary for consistency matters. However even
* replicas will try to have a coherent return value for the function,
* so that read commands executed in the replica side will be able to
* behave like if the key is expired even if still present (because the
* primary has yet to propagate the DEL).
*
* In primary as a side effect of finding a key which is expired, such
* key will be evicted from the database. Also this may trigger the
* propagation of a DEL/UNLINK command in AOF / replication stream.
*
* On replicas, this function does not delete expired keys by default, but
* it still returns KEY_EXPIRED if the key is logically expired. To force deletion
* of logically expired keys even on replicas, use the EXPIRE_FORCE_DELETE_EXPIRED
* flag. Note though that if the current client is executing
* replicated commands from the primary, keys are never considered expired.
*
* On the other hand, if you just want expiration check, but need to avoid
* the actual key deletion and propagation of the deletion, use the
* EXPIRE_AVOID_DELETE_EXPIRED flag.
*
* The return value of the function is KEY_VALID if the key is still valid.
* The function returns KEY_EXPIRED if the key is expired BUT not deleted,
* or returns KEY_DELETED if the key is expired and deleted. */
keyStatus expireIfNeeded(serverDb *db, robj *key, int flags) {
if (server.lazy_expire_disabled) return KEY_VALID; if (server.lazy_expire_disabled) return KEY_VALID;
if (!keyIsExpired(db, key)) return KEY_VALID; if (!keyIsExpiredWithDictIndex(db, key, dict_index)) return KEY_VALID;
/* If we are running in the context of a replica, instead of /* If we are running in the context of a replica, instead of
* evicting the expired key from the database, we return ASAP: * evicting the expired key from the database, we return ASAP:
@ -1849,13 +1843,48 @@ keyStatus expireIfNeeded(serverDb *db, robj *key, int flags) {
key = createStringObject(key->ptr, sdslen(key->ptr)); key = createStringObject(key->ptr, sdslen(key->ptr));
} }
/* Delete the key */ /* Delete the key */
deleteExpiredKeyAndPropagate(db, key); deleteExpiredKeyAndPropagateWithDictIndex(db, key, dict_index);
if (static_key) { if (static_key) {
decrRefCount(key); decrRefCount(key);
} }
return KEY_DELETED; return KEY_DELETED;
} }
/* This function is called when we are going to perform some operation
* in a given key, but such key may be already logically expired even if
* it still exists in the database. The main way this function is called
* is via lookupKey*() family of functions.
*
* The behavior of the function depends on the replication role of the
* instance, because by default replicas do not delete expired keys. They
* wait for DELs from the primary for consistency matters. However even
* replicas will try to have a coherent return value for the function,
* so that read commands executed in the replica side will be able to
* behave like if the key is expired even if still present (because the
* primary has yet to propagate the DEL).
*
* In primary as a side effect of finding a key which is expired, such
* key will be evicted from the database. Also this may trigger the
* propagation of a DEL/UNLINK command in AOF / replication stream.
*
* On replicas, this function does not delete expired keys by default, but
* it still returns KEY_EXPIRED if the key is logically expired. To force deletion
* of logically expired keys even on replicas, use the EXPIRE_FORCE_DELETE_EXPIRED
* flag. Note though that if the current client is executing
* replicated commands from the primary, keys are never considered expired.
*
* On the other hand, if you just want expiration check, but need to avoid
* the actual key deletion and propagation of the deletion, use the
* EXPIRE_AVOID_DELETE_EXPIRED flag.
*
* The return value of the function is KEY_VALID if the key is still valid.
* The function returns KEY_EXPIRED if the key is expired BUT not deleted,
* or returns KEY_DELETED if the key is expired and deleted. */
keyStatus expireIfNeeded(serverDb *db, robj *key, int flags) {
int dict_index = getKVStoreIndexForKey(key->ptr);
return expireIfNeededWithDictIndex(db, key, flags, dict_index);
}
/* CB passed to kvstoreExpand. /* CB passed to kvstoreExpand.
* The purpose is to skip expansion of unused dicts in cluster mode (all * The purpose is to skip expansion of unused dicts in cluster mode (all
* dicts not mapped to *my* slots) */ * dicts not mapped to *my* slots) */
@ -1897,16 +1926,22 @@ int dbExpandExpires(serverDb *db, uint64_t db_size, int try_expand) {
return dbExpandGeneric(db->expires, db_size, try_expand); return dbExpandGeneric(db->expires, db_size, try_expand);
} }
static dictEntry *dbFindGeneric(kvstore *kvs, void *key) { dictEntry *dbFindWithDictIndex(serverDb *db, void *key, int dict_index) {
return kvstoreDictFind(kvs, server.cluster_enabled ? getKeySlot(key) : 0, key); return kvstoreDictFind(db->keys, dict_index, key);
} }
dictEntry *dbFind(serverDb *db, void *key) { dictEntry *dbFind(serverDb *db, void *key) {
return dbFindGeneric(db->keys, key); int dict_index = getKVStoreIndexForKey(key);
return dbFindWithDictIndex(db, key, dict_index);
}
dictEntry *dbFindExpiresWithDictIndex(serverDb *db, void *key, int dict_index) {
return kvstoreDictFind(db->expires, dict_index, key);
} }
dictEntry *dbFindExpires(serverDb *db, void *key) { dictEntry *dbFindExpires(serverDb *db, void *key) {
return dbFindGeneric(db->expires, key); int dict_index = getKVStoreIndexForKey(key);
return dbFindExpiresWithDictIndex(db, key, dict_index);
} }
unsigned long long dbSize(serverDb *db) { unsigned long long dbSize(serverDb *db) {

View File

@ -919,3 +919,15 @@ start_server {overrides {appendonly {yes} appendfilename {appendonly.aof} append
r get foo r get foo
} {} } {}
} }
start_cluster 1 0 {tags {"external:skip cluster"}} {
test "Regression test for multi-exec with RANDOMKEY accessing the wrong per-slot dictionary" {
R 0 SETEX FOO 10000 BAR
R 0 SETEX FIZZ 10000 BUZZ
R 0 MULTI
R 0 DEL FOO
R 0 RANDOMKEY
assert_equal [R 0 EXEC] {1 FIZZ}
}
}