Merge commit '79e8b17d7b44c793d8b22668b8583a297ee1b387' into redis_6_merge

Former-commit-id: 28cbed1d13961c5568f2bdc50c6a23107d3434d0
This commit is contained in:
John Sully 2020-04-14 20:09:53 -04:00
commit 2c049c16a2
16 changed files with 709 additions and 78 deletions

View File

@ -2253,7 +2253,6 @@ standardConfig configs[] = {
createIntConfig("list-compress-depth", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->list_compress_depth, 0, INTEGER_CONFIG, NULL, NULL),
createIntConfig("rdb-key-save-delay", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->rdb_key_save_delay, 0, INTEGER_CONFIG, NULL, NULL),
createIntConfig("key-load-delay", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->key_load_delay, 0, INTEGER_CONFIG, NULL, NULL),
createIntConfig("tracking-table-max-fill", NULL, MODIFIABLE_CONFIG, 0, 100, g_pserver->tracking_table_max_fill, 10, INTEGER_CONFIG, NULL, NULL), /* Default: 10% tracking table max fill. */
createIntConfig("active-expire-effort", NULL, MODIFIABLE_CONFIG, 1, 10, cserver.active_expire_effort, 1, INTEGER_CONFIG, NULL, NULL), /* From 1 to 10. */
createIntConfig("hz", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->config_hz, CONFIG_DEFAULT_HZ, INTEGER_CONFIG, NULL, updateHZ),
createIntConfig("min-replicas-to-write", "min-slaves-to-write", MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->repl_min_slaves_to_write, 0, INTEGER_CONFIG, NULL, updateGoodSlaves),
@ -2288,6 +2287,7 @@ standardConfig configs[] = {
createSizeTConfig("stream-node-max-bytes", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, g_pserver->stream_node_max_bytes, 4096, MEMORY_CONFIG, NULL, NULL),
createSizeTConfig("zset-max-ziplist-value", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, g_pserver->zset_max_ziplist_value, 64, MEMORY_CONFIG, NULL, NULL),
createSizeTConfig("hll-sparse-max-bytes", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, g_pserver->hll_sparse_max_bytes, 3000, MEMORY_CONFIG, NULL, NULL),
createSizeTConfig("tracking-table-max-keys", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, g_pserver->tracking_table_max_keys, 1000000, INTEGER_CONFIG, NULL, NULL), /* Default: 1 million keys max. */
/* Other configs */
createTimeTConfig("repl-backlog-ttl", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, g_pserver->repl_backlog_time_limit, 60*60, INTEGER_CONFIG, NULL, NULL), /* Default: 1 hour */

View File

@ -1550,8 +1550,10 @@ int expireIfNeeded(redisDb *db, robj *key) {
propagateExpire(db,key,g_pserver->lazyfree_lazy_expire);
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",key,db->id);
return g_pserver->lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
dbSyncDelete(db,key);
int retval = g_pserver->lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
dbSyncDelete(db,key);
if (retval) signalModifiedKey(db,key);
return retval;
}
/* -----------------------------------------------------------------------------

View File

@ -5,8 +5,8 @@
* We do that by scanning the keyspace and for each pointer we have, we can try to
* ask the allocator if moving it to a new address will help reduce fragmentation.
*
* Copyright (c) 2017, Oran Agra
* Copyright (c) 2017, Redis Labs, Inc
* Copyright (c) 2020, Oran Agra
* Copyright (c) 2020, Redis Labs, Inc
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@ -421,25 +421,32 @@ bool replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey) {
return false;
}
long activeDefragQuickListNodes(quicklist *ql) {
quicklistNode *node = ql->head, *newnode;
long activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) {
quicklistNode *newnode, *node = *node_ref;
long defragged = 0;
unsigned char *newzl;
if ((newnode = (quicklistNode*)activeDefragAlloc(node))) {
if (newnode->prev)
newnode->prev->next = newnode;
else
ql->head = newnode;
if (newnode->next)
newnode->next->prev = newnode;
else
ql->tail = newnode;
*node_ref = node = newnode;
defragged++;
}
if ((newzl = (unsigned char*)activeDefragAlloc(node->zl)))
defragged++, node->zl = newzl;
return defragged;
}
long activeDefragQuickListNodes(quicklist *ql) {
quicklistNode *node = ql->head;
long defragged = 0;
while (node) {
if ((newnode = (quicklistNode*)activeDefragAlloc(node))) {
if (newnode->prev)
newnode->prev->next = newnode;
else
ql->head = newnode;
if (newnode->next)
newnode->next->prev = newnode;
else
ql->tail = newnode;
node = newnode;
defragged++;
}
if ((newzl = (unsigned char*)activeDefragAlloc(node->zl)))
defragged++, node->zl = newzl;
defragged += activeDefragQuickListNode(ql, &node);
node = node->next;
}
return defragged;
@ -453,12 +460,48 @@ void defragLater(redisDb *db, dictEntry *kde) {
listAddNodeTail(db->defrag_later, key);
}
long scanLaterList(robj *ob) {
/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */
long scanLaterList(robj *ob, unsigned long *cursor, long long endtime, long long *defragged) {
quicklist *ql = (quicklist*)ptrFromObj(ob);
quicklistNode *node;
long iterations = 0;
int bookmark_failed = 0;
if (ob->type != OBJ_LIST || ob->encoding != OBJ_ENCODING_QUICKLIST)
return 0;
g_pserver->stat_active_defrag_scanned+=ql->len;
return activeDefragQuickListNodes(ql);
if (*cursor == 0) {
/* if cursor is 0, we start new iteration */
node = ql->head;
} else {
node = quicklistBookmarkFind(ql, "_AD");
if (!node) {
/* if the bookmark was deleted, it means we reached the end. */
*cursor = 0;
return 0;
}
node = node->next;
}
(*cursor)++;
while (node) {
(*defragged) += activeDefragQuickListNode(ql, &node);
g_pserver->stat_active_defrag_scanned++;
if (++iterations > 128 && !bookmark_failed) {
if (ustime() > endtime) {
if (!quicklistBookmarkCreate(&ql, "_AD", node)) {
bookmark_failed = 1;
} else {
ob->m_ptr = ql; /* bookmark creation may have re-allocated the quicklist */
return 1;
}
}
iterations = 0;
}
node = node->next;
}
quicklistBookmarkDelete(ql, "_AD");
*cursor = 0;
return bookmark_failed? 1: 0;
}
typedef struct {
@ -651,7 +694,8 @@ int scanLaterStraemListpacks(robj *ob, unsigned long *cursor, long long endtime,
void *newdata = activeDefragAlloc(ri.data);
if (newdata)
raxSetData(ri.node, ri.data=newdata), (*defragged)++;
if (++iterations > 16) {
g_pserver->stat_active_defrag_scanned++;
if (++iterations > 128) {
if (ustime() > endtime) {
serverAssert(ri.key_len==sizeof(last));
memcpy(last,ri.key,ri.key_len);
@ -916,8 +960,7 @@ int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime) {
if (de) {
robj *ob = (robj*)dictGetVal(de);
if (ob->type == OBJ_LIST) {
g_pserver->stat_active_defrag_hits += scanLaterList(ob);
*cursor = 0; /* list has no scan, we must finish it in one go */
return scanLaterList(ob, cursor, endtime, &g_pserver->stat_active_defrag_hits);
} else if (ob->type == OBJ_SET) {
g_pserver->stat_active_defrag_hits += scanLaterSet(ob, cursor);
} else if (ob->type == OBJ_ZSET) {
@ -977,11 +1020,6 @@ int defragLaterStep(redisDb *db, long long endtime) {
if (defragLaterItem(de, &defrag_later_cursor, endtime))
quit = 1; /* time is up, we didn't finish all the work */
/* Don't start a new BIG key in this loop, this is because the
* next key can be a list, and scanLaterList must be done in once cycle */
if (!defrag_later_cursor)
quit = 1;
/* Once in 16 scan iterations, 512 pointer reallocations, or 64 fields
* (if we have a lot of pointers in one hash bucket, or rehashing),
* check if we reached the time limit. */

View File

@ -176,6 +176,7 @@ client *createClient(connection *conn, int iel) {
c->master_error = 0;
memset(c->uuid, 0, UUID_BINARY_LEN);
c->client_tracking_prefixes = NULL;
c->auth_callback = NULL;
c->auth_callback_privdata = NULL;
c->auth_module = NULL;
@ -2584,7 +2585,6 @@ int clientSetNameOrReply(client *c, robj *name) {
void clientCommand(client *c) {
listNode *ln;
listIter li;
client *client;
if (c->argc == 2 && !strcasecmp((const char*)ptrFromObj(c->argv[1]),"help")) {
const char *help[] = {
@ -2698,7 +2698,7 @@ NULL
/* Iterate clients killing all the matching clients. */
listRewind(g_pserver->clients,&li);
while ((ln = listNext(&li)) != NULL) {
client = (struct client*)listNodeValue(ln);
client *client = (struct client*)listNodeValue(ln);
if (addr && strcmp(getClientPeerId(client),addr) != 0) continue;
if (type != -1 && getClientType(client) != type) continue;
if (id != 0 && client->id != id) continue;
@ -2787,38 +2787,74 @@ NULL
UNIT_MILLISECONDS) != C_OK) return;
pauseClients(duration);
addReply(c,shared.ok);
} else if (!strcasecmp(szFromObj(c->argv[1]),"tracking") &&
(c->argc == 3 || c->argc == 5))
{
/* CLIENT TRACKING (on|off) [REDIRECT <id>] */
} else if (!strcasecmp(szFromObj(c->argv[1]),"tracking") && c->argc >= 3) {
/* CLIENT TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first]
* [PREFIX second] ... */
long long redir = 0;
int bcast = 0;
robj **prefix = NULL;
size_t numprefix = 0;
/* Parse the redirection option: we'll require the client with
* the specified ID to exist right now, even if it is possible
* it will get disconnected later. */
if (c->argc == 5) {
if (strcasecmp(szFromObj(c->argv[3]),"redirect") != 0) {
addReply(c,shared.syntaxerr);
return;
} else {
if (getLongLongFromObjectOrReply(c,c->argv[4],&redir,NULL) !=
/* Parse the options. */
for (int j = 3; j < c->argc; j++) {
int moreargs = (c->argc-1) - j;
if (!strcasecmp(szFromObj(c->argv[j]),"redirect") && moreargs) {
j++;
if (getLongLongFromObjectOrReply(c,c->argv[j],&redir,NULL) !=
C_OK) return;
/* We will require the client with the specified ID to exist
* right now, even if it is possible that it gets disconnected
* later. Still a valid sanity check. */
if (lookupClientByID(redir) == NULL) {
addReplyError(c,"The client ID you want redirect to "
"does not exist");
return;
}
} else if (!strcasecmp(szFromObj(c->argv[j]),"bcast")) {
bcast++;
} else if (!strcasecmp(szFromObj(c->argv[j]),"prefix") && moreargs) {
j++;
prefix = (robj**)zrealloc(prefix,sizeof(robj*)*(numprefix+1), MALLOC_LOCAL);
prefix[numprefix++] = c->argv[j];
} else {
zfree(prefix);
addReply(c,shared.syntaxerr);
return;
}
}
/* Options are ok: enable or disable the tracking for this client. */
if (!strcasecmp(szFromObj(c->argv[2]),"on")) {
enableTracking(c,redir);
/* Before enabling tracking, make sure options are compatible
* among each other and with the current state of the client. */
if (!bcast && numprefix) {
addReplyError(c,
"PREFIX option requires BCAST mode to be enabled");
zfree(prefix);
return;
}
if (c->flags & CLIENT_TRACKING) {
int oldbcast = !!(c->flags & CLIENT_TRACKING_BCAST);
if (oldbcast != bcast) {
addReplyError(c,
"You can't switch BCAST mode on/off before disabling "
"tracking for this client, and then re-enabling it with "
"a different mode.");
zfree(prefix);
return;
}
}
enableTracking(c,redir,bcast,prefix,numprefix);
} else if (!strcasecmp(szFromObj(c->argv[2]),"off")) {
disableTracking(c);
} else {
zfree(prefix);
addReply(c,shared.syntaxerr);
return;
}
zfree(prefix);
addReply(c,shared.ok);
} else if (!strcasecmp(szFromObj(c->argv[1]),"getredir") && c->argc == 2) {
/* CLIENT GETREDIR */

View File

@ -1151,13 +1151,13 @@ sds getMemoryDoctorReport(void) {
num_reports++;
}
/* Allocator fss is higher than 1.1 and 10MB ? */
/* Allocator rss is higher than 1.1 and 10MB ? */
if (mh->allocator_rss > 1.1 && mh->allocator_rss_bytes > 10<<20) {
high_alloc_rss = 1;
num_reports++;
}
/* Non-Allocator fss is higher than 1.1 and 10MB ? */
/* Non-Allocator rss is higher than 1.1 and 10MB ? */
if (mh->rss_extra > 1.1 && mh->rss_extra_bytes > 10<<20) {
high_proc_rss = 1;
num_reports++;

View File

@ -35,7 +35,11 @@ int clientSubscriptionsCount(client *c);
* Pubsub client replies API
*----------------------------------------------------------------------------*/
/* Send a pubsub message of type "message" to the client. */
/* Send a pubsub message of type "message" to the client.
* Normally 'msg' is a Redis object containing the string to send as
* message. However if the caller sets 'msg' as NULL, it will be able
* to send a special message (for instance an Array type) by using the
* addReply*() API family. */
void addReplyPubsubMessage(client *c, robj *channel, robj *msg) {
if (c->resp == 2)
addReplyAsync(c,shared.mbulkhdr[3]);
@ -43,7 +47,7 @@ void addReplyPubsubMessage(client *c, robj *channel, robj *msg) {
addReplyPushLenAsync(c,3);
addReplyAsync(c,shared.messagebulk);
addReplyBulkAsync(c,channel);
addReplyBulkAsync(c,msg);
if (msg) addReplyBulkAsync(c,msg);
}
/* Send a pubsub message of type "pmessage" to the client. The difference

View File

@ -70,6 +70,12 @@ static const size_t optimization_level[] = {4096, 8192, 16384, 32768, 65536};
} while (0);
#endif
/* Bookmarks forward declarations */
#define QL_MAX_BM ((1 << QL_BM_BITS)-1)
quicklistBookmark *_quicklistBookmarkFindByName(quicklist *ql, const char *name);
quicklistBookmark *_quicklistBookmarkFindByNode(quicklist *ql, quicklistNode *node);
void _quicklistBookmarkDelete(quicklist *ql, quicklistBookmark *bm);
/* Simple way to give quicklistEntry structs default values with one call. */
#define initEntry(e) \
do { \
@ -100,10 +106,11 @@ quicklist *quicklistCreate(void) {
quicklist->count = 0;
quicklist->compress = 0;
quicklist->fill = -2;
quicklist->bookmark_count = 0;
return quicklist;
}
#define COMPRESS_MAX (1 << 16)
#define COMPRESS_MAX (1 << QL_COMP_BITS)
void quicklistSetCompressDepth(quicklist *quicklist, int compress) {
if (compress > COMPRESS_MAX) {
compress = COMPRESS_MAX;
@ -113,7 +120,7 @@ void quicklistSetCompressDepth(quicklist *quicklist, int compress) {
quicklist->compress = compress;
}
#define FILL_MAX (1 << 15)
#define FILL_MAX (1 << (QL_FILL_BITS-1))
void quicklistSetFill(quicklist *quicklist, int fill) {
if (fill > FILL_MAX) {
fill = FILL_MAX;
@ -169,6 +176,7 @@ void quicklistRelease(quicklist *quicklist) {
quicklist->len--;
current = next;
}
quicklistBookmarksClear(quicklist);
zfree(quicklist);
}
@ -578,6 +586,15 @@ quicklist *quicklistCreateFromZiplist(int fill, int compress,
REDIS_STATIC void __quicklistDelNode(quicklist *quicklist,
quicklistNode *node) {
/* Update the bookmark if any */
quicklistBookmark *bm = _quicklistBookmarkFindByNode(quicklist, node);
if (bm) {
bm->node = node->next;
/* if the bookmark was to the last node, delete it. */
if (!bm->node)
_quicklistBookmarkDelete(quicklist, bm);
}
if (node->next)
node->next->prev = node->prev;
if (node->prev)
@ -1410,6 +1427,87 @@ void quicklistPush(quicklist *quicklist, void *value, const size_t sz,
}
}
/* Create or update a bookmark in the list which will be updated to the next node
* automatically when the one referenced gets deleted.
* Returns 1 on success (creation of new bookmark or override of an existing one).
* Returns 0 on failure (reached the maximum supported number of bookmarks).
* NOTE: use short simple names, so that string compare on find is quick.
* NOTE: bookmakrk creation may re-allocate the quicklist, so the input pointer
may change and it's the caller responsibilty to update the reference.
*/
int quicklistBookmarkCreate(quicklist **ql_ref, const char *name, quicklistNode *node) {
quicklist *ql = *ql_ref;
if (ql->bookmark_count >= QL_MAX_BM)
return 0;
quicklistBookmark *bm = _quicklistBookmarkFindByName(ql, name);
if (bm) {
bm->node = node;
return 1;
}
ql = zrealloc(ql, sizeof(quicklist) + (ql->bookmark_count+1) * sizeof(quicklistBookmark), MALLOC_SHARED);
*ql_ref = ql;
ql->bookmarks[ql->bookmark_count].node = node;
ql->bookmarks[ql->bookmark_count].name = zstrdup(name);
ql->bookmark_count++;
return 1;
}
/* Find the quicklist node referenced by a named bookmark.
* When the bookmarked node is deleted the bookmark is updated to the next node,
* and if that's the last node, the bookmark is deleted (so find returns NULL). */
quicklistNode *quicklistBookmarkFind(quicklist *ql, const char *name) {
quicklistBookmark *bm = _quicklistBookmarkFindByName(ql, name);
if (!bm) return NULL;
return bm->node;
}
/* Delete a named bookmark.
* returns 0 if bookmark was not found, and 1 if deleted.
* Note that the bookmark memory is not freed yet, and is kept for future use. */
int quicklistBookmarkDelete(quicklist *ql, const char *name) {
quicklistBookmark *bm = _quicklistBookmarkFindByName(ql, name);
if (!bm)
return 0;
_quicklistBookmarkDelete(ql, bm);
return 1;
}
quicklistBookmark *_quicklistBookmarkFindByName(quicklist *ql, const char *name) {
unsigned i;
for (i=0; i<ql->bookmark_count; i++) {
if (!strcmp(ql->bookmarks[i].name, name)) {
return &ql->bookmarks[i];
}
}
return NULL;
}
quicklistBookmark *_quicklistBookmarkFindByNode(quicklist *ql, quicklistNode *node) {
unsigned i;
for (i=0; i<ql->bookmark_count; i++) {
if (ql->bookmarks[i].node == node) {
return &ql->bookmarks[i];
}
}
return NULL;
}
void _quicklistBookmarkDelete(quicklist *ql, quicklistBookmark *bm) {
int index = bm - ql->bookmarks;
zfree(bm->name);
ql->bookmark_count--;
memmove(bm, bm+1, (ql->bookmark_count - index)* sizeof(*bm));
/* NOTE: We do not shrink (realloc) the quicklist yet (to avoid resonance,
* it may be re-used later (a call to realloc may NOP). */
}
void quicklistBookmarksClear(quicklist *ql) {
while (ql->bookmark_count)
zfree(ql->bookmarks[--ql->bookmark_count].name);
/* NOTE: We do not shrink (realloc) the quick list. main use case for this
* function is just before releasing the allocation. */
}
/* The rest of this file is test cases and test helpers. */
#ifdef REDIS_TEST
#include <stdint.h>
@ -2641,6 +2739,54 @@ int quicklistTest(int argc, char *argv[]) {
printf("Compressions: %0.2f seconds.\n", (float)(stop - start) / 1000);
printf("\n");
TEST("bookmark get updated to next item") {
quicklist *ql = quicklistNew(1, 0);
quicklistPushTail(ql, "1", 1);
quicklistPushTail(ql, "2", 1);
quicklistPushTail(ql, "3", 1);
quicklistPushTail(ql, "4", 1);
quicklistPushTail(ql, "5", 1);
assert(ql->len==5);
/* add two bookmarks, one pointing to the node before the last. */
assert(quicklistBookmarkCreate(&ql, "_dummy", ql->head->next));
assert(quicklistBookmarkCreate(&ql, "_test", ql->tail->prev));
/* test that the bookmark returns the right node, delete it and see that the bookmark points to the last node */
assert(quicklistBookmarkFind(ql, "_test") == ql->tail->prev);
assert(quicklistDelRange(ql, -2, 1));
assert(quicklistBookmarkFind(ql, "_test") == ql->tail);
/* delete the last node, and see that the bookmark was deleted. */
assert(quicklistDelRange(ql, -1, 1));
assert(quicklistBookmarkFind(ql, "_test") == NULL);
/* test that other bookmarks aren't affected */
assert(quicklistBookmarkFind(ql, "_dummy") == ql->head->next);
assert(quicklistBookmarkFind(ql, "_missing") == NULL);
assert(ql->len==3);
quicklistBookmarksClear(ql); /* for coverage */
assert(quicklistBookmarkFind(ql, "_dummy") == NULL);
quicklistRelease(ql);
}
TEST("bookmark limit") {
int i;
quicklist *ql = quicklistNew(1, 0);
quicklistPushHead(ql, "1", 1);
for (i=0; i<QL_MAX_BM; i++)
assert(quicklistBookmarkCreate(&ql, genstr("",i), ql->head));
/* when all bookmarks are used, creation fails */
assert(!quicklistBookmarkCreate(&ql, "_test", ql->head));
/* delete one and see that we can now create another */
assert(quicklistBookmarkDelete(ql, "0"));
assert(quicklistBookmarkCreate(&ql, "_test", ql->head));
/* delete one and see that the rest survive */
assert(quicklistBookmarkDelete(ql, "_test"));
for (i=1; i<QL_MAX_BM; i++)
assert(quicklistBookmarkFind(ql, genstr("",i)) == ql->head);
/* make sure the deleted ones are indeed gone */
assert(!quicklistBookmarkFind(ql, "0"));
assert(!quicklistBookmarkFind(ql, "_test"));
quicklistRelease(ql);
}
if (!err)
printf("ALL TESTS PASSED!\n");
else

View File

@ -28,6 +28,8 @@
* POSSIBILITY OF SUCH DAMAGE.
*/
#include <stdint.h> // for UINTPTR_MAX
#ifndef __QUICKLIST_H__
#define __QUICKLIST_H__
@ -72,19 +74,53 @@ typedef struct quicklistLZF {
#endif
} quicklistLZF;
/* Bookmarks are padded with realloc at the end of of the quicklist struct.
* They should only be used for very big lists if thousands of nodes were the
* excess memory usage is negligible, and there's a real need to iterate on them
* in portions.
* When not used, they don't add any memory overhead, but when used and then
* deleted, some overhead remains (to avoid resonance).
* The number of bookmarks used should be kept to minimum since it also adds
* overhead on node deletion (searching for a bookmark to update). */
typedef struct quicklistBookmark {
quicklistNode *node;
char *name;
} quicklistBookmark;
#if UINTPTR_MAX == 0xffffffff
/* 32-bit */
# define QL_FILL_BITS 14
# define QL_COMP_BITS 14
# define QL_BM_BITS 4
#elif UINTPTR_MAX == 0xffffffffffffffff
/* 64-bit */
# define QL_FILL_BITS 16
# define QL_COMP_BITS 16
# define QL_BM_BITS 4 /* we can encode more, but we rather limit the user
since they cause performance degradation. */
#else
# error unknown arch bits count
#endif
/* quicklist is a 40 byte struct (on 64-bit systems) describing a quicklist.
* 'count' is the number of total entries.
* 'len' is the number of quicklist nodes.
* 'compress' is: -1 if compression disabled, otherwise it's the number
* of quicklistNodes to leave uncompressed at ends of quicklist.
* 'fill' is the user-requested (or default) fill factor. */
* 'fill' is the user-requested (or default) fill factor.
* 'bookmakrs are an optional feature that is used by realloc this struct,
* so that they don't consume memory when not used. */
typedef struct quicklist {
quicklistNode *head;
quicklistNode *tail;
unsigned long count; /* total count of all entries in all ziplists */
unsigned long len; /* number of quicklistNodes */
int fill : 16; /* fill factor for individual nodes */
unsigned int compress : 16; /* depth of end nodes not to compress;0=off */
int fill : QL_FILL_BITS; /* fill factor for individual nodes */
unsigned int compress : QL_COMP_BITS; /* depth of end nodes not to compress;0=off */
unsigned int bookmark_count: QL_BM_BITS;
#ifndef __cplusplus
quicklistBookmark bookmarks[];
#endif
} quicklist;
typedef struct quicklistIter {
@ -170,6 +206,12 @@ unsigned long quicklistCount(const quicklist *ql);
int quicklistCompare(unsigned char *p1, unsigned char *p2, int p2_len);
size_t quicklistGetLzf(const quicklistNode *node, void **data);
/* bookmarks */
int quicklistBookmarkCreate(quicklist **ql_ref, const char *name, quicklistNode *node);
int quicklistBookmarkDelete(quicklist *ql, const char *name);
quicklistNode *quicklistBookmarkFind(quicklist *ql, const char *name);
void quicklistBookmarksClear(quicklist *ql);
#ifdef REDIS_TEST
int quicklistTest(int argc, char *argv[]);
#endif

View File

@ -2209,6 +2209,10 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
processUnblockedClients(IDX_EVENT_LOOP_MAIN);
}
/* Send the invalidation messages to clients participating to the
* client side caching protocol in broadcasting (BCAST) mode. */
trackingBroadcastInvalidationMessages();
/* Write the AOF buffer on disk */
flushAppendOnlyFile(0);
@ -3526,8 +3530,11 @@ void call(client *c, int flags) {
if (c->cmd->flags & CMD_READONLY) {
client *caller = (c->flags & CLIENT_LUA && g_pserver->lua_caller) ?
g_pserver->lua_caller : c;
if (caller->flags & CLIENT_TRACKING)
if (caller->flags & CLIENT_TRACKING &&
!(caller->flags & CLIENT_TRACKING_BCAST))
{
trackingRememberKeys(caller);
}
}
g_pserver->stat_numcommands++;
@ -4477,6 +4484,7 @@ sds genRedisInfoString(const char *section) {
"active_defrag_misses:%lld\r\n"
"active_defrag_key_hits:%lld\r\n"
"active_defrag_key_misses:%lld\r\n"
"tracking_total_keys:%lld\r\n"
"tracking_total_items:%llu\r\n",
g_pserver->stat_numconnections,
g_pserver->stat_numcommands,
@ -4505,6 +4513,7 @@ sds genRedisInfoString(const char *section) {
g_pserver->stat_active_defrag_misses,
g_pserver->stat_active_defrag_key_hits,
g_pserver->stat_active_defrag_key_misses,
(unsigned long long) trackingGetTotalKeys(),
(unsigned long long) trackingGetTotalItems());
}

View File

@ -412,6 +412,7 @@ public:
perform client side caching. */
#define CLIENT_TRACKING_BROKEN_REDIR (1ULL<<32) /* Target client is invalid. */
#define CLIENT_FORCE_REPLY (1ULL<<33) /* Should addReply be forced to write the text? */
#define CLIENT_TRACKING_BCAST (1ULL<<34) /* Tracking in BCAST mode. */
/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
@ -1298,7 +1299,9 @@ typedef struct client {
* invalidation messages for keys fetched by this client will be send to
* the specified client ID. */
uint64_t client_tracking_redirection;
rax *client_tracking_prefixes; /* A dictionary of prefixes we are already
subscribed to in BCAST mode, in the
context of client side caching. */
/* Response buffer */
int bufpos;
char buf[PROTO_REPLY_CHUNK_BYTES];
@ -1848,7 +1851,7 @@ struct redisServer {
list *ready_keys; /* List of readyList structures for BLPOP & co */
/* Client side caching. */
unsigned int tracking_clients; /* # of clients with tracking enabled.*/
int tracking_table_max_fill; /* Max fill percentage. */
size_t tracking_table_max_keys; /* Max number of keys in tracking table. */
/* Sort parameters - qsort_r() is only available under BSD so we
* have to take this state global, in order to pass it to sortCompare() */
int sort_desc;
@ -2224,13 +2227,15 @@ void addReplyStatusFormat(client *c, const char *fmt, ...);
#endif
/* Client side caching (tracking mode) */
void enableTracking(client *c, uint64_t redirect_to);
void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix);
void disableTracking(client *c);
void trackingRememberKeys(client *c);
void trackingInvalidateKey(robj *keyobj);
void trackingInvalidateKeysOnFlush(int dbid);
void trackingLimitUsedSlots(void);
uint64_t trackingGetTotalItems(void);
uint64_t trackingGetTotalKeys(void);
void trackingBroadcastInvalidationMessages(void);
/* List data type */
void listTypeTryConversion(robj *subject, robj *value);

View File

@ -1856,6 +1856,8 @@ NULL
g_pserver->dirty++;
notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-destroy",
c->argv[2],c->db->id);
/* We want to unblock any XREADGROUP consumers with -NOGROUP. */
signalKeyAsReady(c->db,c->argv[2]);
} else {
addReply(c,shared.czero);
}

View File

@ -42,12 +42,22 @@
* Clients will normally take frequently requested objects in memory, removing
* them when invalidation messages are received. */
rax *TrackingTable = NULL;
rax *PrefixTable = NULL;
uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across
the whole tracking table. This givesn
an hint about the total memory we
are using server side for CSC. */
robj *TrackingChannelName;
/* This is the structure that we have as value of the PrefixTable, and
* represents the list of keys modified, and the list of clients that need
* to be notified, for a given prefix. */
typedef struct bcastState {
rax *keys; /* Keys modified in the current event loop cycle. */
rax *clients; /* Clients subscribed to the notification events for this
prefix. */
} bcastState;
/* Remove the tracking state from the client 'c'. Note that there is not much
* to do for us here, if not to decrement the counter of the clients in
* tracking mode, because we just store the ID of the client in the tracking
@ -55,9 +65,55 @@ robj *TrackingChannelName;
* client with many entries in the table is removed, it would cost a lot of
* time to do the cleanup. */
void disableTracking(client *c) {
/* If this client is in broadcasting mode, we need to unsubscribe it
* from all the prefixes it is registered to. */
if (c->flags & CLIENT_TRACKING_BCAST) {
raxIterator ri;
raxStart(&ri,c->client_tracking_prefixes);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
bcastState *bs = (bcastState*)raxFind(PrefixTable,ri.key,ri.key_len);
serverAssert(bs != raxNotFound);
raxRemove(bs->clients,(unsigned char*)&c,sizeof(c),NULL);
/* Was it the last client? Remove the prefix from the
* table. */
if (raxSize(bs->clients) == 0) {
raxFree(bs->clients);
raxFree(bs->keys);
zfree(bs);
raxRemove(PrefixTable,ri.key,ri.key_len,NULL);
}
}
raxStop(&ri);
raxFree(c->client_tracking_prefixes);
c->client_tracking_prefixes = NULL;
}
/* Clear flags and adjust the count. */
if (c->flags & CLIENT_TRACKING) {
g_pserver->tracking_clients--;
c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR);
c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR|
CLIENT_TRACKING_BCAST);
}
}
/* Set the client 'c' to track the prefix 'prefix'. If the client 'c' is
* already registered for the specified prefix, no operation is performed. */
void enableBcastTrackingForPrefix(client *c, const char *prefix, size_t plen) {
bcastState *bs = (bcastState*)raxFind(PrefixTable,(unsigned char*)prefix,sdslen(prefix));
/* If this is the first client subscribing to such prefix, create
* the prefix in the table. */
if (bs == raxNotFound) {
bs = (bcastState*)zmalloc(sizeof(*bs));
bs->keys = raxNew();
bs->clients = raxNew();
raxInsert(PrefixTable,(unsigned char*)prefix,plen,bs,NULL);
}
if (raxTryInsert(bs->clients,(unsigned char*)&c,sizeof(c),NULL,NULL)) {
if (c->client_tracking_prefixes == NULL)
c->client_tracking_prefixes = raxNew();
raxInsert(c->client_tracking_prefixes,
(unsigned char*)prefix,plen,NULL,NULL);
}
}
@ -68,16 +124,25 @@ void disableTracking(client *c) {
* eventually get freed, we'll send a message to the original client to
* inform it of the condition. Multiple clients can redirect the invalidation
* messages to the same client ID. */
void enableTracking(client *c, uint64_t redirect_to) {
if (c->flags & CLIENT_TRACKING) return;
void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix) {
if (!(c->flags & CLIENT_TRACKING)) g_pserver->tracking_clients++;
c->flags |= CLIENT_TRACKING;
c->flags &= ~CLIENT_TRACKING_BROKEN_REDIR;
c->flags &= ~(CLIENT_TRACKING_BROKEN_REDIR|CLIENT_TRACKING_BCAST);
c->client_tracking_redirection = redirect_to;
g_pserver->tracking_clients++;
if (TrackingTable == NULL) {
TrackingTable = raxNew();
PrefixTable = raxNew();
TrackingChannelName = createStringObject("__redis__:invalidate",20);
}
if (bcast) {
c->flags |= CLIENT_TRACKING_BCAST;
if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0);
for (size_t j = 0; j < numprefix; j++) {
sds sdsprefix = szFromObj(prefix[j]);
enableBcastTrackingForPrefix(c,sdsprefix,sdslen(sdsprefix));
}
}
}
/* This function is called after the excution of a readonly command in the
@ -106,7 +171,17 @@ void trackingRememberKeys(client *c) {
getKeysFreeResult(keys);
}
void sendTrackingMessage(client *c, const char *keyname, size_t keylen) {
/* Given a key name, this function sends an invalidation message in the
* proper channel (depending on RESP version: PubSub or Push message) and
* to the proper client (in case fo redirection), in the context of the
* client 'c' with tracking enabled.
*
* In case the 'proto' argument is non zero, the function will assume that
* 'keyname' points to a buffer of 'keylen' bytes already expressed in the
* form of Redis RESP protocol, representing an array of keys to send
* to the client as value of the invalidation. This is used in BCAST mode
* in order to optimized the implementation to use less CPU time. */
void sendTrackingMessage(client *c, const char *keyname, size_t keylen, int proto) {
int using_redirection = 0;
if (c->client_tracking_redirection) {
client *redir = lookupClientByID(c->client_tracking_redirection);
@ -132,15 +207,45 @@ void sendTrackingMessage(client *c, const char *keyname, size_t keylen) {
if (c->resp > 2) {
addReplyPushLen(c,2);
addReplyBulkCBuffer(c,"invalidate",10);
addReplyBulkCBuffer(c,keyname,keylen);
} else if (using_redirection && c->flags & CLIENT_PUBSUB) {
/* We use a static object to speedup things, however we assume
* that addReplyPubsubMessage() will not take a reference. */
robj keyobj;
initStaticStringObject(keyobj,(char*)keyname);
addReplyPubsubMessage(c,TrackingChannelName,&keyobj);
serverAssert(keyobj.getrefcount(std::memory_order_relaxed) == 1);
addReplyPubsubMessage(c,TrackingChannelName,NULL);
} else {
/* If are here, the client is not using RESP3, nor is
* redirecting to another client. We can't send anything to
* it since RESP2 does not support push messages in the same
* connection. */
return;
}
/* Send the "value" part, which is the array of keys. */
if (proto) {
addReplyProto(c,keyname,keylen);
} else {
addReplyArrayLen(c,1);
addReplyBulkCBuffer(c,keyname,keylen);
}
}
/* This function is called when a key is modified in Redis and in the case
* we have at least one client with the BCAST mode enabled.
* Its goal is to set the key in the right broadcast state if the key
* matches one or more prefixes in the prefix table. Later when we
* return to the event loop, we'll send invalidation messages to the
* clients subscribed to each prefix. */
void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) {
raxIterator ri;
raxStart(&ri,PrefixTable);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
if (ri.key_len > keylen) continue;
if (ri.key_len != 0 && memcmp(ri.key,keyname,ri.key_len) != 0)
continue;
bcastState *bs = (bcastState*)ri.data;
raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,NULL,NULL);
}
raxStop(&ri);
}
/* This function is called from signalModifiedKey() or other places in Redis
@ -150,6 +255,10 @@ void sendTrackingMessage(client *c, const char *keyname, size_t keylen) {
void trackingInvalidateKey(robj *keyobj) {
if (TrackingTable == NULL) return;
sds sdskey = szFromObj(keyobj);
if (raxSize(PrefixTable) > 0)
trackingRememberKeyToBroadcast(sdskey,sdslen(sdskey));
rax *ids = (rax*)raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey));
if (ids == raxNotFound) return;;
@ -160,8 +269,18 @@ void trackingInvalidateKey(robj *keyobj) {
uint64_t id;
memcpy(&id,ri.key,sizeof(id));
client *c = lookupClientByID(id);
if (c == NULL || !(c->flags & CLIENT_TRACKING)) continue;
sendTrackingMessage(c,sdskey,sdslen(sdskey));
/* Note that if the client is in BCAST mode, we don't want to
* send invalidation messages that were pending in the case
* previously the client was not in BCAST mode. This can happen if
* TRACKING is enabled normally, and then the client switches to
* BCAST mode. */
if (c == NULL ||
!(c->flags & CLIENT_TRACKING)||
c->flags & CLIENT_TRACKING_BCAST)
{
continue;
}
sendTrackingMessage(c,sdskey,sdslen(sdskey),0);
}
raxStop(&ri);
@ -198,7 +317,7 @@ void trackingInvalidateKeysOnFlush(int dbid) {
while ((ln = listNext(&li)) != NULL) {
client *c = (client*)listNodeValue(ln);
if (c->flags & CLIENT_TRACKING) {
sendTrackingMessage(c,"",1);
sendTrackingMessage(c,"",1,0);
}
}
}
@ -223,8 +342,8 @@ void trackingInvalidateKeysOnFlush(int dbid) {
void trackingLimitUsedSlots(void) {
static unsigned int timeout_counter = 0;
if (TrackingTable == NULL) return;
if (g_pserver->tracking_table_max_fill == 0) return; /* No limits set. */
size_t max_keys = g_pserver->tracking_table_max_fill;
if (g_pserver->tracking_table_max_keys == 0) return; /* No limits set. */
size_t max_keys = g_pserver->tracking_table_max_keys;
if (raxSize(TrackingTable) <= max_keys) {
timeout_counter = 0;
return; /* Limit not reached. */
@ -259,8 +378,69 @@ void trackingLimitUsedSlots(void) {
timeout_counter++;
}
/* This function will run the prefixes of clients in BCAST mode and
* keys that were modified about each prefix, and will send the
* notifications to each client in each prefix. */
void trackingBroadcastInvalidationMessages(void) {
raxIterator ri, ri2;
/* Return ASAP if there is nothing to do here. */
if (TrackingTable == NULL || !g_pserver->tracking_clients) return;
raxStart(&ri,PrefixTable);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
bcastState *bs = (bcastState*)ri.data;
if (raxSize(bs->keys)) {
/* Create the array reply with the list of keys once, then send
* it to all the clients subscribed to this prefix. */
char buf[32];
size_t len = ll2string(buf,sizeof(buf),raxSize(bs->keys));
sds proto = sdsempty();
proto = sdsMakeRoomFor(proto,raxSize(bs->keys)*15);
proto = sdscatlen(proto,"*",1);
proto = sdscatlen(proto,buf,len);
proto = sdscatlen(proto,"\r\n",2);
raxStart(&ri2,bs->keys);
raxSeek(&ri2,"^",NULL,0);
while(raxNext(&ri2)) {
len = ll2string(buf,sizeof(buf),ri2.key_len);
proto = sdscatlen(proto,"$",1);
proto = sdscatlen(proto,buf,len);
proto = sdscatlen(proto,"\r\n",2);
proto = sdscatlen(proto,ri2.key,ri2.key_len);
proto = sdscatlen(proto,"\r\n",2);
}
raxStop(&ri2);
/* Send this array of keys to every client in the list. */
raxStart(&ri2,bs->clients);
raxSeek(&ri2,"^",NULL,0);
while(raxNext(&ri2)) {
client *c;
memcpy(&c,ri2.key,sizeof(c));
sendTrackingMessage(c,proto,sdslen(proto),1);
}
raxStop(&ri2);
/* Clean up: we can remove everything from this state, because we
* want to only track the new keys that will be accumulated starting
* from now. */
sdsfree(proto);
}
raxFree(bs->keys);
bs->keys = raxNew();
}
raxStop(&ri);
}
/* This is just used in order to access the amount of used slots in the
* tracking table. */
uint64_t trackingGetTotalItems(void) {
return TrackingTableTotalItems;
}
uint64_t trackingGetTotalKeys(void) {
if (TrackingTable == NULL) return 0;
return raxSize(TrackingTable);
}

View File

@ -92,7 +92,7 @@ set ::file ""; # If set, runs only the tests in this comma separated list
set ::curfile ""; # Hold the filename of the current suite
set ::accurate 0; # If true runs fuzz tests with more iterations
set ::force_failure 0
set ::timeout 600; # 10 minutes without progresses will quit the test.
set ::timeout 1200; # 20 minutes without progresses will quit the test.
set ::last_progress [clock seconds]
set ::active_servers {} ; # Pids of active Redis instances.
set ::dont_clean 0

View File

@ -209,5 +209,97 @@ start_server {tags {"defrag"}} {
assert {$digest eq $newdigest}
r save ;# saving an rdb iterates over all the data / pointers
} {OK}
test "Active defrag big list" {
r flushdb
r config resetstat
r config set save "" ;# prevent bgsave from interfereing with save below
r config set hz 100
r config set activedefrag no
r config set active-defrag-max-scan-fields 1000
r config set active-defrag-threshold-lower 5
r config set active-defrag-cycle-min 65
r config set active-defrag-cycle-max 75
r config set active-defrag-ignore-bytes 2mb
r config set maxmemory 0
r config set list-max-ziplist-size 5 ;# list of 500k items will have 100k quicklist nodes
# create big keys with 10k items
set rd [redis_deferring_client]
set expected_frag 1.7
# add a mass of list nodes to two lists (allocations are interlaced)
set val [string repeat A 100] ;# 5 items of 100 bytes puts us in the 640 bytes bin, which has 32 regs, so high potential for fragmentation
for {set j 0} {$j < 500000} {incr j} {
$rd lpush biglist1 $val
$rd lpush biglist2 $val
}
for {set j 0} {$j < 500000} {incr j} {
$rd read ; # Discard replies
$rd read ; # Discard replies
}
# create some fragmentation
r del biglist2
# start defrag
after 120 ;# serverCron only updates the info once in 100ms
set frag [s allocator_frag_ratio]
if {$::verbose} {
puts "frag $frag"
}
assert {$frag >= $expected_frag}
r config set latency-monitor-threshold 5
r latency reset
set digest [r debug digest]
catch {r config set activedefrag yes} e
if {![string match {DISABLED*} $e]} {
# wait for the active defrag to start working (decision once a second)
wait_for_condition 50 100 {
[s active_defrag_running] ne 0
} else {
fail "defrag not started."
}
# wait for the active defrag to stop working
wait_for_condition 500 100 {
[s active_defrag_running] eq 0
} else {
after 120 ;# serverCron only updates the info once in 100ms
puts [r info memory]
puts [r info stats]
puts [r memory malloc-stats]
fail "defrag didn't stop."
}
# test the the fragmentation is lower
after 120 ;# serverCron only updates the info once in 100ms
set frag [s allocator_frag_ratio]
set max_latency 0
foreach event [r latency latest] {
lassign $event eventname time latency max
if {$eventname == "active-defrag-cycle"} {
set max_latency $max
}
}
if {$::verbose} {
puts "frag $frag"
puts "max latency $max_latency"
puts [r latency latest]
puts [r latency history active-defrag-cycle]
}
assert {$frag < 1.1}
# due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75,
# we expect max latency to be not much higher than 7.5ms
assert {$max_latency <= 12}
}
# verify the data isn't corrupted or changed
set newdigest [r debug digest]
assert {$digest eq $newdigest}
r save ;# saving an rdb iterates over all the data / pointers
r del biglist1 ;# coverage for quicklistBookmarksClear
} {1}
}
}

66
tests/unit/tracking.tcl Normal file
View File

@ -0,0 +1,66 @@
start_server {tags {"tracking"}} {
# Create a deferred client we'll use to redirect invalidation
# messages to.
set rd1 [redis_deferring_client]
$rd1 client id
set redir [$rd1 read]
$rd1 subscribe __redis__:invalidate
$rd1 read ; # Consume the SUBSCRIBE reply.
test {Clients are able to enable tracking and redirect it} {
r CLIENT TRACKING on REDIRECT $redir
} {*OK}
test {The other connection is able to get invalidations} {
r SET a 1
r GET a
r INCR a
r INCR b ; # This key should not be notified, since it wasn't fetched.
set keys [lindex [$rd1 read] 2]
assert {[llength $keys] == 1}
assert {[lindex $keys 0] eq {a}}
}
test {The client is now able to disable tracking} {
# Make sure to add a few more keys in the tracking list
# so that we can check for leaks, as a side effect.
r MGET a b c d e f g
r CLIENT TRACKING off
}
test {Clients can enable the BCAST mode with the empty prefix} {
r CLIENT TRACKING on BCAST REDIRECT $redir
} {*OK*}
test {The connection gets invalidation messages about all the keys} {
r MSET a 1 b 2 c 3
set keys [lsort [lindex [$rd1 read] 2]]
assert {$keys eq {a b c}}
}
test {Clients can enable the BCAST mode with prefixes} {
r CLIENT TRACKING off
r CLIENT TRACKING on BCAST REDIRECT $redir PREFIX a: PREFIX b:
r MULTI
r INCR a:1
r INCR a:2
r INCR b:1
r INCR b:2
r EXEC
# Because of the internals, we know we are going to receive
# two separated notifications for the two different prefixes.
set keys1 [lsort [lindex [$rd1 read] 2]]
set keys2 [lsort [lindex [$rd1 read] 2]]
set keys [lsort [list {*}$keys1 {*}$keys2]]
assert {$keys eq {a:1 a:2 b:1 b:2}}
}
test {Adding prefixes to BCAST mode works} {
r CLIENT TRACKING on BCAST REDIRECT $redir PREFIX c:
r INCR c:1234
set keys [lsort [lindex [$rd1 read] 2]]
assert {$keys eq {c:1234}}
}
$rd1 close
}

View File

@ -161,6 +161,15 @@ start_server {
assert {[$rd read] == {}} ;# before the fix, client didn't even block, but was served synchronously with {mystream {}}
}
test {XGROUP DESTROY should unblock XREADGROUP with -NOGROUP} {
r del mystream
r XGROUP CREATE mystream mygroup $ MKSTREAM
set rd [redis_deferring_client]
$rd XREADGROUP GROUP mygroup Alice BLOCK 100 STREAMS mystream ">"
r XGROUP DESTROY mystream mygroup
assert_error "*NOGROUP*" {$rd read}
}
test {XCLAIM can claim PEL items from another consumer} {
# Add 3 items into the stream, and create a consumer group
r del mystream