Stream avoid duplicate parse id (#7450)

This commit is contained in:
杨博东 2020-07-16 13:57:27 +08:00 committed by GitHub
parent 5f716ea467
commit 8596d483bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -40,6 +40,11 @@
#define STREAM_ITEM_FLAG_DELETED (1<<0) /* Entry is deleted. Skip it. */ #define STREAM_ITEM_FLAG_DELETED (1<<0) /* Entry is deleted. Skip it. */
#define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */ #define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */
/* For stream commands that require multiple IDs
* when the number of IDs is less than 'STREAMID_STATIC_VECTOR_LEN',
* avoid malloc allocation.*/
#define STREAMID_STATIC_VECTOR_LEN 8
void streamFreeCG(streamCG *cg); void streamFreeCG(streamCG *cg);
void streamFreeNACK(streamNACK *na); void streamFreeNACK(streamNACK *na);
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer); size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer);
@ -1375,7 +1380,6 @@ void xreadCommand(client *c) {
int streams_count = 0; int streams_count = 0;
int streams_arg = 0; int streams_arg = 0;
int noack = 0; /* True if NOACK option was specified. */ int noack = 0; /* True if NOACK option was specified. */
#define STREAMID_STATIC_VECTOR_LEN 8
streamID static_ids[STREAMID_STATIC_VECTOR_LEN]; streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
streamID *ids = static_ids; streamID *ids = static_ids;
streamCG **groups = NULL; streamCG **groups = NULL;
@ -1949,18 +1953,19 @@ void xackCommand(client *c) {
* error: the return value of this command cannot be an error in case * error: the return value of this command cannot be an error in case
* the client successfully acknowledged some messages, so it should be * the client successfully acknowledged some messages, so it should be
* executed in a "all or nothing" fashion. */ * executed in a "all or nothing" fashion. */
streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
streamID *ids = static_ids;
int id_count = c->argc-3;
if (id_count > STREAMID_STATIC_VECTOR_LEN)
ids = zmalloc(sizeof(streamID)*id_count);
for (int j = 3; j < c->argc; j++) { for (int j = 3; j < c->argc; j++) {
streamID id; if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-3],0) != C_OK) goto cleanup;
if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
} }
int acknowledged = 0; int acknowledged = 0;
for (int j = 3; j < c->argc; j++) { for (int j = 3; j < c->argc; j++) {
streamID id;
unsigned char buf[sizeof(streamID)]; unsigned char buf[sizeof(streamID)];
if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) streamEncodeID(buf,&ids[j-3]);
serverPanic("StreamID invalid after check. Should not be possible.");
streamEncodeID(buf,&id);
/* Lookup the ID in the group PEL: it will have a reference to the /* Lookup the ID in the group PEL: it will have a reference to the
* NACK structure that will have a reference to the consumer, so that * NACK structure that will have a reference to the consumer, so that
@ -1975,6 +1980,8 @@ void xackCommand(client *c) {
} }
} }
addReplyLongLong(c,acknowledged); addReplyLongLong(c,acknowledged);
cleanup:
if (ids != static_ids) zfree(ids);
} }
/* XPENDING <key> <group> [<start> <stop> <count> [<consumer>]] /* XPENDING <key> <group> [<start> <stop> <count> [<consumer>]]
@ -2227,9 +2234,13 @@ void xclaimCommand(client *c) {
* the client successfully claimed some message, so it should be * the client successfully claimed some message, so it should be
* executed in a "all or nothing" fashion. */ * executed in a "all or nothing" fashion. */
int j; int j;
streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
streamID *ids = static_ids;
int id_count = c->argc-5;
if (id_count > STREAMID_STATIC_VECTOR_LEN)
ids = zmalloc(sizeof(streamID)*id_count);
for (j = 5; j < c->argc; j++) { for (j = 5; j < c->argc; j++) {
streamID id; if (streamParseStrictIDOrReply(NULL,c->argv[j],&ids[j-5],0) != C_OK) break;
if (streamParseStrictIDOrReply(NULL,c->argv[j],&id,0) != C_OK) break;
} }
int last_id_arg = j-1; /* Next time we iterate the IDs we now the range. */ int last_id_arg = j-1; /* Next time we iterate the IDs we now the range. */
@ -2249,24 +2260,24 @@ void xclaimCommand(client *c) {
j++; j++;
if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime, if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime,
"Invalid IDLE option argument for XCLAIM") "Invalid IDLE option argument for XCLAIM")
!= C_OK) return; != C_OK) goto cleanup;
deliverytime = now - deliverytime; deliverytime = now - deliverytime;
} else if (!strcasecmp(opt,"TIME") && moreargs) { } else if (!strcasecmp(opt,"TIME") && moreargs) {
j++; j++;
if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime, if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime,
"Invalid TIME option argument for XCLAIM") "Invalid TIME option argument for XCLAIM")
!= C_OK) return; != C_OK) goto cleanup;
} else if (!strcasecmp(opt,"RETRYCOUNT") && moreargs) { } else if (!strcasecmp(opt,"RETRYCOUNT") && moreargs) {
j++; j++;
if (getLongLongFromObjectOrReply(c,c->argv[j],&retrycount, if (getLongLongFromObjectOrReply(c,c->argv[j],&retrycount,
"Invalid RETRYCOUNT option argument for XCLAIM") "Invalid RETRYCOUNT option argument for XCLAIM")
!= C_OK) return; != C_OK) goto cleanup;
} else if (!strcasecmp(opt,"LASTID") && moreargs) { } else if (!strcasecmp(opt,"LASTID") && moreargs) {
j++; j++;
if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0) != C_OK) return; if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0) != C_OK) goto cleanup;
} else { } else {
addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt); addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt);
return; goto cleanup;
} }
} }
@ -2296,10 +2307,8 @@ void xclaimCommand(client *c) {
void *arraylenptr = addReplyDeferredLen(c); void *arraylenptr = addReplyDeferredLen(c);
size_t arraylen = 0; size_t arraylen = 0;
for (int j = 5; j <= last_id_arg; j++) { for (int j = 5; j <= last_id_arg; j++) {
streamID id; streamID id = ids[j-5];
unsigned char buf[sizeof(streamID)]; unsigned char buf[sizeof(streamID)];
if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK)
serverPanic("StreamID invalid after check. Should not be possible.");
streamEncodeID(buf,&id); streamEncodeID(buf,&id);
/* Lookup the ID in the group PEL. */ /* Lookup the ID in the group PEL. */
@ -2379,6 +2388,8 @@ void xclaimCommand(client *c) {
} }
setDeferredArrayLen(c,arraylenptr,arraylen); setDeferredArrayLen(c,arraylenptr,arraylen);
preventCommandPropagation(c); preventCommandPropagation(c);
cleanup:
if (ids != static_ids) zfree(ids);
} }
@ -2397,16 +2408,19 @@ void xdelCommand(client *c) {
/* We need to sanity check the IDs passed to start. Even if not /* We need to sanity check the IDs passed to start. Even if not
* a big issue, it is not great that the command is only partially * a big issue, it is not great that the command is only partially
* executed because at some point an invalid ID is parsed. */ * executed because at some point an invalid ID is parsed. */
streamID id; streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
streamID *ids = static_ids;
int id_count = c->argc-2;
if (id_count > STREAMID_STATIC_VECTOR_LEN)
ids = zmalloc(sizeof(streamID)*id_count);
for (int j = 2; j < c->argc; j++) { for (int j = 2; j < c->argc; j++) {
if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return; if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-2],0) != C_OK) goto cleanup;
} }
/* Actually apply the command. */ /* Actually apply the command. */
int deleted = 0; int deleted = 0;
for (int j = 2; j < c->argc; j++) { for (int j = 2; j < c->argc; j++) {
streamParseStrictIDOrReply(c,c->argv[j],&id,0); /* Retval already checked. */ deleted += streamDeleteItem(s,&ids[j-2]);
deleted += streamDeleteItem(s,&id);
} }
/* Propagate the write if needed. */ /* Propagate the write if needed. */
@ -2416,6 +2430,8 @@ void xdelCommand(client *c) {
server.dirty += deleted; server.dirty += deleted;
} }
addReplyLongLong(c,deleted); addReplyLongLong(c,deleted);
cleanup:
if (ids != static_ids) zfree(ids);
} }
/* General form: XTRIM <key> [... options ...] /* General form: XTRIM <key> [... options ...]