Fix propagation of consumer groups last ID.

Issue #5433.
This commit is contained in:
antirez 2018-10-10 12:51:02 +02:00
parent 19b668de4c
commit 599e872fba
3 changed files with 56 additions and 9 deletions

View File

@ -1702,6 +1702,7 @@ void initServerConfig(void) {
server.expireCommand = lookupCommandByCString("expire"); server.expireCommand = lookupCommandByCString("expire");
server.pexpireCommand = lookupCommandByCString("pexpire"); server.pexpireCommand = lookupCommandByCString("pexpire");
server.xclaimCommand = lookupCommandByCString("xclaim"); server.xclaimCommand = lookupCommandByCString("xclaim");
server.xgroupCommand = lookupCommandByCString("xgroup");
/* Slow log */ /* Slow log */
server.slowlog_log_slower_than = CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN; server.slowlog_log_slower_than = CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN;

View File

@ -990,7 +990,8 @@ struct redisServer {
struct redisCommand *delCommand, *multiCommand, *lpushCommand, struct redisCommand *delCommand, *multiCommand, *lpushCommand,
*lpopCommand, *rpopCommand, *zpopminCommand, *lpopCommand, *rpopCommand, *zpopminCommand,
*zpopmaxCommand, *sremCommand, *execCommand, *zpopmaxCommand, *sremCommand, *execCommand,
*expireCommand, *pexpireCommand, *xclaimCommand; *expireCommand, *pexpireCommand, *xclaimCommand,
*xgroupCommand;
/* Fields used only for stats */ /* Fields used only for stats */
time_t stat_starttime; /* Server start time */ time_t stat_starttime; /* Server start time */
long long stat_numcommands; /* Number of processed commands */ long long stat_numcommands; /* Number of processed commands */

View File

@ -791,18 +791,18 @@ robj *createObjectFromStreamID(streamID *id) {
/* As a result of an explicit XCLAIM or XREADGROUP command, new entries /* As a result of an explicit XCLAIM or XREADGROUP command, new entries
* are created in the pending list of the stream and consumers. We need * are created in the pending list of the stream and consumers. We need
* to propagate this changes in the form of XCLAIM commands. */ * to propagate this changes in the form of XCLAIM commands. */
void streamPropagateXCLAIM(client *c, robj *key, robj *group, robj *id, streamNACK *nack) { void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupname, robj *id, streamNACK *nack) {
/* We need to generate an XCLAIM that will work in a idempotent fashion: /* We need to generate an XCLAIM that will work in a idempotent fashion:
* *
* XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time> * XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time>
* RETRYCOUNT <count> FORCE JUSTID. * RETRYCOUNT <count> FORCE JUSTID LASTID <id>.
* *
* Note that JUSTID is useful in order to avoid that XCLAIM will do * Note that JUSTID is useful in order to avoid that XCLAIM will do
* useless work in the slave side, trying to fetch the stream item. */ * useless work in the slave side, trying to fetch the stream item. */
robj *argv[12]; robj *argv[14];
argv[0] = createStringObject("XCLAIM",6); argv[0] = createStringObject("XCLAIM",6);
argv[1] = key; argv[1] = key;
argv[2] = group; argv[2] = groupname;
argv[3] = createStringObject(nack->consumer->name,sdslen(nack->consumer->name)); argv[3] = createStringObject(nack->consumer->name,sdslen(nack->consumer->name));
argv[4] = createStringObjectFromLongLong(0); argv[4] = createStringObjectFromLongLong(0);
argv[5] = id; argv[5] = id;
@ -812,7 +812,9 @@ void streamPropagateXCLAIM(client *c, robj *key, robj *group, robj *id, streamNA
argv[9] = createStringObjectFromLongLong(nack->delivery_count); argv[9] = createStringObjectFromLongLong(nack->delivery_count);
argv[10] = createStringObject("FORCE",5); argv[10] = createStringObject("FORCE",5);
argv[11] = createStringObject("JUSTID",6); argv[11] = createStringObject("JUSTID",6);
propagate(server.xclaimCommand,c->db->id,argv,12,PROPAGATE_AOF|PROPAGATE_REPL); argv[12] = createStringObject("LASTID",6);
argv[13] = createObjectFromStreamID(&group->last_id);
propagate(server.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(argv[0]); decrRefCount(argv[0]);
decrRefCount(argv[3]); decrRefCount(argv[3]);
decrRefCount(argv[4]); decrRefCount(argv[4]);
@ -822,6 +824,27 @@ void streamPropagateXCLAIM(client *c, robj *key, robj *group, robj *id, streamNA
decrRefCount(argv[9]); decrRefCount(argv[9]);
decrRefCount(argv[10]); decrRefCount(argv[10]);
decrRefCount(argv[11]); decrRefCount(argv[11]);
decrRefCount(argv[12]);
decrRefCount(argv[13]);
}
/* We need this when we want to propoagate the new last-id of a consumer group
* that was consumed by XREADGROUP with the NOACK option: in that case we can't
* propagate the last ID just using the XCLAIM LASTID option, so we emit
*
* XGROUP SETID <key> <groupname> <id>
*/
void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupname) {
robj *argv[5];
argv[0] = createStringObject("XGROUP",6);
argv[1] = createStringObject("SETID",5);
argv[2] = key;
argv[3] = groupname;
argv[4] = createObjectFromStreamID(&group->last_id);
propagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(argv[0]);
decrRefCount(argv[1]);
decrRefCount(argv[4]);
} }
/* Send the specified range to the client 'c'. The range the client will /* Send the specified range to the client 'c'. The range the client will
@ -873,6 +896,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
streamIterator si; streamIterator si;
int64_t numfields; int64_t numfields;
streamID id; streamID id;
int lastid_updated = 0;
/* If a group was passed, we check if the request is about messages /* If a group was passed, we check if the request is about messages
* never delivered so far (normally this happens when ">" ID is passed). * never delivered so far (normally this happens when ">" ID is passed).
@ -892,8 +916,10 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
streamIteratorStart(&si,s,start,end,rev); streamIteratorStart(&si,s,start,end,rev);
while(streamIteratorGetID(&si,&id,&numfields)) { while(streamIteratorGetID(&si,&id,&numfields)) {
/* Update the group last_id if needed. */ /* Update the group last_id if needed. */
if (group && streamCompareID(&id,&group->last_id) > 0) if (group && streamCompareID(&id,&group->last_id) > 0) {
group->last_id = id; group->last_id = id;
lastid_updated = 1;
}
/* Emit a two elements array for each item. The first is /* Emit a two elements array for each item. The first is
* the ID, the second is an array of field-value pairs. */ * the ID, the second is an array of field-value pairs. */
@ -953,9 +979,12 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
/* Propagate as XCLAIM. */ /* Propagate as XCLAIM. */
if (spi) { if (spi) {
robj *idarg = createObjectFromStreamID(&id); robj *idarg = createObjectFromStreamID(&id);
streamPropagateXCLAIM(c,spi->keyname,spi->groupname,idarg,nack); streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack);
decrRefCount(idarg); decrRefCount(idarg);
} }
} else {
if (lastid_updated)
streamPropagateGroupID(c,spi->keyname,group,spi->groupname);
} }
arraylen++; arraylen++;
@ -1993,6 +2022,14 @@ void xpendingCommand(client *c) {
* Return just an array of IDs of messages successfully claimed, * Return just an array of IDs of messages successfully claimed,
* without returning the actual message. * without returning the actual message.
* *
* 6. LASTID <id>:
* Update the consumer group last ID with the specified ID if the
* current last ID is smaller than the provided one.
* This is used for replication / AOF, so that when we read from a
* consumer group, the XCLAIM that gets propagated to give ownership
* to the consumer, is also used in order to update the group current
* ID.
*
* The command returns an array of messages that the user * The command returns an array of messages that the user
* successfully claimed, so that the caller is able to understand * successfully claimed, so that the caller is able to understand
* what messages it is now in charge of. */ * what messages it is now in charge of. */
@ -2061,6 +2098,14 @@ void xclaimCommand(client *c) {
if (getLongLongFromObjectOrReply(c,c->argv[j],&retrycount, if (getLongLongFromObjectOrReply(c,c->argv[j],&retrycount,
"Invalid RETRYCOUNT option argument for XCLAIM") "Invalid RETRYCOUNT option argument for XCLAIM")
!= C_OK) return; != C_OK) return;
} else if (!strcasecmp(opt,"LASTID") && moreargs) {
j++;
streamID id;
if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
/* Technically it could be more correct to update that only after
* checking for syntax errors, but this option is only used by
* the replication command that outputs correct syntax. */
if (streamCompareID(&id,&group->last_id) > 0) group->last_id = id;
} else { } else {
addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt); addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt);
return; return;
@ -2147,7 +2192,7 @@ void xclaimCommand(client *c) {
arraylen++; arraylen++;
/* Propagate this change. */ /* Propagate this change. */
streamPropagateXCLAIM(c,c->argv[1],c->argv[3],c->argv[j],nack); streamPropagateXCLAIM(c,c->argv[1],group,c->argv[3],c->argv[j],nack);
server.dirty++; server.dirty++;
} }
} }