From 2028ad5a12899352582f540bf8f550f4fd2f2fa2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=A8=E5=8D=9A=E4=B8=9C?= Date: Thu, 16 Jul 2020 13:57:27 +0800 Subject: [PATCH] Stream avoid duplicate parse id (#7450) --- src/t_stream.c | 58 ++++++++++++++++++++++++++++++++------------------ 1 file changed, 37 insertions(+), 21 deletions(-) diff --git a/src/t_stream.c b/src/t_stream.c index f564b1ff9..d2c5b45a3 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -40,6 +40,11 @@ #define STREAM_ITEM_FLAG_DELETED (1<<0) /* Entry is deleted. Skip it. */ #define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */ +/* For stream commands that require multiple IDs + * when the number of IDs is less than 'STREAMID_STATIC_VECTOR_LEN', + * avoid malloc allocation.*/ +#define STREAMID_STATIC_VECTOR_LEN 8 + void streamFreeCG(streamCG *cg); void streamFreeNACK(streamNACK *na); size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer); @@ -1375,7 +1380,6 @@ void xreadCommand(client *c) { int streams_count = 0; int streams_arg = 0; int noack = 0; /* True if NOACK option was specified. */ - #define STREAMID_STATIC_VECTOR_LEN 8 streamID static_ids[STREAMID_STATIC_VECTOR_LEN]; streamID *ids = static_ids; streamCG **groups = NULL; @@ -1949,18 +1953,19 @@ void xackCommand(client *c) { * error: the return value of this command cannot be an error in case * the client successfully acknowledged some messages, so it should be * executed in a "all or nothing" fashion. */ + streamID static_ids[STREAMID_STATIC_VECTOR_LEN]; + streamID *ids = static_ids; + int id_count = c->argc-3; + if (id_count > STREAMID_STATIC_VECTOR_LEN) + ids = zmalloc(sizeof(streamID)*id_count); for (int j = 3; j < c->argc; j++) { - streamID id; - if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return; + if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-3],0) != C_OK) goto cleanup; } int acknowledged = 0; for (int j = 3; j < c->argc; j++) { - streamID id; unsigned char buf[sizeof(streamID)]; - if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) - serverPanic("StreamID invalid after check. Should not be possible."); - streamEncodeID(buf,&id); + streamEncodeID(buf,&ids[j-3]); /* Lookup the ID in the group PEL: it will have a reference to the * NACK structure that will have a reference to the consumer, so that @@ -1975,6 +1980,8 @@ void xackCommand(client *c) { } } addReplyLongLong(c,acknowledged); +cleanup: + if (ids != static_ids) zfree(ids); } /* XPENDING [ []] @@ -2227,9 +2234,13 @@ void xclaimCommand(client *c) { * the client successfully claimed some message, so it should be * executed in a "all or nothing" fashion. */ int j; + streamID static_ids[STREAMID_STATIC_VECTOR_LEN]; + streamID *ids = static_ids; + int id_count = c->argc-5; + if (id_count > STREAMID_STATIC_VECTOR_LEN) + ids = zmalloc(sizeof(streamID)*id_count); for (j = 5; j < c->argc; j++) { - streamID id; - if (streamParseStrictIDOrReply(NULL,c->argv[j],&id,0) != C_OK) break; + if (streamParseStrictIDOrReply(NULL,c->argv[j],&ids[j-5],0) != C_OK) break; } int last_id_arg = j-1; /* Next time we iterate the IDs we now the range. */ @@ -2249,24 +2260,24 @@ void xclaimCommand(client *c) { j++; if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime, "Invalid IDLE option argument for XCLAIM") - != C_OK) return; + != C_OK) goto cleanup; deliverytime = now - deliverytime; } else if (!strcasecmp(opt,"TIME") && moreargs) { j++; if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime, "Invalid TIME option argument for XCLAIM") - != C_OK) return; + != C_OK) goto cleanup; } else if (!strcasecmp(opt,"RETRYCOUNT") && moreargs) { j++; if (getLongLongFromObjectOrReply(c,c->argv[j],&retrycount, "Invalid RETRYCOUNT option argument for XCLAIM") - != C_OK) return; + != C_OK) goto cleanup; } else if (!strcasecmp(opt,"LASTID") && moreargs) { j++; - if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0) != C_OK) return; + if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0) != C_OK) goto cleanup; } else { addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt); - return; + goto cleanup; } } @@ -2296,10 +2307,8 @@ void xclaimCommand(client *c) { void *arraylenptr = addReplyDeferredLen(c); size_t arraylen = 0; for (int j = 5; j <= last_id_arg; j++) { - streamID id; + streamID id = ids[j-5]; unsigned char buf[sizeof(streamID)]; - if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) - serverPanic("StreamID invalid after check. Should not be possible."); streamEncodeID(buf,&id); /* Lookup the ID in the group PEL. */ @@ -2379,6 +2388,8 @@ void xclaimCommand(client *c) { } setDeferredArrayLen(c,arraylenptr,arraylen); preventCommandPropagation(c); +cleanup: + if (ids != static_ids) zfree(ids); } @@ -2397,16 +2408,19 @@ void xdelCommand(client *c) { /* We need to sanity check the IDs passed to start. Even if not * a big issue, it is not great that the command is only partially * executed because at some point an invalid ID is parsed. */ - streamID id; + streamID static_ids[STREAMID_STATIC_VECTOR_LEN]; + streamID *ids = static_ids; + int id_count = c->argc-2; + if (id_count > STREAMID_STATIC_VECTOR_LEN) + ids = zmalloc(sizeof(streamID)*id_count); for (int j = 2; j < c->argc; j++) { - if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return; + if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-2],0) != C_OK) goto cleanup; } /* Actually apply the command. */ int deleted = 0; for (int j = 2; j < c->argc; j++) { - streamParseStrictIDOrReply(c,c->argv[j],&id,0); /* Retval already checked. */ - deleted += streamDeleteItem(s,&id); + deleted += streamDeleteItem(s,&ids[j-2]); } /* Propagate the write if needed. */ @@ -2416,6 +2430,8 @@ void xdelCommand(client *c) { server.dirty += deleted; } addReplyLongLong(c,deleted); +cleanup: + if (ids != static_ids) zfree(ids); } /* General form: XTRIM [... options ...]