Merge commit '677e95e2f5ee903682ce44a7bfd9558a22577926' into unstable

Former-commit-id: 4602f8c391409e9dd59f1fbee6a5ef011b219ca1
This commit is contained in:
John Sully 2019-07-19 00:01:56 -04:00
commit 4088de0ed2
10 changed files with 284 additions and 11 deletions

View File

@ -209,7 +209,7 @@ endif
REDIS_SERVER_NAME=keydb-server
REDIS_SENTINEL_NAME=keydb-sentinel
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o $(ASM_OBJ)
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o $(ASM_OBJ)
REDIS_CLI_NAME=keydb-cli
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o new.o $(ASM_OBJ)
REDIS_BENCHMARK_NAME=keydb-benchmark

View File

@ -451,6 +451,7 @@ int selectDb(client *c, int id) {
void signalModifiedKey(redisDb *db, robj *key) {
touchWatchedKey(db,key);
if (g_pserver->tracking_clients) trackingInvalidateKey(key);
}
void signalFlushedDb(int dbid) {

View File

@ -706,7 +706,7 @@ void _serverAssertPrintClientInfo(const client *c) {
bugReportStart();
serverLog(LL_WARNING,"=== ASSERTION FAILED CLIENT CONTEXT ===");
serverLog(LL_WARNING,"client->flags = %d", static_cast<int>(c->flags));
serverLog(LL_WARNING,"client->flags = %llu", static_cast<unsigned long long>(c->flags));
serverLog(LL_WARNING,"client->fd = %d", c->fd);
serverLog(LL_WARNING,"client->argc = %d", c->argc);
for (j=0; j < c->argc; j++) {

View File

@ -64,6 +64,7 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
dbSyncDelete(db,keyobj);
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",keyobj,db->id);
if (g_pserver->tracking_clients) trackingInvalidateKey(keyobj);
decrRefCount(keyobj);
g_pserver->stat_expiredkeys++;
return 1;

View File

@ -1252,6 +1252,17 @@ int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) {
return REDISMODULE_OK;
}
/* Reply with a bulk string, taking in input a C buffer pointer that is
* assumed to be null-terminated.
*
* The function always returns REDISMODULE_OK. */
int RM_ReplyWithCString(RedisModuleCtx *ctx, const char *buf) {
client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK;
addReplyBulkCString(c,(char*)buf);
return REDISMODULE_OK;
}
/* Reply with a bulk string, taking in input a RedisModuleString object.
*
* The function always returns REDISMODULE_OK. */
@ -1465,6 +1476,9 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) {
if (g_pserver->cluster_enabled)
flags |= REDISMODULE_CTX_FLAGS_CLUSTER;
if (g_pserver->loading)
flags |= REDISMODULE_CTX_FLAGS_LOADING;
/* Maxmemory and eviction policy */
if (g_pserver->maxmemory > 0) {
flags |= REDISMODULE_CTX_FLAGS_MAXMEMORY;

View File

@ -174,6 +174,7 @@ client *createClient(int fd, int iel) {
c->bufAsync = NULL;
c->buflenAsync = 0;
c->bufposAsync = 0;
c->client_tracking_redirection = 0;
memset(c->uuid, 0, UUID_BINARY_LEN);
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
@ -1264,6 +1265,9 @@ void unlinkClient(client *c) {
serverAssert(fFound);
c->fPendingAsyncWrite = FALSE;
}
/* Clear the tracking status. */
if (c->flags & CLIENT_TRACKING) disableTracking(c);
}
void freeClient(client *c) {
@ -2245,6 +2249,8 @@ sds catClientInfoString(sds s, client *client) {
if (client->flags & CLIENT_PUBSUB) *p++ = 'P';
if (client->flags & CLIENT_MULTI) *p++ = 'x';
if (client->flags & CLIENT_BLOCKED) *p++ = 'b';
if (client->flags & CLIENT_TRACKING) *p++ = 't';
if (client->flags & CLIENT_TRACKING_BROKEN_REDIR) *p++ = 'R';
if (client->flags & CLIENT_DIRTY_CAS) *p++ = 'd';
if (client->flags & CLIENT_CLOSE_AFTER_REPLY) *p++ = 'c';
if (client->flags & CLIENT_UNBLOCKED) *p++ = 'u';
@ -2357,6 +2363,7 @@ void clientCommand(client *c) {
"reply (on|off|skip) -- Control the replies sent to the current connection.",
"setname <name> -- Assign the name <name> to the current connection.",
"unblock <clientid> [TIMEOUT|ERROR] -- Unblock the specified blocked client.",
"tracking (on|off) [REDIRECT <id>] -- Enable client keys tracking for client side caching.",
NULL
};
addReplyHelp(c, help);
@ -2515,21 +2522,57 @@ NULL
} else {
addReply(c,shared.czero);
}
} else if (!strcasecmp((const char*)ptrFromObj(c->argv[1]),"setname") && c->argc == 3) {
} else if (!strcasecmp(szFromObj(c->argv[1]),"setname") && c->argc == 3) {
/* CLIENT SETNAME */
if (clientSetNameOrReply(c,c->argv[2]) == C_OK)
addReply(c,shared.ok);
} else if (!strcasecmp((const char*)ptrFromObj(c->argv[1]),"getname") && c->argc == 2) {
} else if (!strcasecmp(szFromObj(c->argv[1]),"getname") && c->argc == 2) {
/* CLIENT GETNAME */
if (c->name)
addReplyBulk(c,c->name);
else
addReplyNull(c, shared.nullbulk);
} else if (!strcasecmp((const char*)ptrFromObj(c->argv[1]),"pause") && c->argc == 3) {
} else if (!strcasecmp(szFromObj(c->argv[1]),"pause") && c->argc == 3) {
/* CLIENT PAUSE */
long long duration;
if (getTimeoutFromObjectOrReply(c,c->argv[2],&duration,UNIT_MILLISECONDS)
!= C_OK) return;
if (getTimeoutFromObjectOrReply(c,c->argv[2],&duration,
UNIT_MILLISECONDS) != C_OK) return;
pauseClients(duration);
addReply(c,shared.ok);
} else if (!strcasecmp(szFromObj(c->argv[1]),"tracking") &&
(c->argc == 3 || c->argc == 5))
{
/* CLIENT TRACKING (on|off) [REDIRECT <id>] */
long long redir = 0;
/* Parse the redirection option: we'll require the client with
* the specified ID to exist right now, even if it is possible
* it will get disconnected later. */
if (c->argc == 5) {
if (strcasecmp(szFromObj(c->argv[3]),"redirect") != 0) {
addReply(c,shared.syntaxerr);
return;
} else {
if (getLongLongFromObjectOrReply(c,c->argv[4],&redir,NULL) !=
C_OK) return;
if (lookupClientByID(redir) == NULL) {
addReplyError(c,"The client ID you want redirect to "
"does not exist");
return;
}
}
}
if (!strcasecmp(szFromObj(c->argv[2]),"on")) {
enableTracking(c,redir);
} else if (!strcasecmp(szFromObj(c->argv[2]),"off")) {
disableTracking(c);
} else {
addReply(c,shared.syntaxerr);
return;
}
addReply(c,shared.ok);
} else {
addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try CLIENT HELP", (char*)ptrFromObj(c->argv[1]));
}

View File

@ -91,6 +91,8 @@ extern "C" {
#define REDISMODULE_CTX_FLAGS_OOM_WARNING (1<<11)
/* The command was sent over the replication link. */
#define REDISMODULE_CTX_FLAGS_REPLICATED (1<<12)
/* Redis is currently loading either from AOF or RDB. */
#define REDISMODULE_CTX_FLAGS_LOADING (1<<13)
#define REDISMODULE_NOTIFY_GENERIC (1<<2) /* g */
@ -230,6 +232,7 @@ int REDISMODULE_API_FUNC(RedisModule_ReplyWithSimpleString)(RedisModuleCtx *ctx,
int REDISMODULE_API_FUNC(RedisModule_ReplyWithArray)(RedisModuleCtx *ctx, long len);
void REDISMODULE_API_FUNC(RedisModule_ReplySetArrayLength)(RedisModuleCtx *ctx, long len);
int REDISMODULE_API_FUNC(RedisModule_ReplyWithStringBuffer)(RedisModuleCtx *ctx, const char *buf, size_t len);
int REDISMODULE_API_FUNC(RedisModule_ReplyWithCString)(RedisModuleCtx *ctx, const char *buf);
int REDISMODULE_API_FUNC(RedisModule_ReplyWithString)(RedisModuleCtx *ctx, RedisModuleString *str);
int REDISMODULE_API_FUNC(RedisModule_ReplyWithNull)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_ReplyWithDouble)(RedisModuleCtx *ctx, double d);
@ -380,6 +383,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(ReplyWithArray);
REDISMODULE_GET_API(ReplySetArrayLength);
REDISMODULE_GET_API(ReplyWithStringBuffer);
REDISMODULE_GET_API(ReplyWithCString);
REDISMODULE_GET_API(ReplyWithString);
REDISMODULE_GET_API(ReplyWithNull);
REDISMODULE_GET_API(ReplyWithCallReply);

View File

@ -3375,6 +3375,7 @@ void call(client *c, int flags) {
latencyAddSampleIfNeeded(latency_event,duration/1000);
slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
}
if (flags & CMD_CALL_STATS) {
/* use the real command that was executed (cmd and lastamc) may be
* different, in case of MULTI-EXEC or re-written commands such as
@ -3448,6 +3449,16 @@ void call(client *c, int flags) {
ProcessPendingAsyncWrites();
g_pserver->also_propagate = prev_also_propagate;
/* If the client has keys tracking enabled for client side caching,
* make sure to remember the keys it fetched via this command. */
if (c->cmd->flags & CMD_READONLY) {
client *caller = (c->flags & CLIENT_LUA && g_pserver->lua_caller) ?
g_pserver->lua_caller : c;
if (caller->flags & CLIENT_TRACKING)
trackingRememberKeys(caller);
}
g_pserver->stat_numcommands++;
}

View File

@ -327,8 +327,8 @@ public:
#define AOF_WAIT_REWRITE 2 /* AOF waits rewrite to start appending */
/* Client flags */
#define CLIENT_SLAVE (1<<0) /* This client is a slave server */
#define CLIENT_MASTER (1<<1) /* This client is a master server */
#define CLIENT_SLAVE (1<<0) /* This client is a repliaca */
#define CLIENT_MASTER (1<<1) /* This client is a master */
#define CLIENT_MONITOR (1<<2) /* This client is a slave monitor, see MONITOR */
#define CLIENT_MULTI (1<<3) /* This client is in a MULTI context */
#define CLIENT_BLOCKED (1<<4) /* The client is waiting in a blocking operation */
@ -359,7 +359,17 @@ public:
#define CLIENT_LUA_DEBUG_SYNC (1<<26) /* EVAL debugging without fork() */
#define CLIENT_MODULE (1<<27) /* Non connected client used by some module. */
#define CLIENT_PROTECTED (1<<28) /* Client should not be freed for now. */
#define CLIENT_FORCE_REPLY (1<<29) /* Should addReply be forced to write the text? */
#define CLIENT_PENDING_READ (1<<29) /* The client has pending reads and was put
in the list of clients we can read
from. */
#define CLIENT_PENDING_COMMAND (1<<30) /* Used in threaded I/O to signal after
we return single threaded that the
client has already pending commands
to be executed. */
#define CLIENT_TRACKING (1<<31) /* Client enabled keys tracking in order to
perform client side caching. */
#define CLIENT_TRACKING_BROKEN_REDIR (1ULL<<32) /* Target client is invalid. */
#define CLIENT_FORCE_REPLY (1ULL<<33) /* Should addReply be forced to write the text? */
/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
@ -930,7 +940,7 @@ typedef struct client {
time_t ctime; /* Client creation time. */
time_t lastinteraction; /* Time of the last interaction, used for timeout */
time_t obuf_soft_limit_reached_time;
std::atomic<int> flags; /* Client flags: CLIENT_* macros. */
std::atomic<uint64_t> flags; /* Client flags: CLIENT_* macros. */
int fPendingAsyncWrite; /* NOTE: Not a flag because it is written to outside of the client lock (locked by the global lock instead) */
int authenticated; /* Needed when the default user requires auth. */
int replstate; /* Replication state if this is a slave. */
@ -966,6 +976,11 @@ typedef struct client {
/* UUIDs are transient and lost when the server is shut down */
unsigned char uuid[UUID_BINARY_LEN];
/* If this client is in tracking mode and this field is non zero,
* invalidation messages for keys fetched by this client will be send to
* the specified client ID. */
uint64_t client_tracking_redirection;
/* Response buffer */
int bufpos;
char buf[PROTO_REPLY_CHUNK_BYTES];
@ -1460,6 +1475,8 @@ struct redisServer {
unsigned int blocked_clients; /* # of clients executing a blocking cmd.*/
unsigned int blocked_clients_by_type[BLOCKED_NUM];
list *ready_keys; /* List of readyList structures for BLPOP & co */
/* Client side caching. */
unsigned int tracking_clients; /* # of clients with tracking enabled.*/
/* Sort parameters - qsort_r() is only available under BSD so we
* have to take this state global, in order to pass it to sortCompare() */
int sort_desc;
@ -1796,6 +1813,7 @@ void addReplyPushLenAsync(client *c, long length);
void addReplyLongLongAsync(client *c, long long ll);
void ProcessPendingAsyncWrites(void);
client *lookupClientByID(uint64_t id);
#ifdef __GNUC__
void addReplyErrorFormat(client *c, const char *fmt, ...)
@ -1807,6 +1825,12 @@ void addReplyErrorFormat(client *c, const char *fmt, ...);
void addReplyStatusFormat(client *c, const char *fmt, ...);
#endif
/* Client side caching (tracking mode) */
void enableTracking(client *c, uint64_t redirect_to);
void disableTracking(client *c);
void trackingRememberKeys(client *c);
void trackingInvalidateKey(robj *keyobj);
/* List data type */
void listTypeTryConversion(robj *subject, robj *value);
void listTypePush(robj *subject, robj *value, int where);
@ -2134,6 +2158,7 @@ int pubsubUnsubscribeAllPatterns(client *c, int notify);
void freePubsubPattern(const void *p);
int listMatchPubsubPattern(void *a, void *b);
int pubsubPublishMessage(robj *channel, robj *message);
void addReplyPubsubMessage(client *c, robj *channel, robj *msg);
/* Keyspace events notification */
void notifyKeyspaceEvent(int type, const char *event, robj *key, int dbid);

174
src/tracking.cpp Normal file
View File

@ -0,0 +1,174 @@
/* tracking.c - Client side caching: keys tracking and invalidation
*
* Copyright (c) 2019, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "server.h"
/* The tracking table is constituted by 2^24 radix trees (each tree, and the
* table itself, are allocated in a lazy way only when needed) tracking
* clients that may have certain keys in their local, client side, cache.
*
* Keys are grouped into 2^24 slots, in a way similar to Redis Cluster hash
* slots, however here the function we use is crc64, taking the least
* significant 24 bits of the output.
*
* When a client enables tracking with "CLIENT TRACKING on", each key served to
* the client is hashed to one of such slots, and Redis will remember what
* client may have keys about such slot. Later, when a key in a given slot is
* modified, all the clients that may have local copies of keys in that slot
* will receive an invalidation message. There is no distinction of database
* number: a single table is used.
*
* Clients will normally take frequently requested objects in memory, removing
* them when invalidation messages are received. A strategy clients may use is
* to just cache objects in a dictionary, associating to each cached object
* some incremental epoch, or just a timestamp. When invalidation messages are
* received clients may store, in a different table, the timestamp (or epoch)
* of the invalidation of such given slot: later when accessing objects, the
* eviction of stale objects may be performed in a lazy way by checking if the
* cached object timestamp is older than the invalidation timestamp for such
* objects.
*
* The output of the 24 bit hash function is very large (more than 16 million
* possible slots), so clients that may want to use less resources may only
* use the most significant bits instead of the full 24 bits. */
#define TRACKING_TABLE_SIZE (1<<24)
rax **TrackingTable = NULL;
robj *TrackingChannelName;
/* Remove the tracking state from the client 'c'. Note that there is not much
* to do for us here, if not to decrement the counter of the clients in
* tracking mode, because we just store the ID of the client in the tracking
* table, so we'll remove the ID reference in a lazy way. Otherwise when a
* client with many entries in the table is removed, it would cost a lot of
* time to do the cleanup. */
void disableTracking(client *c) {
if (c->flags & CLIENT_TRACKING) {
g_pserver->tracking_clients--;
c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR);
}
}
/* Enable the tracking state for the client 'c', and as a side effect allocates
* the tracking table if needed. If the 'redirect_to' argument is non zero, the
* invalidation messages for this client will be sent to the client ID
* specified by the 'redirect_to' argument. Note that if such client will
* eventually get freed, we'll send a message to the original client to
* inform it of the condition. Multiple clients can redirect the invalidation
* messages to the same client ID. */
void enableTracking(client *c, uint64_t redirect_to) {
if (c->flags & CLIENT_TRACKING) return;
c->flags |= CLIENT_TRACKING;
c->flags &= ~CLIENT_TRACKING_BROKEN_REDIR;
c->client_tracking_redirection = redirect_to;
g_pserver->tracking_clients++;
if (TrackingTable == NULL) {
TrackingTable = (rax**)zcalloc(sizeof(rax*) * TRACKING_TABLE_SIZE, MALLOC_LOCAL);
TrackingChannelName = createStringObject("__redis__:invalidate",20);
}
}
/* This function is called after the excution of a readonly command in the
* case the client 'c' has keys tracking enabled. It will populate the
* tracking ivalidation table according to the keys the user fetched, so that
* Redis will know what are the clients that should receive an invalidation
* message with certain groups of keys are modified. */
void trackingRememberKeys(client *c) {
int numkeys;
int *keys = getKeysFromCommand(c->cmd,c->argv,c->argc,&numkeys);
if (keys == NULL) return;
for(int j = 0; j < numkeys; j++) {
int idx = keys[j];
sds sdskey = (sds)ptrFromObj(c->argv[idx]);
uint64_t hash = crc64(0,
(unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1);
if (TrackingTable[hash] == NULL)
TrackingTable[hash] = raxNew();
raxTryInsert(TrackingTable[hash],
(unsigned char*)&c->id,sizeof(c->id),NULL,NULL);
}
getKeysFreeResult(keys);
}
/* This function is called from signalModifiedKey() or other places in Redis
* when a key changes value. In the context of keys tracking, our task here is
* to send a notification to every client that may have keys about such . */
void trackingInvalidateKey(robj *keyobj) {
sds sdskey = (sds)ptrFromObj(keyobj);
uint64_t hash = crc64(0,
(unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1);
if (TrackingTable == NULL || TrackingTable[hash] == NULL) return;
raxIterator ri;
raxStart(&ri,TrackingTable[hash]);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
uint64_t id;
memcpy(&id,ri.key,ri.key_len);
client *c = lookupClientByID(id);
int using_redirection = 0;
if (c->client_tracking_redirection) {
client *redir = lookupClientByID(c->client_tracking_redirection);
if (!redir) {
/* We need to signal to the original connection that we
* are unable to send invalidation messages to the redirected
* connection, because the client no longer exist. */
if (c->resp > 2) {
addReplyPushLen(c,3);
addReplyBulkCBuffer(c,"tracking-redir-broken",21);
addReplyLongLong(c,c->client_tracking_redirection);
}
continue;
}
c = redir;
using_redirection = 1;
}
/* Only send such info for clients in RESP version 3 or more. However
* if redirection is active, and the connection we redirect to is
* in Pub/Sub mode, we can support the feature with RESP 2 as well,
* by sending Pub/Sub messages in the __redis__:invalidate channel. */
if (c->resp > 2) {
addReplyPushLen(c,2);
addReplyBulkCBuffer(c,"invalidate",10);
addReplyLongLong(c,hash);
} else if (using_redirection && c->flags & CLIENT_PUBSUB) {
robj *msg = createStringObjectFromLongLong(hash);
addReplyPubsubMessage(c,TrackingChannelName,msg);
decrRefCount(msg);
}
}
raxStop(&ri);
/* Free the tracking table: we'll create the radix tree and populate it
* again if more keys will be modified in this hash slot. */
raxFree(TrackingTable[hash]);
TrackingTable[hash] = NULL;
}