From 78bef6e1fe4b69e9cca6a922911bd88a92584edb Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" <276441700@qq.com> Date: Fri, 25 Mar 2022 10:45:40 +0800 Subject: [PATCH] optimize(remove) usage of client's pending_querybuf (#10413) To remove `pending_querybuf`, the key point is reusing `querybuf`: the master client's `querybuf` is used not only to parse commands, but also to proxy the stream to sub-replicas. 1. Add a new variable `repl_applied` for the master client to record how much data has been applied (propagated via `replicationFeedStreamFromMasterStream()`) but not yet trimmed from `querybuf`. 2. Don't sdsrange `querybuf` in `commandProcessed()`; instead, trim it to `repl_applied` after the whole replication pipeline has been processed, to avoid fragmented `sdsrange` calls. Here are some scenarios in which we cannot trim to `qb_pos`: * we haven't received a complete command from the master * the master client is blocked because of client pause * IO threads perform the read, and the master client is flagged with CLIENT_PENDING_COMMAND In these scenarios, `qb_pos` points into the middle of the current command or to the beginning of the next command, and the current command is not applied yet, so `repl_applied` is not equal to `qb_pos`. Some other notes: * Do not do the big arg optimization on the master client, since we can only sdsrange `querybuf` after the data has been sent to replicas. * Set `qb_pos` and `repl_applied` to 0 in `replicationCacheMaster` on the `freeClient` path. * Rewrite `processPendingCommandsAndResetClient` as `processPendingCommandAndInputBuffer`, so that `processInputBuffer` is called right after `processCommandAndResetClient`. 
--- src/blocked.c | 7 +--- src/networking.c | 78 +++++++++++++++++++++++++++--------------- src/replication.c | 3 +- src/server.c | 18 ---------- src/server.h | 9 ++--- tests/support/util.tcl | 4 +-- 6 files changed, 58 insertions(+), 61 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 2754da9e0..65b584213 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -141,12 +141,7 @@ void processUnblockedClients(void) { * the code is conceptually more correct this way. */ if (!(c->flags & CLIENT_BLOCKED)) { /* If we have a queued command, execute it now. */ - if (processPendingCommandsAndResetClient(c) == C_OK) { - /* Now process client if it has more data in it's buffer. */ - if (c->querybuf && sdslen(c->querybuf) > 0) { - if (processInputBuffer(c) == C_ERR) c = NULL; - } - } else { + if (processPendingCommandAndInputBuffer(c) == C_ERR) { c = NULL; } } diff --git a/src/networking.c b/src/networking.c index a96a2e492..767d871d8 100644 --- a/src/networking.c +++ b/src/networking.c @@ -147,7 +147,6 @@ client *createClient(connection *conn) { c->ref_block_pos = 0; c->qb_pos = 0; c->querybuf = sdsempty(); - c->pending_querybuf = sdsempty(); c->querybuf_peak = 0; c->reqtype = 0; c->argc = 0; @@ -167,6 +166,7 @@ client *createClient(connection *conn) { c->repl_start_cmd_stream_on_ack = 0; c->reploff = 0; c->read_reploff = 0; + c->repl_applied = 0; c->repl_ack_off = 0; c->repl_ack_time = 0; c->repl_last_partial_write = 0; @@ -1568,7 +1568,6 @@ void freeClient(client *c) { /* Free the query buffer */ sdsfree(c->querybuf); - sdsfree(c->pending_querybuf); c->querybuf = NULL; /* Deallocate structures used to block on blocking ops. 
*/ @@ -2296,8 +2295,12 @@ int processMultibulkBuffer(client *c) { } c->qb_pos = newline-c->querybuf+2; - if (ll >= PROTO_MBULK_BIG_ARG) { - /* If we are going to read a large object from network + if (!(c->flags & CLIENT_MASTER) && ll >= PROTO_MBULK_BIG_ARG) { + /* When the client is not a master client (because master + * client's querybuf can only be trimmed after data applied + * and sent to replicas). + * + * If we are going to read a large object from network * try to make it likely that it will start at c->querybuf * boundary so that we can optimize object creation * avoiding a large copy of data. @@ -2328,10 +2331,11 @@ int processMultibulkBuffer(client *c) { c->argv = zrealloc(c->argv, sizeof(robj*)*c->argv_len); } - /* Optimization: if the buffer contains JUST our bulk element + /* Optimization: if a non-master client's buffer contains JUST our bulk element * instead of creating a new object by *copying* the sds we * just use the current sds string. */ - if (c->qb_pos == 0 && + if (!(c->flags & CLIENT_MASTER) && + c->qb_pos == 0 && c->bulklen >= PROTO_MBULK_BIG_ARG && sdslen(c->querybuf) == (size_t)(c->bulklen+2)) { @@ -2392,8 +2396,8 @@ void commandProcessed(client *c) { if (c->flags & CLIENT_MASTER) { long long applied = c->reploff - prev_offset; if (applied) { - replicationFeedStreamFromMasterStream(c->pending_querybuf,applied); - sdsrange(c->pending_querybuf,applied,-1); + replicationFeedStreamFromMasterStream(c->querybuf+c->repl_applied,applied); + c->repl_applied += applied; } } } @@ -2436,13 +2440,22 @@ int processCommandAndResetClient(client *c) { /* This function will execute any fully parsed commands pending on * the client. Returns C_ERR if the client is no longer valid after executing * the command, and C_OK for all other cases. 
*/ -int processPendingCommandsAndResetClient(client *c) { +int processPendingCommandAndInputBuffer(client *c) { if (c->flags & CLIENT_PENDING_COMMAND) { c->flags &= ~CLIENT_PENDING_COMMAND; if (processCommandAndResetClient(c) == C_ERR) { return C_ERR; } } + + /* Now process client if it has more data in it's buffer. + * + * Note: when a master client steps into this function, + * it can always satisfy this condition, because its querbuf + * contains data not applied. */ + if (c->querybuf && sdslen(c->querybuf) > 0) { + return processInputBuffer(c); + } return C_OK; } @@ -2514,8 +2527,26 @@ int processInputBuffer(client *c) { } } - /* Trim to pos */ - if (c->qb_pos) { + if (c->flags & CLIENT_MASTER) { + /* If the client is a master, trim the querybuf to repl_applied, + * since master client is very special, its querybuf not only + * used to parse command, but also proxy to sub-replicas. + * + * Here are some scenarios we cannot trim to qb_pos: + * 1. we don't receive complete command from master + * 2. master client blocked cause of client pause + * 3. io threads operate read, master client flagged with CLIENT_PENDING_COMMAND + * + * In these scenarios, qb_pos points to the part of the current command + * or the beginning of next command, and the current command is not applied yet, + * so the repl_applied is not equal to qb_pos. 
*/ + if (c->repl_applied) { + sdsrange(c->querybuf,c->repl_applied,-1); + c->qb_pos -= c->repl_applied; + c->repl_applied = 0; + } + } else if (c->qb_pos) { + /* Trim to pos */ sdsrange(c->querybuf,c->qb_pos,-1); c->qb_pos = 0; } @@ -2551,16 +2582,22 @@ void readQueryFromClient(connection *conn) { if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= PROTO_MBULK_BIG_ARG) { - ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf); + ssize_t remaining = (size_t)(c->bulklen+2)-(sdslen(c->querybuf)-c->qb_pos); big_arg = 1; /* Note that the 'remaining' variable may be zero in some edge case, * for example once we resume a blocked client after CLIENT PAUSE. */ if (remaining > 0) readlen = remaining; + + /* Master client needs expand the readlen when meet BIG_ARG(see #9100), + * but doesn't need align to the next arg, we can read more data. */ + if (c->flags & CLIENT_MASTER && readlen < PROTO_IOBUF_LEN) + readlen = PROTO_IOBUF_LEN; } qblen = sdslen(c->querybuf); - if (big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN) { + if (!(c->flags & CLIENT_MASTER) && // master client's querybuf can grow greedy. + (big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN)) { /* When reading a BIG_ARG we won't be reading more than that one arg * into the query buffer, so we don't need to pre-allocate more than we * need, so using the non-greedy growing. For an initial allocation of @@ -2590,12 +2627,6 @@ void readQueryFromClient(connection *conn) { } freeClientAsync(c); goto done; - } else if (c->flags & CLIENT_MASTER) { - /* Append the query buffer to the pending (not applied) buffer - * of the master. We'll use this buffer later in order to have a - * copy of the string applied by the last command executed. 
*/ - c->pending_querybuf = sdscatlen(c->pending_querybuf, - c->querybuf+qblen,nread); } sdsIncrLen(c->querybuf,nread); @@ -4288,14 +4319,7 @@ int handleClientsWithPendingReadsUsingThreads(void) { /* Once io-threads are idle we can update the client in the mem usage */ updateClientMemUsage(c); - if (processPendingCommandsAndResetClient(c) == C_ERR) { - /* If the client is no longer valid, we avoid - * processing the client later. So we just go - * to the next. */ - continue; - } - - if (processInputBuffer(c) == C_ERR) { + if (processPendingCommandAndInputBuffer(c) == C_ERR) { /* If the client is no longer valid, we avoid * processing the client later. So we just go * to the next. */ diff --git a/src/replication.c b/src/replication.c index b93c512fc..e9a754ab4 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3197,7 +3197,8 @@ void replicationCacheMaster(client *c) { * offsets, including pending transactions, already populated arguments, * pending outputs to the master. */ sdsclear(server.master->querybuf); - sdsclear(server.master->pending_querybuf); + server.master->qb_pos = 0; + server.master->repl_applied = 0; server.master->read_reploff = server.master->reploff; if (c->flags & CLIENT_MULTI) discardTransaction(c); listEmpty(c->reply); diff --git a/src/server.c b/src/server.c index 531709d8a..84f21fed3 100644 --- a/src/server.c +++ b/src/server.c @@ -710,23 +710,6 @@ int clientsCronResizeQueryBuffer(client *c) { * which ever is bigger. */ if (c->bulklen != -1 && (size_t)c->bulklen > c->querybuf_peak) c->querybuf_peak = c->bulklen; - - /* Clients representing masters also use a "pending query buffer" that - * is the yet not applied part of the stream we are reading. Such buffer - * also needs resizing from time to time, otherwise after a very large - * transfer (a huge value or a big MIGRATE operation) it will keep using - * a lot of memory. 
*/ - if (c->flags & CLIENT_MASTER) { - /* There are two conditions to resize the pending query buffer: - * 1) Pending Query buffer is > LIMIT_PENDING_QUERYBUF. - * 2) Used length is smaller than pending_querybuf_size/2 */ - size_t pending_querybuf_size = sdsAllocSize(c->pending_querybuf); - if(pending_querybuf_size > LIMIT_PENDING_QUERYBUF && - sdslen(c->pending_querybuf) < (pending_querybuf_size/2)) - { - c->pending_querybuf = sdsRemoveFreeSpace(c->pending_querybuf); - } - } return 0; } @@ -6418,7 +6401,6 @@ void dismissClientMemory(client *c) { /* Dismiss client query buffer and static reply buffer. */ dismissMemory(c->buf, c->buf_usable_size); dismissSds(c->querybuf); - dismissSds(c->pending_querybuf); /* Dismiss argv array only if we estimate it contains a big buffer. */ if (c->argc && c->argv_len_sum/c->argc >= server.page_size) { for (int i = 0; i < c->argc; i++) { diff --git a/src/server.h b/src/server.h index 5f5417de1..ba072e7d3 100644 --- a/src/server.h +++ b/src/server.h @@ -166,8 +166,6 @@ typedef long long ustime_t; /* microsecond time type. */ #define PROTO_REPLY_MIN_BYTES (1024) /* the lower limit on reply buffer size */ #define REDIS_AUTOSYNC_BYTES (1024*1024*4) /* Sync file every 4MB. */ -#define LIMIT_PENDING_QUERYBUF (4*1024*1024) /* 4mb */ - #define REPLY_BUFFER_DEFAULT_PEAK_RESET_TIME 5000 /* 5 seconds */ /* When configuring the server eventloop, we setup it so that the total number @@ -1082,10 +1080,6 @@ typedef struct client { robj *name; /* As set by CLIENT SETNAME. */ sds querybuf; /* Buffer we use to accumulate client queries. */ size_t qb_pos; /* The position we have read in querybuf. */ - sds pending_querybuf; /* If this client is flagged as master, this buffer - represents the yet not applied portion of the - replication stream that we are receiving from - the master. */ size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */ int argc; /* Num of arguments of current command. 
*/ robj **argv; /* Arguments of current command. */ @@ -1122,6 +1116,7 @@ typedef struct client { sds replpreamble; /* Replication DB preamble. */ long long read_reploff; /* Read replication offset if this is a master. */ long long reploff; /* Applied replication offset if this is a master. */ + long long repl_applied; /* Applied replication data count in querybuf, if this is a replica. */ long long repl_ack_off; /* Replication ack offset, if this is a slave. */ long long repl_ack_time;/* Replication ack time, if this is a slave. */ long long repl_last_partial_write; /* The last time the server did a partial write from the RDB child pipe to this replica */ @@ -2848,7 +2843,7 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev size_t freeMemoryGetNotCountedMemory(); int overMaxmemoryAfterAlloc(size_t moremem); int processCommand(client *c); -int processPendingCommandsAndResetClient(client *c); +int processPendingCommandAndInputBuffer(client *c); void setupSignalHandlers(void); void removeSignalHandlers(void); int createSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler); diff --git a/tests/support/util.tcl b/tests/support/util.tcl index 46c9654c8..4ad96ab10 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -122,7 +122,7 @@ proc wait_replica_online r { wait_for_condition 50 100 { [string match "*slave0:*,state=online*" [$r info replication]] } else { - fail "replica didn't sync in time" + fail "replica didn't online in time" } } @@ -130,7 +130,7 @@ proc wait_for_ofs_sync {r1 r2} { wait_for_condition 50 100 { [status $r1 master_repl_offset] eq [status $r2 master_repl_offset] } else { - fail "replica didn't sync in time" + fail "replica offset didn't match in time" } }