From 94f2e7f9f9f7e6eca8f8bd7ae412c34806e68351 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 21 Apr 2020 10:51:46 +0200 Subject: [PATCH] Tracking: NOLOOP internals implementation. --- src/bitops.c | 8 ++-- src/cluster.c | 4 +- src/db.c | 30 +++++++----- src/debug.c | 2 +- src/expire.c | 6 +-- src/geo.c | 4 +- src/hyperloglog.c | 6 +-- src/module.c | 10 ++-- src/server.h | 12 +++-- src/sort.c | 4 +- src/t_hash.c | 10 ++-- src/t_list.c | 20 ++++---- src/t_set.c | 20 ++++---- src/t_stream.c | 6 +-- src/t_string.c | 14 +++--- src/t_zset.c | 12 ++--- src/tracking.c | 118 ++++++++++++++++++++++++++++++++++------------ 17 files changed, 174 insertions(+), 112 deletions(-) diff --git a/src/bitops.c b/src/bitops.c index 496d78b66..f506a881b 100644 --- a/src/bitops.c +++ b/src/bitops.c @@ -554,7 +554,7 @@ void setbitCommand(client *c) { byteval &= ~(1 << bit); byteval |= ((on & 0x1) << bit); ((uint8_t*)o->ptr)[byte] = byteval; - signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_STRING,"setbit",c->argv[1],c->db->id); server.dirty++; addReply(c, bitval ? shared.cone : shared.czero); @@ -754,11 +754,11 @@ void bitopCommand(client *c) { /* Store the computed value into the target key */ if (maxlen) { o = createObject(OBJ_STRING,res); - setKey(c->db,targetkey,o); + setKey(c,c->db,targetkey,o); notifyKeyspaceEvent(NOTIFY_STRING,"set",targetkey,c->db->id); decrRefCount(o); } else if (dbDelete(c->db,targetkey)) { - signalModifiedKey(c->db,targetkey); + signalModifiedKey(c,c->db,targetkey); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",targetkey,c->db->id); } server.dirty++; @@ -1135,7 +1135,7 @@ void bitfieldGeneric(client *c, int flags) { } if (changes) { - signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_STRING,"setbit",c->argv[1],c->db->id); server.dirty += changes; } diff --git a/src/cluster.c b/src/cluster.c index 2377b386b..b103e2fe1 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -4982,7 +4982,7 @@ void restoreCommand(client *c) { setExpire(c,c->db,c->argv[1],ttl); } objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock,1000); - signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",c->argv[1],c->db->id); addReply(c,shared.ok); server.dirty++; @@ -5329,7 +5329,7 @@ try_again: if (!copy) { /* No COPY option: remove the local key, signal the change. */ dbDelete(c->db,kv[j]); - signalModifiedKey(c->db,kv[j]); + signalModifiedKey(c,c->db,kv[j]); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",kv[j],c->db->id); server.dirty++; diff --git a/src/db.c b/src/db.c index 59f0cc7a0..dc4a0b63e 100644 --- a/src/db.c +++ b/src/db.c @@ -238,8 +238,10 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) { * 3) The expire time of the key is reset (the key is made persistent), * unless 'keepttl' is true. * - * All the new keys in the database should be created via this interface. */ -void genericSetKey(redisDb *db, robj *key, robj *val, int keepttl, int signal) { + * All the new keys in the database should be created via this interface. + * The client 'c' argument may be set to NULL if the operation is performed + * in a context where there is no clear client performing the operation. */ +void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, int signal) { if (lookupKeyWrite(db,key) == NULL) { dbAdd(db,key,val); } else { @@ -247,12 +249,12 @@ void genericSetKey(redisDb *db, robj *key, robj *val, int keepttl, int signal) { } incrRefCount(val); if (!keepttl) removeExpire(db,key); - if (signal) signalModifiedKey(db,key); + if (signal) signalModifiedKey(c,db,key); } /* Common case for genericSetKey() where the TTL is not retained. */ -void setKey(redisDb *db, robj *key, robj *val) { - genericSetKey(db,key,val,0,1); +void setKey(client *c, redisDb *db, robj *key, robj *val) { + genericSetKey(c,db,key,val,0,1); } /* Return true if the specified key exists in the specified database. @@ -467,9 +469,11 @@ long long dbTotalServerKeyCount() { * Every time a DB is flushed the function signalFlushDb() is called. *----------------------------------------------------------------------------*/ -void signalModifiedKey(redisDb *db, robj *key) { +/* Note that the 'c' argument may be NULL if the key was modified out of + * a context of a client. */ +void signalModifiedKey(client *c, redisDb *db, robj *key) { touchWatchedKey(db,key); - trackingInvalidateKey(key); + trackingInvalidateKey(c,key); } void signalFlushedDb(int dbid) { @@ -563,7 +567,7 @@ void delGenericCommand(client *c, int lazy) { int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) : dbSyncDelete(c->db,c->argv[j]); if (deleted) { - signalModifiedKey(c->db,c->argv[j]); + signalModifiedKey(c,c->db,c->argv[j]); notifyKeyspaceEvent(NOTIFY_GENERIC, "del",c->argv[j],c->db->id); server.dirty++; @@ -1003,8 +1007,8 @@ void renameGenericCommand(client *c, int nx) { dbAdd(c->db,c->argv[2],o); if (expire != -1) setExpire(c,c->db,c->argv[2],expire); dbDelete(c->db,c->argv[1]); - signalModifiedKey(c->db,c->argv[1]); - signalModifiedKey(c->db,c->argv[2]); + signalModifiedKey(c,c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[2]); notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from", c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_to", @@ -1072,8 +1076,8 @@ void moveCommand(client *c) { /* OK! key moved, free the entry in the source DB */ dbDelete(src,c->argv[1]); - signalModifiedKey(src,c->argv[1]); - signalModifiedKey(dst,c->argv[1]); + signalModifiedKey(c,src,c->argv[1]); + signalModifiedKey(c,dst,c->argv[1]); notifyKeyspaceEvent(NOTIFY_GENERIC, "move_from",c->argv[1],src->id); notifyKeyspaceEvent(NOTIFY_GENERIC, @@ -1317,7 +1321,7 @@ int expireIfNeeded(redisDb *db, robj *key) { "expired",key,db->id); int retval = server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) : dbSyncDelete(db,key); - if (retval) signalModifiedKey(db,key); + if (retval) signalModifiedKey(NULL,db,key); return retval; } diff --git a/src/debug.c b/src/debug.c index 1351b2536..cbb56cb71 100644 --- a/src/debug.c +++ b/src/debug.c @@ -588,7 +588,7 @@ NULL memcpy(val->ptr, buf, valsize<=buflen? valsize: buflen); } dbAdd(c->db,key,val); - signalModifiedKey(c->db,key); + signalModifiedKey(c,c->db,key); decrRefCount(key); } addReply(c,shared.ok); diff --git a/src/expire.c b/src/expire.c index c102a01ff..30a27193d 100644 --- a/src/expire.c +++ b/src/expire.c @@ -64,7 +64,7 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) { dbSyncDelete(db,keyobj); notifyKeyspaceEvent(NOTIFY_EXPIRED, "expired",keyobj,db->id); - trackingInvalidateKey(keyobj); + trackingInvalidateKey(NULL,keyobj); decrRefCount(keyobj); server.stat_expiredkeys++; return 1; @@ -519,14 +519,14 @@ void expireGenericCommand(client *c, long long basetime, int unit) { /* Replicate/AOF this as an explicit DEL or UNLINK. */ aux = server.lazyfree_lazy_expire ? shared.unlink : shared.del; rewriteClientCommandVector(c,2,aux,key); - signalModifiedKey(c->db,key); + signalModifiedKey(c,c->db,key); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); addReply(c, shared.cone); return; } else { setExpire(c,c->db,key,when); addReply(c,shared.cone); - signalModifiedKey(c->db,key); + signalModifiedKey(c,c->db,key); notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id); server.dirty++; return; diff --git a/src/geo.c b/src/geo.c index f7920a2e2..3e5d5f606 100644 --- a/src/geo.c +++ b/src/geo.c @@ -657,13 +657,13 @@ void georadiusGeneric(client *c, int flags) { if (returned_items) { zsetConvertToZiplistIfNeeded(zobj,maxelelen); - setKey(c->db,storekey,zobj); + setKey(c,c->db,storekey,zobj); decrRefCount(zobj); notifyKeyspaceEvent(NOTIFY_ZSET,"georadiusstore",storekey, c->db->id); server.dirty += returned_items; } else if (dbDelete(c->db,storekey)) { - signalModifiedKey(c->db,storekey); + signalModifiedKey(c,c->db,storekey); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",storekey,c->db->id); server.dirty++; } diff --git a/src/hyperloglog.c b/src/hyperloglog.c index facd99743..721f492a1 100644 --- a/src/hyperloglog.c +++ b/src/hyperloglog.c @@ -1209,7 +1209,7 @@ void pfaddCommand(client *c) { } hdr = o->ptr; if (updated) { - signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_STRING,"pfadd",c->argv[1],c->db->id); server.dirty++; HLL_INVALIDATE_CACHE(hdr); @@ -1300,7 +1300,7 @@ void pfcountCommand(client *c) { * data structure is not modified, since the cached value * may be modified and given that the HLL is a Redis string * we need to propagate the change. */ - signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[1]); server.dirty++; } addReplyLongLong(c,card); @@ -1373,7 +1373,7 @@ void pfmergeCommand(client *c) { last hllSparseSet() call. */ HLL_INVALIDATE_CACHE(hdr); - signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[1]); /* We generate a PFADD event for PFMERGE for semantical simplicity * since in theory this is a mass-add of elements. */ notifyKeyspaceEvent(NOTIFY_STRING,"pfadd",c->argv[1],c->db->id); diff --git a/src/module.c b/src/module.c index 4d3d9e1af..e3a338dad 100644 --- a/src/module.c +++ b/src/module.c @@ -896,7 +896,7 @@ void RM_SetModuleOptions(RedisModuleCtx *ctx, int options) { /* Signals that the key is modified from user's perspective (i.e. invalidate WATCH * and client side caching). */ int RM_SignalModifiedKey(RedisModuleCtx *ctx, RedisModuleString *keyname) { - signalModifiedKey(ctx->client->db,keyname); + signalModifiedKey(ctx->client,ctx->client->db,keyname); return REDISMODULE_OK; } @@ -2016,7 +2016,7 @@ void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) { static void moduleCloseKey(RedisModuleKey *key) { int signal = SHOULD_SIGNAL_MODIFIED_KEYS(key->ctx); if ((key->mode & REDISMODULE_WRITE) && signal) - signalModifiedKey(key->db,key->key); + signalModifiedKey(key->ctx->client,key->db,key->key); /* TODO: if (key->iter) RM_KeyIteratorStop(kp); */ RM_ZsetRangeStop(key); decrRefCount(key->key); @@ -2157,7 +2157,7 @@ RedisModuleString *RM_RandomKey(RedisModuleCtx *ctx) { int RM_StringSet(RedisModuleKey *key, RedisModuleString *str) { if (!(key->mode & REDISMODULE_WRITE) || key->iter) return REDISMODULE_ERR; RM_DeleteKey(key); - genericSetKey(key->db,key->key,str,0,0); + genericSetKey(key->ctx->client,key->db,key->key,str,0,0); key->value = str; return REDISMODULE_OK; } @@ -2237,7 +2237,7 @@ int RM_StringTruncate(RedisModuleKey *key, size_t newlen) { if (key->value == NULL) { /* Empty key: create it with the new size. */ robj *o = createObject(OBJ_STRING,sdsnewlen(NULL, newlen)); - genericSetKey(key->db,key->key,o,0,0); + genericSetKey(key->ctx->client,key->db,key->key,o,0,0); key->value = o; decrRefCount(o); } else { @@ -3625,7 +3625,7 @@ int RM_ModuleTypeSetValue(RedisModuleKey *key, moduleType *mt, void *value) { if (!(key->mode & REDISMODULE_WRITE) || key->iter) return REDISMODULE_ERR; RM_DeleteKey(key); robj *o = createModuleObject(mt,value); - genericSetKey(key->db,key->key,o,0,0); + genericSetKey(key->ctx->client,key->db,key->key,o,0,0); decrRefCount(o); key->value = o; return REDISMODULE_OK; diff --git a/src/server.h b/src/server.h index d0d5ff154..d39359dce 100644 --- a/src/server.h +++ b/src/server.h @@ -252,7 +252,9 @@ typedef long long ustime_t; /* microsecond time type. */ #define CLIENT_TRACKING_OPTOUT (1ULL<<35) /* Tracking in opt-out mode. */ #define CLIENT_TRACKING_CACHING (1ULL<<36) /* CACHING yes/no was given, depending on optin/optout mode. */ -#define CLIENT_IN_TO_TABLE (1ULL<<37) /* This client is in the timeout table. */ +#define CLIENT_TRACKING_NOLOOP (1ULL<<37) /* Don't send invalidation messages + about writes performed by myself.*/ +#define CLIENT_IN_TO_TABLE (1ULL<<38) /* This client is in the timeout table. */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ @@ -1683,7 +1685,7 @@ void addReplyStatusFormat(client *c, const char *fmt, ...); void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix); void disableTracking(client *c); void trackingRememberKeys(client *c); -void trackingInvalidateKey(robj *keyobj); +void trackingInvalidateKey(client *c, robj *keyobj); void trackingInvalidateKeysOnFlush(int dbid); void trackingLimitUsedSlots(void); uint64_t trackingGetTotalItems(void); @@ -2071,8 +2073,8 @@ int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, void dbAdd(redisDb *db, robj *key, robj *val); int dbAddRDBLoad(redisDb *db, sds key, robj *val); void dbOverwrite(redisDb *db, robj *key, robj *val); -void genericSetKey(redisDb *db, robj *key, robj *val, int keepttl, int signal); -void setKey(redisDb *db, robj *key, robj *val); +void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, int signal); +void setKey(client *c, redisDb *db, robj *key, robj *val); int dbExists(redisDb *db, robj *key); robj *dbRandomKey(redisDb *db); int dbSyncDelete(redisDb *db, robj *key); @@ -2088,7 +2090,7 @@ void flushAllDataAndResetRDB(int flags); long long dbTotalServerKeyCount(); int selectDb(client *c, int id); -void signalModifiedKey(redisDb *db, robj *key); +void signalModifiedKey(client *c, redisDb *db, robj *key); void signalFlushedDb(int dbid); unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count); unsigned int countKeysInSlot(unsigned int hashslot); diff --git a/src/sort.c b/src/sort.c index db26da158..f269a7731 100644 --- a/src/sort.c +++ b/src/sort.c @@ -570,12 +570,12 @@ void sortCommand(client *c) { } } if (outputlen) { - setKey(c->db,storekey,sobj); + setKey(c,c->db,storekey,sobj); notifyKeyspaceEvent(NOTIFY_LIST,"sortstore",storekey, c->db->id); server.dirty += outputlen; } else if (dbDelete(c->db,storekey)) { - signalModifiedKey(c->db,storekey); + signalModifiedKey(c,c->db,storekey); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",storekey,c->db->id); server.dirty++; } diff --git a/src/t_hash.c b/src/t_hash.c index b9f0db7fc..866bcd25b 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -521,7 +521,7 @@ void hsetnxCommand(client *c) { } else { hashTypeSet(o,c->argv[2]->ptr,c->argv[3]->ptr,HASH_SET_COPY); addReply(c, shared.cone); - signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id); server.dirty++; } @@ -551,7 +551,7 @@ void hsetCommand(client *c) { /* HMSET */ addReply(c, shared.ok); } - signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id); server.dirty++; } @@ -586,7 +586,7 @@ void hincrbyCommand(client *c) { new = sdsfromlonglong(value); hashTypeSet(o,c->argv[2]->ptr,new,HASH_SET_TAKE_VALUE); addReplyLongLong(c,value); - signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_HASH,"hincrby",c->argv[1],c->db->id); server.dirty++; } @@ -625,7 +625,7 @@ void hincrbyfloatCommand(client *c) { new = sdsnewlen(buf,len); hashTypeSet(o,c->argv[2]->ptr,new,HASH_SET_TAKE_VALUE); addReplyBulkCBuffer(c,buf,len); - signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_HASH,"hincrbyfloat",c->argv[1],c->db->id); server.dirty++; @@ -721,7 +721,7 @@ void hdelCommand(client *c) { } } if (deleted) { - signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_HASH,"hdel",c->argv[1],c->db->id); if (keyremoved) notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1], diff --git a/src/t_list.c b/src/t_list.c index eaeaa8e48..4770a2272 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -217,7 +217,7 @@ void pushGenericCommand(client *c, int where) { if (pushed) { char *event = (where == LIST_HEAD) ? "lpush" : "rpush"; - signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id); } server.dirty += pushed; @@ -247,7 +247,7 @@ void pushxGenericCommand(client *c, int where) { if (pushed) { char *event = (where == LIST_HEAD) ? "lpush" : "rpush"; - signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id); } server.dirty += pushed; @@ -292,7 +292,7 @@ void linsertCommand(client *c) { listTypeReleaseIterator(iter); if (inserted) { - signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_LIST,"linsert", c->argv[1],c->db->id); server.dirty++; @@ -355,7 +355,7 @@ void lsetCommand(client *c) { addReply(c,shared.outofrangeerr); } else { addReply(c,shared.ok); - signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_LIST,"lset",c->argv[1],c->db->id); server.dirty++; } @@ -382,7 +382,7 @@ void popGenericCommand(client *c, int where) { c->argv[1],c->db->id); dbDelete(c->db,c->argv[1]); } - signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[1]); server.dirty++; } } @@ -482,7 +482,7 @@ void ltrimCommand(client *c) { dbDelete(c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id); } - signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[1]); server.dirty++; addReply(c,shared.ok); } @@ -519,7 +519,7 @@ void lremCommand(client *c) { listTypeReleaseIterator(li); if (removed) { - signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_LIST,"lrem",c->argv[1],c->db->id); } @@ -555,7 +555,7 @@ void rpoplpushHandlePush(client *c, robj *dstkey, robj *dstobj, robj *value) { server.list_compress_depth); dbAdd(c->db,dstkey,dstobj); } - signalModifiedKey(c->db,dstkey); + signalModifiedKey(c,c->db,dstkey); listTypePush(dstobj,value,LIST_HEAD); notifyKeyspaceEvent(NOTIFY_LIST,"lpush",dstkey,c->db->id); /* Always send the pushed value to the client. */ @@ -593,7 +593,7 @@ void rpoplpushCommand(client *c) { notifyKeyspaceEvent(NOTIFY_GENERIC,"del", touchedkey,c->db->id); } - signalModifiedKey(c->db,touchedkey); + signalModifiedKey(c,c->db,touchedkey); decrRefCount(touchedkey); server.dirty++; if (c->cmd->proc == brpoplpushCommand) { @@ -708,7 +708,7 @@ void blockingPopGenericCommand(client *c, int where) { notifyKeyspaceEvent(NOTIFY_GENERIC,"del", c->argv[j],c->db->id); } - signalModifiedKey(c->db,c->argv[j]); + signalModifiedKey(c,c->db,c->argv[j]); server.dirty++; /* Replicate it as an [LR]POP instead of B[LR]POP. */ diff --git a/src/t_set.c b/src/t_set.c index 60cf22d8c..c2e73a6e6 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -280,7 +280,7 @@ void saddCommand(client *c) { if (setTypeAdd(set,c->argv[j]->ptr)) added++; } if (added) { - signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id); } server.dirty += added; @@ -305,7 +305,7 @@ void sremCommand(client *c) { } } if (deleted) { - signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_SET,"srem",c->argv[1],c->db->id); if (keyremoved) notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1], @@ -358,8 +358,8 @@ void smoveCommand(client *c) { dbAdd(c->db,c->argv[2],dstset); } - signalModifiedKey(c->db,c->argv[1]); - signalModifiedKey(c->db,c->argv[2]); + signalModifiedKey(c,c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[2]); server.dirty++; /* An extra key has changed when ele was successfully added to dstset */ @@ -444,7 +444,7 @@ void spopWithCountCommand(client *c) { /* Propagate this command as an DEL operation */ rewriteClientCommandVector(c,2,shared.del,c->argv[1]); - signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[1]); server.dirty++; return; } @@ -546,7 +546,7 @@ void spopWithCountCommand(client *c) { * the alsoPropagate() API. */ decrRefCount(propargv[0]); preventCommandPropagation(c); - signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[1]); server.dirty++; } @@ -599,7 +599,7 @@ void spopCommand(client *c) { } /* Set has been modified */ - signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[1]); server.dirty++; } @@ -808,7 +808,7 @@ void sinterGenericCommand(client *c, robj **setkeys, zfree(sets); if (dstkey) { if (dbDelete(c->db,dstkey)) { - signalModifiedKey(c->db,dstkey); + signalModifiedKey(c,c->db,dstkey); server.dirty++; } addReply(c,shared.czero); @@ -908,7 +908,7 @@ void sinterGenericCommand(client *c, robj **setkeys, notifyKeyspaceEvent(NOTIFY_GENERIC,"del", dstkey,c->db->id); } - signalModifiedKey(c->db,dstkey); + signalModifiedKey(c,c->db,dstkey); server.dirty++; } else { setDeferredSetLen(c,replylen,cardinality); @@ -1083,7 +1083,7 @@ void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum, notifyKeyspaceEvent(NOTIFY_GENERIC,"del", dstkey,c->db->id); } - signalModifiedKey(c->db,dstkey); + signalModifiedKey(c,c->db,dstkey); server.dirty++; } zfree(sets); diff --git a/src/t_stream.c b/src/t_stream.c index 155167af9..3efaa4509 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1262,7 +1262,7 @@ void xaddCommand(client *c) { } addReplyStreamID(c,&id); - signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id); server.dirty++; @@ -2390,7 +2390,7 @@ void xdelCommand(client *c) { /* Propagate the write if needed. */ if (deleted) { - signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_STREAM,"xdel",c->argv[1],c->db->id); server.dirty += deleted; } @@ -2467,7 +2467,7 @@ void xtrimCommand(client *c) { /* Propagate the write if needed. */ if (deleted) { - signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id); server.dirty += deleted; if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx); diff --git a/src/t_string.c b/src/t_string.c index 335bda404..ef382bb0c 100644 --- a/src/t_string.c +++ b/src/t_string.c @@ -84,7 +84,7 @@ void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, addReply(c, abort_reply ? abort_reply : shared.null[c->resp]); return; } - genericSetKey(c->db,key,val,flags & OBJ_SET_KEEPTTL,1); + genericSetKey(c,c->db,key,val,flags & OBJ_SET_KEEPTTL,1); server.dirty++; if (expire) setExpire(c,c->db,key,mstime()+milliseconds); notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id); @@ -183,7 +183,7 @@ void getCommand(client *c) { void getsetCommand(client *c) { if (getGenericCommand(c) == C_ERR) return; c->argv[2] = tryObjectEncoding(c->argv[2]); - setKey(c->db,c->argv[1],c->argv[2]); + setKey(c,c->db,c->argv[1],c->argv[2]); notifyKeyspaceEvent(NOTIFY_STRING,"set",c->argv[1],c->db->id); server.dirty++; } @@ -240,7 +240,7 @@ void setrangeCommand(client *c) { if (sdslen(value) > 0) { o->ptr = sdsgrowzero(o->ptr,offset+sdslen(value)); memcpy((char*)o->ptr+offset,value,sdslen(value)); - signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_STRING, "setrange",c->argv[1],c->db->id); server.dirty++; @@ -328,7 +328,7 @@ void msetGenericCommand(client *c, int nx) { for (j = 1; j < c->argc; j += 2) { c->argv[j+1] = tryObjectEncoding(c->argv[j+1]); - setKey(c->db,c->argv[j],c->argv[j+1]); + setKey(c,c->db,c->argv[j],c->argv[j+1]); notifyKeyspaceEvent(NOTIFY_STRING,"set",c->argv[j],c->db->id); } server.dirty += (c->argc-1)/2; @@ -373,7 +373,7 @@ void incrDecrCommand(client *c, long long incr) { dbAdd(c->db,c->argv[1],new); } } - signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_STRING,"incrby",c->argv[1],c->db->id); server.dirty++; addReply(c,shared.colon); @@ -423,7 +423,7 @@ void incrbyfloatCommand(client *c) { dbOverwrite(c->db,c->argv[1],new); else dbAdd(c->db,c->argv[1],new); - signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_STRING,"incrbyfloat",c->argv[1],c->db->id); server.dirty++; addReplyBulk(c,new); @@ -467,7 +467,7 @@ void appendCommand(client *c) { o->ptr = sdscatlen(o->ptr,append->ptr,sdslen(append->ptr)); totlen = sdslen(o->ptr); } - signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_STRING,"append",c->argv[1],c->db->id); server.dirty++; addReplyLongLong(c,totlen); diff --git a/src/t_zset.c b/src/t_zset.c index 5c000e76f..9c409cd96 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -1646,7 +1646,7 @@ reply_to_client: cleanup: zfree(scores); if (added || updated) { - signalModifiedKey(c->db,key); + signalModifiedKey(c,c->db,key); notifyKeyspaceEvent(NOTIFY_ZSET, incr ? "zincr" : "zadd", key, c->db->id); } @@ -1681,7 +1681,7 @@ void zremCommand(client *c) { notifyKeyspaceEvent(NOTIFY_ZSET,"zrem",key,c->db->id); if (keyremoved) notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); - signalModifiedKey(c->db,key); + signalModifiedKey(c,c->db,key); server.dirty += deleted; } addReplyLongLong(c,deleted); @@ -1779,7 +1779,7 @@ void zremrangeGenericCommand(client *c, int rangetype) { /* Step 4: Notifications and reply. */ if (deleted) { char *event[3] = {"zremrangebyrank","zremrangebyscore","zremrangebylex"}; - signalModifiedKey(c->db,key); + signalModifiedKey(c,c->db,key); notifyKeyspaceEvent(NOTIFY_ZSET,event[rangetype],key,c->db->id); if (keyremoved) notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); @@ -2383,7 +2383,7 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) { zsetConvertToZiplistIfNeeded(dstobj,maxelelen); dbAdd(c->db,dstkey,dstobj); addReplyLongLong(c,zsetLength(dstobj)); - signalModifiedKey(c->db,dstkey); + signalModifiedKey(c,c->db,dstkey); notifyKeyspaceEvent(NOTIFY_ZSET, (op == SET_OP_UNION) ? "zunionstore" : "zinterstore", dstkey,c->db->id); @@ -2392,7 +2392,7 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) { decrRefCount(dstobj); addReply(c,shared.czero); if (touched) { - signalModifiedKey(c->db,dstkey); + signalModifiedKey(c,c->db,dstkey); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id); server.dirty++; } @@ -3216,7 +3216,7 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey if (arraylen == 0) { /* Do this only for the first iteration. */ char *events[2] = {"zpopmin","zpopmax"}; notifyKeyspaceEvent(NOTIFY_ZSET,events[where],key,c->db->id); - signalModifiedKey(c->db,key); + signalModifiedKey(c,c->db,key); } addReplyBulkCBuffer(c,ele,sdslen(ele)); diff --git a/src/tracking.c b/src/tracking.c index 6f7929430..434e086b5 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -245,7 +245,7 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) { * matches one or more prefixes in the prefix table. Later when we * return to the event loop, we'll send invalidation messages to the * clients subscribed to each prefix. */ -void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) { +void trackingRememberKeyToBroadcast(client *c, char *keyname, size_t keylen) { raxIterator ri; raxStart(&ri,PrefixTable); raxSeek(&ri,"^",NULL,0); @@ -254,7 +254,11 @@ void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) { if (ri.key_len != 0 && memcmp(ri.key,keyname,ri.key_len) != 0) continue; bcastState *bs = ri.data; - raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,NULL,NULL); + /* We insert the client pointer as associated value in the radix + * tree. This way we know who was the client that did the last + * change to the key, and can avoid sending the notification in the + * case the client is in NOLOOP mode. */ + raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,c,NULL); } raxStop(&ri); } @@ -262,13 +266,17 @@ void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) { /* This function is called from signalModifiedKey() or other places in Redis * when a key changes value. In the context of keys tracking, our task here is * to send a notification to every client that may have keys about such caching - * slot. */ -void trackingInvalidateKey(robj *keyobj) { + * slot. + * + * Note that 'c' may be NULL in case the operation was performed outside the + * context of a client modifying the database (for instance when we delete a + * key because of expire). */ +void trackingInvalidateKey(client *c, robj *keyobj) { if (TrackingTable == NULL) return; sds sdskey = keyobj->ptr; if (raxSize(PrefixTable) > 0) - trackingRememberKeyToBroadcast(sdskey,sdslen(sdskey)); + trackingRememberKeyToBroadcast(c,sdskey,sdslen(sdskey)); rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey)); if (ids == raxNotFound) return; @@ -279,19 +287,28 @@ void trackingInvalidateKey(robj *keyobj) { while(raxNext(&ri)) { uint64_t id; memcpy(&id,ri.key,sizeof(id)); - client *c = lookupClientByID(id); + client *target = lookupClientByID(id); /* Note that if the client is in BCAST mode, we don't want to * send invalidation messages that were pending in the case * previously the client was not in BCAST mode. This can happen if * TRACKING is enabled normally, and then the client switches to * BCAST mode. */ - if (c == NULL || - !(c->flags & CLIENT_TRACKING)|| - c->flags & CLIENT_TRACKING_BCAST) + if (target == NULL || + !(target->flags & CLIENT_TRACKING)|| + target->flags & CLIENT_TRACKING_BCAST) { continue; } - sendTrackingMessage(c,sdskey,sdslen(sdskey),0); + + /* If the client enabled the NOLOOP mode, don't send notifications + * about keys changed by the client itself. */ + if (target->flags & CLIENT_TRACKING_NOLOOP && + target == c) + { + continue; + } + + sendTrackingMessage(target,sdskey,sdslen(sdskey),0); } raxStop(&ri); @@ -383,6 +400,54 @@ void trackingLimitUsedSlots(void) { timeout_counter++; } +/* Generate Redis protocol for an array containing all the key names + * in the 'keys' radix tree. If the client is not NULL, the list will not + * include keys that were modified the last time by this client, in order + * to implement the NOLOOP option. + * + * If the resultin array would be empty, NULL is returned instead. */ +sds trackingBuildBroadcastReply(client *c, rax *keys) { + raxIterator ri; + uint64_t count; + + if (c == NULL) { + count = raxSize(keys); + } else { + count = 0; + raxStart(&ri,keys); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + if (ri.data != c) count++; + } + raxStop(&ri); + + if (count == 0) return NULL; + } + + /* Create the array reply with the list of keys once, then send + * it to all the clients subscribed to this prefix. */ + char buf[32]; + size_t len = ll2string(buf,sizeof(buf),count); + sds proto = sdsempty(); + proto = sdsMakeRoomFor(proto,count*15); + proto = sdscatlen(proto,"*",1); + proto = sdscatlen(proto,buf,len); + proto = sdscatlen(proto,"\r\n",2); + raxStart(&ri,keys); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + if (c && ri.data == c) continue; + len = ll2string(buf,sizeof(buf),ri.key_len); + proto = sdscatlen(proto,"$",1); + proto = sdscatlen(proto,buf,len); + proto = sdscatlen(proto,"\r\n",2); + proto = sdscatlen(proto,ri.key,ri.key_len); + proto = sdscatlen(proto,"\r\n",2); + } + raxStop(&ri); + return proto; +} + /* This function will run the prefixes of clients in BCAST mode and * keys that were modified about each prefix, and will send the * notifications to each client in each prefix. */ @@ -397,26 +462,10 @@ void trackingBroadcastInvalidationMessages(void) { while(raxNext(&ri)) { bcastState *bs = ri.data; if (raxSize(bs->keys)) { - /* Create the array reply with the list of keys once, then send - * it to all the clients subscribed to this prefix. */ - char buf[32]; - size_t len = ll2string(buf,sizeof(buf),raxSize(bs->keys)); - sds proto = sdsempty(); - proto = sdsMakeRoomFor(proto,raxSize(bs->keys)*15); - proto = sdscatlen(proto,"*",1); - proto = sdscatlen(proto,buf,len); - proto = sdscatlen(proto,"\r\n",2); - raxStart(&ri2,bs->keys); - raxSeek(&ri2,"^",NULL,0); - while(raxNext(&ri2)) { - len = ll2string(buf,sizeof(buf),ri2.key_len); - proto = sdscatlen(proto,"$",1); - proto = sdscatlen(proto,buf,len); - proto = sdscatlen(proto,"\r\n",2); - proto = sdscatlen(proto,ri2.key,ri2.key_len); - proto = sdscatlen(proto,"\r\n",2); - } - raxStop(&ri2); + + /* Generate the common protocol for all the clients that are + * not using the NOLOOP option. */ + sds proto = trackingBuildBroadcastReply(NULL,bs->keys); /* Send this array of keys to every client in the list. */ raxStart(&ri2,bs->clients); @@ -424,7 +473,14 @@ void trackingBroadcastInvalidationMessages(void) { while(raxNext(&ri2)) { client *c; memcpy(&c,ri2.key,sizeof(c)); - sendTrackingMessage(c,proto,sdslen(proto),1); + if (c->flags & CLIENT_TRACKING_NOLOOP) { + /* This client may have certain keys excluded. */ + sds adhoc = trackingBuildBroadcastReply(c,bs->keys); + sendTrackingMessage(c,adhoc,sdslen(adhoc),1); + sdsfree(adhoc); + } else { + sendTrackingMessage(c,proto,sdslen(proto),1); + } } raxStop(&ri2);