Modify how the database array is stored so that swaps are easier

Former-commit-id: 48fd6ec5ca69dd505d0e757862c09a898c19cf22
John Sully 2019-12-08 16:49:59 -05:00
parent 86cd47fe80
commit 992784c21d
11 changed files with 75 additions and 71 deletions
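
The gist of the change: `g_pserver->db` was a contiguous array of `redisDb` structs, so SWAPDB had to copy struct contents (and patch up fields) to exchange two databases. It is now an array of `redisDb*` pointers, so a swap is a constant-time pointer exchange. A minimal sketch of the new shape, with a simplified stand-in type rather than KeyDB's real `redisDb` and allocator:

#include <utility>  // std::swap

struct redisDb { int id; /* keyspace, expires, stats, ... */ };

int main() {
    const int dbnum = 16;

    // After this commit: one heap allocation per DB, indexed through
    // an array of pointers. g_pserver->db[j] is a redisDb*.
    redisDb **db = new redisDb*[dbnum]();
    for (int j = 0; j < dbnum; ++j)
        db[j] = new redisDb{j};

    // SWAPDB 0 1 reduces to swapping two pointers; no redisDb contents
    // are copied, which is what makes swaps easy.
    std::swap(db[0], db[1]);

    for (int j = 0; j < dbnum; ++j) delete db[j];
    delete[] db;
    return 0;
}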

View File

@@ -1306,7 +1306,7 @@ int rewriteAppendOnlyFileRio(rio *aof) {
 for (j = 0; j < cserver.dbnum; j++) {
 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
-redisDb *db = g_pserver->db+j;
+redisDb *db = g_pserver->db[j];
 if (db->size() == 0) continue;
 /* SELECT the new DB */
@@ -1415,7 +1415,7 @@ int rewriteAppendOnlyFile(char *filename) {
 std::vector<const redisDbPersistentDataSnapshot*> vecpdb;
 for (int idb = 0; idb < cserver.dbnum; ++idb)
 {
-vecpdb.push_back(&g_pserver->db[idb]);
+vecpdb.push_back(g_pserver->db[idb]);
 }
 if (rdbSaveRio(&aof,vecpdb.data(),&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {
 errno = error;

View File

@@ -3936,7 +3936,7 @@ int verifyClusterConfigWithData(void) {
 /* Make sure we only have keys in DB0. */
 for (j = 1; j < cserver.dbnum; j++) {
-if (g_pserver->db[j].size()) return C_ERR;
+if (g_pserver->db[j]->size()) return C_ERR;
 }
 /* Check that all the slots we see populated memory have a corresponding
@@ -4331,7 +4331,7 @@ NULL
 clusterReplyMultiBulkSlots(c);
 } else if (!strcasecmp(szFromObj(c->argv[1]),"flushslots") && c->argc == 2) {
 /* CLUSTER FLUSHSLOTS */
-if (g_pserver->db[0].size() != 0) {
+if (g_pserver->db[0]->size() != 0) {
 addReplyError(c,"DB must be empty to perform CLUSTER FLUSHSLOTS.");
 return;
 }
@@ -4666,7 +4666,7 @@ NULL
 * slots nor keys to accept to replicate some other node.
 * Slaves can switch to another master without issues. */
 if (nodeIsMaster(myself) &&
-(myself->numslots != 0 || g_pserver->db[0].size() != 0)) {
+(myself->numslots != 0 || g_pserver->db[0]->size() != 0)) {
 addReplyError(c,
 "To set a master the node must be empty and "
 "without assigned slots.");
@@ -5650,7 +5650,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
 /* Migrating / Importing slot? Count keys we don't have. */
 if ((migrating_slot || importing_slot) &&
-lookupKeyRead(&g_pserver->db[0],thiskey) == nullptr)
+lookupKeyRead(g_pserver->db[0],thiskey) == nullptr)
 {
 missing_keys++;
 }

View File

@@ -481,7 +481,7 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
 }
 for (int j = startdb; j <= enddb; j++) {
-removed += g_pserver->db[j].clear(!!async, callback);
+removed += g_pserver->db[j]->clear(!!async, callback);
 }
 if (g_pserver->cluster_enabled) {
 if (async) {
@@ -497,7 +497,7 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
 int selectDb(client *c, int id) {
 if (id < 0 || id >= cserver.dbnum)
 return C_ERR;
-c->db = &g_pserver->db[id];
+c->db = g_pserver->db[id];
 return C_OK;
 }
@@ -1237,21 +1237,14 @@ int dbSwapDatabases(int id1, int id2) {
 if (id1 < 0 || id1 >= cserver.dbnum ||
 id2 < 0 || id2 >= cserver.dbnum) return C_ERR;
 if (id1 == id2) return C_OK;
-redisDb aux;
-memcpy(&aux, &g_pserver->db[id1], sizeof(redisDb));
-redisDb *db1 = &g_pserver->db[id1], *db2 = &g_pserver->db[id2];
-/* Swap hash tables. Note that we don't swap blocking_keys,
+std::swap(g_pserver->db[id1], g_pserver->db[id2]);
+/* Note that we don't swap blocking_keys,
 * ready_keys and watched_keys, since we want clients to
-* remain in the same DB they were. */
-redisDbPersistentData::swap(db1, db2);
-db1->avg_ttl = db2->avg_ttl;
-db1->last_expire_set = db2->last_expire_set;
-db1->expireitr = db2->expireitr;
-db2->avg_ttl = aux.avg_ttl;
-db2->last_expire_set = aux.last_expire_set;
-db2->expireitr = aux.expireitr;
+* remain in the same DB they were, so we swap them back. */
+std::swap(g_pserver->db[id1]->blocking_keys, g_pserver->db[id2]->blocking_keys);
+std::swap(g_pserver->db[id1]->ready_keys, g_pserver->db[id2]->ready_keys);
+std::swap(g_pserver->db[id1]->watched_keys, g_pserver->db[id2]->watched_keys);
 /* Now we need to handle clients blocked on lists: as an effect
 * of swapping the two DBs, a client that was waiting for list
@@ -1262,8 +1255,8 @@ int dbSwapDatabases(int id1, int id2) {
 * in dbAdd() when a list is created. So here we need to rescan
 * the list of clients blocked on lists and signal lists as ready
 * if needed. */
-scanDatabaseForReadyLists(db1);
-scanDatabaseForReadyLists(db2);
+scanDatabaseForReadyLists(g_pserver->db[id1]);
+scanDatabaseForReadyLists(g_pserver->db[id2]);
 return C_OK;
 }
@@ -1881,7 +1874,7 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
 raxNext(&iter);
 robj *key = createStringObject((char*)iter.key+2,iter.key_len-2);
-dbDelete(&g_pserver->db[0],key);
+dbDelete(g_pserver->db[0],key);
 decrRefCount(key);
 j++;
 }
@@ -1925,8 +1918,8 @@ void redisDb::initialize(int id)
 this->avg_ttl = 0;
 this->last_expire_set = 0;
 this->defrag_later = listCreate();
-if (id == 0)
-this->setStorageProvider(create_rocksdb_storage("/tmp/rocks.db"));
+//if (id == 0)
+// this->setStorageProvider(create_rocksdb_storage("/tmp/rocks.db"));
 }
 bool redisDbPersistentData::insert(char *key, robj *o)
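
Note the subtlety in `dbSwapDatabases` above: swapping the whole `redisDb*` moves everything with it, including the per-client structures (`blocking_keys`, `ready_keys`, `watched_keys`) that must stay attached to a DB *number* rather than its contents, so those members are swapped back afterwards. A toy version of the pattern, assuming simple stand-in types:

#include <set>
#include <string>
#include <utility>

struct Db {
    std::set<std::string> keys;          // the data: should travel with the swap
    std::set<std::string> watched_keys;  // client state: should stay with the slot
};

void swapDbs(Db *&a, Db *&b) {
    std::swap(a, b);                              // whole databases trade places...
    std::swap(a->watched_keys, b->watched_keys);  // ...but client state is swapped back
}

int main() {
    Db *db0 = new Db{{"k1"}, {"watched-by-client-in-0"}};
    Db *db1 = new Db{{"k2"}, {"watched-by-client-in-1"}};
    swapDbs(db0, db1);
    // db0->keys now holds "k2", but db0->watched_keys still holds
    // "watched-by-client-in-0", so WATCHing clients stay in DB 0.
    delete db0;
    delete db1;
    return 0;
}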

View File

@@ -271,7 +271,7 @@ void computeDatasetDigest(unsigned char *final) {
 memset(final,0,20); /* Start with a clean result */
 for (j = 0; j < cserver.dbnum; j++) {
-redisDb *db = g_pserver->db+j;
+redisDb *db = g_pserver->db[j];
 if (db->size() == 0) continue;
@@ -632,11 +632,11 @@ NULL
 }
 stats = sdscatprintf(stats,"[Dictionary HT]\n");
-g_pserver->db[dbid].getStats(buf,sizeof(buf));
+g_pserver->db[dbid]->getStats(buf,sizeof(buf));
 stats = sdscat(stats,buf);
 stats = sdscatprintf(stats,"[Expires set]\n");
-g_pserver->db[dbid].getExpireStats(buf, sizeof(buf));
+g_pserver->db[dbid]->getExpireStats(buf, sizeof(buf));
 stats = sdscat(stats, buf);
 addReplyBulkSds(c,stats);

View File

@@ -1102,7 +1102,7 @@ void activeDefragCycle(void) {
 start_stat = g_pserver->stat_active_defrag_hits;
 }
-db = &g_pserver->db[current_db];
+db = g_pserver->db[current_db];
 cursor = 0;
 }

View File

@@ -502,7 +502,7 @@ int freeMemoryIfNeeded(void) {
 * so to start populate the eviction pool sampling keys from
 * every DB. */
 for (i = 0; i < cserver.dbnum; i++) {
-db = g_pserver->db+i;
+db = g_pserver->db[i];
 if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS)
 {
 if ((keys = db->size()) != 0) {
@@ -526,7 +526,7 @@ int freeMemoryIfNeeded(void) {
 bestdbid = pool[k].dbid;
 sds key = nullptr;
-auto itr = g_pserver->db[pool[k].dbid].find(pool[k].key);
+auto itr = g_pserver->db[pool[k].dbid]->find(pool[k].key);
 if (itr != nullptr && (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS || itr.val()->FExpires()))
 key = itr.key();
@@ -557,7 +557,7 @@ int freeMemoryIfNeeded(void) {
 * incrementally visit all DBs. */
 for (i = 0; i < cserver.dbnum; i++) {
 j = (++next_db) % cserver.dbnum;
-db = g_pserver->db+j;
+db = g_pserver->db[j];
 if (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM)
 {
 if (db->size() != 0) {
@@ -581,7 +581,7 @@ int freeMemoryIfNeeded(void) {
 /* Finally remove the selected key. */
 if (bestkey) {
-db = g_pserver->db+bestdbid;
+db = g_pserver->db[bestdbid];
 if (db->FStorageProvider())
 {

View File

@@ -283,7 +283,7 @@ void activeExpireCycle(int type) {
 long total_expired = 0;
 for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
-redisDb *db = g_pserver->db+(current_db % cserver.dbnum);
+redisDb *db = g_pserver->db[(current_db % cserver.dbnum)];
 /* Increment the DB now so we are sure if we run out of time
 * in the current DB we'll restart from the next. This allows to
@@ -402,7 +402,7 @@ void expireSlaveKeys(void) {
 int dbid = 0;
 while(dbids && dbid < cserver.dbnum) {
 if ((dbids & 1) != 0) {
-redisDb *db = g_pserver->db+dbid;
+redisDb *db = g_pserver->db[dbid];
 // the expire is hashed based on the key pointer, so we need the pointer in the main db
 auto itrDB = db->find(keyname);
@@ -414,7 +414,7 @@
 if (itrExpire != db->setexpire()->end())
 {
 if (itrExpire->when() < start) {
-activeExpireCycleExpire(g_pserver->db+dbid,*itrExpire,start);
+activeExpireCycleExpire(g_pserver->db[dbid],*itrExpire,start);
 expired = 1;
 }
 }

View File

@@ -1081,7 +1081,7 @@ struct redisMemOverhead *getMemoryOverheadData(void) {
 mem_total+=mem;
 for (j = 0; j < cserver.dbnum; j++) {
-redisDb *db = g_pserver->db+j;
+redisDb *db = g_pserver->db[j];
 long long keyscount = db->size();
 if (keyscount==0) continue;

View File

@@ -1281,7 +1281,7 @@ int rdbSave(const redisDbPersistentDataSnapshot **rgpdb, rdbSaveInfo *rsi)
 {
 for (int idb = 0; idb < cserver.dbnum; ++idb)
 {
-vecdb.push_back(&g_pserver->db[idb]);
+vecdb.push_back(g_pserver->db[idb]);
 }
 rgpdb = vecdb.data();
 }
@@ -1390,7 +1390,7 @@ void *rdbSaveThread(void *vargs)
 if (!g_pserver->rdbThreadVars.fRdbThreadCancel)
 aeAcquireLock();
 for (int idb = 0; idb < cserver.dbnum; ++idb)
-g_pserver->db[idb].endSnapshot(args->rgpdb[idb]);
+g_pserver->db[idb]->endSnapshot(args->rgpdb[idb]);
 if (!g_pserver->rdbThreadVars.fRdbThreadCancel)
 aeReleaseLock();
 zfree(args);
@@ -1409,13 +1409,13 @@ int launchRdbSaveThread(pthread_t &child, rdbSaveInfo *rsi)
 args->rsi.master_repl_offset = g_pserver->master_repl_offset;
 for (int idb = 0; idb < cserver.dbnum; ++idb)
-args->rgpdb[idb] = g_pserver->db[idb].createSnapshot(getMvccTstamp(), false /* fOptional */);
+args->rgpdb[idb] = g_pserver->db[idb]->createSnapshot(getMvccTstamp(), false /* fOptional */);
 g_pserver->rdbThreadVars.tmpfileNum++;
 g_pserver->rdbThreadVars.fRdbThreadCancel = false;
 if (pthread_create(&child, NULL, rdbSaveThread, args)) {
 for (int idb = 0; idb < cserver.dbnum; ++idb)
-g_pserver->db[idb].endSnapshot(args->rgpdb[idb]);
+g_pserver->db[idb]->endSnapshot(args->rgpdb[idb]);
 zfree(args);
 return C_ERR;
 }
@@ -1981,9 +1981,9 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
 /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
 * otherwise C_ERR is returned and 'errno' is set accordingly. */
 int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
-uint64_t dbid;
+uint64_t dbid = 0;
 int type, rdbver;
-redisDb *db = g_pserver->db+0;
+redisDb *db = g_pserver->db[dbid];
 char buf[1024];
 /* Key-specific attributes, set by opcodes before the key type. */
 long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime();
@@ -2055,7 +2055,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
 "databases. Exiting\n", cserver.dbnum);
 exit(1);
 }
-db = g_pserver->db+dbid;
+db = g_pserver->db[dbid];
 continue; /* Read next opcode. */
 } else if (type == RDB_OPCODE_RESIZEDB) {
 /* RESIZEDB: Hint about the size of the keys in the currently
@@ -2193,7 +2193,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
 if (fStaleMvccKey && !fExpiredKey && rsi->mi != nullptr && rsi->mi->staleKeyMap != nullptr && lookupKeyRead(db, key) == nullptr) {
 // We have a key that we've already deleted and is not back in our database.
 // We'll need to inform the sending master of the delete if it is also a replica of us
-rsi->mi->staleKeyMap->operator[](db - g_pserver->db).push_back(key);
+rsi->mi->staleKeyMap->operator[](dbid).push_back(key);
 }
 decrRefCount(key);
 key = nullptr;
@@ -2533,7 +2533,7 @@ void *rdbSaveToSlavesSocketsThread(void *vargs)
 if (!g_pserver->rdbThreadVars.fRdbThreadCancel)
 aeAcquireLock();
 for (int idb = 0; idb < cserver.dbnum; ++idb)
-g_pserver->db[idb].endSnapshot(args->rgpdb[idb]);
+g_pserver->db[idb]->endSnapshot(args->rgpdb[idb]);
 if (!g_pserver->rdbThreadVars.fRdbThreadCancel)
 aeReleaseLock();
 zfree(args->clientids);
@@ -2598,7 +2598,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
 start = ustime();
 for (int idb = 0; idb < cserver.dbnum; ++idb)
-args->rgpdb[idb] = g_pserver->db[idb].createSnapshot(getMvccTstamp(), false /*fOptional*/);
+args->rgpdb[idb] = g_pserver->db[idb]->createSnapshot(getMvccTstamp(), false /*fOptional*/);
 g_pserver->rdbThreadVars.tmpfileNum++;
 g_pserver->rdbThreadVars.fRdbThreadCancel = false;
@@ -2625,7 +2625,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
 close(pipefds[1]);
 closeChildInfoPipe();
 for (int idb = 0; idb < cserver.dbnum; ++idb)
-g_pserver->db[idb].endSnapshot(args->rgpdb[idb]);
+g_pserver->db[idb]->endSnapshot(args->rgpdb[idb]);
 zfree(args->clientids);
 zfree(args->fds);
 zfree(args);
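
One consequence visible in `rdbLoadRio` above: with a contiguous array, `db - g_pserver->db` recovered the database index by pointer arithmetic; with an array of pointers each `redisDb` is a separate allocation, so that trick no longer works, and the loader instead initializes `dbid` and carries it explicitly to index the stale-key map. A small illustration of why, with hypothetical names:

#include <cassert>
#include <cstddef>

struct redisDb { int pad; };

int main() {
    // Contiguous array: element address minus base address == index.
    redisDb arr[4];
    redisDb *db = &arr[2];
    assert(db - arr == 2);  // well-defined: both point into the same array

    // Array of pointers: each redisDb is its own allocation, so there is
    // no common base to subtract from. The index must be tracked separately.
    redisDb *ptrs[4];
    for (int j = 0; j < 4; ++j) ptrs[j] = new redisDb();
    std::size_t dbid = 2;   // carried alongside the pointer, as dbid is above
    db = ptrs[dbid];
    assert(db == ptrs[dbid]);
    for (int j = 0; j < 4; ++j) delete ptrs[j];
    return 0;
}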

View File

@@ -1456,7 +1456,7 @@ int htNeedsResize(dict *dict) {
 /* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL
 * we resize the hash table to save memory */
 void tryResizeHashTables(int dbid) {
-g_pserver->db[dbid].tryResize();
+g_pserver->db[dbid]->tryResize();
 }
 /* Our hash table implementation performs rehashing incrementally while
@@ -1753,7 +1753,7 @@ void databasesCron(void) {
 /* Rehash */
 if (g_pserver->activerehashing) {
 for (j = 0; j < dbs_per_call; j++) {
-int work_done = g_pserver->db[rehash_db].incrementallyRehash();
+int work_done = g_pserver->db[rehash_db]->incrementallyRehash();
 if (work_done) {
 /* If the function did some work, stop here, we'll do
 * more at the next cron loop. */
@@ -1915,9 +1915,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
 for (j = 0; j < cserver.dbnum; j++) {
 long long size, used, vkeys;
-size = g_pserver->db[j].slots();
-used = g_pserver->db[j].size();
-vkeys = g_pserver->db[j].expireSize();
+size = g_pserver->db[j]->slots();
+used = g_pserver->db[j]->size();
+vkeys = g_pserver->db[j]->expireSize();
 if (used || vkeys) {
 serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
 /* dictPrintStats(g_pserver->dict); */
@@ -2090,7 +2090,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
 }
 g_pserver->asyncworkqueue->AddWorkFunction([]{
-g_pserver->db[0].consolidate_snapshot();
+g_pserver->db[0]->consolidate_snapshot();
 }, true /*HiPri*/);
 g_pserver->cronloops++;
@@ -2177,7 +2177,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
 static thread_local bool fFirstRun = true;
 if (!fFirstRun) {
 for (int idb = 0; idb < cserver.dbnum; ++idb)
-g_pserver->db[idb].processChanges();
+g_pserver->db[idb]->processChanges();
 }
 else {
 fFirstRun = false;
@@ -2220,7 +2220,7 @@ void beforeSleepLite(struct aeEventLoop *eventLoop)
 static thread_local bool fFirstRun = true;
 if (!fFirstRun) {
 for (int idb = 0; idb < cserver.dbnum; ++idb)
-g_pserver->db[idb].processChanges();
+g_pserver->db[idb]->processChanges();
 }
 else {
 fFirstRun = false;
@@ -2257,7 +2257,7 @@ void afterSleep(struct aeEventLoop *eventLoop) {
 serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
 aeAcquireLock();
 for (int idb = 0; idb < cserver.dbnum; ++idb)
-g_pserver->db[idb].trackChanges();
+g_pserver->db[idb]->trackChanges();
 aeReleaseLock();
 }
@@ -2598,6 +2598,10 @@ void initServerConfig(void) {
 /* Multithreading */
 cserver.cthreads = CONFIG_DEFAULT_THREADS;
 cserver.fThreadAffinity = CONFIG_DEFAULT_THREAD_AFFINITY;
+// This will get dereferenced before the second stage init where we have the true db count,
+// so make sure it's zeroed and initialized
+g_pserver->db = (redisDb**)zcalloc(sizeof(redisDb*)*cserver.dbnum, MALLOC_LOCAL);
 }
 extern char **environ;
@@ -3002,12 +3006,13 @@ void initServer(void) {
 signal(SIGPIPE, SIG_IGN);
 setupSignalHandlers();
-g_pserver->db = (redisDb*)zmalloc(sizeof(redisDb)*cserver.dbnum, MALLOC_LOCAL);
+zfree(g_pserver->db); // initServerConfig created a dummy array, free that now
+g_pserver->db = (redisDb**)zmalloc(sizeof(redisDb*)*cserver.dbnum, MALLOC_LOCAL);
 /* Create the Redis databases, and initialize other internal state. */
 for (int j = 0; j < cserver.dbnum; j++) {
-new (&g_pserver->db[j]) redisDb;
-g_pserver->db[j].initialize(j);
+g_pserver->db[j] = new (MALLOC_LOCAL) redisDb();
+g_pserver->db[j]->initialize(j);
 }
 /* Fixup Master Client Database */
@@ -3872,6 +3877,12 @@ int prepareForShutdown(int flags) {
 /* Close the listening sockets. Apparently this allows faster restarts. */
 closeListeningSockets(1);
+/* free our databases */
+for (int idb = 0; idb < cserver.dbnum; ++idb) {
+delete g_pserver->db[idb];
+g_pserver->db[idb] = nullptr;
+}
 serverLog(LL_WARNING,"%s is now ready to exit, bye bye...",
 g_pserver->sentinel_mode ? "Sentinel" : "KeyDB");
 return C_OK;
@@ -4653,19 +4664,19 @@ sds genRedisInfoString(const char *section) {
 for (j = 0; j < cserver.dbnum; j++) {
 long long keys, vkeys;
-keys = g_pserver->db[j].size();
-vkeys = g_pserver->db[j].expireSize();
+keys = g_pserver->db[j]->size();
+vkeys = g_pserver->db[j]->expireSize();
 // Adjust TTL by the current time
-g_pserver->db[j].avg_ttl -= (g_pserver->mstime - g_pserver->db[j].last_expire_set);
-if (g_pserver->db[j].avg_ttl < 0)
-g_pserver->db[j].avg_ttl = 0;
-g_pserver->db[j].last_expire_set = g_pserver->mstime;
+g_pserver->db[j]->avg_ttl -= (g_pserver->mstime - g_pserver->db[j]->last_expire_set);
+if (g_pserver->db[j]->avg_ttl < 0)
+g_pserver->db[j]->avg_ttl = 0;
+g_pserver->db[j]->last_expire_set = g_pserver->mstime;
 if (keys || vkeys) {
 info = sdscatprintf(info,
 "db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n",
-j, keys, vkeys, static_cast<long long>(g_pserver->db[j].avg_ttl));
+j, keys, vkeys, static_cast<long long>(g_pserver->db[j]->avg_ttl));
 }
 }
 }
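
Allocation now happens in two stages, as the hunks above show: `initServerConfig` zcalloc's a zero-filled placeholder array (it can be dereferenced before the final `dbnum` is known, and a null pointer is safer to trip over than uninitialized memory), then `initServer` frees the placeholder, allocates the real pointer array, and heap-constructs each DB; `prepareForShutdown` deletes them explicitly. A compact sketch of that lifecycle, using plain `calloc`/`free` in place of KeyDB's `zcalloc`/`zfree` and `MALLOC_LOCAL` allocator:

#include <cstdlib>

struct redisDb {
    void initialize(int /*id*/) { /* set up keyspace, expires, ... */ }
};

static redisDb **g_db = nullptr;
static int g_dbnum = 0;

void initServerConfig() {
    g_dbnum = 16;  // stand-in for the compiled-in default
    // Stage 1: zeroed placeholder, so anything that peeks at g_db before
    // second-stage init sees null pointers instead of garbage.
    g_db = (redisDb **)calloc(g_dbnum, sizeof(redisDb *));
}

void initServer() {
    // Stage 2: dbnum is now final; drop the dummy array and build the real one.
    free(g_db);
    g_db = (redisDb **)calloc(g_dbnum, sizeof(redisDb *));
    for (int j = 0; j < g_dbnum; ++j) {
        g_db[j] = new redisDb();
        g_db[j]->initialize(j);
    }
}

void prepareForShutdown() {
    // Databases are individual heap objects now, so they are freed one by one.
    for (int j = 0; j < g_dbnum; ++j) {
        delete g_db[j];
        g_db[j] = nullptr;
    }
}

int main() {
    initServerConfig();
    initServer();
    prepareForShutdown();
    return 0;
}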

View File

@@ -1263,7 +1263,7 @@ public:
 void trackkey(const char *key)
 {
-if (m_fTrackingChanges && !m_fAllChanged)
+if (m_fTrackingChanges && !m_fAllChanged && m_spstorage)
 m_vecchanged.push_back(unique_sds_ptr(sdsdup(key)));
 }
@@ -1933,7 +1933,7 @@ struct redisServerConst {
 int active_defrag_cycle_max; /* maximal effort for defrag in CPU percentage */
 unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from within the main dict scan */
 size_t client_max_querybuf_len; /* Limit for client query buffer length */
-int dbnum; /* Total number of configured DBs */
+int dbnum = 0; /* Total number of configured DBs */
 int supervised; /* 1 if supervised, 0 otherwise. */
 int supervised_mode; /* See SUPERVISED_* */
 int daemonize; /* True if running as a daemon */
@@ -1952,7 +1952,7 @@ struct redisServer {
 the actual 'hz' field value if dynamic-hz
 is enabled. */
 std::atomic<int> hz; /* serverCron() calls frequency in hertz */
-redisDb *db;
+redisDb **db = nullptr;
 dict *commands; /* Command table */
 dict *orig_commands; /* Command table before command renaming. */
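
The header changes lean on C++11 in-class default member initializers: `int dbnum = 0;` and `redisDb **db = nullptr;` guarantee sane values on any code path that runs before the two-stage init completes. A minimal example of the idiom, with simplified stand-in types:

struct redisDb;

struct redisServer {
    int dbnum = 0;           // default applied by every constructor
    redisDb **db = nullptr;  // never reads as garbage before init
};

int main() {
    redisServer server;      // no explicit constructor needed
    // Pre-init code can now test safely: with dbnum == 0, loops over the
    // databases do no iterations, and db compares equal to nullptr.
    for (int j = 0; j < server.dbnum; ++j) { /* never entered */ }
    return (server.db == nullptr) ? 0 : 1;
}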