Enable prefetch with storage providers
Former-commit-id: 21f9c52c446ad1b460a340c3d873839b2355728f
This commit is contained in:
parent
2c6616bd82
commit
bb767ad61d
@ -741,7 +741,7 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a
|
||||
/* In Redis commands are always executed in the context of a client, so in
|
||||
* order to load the append only file we need to create a fake client. */
|
||||
struct client *createAOFClient(void) {
|
||||
struct client *c =(client*) zmalloc(sizeof(*c), MALLOC_LOCAL);
|
||||
struct client *c = new client();
|
||||
|
||||
selectDb(c,0);
|
||||
c->id = CLIENT_ID_AOF; /* So modules can identify it's the AOF client. */
|
||||
@ -752,7 +752,6 @@ struct client *createAOFClient(void) {
|
||||
c->querybuf_peak = 0;
|
||||
c->argc = 0;
|
||||
c->argv = NULL;
|
||||
c->argv_len_sum = 0;
|
||||
c->bufpos = 0;
|
||||
c->flags = 0;
|
||||
c->fPendingAsyncWrite = FALSE;
|
||||
@ -783,7 +782,7 @@ void freeFakeClientArgv(struct client *c) {
|
||||
for (j = 0; j < c->argc; j++)
|
||||
decrRefCount(c->argv[j]);
|
||||
zfree(c->argv);
|
||||
c->argv_len_sum = 0;
|
||||
c->argv_len_sumActive = 0;
|
||||
}
|
||||
|
||||
void freeFakeClient(struct client *c) {
|
||||
@ -793,7 +792,7 @@ void freeFakeClient(struct client *c) {
|
||||
freeClientMultiState(c);
|
||||
fastlock_unlock(&c->lock);
|
||||
fastlock_free(&c->lock);
|
||||
zfree(c);
|
||||
delete c;
|
||||
}
|
||||
|
||||
/* Replay the append log file. On success C_OK is returned. On non fatal
|
||||
|
@ -121,7 +121,7 @@ void processUnblockedClients(int iel) {
|
||||
* the code is conceptually more correct this way. */
|
||||
if (!(c->flags & CLIENT_BLOCKED)) {
|
||||
if (c->querybuf && sdslen(c->querybuf) > 0) {
|
||||
processInputBuffer(c, CMD_CALL_FULL);
|
||||
processInputBuffer(c, true /*fParse*/, CMD_CALL_FULL);
|
||||
}
|
||||
}
|
||||
fastlock_unlock(&c->lock);
|
||||
|
17
src/db.cpp
17
src/db.cpp
@ -2959,24 +2959,27 @@ int dbnumFromDb(redisDb *db)
|
||||
serverPanic("invalid database pointer");
|
||||
}
|
||||
|
||||
void redisDbPersistentData::prefetchKeysAsync(AeLocker &lock, client *c)
|
||||
void redisDbPersistentData::prefetchKeysAsync(parsed_command &command)
|
||||
{
|
||||
if (m_spstorage == nullptr)
|
||||
return;
|
||||
|
||||
AeLocker lock;
|
||||
|
||||
std::vector<robj*> veckeys;
|
||||
lock.arm(c);
|
||||
getKeysResult* result = nullptr;
|
||||
int numkeys = getKeysFromCommand(c->cmd, c->argv, c->argc, result);
|
||||
lock.arm(nullptr);
|
||||
getKeysResult result = GETKEYS_RESULT_INIT;
|
||||
auto cmd = lookupCommand(szFromObj(command.argv[0]));
|
||||
int numkeys = getKeysFromCommand(cmd, command.argv, command.argc, &result);
|
||||
for (int ikey = 0; ikey < numkeys; ++ikey)
|
||||
{
|
||||
robj *objKey = c->argv[result->keys[ikey]];
|
||||
robj *objKey = command.argv[result.keys[ikey]];
|
||||
if (this->find_cached_threadsafe(szFromObj(objKey)) == nullptr)
|
||||
veckeys.push_back(objKey);
|
||||
}
|
||||
lock.disarm();
|
||||
|
||||
getKeysFreeResult(result);
|
||||
getKeysFreeResult(&result);
|
||||
|
||||
std::vector<std::tuple<sds, robj*, std::unique_ptr<expireEntry>>> vecInserts;
|
||||
for (robj *objKey : veckeys)
|
||||
@ -2997,7 +3000,7 @@ void redisDbPersistentData::prefetchKeysAsync(AeLocker &lock, client *c)
|
||||
vecInserts.emplace_back(sharedKey, o, std::move(spexpire));
|
||||
}
|
||||
|
||||
lock.arm(c);
|
||||
lock.arm(nullptr);
|
||||
for (auto &tuple : vecInserts)
|
||||
{
|
||||
sds sharedKey = std::get<0>(tuple);
|
||||
|
@ -102,7 +102,7 @@ void linkClient(client *c) {
|
||||
}
|
||||
|
||||
client *createClient(connection *conn, int iel) {
|
||||
client *c = (client*)zmalloc(sizeof(client), MALLOC_LOCAL);
|
||||
client *c = new client;
|
||||
serverAssert(conn == nullptr || (iel == (serverTL - g_pserver->rgthreadvar)));
|
||||
|
||||
c->iel = iel;
|
||||
@ -124,7 +124,6 @@ client *createClient(connection *conn, int iel) {
|
||||
uint64_t client_id;
|
||||
client_id = g_pserver->next_client_id.fetch_add(1);
|
||||
c->iel = iel;
|
||||
fastlock_init(&c->lock, "client");
|
||||
c->id = client_id;
|
||||
c->resp = 2;
|
||||
c->conn = conn;
|
||||
@ -137,7 +136,6 @@ client *createClient(connection *conn, int iel) {
|
||||
c->reqtype = 0;
|
||||
c->argc = 0;
|
||||
c->argv = NULL;
|
||||
c->argv_len_sum = 0;
|
||||
c->cmd = c->lastcmd = NULL;
|
||||
c->puser = DefaultUser;
|
||||
c->multibulklen = 0;
|
||||
@ -157,6 +155,7 @@ client *createClient(connection *conn, int iel) {
|
||||
c->reploff = 0;
|
||||
c->reploff_skipped = 0;
|
||||
c->read_reploff = 0;
|
||||
c->reploff_cmd = 0;
|
||||
c->repl_ack_off = 0;
|
||||
c->repl_ack_time = 0;
|
||||
c->slave_listening_port = 0;
|
||||
@ -203,6 +202,13 @@ client *createClient(connection *conn, int iel) {
|
||||
return c;
|
||||
}
|
||||
|
||||
size_t client::argv_len_sum() const {
|
||||
size_t sum = 0;
|
||||
for (auto &cmd : vecqueuedcmd)
|
||||
sum += cmd.argv_len_sum;
|
||||
return sum + argv_len_sumActive;
|
||||
}
|
||||
|
||||
/* This function puts the client in the queue of clients that should write
|
||||
* their output buffers to the socket. Note that it does not *yet* install
|
||||
* the write handler, to start clients are put in a queue of clients that need
|
||||
@ -1343,7 +1349,7 @@ static void freeClientArgv(client *c) {
|
||||
decrRefCount(c->argv[j]);
|
||||
c->argc = 0;
|
||||
c->cmd = NULL;
|
||||
c->argv_len_sum = 0;
|
||||
c->argv_len_sumActive = 0;
|
||||
}
|
||||
|
||||
void disconnectSlavesExcept(unsigned char *uuid)
|
||||
@ -1574,12 +1580,12 @@ bool freeClient(client *c) {
|
||||
zfree(c->replyAsync);
|
||||
if (c->name) decrRefCount(c->name);
|
||||
zfree(c->argv);
|
||||
c->argv_len_sum = 0;
|
||||
c->argv_len_sumActive = 0;
|
||||
freeClientMultiState(c);
|
||||
sdsfree(c->peerid);
|
||||
ulock.unlock();
|
||||
fastlock_free(&c->lock);
|
||||
zfree(c);
|
||||
delete c;
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -1918,9 +1924,6 @@ void resetClient(client *c) {
|
||||
redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL;
|
||||
|
||||
freeClientArgv(c);
|
||||
c->reqtype = 0;
|
||||
c->multibulklen = 0;
|
||||
c->bulklen = -1;
|
||||
|
||||
/* We clear the ASKING flag as well if we are not inside a MULTI, and
|
||||
* if what we just executed is not the ASKING command itself. */
|
||||
@ -2042,16 +2045,13 @@ int processInlineBuffer(client *c) {
|
||||
|
||||
/* Setup argv array on client structure */
|
||||
if (argc) {
|
||||
if (c->argv) zfree(c->argv);
|
||||
c->argv = (robj**)zmalloc(sizeof(robj*)*argc, MALLOC_LOCAL);
|
||||
c->argv_len_sum = 0;
|
||||
}
|
||||
|
||||
/* Create redis objects for all arguments. */
|
||||
for (c->argc = 0, j = 0; j < argc; j++) {
|
||||
c->argv[c->argc] = createObject(OBJ_STRING,argv[j]);
|
||||
c->argc++;
|
||||
c->argv_len_sum += sdslen(argv[j]);
|
||||
/* Create redis objects for all arguments. */
|
||||
c->vecqueuedcmd.emplace_back(argc);
|
||||
auto &cmd = c->vecqueuedcmd.back();
|
||||
for (cmd.argc = 0, j = 0; j < argc; j++) {
|
||||
cmd.argv[cmd.argc++] = createObject(OBJ_STRING,argv[j]);
|
||||
cmd.argv_len_sum += sdslen(argv[j]);
|
||||
}
|
||||
}
|
||||
sds_free(argv);
|
||||
return C_OK;
|
||||
@ -2141,9 +2141,7 @@ int processMultibulkBuffer(client *c) {
|
||||
c->multibulklen = ll;
|
||||
|
||||
/* Setup argv array on client structure */
|
||||
if (c->argv) zfree(c->argv);
|
||||
c->argv = (robj**)zmalloc(sizeof(robj*)*c->multibulklen, MALLOC_LOCAL);
|
||||
c->argv_len_sum = 0;
|
||||
c->vecqueuedcmd.emplace_back(c->multibulklen);
|
||||
}
|
||||
|
||||
serverAssertWithInfo(c,NULL,c->multibulklen > 0);
|
||||
@ -2211,21 +2209,22 @@ int processMultibulkBuffer(client *c) {
|
||||
/* Optimization: if the buffer contains JUST our bulk element
|
||||
* instead of creating a new object by *copying* the sds we
|
||||
* just use the current sds string. */
|
||||
auto &cmd = c->vecqueuedcmd.back();
|
||||
if (c->qb_pos == 0 &&
|
||||
c->bulklen >= PROTO_MBULK_BIG_ARG &&
|
||||
sdslen(c->querybuf) == (size_t)(c->bulklen+2))
|
||||
{
|
||||
c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf);
|
||||
c->argv_len_sum += c->bulklen;
|
||||
cmd.argv[cmd.argc++] = createObject(OBJ_STRING,c->querybuf);
|
||||
cmd.argv_len_sum += c->bulklen;
|
||||
sdsIncrLen(c->querybuf,-2); /* remove CRLF */
|
||||
/* Assume that if we saw a fat argument we'll see another one
|
||||
* likely... */
|
||||
c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2);
|
||||
sdsclear(c->querybuf);
|
||||
} else {
|
||||
c->argv[c->argc++] =
|
||||
cmd.argv[cmd.argc++] =
|
||||
createStringObject(c->querybuf+c->qb_pos,c->bulklen);
|
||||
c->argv_len_sum += c->bulklen;
|
||||
cmd.argv_len_sum += c->bulklen;
|
||||
c->qb_pos += c->bulklen+2;
|
||||
}
|
||||
c->bulklen = -1;
|
||||
@ -2249,7 +2248,8 @@ void commandProcessed(client *c, int flags) {
|
||||
long long prev_offset = c->reploff;
|
||||
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
|
||||
/* Update the applied replication offset of our master. */
|
||||
c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
|
||||
serverAssert(c->reploff <= c->reploff_cmd);
|
||||
c->reploff = c->reploff_cmd;
|
||||
}
|
||||
|
||||
/* Don't reset the client structure for clients blocked in a
|
||||
@ -2306,34 +2306,34 @@ int processCommandAndResetClient(client *c, int flags) {
|
||||
return deadclient ? C_ERR : C_OK;
|
||||
}
|
||||
|
||||
/* This function is called every time, in the client structure 'c', there is
|
||||
* more query buffer to process, because we read more data from the socket
|
||||
* or because a client was blocked and later reactivated, so there could be
|
||||
* pending query buffer, already representing a full command, to process. */
|
||||
void processInputBuffer(client *c, int callFlags) {
|
||||
AssertCorrectThread(c);
|
||||
static bool FClientReady(client *c) {
|
||||
/* Return if clients are paused. */
|
||||
if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) return false;
|
||||
|
||||
/* Immediately abort if the client is in the middle of something. */
|
||||
if (c->flags & CLIENT_BLOCKED) return false;
|
||||
|
||||
/* Don't process input from the master while there is a busy script
|
||||
* condition on the replica. We want just to accumulate the replication
|
||||
* stream (instead of replying -BUSY like we do with other clients) and
|
||||
* later resume the processing. */
|
||||
if (g_pserver->lua_timedout && c->flags & CLIENT_MASTER) return false;
|
||||
|
||||
/* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
|
||||
* written to the client. Make sure to not let the reply grow after
|
||||
* this flag has been set (i.e. don't process more commands).
|
||||
*
|
||||
* The same applies for clients we want to terminate ASAP. */
|
||||
if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void parseClientCommandBuffer(client *c) {
|
||||
if (!FClientReady(c))
|
||||
return;
|
||||
|
||||
/* Keep processing while there is something in the input buffer */
|
||||
while(c->qb_pos < sdslen(c->querybuf)) {
|
||||
/* Return if clients are paused. */
|
||||
if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
|
||||
|
||||
/* Immediately abort if the client is in the middle of something. */
|
||||
if (c->flags & CLIENT_BLOCKED) break;
|
||||
|
||||
/* Don't process input from the master while there is a busy script
|
||||
* condition on the replica. We want just to accumulate the replication
|
||||
* stream (instead of replying -BUSY like we do with other clients) and
|
||||
* later resume the processing. */
|
||||
if (g_pserver->lua_timedout && c->flags & CLIENT_MASTER) break;
|
||||
|
||||
/* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
|
||||
* written to the client. Make sure to not let the reply grow after
|
||||
* this flag has been set (i.e. don't process more commands).
|
||||
*
|
||||
* The same applies for clients we want to terminate ASAP. */
|
||||
if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
|
||||
|
||||
while(c->qb_pos < sdslen(c->querybuf)) {
|
||||
/* Determine request type when unknown. */
|
||||
if (!c->reqtype) {
|
||||
if (c->querybuf[c->qb_pos] == '*') {
|
||||
@ -2343,6 +2343,7 @@ void processInputBuffer(client *c, int callFlags) {
|
||||
}
|
||||
}
|
||||
|
||||
size_t cqueries = c->vecqueuedcmd.size();
|
||||
if (c->reqtype == PROTO_REQ_INLINE) {
|
||||
if (processInlineBuffer(c) != C_OK) break;
|
||||
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
|
||||
@ -2350,6 +2351,61 @@ void processInputBuffer(client *c, int callFlags) {
|
||||
} else {
|
||||
serverPanic("Unknown request type");
|
||||
}
|
||||
if (!c->vecqueuedcmd.empty() && (c->vecqueuedcmd.back().argc <= 0 || c->vecqueuedcmd.back().argv == nullptr)) {
|
||||
c->vecqueuedcmd.pop_back();
|
||||
} else if (!c->vecqueuedcmd.empty()) {
|
||||
if (c->flags & CLIENT_MASTER) c->vecqueuedcmd.back().reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
|
||||
serverAssert(c->vecqueuedcmd.back().reploff >= 0);
|
||||
}
|
||||
|
||||
/* Prefetch if we have a storage provider and we're not in the global lock */
|
||||
if (cqueries < c->vecqueuedcmd.size() && g_pserver->m_pstorageFactory != nullptr && !GlobalLocksAcquired()) {
|
||||
auto &query = c->vecqueuedcmd.back();
|
||||
if (query.argc > 0 && query.argc == query.argcMax) {
|
||||
c->db->prefetchKeysAsync(query);
|
||||
}
|
||||
}
|
||||
c->reqtype = 0;
|
||||
c->multibulklen = 0;
|
||||
c->bulklen = -1;
|
||||
c->reqtype = 0;
|
||||
}
|
||||
|
||||
/* Trim to pos */
|
||||
if (c->qb_pos) {
|
||||
sdsrange(c->querybuf,c->qb_pos,-1);
|
||||
c->qb_pos = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/* This function is called every time, in the client structure 'c', there is
|
||||
* more query buffer to process, because we read more data from the socket
|
||||
* or because a client was blocked and later reactivated, so there could be
|
||||
* pending query buffer, already representing a full command, to process. */
|
||||
void processInputBuffer(client *c, bool fParse, int callFlags) {
|
||||
AssertCorrectThread(c);
|
||||
|
||||
if (fParse)
|
||||
parseClientCommandBuffer(c);
|
||||
|
||||
/* Keep processing while there is something in the input buffer */
|
||||
while (!c->vecqueuedcmd.empty()) {
|
||||
/* Return if we're still parsing this command */
|
||||
auto &cmd = c->vecqueuedcmd.front();
|
||||
if (cmd.argc != cmd.argcMax) break;
|
||||
|
||||
if (!FClientReady(c)) break;
|
||||
|
||||
zfree(c->argv);
|
||||
c->argc = cmd.argc;
|
||||
c->argv = cmd.argv;
|
||||
cmd.argv = nullptr;
|
||||
c->argv_len_sumActive = cmd.argv_len_sum;
|
||||
cmd.argv_len_sum = 0;
|
||||
c->reploff_cmd = cmd.reploff;
|
||||
serverAssert(c->argv != nullptr);
|
||||
|
||||
c->vecqueuedcmd.erase(c->vecqueuedcmd.begin());
|
||||
|
||||
/* Multibulk processing could see a <= 0 length. */
|
||||
if (c->argc == 0) {
|
||||
@ -2364,12 +2420,6 @@ void processInputBuffer(client *c, int callFlags) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Trim to pos */
|
||||
if (c->qb_pos) {
|
||||
sdsrange(c->querybuf,c->qb_pos,-1);
|
||||
c->qb_pos = 0;
|
||||
}
|
||||
}
|
||||
|
||||
void readQueryFromClient(connection *conn) {
|
||||
@ -2450,6 +2500,8 @@ void readQueryFromClient(connection *conn) {
|
||||
return;
|
||||
}
|
||||
|
||||
parseClientCommandBuffer(c);
|
||||
|
||||
serverTL->vecclientsProcess.push_back(c);
|
||||
}
|
||||
|
||||
@ -2461,7 +2513,7 @@ void processClients()
|
||||
/* There is more data in the client input buffer, continue parsing it
|
||||
* in case to check if there is a full command to execute. */
|
||||
std::unique_lock<fastlock> ul(c->lock);
|
||||
processInputBuffer(c, CMD_CALL_FULL);
|
||||
processInputBuffer(c, false /*fParse*/, CMD_CALL_FULL);
|
||||
}
|
||||
|
||||
if (listLength(serverTL->clients_pending_asyncwrite))
|
||||
@ -2568,7 +2620,7 @@ sds catClientInfoString(sds s, client *client) {
|
||||
/* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory
|
||||
* i.e. unused sds space and internal fragmentation, just the string length. but this is enough to
|
||||
* spot problematic clients. */
|
||||
total_mem += client->argv_len_sum;
|
||||
total_mem += client->argv_len_sum();
|
||||
if (client->argv)
|
||||
total_mem += zmalloc_size(client->argv);
|
||||
|
||||
@ -2587,7 +2639,7 @@ sds catClientInfoString(sds s, client *client) {
|
||||
(client->flags & CLIENT_MULTI) ? client->mstate.count : -1,
|
||||
(unsigned long long) sdslen(client->querybuf),
|
||||
(unsigned long long) sdsavail(client->querybuf),
|
||||
(unsigned long long) client->argv_len_sum,
|
||||
(unsigned long long) client->argv_len_sum(),
|
||||
(unsigned long long) client->bufpos,
|
||||
(unsigned long long) listLength(client->reply),
|
||||
(unsigned long long) obufmem, /* should not include client->buf since we want to see 0 for static clients. */
|
||||
@ -3144,10 +3196,10 @@ void rewriteClientCommandVector(client *c, int argc, ...) {
|
||||
/* Replace argv and argc with our new versions. */
|
||||
c->argv = argv;
|
||||
c->argc = argc;
|
||||
c->argv_len_sum = 0;
|
||||
c->argv_len_sumActive = 0;
|
||||
for (j = 0; j < c->argc; j++)
|
||||
if (c->argv[j])
|
||||
c->argv_len_sum += getStringObjectLen(c->argv[j]);
|
||||
c->argv_len_sumActive += getStringObjectLen(c->argv[j]);
|
||||
c->cmd = lookupCommandOrOriginal((sds)ptrFromObj(c->argv[0]));
|
||||
serverAssertWithInfo(c,NULL,c->cmd != NULL);
|
||||
va_end(ap);
|
||||
@ -3160,10 +3212,10 @@ void replaceClientCommandVector(client *c, int argc, robj **argv) {
|
||||
zfree(c->argv);
|
||||
c->argv = argv;
|
||||
c->argc = argc;
|
||||
c->argv_len_sum = 0;
|
||||
c->argv_len_sumActive = 0;
|
||||
for (j = 0; j < c->argc; j++)
|
||||
if (c->argv[j])
|
||||
c->argv_len_sum += getStringObjectLen(c->argv[j]);
|
||||
c->argv_len_sumActive += getStringObjectLen(c->argv[j]);
|
||||
c->cmd = lookupCommandOrOriginal((sds)ptrFromObj(c->argv[0]));
|
||||
serverAssertWithInfo(c,NULL,c->cmd != NULL);
|
||||
}
|
||||
@ -3188,8 +3240,8 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) {
|
||||
c->argv[i] = NULL;
|
||||
}
|
||||
oldval = c->argv[i];
|
||||
if (oldval) c->argv_len_sum -= getStringObjectLen(oldval);
|
||||
if (newval) c->argv_len_sum += getStringObjectLen(newval);
|
||||
if (oldval) c->argv_len_sumActive -= getStringObjectLen(oldval);
|
||||
if (newval) c->argv_len_sumActive += getStringObjectLen(newval);
|
||||
c->argv[i] = newval;
|
||||
incrRefCount(newval);
|
||||
if (oldval) decrRefCount(oldval);
|
||||
|
@ -3432,6 +3432,13 @@ void replicationCacheMaster(redisMaster *mi, client *c) {
|
||||
* offsets, including pending transactions, already populated arguments,
|
||||
* pending outputs to the master. */
|
||||
sdsclear(mi->master->querybuf);
|
||||
if (!mi->master->vecqueuedcmd.empty()) {
|
||||
// Clear out everything except for partially parsed commands (which we'll cache)
|
||||
auto cmd = std::move(mi->master->vecqueuedcmd.front());
|
||||
mi->master->vecqueuedcmd.clear();
|
||||
if (cmd.argc != cmd.argcMax)
|
||||
mi->master->vecqueuedcmd.emplace_back(std::move(cmd));
|
||||
}
|
||||
sdsclear(mi->master->pending_querybuf);
|
||||
mi->master->read_reploff = mi->master->reploff;
|
||||
if (c->flags & CLIENT_MULTI) discardTransaction(c);
|
||||
@ -4307,10 +4314,12 @@ void replicaReplayCommand(client *c)
|
||||
cFake->authenticated = c->authenticated;
|
||||
cFake->puser = c->puser;
|
||||
cFake->querybuf = sdscatsds(cFake->querybuf,(sds)ptrFromObj(c->argv[2]));
|
||||
cFake->read_reploff = sdslen(cFake->querybuf);
|
||||
cFake->reploff = 0;
|
||||
selectDb(cFake, c->db->id);
|
||||
auto ccmdPrev = serverTL->commandsExecuted;
|
||||
cFake->flags |= CLIENT_MASTER | CLIENT_PREVENT_REPL_PROP;
|
||||
processInputBuffer(cFake, (CMD_CALL_FULL & (~CMD_CALL_PROPAGATE)));
|
||||
processInputBuffer(cFake, true /*fParse*/, (CMD_CALL_FULL & (~CMD_CALL_PROPAGATE)));
|
||||
cFake->flags &= ~(CLIENT_MASTER | CLIENT_PREVENT_REPL_PROP);
|
||||
bool fExec = ccmdPrev != serverTL->commandsExecuted;
|
||||
cFake->lock.unlock();
|
||||
|
@ -1749,7 +1749,7 @@ size_t ClientsPeakMemInput[CLIENTS_PEAK_MEM_USAGE_SLOTS];
|
||||
size_t ClientsPeakMemOutput[CLIENTS_PEAK_MEM_USAGE_SLOTS];
|
||||
|
||||
int clientsCronTrackExpansiveClients(client *c) {
|
||||
size_t in_usage = sdsZmallocSize(c->querybuf) + c->argv_len_sum;
|
||||
size_t in_usage = sdsZmallocSize(c->querybuf) + c->argv_len_sum();
|
||||
size_t out_usage = getClientOutputBufferMemoryUsage(c);
|
||||
int i = g_pserver->unixtime % CLIENTS_PEAK_MEM_USAGE_SLOTS;
|
||||
int zeroidx = (i+1) % CLIENTS_PEAK_MEM_USAGE_SLOTS;
|
||||
@ -1786,7 +1786,7 @@ int clientsCronTrackClientsMemUsage(client *c) {
|
||||
mem += getClientOutputBufferMemoryUsage(c);
|
||||
mem += sdsZmallocSize(c->querybuf);
|
||||
mem += zmalloc_size(c);
|
||||
mem += c->argv_len_sum;
|
||||
mem += c->argv_len_sum();
|
||||
if (c->argv) mem += zmalloc_size(c->argv);
|
||||
/* Now that we have the memory used by the client, remove the old
|
||||
* value from the old category, and add it back. */
|
||||
|
65
src/server.h
65
src/server.h
@ -1136,7 +1136,7 @@ public:
|
||||
bool removeCachedValue(const char *key);
|
||||
void removeAllCachedValues();
|
||||
|
||||
void prefetchKeysAsync(class AeLocker &locker, client *c);
|
||||
void prefetchKeysAsync(struct parsed_command &command);
|
||||
|
||||
bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; }
|
||||
|
||||
@ -1443,7 +1443,53 @@ typedef struct {
|
||||
need more reserved IDs use UINT64_MAX-1,
|
||||
-2, ... and so forth. */
|
||||
|
||||
typedef struct client {
|
||||
struct parsed_command {
|
||||
robj** argv = nullptr;
|
||||
int argc = 0;
|
||||
int argcMax;
|
||||
long long reploff = 0;
|
||||
size_t argv_len_sum = 0; /* Sum of lengths of objects in argv list. */
|
||||
|
||||
parsed_command(int maxargs) {
|
||||
argv = (robj**)zmalloc(sizeof(robj*)*maxargs);
|
||||
argcMax = maxargs;
|
||||
}
|
||||
|
||||
parsed_command &operator=(parsed_command &&o) {
|
||||
argv = o.argv;
|
||||
argc = o.argc;
|
||||
argcMax = o.argcMax;
|
||||
reploff = o.reploff;
|
||||
o.argv = nullptr;
|
||||
o.argc = 0;
|
||||
o.argcMax = 0;
|
||||
o.reploff = 0;
|
||||
return *this;
|
||||
}
|
||||
|
||||
parsed_command(parsed_command &o) = delete;
|
||||
parsed_command(parsed_command &&o) {
|
||||
argv = o.argv;
|
||||
argc = o.argc;
|
||||
argcMax = o.argcMax;
|
||||
reploff = o.reploff;
|
||||
o.argv = nullptr;
|
||||
o.argc = 0;
|
||||
o.argcMax = 0;
|
||||
o.reploff = 0;
|
||||
}
|
||||
|
||||
~parsed_command() {
|
||||
if (argv != nullptr) {
|
||||
for (int i = 0; i < argc; ++i) {
|
||||
decrRefCount(argv[i]);
|
||||
}
|
||||
zfree(argv);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
struct client {
|
||||
uint64_t id; /* Client incremental unique ID. */
|
||||
connection *conn;
|
||||
int resp; /* RESP protocol version. Can be 2 or 3. */
|
||||
@ -1456,9 +1502,6 @@ typedef struct client {
|
||||
replication stream that we are receiving from
|
||||
the master. */
|
||||
size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */
|
||||
int argc; /* Num of arguments of current command. */
|
||||
robj **argv; /* Arguments of current command. */
|
||||
size_t argv_len_sum; /* Sum of lengths of objects in argv list. */
|
||||
struct redisCommand *cmd, *lastcmd; /* Last command executed. */
|
||||
user *puser; /* User associated with this connection. If the
|
||||
user is set to NULL the connection can do
|
||||
@ -1488,6 +1531,7 @@ typedef struct client {
|
||||
long long read_reploff; /* Read replication offset if this is a master. */
|
||||
long long reploff; /* Applied replication offset if this is a master. */
|
||||
long long reploff_skipped; /* Repl backlog we did not send to this client */
|
||||
long long reploff_cmd; /* The replication offset of the executing command, reploff gets set to this after the execution completes */
|
||||
long long repl_ack_off; /* Replication ack offset, if this is a replica. */
|
||||
long long repl_ack_time;/* Replication ack time, if this is a replica. */
|
||||
long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
|
||||
@ -1545,12 +1589,17 @@ typedef struct client {
|
||||
uint64_t mvccCheckpoint = 0; // the MVCC checkpoint of our last write
|
||||
|
||||
int iel; /* the event loop index we're registered with */
|
||||
struct fastlock lock;
|
||||
struct fastlock lock {"client"};
|
||||
int master_error;
|
||||
std::vector<parsed_command> vecqueuedcmd;
|
||||
int argc;
|
||||
robj **argv;
|
||||
size_t argv_len_sumActive = 0;
|
||||
|
||||
// post a function from a non-client thread to run on its client thread
|
||||
bool postFunction(std::function<void(client *)> fn, bool fLock = true);
|
||||
} client;
|
||||
size_t argv_len_sum() const;
|
||||
};
|
||||
|
||||
struct saveparam {
|
||||
time_t seconds;
|
||||
@ -2516,7 +2565,7 @@ void setDeferredMapLen(client *c, void *node, long length);
|
||||
void setDeferredSetLen(client *c, void *node, long length);
|
||||
void setDeferredAttributeLen(client *c, void *node, long length);
|
||||
void setDeferredPushLen(client *c, void *node, long length);
|
||||
void processInputBuffer(client *c, int callFlags);
|
||||
void processInputBuffer(client *c, bool fParse, int callFlags);
|
||||
void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
|
||||
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask);
|
||||
void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask);
|
||||
|
@ -1,7 +1,7 @@
|
||||
start_server {tags {"introspection"}} {
|
||||
test {CLIENT LIST} {
|
||||
r client list
|
||||
} {*addr=*:* fd=* age=* idle=* flags=N db=9 sub=0 psub=0 multi=-1 qbuf=26 qbuf-free=* argv-mem=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client*}
|
||||
} {*addr=*:* fd=* age=* idle=* flags=N db=9 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=* argv-mem=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client*}
|
||||
|
||||
test {MONITOR can log executed commands} {
|
||||
set rd [redis_deferring_client]
|
||||
|
Loading…
x
Reference in New Issue
Block a user