From ddc4965ac5d4f48eae5c75b191e537b3955c8321 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 7 Feb 2020 18:12:45 +0100 Subject: [PATCH 01/17] Tracking: minor change of names and new INFO field. --- src/config.c | 2 +- src/server.c | 2 ++ src/server.h | 3 ++- src/tracking.c | 8 ++++++-- 4 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/config.c b/src/config.c index a6b374817..b2e5fc12e 100644 --- a/src/config.c +++ b/src/config.c @@ -2160,7 +2160,7 @@ standardConfig configs[] = { createIntConfig("list-compress-depth", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.list_compress_depth, 0, INTEGER_CONFIG, NULL, NULL), createIntConfig("rdb-key-save-delay", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.rdb_key_save_delay, 0, INTEGER_CONFIG, NULL, NULL), createIntConfig("key-load-delay", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.key_load_delay, 0, INTEGER_CONFIG, NULL, NULL), - createIntConfig("tracking-table-max-fill", NULL, MODIFIABLE_CONFIG, 0, 100, server.tracking_table_max_fill, 10, INTEGER_CONFIG, NULL, NULL), /* Default: 10% tracking table max fill. */ + createIntConfig("tracking-table-max-fill", NULL, MODIFIABLE_CONFIG, 0, 100, server.tracking_table_max_keys, 1000000, INTEGER_CONFIG, NULL, NULL), /* Default: 10% tracking table max number of keys tracked. */ createIntConfig("active-expire-effort", NULL, MODIFIABLE_CONFIG, 1, 10, server.active_expire_effort, 1, INTEGER_CONFIG, NULL, NULL), /* From 1 to 10. 
*/ createIntConfig("hz", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.config_hz, CONFIG_DEFAULT_HZ, INTEGER_CONFIG, NULL, updateHZ), createIntConfig("min-replicas-to-write", "min-slaves-to-write", MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_min_slaves_to_write, 0, INTEGER_CONFIG, NULL, updateGoodSlaves), diff --git a/src/server.c b/src/server.c index 2cf8dbf19..910cf5410 100644 --- a/src/server.c +++ b/src/server.c @@ -4221,6 +4221,7 @@ sds genRedisInfoString(const char *section) { "active_defrag_misses:%lld\r\n" "active_defrag_key_hits:%lld\r\n" "active_defrag_key_misses:%lld\r\n" + "tracking_total_keys:%lld\r\n" "tracking_total_items:%lld\r\n", server.stat_numconnections, server.stat_numcommands, @@ -4249,6 +4250,7 @@ sds genRedisInfoString(const char *section) { server.stat_active_defrag_misses, server.stat_active_defrag_key_hits, server.stat_active_defrag_key_misses, + (unsigned long long) trackingGetTotalKeys(), (unsigned long long) trackingGetTotalItems()); } diff --git a/src/server.h b/src/server.h index a30db6b57..3e055a7db 100644 --- a/src/server.h +++ b/src/server.h @@ -1306,7 +1306,7 @@ struct redisServer { list *ready_keys; /* List of readyList structures for BLPOP & co */ /* Client side caching. */ unsigned int tracking_clients; /* # of clients with tracking enabled.*/ - int tracking_table_max_fill; /* Max fill percentage. */ + int tracking_table_max_keys; /* Max number of keys in tracking table. 
*/ /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ int sort_desc; @@ -1655,6 +1655,7 @@ void trackingInvalidateKey(robj *keyobj); void trackingInvalidateKeysOnFlush(int dbid); void trackingLimitUsedSlots(void); uint64_t trackingGetTotalItems(void); +uint64_t trackingGetTotalKeys(void); /* List data type */ void listTypeTryConversion(robj *subject, robj *value); diff --git a/src/tracking.c b/src/tracking.c index ecb7fbdcc..9c1c9620c 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -223,8 +223,8 @@ void trackingInvalidateKeysOnFlush(int dbid) { void trackingLimitUsedSlots(void) { static unsigned int timeout_counter = 0; if (TrackingTable == NULL) return; - if (server.tracking_table_max_fill == 0) return; /* No limits set. */ - size_t max_keys = server.tracking_table_max_fill; + if (server.tracking_table_max_keys == 0) return; /* No limits set. */ + size_t max_keys = server.tracking_table_max_keys; if (raxSize(TrackingTable) <= max_keys) { timeout_counter = 0; return; /* Limit not reached. */ @@ -264,3 +264,7 @@ void trackingLimitUsedSlots(void) { uint64_t trackingGetTotalItems(void) { return TrackingTableTotalItems; } + +uint64_t trackingGetTotalKeys(void) { + return raxSize(TrackingTable); +} From a34422aee79566830c8ddfddcb6d0cd24ad0be89 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 10 Feb 2020 13:42:18 +0100 Subject: [PATCH 02/17] Tracking: always reply with an array of keys. --- src/pubsub.c | 8 ++++++-- src/tracking.c | 5 ++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/pubsub.c b/src/pubsub.c index 994dd9734..5cb4298e0 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -35,7 +35,11 @@ int clientSubscriptionsCount(client *c); * Pubsub client replies API *----------------------------------------------------------------------------*/ -/* Send a pubsub message of type "message" to the client. 
*/ +/* Send a pubsub message of type "message" to the client. + * Normally 'msg' is a Redis object containing the string to send as + * message. However if the caller sets 'msg' as NULL, it will be able + * to send a special message (for instance an Array type) by using the + * addReply*() API family. */ void addReplyPubsubMessage(client *c, robj *channel, robj *msg) { if (c->resp == 2) addReply(c,shared.mbulkhdr[3]); @@ -43,7 +47,7 @@ void addReplyPubsubMessage(client *c, robj *channel, robj *msg) { addReplyPushLen(c,3); addReply(c,shared.messagebulk); addReplyBulk(c,channel); - addReplyBulk(c,msg); + if (msg) addReplyBulk(c,msg); } /* Send a pubsub message of type "pmessage" to the client. The difference diff --git a/src/tracking.c b/src/tracking.c index 9c1c9620c..3122563ac 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -132,13 +132,16 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen) { if (c->resp > 2) { addReplyPushLen(c,2); addReplyBulkCBuffer(c,"invalidate",10); + addReplyArrayLen(c,1); addReplyBulkCBuffer(c,keyname,keylen); } else if (using_redirection && c->flags & CLIENT_PUBSUB) { /* We use a static object to speedup things, however we assume * that addReplyPubsubMessage() will not take a reference. */ robj keyobj; initStaticStringObject(keyobj,keyname); - addReplyPubsubMessage(c,TrackingChannelName,&keyobj); + addReplyPubsubMessage(c,TrackingChannelName,NULL); + addReplyArrayLen(c,1); + addReplyBulk(c,&keyobj); serverAssert(keyobj.refcount == 1); } } From ed36f3c76fdb4010cc28323fe5f4a87055cf89ea Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 10 Feb 2020 17:18:11 +0100 Subject: [PATCH 03/17] Tracking: BCAST: parsing of the options + skeleton. 
--- src/networking.c | 63 +++++++++++++++++++++++++++++++++++++----------- src/server.c | 5 +++- src/server.h | 8 +++++- src/tracking.c | 16 +++++++++--- 4 files changed, 73 insertions(+), 19 deletions(-) diff --git a/src/networking.c b/src/networking.c index 2b0f7464a..690a134f1 100644 --- a/src/networking.c +++ b/src/networking.c @@ -154,6 +154,7 @@ client *createClient(connection *conn) { c->peerid = NULL; c->client_list_node = NULL; c->client_tracking_redirection = 0; + c->client_tracking_prefix_nodes = NULL; c->auth_callback = NULL; c->auth_callback_privdata = NULL; c->auth_module = NULL; @@ -2219,38 +2220,72 @@ NULL UNIT_MILLISECONDS) != C_OK) return; pauseClients(duration); addReply(c,shared.ok); - } else if (!strcasecmp(c->argv[1]->ptr,"tracking") && - (c->argc == 3 || c->argc == 5)) - { - /* CLIENT TRACKING (on|off) [REDIRECT ] */ + } else if (!strcasecmp(c->argv[1]->ptr,"tracking") && c->argc >= 3) { + /* CLIENT TRACKING (on|off) [REDIRECT ] [BCAST] [PREFIX first] + * [PREFIX second] ... */ long long redir = 0; + int bcast = 0; + robj **prefix; + size_t numprefix = 0; - /* Parse the redirection option: we'll require the client with - * the specified ID to exist right now, even if it is possible - * it will get disconnected later. */ - if (c->argc == 5) { - if (strcasecmp(c->argv[3]->ptr,"redirect") != 0) { - addReply(c,shared.syntaxerr); - return; - } else { - if (getLongLongFromObjectOrReply(c,c->argv[4],&redir,NULL) != + /* Parse the options. */ + if (for int j = 3; j < argc; j++) { + int moreargs = (c->argc-1) - j; + + if (!strcasecmp(c->argv[j]->ptr,"redirect") && moreargs) { + j++; + if (getLongLongFromObjectOrReply(c,c->argv[j],&redir,NULL) != C_OK) return; + /* We will require the client with the specified ID to exist + * right now, even if it is possible that it gets disconnected + * later. Still a valid sanity check. 
*/ if (lookupClientByID(redir) == NULL) { addReplyError(c,"The client ID you want redirect to " "does not exist"); return; } + } else if (!strcasecmp(c->argv[j]->ptr,"bcast")) { + bcast++; + } else if (!strcasecmp(c->argv[j]->ptr,"prefix") && morearg) { + j++; + prefix = zrealloc(sizeof(robj*)*(numprefix+1)); + prefix[numprefix++] = argv[j]; + } else { + addReply(c,shared.syntaxerr); + return; } } + /* Make sure options are compatible among each other and with the + * current state of the client. */ + if (!bcast && numprefix) { + addReplyError("PREFIX option requires BCAST mode to be enabled"); + zfree(prefix); + return; + } + + if (client->flags & CLIENT_TRACKING) { + int oldbcast = !!client->flags & CLIENT_TRACKING_BCAST; + if (oldbcast != bcast) { + } + addReplyError( + "You can't switch BCAST mode on/off before disabling " + "tracking for this client, and then re-enabling it with " + "a different mode."); + zfree(prefix); + return; + } + + /* Options are ok: enable or disable the tracking for this client. */ if (!strcasecmp(c->argv[2]->ptr,"on")) { - enableTracking(c,redir); + enableTracking(c,redir,bcast,prefix,numprefix); } else if (!strcasecmp(c->argv[2]->ptr,"off")) { disableTracking(c); } else { addReply(c,shared.syntaxerr); return; } + zfree(prefix); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"getredir") && c->argc == 2) { /* CLIENT GETREDIR */ diff --git a/src/server.c b/src/server.c index 910cf5410..1001fa4f7 100644 --- a/src/server.c +++ b/src/server.c @@ -3310,8 +3310,11 @@ void call(client *c, int flags) { if (c->cmd->flags & CMD_READONLY) { client *caller = (c->flags & CLIENT_LUA && server.lua_caller) ? 
server.lua_caller : c; - if (caller->flags & CLIENT_TRACKING) + if (caller->flags & CLIENT_TRACKING && + !(caller->flags & CLIENT_TRACKING_BCAST)) + { trackingRememberKeys(caller); + } } server.fixed_time_expire--; diff --git a/src/server.h b/src/server.h index 3e055a7db..d3ca0d01b 100644 --- a/src/server.h +++ b/src/server.h @@ -247,6 +247,7 @@ typedef long long ustime_t; /* microsecond time type. */ #define CLIENT_TRACKING (1ULL<<31) /* Client enabled keys tracking in order to perform client side caching. */ #define CLIENT_TRACKING_BROKEN_REDIR (1ULL<<32) /* Target client is invalid. */ +#define CLIENT_TRACKING_BCAST (1ULL<<33) /* Tracking in BCAST mode. */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ @@ -822,6 +823,11 @@ typedef struct client { * invalidation messages for keys fetched by this client will be send to * the specified client ID. */ uint64_t client_tracking_redirection; + list *client_tracking_prefix_nodes; /* This list contains listNode pointers + to the nodes we have in every list + of clients in the tracking bcast + table. This way we can remove our + client in O(1) for each list. */ /* Response buffer */ int bufpos; @@ -1648,7 +1654,7 @@ void addReplyStatusFormat(client *c, const char *fmt, ...); #endif /* Client side caching (tracking mode) */ -void enableTracking(client *c, uint64_t redirect_to); +void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix); void disableTracking(client *c); void trackingRememberKeys(client *c); void trackingInvalidateKey(robj *keyobj); diff --git a/src/tracking.c b/src/tracking.c index 3122563ac..413b21328 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -42,6 +42,7 @@ * Clients will normally take frequently requested objects in memory, removing * them when invalidation messages are received. 
*/ rax *TrackingTable = NULL; +rax *PrefixTable = NULL; uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across the whole tracking table. This givesn an hint about the total memory we @@ -68,16 +69,25 @@ void disableTracking(client *c) { * eventually get freed, we'll send a message to the original client to * inform it of the condition. Multiple clients can redirect the invalidation * messages to the same client ID. */ -void enableTracking(client *c, uint64_t redirect_to) { - if (c->flags & CLIENT_TRACKING) return; +void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix) { c->flags |= CLIENT_TRACKING; c->flags &= ~CLIENT_TRACKING_BROKEN_REDIR; c->client_tracking_redirection = redirect_to; - server.tracking_clients++; + if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++; if (TrackingTable == NULL) { TrackingTable = raxNew(); + PrefixTable = raxNew(); TrackingChannelName = createStringObject("__redis__:invalidate",20); } + + if (bcast) { + c->flags |= CLIENT_TRACKING_BCAST; + if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0); + for (int j = 0; j < numprefix; j++) { + sds sdsprefix = prefix[j]->ptr; + enableBcastTrackingForPrefix(c,sdsprefix,sdslen(prefix)); + } + } } /* This function is called after the excution of a readonly command in the From abdaebc6c46d0a1532d04ce5e1f48ce910238375 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 11 Feb 2020 17:26:27 +0100 Subject: [PATCH 04/17] Tracking: BCAST: registration in the prefix table. 
--- src/networking.c | 21 +++++++++--------- src/server.h | 9 +++----- src/tracking.c | 57 +++++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 67 insertions(+), 20 deletions(-) diff --git a/src/networking.c b/src/networking.c index 690a134f1..344b76260 100644 --- a/src/networking.c +++ b/src/networking.c @@ -154,7 +154,7 @@ client *createClient(connection *conn) { c->peerid = NULL; c->client_list_node = NULL; c->client_tracking_redirection = 0; - c->client_tracking_prefix_nodes = NULL; + c->client_tracking_prefixes = NULL; c->auth_callback = NULL; c->auth_callback_privdata = NULL; c->auth_module = NULL; @@ -2028,7 +2028,6 @@ int clientSetNameOrReply(client *c, robj *name) { void clientCommand(client *c) { listNode *ln; listIter li; - client *client; if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { const char *help[] = { @@ -2142,7 +2141,7 @@ NULL /* Iterate clients killing all the matching clients. */ listRewind(server.clients,&li); while ((ln = listNext(&li)) != NULL) { - client = listNodeValue(ln); + client *client = listNodeValue(ln); if (addr && strcmp(getClientPeerId(client),addr) != 0) continue; if (type != -1 && getClientType(client) != type) continue; if (id != 0 && client->id != id) continue; @@ -2229,7 +2228,7 @@ NULL size_t numprefix = 0; /* Parse the options. 
*/ - if (for int j = 3; j < argc; j++) { + for (int j = 3; j < c->argc; j++) { int moreargs = (c->argc-1) - j; if (!strcasecmp(c->argv[j]->ptr,"redirect") && moreargs) { @@ -2246,10 +2245,10 @@ NULL } } else if (!strcasecmp(c->argv[j]->ptr,"bcast")) { bcast++; - } else if (!strcasecmp(c->argv[j]->ptr,"prefix") && morearg) { + } else if (!strcasecmp(c->argv[j]->ptr,"prefix") && moreargs) { j++; - prefix = zrealloc(sizeof(robj*)*(numprefix+1)); - prefix[numprefix++] = argv[j]; + prefix = zrealloc(prefix,sizeof(robj*)*(numprefix+1)); + prefix[numprefix++] = c->argv[j]; } else { addReply(c,shared.syntaxerr); return; @@ -2259,16 +2258,16 @@ NULL /* Make sure options are compatible among each other and with the * current state of the client. */ if (!bcast && numprefix) { - addReplyError("PREFIX option requires BCAST mode to be enabled"); + addReplyError(c,"PREFIX option requires BCAST mode to be enabled"); zfree(prefix); return; } - if (client->flags & CLIENT_TRACKING) { - int oldbcast = !!client->flags & CLIENT_TRACKING_BCAST; + if (c->flags & CLIENT_TRACKING) { + int oldbcast = !!c->flags & CLIENT_TRACKING_BCAST; if (oldbcast != bcast) { } - addReplyError( + addReplyError(c, "You can't switch BCAST mode on/off before disabling " "tracking for this client, and then re-enabling it with " "a different mode."); diff --git a/src/server.h b/src/server.h index d3ca0d01b..725c3cbc8 100644 --- a/src/server.h +++ b/src/server.h @@ -823,12 +823,9 @@ typedef struct client { * invalidation messages for keys fetched by this client will be send to * the specified client ID. */ uint64_t client_tracking_redirection; - list *client_tracking_prefix_nodes; /* This list contains listNode pointers - to the nodes we have in every list - of clients in the tracking bcast - table. This way we can remove our - client in O(1) for each list. */ - + rax *client_tracking_prefixes; /* A dictionary of prefixes we are already + subscribed to in BCAST mode, in the + context of client side caching. 
*/ /* Response buffer */ int bufpos; char buf[PROTO_REPLY_CHUNK_BYTES]; diff --git a/src/tracking.c b/src/tracking.c index 413b21328..9f46275a4 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -49,6 +49,15 @@ uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across are using server side for CSC. */ robj *TrackingChannelName; +/* This is the structure that we have as value of the PrefixTable, and + * represents the list of keys modified, and the list of clients that need + * to be notified, for a given prefix. */ +typedef struct bcastState { + rax *keys; /* Keys modified in the current event loop cycle. */ + rax *clients; /* Clients subscribed to the notification events for this + prefix. */ +} bcastState; + /* Remove the tracking state from the client 'c'. Note that there is not much * to do for us here, if not to decrement the counter of the clients in * tracking mode, because we just store the ID of the client in the tracking @@ -56,9 +65,51 @@ robj *TrackingChannelName; * client with many entries in the table is removed, it would cost a lot of * time to do the cleanup. */ void disableTracking(client *c) { + /* If this client is in broadcasting mode, we need to unsubscribe it + * from all the prefixes it is registered to. */ + if (c->flags & CLIENT_TRACKING_BCAST) { + raxIterator ri; + raxStart(&ri,c->client_tracking_prefixes); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + bcastState *bs = raxFind(PrefixTable,ri.key,ri.key_len); + serverAssert(bs != raxNotFound); + raxRemove(bs->clients,(unsigned char*)&c,sizeof(c),NULL); + /* Was it the last client? Remove the prefix from the + * table. */ + if (raxSize(bs->clients) == 0) { + raxFree(bs->clients); + raxFree(bs->keys); + zfree(bs); + raxRemove(PrefixTable,ri.key,ri.key_len,NULL); + } + } + raxStop(&ri); + } + + /* Clear flags and adjust the count. 
*/ if (c->flags & CLIENT_TRACKING) { server.tracking_clients--; - c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR); + c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR| + CLIENT_TRACKING_BCAST); + } +} + +/* Set the client 'c' to track the prefix 'prefix'. If the client 'c' is + * already registered for the specified prefix, no operation is performed. */ +void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) { + bcastState *bs = raxFind(PrefixTable,(unsigned char*)prefix,sdslen(prefix)); + /* If this is the first client subscribing to such prefix, create + * the prefix in the table. */ + if (bs == raxNotFound) { + bs = zmalloc(sizeof(*bs)); + bs->keys = raxNew(); + bs->clients = raxNew(); + raxInsert(PrefixTable,(unsigned char*)prefix,plen,bs,NULL); + } + if (raxTryInsert(bs->clients,(unsigned char*)&c,sizeof(c),NULL,NULL)) { + raxInsert(c->client_tracking_prefixes, + (unsigned char*)prefix,plen,NULL,NULL); } } @@ -83,9 +134,9 @@ void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, s if (bcast) { c->flags |= CLIENT_TRACKING_BCAST; if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0); - for (int j = 0; j < numprefix; j++) { + for (size_t j = 0; j < numprefix; j++) { sds sdsprefix = prefix[j]->ptr; - enableBcastTrackingForPrefix(c,sdsprefix,sdslen(prefix)); + enableBcastTrackingForPrefix(c,sdsprefix,sdslen(sdsprefix)); } } } From 261fabeaa421f48e9ef437f62a56fd1651d904a4 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 11 Feb 2020 18:11:59 +0100 Subject: [PATCH 05/17] Tracking: BCAST: broadcasting of keys in prefixes implemented. 
--- src/server.h | 1 + src/tracking.c | 104 ++++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 95 insertions(+), 10 deletions(-) diff --git a/src/server.h b/src/server.h index 725c3cbc8..439bbc393 100644 --- a/src/server.h +++ b/src/server.h @@ -1659,6 +1659,7 @@ void trackingInvalidateKeysOnFlush(int dbid); void trackingLimitUsedSlots(void); uint64_t trackingGetTotalItems(void); uint64_t trackingGetTotalKeys(void); +void trackingBroadcastInvalidationMessages(void); /* List data type */ void listTypeTryConversion(robj *subject, robj *value); diff --git a/src/tracking.c b/src/tracking.c index 9f46275a4..345c5f1ad 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -167,7 +167,17 @@ void trackingRememberKeys(client *c) { getKeysFreeResult(keys); } -void sendTrackingMessage(client *c, char *keyname, size_t keylen) { +/* Given a key name, this function sends an invalidation message in the + * proper channel (depending on RESP version: PubSub or Push message) and + * to the proper client (in case fo redirection), in the context of the + * client 'c' with tracking enabled. + * + * In case the 'proto' argument is non zero, the function will assume that + * 'keyname' points to a buffer of 'keylen' bytes already expressed in the + * form of Redis RESP protocol, representing an array of keys to send + * to the client as value of the invalidation. This is used in BCAST mode + * in order to optimized the implementation to use less CPU time. 
*/ +void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) { int using_redirection = 0; if (c->client_tracking_redirection) { client *redir = lookupClientByID(c->client_tracking_redirection); @@ -193,18 +203,38 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen) { if (c->resp > 2) { addReplyPushLen(c,2); addReplyBulkCBuffer(c,"invalidate",10); - addReplyArrayLen(c,1); - addReplyBulkCBuffer(c,keyname,keylen); } else if (using_redirection && c->flags & CLIENT_PUBSUB) { /* We use a static object to speedup things, however we assume * that addReplyPubsubMessage() will not take a reference. */ - robj keyobj; - initStaticStringObject(keyobj,keyname); addReplyPubsubMessage(c,TrackingChannelName,NULL); - addReplyArrayLen(c,1); - addReplyBulk(c,&keyobj); - serverAssert(keyobj.refcount == 1); } + + /* Send the "value" part, which is the array of keys. */ + if (proto) { + addReplyProto(c,keyname,keylen); + } else { + addReplyArrayLen(c,1); + addReplyBulkCBuffer(c,keyname,keylen); + } +} + +/* This function is called when a key is modified in Redis and in the case + * we have at least one client with the BCAST mode enabled. + * Its goal is to set the key in the right broadcast state if the key + * matches one or more prefixes in the prefix table. Later when we + * return to the event loop, we'll send invalidation messages to the + * clients subscribed to each prefix. 
*/ +void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) { + raxIterator ri; + raxStart(&ri,PrefixTable); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + if (keylen > ri.key_len) continue; + if (memcmp(ri.key,keyname,ri.key_len) != 0) continue; + bcastState *bs = ri.data; + raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,NULL,NULL); + } + raxStop(&ri); } /* This function is called from signalModifiedKey() or other places in Redis @@ -214,6 +244,10 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen) { void trackingInvalidateKey(robj *keyobj) { if (TrackingTable == NULL) return; sds sdskey = keyobj->ptr; + + if (raxSize(PrefixTable) > 0) + trackingRememberKeyToBroadcast(sdskey,sdslen(sdskey)); + rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey)); if (ids == raxNotFound) return;; @@ -225,7 +259,7 @@ void trackingInvalidateKey(robj *keyobj) { memcpy(&id,ri.key,sizeof(id)); client *c = lookupClientByID(id); if (c == NULL || !(c->flags & CLIENT_TRACKING)) continue; - sendTrackingMessage(c,sdskey,sdslen(sdskey)); + sendTrackingMessage(c,sdskey,sdslen(sdskey),0); } raxStop(&ri); @@ -262,7 +296,7 @@ void trackingInvalidateKeysOnFlush(int dbid) { while ((ln = listNext(&li)) != NULL) { client *c = listNodeValue(ln); if (c->flags & CLIENT_TRACKING) { - sendTrackingMessage(c,"",1); + sendTrackingMessage(c,"",1,0); } } } @@ -323,6 +357,56 @@ void trackingLimitUsedSlots(void) { timeout_counter++; } +/* This function will run the prefixes of clients in BCAST mode and + * keys that were modified about each prefix, and will send the + * notifications to each client in each prefix. */ +void trackingBroadcastInvalidationMessages(void) { + raxIterator ri, ri2; + raxStart(&ri,PrefixTable); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + bcastState *bs = ri.data; + /* Create the array reply with the list of keys once, then send + * it to all the clients subscribed to this prefix. 
*/ + char buf[32]; + size_t len = ll2string(buf,sizeof(buf),raxSize(bs->keys)); + sds proto = sdsempty(); + proto = sdsMakeRoomFor(proto,raxSize(bs->keys)*15); + proto = sdscatlen(proto,"*",1); + proto = sdscatlen(proto,buf,len); + proto = sdscatlen(proto,"\r\n",2); + raxStart(&ri2,bs->keys); + raxSeek(&ri2,"^",NULL,0); + while(raxNext(&ri2)) { + len = ll2string(buf,sizeof(buf),ri2.key_len); + sds proto = sdsnewlen("$",1); + proto = sdscatlen(proto,ri2.key,ri2.key_len); + proto = sdscatlen(proto,"\r\n",2); + } + raxStop(&ri2); + + /* Send this array of keys to every client in the list. */ + raxStart(&ri2,bs->clients); + raxSeek(&ri2,"^",NULL,0); + while(raxNext(&ri2)) { + client *c; + memcpy(&c,ri2.key,sizeof(c)); + sendTrackingMessage(c,proto,sdslen(proto),1); + } + raxStop(&ri2); + + /* Clean up: we can remove everything from this state, because we + * want to only track the new keys that will be accumulated starting + * from now. */ + sdsfree(proto); + raxFree(bs->clients); + raxFree(bs->keys); + bs->clients = raxNew(); + bs->keys = raxNew(); + } + raxStop(&ri); +} + /* This is just used in order to access the amount of used slots in the * tracking table. */ uint64_t trackingGetTotalItems(void) { From 7841d0234fd09b3b3c9173147a52fce534a52ff4 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 12 Feb 2020 19:22:04 +0100 Subject: [PATCH 06/17] Tracking: BCAST: basic feature now works. --- src/networking.c | 2 +- src/server.c | 4 +++ src/tracking.c | 89 +++++++++++++++++++++++++++--------------------- 3 files changed, 55 insertions(+), 40 deletions(-) diff --git a/src/networking.c b/src/networking.c index 344b76260..46534253e 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2224,7 +2224,7 @@ NULL * [PREFIX second] ... */ long long redir = 0; int bcast = 0; - robj **prefix; + robj **prefix = NULL; size_t numprefix = 0; /* Parse the options. 
*/ diff --git a/src/server.c b/src/server.c index 1001fa4f7..22c81070c 100644 --- a/src/server.c +++ b/src/server.c @@ -2124,6 +2124,10 @@ void beforeSleep(struct aeEventLoop *eventLoop) { if (listLength(server.unblocked_clients)) processUnblockedClients(); + /* Send the invalidation messages to clients participating to the + * client side caching protocol in broadcasting (BCAST) mode. */ + trackingBroadcastInvalidationMessages(); + /* Write the AOF buffer on disk */ flushAppendOnlyFile(0); diff --git a/src/tracking.c b/src/tracking.c index 345c5f1ad..672b886a3 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -85,6 +85,8 @@ void disableTracking(client *c) { } } raxStop(&ri); + raxFree(c->client_tracking_prefixes); + c->client_tracking_prefixes = NULL; } /* Clear flags and adjust the count. */ @@ -108,6 +110,8 @@ void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) { raxInsert(PrefixTable,(unsigned char*)prefix,plen,bs,NULL); } if (raxTryInsert(bs->clients,(unsigned char*)&c,sizeof(c),NULL,NULL)) { + if (c->client_tracking_prefixes == NULL) + c->client_tracking_prefixes = raxNew(); raxInsert(c->client_tracking_prefixes, (unsigned char*)prefix,plen,NULL,NULL); } @@ -121,10 +125,10 @@ void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) { * inform it of the condition. Multiple clients can redirect the invalidation * messages to the same client ID. 
*/ void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix) { - c->flags |= CLIENT_TRACKING; - c->flags &= ~CLIENT_TRACKING_BROKEN_REDIR; - c->client_tracking_redirection = redirect_to; if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++; + c->flags |= CLIENT_TRACKING; + c->flags &= ~(CLIENT_TRACKING_BROKEN_REDIR|CLIENT_TRACKING_BCAST); + c->client_tracking_redirection = redirect_to; if (TrackingTable == NULL) { TrackingTable = raxNew(); PrefixTable = raxNew(); @@ -229,10 +233,11 @@ void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) { raxStart(&ri,PrefixTable); raxSeek(&ri,"^",NULL,0); while(raxNext(&ri)) { - if (keylen > ri.key_len) continue; - if (memcmp(ri.key,keyname,ri.key_len) != 0) continue; - bcastState *bs = ri.data; - raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,NULL,NULL); + if (ri.key_len > keylen) continue; + if (ri.key_len != 0 && memcmp(ri.key,keyname,ri.key_len) != 0) + continue; + bcastState *bs = ri.data; + raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,NULL,NULL); } raxStop(&ri); } @@ -362,46 +367,52 @@ void trackingLimitUsedSlots(void) { * notifications to each client in each prefix. */ void trackingBroadcastInvalidationMessages(void) { raxIterator ri, ri2; + + /* Return ASAP if there is nothing to do here. */ + if (TrackingTable == NULL || !server.tracking_clients) return; + raxStart(&ri,PrefixTable); raxSeek(&ri,"^",NULL,0); while(raxNext(&ri)) { bcastState *bs = ri.data; - /* Create the array reply with the list of keys once, then send - * it to all the clients subscribed to this prefix. 
*/ - char buf[32]; - size_t len = ll2string(buf,sizeof(buf),raxSize(bs->keys)); - sds proto = sdsempty(); - proto = sdsMakeRoomFor(proto,raxSize(bs->keys)*15); - proto = sdscatlen(proto,"*",1); - proto = sdscatlen(proto,buf,len); - proto = sdscatlen(proto,"\r\n",2); - raxStart(&ri2,bs->keys); - raxSeek(&ri2,"^",NULL,0); - while(raxNext(&ri2)) { - len = ll2string(buf,sizeof(buf),ri2.key_len); - sds proto = sdsnewlen("$",1); - proto = sdscatlen(proto,ri2.key,ri2.key_len); + if (raxSize(bs->keys)) { + /* Create the array reply with the list of keys once, then send + * it to all the clients subscribed to this prefix. */ + char buf[32]; + size_t len = ll2string(buf,sizeof(buf),raxSize(bs->keys)); + sds proto = sdsempty(); + proto = sdsMakeRoomFor(proto,raxSize(bs->keys)*15); + proto = sdscatlen(proto,"*",1); + proto = sdscatlen(proto,buf,len); proto = sdscatlen(proto,"\r\n",2); - } - raxStop(&ri2); + raxStart(&ri2,bs->keys); + raxSeek(&ri2,"^",NULL,0); + while(raxNext(&ri2)) { + len = ll2string(buf,sizeof(buf),ri2.key_len); + proto = sdscatlen(proto,"$",1); + proto = sdscatlen(proto,buf,len); + proto = sdscatlen(proto,"\r\n",2); + proto = sdscatlen(proto,ri2.key,ri2.key_len); + proto = sdscatlen(proto,"\r\n",2); + } + raxStop(&ri2); - /* Send this array of keys to every client in the list. */ - raxStart(&ri2,bs->clients); - raxSeek(&ri2,"^",NULL,0); - while(raxNext(&ri2)) { - client *c; - memcpy(&c,ri2.key,sizeof(c)); - sendTrackingMessage(c,proto,sdslen(proto),1); - } - raxStop(&ri2); + /* Send this array of keys to every client in the list. */ + raxStart(&ri2,bs->clients); + raxSeek(&ri2,"^",NULL,0); + while(raxNext(&ri2)) { + client *c; + memcpy(&c,ri2.key,sizeof(c)); + sendTrackingMessage(c,proto,sdslen(proto),1); + } + raxStop(&ri2); - /* Clean up: we can remove everything from this state, because we - * want to only track the new keys that will be accumulated starting - * from now. 
*/ - sdsfree(proto); - raxFree(bs->clients); + /* Clean up: we can remove everything from this state, because we + * want to only track the new keys that will be accumulated starting + * from now. */ + sdsfree(proto); + } raxFree(bs->keys); - bs->clients = raxNew(); bs->keys = raxNew(); } raxStop(&ri); From d5517def76e3ee9a76c70f0f39a7b0c14039cc46 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 13 Feb 2020 16:58:07 +0100 Subject: [PATCH 07/17] Tracking: fix sending messages bug + tracking off bug. --- src/networking.c | 42 ++++++++++++++++++++++-------------------- src/tracking.c | 6 ++++++ 2 files changed, 28 insertions(+), 20 deletions(-) diff --git a/src/networking.c b/src/networking.c index 46534253e..1b4b19645 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2250,37 +2250,39 @@ NULL prefix = zrealloc(prefix,sizeof(robj*)*(numprefix+1)); prefix[numprefix++] = c->argv[j]; } else { + zfree(prefix); addReply(c,shared.syntaxerr); return; } } - /* Make sure options are compatible among each other and with the - * current state of the client. */ - if (!bcast && numprefix) { - addReplyError(c,"PREFIX option requires BCAST mode to be enabled"); - zfree(prefix); - return; - } - - if (c->flags & CLIENT_TRACKING) { - int oldbcast = !!c->flags & CLIENT_TRACKING_BCAST; - if (oldbcast != bcast) { - } - addReplyError(c, - "You can't switch BCAST mode on/off before disabling " - "tracking for this client, and then re-enabling it with " - "a different mode."); - zfree(prefix); - return; - } - /* Options are ok: enable or disable the tracking for this client. */ if (!strcasecmp(c->argv[2]->ptr,"on")) { + /* Before enabling tracking, make sure options are compatible + * among each other and with the current state of the client. 
*/ + if (!bcast && numprefix) { + addReplyError(c, + "PREFIX option requires BCAST mode to be enabled"); + zfree(prefix); + return; + } + + if (c->flags & CLIENT_TRACKING) { + int oldbcast = !!c->flags & CLIENT_TRACKING_BCAST; + if (oldbcast != bcast) { + addReplyError(c, + "You can't switch BCAST mode on/off before disabling " + "tracking for this client, and then re-enabling it with " + "a different mode."); + zfree(prefix); + return; + } + } enableTracking(c,redir,bcast,prefix,numprefix); } else if (!strcasecmp(c->argv[2]->ptr,"off")) { disableTracking(c); } else { + zfree(prefix); addReply(c,shared.syntaxerr); return; } diff --git a/src/tracking.c b/src/tracking.c index 672b886a3..ef5840863 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -211,6 +211,12 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) { /* We use a static object to speedup things, however we assume * that addReplyPubsubMessage() will not take a reference. */ addReplyPubsubMessage(c,TrackingChannelName,NULL); + } else { + /* If are here, the client is not using RESP3, nor is + * redirecting to another client. We can't send anything to + * it since RESP2 does not support push messages in the same + * connection. */ + return; } /* Send the "value" part, which is the array of keys. */ From 2fdce45e9d56245840b4383964ca4a41ef1a3f49 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 14 Feb 2020 14:17:10 +0100 Subject: [PATCH 08/17] Tracking: fix behavior when switchinig from normal to BCAST. 
--- src/tracking.c | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/tracking.c b/src/tracking.c index ef5840863..3333472a4 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -269,7 +269,17 @@ void trackingInvalidateKey(robj *keyobj) { uint64_t id; memcpy(&id,ri.key,sizeof(id)); client *c = lookupClientByID(id); - if (c == NULL || !(c->flags & CLIENT_TRACKING)) continue; + /* Note that if the client is in BCAST mode, we don't want to + * send invalidation messages that were pending in the case + * previously the client was not in BCAST mode. This can happen if + * TRACKING is enabled normally, and then the client switches to + * BCAST mode. */ + if (c == NULL || + !(c->flags & CLIENT_TRACKING)|| + c->flags & CLIENT_TRACKING_BCAST) + { + continue; + } sendTrackingMessage(c,sdskey,sdslen(sdskey),0); } raxStop(&ri); From 5173b1042ee1dd8b22d06161d2f59b45363a6196 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 14 Feb 2020 14:27:22 +0100 Subject: [PATCH 09/17] Tracking: fix operators precedence error in bcast check. --- src/networking.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/networking.c b/src/networking.c index 1b4b19645..69350eed1 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2268,7 +2268,7 @@ NULL } if (c->flags & CLIENT_TRACKING) { - int oldbcast = !!c->flags & CLIENT_TRACKING_BCAST; + int oldbcast = !!(c->flags & CLIENT_TRACKING_BCAST); if (oldbcast != bcast) { addReplyError(c, "You can't switch BCAST mode on/off before disabling " From aa556cb7ddf4bba604c6e4e894c68261c49689ed Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 14 Feb 2020 14:29:00 +0100 Subject: [PATCH 10/17] Tracking: first set of tests for the feature. 
--- tests/unit/tracking.tcl | 66 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 tests/unit/tracking.tcl diff --git a/tests/unit/tracking.tcl b/tests/unit/tracking.tcl new file mode 100644 index 000000000..2058319f7 --- /dev/null +++ b/tests/unit/tracking.tcl @@ -0,0 +1,66 @@ +start_server {tags {"tracking"}} { + # Create a deferred client we'll use to redirect invalidation + # messages to. + set rd1 [redis_deferring_client] + $rd1 client id + set redir [$rd1 read] + $rd1 subscribe __redis__:invalidate + $rd1 read ; # Consume the SUBSCRIBE reply. + + test {Clients are able to enable tracking and redirect it} { + r CLIENT TRACKING on REDIRECT $redir + } {*OK} + + test {The other connection is able to get invalidations} { + r SET a 1 + r GET a + r INCR a + r INCR b ; # This key should not be notified, since it wasn't fetched. + set keys [lindex [$rd1 read] 2] + assert {[llength $keys] == 1} + assert {[lindex $keys 0] eq {a}} + } + + test {The client is now able to disable tracking} { + # Make sure to add a few more keys in the tracking list + # so that we can check for leaks, as a side effect. + r MGET a b c d e f g + r CLIENT TRACKING off + } + + test {Clients can enable the BCAST mode with the empty prefix} { + r CLIENT TRACKING on BCAST REDIRECT $redir + } {*OK*} + + test {The connection gets invalidation messages about all the keys} { + r MSET a 1 b 2 c 3 + set keys [lsort [lindex [$rd1 read] 2]] + assert {$keys eq {a b c}} + } + + test {Clients can enable the BCAST mode with prefixes} { + r CLIENT TRACKING off + r CLIENT TRACKING on BCAST REDIRECT $redir PREFIX a: PREFIX b: + r MULTI + r INCR a:1 + r INCR a:2 + r INCR b:1 + r INCR b:2 + r EXEC + # Because of the internals, we know we are going to receive + # two separated notifications for the two different prefixes. 
+ set keys1 [lsort [lindex [$rd1 read] 2]] + set keys2 [lsort [lindex [$rd1 read] 2]] + set keys [lsort [list {*}$keys1 {*}$keys2]] + assert {$keys eq {a:1 a:2 b:1 b:2}} + } + + test {Adding prefixes to BCAST mode works} { + r CLIENT TRACKING on BCAST REDIRECT $redir PREFIX c: + r INCR c:1234 + set keys [lsort [lindex [$rd1 read] 2]] + assert {$keys eq {c:1234}} + } + + $rd1 close +} From 1c2d250dd210cf29bef8e1d6ba0870a09c01b107 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 14 Feb 2020 18:22:25 +0100 Subject: [PATCH 11/17] Signal key as modified when expired on-access. This fixes WATCH and client side caching with keys expiring because of a synchronous access and not because of background expiring. --- src/db.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/db.c b/src/db.c index c80524e94..8a0242d9e 100644 --- a/src/db.c +++ b/src/db.c @@ -1296,8 +1296,10 @@ int expireIfNeeded(redisDb *db, robj *key) { propagateExpire(db,key,server.lazyfree_lazy_expire); notifyKeyspaceEvent(NOTIFY_EXPIRED, "expired",key,db->id); - return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) : - dbSyncDelete(db,key); + int retval = server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) : + dbSyncDelete(db,key); + if (retval) signalModifiedKey(db,key); + return retval; } /* ----------------------------------------------------------------------------- From 3c2b67dc2ab7ab5badef9cc4d229d448d9298b79 Mon Sep 17 00:00:00 2001 From: Itamar Haber Date: Fri, 14 Feb 2020 17:13:58 +0200 Subject: [PATCH 12/17] Fixes segfault on calling trackingGetTotalKeys ... 
with CSC disabled --- src/tracking.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/tracking.c b/src/tracking.c index 3333472a4..7179a54f8 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -441,5 +441,6 @@ uint64_t trackingGetTotalItems(void) { } uint64_t trackingGetTotalKeys(void) { + if (TrackingTable == NULL) return 0; return raxSize(TrackingTable); } From a227fd7ccae810bdf4f86de644e38bbbe9e71570 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 19 Feb 2020 19:00:29 +0100 Subject: [PATCH 13/17] Tracking: fix max-keys configuration directive. --- src/config.c | 2 +- src/server.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/config.c b/src/config.c index b2e5fc12e..d55d1f8b5 100644 --- a/src/config.c +++ b/src/config.c @@ -2160,7 +2160,6 @@ standardConfig configs[] = { createIntConfig("list-compress-depth", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.list_compress_depth, 0, INTEGER_CONFIG, NULL, NULL), createIntConfig("rdb-key-save-delay", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.rdb_key_save_delay, 0, INTEGER_CONFIG, NULL, NULL), createIntConfig("key-load-delay", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.key_load_delay, 0, INTEGER_CONFIG, NULL, NULL), - createIntConfig("tracking-table-max-fill", NULL, MODIFIABLE_CONFIG, 0, 100, server.tracking_table_max_keys, 1000000, INTEGER_CONFIG, NULL, NULL), /* Default: 10% tracking table max number of keys tracked. */ createIntConfig("active-expire-effort", NULL, MODIFIABLE_CONFIG, 1, 10, server.active_expire_effort, 1, INTEGER_CONFIG, NULL, NULL), /* From 1 to 10. 
*/ createIntConfig("hz", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.config_hz, CONFIG_DEFAULT_HZ, INTEGER_CONFIG, NULL, updateHZ), createIntConfig("min-replicas-to-write", "min-slaves-to-write", MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_min_slaves_to_write, 0, INTEGER_CONFIG, NULL, updateGoodSlaves), @@ -2195,6 +2194,7 @@ standardConfig configs[] = { createSizeTConfig("stream-node-max-bytes", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.stream_node_max_bytes, 4096, MEMORY_CONFIG, NULL, NULL), createSizeTConfig("zset-max-ziplist-value", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.zset_max_ziplist_value, 64, MEMORY_CONFIG, NULL, NULL), createSizeTConfig("hll-sparse-max-bytes", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.hll_sparse_max_bytes, 3000, MEMORY_CONFIG, NULL, NULL), + createSizeTConfig("tracking-table-max-keys", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.tracking_table_max_keys, 1000000, INTEGER_CONFIG, NULL, NULL), /* Default: 1 million keys max. */ /* Other configs */ createTimeTConfig("repl-backlog-ttl", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.repl_backlog_time_limit, 60*60, INTEGER_CONFIG, NULL, NULL), /* Default: 1 hour */ diff --git a/src/server.h b/src/server.h index 439bbc393..bb40ca4e1 100644 --- a/src/server.h +++ b/src/server.h @@ -1309,7 +1309,7 @@ struct redisServer { list *ready_keys; /* List of readyList structures for BLPOP & co */ /* Client side caching. */ unsigned int tracking_clients; /* # of clients with tracking enabled.*/ - int tracking_table_max_keys; /* Max number of keys in tracking table. */ + size_t tracking_table_max_keys; /* Max number of keys in tracking table. */ /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ int sort_desc; From 54237ff024fdde91baafa884e268d77660976a89 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 21 Feb 2020 13:48:43 +0100 Subject: [PATCH 14/17] Test is more complex now, increase default timeout. 
--- tests/test_helper.tcl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index cb7e4e328..b266bc56d 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -87,7 +87,7 @@ set ::file ""; # If set, runs only the tests in this comma separated list set ::curfile ""; # Hold the filename of the current suite set ::accurate 0; # If true runs fuzz tests with more iterations set ::force_failure 0 -set ::timeout 600; # 10 minutes without progresses will quit the test. +set ::timeout 1200; # 20 minutes without progresses will quit the test. set ::last_progress [clock seconds] set ::active_servers {} ; # Pids of active Redis instances. set ::dont_clean 0 From 253fcf98f805c0262d3f214560cb55a0de254eeb Mon Sep 17 00:00:00 2001 From: hayashier Date: Tue, 31 Dec 2019 17:46:48 +0900 Subject: [PATCH 15/17] fix typo from fss to rss --- src/object.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/object.c b/src/object.c index cc6b218a0..11e335afc 100644 --- a/src/object.c +++ b/src/object.c @@ -1102,13 +1102,13 @@ sds getMemoryDoctorReport(void) { num_reports++; } - /* Allocator fss is higher than 1.1 and 10MB ? */ + /* Allocator rss is higher than 1.1 and 10MB ? */ if (mh->allocator_rss > 1.1 && mh->allocator_rss_bytes > 10<<20) { high_alloc_rss = 1; num_reports++; } - /* Non-Allocator fss is higher than 1.1 and 10MB ? */ + /* Non-Allocator rss is higher than 1.1 and 10MB ? 
*/ if (mh->rss_extra > 1.1 && mh->rss_extra_bytes > 10<<20) { high_proc_rss = 1; num_reports++; From d1be7aaa18973d7d84d9cba08d0be02a9c1ddee4 Mon Sep 17 00:00:00 2001 From: Guy Benoish Date: Wed, 19 Feb 2020 08:24:20 +0530 Subject: [PATCH 16/17] XGROUP DESTROY should unblock XREADGROUP with -NOGROUP --- src/t_stream.c | 2 ++ tests/unit/type/stream-cgroups.tcl | 9 +++++++++ 2 files changed, 11 insertions(+) diff --git a/src/t_stream.c b/src/t_stream.c index 0f0f97a1d..900fa3a17 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1850,6 +1850,8 @@ NULL server.dirty++; notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-destroy", c->argv[2],c->db->id); + /* We want to unblock any XREADGROUP consumers with -NOGROUP. */ + signalKeyAsReady(c->db,c->argv[2]); } else { addReply(c,shared.czero); } diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index a59e168ef..072ed14d6 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -161,6 +161,15 @@ start_server { assert {[$rd read] == {}} ;# before the fix, client didn't even block, but was served synchronously with {mystream {}} } + test {XGROUP DESTROY should unblock XREADGROUP with -NOGROUP} { + r del mystream + r XGROUP CREATE mystream mygroup $ MKSTREAM + set rd [redis_deferring_client] + $rd XREADGROUP GROUP mygroup Alice BLOCK 100 STREAMS mystream ">" + r XGROUP DESTROY mystream mygroup + assert_error "*NOGROUP*" {$rd read} + } + test {XCLAIM can claim PEL items from another consumer} { # Add 3 items into the stream, and create a consumer group r del mystream From 79e8b17d7b44c793d8b22668b8583a297ee1b387 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Tue, 18 Feb 2020 16:19:52 +0200 Subject: [PATCH 17/17] Defrag big lists in portions to avoid latency and freeze When active defrag kicks in and finds a big list, it will create a bookmark to a node so that it is able to resume iteration from that node later. 
The quicklist manages that bookmark, and updates it in case that node is deleted. This will increase memory usage only on lists of over 1000 (see active-defrag-max-scan-fields) quicklist nodes (1000 ziplists, not 1000 items) by 16 bytes. In 32 bit build, this change reduces the maximum effective config of list-compress-depth and list-max-ziplist-size (from 32767 to 8191) --- src/defrag.c | 96 +++++++++++++++------- src/quicklist.c | 150 ++++++++++++++++++++++++++++++++++- src/quicklist.h | 46 ++++++++++- tests/unit/memefficiency.tcl | 92 +++++++++++++++++++++ 4 files changed, 350 insertions(+), 34 deletions(-) diff --git a/src/defrag.c b/src/defrag.c index 04e57955b..e729297a5 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -5,8 +5,8 @@ * We do that by scanning the keyspace and for each pointer we have, we can try to * ask the allocator if moving it to a new address will help reduce fragmentation. * - * Copyright (c) 2017, Oran Agra - * Copyright (c) 2017, Redis Labs, Inc + * Copyright (c) 2020, Oran Agra + * Copyright (c) 2020, Redis Labs, Inc * All rights reserved. 
* * Redistribution and use in source and binary forms, with or without @@ -408,25 +408,32 @@ dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sd return NULL; } -long activeDefragQuickListNodes(quicklist *ql) { - quicklistNode *node = ql->head, *newnode; +long activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) { + quicklistNode *newnode, *node = *node_ref; long defragged = 0; unsigned char *newzl; + if ((newnode = activeDefragAlloc(node))) { + if (newnode->prev) + newnode->prev->next = newnode; + else + ql->head = newnode; + if (newnode->next) + newnode->next->prev = newnode; + else + ql->tail = newnode; + *node_ref = node = newnode; + defragged++; + } + if ((newzl = activeDefragAlloc(node->zl))) + defragged++, node->zl = newzl; + return defragged; +} + +long activeDefragQuickListNodes(quicklist *ql) { + quicklistNode *node = ql->head; + long defragged = 0; while (node) { - if ((newnode = activeDefragAlloc(node))) { - if (newnode->prev) - newnode->prev->next = newnode; - else - ql->head = newnode; - if (newnode->next) - newnode->next->prev = newnode; - else - ql->tail = newnode; - node = newnode; - defragged++; - } - if ((newzl = activeDefragAlloc(node->zl))) - defragged++, node->zl = newzl; + defragged += activeDefragQuickListNode(ql, &node); node = node->next; } return defragged; @@ -440,12 +447,48 @@ void defragLater(redisDb *db, dictEntry *kde) { listAddNodeTail(db->defrag_later, key); } -long scanLaterList(robj *ob) { +/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. 
*/ +long scanLaterList(robj *ob, unsigned long *cursor, long long endtime, long long *defragged) { quicklist *ql = ob->ptr; + quicklistNode *node; + long iterations = 0; + int bookmark_failed = 0; if (ob->type != OBJ_LIST || ob->encoding != OBJ_ENCODING_QUICKLIST) return 0; - server.stat_active_defrag_scanned+=ql->len; - return activeDefragQuickListNodes(ql); + + if (*cursor == 0) { + /* if cursor is 0, we start new iteration */ + node = ql->head; + } else { + node = quicklistBookmarkFind(ql, "_AD"); + if (!node) { + /* if the bookmark was deleted, it means we reached the end. */ + *cursor = 0; + return 0; + } + node = node->next; + } + + (*cursor)++; + while (node) { + (*defragged) += activeDefragQuickListNode(ql, &node); + server.stat_active_defrag_scanned++; + if (++iterations > 128 && !bookmark_failed) { + if (ustime() > endtime) { + if (!quicklistBookmarkCreate(&ql, "_AD", node)) { + bookmark_failed = 1; + } else { + ob->ptr = ql; /* bookmark creation may have re-allocated the quicklist */ + return 1; + } + } + iterations = 0; + } + node = node->next; + } + quicklistBookmarkDelete(ql, "_AD"); + *cursor = 0; + return bookmark_failed? 
1: 0; } typedef struct { @@ -638,7 +681,8 @@ int scanLaterStraemListpacks(robj *ob, unsigned long *cursor, long long endtime, void *newdata = activeDefragAlloc(ri.data); if (newdata) raxSetData(ri.node, ri.data=newdata), (*defragged)++; - if (++iterations > 16) { + server.stat_active_defrag_scanned++; + if (++iterations > 128) { if (ustime() > endtime) { serverAssert(ri.key_len==sizeof(last)); memcpy(last,ri.key,ri.key_len); @@ -900,8 +944,7 @@ int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime) { if (de) { robj *ob = dictGetVal(de); if (ob->type == OBJ_LIST) { - server.stat_active_defrag_hits += scanLaterList(ob); - *cursor = 0; /* list has no scan, we must finish it in one go */ + return scanLaterList(ob, cursor, endtime, &server.stat_active_defrag_hits); } else if (ob->type == OBJ_SET) { server.stat_active_defrag_hits += scanLaterSet(ob, cursor); } else if (ob->type == OBJ_ZSET) { @@ -961,11 +1004,6 @@ int defragLaterStep(redisDb *db, long long endtime) { if (defragLaterItem(de, &defrag_later_cursor, endtime)) quit = 1; /* time is up, we didn't finish all the work */ - /* Don't start a new BIG key in this loop, this is because the - * next key can be a list, and scanLaterList must be done in once cycle */ - if (!defrag_later_cursor) - quit = 1; - /* Once in 16 scan iterations, 512 pointer reallocations, or 64 fields * (if we have a lot of pointers in one hash bucket, or rehashing), * check if we reached the time limit. 
*/ diff --git a/src/quicklist.c b/src/quicklist.c index 7b5484116..ae183ffd8 100644 --- a/src/quicklist.c +++ b/src/quicklist.c @@ -70,6 +70,12 @@ static const size_t optimization_level[] = {4096, 8192, 16384, 32768, 65536}; } while (0); #endif +/* Bookmarks forward declarations */ +#define QL_MAX_BM ((1 << QL_BM_BITS)-1) +quicklistBookmark *_quicklistBookmarkFindByName(quicklist *ql, const char *name); +quicklistBookmark *_quicklistBookmarkFindByNode(quicklist *ql, quicklistNode *node); +void _quicklistBookmarkDelete(quicklist *ql, quicklistBookmark *bm); + /* Simple way to give quicklistEntry structs default values with one call. */ #define initEntry(e) \ do { \ @@ -100,10 +106,11 @@ quicklist *quicklistCreate(void) { quicklist->count = 0; quicklist->compress = 0; quicklist->fill = -2; + quicklist->bookmark_count = 0; return quicklist; } -#define COMPRESS_MAX (1 << 16) +#define COMPRESS_MAX (1 << QL_COMP_BITS) void quicklistSetCompressDepth(quicklist *quicklist, int compress) { if (compress > COMPRESS_MAX) { compress = COMPRESS_MAX; @@ -113,7 +120,7 @@ void quicklistSetCompressDepth(quicklist *quicklist, int compress) { quicklist->compress = compress; } -#define FILL_MAX (1 << 15) +#define FILL_MAX (1 << (QL_FILL_BITS-1)) void quicklistSetFill(quicklist *quicklist, int fill) { if (fill > FILL_MAX) { fill = FILL_MAX; @@ -169,6 +176,7 @@ void quicklistRelease(quicklist *quicklist) { quicklist->len--; current = next; } + quicklistBookmarksClear(quicklist); zfree(quicklist); } @@ -578,6 +586,15 @@ quicklist *quicklistCreateFromZiplist(int fill, int compress, REDIS_STATIC void __quicklistDelNode(quicklist *quicklist, quicklistNode *node) { + /* Update the bookmark if any */ + quicklistBookmark *bm = _quicklistBookmarkFindByNode(quicklist, node); + if (bm) { + bm->node = node->next; + /* if the bookmark was to the last node, delete it. 
*/ + if (!bm->node) + _quicklistBookmarkDelete(quicklist, bm); + } + if (node->next) node->next->prev = node->prev; if (node->prev) @@ -1410,6 +1427,87 @@ void quicklistPush(quicklist *quicklist, void *value, const size_t sz, } } +/* Create or update a bookmark in the list which will be updated to the next node + * automatically when the one referenced gets deleted. + * Returns 1 on success (creation of new bookmark or override of an existing one). + * Returns 0 on failure (reached the maximum supported number of bookmarks). + * NOTE: use short simple names, so that string compare on find is quick. + * NOTE: bookmark creation may re-allocate the quicklist, so the input pointer + may change and it's the caller's responsibility to update the reference. + */ +int quicklistBookmarkCreate(quicklist **ql_ref, const char *name, quicklistNode *node) { + quicklist *ql = *ql_ref; + if (ql->bookmark_count >= QL_MAX_BM) + return 0; + quicklistBookmark *bm = _quicklistBookmarkFindByName(ql, name); + if (bm) { + bm->node = node; + return 1; + } + ql = zrealloc(ql, sizeof(quicklist) + (ql->bookmark_count+1) * sizeof(quicklistBookmark)); + *ql_ref = ql; + ql->bookmarks[ql->bookmark_count].node = node; + ql->bookmarks[ql->bookmark_count].name = zstrdup(name); + ql->bookmark_count++; + return 1; +} + +/* Find the quicklist node referenced by a named bookmark. + * When the bookmarked node is deleted the bookmark is updated to the next node, + * and if that's the last node, the bookmark is deleted (so find returns NULL). */ +quicklistNode *quicklistBookmarkFind(quicklist *ql, const char *name) { + quicklistBookmark *bm = _quicklistBookmarkFindByName(ql, name); + if (!bm) return NULL; + return bm->node; +} + +/* Delete a named bookmark. + * returns 0 if bookmark was not found, and 1 if deleted. + * Note that the bookmark memory is not freed yet, and is kept for future use. 
*/ +int quicklistBookmarkDelete(quicklist *ql, const char *name) { + quicklistBookmark *bm = _quicklistBookmarkFindByName(ql, name); + if (!bm) + return 0; + _quicklistBookmarkDelete(ql, bm); + return 1; +} + +quicklistBookmark *_quicklistBookmarkFindByName(quicklist *ql, const char *name) { + unsigned i; + for (i=0; i<ql->bookmark_count; i++) { + if (!strcmp(ql->bookmarks[i].name, name)) { + return &ql->bookmarks[i]; + } + } + return NULL; +} + +quicklistBookmark *_quicklistBookmarkFindByNode(quicklist *ql, quicklistNode *node) { + unsigned i; + for (i=0; i<ql->bookmark_count; i++) { + if (ql->bookmarks[i].node == node) { + return &ql->bookmarks[i]; + } + } + return NULL; +} + +void _quicklistBookmarkDelete(quicklist *ql, quicklistBookmark *bm) { + int index = bm - ql->bookmarks; + zfree(bm->name); + ql->bookmark_count--; + memmove(bm, bm+1, (ql->bookmark_count - index)* sizeof(*bm)); + /* NOTE: We do not shrink (realloc) the quicklist yet (to avoid resonance, + * it may be re-used later (a call to realloc may NOP). */ +} + +void quicklistBookmarksClear(quicklist *ql) { + while (ql->bookmark_count) + zfree(ql->bookmarks[--ql->bookmark_count].name); + /* NOTE: We do not shrink (realloc) the quick list. main use case for this + * function is just before releasing the allocation. */ +} + /* The rest of this file is test cases and test helpers. */ #ifdef REDIS_TEST #include @@ -2641,6 +2739,54 @@ int quicklistTest(int argc, char *argv[]) { printf("Compressions: %0.2f seconds.\n", (float)(stop - start) / 1000); printf("\n"); + TEST("bookmark get updated to next item") { + quicklist *ql = quicklistNew(1, 0); + quicklistPushTail(ql, "1", 1); + quicklistPushTail(ql, "2", 1); + quicklistPushTail(ql, "3", 1); + quicklistPushTail(ql, "4", 1); + quicklistPushTail(ql, "5", 1); + assert(ql->len==5); + /* add two bookmarks, one pointing to the node before the last. 
*/ + assert(quicklistBookmarkCreate(&ql, "_dummy", ql->head->next)); + assert(quicklistBookmarkCreate(&ql, "_test", ql->tail->prev)); + /* test that the bookmark returns the right node, delete it and see that the bookmark points to the last node */ + assert(quicklistBookmarkFind(ql, "_test") == ql->tail->prev); + assert(quicklistDelRange(ql, -2, 1)); + assert(quicklistBookmarkFind(ql, "_test") == ql->tail); + /* delete the last node, and see that the bookmark was deleted. */ + assert(quicklistDelRange(ql, -1, 1)); + assert(quicklistBookmarkFind(ql, "_test") == NULL); + /* test that other bookmarks aren't affected */ + assert(quicklistBookmarkFind(ql, "_dummy") == ql->head->next); + assert(quicklistBookmarkFind(ql, "_missing") == NULL); + assert(ql->len==3); + quicklistBookmarksClear(ql); /* for coverage */ + assert(quicklistBookmarkFind(ql, "_dummy") == NULL); + quicklistRelease(ql); + } + + TEST("bookmark limit") { + int i; + quicklist *ql = quicklistNew(1, 0); + quicklistPushHead(ql, "1", 1); + for (i=0; i<QL_MAX_BM; i++) + assert(quicklistBookmarkCreate(&ql, genstr("",i), ql->head)); + /* when all bookmarks are used, creation fails */ + assert(!quicklistBookmarkCreate(&ql, "_test", ql->head)); + /* delete one and see that we can now create another */ + assert(quicklistBookmarkDelete(ql, "0")); + assert(quicklistBookmarkCreate(&ql, "_test", ql->head)); + /* delete one and see that the rest survive */ + assert(quicklistBookmarkDelete(ql, "_test")); + for (i=1; i<QL_MAX_BM; i++) + assert(quicklistBookmarkFind(ql, genstr("",i)) == ql->head); + /* make sure the deleted ones are indeed gone */ + assert(!quicklistBookmarkFind(ql, "0")); + assert(!quicklistBookmarkFind(ql, "_test")); + quicklistRelease(ql); + } + if (!err) printf("ALL TESTS PASSED!\n"); else diff --git a/src/quicklist.h b/src/quicklist.h index a7e27a2dd..8b553c119 100644 --- a/src/quicklist.h +++ b/src/quicklist.h @@ -28,6 +28,8 @@ * POSSIBILITY OF SUCH DAMAGE. 
*/ + +#include <stdint.h> // for UINTPTR_MAX + #ifndef __QUICKLIST_H__ #define __QUICKLIST_H__ @@ -64,19 +66,51 @@ typedef struct quicklistLZF { char compressed[]; } quicklistLZF; +/* Bookmarks are padded with realloc at the end of the quicklist struct. + * They should only be used for very big lists of thousands of nodes where the + * excess memory usage is negligible, and there's a real need to iterate on them + * in portions. + * When not used, they don't add any memory overhead, but when used and then + * deleted, some overhead remains (to avoid resonance). + * The number of bookmarks used should be kept to minimum since it also adds + * overhead on node deletion (searching for a bookmark to update). */ +typedef struct quicklistBookmark { + quicklistNode *node; + char *name; +} quicklistBookmark; + +#if UINTPTR_MAX == 0xffffffff +/* 32-bit */ +# define QL_FILL_BITS 14 +# define QL_COMP_BITS 14 +# define QL_BM_BITS 4 +#elif UINTPTR_MAX == 0xffffffffffffffff +/* 64-bit */ +# define QL_FILL_BITS 16 +# define QL_COMP_BITS 16 +# define QL_BM_BITS 4 /* we can encode more, but we rather limit the user + since they cause performance degradation. */ +#else +# error unknown arch bits count +#endif + /* quicklist is a 40 byte struct (on 64-bit systems) describing a quicklist. * 'count' is the number of total entries. * 'len' is the number of quicklist nodes. * 'compress' is: -1 if compression disabled, otherwise it's the number * of quicklistNodes to leave uncompressed at ends of quicklist. - * 'fill' is the user-requested (or default) fill factor. */ + * 'fill' is the user-requested (or default) fill factor. + * 'bookmarks' are an optional feature that works by realloc'ing this struct, + * so that they don't consume memory when not used. 
*/ typedef struct quicklist { quicklistNode *head; quicklistNode *tail; unsigned long count; /* total count of all entries in all ziplists */ unsigned long len; /* number of quicklistNodes */ - int fill : 16; /* fill factor for individual nodes */ - unsigned int compress : 16; /* depth of end nodes not to compress;0=off */ + int fill : QL_FILL_BITS; /* fill factor for individual nodes */ + unsigned int compress : QL_COMP_BITS; /* depth of end nodes not to compress;0=off */ + unsigned int bookmark_count: QL_BM_BITS; + quicklistBookmark bookmarks[]; } quicklist; typedef struct quicklistIter { @@ -158,6 +192,12 @@ unsigned long quicklistCount(const quicklist *ql); int quicklistCompare(unsigned char *p1, unsigned char *p2, int p2_len); size_t quicklistGetLzf(const quicklistNode *node, void **data); +/* bookmarks */ +int quicklistBookmarkCreate(quicklist **ql_ref, const char *name, quicklistNode *node); +int quicklistBookmarkDelete(quicklist *ql, const char *name); +quicklistNode *quicklistBookmarkFind(quicklist *ql, const char *name); +void quicklistBookmarksClear(quicklist *ql); + #ifdef REDIS_TEST int quicklistTest(int argc, char *argv[]); #endif diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index d152e212c..ec80c7384 100644 --- a/tests/unit/memefficiency.tcl +++ b/tests/unit/memefficiency.tcl @@ -209,5 +209,97 @@ start_server {tags {"defrag"}} { assert {$digest eq $newdigest} r save ;# saving an rdb iterates over all the data / pointers } {OK} + + test "Active defrag big list" { + r flushdb + r config resetstat + r config set save "" ;# prevent bgsave from interfereing with save below + r config set hz 100 + r config set activedefrag no + r config set active-defrag-max-scan-fields 1000 + r config set active-defrag-threshold-lower 5 + r config set active-defrag-cycle-min 65 + r config set active-defrag-cycle-max 75 + r config set active-defrag-ignore-bytes 2mb + r config set maxmemory 0 + r config set list-max-ziplist-size 5 ;# list of 500k 
items will have 100k quicklist nodes + + # create big keys with 10k items + set rd [redis_deferring_client] + + set expected_frag 1.7 + # add a mass of list nodes to two lists (allocations are interlaced) + set val [string repeat A 100] ;# 5 items of 100 bytes puts us in the 640 bytes bin, which has 32 regs, so high potential for fragmentation + for {set j 0} {$j < 500000} {incr j} { + $rd lpush biglist1 $val + $rd lpush biglist2 $val + } + for {set j 0} {$j < 500000} {incr j} { + $rd read ; # Discard replies + $rd read ; # Discard replies + } + + # create some fragmentation + r del biglist2 + + # start defrag + after 120 ;# serverCron only updates the info once in 100ms + set frag [s allocator_frag_ratio] + if {$::verbose} { + puts "frag $frag" + } + + assert {$frag >= $expected_frag} + r config set latency-monitor-threshold 5 + r latency reset + + set digest [r debug digest] + catch {r config set activedefrag yes} e + if {![string match {DISABLED*} $e]} { + # wait for the active defrag to start working (decision once a second) + wait_for_condition 50 100 { + [s active_defrag_running] ne 0 + } else { + fail "defrag not started." + } + + # wait for the active defrag to stop working + wait_for_condition 500 100 { + [s active_defrag_running] eq 0 + } else { + after 120 ;# serverCron only updates the info once in 100ms + puts [r info memory] + puts [r info stats] + puts [r memory malloc-stats] + fail "defrag didn't stop." 
+ } + + # test the the fragmentation is lower + after 120 ;# serverCron only updates the info once in 100ms + set frag [s allocator_frag_ratio] + set max_latency 0 + foreach event [r latency latest] { + lassign $event eventname time latency max + if {$eventname == "active-defrag-cycle"} { + set max_latency $max + } + } + if {$::verbose} { + puts "frag $frag" + puts "max latency $max_latency" + puts [r latency latest] + puts [r latency history active-defrag-cycle] + } + assert {$frag < 1.1} + # due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75, + # we expect max latency to be not much higher than 7.5ms + assert {$max_latency <= 12} + } + # verify the data isn't corrupted or changed + set newdigest [r debug digest] + assert {$digest eq $newdigest} + r save ;# saving an rdb iterates over all the data / pointers + r del biglist1 ;# coverage for quicklistBookmarksClear + } {1} } }