diff --git a/src/config.cpp b/src/config.cpp index aa2b592a8..caa4be15b 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -2253,7 +2253,6 @@ standardConfig configs[] = { createIntConfig("list-compress-depth", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->list_compress_depth, 0, INTEGER_CONFIG, NULL, NULL), createIntConfig("rdb-key-save-delay", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->rdb_key_save_delay, 0, INTEGER_CONFIG, NULL, NULL), createIntConfig("key-load-delay", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->key_load_delay, 0, INTEGER_CONFIG, NULL, NULL), - createIntConfig("tracking-table-max-fill", NULL, MODIFIABLE_CONFIG, 0, 100, g_pserver->tracking_table_max_fill, 10, INTEGER_CONFIG, NULL, NULL), /* Default: 10% tracking table max fill. */ createIntConfig("active-expire-effort", NULL, MODIFIABLE_CONFIG, 1, 10, cserver.active_expire_effort, 1, INTEGER_CONFIG, NULL, NULL), /* From 1 to 10. */ createIntConfig("hz", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->config_hz, CONFIG_DEFAULT_HZ, INTEGER_CONFIG, NULL, updateHZ), createIntConfig("min-replicas-to-write", "min-slaves-to-write", MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->repl_min_slaves_to_write, 0, INTEGER_CONFIG, NULL, updateGoodSlaves), @@ -2288,6 +2287,7 @@ standardConfig configs[] = { createSizeTConfig("stream-node-max-bytes", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, g_pserver->stream_node_max_bytes, 4096, MEMORY_CONFIG, NULL, NULL), createSizeTConfig("zset-max-ziplist-value", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, g_pserver->zset_max_ziplist_value, 64, MEMORY_CONFIG, NULL, NULL), createSizeTConfig("hll-sparse-max-bytes", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, g_pserver->hll_sparse_max_bytes, 3000, MEMORY_CONFIG, NULL, NULL), + createSizeTConfig("tracking-table-max-keys", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, g_pserver->tracking_table_max_keys, 1000000, INTEGER_CONFIG, NULL, NULL), /* Default: 1 million keys max. */ /* Other configs */ createTimeTConfig("repl-backlog-ttl", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, g_pserver->repl_backlog_time_limit, 60*60, INTEGER_CONFIG, NULL, NULL), /* Default: 1 hour */ diff --git a/src/db.cpp b/src/db.cpp index e71bf89c6..4b48f8df3 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1550,8 +1550,10 @@ int expireIfNeeded(redisDb *db, robj *key) { propagateExpire(db,key,g_pserver->lazyfree_lazy_expire); notifyKeyspaceEvent(NOTIFY_EXPIRED, "expired",key,db->id); - return g_pserver->lazyfree_lazy_expire ? dbAsyncDelete(db,key) : - dbSyncDelete(db,key); + int retval = g_pserver->lazyfree_lazy_expire ? dbAsyncDelete(db,key) : + dbSyncDelete(db,key); + if (retval) signalModifiedKey(db,key); + return retval; } /* ----------------------------------------------------------------------------- diff --git a/src/defrag.cpp b/src/defrag.cpp index 97297b8e5..767da9def 100644 --- a/src/defrag.cpp +++ b/src/defrag.cpp @@ -5,8 +5,8 @@ * We do that by scanning the keyspace and for each pointer we have, we can try to * ask the allocator if moving it to a new address will help reduce fragmentation. * - * Copyright (c) 2017, Oran Agra - * Copyright (c) 2017, Redis Labs, Inc + * Copyright (c) 2020, Oran Agra + * Copyright (c) 2020, Redis Labs, Inc * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -421,25 +421,32 @@ bool replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey) { return false; } -long activeDefragQuickListNodes(quicklist *ql) { - quicklistNode *node = ql->head, *newnode; +long activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) { + quicklistNode *newnode, *node = *node_ref; long defragged = 0; unsigned char *newzl; + if ((newnode = (quicklistNode*)activeDefragAlloc(node))) { + if (newnode->prev) + newnode->prev->next = newnode; + else + ql->head = newnode; + if (newnode->next) + newnode->next->prev = newnode; + else + ql->tail = newnode; + *node_ref = node = newnode; + defragged++; + } + if ((newzl = (unsigned char*)activeDefragAlloc(node->zl))) + defragged++, node->zl = newzl; + return defragged; +} + +long activeDefragQuickListNodes(quicklist *ql) { + quicklistNode *node = ql->head; + long defragged = 0; while (node) { - if ((newnode = (quicklistNode*)activeDefragAlloc(node))) { - if (newnode->prev) - newnode->prev->next = newnode; - else - ql->head = newnode; - if (newnode->next) - newnode->next->prev = newnode; - else - ql->tail = newnode; - node = newnode; - defragged++; - } - if ((newzl = (unsigned char*)activeDefragAlloc(node->zl))) - defragged++, node->zl = newzl; + defragged += activeDefragQuickListNode(ql, &node); node = node->next; } return defragged; @@ -453,12 +460,48 @@ void defragLater(redisDb *db, dictEntry *kde) { listAddNodeTail(db->defrag_later, key); } -long scanLaterList(robj *ob) { +/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */ +long scanLaterList(robj *ob, unsigned long *cursor, long long endtime, long long *defragged) { quicklist *ql = (quicklist*)ptrFromObj(ob); + quicklistNode *node; + long iterations = 0; + int bookmark_failed = 0; if (ob->type != OBJ_LIST || ob->encoding != OBJ_ENCODING_QUICKLIST) return 0; - g_pserver->stat_active_defrag_scanned+=ql->len; - return activeDefragQuickListNodes(ql); + + if (*cursor == 0) { + /* if cursor is 0, we start new iteration */ + node = ql->head; + } else { + node = quicklistBookmarkFind(ql, "_AD"); + if (!node) { + /* if the bookmark was deleted, it means we reached the end. */ + *cursor = 0; + return 0; + } + node = node->next; + } + + (*cursor)++; + while (node) { + (*defragged) += activeDefragQuickListNode(ql, &node); + g_pserver->stat_active_defrag_scanned++; + if (++iterations > 128 && !bookmark_failed) { + if (ustime() > endtime) { + if (!quicklistBookmarkCreate(&ql, "_AD", node)) { + bookmark_failed = 1; + } else { + ob->m_ptr = ql; /* bookmark creation may have re-allocated the quicklist */ + return 1; + } + } + iterations = 0; + } + node = node->next; + } + quicklistBookmarkDelete(ql, "_AD"); + *cursor = 0; + return bookmark_failed? 1: 0; } typedef struct { @@ -651,7 +694,8 @@ int scanLaterStraemListpacks(robj *ob, unsigned long *cursor, long long endtime, void *newdata = activeDefragAlloc(ri.data); if (newdata) raxSetData(ri.node, ri.data=newdata), (*defragged)++; - if (++iterations > 16) { + g_pserver->stat_active_defrag_scanned++; + if (++iterations > 128) { if (ustime() > endtime) { serverAssert(ri.key_len==sizeof(last)); memcpy(last,ri.key,ri.key_len); @@ -916,8 +960,7 @@ int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime) { if (de) { robj *ob = (robj*)dictGetVal(de); if (ob->type == OBJ_LIST) { - g_pserver->stat_active_defrag_hits += scanLaterList(ob); - *cursor = 0; /* list has no scan, we must finish it in one go */ + return scanLaterList(ob, cursor, endtime, &g_pserver->stat_active_defrag_hits); } else if (ob->type == OBJ_SET) { g_pserver->stat_active_defrag_hits += scanLaterSet(ob, cursor); } else if (ob->type == OBJ_ZSET) { @@ -977,11 +1020,6 @@ int defragLaterStep(redisDb *db, long long endtime) { if (defragLaterItem(de, &defrag_later_cursor, endtime)) quit = 1; /* time is up, we didn't finish all the work */ - /* Don't start a new BIG key in this loop, this is because the - * next key can be a list, and scanLaterList must be done in once cycle */ - if (!defrag_later_cursor) - quit = 1; - /* Once in 16 scan iterations, 512 pointer reallocations, or 64 fields * (if we have a lot of pointers in one hash bucket, or rehashing), * check if we reached the time limit. */ diff --git a/src/networking.cpp b/src/networking.cpp index 4d60ac1ff..1c8d561e6 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -176,6 +176,7 @@ client *createClient(connection *conn, int iel) { c->master_error = 0; memset(c->uuid, 0, UUID_BINARY_LEN); + c->client_tracking_prefixes = NULL; c->auth_callback = NULL; c->auth_callback_privdata = NULL; c->auth_module = NULL; @@ -2584,7 +2585,6 @@ int clientSetNameOrReply(client *c, robj *name) { void clientCommand(client *c) { listNode *ln; listIter li; - client *client; if (c->argc == 2 && !strcasecmp((const char*)ptrFromObj(c->argv[1]),"help")) { const char *help[] = { @@ -2698,7 +2698,7 @@ NULL /* Iterate clients killing all the matching clients. */ listRewind(g_pserver->clients,&li); while ((ln = listNext(&li)) != NULL) { - client = (struct client*)listNodeValue(ln); + client *client = (struct client*)listNodeValue(ln); if (addr && strcmp(getClientPeerId(client),addr) != 0) continue; if (type != -1 && getClientType(client) != type) continue; if (id != 0 && client->id != id) continue; @@ -2787,38 +2787,74 @@ NULL UNIT_MILLISECONDS) != C_OK) return; pauseClients(duration); addReply(c,shared.ok); - } else if (!strcasecmp(szFromObj(c->argv[1]),"tracking") && - (c->argc == 3 || c->argc == 5)) - { - /* CLIENT TRACKING (on|off) [REDIRECT ] */ + } else if (!strcasecmp(szFromObj(c->argv[1]),"tracking") && c->argc >= 3) { + /* CLIENT TRACKING (on|off) [REDIRECT ] [BCAST] [PREFIX first] + * [PREFIX second] ... */ long long redir = 0; + int bcast = 0; + robj **prefix = NULL; + size_t numprefix = 0; - /* Parse the redirection option: we'll require the client with - * the specified ID to exist right now, even if it is possible - * it will get disconnected later. */ - if (c->argc == 5) { - if (strcasecmp(szFromObj(c->argv[3]),"redirect") != 0) { - addReply(c,shared.syntaxerr); - return; - } else { - if (getLongLongFromObjectOrReply(c,c->argv[4],&redir,NULL) != + /* Parse the options. */ + for (int j = 3; j < c->argc; j++) { + int moreargs = (c->argc-1) - j; + + if (!strcasecmp(szFromObj(c->argv[j]),"redirect") && moreargs) { + j++; + if (getLongLongFromObjectOrReply(c,c->argv[j],&redir,NULL) != C_OK) return; + /* We will require the client with the specified ID to exist + * right now, even if it is possible that it gets disconnected + * later. Still a valid sanity check. */ if (lookupClientByID(redir) == NULL) { addReplyError(c,"The client ID you want redirect to " "does not exist"); return; } + } else if (!strcasecmp(szFromObj(c->argv[j]),"bcast")) { + bcast++; + } else if (!strcasecmp(szFromObj(c->argv[j]),"prefix") && moreargs) { + j++; + prefix = (robj**)zrealloc(prefix,sizeof(robj*)*(numprefix+1), MALLOC_LOCAL); + prefix[numprefix++] = c->argv[j]; + } else { + zfree(prefix); + addReply(c,shared.syntaxerr); + return; } } + /* Options are ok: enable or disable the tracking for this client. */ if (!strcasecmp(szFromObj(c->argv[2]),"on")) { - enableTracking(c,redir); + /* Before enabling tracking, make sure options are compatible + * among each other and with the current state of the client. */ + if (!bcast && numprefix) { + addReplyError(c, + "PREFIX option requires BCAST mode to be enabled"); + zfree(prefix); + return; + } + + if (c->flags & CLIENT_TRACKING) { + int oldbcast = !!(c->flags & CLIENT_TRACKING_BCAST); + if (oldbcast != bcast) { + addReplyError(c, + "You can't switch BCAST mode on/off before disabling " + "tracking for this client, and then re-enabling it with " + "a different mode."); + zfree(prefix); + return; + } + } + enableTracking(c,redir,bcast,prefix,numprefix); } else if (!strcasecmp(szFromObj(c->argv[2]),"off")) { disableTracking(c); } else { + zfree(prefix); addReply(c,shared.syntaxerr); return; } + zfree(prefix); addReply(c,shared.ok); } else if (!strcasecmp(szFromObj(c->argv[1]),"getredir") && c->argc == 2) { /* CLIENT GETREDIR */ diff --git a/src/object.cpp b/src/object.cpp index 2692e6308..8ef69a0eb 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -1151,13 +1151,13 @@ sds getMemoryDoctorReport(void) { num_reports++; } - /* Allocator fss is higher than 1.1 and 10MB ? */ + /* Allocator rss is higher than 1.1 and 10MB ? */ if (mh->allocator_rss > 1.1 && mh->allocator_rss_bytes > 10<<20) { high_alloc_rss = 1; num_reports++; } - /* Non-Allocator fss is higher than 1.1 and 10MB ? */ + /* Non-Allocator rss is higher than 1.1 and 10MB ? */ if (mh->rss_extra > 1.1 && mh->rss_extra_bytes > 10<<20) { high_proc_rss = 1; num_reports++; diff --git a/src/pubsub.cpp b/src/pubsub.cpp index 0c657a588..661124c1c 100644 --- a/src/pubsub.cpp +++ b/src/pubsub.cpp @@ -35,7 +35,11 @@ int clientSubscriptionsCount(client *c); * Pubsub client replies API *----------------------------------------------------------------------------*/ -/* Send a pubsub message of type "message" to the client. */ +/* Send a pubsub message of type "message" to the client. + * Normally 'msg' is a Redis object containing the string to send as + * message. However if the caller sets 'msg' as NULL, it will be able + * to send a special message (for instance an Array type) by using the + * addReply*() API family. */ void addReplyPubsubMessage(client *c, robj *channel, robj *msg) { if (c->resp == 2) addReplyAsync(c,shared.mbulkhdr[3]); @@ -43,7 +47,7 @@ void addReplyPubsubMessage(client *c, robj *channel, robj *msg) { addReplyPushLenAsync(c,3); addReplyAsync(c,shared.messagebulk); addReplyBulkAsync(c,channel); - addReplyBulkAsync(c,msg); + if (msg) addReplyBulkAsync(c,msg); } /* Send a pubsub message of type "pmessage" to the client. The difference diff --git a/src/quicklist.c b/src/quicklist.c index 2f5eab215..0385e0bf9 100644 --- a/src/quicklist.c +++ b/src/quicklist.c @@ -70,6 +70,12 @@ static const size_t optimization_level[] = {4096, 8192, 16384, 32768, 65536}; } while (0); #endif +/* Bookmarks forward declarations */ +#define QL_MAX_BM ((1 << QL_BM_BITS)-1) +quicklistBookmark *_quicklistBookmarkFindByName(quicklist *ql, const char *name); +quicklistBookmark *_quicklistBookmarkFindByNode(quicklist *ql, quicklistNode *node); +void _quicklistBookmarkDelete(quicklist *ql, quicklistBookmark *bm); + /* Simple way to give quicklistEntry structs default values with one call. */ #define initEntry(e) \ do { \ @@ -100,10 +106,11 @@ quicklist *quicklistCreate(void) { quicklist->count = 0; quicklist->compress = 0; quicklist->fill = -2; + quicklist->bookmark_count = 0; return quicklist; } -#define COMPRESS_MAX (1 << 16) +#define COMPRESS_MAX (1 << QL_COMP_BITS) void quicklistSetCompressDepth(quicklist *quicklist, int compress) { if (compress > COMPRESS_MAX) { compress = COMPRESS_MAX; @@ -113,7 +120,7 @@ void quicklistSetCompressDepth(quicklist *quicklist, int compress) { quicklist->compress = compress; } -#define FILL_MAX (1 << 15) +#define FILL_MAX (1 << (QL_FILL_BITS-1)) void quicklistSetFill(quicklist *quicklist, int fill) { if (fill > FILL_MAX) { fill = FILL_MAX; @@ -169,6 +176,7 @@ void quicklistRelease(quicklist *quicklist) { quicklist->len--; current = next; } + quicklistBookmarksClear(quicklist); zfree(quicklist); } @@ -578,6 +586,15 @@ quicklist *quicklistCreateFromZiplist(int fill, int compress, REDIS_STATIC void __quicklistDelNode(quicklist *quicklist, quicklistNode *node) { + /* Update the bookmark if any */ + quicklistBookmark *bm = _quicklistBookmarkFindByNode(quicklist, node); + if (bm) { + bm->node = node->next; + /* if the bookmark was to the last node, delete it. */ + if (!bm->node) + _quicklistBookmarkDelete(quicklist, bm); + } + if (node->next) node->next->prev = node->prev; if (node->prev) @@ -1410,6 +1427,87 @@ void quicklistPush(quicklist *quicklist, void *value, const size_t sz, } } +/* Create or update a bookmark in the list which will be updated to the next node + * automatically when the one referenced gets deleted. + * Returns 1 on success (creation of new bookmark or override of an existing one). + * Returns 0 on failure (reached the maximum supported number of bookmarks). + * NOTE: use short simple names, so that string compare on find is quick. + * NOTE: bookmakrk creation may re-allocate the quicklist, so the input pointer + may change and it's the caller responsibilty to update the reference. + */ +int quicklistBookmarkCreate(quicklist **ql_ref, const char *name, quicklistNode *node) { + quicklist *ql = *ql_ref; + if (ql->bookmark_count >= QL_MAX_BM) + return 0; + quicklistBookmark *bm = _quicklistBookmarkFindByName(ql, name); + if (bm) { + bm->node = node; + return 1; + } + ql = zrealloc(ql, sizeof(quicklist) + (ql->bookmark_count+1) * sizeof(quicklistBookmark), MALLOC_SHARED); + *ql_ref = ql; + ql->bookmarks[ql->bookmark_count].node = node; + ql->bookmarks[ql->bookmark_count].name = zstrdup(name); + ql->bookmark_count++; + return 1; +} + +/* Find the quicklist node referenced by a named bookmark. + * When the bookmarked node is deleted the bookmark is updated to the next node, + * and if that's the last node, the bookmark is deleted (so find returns NULL). */ +quicklistNode *quicklistBookmarkFind(quicklist *ql, const char *name) { + quicklistBookmark *bm = _quicklistBookmarkFindByName(ql, name); + if (!bm) return NULL; + return bm->node; +} + +/* Delete a named bookmark. + * returns 0 if bookmark was not found, and 1 if deleted. + * Note that the bookmark memory is not freed yet, and is kept for future use. */ +int quicklistBookmarkDelete(quicklist *ql, const char *name) { + quicklistBookmark *bm = _quicklistBookmarkFindByName(ql, name); + if (!bm) + return 0; + _quicklistBookmarkDelete(ql, bm); + return 1; +} + +quicklistBookmark *_quicklistBookmarkFindByName(quicklist *ql, const char *name) { + unsigned i; + for (i=0; ibookmark_count; i++) { + if (!strcmp(ql->bookmarks[i].name, name)) { + return &ql->bookmarks[i]; + } + } + return NULL; +} + +quicklistBookmark *_quicklistBookmarkFindByNode(quicklist *ql, quicklistNode *node) { + unsigned i; + for (i=0; ibookmark_count; i++) { + if (ql->bookmarks[i].node == node) { + return &ql->bookmarks[i]; + } + } + return NULL; +} + +void _quicklistBookmarkDelete(quicklist *ql, quicklistBookmark *bm) { + int index = bm - ql->bookmarks; + zfree(bm->name); + ql->bookmark_count--; + memmove(bm, bm+1, (ql->bookmark_count - index)* sizeof(*bm)); + /* NOTE: We do not shrink (realloc) the quicklist yet (to avoid resonance, + * it may be re-used later (a call to realloc may NOP). */ +} + +void quicklistBookmarksClear(quicklist *ql) { + while (ql->bookmark_count) + zfree(ql->bookmarks[--ql->bookmark_count].name); + /* NOTE: We do not shrink (realloc) the quick list. main use case for this + * function is just before releasing the allocation. */ +} + /* The rest of this file is test cases and test helpers. */ #ifdef REDIS_TEST #include @@ -2641,6 +2739,54 @@ int quicklistTest(int argc, char *argv[]) { printf("Compressions: %0.2f seconds.\n", (float)(stop - start) / 1000); printf("\n"); + TEST("bookmark get updated to next item") { + quicklist *ql = quicklistNew(1, 0); + quicklistPushTail(ql, "1", 1); + quicklistPushTail(ql, "2", 1); + quicklistPushTail(ql, "3", 1); + quicklistPushTail(ql, "4", 1); + quicklistPushTail(ql, "5", 1); + assert(ql->len==5); + /* add two bookmarks, one pointing to the node before the last. */ + assert(quicklistBookmarkCreate(&ql, "_dummy", ql->head->next)); + assert(quicklistBookmarkCreate(&ql, "_test", ql->tail->prev)); + /* test that the bookmark returns the right node, delete it and see that the bookmark points to the last node */ + assert(quicklistBookmarkFind(ql, "_test") == ql->tail->prev); + assert(quicklistDelRange(ql, -2, 1)); + assert(quicklistBookmarkFind(ql, "_test") == ql->tail); + /* delete the last node, and see that the bookmark was deleted. */ + assert(quicklistDelRange(ql, -1, 1)); + assert(quicklistBookmarkFind(ql, "_test") == NULL); + /* test that other bookmarks aren't affected */ + assert(quicklistBookmarkFind(ql, "_dummy") == ql->head->next); + assert(quicklistBookmarkFind(ql, "_missing") == NULL); + assert(ql->len==3); + quicklistBookmarksClear(ql); /* for coverage */ + assert(quicklistBookmarkFind(ql, "_dummy") == NULL); + quicklistRelease(ql); + } + + TEST("bookmark limit") { + int i; + quicklist *ql = quicklistNew(1, 0); + quicklistPushHead(ql, "1", 1); + for (i=0; ihead)); + /* when all bookmarks are used, creation fails */ + assert(!quicklistBookmarkCreate(&ql, "_test", ql->head)); + /* delete one and see that we can now create another */ + assert(quicklistBookmarkDelete(ql, "0")); + assert(quicklistBookmarkCreate(&ql, "_test", ql->head)); + /* delete one and see that the rest survive */ + assert(quicklistBookmarkDelete(ql, "_test")); + for (i=1; ihead); + /* make sure the deleted ones are indeed gone */ + assert(!quicklistBookmarkFind(ql, "0")); + assert(!quicklistBookmarkFind(ql, "_test")); + quicklistRelease(ql); + } + if (!err) printf("ALL TESTS PASSED!\n"); else diff --git a/src/quicklist.h b/src/quicklist.h index 18619ad86..70b67d942 100644 --- a/src/quicklist.h +++ b/src/quicklist.h @@ -28,6 +28,8 @@ * POSSIBILITY OF SUCH DAMAGE. */ +#include // for UINTPTR_MAX + #ifndef __QUICKLIST_H__ #define __QUICKLIST_H__ @@ -72,19 +74,53 @@ typedef struct quicklistLZF { #endif } quicklistLZF; +/* Bookmarks are padded with realloc at the end of of the quicklist struct. + * They should only be used for very big lists if thousands of nodes were the + * excess memory usage is negligible, and there's a real need to iterate on them + * in portions. + * When not used, they don't add any memory overhead, but when used and then + * deleted, some overhead remains (to avoid resonance). + * The number of bookmarks used should be kept to minimum since it also adds + * overhead on node deletion (searching for a bookmark to update). */ +typedef struct quicklistBookmark { + quicklistNode *node; + char *name; +} quicklistBookmark; + +#if UINTPTR_MAX == 0xffffffff +/* 32-bit */ +# define QL_FILL_BITS 14 +# define QL_COMP_BITS 14 +# define QL_BM_BITS 4 +#elif UINTPTR_MAX == 0xffffffffffffffff +/* 64-bit */ +# define QL_FILL_BITS 16 +# define QL_COMP_BITS 16 +# define QL_BM_BITS 4 /* we can encode more, but we rather limit the user + since they cause performance degradation. */ +#else +# error unknown arch bits count +#endif + /* quicklist is a 40 byte struct (on 64-bit systems) describing a quicklist. * 'count' is the number of total entries. * 'len' is the number of quicklist nodes. * 'compress' is: -1 if compression disabled, otherwise it's the number * of quicklistNodes to leave uncompressed at ends of quicklist. - * 'fill' is the user-requested (or default) fill factor. */ + * 'fill' is the user-requested (or default) fill factor. + * 'bookmakrs are an optional feature that is used by realloc this struct, + * so that they don't consume memory when not used. */ typedef struct quicklist { quicklistNode *head; quicklistNode *tail; unsigned long count; /* total count of all entries in all ziplists */ unsigned long len; /* number of quicklistNodes */ - int fill : 16; /* fill factor for individual nodes */ - unsigned int compress : 16; /* depth of end nodes not to compress;0=off */ + int fill : QL_FILL_BITS; /* fill factor for individual nodes */ + unsigned int compress : QL_COMP_BITS; /* depth of end nodes not to compress;0=off */ + unsigned int bookmark_count: QL_BM_BITS; +#ifndef __cplusplus + quicklistBookmark bookmarks[]; +#endif } quicklist; typedef struct quicklistIter { @@ -170,6 +206,12 @@ unsigned long quicklistCount(const quicklist *ql); int quicklistCompare(unsigned char *p1, unsigned char *p2, int p2_len); size_t quicklistGetLzf(const quicklistNode *node, void **data); +/* bookmarks */ +int quicklistBookmarkCreate(quicklist **ql_ref, const char *name, quicklistNode *node); +int quicklistBookmarkDelete(quicklist *ql, const char *name); +quicklistNode *quicklistBookmarkFind(quicklist *ql, const char *name); +void quicklistBookmarksClear(quicklist *ql); + #ifdef REDIS_TEST int quicklistTest(int argc, char *argv[]); #endif diff --git a/src/server.cpp b/src/server.cpp index d7a378f98..d4426bbe4 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2209,6 +2209,10 @@ void beforeSleep(struct aeEventLoop *eventLoop) { processUnblockedClients(IDX_EVENT_LOOP_MAIN); } + /* Send the invalidation messages to clients participating to the + * client side caching protocol in broadcasting (BCAST) mode. */ + trackingBroadcastInvalidationMessages(); + /* Write the AOF buffer on disk */ flushAppendOnlyFile(0); @@ -3526,8 +3530,11 @@ void call(client *c, int flags) { if (c->cmd->flags & CMD_READONLY) { client *caller = (c->flags & CLIENT_LUA && g_pserver->lua_caller) ? g_pserver->lua_caller : c; - if (caller->flags & CLIENT_TRACKING) + if (caller->flags & CLIENT_TRACKING && + !(caller->flags & CLIENT_TRACKING_BCAST)) + { trackingRememberKeys(caller); + } } g_pserver->stat_numcommands++; @@ -4477,6 +4484,7 @@ sds genRedisInfoString(const char *section) { "active_defrag_misses:%lld\r\n" "active_defrag_key_hits:%lld\r\n" "active_defrag_key_misses:%lld\r\n" + "tracking_total_keys:%lld\r\n" "tracking_total_items:%llu\r\n", g_pserver->stat_numconnections, g_pserver->stat_numcommands, @@ -4505,6 +4513,7 @@ sds genRedisInfoString(const char *section) { g_pserver->stat_active_defrag_misses, g_pserver->stat_active_defrag_key_hits, g_pserver->stat_active_defrag_key_misses, + (unsigned long long) trackingGetTotalKeys(), (unsigned long long) trackingGetTotalItems()); } diff --git a/src/server.h b/src/server.h index c55eb9ee6..bb3f65c25 100644 --- a/src/server.h +++ b/src/server.h @@ -412,6 +412,7 @@ public: perform client side caching. */ #define CLIENT_TRACKING_BROKEN_REDIR (1ULL<<32) /* Target client is invalid. */ #define CLIENT_FORCE_REPLY (1ULL<<33) /* Should addReply be forced to write the text? */ +#define CLIENT_TRACKING_BCAST (1ULL<<34) /* Tracking in BCAST mode. */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ @@ -1298,7 +1299,9 @@ typedef struct client { * invalidation messages for keys fetched by this client will be send to * the specified client ID. */ uint64_t client_tracking_redirection; - + rax *client_tracking_prefixes; /* A dictionary of prefixes we are already + subscribed to in BCAST mode, in the + context of client side caching. */ /* Response buffer */ int bufpos; char buf[PROTO_REPLY_CHUNK_BYTES]; @@ -1848,7 +1851,7 @@ struct redisServer { list *ready_keys; /* List of readyList structures for BLPOP & co */ /* Client side caching. */ unsigned int tracking_clients; /* # of clients with tracking enabled.*/ - int tracking_table_max_fill; /* Max fill percentage. */ + size_t tracking_table_max_keys; /* Max number of keys in tracking table. */ /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ int sort_desc; @@ -2224,13 +2227,15 @@ void addReplyStatusFormat(client *c, const char *fmt, ...); #endif /* Client side caching (tracking mode) */ -void enableTracking(client *c, uint64_t redirect_to); +void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix); void disableTracking(client *c); void trackingRememberKeys(client *c); void trackingInvalidateKey(robj *keyobj); void trackingInvalidateKeysOnFlush(int dbid); void trackingLimitUsedSlots(void); uint64_t trackingGetTotalItems(void); +uint64_t trackingGetTotalKeys(void); +void trackingBroadcastInvalidationMessages(void); /* List data type */ void listTypeTryConversion(robj *subject, robj *value); diff --git a/src/t_stream.cpp b/src/t_stream.cpp index 11d0e3b07..cd69dbe93 100644 --- a/src/t_stream.cpp +++ b/src/t_stream.cpp @@ -1856,6 +1856,8 @@ NULL g_pserver->dirty++; notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-destroy", c->argv[2],c->db->id); + /* We want to unblock any XREADGROUP consumers with -NOGROUP. */ + signalKeyAsReady(c->db,c->argv[2]); } else { addReply(c,shared.czero); } diff --git a/src/tracking.cpp b/src/tracking.cpp index baf5ddbc8..50f03c946 100644 --- a/src/tracking.cpp +++ b/src/tracking.cpp @@ -42,12 +42,22 @@ * Clients will normally take frequently requested objects in memory, removing * them when invalidation messages are received. */ rax *TrackingTable = NULL; +rax *PrefixTable = NULL; uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across the whole tracking table. This givesn an hint about the total memory we are using server side for CSC. */ robj *TrackingChannelName; +/* This is the structure that we have as value of the PrefixTable, and + * represents the list of keys modified, and the list of clients that need + * to be notified, for a given prefix. */ +typedef struct bcastState { + rax *keys; /* Keys modified in the current event loop cycle. */ + rax *clients; /* Clients subscribed to the notification events for this + prefix. */ +} bcastState; + /* Remove the tracking state from the client 'c'. Note that there is not much * to do for us here, if not to decrement the counter of the clients in * tracking mode, because we just store the ID of the client in the tracking @@ -55,9 +65,55 @@ robj *TrackingChannelName; * client with many entries in the table is removed, it would cost a lot of * time to do the cleanup. */ void disableTracking(client *c) { + /* If this client is in broadcasting mode, we need to unsubscribe it + * from all the prefixes it is registered to. */ + if (c->flags & CLIENT_TRACKING_BCAST) { + raxIterator ri; + raxStart(&ri,c->client_tracking_prefixes); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + bcastState *bs = (bcastState*)raxFind(PrefixTable,ri.key,ri.key_len); + serverAssert(bs != raxNotFound); + raxRemove(bs->clients,(unsigned char*)&c,sizeof(c),NULL); + /* Was it the last client? Remove the prefix from the + * table. */ + if (raxSize(bs->clients) == 0) { + raxFree(bs->clients); + raxFree(bs->keys); + zfree(bs); + raxRemove(PrefixTable,ri.key,ri.key_len,NULL); + } + } + raxStop(&ri); + raxFree(c->client_tracking_prefixes); + c->client_tracking_prefixes = NULL; + } + + /* Clear flags and adjust the count. */ if (c->flags & CLIENT_TRACKING) { g_pserver->tracking_clients--; - c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR); + c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR| + CLIENT_TRACKING_BCAST); + } +} + +/* Set the client 'c' to track the prefix 'prefix'. If the client 'c' is + * already registered for the specified prefix, no operation is performed. */ +void enableBcastTrackingForPrefix(client *c, const char *prefix, size_t plen) { + bcastState *bs = (bcastState*)raxFind(PrefixTable,(unsigned char*)prefix,sdslen(prefix)); + /* If this is the first client subscribing to such prefix, create + * the prefix in the table. */ + if (bs == raxNotFound) { + bs = (bcastState*)zmalloc(sizeof(*bs)); + bs->keys = raxNew(); + bs->clients = raxNew(); + raxInsert(PrefixTable,(unsigned char*)prefix,plen,bs,NULL); + } + if (raxTryInsert(bs->clients,(unsigned char*)&c,sizeof(c),NULL,NULL)) { + if (c->client_tracking_prefixes == NULL) + c->client_tracking_prefixes = raxNew(); + raxInsert(c->client_tracking_prefixes, + (unsigned char*)prefix,plen,NULL,NULL); } } @@ -68,16 +124,25 @@ void disableTracking(client *c) { * eventually get freed, we'll send a message to the original client to * inform it of the condition. Multiple clients can redirect the invalidation * messages to the same client ID. */ -void enableTracking(client *c, uint64_t redirect_to) { - if (c->flags & CLIENT_TRACKING) return; +void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix) { + if (!(c->flags & CLIENT_TRACKING)) g_pserver->tracking_clients++; c->flags |= CLIENT_TRACKING; - c->flags &= ~CLIENT_TRACKING_BROKEN_REDIR; + c->flags &= ~(CLIENT_TRACKING_BROKEN_REDIR|CLIENT_TRACKING_BCAST); c->client_tracking_redirection = redirect_to; - g_pserver->tracking_clients++; if (TrackingTable == NULL) { TrackingTable = raxNew(); + PrefixTable = raxNew(); TrackingChannelName = createStringObject("__redis__:invalidate",20); } + + if (bcast) { + c->flags |= CLIENT_TRACKING_BCAST; + if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0); + for (size_t j = 0; j < numprefix; j++) { + sds sdsprefix = szFromObj(prefix[j]); + enableBcastTrackingForPrefix(c,sdsprefix,sdslen(sdsprefix)); + } + } } /* This function is called after the excution of a readonly command in the @@ -106,7 +171,17 @@ void trackingRememberKeys(client *c) { getKeysFreeResult(keys); } -void sendTrackingMessage(client *c, const char *keyname, size_t keylen) { +/* Given a key name, this function sends an invalidation message in the + * proper channel (depending on RESP version: PubSub or Push message) and + * to the proper client (in case fo redirection), in the context of the + * client 'c' with tracking enabled. + * + * In case the 'proto' argument is non zero, the function will assume that + * 'keyname' points to a buffer of 'keylen' bytes already expressed in the + * form of Redis RESP protocol, representing an array of keys to send + * to the client as value of the invalidation. This is used in BCAST mode + * in order to optimized the implementation to use less CPU time. */ +void sendTrackingMessage(client *c, const char *keyname, size_t keylen, int proto) { int using_redirection = 0; if (c->client_tracking_redirection) { client *redir = lookupClientByID(c->client_tracking_redirection); @@ -132,15 +207,45 @@ void sendTrackingMessage(client *c, const char *keyname, size_t keylen) { if (c->resp > 2) { addReplyPushLen(c,2); addReplyBulkCBuffer(c,"invalidate",10); - addReplyBulkCBuffer(c,keyname,keylen); } else if (using_redirection && c->flags & CLIENT_PUBSUB) { /* We use a static object to speedup things, however we assume * that addReplyPubsubMessage() will not take a reference. */ - robj keyobj; - initStaticStringObject(keyobj,(char*)keyname); - addReplyPubsubMessage(c,TrackingChannelName,&keyobj); - serverAssert(keyobj.getrefcount(std::memory_order_relaxed) == 1); + addReplyPubsubMessage(c,TrackingChannelName,NULL); + } else { + /* If are here, the client is not using RESP3, nor is + * redirecting to another client. We can't send anything to + * it since RESP2 does not support push messages in the same + * connection. */ + return; } + + /* Send the "value" part, which is the array of keys. */ + if (proto) { + addReplyProto(c,keyname,keylen); + } else { + addReplyArrayLen(c,1); + addReplyBulkCBuffer(c,keyname,keylen); + } +} + +/* This function is called when a key is modified in Redis and in the case + * we have at least one client with the BCAST mode enabled. + * Its goal is to set the key in the right broadcast state if the key + * matches one or more prefixes in the prefix table. Later when we + * return to the event loop, we'll send invalidation messages to the + * clients subscribed to each prefix. */ +void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) { + raxIterator ri; + raxStart(&ri,PrefixTable); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + if (ri.key_len > keylen) continue; + if (ri.key_len != 0 && memcmp(ri.key,keyname,ri.key_len) != 0) + continue; + bcastState *bs = (bcastState*)ri.data; + raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,NULL,NULL); + } + raxStop(&ri); } /* This function is called from signalModifiedKey() or other places in Redis @@ -150,6 +255,10 @@ void sendTrackingMessage(client *c, const char *keyname, size_t keylen) { void trackingInvalidateKey(robj *keyobj) { if (TrackingTable == NULL) return; sds sdskey = szFromObj(keyobj); + + if (raxSize(PrefixTable) > 0) + trackingRememberKeyToBroadcast(sdskey,sdslen(sdskey)); + rax *ids = (rax*)raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey)); if (ids == raxNotFound) return;; @@ -160,8 +269,18 @@ void trackingInvalidateKey(robj *keyobj) { uint64_t id; memcpy(&id,ri.key,sizeof(id)); client *c = lookupClientByID(id); - if (c == NULL || !(c->flags & CLIENT_TRACKING)) continue; - sendTrackingMessage(c,sdskey,sdslen(sdskey)); + /* Note that if the client is in BCAST mode, we don't want to + * send invalidation messages that were pending in the case + * previously the client was not in BCAST mode. This can happen if + * TRACKING is enabled normally, and then the client switches to + * BCAST mode. */ + if (c == NULL || + !(c->flags & CLIENT_TRACKING)|| + c->flags & CLIENT_TRACKING_BCAST) + { + continue; + } + sendTrackingMessage(c,sdskey,sdslen(sdskey),0); } raxStop(&ri); @@ -198,7 +317,7 @@ void trackingInvalidateKeysOnFlush(int dbid) { while ((ln = listNext(&li)) != NULL) { client *c = (client*)listNodeValue(ln); if (c->flags & CLIENT_TRACKING) { - sendTrackingMessage(c,"",1); + sendTrackingMessage(c,"",1,0); } } } @@ -223,8 +342,8 @@ void trackingInvalidateKeysOnFlush(int dbid) { void trackingLimitUsedSlots(void) { static unsigned int timeout_counter = 0; if (TrackingTable == NULL) return; - if (g_pserver->tracking_table_max_fill == 0) return; /* No limits set. */ - size_t max_keys = g_pserver->tracking_table_max_fill; + if (g_pserver->tracking_table_max_keys == 0) return; /* No limits set. */ + size_t max_keys = g_pserver->tracking_table_max_keys; if (raxSize(TrackingTable) <= max_keys) { timeout_counter = 0; return; /* Limit not reached. */ @@ -259,8 +378,69 @@ void trackingLimitUsedSlots(void) { timeout_counter++; } +/* This function will run the prefixes of clients in BCAST mode and + * keys that were modified about each prefix, and will send the + * notifications to each client in each prefix. */ +void trackingBroadcastInvalidationMessages(void) { + raxIterator ri, ri2; + + /* Return ASAP if there is nothing to do here. */ + if (TrackingTable == NULL || !g_pserver->tracking_clients) return; + + raxStart(&ri,PrefixTable); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + bcastState *bs = (bcastState*)ri.data; + if (raxSize(bs->keys)) { + /* Create the array reply with the list of keys once, then send + * it to all the clients subscribed to this prefix. */ + char buf[32]; + size_t len = ll2string(buf,sizeof(buf),raxSize(bs->keys)); + sds proto = sdsempty(); + proto = sdsMakeRoomFor(proto,raxSize(bs->keys)*15); + proto = sdscatlen(proto,"*",1); + proto = sdscatlen(proto,buf,len); + proto = sdscatlen(proto,"\r\n",2); + raxStart(&ri2,bs->keys); + raxSeek(&ri2,"^",NULL,0); + while(raxNext(&ri2)) { + len = ll2string(buf,sizeof(buf),ri2.key_len); + proto = sdscatlen(proto,"$",1); + proto = sdscatlen(proto,buf,len); + proto = sdscatlen(proto,"\r\n",2); + proto = sdscatlen(proto,ri2.key,ri2.key_len); + proto = sdscatlen(proto,"\r\n",2); + } + raxStop(&ri2); + + /* Send this array of keys to every client in the list. */ + raxStart(&ri2,bs->clients); + raxSeek(&ri2,"^",NULL,0); + while(raxNext(&ri2)) { + client *c; + memcpy(&c,ri2.key,sizeof(c)); + sendTrackingMessage(c,proto,sdslen(proto),1); + } + raxStop(&ri2); + + /* Clean up: we can remove everything from this state, because we + * want to only track the new keys that will be accumulated starting + * from now. */ + sdsfree(proto); + } + raxFree(bs->keys); + bs->keys = raxNew(); + } + raxStop(&ri); +} + /* This is just used in order to access the amount of used slots in the * tracking table. */ uint64_t trackingGetTotalItems(void) { return TrackingTableTotalItems; } + +uint64_t trackingGetTotalKeys(void) { + if (TrackingTable == NULL) return 0; + return raxSize(TrackingTable); +} diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index e83f3657a..dfcdb8e35 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -92,7 +92,7 @@ set ::file ""; # If set, runs only the tests in this comma separated list set ::curfile ""; # Hold the filename of the current suite set ::accurate 0; # If true runs fuzz tests with more iterations set ::force_failure 0 -set ::timeout 600; # 10 minutes without progresses will quit the test. +set ::timeout 1200; # 20 minutes without progresses will quit the test. set ::last_progress [clock seconds] set ::active_servers {} ; # Pids of active Redis instances. set ::dont_clean 0 diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index d152e212c..ec80c7384 100644 --- a/tests/unit/memefficiency.tcl +++ b/tests/unit/memefficiency.tcl @@ -209,5 +209,97 @@ start_server {tags {"defrag"}} { assert {$digest eq $newdigest} r save ;# saving an rdb iterates over all the data / pointers } {OK} + + test "Active defrag big list" { + r flushdb + r config resetstat + r config set save "" ;# prevent bgsave from interfereing with save below + r config set hz 100 + r config set activedefrag no + r config set active-defrag-max-scan-fields 1000 + r config set active-defrag-threshold-lower 5 + r config set active-defrag-cycle-min 65 + r config set active-defrag-cycle-max 75 + r config set active-defrag-ignore-bytes 2mb + r config set maxmemory 0 + r config set list-max-ziplist-size 5 ;# list of 500k items will have 100k quicklist nodes + + # create big keys with 10k items + set rd [redis_deferring_client] + + set expected_frag 1.7 + # add a mass of list nodes to two lists (allocations are interlaced) + set val [string repeat A 100] ;# 5 items of 100 bytes puts us in the 640 bytes bin, which has 32 regs, so high potential for fragmentation + for {set j 0} {$j < 500000} {incr j} { + $rd lpush biglist1 $val + $rd lpush biglist2 $val + } + for {set j 0} {$j < 500000} {incr j} { + $rd read ; # Discard replies + $rd read ; # Discard replies + } + + # create some fragmentation + r del biglist2 + + # start defrag + after 120 ;# serverCron only updates the info once in 100ms + set frag [s allocator_frag_ratio] + if {$::verbose} { + puts "frag $frag" + } + + assert {$frag >= $expected_frag} + r config set latency-monitor-threshold 5 + r latency reset + + set digest [r debug digest] + catch {r config set activedefrag yes} e + if {![string match {DISABLED*} $e]} { + # wait for the active defrag to start working (decision once a second) + wait_for_condition 50 100 { + [s active_defrag_running] ne 0 + } else { + fail "defrag not started." + } + + # wait for the active defrag to stop working + wait_for_condition 500 100 { + [s active_defrag_running] eq 0 + } else { + after 120 ;# serverCron only updates the info once in 100ms + puts [r info memory] + puts [r info stats] + puts [r memory malloc-stats] + fail "defrag didn't stop." + } + + # test the the fragmentation is lower + after 120 ;# serverCron only updates the info once in 100ms + set frag [s allocator_frag_ratio] + set max_latency 0 + foreach event [r latency latest] { + lassign $event eventname time latency max + if {$eventname == "active-defrag-cycle"} { + set max_latency $max + } + } + if {$::verbose} { + puts "frag $frag" + puts "max latency $max_latency" + puts [r latency latest] + puts [r latency history active-defrag-cycle] + } + assert {$frag < 1.1} + # due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75, + # we expect max latency to be not much higher than 7.5ms + assert {$max_latency <= 12} + } + # verify the data isn't corrupted or changed + set newdigest [r debug digest] + assert {$digest eq $newdigest} + r save ;# saving an rdb iterates over all the data / pointers + r del biglist1 ;# coverage for quicklistBookmarksClear + } {1} } } diff --git a/tests/unit/tracking.tcl b/tests/unit/tracking.tcl new file mode 100644 index 000000000..2058319f7 --- /dev/null +++ b/tests/unit/tracking.tcl @@ -0,0 +1,66 @@ +start_server {tags {"tracking"}} { + # Create a deferred client we'll use to redirect invalidation + # messages to. + set rd1 [redis_deferring_client] + $rd1 client id + set redir [$rd1 read] + $rd1 subscribe __redis__:invalidate + $rd1 read ; # Consume the SUBSCRIBE reply. + + test {Clients are able to enable tracking and redirect it} { + r CLIENT TRACKING on REDIRECT $redir + } {*OK} + + test {The other connection is able to get invalidations} { + r SET a 1 + r GET a + r INCR a + r INCR b ; # This key should not be notified, since it wasn't fetched. + set keys [lindex [$rd1 read] 2] + assert {[llength $keys] == 1} + assert {[lindex $keys 0] eq {a}} + } + + test {The client is now able to disable tracking} { + # Make sure to add a few more keys in the tracking list + # so that we can check for leaks, as a side effect. + r MGET a b c d e f g + r CLIENT TRACKING off + } + + test {Clients can enable the BCAST mode with the empty prefix} { + r CLIENT TRACKING on BCAST REDIRECT $redir + } {*OK*} + + test {The connection gets invalidation messages about all the keys} { + r MSET a 1 b 2 c 3 + set keys [lsort [lindex [$rd1 read] 2]] + assert {$keys eq {a b c}} + } + + test {Clients can enable the BCAST mode with prefixes} { + r CLIENT TRACKING off + r CLIENT TRACKING on BCAST REDIRECT $redir PREFIX a: PREFIX b: + r MULTI + r INCR a:1 + r INCR a:2 + r INCR b:1 + r INCR b:2 + r EXEC + # Because of the internals, we know we are going to receive + # two separated notifications for the two different prefixes. + set keys1 [lsort [lindex [$rd1 read] 2]] + set keys2 [lsort [lindex [$rd1 read] 2]] + set keys [lsort [list {*}$keys1 {*}$keys2]] + assert {$keys eq {a:1 a:2 b:1 b:2}} + } + + test {Adding prefixes to BCAST mode works} { + r CLIENT TRACKING on BCAST REDIRECT $redir PREFIX c: + r INCR c:1234 + set keys [lsort [lindex [$rd1 read] 2]] + assert {$keys eq {c:1234}} + } + + $rd1 close +} diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index a59e168ef..072ed14d6 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -161,6 +161,15 @@ start_server { assert {[$rd read] == {}} ;# before the fix, client didn't even block, but was served synchronously with {mystream {}} } + test {XGROUP DESTROY should unblock XREADGROUP with -NOGROUP} { + r del mystream + r XGROUP CREATE mystream mygroup $ MKSTREAM + set rd [redis_deferring_client] + $rd XREADGROUP GROUP mygroup Alice BLOCK 100 STREAMS mystream ">" + r XGROUP DESTROY mystream mygroup + assert_error "*NOGROUP*" {$rd read} + } + test {XCLAIM can claim PEL items from another consumer} { # Add 3 items into the stream, and create a consumer group r del mystream