Tracking: NOLOOP internals implementation.

This commit is contained in:
antirez 2020-04-21 10:51:46 +02:00
parent c7db333abb
commit 94f2e7f9f9
17 changed files with 174 additions and 112 deletions

View File

@ -554,7 +554,7 @@ void setbitCommand(client *c) {
byteval &= ~(1 << bit); byteval &= ~(1 << bit);
byteval |= ((on & 0x1) << bit); byteval |= ((on & 0x1) << bit);
((uint8_t*)o->ptr)[byte] = byteval; ((uint8_t*)o->ptr)[byte] = byteval;
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STRING,"setbit",c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_STRING,"setbit",c->argv[1],c->db->id);
server.dirty++; server.dirty++;
addReply(c, bitval ? shared.cone : shared.czero); addReply(c, bitval ? shared.cone : shared.czero);
@ -754,11 +754,11 @@ void bitopCommand(client *c) {
/* Store the computed value into the target key */ /* Store the computed value into the target key */
if (maxlen) { if (maxlen) {
o = createObject(OBJ_STRING,res); o = createObject(OBJ_STRING,res);
setKey(c->db,targetkey,o); setKey(c,c->db,targetkey,o);
notifyKeyspaceEvent(NOTIFY_STRING,"set",targetkey,c->db->id); notifyKeyspaceEvent(NOTIFY_STRING,"set",targetkey,c->db->id);
decrRefCount(o); decrRefCount(o);
} else if (dbDelete(c->db,targetkey)) { } else if (dbDelete(c->db,targetkey)) {
signalModifiedKey(c->db,targetkey); signalModifiedKey(c,c->db,targetkey);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",targetkey,c->db->id); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",targetkey,c->db->id);
} }
server.dirty++; server.dirty++;
@ -1135,7 +1135,7 @@ void bitfieldGeneric(client *c, int flags) {
} }
if (changes) { if (changes) {
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STRING,"setbit",c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_STRING,"setbit",c->argv[1],c->db->id);
server.dirty += changes; server.dirty += changes;
} }

View File

@ -4982,7 +4982,7 @@ void restoreCommand(client *c) {
setExpire(c,c->db,c->argv[1],ttl); setExpire(c,c->db,c->argv[1],ttl);
} }
objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock,1000); objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock,1000);
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",c->argv[1],c->db->id);
addReply(c,shared.ok); addReply(c,shared.ok);
server.dirty++; server.dirty++;
@ -5329,7 +5329,7 @@ try_again:
if (!copy) { if (!copy) {
/* No COPY option: remove the local key, signal the change. */ /* No COPY option: remove the local key, signal the change. */
dbDelete(c->db,kv[j]); dbDelete(c->db,kv[j]);
signalModifiedKey(c->db,kv[j]); signalModifiedKey(c,c->db,kv[j]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",kv[j],c->db->id); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",kv[j],c->db->id);
server.dirty++; server.dirty++;

View File

@ -238,8 +238,10 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) {
* 3) The expire time of the key is reset (the key is made persistent), * 3) The expire time of the key is reset (the key is made persistent),
* unless 'keepttl' is true. * unless 'keepttl' is true.
* *
* All the new keys in the database should be created via this interface. */ * All the new keys in the database should be created via this interface.
void genericSetKey(redisDb *db, robj *key, robj *val, int keepttl, int signal) { * The client 'c' argument may be set to NULL if the operation is performed
* in a context where there is no clear client performing the operation. */
void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, int signal) {
if (lookupKeyWrite(db,key) == NULL) { if (lookupKeyWrite(db,key) == NULL) {
dbAdd(db,key,val); dbAdd(db,key,val);
} else { } else {
@ -247,12 +249,12 @@ void genericSetKey(redisDb *db, robj *key, robj *val, int keepttl, int signal) {
} }
incrRefCount(val); incrRefCount(val);
if (!keepttl) removeExpire(db,key); if (!keepttl) removeExpire(db,key);
if (signal) signalModifiedKey(db,key); if (signal) signalModifiedKey(c,db,key);
} }
/* Common case for genericSetKey() where the TTL is not retained. */ /* Common case for genericSetKey() where the TTL is not retained. */
void setKey(redisDb *db, robj *key, robj *val) { void setKey(client *c, redisDb *db, robj *key, robj *val) {
genericSetKey(db,key,val,0,1); genericSetKey(c,db,key,val,0,1);
} }
/* Return true if the specified key exists in the specified database. /* Return true if the specified key exists in the specified database.
@ -467,9 +469,11 @@ long long dbTotalServerKeyCount() {
* Every time a DB is flushed the function signalFlushDb() is called. * Every time a DB is flushed the function signalFlushDb() is called.
*----------------------------------------------------------------------------*/ *----------------------------------------------------------------------------*/
void signalModifiedKey(redisDb *db, robj *key) { /* Note that the 'c' argument may be NULL if the key was modified out of
* a context of a client. */
void signalModifiedKey(client *c, redisDb *db, robj *key) {
touchWatchedKey(db,key); touchWatchedKey(db,key);
trackingInvalidateKey(key); trackingInvalidateKey(c,key);
} }
void signalFlushedDb(int dbid) { void signalFlushedDb(int dbid) {
@ -563,7 +567,7 @@ void delGenericCommand(client *c, int lazy) {
int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) : int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
dbSyncDelete(c->db,c->argv[j]); dbSyncDelete(c->db,c->argv[j]);
if (deleted) { if (deleted) {
signalModifiedKey(c->db,c->argv[j]); signalModifiedKey(c,c->db,c->argv[j]);
notifyKeyspaceEvent(NOTIFY_GENERIC, notifyKeyspaceEvent(NOTIFY_GENERIC,
"del",c->argv[j],c->db->id); "del",c->argv[j],c->db->id);
server.dirty++; server.dirty++;
@ -1003,8 +1007,8 @@ void renameGenericCommand(client *c, int nx) {
dbAdd(c->db,c->argv[2],o); dbAdd(c->db,c->argv[2],o);
if (expire != -1) setExpire(c,c->db,c->argv[2],expire); if (expire != -1) setExpire(c,c->db,c->argv[2],expire);
dbDelete(c->db,c->argv[1]); dbDelete(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[2]); signalModifiedKey(c,c->db,c->argv[2]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from", notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from",
c->argv[1],c->db->id); c->argv[1],c->db->id);
notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_to", notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_to",
@ -1072,8 +1076,8 @@ void moveCommand(client *c) {
/* OK! key moved, free the entry in the source DB */ /* OK! key moved, free the entry in the source DB */
dbDelete(src,c->argv[1]); dbDelete(src,c->argv[1]);
signalModifiedKey(src,c->argv[1]); signalModifiedKey(c,src,c->argv[1]);
signalModifiedKey(dst,c->argv[1]); signalModifiedKey(c,dst,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_GENERIC, notifyKeyspaceEvent(NOTIFY_GENERIC,
"move_from",c->argv[1],src->id); "move_from",c->argv[1],src->id);
notifyKeyspaceEvent(NOTIFY_GENERIC, notifyKeyspaceEvent(NOTIFY_GENERIC,
@ -1317,7 +1321,7 @@ int expireIfNeeded(redisDb *db, robj *key) {
"expired",key,db->id); "expired",key,db->id);
int retval = server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) : int retval = server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
dbSyncDelete(db,key); dbSyncDelete(db,key);
if (retval) signalModifiedKey(db,key); if (retval) signalModifiedKey(NULL,db,key);
return retval; return retval;
} }

View File

@ -588,7 +588,7 @@ NULL
memcpy(val->ptr, buf, valsize<=buflen? valsize: buflen); memcpy(val->ptr, buf, valsize<=buflen? valsize: buflen);
} }
dbAdd(c->db,key,val); dbAdd(c->db,key,val);
signalModifiedKey(c->db,key); signalModifiedKey(c,c->db,key);
decrRefCount(key); decrRefCount(key);
} }
addReply(c,shared.ok); addReply(c,shared.ok);

