diff --git a/src/aelocker.h b/src/aelocker.h index 562ac55ef..d5c8186bf 100644 --- a/src/aelocker.h +++ b/src/aelocker.h @@ -19,11 +19,12 @@ public: if (!aeTryAcquireLock(true /*fWeak*/)) // avoid locking the client if we can { bool fOwnClientLock = true; + int clientNesting = 1; for (;;) { if (fOwnClientLock) { - c->lock.unlock(); + clientNesting = c->lock.unlock_recursive(); fOwnClientLock = false; } aeAcquireLock(); @@ -36,6 +37,7 @@ public: break; } } + c->lock.lock_recursive(clientNesting); } m_fArmed = true; diff --git a/src/fastlock.cpp b/src/fastlock.cpp index fdf85044a..33de19866 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -220,3 +220,17 @@ bool fastlock::fOwnLock() { return gettid() == m_pidOwner; } + +int fastlock_unlock_recursive(struct fastlock *lock) +{ + int rval = lock->m_depth; + lock->m_depth = 1; + fastlock_unlock(lock); + return rval; +} + +void fastlock_lock_recursive(struct fastlock *lock, int nesting) +{ + fastlock_lock(lock); + lock->m_depth = nesting; +} \ No newline at end of file diff --git a/src/fastlock.h b/src/fastlock.h index 26a4b4d01..c7a40bdf3 100644 --- a/src/fastlock.h +++ b/src/fastlock.h @@ -12,6 +12,8 @@ void fastlock_lock(struct fastlock *lock); int fastlock_trylock(struct fastlock *lock, int fWeak); void fastlock_unlock(struct fastlock *lock); void fastlock_free(struct fastlock *lock); +int fastlock_unlock_recursive(struct fastlock *lock); +void fastlock_lock_recursive(struct fastlock *lock, int nesting); uint64_t fastlock_getlongwaitcount(); // this is a global value @@ -65,6 +67,16 @@ struct fastlock fastlock_unlock(this); } + int unlock_recursive() + { + return fastlock_unlock_recursive(this); + } + + void lock_recursive(int nesting) + { + fastlock_lock_recursive(this, nesting); + } + bool fOwnLock(); // true if this thread owns the lock, NOTE: not 100% reliable, use for debugging only #endif }; diff --git a/src/networking.cpp b/src/networking.cpp index b9b684407..36b2aa9de 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1363,9 +1363,9 @@ void freeClientAsync(client *c) { * are in the context of the main thread while the other threads are * idle. */ if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return; - AeLocker lock; - lock.arm(nullptr); std::lock_guardlock)> clientlock(c->lock); + AeLocker lock; + lock.arm(c); c->flags |= CLIENT_CLOSE_ASAP; listAddNodeTail(g_pserver->clients_to_close,c); } @@ -2059,10 +2059,10 @@ void processInputBufferAndReplicate(client *c) { if (applied) { if (!g_pserver->fActiveReplica) { - aeAcquireLock(); + AeLocker ae; + ae.arm(c); replicationFeedSlavesFromMasterStream(g_pserver->slaves, c->pending_querybuf, applied); - aeReleaseLock(); } sdsrange(c->pending_querybuf,applied,-1); } diff --git a/src/scripting.cpp b/src/scripting.cpp index 15f41745e..1548044e2 100644 --- a/src/scripting.cpp +++ b/src/scripting.cpp @@ -371,7 +371,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { int acl_retval = 0; int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS; struct redisCommand *cmd; - client *c = g_pserver->lua_client; + client *c = serverTL->lua_client; sds reply; // Ensure our client is on the right thread @@ -966,8 +966,11 @@ void scriptingInit(int setup) { lua_State *lua = lua_open(); if (setup) { - g_pserver->lua_client = NULL; - g_pserver->lua_caller = NULL; + for (int iel = 0; iel < cserver.cthreads; ++iel) + { + g_pserver->rgthreadvar[iel].lua_client = createClient(-1, iel); + g_pserver->rgthreadvar[iel].lua_client->flags |= CLIENT_LUA; + } g_pserver->lua_timedout = 0; ldbInit(); } @@ -1117,15 +1120,6 @@ void scriptingInit(int setup) { lua_pcall(lua,0,0,0); } - /* Create the (non connected) client that we use to execute Redis commands - * inside the Lua interpreter. - * Note: there is no need to create it again when this function is called - * by scriptingReset(). */ - if (g_pserver->lua_client == NULL) { - g_pserver->lua_client = createClient(-1, IDX_EVENT_LOOP_MAIN); - g_pserver->lua_client->flags |= CLIENT_LUA; - } - /* Lua beginners often don't use "local", this is likely to introduce * subtle bugs in their code. To prevent problems we protect accesses * to global variables. */ @@ -1272,7 +1266,7 @@ sds luaCreateFunction(client *c, lua_State *lua, robj *body) { * so that we can replicate / write in the AOF all the * EVALSHA commands as EVAL using the original script. */ int retval = dictAdd(g_pserver->lua_scripts,sha,body); - serverAssertWithInfo(c ? c : g_pserver->lua_client,NULL,retval == DICT_OK); + serverAssertWithInfo(c ? c : serverTL->lua_client,NULL,retval == DICT_OK); g_pserver->lua_scripts_mem += sdsZmallocSize(sha) + getStringObjectSdsUsedMemory(body); incrRefCount(body); return sha; @@ -1393,7 +1387,7 @@ void evalGenericCommand(client *c, int evalsha) { luaSetGlobalArray(lua,"ARGV",c->argv+3+numkeys,c->argc-3-numkeys); /* Select the right DB in the context of the Lua client */ - selectDb(g_pserver->lua_client,c->db->id); + selectDb(serverTL->lua_client,c->db->id); /* Set a hook in order to be able to stop the script execution if it * is running for too much time. diff --git a/src/server.h b/src/server.h index 6b66c9635..824eef04f 100644 --- a/src/server.h +++ b/src/server.h @@ -1153,6 +1153,7 @@ struct redisServerThreadVars { int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a client blocked on a module command needs to be processed. */ + client *lua_client = nullptr; /* The "fake client" to query Redis from Lua */ struct fastlock lockPendingWrite; }; @@ -1498,8 +1499,7 @@ struct redisServer { REDISMODULE_CLUSTER_FLAG_*. */ /* Scripting */ lua_State *lua; /* The Lua interpreter. We use just one for all clients */ - client *lua_client; /* The "fake client" to query Redis from Lua */ - client *lua_caller; /* The client running EVAL right now, or NULL */ + client *lua_caller = nullptr; /* The client running EVAL right now, or NULL */ dict *lua_scripts; /* A dictionary of SHA1 -> Lua scripts */ unsigned long long lua_scripts_mem; /* Cached scripts' memory + oh */ mstime_t lua_time_limit; /* Script timeout in milliseconds */