Merge commit '05b332e550e7ce32e364dde6cdd8820e8665eb21' into unstable

Former-commit-id: c52e1fc78c369843e49a22765bed2c53e99d9709
This commit is contained in:
John Sully 2020-05-22 15:42:50 -04:00
commit bd3deb6c67
5 changed files with 61 additions and 19 deletions

View File

@ -392,9 +392,10 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
int noack = 0; int noack = 0;
if (group) { if (group) {
consumer = streamLookupConsumer(group, consumer =
streamLookupConsumer(group,
szFromObj(receiver->bpop.xread_consumer), szFromObj(receiver->bpop.xread_consumer),
1); SLC_NONE);
noack = receiver->bpop.xread_group_noack; noack = receiver->bpop.xread_group_noack;
} }

View File

@ -566,6 +566,34 @@ void addReplyStatusFormat(client *c, const char *fmt, ...) {
sdsfree(s); sdsfree(s);
} }
/* Sometimes we are forced to create a new reply node, and we can't append to
* the previous one, when that happens, we wanna try to trim the unused space
* at the end of the last reply node which we won't use anymore. */
void trimReplyUnusedTailSpace(client *c) {
listNode *ln = listLast(c->reply);
clientReplyBlock *tail = ln? (clientReplyBlock*)listNodeValue(ln): NULL;
/* Note that 'tail' may be NULL even if we have a tail node, becuase when
* addDeferredMultiBulkLength() is used */
if (!tail) return;
/* We only try to trim the space is relatively high (more than a 1/4 of the
* allocation), otherwise there's a high chance realloc will NOP.
* Also, to avoid large memmove which happens as part of realloc, we only do
* that if the used part is small. */
if (tail->size - tail->used > tail->size / 4 &&
tail->used < PROTO_REPLY_CHUNK_BYTES)
{
size_t old_size = tail->size;
tail = (clientReplyBlock*)zrealloc(tail, tail->used + sizeof(clientReplyBlock));
/* take over the allocation's internal fragmentation (at least for
* memory usage tracking) */
tail->size = zmalloc_usable(tail) - sizeof(clientReplyBlock);
c->reply_bytes += tail->size - old_size;
listNodeValue(ln) = tail;
}
}
/* Adds an empty object to the reply list that will contain the multi bulk /* Adds an empty object to the reply list that will contain the multi bulk
* length, which is not known when this function is called. */ * length, which is not known when this function is called. */
void *addReplyDeferredLen(client *c) { void *addReplyDeferredLen(client *c) {
@ -573,6 +601,7 @@ void *addReplyDeferredLen(client *c) {
* ready to be sent, since we are sure that before returning to the * ready to be sent, since we are sure that before returning to the
* event loop setDeferredAggregateLen() will be called. */ * event loop setDeferredAggregateLen() will be called. */
if (prepareClientToWrite(c, false) != C_OK) return NULL; if (prepareClientToWrite(c, false) != C_OK) return NULL;
trimReplyUnusedTailSpace(c);
listAddNodeTail(c->reply,NULL); /* NULL is our placeholder. */ listAddNodeTail(c->reply,NULL); /* NULL is our placeholder. */
return listLast(c->reply); return listLast(c->reply);
} }

View File

@ -1933,8 +1933,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, uint64_t mvcc_tstamp) {
decrRefCount(o); decrRefCount(o);
return NULL; return NULL;
} }
streamConsumer *consumer = streamLookupConsumer(cgroup,cname, streamConsumer *consumer =
1); streamLookupConsumer(cgroup,cname,SLC_NONE);
sdsfree(cname); sdsfree(cname);
consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION); consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
if (rioGetReadError(rdb)) { if (rioGetReadError(rdb)) {

View File

@ -96,6 +96,11 @@ typedef struct streamPropInfo {
/* Prototypes of exported APIs. */ /* Prototypes of exported APIs. */
struct client; struct client;
/* Flags for streamLookupConsumer */
#define SLC_NONE 0
#define SLC_NOCREAT (1<<0) /* Do not create the consumer if it doesn't exist */
#define SLC_NOREFRESH (1<<1) /* Do not update consumer's seen-time */
stream *streamNew(void); stream *streamNew(void);
void freeStream(stream *s); void freeStream(stream *s);
unsigned long streamLength(robj_roptr subject); unsigned long streamLength(robj_roptr subject);
@ -105,7 +110,7 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen); void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
void streamIteratorStop(streamIterator *si); void streamIteratorStop(streamIterator *si);
streamCG *streamLookupCG(stream *s, sds groupname); streamCG *streamLookupCG(stream *s, sds groupname);
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create); streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags);
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id); streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id);
streamNACK *streamCreateNACK(streamConsumer *consumer); streamNACK *streamCreateNACK(streamConsumer *consumer);
void streamDecodeID(void *buf, streamID *id); void streamDecodeID(void *buf, streamID *id);