View File

@ -64,7 +64,7 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
dbSyncDelete(db,keyobj); dbSyncDelete(db,keyobj);
notifyKeyspaceEvent(NOTIFY_EXPIRED, notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",keyobj,db->id); "expired",keyobj,db->id);
trackingInvalidateKey(keyobj); trackingInvalidateKey(NULL,keyobj);
decrRefCount(keyobj); decrRefCount(keyobj);
server.stat_expiredkeys++; server.stat_expiredkeys++;
return 1; return 1;
@ -519,14 +519,14 @@ void expireGenericCommand(client *c, long long basetime, int unit) {
/* Replicate/AOF this as an explicit DEL or UNLINK. */ /* Replicate/AOF this as an explicit DEL or UNLINK. */
aux = server.lazyfree_lazy_expire ? shared.unlink : shared.del; aux = server.lazyfree_lazy_expire ? shared.unlink : shared.del;
rewriteClientCommandVector(c,2,aux,key); rewriteClientCommandVector(c,2,aux,key);
signalModifiedKey(c->db,key); signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
addReply(c, shared.cone); addReply(c, shared.cone);
return; return;
} else { } else {
setExpire(c,c->db,key,when); setExpire(c,c->db,key,when);
addReply(c,shared.cone); addReply(c,shared.cone);
signalModifiedKey(c->db,key); signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id); notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id);
server.dirty++; server.dirty++;
return; return;

View File

@ -657,13 +657,13 @@ void georadiusGeneric(client *c, int flags) {
if (returned_items) { if (returned_items) {
zsetConvertToZiplistIfNeeded(zobj,maxelelen); zsetConvertToZiplistIfNeeded(zobj,maxelelen);
setKey(c->db,storekey,zobj); setKey(c,c->db,storekey,zobj);
decrRefCount(zobj); decrRefCount(zobj);
notifyKeyspaceEvent(NOTIFY_ZSET,"georadiusstore",storekey, notifyKeyspaceEvent(NOTIFY_ZSET,"georadiusstore",storekey,
c->db->id); c->db->id);
server.dirty += returned_items; server.dirty += returned_items;
} else if (dbDelete(c->db,storekey)) { } else if (dbDelete(c->db,storekey)) {
signalModifiedKey(c->db,storekey); signalModifiedKey(c,c->db,storekey);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",storekey,c->db->id); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",storekey,c->db->id);
server.dirty++; server.dirty++;
} }

View File

