From 262d970a5018065f34a6357a4dd3c72aa8ec624c Mon Sep 17 00:00:00 2001 From: Masahiro Ide Date: Sat, 12 Oct 2024 13:28:42 +0900 Subject: [PATCH] Move prepareClientToWrite out of loop for HGETALL command (#1119) Similar to #860 but this is for HGETALL families (HGETALL/HKEYS/HVALS). This patch moves `prepareClientToWrite` out of the loop to reduce the function overhead. Signed-off-by: Masahiro Ide Co-authored-by: Madelyn Olson --- src/networking.c | 25 ++++++++++++++++--- src/server.h | 4 ++- src/t_hash.c | 65 ++++++++++++++++++++++++++---------------------- 3 files changed, 59 insertions(+), 35 deletions(-) diff --git a/src/networking.c b/src/networking.c index b18b798ea..c24a95019 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1043,7 +1043,8 @@ void addReplyArrayLen(client *c, long length) { addReplyAggregateLen(c, length, '*'); } -void addWritePreparedReplyArrayLen(writePreparedClient *c, long length) { +void addWritePreparedReplyArrayLen(writePreparedClient *wpc, long length) { + client *c = (client *)wpc; serverAssert(length >= 0); _addReplyLongLongWithPrefix(c, length, '*'); } @@ -1054,6 +1055,13 @@ void addReplyMapLen(client *c, long length) { addReplyAggregateLen(c, length, prefix); } +void addWritePreparedReplyMapLen(writePreparedClient *wpc, long length) { + client *c = (client *)wpc; + int prefix = c->resp == 2 ? '*' : '%'; + if (c->resp == 2) length *= 2; + _addReplyLongLongWithPrefix(c, length, prefix); +} + void addReplySetLen(client *c, long length) { int prefix = c->resp == 2 ? '*' : '~'; addReplyAggregateLen(c, length, prefix); @@ -1120,7 +1128,8 @@ void addReplyBulkCBuffer(client *c, const void *p, size_t len) { _addReplyToBufferOrList(c, "\r\n", 2); } -void addWritePreparedReplyBulkCBuffer(writePreparedClient *c, const void *p, size_t len) { +void addWritePreparedReplyBulkCBuffer(writePreparedClient *wpc, const void *p, size_t len) { + client *c = (client *)wpc; _addReplyLongLongWithPrefix(c, len, '$'); _addReplyToBufferOrList(c, p, len); _addReplyToBufferOrList(c, "\r\n", 2); @@ -1138,6 +1147,14 @@ void addReplyBulkSds(client *c, sds s) { _addReplyToBufferOrList(c, "\r\n", 2); } +void addWritePreparedReplyBulkSds(writePreparedClient *wpc, sds s) { + client *c = (client *)wpc; + _addReplyLongLongWithPrefix(c, sdslen(s), '$'); + _addReplyToBufferOrList(c, s, sdslen(s)); + sdsfree(s); + _addReplyToBufferOrList(c, "\r\n", 2); +} + /* Set sds to a deferred reply (for symmetry with addReplyBulkSds it also frees the sds) */ void setDeferredReplyBulkSds(client *c, void *node, sds s) { sds reply = sdscatprintf(sdsempty(), "$%d\r\n%s\r\n", (unsigned)sdslen(s), s); @@ -1164,12 +1181,12 @@ void addReplyBulkLongLong(client *c, long long ll) { addReplyBulkCBuffer(c, buf, len); } -void addWritePreparedReplyBulkLongLong(writePreparedClient *c, long long ll) { +void addWritePreparedReplyBulkLongLong(writePreparedClient *wpc, long long ll) { char buf[64]; int len; len = ll2string(buf, 64, ll); - addWritePreparedReplyBulkCBuffer(c, buf, len); + addWritePreparedReplyBulkCBuffer(wpc, buf, len); } /* Reply with a verbatim type having the specified extension. diff --git a/src/server.h b/src/server.h index 29d675bb1..44ba429b1 100644 --- a/src/server.h +++ b/src/server.h @@ -1379,7 +1379,7 @@ typedef struct client { * skip certain types of initialization. This type is used to indicate a client that has been initialized * and can be used with addWritePreparedReply* functions. A client can be cast into this type with * prepareClientForFutureWrites(client *c). */ -typedef client writePreparedClient; +typedef struct writePreparedClient writePreparedClient; /* ACL information */ typedef struct aclInfo { @@ -2789,6 +2789,7 @@ void addReply(client *c, robj *obj); void addReplyStatusLength(client *c, const char *s, size_t len); void addReplySds(client *c, sds s); void addReplyBulkSds(client *c, sds s); +void addWritePreparedReplyBulkSds(writePreparedClient *c, sds s); void setDeferredReplyBulkSds(client *c, void *node, sds s); void addReplyErrorObject(client *c, robj *err); void addReplyOrErrorObject(client *c, robj *reply); @@ -2808,6 +2809,7 @@ void addReplyLongLong(client *c, long long ll); void addReplyArrayLen(client *c, long length); void addWritePreparedReplyArrayLen(writePreparedClient *c, long length); void addReplyMapLen(client *c, long length); +void addWritePreparedReplyMapLen(writePreparedClient *c, long length); void addReplySetLen(client *c, long length); void addReplyAttributeLen(client *c, long length); void addReplyPushLen(client *c, long length); diff --git a/src/t_hash.c b/src/t_hash.c index 96252bc39..dabe27980 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -788,7 +788,7 @@ void hstrlenCommand(client *c) { addReplyLongLong(c, hashTypeGetValueLength(o, c->argv[2]->ptr)); } -static void addHashIteratorCursorToReply(client *c, hashTypeIterator *hi, int what) { +static void addHashIteratorCursorToReply(writePreparedClient *wpc, hashTypeIterator *hi, int what) { if (hi->encoding == OBJ_ENCODING_LISTPACK) { unsigned char *vstr = NULL; unsigned int vlen = UINT_MAX; @@ -796,12 +796,12 @@ static void addHashIteratorCursorToReply(client *c, hashTypeIterator *hi, int wh hashTypeCurrentFromListpack(hi, what, &vstr, &vlen, &vll); if (vstr) - addReplyBulkCBuffer(c, vstr, vlen); + addWritePreparedReplyBulkCBuffer(wpc, vstr, vlen); else - addReplyBulkLongLong(c, vll); + addWritePreparedReplyBulkLongLong(wpc, vll); } else if (hi->encoding == OBJ_ENCODING_HT) { sds value = hashTypeCurrentFromHashTable(hi, what); - addReplyBulkCBuffer(c, value, sdslen(value)); + addWritePreparedReplyBulkCBuffer(wpc, value, sdslen(value)); } else { serverPanic("Unknown hash encoding"); } @@ -815,23 +815,25 @@ void genericHgetallCommand(client *c, int flags) { robj *emptyResp = (flags & OBJ_HASH_KEY && flags & OBJ_HASH_VALUE) ? shared.emptymap[c->resp] : shared.emptyarray; if ((o = lookupKeyReadOrReply(c, c->argv[1], emptyResp)) == NULL || checkType(c, o, OBJ_HASH)) return; + writePreparedClient *wpc = prepareClientForFutureWrites(c); + if (!wpc) return; /* We return a map if the user requested keys and values, like in the * HGETALL case. Otherwise to use a flat array makes more sense. */ length = hashTypeLength(o); if (flags & OBJ_HASH_KEY && flags & OBJ_HASH_VALUE) { - addReplyMapLen(c, length); + addWritePreparedReplyMapLen(wpc, length); } else { - addReplyArrayLen(c, length); + addWritePreparedReplyArrayLen(wpc, length); } hashTypeInitIterator(o, &hi); while (hashTypeNext(&hi) != C_ERR) { if (flags & OBJ_HASH_KEY) { - addHashIteratorCursorToReply(c, &hi, OBJ_HASH_KEY); + addHashIteratorCursorToReply(wpc, &hi, OBJ_HASH_KEY); count++; } if (flags & OBJ_HASH_VALUE) { - addHashIteratorCursorToReply(c, &hi, OBJ_HASH_VALUE); + addHashIteratorCursorToReply(wpc, &hi, OBJ_HASH_VALUE); count++; } } @@ -871,18 +873,19 @@ void hscanCommand(client *c) { scanGenericCommand(c, o, cursor); } -static void hrandfieldReplyWithListpack(client *c, unsigned int count, listpackEntry *keys, listpackEntry *vals) { +static void hrandfieldReplyWithListpack(writePreparedClient *wpc, unsigned int count, listpackEntry *keys, listpackEntry *vals) { + client *c = (client *)wpc; for (unsigned long i = 0; i < count; i++) { - if (vals && c->resp > 2) addReplyArrayLen(c, 2); + if (vals && c->resp > 2) addWritePreparedReplyArrayLen(wpc, 2); if (keys[i].sval) - addReplyBulkCBuffer(c, keys[i].sval, keys[i].slen); + addWritePreparedReplyBulkCBuffer(wpc, keys[i].sval, keys[i].slen); else - addReplyBulkLongLong(c, keys[i].lval); + addWritePreparedReplyBulkLongLong(wpc, keys[i].lval); if (vals) { if (vals[i].sval) - addReplyBulkCBuffer(c, vals[i].sval, vals[i].slen); + addWritePreparedReplyBulkCBuffer(wpc, vals[i].sval, vals[i].slen); else - addReplyBulkLongLong(c, vals[i].lval); + addWritePreparedReplyBulkLongLong(wpc, vals[i].lval); } } } @@ -918,6 +921,8 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) { return; } + writePreparedClient *wpc = prepareClientForFutureWrites(c); + if (!wpc) return; /* CASE 1: The count was negative, so the extraction method is just: * "return N random elements" sampling the whole set every time. * This case is trivial and can be served without auxiliary data @@ -925,18 +930,18 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) { * elements in random order. */ if (!uniq || count == 1) { if (withvalues && c->resp == 2) - addReplyArrayLen(c, count * 2); + addWritePreparedReplyArrayLen(wpc, count * 2); else - addReplyArrayLen(c, count); + addWritePreparedReplyArrayLen(wpc, count); if (hash->encoding == OBJ_ENCODING_HT) { sds key, value; while (count--) { dictEntry *de = dictGetFairRandomKey(hash->ptr); key = dictGetKey(de); value = dictGetVal(de); - if (withvalues && c->resp > 2) addReplyArrayLen(c, 2); - addReplyBulkCBuffer(c, key, sdslen(key)); - if (withvalues) addReplyBulkCBuffer(c, value, sdslen(value)); + if (withvalues && c->resp > 2) addWritePreparedReplyArrayLen(wpc, 2); + addWritePreparedReplyBulkCBuffer(wpc, key, sdslen(key)); + if (withvalues) addWritePreparedReplyBulkCBuffer(wpc, value, sdslen(value)); if (c->flag.close_asap) break; } } else if (hash->encoding == OBJ_ENCODING_LISTPACK) { @@ -950,7 +955,7 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) { sample_count = count > limit ? limit : count; count -= sample_count; lpRandomPairs(hash->ptr, sample_count, keys, vals); - hrandfieldReplyWithListpack(c, sample_count, keys, vals); + hrandfieldReplyWithListpack(wpc, sample_count, keys, vals); if (c->flag.close_asap) break; } zfree(keys); @@ -962,9 +967,9 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) { /* Initiate reply count, RESP3 responds with nested array, RESP2 with flat one. */ long reply_size = count < size ? count : size; if (withvalues && c->resp == 2) - addReplyArrayLen(c, reply_size * 2); + addWritePreparedReplyArrayLen(wpc, reply_size * 2); else - addReplyArrayLen(c, reply_size); + addWritePreparedReplyArrayLen(wpc, reply_size); /* CASE 2: * The number of requested elements is greater than the number of @@ -973,9 +978,9 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) { hashTypeIterator hi; hashTypeInitIterator(hash, &hi); while (hashTypeNext(&hi) != C_ERR) { - if (withvalues && c->resp > 2) addReplyArrayLen(c, 2); - addHashIteratorCursorToReply(c, &hi, OBJ_HASH_KEY); - if (withvalues) addHashIteratorCursorToReply(c, &hi, OBJ_HASH_VALUE); + if (withvalues && c->resp > 2) addWritePreparedReplyArrayLen(wpc, 2); + addHashIteratorCursorToReply(wpc, &hi, OBJ_HASH_KEY); + if (withvalues) addHashIteratorCursorToReply(wpc, &hi, OBJ_HASH_VALUE); } hashTypeResetIterator(&hi); return; @@ -994,7 +999,7 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) { keys = zmalloc(sizeof(listpackEntry) * count); if (withvalues) vals = zmalloc(sizeof(listpackEntry) * count); serverAssert(lpRandomPairsUnique(hash->ptr, count, keys, vals) == count); - hrandfieldReplyWithListpack(c, count, keys, vals); + hrandfieldReplyWithListpack(wpc, count, keys, vals); zfree(keys); zfree(vals); return; @@ -1048,9 +1053,9 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) { while ((de = dictNext(di)) != NULL) { sds key = dictGetKey(de); sds value = dictGetVal(de); - if (withvalues && c->resp > 2) addReplyArrayLen(c, 2); - addReplyBulkSds(c, key); - if (withvalues) addReplyBulkSds(c, value); + if (withvalues && c->resp > 2) addWritePreparedReplyArrayLen(wpc, 2); + addWritePreparedReplyBulkSds(wpc, key); + if (withvalues) addWritePreparedReplyBulkSds(wpc, value); } dictReleaseIterator(di); @@ -1081,7 +1086,7 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) { added++; /* We can reply right away, so that we don't need to store the value in the dict. */ - if (withvalues && c->resp > 2) addReplyArrayLen(c, 2); + if (withvalues && c->resp > 2) addWritePreparedReplyArrayLen(wpc, 2); hashReplyFromListpackEntry(c, &key); if (withvalues) hashReplyFromListpackEntry(c, &value); }