diff --git a/src/t_set.c b/src/t_set.c index c65203396..7f620daba 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -807,13 +807,14 @@ void spopWithCountCommand(client *c) { /* Case 2 and 3 require to replicate SPOP as a set of SREM commands. * Prepare our replication argument vector. Also send the array length * which is common to both the code paths. */ - robj *propargv[3]; + unsigned long batchsize = count > 1024 ? 1024 : count; + robj **propargv = zmalloc(sizeof(robj *) * (2 + batchsize)); propargv[0] = shared.srem; propargv[1] = c->argv[1]; + unsigned long propindex = 2; addReplySetLen(c,count); /* Common iteration vars. */ - robj *objele; char *str; size_t len; int64_t llele; @@ -841,16 +842,19 @@ void spopWithCountCommand(client *c) { if (str) { addReplyBulkCBuffer(c, str, len); - objele = createStringObject(str, len); + propargv[propindex++] = createStringObject(str, len); } else { addReplyBulkLongLong(c, llele); - objele = createStringObjectFromLongLong(llele); + propargv[propindex++] = createStringObjectFromLongLong(llele); } - /* Replicate/AOF this command as an SREM operation */ - propargv[2] = objele; - alsoPropagate(c->db->id,propargv,3,PROPAGATE_AOF|PROPAGATE_REPL); - decrRefCount(objele); + if (propindex == 2 + batchsize) { + alsoPropagate(c->db->id, propargv, propindex, PROPAGATE_AOF | PROPAGATE_REPL); + for (unsigned long j = 2; j < propindex; j++) { + decrRefCount(propargv[j]); + } + propindex = 2; + } /* Store pointer for later deletion and move to next. */ ps[i] = p; @@ -861,14 +865,18 @@ void spopWithCountCommand(client *c) { zfree(ps); set->ptr = lp; } else if (remaining*SPOP_MOVE_STRATEGY_MUL > count) { - while(count--) { - objele = setTypePopRandom(set); - addReplyBulk(c, objele); - + for (unsigned long i = 0; i < count; i++) { + propargv[propindex] = setTypePopRandom(set); + addReplyBulk(c, propargv[propindex]); + propindex++; /* Replicate/AOF this command as an SREM operation */ - propargv[2] = objele; - alsoPropagate(c->db->id,propargv,3,PROPAGATE_AOF|PROPAGATE_REPL); - decrRefCount(objele); + if (propindex == 2 + batchsize) { + alsoPropagate(c->db->id, propargv, propindex, PROPAGATE_AOF | PROPAGATE_REPL); + for (unsigned long j = 2; j < propindex; j++) { + decrRefCount(propargv[j]); + } + propindex = 2; + } } } else { /* CASE 3: The number of elements to return is very big, approaching @@ -918,16 +926,19 @@ void spopWithCountCommand(client *c) { while (setTypeNext(si, &str, &len, &llele) != -1) { if (str == NULL) { addReplyBulkLongLong(c,llele); - objele = createStringObjectFromLongLong(llele); + propargv[propindex++] = createStringObjectFromLongLong(llele); } else { addReplyBulkCBuffer(c, str, len); - objele = createStringObject(str, len); + propargv[propindex++] = createStringObject(str, len); } - /* Replicate/AOF this command as an SREM operation */ - propargv[2] = objele; - alsoPropagate(c->db->id,propargv,3,PROPAGATE_AOF|PROPAGATE_REPL); - decrRefCount(objele); + if (propindex == 2 + batchsize) { + alsoPropagate(c->db->id, propargv, propindex, PROPAGATE_AOF | PROPAGATE_REPL); + for (unsigned long i = 2; i < propindex; i++) { + decrRefCount(propargv[i]); + } + propindex = 2; + } } setTypeReleaseIterator(si); @@ -935,6 +946,16 @@ void spopWithCountCommand(client *c) { dbReplaceValue(c->db,c->argv[1],newset); } + /* Replicate/AOF the remaining elements as an SREM operation */ + if (propindex != 2) { + alsoPropagate(c->db->id, propargv, propindex, PROPAGATE_AOF | PROPAGATE_REPL); + for (unsigned long i = 2; i < propindex; i++) { + decrRefCount(propargv[i]); + } + propindex = 2; + } + zfree(propargv); + /* Don't propagate the command itself even if we incremented the * dirty counter. We don't want to propagate an SPOP command since * we propagated the command as a set of SREMs operations using diff --git a/tests/integration/replication-4.tcl b/tests/integration/replication-4.tcl index 867ef364e..4370080b0 100644 --- a/tests/integration/replication-4.tcl +++ b/tests/integration/replication-4.tcl @@ -204,6 +204,30 @@ start_server {tags {"repl external:skip"}} { assert {[$master dbsize] > 0} } + test {spopwithcount rewrite srem command} { + $master del myset + + set content {} + for {set j 0} {$j < 4000} {} { + lappend content [incr j] + } + $master sadd myset {*}$content + $master spop myset 1023 + $master spop myset 1024 + $master spop myset 1025 + + assert_match 928 [$master scard myset] + assert_match {*calls=3,*} [cmdrstat spop $master] + + wait_for_condition 50 100 { + [status $slave master_repl_offset] == [status $master master_repl_offset] + } else { + fail "SREM replication inconsistency." + } + assert_match {*calls=4,*} [cmdrstat srem $slave] + assert_match 928 [$slave scard myset] + } + test {Replication of SPOP command -- alsoPropagate() API} { $master del myset set size [expr 1+[randomInt 100]]