Merge branch 'csc2' into unstable
This commit is contained in:
commit
090bc0c1a3
@ -2160,7 +2160,7 @@ standardConfig configs[] = {
|
|||||||
createIntConfig("list-compress-depth", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.list_compress_depth, 0, INTEGER_CONFIG, NULL, NULL),
|
createIntConfig("list-compress-depth", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.list_compress_depth, 0, INTEGER_CONFIG, NULL, NULL),
|
||||||
createIntConfig("rdb-key-save-delay", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.rdb_key_save_delay, 0, INTEGER_CONFIG, NULL, NULL),
|
createIntConfig("rdb-key-save-delay", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.rdb_key_save_delay, 0, INTEGER_CONFIG, NULL, NULL),
|
||||||
createIntConfig("key-load-delay", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.key_load_delay, 0, INTEGER_CONFIG, NULL, NULL),
|
createIntConfig("key-load-delay", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.key_load_delay, 0, INTEGER_CONFIG, NULL, NULL),
|
||||||
createIntConfig("tracking-table-max-fill", NULL, MODIFIABLE_CONFIG, 0, 100, server.tracking_table_max_fill, 10, INTEGER_CONFIG, NULL, NULL), /* Default: 10% tracking table max fill. */
|
createIntConfig("tracking-table-max-fill", NULL, MODIFIABLE_CONFIG, 0, 100, server.tracking_table_max_keys, 1000000, INTEGER_CONFIG, NULL, NULL), /* Default: 10% tracking table max number of keys tracked. */
|
||||||
createIntConfig("active-expire-effort", NULL, MODIFIABLE_CONFIG, 1, 10, server.active_expire_effort, 1, INTEGER_CONFIG, NULL, NULL), /* From 1 to 10. */
|
createIntConfig("active-expire-effort", NULL, MODIFIABLE_CONFIG, 1, 10, server.active_expire_effort, 1, INTEGER_CONFIG, NULL, NULL), /* From 1 to 10. */
|
||||||
createIntConfig("hz", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.config_hz, CONFIG_DEFAULT_HZ, INTEGER_CONFIG, NULL, updateHZ),
|
createIntConfig("hz", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.config_hz, CONFIG_DEFAULT_HZ, INTEGER_CONFIG, NULL, updateHZ),
|
||||||
createIntConfig("min-replicas-to-write", "min-slaves-to-write", MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_min_slaves_to_write, 0, INTEGER_CONFIG, NULL, updateGoodSlaves),
|
createIntConfig("min-replicas-to-write", "min-slaves-to-write", MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_min_slaves_to_write, 0, INTEGER_CONFIG, NULL, updateGoodSlaves),
|
||||||
|
@ -154,6 +154,7 @@ client *createClient(connection *conn) {
|
|||||||
c->peerid = NULL;
|
c->peerid = NULL;
|
||||||
c->client_list_node = NULL;
|
c->client_list_node = NULL;
|
||||||
c->client_tracking_redirection = 0;
|
c->client_tracking_redirection = 0;
|
||||||
|
c->client_tracking_prefixes = NULL;
|
||||||
c->auth_callback = NULL;
|
c->auth_callback = NULL;
|
||||||
c->auth_callback_privdata = NULL;
|
c->auth_callback_privdata = NULL;
|
||||||
c->auth_module = NULL;
|
c->auth_module = NULL;
|
||||||
@ -2027,7 +2028,6 @@ int clientSetNameOrReply(client *c, robj *name) {
|
|||||||
void clientCommand(client *c) {
|
void clientCommand(client *c) {
|
||||||
listNode *ln;
|
listNode *ln;
|
||||||
listIter li;
|
listIter li;
|
||||||
client *client;
|
|
||||||
|
|
||||||
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
|
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
|
||||||
const char *help[] = {
|
const char *help[] = {
|
||||||
@ -2141,7 +2141,7 @@ NULL
|
|||||||
/* Iterate clients killing all the matching clients. */
|
/* Iterate clients killing all the matching clients. */
|
||||||
listRewind(server.clients,&li);
|
listRewind(server.clients,&li);
|
||||||
while ((ln = listNext(&li)) != NULL) {
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
client = listNodeValue(ln);
|
client *client = listNodeValue(ln);
|
||||||
if (addr && strcmp(getClientPeerId(client),addr) != 0) continue;
|
if (addr && strcmp(getClientPeerId(client),addr) != 0) continue;
|
||||||
if (type != -1 && getClientType(client) != type) continue;
|
if (type != -1 && getClientType(client) != type) continue;
|
||||||
if (id != 0 && client->id != id) continue;
|
if (id != 0 && client->id != id) continue;
|
||||||
@ -2219,38 +2219,74 @@ NULL
|
|||||||
UNIT_MILLISECONDS) != C_OK) return;
|
UNIT_MILLISECONDS) != C_OK) return;
|
||||||
pauseClients(duration);
|
pauseClients(duration);
|
||||||
addReply(c,shared.ok);
|
addReply(c,shared.ok);
|
||||||
} else if (!strcasecmp(c->argv[1]->ptr,"tracking") &&
|
} else if (!strcasecmp(c->argv[1]->ptr,"tracking") && c->argc >= 3) {
|
||||||
(c->argc == 3 || c->argc == 5))
|
/* CLIENT TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first]
|
||||||
{
|
* [PREFIX second] ... */
|
||||||
/* CLIENT TRACKING (on|off) [REDIRECT <id>] */
|
|
||||||
long long redir = 0;
|
long long redir = 0;
|
||||||
|
int bcast = 0;
|
||||||
|
robj **prefix = NULL;
|
||||||
|
size_t numprefix = 0;
|
||||||
|
|
||||||
/* Parse the redirection option: we'll require the client with
|
/* Parse the options. */
|
||||||
* the specified ID to exist right now, even if it is possible
|
for (int j = 3; j < c->argc; j++) {
|
||||||
* it will get disconnected later. */
|
int moreargs = (c->argc-1) - j;
|
||||||
if (c->argc == 5) {
|
|
||||||
if (strcasecmp(c->argv[3]->ptr,"redirect") != 0) {
|
if (!strcasecmp(c->argv[j]->ptr,"redirect") && moreargs) {
|
||||||
addReply(c,shared.syntaxerr);
|
j++;
|
||||||
return;
|
if (getLongLongFromObjectOrReply(c,c->argv[j],&redir,NULL) !=
|
||||||
} else {
|
|
||||||
if (getLongLongFromObjectOrReply(c,c->argv[4],&redir,NULL) !=
|
|
||||||
C_OK) return;
|
C_OK) return;
|
||||||
|
/* We will require the client with the specified ID to exist
|
||||||
|
* right now, even if it is possible that it gets disconnected
|
||||||
|
* later. Still a valid sanity check. */
|
||||||
if (lookupClientByID(redir) == NULL) {
|
if (lookupClientByID(redir) == NULL) {
|
||||||
addReplyError(c,"The client ID you want redirect to "
|
addReplyError(c,"The client ID you want redirect to "
|
||||||
"does not exist");
|
"does not exist");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
} else if (!strcasecmp(c->argv[j]->ptr,"bcast")) {
|
||||||
}
|
bcast++;
|
||||||
|
} else if (!strcasecmp(c->argv[j]->ptr,"prefix") && moreargs) {
|
||||||
if (!strcasecmp(c->argv[2]->ptr,"on")) {
|
j++;
|
||||||
enableTracking(c,redir);
|
prefix = zrealloc(prefix,sizeof(robj*)*(numprefix+1));
|
||||||
} else if (!strcasecmp(c->argv[2]->ptr,"off")) {
|
prefix[numprefix++] = c->argv[j];
|
||||||
disableTracking(c);
|
|
||||||
} else {
|
} else {
|
||||||
|
zfree(prefix);
|
||||||
addReply(c,shared.syntaxerr);
|
addReply(c,shared.syntaxerr);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Options are ok: enable or disable the tracking for this client. */
|
||||||
|
if (!strcasecmp(c->argv[2]->ptr,"on")) {
|
||||||
|
/* Before enabling tracking, make sure options are compatible
|
||||||
|
* among each other and with the current state of the client. */
|
||||||
|
if (!bcast && numprefix) {
|
||||||
|
addReplyError(c,
|
||||||
|
"PREFIX option requires BCAST mode to be enabled");
|
||||||
|
zfree(prefix);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (c->flags & CLIENT_TRACKING) {
|
||||||
|
int oldbcast = !!(c->flags & CLIENT_TRACKING_BCAST);
|
||||||
|
if (oldbcast != bcast) {
|
||||||
|
addReplyError(c,
|
||||||
|
"You can't switch BCAST mode on/off before disabling "
|
||||||
|
"tracking for this client, and then re-enabling it with "
|
||||||
|
"a different mode.");
|
||||||
|
zfree(prefix);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
enableTracking(c,redir,bcast,prefix,numprefix);
|
||||||
|
} else if (!strcasecmp(c->argv[2]->ptr,"off")) {
|
||||||
|
disableTracking(c);
|
||||||
|
} else {
|
||||||
|
zfree(prefix);
|
||||||
|
addReply(c,shared.syntaxerr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
zfree(prefix);
|
||||||
addReply(c,shared.ok);
|
addReply(c,shared.ok);
|
||||||
} else if (!strcasecmp(c->argv[1]->ptr,"getredir") && c->argc == 2) {
|
} else if (!strcasecmp(c->argv[1]->ptr,"getredir") && c->argc == 2) {
|
||||||
/* CLIENT GETREDIR */
|
/* CLIENT GETREDIR */
|
||||||
|
@ -35,7 +35,11 @@ int clientSubscriptionsCount(client *c);
|
|||||||
* Pubsub client replies API
|
* Pubsub client replies API
|
||||||
*----------------------------------------------------------------------------*/
|
*----------------------------------------------------------------------------*/
|
||||||
|
|
||||||
/* Send a pubsub message of type "message" to the client. */
|
/* Send a pubsub message of type "message" to the client.
|
||||||
|
* Normally 'msg' is a Redis object containing the string to send as
|
||||||
|
* message. However if the caller sets 'msg' as NULL, it will be able
|
||||||
|
* to send a special message (for instance an Array type) by using the
|
||||||
|
* addReply*() API family. */
|
||||||
void addReplyPubsubMessage(client *c, robj *channel, robj *msg) {
|
void addReplyPubsubMessage(client *c, robj *channel, robj *msg) {
|
||||||
if (c->resp == 2)
|
if (c->resp == 2)
|
||||||
addReply(c,shared.mbulkhdr[3]);
|
addReply(c,shared.mbulkhdr[3]);
|
||||||
@ -43,7 +47,7 @@ void addReplyPubsubMessage(client *c, robj *channel, robj *msg) {
|
|||||||
addReplyPushLen(c,3);
|
addReplyPushLen(c,3);
|
||||||
addReply(c,shared.messagebulk);
|
addReply(c,shared.messagebulk);
|
||||||
addReplyBulk(c,channel);
|
addReplyBulk(c,channel);
|
||||||
addReplyBulk(c,msg);
|
if (msg) addReplyBulk(c,msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Send a pubsub message of type "pmessage" to the client. The difference
|
/* Send a pubsub message of type "pmessage" to the client. The difference
|
||||||
|
@ -1766,6 +1766,7 @@ int raxRandomWalk(raxIterator *it, size_t steps) {
|
|||||||
if (n->iskey) steps--;
|
if (n->iskey) steps--;
|
||||||
}
|
}
|
||||||
it->node = n;
|
it->node = n;
|
||||||
|
it->data = raxGetData(it->node);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
15
src/server.c
15
src/server.c
@ -2124,6 +2124,10 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
|||||||
if (listLength(server.unblocked_clients))
|
if (listLength(server.unblocked_clients))
|
||||||
processUnblockedClients();
|
processUnblockedClients();
|
||||||
|
|
||||||
|
/* Send the invalidation messages to clients participating to the
|
||||||
|
* client side caching protocol in broadcasting (BCAST) mode. */
|
||||||
|
trackingBroadcastInvalidationMessages();
|
||||||
|
|
||||||
/* Write the AOF buffer on disk */
|
/* Write the AOF buffer on disk */
|
||||||
flushAppendOnlyFile(0);
|
flushAppendOnlyFile(0);
|
||||||
|
|
||||||
@ -3310,9 +3314,12 @@ void call(client *c, int flags) {
|
|||||||
if (c->cmd->flags & CMD_READONLY) {
|
if (c->cmd->flags & CMD_READONLY) {
|
||||||
client *caller = (c->flags & CLIENT_LUA && server.lua_caller) ?
|
client *caller = (c->flags & CLIENT_LUA && server.lua_caller) ?
|
||||||
server.lua_caller : c;
|
server.lua_caller : c;
|
||||||
if (caller->flags & CLIENT_TRACKING)
|
if (caller->flags & CLIENT_TRACKING &&
|
||||||
|
!(caller->flags & CLIENT_TRACKING_BCAST))
|
||||||
|
{
|
||||||
trackingRememberKeys(caller);
|
trackingRememberKeys(caller);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
server.fixed_time_expire--;
|
server.fixed_time_expire--;
|
||||||
server.stat_numcommands++;
|
server.stat_numcommands++;
|
||||||
@ -4221,7 +4228,8 @@ sds genRedisInfoString(const char *section) {
|
|||||||
"active_defrag_misses:%lld\r\n"
|
"active_defrag_misses:%lld\r\n"
|
||||||
"active_defrag_key_hits:%lld\r\n"
|
"active_defrag_key_hits:%lld\r\n"
|
||||||
"active_defrag_key_misses:%lld\r\n"
|
"active_defrag_key_misses:%lld\r\n"
|
||||||
"tracking_used_slots:%lld\r\n",
|
"tracking_total_keys:%lld\r\n"
|
||||||
|
"tracking_total_items:%lld\r\n",
|
||||||
server.stat_numconnections,
|
server.stat_numconnections,
|
||||||
server.stat_numcommands,
|
server.stat_numcommands,
|
||||||
getInstantaneousMetric(STATS_METRIC_COMMAND),
|
getInstantaneousMetric(STATS_METRIC_COMMAND),
|
||||||
@ -4249,7 +4257,8 @@ sds genRedisInfoString(const char *section) {
|
|||||||
server.stat_active_defrag_misses,
|
server.stat_active_defrag_misses,
|
||||||
server.stat_active_defrag_key_hits,
|
server.stat_active_defrag_key_hits,
|
||||||
server.stat_active_defrag_key_misses,
|
server.stat_active_defrag_key_misses,
|
||||||
trackingGetUsedSlots());
|
(unsigned long long) trackingGetTotalKeys(),
|
||||||
|
(unsigned long long) trackingGetTotalItems());
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Replication */
|
/* Replication */
|
||||||
|
13
src/server.h
13
src/server.h
@ -247,6 +247,7 @@ typedef long long ustime_t; /* microsecond time type. */
|
|||||||
#define CLIENT_TRACKING (1ULL<<31) /* Client enabled keys tracking in order to
|
#define CLIENT_TRACKING (1ULL<<31) /* Client enabled keys tracking in order to
|
||||||
perform client side caching. */
|
perform client side caching. */
|
||||||
#define CLIENT_TRACKING_BROKEN_REDIR (1ULL<<32) /* Target client is invalid. */
|
#define CLIENT_TRACKING_BROKEN_REDIR (1ULL<<32) /* Target client is invalid. */
|
||||||
|
#define CLIENT_TRACKING_BCAST (1ULL<<33) /* Tracking in BCAST mode. */
|
||||||
|
|
||||||
/* Client block type (btype field in client structure)
|
/* Client block type (btype field in client structure)
|
||||||
* if CLIENT_BLOCKED flag is set. */
|
* if CLIENT_BLOCKED flag is set. */
|
||||||
@ -822,7 +823,9 @@ typedef struct client {
|
|||||||
* invalidation messages for keys fetched by this client will be send to
|
* invalidation messages for keys fetched by this client will be send to
|
||||||
* the specified client ID. */
|
* the specified client ID. */
|
||||||
uint64_t client_tracking_redirection;
|
uint64_t client_tracking_redirection;
|
||||||
|
rax *client_tracking_prefixes; /* A dictionary of prefixes we are already
|
||||||
|
subscribed to in BCAST mode, in the
|
||||||
|
context of client side caching. */
|
||||||
/* Response buffer */
|
/* Response buffer */
|
||||||
int bufpos;
|
int bufpos;
|
||||||
char buf[PROTO_REPLY_CHUNK_BYTES];
|
char buf[PROTO_REPLY_CHUNK_BYTES];
|
||||||
@ -1306,7 +1309,7 @@ struct redisServer {
|
|||||||
list *ready_keys; /* List of readyList structures for BLPOP & co */
|
list *ready_keys; /* List of readyList structures for BLPOP & co */
|
||||||
/* Client side caching. */
|
/* Client side caching. */
|
||||||
unsigned int tracking_clients; /* # of clients with tracking enabled.*/
|
unsigned int tracking_clients; /* # of clients with tracking enabled.*/
|
||||||
int tracking_table_max_fill; /* Max fill percentage. */
|
int tracking_table_max_keys; /* Max number of keys in tracking table. */
|
||||||
/* Sort parameters - qsort_r() is only available under BSD so we
|
/* Sort parameters - qsort_r() is only available under BSD so we
|
||||||
* have to take this state global, in order to pass it to sortCompare() */
|
* have to take this state global, in order to pass it to sortCompare() */
|
||||||
int sort_desc;
|
int sort_desc;
|
||||||
@ -1648,13 +1651,15 @@ void addReplyStatusFormat(client *c, const char *fmt, ...);
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
/* Client side caching (tracking mode) */
|
/* Client side caching (tracking mode) */
|
||||||
void enableTracking(client *c, uint64_t redirect_to);
|
void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix);
|
||||||
void disableTracking(client *c);
|
void disableTracking(client *c);
|
||||||
void trackingRememberKeys(client *c);
|
void trackingRememberKeys(client *c);
|
||||||
void trackingInvalidateKey(robj *keyobj);
|
void trackingInvalidateKey(robj *keyobj);
|
||||||
void trackingInvalidateKeysOnFlush(int dbid);
|
void trackingInvalidateKeysOnFlush(int dbid);
|
||||||
void trackingLimitUsedSlots(void);
|
void trackingLimitUsedSlots(void);
|
||||||
unsigned long long trackingGetUsedSlots(void);
|
uint64_t trackingGetTotalItems(void);
|
||||||
|
uint64_t trackingGetTotalKeys(void);
|
||||||
|
void trackingBroadcastInvalidationMessages(void);
|
||||||
|
|
||||||
/* List data type */
|
/* List data type */
|
||||||
void listTypeTryConversion(robj *subject, robj *value);
|
void listTypeTryConversion(robj *subject, robj *value);
|
||||||
|
375
src/tracking.c
375
src/tracking.c
@ -30,39 +30,34 @@
|
|||||||
|
|
||||||
#include "server.h"
|
#include "server.h"
|
||||||
|
|
||||||
/* The tracking table is constituted by 2^24 radix trees (each tree, and the
|
/* The tracking table is constituted by a radix tree of keys, each pointing
|
||||||
* table itself, are allocated in a lazy way only when needed) tracking
|
* to a radix tree of client IDs, used to track the clients that may have
|
||||||
* clients that may have certain keys in their local, client side, cache.
|
* certain keys in their local, client side, cache.
|
||||||
*
|
|
||||||
* Keys are grouped into 2^24 slots, in a way similar to Redis Cluster hash
|
|
||||||
* slots, however here the function we use is crc64, taking the least
|
|
||||||
* significant 24 bits of the output.
|
|
||||||
*
|
*
|
||||||
* When a client enables tracking with "CLIENT TRACKING on", each key served to
|
* When a client enables tracking with "CLIENT TRACKING on", each key served to
|
||||||
* the client is hashed to one of such slots, and Redis will remember what
|
* the client is remembered in the table mapping the keys to the client IDs.
|
||||||
* client may have keys about such slot. Later, when a key in a given slot is
|
* Later, when a key is modified, all the clients that may have local copy
|
||||||
* modified, all the clients that may have local copies of keys in that slot
|
* of such key will receive an invalidation message.
|
||||||
* will receive an invalidation message. There is no distinction of database
|
|
||||||
* number: a single table is used.
|
|
||||||
*
|
*
|
||||||
* Clients will normally take frequently requested objects in memory, removing
|
* Clients will normally take frequently requested objects in memory, removing
|
||||||
* them when invalidation messages are received. A strategy clients may use is
|
* them when invalidation messages are received. */
|
||||||
* to just cache objects in a dictionary, associating to each cached object
|
rax *TrackingTable = NULL;
|
||||||
* some incremental epoch, or just a timestamp. When invalidation messages are
|
rax *PrefixTable = NULL;
|
||||||
* received clients may store, in a different table, the timestamp (or epoch)
|
uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across
|
||||||
* of the invalidation of such given slot: later when accessing objects, the
|
the whole tracking table. This givesn
|
||||||
* eviction of stale objects may be performed in a lazy way by checking if the
|
an hint about the total memory we
|
||||||
* cached object timestamp is older than the invalidation timestamp for such
|
are using server side for CSC. */
|
||||||
* objects.
|
|
||||||
*
|
|
||||||
* The output of the 24 bit hash function is very large (more than 16 million
|
|
||||||
* possible slots), so clients that may want to use less resources may only
|
|
||||||
* use the most significant bits instead of the full 24 bits. */
|
|
||||||
#define TRACKING_TABLE_SIZE (1<<24)
|
|
||||||
rax **TrackingTable = NULL;
|
|
||||||
unsigned long TrackingTableUsedSlots = 0;
|
|
||||||
robj *TrackingChannelName;
|
robj *TrackingChannelName;
|
||||||
|
|
||||||
|
/* This is the structure that we have as value of the PrefixTable, and
|
||||||
|
* represents the list of keys modified, and the list of clients that need
|
||||||
|
* to be notified, for a given prefix. */
|
||||||
|
typedef struct bcastState {
|
||||||
|
rax *keys; /* Keys modified in the current event loop cycle. */
|
||||||
|
rax *clients; /* Clients subscribed to the notification events for this
|
||||||
|
prefix. */
|
||||||
|
} bcastState;
|
||||||
|
|
||||||
/* Remove the tracking state from the client 'c'. Note that there is not much
|
/* Remove the tracking state from the client 'c'. Note that there is not much
|
||||||
* to do for us here, if not to decrement the counter of the clients in
|
* to do for us here, if not to decrement the counter of the clients in
|
||||||
* tracking mode, because we just store the ID of the client in the tracking
|
* tracking mode, because we just store the ID of the client in the tracking
|
||||||
@ -70,9 +65,55 @@ robj *TrackingChannelName;
|
|||||||
* client with many entries in the table is removed, it would cost a lot of
|
* client with many entries in the table is removed, it would cost a lot of
|
||||||
* time to do the cleanup. */
|
* time to do the cleanup. */
|
||||||
void disableTracking(client *c) {
|
void disableTracking(client *c) {
|
||||||
|
/* If this client is in broadcasting mode, we need to unsubscribe it
|
||||||
|
* from all the prefixes it is registered to. */
|
||||||
|
if (c->flags & CLIENT_TRACKING_BCAST) {
|
||||||
|
raxIterator ri;
|
||||||
|
raxStart(&ri,c->client_tracking_prefixes);
|
||||||
|
raxSeek(&ri,"^",NULL,0);
|
||||||
|
while(raxNext(&ri)) {
|
||||||
|
bcastState *bs = raxFind(PrefixTable,ri.key,ri.key_len);
|
||||||
|
serverAssert(bs != raxNotFound);
|
||||||
|
raxRemove(bs->clients,(unsigned char*)&c,sizeof(c),NULL);
|
||||||
|
/* Was it the last client? Remove the prefix from the
|
||||||
|
* table. */
|
||||||
|
if (raxSize(bs->clients) == 0) {
|
||||||
|
raxFree(bs->clients);
|
||||||
|
raxFree(bs->keys);
|
||||||
|
zfree(bs);
|
||||||
|
raxRemove(PrefixTable,ri.key,ri.key_len,NULL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
raxStop(&ri);
|
||||||
|
raxFree(c->client_tracking_prefixes);
|
||||||
|
c->client_tracking_prefixes = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Clear flags and adjust the count. */
|
||||||
if (c->flags & CLIENT_TRACKING) {
|
if (c->flags & CLIENT_TRACKING) {
|
||||||
server.tracking_clients--;
|
server.tracking_clients--;
|
||||||
c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR);
|
c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR|
|
||||||
|
CLIENT_TRACKING_BCAST);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Set the client 'c' to track the prefix 'prefix'. If the client 'c' is
|
||||||
|
* already registered for the specified prefix, no operation is performed. */
|
||||||
|
void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) {
|
||||||
|
bcastState *bs = raxFind(PrefixTable,(unsigned char*)prefix,sdslen(prefix));
|
||||||
|
/* If this is the first client subscribing to such prefix, create
|
||||||
|
* the prefix in the table. */
|
||||||
|
if (bs == raxNotFound) {
|
||||||
|
bs = zmalloc(sizeof(*bs));
|
||||||
|
bs->keys = raxNew();
|
||||||
|
bs->clients = raxNew();
|
||||||
|
raxInsert(PrefixTable,(unsigned char*)prefix,plen,bs,NULL);
|
||||||
|
}
|
||||||
|
if (raxTryInsert(bs->clients,(unsigned char*)&c,sizeof(c),NULL,NULL)) {
|
||||||
|
if (c->client_tracking_prefixes == NULL)
|
||||||
|
c->client_tracking_prefixes = raxNew();
|
||||||
|
raxInsert(c->client_tracking_prefixes,
|
||||||
|
(unsigned char*)prefix,plen,NULL,NULL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -83,16 +124,25 @@ void disableTracking(client *c) {
|
|||||||
* eventually get freed, we'll send a message to the original client to
|
* eventually get freed, we'll send a message to the original client to
|
||||||
* inform it of the condition. Multiple clients can redirect the invalidation
|
* inform it of the condition. Multiple clients can redirect the invalidation
|
||||||
* messages to the same client ID. */
|
* messages to the same client ID. */
|
||||||
void enableTracking(client *c, uint64_t redirect_to) {
|
void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix) {
|
||||||
if (c->flags & CLIENT_TRACKING) return;
|
if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++;
|
||||||
c->flags |= CLIENT_TRACKING;
|
c->flags |= CLIENT_TRACKING;
|
||||||
c->flags &= ~CLIENT_TRACKING_BROKEN_REDIR;
|
c->flags &= ~(CLIENT_TRACKING_BROKEN_REDIR|CLIENT_TRACKING_BCAST);
|
||||||
c->client_tracking_redirection = redirect_to;
|
c->client_tracking_redirection = redirect_to;
|
||||||
server.tracking_clients++;
|
|
||||||
if (TrackingTable == NULL) {
|
if (TrackingTable == NULL) {
|
||||||
TrackingTable = zcalloc(sizeof(rax*) * TRACKING_TABLE_SIZE);
|
TrackingTable = raxNew();
|
||||||
|
PrefixTable = raxNew();
|
||||||
TrackingChannelName = createStringObject("__redis__:invalidate",20);
|
TrackingChannelName = createStringObject("__redis__:invalidate",20);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (bcast) {
|
||||||
|
c->flags |= CLIENT_TRACKING_BCAST;
|
||||||
|
if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0);
|
||||||
|
for (size_t j = 0; j < numprefix; j++) {
|
||||||
|
sds sdsprefix = prefix[j]->ptr;
|
||||||
|
enableBcastTrackingForPrefix(c,sdsprefix,sdslen(sdsprefix));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This function is called after the excution of a readonly command in the
|
/* This function is called after the excution of a readonly command in the
|
||||||
@ -108,19 +158,30 @@ void trackingRememberKeys(client *c) {
|
|||||||
for(int j = 0; j < numkeys; j++) {
|
for(int j = 0; j < numkeys; j++) {
|
||||||
int idx = keys[j];
|
int idx = keys[j];
|
||||||
sds sdskey = c->argv[idx]->ptr;
|
sds sdskey = c->argv[idx]->ptr;
|
||||||
uint64_t hash = crc64(0,
|
rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey));
|
||||||
(unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1);
|
if (ids == raxNotFound) {
|
||||||
if (TrackingTable[hash] == NULL) {
|
ids = raxNew();
|
||||||
TrackingTable[hash] = raxNew();
|
int inserted = raxTryInsert(TrackingTable,(unsigned char*)sdskey,
|
||||||
TrackingTableUsedSlots++;
|
sdslen(sdskey),ids, NULL);
|
||||||
|
serverAssert(inserted == 1);
|
||||||
}
|
}
|
||||||
raxTryInsert(TrackingTable[hash],
|
if (raxTryInsert(ids,(unsigned char*)&c->id,sizeof(c->id),NULL,NULL))
|
||||||
(unsigned char*)&c->id,sizeof(c->id),NULL,NULL);
|
TrackingTableTotalItems++;
|
||||||
}
|
}
|
||||||
getKeysFreeResult(keys);
|
getKeysFreeResult(keys);
|
||||||
}
|
}
|
||||||
|
|
||||||
void sendTrackingMessage(client *c, long long hash) {
|
/* Given a key name, this function sends an invalidation message in the
|
||||||
|
* proper channel (depending on RESP version: PubSub or Push message) and
|
||||||
|
* to the proper client (in case fo redirection), in the context of the
|
||||||
|
* client 'c' with tracking enabled.
|
||||||
|
*
|
||||||
|
* In case the 'proto' argument is non zero, the function will assume that
|
||||||
|
* 'keyname' points to a buffer of 'keylen' bytes already expressed in the
|
||||||
|
* form of Redis RESP protocol, representing an array of keys to send
|
||||||
|
* to the client as value of the invalidation. This is used in BCAST mode
|
||||||
|
* in order to optimized the implementation to use less CPU time. */
|
||||||
|
void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
|
||||||
int using_redirection = 0;
|
int using_redirection = 0;
|
||||||
if (c->client_tracking_redirection) {
|
if (c->client_tracking_redirection) {
|
||||||
client *redir = lookupClientByID(c->client_tracking_redirection);
|
client *redir = lookupClientByID(c->client_tracking_redirection);
|
||||||
@ -146,36 +207,45 @@ void sendTrackingMessage(client *c, long long hash) {
|
|||||||
if (c->resp > 2) {
|
if (c->resp > 2) {
|
||||||
addReplyPushLen(c,2);
|
addReplyPushLen(c,2);
|
||||||
addReplyBulkCBuffer(c,"invalidate",10);
|
addReplyBulkCBuffer(c,"invalidate",10);
|
||||||
addReplyLongLong(c,hash);
|
|
||||||
} else if (using_redirection && c->flags & CLIENT_PUBSUB) {
|
} else if (using_redirection && c->flags & CLIENT_PUBSUB) {
|
||||||
robj *msg = createStringObjectFromLongLong(hash);
|
/* We use a static object to speedup things, however we assume
|
||||||
addReplyPubsubMessage(c,TrackingChannelName,msg);
|
* that addReplyPubsubMessage() will not take a reference. */
|
||||||
decrRefCount(msg);
|
addReplyPubsubMessage(c,TrackingChannelName,NULL);
|
||||||
|
} else {
|
||||||
|
/* If are here, the client is not using RESP3, nor is
|
||||||
|
* redirecting to another client. We can't send anything to
|
||||||
|
* it since RESP2 does not support push messages in the same
|
||||||
|
* connection. */
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Send the "value" part, which is the array of keys. */
|
||||||
|
if (proto) {
|
||||||
|
addReplyProto(c,keyname,keylen);
|
||||||
|
} else {
|
||||||
|
addReplyArrayLen(c,1);
|
||||||
|
addReplyBulkCBuffer(c,keyname,keylen);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Invalidates a caching slot: this is actually the low level implementation
|
/* This function is called when a key is modified in Redis and in the case
|
||||||
* of the API that Redis calls externally, that is trackingInvalidateKey(). */
|
* we have at least one client with the BCAST mode enabled.
|
||||||
void trackingInvalidateSlot(uint64_t slot) {
|
* Its goal is to set the key in the right broadcast state if the key
|
||||||
if (TrackingTable == NULL || TrackingTable[slot] == NULL) return;
|
* matches one or more prefixes in the prefix table. Later when we
|
||||||
|
* return to the event loop, we'll send invalidation messages to the
|
||||||
|
* clients subscribed to each prefix. */
|
||||||
|
void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) {
|
||||||
raxIterator ri;
|
raxIterator ri;
|
||||||
raxStart(&ri,TrackingTable[slot]);
|
raxStart(&ri,PrefixTable);
|
||||||
raxSeek(&ri,"^",NULL,0);
|
raxSeek(&ri,"^",NULL,0);
|
||||||
while(raxNext(&ri)) {
|
while(raxNext(&ri)) {
|
||||||
uint64_t id;
|
if (ri.key_len > keylen) continue;
|
||||||
memcpy(&id,ri.key,sizeof(id));
|
if (ri.key_len != 0 && memcmp(ri.key,keyname,ri.key_len) != 0)
|
||||||
client *c = lookupClientByID(id);
|
continue;
|
||||||
if (c == NULL || !(c->flags & CLIENT_TRACKING)) continue;
|
bcastState *bs = ri.data;
|
||||||
sendTrackingMessage(c,slot);
|
raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,NULL,NULL);
|
||||||
}
|
}
|
||||||
raxStop(&ri);
|
raxStop(&ri);
|
||||||
|
|
||||||
/* Free the tracking table: we'll create the radix tree and populate it
|
|
||||||
* again if more keys will be modified in this caching slot. */
|
|
||||||
raxFree(TrackingTable[slot]);
|
|
||||||
TrackingTable[slot] = NULL;
|
|
||||||
TrackingTableUsedSlots--;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This function is called from signalModifiedKey() or other places in Redis
|
/* This function is called from signalModifiedKey() or other places in Redis
|
||||||
@ -183,12 +253,42 @@ void trackingInvalidateSlot(uint64_t slot) {
|
|||||||
* to send a notification to every client that may have keys about such caching
|
* to send a notification to every client that may have keys about such caching
|
||||||
* slot. */
|
* slot. */
|
||||||
void trackingInvalidateKey(robj *keyobj) {
|
void trackingInvalidateKey(robj *keyobj) {
|
||||||
if (TrackingTable == NULL || TrackingTableUsedSlots == 0) return;
|
if (TrackingTable == NULL) return;
|
||||||
|
|
||||||
sds sdskey = keyobj->ptr;
|
sds sdskey = keyobj->ptr;
|
||||||
uint64_t hash = crc64(0,
|
|
||||||
(unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1);
|
if (raxSize(PrefixTable) > 0)
|
||||||
trackingInvalidateSlot(hash);
|
trackingRememberKeyToBroadcast(sdskey,sdslen(sdskey));
|
||||||
|
|
||||||
|
rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey));
|
||||||
|
if (ids == raxNotFound) return;;
|
||||||
|
|
||||||
|
raxIterator ri;
|
||||||
|
raxStart(&ri,ids);
|
||||||
|
raxSeek(&ri,"^",NULL,0);
|
||||||
|
while(raxNext(&ri)) {
|
||||||
|
uint64_t id;
|
||||||
|
memcpy(&id,ri.key,sizeof(id));
|
||||||
|
client *c = lookupClientByID(id);
|
||||||
|
/* Note that if the client is in BCAST mode, we don't want to
|
||||||
|
* send invalidation messages that were pending in the case
|
||||||
|
* previously the client was not in BCAST mode. This can happen if
|
||||||
|
* TRACKING is enabled normally, and then the client switches to
|
||||||
|
* BCAST mode. */
|
||||||
|
if (c == NULL ||
|
||||||
|
!(c->flags & CLIENT_TRACKING)||
|
||||||
|
c->flags & CLIENT_TRACKING_BCAST)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
sendTrackingMessage(c,sdskey,sdslen(sdskey),0);
|
||||||
|
}
|
||||||
|
raxStop(&ri);
|
||||||
|
|
||||||
|
/* Free the tracking table: we'll create the radix tree and populate it
|
||||||
|
* again if more keys will be modified in this caching slot. */
|
||||||
|
TrackingTableTotalItems -= raxSize(ids);
|
||||||
|
raxFree(ids);
|
||||||
|
raxRemove(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey),NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This function is called when one or all the Redis databases are flushed
|
/* This function is called when one or all the Redis databases are flushed
|
||||||
@ -205,6 +305,10 @@ void trackingInvalidateKey(robj *keyobj) {
|
|||||||
* we just send the invalidation message to all the clients, but don't
|
* we just send the invalidation message to all the clients, but don't
|
||||||
* flush the table: it will slowly get garbage collected as more keys
|
* flush the table: it will slowly get garbage collected as more keys
|
||||||
* are modified in the used caching slots. */
|
* are modified in the used caching slots. */
|
||||||
|
void freeTrackingRadixTree(void *rt) {
|
||||||
|
raxFree(rt);
|
||||||
|
}
|
||||||
|
|
||||||
void trackingInvalidateKeysOnFlush(int dbid) {
|
void trackingInvalidateKeysOnFlush(int dbid) {
|
||||||
if (server.tracking_clients) {
|
if (server.tracking_clients) {
|
||||||
listNode *ln;
|
listNode *ln;
|
||||||
@ -213,84 +317,129 @@ void trackingInvalidateKeysOnFlush(int dbid) {
|
|||||||
while ((ln = listNext(&li)) != NULL) {
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
client *c = listNodeValue(ln);
|
client *c = listNodeValue(ln);
|
||||||
if (c->flags & CLIENT_TRACKING) {
|
if (c->flags & CLIENT_TRACKING) {
|
||||||
sendTrackingMessage(c,-1);
|
sendTrackingMessage(c,"",1,0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* In case of FLUSHALL, reclaim all the memory used by tracking. */
|
/* In case of FLUSHALL, reclaim all the memory used by tracking. */
|
||||||
if (dbid == -1 && TrackingTable) {
|
if (dbid == -1 && TrackingTable) {
|
||||||
for (int j = 0; j < TRACKING_TABLE_SIZE && TrackingTableUsedSlots > 0; j++) {
|
raxFreeWithCallback(TrackingTable,freeTrackingRadixTree);
|
||||||
if (TrackingTable[j] != NULL) {
|
TrackingTableTotalItems = 0;
|
||||||
raxFree(TrackingTable[j]);
|
|
||||||
TrackingTable[j] = NULL;
|
|
||||||
TrackingTableUsedSlots--;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* If there are no clients with tracking enabled, we can even
|
|
||||||
* reclaim the memory used by the table itself. The code assumes
|
|
||||||
* the table is allocated only if there is at least one client alive
|
|
||||||
* with tracking enabled. */
|
|
||||||
if (server.tracking_clients == 0) {
|
|
||||||
zfree(TrackingTable);
|
|
||||||
TrackingTable = NULL;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Tracking forces Redis to remember information about which client may have
|
/* Tracking forces Redis to remember information about which client may have
|
||||||
* keys about certian caching slots. In workloads where there are a lot of
|
* certain keys. In workloads where there are a lot of reads, but keys are
|
||||||
* reads, but keys are hardly modified, the amount of information we have
|
* hardly modified, the amount of information we have to remember server side
|
||||||
* to remember server side could be a lot: for each 16 millions of caching
|
* could be a lot, with the number of keys being totally not bound.
|
||||||
* slots we may end with a radix tree containing many entries.
|
|
||||||
*
|
*
|
||||||
* So Redis allows the user to configure a maximum fill rate for the
|
* So Redis allows the user to configure a maximum number of keys for the
|
||||||
* invalidation table. This function makes sure that we don't go over the
|
* invalidation table. This function makes sure that we don't go over the
|
||||||
* specified fill rate: if we are over, we can just evict informations about
|
* specified fill rate: if we are over, we can just evict informations about
|
||||||
* random caching slots, and send invalidation messages to clients like if
|
* a random key, and send invalidation messages to clients like if the key was
|
||||||
* the key was modified. */
|
* modified. */
|
||||||
void trackingLimitUsedSlots(void) {
|
void trackingLimitUsedSlots(void) {
|
||||||
static unsigned int timeout_counter = 0;
|
static unsigned int timeout_counter = 0;
|
||||||
|
if (TrackingTable == NULL) return;
|
||||||
if (server.tracking_table_max_fill == 0) return; /* No limits set. */
|
if (server.tracking_table_max_keys == 0) return; /* No limits set. */
|
||||||
unsigned int max_slots =
|
size_t max_keys = server.tracking_table_max_keys;
|
||||||
(TRACKING_TABLE_SIZE/100) * server.tracking_table_max_fill;
|
if (raxSize(TrackingTable) <= max_keys) {
|
||||||
if (TrackingTableUsedSlots <= max_slots) {
|
|
||||||
timeout_counter = 0;
|
timeout_counter = 0;
|
||||||
return; /* Limit not reached. */
|
return; /* Limit not reached. */
|
||||||
}
|
}
|
||||||
|
|
||||||
/* We have to invalidate a few slots to reach the limit again. The effort
|
/* We have to invalidate a few keys to reach the limit again. The effort
|
||||||
* we do here is proportional to the number of times we entered this
|
* we do here is proportional to the number of times we entered this
|
||||||
* function and found that we are still over the limit. */
|
* function and found that we are still over the limit. */
|
||||||
int effort = 100 * (timeout_counter+1);
|
int effort = 100 * (timeout_counter+1);
|
||||||
|
|
||||||
/* Let's start at a random position, and perform linear probing, in order
|
/* We just remove one key after another by using a random walk. */
|
||||||
* to improve cache locality. However once we are able to find an used
|
raxIterator ri;
|
||||||
* slot, jump again randomly, in order to avoid creating big holes in the
|
raxStart(&ri,TrackingTable);
|
||||||
* table (that will make this funciton use more resourced later). */
|
|
||||||
while(effort > 0) {
|
while(effort > 0) {
|
||||||
unsigned int idx = rand() % TRACKING_TABLE_SIZE;
|
|
||||||
do {
|
|
||||||
effort--;
|
effort--;
|
||||||
idx = (idx+1) % TRACKING_TABLE_SIZE;
|
raxSeek(&ri,"^",NULL,0);
|
||||||
if (TrackingTable[idx] != NULL) {
|
raxRandomWalk(&ri,0);
|
||||||
trackingInvalidateSlot(idx);
|
rax *ids = ri.data;
|
||||||
if (TrackingTableUsedSlots <= max_slots) {
|
TrackingTableTotalItems -= raxSize(ids);
|
||||||
|
raxFree(ids);
|
||||||
|
raxRemove(TrackingTable,ri.key,ri.key_len,NULL);
|
||||||
|
if (raxSize(TrackingTable) <= max_keys) {
|
||||||
timeout_counter = 0;
|
timeout_counter = 0;
|
||||||
|
raxStop(&ri);
|
||||||
return; /* Return ASAP: we are again under the limit. */
|
return; /* Return ASAP: we are again under the limit. */
|
||||||
} else {
|
|
||||||
break; /* Jump to next random position. */
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} while(effort > 0);
|
|
||||||
}
|
/* If we reach this point, we were not able to go under the configured
|
||||||
|
* limit using the maximum effort we had for this run. */
|
||||||
|
raxStop(&ri);
|
||||||
timeout_counter++;
|
timeout_counter++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* This function will run the prefixes of clients in BCAST mode and
|
||||||
|
* keys that were modified about each prefix, and will send the
|
||||||
|
* notifications to each client in each prefix. */
|
||||||
|
void trackingBroadcastInvalidationMessages(void) {
|
||||||
|
raxIterator ri, ri2;
|
||||||
|
|
||||||
|
/* Return ASAP if there is nothing to do here. */
|
||||||
|
if (TrackingTable == NULL || !server.tracking_clients) return;
|
||||||
|
|
||||||
|
raxStart(&ri,PrefixTable);
|
||||||
|
raxSeek(&ri,"^",NULL,0);
|
||||||
|
while(raxNext(&ri)) {
|
||||||
|
bcastState *bs = ri.data;
|
||||||
|
if (raxSize(bs->keys)) {
|
||||||
|
/* Create the array reply with the list of keys once, then send
|
||||||
|
* it to all the clients subscribed to this prefix. */
|
||||||
|
char buf[32];
|
||||||
|
size_t len = ll2string(buf,sizeof(buf),raxSize(bs->keys));
|
||||||
|
sds proto = sdsempty();
|
||||||
|
proto = sdsMakeRoomFor(proto,raxSize(bs->keys)*15);
|
||||||
|
proto = sdscatlen(proto,"*",1);
|
||||||
|
proto = sdscatlen(proto,buf,len);
|
||||||
|
proto = sdscatlen(proto,"\r\n",2);
|
||||||
|
raxStart(&ri2,bs->keys);
|
||||||
|
raxSeek(&ri2,"^",NULL,0);
|
||||||
|
while(raxNext(&ri2)) {
|
||||||
|
len = ll2string(buf,sizeof(buf),ri2.key_len);
|
||||||
|
proto = sdscatlen(proto,"$",1);
|
||||||
|
proto = sdscatlen(proto,buf,len);
|
||||||
|
proto = sdscatlen(proto,"\r\n",2);
|
||||||
|
proto = sdscatlen(proto,ri2.key,ri2.key_len);
|
||||||
|
proto = sdscatlen(proto,"\r\n",2);
|
||||||
|
}
|
||||||
|
raxStop(&ri2);
|
||||||
|
|
||||||
|
/* Send this array of keys to every client in the list. */
|
||||||
|
raxStart(&ri2,bs->clients);
|
||||||
|
raxSeek(&ri2,"^",NULL,0);
|
||||||
|
while(raxNext(&ri2)) {
|
||||||
|
client *c;
|
||||||
|
memcpy(&c,ri2.key,sizeof(c));
|
||||||
|
sendTrackingMessage(c,proto,sdslen(proto),1);
|
||||||
|
}
|
||||||
|
raxStop(&ri2);
|
||||||
|
|
||||||
|
/* Clean up: we can remove everything from this state, because we
|
||||||
|
* want to only track the new keys that will be accumulated starting
|
||||||
|
* from now. */
|
||||||
|
sdsfree(proto);
|
||||||
|
}
|
||||||
|
raxFree(bs->keys);
|
||||||
|
bs->keys = raxNew();
|
||||||
|
}
|
||||||
|
raxStop(&ri);
|
||||||
|
}
|
||||||
|
|
||||||
/* This is just used in order to access the amount of used slots in the
|
/* This is just used in order to access the amount of used slots in the
|
||||||
* tracking table. */
|
* tracking table. */
|
||||||
unsigned long long trackingGetUsedSlots(void) {
|
uint64_t trackingGetTotalItems(void) {
|
||||||
return TrackingTableUsedSlots;
|
return TrackingTableTotalItems;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t trackingGetTotalKeys(void) {
|
||||||
|
return raxSize(TrackingTable);
|
||||||
}
|
}
|
||||||
|
66
tests/unit/tracking.tcl
Normal file
66
tests/unit/tracking.tcl
Normal file
@ -0,0 +1,66 @@
|
|||||||
|
start_server {tags {"tracking"}} {
|
||||||
|
# Create a deferred client we'll use to redirect invalidation
|
||||||
|
# messages to.
|
||||||
|
set rd1 [redis_deferring_client]
|
||||||
|
$rd1 client id
|
||||||
|
set redir [$rd1 read]
|
||||||
|
$rd1 subscribe __redis__:invalidate
|
||||||
|
$rd1 read ; # Consume the SUBSCRIBE reply.
|
||||||
|
|
||||||
|
test {Clients are able to enable tracking and redirect it} {
|
||||||
|
r CLIENT TRACKING on REDIRECT $redir
|
||||||
|
} {*OK}
|
||||||
|
|
||||||
|
test {The other connection is able to get invalidations} {
|
||||||
|
r SET a 1
|
||||||
|
r GET a
|
||||||
|
r INCR a
|
||||||
|
r INCR b ; # This key should not be notified, since it wasn't fetched.
|
||||||
|
set keys [lindex [$rd1 read] 2]
|
||||||
|
assert {[llength $keys] == 1}
|
||||||
|
assert {[lindex $keys 0] eq {a}}
|
||||||
|
}
|
||||||
|
|
||||||
|
test {The client is now able to disable tracking} {
|
||||||
|
# Make sure to add a few more keys in the tracking list
|
||||||
|
# so that we can check for leaks, as a side effect.
|
||||||
|
r MGET a b c d e f g
|
||||||
|
r CLIENT TRACKING off
|
||||||
|
}
|
||||||
|
|
||||||
|
test {Clients can enable the BCAST mode with the empty prefix} {
|
||||||
|
r CLIENT TRACKING on BCAST REDIRECT $redir
|
||||||
|
} {*OK*}
|
||||||
|
|
||||||
|
test {The connection gets invalidation messages about all the keys} {
|
||||||
|
r MSET a 1 b 2 c 3
|
||||||
|
set keys [lsort [lindex [$rd1 read] 2]]
|
||||||
|
assert {$keys eq {a b c}}
|
||||||
|
}
|
||||||
|
|
||||||
|
test {Clients can enable the BCAST mode with prefixes} {
|
||||||
|
r CLIENT TRACKING off
|
||||||
|
r CLIENT TRACKING on BCAST REDIRECT $redir PREFIX a: PREFIX b:
|
||||||
|
r MULTI
|
||||||
|
r INCR a:1
|
||||||
|
r INCR a:2
|
||||||
|
r INCR b:1
|
||||||
|
r INCR b:2
|
||||||
|
r EXEC
|
||||||
|
# Because of the internals, we know we are going to receive
|
||||||
|
# two separated notifications for the two different prefixes.
|
||||||
|
set keys1 [lsort [lindex [$rd1 read] 2]]
|
||||||
|
set keys2 [lsort [lindex [$rd1 read] 2]]
|
||||||
|
set keys [lsort [list {*}$keys1 {*}$keys2]]
|
||||||
|
assert {$keys eq {a:1 a:2 b:1 b:2}}
|
||||||
|
}
|
||||||
|
|
||||||
|
test {Adding prefixes to BCAST mode works} {
|
||||||
|
r CLIENT TRACKING on BCAST REDIRECT $redir PREFIX c:
|
||||||
|
r INCR c:1234
|
||||||
|
set keys [lsort [lindex [$rd1 read] 2]]
|
||||||
|
assert {$keys eq {c:1234}}
|
||||||
|
}
|
||||||
|
|
||||||
|
$rd1 close
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user