From 076bf6605fbf2b4b95637bdd4b14a9571139c2fc Mon Sep 17 00:00:00 2001 From: Lipeng Zhu Date: Wed, 28 Aug 2024 10:11:09 +0800 Subject: [PATCH] Move prepareClientToWrite out of loop for lrange command to reduce the redundant call. (#860) ## Description When I explore the cycles distributions for `lrange` test ( `valkey-benchmark -p 9001 -t lrange -d 100 -r 1000000 -n 1000000 -c 50 --threads 4`). I found the `prepareClientToWrite` and `clientHasPendingReplies` could be reduced to single call outside instead of called in a loop, ideally we can gain 3% performance. The corresponding `LRANG_100`, `LRANG_300`, `LRANGE_500`, `LRANGE_600` have ~2% - 3% performance boost, the benchmark test prove it helps. This patch try to move the `prepareClientToWrite` and its child `clientHasPendingReplies` out of the loop to reduce the function overhead. --------- Signed-off-by: Lipeng Zhu --- src/networking.c | 29 +++++++++++++++++++++++++++++ src/server.h | 10 ++++++++++ src/t_list.c | 19 +++++++++++-------- 3 files changed, 50 insertions(+), 8 deletions(-) diff --git a/src/networking.c b/src/networking.c index 6c1548703..aa1ae4e5e 100644 --- a/src/networking.c +++ b/src/networking.c @@ -910,6 +910,16 @@ void setDeferredPushLen(client *c, void *node, long length) { setDeferredAggregateLen(c, node, length, '>'); } +/* Prepare a client for future writes. This is used so that we can + * skip a large number of calls to prepareClientToWrite when + * a command produces a lot of discrete elements in its output. */ +writePreparedClient *prepareClientForFutureWrites(client *c) { + if (prepareClientToWrite(c) == C_OK) { + return (writePreparedClient *)c; + } + return NULL; +} + /* Add a double as a bulk reply */ void addReplyDouble(client *c, double d) { if (c->resp == 3) { @@ -1026,6 +1036,11 @@ void addReplyArrayLen(client *c, long length) { addReplyAggregateLen(c, length, '*'); } +void addWritePreparedReplyArrayLen(writePreparedClient *c, long length) { + serverAssert(length >= 0); + _addReplyLongLongWithPrefix(c, length, '*'); +} + void addReplyMapLen(client *c, long length) { int prefix = c->resp == 2 ? '*' : '%'; if (c->resp == 2) length *= 2; @@ -1098,6 +1113,12 @@ void addReplyBulkCBuffer(client *c, const void *p, size_t len) { _addReplyToBufferOrList(c, "\r\n", 2); } +void addWritePreparedReplyBulkCBuffer(writePreparedClient *c, const void *p, size_t len) { + _addReplyLongLongWithPrefix(c, len, '$'); + _addReplyToBufferOrList(c, p, len); + _addReplyToBufferOrList(c, "\r\n", 2); +} + /* Add sds to reply (takes ownership of sds and frees it) */ void addReplyBulkSds(client *c, sds s) { if (prepareClientToWrite(c) != C_OK) { @@ -1136,6 +1157,14 @@ void addReplyBulkLongLong(client *c, long long ll) { addReplyBulkCBuffer(c, buf, len); } +void addWritePreparedReplyBulkLongLong(writePreparedClient *c, long long ll) { + char buf[64]; + int len; + + len = ll2string(buf, 64, ll); + addWritePreparedReplyBulkCBuffer(c, buf, len); +} + /* Reply with a verbatim type having the specified extension. * * The 'ext' is the "extension" of the file, actually just a three diff --git a/src/server.h b/src/server.h index f7d4912b4..fc806968d 100644 --- a/src/server.h +++ b/src/server.h @@ -1390,6 +1390,12 @@ typedef struct client { net_output_bytes_curr_cmd; /* Total network output bytes sent to this client, by the current command. */ } client; +/* When a command generates a lot of discrete elements to the client output buffer, it is much faster to + * skip certain types of initialization. This type is used to indicate a client that has been initialized + * and can be used with addWritePreparedReply* functions. A client can be cast into this type with + * prepareClientForFutureWrites(client *c). */ +typedef client writePreparedClient; + /* ACL information */ typedef struct aclInfo { long long user_auth_failures; /* Auth failure counts on user level */ @@ -2779,6 +2785,7 @@ int processInputBuffer(client *c); void acceptCommonHandler(connection *conn, struct ClientFlags flags, char *ip); void readQueryFromClient(connection *conn); int prepareClientToWrite(client *c); +writePreparedClient *prepareClientForFutureWrites(client *c); void addReplyNull(client *c); void addReplyNullArray(client *c); void addReplyBool(client *c, int b); @@ -2788,7 +2795,9 @@ void AddReplyFromClient(client *c, client *src); void addReplyBulk(client *c, robj *obj); void addReplyBulkCString(client *c, const char *s); void addReplyBulkCBuffer(client *c, const void *p, size_t len); +void addWritePreparedReplyBulkCBuffer(writePreparedClient *c, const void *p, size_t len); void addReplyBulkLongLong(client *c, long long ll); +void addWritePreparedReplyBulkLongLong(writePreparedClient *c, long long ll); void addReply(client *c, robj *obj); void addReplyStatusLength(client *c, const char *s, size_t len); void addReplySds(client *c, sds s); @@ -2810,6 +2819,7 @@ void addReplyBigNum(client *c, const char *num, size_t len); void addReplyHumanLongDouble(client *c, long double d); void addReplyLongLong(client *c, long long ll); void addReplyArrayLen(client *c, long length); +void addWritePreparedReplyArrayLen(writePreparedClient *c, long length); void addReplyMapLen(client *c, long length); void addReplySetLen(client *c, long length); void addReplyAttributeLen(client *c, long length); diff --git a/src/t_list.c b/src/t_list.c index f5df563e4..ffe3e9b08 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -650,8 +650,10 @@ void listPopRangeAndReplyWithKey(client *c, robj *o, robj *key, int where, long * Note that the purpose is to make the methods small so that the * code in the loop can be inlined better to improve performance. */ void addListQuicklistRangeReply(client *c, robj *o, int from, int rangelen, int reverse) { + writePreparedClient *wpc = prepareClientForFutureWrites(c); + if (!wpc) return; /* Return the result in form of a multi-bulk reply */ - addReplyArrayLen(c, rangelen); + addWritePreparedReplyArrayLen(wpc, rangelen); int direction = reverse ? AL_START_TAIL : AL_START_HEAD; quicklistIter *iter = quicklistGetIteratorAtIdx(o->ptr, direction, from); @@ -659,9 +661,9 @@ void addListQuicklistRangeReply(client *c, robj *o, int from, int rangelen, int quicklistEntry qe; serverAssert(quicklistNext(iter, &qe)); /* fail on corrupt data */ if (qe.value) { - addReplyBulkCBuffer(c, qe.value, qe.sz); + addWritePreparedReplyBulkCBuffer(wpc, qe.value, qe.sz); } else { - addReplyBulkLongLong(c, qe.longval); + addWritePreparedReplyBulkLongLong(wpc, qe.longval); } } quicklistReleaseIterator(iter); @@ -671,21 +673,22 @@ void addListQuicklistRangeReply(client *c, robj *o, int from, int rangelen, int * Note that the purpose is to make the methods small so that the * code in the loop can be inlined better to improve performance. */ void addListListpackRangeReply(client *c, robj *o, int from, int rangelen, int reverse) { + writePreparedClient *wpc = prepareClientForFutureWrites(c); + if (!wpc) return; + /* Return the result in form of a multi-bulk reply */ + addWritePreparedReplyArrayLen(wpc, rangelen); unsigned char *p = lpSeek(o->ptr, from); unsigned char *vstr; unsigned int vlen; long long lval; - /* Return the result in form of a multi-bulk reply */ - addReplyArrayLen(c, rangelen); - while (rangelen--) { serverAssert(p); /* fail on corrupt data */ vstr = lpGetValue(p, &vlen, &lval); if (vstr) { - addReplyBulkCBuffer(c, vstr, vlen); + addWritePreparedReplyBulkCBuffer(wpc, vstr, vlen); } else { - addReplyBulkLongLong(c, lval); + addWritePreparedReplyBulkLongLong(wpc, lval); } p = reverse ? lpPrev(o->ptr, p) : lpNext(o->ptr, p); }