From 92357b2d61bff014e3198bb39886d6b4e26163d1 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 7 Feb 2020 14:03:43 +0100 Subject: [PATCH 01/13] Tracking: first conversion from hashing to key names. --- src/server.c | 4 +- src/server.h | 2 +- src/tracking.c | 206 +++++++++++++++++++++---------------------------- 3 files changed, 91 insertions(+), 121 deletions(-) diff --git a/src/server.c b/src/server.c index f87dfba46..df6a8c1f4 100644 --- a/src/server.c +++ b/src/server.c @@ -4221,7 +4221,7 @@ sds genRedisInfoString(const char *section) { "active_defrag_misses:%lld\r\n" "active_defrag_key_hits:%lld\r\n" "active_defrag_key_misses:%lld\r\n" - "tracking_used_slots:%lld\r\n", + "tracking_tracked_keys:%lld\r\n", server.stat_numconnections, server.stat_numcommands, getInstantaneousMetric(STATS_METRIC_COMMAND), @@ -4249,7 +4249,7 @@ sds genRedisInfoString(const char *section) { server.stat_active_defrag_misses, server.stat_active_defrag_key_hits, server.stat_active_defrag_key_misses, - trackingGetUsedSlots()); + (unsigned long long) trackingGetTotalItems()); } /* Replication */ diff --git a/src/server.h b/src/server.h index dfbc15ac3..a30db6b57 100644 --- a/src/server.h +++ b/src/server.h @@ -1654,7 +1654,7 @@ void trackingRememberKeys(client *c); void trackingInvalidateKey(robj *keyobj); void trackingInvalidateKeysOnFlush(int dbid); void trackingLimitUsedSlots(void); -unsigned long long trackingGetUsedSlots(void); +uint64_t trackingGetTotalItems(void); /* List data type */ void listTypeTryConversion(robj *subject, robj *value); diff --git a/src/tracking.c b/src/tracking.c index acb97800a..ecb7fbdcc 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -30,37 +30,22 @@ #include "server.h" -/* The tracking table is constituted by 2^24 radix trees (each tree, and the - * table itself, are allocated in a lazy way only when needed) tracking - * clients that may have certain keys in their local, client side, cache. 
- * - * Keys are grouped into 2^24 slots, in a way similar to Redis Cluster hash - * slots, however here the function we use is crc64, taking the least - * significant 24 bits of the output. +/* The tracking table is constituted by a radix tree of keys, each pointing + * to a radix tree of client IDs, used to track the clients that may have + * certain keys in their local, client side, cache. * * When a client enables tracking with "CLIENT TRACKING on", each key served to - * the client is hashed to one of such slots, and Redis will remember what - * client may have keys about such slot. Later, when a key in a given slot is - * modified, all the clients that may have local copies of keys in that slot - * will receive an invalidation message. There is no distinction of database - * number: a single table is used. + * the client is remembered in the table mapping the keys to the client IDs. + * Later, when a key is modified, all the clients that may have local copy + * of such key will receive an invalidation message. * * Clients will normally take frequently requested objects in memory, removing - * them when invalidation messages are received. A strategy clients may use is - * to just cache objects in a dictionary, associating to each cached object - * some incremental epoch, or just a timestamp. When invalidation messages are - * received clients may store, in a different table, the timestamp (or epoch) - * of the invalidation of such given slot: later when accessing objects, the - * eviction of stale objects may be performed in a lazy way by checking if the - * cached object timestamp is older than the invalidation timestamp for such - * objects. - * - * The output of the 24 bit hash function is very large (more than 16 million - * possible slots), so clients that may want to use less resources may only - * use the most significant bits instead of the full 24 bits. 
*/ -#define TRACKING_TABLE_SIZE (1<<24) -rax **TrackingTable = NULL; -unsigned long TrackingTableUsedSlots = 0; + * them when invalidation messages are received. */ +rax *TrackingTable = NULL; +uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across + the whole tracking table. This givesn + an hint about the total memory we + are using server side for CSC. */ robj *TrackingChannelName; /* Remove the tracking state from the client 'c'. Note that there is not much @@ -90,7 +75,7 @@ void enableTracking(client *c, uint64_t redirect_to) { c->client_tracking_redirection = redirect_to; server.tracking_clients++; if (TrackingTable == NULL) { - TrackingTable = zcalloc(sizeof(rax*) * TRACKING_TABLE_SIZE); + TrackingTable = raxNew(); TrackingChannelName = createStringObject("__redis__:invalidate",20); } } @@ -108,19 +93,20 @@ void trackingRememberKeys(client *c) { for(int j = 0; j < numkeys; j++) { int idx = keys[j]; sds sdskey = c->argv[idx]->ptr; - uint64_t hash = crc64(0, - (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1); - if (TrackingTable[hash] == NULL) { - TrackingTable[hash] = raxNew(); - TrackingTableUsedSlots++; + rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey)); + if (ids == raxNotFound) { + ids = raxNew(); + int inserted = raxTryInsert(TrackingTable,(unsigned char*)sdskey, + sdslen(sdskey),ids, NULL); + serverAssert(inserted == 1); } - raxTryInsert(TrackingTable[hash], - (unsigned char*)&c->id,sizeof(c->id),NULL,NULL); + if (raxTryInsert(ids,(unsigned char*)&c->id,sizeof(c->id),NULL,NULL)) + TrackingTableTotalItems++; } getKeysFreeResult(keys); } -void sendTrackingMessage(client *c, long long hash) { +void sendTrackingMessage(client *c, char *keyname, size_t keylen) { int using_redirection = 0; if (c->client_tracking_redirection) { client *redir = lookupClientByID(c->client_tracking_redirection); @@ -146,49 +132,44 @@ void sendTrackingMessage(client *c, long long hash) { if (c->resp > 2) { 
addReplyPushLen(c,2); addReplyBulkCBuffer(c,"invalidate",10); - addReplyLongLong(c,hash); + addReplyBulkCBuffer(c,keyname,keylen); } else if (using_redirection && c->flags & CLIENT_PUBSUB) { - robj *msg = createStringObjectFromLongLong(hash); - addReplyPubsubMessage(c,TrackingChannelName,msg); - decrRefCount(msg); + /* We use a static object to speedup things, however we assume + * that addReplyPubsubMessage() will not take a reference. */ + robj keyobj; + initStaticStringObject(keyobj,keyname); + addReplyPubsubMessage(c,TrackingChannelName,&keyobj); + serverAssert(keyobj.refcount == 1); } } -/* Invalidates a caching slot: this is actually the low level implementation - * of the API that Redis calls externally, that is trackingInvalidateKey(). */ -void trackingInvalidateSlot(uint64_t slot) { - if (TrackingTable == NULL || TrackingTable[slot] == NULL) return; - - raxIterator ri; - raxStart(&ri,TrackingTable[slot]); - raxSeek(&ri,"^",NULL,0); - while(raxNext(&ri)) { - uint64_t id; - memcpy(&id,ri.key,sizeof(id)); - client *c = lookupClientByID(id); - if (c == NULL || !(c->flags & CLIENT_TRACKING)) continue; - sendTrackingMessage(c,slot); - } - raxStop(&ri); - - /* Free the tracking table: we'll create the radix tree and populate it - * again if more keys will be modified in this caching slot. */ - raxFree(TrackingTable[slot]); - TrackingTable[slot] = NULL; - TrackingTableUsedSlots--; -} - /* This function is called from signalModifiedKey() or other places in Redis * when a key changes value. In the context of keys tracking, our task here is * to send a notification to every client that may have keys about such caching * slot. 
*/ void trackingInvalidateKey(robj *keyobj) { - if (TrackingTable == NULL || TrackingTableUsedSlots == 0) return; - + if (TrackingTable == NULL) return; sds sdskey = keyobj->ptr; - uint64_t hash = crc64(0, - (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1); - trackingInvalidateSlot(hash); + rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey)); + if (ids == raxNotFound) return;; + + raxIterator ri; + raxStart(&ri,ids); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + uint64_t id; + memcpy(&id,ri.key,sizeof(id)); + client *c = lookupClientByID(id); + if (c == NULL || !(c->flags & CLIENT_TRACKING)) continue; + sendTrackingMessage(c,sdskey,sdslen(sdskey)); + } + raxStop(&ri); + + /* Free the tracking table: we'll create the radix tree and populate it + * again if more keys will be modified in this caching slot. */ + TrackingTableTotalItems -= raxSize(ids); + raxFree(ids); + raxRemove(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey),NULL); } /* This function is called when one or all the Redis databases are flushed @@ -205,6 +186,10 @@ void trackingInvalidateKey(robj *keyobj) { * we just send the invalidation message to all the clients, but don't * flush the table: it will slowly get garbage collected as more keys * are modified in the used caching slots. */ +void freeTrackingRadixTree(void *rt) { + raxFree(rt); +} + void trackingInvalidateKeysOnFlush(int dbid) { if (server.tracking_clients) { listNode *ln; @@ -213,84 +198,69 @@ void trackingInvalidateKeysOnFlush(int dbid) { while ((ln = listNext(&li)) != NULL) { client *c = listNodeValue(ln); if (c->flags & CLIENT_TRACKING) { - sendTrackingMessage(c,-1); + sendTrackingMessage(c,"",1); } } } /* In case of FLUSHALL, reclaim all the memory used by tracking. 
*/ if (dbid == -1 && TrackingTable) { - for (int j = 0; j < TRACKING_TABLE_SIZE && TrackingTableUsedSlots > 0; j++) { - if (TrackingTable[j] != NULL) { - raxFree(TrackingTable[j]); - TrackingTable[j] = NULL; - TrackingTableUsedSlots--; - } - } - - /* If there are no clients with tracking enabled, we can even - * reclaim the memory used by the table itself. The code assumes - * the table is allocated only if there is at least one client alive - * with tracking enabled. */ - if (server.tracking_clients == 0) { - zfree(TrackingTable); - TrackingTable = NULL; - } + raxFreeWithCallback(TrackingTable,freeTrackingRadixTree); + TrackingTableTotalItems = 0; } } /* Tracking forces Redis to remember information about which client may have - * keys about certian caching slots. In workloads where there are a lot of - * reads, but keys are hardly modified, the amount of information we have - * to remember server side could be a lot: for each 16 millions of caching - * slots we may end with a radix tree containing many entries. + * certain keys. In workloads where there are a lot of reads, but keys are + * hardly modified, the amount of information we have to remember server side + * could be a lot, with the number of keys being totally not bound. * - * So Redis allows the user to configure a maximum fill rate for the + * So Redis allows the user to configure a maximum number of keys for the * invalidation table. This function makes sure that we don't go over the * specified fill rate: if we are over, we can just evict informations about - * random caching slots, and send invalidation messages to clients like if - * the key was modified. */ + * a random key, and send invalidation messages to clients like if the key was + * modified. */ void trackingLimitUsedSlots(void) { static unsigned int timeout_counter = 0; - + if (TrackingTable == NULL) return; if (server.tracking_table_max_fill == 0) return; /* No limits set. 
*/ - unsigned int max_slots = - (TRACKING_TABLE_SIZE/100) * server.tracking_table_max_fill; - if (TrackingTableUsedSlots <= max_slots) { + size_t max_keys = server.tracking_table_max_fill; + if (raxSize(TrackingTable) <= max_keys) { timeout_counter = 0; return; /* Limit not reached. */ } - /* We have to invalidate a few slots to reach the limit again. The effort + /* We have to invalidate a few keys to reach the limit again. The effort * we do here is proportional to the number of times we entered this * function and found that we are still over the limit. */ int effort = 100 * (timeout_counter+1); - /* Let's start at a random position, and perform linear probing, in order - * to improve cache locality. However once we are able to find an used - * slot, jump again randomly, in order to avoid creating big holes in the - * table (that will make this funciton use more resourced later). */ + /* We just remove one key after another by using a random walk. */ + raxIterator ri; + raxStart(&ri,TrackingTable); while(effort > 0) { - unsigned int idx = rand() % TRACKING_TABLE_SIZE; - do { - effort--; - idx = (idx+1) % TRACKING_TABLE_SIZE; - if (TrackingTable[idx] != NULL) { - trackingInvalidateSlot(idx); - if (TrackingTableUsedSlots <= max_slots) { - timeout_counter = 0; - return; /* Return ASAP: we are again under the limit. */ - } else { - break; /* Jump to next random position. */ - } - } - } while(effort > 0); + effort--; + raxSeek(&ri,"^",NULL,0); + raxRandomWalk(&ri,0); + rax *ids = ri.data; + TrackingTableTotalItems -= raxSize(ids); + raxFree(ids); + raxRemove(TrackingTable,ri.key,ri.key_len,NULL); + if (raxSize(TrackingTable) <= max_keys) { + timeout_counter = 0; + raxStop(&ri); + return; /* Return ASAP: we are again under the limit. */ + } } + + /* If we reach this point, we were not able to go under the configured + * limit using the maximum effort we had for this run. 
*/ + raxStop(&ri); timeout_counter++; } /* This is just used in order to access the amount of used slots in the * tracking table. */ -unsigned long long trackingGetUsedSlots(void) { - return TrackingTableUsedSlots; +uint64_t trackingGetTotalItems(void) { + return TrackingTableTotalItems; } From d933d6f2a42b9f0add28df212b5cfbc7756e3cf2 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 7 Feb 2020 17:19:11 +0100 Subject: [PATCH 02/13] Tracking: rename INFO field with total items. --- src/server.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server.c b/src/server.c index df6a8c1f4..2cf8dbf19 100644 --- a/src/server.c +++ b/src/server.c @@ -4221,7 +4221,7 @@ sds genRedisInfoString(const char *section) { "active_defrag_misses:%lld\r\n" "active_defrag_key_hits:%lld\r\n" "active_defrag_key_misses:%lld\r\n" - "tracking_tracked_keys:%lld\r\n", + "tracking_total_items:%lld\r\n", server.stat_numconnections, server.stat_numcommands, getInstantaneousMetric(STATS_METRIC_COMMAND), From 1ea66724300f1cd7c97b842a7c9190e72ea91976 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 7 Feb 2020 18:12:10 +0100 Subject: [PATCH 03/13] Rax.c: populate data field after random walk. --- src/rax.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/rax.c b/src/rax.c index 29b74ae90..a560dde02 100644 --- a/src/rax.c +++ b/src/rax.c @@ -1766,6 +1766,7 @@ int raxRandomWalk(raxIterator *it, size_t steps) { if (n->iskey) steps--; } it->node = n; + it->data = raxGetData(it->node); return 1; } From 85e4777d5c4411b499a845565729910bb09d64ac Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 7 Feb 2020 18:12:45 +0100 Subject: [PATCH 04/13] Tracking: minor change of names and new INFO field. 
--- src/config.c | 2 +- src/server.c | 2 ++ src/server.h | 3 ++- src/tracking.c | 8 ++++++-- 4 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/config.c b/src/config.c index a6b374817..b2e5fc12e 100644 --- a/src/config.c +++ b/src/config.c @@ -2160,7 +2160,7 @@ standardConfig configs[] = { createIntConfig("list-compress-depth", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.list_compress_depth, 0, INTEGER_CONFIG, NULL, NULL), createIntConfig("rdb-key-save-delay", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.rdb_key_save_delay, 0, INTEGER_CONFIG, NULL, NULL), createIntConfig("key-load-delay", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.key_load_delay, 0, INTEGER_CONFIG, NULL, NULL), - createIntConfig("tracking-table-max-fill", NULL, MODIFIABLE_CONFIG, 0, 100, server.tracking_table_max_fill, 10, INTEGER_CONFIG, NULL, NULL), /* Default: 10% tracking table max fill. */ + createIntConfig("tracking-table-max-fill", NULL, MODIFIABLE_CONFIG, 0, 100, server.tracking_table_max_keys, 1000000, INTEGER_CONFIG, NULL, NULL), /* Default: 10% tracking table max number of keys tracked. */ createIntConfig("active-expire-effort", NULL, MODIFIABLE_CONFIG, 1, 10, server.active_expire_effort, 1, INTEGER_CONFIG, NULL, NULL), /* From 1 to 10. 
*/ createIntConfig("hz", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.config_hz, CONFIG_DEFAULT_HZ, INTEGER_CONFIG, NULL, updateHZ), createIntConfig("min-replicas-to-write", "min-slaves-to-write", MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_min_slaves_to_write, 0, INTEGER_CONFIG, NULL, updateGoodSlaves), diff --git a/src/server.c b/src/server.c index 2cf8dbf19..910cf5410 100644 --- a/src/server.c +++ b/src/server.c @@ -4221,6 +4221,7 @@ sds genRedisInfoString(const char *section) { "active_defrag_misses:%lld\r\n" "active_defrag_key_hits:%lld\r\n" "active_defrag_key_misses:%lld\r\n" + "tracking_total_keys:%lld\r\n" "tracking_total_items:%lld\r\n", server.stat_numconnections, server.stat_numcommands, @@ -4249,6 +4250,7 @@ sds genRedisInfoString(const char *section) { server.stat_active_defrag_misses, server.stat_active_defrag_key_hits, server.stat_active_defrag_key_misses, + (unsigned long long) trackingGetTotalKeys(), (unsigned long long) trackingGetTotalItems()); } diff --git a/src/server.h b/src/server.h index a30db6b57..3e055a7db 100644 --- a/src/server.h +++ b/src/server.h @@ -1306,7 +1306,7 @@ struct redisServer { list *ready_keys; /* List of readyList structures for BLPOP & co */ /* Client side caching. */ unsigned int tracking_clients; /* # of clients with tracking enabled.*/ - int tracking_table_max_fill; /* Max fill percentage. */ + int tracking_table_max_keys; /* Max number of keys in tracking table. 
*/ /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ int sort_desc; @@ -1655,6 +1655,7 @@ void trackingInvalidateKey(robj *keyobj); void trackingInvalidateKeysOnFlush(int dbid); void trackingLimitUsedSlots(void); uint64_t trackingGetTotalItems(void); +uint64_t trackingGetTotalKeys(void); /* List data type */ void listTypeTryConversion(robj *subject, robj *value); diff --git a/src/tracking.c b/src/tracking.c index ecb7fbdcc..9c1c9620c 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -223,8 +223,8 @@ void trackingInvalidateKeysOnFlush(int dbid) { void trackingLimitUsedSlots(void) { static unsigned int timeout_counter = 0; if (TrackingTable == NULL) return; - if (server.tracking_table_max_fill == 0) return; /* No limits set. */ - size_t max_keys = server.tracking_table_max_fill; + if (server.tracking_table_max_keys == 0) return; /* No limits set. */ + size_t max_keys = server.tracking_table_max_keys; if (raxSize(TrackingTable) <= max_keys) { timeout_counter = 0; return; /* Limit not reached. */ @@ -264,3 +264,7 @@ void trackingLimitUsedSlots(void) { uint64_t trackingGetTotalItems(void) { return TrackingTableTotalItems; } + +uint64_t trackingGetTotalKeys(void) { + return raxSize(TrackingTable); +} From f53cc00c09a4e7c612b3781021246cbbeb533d7b Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 10 Feb 2020 13:42:18 +0100 Subject: [PATCH 05/13] Tracking: always reply with an array of keys. --- src/pubsub.c | 8 ++++++-- src/tracking.c | 5 ++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/pubsub.c b/src/pubsub.c index 994dd9734..5cb4298e0 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -35,7 +35,11 @@ int clientSubscriptionsCount(client *c); * Pubsub client replies API *----------------------------------------------------------------------------*/ -/* Send a pubsub message of type "message" to the client. 
*/ +/* Send a pubsub message of type "message" to the client. + * Normally 'msg' is a Redis object containing the string to send as + * message. However if the caller sets 'msg' as NULL, it will be able + * to send a special message (for instance an Array type) by using the + * addReply*() API family. */ void addReplyPubsubMessage(client *c, robj *channel, robj *msg) { if (c->resp == 2) addReply(c,shared.mbulkhdr[3]); @@ -43,7 +47,7 @@ void addReplyPubsubMessage(client *c, robj *channel, robj *msg) { addReplyPushLen(c,3); addReply(c,shared.messagebulk); addReplyBulk(c,channel); - addReplyBulk(c,msg); + if (msg) addReplyBulk(c,msg); } /* Send a pubsub message of type "pmessage" to the client. The difference diff --git a/src/tracking.c b/src/tracking.c index 9c1c9620c..3122563ac 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -132,13 +132,16 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen) { if (c->resp > 2) { addReplyPushLen(c,2); addReplyBulkCBuffer(c,"invalidate",10); + addReplyArrayLen(c,1); addReplyBulkCBuffer(c,keyname,keylen); } else if (using_redirection && c->flags & CLIENT_PUBSUB) { /* We use a static object to speedup things, however we assume * that addReplyPubsubMessage() will not take a reference. */ robj keyobj; initStaticStringObject(keyobj,keyname); - addReplyPubsubMessage(c,TrackingChannelName,&keyobj); + addReplyPubsubMessage(c,TrackingChannelName,NULL); + addReplyArrayLen(c,1); + addReplyBulk(c,&keyobj); serverAssert(keyobj.refcount == 1); } } From dfe126f3e92c9770bef1915b6e64add2c41edcfa Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 10 Feb 2020 17:18:11 +0100 Subject: [PATCH 06/13] Tracking: BCAST: parsing of the options + skeleton. 
--- src/networking.c | 63 +++++++++++++++++++++++++++++++++++++----------- src/server.c | 5 +++- src/server.h | 8 +++++- src/tracking.c | 16 +++++++++--- 4 files changed, 73 insertions(+), 19 deletions(-) diff --git a/src/networking.c b/src/networking.c index 2b0f7464a..690a134f1 100644 --- a/src/networking.c +++ b/src/networking.c @@ -154,6 +154,7 @@ client *createClient(connection *conn) { c->peerid = NULL; c->client_list_node = NULL; c->client_tracking_redirection = 0; + c->client_tracking_prefix_nodes = NULL; c->auth_callback = NULL; c->auth_callback_privdata = NULL; c->auth_module = NULL; @@ -2219,38 +2220,72 @@ NULL UNIT_MILLISECONDS) != C_OK) return; pauseClients(duration); addReply(c,shared.ok); - } else if (!strcasecmp(c->argv[1]->ptr,"tracking") && - (c->argc == 3 || c->argc == 5)) - { - /* CLIENT TRACKING (on|off) [REDIRECT ] */ + } else if (!strcasecmp(c->argv[1]->ptr,"tracking") && c->argc >= 3) { + /* CLIENT TRACKING (on|off) [REDIRECT ] [BCAST] [PREFIX first] + * [PREFIX second] ... */ long long redir = 0; + int bcast = 0; + robj **prefix; + size_t numprefix = 0; - /* Parse the redirection option: we'll require the client with - * the specified ID to exist right now, even if it is possible - * it will get disconnected later. */ - if (c->argc == 5) { - if (strcasecmp(c->argv[3]->ptr,"redirect") != 0) { - addReply(c,shared.syntaxerr); - return; - } else { - if (getLongLongFromObjectOrReply(c,c->argv[4],&redir,NULL) != + /* Parse the options. */ + if (for int j = 3; j < argc; j++) { + int moreargs = (c->argc-1) - j; + + if (!strcasecmp(c->argv[j]->ptr,"redirect") && moreargs) { + j++; + if (getLongLongFromObjectOrReply(c,c->argv[j],&redir,NULL) != C_OK) return; + /* We will require the client with the specified ID to exist + * right now, even if it is possible that it gets disconnected + * later. Still a valid sanity check. 
*/ if (lookupClientByID(redir) == NULL) { addReplyError(c,"The client ID you want redirect to " "does not exist"); return; } + } else if (!strcasecmp(c->argv[j]->ptr,"bcast")) { + bcast++; + } else if (!strcasecmp(c->argv[j]->ptr,"prefix") && morearg) { + j++; + prefix = zrealloc(sizeof(robj*)*(numprefix+1)); + prefix[numprefix++] = argv[j]; + } else { + addReply(c,shared.syntaxerr); + return; } } + /* Make sure options are compatible among each other and with the + * current state of the client. */ + if (!bcast && numprefix) { + addReplyError("PREFIX option requires BCAST mode to be enabled"); + zfree(prefix); + return; + } + + if (client->flags & CLIENT_TRACKING) { + int oldbcast = !!client->flags & CLIENT_TRACKING_BCAST; + if (oldbcast != bcast) { + } + addReplyError( + "You can't switch BCAST mode on/off before disabling " + "tracking for this client, and then re-enabling it with " + "a different mode."); + zfree(prefix); + return; + } + + /* Options are ok: enable or disable the tracking for this client. */ if (!strcasecmp(c->argv[2]->ptr,"on")) { - enableTracking(c,redir); + enableTracking(c,redir,bcast,prefix,numprefix); } else if (!strcasecmp(c->argv[2]->ptr,"off")) { disableTracking(c); } else { addReply(c,shared.syntaxerr); return; } + zfree(prefix); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"getredir") && c->argc == 2) { /* CLIENT GETREDIR */ diff --git a/src/server.c b/src/server.c index 910cf5410..1001fa4f7 100644 --- a/src/server.c +++ b/src/server.c @@ -3310,8 +3310,11 @@ void call(client *c, int flags) { if (c->cmd->flags & CMD_READONLY) { client *caller = (c->flags & CLIENT_LUA && server.lua_caller) ? 
server.lua_caller : c; - if (caller->flags & CLIENT_TRACKING) + if (caller->flags & CLIENT_TRACKING && + !(caller->flags & CLIENT_TRACKING_BCAST)) + { trackingRememberKeys(caller); + } } server.fixed_time_expire--; diff --git a/src/server.h b/src/server.h index 3e055a7db..d3ca0d01b 100644 --- a/src/server.h +++ b/src/server.h @@ -247,6 +247,7 @@ typedef long long ustime_t; /* microsecond time type. */ #define CLIENT_TRACKING (1ULL<<31) /* Client enabled keys tracking in order to perform client side caching. */ #define CLIENT_TRACKING_BROKEN_REDIR (1ULL<<32) /* Target client is invalid. */ +#define CLIENT_TRACKING_BCAST (1ULL<<33) /* Tracking in BCAST mode. */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ @@ -822,6 +823,11 @@ typedef struct client { * invalidation messages for keys fetched by this client will be send to * the specified client ID. */ uint64_t client_tracking_redirection; + list *client_tracking_prefix_nodes; /* This list contains listNode pointers + to the nodes we have in every list + of clients in the tracking bcast + table. This way we can remove our + client in O(1) for each list. */ /* Response buffer */ int bufpos; @@ -1648,7 +1654,7 @@ void addReplyStatusFormat(client *c, const char *fmt, ...); #endif /* Client side caching (tracking mode) */ -void enableTracking(client *c, uint64_t redirect_to); +void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix); void disableTracking(client *c); void trackingRememberKeys(client *c); void trackingInvalidateKey(robj *keyobj); diff --git a/src/tracking.c b/src/tracking.c index 3122563ac..413b21328 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -42,6 +42,7 @@ * Clients will normally take frequently requested objects in memory, removing * them when invalidation messages are received. 
*/ rax *TrackingTable = NULL; +rax *PrefixTable = NULL; uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across the whole tracking table. This givesn an hint about the total memory we @@ -68,16 +69,25 @@ void disableTracking(client *c) { * eventually get freed, we'll send a message to the original client to * inform it of the condition. Multiple clients can redirect the invalidation * messages to the same client ID. */ -void enableTracking(client *c, uint64_t redirect_to) { - if (c->flags & CLIENT_TRACKING) return; +void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix) { c->flags |= CLIENT_TRACKING; c->flags &= ~CLIENT_TRACKING_BROKEN_REDIR; c->client_tracking_redirection = redirect_to; - server.tracking_clients++; + if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++; if (TrackingTable == NULL) { TrackingTable = raxNew(); + PrefixTable = raxNew(); TrackingChannelName = createStringObject("__redis__:invalidate",20); } + + if (bcast) { + c->flags |= CLIENT_TRACKING_BCAST; + if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0); + for (int j = 0; j < numprefix; j++) { + sds sdsprefix = prefix[j]->ptr; + enableBcastTrackingForPrefix(c,sdsprefix,sdslen(prefix)); + } + } } /* This function is called after the excution of a readonly command in the From 3f7ba86255b9d6acd73dd39cc8f05d3d3f8741a9 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 11 Feb 2020 17:26:27 +0100 Subject: [PATCH 07/13] Tracking: BCAST: registration in the prefix table. 
--- src/networking.c | 21 +++++++++--------- src/server.h | 9 +++----- src/tracking.c | 57 +++++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 67 insertions(+), 20 deletions(-) diff --git a/src/networking.c b/src/networking.c index 690a134f1..344b76260 100644 --- a/src/networking.c +++ b/src/networking.c @@ -154,7 +154,7 @@ client *createClient(connection *conn) { c->peerid = NULL; c->client_list_node = NULL; c->client_tracking_redirection = 0; - c->client_tracking_prefix_nodes = NULL; + c->client_tracking_prefixes = NULL; c->auth_callback = NULL; c->auth_callback_privdata = NULL; c->auth_module = NULL; @@ -2028,7 +2028,6 @@ int clientSetNameOrReply(client *c, robj *name) { void clientCommand(client *c) { listNode *ln; listIter li; - client *client; if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { const char *help[] = { @@ -2142,7 +2141,7 @@ NULL /* Iterate clients killing all the matching clients. */ listRewind(server.clients,&li); while ((ln = listNext(&li)) != NULL) { - client = listNodeValue(ln); + client *client = listNodeValue(ln); if (addr && strcmp(getClientPeerId(client),addr) != 0) continue; if (type != -1 && getClientType(client) != type) continue; if (id != 0 && client->id != id) continue; @@ -2229,7 +2228,7 @@ NULL size_t numprefix = 0; /* Parse the options. 
*/ - if (for int j = 3; j < argc; j++) { + for (int j = 3; j < c->argc; j++) { int moreargs = (c->argc-1) - j; if (!strcasecmp(c->argv[j]->ptr,"redirect") && moreargs) { @@ -2246,10 +2245,10 @@ NULL } } else if (!strcasecmp(c->argv[j]->ptr,"bcast")) { bcast++; - } else if (!strcasecmp(c->argv[j]->ptr,"prefix") && morearg) { + } else if (!strcasecmp(c->argv[j]->ptr,"prefix") && moreargs) { j++; - prefix = zrealloc(sizeof(robj*)*(numprefix+1)); - prefix[numprefix++] = argv[j]; + prefix = zrealloc(prefix,sizeof(robj*)*(numprefix+1)); + prefix[numprefix++] = c->argv[j]; } else { addReply(c,shared.syntaxerr); return; @@ -2259,16 +2258,16 @@ NULL /* Make sure options are compatible among each other and with the * current state of the client. */ if (!bcast && numprefix) { - addReplyError("PREFIX option requires BCAST mode to be enabled"); + addReplyError(c,"PREFIX option requires BCAST mode to be enabled"); zfree(prefix); return; } - if (client->flags & CLIENT_TRACKING) { - int oldbcast = !!client->flags & CLIENT_TRACKING_BCAST; + if (c->flags & CLIENT_TRACKING) { + int oldbcast = !!c->flags & CLIENT_TRACKING_BCAST; if (oldbcast != bcast) { } - addReplyError( + addReplyError(c, "You can't switch BCAST mode on/off before disabling " "tracking for this client, and then re-enabling it with " "a different mode."); diff --git a/src/server.h b/src/server.h index d3ca0d01b..725c3cbc8 100644 --- a/src/server.h +++ b/src/server.h @@ -823,12 +823,9 @@ typedef struct client { * invalidation messages for keys fetched by this client will be send to * the specified client ID. */ uint64_t client_tracking_redirection; - list *client_tracking_prefix_nodes; /* This list contains listNode pointers - to the nodes we have in every list - of clients in the tracking bcast - table. This way we can remove our - client in O(1) for each list. */ - + rax *client_tracking_prefixes; /* A dictionary of prefixes we are already + subscribed to in BCAST mode, in the + context of client side caching. 
*/ /* Response buffer */ int bufpos; char buf[PROTO_REPLY_CHUNK_BYTES]; diff --git a/src/tracking.c b/src/tracking.c index 413b21328..9f46275a4 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -49,6 +49,15 @@ uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across are using server side for CSC. */ robj *TrackingChannelName; +/* This is the structure that we have as value of the PrefixTable, and + * represents the list of keys modified, and the list of clients that need + * to be notified, for a given prefix. */ +typedef struct bcastState { + rax *keys; /* Keys modified in the current event loop cycle. */ + rax *clients; /* Clients subscribed to the notification events for this + prefix. */ +} bcastState; + /* Remove the tracking state from the client 'c'. Note that there is not much * to do for us here, if not to decrement the counter of the clients in * tracking mode, because we just store the ID of the client in the tracking @@ -56,9 +65,51 @@ robj *TrackingChannelName; * client with many entries in the table is removed, it would cost a lot of * time to do the cleanup. */ void disableTracking(client *c) { + /* If this client is in broadcasting mode, we need to unsubscribe it + * from all the prefixes it is registered to. */ + if (c->flags & CLIENT_TRACKING_BCAST) { + raxIterator ri; + raxStart(&ri,c->client_tracking_prefixes); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + bcastState *bs = raxFind(PrefixTable,ri.key,ri.key_len); + serverAssert(bs != raxNotFound); + raxRemove(bs->clients,(unsigned char*)&c,sizeof(c),NULL); + /* Was it the last client? Remove the prefix from the + * table. */ + if (raxSize(bs->clients) == 0) { + raxFree(bs->clients); + raxFree(bs->keys); + zfree(bs); + raxRemove(PrefixTable,ri.key,ri.key_len,NULL); + } + } + raxStop(&ri); + } + + /* Clear flags and adjust the count. 
*/ if (c->flags & CLIENT_TRACKING) { server.tracking_clients--; - c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR); + c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR| + CLIENT_TRACKING_BCAST); + } +} + +/* Set the client 'c' to track the prefix 'prefix'. If the client 'c' is + * already registered for the specified prefix, no operation is performed. */ +void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) { + bcastState *bs = raxFind(PrefixTable,(unsigned char*)prefix,sdslen(prefix)); + /* If this is the first client subscribing to such prefix, create + * the prefix in the table. */ + if (bs == raxNotFound) { + bs = zmalloc(sizeof(*bs)); + bs->keys = raxNew(); + bs->clients = raxNew(); + raxInsert(PrefixTable,(unsigned char*)prefix,plen,bs,NULL); + } + if (raxTryInsert(bs->clients,(unsigned char*)&c,sizeof(c),NULL,NULL)) { + raxInsert(c->client_tracking_prefixes, + (unsigned char*)prefix,plen,NULL,NULL); } } @@ -83,9 +134,9 @@ void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, s if (bcast) { c->flags |= CLIENT_TRACKING_BCAST; if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0); - for (int j = 0; j < numprefix; j++) { + for (size_t j = 0; j < numprefix; j++) { sds sdsprefix = prefix[j]->ptr; - enableBcastTrackingForPrefix(c,sdsprefix,sdslen(prefix)); + enableBcastTrackingForPrefix(c,sdsprefix,sdslen(sdsprefix)); } } } From 71f3f3f1afe4fbb6f8634970258b5dec2d389c68 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 11 Feb 2020 18:11:59 +0100 Subject: [PATCH 08/13] Tracking: BCAST: broadcasting of keys in prefixes implemented. 
--- src/server.h | 1 + src/tracking.c | 104 ++++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 95 insertions(+), 10 deletions(-) diff --git a/src/server.h b/src/server.h index 725c3cbc8..439bbc393 100644 --- a/src/server.h +++ b/src/server.h @@ -1659,6 +1659,7 @@ void trackingInvalidateKeysOnFlush(int dbid); void trackingLimitUsedSlots(void); uint64_t trackingGetTotalItems(void); uint64_t trackingGetTotalKeys(void); +void trackingBroadcastInvalidationMessages(void); /* List data type */ void listTypeTryConversion(robj *subject, robj *value); diff --git a/src/tracking.c b/src/tracking.c index 9f46275a4..345c5f1ad 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -167,7 +167,17 @@ void trackingRememberKeys(client *c) { getKeysFreeResult(keys); } -void sendTrackingMessage(client *c, char *keyname, size_t keylen) { +/* Given a key name, this function sends an invalidation message in the + * proper channel (depending on RESP version: PubSub or Push message) and + * to the proper client (in case of redirection), in the context of the + * client 'c' with tracking enabled. + * + * In case the 'proto' argument is non zero, the function will assume that + * 'keyname' points to a buffer of 'keylen' bytes already expressed in the + * form of Redis RESP protocol, representing an array of keys to send + * to the client as value of the invalidation. This is used in BCAST mode + * in order to optimize the implementation to use less CPU time. 
*/ +void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) { int using_redirection = 0; if (c->client_tracking_redirection) { client *redir = lookupClientByID(c->client_tracking_redirection); @@ -193,18 +203,38 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen) { if (c->resp > 2) { addReplyPushLen(c,2); addReplyBulkCBuffer(c,"invalidate",10); - addReplyArrayLen(c,1); - addReplyBulkCBuffer(c,keyname,keylen); } else if (using_redirection && c->flags & CLIENT_PUBSUB) { /* We use a static object to speedup things, however we assume * that addReplyPubsubMessage() will not take a reference. */ - robj keyobj; - initStaticStringObject(keyobj,keyname); addReplyPubsubMessage(c,TrackingChannelName,NULL); - addReplyArrayLen(c,1); - addReplyBulk(c,&keyobj); - serverAssert(keyobj.refcount == 1); } + + /* Send the "value" part, which is the array of keys. */ + if (proto) { + addReplyProto(c,keyname,keylen); + } else { + addReplyArrayLen(c,1); + addReplyBulkCBuffer(c,keyname,keylen); + } +} + +/* This function is called when a key is modified in Redis and in the case + * we have at least one client with the BCAST mode enabled. + * Its goal is to set the key in the right broadcast state if the key + * matches one or more prefixes in the prefix table. Later when we + * return to the event loop, we'll send invalidation messages to the + * clients subscribed to each prefix. 
*/ +void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) { + raxIterator ri; + raxStart(&ri,PrefixTable); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + if (keylen > ri.key_len) continue; + if (memcmp(ri.key,keyname,ri.key_len) != 0) continue; + bcastState *bs = ri.data; + raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,NULL,NULL); + } + raxStop(&ri); } /* This function is called from signalModifiedKey() or other places in Redis @@ -214,6 +244,10 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen) { void trackingInvalidateKey(robj *keyobj) { if (TrackingTable == NULL) return; sds sdskey = keyobj->ptr; + + if (raxSize(PrefixTable) > 0) + trackingRememberKeyToBroadcast(sdskey,sdslen(sdskey)); + rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey)); if (ids == raxNotFound) return;; @@ -225,7 +259,7 @@ void trackingInvalidateKey(robj *keyobj) { memcpy(&id,ri.key,sizeof(id)); client *c = lookupClientByID(id); if (c == NULL || !(c->flags & CLIENT_TRACKING)) continue; - sendTrackingMessage(c,sdskey,sdslen(sdskey)); + sendTrackingMessage(c,sdskey,sdslen(sdskey),0); } raxStop(&ri); @@ -262,7 +296,7 @@ void trackingInvalidateKeysOnFlush(int dbid) { while ((ln = listNext(&li)) != NULL) { client *c = listNodeValue(ln); if (c->flags & CLIENT_TRACKING) { - sendTrackingMessage(c,"",1); + sendTrackingMessage(c,"",1,0); } } } @@ -323,6 +357,56 @@ void trackingLimitUsedSlots(void) { timeout_counter++; } +/* This function will run the prefixes of clients in BCAST mode and + * keys that were modified about each prefix, and will send the + * notifications to each client in each prefix. */ +void trackingBroadcastInvalidationMessages(void) { + raxIterator ri, ri2; + raxStart(&ri,PrefixTable); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + bcastState *bs = ri.data; + /* Create the array reply with the list of keys once, then send + * it to all the clients subscribed to this prefix. 
*/ + char buf[32]; + size_t len = ll2string(buf,sizeof(buf),raxSize(bs->keys)); + sds proto = sdsempty(); + proto = sdsMakeRoomFor(proto,raxSize(bs->keys)*15); + proto = sdscatlen(proto,"*",1); + proto = sdscatlen(proto,buf,len); + proto = sdscatlen(proto,"\r\n",2); + raxStart(&ri2,bs->keys); + raxSeek(&ri2,"^",NULL,0); + while(raxNext(&ri2)) { + len = ll2string(buf,sizeof(buf),ri2.key_len); + sds proto = sdsnewlen("$",1); + proto = sdscatlen(proto,ri2.key,ri2.key_len); + proto = sdscatlen(proto,"\r\n",2); + } + raxStop(&ri2); + + /* Send this array of keys to every client in the list. */ + raxStart(&ri2,bs->clients); + raxSeek(&ri2,"^",NULL,0); + while(raxNext(&ri2)) { + client *c; + memcpy(&c,ri2.key,sizeof(c)); + sendTrackingMessage(c,proto,sdslen(proto),1); + } + raxStop(&ri2); + + /* Clean up: we can remove everything from this state, because we + * want to only track the new keys that will be accumulated starting + * from now. */ + sdsfree(proto); + raxFree(bs->clients); + raxFree(bs->keys); + bs->clients = raxNew(); + bs->keys = raxNew(); + } + raxStop(&ri); +} + /* This is just used in order to access the amount of used slots in the * tracking table. */ uint64_t trackingGetTotalItems(void) { From 40194a2a6809520b5f01da4a7b41afe2a2441f64 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 12 Feb 2020 19:22:04 +0100 Subject: [PATCH 09/13] Tracking: BCAST: basic feature now works. --- src/networking.c | 2 +- src/server.c | 4 +++ src/tracking.c | 89 +++++++++++++++++++++++++++--------------------- 3 files changed, 55 insertions(+), 40 deletions(-) diff --git a/src/networking.c b/src/networking.c index 344b76260..46534253e 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2224,7 +2224,7 @@ NULL * [PREFIX second] ... */ long long redir = 0; int bcast = 0; - robj **prefix; + robj **prefix = NULL; size_t numprefix = 0; /* Parse the options. 
*/ diff --git a/src/server.c b/src/server.c index 1001fa4f7..22c81070c 100644 --- a/src/server.c +++ b/src/server.c @@ -2124,6 +2124,10 @@ void beforeSleep(struct aeEventLoop *eventLoop) { if (listLength(server.unblocked_clients)) processUnblockedClients(); + /* Send the invalidation messages to clients participating to the + * client side caching protocol in broadcasting (BCAST) mode. */ + trackingBroadcastInvalidationMessages(); + /* Write the AOF buffer on disk */ flushAppendOnlyFile(0); diff --git a/src/tracking.c b/src/tracking.c index 345c5f1ad..672b886a3 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -85,6 +85,8 @@ void disableTracking(client *c) { } } raxStop(&ri); + raxFree(c->client_tracking_prefixes); + c->client_tracking_prefixes = NULL; } /* Clear flags and adjust the count. */ @@ -108,6 +110,8 @@ void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) { raxInsert(PrefixTable,(unsigned char*)prefix,plen,bs,NULL); } if (raxTryInsert(bs->clients,(unsigned char*)&c,sizeof(c),NULL,NULL)) { + if (c->client_tracking_prefixes == NULL) + c->client_tracking_prefixes = raxNew(); raxInsert(c->client_tracking_prefixes, (unsigned char*)prefix,plen,NULL,NULL); } @@ -121,10 +125,10 @@ void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) { * inform it of the condition. Multiple clients can redirect the invalidation * messages to the same client ID. 
*/ void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix) { - c->flags |= CLIENT_TRACKING; - c->flags &= ~CLIENT_TRACKING_BROKEN_REDIR; - c->client_tracking_redirection = redirect_to; if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++; + c->flags |= CLIENT_TRACKING; + c->flags &= ~(CLIENT_TRACKING_BROKEN_REDIR|CLIENT_TRACKING_BCAST); + c->client_tracking_redirection = redirect_to; if (TrackingTable == NULL) { TrackingTable = raxNew(); PrefixTable = raxNew(); @@ -229,10 +233,11 @@ void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) { raxStart(&ri,PrefixTable); raxSeek(&ri,"^",NULL,0); while(raxNext(&ri)) { - if (keylen > ri.key_len) continue; - if (memcmp(ri.key,keyname,ri.key_len) != 0) continue; - bcastState *bs = ri.data; - raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,NULL,NULL); + if (ri.key_len > keylen) continue; + if (ri.key_len != 0 && memcmp(ri.key,keyname,ri.key_len) != 0) + continue; + bcastState *bs = ri.data; + raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,NULL,NULL); } raxStop(&ri); } @@ -362,46 +367,52 @@ void trackingLimitUsedSlots(void) { * notifications to each client in each prefix. */ void trackingBroadcastInvalidationMessages(void) { raxIterator ri, ri2; + + /* Return ASAP if there is nothing to do here. */ + if (TrackingTable == NULL || !server.tracking_clients) return; + raxStart(&ri,PrefixTable); raxSeek(&ri,"^",NULL,0); while(raxNext(&ri)) { bcastState *bs = ri.data; - /* Create the array reply with the list of keys once, then send - * it to all the clients subscribed to this prefix. 
*/ - char buf[32]; - size_t len = ll2string(buf,sizeof(buf),raxSize(bs->keys)); - sds proto = sdsempty(); - proto = sdsMakeRoomFor(proto,raxSize(bs->keys)*15); - proto = sdscatlen(proto,"*",1); - proto = sdscatlen(proto,buf,len); - proto = sdscatlen(proto,"\r\n",2); - raxStart(&ri2,bs->keys); - raxSeek(&ri2,"^",NULL,0); - while(raxNext(&ri2)) { - len = ll2string(buf,sizeof(buf),ri2.key_len); - sds proto = sdsnewlen("$",1); - proto = sdscatlen(proto,ri2.key,ri2.key_len); + if (raxSize(bs->keys)) { + /* Create the array reply with the list of keys once, then send + * it to all the clients subscribed to this prefix. */ + char buf[32]; + size_t len = ll2string(buf,sizeof(buf),raxSize(bs->keys)); + sds proto = sdsempty(); + proto = sdsMakeRoomFor(proto,raxSize(bs->keys)*15); + proto = sdscatlen(proto,"*",1); + proto = sdscatlen(proto,buf,len); proto = sdscatlen(proto,"\r\n",2); - } - raxStop(&ri2); + raxStart(&ri2,bs->keys); + raxSeek(&ri2,"^",NULL,0); + while(raxNext(&ri2)) { + len = ll2string(buf,sizeof(buf),ri2.key_len); + proto = sdscatlen(proto,"$",1); + proto = sdscatlen(proto,buf,len); + proto = sdscatlen(proto,"\r\n",2); + proto = sdscatlen(proto,ri2.key,ri2.key_len); + proto = sdscatlen(proto,"\r\n",2); + } + raxStop(&ri2); - /* Send this array of keys to every client in the list. */ - raxStart(&ri2,bs->clients); - raxSeek(&ri2,"^",NULL,0); - while(raxNext(&ri2)) { - client *c; - memcpy(&c,ri2.key,sizeof(c)); - sendTrackingMessage(c,proto,sdslen(proto),1); - } - raxStop(&ri2); + /* Send this array of keys to every client in the list. */ + raxStart(&ri2,bs->clients); + raxSeek(&ri2,"^",NULL,0); + while(raxNext(&ri2)) { + client *c; + memcpy(&c,ri2.key,sizeof(c)); + sendTrackingMessage(c,proto,sdslen(proto),1); + } + raxStop(&ri2); - /* Clean up: we can remove everything from this state, because we - * want to only track the new keys that will be accumulated starting - * from now. 
*/ - sdsfree(proto); - raxFree(bs->clients); + /* Clean up: we can remove everything from this state, because we + * want to only track the new keys that will be accumulated starting + * from now. */ + sdsfree(proto); + } raxFree(bs->keys); - bs->clients = raxNew(); bs->keys = raxNew(); } raxStop(&ri); From 6922ccc0b98156e787b3d2f35daf0299e7844250 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 13 Feb 2020 16:58:07 +0100 Subject: [PATCH 10/13] Tracking: fix sending messages bug + tracking off bug. --- src/networking.c | 42 ++++++++++++++++++++++-------------------- src/tracking.c | 6 ++++++ 2 files changed, 28 insertions(+), 20 deletions(-) diff --git a/src/networking.c b/src/networking.c index 46534253e..1b4b19645 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2250,37 +2250,39 @@ NULL prefix = zrealloc(prefix,sizeof(robj*)*(numprefix+1)); prefix[numprefix++] = c->argv[j]; } else { + zfree(prefix); addReply(c,shared.syntaxerr); return; } } - /* Make sure options are compatible among each other and with the - * current state of the client. */ - if (!bcast && numprefix) { - addReplyError(c,"PREFIX option requires BCAST mode to be enabled"); - zfree(prefix); - return; - } - - if (c->flags & CLIENT_TRACKING) { - int oldbcast = !!c->flags & CLIENT_TRACKING_BCAST; - if (oldbcast != bcast) { - } - addReplyError(c, - "You can't switch BCAST mode on/off before disabling " - "tracking for this client, and then re-enabling it with " - "a different mode."); - zfree(prefix); - return; - } - /* Options are ok: enable or disable the tracking for this client. */ if (!strcasecmp(c->argv[2]->ptr,"on")) { + /* Before enabling tracking, make sure options are compatible + * among each other and with the current state of the client. 
*/ + if (!bcast && numprefix) { + addReplyError(c, + "PREFIX option requires BCAST mode to be enabled"); + zfree(prefix); + return; + } + + if (c->flags & CLIENT_TRACKING) { + int oldbcast = !!c->flags & CLIENT_TRACKING_BCAST; + if (oldbcast != bcast) { + addReplyError(c, + "You can't switch BCAST mode on/off before disabling " + "tracking for this client, and then re-enabling it with " + "a different mode."); + zfree(prefix); + return; + } + } enableTracking(c,redir,bcast,prefix,numprefix); } else if (!strcasecmp(c->argv[2]->ptr,"off")) { disableTracking(c); } else { + zfree(prefix); addReply(c,shared.syntaxerr); return; } diff --git a/src/tracking.c b/src/tracking.c index 672b886a3..ef5840863 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -211,6 +211,12 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) { /* We use a static object to speedup things, however we assume * that addReplyPubsubMessage() will not take a reference. */ addReplyPubsubMessage(c,TrackingChannelName,NULL); + } else { + /* If we are here, the client is not using RESP3, nor is + * redirecting to another client. We can't send anything to + * it since RESP2 does not support push messages in the same + * connection. */ + return; } /* Send the "value" part, which is the array of keys. */ From f6e32a832f4aaa92721f4ea1eadc1d3897ba32c2 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 14 Feb 2020 14:17:10 +0100 Subject: [PATCH 11/13] Tracking: fix behavior when switching from normal to BCAST. 
--- src/tracking.c | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/tracking.c b/src/tracking.c index ef5840863..3333472a4 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -269,7 +269,17 @@ void trackingInvalidateKey(robj *keyobj) { uint64_t id; memcpy(&id,ri.key,sizeof(id)); client *c = lookupClientByID(id); - if (c == NULL || !(c->flags & CLIENT_TRACKING)) continue; + /* Note that if the client is in BCAST mode, we don't want to + * send invalidation messages that were pending in the case + * previously the client was not in BCAST mode. This can happen if + * TRACKING is enabled normally, and then the client switches to + * BCAST mode. */ + if (c == NULL || + !(c->flags & CLIENT_TRACKING)|| + c->flags & CLIENT_TRACKING_BCAST) + { + continue; + } sendTrackingMessage(c,sdskey,sdslen(sdskey),0); } raxStop(&ri); From 47177c9edc9d6f738f1aacb33bd4e1d6c2c5a697 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 14 Feb 2020 14:27:22 +0100 Subject: [PATCH 12/13] Tracking: fix operators precedence error in bcast check. --- src/networking.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/networking.c b/src/networking.c index 1b4b19645..69350eed1 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2268,7 +2268,7 @@ NULL } if (c->flags & CLIENT_TRACKING) { - int oldbcast = !!c->flags & CLIENT_TRACKING_BCAST; + int oldbcast = !!(c->flags & CLIENT_TRACKING_BCAST); if (oldbcast != bcast) { addReplyError(c, "You can't switch BCAST mode on/off before disabling " From 8ea7a3ee686a8bddf0b07585922917adcfda91dc Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 14 Feb 2020 14:29:00 +0100 Subject: [PATCH 13/13] Tracking: first set of tests for the feature. 
--- tests/unit/tracking.tcl | 66 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 tests/unit/tracking.tcl diff --git a/tests/unit/tracking.tcl b/tests/unit/tracking.tcl new file mode 100644 index 000000000..2058319f7 --- /dev/null +++ b/tests/unit/tracking.tcl @@ -0,0 +1,66 @@ +start_server {tags {"tracking"}} { + # Create a deferred client we'll use to redirect invalidation + # messages to. + set rd1 [redis_deferring_client] + $rd1 client id + set redir [$rd1 read] + $rd1 subscribe __redis__:invalidate + $rd1 read ; # Consume the SUBSCRIBE reply. + + test {Clients are able to enable tracking and redirect it} { + r CLIENT TRACKING on REDIRECT $redir + } {*OK} + + test {The other connection is able to get invalidations} { + r SET a 1 + r GET a + r INCR a + r INCR b ; # This key should not be notified, since it wasn't fetched. + set keys [lindex [$rd1 read] 2] + assert {[llength $keys] == 1} + assert {[lindex $keys 0] eq {a}} + } + + test {The client is now able to disable tracking} { + # Make sure to add a few more keys in the tracking list + # so that we can check for leaks, as a side effect. + r MGET a b c d e f g + r CLIENT TRACKING off + } + + test {Clients can enable the BCAST mode with the empty prefix} { + r CLIENT TRACKING on BCAST REDIRECT $redir + } {*OK*} + + test {The connection gets invalidation messages about all the keys} { + r MSET a 1 b 2 c 3 + set keys [lsort [lindex [$rd1 read] 2]] + assert {$keys eq {a b c}} + } + + test {Clients can enable the BCAST mode with prefixes} { + r CLIENT TRACKING off + r CLIENT TRACKING on BCAST REDIRECT $redir PREFIX a: PREFIX b: + r MULTI + r INCR a:1 + r INCR a:2 + r INCR b:1 + r INCR b:2 + r EXEC + # Because of the internals, we know we are going to receive + # two separated notifications for the two different prefixes. 
+ set keys1 [lsort [lindex [$rd1 read] 2]] + set keys2 [lsort [lindex [$rd1 read] 2]] + set keys [lsort [list {*}$keys1 {*}$keys2]] + assert {$keys eq {a:1 a:2 b:1 b:2}} + } + + test {Adding prefixes to BCAST mode works} { + r CLIENT TRACKING on BCAST REDIRECT $redir PREFIX c: + r INCR c:1234 + set keys [lsort [lindex [$rd1 read] 2]] + assert {$keys eq {c:1234}} + } + + $rd1 close +}