Add a special notification unlink available only for modules (#9406)

Add a new module event `RedisModule_Event_Key`, this event is fired
when a key is removed from the keyspace.
The event includes an open key that can be used for reading the key before
it is removed. Modules can also extract the key-name, and use RM_Open
or RM_Call to access key from within that event, but shouldn't modify anything
from within this event.

The following sub events are available:
  - `REDISMODULE_SUBEVENT_KEY_DELETED`
  - `REDISMODULE_SUBEVENT_KEY_EXPIRED`
  - `REDISMODULE_SUBEVENT_KEY_EVICTED`
  - `REDISMODULE_SUBEVENT_KEY_OVERWRITE`

The data pointer can be casted to a RedisModuleKeyInfo structure
with the following fields:
```
     RedisModuleKey *key;    // Opened Key
 ```

### internals

* We also add two dict functions:
  `dictTwoPhaseUnlinkFind` finds an element from the table, also get the plink of the entry.
  The entry is returned if the element is found. The user should later call `dictTwoPhaseUnlinkFree`
  with it in order to unlink and release it. Otherwise if the key is not found, NULL is returned.
  These two functions should be used in pair. `dictTwoPhaseUnlinkFind` pauses rehash and
  `dictTwoPhaseUnlinkFree` resumes rehash.
* We change `dbOverwrite` to `dbReplaceValue` which just replaces the value of the key and
  doesn't fire any events. The "overwrite" part (which emits events) is just when called from `setKey`,
  the other places that called dbOverwrite were ones that just update the value in-place (INCR*, SPOP,
  and dbUnshareStringValue). This should not have any real impact since `moduleNotifyKeyUnlink` and
  `signalDeletedKeyAsReady` wouldn't have mattered in these cases anyway (i.e. module keys and
  stream keys didn't have direct calls to dbOverwrite)
* since we allow doing RM_OpenKey from withing these callbacks, we temporarily disable lazy expiry.
* We also temporarily disable lazy expiry when we are in unlink/unlink2 callback and keyspace 
  notification callback.
* Move special definitions to the top of redismodule.h
  This is needed to resolve compilation errors with RedisModuleKeyInfoV1
  that carries a RedisModuleKey member.

Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
Huang Zhw 2022-11-30 17:56:36 +08:00 committed by GitHub
parent 7dfd7b9197
commit c81813148b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 508 additions and 58 deletions

View File

@ -218,8 +218,13 @@ int dbAddRDBLoad(redisDb *db, sds key, robj *val) {
* count of the new value is up to the caller.
* This function does not modify the expire time of the existing key.
*
* The 'overwrite' flag is an indication whether this is done as part of a
* complete replacement of their key, which can be thought as a deletion and
* replacement (in which case we need to emit deletion signals), or just an
* update of a value of an existing key (when false).
*
* The program is aborted if the key was not already present. */
void dbOverwrite(redisDb *db, robj *key, robj *val) {
static void dbSetValue(redisDb *db, robj *key, robj *val, int overwrite) {
dictEntry *de = dictFind(db->dict,key->ptr);
serverAssertWithInfo(NULL,key,de != NULL);
@ -228,12 +233,22 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) {
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
val->lru = old->lru;
}
/* Although the key is not really deleted from the database, we regard
* overwrite as two steps of unlink+add, so we still need to call the unlink
* callback of the module. */
moduleNotifyKeyUnlink(key,old,db->id);
/* We want to try to unblock any module clients or clients using a blocking XREADGROUP */
signalDeletedKeyAsReady(db,key,old->type);
if (overwrite) {
/* RM_StringDMA may call dbUnshareStringValue which may free val, so we
* need to incr to retain old */
incrRefCount(old);
/* Although the key is not really deleted from the database, we regard
* overwrite as two steps of unlink+add, so we still need to call the unlink
* callback of the module. */
moduleNotifyKeyUnlink(key,old,db->id,DB_FLAG_KEY_OVERWRITE);
/* We want to try to unblock any module clients or clients using a blocking XREADGROUP */
signalDeletedKeyAsReady(db,key,old->type);
decrRefCount(old);
/* Because of RM_StringDMA, old may be changed, so we need get old again */
old = dictGetVal(de);
/* Entry in auxentry may be changed, so we need update auxentry */
auxentry = *de;
}
dictSetVal(db->dict, de, val);
if (server.lazyfree_lazy_server_del) {
@ -244,6 +259,12 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) {
dictFreeVal(db->dict, &auxentry);
}
/* Replace an existing key with a new value, we just replace value and don't
* emit any events */
void dbReplaceValue(redisDb *db, robj *key, robj *val) {
dbSetValue(db, key, val, 0);
}
/* High level Set operation. This function can be used in order to set
* a key, whatever it was existing or not, to a new object.
*
@ -268,7 +289,7 @@ void setKey(client *c, redisDb *db, robj *key, robj *val, int flags) {
if (!keyfound) {
dbAdd(db,key,val);
} else {
dbOverwrite(db,key,val);
dbSetValue(db,key,val,1);
}
incrRefCount(val);
if (!(flags & SETKEY_KEEPTTL)) removeExpire(db,key);
@ -315,23 +336,33 @@ robj *dbRandomKey(redisDb *db) {
}
/* Helper for sync and async delete. */
static int dbGenericDelete(redisDb *db, robj *key, int async) {
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
dictEntry *de = dictUnlink(db->dict,key->ptr);
int dbGenericDelete(redisDb *db, robj *key, int async, int flags) {
dictEntry **plink;
int table;
dictEntry *de = dictTwoPhaseUnlinkFind(db->dict,key->ptr,&plink,&table);
if (de) {
robj *val = dictGetVal(de);
/* RM_StringDMA may call dbUnshareStringValue which may free val, so we
* need to incr to retain val */
incrRefCount(val);
/* Tells the module that the key has been unlinked from the database. */
moduleNotifyKeyUnlink(key,val,db->id);
moduleNotifyKeyUnlink(key,val,db->id,flags);
/* We want to try to unblock any module clients or clients using a blocking XREADGROUP */
signalDeletedKeyAsReady(db,key,val->type);
/* We should call decr before freeObjAsync. If not, the refcount may be
* greater than 1, so freeObjAsync doesn't work */
decrRefCount(val);
if (async) {
freeObjAsync(key, val, db->id);
/* Because of dbUnshareStringValue, the val in de may change. */
freeObjAsync(key, dictGetVal(de), db->id);
dictSetVal(db->dict, de, NULL);
}
if (server.cluster_enabled) slotToKeyDelEntry(de, db);
dictFreeUnlinkedEntry(db->dict,de);
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
dictTwoPhaseUnlinkFree(db->dict,de,plink,table);
return 1;
} else {
return 0;
@ -340,19 +371,19 @@ static int dbGenericDelete(redisDb *db, robj *key, int async) {
/* Delete a key, value, and associated expiration entry if any, from the DB */
int dbSyncDelete(redisDb *db, robj *key) {
return dbGenericDelete(db, key, 0);
return dbGenericDelete(db, key, 0, DB_FLAG_KEY_DELETED);
}
/* Delete a key, value, and associated expiration entry if any, from the DB. If
* the value consists of many allocations, it may be freed asynchronously. */
int dbAsyncDelete(redisDb *db, robj *key) {
return dbGenericDelete(db, key, 1);
return dbGenericDelete(db, key, 1, DB_FLAG_KEY_DELETED);
}
/* This is a wrapper whose behavior depends on the Redis lazy free
* configuration. Deletes the key synchronously or asynchronously. */
int dbDelete(redisDb *db, robj *key) {
return dbGenericDelete(db, key, server.lazyfree_lazy_server_del);
return dbGenericDelete(db, key, server.lazyfree_lazy_server_del, DB_FLAG_KEY_DELETED);
}
/* Prepare the string object stored at 'key' to be modified destructively
@ -388,7 +419,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
robj *decoded = getDecodedObject(o);
o = createRawStringObject(decoded->ptr, sdslen(decoded->ptr));
decrRefCount(decoded);
dbOverwrite(db,key,o);
dbReplaceValue(db,key,o);
}
return o;
}
@ -1561,10 +1592,7 @@ long long getExpire(redisDb *db, robj *key) {
void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj) {
mstime_t expire_latency;
latencyStartMonitor(expire_latency);
if (server.lazyfree_lazy_expire)
dbAsyncDelete(db,keyobj);
else
dbSyncDelete(db,keyobj);
dbGenericDelete(db,keyobj,server.lazyfree_lazy_expire,DB_FLAG_KEY_EXPIRED);
latencyEndMonitor(expire_latency);
latencyAddSampleIfNeeded("expire-del",expire_latency);
notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",keyobj,db->id);
@ -1657,6 +1685,7 @@ int keyIsExpired(redisDb *db, robj *key) {
* The return value of the function is 0 if the key is still valid,
* otherwise the function returns 1 if the key is expired. */
int expireIfNeeded(redisDb *db, robj *key, int flags) {
if (server.lazy_expire_disabled) return 0;
if (!keyIsExpired(db,key)) return 0;
/* If we are running in the context of a replica, instead of

View File

@ -541,6 +541,56 @@ void *dictFetchValue(dict *d, const void *key) {
return he ? dictGetVal(he) : NULL;
}
/* Find an element from the table, also get the plink of the entry. The entry
* is returned if the element is found, and the user should later call
* `dictTwoPhaseUnlinkFree` with it in order to unlink and release it. Otherwise if
* the key is not found, NULL is returned. These two functions should be used in pair.
* `dictTwoPhaseUnlinkFind` pauses rehash and `dictTwoPhaseUnlinkFree` resumes rehash.
*
* We can use like this:
*
* dictEntry *de = dictTwoPhaseUnlinkFind(db->dict,key->ptr,&plink, &table);
* // Do something, but we can't modify the dict
* dictTwoPhaseUnlinkFree(db->dict,de,plink,table); // We don't need to lookup again
*
* If we want to find an entry before delete this entry, this an optimization to avoid
* dictFind followed by dictDelete. i.e. the first API is a find, and it gives some info
* to the second one to avoid repeating the lookup
*/
dictEntry *dictTwoPhaseUnlinkFind(dict *d, const void *key, dictEntry ***plink, int *table_index) {
uint64_t h, idx, table;
if (dictSize(d) == 0) return NULL; /* dict is empty */
if (dictIsRehashing(d)) _dictRehashStep(d);
h = dictHashKey(d, key);
for (table = 0; table <= 1; table++) {
idx = h & DICTHT_SIZE_MASK(d->ht_size_exp[table]);
dictEntry **ref = &d->ht_table[table][idx];
while(*ref) {
if (key==(*ref)->key || dictCompareKeys(d, key, (*ref)->key)) {
*table_index = table;
*plink = ref;
dictPauseRehashing(d);
return *ref;
}
ref = &(*ref)->next;
}
if (!dictIsRehashing(d)) return NULL;
}
return NULL;
}
void dictTwoPhaseUnlinkFree(dict *d, dictEntry *he, dictEntry **plink, int table_index) {
if (he == NULL) return;
d->ht_used[table_index]--;
*plink = he->next;
dictFreeKey(d, he);
dictFreeVal(d, he);
zfree(he);
dictResumeRehashing(d);
}
/* A fingerprint is a 64 bit number that represents the state of the dictionary
* at a given time, it's just a few dict properties xored together.
* When an unsafe iterator is initialized, we get the dict fingerprint, and check

View File

@ -181,6 +181,8 @@ int dictReplace(dict *d, void *key, void *val);
int dictDelete(dict *d, const void *key);
dictEntry *dictUnlink(dict *d, const void *key);
void dictFreeUnlinkedEntry(dict *d, dictEntry *he);
dictEntry *dictTwoPhaseUnlinkFind(dict *d, const void *key, dictEntry ***plink, int *table_index);
void dictTwoPhaseUnlinkFree(dict *d, dictEntry *he, dictEntry **plink, int table_index);
void dictRelease(dict *d);
dictEntry * dictFind(dict *d, const void *key);
void *dictFetchValue(dict *d, const void *key);

View File

@ -678,10 +678,7 @@ int performEvictions(void) {
* we only care about memory used by the key space. */
delta = (long long) zmalloc_used_memory();
latencyStartMonitor(eviction_latency);
if (server.lazyfree_lazy_eviction)
dbAsyncDelete(db,keyobj);
else
dbSyncDelete(db,keyobj);
dbGenericDelete(db,keyobj,server.lazyfree_lazy_eviction,DB_FLAG_KEY_EVICTED);
latencyEndMonitor(eviction_latency);
latencyAddSampleIfNeeded("eviction-del",eviction_latency);
delta -= (long long) zmalloc_used_memory();

View File

@ -637,8 +637,7 @@ void expireGenericCommand(client *c, long long basetime, int unit) {
if (checkAlreadyExpired(when)) {
robj *aux;
int deleted = server.lazyfree_lazy_expire ? dbAsyncDelete(c->db,key) :
dbSyncDelete(c->db,key);
int deleted = dbGenericDelete(c->db,key,server.lazyfree_lazy_expire,DB_FLAG_KEY_EXPIRED);
serverAssertWithInfo(c,key,deleted);
server.dirty++;

View File

@ -216,7 +216,6 @@ struct RedisModuleKey {
} stream;
} u;
};
typedef struct RedisModuleKey RedisModuleKey;
/* RedisModuleKey 'ztype' values. */
#define REDISMODULE_ZSET_RANGE_NONE 0 /* This must always be 0. */
@ -8130,7 +8129,9 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid)
* If the subscriber performs an action triggering itself,
* it will not be notified about it. */
sub->active = 1;
server.lazy_expire_disabled++;
sub->notify_callback(&ctx, type, event, key);
server.lazy_expire_disabled--;
sub->active = 0;
moduleFreeContext(&ctx);
}
@ -10603,6 +10604,7 @@ static uint64_t moduleEventVersions[] = {
-1, /* REDISMODULE_EVENT_REPL_ASYNC_LOAD */
-1, /* REDISMODULE_EVENT_EVENTLOOP */
-1, /* REDISMODULE_EVENT_CONFIG */
REDISMODULE_KEYINFO_VERSION, /* REDISMODULE_EVENT_KEY */
};
/* Register to be notified, via a callback, when the specified server event
@ -10877,6 +10879,22 @@ static uint64_t moduleEventVersions[] = {
* // name of each modified configuration item
* uint32_t num_changes; // The number of elements in the config_names array
*
* * RedisModule_Event_Key
*
* Called when a key is removed from the keyspace. We can't modify any key in
* the event.
* The following sub events are available:
*
* * `REDISMODULE_SUBEVENT_KEY_DELETED`
* * `REDISMODULE_SUBEVENT_KEY_EXPIRED`
* * `REDISMODULE_SUBEVENT_KEY_EVICTED`
* * `REDISMODULE_SUBEVENT_KEY_OVERWRITTEN`
*
* The data pointer can be casted to a RedisModuleKeyInfo
* structure with the following fields:
*
* RedisModuleKey *key; // Key name
*
* The function returns REDISMODULE_OK if the module was successfully subscribed
* for the specified event. If the API is called from a wrong context or unsupported event
* is given then REDISMODULE_ERR is returned. */
@ -10956,12 +10974,21 @@ int RM_IsSubEventSupported(RedisModuleEvent event, int64_t subevent) {
return subevent < _REDISMODULE_SUBEVENT_EVENTLOOP_NEXT;
case REDISMODULE_EVENT_CONFIG:
return subevent < _REDISMODULE_SUBEVENT_CONFIG_NEXT;
case REDISMODULE_EVENT_KEY:
return subevent < _REDISMODULE_SUBEVENT_KEY_NEXT;
default:
break;
}
return 0;
}
typedef struct KeyInfo {
int32_t dbnum;
RedisModuleString *key;
robj *value;
int mode;
} KeyInfo;
/* This is called by the Redis internals every time we want to fire an
* event that can be intercepted by some module. The pointer 'data' is useful
* in order to populate the event-specific structure when needed, in order
@ -10998,6 +11025,8 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) {
RedisModuleClientInfoV1 civ1;
RedisModuleReplicationInfoV1 riv1;
RedisModuleModuleChangeV1 mcv1;
RedisModuleKey key;
RedisModuleKeyInfoV1 ki = {REDISMODULE_KEYINFO_VERSION, &key};
/* Event specific context and data pointer setup. */
if (eid == REDISMODULE_EVENT_CLIENT_CHANGE) {
@ -11029,12 +11058,21 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) {
moduledata = data;
} else if (eid == REDISMODULE_EVENT_CONFIG) {
moduledata = data;
} else if (eid == REDISMODULE_EVENT_KEY) {
KeyInfo *info = data;
selectDb(ctx.client, info->dbnum);
moduleInitKey(&key, &ctx, info->key, info->value, info->mode);
moduledata = &ki;
}
el->module->in_hook++;
el->callback(&ctx,el->event,subid,moduledata);
el->module->in_hook--;
if (eid == REDISMODULE_EVENT_KEY) {
moduleCloseKey(&key);
}
moduleFreeContext(&ctx);
}
}
@ -11078,9 +11116,21 @@ void processModuleLoadingProgressEvent(int is_aof) {
}
}
/* When a module key is deleted (in dbAsyncDelete/dbSyncDelete/dbOverwrite), it
/* When a key is deleted (in dbAsyncDelete/dbSyncDelete/setKey), it
* will be called to tell the module which key is about to be released. */
void moduleNotifyKeyUnlink(robj *key, robj *val, int dbid) {
void moduleNotifyKeyUnlink(robj *key, robj *val, int dbid, int flags) {
server.lazy_expire_disabled++;
int subevent = REDISMODULE_SUBEVENT_KEY_DELETED;
if (flags & DB_FLAG_KEY_EXPIRED) {
subevent = REDISMODULE_SUBEVENT_KEY_EXPIRED;
} else if (flags & DB_FLAG_KEY_EVICTED) {
subevent = REDISMODULE_SUBEVENT_KEY_EVICTED;
} else if (flags & DB_FLAG_KEY_OVERWRITE) {
subevent = REDISMODULE_SUBEVENT_KEY_OVERWRITTEN;
}
KeyInfo info = {dbid, key, val, REDISMODULE_WRITE};
moduleFireServerEvent(REDISMODULE_EVENT_KEY, subevent, &info);
if (val->type == OBJ_MODULE) {
moduleValue *mv = val->ptr;
moduleType *mt = mv->type;
@ -11090,8 +11140,9 @@ void moduleNotifyKeyUnlink(robj *key, robj *val, int dbid) {
mt->unlink2(&ctx,mv->value);
} else if (mt->unlink != NULL) {
mt->unlink(key,mv->value);
}
}
}
server.lazy_expire_disabled--;
}
/* Return the free_effort of the module, it will automatically choose to call

View File

@ -6,6 +6,28 @@
#include <stdio.h>
#include <stdlib.h>
typedef struct RedisModuleString RedisModuleString;
typedef struct RedisModuleKey RedisModuleKey;
/* -------------- Defines NOT common between core and modules ------------- */
#if defined REDISMODULE_CORE
/* Things only defined for the modules core (server), not exported to modules
* that include this file. */
#define RedisModuleString robj
#endif /* defined REDISMODULE_CORE */
#if !defined REDISMODULE_CORE && !defined REDISMODULE_CORE_MODULE
/* Things defined for modules, but not for core-modules. */
typedef long long mstime_t;
typedef long long ustime_t;
#endif /* !defined REDISMODULE_CORE && !defined REDISMODULE_CORE_MODULE */
/* ---------------- Defines common between core and modules --------------- */
/* Error status return values. */
@ -458,7 +480,8 @@ typedef void (*RedisModuleEventLoopOneShotFunc)(void *user_data);
#define REDISMODULE_EVENT_REPL_ASYNC_LOAD 14
#define REDISMODULE_EVENT_EVENTLOOP 15
#define REDISMODULE_EVENT_CONFIG 16
#define _REDISMODULE_EVENT_NEXT 17 /* Next event flag, should be updated if a new event added. */
#define REDISMODULE_EVENT_KEY 17
#define _REDISMODULE_EVENT_NEXT 18 /* Next event flag, should be updated if a new event added. */
typedef struct RedisModuleEvent {
uint64_t id; /* REDISMODULE_EVENT_... defines. */
@ -565,6 +588,10 @@ static const RedisModuleEvent
RedisModuleEvent_Config = {
REDISMODULE_EVENT_CONFIG,
1
},
RedisModuleEvent_Key = {
REDISMODULE_EVENT_KEY,
1
};
/* Those are values that are used for the 'subevent' callback argument. */
@ -633,6 +660,12 @@ static const RedisModuleEvent
#define REDISMODULE_SUBEVENT_EVENTLOOP_AFTER_SLEEP 1
#define _REDISMODULE_SUBEVENT_EVENTLOOP_NEXT 2
#define REDISMODULE_SUBEVENT_KEY_DELETED 0
#define REDISMODULE_SUBEVENT_KEY_EXPIRED 1
#define REDISMODULE_SUBEVENT_KEY_EVICTED 2
#define REDISMODULE_SUBEVENT_KEY_OVERWRITTEN 3
#define _REDISMODULE_SUBEVENT_KEY_NEXT 4
#define _REDISMODULE_SUBEVENT_SHUTDOWN_NEXT 0
#define _REDISMODULE_SUBEVENT_CRON_LOOP_NEXT 0
#define _REDISMODULE_SUBEVENT_SWAPDB_NEXT 0
@ -756,6 +789,16 @@ typedef struct RedisModuleSwapDbInfo {
#define RedisModuleSwapDbInfo RedisModuleSwapDbInfoV1
#define REDISMODULE_KEYINFO_VERSION 1
typedef struct RedisModuleKeyInfo {
uint64_t version; /* Not used since this structure is never passed
from the module to the core right now. Here
for future compatibility. */
RedisModuleKey *key; /* Opened key. */
} RedisModuleKeyInfoV1;
#define RedisModuleKeyInfo RedisModuleKeyInfoV1
typedef enum {
REDISMODULE_ACL_LOG_AUTH = 0, /* Authentication failure */
REDISMODULE_ACL_LOG_CMD, /* Command authorization failure */
@ -764,7 +807,6 @@ typedef enum {
} RedisModuleACLLogEntryReason;
/* Incomplete structures needed by both the core and modules. */
typedef struct RedisModuleString RedisModuleString;
typedef struct RedisModuleIO RedisModuleIO;
typedef struct RedisModuleDigest RedisModuleDigest;
typedef struct RedisModuleInfoCtx RedisModuleInfoCtx;
@ -778,22 +820,6 @@ typedef void (*RedisModuleUserChangedFunc) (uint64_t client_id, void *privdata);
/* ------------------------- End of common defines ------------------------ */
#if defined REDISMODULE_CORE
/* Things only defined for the modules core (server), not exported to modules
* that include this file. */
#define RedisModuleString robj
#endif /* defined REDISMODULE_CORE */
#if !defined REDISMODULE_CORE && !defined REDISMODULE_CORE_MODULE
/* Things defined for modules, but not for core-modules. */
typedef long long mstime_t;
typedef long long ustime_t;
#endif /* !defined REDISMODULE_CORE && !defined REDISMODULE_CORE_MODULE */
/* ----------- The rest of the defines are only for modules ----------------- */
#if !defined REDISMODULE_CORE || defined REDISMODULE_CORE_MODULE
/* Things defined for modules and core-modules. */
@ -826,7 +852,6 @@ typedef long long ustime_t;
/* Incomplete structures for compiler checks but opaque access. */
typedef struct RedisModuleCtx RedisModuleCtx;
typedef struct RedisModuleCommand RedisModuleCommand;
typedef struct RedisModuleKey RedisModuleKey;
typedef struct RedisModuleCallReply RedisModuleCallReply;
typedef struct RedisModuleType RedisModuleType;
typedef struct RedisModuleBlockedClient RedisModuleBlockedClient;

View File

@ -1888,6 +1888,7 @@ void initServerConfig(void) {
server.bindaddr[j] = zstrdup(default_bindaddr[j]);
memset(server.listeners, 0x00, sizeof(server.listeners));
server.active_expire_enabled = 1;
server.lazy_expire_disabled = 0;
server.skip_checksum_validation = 0;
server.loading = 0;
server.async_loading = 0;

View File

@ -291,6 +291,13 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
/* Key flags for when access type is unknown */
#define CMD_KEY_FULL_ACCESS (CMD_KEY_RW | CMD_KEY_ACCESS | CMD_KEY_UPDATE)
/* Key flags for how key is removed */
#define DB_FLAG_KEY_NONE 0
#define DB_FLAG_KEY_DELETED (1ULL<<0)
#define DB_FLAG_KEY_EXPIRED (1ULL<<1)
#define DB_FLAG_KEY_EVICTED (1ULL<<2)
#define DB_FLAG_KEY_OVERWRITE (1ULL<<3)
/* Channel flags share the same flag space as the key flags */
#define CMD_CHANNEL_PATTERN (1ULL<<11) /* The argument is a channel pattern */
#define CMD_CHANNEL_SUBSCRIBE (1ULL<<12) /* The command subscribes to channels */
@ -1648,6 +1655,7 @@ struct redisServer {
int tcpkeepalive; /* Set SO_KEEPALIVE if non-zero. */
int active_expire_enabled; /* Can be disabled for testing purposes. */
int active_expire_effort; /* From 1 (default) to 10, active effort. */
int lazy_expire_disabled; /* If > 0, don't trigger lazy expire */
int active_defrag_enabled;
int sanitize_dump_payload; /* Enables deep sanitization for ziplist and listpack in RDB and RESTORE. */
int skip_checksum_validation; /* Disable checksum validation for RDB and RESTORE payload. */
@ -2440,7 +2448,7 @@ void moduleUnblockClient(client *c);
int moduleBlockedClientMayTimeout(client *c);
int moduleClientIsBlockedOnKeys(client *c);
void moduleNotifyUserChanged(client *c);
void moduleNotifyKeyUnlink(robj *key, robj *val, int dbid);
void moduleNotifyKeyUnlink(robj *key, robj *val, int dbid, int flags);
size_t moduleGetFreeEffort(robj *key, robj *val, int dbid);
size_t moduleGetMemUsage(robj *key, robj *val, size_t sample_size, int dbid);
robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, int todb, robj *value);
@ -3154,7 +3162,7 @@ int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
void dbAdd(redisDb *db, robj *key, robj *val);
int dbAddRDBLoad(redisDb *db, sds key, robj *val);
void dbOverwrite(redisDb *db, robj *key, robj *val);
void dbReplaceValue(redisDb *db, robj *key, robj *val);
#define SETKEY_KEEPTTL 1
#define SETKEY_NO_SIGNAL 2
@ -3162,6 +3170,7 @@ void dbOverwrite(redisDb *db, robj *key, robj *val);
#define SETKEY_DOESNT_EXIST 8
void setKey(client *c, redisDb *db, robj *key, robj *val, int flags);
robj *dbRandomKey(redisDb *db);
int dbGenericDelete(redisDb *db, robj *key, int async, int flags);
int dbSyncDelete(redisDb *db, robj *key);
int dbDelete(redisDb *db, robj *key);
robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o);

View File

@ -885,7 +885,7 @@ void spopWithCountCommand(client *c) {
setTypeReleaseIterator(si);
/* Assign the new set as the key value. */
dbOverwrite(c->db,c->argv[1],newset);
dbReplaceValue(c->db,c->argv[1],newset);
}
/* Don't propagate the command itself even if we incremented the

View File

@ -612,7 +612,7 @@ void incrDecrCommand(client *c, long long incr) {
} else {
new = createStringObjectFromLongLongForValue(value);
if (o) {
dbOverwrite(c->db,c->argv[1],new);
dbReplaceValue(c->db,c->argv[1],new);
} else {
dbAdd(c->db,c->argv[1],new);
}
@ -667,7 +667,7 @@ void incrbyfloatCommand(client *c) {
}
new = createStringObjectFromLongDouble(value,1);
if (o)
dbOverwrite(c->db,c->argv[1],new);
dbReplaceValue(c->db,c->argv[1],new);
else
dbAdd(c->db,c->argv[1],new);
signalModifiedKey(c,c->db,c->argv[1]);

View File

@ -33,11 +33,18 @@
#include "redismodule.h"
#include <stdio.h>
#include <string.h>
#include <assert.h>
/* We need to store events to be able to test and see what we got, and we can't
* store them in the key-space since that would mess up rdb loading (duplicates)
* and be lost of flushdb. */
RedisModuleDict *event_log = NULL;
/* stores all the keys on which we got 'removed' event */
RedisModuleDict *removed_event_log = NULL;
/* stores all the subevent on which we got 'removed' event */
RedisModuleDict *removed_subevent_type = NULL;
/* stores all the keys on which we got 'removed' event with expiry information */
RedisModuleDict *removed_expiry_log = NULL;
typedef struct EventElement {
long count;
@ -279,6 +286,119 @@ void configChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub,
LogStringEvent(ctx, "config-change-first", ei->config_names[0]);
}
void keyInfoCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
{
REDISMODULE_NOT_USED(e);
RedisModuleKeyInfoV1 *ei = data;
RedisModuleKey *kp = ei->key;
RedisModuleString *key = (RedisModuleString *) RedisModule_GetKeyNameFromModuleKey(kp);
const char *keyname = RedisModule_StringPtrLen(key, NULL);
RedisModuleString *event_keyname = RedisModule_CreateStringPrintf(ctx, "key-info-%s", keyname);
LogStringEvent(ctx, RedisModule_StringPtrLen(event_keyname, NULL), keyname);
RedisModule_FreeString(ctx, event_keyname);
/* Despite getting a key object from the callback, we also try to re-open it
* to make sure the callback is called before it is actually removed from the keyspace. */
RedisModuleKey *kp_open = RedisModule_OpenKey(ctx, key, REDISMODULE_READ);
assert(RedisModule_ValueLength(kp) == RedisModule_ValueLength(kp_open));
RedisModule_CloseKey(kp_open);
/* We also try to RM_Call a command that accesses that key, also to make sure it's still in the keyspace. */
char *size_command = NULL;
int key_type = RedisModule_KeyType(kp);
if (key_type == REDISMODULE_KEYTYPE_STRING) {
size_command = "STRLEN";
} else if (key_type == REDISMODULE_KEYTYPE_LIST) {
size_command = "LLEN";
} else if (key_type == REDISMODULE_KEYTYPE_HASH) {
size_command = "HLEN";
} else if (key_type == REDISMODULE_KEYTYPE_SET) {
size_command = "SCARD";
} else if (key_type == REDISMODULE_KEYTYPE_ZSET) {
size_command = "ZCARD";
} else if (key_type == REDISMODULE_KEYTYPE_STREAM) {
size_command = "XLEN";
}
if (size_command != NULL) {
RedisModuleCallReply *reply = RedisModule_Call(ctx, size_command, "s", key);
assert(reply != NULL);
assert(RedisModule_ValueLength(kp) == (size_t) RedisModule_CallReplyInteger(reply));
RedisModule_FreeCallReply(reply);
}
/* Now use the key object we got from the callback for various validations. */
RedisModuleString *prev = RedisModule_DictGetC(removed_event_log, (void*)keyname, strlen(keyname), NULL);
/* We keep object length */
RedisModuleString *v = RedisModule_CreateStringPrintf(ctx, "%zd", RedisModule_ValueLength(kp));
/* For string type, we keep value instead of length */
if (RedisModule_KeyType(kp) == REDISMODULE_KEYTYPE_STRING) {
RedisModule_FreeString(ctx, v);
size_t len;
/* We need to access the string value with RedisModule_StringDMA.
* RedisModule_StringDMA may call dbUnshareStringValue to free the origin object,
* so we also can test it. */
char *s = RedisModule_StringDMA(kp, &len, REDISMODULE_READ);
v = RedisModule_CreateString(ctx, s, len);
}
RedisModule_DictReplaceC(removed_event_log, (void*)keyname, strlen(keyname), v);
if (prev != NULL) {
RedisModule_FreeString(ctx, prev);
}
const char *subevent = "deleted";
if (sub == REDISMODULE_SUBEVENT_KEY_EXPIRED) {
subevent = "expired";
} else if (sub == REDISMODULE_SUBEVENT_KEY_EVICTED) {
subevent = "evicted";
} else if (sub == REDISMODULE_SUBEVENT_KEY_OVERWRITTEN) {
subevent = "overwritten";
}
RedisModule_DictReplaceC(removed_subevent_type, (void*)keyname, strlen(keyname), (void *)subevent);
RedisModuleString *prevexpire = RedisModule_DictGetC(removed_expiry_log, (void*)keyname, strlen(keyname), NULL);
RedisModuleString *expire = RedisModule_CreateStringPrintf(ctx, "%lld", RedisModule_GetAbsExpire(kp));
RedisModule_DictReplaceC(removed_expiry_log, (void*)keyname, strlen(keyname), (void *)expire);
if (prevexpire != NULL) {
RedisModule_FreeString(ctx, prevexpire);
}
}
static int cmdIsKeyRemoved(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){
if(argc != 2){
return RedisModule_WrongArity(ctx);
}
const char *key = RedisModule_StringPtrLen(argv[1], NULL);
RedisModuleString *value = RedisModule_DictGetC(removed_event_log, (void*)key, strlen(key), NULL);
if (value == NULL) {
return RedisModule_ReplyWithError(ctx, "ERR Key was not removed");
}
const char *subevent = RedisModule_DictGetC(removed_subevent_type, (void*)key, strlen(key), NULL);
RedisModule_ReplyWithArray(ctx, 2);
RedisModule_ReplyWithString(ctx, value);
RedisModule_ReplyWithSimpleString(ctx, subevent);
return REDISMODULE_OK;
}
static int cmdKeyExpiry(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){
if(argc != 2){
return RedisModule_WrongArity(ctx);
}
const char* key = RedisModule_StringPtrLen(argv[1], NULL);
RedisModuleString *expire = RedisModule_DictGetC(removed_expiry_log, (void*)key, strlen(key), NULL);
if (expire == NULL) {
return RedisModule_ReplyWithError(ctx, "ERR Key was not removed");
}
RedisModule_ReplyWithString(ctx, expire);
return REDISMODULE_OK;
}
/* This function must be present on each Redis module. It is used in order to
* register the commands into the Redis server. */
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
@ -332,7 +452,13 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
RedisModule_SubscribeToServerEvent(ctx,
RedisModuleEvent_Config, configChangeCallback);
RedisModule_SubscribeToServerEvent(ctx,
RedisModuleEvent_Key, keyInfoCallback);
event_log = RedisModule_CreateDict(ctx);
removed_event_log = RedisModule_CreateDict(ctx);
removed_subevent_type = RedisModule_CreateDict(ctx);
removed_expiry_log = RedisModule_CreateDict(ctx);
if (RedisModule_CreateCommand(ctx,"hooks.event_count", cmdEventCount,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
@ -340,6 +466,10 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"hooks.clear", cmdEventsClear,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"hooks.is_key_removed", cmdIsKeyRemoved,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"hooks.pexpireat", cmdKeyExpiry,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
return REDISMODULE_OK;
}
@ -348,6 +478,29 @@ int RedisModule_OnUnload(RedisModuleCtx *ctx) {
clearEvents(ctx);
RedisModule_FreeDict(ctx, event_log);
event_log = NULL;
RedisModuleDictIter *iter = RedisModule_DictIteratorStartC(removed_event_log, "^", NULL, 0);
char* key;
size_t keyLen;
RedisModuleString* val;
while((key = RedisModule_DictNextC(iter, &keyLen, (void**)&val))){
RedisModule_FreeString(ctx, val);
}
RedisModule_FreeDict(ctx, removed_event_log);
RedisModule_DictIteratorStop(iter);
removed_event_log = NULL;
RedisModule_FreeDict(ctx, removed_subevent_type);
removed_subevent_type = NULL;
iter = RedisModule_DictIteratorStartC(removed_expiry_log, "^", NULL, 0);
while((key = RedisModule_DictNextC(iter, &keyLen, (void**)&val))){
RedisModule_FreeString(ctx, val);
}
RedisModule_FreeDict(ctx, removed_expiry_log);
RedisModule_DictIteratorStop(iter);
removed_expiry_log = NULL;
return REDISMODULE_OK;
}

View File

@ -85,6 +85,140 @@ tags "modules" {
assert {[r hooks.event_count loading-rdb-start] == 1}
}
test {Test key unlink hook} {
r set testkey1 hello
r del testkey1
assert {[r hooks.event_count key-info-testkey1] == 1}
assert_equal [r hooks.event_last key-info-testkey1] testkey1
r lpush testkey1 hello
r lpop testkey1
assert {[r hooks.event_count key-info-testkey1] == 2}
assert_equal [r hooks.event_last key-info-testkey1] testkey1
r set testkey2 world
r unlink testkey2
assert {[r hooks.event_count key-info-testkey2] == 1}
assert_equal [r hooks.event_last key-info-testkey2] testkey2
}
test {Test removed key event} {
r set str abcd
r set str abcde
# For String Type value is returned
assert_equal {abcd overwritten} [r hooks.is_key_removed str]
assert_equal -1 [r hooks.pexpireat str]
r del str
assert_equal {abcde deleted} [r hooks.is_key_removed str]
assert_equal -1 [r hooks.pexpireat str]
# test int encoded string
r set intstr 12345678
# incr doesn't fire event
r incr intstr
catch {[r hooks.is_key_removed intstr]} output
assert_match {ERR * removed} $output
r del intstr
assert_equal {12345679 deleted} [r hooks.is_key_removed intstr]
catch {[r hooks.is_key_removed not-exists]} output
assert_match {ERR * removed} $output
r hset hash f v
r hdel hash f
assert_equal {0 deleted} [r hooks.is_key_removed hash]
r hset hash f v a b
r del hash
assert_equal {2 deleted} [r hooks.is_key_removed hash]
r lpush list 1
r lpop list
assert_equal {0 deleted} [r hooks.is_key_removed list]
r lpush list 1 2 3
r del list
assert_equal {3 deleted} [r hooks.is_key_removed list]
r sadd set 1
r spop set
assert_equal {0 deleted} [r hooks.is_key_removed set]
r sadd set 1 2 3 4
r del set
assert_equal {4 deleted} [r hooks.is_key_removed set]
r zadd zset 1 f
r zpopmin zset
assert_equal {0 deleted} [r hooks.is_key_removed zset]
r zadd zset 1 f 2 d
r del zset
assert_equal {2 deleted} [r hooks.is_key_removed zset]
r xadd stream 1-1 f v
r xdel stream 1-1
# Stream does not delete object when del entry
catch {[r hooks.is_key_removed stream]} output
assert_match {ERR * removed} $output
r del stream
assert_equal {0 deleted} [r hooks.is_key_removed stream]
r xadd stream 2-1 f v
r del stream
assert_equal {1 deleted} [r hooks.is_key_removed stream]
# delete key because of active expire
set size [r dbsize]
r set active-expire abcd px 1
#ensure active expire
wait_for_condition 50 100 {
[r dbsize] == $size
} else {
fail "Active expire not trigger"
}
assert_equal {abcd expired} [r hooks.is_key_removed active-expire]
# current time is greater than pexpireat
set now [r time]
set mill [expr ([lindex $now 0]*1000)+([lindex $now 1]/1000)]
assert {$mill >= [r hooks.pexpireat active-expire]}
# delete key because of lazy expire
r debug set-active-expire 0
r set lazy-expire abcd px 1
after 10
r get lazy-expire
assert_equal {abcd expired} [r hooks.is_key_removed lazy-expire]
set now [r time]
set mill [expr ([lindex $now 0]*1000)+([lindex $now 1]/1000)]
assert {$mill >= [r hooks.pexpireat lazy-expire]}
r debug set-active-expire 1
# delete key not yet expired
set now [r time]
set expireat [expr ([lindex $now 0]*1000)+([lindex $now 1]/1000)+1000000]
r set not-expire abcd pxat $expireat
r del not-expire
assert_equal {abcd deleted} [r hooks.is_key_removed not-expire]
assert_equal $expireat [r hooks.pexpireat not-expire]
# Test key evict
set used [expr {[s used_memory] - [s mem_not_counted_for_evict]}]
set limit [expr {$used+100*1024}]
set old_policy [lindex [r config get maxmemory-policy] 1]
r config set maxmemory $limit
# We set policy volatile-random, so only keys with ttl will be evicted
r config set maxmemory-policy volatile-random
r setex volatile-key 10000 x
# We use SETBIT here, so we can set a big key and get the used_memory
# bigger than maxmemory. Next command will evict volatile keys. We
# can't use SET, as SET uses big input buffer, so it will fail.
r setbit big-key 1600000 0 ;# this will consume 200kb
r getbit big-key 0
assert_equal {x evicted} [r hooks.is_key_removed volatile-key]
r config set maxmemory-policy $old_policy
r config set maxmemory 0
} {OK} {needs:debug}
test {Test flushdb hooks} {
r flushdb
assert_equal [r hooks.event_last flush-start] 9