diff --git a/src/aelocker.h b/src/aelocker.h index d5c8186bf..eca15f491 100644 --- a/src/aelocker.h +++ b/src/aelocker.h @@ -61,6 +61,11 @@ public: return m_fArmed; } + void release() + { + m_fArmed = false; + } + ~AeLocker() { if (m_fArmed) diff --git a/src/db.cpp b/src/db.cpp index ca42453e8..b4ac46a2a 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -30,6 +30,7 @@ #include "server.h" #include "cluster.h" #include "atomicvar.h" +#include "aelocker.h" #include #include @@ -642,6 +643,8 @@ void keysCommand(client *c) { unsigned long numkeys = 0; void *replylen = addReplyDeferredLen(c); + aeReleaseLock(); + di = dictGetSafeIterator(c->db->pdict); allkeys = (pattern[0] == '*' && pattern[1] == '\0'); while((de = dictNext(di)) != NULL) { @@ -659,6 +662,12 @@ void keysCommand(client *c) { } dictReleaseIterator(di); setDeferredArrayLen(c,replylen,numkeys); + + fastlock_unlock(&c->db->lock); // we must release the DB lock before acquiring the AE lock to prevent deadlocks + AeLocker lock; + lock.arm(c); + fastlock_lock(&c->db->lock); // we still need the DB lock + lock.release(); } /* This callback is used by scanGenericCommand in order to collect elements @@ -1132,7 +1141,8 @@ int dbSwapDatabases(int id1, int id2) { if (id1 < 0 || id1 >= cserver.dbnum || id2 < 0 || id2 >= cserver.dbnum) return C_ERR; if (id1 == id2) return C_OK; - redisDb aux = g_pserver->db[id1]; + redisDb aux; + memcpy(&aux, &g_pserver->db[id1], sizeof(redisDb)); redisDb *db1 = &g_pserver->db[id1], *db2 = &g_pserver->db[id2]; /* Swap hash tables. Note that we don't swap blocking_keys, diff --git a/src/networking.cpp b/src/networking.cpp index 15973fe1b..0588745b2 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -247,7 +247,7 @@ void clientInstallAsyncWriteHandler(client *c) { * data should be appended to the output buffers. */ int prepareClientToWrite(client *c, bool fAsync) { fAsync = fAsync && !FCorrectThread(c); // Not async if we're on the right thread - serverAssert(!fAsync || GlobalLocksAcquired()); + serverAssert(FCorrectThread(c) || fAsync); serverAssert(c->fd <= 0 || c->lock.fOwnLock()); if (c->flags & CLIENT_FORCE_REPLY) return C_OK; // FORCE REPLY means we're doing something else with the buffer. diff --git a/src/server.cpp b/src/server.cpp index 9866117aa..74c18df24 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -59,6 +59,7 @@ #include #include #include +#include #include "aelocker.h" int g_fTestMode = false; @@ -2922,6 +2923,7 @@ void initServer(void) { /* Create the Redis databases, and initialize other internal state. */ for (int j = 0; j < cserver.dbnum; j++) { + new (&g_pserver->db[j]) redisDb; g_pserver->db[j].pdict = dictCreate(&dbDictType,NULL); g_pserver->db[j].setexpire = new(MALLOC_LOCAL) expireset(); g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end(); @@ -3696,6 +3698,7 @@ int processCommand(client *c, int callFlags) { queueMultiCommand(c); addReply(c,shared.queued); } else { + std::unique_lockdb->lock)> ulock(c->db->lock); call(c,callFlags); c->woff = g_pserver->master_repl_offset; if (listLength(g_pserver->ready_keys)) @@ -4097,10 +4100,12 @@ sds genRedisInfoString(const char *section) { "connected_clients:%lu\r\n" "client_recent_max_input_buffer:%zu\r\n" "client_recent_max_output_buffer:%zu\r\n" - "blocked_clients:%d\r\n", + "blocked_clients:%d\r\n" + "current_client_thread:%d\r\n", listLength(g_pserver->clients)-listLength(g_pserver->slaves), maxin, maxout, - g_pserver->blocked_clients); + g_pserver->blocked_clients, + static_cast(serverTL - g_pserver->rgthreadvar)); for (int ithread = 0; ithread < cserver.cthreads; ++ithread) { info = sdscatprintf(info, diff --git a/src/server.h b/src/server.h index 62f803761..94e679dc2 100644 --- a/src/server.h +++ b/src/server.h @@ -1014,6 +1014,9 @@ typedef struct clientReplyBlock { * by integers from 0 (the default database) up to the max configured * database. The database number is the 'id' field in the structure. */ typedef struct redisDb { + redisDb() + : expireitr(nullptr) + {}; dict *pdict; /* The keyspace for this DB */ expireset *setexpire; expireset::setiter expireitr; @@ -1025,6 +1028,8 @@ typedef struct redisDb { long long last_expire_set; /* when the last expire was set */ double avg_ttl; /* Average TTL, just for stats */ list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */ + + fastlock lock; } redisDb; /* Client MULTI/EXEC state */