Fix deadlock with client and ae locks

Former-commit-id: 8630339e43c1de1cd723bdfdca8ab5924e2cb8b0
This commit is contained in:
John Sully 2019-07-12 20:46:50 -04:00
parent 03d41318b0
commit 8f1e7d9d5b
6 changed files with 43 additions and 21 deletions

View File

@ -19,11 +19,12 @@ public:
if (!aeTryAcquireLock(true /*fWeak*/)) // avoid locking the client if we can if (!aeTryAcquireLock(true /*fWeak*/)) // avoid locking the client if we can
{ {
bool fOwnClientLock = true; bool fOwnClientLock = true;
int clientNesting = 1;
for (;;) for (;;)
{ {
if (fOwnClientLock) if (fOwnClientLock)
{ {
c->lock.unlock(); clientNesting = c->lock.unlock_recursive();
fOwnClientLock = false; fOwnClientLock = false;
} }
aeAcquireLock(); aeAcquireLock();
@ -36,6 +37,7 @@ public:
break; break;
} }
} }
c->lock.lock_recursive(clientNesting);
} }
m_fArmed = true; m_fArmed = true;

View File

@ -220,3 +220,17 @@ bool fastlock::fOwnLock()
{ {
return gettid() == m_pidOwner; return gettid() == m_pidOwner;
} }
/* Release a lock in a single step and report the nesting depth it had.
 *
 * The current m_depth is captured, m_depth is then reset to 1, and
 * fastlock_unlock() is called once. The captured depth is returned so the
 * caller can hand it back to fastlock_lock_recursive() later.
 *
 * NOTE(review): this presumably relies on fastlock_unlock() dropping the lock
 * when m_depth goes from 1 to 0 - confirm against fastlock_unlock().
 * No ownership check is performed here. */
int fastlock_unlock_recursive(struct fastlock *lock)
{
    const int depthBeforeRelease = lock->m_depth;

    // Collapse the nesting so that the single unlock call below is the last one.
    lock->m_depth = 1;
    fastlock_unlock(lock);

    return depthBeforeRelease;
}
/* Acquire the lock with fastlock_lock() and then overwrite its m_depth with
 * the caller-supplied nesting count.
 *
 * lock    - the fastlock to acquire.
 * nesting - value stored into m_depth after acquisition; callers in this
 *           change pass the value previously returned by
 *           fastlock_unlock_recursive().
 *
 * NOTE(review): nesting is not validated here; presumably it must be >= 1 -
 * confirm with callers. */
void fastlock_lock_recursive(struct fastlock *lock, int nesting)
{
fastlock_lock(lock);
// Written only after fastlock_lock() has returned; replaces whatever depth that call left behind.
lock->m_depth = nesting;
}

View File

@ -12,6 +12,8 @@ void fastlock_lock(struct fastlock *lock);
int fastlock_trylock(struct fastlock *lock, int fWeak); int fastlock_trylock(struct fastlock *lock, int fWeak);
void fastlock_unlock(struct fastlock *lock); void fastlock_unlock(struct fastlock *lock);
void fastlock_free(struct fastlock *lock); void fastlock_free(struct fastlock *lock);
int fastlock_unlock_recursive(struct fastlock *lock);
void fastlock_lock_recursive(struct fastlock *lock, int nesting);
uint64_t fastlock_getlongwaitcount(); // this is a global value uint64_t fastlock_getlongwaitcount(); // this is a global value
@ -65,6 +67,16 @@ struct fastlock
fastlock_unlock(this); fastlock_unlock(this);
} }
// Member-function form of fastlock_unlock_recursive(): forwards this lock
// and returns that function's result unchanged.
int unlock_recursive()
{
return fastlock_unlock_recursive(this);
}
// Member-function form of fastlock_lock_recursive(): forwards this lock
// together with the requested nesting count.
void lock_recursive(int nesting)
{
fastlock_lock_recursive(this, nesting);
}
bool fOwnLock(); // true if this thread owns the lock, NOTE: not 100% reliable, use for debugging only bool fOwnLock(); // true if this thread owns the lock, NOTE: not 100% reliable, use for debugging only
#endif #endif
}; };

View File

