Move prepareClientToWrite out of loop for HGETALL command (#1119)
Similar to #860 but this is for HGETALL families (HGETALL/HKEYS/HVALS). This patch moves `prepareClientToWrite` out of the loop to reduce the function overhead. Signed-off-by: Masahiro Ide <imasahiro9@gmail.com> Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
This commit is contained in:
parent
ef971a34eb
commit
262d970a50
@ -1043,7 +1043,8 @@ void addReplyArrayLen(client *c, long length) {
|
|||||||
addReplyAggregateLen(c, length, '*');
|
addReplyAggregateLen(c, length, '*');
|
||||||
}
|
}
|
||||||
|
|
||||||
void addWritePreparedReplyArrayLen(writePreparedClient *c, long length) {
|
void addWritePreparedReplyArrayLen(writePreparedClient *wpc, long length) {
|
||||||
|
client *c = (client *)wpc;
|
||||||
serverAssert(length >= 0);
|
serverAssert(length >= 0);
|
||||||
_addReplyLongLongWithPrefix(c, length, '*');
|
_addReplyLongLongWithPrefix(c, length, '*');
|
||||||
}
|
}
|
||||||
@ -1054,6 +1055,13 @@ void addReplyMapLen(client *c, long length) {
|
|||||||
addReplyAggregateLen(c, length, prefix);
|
addReplyAggregateLen(c, length, prefix);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void addWritePreparedReplyMapLen(writePreparedClient *wpc, long length) {
|
||||||
|
client *c = (client *)wpc;
|
||||||
|
int prefix = c->resp == 2 ? '*' : '%';
|
||||||
|
if (c->resp == 2) length *= 2;
|
||||||
|
_addReplyLongLongWithPrefix(c, length, prefix);
|
||||||
|
}
|
||||||
|
|
||||||
void addReplySetLen(client *c, long length) {
|
void addReplySetLen(client *c, long length) {
|
||||||
int prefix = c->resp == 2 ? '*' : '~';
|
int prefix = c->resp == 2 ? '*' : '~';
|
||||||
addReplyAggregateLen(c, length, prefix);
|
addReplyAggregateLen(c, length, prefix);
|
||||||
@ -1120,7 +1128,8 @@ void addReplyBulkCBuffer(client *c, const void *p, size_t len) {
|
|||||||
_addReplyToBufferOrList(c, "\r\n", 2);
|
_addReplyToBufferOrList(c, "\r\n", 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
void addWritePreparedReplyBulkCBuffer(writePreparedClient *c, const void *p, size_t len) {
|
void addWritePreparedReplyBulkCBuffer(writePreparedClient *wpc, const void *p, size_t len) {
|
||||||
|
client *c = (client *)wpc;
|
||||||
_addReplyLongLongWithPrefix(c, len, '$');
|
_addReplyLongLongWithPrefix(c, len, '$');
|
||||||
_addReplyToBufferOrList(c, p, len);
|
_addReplyToBufferOrList(c, p, len);
|
||||||
_addReplyToBufferOrList(c, "\r\n", 2);
|
_addReplyToBufferOrList(c, "\r\n", 2);
|
||||||
@ -1138,6 +1147,14 @@ void addReplyBulkSds(client *c, sds s) {
|
|||||||
_addReplyToBufferOrList(c, "\r\n", 2);
|
_addReplyToBufferOrList(c, "\r\n", 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void addWritePreparedReplyBulkSds(writePreparedClient *wpc, sds s) {
|
||||||
|
client *c = (client *)wpc;
|
||||||
|
_addReplyLongLongWithPrefix(c, sdslen(s), '$');
|
||||||
|
_addReplyToBufferOrList(c, s, sdslen(s));
|
||||||
|
sdsfree(s);
|
||||||
|
_addReplyToBufferOrList(c, "\r\n", 2);
|
||||||
|
}
|
||||||
|
|
||||||
/* Set sds to a deferred reply (for symmetry with addReplyBulkSds it also frees the sds) */
|
/* Set sds to a deferred reply (for symmetry with addReplyBulkSds it also frees the sds) */
|
||||||
void setDeferredReplyBulkSds(client *c, void *node, sds s) {
|
void setDeferredReplyBulkSds(client *c, void *node, sds s) {
|
||||||
sds reply = sdscatprintf(sdsempty(), "$%d\r\n%s\r\n", (unsigned)sdslen(s), s);
|
sds reply = sdscatprintf(sdsempty(), "$%d\r\n%s\r\n", (unsigned)sdslen(s), s);
|
||||||
@ -1164,12 +1181,12 @@ void addReplyBulkLongLong(client *c, long long ll) {
|
|||||||
addReplyBulkCBuffer(c, buf, len);
|
addReplyBulkCBuffer(c, buf, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
void addWritePreparedReplyBulkLongLong(writePreparedClient *c, long long ll) {
|
void addWritePreparedReplyBulkLongLong(writePreparedClient *wpc, long long ll) {
|
||||||
char buf[64];
|
char buf[64];
|
||||||
int len;
|
int len;
|
||||||
|
|
||||||
len = ll2string(buf, 64, ll);
|
len = ll2string(buf, 64, ll);
|
||||||
addWritePreparedReplyBulkCBuffer(c, buf, len);
|
addWritePreparedReplyBulkCBuffer(wpc, buf, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Reply with a verbatim type having the specified extension.
|
/* Reply with a verbatim type having the specified extension.
|
||||||
|
@ -1379,7 +1379,7 @@ typedef struct client {
|
|||||||
* skip certain types of initialization. This type is used to indicate a client that has been initialized
|
* skip certain types of initialization. This type is used to indicate a client that has been initialized
|
||||||
* and can be used with addWritePreparedReply* functions. A client can be cast into this type with
|
* and can be used with addWritePreparedReply* functions. A client can be cast into this type with
|
||||||
* prepareClientForFutureWrites(client *c). */
|
* prepareClientForFutureWrites(client *c). */
|
||||||
typedef client writePreparedClient;
|
typedef struct writePreparedClient writePreparedClient;
|
||||||
|
|
||||||
/* ACL information */
|
/* ACL information */
|
||||||
typedef struct aclInfo {
|
typedef struct aclInfo {
|
||||||
@ -2789,6 +2789,7 @@ void addReply(client *c, robj *obj);
|
|||||||
void addReplyStatusLength(client *c, const char *s, size_t len);
|
void addReplyStatusLength(client *c, const char *s, size_t len);
|
||||||
void addReplySds(client *c, sds s);
|
void addReplySds(client *c, sds s);
|
||||||
void addReplyBulkSds(client *c, sds s);
|
void addReplyBulkSds(client *c, sds s);
|
||||||
|
void addWritePreparedReplyBulkSds(writePreparedClient *c, sds s);
|
||||||
void setDeferredReplyBulkSds(client *c, void *node, sds s);
|
void setDeferredReplyBulkSds(client *c, void *node, sds s);
|
||||||
void addReplyErrorObject(client *c, robj *err);
|
void addReplyErrorObject(client *c, robj *err);
|
||||||
void addReplyOrErrorObject(client *c, robj *reply);
|
void addReplyOrErrorObject(client *c, robj *reply);
|
||||||
@ -2808,6 +2809,7 @@ void addReplyLongLong(client *c, long long ll);
|
|||||||
void addReplyArrayLen(client *c, long length);
|
void addReplyArrayLen(client *c, long length);
|
||||||
void addWritePreparedReplyArrayLen(writePreparedClient *c, long length);
|
void addWritePreparedReplyArrayLen(writePreparedClient *c, long length);
|
||||||
void addReplyMapLen(client *c, long length);
|
void addReplyMapLen(client *c, long length);
|
||||||
|
void addWritePreparedReplyMapLen(writePreparedClient *c, long length);
|
||||||
void addReplySetLen(client *c, long length);
|
void addReplySetLen(client *c, long length);
|
||||||
void addReplyAttributeLen(client *c, long length);
|
void addReplyAttributeLen(client *c, long length);
|
||||||
void addReplyPushLen(client *c, long length);
|
void addReplyPushLen(client *c, long length);
|
||||||
|
65
src/t_hash.c
65
src/t_hash.c
@ -788,7 +788,7 @@ void hstrlenCommand(client *c) {
|
|||||||
addReplyLongLong(c, hashTypeGetValueLength(o, c->argv[2]->ptr));
|
addReplyLongLong(c, hashTypeGetValueLength(o, c->argv[2]->ptr));
|
||||||
}
|
}
|
||||||
|
|
||||||
static void addHashIteratorCursorToReply(client *c, hashTypeIterator *hi, int what) {
|
static void addHashIteratorCursorToReply(writePreparedClient *wpc, hashTypeIterator *hi, int what) {
|
||||||
if (hi->encoding == OBJ_ENCODING_LISTPACK) {
|
if (hi->encoding == OBJ_ENCODING_LISTPACK) {
|
||||||
unsigned char *vstr = NULL;
|
unsigned char *vstr = NULL;
|
||||||
unsigned int vlen = UINT_MAX;
|
unsigned int vlen = UINT_MAX;
|
||||||
@ -796,12 +796,12 @@ static void addHashIteratorCursorToReply(client *c, hashTypeIterator *hi, int wh
|
|||||||
|
|
||||||
hashTypeCurrentFromListpack(hi, what, &vstr, &vlen, &vll);
|
hashTypeCurrentFromListpack(hi, what, &vstr, &vlen, &vll);
|
||||||
if (vstr)
|
if (vstr)
|
||||||
addReplyBulkCBuffer(c, vstr, vlen);
|
addWritePreparedReplyBulkCBuffer(wpc, vstr, vlen);
|
||||||
else
|
else
|
||||||
addReplyBulkLongLong(c, vll);
|
addWritePreparedReplyBulkLongLong(wpc, vll);
|
||||||
} else if (hi->encoding == OBJ_ENCODING_HT) {
|
} else if (hi->encoding == OBJ_ENCODING_HT) {
|
||||||
sds value = hashTypeCurrentFromHashTable(hi, what);
|
sds value = hashTypeCurrentFromHashTable(hi, what);
|
||||||
addReplyBulkCBuffer(c, value, sdslen(value));
|
addWritePreparedReplyBulkCBuffer(wpc, value, sdslen(value));
|
||||||
} else {
|
} else {
|
||||||
serverPanic("Unknown hash encoding");
|
serverPanic("Unknown hash encoding");
|
||||||
}
|
}
|
||||||
@ -815,23 +815,25 @@ void genericHgetallCommand(client *c, int flags) {
|
|||||||
robj *emptyResp = (flags & OBJ_HASH_KEY && flags & OBJ_HASH_VALUE) ? shared.emptymap[c->resp] : shared.emptyarray;
|
robj *emptyResp = (flags & OBJ_HASH_KEY && flags & OBJ_HASH_VALUE) ? shared.emptymap[c->resp] : shared.emptyarray;
|
||||||
if ((o = lookupKeyReadOrReply(c, c->argv[1], emptyResp)) == NULL || checkType(c, o, OBJ_HASH)) return;
|
if ((o = lookupKeyReadOrReply(c, c->argv[1], emptyResp)) == NULL || checkType(c, o, OBJ_HASH)) return;
|
||||||
|
|
||||||
|
writePreparedClient *wpc = prepareClientForFutureWrites(c);
|
||||||
|
if (!wpc) return;
|
||||||
/* We return a map if the user requested keys and values, like in the
|
/* We return a map if the user requested keys and values, like in the
|
||||||
* HGETALL case. Otherwise to use a flat array makes more sense. */
|
* HGETALL case. Otherwise to use a flat array makes more sense. */
|
||||||
length = hashTypeLength(o);
|
length = hashTypeLength(o);
|
||||||
if (flags & OBJ_HASH_KEY && flags & OBJ_HASH_VALUE) {
|
if (flags & OBJ_HASH_KEY && flags & OBJ_HASH_VALUE) {
|
||||||
addReplyMapLen(c, length);
|
addWritePreparedReplyMapLen(wpc, length);
|
||||||
} else {
|
} else {
|
||||||
addReplyArrayLen(c, length);
|
addWritePreparedReplyArrayLen(wpc, length);
|
||||||
}
|
}
|
||||||
|
|
||||||
hashTypeInitIterator(o, &hi);
|
hashTypeInitIterator(o, &hi);
|
||||||
while (hashTypeNext(&hi) != C_ERR) {
|
while (hashTypeNext(&hi) != C_ERR) {
|
||||||
if (flags & OBJ_HASH_KEY) {
|
if (flags & OBJ_HASH_KEY) {
|
||||||
addHashIteratorCursorToReply(c, &hi, OBJ_HASH_KEY);
|
addHashIteratorCursorToReply(wpc, &hi, OBJ_HASH_KEY);
|
||||||
count++;
|
count++;
|
||||||
}
|
}
|
||||||
if (flags & OBJ_HASH_VALUE) {
|
if (flags & OBJ_HASH_VALUE) {
|
||||||
addHashIteratorCursorToReply(c, &hi, OBJ_HASH_VALUE);
|
addHashIteratorCursorToReply(wpc, &hi, OBJ_HASH_VALUE);
|
||||||
count++;
|
count++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -871,18 +873,19 @@ void hscanCommand(client *c) {
|
|||||||
scanGenericCommand(c, o, cursor);
|
scanGenericCommand(c, o, cursor);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void hrandfieldReplyWithListpack(client *c, unsigned int count, listpackEntry *keys, listpackEntry *vals) {
|
static void hrandfieldReplyWithListpack(writePreparedClient *wpc, unsigned int count, listpackEntry *keys, listpackEntry *vals) {
|
||||||
|
client *c = (client *)wpc;
|
||||||
for (unsigned long i = 0; i < count; i++) {
|
for (unsigned long i = 0; i < count; i++) {
|
||||||
if (vals && c->resp > 2) addReplyArrayLen(c, 2);
|
if (vals && c->resp > 2) addWritePreparedReplyArrayLen(wpc, 2);
|
||||||
if (keys[i].sval)
|
if (keys[i].sval)
|
||||||
addReplyBulkCBuffer(c, keys[i].sval, keys[i].slen);
|
addWritePreparedReplyBulkCBuffer(wpc, keys[i].sval, keys[i].slen);
|
||||||
else
|
else
|
||||||
addReplyBulkLongLong(c, keys[i].lval);
|
addWritePreparedReplyBulkLongLong(wpc, keys[i].lval);
|
||||||
if (vals) {
|
if (vals) {
|
||||||
if (vals[i].sval)
|
if (vals[i].sval)
|
||||||
addReplyBulkCBuffer(c, vals[i].sval, vals[i].slen);
|
addWritePreparedReplyBulkCBuffer(wpc, vals[i].sval, vals[i].slen);
|
||||||
else
|
else
|
||||||
addReplyBulkLongLong(c, vals[i].lval);
|
addWritePreparedReplyBulkLongLong(wpc, vals[i].lval);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -918,6 +921,8 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
writePreparedClient *wpc = prepareClientForFutureWrites(c);
|
||||||
|
if (!wpc) return;
|
||||||
/* CASE 1: The count was negative, so the extraction method is just:
|
/* CASE 1: The count was negative, so the extraction method is just:
|
||||||
* "return N random elements" sampling the whole set every time.
|
* "return N random elements" sampling the whole set every time.
|
||||||
* This case is trivial and can be served without auxiliary data
|
* This case is trivial and can be served without auxiliary data
|
||||||
@ -925,18 +930,18 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) {
|
|||||||
* elements in random order. */
|
* elements in random order. */
|
||||||
if (!uniq || count == 1) {
|
if (!uniq || count == 1) {
|
||||||
if (withvalues && c->resp == 2)
|
if (withvalues && c->resp == 2)
|
||||||
addReplyArrayLen(c, count * 2);
|
addWritePreparedReplyArrayLen(wpc, count * 2);
|
||||||
else
|
else
|
||||||
addReplyArrayLen(c, count);
|
addWritePreparedReplyArrayLen(wpc, count);
|
||||||
if (hash->encoding == OBJ_ENCODING_HT) {
|
if (hash->encoding == OBJ_ENCODING_HT) {
|
||||||
sds key, value;
|
sds key, value;
|
||||||
while (count--) {
|
while (count--) {
|
||||||
dictEntry *de = dictGetFairRandomKey(hash->ptr);
|
dictEntry *de = dictGetFairRandomKey(hash->ptr);
|
||||||
key = dictGetKey(de);
|
key = dictGetKey(de);
|
||||||
value = dictGetVal(de);
|
value = dictGetVal(de);
|
||||||
if (withvalues && c->resp > 2) addReplyArrayLen(c, 2);
|
if (withvalues && c->resp > 2) addWritePreparedReplyArrayLen(wpc, 2);
|
||||||
addReplyBulkCBuffer(c, key, sdslen(key));
|
addWritePreparedReplyBulkCBuffer(wpc, key, sdslen(key));
|
||||||
if (withvalues) addReplyBulkCBuffer(c, value, sdslen(value));
|
if (withvalues) addWritePreparedReplyBulkCBuffer(wpc, value, sdslen(value));
|
||||||
if (c->flag.close_asap) break;
|
if (c->flag.close_asap) break;
|
||||||
}
|
}
|
||||||
} else if (hash->encoding == OBJ_ENCODING_LISTPACK) {
|
} else if (hash->encoding == OBJ_ENCODING_LISTPACK) {
|
||||||
@ -950,7 +955,7 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) {
|
|||||||
sample_count = count > limit ? limit : count;
|
sample_count = count > limit ? limit : count;
|
||||||
count -= sample_count;
|
count -= sample_count;
|
||||||
lpRandomPairs(hash->ptr, sample_count, keys, vals);
|
lpRandomPairs(hash->ptr, sample_count, keys, vals);
|
||||||
hrandfieldReplyWithListpack(c, sample_count, keys, vals);
|
hrandfieldReplyWithListpack(wpc, sample_count, keys, vals);
|
||||||
if (c->flag.close_asap) break;
|
if (c->flag.close_asap) break;
|
||||||
}
|
}
|
||||||
zfree(keys);
|
zfree(keys);
|
||||||
@ -962,9 +967,9 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) {
|
|||||||
/* Initiate reply count, RESP3 responds with nested array, RESP2 with flat one. */
|
/* Initiate reply count, RESP3 responds with nested array, RESP2 with flat one. */
|
||||||
long reply_size = count < size ? count : size;
|
long reply_size = count < size ? count : size;
|
||||||
if (withvalues && c->resp == 2)
|
if (withvalues && c->resp == 2)
|
||||||
addReplyArrayLen(c, reply_size * 2);
|
addWritePreparedReplyArrayLen(wpc, reply_size * 2);
|
||||||
else
|
else
|
||||||
addReplyArrayLen(c, reply_size);
|
addWritePreparedReplyArrayLen(wpc, reply_size);
|
||||||
|
|
||||||
/* CASE 2:
|
/* CASE 2:
|
||||||
* The number of requested elements is greater than the number of
|
* The number of requested elements is greater than the number of
|
||||||
@ -973,9 +978,9 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) {
|
|||||||
hashTypeIterator hi;
|
hashTypeIterator hi;
|
||||||
hashTypeInitIterator(hash, &hi);
|
hashTypeInitIterator(hash, &hi);
|
||||||
while (hashTypeNext(&hi) != C_ERR) {
|
while (hashTypeNext(&hi) != C_ERR) {
|
||||||
if (withvalues && c->resp > 2) addReplyArrayLen(c, 2);
|
if (withvalues && c->resp > 2) addWritePreparedReplyArrayLen(wpc, 2);
|
||||||
addHashIteratorCursorToReply(c, &hi, OBJ_HASH_KEY);
|
addHashIteratorCursorToReply(wpc, &hi, OBJ_HASH_KEY);
|
||||||
if (withvalues) addHashIteratorCursorToReply(c, &hi, OBJ_HASH_VALUE);
|
if (withvalues) addHashIteratorCursorToReply(wpc, &hi, OBJ_HASH_VALUE);
|
||||||
}
|
}
|
||||||
hashTypeResetIterator(&hi);
|
hashTypeResetIterator(&hi);
|
||||||
return;
|
return;
|
||||||
@ -994,7 +999,7 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) {
|
|||||||
keys = zmalloc(sizeof(listpackEntry) * count);
|
keys = zmalloc(sizeof(listpackEntry) * count);
|
||||||
if (withvalues) vals = zmalloc(sizeof(listpackEntry) * count);
|
if (withvalues) vals = zmalloc(sizeof(listpackEntry) * count);
|
||||||
serverAssert(lpRandomPairsUnique(hash->ptr, count, keys, vals) == count);
|
serverAssert(lpRandomPairsUnique(hash->ptr, count, keys, vals) == count);
|
||||||
hrandfieldReplyWithListpack(c, count, keys, vals);
|
hrandfieldReplyWithListpack(wpc, count, keys, vals);
|
||||||
zfree(keys);
|
zfree(keys);
|
||||||
zfree(vals);
|
zfree(vals);
|
||||||
return;
|
return;
|
||||||
@ -1048,9 +1053,9 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) {
|
|||||||
while ((de = dictNext(di)) != NULL) {
|
while ((de = dictNext(di)) != NULL) {
|
||||||
sds key = dictGetKey(de);
|
sds key = dictGetKey(de);
|
||||||
sds value = dictGetVal(de);
|
sds value = dictGetVal(de);
|
||||||
if (withvalues && c->resp > 2) addReplyArrayLen(c, 2);
|
if (withvalues && c->resp > 2) addWritePreparedReplyArrayLen(wpc, 2);
|
||||||
addReplyBulkSds(c, key);
|
addWritePreparedReplyBulkSds(wpc, key);
|
||||||
if (withvalues) addReplyBulkSds(c, value);
|
if (withvalues) addWritePreparedReplyBulkSds(wpc, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
dictReleaseIterator(di);
|
dictReleaseIterator(di);
|
||||||
@ -1081,7 +1086,7 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) {
|
|||||||
added++;
|
added++;
|
||||||
|
|
||||||
/* We can reply right away, so that we don't need to store the value in the dict. */
|
/* We can reply right away, so that we don't need to store the value in the dict. */
|
||||||
if (withvalues && c->resp > 2) addReplyArrayLen(c, 2);
|
if (withvalues && c->resp > 2) addWritePreparedReplyArrayLen(wpc, 2);
|
||||||
hashReplyFromListpackEntry(c, &key);
|
hashReplyFromListpackEntry(c, &key);
|
||||||
if (withvalues) hashReplyFromListpackEntry(c, &value);
|
if (withvalues) hashReplyFromListpackEntry(c, &value);
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user