diff --git a/src/module.cpp b/src/module.cpp index 56a951977..66c62b696 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -4988,6 +4988,12 @@ int RM_UnregisterCommandFilter(RedisModuleCtx *ctx, RedisModuleCommandFilter *fi return REDISMODULE_OK; } +int moduleHasCommandFilters() +{ + // Note: called outside the global lock + return listLength(moduleCommandFilters); +} + void moduleCallCommandFilters(client *c) { if (listLength(moduleCommandFilters) == 0) return; diff --git a/src/networking.cpp b/src/networking.cpp index 92b41d277..fa6740048 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -36,66 +36,12 @@ #include #include #include +#include "aelocker.h" static void setProtocolError(const char *errstr, client *c); void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync); void addReplyBulkCStringCore(client *c, const char *s, bool fAsync); -class AeLocker -{ - bool m_fArmed = false; - -public: - AeLocker() - { - } - - void arm(client *c) // if a client is passed, then the client is already locked - { - if (c != nullptr) - { - serverAssert(!m_fArmed); - serverAssert(c->lock.fOwnLock()); - - bool fClientLocked = true; - while (!aeTryAcquireLock()) - { - if (fClientLocked) c->lock.unlock(); - fClientLocked = false; - aeAcquireLock(); - if (!c->lock.try_lock()) - { - aeReleaseLock(); - } - else - { - break; - } - } - - m_fArmed = true; - } - else if (!m_fArmed) - { - m_fArmed = true; - aeAcquireLock(); - } - } - - void disarm() - { - serverAssert(m_fArmed); - m_fArmed = false; - aeReleaseLock(); - } - - ~AeLocker() - { - if (m_fArmed) - aeReleaseLock(); - } -}; - /* Return the size consumed from the allocator, for the specified SDS string, * including internal fragmentation. This function is used in order to compute * the client output buffer size. */ @@ -2078,9 +2024,6 @@ void processInputBuffer(client *c, int callFlags) { if (c->argc == 0) { resetClient(c); } else { - AeLocker locker; - locker.arm(c); - /* We are finally ready to execute the command. */ if (processCommandAndResetClient(c, callFlags) == C_ERR) { /* If the client is no longer valid, we avoid exiting this diff --git a/src/server.cpp b/src/server.cpp index 46e4aa9a8..443ff53ee 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -59,6 +59,7 @@ #include #include #include +#include "aelocker.h" /* Our shared "common" objects */ @@ -3434,8 +3435,14 @@ void call(client *c, int flags) { * other operations can be performed by the caller. Otherwise * if C_ERR is returned the client was destroyed (i.e. after QUIT). */ int processCommand(client *c, int callFlags) { - serverAssert(GlobalLocksAcquired()); - moduleCallCommandFilters(c); + AeLocker locker; + AssertCorrectThread(c); + + if (moduleHasCommandFilters()) + { + locker.arm(c); + moduleCallCommandFilters(c); + } /* The QUIT command is handled separately. Normal command procs will * go through checking for replication and QUIT will cause trouble @@ -3447,10 +3454,6 @@ int processCommand(client *c, int callFlags) { return C_ERR; } - AssertCorrectThread(c); - serverAssert(GlobalLocksAcquired()); - incrementMvccTstamp(); - /* Now lookup the command and check ASAP about trivial error conditions * such as wrong arity, bad command name and so forth. */ c->cmd = c->lastcmd = lookupCommand((sds)ptrFromObj(c->argv[0])); @@ -3487,6 +3490,8 @@ int processCommand(client *c, int callFlags) { /* Check if the user can run this command according to the current * ACLs. */ + if (c->puser && !(c->puser->flags & USER_FLAG_ALLCOMMANDS)) + locker.arm(c); // ACLs require the lock int acl_retval = ACLCheckCommandPerm(c); if (acl_retval != ACL_OK) { flagTransaction(c); @@ -3512,6 +3517,7 @@ int processCommand(client *c, int callFlags) { !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 && c->cmd->proc != execCommand)) { + locker.arm(c); int hashslot; int error_code; clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc, @@ -3527,6 +3533,11 @@ int processCommand(client *c, int callFlags) { } } + incrementMvccTstamp(); + + if (!locker.isArmed()) + locker.arm(c); + /* Handle the maxmemory directive. * * Note that we do not want to reclaim memory if we are here re-entering @@ -4917,7 +4928,7 @@ void incrementMvccTstamp() } else { - g_pserver->mvcc_tstamp = ((uint64_t)g_pserver->mstime) << 20; + atomicSet(g_pserver->mvcc_tstamp, ((uint64_t)g_pserver->mstime) << 20); } } diff --git a/src/server.h b/src/server.h index 72c2bf634..d3765a000 100644 --- a/src/server.h +++ b/src/server.h @@ -1671,6 +1671,7 @@ void moduleAcquireGIL(int fServerThread); void moduleReleaseGIL(int fServerThread); void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid); void moduleCallCommandFilters(client *c); +int moduleHasCommandFilters(); /* Utils */ long long ustime(void);