Enable prefetch with storage providers

Former-commit-id: 21f9c52c446ad1b460a340c3d873839b2355728f
This commit is contained in:
John Sully 2021-02-11 00:50:21 +00:00
parent e5e1350219
commit ef471e6e53
8 changed files with 205 additions and 93 deletions

View File

@ -741,7 +741,7 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a
/* In Redis commands are always executed in the context of a client, so in /* In Redis commands are always executed in the context of a client, so in
* order to load the append only file we need to create a fake client. */ * order to load the append only file we need to create a fake client. */
struct client *createAOFClient(void) { struct client *createAOFClient(void) {
struct client *c =(client*) zmalloc(sizeof(*c), MALLOC_LOCAL); struct client *c = new client();
selectDb(c,0); selectDb(c,0);
c->id = CLIENT_ID_AOF; /* So modules can identify it's the AOF client. */ c->id = CLIENT_ID_AOF; /* So modules can identify it's the AOF client. */
@ -752,7 +752,6 @@ struct client *createAOFClient(void) {
c->querybuf_peak = 0; c->querybuf_peak = 0;
c->argc = 0; c->argc = 0;
c->argv = NULL; c->argv = NULL;
c->argv_len_sum = 0;
c->bufpos = 0; c->bufpos = 0;
c->flags = 0; c->flags = 0;
c->fPendingAsyncWrite = FALSE; c->fPendingAsyncWrite = FALSE;
@ -783,7 +782,7 @@ void freeFakeClientArgv(struct client *c) {
for (j = 0; j < c->argc; j++) for (j = 0; j < c->argc; j++)
decrRefCount(c->argv[j]); decrRefCount(c->argv[j]);
zfree(c->argv); zfree(c->argv);
c->argv_len_sum = 0; c->argv_len_sumActive = 0;
} }
void freeFakeClient(struct client *c) { void freeFakeClient(struct client *c) {
@ -793,7 +792,7 @@ void freeFakeClient(struct client *c) {
freeClientMultiState(c); freeClientMultiState(c);
fastlock_unlock(&c->lock); fastlock_unlock(&c->lock);
fastlock_free(&c->lock); fastlock_free(&c->lock);
zfree(c); delete c;
} }
/* Replay the append log file. On success C_OK is returned. On non fatal /* Replay the append log file. On success C_OK is returned. On non fatal

View File

@ -121,7 +121,7 @@ void processUnblockedClients(int iel) {
* the code is conceptually more correct this way. */ * the code is conceptually more correct this way. */
if (!(c->flags & CLIENT_BLOCKED)) { if (!(c->flags & CLIENT_BLOCKED)) {
if (c->querybuf && sdslen(c->querybuf) > 0) { if (c->querybuf && sdslen(c->querybuf) > 0) {
processInputBuffer(c, CMD_CALL_FULL); processInputBuffer(c, true /*fParse*/, CMD_CALL_FULL);
} }
} }
fastlock_unlock(&c->lock); fastlock_unlock(&c->lock);

View File

@ -2959,24 +2959,27 @@ int dbnumFromDb(redisDb *db)
serverPanic("invalid database pointer"); serverPanic("invalid database pointer");
} }
void redisDbPersistentData::prefetchKeysAsync(AeLocker &lock, client *c) void redisDbPersistentData::prefetchKeysAsync(parsed_command &command)
{ {
if (m_spstorage == nullptr) if (m_spstorage == nullptr)
return; return;
AeLocker lock;
std::vector<robj*> veckeys; std::vector<robj*> veckeys;
lock.arm(c); lock.arm(nullptr);
getKeysResult* result = nullptr; getKeysResult result = GETKEYS_RESULT_INIT;
int numkeys = getKeysFromCommand(c->cmd, c->argv, c->argc, result); auto cmd = lookupCommand(szFromObj(command.argv[0]));
int numkeys = getKeysFromCommand(cmd, command.argv, command.argc, &result);
for (int ikey = 0; ikey < numkeys; ++ikey) for (int ikey = 0; ikey < numkeys; ++ikey)
{ {
robj *objKey = c->argv[result->keys[ikey]]; robj *objKey = command.argv[result.keys[ikey]];
if (this->find_cached_threadsafe(szFromObj(objKey)) == nullptr) if (this->find_cached_threadsafe(szFromObj(objKey)) == nullptr)
veckeys.push_back(objKey); veckeys.push_back(objKey);
} }
lock.disarm(); lock.disarm();
getKeysFreeResult(result); getKeysFreeResult(&result);
std::vector<std::tuple<sds, robj*, std::unique_ptr<expireEntry>>> vecInserts; std::vector<std::tuple<sds, robj*, std::unique_ptr<expireEntry>>> vecInserts;
for (robj *objKey : veckeys) for (robj *objKey : veckeys)
@ -2997,7 +3000,7 @@ void redisDbPersistentData::prefetchKeysAsync(AeLocker &lock, client *c)
vecInserts.emplace_back(sharedKey, o, std::move(spexpire)); vecInserts.emplace_back(sharedKey, o, std::move(spexpire));
} }
lock.arm(c); lock.arm(nullptr);
for (auto &tuple : vecInserts) for (auto &tuple : vecInserts)
{ {
sds sharedKey = std::get<0>(tuple); sds sharedKey = std::get<0>(tuple);

View File

@ -102,7 +102,7 @@ void linkClient(client *c) {
} }
client *createClient(connection *conn, int iel) { client *createClient(connection *conn, int iel) {
client *c = (client*)zmalloc(sizeof(client), MALLOC_LOCAL); client *c = new client;
serverAssert(conn == nullptr || (iel == (serverTL - g_pserver->rgthreadvar))); serverAssert(conn == nullptr || (iel == (serverTL - g_pserver->rgthreadvar)));
c->iel = iel; c->iel = iel;
@ -124,7 +124,6 @@ client *createClient(connection *conn, int iel) {
uint64_t client_id; uint64_t client_id;
client_id = g_pserver->next_client_id.fetch_add(1); client_id = g_pserver->next_client_id.fetch_add(1);
c->iel = iel; c->iel = iel;
fastlock_init(&c->lock, "client");
c->id = client_id; c->id = client_id;
c->resp = 2; c->resp = 2;
c->conn = conn; c->conn = conn;
@ -137,7 +136,6 @@ client *createClient(connection *conn, int iel) {
c->reqtype = 0; c->reqtype = 0;
c->argc = 0; c->argc = 0;
c->argv = NULL; c->argv = NULL;
c->argv_len_sum = 0;
c->cmd = c->lastcmd = NULL; c->cmd = c->lastcmd = NULL;
c->puser = DefaultUser; c->puser = DefaultUser;
c->multibulklen = 0; c->multibulklen = 0;
@ -157,6 +155,7 @@ client *createClient(connection *conn, int iel) {
c->reploff = 0; c->reploff = 0;
c->reploff_skipped = 0; c->reploff_skipped = 0;
c->read_reploff = 0; c->read_reploff = 0;
c->reploff_cmd = 0;
c->repl_ack_off = 0; c->repl_ack_off = 0;
c->repl_ack_time = 0; c->repl_ack_time = 0;
c->slave_listening_port = 0; c->slave_listening_port = 0;
@ -203,6 +202,13 @@ client *createClient(connection *conn, int iel) {
return c; return c;
} }
size_t client::argv_len_sum() const {
size_t sum = 0;
for (auto &cmd : vecqueuedcmd)
sum += cmd.argv_len_sum;
return sum + argv_len_sumActive;
}
/* This function puts the client in the queue of clients that should write /* This function puts the client in the queue of clients that should write
* their output buffers to the socket. Note that it does not *yet* install * their output buffers to the socket. Note that it does not *yet* install
* the write handler, to start clients are put in a queue of clients that need * the write handler, to start clients are put in a queue of clients that need
@ -1343,7 +1349,7 @@ static void freeClientArgv(client *c) {
decrRefCount(c->argv[j]); decrRefCount(c->argv[j]);
c->argc = 0; c->argc = 0;
c->cmd = NULL; c->cmd = NULL;
c->argv_len_sum = 0; c->argv_len_sumActive = 0;
} }
void disconnectSlavesExcept(unsigned char *uuid) void disconnectSlavesExcept(unsigned char *uuid)
@ -1574,12 +1580,12 @@ bool freeClient(client *c) {
zfree(c->replyAsync); zfree(c->replyAsync);
if (c->name) decrRefCount(c->name); if (c->name) decrRefCount(c->name);
zfree(c->argv); zfree(c->argv);
c->argv_len_sum = 0; c->argv_len_sumActive = 0;
freeClientMultiState(c); freeClientMultiState(c);
sdsfree(c->peerid); sdsfree(c->peerid);
ulock.unlock(); ulock.unlock();
fastlock_free(&c->lock); fastlock_free(&c->lock);
zfree(c); delete c;
return true; return true;
} }
@ -1918,9 +1924,6 @@ void resetClient(client *c) {
redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL; redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL;
freeClientArgv(c); freeClientArgv(c);
c->reqtype = 0;
c->multibulklen = 0;
c->bulklen = -1;
/* We clear the ASKING flag as well if we are not inside a MULTI, and /* We clear the ASKING flag as well if we are not inside a MULTI, and
* if what we just executed is not the ASKING command itself. */ * if what we just executed is not the ASKING command itself. */
@ -2042,16 +2045,13 @@ int processInlineBuffer(client *c) {
/* Setup argv array on client structure */ /* Setup argv array on client structure */
if (argc) { if (argc) {
if (c->argv) zfree(c->argv); /* Create redis objects for all arguments. */
c->argv = (robj**)zmalloc(sizeof(robj*)*argc, MALLOC_LOCAL); c->vecqueuedcmd.emplace_back(argc);
c->argv_len_sum = 0; auto &cmd = c->vecqueuedcmd.back();
} for (cmd.argc = 0, j = 0; j < argc; j++) {
cmd.argv[cmd.argc++] = createObject(OBJ_STRING,argv[j]);
/* Create redis objects for all arguments. */ cmd.argv_len_sum += sdslen(argv[j]);
for (c->argc = 0, j = 0; j < argc; j++) { }
c->argv[c->argc] = createObject(OBJ_STRING,argv[j]);
c->argc++;
c->argv_len_sum += sdslen(argv[j]);
} }
sds_free(argv); sds_free(argv);
return C_OK; return C_OK;
@ -2141,9 +2141,7 @@ int processMultibulkBuffer(client *c) {
c->multibulklen = ll; c->multibulklen = ll;
/* Setup argv array on client structure */ /* Setup argv array on client structure */
if (c->argv) zfree(c->argv); c->vecqueuedcmd.emplace_back(c->multibulklen);
c->argv = (robj**)zmalloc(sizeof(robj*)*c->multibulklen, MALLOC_LOCAL);
c->argv_len_sum = 0;
} }
serverAssertWithInfo(c,NULL,c->multibulklen > 0); serverAssertWithInfo(c,NULL,c->multibulklen > 0);
@ -2211,21 +2209,22 @@ int processMultibulkBuffer(client *c) {
/* Optimization: if the buffer contains JUST our bulk element /* Optimization: if the buffer contains JUST our bulk element
* instead of creating a new object by *copying* the sds we * instead of creating a new object by *copying* the sds we
* just use the current sds string. */ * just use the current sds string. */
auto &cmd = c->vecqueuedcmd.back();
if (c->qb_pos == 0 && if (c->qb_pos == 0 &&
c->bulklen >= PROTO_MBULK_BIG_ARG && c->bulklen >= PROTO_MBULK_BIG_ARG &&
sdslen(c->querybuf) == (size_t)(c->bulklen+2)) sdslen(c->querybuf) == (size_t)(c->bulklen+2))
{ {
c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf); cmd.argv[cmd.argc++] = createObject(OBJ_STRING,c->querybuf);
c->argv_len_sum += c->bulklen; cmd.argv_len_sum += c->bulklen;
sdsIncrLen(c->querybuf,-2); /* remove CRLF */ sdsIncrLen(c->querybuf,-2); /* remove CRLF */
/* Assume that if we saw a fat argument we'll see another one /* Assume that if we saw a fat argument we'll see another one
* likely... */ * likely... */
c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2); c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2);
sdsclear(c->querybuf); sdsclear(c->querybuf);
} else { } else {
c->argv[c->argc++] = cmd.argv[cmd.argc++] =
createStringObject(c->querybuf+c->qb_pos,c->bulklen); createStringObject(c->querybuf+c->qb_pos,c->bulklen);
c->argv_len_sum += c->bulklen; cmd.argv_len_sum += c->bulklen;
c->qb_pos += c->bulklen+2; c->qb_pos += c->bulklen+2;
} }
c->bulklen = -1; c->bulklen = -1;
@ -2249,7 +2248,8 @@ void commandProcessed(client *c, int flags) {
long long prev_offset = c->reploff; long long prev_offset = c->reploff;
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
/* Update the applied replication offset of our master. */ /* Update the applied replication offset of our master. */
c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; serverAssert(c->reploff <= c->reploff_cmd);
c->reploff = c->reploff_cmd;
} }
/* Don't reset the client structure for clients blocked in a /* Don't reset the client structure for clients blocked in a
@ -2306,34 +2306,34 @@ int processCommandAndResetClient(client *c, int flags) {
return deadclient ? C_ERR : C_OK; return deadclient ? C_ERR : C_OK;
} }
/* This function is called every time, in the client structure 'c', there is static bool FClientReady(client *c) {
* more query buffer to process, because we read more data from the socket /* Return if clients are paused. */
* or because a client was blocked and later reactivated, so there could be if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) return false;
* pending query buffer, already representing a full command, to process. */
void processInputBuffer(client *c, int callFlags) { /* Immediately abort if the client is in the middle of something. */
AssertCorrectThread(c); if (c->flags & CLIENT_BLOCKED) return false;
/* Don't process input from the master while there is a busy script
* condition on the replica. We want just to accumulate the replication
* stream (instead of replying -BUSY like we do with other clients) and
* later resume the processing. */
if (g_pserver->lua_timedout && c->flags & CLIENT_MASTER) return false;
/* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
* written to the client. Make sure to not let the reply grow after
* this flag has been set (i.e. don't process more commands).
*
* The same applies for clients we want to terminate ASAP. */
if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) return false;
return true;
}
void parseClientCommandBuffer(client *c) {
if (!FClientReady(c))
return;
/* Keep processing while there is something in the input buffer */
while(c->qb_pos < sdslen(c->querybuf)) { while(c->qb_pos < sdslen(c->querybuf)) {
/* Return if clients are paused. */
if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
/* Immediately abort if the client is in the middle of something. */
if (c->flags & CLIENT_BLOCKED) break;
/* Don't process input from the master while there is a busy script
* condition on the replica. We want just to accumulate the replication
* stream (instead of replying -BUSY like we do with other clients) and
* later resume the processing. */
if (g_pserver->lua_timedout && c->flags & CLIENT_MASTER) break;
/* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
* written to the client. Make sure to not let the reply grow after
* this flag has been set (i.e. don't process more commands).
*
* The same applies for clients we want to terminate ASAP. */
if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
/* Determine request type when unknown. */ /* Determine request type when unknown. */
if (!c->reqtype) { if (!c->reqtype) {
if (c->querybuf[c->qb_pos] == '*') { if (c->querybuf[c->qb_pos] == '*') {
@ -2343,6 +2343,7 @@ void processInputBuffer(client *c, int callFlags) {
} }
} }
size_t cqueries = c->vecqueuedcmd.size();
if (c->reqtype == PROTO_REQ_INLINE) { if (c->reqtype == PROTO_REQ_INLINE) {
if (processInlineBuffer(c) != C_OK) break; if (processInlineBuffer(c) != C_OK) break;
} else if (c->reqtype == PROTO_REQ_MULTIBULK) { } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
@ -2350,6 +2351,61 @@ void processInputBuffer(client *c, int callFlags) {
} else { } else {
serverPanic("Unknown request type"); serverPanic("Unknown request type");
} }
if (!c->vecqueuedcmd.empty() && (c->vecqueuedcmd.back().argc <= 0 || c->vecqueuedcmd.back().argv == nullptr)) {
c->vecqueuedcmd.pop_back();
} else if (!c->vecqueuedcmd.empty()) {
if (c->flags & CLIENT_MASTER) c->vecqueuedcmd.back().reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
serverAssert(c->vecqueuedcmd.back().reploff >= 0);
}
/* Prefetch if we have a storage provider and we're not in the global lock */
if (cqueries < c->vecqueuedcmd.size() && g_pserver->m_pstorageFactory != nullptr && !GlobalLocksAcquired()) {
auto &query = c->vecqueuedcmd.back();
if (query.argc > 0 && query.argc == query.argcMax) {
c->db->prefetchKeysAsync(query);
}
}
c->reqtype = 0;
c->multibulklen = 0;
c->bulklen = -1;
c->reqtype = 0;
}
/* Trim to pos */
if (c->qb_pos) {
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
}
}
/* This function is called every time, in the client structure 'c', there is
* more query buffer to process, because we read more data from the socket
* or because a client was blocked and later reactivated, so there could be
* pending query buffer, already representing a full command, to process. */
void processInputBuffer(client *c, bool fParse, int callFlags) {
AssertCorrectThread(c);
if (fParse)
parseClientCommandBuffer(c);
/* Keep processing while there is something in the input buffer */
while (!c->vecqueuedcmd.empty()) {
/* Return if we're still parsing this command */
auto &cmd = c->vecqueuedcmd.front();
if (cmd.argc != cmd.argcMax) break;
if (!FClientReady(c)) break;
zfree(c->argv);
c->argc = cmd.argc;
c->argv = cmd.argv;
cmd.argv = nullptr;
c->argv_len_sumActive = cmd.argv_len_sum;
cmd.argv_len_sum = 0;
c->reploff_cmd = cmd.reploff;
serverAssert(c->argv != nullptr);
c->vecqueuedcmd.erase(c->vecqueuedcmd.begin());
/* Multibulk processing could see a <= 0 length. */ /* Multibulk processing could see a <= 0 length. */
if (c->argc == 0) { if (c->argc == 0) {
@ -2364,12 +2420,6 @@ void processInputBuffer(client *c, int callFlags) {
} }
} }
} }
/* Trim to pos */
if (c->qb_pos) {
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
}
} }
void readQueryFromClient(connection *conn) { void readQueryFromClient(connection *conn) {
@ -2450,6 +2500,8 @@ void readQueryFromClient(connection *conn) {
return; return;
} }
parseClientCommandBuffer(c);
serverTL->vecclientsProcess.push_back(c); serverTL->vecclientsProcess.push_back(c);
} }
@ -2461,7 +2513,7 @@ void processClients()
/* There is more data in the client input buffer, continue parsing it /* There is more data in the client input buffer, continue parsing it
* in case to check if there is a full command to execute. */ * in case to check if there is a full command to execute. */
std::unique_lock<fastlock> ul(c->lock); std::unique_lock<fastlock> ul(c->lock);
processInputBuffer(c, CMD_CALL_FULL); processInputBuffer(c, false /*fParse*/, CMD_CALL_FULL);
} }
if (listLength(serverTL->clients_pending_asyncwrite)) if (listLength(serverTL->clients_pending_asyncwrite))
@ -2568,7 +2620,7 @@ sds catClientInfoString(sds s, client *client) {
/* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory
* i.e. unused sds space and internal fragmentation, just the string length. but this is enough to * i.e. unused sds space and internal fragmentation, just the string length. but this is enough to
* spot problematic clients. */ * spot problematic clients. */
total_mem += client->argv_len_sum; total_mem += client->argv_len_sum();
if (client->argv) if (client->argv)
total_mem += zmalloc_size(client->argv); total_mem += zmalloc_size(client->argv);
@ -2587,7 +2639,7 @@ sds catClientInfoString(sds s, client *client) {
(client->flags & CLIENT_MULTI) ? client->mstate.count : -1, (client->flags & CLIENT_MULTI) ? client->mstate.count : -1,
(unsigned long long) sdslen(client->querybuf), (unsigned long long) sdslen(client->querybuf),
(unsigned long long) sdsavail(client->querybuf), (unsigned long long) sdsavail(client->querybuf),
(unsigned long long) client->argv_len_sum, (unsigned long long) client->argv_len_sum(),
(unsigned long long) client->bufpos, (unsigned long long) client->bufpos,
(unsigned long long) listLength(client->reply), (unsigned long long) listLength(client->reply),
(unsigned long long) obufmem, /* should not include client->buf since we want to see 0 for static clients. */ (unsigned long long) obufmem, /* should not include client->buf since we want to see 0 for static clients. */
@ -3144,10 +3196,10 @@ void rewriteClientCommandVector(client *c, int argc, ...) {
/* Replace argv and argc with our new versions. */ /* Replace argv and argc with our new versions. */
c->argv = argv; c->argv = argv;
c->argc = argc; c->argc = argc;
c->argv_len_sum = 0; c->argv_len_sumActive = 0;
for (j = 0; j < c->argc; j++) for (j = 0; j < c->argc; j++)
if (c->argv[j]) if (c->argv[j])
c->argv_len_sum += getStringObjectLen(c->argv[j]); c->argv_len_sumActive += getStringObjectLen(c->argv[j]);
c->cmd = lookupCommandOrOriginal((sds)ptrFromObj(c->argv[0])); c->cmd = lookupCommandOrOriginal((sds)ptrFromObj(c->argv[0]));
serverAssertWithInfo(c,NULL,c->cmd != NULL); serverAssertWithInfo(c,NULL,c->cmd != NULL);
va_end(ap); va_end(ap);
@ -3160,10 +3212,10 @@ void replaceClientCommandVector(client *c, int argc, robj **argv) {
zfree(c->argv); zfree(c->argv);
c->argv = argv; c->argv = argv;
c->argc = argc; c->argc = argc;
c->argv_len_sum = 0; c->argv_len_sumActive = 0;
for (j = 0; j < c->argc; j++) for (j = 0; j < c->argc; j++)
if (c->argv[j]) if (c->argv[j])
c->argv_len_sum += getStringObjectLen(c->argv[j]); c->argv_len_sumActive += getStringObjectLen(c->argv[j]);
c->cmd = lookupCommandOrOriginal((sds)ptrFromObj(c->argv[0])); c->cmd = lookupCommandOrOriginal((sds)ptrFromObj(c->argv[0]));
serverAssertWithInfo(c,NULL,c->cmd != NULL); serverAssertWithInfo(c,NULL,c->cmd != NULL);
} }
@ -3188,8 +3240,8 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) {
c->argv[i] = NULL; c->argv[i] = NULL;
} }
oldval = c->argv[i]; oldval = c->argv[i];
if (oldval) c->argv_len_sum -= getStringObjectLen(oldval); if (oldval) c->argv_len_sumActive -= getStringObjectLen(oldval);
if (newval) c->argv_len_sum += getStringObjectLen(newval); if (newval) c->argv_len_sumActive += getStringObjectLen(newval);
c->argv[i] = newval; c->argv[i] = newval;
incrRefCount(newval); incrRefCount(newval);
if (oldval) decrRefCount(oldval); if (oldval) decrRefCount(oldval);

