PFCOUNT support for multi-key union.
This commit is contained in:
parent
fcd2155b6f
commit
0feb2aabca
@ -198,8 +198,9 @@ struct hllhdr {
|
|||||||
#define HLL_REGISTER_MAX ((1<<HLL_BITS)-1)
|
#define HLL_REGISTER_MAX ((1<<HLL_BITS)-1)
|
||||||
#define HLL_HDR_SIZE sizeof(struct hllhdr)
|
#define HLL_HDR_SIZE sizeof(struct hllhdr)
|
||||||
#define HLL_DENSE_SIZE (HLL_HDR_SIZE+((HLL_REGISTERS*HLL_BITS+7)/8))
|
#define HLL_DENSE_SIZE (HLL_HDR_SIZE+((HLL_REGISTERS*HLL_BITS+7)/8))
|
||||||
#define HLL_DENSE 0 /* Dense encoding */
|
#define HLL_DENSE 0 /* Dense encoding. */
|
||||||
#define HLL_SPARSE 1 /* Sparse encoding */
|
#define HLL_SPARSE 1 /* Sparse encoding. */
|
||||||
|
#define HLL_RAW 255 /* Only used internally, never exposed. */
|
||||||
#define HLL_MAX_ENCODING 1
|
#define HLL_MAX_ENCODING 1
|
||||||
|
|
||||||
static char *invalid_hll_err = "-INVALIDOBJ Corrupted HLL object detected\r\n";
|
static char *invalid_hll_err = "-INVALIDOBJ Corrupted HLL object detected\r\n";
|
||||||
@ -920,12 +921,37 @@ double hllSparseSum(uint8_t *sparse, int sparselen, double *PE, int *ezp, int *i
|
|||||||
* as helpers to compute the SUM(2^-reg) part of the computation, which is
|
* as helpers to compute the SUM(2^-reg) part of the computation, which is
|
||||||
* representation-specific, while all the rest is common. */
|
* representation-specific, while all the rest is common. */
|
||||||
|
|
||||||
|
/* Implements the SUM operation for uint8_t data type which is only used
|
||||||
|
* internally as speedup for PFCOUNT with multiple keys. */
|
||||||
|
double hllRawSum(uint8_t *registers, double *PE, int *ezp) {
|
||||||
|
double E = 0;
|
||||||
|
int j, ez = 0;
|
||||||
|
unsigned long reg;
|
||||||
|
|
||||||
|
for (j = 0; j < HLL_REGISTERS; j++) {
|
||||||
|
reg = registers[j];
|
||||||
|
if (reg == 0) {
|
||||||
|
ez++;
|
||||||
|
E += 1; /* 2^(-reg[j]) is 1 when m is 0. */
|
||||||
|
} else {
|
||||||
|
E += PE[reg]; /* Precomputed 2^(-reg[j]). */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*ezp = ez;
|
||||||
|
return E;
|
||||||
|
}
|
||||||
|
|
||||||
/* Return the approximated cardinality of the set based on the armonic
|
/* Return the approximated cardinality of the set based on the armonic
|
||||||
* mean of the registers values. 'hdr' points to the start of the SDS
|
* mean of the registers values. 'hdr' points to the start of the SDS
|
||||||
* representing the String object holding the HLL representation.
|
* representing the String object holding the HLL representation.
|
||||||
*
|
*
|
||||||
* If the sparse representation of the HLL object is not valid, the integer
|
* If the sparse representation of the HLL object is not valid, the integer
|
||||||
* pointed by 'invalid' is set to non-zero, otherwise it is left untouched. */
|
* pointed by 'invalid' is set to non-zero, otherwise it is left untouched.
|
||||||
|
*
|
||||||
|
* hllCount() supports a special internal-only encoding of HLL_RAW, that
|
||||||
|
* is, hdr->registers will point to an uint8_t array of HLL_REGISTERS element.
|
||||||
|
* This is useful in order to speedup PFCOUNT when called against multiple
|
||||||
|
* keys (no need to work with 6-bit integers encoding). */
|
||||||
uint64_t hllCount(struct hllhdr *hdr, int *invalid) {
|
uint64_t hllCount(struct hllhdr *hdr, int *invalid) {
|
||||||
double m = HLL_REGISTERS;
|
double m = HLL_REGISTERS;
|
||||||
double E, alpha = 0.7213/(1+1.079/m);
|
double E, alpha = 0.7213/(1+1.079/m);
|
||||||
@ -947,9 +973,13 @@ uint64_t hllCount(struct hllhdr *hdr, int *invalid) {
|
|||||||
/* Compute SUM(2^-register[0..i]). */
|
/* Compute SUM(2^-register[0..i]). */
|
||||||
if (hdr->encoding == HLL_DENSE) {
|
if (hdr->encoding == HLL_DENSE) {
|
||||||
E = hllDenseSum(hdr->registers,PE,&ez);
|
E = hllDenseSum(hdr->registers,PE,&ez);
|
||||||
} else {
|
} else if (hdr->encoding == HLL_SPARSE) {
|
||||||
E = hllSparseSum(hdr->registers,
|
E = hllSparseSum(hdr->registers,
|
||||||
sdslen((sds)hdr)-HLL_HDR_SIZE,PE,&ez,invalid);
|
sdslen((sds)hdr)-HLL_HDR_SIZE,PE,&ez,invalid);
|
||||||
|
} else if (hdr->encoding == HLL_RAW) {
|
||||||
|
E = hllRawSum(hdr->registers,PE,&ez);
|
||||||
|
} else {
|
||||||
|
redisPanic("Unknown HyperLogLog encoding in hllCount()");
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Muliply the inverse of E for alpha_m * m^2 to have the raw estimate. */
|
/* Muliply the inverse of E for alpha_m * m^2 to have the raw estimate. */
|
||||||
@ -1151,10 +1181,47 @@ void pfaddCommand(redisClient *c) {
|
|||||||
|
|
||||||
/* PFCOUNT var -> approximated cardinality of set. */
|
/* PFCOUNT var -> approximated cardinality of set. */
|
||||||
void pfcountCommand(redisClient *c) {
|
void pfcountCommand(redisClient *c) {
|
||||||
robj *o = lookupKeyRead(c->db,c->argv[1]);
|
robj *o;
|
||||||
struct hllhdr *hdr;
|
struct hllhdr *hdr;
|
||||||
uint64_t card;
|
uint64_t card;
|
||||||
|
|
||||||
|
/* Case 1: multi-key keys, cardinality of the union.
|
||||||
|
*
|
||||||
|
* When multiple keys are specified, PFCOUNT actually computes
|
||||||
|
* the cardinality of the merge of the N HLLs specified. */
|
||||||
|
if (c->argc > 2) {
|
||||||
|
uint8_t max[HLL_HDR_SIZE+HLL_REGISTERS], *registers;
|
||||||
|
int j;
|
||||||
|
|
||||||
|
/* Compute an HLL with M[i] = MAX(M[i]_j). */
|
||||||
|
memset(max,0,sizeof(max));
|
||||||
|
hdr = (struct hllhdr*) max;
|
||||||
|
hdr->encoding = HLL_RAW; /* Special internal-only encoding. */
|
||||||
|
registers = max + HLL_HDR_SIZE;
|
||||||
|
for (j = 1; j < c->argc; j++) {
|
||||||
|
/* Check type and size. */
|
||||||
|
robj *o = lookupKeyRead(c->db,c->argv[j]);
|
||||||
|
if (o == NULL) continue; /* Assume empty HLL for non existing var. */
|
||||||
|
if (isHLLObjectOrReply(c,o) != REDIS_OK) return;
|
||||||
|
|
||||||
|
/* Merge with this HLL with our 'max' HHL by setting max[i]
|
||||||
|
* to MAX(max[i],hll[i]). */
|
||||||
|
if (hllMerge(registers,o) == REDIS_ERR) {
|
||||||
|
addReplySds(c,sdsnew(invalid_hll_err));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Compute cardinality of the resulting set. */
|
||||||
|
addReplyLongLong(c,hllCount(hdr,NULL));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Case 2: cardinality of the single HLL.
|
||||||
|
*
|
||||||
|
* The user specified a single key. Either return the cached value
|
||||||
|
* or compute one and update the cache. */
|
||||||
|
o = lookupKeyRead(c->db,c->argv[1]);
|
||||||
if (o == NULL) {
|
if (o == NULL) {
|
||||||
/* No key? Cardinality is zero since no element was added, otherwise
|
/* No key? Cardinality is zero since no element was added, otherwise
|
||||||
* we would have a key as HLLADD creates it as a side effect. */
|
* we would have a key as HLLADD creates it as a side effect. */
|
||||||
|
@ -274,7 +274,7 @@ struct redisCommand redisCommandTable[] = {
|
|||||||
{"wait",waitCommand,3,"rs",0,NULL,0,0,0,0,0},
|
{"wait",waitCommand,3,"rs",0,NULL,0,0,0,0,0},
|
||||||
{"pfselftest",pfselftestCommand,1,"r",0,NULL,0,0,0,0,0},
|
{"pfselftest",pfselftestCommand,1,"r",0,NULL,0,0,0,0,0},
|
||||||
{"pfadd",pfaddCommand,-2,"wm",0,NULL,1,1,1,0,0},
|
{"pfadd",pfaddCommand,-2,"wm",0,NULL,1,1,1,0,0},
|
||||||
{"pfcount",pfcountCommand,2,"w",0,NULL,1,1,1,0,0},
|
{"pfcount",pfcountCommand,-2,"w",0,NULL,1,1,1,0,0},
|
||||||
{"pfmerge",pfmergeCommand,-2,"wm",0,NULL,1,-1,1,0,0},
|
{"pfmerge",pfmergeCommand,-2,"wm",0,NULL,1,-1,1,0,0},
|
||||||
{"pfdebug",pfdebugCommand,-3,"w",0,NULL,0,0,0,0,0}
|
{"pfdebug",pfdebugCommand,-3,"w",0,NULL,0,0,0,0,0}
|
||||||
};
|
};
|
||||||
|
Loading…
x
Reference in New Issue
Block a user