Move prepareClientToWrite out of loop for lrange command to reduce the redundant call. (#860)
## Description When I explore the cycles distributions for `lrange` test ( `valkey-benchmark -p 9001 -t lrange -d 100 -r 1000000 -n 1000000 -c 50 --threads 4`). I found the `prepareClientToWrite` and `clientHasPendingReplies` could be reduced to single call outside instead of called in a loop, ideally we can gain 3% performance. The corresponding `LRANG_100`, `LRANG_300`, `LRANGE_500`, `LRANGE_600` have ~2% - 3% performance boost, the benchmark test prove it helps. This patch try to move the `prepareClientToWrite` and its child `clientHasPendingReplies` out of the loop to reduce the function overhead. --------- Signed-off-by: Lipeng Zhu <lipeng.zhu@intel.com>
This commit is contained in:
parent
6a84e06b05
commit
076bf6605f
@ -910,6 +910,16 @@ void setDeferredPushLen(client *c, void *node, long length) {
|
||||
setDeferredAggregateLen(c, node, length, '>');
|
||||
}
|
||||
|
||||
/* Prepare a client for future writes. This is used so that we can
|
||||
* skip a large number of calls to prepareClientToWrite when
|
||||
* a command produces a lot of discrete elements in its output. */
|
||||
writePreparedClient *prepareClientForFutureWrites(client *c) {
|
||||
if (prepareClientToWrite(c) == C_OK) {
|
||||
return (writePreparedClient *)c;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Add a double as a bulk reply */
|
||||
void addReplyDouble(client *c, double d) {
|
||||
if (c->resp == 3) {
|
||||
@ -1026,6 +1036,11 @@ void addReplyArrayLen(client *c, long length) {
|
||||
addReplyAggregateLen(c, length, '*');
|
||||
}
|
||||
|
||||
void addWritePreparedReplyArrayLen(writePreparedClient *c, long length) {
|
||||
serverAssert(length >= 0);
|
||||
_addReplyLongLongWithPrefix(c, length, '*');
|
||||
}
|
||||
|
||||
void addReplyMapLen(client *c, long length) {
|
||||
int prefix = c->resp == 2 ? '*' : '%';
|
||||
if (c->resp == 2) length *= 2;
|
||||
@ -1098,6 +1113,12 @@ void addReplyBulkCBuffer(client *c, const void *p, size_t len) {
|
||||
_addReplyToBufferOrList(c, "\r\n", 2);
|
||||
}
|
||||
|
||||
void addWritePreparedReplyBulkCBuffer(writePreparedClient *c, const void *p, size_t len) {
|
||||
_addReplyLongLongWithPrefix(c, len, '$');
|
||||
_addReplyToBufferOrList(c, p, len);
|
||||
_addReplyToBufferOrList(c, "\r\n", 2);
|
||||
}
|
||||
|
||||
/* Add sds to reply (takes ownership of sds and frees it) */
|
||||
void addReplyBulkSds(client *c, sds s) {
|
||||
if (prepareClientToWrite(c) != C_OK) {
|
||||
@ -1136,6 +1157,14 @@ void addReplyBulkLongLong(client *c, long long ll) {
|
||||
addReplyBulkCBuffer(c, buf, len);
|
||||
}
|
||||
|
||||
void addWritePreparedReplyBulkLongLong(writePreparedClient *c, long long ll) {
|
||||
char buf[64];
|
||||
int len;
|
||||
|
||||
len = ll2string(buf, 64, ll);
|
||||
addWritePreparedReplyBulkCBuffer(c, buf, len);
|
||||
}
|
||||
|
||||
/* Reply with a verbatim type having the specified extension.
|
||||
*
|
||||
* The 'ext' is the "extension" of the file, actually just a three
|
||||
|
10
src/server.h
10
src/server.h
@ -1390,6 +1390,12 @@ typedef struct client {
|
||||
net_output_bytes_curr_cmd; /* Total network output bytes sent to this client, by the current command. */
|
||||
} client;
|
||||
|
||||
/* When a command generates a lot of discrete elements to the client output buffer, it is much faster to
|
||||
* skip certain types of initialization. This type is used to indicate a client that has been initialized
|
||||
* and can be used with addWritePreparedReply* functions. A client can be cast into this type with
|
||||
* prepareClientForFutureWrites(client *c). */
|
||||
typedef client writePreparedClient;
|
||||
|
||||
/* ACL information */
|
||||
typedef struct aclInfo {
|
||||
long long user_auth_failures; /* Auth failure counts on user level */
|
||||
@ -2779,6 +2785,7 @@ int processInputBuffer(client *c);
|
||||
void acceptCommonHandler(connection *conn, struct ClientFlags flags, char *ip);
|
||||
void readQueryFromClient(connection *conn);
|
||||
int prepareClientToWrite(client *c);
|
||||
writePreparedClient *prepareClientForFutureWrites(client *c);
|
||||
void addReplyNull(client *c);
|
||||
void addReplyNullArray(client *c);
|
||||
void addReplyBool(client *c, int b);
|
||||
@ -2788,7 +2795,9 @@ void AddReplyFromClient(client *c, client *src);
|
||||
void addReplyBulk(client *c, robj *obj);
|
||||
void addReplyBulkCString(client *c, const char *s);
|
||||
void addReplyBulkCBuffer(client *c, const void *p, size_t len);
|
||||
void addWritePreparedReplyBulkCBuffer(writePreparedClient *c, const void *p, size_t len);
|
||||
void addReplyBulkLongLong(client *c, long long ll);
|
||||
void addWritePreparedReplyBulkLongLong(writePreparedClient *c, long long ll);
|
||||
void addReply(client *c, robj *obj);
|
||||
void addReplyStatusLength(client *c, const char *s, size_t len);
|
||||
void addReplySds(client *c, sds s);
|
||||
@ -2810,6 +2819,7 @@ void addReplyBigNum(client *c, const char *num, size_t len);
|
||||
void addReplyHumanLongDouble(client *c, long double d);
|
||||
void addReplyLongLong(client *c, long long ll);
|
||||
void addReplyArrayLen(client *c, long length);
|
||||
void addWritePreparedReplyArrayLen(writePreparedClient *c, long length);
|
||||
void addReplyMapLen(client *c, long length);
|
||||
void addReplySetLen(client *c, long length);
|
||||
void addReplyAttributeLen(client *c, long length);
|
||||
|
19
src/t_list.c
19
src/t_list.c
@ -650,8 +650,10 @@ void listPopRangeAndReplyWithKey(client *c, robj *o, robj *key, int where, long
|
||||
* Note that the purpose is to make the methods small so that the
|
||||
* code in the loop can be inlined better to improve performance. */
|
||||
void addListQuicklistRangeReply(client *c, robj *o, int from, int rangelen, int reverse) {
|
||||
writePreparedClient *wpc = prepareClientForFutureWrites(c);
|
||||
if (!wpc) return;
|
||||
/* Return the result in form of a multi-bulk reply */
|
||||
addReplyArrayLen(c, rangelen);
|
||||
addWritePreparedReplyArrayLen(wpc, rangelen);
|
||||
|
||||
int direction = reverse ? AL_START_TAIL : AL_START_HEAD;
|
||||
quicklistIter *iter = quicklistGetIteratorAtIdx(o->ptr, direction, from);
|
||||
@ -659,9 +661,9 @@ void addListQuicklistRangeReply(client *c, robj *o, int from, int rangelen, int
|
||||
quicklistEntry qe;
|
||||
serverAssert(quicklistNext(iter, &qe)); /* fail on corrupt data */
|
||||
if (qe.value) {
|
||||
addReplyBulkCBuffer(c, qe.value, qe.sz);
|
||||
addWritePreparedReplyBulkCBuffer(wpc, qe.value, qe.sz);
|
||||
} else {
|
||||
addReplyBulkLongLong(c, qe.longval);
|
||||
addWritePreparedReplyBulkLongLong(wpc, qe.longval);
|
||||
}
|
||||
}
|
||||
quicklistReleaseIterator(iter);
|
||||
@ -671,21 +673,22 @@ void addListQuicklistRangeReply(client *c, robj *o, int from, int rangelen, int
|
||||
* Note that the purpose is to make the methods small so that the
|
||||
* code in the loop can be inlined better to improve performance. */
|
||||
void addListListpackRangeReply(client *c, robj *o, int from, int rangelen, int reverse) {
|
||||
writePreparedClient *wpc = prepareClientForFutureWrites(c);
|
||||
if (!wpc) return;
|
||||
/* Return the result in form of a multi-bulk reply */
|
||||
addWritePreparedReplyArrayLen(wpc, rangelen);
|
||||
unsigned char *p = lpSeek(o->ptr, from);
|
||||
unsigned char *vstr;
|
||||
unsigned int vlen;
|
||||
long long lval;
|
||||
|
||||
/* Return the result in form of a multi-bulk reply */
|
||||
addReplyArrayLen(c, rangelen);
|
||||
|
||||
while (rangelen--) {
|
||||
serverAssert(p); /* fail on corrupt data */
|
||||
vstr = lpGetValue(p, &vlen, &lval);
|
||||
if (vstr) {
|
||||
addReplyBulkCBuffer(c, vstr, vlen);
|
||||
addWritePreparedReplyBulkCBuffer(wpc, vstr, vlen);
|
||||
} else {
|
||||
addReplyBulkLongLong(c, lval);
|
||||
addWritePreparedReplyBulkLongLong(wpc, lval);
|
||||
}
|
||||
p = reverse ? lpPrev(o->ptr, p) : lpNext(o->ptr, p);
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user