Initial implementation of async commands
Former-commit-id: 0d3a4ce7e60c42b2d81962f13100bd1789bf64a8
This commit is contained in:
parent
34396255a2
commit
cf85a9cd02
31
src/db.cpp
31
src/db.cpp
@ -159,7 +159,7 @@ static robj_roptr lookupKeyConst(redisDb *db, robj *key, int flags) {
|
||||
* expiring our key via DELs in the replication link. */
|
||||
robj_roptr lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
|
||||
robj_roptr val;
|
||||
serverAssert(GlobalLocksAcquired());
|
||||
//serverAssert(GlobalLocksAcquired());
|
||||
|
||||
if (expireIfNeeded(db,key) == 1) {
|
||||
/* If we are in the context of a master, expireIfNeeded() returns 1
|
||||
@ -204,8 +204,35 @@ keymiss:
|
||||
/* Like lookupKeyReadWithFlags(), but does not use any flag, which is the
|
||||
* common case. */
|
||||
robj_roptr lookupKeyRead(redisDb *db, robj *key) {
|
||||
serverAssert(GlobalLocksAcquired());
|
||||
return lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
|
||||
}
|
||||
robj_roptr lookupKeyRead(redisDb *db, robj *key, uint64_t mvccCheckpoint) {
|
||||
robj_roptr o;
|
||||
|
||||
if (aeThreadOwnsLock()) {
|
||||
return lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
|
||||
} else {
|
||||
// This is an async command
|
||||
int idb = db->id;
|
||||
if (serverTL->rgdbSnapshot[idb] == nullptr || serverTL->rgdbSnapshot[idb]->mvccCheckpoint() < mvccCheckpoint) {
|
||||
AeLocker locker;
|
||||
locker.arm(serverTL->current_client);
|
||||
if (serverTL->rgdbSnapshot[idb] != nullptr)
|
||||
db->endSnapshot(serverTL->rgdbSnapshot[idb]);
|
||||
serverTL->rgdbSnapshot[idb] = db->createSnapshot(mvccCheckpoint, true);
|
||||
if (serverTL->rgdbSnapshot[idb] == nullptr) {
|
||||
// We still need to service the read
|
||||
o = lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
|
||||
}
|
||||
}
|
||||
if (serverTL->rgdbSnapshot[idb] != nullptr) {
|
||||
o = serverTL->rgdbSnapshot[idb]->find_cached_threadsafe(szFromObj(key)).val();
|
||||
}
|
||||
}
|
||||
|
||||
return o;
|
||||
}
|
||||
|
||||
/* Lookup a key for write operations, and as a side effect, if needed, expires
|
||||
* the key if its TTL is reached.
|
||||
@ -231,7 +258,7 @@ static void SentReplyOnKeyMiss(client *c, robj *reply){
|
||||
}
|
||||
}
|
||||
robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
|
||||
robj_roptr o = lookupKeyRead(c->db, key);
|
||||
robj_roptr o = lookupKeyRead(c->db, key, c->mvccCheckpoint);
|
||||
if (!o) SentReplyOnKeyMiss(c, reply);
|
||||
return o;
|
||||
}
|
||||
|
@ -839,6 +839,7 @@ int64_t commandFlagsFromString(char *s) {
|
||||
else if (!strcasecmp(t,"may-replicate")) flags |= CMD_MAY_REPLICATE;
|
||||
else if (!strcasecmp(t,"getkeys-api")) flags |= CMD_MODULE_GETKEYS;
|
||||
else if (!strcasecmp(t,"no-cluster")) flags |= CMD_MODULE_NO_CLUSTER;
|
||||
else if (!strcasecmp(t,"async")) flags |= CMD_ASYNC_OK;
|
||||
else break;
|
||||
}
|
||||
sdsfreesplitres(tokens,count);
|
||||
|
@ -531,7 +531,7 @@ void addReplyErrorLength(client *c, const char *s, size_t len) {
|
||||
/* Do some actions after an error reply was sent (Log if needed, updates stats, etc.) */
|
||||
void afterErrorReply(client *c, const char *s, size_t len, int severity = ERR_CRITICAL) {
|
||||
/* Increment the global error counter */
|
||||
g_pserver->stat_total_error_replies++;
|
||||
serverTL->stat_total_error_replies++;
|
||||
/* Increment the error stats
|
||||
* If the string already starts with "-..." then the error prefix
|
||||
* is provided by the caller ( we limit the search to 32 chars). Otherwise we use "-ERR". */
|
||||
@ -2465,7 +2465,7 @@ int processCommandAndResetClient(client *c, int flags) {
|
||||
int deadclient = 0;
|
||||
client *old_client = serverTL->current_client;
|
||||
serverTL->current_client = c;
|
||||
serverAssert(GlobalLocksAcquired());
|
||||
serverAssert((flags & CMD_CALL_ASYNC) || GlobalLocksAcquired());
|
||||
|
||||
if (processCommand(c, flags) == C_OK) {
|
||||
commandProcessed(c, flags);
|
||||
@ -2571,6 +2571,15 @@ void parseClientCommandBuffer(client *c) {
|
||||
}
|
||||
}
|
||||
|
||||
bool FAsyncCommand(parsed_command &cmd)
|
||||
{
|
||||
if (serverTL->in_eval || serverTL->in_exec)
|
||||
return false;
|
||||
auto parsedcmd = lookupCommand(szFromObj(cmd.argv[0]));
|
||||
static const long long expectedFlags = CMD_ASYNC_OK | CMD_READONLY;
|
||||
return (parsedcmd->flags & expectedFlags) == expectedFlags;
|
||||
}
|
||||
|
||||
/* This function is called every time, in the client structure 'c', there is
|
||||
* more query buffer to process, because we read more data from the socket
|
||||
* or because a client was blocked and later reactivated, so there could be
|
||||
@ -2589,6 +2598,9 @@ void processInputBuffer(client *c, bool fParse, int callFlags) {
|
||||
|
||||
if (!FClientReady(c)) break;
|
||||
|
||||
if ((callFlags & CMD_CALL_ASYNC) && !FAsyncCommand(cmd))
|
||||
break;
|
||||
|
||||
zfree(c->argv);
|
||||
c->argc = cmd.argc;
|
||||
c->argv = cmd.argv;
|
||||
@ -2698,6 +2710,8 @@ void readQueryFromClient(connection *conn) {
|
||||
|
||||
if (cserver.cthreads > 1) {
|
||||
parseClientCommandBuffer(c);
|
||||
processInputBuffer(c, false, CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_ASYNC);
|
||||
if (!c->vecqueuedcmd.empty())
|
||||
serverTL->vecclientsProcess.push_back(c);
|
||||
} else {
|
||||
// If we're single threaded its actually better to just process the command here while the query is hot in the cache
|
||||
|
@ -221,7 +221,7 @@ struct redisCommand redisCommandTable[] = {
|
||||
0,NULL,0,0,0,0,0,0},
|
||||
|
||||
{"get",getCommand,2,
|
||||
"read-only fast @string",
|
||||
"read-only fast async @string",
|
||||
0,NULL,1,1,1,0,0,0},
|
||||
|
||||
{"getex",getexCommand,-2,
|
||||
@ -315,7 +315,7 @@ struct redisCommand redisCommandTable[] = {
|
||||
0,NULL,1,1,1,0,0,0},
|
||||
|
||||
{"mget",mgetCommand,-2,
|
||||
"read-only fast @string",
|
||||
"read-only fast async @string",
|
||||
0,NULL,1,-1,1,0,0,0},
|
||||
|
||||
{"rpush",rpushCommand,-3,
|
||||
@ -2407,7 +2407,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
||||
stat_net_input_bytes = g_pserver->stat_net_input_bytes.load(std::memory_order_relaxed);
|
||||
stat_net_output_bytes = g_pserver->stat_net_output_bytes.load(std::memory_order_relaxed);
|
||||
|
||||
trackInstantaneousMetric(STATS_METRIC_COMMAND,g_pserver->stat_numcommands);
|
||||
long long stat_numcommands;
|
||||
__atomic_load(&g_pserver->stat_numcommands, &stat_numcommands, __ATOMIC_RELAXED);
|
||||
trackInstantaneousMetric(STATS_METRIC_COMMAND,stat_numcommands);
|
||||
trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
|
||||
stat_net_input_bytes);
|
||||
trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
|
||||
@ -2781,6 +2783,13 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
||||
|
||||
locker.arm();
|
||||
|
||||
for (int idb = 0; idb < cserver.dbnum; ++idb) {
|
||||
if (serverTL->rgdbSnapshot[idb] != nullptr) {
|
||||
g_pserver->db[idb]->endSnapshot(serverTL->rgdbSnapshot[idb]);
|
||||
serverTL->rgdbSnapshot[idb] = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
size_t zmalloc_used = zmalloc_used_memory();
|
||||
if (zmalloc_used > g_pserver->stat_peak_memory)
|
||||
g_pserver->stat_peak_memory = zmalloc_used;
|
||||
@ -3685,7 +3694,8 @@ void resetServerStats(void) {
|
||||
g_pserver->stat_net_input_bytes = 0;
|
||||
g_pserver->stat_net_output_bytes = 0;
|
||||
g_pserver->stat_unexpected_error_replies = 0;
|
||||
g_pserver->stat_total_error_replies = 0;
|
||||
for (int iel = 0; iel < cserver.cthreads; ++iel)
|
||||
g_pserver->rgthreadvar[iel].stat_total_error_replies = 0;
|
||||
g_pserver->stat_dump_payload_sanitizations = 0;
|
||||
g_pserver->aof_delayed_fsync = 0;
|
||||
}
|
||||
@ -4045,6 +4055,8 @@ int populateCommandTableParseFlags(struct redisCommand *c, const char *strflags)
|
||||
c->flags |= CMD_NO_AUTH;
|
||||
} else if (!strcasecmp(flag,"may-replicate")) {
|
||||
c->flags |= CMD_MAY_REPLICATE;
|
||||
} else if (!strcasecmp(flag,"async")) {
|
||||
c->flags |= CMD_ASYNC_OK;
|
||||
} else {
|
||||
/* Parse ACL categories here if the flag name starts with @. */
|
||||
uint64_t catflag;
|
||||
@ -4344,8 +4356,8 @@ void call(client *c, int flags) {
|
||||
monotime call_timer;
|
||||
int client_old_flags = c->flags;
|
||||
struct redisCommand *real_cmd = c->cmd;
|
||||
serverAssert(GlobalLocksAcquired());
|
||||
static long long prev_err_count;
|
||||
serverAssert(((flags & CMD_CALL_ASYNC) && (c->cmd->flags & CMD_READONLY)) || GlobalLocksAcquired());
|
||||
long long prev_err_count;
|
||||
|
||||
serverTL->fixed_time_expire++;
|
||||
|
||||
@ -4368,12 +4380,15 @@ void call(client *c, int flags) {
|
||||
/* Initialization: clear the flags that must be set by the command on
|
||||
* demand, and initialize the array for additional commands propagation. */
|
||||
c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
|
||||
redisOpArray prev_also_propagate = g_pserver->also_propagate;
|
||||
redisOpArray prev_also_propagate;
|
||||
if (!(flags & CMD_CALL_ASYNC)) {
|
||||
prev_also_propagate = g_pserver->also_propagate;
|
||||
redisOpArrayInit(&g_pserver->also_propagate);
|
||||
}
|
||||
|
||||
/* Call the command. */
|
||||
dirty = g_pserver->dirty;
|
||||
prev_err_count = g_pserver->stat_total_error_replies;
|
||||
prev_err_count = serverTL->stat_total_error_replies;
|
||||
incrementMvccTstamp();
|
||||
elapsedStart(&call_timer);
|
||||
try {
|
||||
@ -4398,7 +4413,7 @@ void call(client *c, int flags) {
|
||||
* We leverage a static variable (prev_err_count) to retain
|
||||
* the counter across nested function calls and avoid logging
|
||||
* the same error twice. */
|
||||
if ((g_pserver->stat_total_error_replies - prev_err_count) > 0) {
|
||||
if ((serverTL->stat_total_error_replies - prev_err_count) > 0) {
|
||||
real_cmd->failed_calls++;
|
||||
}
|
||||
|
||||
@ -4439,8 +4454,13 @@ void call(client *c, int flags) {
|
||||
|
||||
/* Log the command into the Slow log if needed.
|
||||
* If the client is blocked we will handle slowlog when it is unblocked. */
|
||||
if ((flags & CMD_CALL_SLOWLOG) && !(c->flags & CLIENT_BLOCKED))
|
||||
if ((flags & CMD_CALL_SLOWLOG) && !(c->flags & CLIENT_BLOCKED)) {
|
||||
if (duration >= g_pserver->slowlog_log_slower_than) {
|
||||
AeLocker locker;
|
||||
locker.arm(c);
|
||||
slowlogPushCurrentCommand(c, real_cmd, duration);
|
||||
}
|
||||
}
|
||||
|
||||
/* Clear the original argv.
|
||||
* If the client is blocked we will handle slowlog when it is unblocked. */
|
||||
@ -4449,8 +4469,8 @@ void call(client *c, int flags) {
|
||||
|
||||
/* populate the per-command statistics that we show in INFO commandstats. */
|
||||
if (flags & CMD_CALL_STATS) {
|
||||
real_cmd->microseconds += duration;
|
||||
real_cmd->calls++;
|
||||
__atomic_fetch_add(&real_cmd->microseconds, duration, __ATOMIC_RELAXED);
|
||||
__atomic_fetch_add(&real_cmd->calls, 1, __ATOMIC_RELAXED);
|
||||
}
|
||||
|
||||
/* Propagate the command into the AOF and replication link */
|
||||
@ -4494,6 +4514,7 @@ void call(client *c, int flags) {
|
||||
c->flags |= client_old_flags &
|
||||
(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
|
||||
|
||||
if (!(flags & CMD_CALL_ASYNC)) {
|
||||
/* Handle the alsoPropagate() API to handle commands that want to propagate
|
||||
* multiple separated commands. Note that alsoPropagate() is not affected
|
||||
* by CLIENT_PREVENT_PROP flag. */
|
||||
@ -4536,6 +4557,7 @@ void call(client *c, int flags) {
|
||||
}
|
||||
|
||||
g_pserver->also_propagate = prev_also_propagate;
|
||||
}
|
||||
|
||||
/* Client pause takes effect after a transaction has finished. This needs
|
||||
* to be located after everything is propagated. */
|
||||
@ -4555,16 +4577,18 @@ void call(client *c, int flags) {
|
||||
}
|
||||
}
|
||||
|
||||
g_pserver->stat_numcommands++;
|
||||
__atomic_fetch_add(&g_pserver->stat_numcommands, 1, __ATOMIC_RELAXED);
|
||||
serverTL->fixed_time_expire--;
|
||||
prev_err_count = g_pserver->stat_total_error_replies;
|
||||
prev_err_count = serverTL->stat_total_error_replies;
|
||||
|
||||
if (!(flags & CMD_CALL_ASYNC)) {
|
||||
/* Record peak memory after each command and before the eviction that runs
|
||||
* before the next command. */
|
||||
size_t zmalloc_used = zmalloc_used_memory();
|
||||
if (zmalloc_used > g_pserver->stat_peak_memory)
|
||||
g_pserver->stat_peak_memory = zmalloc_used;
|
||||
}
|
||||
}
|
||||
|
||||
/* Used when a command that is ready for execution needs to be rejected, due to
|
||||
* varios pre-execution checks. it returns the appropriate error to the client.
|
||||
@ -4619,7 +4643,7 @@ static int cmdHasMovableKeys(struct redisCommand *cmd) {
|
||||
* if C_ERR is returned the client was destroyed (i.e. after QUIT). */
|
||||
int processCommand(client *c, int callFlags) {
|
||||
AssertCorrectThread(c);
|
||||
serverAssert(GlobalLocksAcquired());
|
||||
serverAssert((callFlags & CMD_CALL_ASYNC) || GlobalLocksAcquired());
|
||||
if (!g_pserver->lua_timedout) {
|
||||
/* Both EXEC and EVAL call call() directly so there should be
|
||||
* no way in_exec or in_eval or propagate_in_transaction is 1.
|
||||
@ -5798,6 +5822,10 @@ sds genRedisInfoString(const char *section) {
|
||||
stat_net_input_bytes = g_pserver->stat_net_input_bytes.load(std::memory_order_relaxed);
|
||||
stat_net_output_bytes = g_pserver->stat_net_output_bytes.load(std::memory_order_relaxed);
|
||||
|
||||
long long stat_total_error_replies = 0;
|
||||
for (int iel = 0; iel < cserver.cthreads; ++iel)
|
||||
stat_total_error_replies += g_pserver->rgthreadvar[iel].stat_total_error_replies;
|
||||
|
||||
if (sections++) info = sdscat(info,"\r\n");
|
||||
info = sdscatprintf(info,
|
||||
"# Stats\r\n"
|
||||
@ -5873,7 +5901,7 @@ sds genRedisInfoString(const char *section) {
|
||||
(unsigned long long) trackingGetTotalItems(),
|
||||
(unsigned long long) trackingGetTotalPrefixes(),
|
||||
g_pserver->stat_unexpected_error_replies,
|
||||
g_pserver->stat_total_error_replies,
|
||||
stat_total_error_replies,
|
||||
g_pserver->stat_dump_payload_sanitizations,
|
||||
stat_total_reads_processed,
|
||||
stat_total_writes_processed,
|
||||
|
@ -473,6 +473,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
|
||||
#define CMD_CATEGORY_SCRIPTING (1ULL<<38)
|
||||
#define CMD_CATEGORY_REPLICATION (1ULL<<39)
|
||||
#define CMD_SKIP_PROPOGATE (1ULL<<40) /* "noprop" flag */
|
||||
#define CMD_ASYNC_OK (1ULL<<41) /* This command is safe without a lock */
|
||||
|
||||
/* AOF states */
|
||||
#define AOF_OFF 0 /* AOF is off */
|
||||
@ -721,6 +722,7 @@ typedef enum {
|
||||
#define CMD_CALL_FULL (CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_PROPAGATE | CMD_CALL_NOWRAP)
|
||||
#define CMD_CALL_NOWRAP (1<<4) /* Don't wrap also propagate array into
|
||||
MULTI/EXEC: the caller will handle it. */
|
||||
#define CMD_CALL_ASYNC (1<<5)
|
||||
|
||||
/* Command propagation flags, see propagate() function */
|
||||
#define PROPAGATE_NONE 0
|
||||
@ -2017,6 +2019,7 @@ struct redisServerThreadVars {
|
||||
long unsigned commandsExecuted = 0;
|
||||
GarbageCollectorCollection::Epoch gcEpoch;
|
||||
const redisDbPersistentDataSnapshot **rgdbSnapshot = nullptr;
|
||||
long long stat_total_error_replies; /* Total number of issued error replies ( command + rejected errors ) */
|
||||
bool fRetrySetAofEvent = false;
|
||||
bool modulesEnabledThisAeLoop = false; /* In this loop of aeMain, were modules enabled before
|
||||
the thread went to sleep? */
|
||||
@ -2241,7 +2244,6 @@ struct redisServer {
|
||||
double stat_module_progress; /* Module save progress. */
|
||||
uint64_t stat_clients_type_memory[CLIENT_TYPE_COUNT];/* Mem usage by type */
|
||||
long long stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to master, etc.) error replies */
|
||||
long long stat_total_error_replies; /* Total number of issued error replies ( command + rejected errors ) */
|
||||
long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. */
|
||||
std::atomic<long long> stat_total_reads_processed; /* Total number of read events processed */
|
||||
std::atomic<long long> stat_total_writes_processed; /* Total number of write events processed */
|
||||
@ -3311,6 +3313,7 @@ void propagateSubkeyExpire(redisDb *db, int type, robj *key, robj *subkey);
|
||||
int expireIfNeeded(redisDb *db, robj *key);
|
||||
void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when);
|
||||
void setExpire(client *c, redisDb *db, robj *key, expireEntry &&entry);
|
||||
robj_roptr lookupKeyRead(redisDb *db, robj *key, uint64_t mvccCheckpoint);
|
||||
robj_roptr lookupKeyRead(redisDb *db, robj *key);
|
||||
int checkAlreadyExpired(long long when);
|
||||
robj *lookupKeyWrite(redisDb *db, robj *key);
|
||||
|
@ -541,18 +541,15 @@ void mgetCore(client *c, robj **keys, int count, const redisDbPersistentDataSnap
|
||||
}
|
||||
|
||||
void mgetCommand(client *c) {
|
||||
// Do async version for large number of arguments
|
||||
if (c->argc > 100) {
|
||||
if (c->asyncCommand(
|
||||
[c] (const redisDbPersistentDataSnapshot *snapshot, const std::vector<robj_sharedptr> &keys) {
|
||||
mgetCore(c, (robj **)keys.data() + 1, keys.size() - 1, snapshot);
|
||||
}
|
||||
)) {
|
||||
return;
|
||||
addReplyArrayLen(c,c->argc-1);
|
||||
for (int i = 1; i < c->argc; i++) {
|
||||
robj_roptr o = lookupKeyRead(c->db,c->argv[i], c->mvccCheckpoint);
|
||||
if (o == nullptr || o->type != OBJ_STRING) {
|
||||
addReplyNull(c);
|
||||
} else {
|
||||
addReplyBulk(c,o);
|
||||
}
|
||||
}
|
||||
|
||||
mgetCore(c, c->argv + 1, c->argc - 1);
|
||||
}
|
||||
|
||||
void msetGenericCommand(client *c, int nx) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user