diff --git a/src/Makefile b/src/Makefile index 3a9365f78..be4c07369 100644 --- a/src/Makefile +++ b/src/Makefile @@ -209,7 +209,7 @@ endif REDIS_SERVER_NAME=keydb-server REDIS_SENTINEL_NAME=keydb-sentinel -REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o $(ASM_OBJ) +REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o $(ASM_OBJ) REDIS_CLI_NAME=keydb-cli REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o new.o $(ASM_OBJ) REDIS_BENCHMARK_NAME=keydb-benchmark diff --git a/src/db.cpp b/src/db.cpp index 044a2bcd5..b4683c321 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -451,6 +451,7 @@ int selectDb(client *c, int id) { void signalModifiedKey(redisDb *db, robj *key) { touchWatchedKey(db,key); + if (g_pserver->tracking_clients) trackingInvalidateKey(key); } void signalFlushedDb(int dbid) { diff --git a/src/debug.cpp b/src/debug.cpp index 2fbbd9e2f..3485df967 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -706,7 +706,7 @@ void _serverAssertPrintClientInfo(const client *c) { bugReportStart(); serverLog(LL_WARNING,"=== ASSERTION FAILED CLIENT CONTEXT ==="); - serverLog(LL_WARNING,"client->flags = %d", static_cast(c->flags)); + serverLog(LL_WARNING,"client->flags = %llu", static_cast(c->flags)); serverLog(LL_WARNING,"client->fd = %d", c->fd); serverLog(LL_WARNING,"client->argc = %d", c->argc); for (j=0; j < c->argc; j++) { diff --git a/src/expire.cpp b/src/expire.cpp index 64a430389..5a0abbb06 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -64,6 +64,7 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) { dbSyncDelete(db,keyobj); notifyKeyspaceEvent(NOTIFY_EXPIRED, "expired",keyobj,db->id); + if (g_pserver->tracking_clients) trackingInvalidateKey(keyobj); decrRefCount(keyobj); g_pserver->stat_expiredkeys++; return 1; diff --git a/src/module.cpp b/src/module.cpp index 568205f90..7d825df27 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -1252,6 +1252,17 @@ int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) { return REDISMODULE_OK; } +/* Reply with a bulk string, taking in input a C buffer pointer that is + * assumed to be null-terminated. + * + * The function always returns REDISMODULE_OK. */ +int RM_ReplyWithCString(RedisModuleCtx *ctx, const char *buf) { + client *c = moduleGetReplyClient(ctx); + if (c == NULL) return REDISMODULE_OK; + addReplyBulkCString(c,(char*)buf); + return REDISMODULE_OK; +} + /* Reply with a bulk string, taking in input a RedisModuleString object. * * The function always returns REDISMODULE_OK. */ @@ -1465,6 +1476,9 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) { if (g_pserver->cluster_enabled) flags |= REDISMODULE_CTX_FLAGS_CLUSTER; + if (g_pserver->loading) + flags |= REDISMODULE_CTX_FLAGS_LOADING; + /* Maxmemory and eviction policy */ if (g_pserver->maxmemory > 0) { flags |= REDISMODULE_CTX_FLAGS_MAXMEMORY; diff --git a/src/networking.cpp b/src/networking.cpp index 92e2e42a5..9bd70c35e 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -174,6 +174,7 @@ client *createClient(int fd, int iel) { c->bufAsync = NULL; c->buflenAsync = 0; c->bufposAsync = 0; + c->client_tracking_redirection = 0; memset(c->uuid, 0, UUID_BINARY_LEN); listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid); @@ -1264,6 +1265,9 @@ void unlinkClient(client *c) { serverAssert(fFound); c->fPendingAsyncWrite = FALSE; } + + /* Clear the tracking status. */ + if (c->flags & CLIENT_TRACKING) disableTracking(c); } void freeClient(client *c) { @@ -2245,6 +2249,8 @@ sds catClientInfoString(sds s, client *client) { if (client->flags & CLIENT_PUBSUB) *p++ = 'P'; if (client->flags & CLIENT_MULTI) *p++ = 'x'; if (client->flags & CLIENT_BLOCKED) *p++ = 'b'; + if (client->flags & CLIENT_TRACKING) *p++ = 't'; + if (client->flags & CLIENT_TRACKING_BROKEN_REDIR) *p++ = 'R'; if (client->flags & CLIENT_DIRTY_CAS) *p++ = 'd'; if (client->flags & CLIENT_CLOSE_AFTER_REPLY) *p++ = 'c'; if (client->flags & CLIENT_UNBLOCKED) *p++ = 'u'; @@ -2357,6 +2363,7 @@ void clientCommand(client *c) { "reply (on|off|skip) -- Control the replies sent to the current connection.", "setname -- Assign the name to the current connection.", "unblock [TIMEOUT|ERROR] -- Unblock the specified blocked client.", +"tracking (on|off) [REDIRECT ] -- Enable client keys tracking for client side caching.", NULL }; addReplyHelp(c, help); @@ -2515,21 +2522,57 @@ NULL } else { addReply(c,shared.czero); } - } else if (!strcasecmp((const char*)ptrFromObj(c->argv[1]),"setname") && c->argc == 3) { + } else if (!strcasecmp(szFromObj(c->argv[1]),"setname") && c->argc == 3) { + /* CLIENT SETNAME */ if (clientSetNameOrReply(c,c->argv[2]) == C_OK) addReply(c,shared.ok); - } else if (!strcasecmp((const char*)ptrFromObj(c->argv[1]),"getname") && c->argc == 2) { + } else if (!strcasecmp(szFromObj(c->argv[1]),"getname") && c->argc == 2) { + /* CLIENT GETNAME */ if (c->name) addReplyBulk(c,c->name); else addReplyNull(c, shared.nullbulk); - } else if (!strcasecmp((const char*)ptrFromObj(c->argv[1]),"pause") && c->argc == 3) { + } else if (!strcasecmp(szFromObj(c->argv[1]),"pause") && c->argc == 3) { + /* CLIENT PAUSE */ long long duration; - if (getTimeoutFromObjectOrReply(c,c->argv[2],&duration,UNIT_MILLISECONDS) - != C_OK) return; + if (getTimeoutFromObjectOrReply(c,c->argv[2],&duration, + UNIT_MILLISECONDS) != C_OK) return; pauseClients(duration); addReply(c,shared.ok); + } else if (!strcasecmp(szFromObj(c->argv[1]),"tracking") && + (c->argc == 3 || c->argc == 5)) + { + /* CLIENT TRACKING (on|off) [REDIRECT ] */ + long long redir = 0; + + /* Parse the redirection option: we'll require the client with + * the specified ID to exist right now, even if it is possible + * it will get disconnected later. */ + if (c->argc == 5) { + if (strcasecmp(szFromObj(c->argv[3]),"redirect") != 0) { + addReply(c,shared.syntaxerr); + return; + } else { + if (getLongLongFromObjectOrReply(c,c->argv[4],&redir,NULL) != + C_OK) return; + if (lookupClientByID(redir) == NULL) { + addReplyError(c,"The client ID you want redirect to " + "does not exist"); + return; + } + } + } + + if (!strcasecmp(szFromObj(c->argv[2]),"on")) { + enableTracking(c,redir); + } else if (!strcasecmp(szFromObj(c->argv[2]),"off")) { + disableTracking(c); + } else { + addReply(c,shared.syntaxerr); + return; + } + addReply(c,shared.ok); } else { addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try CLIENT HELP", (char*)ptrFromObj(c->argv[1])); } diff --git a/src/redismodule.h b/src/redismodule.h index 217025319..fae755b0a 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -91,6 +91,8 @@ extern "C" { #define REDISMODULE_CTX_FLAGS_OOM_WARNING (1<<11) /* The command was sent over the replication link. */ #define REDISMODULE_CTX_FLAGS_REPLICATED (1<<12) +/* Redis is currently loading either from AOF or RDB. */ +#define REDISMODULE_CTX_FLAGS_LOADING (1<<13) #define REDISMODULE_NOTIFY_GENERIC (1<<2) /* g */ @@ -230,6 +232,7 @@ int REDISMODULE_API_FUNC(RedisModule_ReplyWithSimpleString)(RedisModuleCtx *ctx, int REDISMODULE_API_FUNC(RedisModule_ReplyWithArray)(RedisModuleCtx *ctx, long len); void REDISMODULE_API_FUNC(RedisModule_ReplySetArrayLength)(RedisModuleCtx *ctx, long len); int REDISMODULE_API_FUNC(RedisModule_ReplyWithStringBuffer)(RedisModuleCtx *ctx, const char *buf, size_t len); +int REDISMODULE_API_FUNC(RedisModule_ReplyWithCString)(RedisModuleCtx *ctx, const char *buf); int REDISMODULE_API_FUNC(RedisModule_ReplyWithString)(RedisModuleCtx *ctx, RedisModuleString *str); int REDISMODULE_API_FUNC(RedisModule_ReplyWithNull)(RedisModuleCtx *ctx); int REDISMODULE_API_FUNC(RedisModule_ReplyWithDouble)(RedisModuleCtx *ctx, double d); @@ -380,6 +383,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(ReplyWithArray); REDISMODULE_GET_API(ReplySetArrayLength); REDISMODULE_GET_API(ReplyWithStringBuffer); + REDISMODULE_GET_API(ReplyWithCString); REDISMODULE_GET_API(ReplyWithString); REDISMODULE_GET_API(ReplyWithNull); REDISMODULE_GET_API(ReplyWithCallReply); diff --git a/src/server.cpp b/src/server.cpp index c40efc7ac..ebdca3234 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3375,6 +3375,7 @@ void call(client *c, int flags) { latencyAddSampleIfNeeded(latency_event,duration/1000); slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration); } + if (flags & CMD_CALL_STATS) { /* use the real command that was executed (cmd and lastamc) may be * different, in case of MULTI-EXEC or re-written commands such as @@ -3448,6 +3449,16 @@ void call(client *c, int flags) { ProcessPendingAsyncWrites(); g_pserver->also_propagate = prev_also_propagate; + + /* If the client has keys tracking enabled for client side caching, + * make sure to remember the keys it fetched via this command. */ + if (c->cmd->flags & CMD_READONLY) { + client *caller = (c->flags & CLIENT_LUA && g_pserver->lua_caller) ? + g_pserver->lua_caller : c; + if (caller->flags & CLIENT_TRACKING) + trackingRememberKeys(caller); + } + g_pserver->stat_numcommands++; } diff --git a/src/server.h b/src/server.h index f359e9852..03b4897c5 100644 --- a/src/server.h +++ b/src/server.h @@ -327,8 +327,8 @@ public: #define AOF_WAIT_REWRITE 2 /* AOF waits rewrite to start appending */ /* Client flags */ -#define CLIENT_SLAVE (1<<0) /* This client is a slave server */ -#define CLIENT_MASTER (1<<1) /* This client is a master server */ +#define CLIENT_SLAVE (1<<0) /* This client is a repliaca */ +#define CLIENT_MASTER (1<<1) /* This client is a master */ #define CLIENT_MONITOR (1<<2) /* This client is a slave monitor, see MONITOR */ #define CLIENT_MULTI (1<<3) /* This client is in a MULTI context */ #define CLIENT_BLOCKED (1<<4) /* The client is waiting in a blocking operation */ @@ -359,7 +359,17 @@ public: #define CLIENT_LUA_DEBUG_SYNC (1<<26) /* EVAL debugging without fork() */ #define CLIENT_MODULE (1<<27) /* Non connected client used by some module. */ #define CLIENT_PROTECTED (1<<28) /* Client should not be freed for now. */ -#define CLIENT_FORCE_REPLY (1<<29) /* Should addReply be forced to write the text? */ +#define CLIENT_PENDING_READ (1<<29) /* The client has pending reads and was put + in the list of clients we can read + from. */ +#define CLIENT_PENDING_COMMAND (1<<30) /* Used in threaded I/O to signal after + we return single threaded that the + client has already pending commands + to be executed. */ +#define CLIENT_TRACKING (1<<31) /* Client enabled keys tracking in order to + perform client side caching. */ +#define CLIENT_TRACKING_BROKEN_REDIR (1ULL<<32) /* Target client is invalid. */ +#define CLIENT_FORCE_REPLY (1ULL<<33) /* Should addReply be forced to write the text? */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ @@ -930,7 +940,7 @@ typedef struct client { time_t ctime; /* Client creation time. */ time_t lastinteraction; /* Time of the last interaction, used for timeout */ time_t obuf_soft_limit_reached_time; - std::atomic flags; /* Client flags: CLIENT_* macros. */ + std::atomic flags; /* Client flags: CLIENT_* macros. */ int fPendingAsyncWrite; /* NOTE: Not a flag because it is written to outside of the client lock (locked by the global lock instead) */ int authenticated; /* Needed when the default user requires auth. */ int replstate; /* Replication state if this is a slave. */ @@ -966,6 +976,11 @@ typedef struct client { /* UUIDs are transient and lost when the server is shut down */ unsigned char uuid[UUID_BINARY_LEN]; + /* If this client is in tracking mode and this field is non zero, + * invalidation messages for keys fetched by this client will be send to + * the specified client ID. */ + uint64_t client_tracking_redirection; + /* Response buffer */ int bufpos; char buf[PROTO_REPLY_CHUNK_BYTES]; @@ -1460,6 +1475,8 @@ struct redisServer { unsigned int blocked_clients; /* # of clients executing a blocking cmd.*/ unsigned int blocked_clients_by_type[BLOCKED_NUM]; list *ready_keys; /* List of readyList structures for BLPOP & co */ + /* Client side caching. */ + unsigned int tracking_clients; /* # of clients with tracking enabled.*/ /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ int sort_desc; @@ -1796,6 +1813,7 @@ void addReplyPushLenAsync(client *c, long length); void addReplyLongLongAsync(client *c, long long ll); void ProcessPendingAsyncWrites(void); +client *lookupClientByID(uint64_t id); #ifdef __GNUC__ void addReplyErrorFormat(client *c, const char *fmt, ...) @@ -1807,6 +1825,12 @@ void addReplyErrorFormat(client *c, const char *fmt, ...); void addReplyStatusFormat(client *c, const char *fmt, ...); #endif +/* Client side caching (tracking mode) */ +void enableTracking(client *c, uint64_t redirect_to); +void disableTracking(client *c); +void trackingRememberKeys(client *c); +void trackingInvalidateKey(robj *keyobj); + /* List data type */ void listTypeTryConversion(robj *subject, robj *value); void listTypePush(robj *subject, robj *value, int where); @@ -2134,6 +2158,7 @@ int pubsubUnsubscribeAllPatterns(client *c, int notify); void freePubsubPattern(const void *p); int listMatchPubsubPattern(void *a, void *b); int pubsubPublishMessage(robj *channel, robj *message); +void addReplyPubsubMessage(client *c, robj *channel, robj *msg); /* Keyspace events notification */ void notifyKeyspaceEvent(int type, const char *event, robj *key, int dbid); diff --git a/src/tracking.cpp b/src/tracking.cpp new file mode 100644 index 000000000..bcc98eac7 --- /dev/null +++ b/src/tracking.cpp @@ -0,0 +1,174 @@ +/* tracking.c - Client side caching: keys tracking and invalidation + * + * Copyright (c) 2019, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "server.h" + +/* The tracking table is constituted by 2^24 radix trees (each tree, and the + * table itself, are allocated in a lazy way only when needed) tracking + * clients that may have certain keys in their local, client side, cache. + * + * Keys are grouped into 2^24 slots, in a way similar to Redis Cluster hash + * slots, however here the function we use is crc64, taking the least + * significant 24 bits of the output. + * + * When a client enables tracking with "CLIENT TRACKING on", each key served to + * the client is hashed to one of such slots, and Redis will remember what + * client may have keys about such slot. Later, when a key in a given slot is + * modified, all the clients that may have local copies of keys in that slot + * will receive an invalidation message. There is no distinction of database + * number: a single table is used. + * + * Clients will normally take frequently requested objects in memory, removing + * them when invalidation messages are received. A strategy clients may use is + * to just cache objects in a dictionary, associating to each cached object + * some incremental epoch, or just a timestamp. When invalidation messages are + * received clients may store, in a different table, the timestamp (or epoch) + * of the invalidation of such given slot: later when accessing objects, the + * eviction of stale objects may be performed in a lazy way by checking if the + * cached object timestamp is older than the invalidation timestamp for such + * objects. + * + * The output of the 24 bit hash function is very large (more than 16 million + * possible slots), so clients that may want to use less resources may only + * use the most significant bits instead of the full 24 bits. */ +#define TRACKING_TABLE_SIZE (1<<24) +rax **TrackingTable = NULL; +robj *TrackingChannelName; + +/* Remove the tracking state from the client 'c'. Note that there is not much + * to do for us here, if not to decrement the counter of the clients in + * tracking mode, because we just store the ID of the client in the tracking + * table, so we'll remove the ID reference in a lazy way. Otherwise when a + * client with many entries in the table is removed, it would cost a lot of + * time to do the cleanup. */ +void disableTracking(client *c) { + if (c->flags & CLIENT_TRACKING) { + g_pserver->tracking_clients--; + c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR); + } +} + +/* Enable the tracking state for the client 'c', and as a side effect allocates + * the tracking table if needed. If the 'redirect_to' argument is non zero, the + * invalidation messages for this client will be sent to the client ID + * specified by the 'redirect_to' argument. Note that if such client will + * eventually get freed, we'll send a message to the original client to + * inform it of the condition. Multiple clients can redirect the invalidation + * messages to the same client ID. */ +void enableTracking(client *c, uint64_t redirect_to) { + if (c->flags & CLIENT_TRACKING) return; + c->flags |= CLIENT_TRACKING; + c->flags &= ~CLIENT_TRACKING_BROKEN_REDIR; + c->client_tracking_redirection = redirect_to; + g_pserver->tracking_clients++; + if (TrackingTable == NULL) { + TrackingTable = (rax**)zcalloc(sizeof(rax*) * TRACKING_TABLE_SIZE, MALLOC_LOCAL); + TrackingChannelName = createStringObject("__redis__:invalidate",20); + } +} + +/* This function is called after the excution of a readonly command in the + * case the client 'c' has keys tracking enabled. It will populate the + * tracking ivalidation table according to the keys the user fetched, so that + * Redis will know what are the clients that should receive an invalidation + * message with certain groups of keys are modified. */ +void trackingRememberKeys(client *c) { + int numkeys; + int *keys = getKeysFromCommand(c->cmd,c->argv,c->argc,&numkeys); + if (keys == NULL) return; + + for(int j = 0; j < numkeys; j++) { + int idx = keys[j]; + sds sdskey = (sds)ptrFromObj(c->argv[idx]); + uint64_t hash = crc64(0, + (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1); + if (TrackingTable[hash] == NULL) + TrackingTable[hash] = raxNew(); + raxTryInsert(TrackingTable[hash], + (unsigned char*)&c->id,sizeof(c->id),NULL,NULL); + } + getKeysFreeResult(keys); +} + +/* This function is called from signalModifiedKey() or other places in Redis + * when a key changes value. In the context of keys tracking, our task here is + * to send a notification to every client that may have keys about such . */ +void trackingInvalidateKey(robj *keyobj) { + sds sdskey = (sds)ptrFromObj(keyobj); + uint64_t hash = crc64(0, + (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1); + if (TrackingTable == NULL || TrackingTable[hash] == NULL) return; + + raxIterator ri; + raxStart(&ri,TrackingTable[hash]); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + uint64_t id; + memcpy(&id,ri.key,ri.key_len); + client *c = lookupClientByID(id); + int using_redirection = 0; + if (c->client_tracking_redirection) { + client *redir = lookupClientByID(c->client_tracking_redirection); + if (!redir) { + /* We need to signal to the original connection that we + * are unable to send invalidation messages to the redirected + * connection, because the client no longer exist. */ + if (c->resp > 2) { + addReplyPushLen(c,3); + addReplyBulkCBuffer(c,"tracking-redir-broken",21); + addReplyLongLong(c,c->client_tracking_redirection); + } + continue; + } + c = redir; + using_redirection = 1; + } + + /* Only send such info for clients in RESP version 3 or more. However + * if redirection is active, and the connection we redirect to is + * in Pub/Sub mode, we can support the feature with RESP 2 as well, + * by sending Pub/Sub messages in the __redis__:invalidate channel. */ + if (c->resp > 2) { + addReplyPushLen(c,2); + addReplyBulkCBuffer(c,"invalidate",10); + addReplyLongLong(c,hash); + } else if (using_redirection && c->flags & CLIENT_PUBSUB) { + robj *msg = createStringObjectFromLongLong(hash); + addReplyPubsubMessage(c,TrackingChannelName,msg); + decrRefCount(msg); + } + } + raxStop(&ri); + + /* Free the tracking table: we'll create the radix tree and populate it + * again if more keys will be modified in this hash slot. */ + raxFree(TrackingTable[hash]); + TrackingTable[hash] = NULL; +}