diff --git a/src/bitops.c b/src/bitops.c index 2f62ccc46..d309b7afa 100644 --- a/src/bitops.c +++ b/src/bitops.c @@ -162,6 +162,7 @@ void bitopCommand(redisClient *c) { robj **objects; /* Array of soruce objects. */ unsigned char **src; /* Array of source strings pointers. */ long *len, maxlen = 0; /* Array of length of src strings, and max len. */ + long minlen = 0; /* Min len among the input keys. */ unsigned char *res = NULL; /* Resulting string. */ /* Parse the operation name. */ @@ -213,6 +214,7 @@ void bitopCommand(redisClient *c) { src[j] = objects[j]->ptr; len[j] = sdslen(objects[j]->ptr); if (len[j] > maxlen) maxlen = len[j]; + if (j == 0 || len[j] < minlen) minlen = len[j]; } /* Compute the bit operation, if at least one string is not empty. */ @@ -221,7 +223,73 @@ void bitopCommand(redisClient *c) { unsigned char output, byte; long i; - for (j = 0; j < maxlen; j++) { + /* Fast path: as far as we have data for all the input bitmaps we + * can take a fast path that performs much better than the + * vanilla algorithm. */ + j = 0; + if (minlen && numkeys <= 16) { + unsigned long *lp[16]; + unsigned long *lres = (unsigned long*) res; + + /* Note: sds pointer is always aligned to 8 byte boundary. */ + memcpy(lp,src,sizeof(unsigned long*)*numkeys); + memcpy(res,src[0],minlen); + + /* Different branches per different operations for speed (sorry). */ + if (op == BITOP_AND) { + while(minlen >= sizeof(unsigned long)*4) { + for (i = 1; i < numkeys; i++) { + lres[0] &= lp[i][0]; + lres[1] &= lp[i][1]; + lres[2] &= lp[i][2]; + lres[3] &= lp[i][3]; + lp[i]+=4; + } + lres+=4; + j += sizeof(unsigned long)*4; + minlen -= sizeof(unsigned long)*4; + } + } else if (op == BITOP_OR) { + while(minlen >= sizeof(unsigned long)*4) { + for (i = 1; i < numkeys; i++) { + lres[0] |= lp[i][0]; + lres[1] |= lp[i][1]; + lres[2] |= lp[i][2]; + lres[3] |= lp[i][3]; + lp[i]+=4; + } + lres+=4; + j += sizeof(unsigned long)*4; + minlen -= sizeof(unsigned long)*4; + } + } else if (op == BITOP_XOR) { + while(minlen >= sizeof(unsigned long)*4) { + for (i = 1; i < numkeys; i++) { + lres[0] ^= lp[i][0]; + lres[1] ^= lp[i][1]; + lres[2] ^= lp[i][2]; + lres[3] ^= lp[i][3]; + lp[i]+=4; + } + lres+=4; + j += sizeof(unsigned long)*4; + minlen -= sizeof(unsigned long)*4; + } + } else if (op == BITOP_NOT) { + while(minlen >= sizeof(unsigned long)*4) { + lres[0] = ~lres[0]; + lres[1] = ~lres[1]; + lres[2] = ~lres[2]; + lres[3] = ~lres[3]; + lres+=4; + j += sizeof(unsigned long)*4; + minlen -= sizeof(unsigned long)*4; + } + } + } + + /* j is set to the next byte to process by the previous loop. */ + for (; j < maxlen; j++) { output = (len[0] <= j) ? 0 : src[0][j]; if (op == BITOP_NOT) output = ~output; for (i = 1; i < numkeys; i++) { diff --git a/tests/unit/bitops.tcl b/tests/unit/bitops.tcl index b7a76abb1..c8f58ef1e 100644 --- a/tests/unit/bitops.tcl +++ b/tests/unit/bitops.tcl @@ -24,13 +24,13 @@ proc simulate_bit_op {op args} { set out {} for {set x 0} {$x < $maxlen} {incr x} { set bit [string range $b(0) $x $x] + if {$op eq {not}} {set bit [expr {!$bit}]} for {set j 1} {$j < $count} {incr j} { set bit2 [string range $b($j) $x $x] switch $op { and {set bit [expr {$bit & $bit2}]} or {set bit [expr {$bit | $bit2}]} xor {set bit [expr {$bit ^ $bit2}]} - not {set bit [expr {!$bit}]} } } append out $bit @@ -135,6 +135,16 @@ start_server {tags {"bitops"}} { } } + test {BITOP NOT fuzzing} { + for {set i 0} {$i < 10} {incr i} { + r flushall + set str [randstring 0 1000] + r set str $str + r bitop not target str + assert_equal [r get target] [simulate_bit_op not $str] + } + } + test {BITOP with integer encoded source objects} { r set a 1 r set b 2