Stream avoid duplicate parse id (#7450)

This commit is contained in:
杨博东 2020-07-16 13:57:27 +08:00 committed by GitHub
parent df4c74ef07
commit 2028ad5a12

View File

@ -40,6 +40,11 @@
#define STREAM_ITEM_FLAG_DELETED (1<<0) /* Entry is deleted. Skip it. */
#define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */
/* For stream commands that require multiple IDs
* when the number of IDs is less than 'STREAMID_STATIC_VECTOR_LEN',
* avoid malloc allocation.*/
#define STREAMID_STATIC_VECTOR_LEN 8
void streamFreeCG(streamCG *cg);
void streamFreeNACK(streamNACK *na);
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer);
@ -1375,7 +1380,6 @@ void xreadCommand(client *c) {
int streams_count = 0;
int streams_arg = 0;
int noack = 0; /* True if NOACK option was specified. */
#define STREAMID_STATIC_VECTOR_LEN 8
streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
streamID *ids = static_ids;
streamCG **groups = NULL;
@ -1949,18 +1953,19 @@ void xackCommand(client *c) {
* error: the return value of this command cannot be an error in case
* the client successfully acknowledged some messages, so it should be
* executed in a "all or nothing" fashion. */
streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
streamID *ids = static_ids;
int id_count = c->argc-3;
if (id_count > STREAMID_STATIC_VECTOR_LEN)
ids = zmalloc(sizeof(streamID)*id_count);
for (int j = 3; j < c->argc; j++) {
streamID id;
if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-3],0) != C_OK) goto cleanup;
}
int acknowledged = 0;
for (int j = 3; j < c->argc; j++) {
streamID id;
unsigned char buf[sizeof(streamID)];
if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK)
serverPanic("StreamID invalid after check. Should not be possible.");
streamEncodeID(buf,&id);
streamEncodeID(buf,&ids[j-3]);
/* Lookup the ID in the group PEL: it will have a reference to the
* NACK structure that will have a reference to the consumer, so that
@ -1975,6 +1980,8 @@ void xackCommand(client *c) {
}
}
addReplyLongLong(c,acknowledged);
cleanup:
if (ids != static_ids) zfree(ids);
}
/* XPENDING <key> <group> [<start> <stop> <count> [<consumer>]]
@ -2227,9 +2234,13 @@ void xclaimCommand(client *c) {
* the client successfully claimed some message, so it should be
* executed in a "all or nothing" fashion. */
int j;
streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
streamID *ids = static_ids;
int id_count = c->argc-5;
if (id_count > STREAMID_STATIC_VECTOR_LEN)
ids = zmalloc(sizeof(streamID)*id_count);
for (j = 5; j < c->argc; j++) {
streamID id;
if (streamParseStrictIDOrReply(NULL,c->argv[j],&id,0) != C_OK) break;
if (streamParseStrictIDOrReply(NULL,c->argv[j],&ids[j-5],0) != C_OK) break;
}
int last_id_arg = j-1; /* Next time we iterate the IDs we now the range. */
@ -2249,24 +2260,24 @@ void xclaimCommand(client *c) {
j++;
if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime,
"Invalid IDLE option argument for XCLAIM")
!= C_OK) return;
!= C_OK) goto cleanup;
deliverytime = now - deliverytime;
} else if (!strcasecmp(opt,"TIME") && moreargs) {
j++;
if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime,
"Invalid TIME option argument for XCLAIM")
!= C_OK) return;
!= C_OK) goto cleanup;
} else if (!strcasecmp(opt,"RETRYCOUNT") && moreargs) {
j++;
if (getLongLongFromObjectOrReply(c,c->argv[j],&retrycount,
"Invalid RETRYCOUNT option argument for XCLAIM")
!= C_OK) return;
!= C_OK) goto cleanup;
} else if (!strcasecmp(opt,"LASTID") && moreargs) {
j++;
if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0) != C_OK) return;
if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0) != C_OK) goto cleanup;
} else {
addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt);
return;
goto cleanup;
}
}
@ -2296,10 +2307,8 @@ void xclaimCommand(client *c) {
void *arraylenptr = addReplyDeferredLen(c);
size_t arraylen = 0;
for (int j = 5; j <= last_id_arg; j++) {
streamID id;
streamID id = ids[j-5];
unsigned char buf[sizeof(streamID)];
if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK)
serverPanic("StreamID invalid after check. Should not be possible.");
streamEncodeID(buf,&id);
/* Lookup the ID in the group PEL. */
@ -2379,6 +2388,8 @@ void xclaimCommand(client *c) {
}
setDeferredArrayLen(c,arraylenptr,arraylen);
preventCommandPropagation(c);
cleanup:
if (ids != static_ids) zfree(ids);
}
@ -2397,16 +2408,19 @@ void xdelCommand(client *c) {
/* We need to sanity check the IDs passed to start. Even if not
* a big issue, it is not great that the command is only partially
* executed because at some point an invalid ID is parsed. */
streamID id;
streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
streamID *ids = static_ids;
int id_count = c->argc-2;
if (id_count > STREAMID_STATIC_VECTOR_LEN)
ids = zmalloc(sizeof(streamID)*id_count);
for (int j = 2; j < c->argc; j++) {
if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-2],0) != C_OK) goto cleanup;
}
/* Actually apply the command. */
int deleted = 0;
for (int j = 2; j < c->argc; j++) {
streamParseStrictIDOrReply(c,c->argv[j],&id,0); /* Retval already checked. */
deleted += streamDeleteItem(s,&id);
deleted += streamDeleteItem(s,&ids[j-2]);
}
/* Propagate the write if needed. */
@ -2416,6 +2430,8 @@ void xdelCommand(client *c) {
server.dirty += deleted;
}
addReplyLongLong(c,deleted);
cleanup:
if (ids != static_ids) zfree(ids);
}
/* General form: XTRIM <key> [... options ...]