@ -1209,7 +1209,7 @@ void pfaddCommand(client *c) {
} }
hdr = o->ptr; hdr = o->ptr;
if (updated) { if (updated) {
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STRING,"pfadd",c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_STRING,"pfadd",c->argv[1],c->db->id);
server.dirty++; server.dirty++;
HLL_INVALIDATE_CACHE(hdr); HLL_INVALIDATE_CACHE(hdr);
@ -1300,7 +1300,7 @@ void pfcountCommand(client *c) {
* data structure is not modified, since the cached value * data structure is not modified, since the cached value
* may be modified and given that the HLL is a Redis string * may be modified and given that the HLL is a Redis string
* we need to propagate the change. */ * we need to propagate the change. */
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
server.dirty++; server.dirty++;
} }
addReplyLongLong(c,card); addReplyLongLong(c,card);
@ -1373,7 +1373,7 @@ void pfmergeCommand(client *c) {
last hllSparseSet() call. */ last hllSparseSet() call. */
HLL_INVALIDATE_CACHE(hdr); HLL_INVALIDATE_CACHE(hdr);
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
/* We generate a PFADD event for PFMERGE for semantical simplicity /* We generate a PFADD event for PFMERGE for semantical simplicity
* since in theory this is a mass-add of elements. */ * since in theory this is a mass-add of elements. */
notifyKeyspaceEvent(NOTIFY_STRING,"pfadd",c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_STRING,"pfadd",c->argv[1],c->db->id);

View File

@ -896,7 +896,7 @@ void RM_SetModuleOptions(RedisModuleCtx *ctx, int options) {
/* Signals that the key is modified from user's perspective (i.e. invalidate WATCH /* Signals that the key is modified from user's perspective (i.e. invalidate WATCH
* and client side caching). */ * and client side caching). */
int RM_SignalModifiedKey(RedisModuleCtx *ctx, RedisModuleString *keyname) { int RM_SignalModifiedKey(RedisModuleCtx *ctx, RedisModuleString *keyname) {
signalModifiedKey(ctx->client->db,keyname); signalModifiedKey(ctx->client,ctx->client->db,keyname);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -2016,7 +2016,7 @@ void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) {
static void moduleCloseKey(RedisModuleKey *key) { static void moduleCloseKey(RedisModuleKey *key) {
int signal = SHOULD_SIGNAL_MODIFIED_KEYS(key->ctx); int signal = SHOULD_SIGNAL_MODIFIED_KEYS(key->ctx);
if ((key->mode & REDISMODULE_WRITE) && signal) if ((key->mode & REDISMODULE_WRITE) && signal)
signalModifiedKey(key->db,key->key); signalModifiedKey(key->ctx->client,key->db,key->key);
/* TODO: if (key->iter) RM_KeyIteratorStop(kp); */ /* TODO: if (key->iter) RM_KeyIteratorStop(kp); */
RM_ZsetRangeStop(key); RM_ZsetRangeStop(key);
decrRefCount(key->key); decrRefCount(key->key);
@ -2157,7 +2157,7 @@ RedisModuleString *RM_RandomKey(RedisModuleCtx *ctx) {
int RM_StringSet(RedisModuleKey *key, RedisModuleString *str) { int RM_StringSet(RedisModuleKey *key, RedisModuleString *str) {
if (!(key->mode & REDISMODULE_WRITE) || key->iter) return REDISMODULE_ERR; if (!(key->mode & REDISMODULE_WRITE) || key->iter) return REDISMODULE_ERR;
RM_DeleteKey(key); RM_DeleteKey(key);
genericSetKey(key->db,key->key,str,0,0); genericSetKey(key->ctx->client,key->db,key->key,str,0,0);
key->value = str; key->value = str;
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -2237,7 +2237,7 @@ int RM_StringTruncate(RedisModuleKey *key, size_t newlen) {
if (key->value == NULL) { if (key->value == NULL) {
/* Empty key: create it with the new size. */ /* Empty key: create it with the new size. */
robj *o = createObject(OBJ_STRING,sdsnewlen(NULL, newlen)); robj *o = createObject(OBJ_STRING,sdsnewlen(NULL, newlen));
genericSetKey(key->db,key->key,o,0,0); genericSetKey(key->ctx->client,key->db,key->key,o,0,0);
key->value = o; key->value = o;
decrRefCount(o); decrRefCount(o);
} else { } else {
@ -3625,7 +3625,7 @@ int RM_ModuleTypeSetValue(RedisModuleKey *key, moduleType *mt, void *value) {
if (!(key->mode & REDISMODULE_WRITE) || key->iter) return REDISMODULE_ERR; if (!(key->mode & REDISMODULE_WRITE) || key->iter) return REDISMODULE_ERR;
RM_DeleteKey(key); RM_DeleteKey(key);
robj *o = createModuleObject(mt,value); robj *o = createModuleObject(mt,value);
genericSetKey(key->db,key->key,o,0,0); genericSetKey(key->ctx->client,key->db,key->key,o,0,0);
decrRefCount(o); decrRefCount(o);
key->value = o; key->value = o;
return REDISMODULE_OK; return REDISMODULE_OK;

View File

@ -252,7 +252,9 @@ typedef long long ustime_t; /* microsecond time type. */
#define CLIENT_TRACKING_OPTOUT (1ULL<<35) /* Tracking in opt-out mode. */ #define CLIENT_TRACKING_OPTOUT (1ULL<<35) /* Tracking in opt-out mode. */
#define CLIENT_TRACKING_CACHING (1ULL<<36) /* CACHING yes/no was given, #define CLIENT_TRACKING_CACHING (1ULL<<36) /* CACHING yes/no was given,
depending on optin/optout mode. */ depending on optin/optout mode. */
#define CLIENT_IN_TO_TABLE (1ULL<<37) /* This client is in the timeout table. */ #define CLIENT_TRACKING_NOLOOP (1ULL<<37) /* Don't send invalidation messages
about writes performed by myself.*/
#define CLIENT_IN_TO_TABLE (1ULL<<38) /* This client is in the timeout table. */
/* Client block type (btype field in client structure) /* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */ * if CLIENT_BLOCKED flag is set. */
@ -1683,7 +1685,7 @@ void addReplyStatusFormat(client *c, const char *fmt, ...);
void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix); void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix);
void disableTracking(client *c); void disableTracking(client *c);
void trackingRememberKeys(client *c); void trackingRememberKeys(client *c);
void trackingInvalidateKey(robj *keyobj); void trackingInvalidateKey(client *c, robj *keyobj);
void trackingInvalidateKeysOnFlush(int dbid); void trackingInvalidateKeysOnFlush(int dbid);
void trackingLimitUsedSlots(void); void trackingLimitUsedSlots(void);
uint64_t trackingGetTotalItems(void); uint64_t trackingGetTotalItems(void);
@ -2071,8 +2073,8 @@ int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
void dbAdd(redisDb *db, robj *key, robj *val); void dbAdd(redisDb *db, robj *key, robj *val);
int dbAddRDBLoad(redisDb *db, sds key, robj *val); int dbAddRDBLoad(redisDb *db, sds key, robj *val);
void dbOverwrite(redisDb *db, robj *key, robj *val); void dbOverwrite(redisDb *db, robj *key, robj *val);
void genericSetKey(redisDb *db, robj *key, robj *val, int keepttl, int signal); void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, int signal);
void setKey(redisDb *db, robj *key, robj *val); void setKey(client *c, redisDb *db, robj *key, robj *val);
int dbExists(redisDb *db, robj *key); int dbExists(redisDb *db, robj *key);
robj *dbRandomKey(redisDb *db); robj *dbRandomKey(redisDb *db);
int dbSyncDelete(redisDb *db, robj *key); int dbSyncDelete(redisDb *db, robj *key);
@ -2088,7 +2090,7 @@ void flushAllDataAndResetRDB(int flags);
long long dbTotalServerKeyCount(); long long dbTotalServerKeyCount();
int selectDb(client *c, int id); int selectDb(client *c, int id);
void signalModifiedKey(redisDb *db, robj *key); void signalModifiedKey(client *c, redisDb *db, robj *key);
void signalFlushedDb(int dbid); void signalFlushedDb(int dbid);
unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count); unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count);
unsigned int countKeysInSlot(unsigned int hashslot); unsigned int countKeysInSlot(unsigned int hashslot);

View File

@ -570,12 +570,12 @@ void sortCommand(client *c) {
} }
} }
if (outputlen) { if (outputlen) {
setKey(c->db,storekey,sobj); setKey(c,c->db,storekey,sobj);
notifyKeyspaceEvent(NOTIFY_LIST,"sortstore",storekey, notifyKeyspaceEvent(NOTIFY_LIST,"sortstore",storekey,
c->db->id); c->db->id);
server.dirty += outputlen; server.dirty += outputlen;
} else if (dbDelete(c->db,storekey)) { } else if (dbDelete(c->db,storekey)) {
signalModifiedKey(c->db,storekey); signalModifiedKey(c,c->db,storekey);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",storekey,c->db->id); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",storekey,c->db->id);
server.dirty++; server.dirty++;
} }

View File

@ -521,7 +521,7 @@ void hsetnxCommand(client *c) {
} else { } else {
hashTypeSet(o,c->argv[2]->ptr,c->argv[3]->ptr,HASH_SET_COPY); hashTypeSet(o,c->argv[2]->ptr,c->argv[3]->ptr,HASH_SET_COPY);
addReply(c, shared.cone); addReply(c, shared.cone);
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id);
server.dirty++; server.dirty++;
} }
@ -551,7 +551,7 @@ void hsetCommand(client *c) {
/* HMSET */ /* HMSET */
addReply(c, shared.ok); addReply(c, shared.ok);
} }
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id);
server.dirty++; server.dirty++;
} }
@ -586,7 +586,7 @@ void hincrbyCommand(client *c) {
new = sdsfromlonglong(value); new = sdsfromlonglong(value);
hashTypeSet(o,c->argv[2]->ptr,new,HASH_SET_TAKE_VALUE); hashTypeSet(o,c->argv[2]->ptr,new,HASH_SET_TAKE_VALUE);
addReplyLongLong(c,value); addReplyLongLong(c,value);
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_HASH,"hincrby",c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_HASH,"hincrby",c->argv[1],c->db->id);
server.dirty++; server.dirty++;
} }
@ -625,7 +625,7 @@ void hincrbyfloatCommand(client *c) {
new = sdsnewlen(buf,len); new = sdsnewlen(buf,len);
hashTypeSet(o,c->argv[2]->ptr,new,HASH_SET_TAKE_VALUE); hashTypeSet(o,c->argv[2]->ptr,new,HASH_SET_TAKE_VALUE);
addReplyBulkCBuffer(c,buf,len); addReplyBulkCBuffer(c,buf,len);
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_HASH,"hincrbyfloat",c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_HASH,"hincrbyfloat",c->argv[1],c->db->id);
server.dirty++; server.dirty++;
@ -721,7 +721,7 @@ void hdelCommand(client *c) {
} }
} }
if (deleted) { if (deleted) {
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_HASH,"hdel",c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_HASH,"hdel",c->argv[1],c->db->id);
if (keyremoved) if (keyremoved)
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1], notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],