@ -1363,9 +1363,9 @@ void freeClientAsync(client *c) {
* are in the context of the main thread while the other threads are * are in the context of the main thread while the other threads are
* idle. */ * idle. */
if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return; if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return;
AeLocker lock;
lock.arm(nullptr);
std::lock_guard<decltype(c->lock)> clientlock(c->lock); std::lock_guard<decltype(c->lock)> clientlock(c->lock);
AeLocker lock;
lock.arm(c);
c->flags |= CLIENT_CLOSE_ASAP; c->flags |= CLIENT_CLOSE_ASAP;
listAddNodeTail(g_pserver->clients_to_close,c); listAddNodeTail(g_pserver->clients_to_close,c);
} }
@ -2059,10 +2059,10 @@ void processInputBufferAndReplicate(client *c) {
if (applied) { if (applied) {
if (!g_pserver->fActiveReplica) if (!g_pserver->fActiveReplica)
{ {
aeAcquireLock(); AeLocker ae;
ae.arm(c);
replicationFeedSlavesFromMasterStream(g_pserver->slaves, replicationFeedSlavesFromMasterStream(g_pserver->slaves,
c->pending_querybuf, applied); c->pending_querybuf, applied);
aeReleaseLock();
} }
sdsrange(c->pending_querybuf,applied,-1); sdsrange(c->pending_querybuf,applied,-1);
} }

View File

@ -371,7 +371,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
int acl_retval = 0; int acl_retval = 0;
int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS; int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS;
struct redisCommand *cmd; struct redisCommand *cmd;
client *c = g_pserver->lua_client; client *c = serverTL->lua_client;
sds reply; sds reply;
// Ensure our client is on the right thread // Ensure our client is on the right thread
@ -966,8 +966,11 @@ void scriptingInit(int setup) {
lua_State *lua = lua_open(); lua_State *lua = lua_open();
if (setup) { if (setup) {
g_pserver->lua_client = NULL; for (int iel = 0; iel < cserver.cthreads; ++iel)
g_pserver->lua_caller = NULL; {
g_pserver->rgthreadvar[iel].lua_client = createClient(-1, iel);
g_pserver->rgthreadvar[iel].lua_client->flags |= CLIENT_LUA;
}
g_pserver->lua_timedout = 0; g_pserver->lua_timedout = 0;
ldbInit(); ldbInit();
} }
@ -1117,15 +1120,6 @@ void scriptingInit(int setup) {
lua_pcall(lua,0,0,0); lua_pcall(lua,0,0,0);
} }
/* Create the (non connected) client that we use to execute Redis commands
* inside the Lua interpreter.
* Note: there is no need to create it again when this function is called
* by scriptingReset(). */
if (g_pserver->lua_client == NULL) {
g_pserver->lua_client = createClient(-1, IDX_EVENT_LOOP_MAIN);
g_pserver->lua_client->flags |= CLIENT_LUA;
}
/* Lua beginners often don't use "local", this is likely to introduce /* Lua beginners often don't use "local", this is likely to introduce
* subtle bugs in their code. To prevent problems we protect accesses * subtle bugs in their code. To prevent problems we protect accesses
* to global variables. */ * to global variables. */
@ -1272,7 +1266,7 @@ sds luaCreateFunction(client *c, lua_State *lua, robj *body) {
* so that we can replicate / write in the AOF all the * so that we can replicate / write in the AOF all the
* EVALSHA commands as EVAL using the original script. */ * EVALSHA commands as EVAL using the original script. */
int retval = dictAdd(g_pserver->lua_scripts,sha,body); int retval = dictAdd(g_pserver->lua_scripts,sha,body);
serverAssertWithInfo(c ? c : g_pserver->lua_client,NULL,retval == DICT_OK); serverAssertWithInfo(c ? c : serverTL->lua_client,NULL,retval == DICT_OK);
g_pserver->lua_scripts_mem += sdsZmallocSize(sha) + getStringObjectSdsUsedMemory(body); g_pserver->lua_scripts_mem += sdsZmallocSize(sha) + getStringObjectSdsUsedMemory(body);
incrRefCount(body); incrRefCount(body);
return sha; return sha;
@ -1393,7 +1387,7 @@ void evalGenericCommand(client *c, int evalsha) {
luaSetGlobalArray(lua,"ARGV",c->argv+3+numkeys,c->argc-3-numkeys); luaSetGlobalArray(lua,"ARGV",c->argv+3+numkeys,c->argc-3-numkeys);
/* Select the right DB in the context of the Lua client */ /* Select the right DB in the context of the Lua client */
selectDb(g_pserver->lua_client,c->db->id); selectDb(serverTL->lua_client,c->db->id);
/* Set a hook in order to be able to stop the script execution if it /* Set a hook in order to be able to stop the script execution if it
* is running for too much time. * is running for too much time.

View File

@ -1153,6 +1153,7 @@ struct redisServerThreadVars {
int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a
client blocked on a module command needs client blocked on a module command needs
to be processed. */ to be processed. */
client *lua_client = nullptr; /* The "fake client" to query Redis from Lua */
struct fastlock lockPendingWrite; struct fastlock lockPendingWrite;
}; };
@ -1498,8 +1499,7 @@ struct redisServer {
REDISMODULE_CLUSTER_FLAG_*. */ REDISMODULE_CLUSTER_FLAG_*. */
/* Scripting */ /* Scripting */
lua_State *lua; /* The Lua interpreter. We use just one for all clients */ lua_State *lua; /* The Lua interpreter. We use just one for all clients */
client *lua_client; /* The "fake client" to query Redis from Lua */ client *lua_caller = nullptr; /* The client running EVAL right now, or NULL */
client *lua_caller; /* The client running EVAL right now, or NULL */
dict *lua_scripts; /* A dictionary of SHA1 -> Lua scripts */ dict *lua_scripts; /* A dictionary of SHA1 -> Lua scripts */
unsigned long long lua_scripts_mem; /* Cached scripts' memory + oh */ unsigned long long lua_scripts_mem; /* Cached scripts' memory + oh */
mstime_t lua_time_limit; /* Script timeout in milliseconds */ mstime_t lua_time_limit; /* Script timeout in milliseconds */