View File

@ -1576,7 +1576,8 @@ void xreadCommand(client *c) {
addReplyBulk(c,c->argv[streams_arg+i]); addReplyBulk(c,c->argv[streams_arg+i]);
streamConsumer *consumer = NULL; streamConsumer *consumer = NULL;
if (groups) consumer = streamLookupConsumer(groups[i], if (groups) consumer = streamLookupConsumer(groups[i],
szFromObj(consumername),1); szFromObj(consumername),
SLC_NONE);
streamPropInfo spi = {c->argv[i+streams_arg],groupname}; streamPropInfo spi = {c->argv[i+streams_arg],groupname};
int flags = 0; int flags = 0;
if (noack) flags |= STREAM_RWR_NOACK; if (noack) flags |= STREAM_RWR_NOACK;
@ -1712,7 +1713,9 @@ streamCG *streamLookupCG(stream *s, sds groupname) {
* consumer does not exist it is automatically created as a side effect * consumer does not exist it is automatically created as a side effect
* of calling this function, otherwise its last seen time is updated and * of calling this function, otherwise its last seen time is updated and
* the existing consumer reference returned. */ * the existing consumer reference returned. */
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) { streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) {
int create = !(flags & SLC_NOCREAT);
int refresh = !(flags & SLC_NOREFRESH);
streamConsumer *consumer = (streamConsumer*)raxFind(cg->consumers,(unsigned char*)name, streamConsumer *consumer = (streamConsumer*)raxFind(cg->consumers,(unsigned char*)name,
sdslen(name)); sdslen(name));
if (consumer == raxNotFound) { if (consumer == raxNotFound) {
@ -1723,7 +1726,7 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) {
raxInsert(cg->consumers,(unsigned char*)name,sdslen(name), raxInsert(cg->consumers,(unsigned char*)name,sdslen(name),
consumer,NULL); consumer,NULL);
} }
consumer->seen_time = mstime(); if (refresh) consumer->seen_time = mstime();
return consumer; return consumer;
} }
@ -1731,7 +1734,8 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) {
* may have pending messages: they are removed from the PEL, and the number * may have pending messages: they are removed from the PEL, and the number
* of pending messages "lost" is returned. */ * of pending messages "lost" is returned. */
uint64_t streamDelConsumer(streamCG *cg, sds name) { uint64_t streamDelConsumer(streamCG *cg, sds name) {
streamConsumer *consumer = streamLookupConsumer(cg,name,0); streamConsumer *consumer =
streamLookupConsumer(cg,name,SLC_NOCREAT|SLC_NOREFRESH);
if (consumer == NULL) return 0; if (consumer == NULL) return 0;
uint64_t retval = raxSize(consumer->pel); uint64_t retval = raxSize(consumer->pel);
@ -2074,16 +2078,19 @@ void xpendingCommand(client *c) {
} }
/* XPENDING <key> <group> <start> <stop> <count> [<consumer>] variant. */ /* XPENDING <key> <group> <start> <stop> <count> [<consumer>] variant. */
else { else {
streamConsumer *consumer = consumername ? streamConsumer *consumer = NULL;
streamLookupConsumer(group,szFromObj(consumername),0): if (consumername) {
NULL; consumer = streamLookupConsumer(group,
szFromObj(consumername),
SLC_NOCREAT|SLC_NOREFRESH);
/* If a consumer name was mentioned but it does not exist, we can /* If a consumer name was mentioned but it does not exist, we can
* just return an empty array. */ * just return an empty array. */
if (consumername && consumer == NULL) { if (consumer == NULL) {
addReplyArrayLen(c,0); addReplyArrayLen(c,0);
return; return;
} }
}
rax *pel = consumer ? consumer->pel : group->pel; rax *pel = consumer ? consumer->pel : group->pel;
unsigned char startkey[sizeof(streamID)]; unsigned char startkey[sizeof(streamID)];
@ -2344,7 +2351,7 @@ void xclaimCommand(client *c) {
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
/* Update the consumer and idle time. */ /* Update the consumer and idle time. */
if (consumer == NULL) if (consumer == NULL)
consumer = streamLookupConsumer(group,szFromObj(c->argv[3]),1); consumer = streamLookupConsumer(group,szFromObj(c->argv[3]),SLC_NONE);
nack->consumer = consumer; nack->consumer = consumer;
nack->delivery_time = deliverytime; nack->delivery_time = deliverytime;
/* Set the delivery attempts counter if given, otherwise /* Set the delivery attempts counter if given, otherwise