View File

@ -217,7 +217,7 @@ void pushGenericCommand(client *c, int where) {
if (pushed) { if (pushed) {
char *event = (where == LIST_HEAD) ? "lpush" : "rpush"; char *event = (where == LIST_HEAD) ? "lpush" : "rpush";
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
} }
server.dirty += pushed; server.dirty += pushed;
@ -247,7 +247,7 @@ void pushxGenericCommand(client *c, int where) {
if (pushed) { if (pushed) {
char *event = (where == LIST_HEAD) ? "lpush" : "rpush"; char *event = (where == LIST_HEAD) ? "lpush" : "rpush";
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
} }
server.dirty += pushed; server.dirty += pushed;
@ -292,7 +292,7 @@ void linsertCommand(client *c) {
listTypeReleaseIterator(iter); listTypeReleaseIterator(iter);
if (inserted) { if (inserted) {
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,"linsert", notifyKeyspaceEvent(NOTIFY_LIST,"linsert",
c->argv[1],c->db->id); c->argv[1],c->db->id);
server.dirty++; server.dirty++;
@ -355,7 +355,7 @@ void lsetCommand(client *c) {
addReply(c,shared.outofrangeerr); addReply(c,shared.outofrangeerr);
} else { } else {
addReply(c,shared.ok); addReply(c,shared.ok);
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,"lset",c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_LIST,"lset",c->argv[1],c->db->id);
server.dirty++; server.dirty++;
} }
@ -382,7 +382,7 @@ void popGenericCommand(client *c, int where) {
c->argv[1],c->db->id); c->argv[1],c->db->id);
dbDelete(c->db,c->argv[1]); dbDelete(c->db,c->argv[1]);
} }
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
server.dirty++; server.dirty++;
} }
} }
@ -482,7 +482,7 @@ void ltrimCommand(client *c) {
dbDelete(c->db,c->argv[1]); dbDelete(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
} }
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
server.dirty++; server.dirty++;
addReply(c,shared.ok); addReply(c,shared.ok);
} }
@ -519,7 +519,7 @@ void lremCommand(client *c) {
listTypeReleaseIterator(li); listTypeReleaseIterator(li);
if (removed) { if (removed) {
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,"lrem",c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_LIST,"lrem",c->argv[1],c->db->id);
} }
@ -555,7 +555,7 @@ void rpoplpushHandlePush(client *c, robj *dstkey, robj *dstobj, robj *value) {
server.list_compress_depth); server.list_compress_depth);
dbAdd(c->db,dstkey,dstobj); dbAdd(c->db,dstkey,dstobj);
} }
signalModifiedKey(c->db,dstkey); signalModifiedKey(c,c->db,dstkey);
listTypePush(dstobj,value,LIST_HEAD); listTypePush(dstobj,value,LIST_HEAD);
notifyKeyspaceEvent(NOTIFY_LIST,"lpush",dstkey,c->db->id); notifyKeyspaceEvent(NOTIFY_LIST,"lpush",dstkey,c->db->id);
/* Always send the pushed value to the client. */ /* Always send the pushed value to the client. */
@ -593,7 +593,7 @@ void rpoplpushCommand(client *c) {
notifyKeyspaceEvent(NOTIFY_GENERIC,"del", notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
touchedkey,c->db->id); touchedkey,c->db->id);
} }
signalModifiedKey(c->db,touchedkey); signalModifiedKey(c,c->db,touchedkey);
decrRefCount(touchedkey); decrRefCount(touchedkey);
server.dirty++; server.dirty++;
if (c->cmd->proc == brpoplpushCommand) { if (c->cmd->proc == brpoplpushCommand) {
@ -708,7 +708,7 @@ void blockingPopGenericCommand(client *c, int where) {
notifyKeyspaceEvent(NOTIFY_GENERIC,"del", notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
c->argv[j],c->db->id); c->argv[j],c->db->id);
} }
signalModifiedKey(c->db,c->argv[j]); signalModifiedKey(c,c->db,c->argv[j]);
server.dirty++; server.dirty++;
/* Replicate it as an [LR]POP instead of B[LR]POP. */ /* Replicate it as an [LR]POP instead of B[LR]POP. */

