From 4cac0ca35a39d5f29be278c973b915a68f38f20b Mon Sep 17 00:00:00 2001 From: John Sully Date: Sat, 28 Sep 2019 00:10:46 -0400 Subject: [PATCH] Implement left and right shift BITOP operators Former-commit-id: ba365298ed37a76f0a8630e0ec6c86393293aebe --- src/bitops.cpp | 235 +++++++++++++++++++++++++++--------------- src/server.h | 2 +- tests/unit/bitops.tcl | 58 +++++++++++ 3 files changed, 213 insertions(+), 82 deletions(-) diff --git a/src/bitops.cpp b/src/bitops.cpp index 02034f377..bcb8840b3 100644 --- a/src/bitops.cpp +++ b/src/bitops.cpp @@ -396,6 +396,8 @@ void printBits(unsigned char *p, unsigned long count) { #define BITOP_OR 1 #define BITOP_XOR 2 #define BITOP_NOT 3 +#define BITOP_LSHIFT 4 +#define BITOP_RSHIFT 5 #define BITFIELDOP_GET 0 #define BITFIELDOP_SET 1 @@ -592,7 +594,8 @@ void bitopCommand(client *c) { char *opname = szFromObj(c->argv[1]); robj *targetkey = c->argv[2]; robj_roptr o; - unsigned long op, j, numkeys; + int op; + unsigned long j, numkeys; robj_roptr *objects; /* Array of source objects. */ unsigned char **src; /* Array of source strings pointers. */ unsigned long *len, maxlen = 0; /* Array of length of src strings, @@ -609,6 +612,10 @@ void bitopCommand(client *c) { op = BITOP_XOR; else if((opname[0] == 'n' || opname[0] == 'N') && !strcasecmp(opname,"not")) op = BITOP_NOT; + else if (!strcasecmp(opname, "lshift")) + op = BITOP_LSHIFT; + else if (!strcasecmp(opname, "rshift")) + op = BITOP_RSHIFT; else { addReply(c,shared.syntaxerr); return; @@ -620,8 +627,25 @@ void bitopCommand(client *c) { return; } + bool fShiftOp = (op == BITOP_LSHIFT) || (op == BITOP_RSHIFT); + long long shift = 0; + + /* Sanity check: SHIFTS only accept a single arg and an integer */ + if (fShiftOp) { + if (c->argc != 5) { + addReplyError(c,"BITOP SHIFT must be called with a single source key and an integer shift."); + return; + } + if (getLongLongFromObject(c->argv[4], &shift) != C_OK) { + addReplyError(c, "BITOP SHIFT's last parameter must be an integer"); + return; + } + if (op == BITOP_RSHIFT) + shift = -shift; + } + /* Lookup keys, and store pointers to the string objects into an array. */ - numkeys = c->argc - 3; + numkeys = c->argc - (fShiftOp ? 4 : 3); src = (unsigned char**)zmalloc(sizeof(unsigned char*) * numkeys, MALLOC_LOCAL); len = (unsigned long*)zmalloc(sizeof(long) * numkeys, MALLOC_LOCAL); objects = (robj_roptr*)zmalloc(sizeof(robj_roptr) * numkeys, MALLOC_LOCAL); @@ -654,94 +678,143 @@ void bitopCommand(client *c) { if (j == 0 || len[j] < minlen) minlen = len[j]; } - /* Compute the bit operation, if at least one string is not empty. */ - if (maxlen) { - res = (unsigned char*) sdsnewlen(NULL,maxlen); - unsigned char output, byte; - unsigned long i; + if (fShiftOp) + { + long newlen = (long)maxlen + shift/CHAR_BIT; + if (shift > 0 && (shift % CHAR_BIT) != 0) + newlen++; - /* Fast path: as far as we have data for all the input bitmaps we - * can take a fast path that performs much better than the - * vanilla algorithm. On ARM we skip the fast path since it will - * result in GCC compiling the code using multiple-words load/store - * operations that are not supported even in ARM >= v6. */ - j = 0; - #ifndef USE_ALIGNED_ACCESS - if (minlen >= sizeof(unsigned long)*4 && numkeys <= 16) { - unsigned long *lp[16]; - unsigned long *lres = (unsigned long*) res; + if (newlen < 0) + newlen = 0; + + if (newlen) + { + res = (unsigned char*) sdsnewlen(NULL,newlen); + if (shift >= 0) + { // left shift + long byteoffset = shift/CHAR_BIT; + memset(res, 0, byteoffset); + long srcLen = newlen - byteoffset - ((shift % CHAR_BIT) ? 1 : 0); - /* Note: sds pointer is always aligned to 8 byte boundary. */ - memcpy(lp,src,sizeof(unsigned long*)*numkeys); - memcpy(res,src[0],minlen); - - /* Different branches per different operations for speed (sorry). */ - if (op == BITOP_AND) { - while(minlen >= sizeof(unsigned long)*4) { - for (i = 1; i < numkeys; i++) { - lres[0] &= lp[i][0]; - lres[1] &= lp[i][1]; - lres[2] &= lp[i][2]; - lres[3] &= lp[i][3]; - lp[i]+=4; - } - lres+=4; - j += sizeof(unsigned long)*4; - minlen -= sizeof(unsigned long)*4; + // now the bitshift+copy + unsigned bitshift = shift % CHAR_BIT; + unsigned char carry = 0; + for (long iSrc = 0; iSrc < srcLen; ++iSrc) + { + res[byteoffset+iSrc] = (src[0][iSrc] << bitshift) | carry; + carry = src[0][iSrc] >> (CHAR_BIT - bitshift); } - } else if (op == BITOP_OR) { - while(minlen >= sizeof(unsigned long)*4) { - for (i = 1; i < numkeys; i++) { - lres[0] |= lp[i][0]; - lres[1] |= lp[i][1]; - lres[2] |= lp[i][2]; - lres[3] |= lp[i][3]; - lp[i]+=4; - } - lres+=4; - j += sizeof(unsigned long)*4; - minlen -= sizeof(unsigned long)*4; - } - } else if (op == BITOP_XOR) { - while(minlen >= sizeof(unsigned long)*4) { - for (i = 1; i < numkeys; i++) { - lres[0] ^= lp[i][0]; - lres[1] ^= lp[i][1]; - lres[2] ^= lp[i][2]; - lres[3] ^= lp[i][3]; - lp[i]+=4; - } - lres+=4; - j += sizeof(unsigned long)*4; - minlen -= sizeof(unsigned long)*4; - } - } else if (op == BITOP_NOT) { - while(minlen >= sizeof(unsigned long)*4) { - lres[0] = ~lres[0]; - lres[1] = ~lres[1]; - lres[2] = ~lres[2]; - lres[3] = ~lres[3]; - lres+=4; - j += sizeof(unsigned long)*4; - minlen -= sizeof(unsigned long)*4; + if (bitshift) + res[newlen-1] = carry; + } + else + { // right shift + long byteoffset = -shift/CHAR_BIT; + unsigned bitshift = -shift % CHAR_BIT; + if (bitshift) + ++byteoffset; + res[0] = (src[0][byteoffset] << (CHAR_BIT-bitshift)); + if (byteoffset > 0) + res[0] |= (src[0][byteoffset-1] >> bitshift); + for (long idx = 1; idx < newlen; ++idx) + { + res[idx] = (src[0][byteoffset+idx] << (CHAR_BIT-bitshift)) | (src[0][byteoffset+idx-1] >> bitshift); } } } - #endif + maxlen = newlen; // this is to ensure we DEL below if newlen was 0 + } + else + { + /* Compute the bit operation, if at least one string is not empty. */ + if (maxlen) { + res = (unsigned char*) sdsnewlen(NULL,maxlen); + unsigned char output, byte; + unsigned long i; - /* j is set to the next byte to process by the previous loop. */ - for (; j < maxlen; j++) { - output = (len[0] <= j) ? 0 : src[0][j]; - if (op == BITOP_NOT) output = ~output; - for (i = 1; i < numkeys; i++) { - byte = (len[i] <= j) ? 0 : src[i][j]; - switch(op) { - case BITOP_AND: output &= byte; break; - case BITOP_OR: output |= byte; break; - case BITOP_XOR: output ^= byte; break; + /* Fast path: as far as we have data for all the input bitmaps we + * can take a fast path that performs much better than the + * vanilla algorithm. On ARM we skip the fast path since it will + * result in GCC compiling the code using multiple-words load/store + * operations that are not supported even in ARM >= v6. */ + j = 0; + #ifndef USE_ALIGNED_ACCESS + if (minlen >= sizeof(unsigned long)*4 && numkeys <= 16) { + unsigned long *lp[16]; + unsigned long *lres = (unsigned long*) res; + + /* Note: sds pointer is always aligned to 8 byte boundary. */ + memcpy(lp,src,sizeof(unsigned long*)*numkeys); + memcpy(res,src[0],minlen); + + /* Different branches per different operations for speed (sorry). */ + if (op == BITOP_AND) { + while(minlen >= sizeof(unsigned long)*4) { + for (i = 1; i < numkeys; i++) { + lres[0] &= lp[i][0]; + lres[1] &= lp[i][1]; + lres[2] &= lp[i][2]; + lres[3] &= lp[i][3]; + lp[i]+=4; + } + lres+=4; + j += sizeof(unsigned long)*4; + minlen -= sizeof(unsigned long)*4; + } + } else if (op == BITOP_OR) { + while(minlen >= sizeof(unsigned long)*4) { + for (i = 1; i < numkeys; i++) { + lres[0] |= lp[i][0]; + lres[1] |= lp[i][1]; + lres[2] |= lp[i][2]; + lres[3] |= lp[i][3]; + lp[i]+=4; + } + lres+=4; + j += sizeof(unsigned long)*4; + minlen -= sizeof(unsigned long)*4; + } + } else if (op == BITOP_XOR) { + while(minlen >= sizeof(unsigned long)*4) { + for (i = 1; i < numkeys; i++) { + lres[0] ^= lp[i][0]; + lres[1] ^= lp[i][1]; + lres[2] ^= lp[i][2]; + lres[3] ^= lp[i][3]; + lp[i]+=4; + } + lres+=4; + j += sizeof(unsigned long)*4; + minlen -= sizeof(unsigned long)*4; + } + } else if (op == BITOP_NOT) { + while(minlen >= sizeof(unsigned long)*4) { + lres[0] = ~lres[0]; + lres[1] = ~lres[1]; + lres[2] = ~lres[2]; + lres[3] = ~lres[3]; + lres+=4; + j += sizeof(unsigned long)*4; + minlen -= sizeof(unsigned long)*4; + } } } - res[j] = output; + #endif + + /* j is set to the next byte to process by the previous loop. */ + for (; j < maxlen; j++) { + output = (len[0] <= j) ? 0 : src[0][j]; + if (op == BITOP_NOT) output = ~output; + for (i = 1; i < numkeys; i++) { + byte = (len[i] <= j) ? 0 : src[i][j]; + switch(op) { + case BITOP_AND: output &= byte; break; + case BITOP_OR: output |= byte; break; + case BITOP_XOR: output ^= byte; break; + } + } + res[j] = output; + } } } for (j = 0; j < numkeys; j++) { diff --git a/src/server.h b/src/server.h index dbcff38f5..6a2bda9fa 100644 --- a/src/server.h +++ b/src/server.h @@ -729,7 +729,7 @@ typedef struct redisObject { * LFU data (least significant 8 bits frequency * and most significant 16 bits access time). */ private: - mutable std::atomic refcount; + mutable std::atomic refcount {0}; public: uint64_t mvcc_tstamp; void *m_ptr; diff --git a/tests/unit/bitops.tcl b/tests/unit/bitops.tcl index 926f38295..f8a5cbe18 100644 --- a/tests/unit/bitops.tcl +++ b/tests/unit/bitops.tcl @@ -214,6 +214,64 @@ start_server {tags {"bitops"}} { r bitop or x a b } {32} + test {BITOP lshift size} { + r set a " " + r bitop lshift x a 1 + } {2} + + test {BITOP rshift size} { + r set a " " + r bitop rshift x a 1 + } {1} + + test {BITOP rshift 0 byte} { + r set a " " + r bitop rshift x a 8 + } {0} + + test {BITOP rshift underflow} { + r set a " " + r bitop rshift x a 65 + } {0} + + test {BITOP lshift string} { + r set a "abcdefg" + r bitop lshift x a 8 + r get x + } "\x00abcdefg" + + test {BITOP lshift char} { + r set a "\xAA" + r bitop lshift x a 4 + r get x + } "\xA0\x0A" + + test {BITOP rshift char} { + r set a "\xAA" + r bitop rshift x a 3 + r get x + } "\x15" + + test {BITOP lshift carry} { + r set a "\xFF" + r bitop lshift x a 1 + r get x + } "\xFE\x01" + + test {BITOP rshift carry} { + r set a "\x00\xFF" + r bitop rshift x a 1 + r get x + } "\x80\x7F" + + test {BITOP rshift reciprocal} { + r flushdb + r set a "abcdefg" + r bitop lshift b a 14 + r bitop rshift res b 14 + r get res + } "abcdefg\x00" + test {BITPOS bit=0 with empty key returns 0} { r del str r bitpos str 0