From cf85a9cd0214f454d727eada981ce0e4abc88250 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 21 Sep 2021 20:43:16 +0000 Subject: [PATCH] Initial implementation of async commands Former-commit-id: 0d3a4ce7e60c42b2d81962f13100bd1789bf64a8 --- src/db.cpp | 31 +++++++++- src/module.cpp | 1 + src/networking.cpp | 22 +++++-- src/server.cpp | 148 +++++++++++++++++++++++++++------------------ src/server.h | 5 +- src/t_string.cpp | 17 +++--- 6 files changed, 147 insertions(+), 77 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index 589181e58..235816875 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -159,7 +159,7 @@ static robj_roptr lookupKeyConst(redisDb *db, robj *key, int flags) { * expiring our key via DELs in the replication link. */ robj_roptr lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) { robj_roptr val; - serverAssert(GlobalLocksAcquired()); + //serverAssert(GlobalLocksAcquired()); if (expireIfNeeded(db,key) == 1) { /* If we are in the context of a master, expireIfNeeded() returns 1 @@ -204,8 +204,35 @@ keymiss: /* Like lookupKeyReadWithFlags(), but does not use any flag, which is the * common case. */ robj_roptr lookupKeyRead(redisDb *db, robj *key) { + serverAssert(GlobalLocksAcquired()); return lookupKeyReadWithFlags(db,key,LOOKUP_NONE); } +robj_roptr lookupKeyRead(redisDb *db, robj *key, uint64_t mvccCheckpoint) { + robj_roptr o; + + if (aeThreadOwnsLock()) { + return lookupKeyReadWithFlags(db,key,LOOKUP_NONE); + } else { + // This is an async command + int idb = db->id; + if (serverTL->rgdbSnapshot[idb] == nullptr || serverTL->rgdbSnapshot[idb]->mvccCheckpoint() < mvccCheckpoint) { + AeLocker locker; + locker.arm(serverTL->current_client); + if (serverTL->rgdbSnapshot[idb] != nullptr) + db->endSnapshot(serverTL->rgdbSnapshot[idb]); + serverTL->rgdbSnapshot[idb] = db->createSnapshot(mvccCheckpoint, true); + if (serverTL->rgdbSnapshot[idb] == nullptr) { + // We still need to service the read + o = lookupKeyReadWithFlags(db,key,LOOKUP_NONE); + } + } + if (serverTL->rgdbSnapshot[idb] != nullptr) { + o = serverTL->rgdbSnapshot[idb]->find_cached_threadsafe(szFromObj(key)).val(); + } + } + + return o; +} /* Lookup a key for write operations, and as a side effect, if needed, expires * the key if its TTL is reached. @@ -231,7 +258,7 @@ static void SentReplyOnKeyMiss(client *c, robj *reply){ } } robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply) { - robj_roptr o = lookupKeyRead(c->db, key); + robj_roptr o = lookupKeyRead(c->db, key, c->mvccCheckpoint); if (!o) SentReplyOnKeyMiss(c, reply); return o; } diff --git a/src/module.cpp b/src/module.cpp index 6963801ce..76ebd0b5b 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -839,6 +839,7 @@ int64_t commandFlagsFromString(char *s) { else if (!strcasecmp(t,"may-replicate")) flags |= CMD_MAY_REPLICATE; else if (!strcasecmp(t,"getkeys-api")) flags |= CMD_MODULE_GETKEYS; else if (!strcasecmp(t,"no-cluster")) flags |= CMD_MODULE_NO_CLUSTER; + else if (!strcasecmp(t,"async")) flags |= CMD_ASYNC_OK; else break; } sdsfreesplitres(tokens,count); diff --git a/src/networking.cpp b/src/networking.cpp index eaddab9e7..618f9dc01 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -531,7 +531,7 @@ void addReplyErrorLength(client *c, const char *s, size_t len) { /* Do some actions after an error reply was sent (Log if needed, updates stats, etc.) */ void afterErrorReply(client *c, const char *s, size_t len, int severity = ERR_CRITICAL) { /* Increment the global error counter */ - g_pserver->stat_total_error_replies++; + serverTL->stat_total_error_replies++; /* Increment the error stats * If the string already starts with "-..." then the error prefix * is provided by the caller ( we limit the search to 32 chars). Otherwise we use "-ERR". */ @@ -2441,7 +2441,7 @@ void commandProcessed(client *c, int flags) { * sub-replicas and to the replication backlog. */ if (c->flags & CLIENT_MASTER) { AeLocker ae; - ae.arm(c); + ae.arm(c); long long applied = c->reploff - prev_offset; if (applied) { if (!g_pserver->fActiveReplica && (flags & CMD_CALL_PROPAGATE)) @@ -2465,7 +2465,7 @@ int processCommandAndResetClient(client *c, int flags) { int deadclient = 0; client *old_client = serverTL->current_client; serverTL->current_client = c; - serverAssert(GlobalLocksAcquired()); + serverAssert((flags & CMD_CALL_ASYNC) || GlobalLocksAcquired()); if (processCommand(c, flags) == C_OK) { commandProcessed(c, flags); @@ -2571,6 +2571,15 @@ void parseClientCommandBuffer(client *c) { } } +bool FAsyncCommand(parsed_command &cmd) +{ + if (serverTL->in_eval || serverTL->in_exec) + return false; + auto parsedcmd = lookupCommand(szFromObj(cmd.argv[0])); + static const long long expectedFlags = CMD_ASYNC_OK | CMD_READONLY; + return (parsedcmd->flags & expectedFlags) == expectedFlags; +} + /* This function is called every time, in the client structure 'c', there is * more query buffer to process, because we read more data from the socket * or because a client was blocked and later reactivated, so there could be @@ -2589,6 +2598,9 @@ void processInputBuffer(client *c, bool fParse, int callFlags) { if (!FClientReady(c)) break; + if ((callFlags & CMD_CALL_ASYNC) && !FAsyncCommand(cmd)) + break; + zfree(c->argv); c->argc = cmd.argc; c->argv = cmd.argv; @@ -2698,7 +2710,9 @@ void readQueryFromClient(connection *conn) { if (cserver.cthreads > 1) { parseClientCommandBuffer(c); - serverTL->vecclientsProcess.push_back(c); + processInputBuffer(c, false, CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_ASYNC); + if (!c->vecqueuedcmd.empty()) + serverTL->vecclientsProcess.push_back(c); } else { // If we're single threaded its actually better to just process the command here while the query is hot in the cache // multithreaded lock contention dominates and batching is better diff --git a/src/server.cpp b/src/server.cpp index 3b20f59f7..30a030052 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -221,7 +221,7 @@ struct redisCommand redisCommandTable[] = { 0,NULL,0,0,0,0,0,0}, {"get",getCommand,2, - "read-only fast @string", + "read-only fast async @string", 0,NULL,1,1,1,0,0,0}, {"getex",getexCommand,-2, @@ -315,7 +315,7 @@ struct redisCommand redisCommandTable[] = { 0,NULL,1,1,1,0,0,0}, {"mget",mgetCommand,-2, - "read-only fast @string", + "read-only fast async @string", 0,NULL,1,-1,1,0,0,0}, {"rpush",rpushCommand,-3, @@ -2407,7 +2407,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { stat_net_input_bytes = g_pserver->stat_net_input_bytes.load(std::memory_order_relaxed); stat_net_output_bytes = g_pserver->stat_net_output_bytes.load(std::memory_order_relaxed); - trackInstantaneousMetric(STATS_METRIC_COMMAND,g_pserver->stat_numcommands); + long long stat_numcommands; + __atomic_load(&g_pserver->stat_numcommands, &stat_numcommands, __ATOMIC_RELAXED); + trackInstantaneousMetric(STATS_METRIC_COMMAND,stat_numcommands); trackInstantaneousMetric(STATS_METRIC_NET_INPUT, stat_net_input_bytes); trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT, @@ -2781,6 +2783,13 @@ void beforeSleep(struct aeEventLoop *eventLoop) { locker.arm(); + for (int idb = 0; idb < cserver.dbnum; ++idb) { + if (serverTL->rgdbSnapshot[idb] != nullptr) { + g_pserver->db[idb]->endSnapshot(serverTL->rgdbSnapshot[idb]); + serverTL->rgdbSnapshot[idb] = nullptr; + } + } + size_t zmalloc_used = zmalloc_used_memory(); if (zmalloc_used > g_pserver->stat_peak_memory) g_pserver->stat_peak_memory = zmalloc_used; @@ -3685,7 +3694,8 @@ void resetServerStats(void) { g_pserver->stat_net_input_bytes = 0; g_pserver->stat_net_output_bytes = 0; g_pserver->stat_unexpected_error_replies = 0; - g_pserver->stat_total_error_replies = 0; + for (int iel = 0; iel < cserver.cthreads; ++iel) + g_pserver->rgthreadvar[iel].stat_total_error_replies = 0; g_pserver->stat_dump_payload_sanitizations = 0; g_pserver->aof_delayed_fsync = 0; } @@ -4045,6 +4055,8 @@ int populateCommandTableParseFlags(struct redisCommand *c, const char *strflags) c->flags |= CMD_NO_AUTH; } else if (!strcasecmp(flag,"may-replicate")) { c->flags |= CMD_MAY_REPLICATE; + } else if (!strcasecmp(flag,"async")) { + c->flags |= CMD_ASYNC_OK; } else { /* Parse ACL categories here if the flag name starts with @. */ uint64_t catflag; @@ -4344,8 +4356,8 @@ void call(client *c, int flags) { monotime call_timer; int client_old_flags = c->flags; struct redisCommand *real_cmd = c->cmd; - serverAssert(GlobalLocksAcquired()); - static long long prev_err_count; + serverAssert(((flags & CMD_CALL_ASYNC) && (c->cmd->flags & CMD_READONLY)) || GlobalLocksAcquired()); + long long prev_err_count; serverTL->fixed_time_expire++; @@ -4368,12 +4380,15 @@ void call(client *c, int flags) { /* Initialization: clear the flags that must be set by the command on * demand, and initialize the array for additional commands propagation. */ c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP); - redisOpArray prev_also_propagate = g_pserver->also_propagate; - redisOpArrayInit(&g_pserver->also_propagate); + redisOpArray prev_also_propagate; + if (!(flags & CMD_CALL_ASYNC)) { + prev_also_propagate = g_pserver->also_propagate; + redisOpArrayInit(&g_pserver->also_propagate); + } /* Call the command. */ dirty = g_pserver->dirty; - prev_err_count = g_pserver->stat_total_error_replies; + prev_err_count = serverTL->stat_total_error_replies; incrementMvccTstamp(); elapsedStart(&call_timer); try { @@ -4398,7 +4413,7 @@ void call(client *c, int flags) { * We leverage a static variable (prev_err_count) to retain * the counter across nested function calls and avoid logging * the same error twice. */ - if ((g_pserver->stat_total_error_replies - prev_err_count) > 0) { + if ((serverTL->stat_total_error_replies - prev_err_count) > 0) { real_cmd->failed_calls++; } @@ -4439,8 +4454,13 @@ void call(client *c, int flags) { /* Log the command into the Slow log if needed. * If the client is blocked we will handle slowlog when it is unblocked. */ - if ((flags & CMD_CALL_SLOWLOG) && !(c->flags & CLIENT_BLOCKED)) - slowlogPushCurrentCommand(c, real_cmd, duration); + if ((flags & CMD_CALL_SLOWLOG) && !(c->flags & CLIENT_BLOCKED)) { + if (duration >= g_pserver->slowlog_log_slower_than) { + AeLocker locker; + locker.arm(c); + slowlogPushCurrentCommand(c, real_cmd, duration); + } + } /* Clear the original argv. * If the client is blocked we will handle slowlog when it is unblocked. */ @@ -4449,8 +4469,8 @@ void call(client *c, int flags) { /* populate the per-command statistics that we show in INFO commandstats. */ if (flags & CMD_CALL_STATS) { - real_cmd->microseconds += duration; - real_cmd->calls++; + __atomic_fetch_add(&real_cmd->microseconds, duration, __ATOMIC_RELAXED); + __atomic_fetch_add(&real_cmd->calls, 1, __ATOMIC_RELAXED); } /* Propagate the command into the AOF and replication link */ @@ -4494,48 +4514,50 @@ void call(client *c, int flags) { c->flags |= client_old_flags & (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP); - /* Handle the alsoPropagate() API to handle commands that want to propagate - * multiple separated commands. Note that alsoPropagate() is not affected - * by CLIENT_PREVENT_PROP flag. */ - if (g_pserver->also_propagate.numops) { - int j; - redisOp *rop; + if (!(flags & CMD_CALL_ASYNC)) { + /* Handle the alsoPropagate() API to handle commands that want to propagate + * multiple separated commands. Note that alsoPropagate() is not affected + * by CLIENT_PREVENT_PROP flag. */ + if (g_pserver->also_propagate.numops) { + int j; + redisOp *rop; - if (flags & CMD_CALL_PROPAGATE) { - bool multi_emitted = false; - /* Wrap the commands in g_pserver->also_propagate array, - * but don't wrap it if we are already in MULTI context, - * in case the nested MULTI/EXEC. - * - * And if the array contains only one command, no need to - * wrap it, since the single command is atomic. */ - if (g_pserver->also_propagate.numops > 1 && - !(c->cmd->flags & CMD_MODULE) && - !(c->flags & CLIENT_MULTI) && - !(flags & CMD_CALL_NOWRAP)) - { - execCommandPropagateMulti(c->db->id); - multi_emitted = true; - } - - for (j = 0; j < g_pserver->also_propagate.numops; j++) { - rop = &g_pserver->also_propagate.ops[j]; - int target = rop->target; - /* Whatever the command wish is, we honor the call() flags. */ - if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF; - if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL; - if (target) - propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target); - } + if (flags & CMD_CALL_PROPAGATE) { + bool multi_emitted = false; + /* Wrap the commands in g_pserver->also_propagate array, + * but don't wrap it if we are already in MULTI context, + * in case the nested MULTI/EXEC. + * + * And if the array contains only one command, no need to + * wrap it, since the single command is atomic. */ + if (g_pserver->also_propagate.numops > 1 && + !(c->cmd->flags & CMD_MODULE) && + !(c->flags & CLIENT_MULTI) && + !(flags & CMD_CALL_NOWRAP)) + { + execCommandPropagateMulti(c->db->id); + multi_emitted = true; + } + + for (j = 0; j < g_pserver->also_propagate.numops; j++) { + rop = &g_pserver->also_propagate.ops[j]; + int target = rop->target; + /* Whatever the command wish is, we honor the call() flags. */ + if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF; + if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL; + if (target) + propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target); + } - if (multi_emitted) { - execCommandPropagateExec(c->db->id); + if (multi_emitted) { + execCommandPropagateExec(c->db->id); + } } + redisOpArrayFree(&g_pserver->also_propagate); } - redisOpArrayFree(&g_pserver->also_propagate); + + g_pserver->also_propagate = prev_also_propagate; } - - g_pserver->also_propagate = prev_also_propagate; /* Client pause takes effect after a transaction has finished. This needs * to be located after everything is propagated. */ @@ -4555,15 +4577,17 @@ void call(client *c, int flags) { } } - g_pserver->stat_numcommands++; + __atomic_fetch_add(&g_pserver->stat_numcommands, 1, __ATOMIC_RELAXED); serverTL->fixed_time_expire--; - prev_err_count = g_pserver->stat_total_error_replies; + prev_err_count = serverTL->stat_total_error_replies; - /* Record peak memory after each command and before the eviction that runs - * before the next command. */ - size_t zmalloc_used = zmalloc_used_memory(); - if (zmalloc_used > g_pserver->stat_peak_memory) - g_pserver->stat_peak_memory = zmalloc_used; + if (!(flags & CMD_CALL_ASYNC)) { + /* Record peak memory after each command and before the eviction that runs + * before the next command. */ + size_t zmalloc_used = zmalloc_used_memory(); + if (zmalloc_used > g_pserver->stat_peak_memory) + g_pserver->stat_peak_memory = zmalloc_used; + } } /* Used when a command that is ready for execution needs to be rejected, due to @@ -4619,7 +4643,7 @@ static int cmdHasMovableKeys(struct redisCommand *cmd) { * if C_ERR is returned the client was destroyed (i.e. after QUIT). */ int processCommand(client *c, int callFlags) { AssertCorrectThread(c); - serverAssert(GlobalLocksAcquired()); + serverAssert((callFlags & CMD_CALL_ASYNC) || GlobalLocksAcquired()); if (!g_pserver->lua_timedout) { /* Both EXEC and EVAL call call() directly so there should be * no way in_exec or in_eval or propagate_in_transaction is 1. @@ -5798,6 +5822,10 @@ sds genRedisInfoString(const char *section) { stat_net_input_bytes = g_pserver->stat_net_input_bytes.load(std::memory_order_relaxed); stat_net_output_bytes = g_pserver->stat_net_output_bytes.load(std::memory_order_relaxed); + long long stat_total_error_replies = 0; + for (int iel = 0; iel < cserver.cthreads; ++iel) + stat_total_error_replies += g_pserver->rgthreadvar[iel].stat_total_error_replies; + if (sections++) info = sdscat(info,"\r\n"); info = sdscatprintf(info, "# Stats\r\n" @@ -5873,7 +5901,7 @@ sds genRedisInfoString(const char *section) { (unsigned long long) trackingGetTotalItems(), (unsigned long long) trackingGetTotalPrefixes(), g_pserver->stat_unexpected_error_replies, - g_pserver->stat_total_error_replies, + stat_total_error_replies, g_pserver->stat_dump_payload_sanitizations, stat_total_reads_processed, stat_total_writes_processed, diff --git a/src/server.h b/src/server.h index 2081de713..6a0196a22 100644 --- a/src/server.h +++ b/src/server.h @@ -473,6 +473,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define CMD_CATEGORY_SCRIPTING (1ULL<<38) #define CMD_CATEGORY_REPLICATION (1ULL<<39) #define CMD_SKIP_PROPOGATE (1ULL<<40) /* "noprop" flag */ +#define CMD_ASYNC_OK (1ULL<<41) /* This command is safe without a lock */ /* AOF states */ #define AOF_OFF 0 /* AOF is off */ @@ -721,6 +722,7 @@ typedef enum { #define CMD_CALL_FULL (CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_PROPAGATE | CMD_CALL_NOWRAP) #define CMD_CALL_NOWRAP (1<<4) /* Don't wrap also propagate array into MULTI/EXEC: the caller will handle it. */ +#define CMD_CALL_ASYNC (1<<5) /* Command propagation flags, see propagate() function */ #define PROPAGATE_NONE 0 @@ -2017,6 +2019,7 @@ struct redisServerThreadVars { long unsigned commandsExecuted = 0; GarbageCollectorCollection::Epoch gcEpoch; const redisDbPersistentDataSnapshot **rgdbSnapshot = nullptr; + long long stat_total_error_replies; /* Total number of issued error replies ( command + rejected errors ) */ bool fRetrySetAofEvent = false; bool modulesEnabledThisAeLoop = false; /* In this loop of aeMain, were modules enabled before the thread went to sleep? */ @@ -2241,7 +2244,6 @@ struct redisServer { double stat_module_progress; /* Module save progress. */ uint64_t stat_clients_type_memory[CLIENT_TYPE_COUNT];/* Mem usage by type */ long long stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to master, etc.) error replies */ - long long stat_total_error_replies; /* Total number of issued error replies ( command + rejected errors ) */ long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. */ std::atomic stat_total_reads_processed; /* Total number of read events processed */ std::atomic stat_total_writes_processed; /* Total number of write events processed */ @@ -3311,6 +3313,7 @@ void propagateSubkeyExpire(redisDb *db, int type, robj *key, robj *subkey); int expireIfNeeded(redisDb *db, robj *key); void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when); void setExpire(client *c, redisDb *db, robj *key, expireEntry &&entry); +robj_roptr lookupKeyRead(redisDb *db, robj *key, uint64_t mvccCheckpoint); robj_roptr lookupKeyRead(redisDb *db, robj *key); int checkAlreadyExpired(long long when); robj *lookupKeyWrite(redisDb *db, robj *key); diff --git a/src/t_string.cpp b/src/t_string.cpp index de98bea7b..bc28a7bdb 100644 --- a/src/t_string.cpp +++ b/src/t_string.cpp @@ -541,18 +541,15 @@ void mgetCore(client *c, robj **keys, int count, const redisDbPersistentDataSnap } void mgetCommand(client *c) { - // Do async version for large number of arguments - if (c->argc > 100) { - if (c->asyncCommand( - [c] (const redisDbPersistentDataSnapshot *snapshot, const std::vector &keys) { - mgetCore(c, (robj **)keys.data() + 1, keys.size() - 1, snapshot); - } - )) { - return; + addReplyArrayLen(c,c->argc-1); + for (int i = 1; i < c->argc; i++) { + robj_roptr o = lookupKeyRead(c->db,c->argv[i], c->mvccCheckpoint); + if (o == nullptr || o->type != OBJ_STRING) { + addReplyNull(c); + } else { + addReplyBulk(c,o); } } - - mgetCore(c, c->argv + 1, c->argc - 1); } void msetGenericCommand(client *c, int nx) {