parent
e5f1de1448
commit
3e78344d87
@ -1119,6 +1119,19 @@ int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missin
|
|||||||
return streamGenericParseIDOrReply(c,o,id,missing_seq,1);
|
return streamGenericParseIDOrReply(c,o,id,missing_seq,1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* We propagate MAXLEN ~ <count> as MAXLEN = <resulting-len-of-stream>
|
||||||
|
* otherwise trimming is no longer determinsitic on replicas / AOF. */
|
||||||
|
void streamRewriteApproxMaxlen(client *c, stream *s, int maxlen_arg_idx) {
|
||||||
|
robj *maxlen_obj = createStringObjectFromLongLong(s->length);
|
||||||
|
robj *equal_obj = createStringObject("=",1);
|
||||||
|
|
||||||
|
rewriteClientCommandArgument(c,maxlen_arg_idx,maxlen_obj);
|
||||||
|
rewriteClientCommandArgument(c,maxlen_arg_idx-1,equal_obj);
|
||||||
|
|
||||||
|
decrRefCount(equal_obj);
|
||||||
|
decrRefCount(maxlen_obj);
|
||||||
|
}
|
||||||
|
|
||||||
/* XADD key [MAXLEN [~|=] <count>] <ID or *> [field value] [field value] ... */
|
/* XADD key [MAXLEN [~|=] <count>] <ID or *> [field value] [field value] ... */
|
||||||
void xaddCommand(client *c) {
|
void xaddCommand(client *c) {
|
||||||
streamID id;
|
streamID id;
|
||||||
@ -1198,17 +1211,7 @@ void xaddCommand(client *c) {
|
|||||||
if (streamTrimByLength(s,maxlen,approx_maxlen)) {
|
if (streamTrimByLength(s,maxlen,approx_maxlen)) {
|
||||||
notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
|
notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
|
||||||
}
|
}
|
||||||
|
if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx);
|
||||||
/* Rewrite approximated MAXLEN as specified s->length. */
|
|
||||||
if (approx_maxlen) {
|
|
||||||
robj *maxlen_obj = createStringObjectFromLongLong(s->length);
|
|
||||||
rewriteClientCommandArgument(c,maxlen_arg_idx,maxlen_obj);
|
|
||||||
decrRefCount(maxlen_obj);
|
|
||||||
|
|
||||||
robj *equal_obj = createStringObject("=",1);
|
|
||||||
rewriteClientCommandArgument(c,maxlen_arg_idx-1,equal_obj);
|
|
||||||
decrRefCount(equal_obj);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Let's rewrite the ID argument with the one actually generated for
|
/* Let's rewrite the ID argument with the one actually generated for
|
||||||
@ -2262,17 +2265,7 @@ void xtrimCommand(client *c) {
|
|||||||
signalModifiedKey(c->db,c->argv[1]);
|
signalModifiedKey(c->db,c->argv[1]);
|
||||||
notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
|
notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
|
||||||
server.dirty += deleted;
|
server.dirty += deleted;
|
||||||
|
if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx);
|
||||||
/* Rewrite approximated MAXLEN as specified s->length. */
|
|
||||||
if (approx_maxlen) {
|
|
||||||
robj *maxlen_obj = createStringObjectFromLongLong(s->length);
|
|
||||||
rewriteClientCommandArgument(c,maxlen_arg_idx,maxlen_obj);
|
|
||||||
decrRefCount(maxlen_obj);
|
|
||||||
|
|
||||||
robj *equal_obj = createStringObject("=",1);
|
|
||||||
rewriteClientCommandArgument(c,maxlen_arg_idx-1,equal_obj);
|
|
||||||
decrRefCount(equal_obj);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
addReplyLongLong(c,deleted);
|
addReplyLongLong(c,deleted);
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user