lookupKeyByPattern() used by SORT GET/BY rewritten. Fixes issue #460.

lookupKeyByPattern() was implemented with a trick to speedup the lookup
process allocating two fake Redis obejcts on the stack. However now that
we propagate expires to the slave as DEL operations the lookup of the
key may result into a call to expireIfNeeded() having the stack
allocated object as argument, that may in turn use it to create the
protocol to send to the slave. But since this fake obejcts are
inherently read-only this is a problem.

As a side effect of this fix there are no longer size limits in the
pattern to be used with GET/BY option of SORT.

See https://github.com/antirez/redis/issues/460 for bug details.
This commit is contained in:
antirez 2012-04-17 13:05:09 +02:00
parent 0705ff3f04
commit 3c25c4a691

View File

@ -9,21 +9,27 @@ redisSortOperation *createSortOperation(int type, robj *pattern) {
return so; return so;
} }
/* Return the value associated to the key with a name obtained /* Return the value associated to the key with a name obtained using
* substituting the first occurence of '*' in 'pattern' with 'subst'. * the following rules:
*
* 1) The first occurence of '*' in 'pattern' is substituted with 'subst'.
*
* 2) If 'pattern' matches the "->" string, everything on the left of
* the arrow is treated as the name of an hash field, and the part on the
* left as the key name containing an hash. The value of the specified
* field is returned.
*
* 3) If 'pattern' equals "#", the function simply returns 'subst' itself so
* that the SORT command can be used like: SORT key GET # to retrieve
* the Set/List elements directly.
*
* The returned object will always have its refcount increased by 1 * The returned object will always have its refcount increased by 1
* when it is non-NULL. */ * when it is non-NULL. */
robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) { robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
char *p, *f; char *p, *f, *k;
sds spat, ssub; sds spat, ssub;
robj keyobj, fieldobj, *o; robj *keyobj, *fieldobj, *o;
int prefixlen, sublen, postfixlen, fieldlen; int prefixlen, sublen, postfixlen, fieldlen;
/* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
struct {
int len;
int free;
char buf[REDIS_SORTKEY_MAX+1];
} keyname, fieldname;
/* If the pattern is "#" return the substitution object itself in order /* If the pattern is "#" return the substitution object itself in order
* to implement the "SORT ... GET #" feature. */ * to implement the "SORT ... GET #" feature. */
@ -37,9 +43,10 @@ robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
* a decoded object on the fly. Otherwise getDecodedObject will just * a decoded object on the fly. Otherwise getDecodedObject will just
* increment the ref count, that we'll decrement later. */ * increment the ref count, that we'll decrement later. */
subst = getDecodedObject(subst); subst = getDecodedObject(subst);
ssub = subst->ptr; ssub = subst->ptr;
if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
/* If we can't find '*' in the pattern we return NULL as to GET a
* fixed key does not make sense. */
p = strchr(spat,'*'); p = strchr(spat,'*');
if (!p) { if (!p) {
decrRefCount(subst); decrRefCount(subst);
@ -47,46 +54,49 @@ robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
} }
/* Find out if we're dealing with a hash dereference. */ /* Find out if we're dealing with a hash dereference. */
if ((f = strstr(p+1, "->")) != NULL) { if ((f = strstr(p+1, "->")) != NULL && *(f+2) != '\0') {
fieldlen = sdslen(spat)-(f-spat); fieldlen = sdslen(spat)-(f-spat)-2;
/* this also copies \0 character */ fieldobj = createStringObject(f+2,fieldlen);
memcpy(fieldname.buf,f+2,fieldlen-1);
fieldname.len = fieldlen-2;
} else { } else {
fieldlen = 0; fieldlen = 0;
} }
/* Perform the '*' substitution. */
prefixlen = p-spat; prefixlen = p-spat;
sublen = sdslen(ssub); sublen = sdslen(ssub);
postfixlen = sdslen(spat)-(prefixlen+1)-fieldlen; postfixlen = sdslen(spat)-(prefixlen+1)-(fieldlen ? fieldlen+2 : 0);
memcpy(keyname.buf,spat,prefixlen); keyobj = createStringObject(NULL,prefixlen+sublen+postfixlen);
memcpy(keyname.buf+prefixlen,ssub,sublen); k = keyobj->ptr;
memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen); memcpy(k,spat,prefixlen);
keyname.buf[prefixlen+sublen+postfixlen] = '\0'; memcpy(k+prefixlen,ssub,sublen);
keyname.len = prefixlen+sublen+postfixlen; memcpy(k+prefixlen+sublen,p+1,postfixlen);
decrRefCount(subst); decrRefCount(subst); /* Incremented by decodeObject() */
/* Lookup substituted key */ /* Lookup substituted key */
initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(struct sdshdr))); o = lookupKeyRead(db,keyobj);
o = lookupKeyRead(db,&keyobj); if (o == NULL) goto noobj;
if (o == NULL) return NULL;
if (fieldlen > 0) { if (fieldlen > 0) {
if (o->type != REDIS_HASH || fieldname.len < 1) return NULL; if (o->type != REDIS_HASH) goto noobj;
/* Retrieve value from hash by the field name. This operation /* Retrieve value from hash by the field name. This operation
* already increases the refcount of the returned object. */ * already increases the refcount of the returned object. */
initStaticStringObject(fieldobj,((char*)&fieldname)+(sizeof(struct sdshdr))); o = hashTypeGetObject(o, fieldobj);
o = hashTypeGetObject(o, &fieldobj);
} else { } else {
if (o->type != REDIS_STRING) return NULL; if (o->type != REDIS_STRING) goto noobj;
/* Every object that this function returns needs to have its refcount /* Every object that this function returns needs to have its refcount
* increased. sortCommand decreases it again. */ * increased. sortCommand decreases it again. */
incrRefCount(o); incrRefCount(o);
} }
decrRefCount(keyobj);
if (fieldlen) decrRefCount(fieldobj);
return o; return o;
noobj:
decrRefCount(keyobj);
if (fieldlen) decrRefCount(fieldobj);
return NULL;
} }
/* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with