diff --git a/src/rdb.c b/src/rdb.c index 8f896983b..45c2b3acd 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -642,7 +642,48 @@ int rdbLoadObjectType(rio *rdb) { return type; } -/* Save a Redis object. Returns -1 on error, number of bytes written on success. */ +/* This helper function serializes a consumer group Pending Entries List (PEL) + * into the RDB file. The 'nacks' argument tells the function if also persist + * the informations about the not acknowledged message, or if to persist + * just the IDs: this is useful because for the global consumer group PEL + * we serialized the NACKs as well, but when serializing the local consumer + * PELs we just add the ID, that will be resolved inside the global PEL to + * put a reference to the same structure. */ +ssize_t rdbSaveStreamPEL(rio *rdb, rax *pel, int nacks) { + ssize_t n, nwritten = 0; + + /* Number of entries in the PEL. */ + if ((n = rdbSaveLen(rdb,raxSize(pel))) == -1) return -1; + nwritten += n; + + /* Save each entry. */ + raxIterator ri; + raxStart(&ri,pel); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + /* We store IDs in raw form as 128 big big endian numbers, like + * they are inside the radix tree key. */ + if ((n = rdbWriteRaw(rdb,ri.key,sizeof(streamID))) == -1) return -1; + nwritten += n; + + if (nacks) { + streamNACK *nack = ri.data; + if ((n = rdbSaveMillisecondTime(rdb,nack->delivery_time)) == -1) + return -1; + nwritten += n; + if ((n = rdbSaveLen(rdb,nack->delivery_count)) == -1) return -1; + nwritten += n; + /* We don't save the consumer name: we'll save the pending IDs + * for each consumer in the consumer PEL, and resolve the consumer + * at loading time. */ + } + } + raxStop(&ri); + return nwritten; +} + +/* Save a Redis object. + * Returns -1 on error, number of bytes written on success. */ ssize_t rdbSaveObject(rio *rdb, robj *o) { ssize_t n = 0, nwritten = 0; @@ -798,6 +839,37 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) { nwritten += n; if ((n = rdbSaveLen(rdb,s->last_id.seq)) == -1) return -1; nwritten += n; + + /* The consumer groups and their clients are part of the stream + * type, so serialize every consumer group. */ + + /* Save the number of groups. */ + if ((n = rdbSaveLen(rdb,raxSize(s->cgroups))) == -1) return -1; + nwritten += n; + + /* Serialize each consumer group. */ + raxStart(&ri,s->cgroups); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + streamCG *cg = ri.data; + + /* Save the consumer group name. */ + if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) return -1; + nwritten += n; + + /* Last ID. */ + if ((n = rdbSaveLen(rdb,cg->last_id.ms)) == -1) return -1; + nwritten += n; + if ((n = rdbSaveLen(rdb,cg->last_id.seq)) == -1) return -1; + nwritten += n; + + /* Save the global PEL. */ + if ((n = rdbSaveStreamPEL(rdb,cg->pel,1)) == -1) return -1; + nwritten += n; + + /* Save the consumers of this group. */ + } + raxStop(&ri); } else if (o->type == OBJ_MODULE) { /* Save a module-specific value. */ RedisModuleIO io;