From 8f4ad687afaf56de7324ec6ee7fff533c8da4062 Mon Sep 17 00:00:00 2001 From: Wen Hui Date: Tue, 22 Sep 2020 02:05:47 -0400 Subject: [PATCH] refactor rewriteStreamObject code for adding missing streamIteratorStop call (#7829) This commit adds streamIteratorStop call in rewriteStreamObject function in some of the return statement. Although currently this will not cause memory leak since stream id is only 16 bytes long. (cherry picked from commit 7934f163b4b6c1c0c0fc55710d3c7e49f56281f1) --- src/aof.c | 54 ++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 36 insertions(+), 18 deletions(-) diff --git a/src/aof.c b/src/aof.c index 757c68807..2114a17e4 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1201,16 +1201,24 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { * the ID, the second is an array of field-value pairs. */ /* Emit the XADD ...fields... command. */ - if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0; - if (rioWriteBulkString(r,"XADD",4) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; - if (rioWriteBulkStreamID(r,&id) == 0) return 0; + if (!rioWriteBulkCount(r,'*',3+numfields*2) || + !rioWriteBulkString(r,"XADD",4) || + !rioWriteBulkObject(r,key) || + !rioWriteBulkStreamID(r,&id)) + { + streamIteratorStop(&si); + return 0; + } while(numfields--) { unsigned char *field, *value; int64_t field_len, value_len; streamIteratorGetField(&si,&field,&value,&field_len,&value_len); - if (rioWriteBulkString(r,(char*)field,field_len) == 0) return 0; - if (rioWriteBulkString(r,(char*)value,value_len) == 0) return 0; + if (!rioWriteBulkString(r,(char*)field,field_len) || + !rioWriteBulkString(r,(char*)value,value_len)) + { + streamIteratorStop(&si); + return 0; + } } } } else { @@ -1218,22 +1226,30 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { * the key we are serializing is an empty string, which is possible * for the Stream type. */ id.ms = 0; id.seq = 1; - if (rioWriteBulkCount(r,'*',7) == 0) return 0; - if (rioWriteBulkString(r,"XADD",4) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; - if (rioWriteBulkString(r,"MAXLEN",6) == 0) return 0; - if (rioWriteBulkString(r,"0",1) == 0) return 0; - if (rioWriteBulkStreamID(r,&id) == 0) return 0; - if (rioWriteBulkString(r,"x",1) == 0) return 0; - if (rioWriteBulkString(r,"y",1) == 0) return 0; + if (!rioWriteBulkCount(r,'*',7) || + !rioWriteBulkString(r,"XADD",4) || + !rioWriteBulkObject(r,key) || + !rioWriteBulkString(r,"MAXLEN",6) || + !rioWriteBulkString(r,"0",1) || + !rioWriteBulkStreamID(r,&id) || + !rioWriteBulkString(r,"x",1) || + !rioWriteBulkString(r,"y",1)) + { + streamIteratorStop(&si); + return 0; + } } /* Append XSETID after XADD, make sure lastid is correct, * in case of XDEL lastid. */ - if (rioWriteBulkCount(r,'*',3) == 0) return 0; - if (rioWriteBulkString(r,"XSETID",6) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; - if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0; + if (!rioWriteBulkCount(r,'*',3) || + !rioWriteBulkString(r,"XSETID",6) || + !rioWriteBulkObject(r,key) || + !rioWriteBulkStreamID(r,&s->last_id)) + { + streamIteratorStop(&si); + return 0; + } /* Create all the stream consumer groups. */ @@ -1252,6 +1268,7 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { !rioWriteBulkStreamID(r,&group->last_id)) { raxStop(&ri); + streamIteratorStop(&si); return 0; } @@ -1277,6 +1294,7 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { raxStop(&ri_pel); raxStop(&ri_cons); raxStop(&ri); + streamIteratorStop(&si); return 0; } }