optimize spopwithcount propagation (#12082)

A single SPOP command with a count argument resulted in many SREM
commands being propagated to the replica.
This is inefficient because the key name is repeated many times and is also
looked up many times.
It also results in high QPS metrics on the replica.
To solve that, we now propagate the popped elements in batches of up to 1024 members per SREM command.

Co-authored-by: zhaozhao.zz <zhaozhao.zz@alibaba-inc.com>
This commit is contained in:
binfeng-xin 2023-05-22 15:27:14 +08:00 committed by GitHub
parent 48757934ff
commit 38e284f106
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 66 additions and 21 deletions

View File

@ -807,13 +807,14 @@ void spopWithCountCommand(client *c) {
/* Case 2 and 3 require to replicate SPOP as a set of SREM commands.
* Prepare our replication argument vector. Also send the array length
* which is common to both the code paths. */
robj *propargv[3];
unsigned long batchsize = count > 1024 ? 1024 : count;
robj **propargv = zmalloc(sizeof(robj *) * (2 + batchsize));
propargv[0] = shared.srem;
propargv[1] = c->argv[1];
unsigned long propindex = 2;
addReplySetLen(c,count);
/* Common iteration vars. */
robj *objele;
char *str;
size_t len;
int64_t llele;
@ -841,16 +842,19 @@ void spopWithCountCommand(client *c) {
if (str) {
addReplyBulkCBuffer(c, str, len);
objele = createStringObject(str, len);
propargv[propindex++] = createStringObject(str, len);
} else {
addReplyBulkLongLong(c, llele);
objele = createStringObjectFromLongLong(llele);
propargv[propindex++] = createStringObjectFromLongLong(llele);
}
/* Replicate/AOF this command as an SREM operation */
propargv[2] = objele;
alsoPropagate(c->db->id,propargv,3,PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(objele);
if (propindex == 2 + batchsize) {
alsoPropagate(c->db->id, propargv, propindex, PROPAGATE_AOF | PROPAGATE_REPL);
for (unsigned long j = 2; j < propindex; j++) {
decrRefCount(propargv[j]);
}
propindex = 2;
}
/* Store pointer for later deletion and move to next. */
ps[i] = p;
@ -861,14 +865,18 @@ void spopWithCountCommand(client *c) {
zfree(ps);
set->ptr = lp;
} else if (remaining*SPOP_MOVE_STRATEGY_MUL > count) {
while(count--) {
objele = setTypePopRandom(set);
addReplyBulk(c, objele);
for (unsigned long i = 0; i < count; i++) {
propargv[propindex] = setTypePopRandom(set);
addReplyBulk(c, propargv[propindex]);
propindex++;
/* Replicate/AOF this command as an SREM operation */
propargv[2] = objele;
alsoPropagate(c->db->id,propargv,3,PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(objele);
if (propindex == 2 + batchsize) {
alsoPropagate(c->db->id, propargv, propindex, PROPAGATE_AOF | PROPAGATE_REPL);
for (unsigned long j = 2; j < propindex; j++) {
decrRefCount(propargv[j]);
}
propindex = 2;
}
}
} else {
/* CASE 3: The number of elements to return is very big, approaching
@ -918,16 +926,19 @@ void spopWithCountCommand(client *c) {
while (setTypeNext(si, &str, &len, &llele) != -1) {
if (str == NULL) {
addReplyBulkLongLong(c,llele);
objele = createStringObjectFromLongLong(llele);
propargv[propindex++] = createStringObjectFromLongLong(llele);
} else {
addReplyBulkCBuffer(c, str, len);
objele = createStringObject(str, len);
propargv[propindex++] = createStringObject(str, len);
}
/* Replicate/AOF this command as an SREM operation */
propargv[2] = objele;
alsoPropagate(c->db->id,propargv,3,PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(objele);
if (propindex == 2 + batchsize) {
alsoPropagate(c->db->id, propargv, propindex, PROPAGATE_AOF | PROPAGATE_REPL);
for (unsigned long i = 2; i < propindex; i++) {
decrRefCount(propargv[i]);
}
propindex = 2;
}
}
setTypeReleaseIterator(si);
@ -935,6 +946,16 @@ void spopWithCountCommand(client *c) {
dbReplaceValue(c->db,c->argv[1],newset);
}
/* Replicate/AOF the remaining elements as an SREM operation */
if (propindex != 2) {
alsoPropagate(c->db->id, propargv, propindex, PROPAGATE_AOF | PROPAGATE_REPL);
for (unsigned long i = 2; i < propindex; i++) {
decrRefCount(propargv[i]);
}
propindex = 2;
}
zfree(propargv);
/* Don't propagate the command itself even if we incremented the
* dirty counter. We don't want to propagate an SPOP command since
* we propagated the command as a set of SREMs operations using

View File

@ -204,6 +204,30 @@ start_server {tags {"repl external:skip"}} {
assert {[$master dbsize] > 0}
}
test {spopwithcount rewrite srem command} {
# Verifies that SPOP with a count is propagated to the replica as batched
# SREM commands rather than one SREM per popped element.
$master del myset
# Build the member list 1..4000; incr bumps j before each lappend.
set content {}
for {set j 0} {$j < 4000} {} {
lappend content [incr j]
}
$master sadd myset {*}$content
# Pop just below, exactly at, and just above 1024 elements.
# 1023 + 1024 + 1025 = 3072 members removed, so 4000 - 3072 = 928 remain.
$master spop myset 1023
$master spop myset 1024
$master spop myset 1025
assert_match 928 [$master scard myset]
# NOTE(review): calls=3 presumably relies on no earlier SPOP calls being
# counted in the master's command stats at this point - confirm.
assert_match {*calls=3,*} [cmdrstat spop $master]
# Wait until the replica reports the same replication offset as the master
# before inspecting the replica's state.
wait_for_condition 50 100 {
[status $slave master_repl_offset] == [status $master master_repl_offset]
} else {
fail "SREM replication inconsistency."
}
# Expected SREM count with 1024-element batches: 1 (1023 elements)
# + 1 (1024 elements) + 2 (1024 + 1 elements) = 4.
# NOTE(review): like the SPOP check, this presumably assumes the replica's
# srem counter starts from zero here - confirm.
assert_match {*calls=4,*} [cmdrstat srem $slave]
assert_match 928 [$slave scard myset]
}
test {Replication of SPOP command -- alsoPropagate() API} {
$master del myset
set size [expr 1+[randomInt 100]]