View File

@ -280,7 +280,7 @@ void saddCommand(client *c) {
if (setTypeAdd(set,c->argv[j]->ptr)) added++; if (setTypeAdd(set,c->argv[j]->ptr)) added++;
} }
if (added) { if (added) {
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id);
} }
server.dirty += added; server.dirty += added;
@ -305,7 +305,7 @@ void sremCommand(client *c) {
} }
} }
if (deleted) { if (deleted) {
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_SET,"srem",c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_SET,"srem",c->argv[1],c->db->id);
if (keyremoved) if (keyremoved)
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1], notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],
@ -358,8 +358,8 @@ void smoveCommand(client *c) {
dbAdd(c->db,c->argv[2],dstset); dbAdd(c->db,c->argv[2],dstset);
} }
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[2]); signalModifiedKey(c,c->db,c->argv[2]);
server.dirty++; server.dirty++;
/* An extra key has changed when ele was successfully added to dstset */ /* An extra key has changed when ele was successfully added to dstset */
@ -444,7 +444,7 @@ void spopWithCountCommand(client *c) {
/* Propagate this command as an DEL operation */ /* Propagate this command as an DEL operation */
rewriteClientCommandVector(c,2,shared.del,c->argv[1]); rewriteClientCommandVector(c,2,shared.del,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
server.dirty++; server.dirty++;
return; return;
} }
@ -546,7 +546,7 @@ void spopWithCountCommand(client *c) {
* the alsoPropagate() API. */ * the alsoPropagate() API. */
decrRefCount(propargv[0]); decrRefCount(propargv[0]);
preventCommandPropagation(c); preventCommandPropagation(c);
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
server.dirty++; server.dirty++;
} }
@ -599,7 +599,7 @@ void spopCommand(client *c) {
} }
/* Set has been modified */ /* Set has been modified */
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
server.dirty++; server.dirty++;
} }
@ -808,7 +808,7 @@ void sinterGenericCommand(client *c, robj **setkeys,
zfree(sets); zfree(sets);
if (dstkey) { if (dstkey) {
if (dbDelete(c->db,dstkey)) { if (dbDelete(c->db,dstkey)) {
signalModifiedKey(c->db,dstkey); signalModifiedKey(c,c->db,dstkey);
server.dirty++; server.dirty++;
} }
addReply(c,shared.czero); addReply(c,shared.czero);
@ -908,7 +908,7 @@ void sinterGenericCommand(client *c, robj **setkeys,
notifyKeyspaceEvent(NOTIFY_GENERIC,"del", notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
dstkey,c->db->id); dstkey,c->db->id);
} }
signalModifiedKey(c->db,dstkey); signalModifiedKey(c,c->db,dstkey);
server.dirty++; server.dirty++;
} else { } else {
setDeferredSetLen(c,replylen,cardinality); setDeferredSetLen(c,replylen,cardinality);
@ -1083,7 +1083,7 @@ void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum,
notifyKeyspaceEvent(NOTIFY_GENERIC,"del", notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
dstkey,c->db->id); dstkey,c->db->id);
} }
signalModifiedKey(c->db,dstkey); signalModifiedKey(c,c->db,dstkey);
server.dirty++; server.dirty++;
} }
zfree(sets); zfree(sets);

