Move more code out of the GIL
Former-commit-id: 59dbd625c143c1a890d4387f7a32c997f0d64f5f
This commit is contained in:
parent
89a44bd7e5
commit
913fe8eb78
@ -4988,6 +4988,12 @@ int RM_UnregisterCommandFilter(RedisModuleCtx *ctx, RedisModuleCommandFilter *fi
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
int moduleHasCommandFilters()
|
||||
{
|
||||
// Note: called outside the global lock
|
||||
return listLength(moduleCommandFilters);
|
||||
}
|
||||
|
||||
void moduleCallCommandFilters(client *c) {
|
||||
if (listLength(moduleCommandFilters) == 0) return;
|
||||
|
||||
|
@ -36,66 +36,12 @@
|
||||
#include <ctype.h>
|
||||
#include <vector>
|
||||
#include <mutex>
|
||||
#include "aelocker.h"
|
||||
|
||||
static void setProtocolError(const char *errstr, client *c);
|
||||
void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync);
|
||||
void addReplyBulkCStringCore(client *c, const char *s, bool fAsync);
|
||||
|
||||
class AeLocker
|
||||
{
|
||||
bool m_fArmed = false;
|
||||
|
||||
public:
|
||||
AeLocker()
|
||||
{
|
||||
}
|
||||
|
||||
void arm(client *c) // if a client is passed, then the client is already locked
|
||||
{
|
||||
if (c != nullptr)
|
||||
{
|
||||
serverAssert(!m_fArmed);
|
||||
serverAssert(c->lock.fOwnLock());
|
||||
|
||||
bool fClientLocked = true;
|
||||
while (!aeTryAcquireLock())
|
||||
{
|
||||
if (fClientLocked) c->lock.unlock();
|
||||
fClientLocked = false;
|
||||
aeAcquireLock();
|
||||
if (!c->lock.try_lock())
|
||||
{
|
||||
aeReleaseLock();
|
||||
}
|
||||
else
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
m_fArmed = true;
|
||||
}
|
||||
else if (!m_fArmed)
|
||||
{
|
||||
m_fArmed = true;
|
||||
aeAcquireLock();
|
||||
}
|
||||
}
|
||||
|
||||
void disarm()
|
||||
{
|
||||
serverAssert(m_fArmed);
|
||||
m_fArmed = false;
|
||||
aeReleaseLock();
|
||||
}
|
||||
|
||||
~AeLocker()
|
||||
{
|
||||
if (m_fArmed)
|
||||
aeReleaseLock();
|
||||
}
|
||||
};
|
||||
|
||||
/* Return the size consumed from the allocator, for the specified SDS string,
|
||||
* including internal fragmentation. This function is used in order to compute
|
||||
* the client output buffer size. */
|
||||
@ -2078,9 +2024,6 @@ void processInputBuffer(client *c, int callFlags) {
|
||||
if (c->argc == 0) {
|
||||
resetClient(c);
|
||||
} else {
|
||||
AeLocker locker;
|
||||
locker.arm(c);
|
||||
|
||||
/* We are finally ready to execute the command. */
|
||||
if (processCommandAndResetClient(c, callFlags) == C_ERR) {
|
||||
/* If the client is no longer valid, we avoid exiting this
|
||||
|
@ -59,6 +59,7 @@
|
||||
#include <sys/socket.h>
|
||||
#include <algorithm>
|
||||
#include <uuid/uuid.h>
|
||||
#include "aelocker.h"
|
||||
|
||||
/* Our shared "common" objects */
|
||||
|
||||
@ -3434,8 +3435,14 @@ void call(client *c, int flags) {
|
||||
* other operations can be performed by the caller. Otherwise
|
||||
* if C_ERR is returned the client was destroyed (i.e. after QUIT). */
|
||||
int processCommand(client *c, int callFlags) {
|
||||
serverAssert(GlobalLocksAcquired());
|
||||
AeLocker locker;
|
||||
AssertCorrectThread(c);
|
||||
|
||||
if (moduleHasCommandFilters())
|
||||
{
|
||||
locker.arm(c);
|
||||
moduleCallCommandFilters(c);
|
||||
}
|
||||
|
||||
/* The QUIT command is handled separately. Normal command procs will
|
||||
* go through checking for replication and QUIT will cause trouble
|
||||
@ -3447,10 +3454,6 @@ int processCommand(client *c, int callFlags) {
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
AssertCorrectThread(c);
|
||||
serverAssert(GlobalLocksAcquired());
|
||||
incrementMvccTstamp();
|
||||
|
||||
/* Now lookup the command and check ASAP about trivial error conditions
|
||||
* such as wrong arity, bad command name and so forth. */
|
||||
c->cmd = c->lastcmd = lookupCommand((sds)ptrFromObj(c->argv[0]));
|
||||
@ -3487,6 +3490,8 @@ int processCommand(client *c, int callFlags) {
|
||||
|
||||
/* Check if the user can run this command according to the current
|
||||
* ACLs. */
|
||||
if (c->puser && !(c->puser->flags & USER_FLAG_ALLCOMMANDS))
|
||||
locker.arm(c); // ACLs require the lock
|
||||
int acl_retval = ACLCheckCommandPerm(c);
|
||||
if (acl_retval != ACL_OK) {
|
||||
flagTransaction(c);
|
||||
@ -3512,6 +3517,7 @@ int processCommand(client *c, int callFlags) {
|
||||
!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
|
||||
c->cmd->proc != execCommand))
|
||||
{
|
||||
locker.arm(c);
|
||||
int hashslot;
|
||||
int error_code;
|
||||
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
|
||||
@ -3527,6 +3533,11 @@ int processCommand(client *c, int callFlags) {
|
||||
}
|
||||
}
|
||||
|
||||
incrementMvccTstamp();
|
||||
|
||||
if (!locker.isArmed())
|
||||
locker.arm(c);
|
||||
|
||||
/* Handle the maxmemory directive.
|
||||
*
|
||||
* Note that we do not want to reclaim memory if we are here re-entering
|
||||
@ -4917,7 +4928,7 @@ void incrementMvccTstamp()
|
||||
}
|
||||
else
|
||||
{
|
||||
g_pserver->mvcc_tstamp = ((uint64_t)g_pserver->mstime) << 20;
|
||||
atomicSet(g_pserver->mvcc_tstamp, ((uint64_t)g_pserver->mstime) << 20);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1671,6 +1671,7 @@ void moduleAcquireGIL(int fServerThread);
|
||||
void moduleReleaseGIL(int fServerThread);
|
||||
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid);
|
||||
void moduleCallCommandFilters(client *c);
|
||||
int moduleHasCommandFilters();
|
||||
|
||||
/* Utils */
|
||||
long long ustime(void);
|
||||
|
Loading…
x
Reference in New Issue
Block a user