diff --git a/src/aof.c b/src/aof.c index 6f8e53712..cbc0989d0 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1244,12 +1244,16 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { while(raxNext(&ri)) { streamCG *group = ri.data; /* Emit the XGROUP CREATE in order to create the group. */ - if (rioWriteBulkCount(r,'*',5) == 0) return 0; - if (rioWriteBulkString(r,"XGROUP",6) == 0) return 0; - if (rioWriteBulkString(r,"CREATE",6) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; - if (rioWriteBulkString(r,(char*)ri.key,ri.key_len) == 0) return 0; - if (rioWriteBulkStreamID(r,&group->last_id) == 0) return 0; + if (!rioWriteBulkCount(r,'*',5) || + !rioWriteBulkString(r,"XGROUP",6) || + !rioWriteBulkString(r,"CREATE",6) || + !rioWriteBulkObject(r,key) || + !rioWriteBulkString(r,(char*)ri.key,ri.key_len) || + !rioWriteBulkStreamID(r,&group->last_id)) + { + raxStop(&ri); + return 0; + } /* Generate XCLAIMs for each consumer that happens to * have pending entries. Empty consumers have no semantical @@ -1270,6 +1274,9 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { ri.key_len,consumer, ri_pel.key,nack) == 0) { + raxStop(&ri_pel); + raxStop(&ri_cons); + raxStop(&ri); return 0; } } diff --git a/src/defrag.c b/src/defrag.c index 2d8db8ea5..07a16ca6c 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -662,6 +662,7 @@ int scanLaterStraemListpacks(robj *ob, unsigned long *cursor, long long endtime, /* if cursor is non-zero, we seek to the static 'last' */ if (!raxSeek(&ri,">", last, sizeof(last))) { *cursor = 0; + raxStop(&ri); return 0; } /* assign the iterator node callback after the seek, so that the diff --git a/src/rdb.c b/src/rdb.c index 5cec208c5..ac1985d24 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -697,15 +697,23 @@ ssize_t rdbSaveStreamPEL(rio *rdb, rax *pel, int nacks) { while(raxNext(&ri)) { /* We store IDs in raw form as 128 big big endian numbers, like * they are inside the radix tree key. */ - if ((n = rdbWriteRaw(rdb,ri.key,sizeof(streamID))) == -1) return -1; + if ((n = rdbWriteRaw(rdb,ri.key,sizeof(streamID))) == -1) { + raxStop(&ri); + return -1; + } nwritten += n; if (nacks) { streamNACK *nack = ri.data; - if ((n = rdbSaveMillisecondTime(rdb,nack->delivery_time)) == -1) + if ((n = rdbSaveMillisecondTime(rdb,nack->delivery_time)) == -1) { + raxStop(&ri); return -1; + } nwritten += n; - if ((n = rdbSaveLen(rdb,nack->delivery_count)) == -1) return -1; + if ((n = rdbSaveLen(rdb,nack->delivery_count)) == -1) { + raxStop(&ri); + return -1; + } nwritten += n; /* We don't save the consumer name: we'll save the pending IDs * for each consumer in the consumer PEL, and resolve the consumer @@ -734,20 +742,27 @@ size_t rdbSaveStreamConsumers(rio *rdb, streamCG *cg) { streamConsumer *consumer = ri.data; /* Consumer name. */ - if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) return -1; + if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) { + raxStop(&ri); + return -1; + } nwritten += n; /* Last seen time. */ - if ((n = rdbSaveMillisecondTime(rdb,consumer->seen_time)) == -1) + if ((n = rdbSaveMillisecondTime(rdb,consumer->seen_time)) == -1) { + raxStop(&ri); return -1; + } nwritten += n; /* Consumer PEL, without the ACKs (see last parameter of the function * passed with value of 0), at loading time we'll lookup the ID * in the consumer group global PEL and will put a reference in the * consumer local PEL. */ - if ((n = rdbSaveStreamPEL(rdb,consumer->pel,0)) == -1) + if ((n = rdbSaveStreamPEL(rdb,consumer->pel,0)) == -1) { + raxStop(&ri); return -1; + } nwritten += n; } raxStop(&ri); @@ -912,9 +927,15 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) { while (raxNext(&ri)) { unsigned char *lp = ri.data; size_t lp_bytes = lpBytes(lp); - if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) return -1; + if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) { + raxStop(&ri); + return -1; + } nwritten += n; - if ((n = rdbSaveRawString(rdb,lp,lp_bytes)) == -1) return -1; + if ((n = rdbSaveRawString(rdb,lp,lp_bytes)) == -1) { + raxStop(&ri); + return -1; + } nwritten += n; } raxStop(&ri); @@ -946,22 +967,36 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) { streamCG *cg = ri.data; /* Save the group name. */ - if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) + if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) { + raxStop(&ri); return -1; + } nwritten += n; /* Last ID. */ - if ((n = rdbSaveLen(rdb,cg->last_id.ms)) == -1) return -1; + if ((n = rdbSaveLen(rdb,cg->last_id.ms)) == -1) { + raxStop(&ri); + return -1; + } nwritten += n; - if ((n = rdbSaveLen(rdb,cg->last_id.seq)) == -1) return -1; + if ((n = rdbSaveLen(rdb,cg->last_id.seq)) == -1) { + raxStop(&ri); + return -1; + } nwritten += n; /* Save the global PEL. */ - if ((n = rdbSaveStreamPEL(rdb,cg->pel,1)) == -1) return -1; + if ((n = rdbSaveStreamPEL(rdb,cg->pel,1)) == -1) { + raxStop(&ri); + return -1; + } nwritten += n; /* Save the consumers of this group. */ - if ((n = rdbSaveStreamConsumers(rdb,cg)) == -1) return -1; + if ((n = rdbSaveStreamConsumers(rdb,cg)) == -1) { + raxStop(&ri); + return -1; + } nwritten += n; } raxStop(&ri); diff --git a/src/timeout.c b/src/timeout.c index 7787a049f..d4c4690e5 100644 --- a/src/timeout.c +++ b/src/timeout.c @@ -150,6 +150,7 @@ void handleBlockedClientsTimeout(void) { raxRemove(server.clients_timeout_table,ri.key,ri.key_len,NULL); raxSeek(&ri,"^",NULL,0); } + raxStop(&ri); } /* Get a timeout value from an object and store it into 'timeout'.