View File

@ -1262,7 +1262,7 @@ void xaddCommand(client *c) {
} }
addReplyStreamID(c,&id); addReplyStreamID(c,&id);
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);
server.dirty++; server.dirty++;
@ -2390,7 +2390,7 @@ void xdelCommand(client *c) {
/* Propagate the write if needed. */ /* Propagate the write if needed. */
if (deleted) { if (deleted) {
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STREAM,"xdel",c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_STREAM,"xdel",c->argv[1],c->db->id);
server.dirty += deleted; server.dirty += deleted;
} }
@ -2467,7 +2467,7 @@ void xtrimCommand(client *c) {
/* Propagate the write if needed. */ /* Propagate the write if needed. */
if (deleted) { if (deleted) {
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
server.dirty += deleted; server.dirty += deleted;
if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx); if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx);

View File

@ -84,7 +84,7 @@ void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire,
addReply(c, abort_reply ? abort_reply : shared.null[c->resp]); addReply(c, abort_reply ? abort_reply : shared.null[c->resp]);
return; return;
} }
genericSetKey(c->db,key,val,flags & OBJ_SET_KEEPTTL,1); genericSetKey(c,c->db,key,val,flags & OBJ_SET_KEEPTTL,1);
server.dirty++; server.dirty++;
if (expire) setExpire(c,c->db,key,mstime()+milliseconds); if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id); notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
@ -183,7 +183,7 @@ void getCommand(client *c) {
void getsetCommand(client *c) { void getsetCommand(client *c) {
if (getGenericCommand(c) == C_ERR) return; if (getGenericCommand(c) == C_ERR) return;
c->argv[2] = tryObjectEncoding(c->argv[2]); c->argv[2] = tryObjectEncoding(c->argv[2]);
setKey(c->db,c->argv[1],c->argv[2]); setKey(c,c->db,c->argv[1],c->argv[2]);
notifyKeyspaceEvent(NOTIFY_STRING,"set",c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_STRING,"set",c->argv[1],c->db->id);
server.dirty++; server.dirty++;
} }
@ -240,7 +240,7 @@ void setrangeCommand(client *c) {
if (sdslen(value) > 0) { if (sdslen(value) > 0) {
o->ptr = sdsgrowzero(o->ptr,offset+sdslen(value)); o->ptr = sdsgrowzero(o->ptr,offset+sdslen(value));
memcpy((char*)o->ptr+offset,value,sdslen(value)); memcpy((char*)o->ptr+offset,value,sdslen(value));
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STRING, notifyKeyspaceEvent(NOTIFY_STRING,
"setrange",c->argv[1],c->db->id); "setrange",c->argv[1],c->db->id);
server.dirty++; server.dirty++;
@ -328,7 +328,7 @@ void msetGenericCommand(client *c, int nx) {
for (j = 1; j < c->argc; j += 2) { for (j = 1; j < c->argc; j += 2) {
c->argv[j+1] = tryObjectEncoding(c->argv[j+1]); c->argv[j+1] = tryObjectEncoding(c->argv[j+1]);
setKey(c->db,c->argv[j],c->argv[j+1]); setKey(c,c->db,c->argv[j],c->argv[j+1]);
notifyKeyspaceEvent(NOTIFY_STRING,"set",c->argv[j],c->db->id); notifyKeyspaceEvent(NOTIFY_STRING,"set",c->argv[j],c->db->id);
} }
server.dirty += (c->argc-1)/2; server.dirty += (c->argc-1)/2;
@ -373,7 +373,7 @@ void incrDecrCommand(client *c, long long incr) {
dbAdd(c->db,c->argv[1],new); dbAdd(c->db,c->argv[1],new);
} }
} }
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STRING,"incrby",c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_STRING,"incrby",c->argv[1],c->db->id);
server.dirty++; server.dirty++;
addReply(c,shared.colon); addReply(c,shared.colon);
@ -423,7 +423,7 @@ void incrbyfloatCommand(client *c) {
dbOverwrite(c->db,c->argv[1],new); dbOverwrite(c->db,c->argv[1],new);
else else
dbAdd(c->db,c->argv[1],new); dbAdd(c->db,c->argv[1],new);
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STRING,"incrbyfloat",c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_STRING,"incrbyfloat",c->argv[1],c->db->id);
server.dirty++; server.dirty++;
addReplyBulk(c,new); addReplyBulk(c,new);
@ -467,7 +467,7 @@ void appendCommand(client *c) {
o->ptr = sdscatlen(o->ptr,append->ptr,sdslen(append->ptr)); o->ptr = sdscatlen(o->ptr,append->ptr,sdslen(append->ptr));
totlen = sdslen(o->ptr); totlen = sdslen(o->ptr);
} }
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STRING,"append",c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_STRING,"append",c->argv[1],c->db->id);
server.dirty++; server.dirty++;
addReplyLongLong(c,totlen); addReplyLongLong(c,totlen);

View File