View File

@ -3432,6 +3432,13 @@ void replicationCacheMaster(redisMaster *mi, client *c) {
* offsets, including pending transactions, already populated arguments, * offsets, including pending transactions, already populated arguments,
* pending outputs to the master. */ * pending outputs to the master. */
sdsclear(mi->master->querybuf); sdsclear(mi->master->querybuf);
if (!mi->master->vecqueuedcmd.empty()) {
// Clear out everything except for partially parsed commands (which we'll cache)
auto cmd = std::move(mi->master->vecqueuedcmd.front());
mi->master->vecqueuedcmd.clear();
if (cmd.argc != cmd.argcMax)
mi->master->vecqueuedcmd.emplace_back(std::move(cmd));
}
sdsclear(mi->master->pending_querybuf); sdsclear(mi->master->pending_querybuf);
mi->master->read_reploff = mi->master->reploff; mi->master->read_reploff = mi->master->reploff;
if (c->flags & CLIENT_MULTI) discardTransaction(c); if (c->flags & CLIENT_MULTI) discardTransaction(c);
@ -4307,10 +4314,12 @@ void replicaReplayCommand(client *c)
cFake->authenticated = c->authenticated; cFake->authenticated = c->authenticated;
cFake->puser = c->puser; cFake->puser = c->puser;
cFake->querybuf = sdscatsds(cFake->querybuf,(sds)ptrFromObj(c->argv[2])); cFake->querybuf = sdscatsds(cFake->querybuf,(sds)ptrFromObj(c->argv[2]));
cFake->read_reploff = sdslen(cFake->querybuf);
cFake->reploff = 0;
selectDb(cFake, c->db->id); selectDb(cFake, c->db->id);
auto ccmdPrev = serverTL->commandsExecuted; auto ccmdPrev = serverTL->commandsExecuted;
cFake->flags |= CLIENT_MASTER | CLIENT_PREVENT_REPL_PROP; cFake->flags |= CLIENT_MASTER | CLIENT_PREVENT_REPL_PROP;
processInputBuffer(cFake, (CMD_CALL_FULL & (~CMD_CALL_PROPAGATE))); processInputBuffer(cFake, true /*fParse*/, (CMD_CALL_FULL & (~CMD_CALL_PROPAGATE)));
cFake->flags &= ~(CLIENT_MASTER | CLIENT_PREVENT_REPL_PROP); cFake->flags &= ~(CLIENT_MASTER | CLIENT_PREVENT_REPL_PROP);
bool fExec = ccmdPrev != serverTL->commandsExecuted; bool fExec = ccmdPrev != serverTL->commandsExecuted;
cFake->lock.unlock(); cFake->lock.unlock();

View File

@ -1749,7 +1749,7 @@ size_t ClientsPeakMemInput[CLIENTS_PEAK_MEM_USAGE_SLOTS];
size_t ClientsPeakMemOutput[CLIENTS_PEAK_MEM_USAGE_SLOTS]; size_t ClientsPeakMemOutput[CLIENTS_PEAK_MEM_USAGE_SLOTS];
int clientsCronTrackExpansiveClients(client *c) { int clientsCronTrackExpansiveClients(client *c) {
size_t in_usage = sdsZmallocSize(c->querybuf) + c->argv_len_sum; size_t in_usage = sdsZmallocSize(c->querybuf) + c->argv_len_sum();
size_t out_usage = getClientOutputBufferMemoryUsage(c); size_t out_usage = getClientOutputBufferMemoryUsage(c);
int i = g_pserver->unixtime % CLIENTS_PEAK_MEM_USAGE_SLOTS; int i = g_pserver->unixtime % CLIENTS_PEAK_MEM_USAGE_SLOTS;
int zeroidx = (i+1) % CLIENTS_PEAK_MEM_USAGE_SLOTS; int zeroidx = (i+1) % CLIENTS_PEAK_MEM_USAGE_SLOTS;
@ -1786,7 +1786,7 @@ int clientsCronTrackClientsMemUsage(client *c) {
mem += getClientOutputBufferMemoryUsage(c); mem += getClientOutputBufferMemoryUsage(c);
mem += sdsZmallocSize(c->querybuf); mem += sdsZmallocSize(c->querybuf);
mem += zmalloc_size(c); mem += zmalloc_size(c);
mem += c->argv_len_sum; mem += c->argv_len_sum();
if (c->argv) mem += zmalloc_size(c->argv); if (c->argv) mem += zmalloc_size(c->argv);
/* Now that we have the memory used by the client, remove the old /* Now that we have the memory used by the client, remove the old
* value from the old category, and add it back. */ * value from the old category, and add it back. */

View File

@ -1136,7 +1136,7 @@ public:
bool removeCachedValue(const char *key); bool removeCachedValue(const char *key);
void removeAllCachedValues(); void removeAllCachedValues();
void prefetchKeysAsync(class AeLocker &locker, client *c); void prefetchKeysAsync(struct parsed_command &command);
bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; } bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; }
@ -1443,7 +1443,53 @@ typedef struct {
need more reserved IDs use UINT64_MAX-1, need more reserved IDs use UINT64_MAX-1,
-2, ... and so forth. */ -2, ... and so forth. */
typedef struct client { struct parsed_command {
robj** argv = nullptr;
int argc = 0;
int argcMax;
long long reploff = 0;
size_t argv_len_sum = 0; /* Sum of lengths of objects in argv list. */
parsed_command(int maxargs) {
argv = (robj**)zmalloc(sizeof(robj*)*maxargs);
argcMax = maxargs;
}
parsed_command &operator=(parsed_command &&o) {
argv = o.argv;
argc = o.argc;
argcMax = o.argcMax;
reploff = o.reploff;
o.argv = nullptr;
o.argc = 0;
o.argcMax = 0;
o.reploff = 0;
return *this;
}
parsed_command(parsed_command &o) = delete;
parsed_command(parsed_command &&o) {
argv = o.argv;
argc = o.argc;
argcMax = o.argcMax;
reploff = o.reploff;
o.argv = nullptr;
o.argc = 0;
o.argcMax = 0;
o.reploff = 0;
}
~parsed_command() {
if (argv != nullptr) {
for (int i = 0; i < argc; ++i) {
decrRefCount(argv[i]);
}
zfree(argv);
}
}
};
struct client {
uint64_t id; /* Client incremental unique ID. */ uint64_t id; /* Client incremental unique ID. */
connection *conn; connection *conn;
int resp; /* RESP protocol version. Can be 2 or 3. */ int resp; /* RESP protocol version. Can be 2 or 3. */
@ -1456,9 +1502,6 @@ typedef struct client {
replication stream that we are receiving from replication stream that we are receiving from
the master. */ the master. */
size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */ size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */
int argc; /* Num of arguments of current command. */
robj **argv; /* Arguments of current command. */
size_t argv_len_sum; /* Sum of lengths of objects in argv list. */
struct redisCommand *cmd, *lastcmd; /* Last command executed. */ struct redisCommand *cmd, *lastcmd; /* Last command executed. */
user *puser; /* User associated with this connection. If the user *puser; /* User associated with this connection. If the
user is set to NULL the connection can do user is set to NULL the connection can do
@ -1488,6 +1531,7 @@ typedef struct client {
long long read_reploff; /* Read replication offset if this is a master. */ long long read_reploff; /* Read replication offset if this is a master. */
long long reploff; /* Applied replication offset if this is a master. */ long long reploff; /* Applied replication offset if this is a master. */
long long reploff_skipped; /* Repl backlog we did not send to this client */ long long reploff_skipped; /* Repl backlog we did not send to this client */
long long reploff_cmd; /* The replication offset of the executing command, reploff gets set to this after the execution completes */
long long repl_ack_off; /* Replication ack offset, if this is a replica. */ long long repl_ack_off; /* Replication ack offset, if this is a replica. */
long long repl_ack_time;/* Replication ack time, if this is a replica. */ long long repl_ack_time;/* Replication ack time, if this is a replica. */
long long psync_initial_offset; /* FULLRESYNC reply offset other slaves long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
@ -1545,12 +1589,17 @@ typedef struct client {
uint64_t mvccCheckpoint = 0; // the MVCC checkpoint of our last write uint64_t mvccCheckpoint = 0; // the MVCC checkpoint of our last write
int iel; /* the event loop index we're registered with */ int iel; /* the event loop index we're registered with */
struct fastlock lock; struct fastlock lock {"client"};
int master_error; int master_error;
std::vector<parsed_command> vecqueuedcmd;
int argc;
robj **argv;
size_t argv_len_sumActive = 0;
// post a function from a non-client thread to run on its client thread // post a function from a non-client thread to run on its client thread
bool postFunction(std::function<void(client *)> fn, bool fLock = true); bool postFunction(std::function<void(client *)> fn, bool fLock = true);
} client; size_t argv_len_sum() const;
};
struct saveparam { struct saveparam {
time_t seconds; time_t seconds;
@ -2516,7 +2565,7 @@ void setDeferredMapLen(client *c, void *node, long length);
void setDeferredSetLen(client *c, void *node, long length); void setDeferredSetLen(client *c, void *node, long length);
void setDeferredAttributeLen(client *c, void *node, long length); void setDeferredAttributeLen(client *c, void *node, long length);
void setDeferredPushLen(client *c, void *node, long length); void setDeferredPushLen(client *c, void *node, long length);
void processInputBuffer(client *c, int callFlags); void processInputBuffer(client *c, bool fParse, int callFlags);
void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask); void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask); void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask); void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask);

View File

@ -1,7 +1,7 @@
start_server {tags {"introspection"}} { start_server {tags {"introspection"}} {
test {CLIENT LIST} { test {CLIENT LIST} {
r client list r client list
} {*addr=*:* fd=* age=* idle=* flags=N db=9 sub=0 psub=0 multi=-1 qbuf=26 qbuf-free=* argv-mem=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client*} } {*addr=*:* fd=* age=* idle=* flags=N db=9 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=* argv-mem=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client*}
test {MONITOR can log executed commands} { test {MONITOR can log executed commands} {
set rd [redis_deferring_client] set rd [redis_deferring_client]