diff --git a/src/aof.cpp b/src/aof.cpp index ab3aca2c6..e8f77e879 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -741,7 +741,7 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a /* In Redis commands are always executed in the context of a client, so in * order to load the append only file we need to create a fake client. */ struct client *createAOFClient(void) { - struct client *c =(client*) zmalloc(sizeof(*c), MALLOC_LOCAL); + struct client *c = new client(); selectDb(c,0); c->id = CLIENT_ID_AOF; /* So modules can identify it's the AOF client. */ @@ -752,7 +752,6 @@ struct client *createAOFClient(void) { c->querybuf_peak = 0; c->argc = 0; c->argv = NULL; - c->argv_len_sum = 0; c->bufpos = 0; c->flags = 0; c->fPendingAsyncWrite = FALSE; @@ -783,7 +782,7 @@ void freeFakeClientArgv(struct client *c) { for (j = 0; j < c->argc; j++) decrRefCount(c->argv[j]); zfree(c->argv); - c->argv_len_sum = 0; + c->argv_len_sumActive = 0; } void freeFakeClient(struct client *c) { @@ -793,7 +792,7 @@ void freeFakeClient(struct client *c) { freeClientMultiState(c); fastlock_unlock(&c->lock); fastlock_free(&c->lock); - zfree(c); + delete c; } /* Replay the append log file. On success C_OK is returned. On non fatal diff --git a/src/blocked.cpp b/src/blocked.cpp index d4b7a4804..085090b50 100644 --- a/src/blocked.cpp +++ b/src/blocked.cpp @@ -121,7 +121,7 @@ void processUnblockedClients(int iel) { * the code is conceptually more correct this way. */ if (!(c->flags & CLIENT_BLOCKED)) { if (c->querybuf && sdslen(c->querybuf) > 0) { - processInputBuffer(c, CMD_CALL_FULL); + processInputBuffer(c, true /*fParse*/, CMD_CALL_FULL); } } fastlock_unlock(&c->lock); diff --git a/src/db.cpp b/src/db.cpp index 57e29c7d1..14c292b6f 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -2959,24 +2959,27 @@ int dbnumFromDb(redisDb *db) serverPanic("invalid database pointer"); } -void redisDbPersistentData::prefetchKeysAsync(AeLocker &lock, client *c) +void redisDbPersistentData::prefetchKeysAsync(parsed_command &command) { if (m_spstorage == nullptr) return; + AeLocker lock; + std::vector veckeys; - lock.arm(c); - getKeysResult* result = nullptr; - int numkeys = getKeysFromCommand(c->cmd, c->argv, c->argc, result); + lock.arm(nullptr); + getKeysResult result = GETKEYS_RESULT_INIT; + auto cmd = lookupCommand(szFromObj(command.argv[0])); + int numkeys = getKeysFromCommand(cmd, command.argv, command.argc, &result); for (int ikey = 0; ikey < numkeys; ++ikey) { - robj *objKey = c->argv[result->keys[ikey]]; + robj *objKey = command.argv[result.keys[ikey]]; if (this->find_cached_threadsafe(szFromObj(objKey)) == nullptr) veckeys.push_back(objKey); } lock.disarm(); - getKeysFreeResult(result); + getKeysFreeResult(&result); std::vector>> vecInserts; for (robj *objKey : veckeys) @@ -2997,7 +3000,7 @@ void redisDbPersistentData::prefetchKeysAsync(AeLocker &lock, client *c) vecInserts.emplace_back(sharedKey, o, std::move(spexpire)); } - lock.arm(c); + lock.arm(nullptr); for (auto &tuple : vecInserts) { sds sharedKey = std::get<0>(tuple); diff --git a/src/networking.cpp b/src/networking.cpp index 80214963d..be50373c9 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -102,7 +102,7 @@ void linkClient(client *c) { } client *createClient(connection *conn, int iel) { - client *c = (client*)zmalloc(sizeof(client), MALLOC_LOCAL); + client *c = new client; serverAssert(conn == nullptr || (iel == (serverTL - g_pserver->rgthreadvar))); c->iel = iel; @@ -124,7 +124,6 @@ client *createClient(connection *conn, int iel) { uint64_t client_id; client_id = g_pserver->next_client_id.fetch_add(1); c->iel = iel; - fastlock_init(&c->lock, "client"); c->id = client_id; c->resp = 2; c->conn = conn; @@ -137,7 +136,6 @@ client *createClient(connection *conn, int iel) { c->reqtype = 0; c->argc = 0; c->argv = NULL; - c->argv_len_sum = 0; c->cmd = c->lastcmd = NULL; c->puser = DefaultUser; c->multibulklen = 0; @@ -157,6 +155,7 @@ client *createClient(connection *conn, int iel) { c->reploff = 0; c->reploff_skipped = 0; c->read_reploff = 0; + c->reploff_cmd = 0; c->repl_ack_off = 0; c->repl_ack_time = 0; c->slave_listening_port = 0; @@ -203,6 +202,13 @@ client *createClient(connection *conn, int iel) { return c; } +size_t client::argv_len_sum() const { + size_t sum = 0; + for (auto &cmd : vecqueuedcmd) + sum += cmd.argv_len_sum; + return sum + argv_len_sumActive; +} + /* This function puts the client in the queue of clients that should write * their output buffers to the socket. Note that it does not *yet* install * the write handler, to start clients are put in a queue of clients that need @@ -1343,7 +1349,7 @@ static void freeClientArgv(client *c) { decrRefCount(c->argv[j]); c->argc = 0; c->cmd = NULL; - c->argv_len_sum = 0; + c->argv_len_sumActive = 0; } void disconnectSlavesExcept(unsigned char *uuid) @@ -1574,12 +1580,12 @@ bool freeClient(client *c) { zfree(c->replyAsync); if (c->name) decrRefCount(c->name); zfree(c->argv); - c->argv_len_sum = 0; + c->argv_len_sumActive = 0; freeClientMultiState(c); sdsfree(c->peerid); ulock.unlock(); fastlock_free(&c->lock); - zfree(c); + delete c; return true; } @@ -1918,9 +1924,6 @@ void resetClient(client *c) { redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL; freeClientArgv(c); - c->reqtype = 0; - c->multibulklen = 0; - c->bulklen = -1; /* We clear the ASKING flag as well if we are not inside a MULTI, and * if what we just executed is not the ASKING command itself. */ @@ -2042,16 +2045,13 @@ int processInlineBuffer(client *c) { /* Setup argv array on client structure */ if (argc) { - if (c->argv) zfree(c->argv); - c->argv = (robj**)zmalloc(sizeof(robj*)*argc, MALLOC_LOCAL); - c->argv_len_sum = 0; - } - - /* Create redis objects for all arguments. */ - for (c->argc = 0, j = 0; j < argc; j++) { - c->argv[c->argc] = createObject(OBJ_STRING,argv[j]); - c->argc++; - c->argv_len_sum += sdslen(argv[j]); + /* Create redis objects for all arguments. */ + c->vecqueuedcmd.emplace_back(argc); + auto &cmd = c->vecqueuedcmd.back(); + for (cmd.argc = 0, j = 0; j < argc; j++) { + cmd.argv[cmd.argc++] = createObject(OBJ_STRING,argv[j]); + cmd.argv_len_sum += sdslen(argv[j]); + } } sds_free(argv); return C_OK; @@ -2141,9 +2141,7 @@ int processMultibulkBuffer(client *c) { c->multibulklen = ll; /* Setup argv array on client structure */ - if (c->argv) zfree(c->argv); - c->argv = (robj**)zmalloc(sizeof(robj*)*c->multibulklen, MALLOC_LOCAL); - c->argv_len_sum = 0; + c->vecqueuedcmd.emplace_back(c->multibulklen); } serverAssertWithInfo(c,NULL,c->multibulklen > 0); @@ -2211,21 +2209,22 @@ int processMultibulkBuffer(client *c) { /* Optimization: if the buffer contains JUST our bulk element * instead of creating a new object by *copying* the sds we * just use the current sds string. */ + auto &cmd = c->vecqueuedcmd.back(); if (c->qb_pos == 0 && c->bulklen >= PROTO_MBULK_BIG_ARG && sdslen(c->querybuf) == (size_t)(c->bulklen+2)) { - c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf); - c->argv_len_sum += c->bulklen; + cmd.argv[cmd.argc++] = createObject(OBJ_STRING,c->querybuf); + cmd.argv_len_sum += c->bulklen; sdsIncrLen(c->querybuf,-2); /* remove CRLF */ /* Assume that if we saw a fat argument we'll see another one * likely... */ c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2); sdsclear(c->querybuf); } else { - c->argv[c->argc++] = + cmd.argv[cmd.argc++] = createStringObject(c->querybuf+c->qb_pos,c->bulklen); - c->argv_len_sum += c->bulklen; + cmd.argv_len_sum += c->bulklen; c->qb_pos += c->bulklen+2; } c->bulklen = -1; @@ -2249,7 +2248,8 @@ void commandProcessed(client *c, int flags) { long long prev_offset = c->reploff; if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { /* Update the applied replication offset of our master. */ - c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; + serverAssert(c->reploff <= c->reploff_cmd); + c->reploff = c->reploff_cmd; } /* Don't reset the client structure for clients blocked in a @@ -2306,34 +2306,34 @@ int processCommandAndResetClient(client *c, int flags) { return deadclient ? C_ERR : C_OK; } -/* This function is called every time, in the client structure 'c', there is - * more query buffer to process, because we read more data from the socket - * or because a client was blocked and later reactivated, so there could be - * pending query buffer, already representing a full command, to process. */ -void processInputBuffer(client *c, int callFlags) { - AssertCorrectThread(c); +static bool FClientReady(client *c) { + /* Return if clients are paused. */ + if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) return false; + + /* Immediately abort if the client is in the middle of something. */ + if (c->flags & CLIENT_BLOCKED) return false; + + /* Don't process input from the master while there is a busy script + * condition on the replica. We want just to accumulate the replication + * stream (instead of replying -BUSY like we do with other clients) and + * later resume the processing. */ + if (g_pserver->lua_timedout && c->flags & CLIENT_MASTER) return false; + + /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is + * written to the client. Make sure to not let the reply grow after + * this flag has been set (i.e. don't process more commands). + * + * The same applies for clients we want to terminate ASAP. */ + if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) return false; + + return true; +} + +void parseClientCommandBuffer(client *c) { + if (!FClientReady(c)) + return; - /* Keep processing while there is something in the input buffer */ - while(c->qb_pos < sdslen(c->querybuf)) { - /* Return if clients are paused. */ - if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break; - - /* Immediately abort if the client is in the middle of something. */ - if (c->flags & CLIENT_BLOCKED) break; - - /* Don't process input from the master while there is a busy script - * condition on the replica. We want just to accumulate the replication - * stream (instead of replying -BUSY like we do with other clients) and - * later resume the processing. */ - if (g_pserver->lua_timedout && c->flags & CLIENT_MASTER) break; - - /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is - * written to the client. Make sure to not let the reply grow after - * this flag has been set (i.e. don't process more commands). - * - * The same applies for clients we want to terminate ASAP. */ - if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break; - + while(c->qb_pos < sdslen(c->querybuf)) { /* Determine request type when unknown. */ if (!c->reqtype) { if (c->querybuf[c->qb_pos] == '*') { @@ -2343,6 +2343,7 @@ void processInputBuffer(client *c, int callFlags) { } } + size_t cqueries = c->vecqueuedcmd.size(); if (c->reqtype == PROTO_REQ_INLINE) { if (processInlineBuffer(c) != C_OK) break; } else if (c->reqtype == PROTO_REQ_MULTIBULK) { @@ -2350,6 +2351,61 @@ void processInputBuffer(client *c, int callFlags) { } else { serverPanic("Unknown request type"); } + if (!c->vecqueuedcmd.empty() && (c->vecqueuedcmd.back().argc <= 0 || c->vecqueuedcmd.back().argv == nullptr)) { + c->vecqueuedcmd.pop_back(); + } else if (!c->vecqueuedcmd.empty()) { + if (c->flags & CLIENT_MASTER) c->vecqueuedcmd.back().reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; + serverAssert(c->vecqueuedcmd.back().reploff >= 0); + } + + /* Prefetch if we have a storage provider and we're not in the global lock */ + if (cqueries < c->vecqueuedcmd.size() && g_pserver->m_pstorageFactory != nullptr && !GlobalLocksAcquired()) { + auto &query = c->vecqueuedcmd.back(); + if (query.argc > 0 && query.argc == query.argcMax) { + c->db->prefetchKeysAsync(query); + } + } + c->reqtype = 0; + c->multibulklen = 0; + c->bulklen = -1; + c->reqtype = 0; + } + + /* Trim to pos */ + if (c->qb_pos) { + sdsrange(c->querybuf,c->qb_pos,-1); + c->qb_pos = 0; + } +} + +/* This function is called every time, in the client structure 'c', there is + * more query buffer to process, because we read more data from the socket + * or because a client was blocked and later reactivated, so there could be + * pending query buffer, already representing a full command, to process. */ +void processInputBuffer(client *c, bool fParse, int callFlags) { + AssertCorrectThread(c); + + if (fParse) + parseClientCommandBuffer(c); + + /* Keep processing while there is something in the input buffer */ + while (!c->vecqueuedcmd.empty()) { + /* Return if we're still parsing this command */ + auto &cmd = c->vecqueuedcmd.front(); + if (cmd.argc != cmd.argcMax) break; + + if (!FClientReady(c)) break; + + zfree(c->argv); + c->argc = cmd.argc; + c->argv = cmd.argv; + cmd.argv = nullptr; + c->argv_len_sumActive = cmd.argv_len_sum; + cmd.argv_len_sum = 0; + c->reploff_cmd = cmd.reploff; + serverAssert(c->argv != nullptr); + + c->vecqueuedcmd.erase(c->vecqueuedcmd.begin()); /* Multibulk processing could see a <= 0 length. */ if (c->argc == 0) { @@ -2364,12 +2420,6 @@ void processInputBuffer(client *c, int callFlags) { } } } - - /* Trim to pos */ - if (c->qb_pos) { - sdsrange(c->querybuf,c->qb_pos,-1); - c->qb_pos = 0; - } } void readQueryFromClient(connection *conn) { @@ -2450,6 +2500,8 @@ void readQueryFromClient(connection *conn) { return; } + parseClientCommandBuffer(c); + serverTL->vecclientsProcess.push_back(c); } @@ -2461,7 +2513,7 @@ void processClients() /* There is more data in the client input buffer, continue parsing it * in case to check if there is a full command to execute. */ std::unique_lock ul(c->lock); - processInputBuffer(c, CMD_CALL_FULL); + processInputBuffer(c, false /*fParse*/, CMD_CALL_FULL); } if (listLength(serverTL->clients_pending_asyncwrite)) @@ -2568,7 +2620,7 @@ sds catClientInfoString(sds s, client *client) { /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory * i.e. unused sds space and internal fragmentation, just the string length. but this is enough to * spot problematic clients. */ - total_mem += client->argv_len_sum; + total_mem += client->argv_len_sum(); if (client->argv) total_mem += zmalloc_size(client->argv); @@ -2587,7 +2639,7 @@ sds catClientInfoString(sds s, client *client) { (client->flags & CLIENT_MULTI) ? client->mstate.count : -1, (unsigned long long) sdslen(client->querybuf), (unsigned long long) sdsavail(client->querybuf), - (unsigned long long) client->argv_len_sum, + (unsigned long long) client->argv_len_sum(), (unsigned long long) client->bufpos, (unsigned long long) listLength(client->reply), (unsigned long long) obufmem, /* should not include client->buf since we want to see 0 for static clients. */ @@ -3144,10 +3196,10 @@ void rewriteClientCommandVector(client *c, int argc, ...) { /* Replace argv and argc with our new versions. */ c->argv = argv; c->argc = argc; - c->argv_len_sum = 0; + c->argv_len_sumActive = 0; for (j = 0; j < c->argc; j++) if (c->argv[j]) - c->argv_len_sum += getStringObjectLen(c->argv[j]); + c->argv_len_sumActive += getStringObjectLen(c->argv[j]); c->cmd = lookupCommandOrOriginal((sds)ptrFromObj(c->argv[0])); serverAssertWithInfo(c,NULL,c->cmd != NULL); va_end(ap); @@ -3160,10 +3212,10 @@ void replaceClientCommandVector(client *c, int argc, robj **argv) { zfree(c->argv); c->argv = argv; c->argc = argc; - c->argv_len_sum = 0; + c->argv_len_sumActive = 0; for (j = 0; j < c->argc; j++) if (c->argv[j]) - c->argv_len_sum += getStringObjectLen(c->argv[j]); + c->argv_len_sumActive += getStringObjectLen(c->argv[j]); c->cmd = lookupCommandOrOriginal((sds)ptrFromObj(c->argv[0])); serverAssertWithInfo(c,NULL,c->cmd != NULL); } @@ -3188,8 +3240,8 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { c->argv[i] = NULL; } oldval = c->argv[i]; - if (oldval) c->argv_len_sum -= getStringObjectLen(oldval); - if (newval) c->argv_len_sum += getStringObjectLen(newval); + if (oldval) c->argv_len_sumActive -= getStringObjectLen(oldval); + if (newval) c->argv_len_sumActive += getStringObjectLen(newval); c->argv[i] = newval; incrRefCount(newval); if (oldval) decrRefCount(oldval); diff --git a/src/replication.cpp b/src/replication.cpp index 1f87c77a4..76c949e4b 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -3432,6 +3432,13 @@ void replicationCacheMaster(redisMaster *mi, client *c) { * offsets, including pending transactions, already populated arguments, * pending outputs to the master. */ sdsclear(mi->master->querybuf); + if (!mi->master->vecqueuedcmd.empty()) { + // Clear out everything except for partially parsed commands (which we'll cache) + auto cmd = std::move(mi->master->vecqueuedcmd.front()); + mi->master->vecqueuedcmd.clear(); + if (cmd.argc != cmd.argcMax) + mi->master->vecqueuedcmd.emplace_back(std::move(cmd)); + } sdsclear(mi->master->pending_querybuf); mi->master->read_reploff = mi->master->reploff; if (c->flags & CLIENT_MULTI) discardTransaction(c); @@ -4307,10 +4314,12 @@ void replicaReplayCommand(client *c) cFake->authenticated = c->authenticated; cFake->puser = c->puser; cFake->querybuf = sdscatsds(cFake->querybuf,(sds)ptrFromObj(c->argv[2])); + cFake->read_reploff = sdslen(cFake->querybuf); + cFake->reploff = 0; selectDb(cFake, c->db->id); auto ccmdPrev = serverTL->commandsExecuted; cFake->flags |= CLIENT_MASTER | CLIENT_PREVENT_REPL_PROP; - processInputBuffer(cFake, (CMD_CALL_FULL & (~CMD_CALL_PROPAGATE))); + processInputBuffer(cFake, true /*fParse*/, (CMD_CALL_FULL & (~CMD_CALL_PROPAGATE))); cFake->flags &= ~(CLIENT_MASTER | CLIENT_PREVENT_REPL_PROP); bool fExec = ccmdPrev != serverTL->commandsExecuted; cFake->lock.unlock(); diff --git a/src/server.cpp b/src/server.cpp index e9e71b46e..eda89bc0e 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1749,7 +1749,7 @@ size_t ClientsPeakMemInput[CLIENTS_PEAK_MEM_USAGE_SLOTS]; size_t ClientsPeakMemOutput[CLIENTS_PEAK_MEM_USAGE_SLOTS]; int clientsCronTrackExpansiveClients(client *c) { - size_t in_usage = sdsZmallocSize(c->querybuf) + c->argv_len_sum; + size_t in_usage = sdsZmallocSize(c->querybuf) + c->argv_len_sum(); size_t out_usage = getClientOutputBufferMemoryUsage(c); int i = g_pserver->unixtime % CLIENTS_PEAK_MEM_USAGE_SLOTS; int zeroidx = (i+1) % CLIENTS_PEAK_MEM_USAGE_SLOTS; @@ -1786,7 +1786,7 @@ int clientsCronTrackClientsMemUsage(client *c) { mem += getClientOutputBufferMemoryUsage(c); mem += sdsZmallocSize(c->querybuf); mem += zmalloc_size(c); - mem += c->argv_len_sum; + mem += c->argv_len_sum(); if (c->argv) mem += zmalloc_size(c->argv); /* Now that we have the memory used by the client, remove the old * value from the old category, and add it back. */ diff --git a/src/server.h b/src/server.h index 3f06c9f8b..7d2611c37 100644 --- a/src/server.h +++ b/src/server.h @@ -1136,7 +1136,7 @@ public: bool removeCachedValue(const char *key); void removeAllCachedValues(); - void prefetchKeysAsync(class AeLocker &locker, client *c); + void prefetchKeysAsync(struct parsed_command &command); bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; } @@ -1443,7 +1443,53 @@ typedef struct { need more reserved IDs use UINT64_MAX-1, -2, ... and so forth. */ -typedef struct client { +struct parsed_command { + robj** argv = nullptr; + int argc = 0; + int argcMax; + long long reploff = 0; + size_t argv_len_sum = 0; /* Sum of lengths of objects in argv list. */ + + parsed_command(int maxargs) { + argv = (robj**)zmalloc(sizeof(robj*)*maxargs); + argcMax = maxargs; + } + + parsed_command &operator=(parsed_command &&o) { + argv = o.argv; + argc = o.argc; + argcMax = o.argcMax; + reploff = o.reploff; + o.argv = nullptr; + o.argc = 0; + o.argcMax = 0; + o.reploff = 0; + return *this; + } + + parsed_command(parsed_command &o) = delete; + parsed_command(parsed_command &&o) { + argv = o.argv; + argc = o.argc; + argcMax = o.argcMax; + reploff = o.reploff; + o.argv = nullptr; + o.argc = 0; + o.argcMax = 0; + o.reploff = 0; + } + + ~parsed_command() { + if (argv != nullptr) { + for (int i = 0; i < argc; ++i) { + decrRefCount(argv[i]); + } + zfree(argv); + } + } +}; + +struct client { uint64_t id; /* Client incremental unique ID. */ connection *conn; int resp; /* RESP protocol version. Can be 2 or 3. */ @@ -1456,9 +1502,6 @@ typedef struct client { replication stream that we are receiving from the master. */ size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */ - int argc; /* Num of arguments of current command. */ - robj **argv; /* Arguments of current command. */ - size_t argv_len_sum; /* Sum of lengths of objects in argv list. */ struct redisCommand *cmd, *lastcmd; /* Last command executed. */ user *puser; /* User associated with this connection. If the user is set to NULL the connection can do @@ -1488,6 +1531,7 @@ typedef struct client { long long read_reploff; /* Read replication offset if this is a master. */ long long reploff; /* Applied replication offset if this is a master. */ long long reploff_skipped; /* Repl backlog we did not send to this client */ + long long reploff_cmd; /* The replication offset of the executing command, reploff gets set to this after the execution completes */ long long repl_ack_off; /* Replication ack offset, if this is a replica. */ long long repl_ack_time;/* Replication ack time, if this is a replica. */ long long psync_initial_offset; /* FULLRESYNC reply offset other slaves @@ -1545,12 +1589,17 @@ typedef struct client { uint64_t mvccCheckpoint = 0; // the MVCC checkpoint of our last write int iel; /* the event loop index we're registered with */ - struct fastlock lock; + struct fastlock lock {"client"}; int master_error; + std::vector vecqueuedcmd; + int argc; + robj **argv; + size_t argv_len_sumActive = 0; // post a function from a non-client thread to run on its client thread bool postFunction(std::function fn, bool fLock = true); -} client; + size_t argv_len_sum() const; +}; struct saveparam { time_t seconds; @@ -2516,7 +2565,7 @@ void setDeferredMapLen(client *c, void *node, long length); void setDeferredSetLen(client *c, void *node, long length); void setDeferredAttributeLen(client *c, void *node, long length); void setDeferredPushLen(client *c, void *node, long length); -void processInputBuffer(client *c, int callFlags); +void processInputBuffer(client *c, bool fParse, int callFlags); void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask); void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask); void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask); diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index 278495931..54d9dbbcd 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -1,7 +1,7 @@ start_server {tags {"introspection"}} { test {CLIENT LIST} { r client list - } {*addr=*:* fd=* age=* idle=* flags=N db=9 sub=0 psub=0 multi=-1 qbuf=26 qbuf-free=* argv-mem=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client*} + } {*addr=*:* fd=* age=* idle=* flags=N db=9 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=* argv-mem=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client*} test {MONITOR can log executed commands} { set rd [redis_deferring_client]