@ -1646,7 +1646,7 @@ reply_to_client:
cleanup: cleanup:
zfree(scores); zfree(scores);
if (added || updated) { if (added || updated) {
signalModifiedKey(c->db,key); signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_ZSET, notifyKeyspaceEvent(NOTIFY_ZSET,
incr ? "zincr" : "zadd", key, c->db->id); incr ? "zincr" : "zadd", key, c->db->id);
} }
@ -1681,7 +1681,7 @@ void zremCommand(client *c) {
notifyKeyspaceEvent(NOTIFY_ZSET,"zrem",key,c->db->id); notifyKeyspaceEvent(NOTIFY_ZSET,"zrem",key,c->db->id);
if (keyremoved) if (keyremoved)
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
signalModifiedKey(c->db,key); signalModifiedKey(c,c->db,key);
server.dirty += deleted; server.dirty += deleted;
} }
addReplyLongLong(c,deleted); addReplyLongLong(c,deleted);
@ -1779,7 +1779,7 @@ void zremrangeGenericCommand(client *c, int rangetype) {
/* Step 4: Notifications and reply. */ /* Step 4: Notifications and reply. */
if (deleted) { if (deleted) {
char *event[3] = {"zremrangebyrank","zremrangebyscore","zremrangebylex"}; char *event[3] = {"zremrangebyrank","zremrangebyscore","zremrangebylex"};
signalModifiedKey(c->db,key); signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_ZSET,event[rangetype],key,c->db->id); notifyKeyspaceEvent(NOTIFY_ZSET,event[rangetype],key,c->db->id);
if (keyremoved) if (keyremoved)
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
@ -2383,7 +2383,7 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
zsetConvertToZiplistIfNeeded(dstobj,maxelelen); zsetConvertToZiplistIfNeeded(dstobj,maxelelen);
dbAdd(c->db,dstkey,dstobj); dbAdd(c->db,dstkey,dstobj);
addReplyLongLong(c,zsetLength(dstobj)); addReplyLongLong(c,zsetLength(dstobj));
signalModifiedKey(c->db,dstkey); signalModifiedKey(c,c->db,dstkey);
notifyKeyspaceEvent(NOTIFY_ZSET, notifyKeyspaceEvent(NOTIFY_ZSET,
(op == SET_OP_UNION) ? "zunionstore" : "zinterstore", (op == SET_OP_UNION) ? "zunionstore" : "zinterstore",
dstkey,c->db->id); dstkey,c->db->id);
@ -2392,7 +2392,7 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
decrRefCount(dstobj); decrRefCount(dstobj);
addReply(c,shared.czero); addReply(c,shared.czero);
if (touched) { if (touched) {
signalModifiedKey(c->db,dstkey); signalModifiedKey(c,c->db,dstkey);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id);
server.dirty++; server.dirty++;
} }
@ -3216,7 +3216,7 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey
if (arraylen == 0) { /* Do this only for the first iteration. */ if (arraylen == 0) { /* Do this only for the first iteration. */
char *events[2] = {"zpopmin","zpopmax"}; char *events[2] = {"zpopmin","zpopmax"};
notifyKeyspaceEvent(NOTIFY_ZSET,events[where],key,c->db->id); notifyKeyspaceEvent(NOTIFY_ZSET,events[where],key,c->db->id);
signalModifiedKey(c->db,key); signalModifiedKey(c,c->db,key);
} }
addReplyBulkCBuffer(c,ele,sdslen(ele)); addReplyBulkCBuffer(c,ele,sdslen(ele));

View File

@ -245,7 +245,7 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
* matches one or more prefixes in the prefix table. Later when we * matches one or more prefixes in the prefix table. Later when we
* return to the event loop, we'll send invalidation messages to the * return to the event loop, we'll send invalidation messages to the
* clients subscribed to each prefix. */ * clients subscribed to each prefix. */
void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) { void trackingRememberKeyToBroadcast(client *c, char *keyname, size_t keylen) {
raxIterator ri; raxIterator ri;
raxStart(&ri,PrefixTable); raxStart(&ri,PrefixTable);
raxSeek(&ri,"^",NULL,0); raxSeek(&ri,"^",NULL,0);
@ -254,7 +254,11 @@ void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) {
if (ri.key_len != 0 && memcmp(ri.key,keyname,ri.key_len) != 0) if (ri.key_len != 0 && memcmp(ri.key,keyname,ri.key_len) != 0)
continue; continue;
bcastState *bs = ri.data; bcastState *bs = ri.data;
raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,NULL,NULL); /* We insert the client pointer as associated value in the radix
* tree. This way we know who was the client that did the last
* change to the key, and can avoid sending the notification in the
* case the client is in NOLOOP mode. */
raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,c,NULL);
} }
raxStop(&ri); raxStop(&ri);
} }
@ -262,13 +266,17 @@ void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) {
/* This function is called from signalModifiedKey() or other places in Redis /* This function is called from signalModifiedKey() or other places in Redis
* when a key changes value. In the context of keys tracking, our task here is * when a key changes value. In the context of keys tracking, our task here is
* to send a notification to every client that may have keys about such caching * to send a notification to every client that may have keys about such caching
* slot. */ * slot.
void trackingInvalidateKey(robj *keyobj) { *
* Note that 'c' may be NULL in case the operation was performed outside the
* context of a client modifying the database (for instance when we delete a
* key because of expire). */
void trackingInvalidateKey(client *c, robj *keyobj) {
if (TrackingTable == NULL) return; if (TrackingTable == NULL) return;
sds sdskey = keyobj->ptr; sds sdskey = keyobj->ptr;
if (raxSize(PrefixTable) > 0) if (raxSize(PrefixTable) > 0)
trackingRememberKeyToBroadcast(sdskey,sdslen(sdskey)); trackingRememberKeyToBroadcast(c,sdskey,sdslen(sdskey));
rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey)); rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey));
if (ids == raxNotFound) return; if (ids == raxNotFound) return;
@ -279,19 +287,28 @@ void trackingInvalidateKey(robj *keyobj) {
while(raxNext(&ri)) { while(raxNext(&ri)) {
uint64_t id; uint64_t id;
memcpy(&id,ri.key,sizeof(id)); memcpy(&id,ri.key,sizeof(id));
client *c = lookupClientByID(id); client *target = lookupClientByID(id);
/* Note that if the client is in BCAST mode, we don't want to /* Note that if the client is in BCAST mode, we don't want to
* send invalidation messages that were pending in the case * send invalidation messages that were pending in the case
* previously the client was not in BCAST mode. This can happen if * previously the client was not in BCAST mode. This can happen if
* TRACKING is enabled normally, and then the client switches to * TRACKING is enabled normally, and then the client switches to
* BCAST mode. */ * BCAST mode. */
if (c == NULL || if (target == NULL ||
!(c->flags & CLIENT_TRACKING)|| !(target->flags & CLIENT_TRACKING)||
c->flags & CLIENT_TRACKING_BCAST) target->flags & CLIENT_TRACKING_BCAST)
{ {
continue; continue;
} }
sendTrackingMessage(c,sdskey,sdslen(sdskey),0);
/* If the client enabled the NOLOOP mode, don't send notifications
* about keys changed by the client itself. */
if (target->flags & CLIENT_TRACKING_NOLOOP &&
target == c)
{
continue;
}
sendTrackingMessage(target,sdskey,sdslen(sdskey),0);
} }
raxStop(&ri); raxStop(&ri);
@ -383,6 +400,54 @@ void trackingLimitUsedSlots(void) {
timeout_counter++; timeout_counter++;
} }
/* Generate Redis protocol for an array containing all the key names
* in the 'keys' radix tree. If the client is not NULL, the list will not
* include keys that were modified the last time by this client, in order
* to implement the NOLOOP option.
*
* If the resultin array would be empty, NULL is returned instead. */
sds trackingBuildBroadcastReply(client *c, rax *keys) {
raxIterator ri;
uint64_t count;
if (c == NULL) {
count = raxSize(keys);
} else {
count = 0;
raxStart(&ri,keys);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
if (ri.data != c) count++;
}
raxStop(&ri);
if (count == 0) return NULL;
}
/* Create the array reply with the list of keys once, then send
* it to all the clients subscribed to this prefix. */
char buf[32];
size_t len = ll2string(buf,sizeof(buf),count);
sds proto = sdsempty();
proto = sdsMakeRoomFor(proto,count*15);
proto = sdscatlen(proto,"*",1);
proto = sdscatlen(proto,buf,len);
proto = sdscatlen(proto,"\r\n",2);
raxStart(&ri,keys);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
if (c && ri.data == c) continue;
len = ll2string(buf,sizeof(buf),ri.key_len);
proto = sdscatlen(proto,"$",1);
proto = sdscatlen(proto,buf,len);
proto = sdscatlen(proto,"\r\n",2);
proto = sdscatlen(proto,ri.key,ri.key_len);
proto = sdscatlen(proto,"\r\n",2);
}
raxStop(&ri);
return proto;
}
/* This function will run the prefixes of clients in BCAST mode and /* This function will run the prefixes of clients in BCAST mode and
* keys that were modified about each prefix, and will send the * keys that were modified about each prefix, and will send the
* notifications to each client in each prefix. */ * notifications to each client in each prefix. */
@ -397,26 +462,10 @@ void trackingBroadcastInvalidationMessages(void) {
while(raxNext(&ri)) { while(raxNext(&ri)) {
bcastState *bs = ri.data; bcastState *bs = ri.data;
if (raxSize(bs->keys)) { if (raxSize(bs->keys)) {
/* Create the array reply with the list of keys once, then send
* it to all the clients subscribed to this prefix. */ /* Generate the common protocol for all the clients that are
char buf[32]; * not using the NOLOOP option. */
size_t len = ll2string(buf,sizeof(buf),raxSize(bs->keys)); sds proto = trackingBuildBroadcastReply(NULL,bs->keys);
sds proto = sdsempty();
proto = sdsMakeRoomFor(proto,raxSize(bs->keys)*15);
proto = sdscatlen(proto,"*",1);
proto = sdscatlen(proto,buf,len);
proto = sdscatlen(proto,"\r\n",2);
raxStart(&ri2,bs->keys);
raxSeek(&ri2,"^",NULL,0);
while(raxNext(&ri2)) {
len = ll2string(buf,sizeof(buf),ri2.key_len);
proto = sdscatlen(proto,"$",1);
proto = sdscatlen(proto,buf,len);
proto = sdscatlen(proto,"\r\n",2);
proto = sdscatlen(proto,ri2.key,ri2.key_len);
proto = sdscatlen(proto,"\r\n",2);
}
raxStop(&ri2);
/* Send this array of keys to every client in the list. */ /* Send this array of keys to every client in the list. */
raxStart(&ri2,bs->clients); raxStart(&ri2,bs->clients);
@ -424,8 +473,15 @@ void trackingBroadcastInvalidationMessages(void) {
while(raxNext(&ri2)) { while(raxNext(&ri2)) {
client *c; client *c;
memcpy(&c,ri2.key,sizeof(c)); memcpy(&c,ri2.key,sizeof(c));
if (c->flags & CLIENT_TRACKING_NOLOOP) {
/* This client may have certain keys excluded. */
sds adhoc = trackingBuildBroadcastReply(c,bs->keys);
sendTrackingMessage(c,adhoc,sdslen(adhoc),1);
sdsfree(adhoc);
} else {
sendTrackingMessage(c,proto,sdslen(proto),1); sendTrackingMessage(c,proto,sdslen(proto),1);
} }
}
raxStop(&ri2); raxStop(&ri2);
/* Clean up: we can remove everything from this state, because we /* Clean up: we can remove everything from this state, because we