KEYS now only blocks one database
Former-commit-id: 18d42a5c353f76533a0ccc4ded24ed089cedacc8
This commit is contained in:
parent
ab48b2bfed
commit
2912b22403
@ -61,6 +61,11 @@ public:
|
|||||||
return m_fArmed;
|
return m_fArmed;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void release()
|
||||||
|
{
|
||||||
|
m_fArmed = false;
|
||||||
|
}
|
||||||
|
|
||||||
~AeLocker()
|
~AeLocker()
|
||||||
{
|
{
|
||||||
if (m_fArmed)
|
if (m_fArmed)
|
||||||
|
12
src/db.cpp
12
src/db.cpp
@ -30,6 +30,7 @@
|
|||||||
#include "server.h"
|
#include "server.h"
|
||||||
#include "cluster.h"
|
#include "cluster.h"
|
||||||
#include "atomicvar.h"
|
#include "atomicvar.h"
|
||||||
|
#include "aelocker.h"
|
||||||
|
|
||||||
#include <signal.h>
|
#include <signal.h>
|
||||||
#include <ctype.h>
|
#include <ctype.h>
|
||||||
@ -642,6 +643,8 @@ void keysCommand(client *c) {
|
|||||||
unsigned long numkeys = 0;
|
unsigned long numkeys = 0;
|
||||||
void *replylen = addReplyDeferredLen(c);
|
void *replylen = addReplyDeferredLen(c);
|
||||||
|
|
||||||
|
aeReleaseLock();
|
||||||
|
|
||||||
di = dictGetSafeIterator(c->db->pdict);
|
di = dictGetSafeIterator(c->db->pdict);
|
||||||
allkeys = (pattern[0] == '*' && pattern[1] == '\0');
|
allkeys = (pattern[0] == '*' && pattern[1] == '\0');
|
||||||
while((de = dictNext(di)) != NULL) {
|
while((de = dictNext(di)) != NULL) {
|
||||||
@ -659,6 +662,12 @@ void keysCommand(client *c) {
|
|||||||
}
|
}
|
||||||
dictReleaseIterator(di);
|
dictReleaseIterator(di);
|
||||||
setDeferredArrayLen(c,replylen,numkeys);
|
setDeferredArrayLen(c,replylen,numkeys);
|
||||||
|
|
||||||
|
fastlock_unlock(&c->db->lock); // we must release the DB lock before acquiring the AE lock to prevent deadlocks
|
||||||
|
AeLocker lock;
|
||||||
|
lock.arm(c);
|
||||||
|
fastlock_lock(&c->db->lock); // we still need the DB lock
|
||||||
|
lock.release();
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This callback is used by scanGenericCommand in order to collect elements
|
/* This callback is used by scanGenericCommand in order to collect elements
|
||||||
@ -1132,7 +1141,8 @@ int dbSwapDatabases(int id1, int id2) {
|
|||||||
if (id1 < 0 || id1 >= cserver.dbnum ||
|
if (id1 < 0 || id1 >= cserver.dbnum ||
|
||||||
id2 < 0 || id2 >= cserver.dbnum) return C_ERR;
|
id2 < 0 || id2 >= cserver.dbnum) return C_ERR;
|
||||||
if (id1 == id2) return C_OK;
|
if (id1 == id2) return C_OK;
|
||||||
redisDb aux = g_pserver->db[id1];
|
redisDb aux;
|
||||||
|
memcpy(&aux, &g_pserver->db[id1], sizeof(redisDb));
|
||||||
redisDb *db1 = &g_pserver->db[id1], *db2 = &g_pserver->db[id2];
|
redisDb *db1 = &g_pserver->db[id1], *db2 = &g_pserver->db[id2];
|
||||||
|
|
||||||
/* Swap hash tables. Note that we don't swap blocking_keys,
|
/* Swap hash tables. Note that we don't swap blocking_keys,
|
||||||
|
@ -247,7 +247,7 @@ void clientInstallAsyncWriteHandler(client *c) {
|
|||||||
* data should be appended to the output buffers. */
|
* data should be appended to the output buffers. */
|
||||||
int prepareClientToWrite(client *c, bool fAsync) {
|
int prepareClientToWrite(client *c, bool fAsync) {
|
||||||
fAsync = fAsync && !FCorrectThread(c); // Not async if we're on the right thread
|
fAsync = fAsync && !FCorrectThread(c); // Not async if we're on the right thread
|
||||||
serverAssert(!fAsync || GlobalLocksAcquired());
|
serverAssert(FCorrectThread(c) || fAsync);
|
||||||
serverAssert(c->fd <= 0 || c->lock.fOwnLock());
|
serverAssert(c->fd <= 0 || c->lock.fOwnLock());
|
||||||
|
|
||||||
if (c->flags & CLIENT_FORCE_REPLY) return C_OK; // FORCE REPLY means we're doing something else with the buffer.
|
if (c->flags & CLIENT_FORCE_REPLY) return C_OK; // FORCE REPLY means we're doing something else with the buffer.
|
||||||
|
@ -59,6 +59,7 @@
|
|||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
#include <uuid/uuid.h>
|
#include <uuid/uuid.h>
|
||||||
|
#include <mutex>
|
||||||
#include "aelocker.h"
|
#include "aelocker.h"
|
||||||
|
|
||||||
int g_fTestMode = false;
|
int g_fTestMode = false;
|
||||||
@ -2922,6 +2923,7 @@ void initServer(void) {
|
|||||||
|
|
||||||
/* Create the Redis databases, and initialize other internal state. */
|
/* Create the Redis databases, and initialize other internal state. */
|
||||||
for (int j = 0; j < cserver.dbnum; j++) {
|
for (int j = 0; j < cserver.dbnum; j++) {
|
||||||
|
new (&g_pserver->db[j]) redisDb;
|
||||||
g_pserver->db[j].pdict = dictCreate(&dbDictType,NULL);
|
g_pserver->db[j].pdict = dictCreate(&dbDictType,NULL);
|
||||||
g_pserver->db[j].setexpire = new(MALLOC_LOCAL) expireset();
|
g_pserver->db[j].setexpire = new(MALLOC_LOCAL) expireset();
|
||||||
g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end();
|
g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end();
|
||||||
@ -3696,6 +3698,7 @@ int processCommand(client *c, int callFlags) {
|
|||||||
queueMultiCommand(c);
|
queueMultiCommand(c);
|
||||||
addReply(c,shared.queued);
|
addReply(c,shared.queued);
|
||||||
} else {
|
} else {
|
||||||
|
std::unique_lock<decltype(c->db->lock)> ulock(c->db->lock);
|
||||||
call(c,callFlags);
|
call(c,callFlags);
|
||||||
c->woff = g_pserver->master_repl_offset;
|
c->woff = g_pserver->master_repl_offset;
|
||||||
if (listLength(g_pserver->ready_keys))
|
if (listLength(g_pserver->ready_keys))
|
||||||
@ -4097,10 +4100,12 @@ sds genRedisInfoString(const char *section) {
|
|||||||
"connected_clients:%lu\r\n"
|
"connected_clients:%lu\r\n"
|
||||||
"client_recent_max_input_buffer:%zu\r\n"
|
"client_recent_max_input_buffer:%zu\r\n"
|
||||||
"client_recent_max_output_buffer:%zu\r\n"
|
"client_recent_max_output_buffer:%zu\r\n"
|
||||||
"blocked_clients:%d\r\n",
|
"blocked_clients:%d\r\n"
|
||||||
|
"current_client_thread:%d\r\n",
|
||||||
listLength(g_pserver->clients)-listLength(g_pserver->slaves),
|
listLength(g_pserver->clients)-listLength(g_pserver->slaves),
|
||||||
maxin, maxout,
|
maxin, maxout,
|
||||||
g_pserver->blocked_clients);
|
g_pserver->blocked_clients,
|
||||||
|
static_cast<int>(serverTL - g_pserver->rgthreadvar));
|
||||||
for (int ithread = 0; ithread < cserver.cthreads; ++ithread)
|
for (int ithread = 0; ithread < cserver.cthreads; ++ithread)
|
||||||
{
|
{
|
||||||
info = sdscatprintf(info,
|
info = sdscatprintf(info,
|
||||||
|
@ -1014,6 +1014,9 @@ typedef struct clientReplyBlock {
|
|||||||
* by integers from 0 (the default database) up to the max configured
|
* by integers from 0 (the default database) up to the max configured
|
||||||
* database. The database number is the 'id' field in the structure. */
|
* database. The database number is the 'id' field in the structure. */
|
||||||
typedef struct redisDb {
|
typedef struct redisDb {
|
||||||
|
redisDb()
|
||||||
|
: expireitr(nullptr)
|
||||||
|
{};
|
||||||
dict *pdict; /* The keyspace for this DB */
|
dict *pdict; /* The keyspace for this DB */
|
||||||
expireset *setexpire;
|
expireset *setexpire;
|
||||||
expireset::setiter expireitr;
|
expireset::setiter expireitr;
|
||||||
@ -1025,6 +1028,8 @@ typedef struct redisDb {
|
|||||||
long long last_expire_set; /* when the last expire was set */
|
long long last_expire_set; /* when the last expire was set */
|
||||||
double avg_ttl; /* Average TTL, just for stats */
|
double avg_ttl; /* Average TTL, just for stats */
|
||||||
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
|
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
|
||||||
|
|
||||||
|
fastlock lock;
|
||||||
} redisDb;
|
} redisDb;
|
||||||
|
|
||||||
/* Client MULTI/EXEC state */
|
/* Client MULTI/EXEC state */
|
||||||
|
Loading…
x
Reference in New Issue
Block a user