Merge 028c7df2f1749246cedeedc06ac9dfd05791ce11 into 26c6f1af9b29d525831c7fa9840ab3e47ed7b700

This commit is contained in:
xbasel 2025-02-02 18:01:31 +02:00 committed by GitHub
commit 31a0cab08d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 194 additions and 125 deletions

View File

@ -2210,8 +2210,8 @@ int rewriteAppendOnlyFileRio(rio *aof) {
for (j = 0; j < server.dbnum; j++) {
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
serverDb *db = server.db + j;
if (kvstoreSize(db->keys) == 0) continue;
if (databaseEmpty(j)) continue;
serverDb *db = server.db[j];
/* SELECT the new DB */
if (rioWrite(aof, selectcmd, sizeof(selectcmd) - 1) == 0) goto werr;

View File

@ -815,7 +815,7 @@ static int shouldReturnTlsInfo(void) {
}
unsigned int countKeysInSlot(unsigned int slot) {
return kvstoreHashtableSize(server.db->keys, slot);
return kvstoreHashtableSize(server.db[0]->keys, slot);
}
void clusterCommandHelp(client *c) {
@ -913,7 +913,7 @@ void clusterCommand(client *c) {
unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys;
addReplyArrayLen(c, numkeys);
kvstoreHashtableIterator *kvs_di = NULL;
kvs_di = kvstoreGetHashtableIterator(server.db->keys, slot, 0);
kvs_di = kvstoreGetHashtableIterator(server.db[0]->keys, slot, 0);
for (unsigned int i = 0; i < numkeys; i++) {
void *next;
serverAssert(kvstoreHashtableIteratorNext(kvs_di, &next));
@ -1102,7 +1102,7 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
* NODE <node-id>. */
int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY | LOOKUP_NOEXPIRE;
if ((migrating_slot || importing_slot) && !pubsubshard_included) {
if (lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL)
if (lookupKeyReadWithFlags(server.db[0], thiskey, flags) == NULL)
missing_keys++;
else
existing_keys++;

View File

@ -5714,7 +5714,7 @@ int verifyClusterConfigWithData(void) {
/* Make sure we only have keys in DB0. */
for (j = 1; j < server.dbnum; j++) {
if (kvstoreSize(server.db[j].keys)) return C_ERR;
if (!databaseEmpty(j)) return C_ERR;
}
/* Check that all the slots we see populated memory have a corresponding
@ -6350,19 +6350,20 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
kvstoreHashtableIterator *kvs_di = NULL;
void *next;
kvs_di = kvstoreGetHashtableIterator(server.db->keys, hashslot, HASHTABLE_ITER_SAFE);
serverDb *db = server.db[0];
kvs_di = kvstoreGetHashtableIterator(db->keys, hashslot, HASHTABLE_ITER_SAFE);
while (kvstoreHashtableIteratorNext(kvs_di, &next)) {
robj *valkey = next;
enterExecutionUnit(1, 0);
sds sdskey = objectGetKey(valkey);
robj *key = createStringObject(sdskey, sdslen(sdskey));
dbDelete(&server.db[0], key);
propagateDeletion(&server.db[0], key, server.lazyfree_lazy_server_del);
signalModifiedKey(NULL, &server.db[0], key);
dbDelete(db, key);
propagateDeletion(db, key, server.lazyfree_lazy_server_del);
signalModifiedKey(NULL, db, key);
/* The keys are not actually logically deleted from the database, just moved to another node.
* The modules needs to know that these keys are no longer available locally, so just send the
* keyspace notification to the modules, but not to clients. */
moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, server.db[0].id);
moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db->id);
exitExecutionUnit();
postExecutionUnitOperations();
decrRefCount(key);
@ -6830,7 +6831,7 @@ int clusterCommandSpecial(client *c) {
}
} else if (!strcasecmp(c->argv[1]->ptr, "flushslots") && c->argc == 2) {
/* CLUSTER FLUSHSLOTS */
if (kvstoreSize(server.db[0].keys) != 0) {
if (!databaseEmpty(0)) {
addReplyError(c, "DB must be empty to perform CLUSTER FLUSHSLOTS.");
return 1;
}
@ -6971,7 +6972,7 @@ int clusterCommandSpecial(client *c) {
/* If the instance is currently a primary, it should have no assigned
* slots nor keys to accept to replicate some other node.
* Replicas can switch to another primary without issues. */
if (clusterNodeIsPrimary(myself) && (myself->numslots != 0 || kvstoreSize(server.db[0].keys) != 0)) {
if (clusterNodeIsPrimary(myself) && (myself->numslots != 0 || !databaseEmpty(0))) {
addReplyError(c, "To set a master the node must be empty and "
"without assigned slots.");
return 1;

View File

@ -571,7 +571,7 @@ robj *dbUnshareStringValue(serverDb *db, robj *key, robj *o) {
* The dbnum can be -1 if all the DBs should be emptied, or the specified
* DB index if we want to empty only a single database.
* The function returns the number of keys removed from the database(s). */
long long emptyDbStructure(serverDb *dbarray, int dbnum, int async, void(callback)(hashtable *)) {
long long emptyDbStructure(serverDb **dbarray, int dbnum, int async, void(callback)(hashtable *)) {
long long removed = 0;
int startdb, enddb;
@ -583,16 +583,17 @@ long long emptyDbStructure(serverDb *dbarray, int dbnum, int async, void(callbac
}
for (int j = startdb; j <= enddb; j++) {
removed += kvstoreSize(dbarray[j].keys);
if (dbarray[j] == NULL) continue;
removed += kvstoreSize(dbarray[j]->keys);
if (async) {
emptyDbAsync(&dbarray[j]);
emptyDbAsync(dbarray[j]);
} else {
kvstoreEmpty(dbarray[j].keys, callback);
kvstoreEmpty(dbarray[j].expires, callback);
kvstoreEmpty(dbarray[j]->keys, callback);
kvstoreEmpty(dbarray[j]->expires, callback);
}
/* Because all keys of database are removed, reset average ttl. */
dbarray[j].avg_ttl = 0;
dbarray[j].expires_cursor = 0;
dbarray[j]->avg_ttl = 0;
dbarray[j]->expires_cursor = 0;
}
return removed;
@ -650,39 +651,36 @@ long long emptyData(int dbnum, int flags, void(callback)(hashtable *)) {
return removed;
}
/* Initialize temporary db on replica for use during diskless replication. */
serverDb *initTempDb(void) {
int slot_count_bits = 0;
int flags = KVSTORE_ALLOCATE_HASHTABLES_ON_DEMAND;
if (server.cluster_enabled) {
slot_count_bits = CLUSTER_SLOT_MASK_BITS;
flags |= KVSTORE_FREE_EMPTY_HASHTABLES;
}
serverDb *tempDb = zcalloc(sizeof(serverDb) * server.dbnum);
for (int i = 0; i < server.dbnum; i++) {
tempDb[i].id = i;
tempDb[i].keys = kvstoreCreate(&kvstoreKeysHashtableType, slot_count_bits, flags);
tempDb[i].expires = kvstoreCreate(&kvstoreExpiresHashtableType, slot_count_bits, flags);
}
return tempDb;
}
/* Discard tempDb, it's always async. */
void discardTempDb(serverDb *tempDb) {
void discardTempDb(serverDb **tempDb) {
/* Release temp DBs. */
emptyDbStructure(tempDb, -1, 1, NULL);
for (int i = 0; i < server.dbnum; i++) {
kvstoreRelease(tempDb[i].keys);
kvstoreRelease(tempDb[i].expires);
}
if (tempDb[i]) {
kvstoreRelease(tempDb[i]->keys);
kvstoreRelease(tempDb[i]->expires);
/* These are expected to be empty on temporary databases */
serverAssert(dictSize(tempDb[i]->blocking_keys) == 0);
serverAssert(dictSize(tempDb[i]->blocking_keys_unblock_on_nokey) == 0);
serverAssert(dictSize(tempDb[i]->ready_keys) == 0);
serverAssert(dictSize(tempDb[i]->watched_keys) == 0);
dictRelease(tempDb[i]->blocking_keys);
dictRelease(tempDb[i]->blocking_keys_unblock_on_nokey);
dictRelease(tempDb[i]->ready_keys);
dictRelease(tempDb[i]->watched_keys);
zfree(tempDb[i]);
tempDb[i] = NULL;
}
}
zfree(tempDb);
}
int selectDb(client *c, int id) {
if (id < 0 || id >= server.dbnum) return C_ERR;
c->db = &server.db[id];
initDatabase(id);
c->db = server.db[id];
return C_OK;
}
@ -690,7 +688,8 @@ long long dbTotalServerKeyCount(void) {
long long total = 0;
int j;
for (j = 0; j < server.dbnum; j++) {
total += kvstoreSize(server.db[j].keys);
if (databaseEmpty(j)) continue;
total += kvstoreSize(server.db[j]->keys);
}
return total;
}
@ -721,8 +720,9 @@ void signalFlushedDb(int dbid, int async) {
}
for (int j = startdb; j <= enddb; j++) {
scanDatabaseForDeletedKeys(&server.db[j], NULL);
touchAllWatchedKeysInDb(&server.db[j], NULL);
if (server.db[j] == NULL) continue;
scanDatabaseForDeletedKeys(server.db[j], NULL);
touchAllWatchedKeysInDb(server.db[j], NULL);
}
trackingInvalidateKeysOnFlush(async);
@ -1641,8 +1641,10 @@ void scanDatabaseForDeletedKeys(serverDb *emptied, serverDb *replaced_with) {
int dbSwapDatabases(int id1, int id2) {
if (id1 < 0 || id1 >= server.dbnum || id2 < 0 || id2 >= server.dbnum) return C_ERR;
if (id1 == id2) return C_OK;
serverDb aux = server.db[id1];
serverDb *db1 = &server.db[id1], *db2 = &server.db[id2];
initDatabase(id1);
initDatabase(id2);
serverDb aux = *server.db[id1];
serverDb *db1 = server.db[id1], *db2 = server.db[id2];
/* Swapdb should make transaction fail if there is any
* client watching keys */
@ -1683,10 +1685,13 @@ int dbSwapDatabases(int id1, int id2) {
/* Logically, this discards (flushes) the old main database, and apply the newly loaded
* database (temp) as the main (active) database, the actual freeing of old database
* (which will now be placed in the temp one) is done later. */
void swapMainDbWithTempDb(serverDb *tempDb) {
void swapMainDbWithTempDb(serverDb **tempDb) {
for (int i = 0; i < server.dbnum; i++) {
serverDb aux = server.db[i];
serverDb *activedb = &server.db[i], *newdb = &tempDb[i];
if (tempDb[i] == NULL && server.db[i] == NULL) continue;
if (tempDb[i] == NULL) tempDb[i] = createDatabase(i);
if (server.db[i] == NULL) server.db[i] = createDatabase(i);
serverDb aux = *server.db[i];
serverDb *activedb = server.db[i], *newdb = tempDb[i];
/* Swapping databases should make transaction fail if there is any
* client watching keys. */

View File

@ -289,8 +289,8 @@ void computeDatasetDigest(unsigned char *final) {
memset(final, 0, 20); /* Start with a clean result */
for (int j = 0; j < server.dbnum; j++) {
serverDb *db = server.db + j;
if (kvstoreSize(db->keys) == 0) continue;
serverDb *db = server.db[j];
if (db == NULL || kvstoreSize(db->keys) == 0) continue;
kvstoreIterator *kvs_it = kvstoreIteratorInit(db->keys, HASHTABLE_ITER_SAFE | HASHTABLE_ITER_PREFETCH_VALUES);
/* hash the DB id, so the same dataset moved in a different DB will lead to a different digest */
@ -907,14 +907,20 @@ void debugCommand(client *c) {
if (c->argc >= 4 && !strcasecmp(c->argv[3]->ptr, "full")) full = 1;
stats = sdscatprintf(stats, "[Dictionary HT]\n");
kvstoreGetStats(server.db[dbid].keys, buf, sizeof(buf), full);
stats = sdscat(stats, buf);
serverDb *db = server.db[dbid];
if (db) {
kvstoreGetStats(db->keys, buf, sizeof(buf), full);
stats = sdscat(stats, buf);
}
stats = sdscatprintf(stats, "[Expires HT]\n");
kvstoreGetStats(server.db[dbid].expires, buf, sizeof(buf), full);
stats = sdscat(stats, buf);
if (db) {
kvstoreGetStats(db->expires, buf, sizeof(buf), full);
stats = sdscat(stats, buf);
}
addReplyVerbatim(c, stats, sdslen(stats), "txt");
sdsfree(stats);
} else if (!strcasecmp(c->argv[1]->ptr, "htstats-key") && c->argc >= 3) {
int full = 0;

View File

@ -726,7 +726,7 @@ static void defragModule(serverDb *db, robj *obj) {
/* for each key we scan in the main dict, this function will attempt to defrag
* all the various pointers it has. */
static void defragKey(defragKeysCtx *ctx, robj **elemref) {
serverDb *db = &server.db[ctx->dbid];
serverDb *db = server.db[ctx->dbid];
int slot = ctx->kvstate.slot;
robj *newob, *ob;
unsigned char *newzl;
@ -987,7 +987,7 @@ static doneStatus defragStageKvstoreHelper(monotime endtime,
static doneStatus defragStageDbKeys(monotime endtime, void *target, void *privdata) {
UNUSED(privdata);
int dbid = (uintptr_t)target;
serverDb *db = &server.db[dbid];
serverDb *db = server.db[dbid];
static defragKeysCtx ctx; // STATIC - this persists
if (endtime == 0) {
@ -1005,7 +1005,7 @@ static doneStatus defragStageDbKeys(monotime endtime, void *target, void *privda
static doneStatus defragStageExpiresKvstore(monotime endtime, void *target, void *privdata) {
UNUSED(privdata);
int dbid = (uintptr_t)target;
serverDb *db = &server.db[dbid];
serverDb *db = server.db[dbid];
return defragStageKvstoreHelper(endtime, db->expires,
scanHashtableCallbackCountScanned, NULL, NULL);
}
@ -1273,6 +1273,7 @@ static void beginDefragCycle(void) {
defrag.remaining_stages = listCreate();
for (int dbid = 0; dbid < server.dbnum; dbid++) {
if (databaseEmpty(dbid)) continue;
addDefragStage(defragStageDbKeys, (void *)(uintptr_t)dbid, NULL);
addDefragStage(defragStageExpiresKvstore, (void *)(uintptr_t)dbid, NULL);
}

View File

@ -568,7 +568,9 @@ int performEvictions(void) {
* so to start populate the eviction pool sampling keys from
* every DB. */
for (i = 0; i < server.dbnum; i++) {
db = server.db + i;
db = server.db[i];
if (db == NULL) continue;
;
kvstore *kvs;
if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
kvs = db->keys;
@ -601,9 +603,9 @@ int performEvictions(void) {
kvstore *kvs;
if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
kvs = server.db[bestdbid].keys;
kvs = server.db[bestdbid]->keys;
} else {
kvs = server.db[bestdbid].expires;
kvs = server.db[bestdbid]->expires;
}
void *entry = NULL;
int found = kvstoreHashtableFind(kvs, pool[k].slot, pool[k].key, &entry);
@ -634,7 +636,9 @@ int performEvictions(void) {
* incrementally visit all DBs. */
for (i = 0; i < server.dbnum; i++) {
j = (++next_db) % server.dbnum;
db = server.db + j;
db = server.db[j];
if (db == NULL) continue;
;
kvstore *kvs;
if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) {
kvs = db->keys;
@ -653,7 +657,7 @@ int performEvictions(void) {
/* Finally remove the selected key. */
if (bestkey) {
db = server.db + bestdbid;
db = server.db[bestdbid];
robj *keyobj = createStringObject(bestkey, sdslen(bestkey));
/* We compute the amount of memory freed by db*Delete() alone.
* It is possible that actually the memory needed to propagate

View File

@ -234,7 +234,7 @@ void activeExpireCycle(int type) {
data.ttl_sum = 0;
data.ttl_samples = 0;
serverDb *db = server.db + (current_db % server.dbnum);
serverDb *db = server.db[(current_db % server.dbnum)];
data.db = db;
int db_done = 0; /* The scan of the current DB is done? */
@ -245,13 +245,17 @@ void activeExpireCycle(int type) {
* distribute the time evenly across DBs. */
current_db++;
if (kvstoreSize(db->expires)) dbs_performed++;
if (db && kvstoreSize(db->expires)) dbs_performed++;
/* Continue to expire if at the end of the cycle there are still
* a big percentage of keys to expire, compared to the number of keys
* we scanned. The percentage, stored in config_cycle_acceptable_stale
* is not fixed, but depends on the configured "expire effort". */
do {
if (db == NULL) {
break; /* DB not allocated since it was never used */
}
unsigned long num;
iteration++;
@ -421,11 +425,11 @@ void expireReplicaKeys(void) {
int dbid = 0;
while (dbids && dbid < server.dbnum) {
if ((dbids & 1) != 0) {
serverDb *db = server.db + dbid;
robj *expire = dbFindExpires(db, keyname);
serverDb *db = server.db[dbid];
robj *expire = db == NULL ? NULL : dbFindExpires(db, keyname);
int expired = 0;
if (expire && activeExpireCycleTryExpire(server.db + dbid, expire, start)) {
if (expire && activeExpireCycleTryExpire(db, expire, start)) {
expired = 1;
/* Propagate the DEL (writable replicas do not propagate anything to other replicas,
* but they might propagate to AOF) and trigger module hooks. */

View File

@ -1357,8 +1357,8 @@ struct serverMemOverhead *getMemoryOverheadData(void) {
mem_total += mh->functions_caches;
for (j = 0; j < server.dbnum; j++) {
serverDb *db = server.db + j;
if (!kvstoreNumAllocatedHashtables(db->keys)) continue;
serverDb *db = server.db[j];
if (db == NULL || !kvstoreNumAllocatedHashtables(db->keys)) continue;
unsigned long long keyscount = kvstoreSize(db->keys);

View File

@ -1331,7 +1331,8 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
static long long info_updated_time = 0;
char *pname = (rdbflags & RDBFLAGS_AOF_PREAMBLE) ? "AOF rewrite" : "RDB";
serverDb *db = server.db + dbid;
serverDb *db = server.db[dbid];
if (db == NULL) return 0;
unsigned long long int db_size = kvstoreSize(db->keys);
if (db_size == 0) return 0;
@ -3021,7 +3022,10 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
int type, rdbver;
uint64_t db_size = 0, expires_size = 0;
int should_expand_db = 0;
serverDb *db = rdb_loading_ctx->dbarray + 0;
if (rdb_loading_ctx->dbarray[0] == NULL) {
rdb_loading_ctx->dbarray[0] = createDatabase(0);
}
serverDb *db = rdb_loading_ctx->dbarray[0];
char buf[1024];
int error;
long long empty_keys_skipped = 0;
@ -3098,7 +3102,10 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
SERVER_TITLE, server.dbnum);
exit(1);
}
db = rdb_loading_ctx->dbarray + dbid;
if (rdb_loading_ctx->dbarray[dbid] == NULL) {
rdb_loading_ctx->dbarray[dbid] = createDatabase(dbid);
}
db = rdb_loading_ctx->dbarray[dbid];
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_RESIZEDB) {
/* RESIZEDB: Hint about the size of the keys in the currently

View File

@ -2048,13 +2048,13 @@ static int useDisklessLoad(void) {
/* Helper function for readSyncBulkPayload() to initialize tempDb
* before socket-loading the new db from primary. The tempDb may be populated
* by swapMainDbWithTempDb or freed by disklessLoadDiscardTempDb later. */
serverDb *disklessLoadInitTempDb(void) {
return initTempDb();
serverDb **disklessLoadInitTempDb(void) {
return zcalloc(sizeof(serverDb *) * server.dbnum);
}
/* Helper function for readSyncBulkPayload() to discard our tempDb
* when the loading succeeded or failed. */
void disklessLoadDiscardTempDb(serverDb *tempDb) {
void disklessLoadDiscardTempDb(serverDb **tempDb) {
discardTempDb(tempDb);
}
@ -2091,7 +2091,7 @@ void readSyncBulkPayload(connection *conn) {
char buf[PROTO_IOBUF_LEN];
ssize_t nread, readlen, nwritten;
int use_diskless_load = useDisklessLoad();
serverDb *diskless_load_tempDb = NULL;
serverDb **diskless_load_tempDb = NULL;
functionsLibCtx *temp_functions_lib_ctx = NULL;
int empty_db_flags = server.repl_replica_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS;
off_t left;
@ -2287,7 +2287,7 @@ void readSyncBulkPayload(connection *conn) {
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (use_diskless_load) {
rio rdb;
serverDb *dbarray;
serverDb **dbarray;
functionsLibCtx *functions_lib_ctx;
int asyncLoading = 0;
@ -2667,7 +2667,7 @@ int sendCurrentOffsetToReplica(client *replica) {
char buf[128];
int buflen;
buflen = snprintf(buf, sizeof(buf), "$ENDOFF:%lld %s %d %llu\r\n", server.primary_repl_offset, server.replid,
server.db->id, (long long unsigned int)replica->id);
server.db[0]->id, (long long unsigned int)replica->id); // TODO xbasel , why is it index 0 ?
dualChannelServerLog(LL_NOTICE, "Sending to replica %s RDB end offset %lld and client-id %llu",
replicationGetReplicaName(replica), server.primary_repl_offset,
(long long unsigned int)replica->id);

View File

@ -1254,10 +1254,11 @@ void databasesCron(void) {
if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;
for (j = 0; j < dbs_per_call; j++) {
serverDb *db = &server.db[resize_db % server.dbnum];
serverDb *db = server.db[resize_db % server.dbnum];
resize_db++;
if (db == NULL) continue;
kvstoreTryResizeHashtables(db->keys, CRON_DICTS_PER_DB);
kvstoreTryResizeHashtables(db->expires, CRON_DICTS_PER_DB);
resize_db++;
}
/* Rehash */
@ -1265,11 +1266,13 @@ void databasesCron(void) {
uint64_t elapsed_us = 0;
uint64_t threshold_us = 1 * 1000000 / server.hz / 100;
for (j = 0; j < dbs_per_call; j++) {
serverDb *db = &server.db[rehash_db % server.dbnum];
elapsed_us += kvstoreIncrementallyRehash(db->keys, threshold_us - elapsed_us);
if (elapsed_us >= threshold_us) break;
elapsed_us += kvstoreIncrementallyRehash(db->expires, threshold_us - elapsed_us);
if (elapsed_us >= threshold_us) break;
serverDb *db = server.db[rehash_db % server.dbnum];
if (db != NULL) {
elapsed_us += kvstoreIncrementallyRehash(db->keys, threshold_us - elapsed_us);
if (elapsed_us >= threshold_us) break;
elapsed_us += kvstoreIncrementallyRehash(db->expires, threshold_us - elapsed_us);
if (elapsed_us >= threshold_us) break;
}
rehash_db++;
}
}
@ -1514,11 +1517,13 @@ long long serverCron(struct aeEventLoop *eventLoop, long long id, void *clientDa
if (server.verbosity <= LL_VERBOSE) {
run_with_period(5000) {
for (j = 0; j < server.dbnum; j++) {
serverDb *db = server.db[j];
if (db == NULL) continue;
long long size, used, vkeys;
size = kvstoreBuckets(server.db[j].keys);
used = kvstoreSize(server.db[j].keys);
vkeys = kvstoreSize(server.db[j].expires);
size = kvstoreBuckets(db->keys);
used = kvstoreSize(db->keys);
vkeys = kvstoreSize(db->expires);
if (used || vkeys) {
serverLog(LL_VERBOSE, "DB %d: %lld keys (%lld volatile) in %lld slots HT.", j, used, vkeys, size);
}
@ -2716,9 +2721,55 @@ void makeThreadKillable(void) {
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
}
void initServer(void) {
int j;
/* Return non-zero if the database is empty */
int databaseEmpty(int id) {
return id < 0 || id >= server.dbnum || !server.db[id] || kvstoreSize(server.db[id]->keys) == 0;
}
// /* Initialize temporary db on replica for use during diskless replication. */
// serverDb *initTempDb(int id) {
// int slot_count_bits = 0;
// int flags = KVSTORE_ALLOCATE_HASHTABLES_ON_DEMAND;
// if (server.cluster_enabled) {
// slot_count_bits = CLUSTER_SLOT_MASK_BITS;
// flags |= KVSTORE_FREE_EMPTY_HASHTABLES;
// }
// serverDb *tempDb = zcalloc(sizeof(serverDb) * server.dbnum);
//
// tempDb->id = id;
// tempDb->keys = kvstoreCreate(&kvstoreKeysHashtableType, slot_count_bits, flags);
// tempDb->expires = kvstoreCreate(&kvstoreExpiresHashtableType, slot_count_bits, flags);
//
// return tempDb;
// }
serverDb *createDatabase(int id) {
int slot_count_bits = 0;
int flags = KVSTORE_ALLOCATE_HASHTABLES_ON_DEMAND;
if (server.cluster_enabled) {
flags |= KVSTORE_FREE_EMPTY_HASHTABLES;
slot_count_bits = CLUSTER_SLOT_MASK_BITS;
}
serverDb *db = zmalloc(sizeof(serverDb));
db->keys = kvstoreCreate(&kvstoreKeysHashtableType, slot_count_bits, flags);
db->expires = kvstoreCreate(&kvstoreExpiresHashtableType, slot_count_bits, flags);
db->expires_cursor = 0;
db->blocking_keys = dictCreate(&keylistDictType);
db->blocking_keys_unblock_on_nokey = dictCreate(&objectKeyPointerValueDictType);
db->ready_keys = dictCreate(&objectKeyPointerValueDictType);
db->watched_keys = dictCreate(&keylistDictType);
db->id = id;
db->avg_ttl = 0;
return db;
}
void initDatabase(int id) {
if (server.db[id] == NULL) {
server.db[id] = createDatabase(id);
}
}
void initServer(void) {
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
setupSignalHandlers();
@ -2789,33 +2840,17 @@ void initServer(void) {
serverLog(LL_WARNING, "Failed creating the event loop. Error message: '%s'", strerror(errno));
exit(1);
}
server.db = zmalloc(sizeof(serverDb) * server.dbnum);
/* Create the databases, and initialize other internal state. */
int slot_count_bits = 0;
int flags = KVSTORE_ALLOCATE_HASHTABLES_ON_DEMAND;
if (server.cluster_enabled) {
slot_count_bits = CLUSTER_SLOT_MASK_BITS;
flags |= KVSTORE_FREE_EMPTY_HASHTABLES;
}
for (j = 0; j < server.dbnum; j++) {
server.db[j].keys = kvstoreCreate(&kvstoreKeysHashtableType, slot_count_bits, flags);
server.db[j].expires = kvstoreCreate(&kvstoreExpiresHashtableType, slot_count_bits, flags);
server.db[j].expires_cursor = 0;
server.db[j].blocking_keys = dictCreate(&keylistDictType);
server.db[j].blocking_keys_unblock_on_nokey = dictCreate(&objectKeyPointerValueDictType);
server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType);
server.db[j].watched_keys = dictCreate(&keylistDictType);
server.db[j].id = j;
server.db[j].avg_ttl = 0;
}
server.db = zcalloc(sizeof(serverDb *) * server.dbnum);
initDatabase(0); /* The default database should always exist */
evictionPoolAlloc(); /* Initialize the LRU keys pool. */
/* Note that server.pubsub_channels was chosen to be a kvstore (with only one dict, which
* seems odd) just to make the code cleaner by making it be the same type as server.pubsubshard_channels
* (which has to be kvstore), see pubsubtype.serverPubSubChannels */
server.pubsub_channels = kvstoreCreate(&kvstoreChannelHashtableType, 0, KVSTORE_ALLOCATE_HASHTABLES_ON_DEMAND);
server.pubsub_patterns = dictCreate(&objToDictDictType);
server.pubsubshard_channels = kvstoreCreate(&kvstoreChannelHashtableType, slot_count_bits,
server.pubsubshard_channels = kvstoreCreate(&kvstoreChannelHashtableType, server.cluster_enabled ? CLUSTER_SLOT_MASK_BITS : 0,
KVSTORE_ALLOCATE_HASHTABLES_ON_DEMAND | KVSTORE_FREE_EMPTY_HASHTABLES);
server.pubsub_clients = 0;
server.watching_clients = 0;
@ -5562,9 +5597,10 @@ void totalNumberOfStatefulKeys(unsigned long *blocking_keys,
unsigned long *watched_keys) {
unsigned long bkeys = 0, bkeys_on_nokey = 0, wkeys = 0;
for (int j = 0; j < server.dbnum; j++) {
bkeys += dictSize(server.db[j].blocking_keys);
bkeys_on_nokey += dictSize(server.db[j].blocking_keys_unblock_on_nokey);
wkeys += dictSize(server.db[j].watched_keys);
if (server.db[j] == NULL) continue;
bkeys += dictSize(server.db[j]->blocking_keys);
bkeys_on_nokey += dictSize(server.db[j]->blocking_keys_unblock_on_nokey);
wkeys += dictSize(server.db[j]->watched_keys);
}
if (blocking_keys) *blocking_keys = bkeys;
if (blocking_keys_on_nokey) *blocking_keys_on_nokey = bkeys_on_nokey;
@ -6157,13 +6193,15 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
if (sections++) info = sdscat(info, "\r\n");
info = sdscatprintf(info, "# Keyspace\r\n");
for (j = 0; j < server.dbnum; j++) {
serverDb *db = server.db[j];
if (db == NULL) continue;
long long keys, vkeys;
keys = kvstoreSize(server.db[j].keys);
vkeys = kvstoreSize(server.db[j].expires);
keys = kvstoreSize(db->keys);
vkeys = kvstoreSize(db->expires);
if (keys || vkeys) {
info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n", j, keys, vkeys,
server.db[j].avg_ttl);
db->avg_ttl);
}
}
}

View File

@ -842,7 +842,7 @@ typedef struct functionsLibCtx functionsLibCtx;
* For example: dbarray need to be set as main database on
* successful loading and dropped on failure. */
typedef struct rdbLoadingCtx {
serverDb *dbarray;
serverDb **dbarray;
functionsLibCtx *functions_lib_ctx;
} rdbLoadingCtx;
@ -1569,7 +1569,7 @@ struct valkeyServer {
mode_t umask; /* The umask value of the process on startup */
int hz; /* serverCron() calls frequency in hertz */
int in_fork_child; /* indication that this is a fork child */
serverDb *db;
serverDb **db;
hashtable *commands; /* Command table */
hashtable *orig_commands; /* Command table before command renaming. */
aeEventLoop *el;
@ -3434,11 +3434,11 @@ robj *dbUnshareStringValue(serverDb *db, robj *key, robj *o);
#define EMPTYDB_ASYNC (1 << 0) /* Reclaim memory in another thread. */
#define EMPTYDB_NOFUNCTIONS (1 << 1) /* Indicate not to flush the functions. */
long long emptyData(int dbnum, int flags, void(callback)(hashtable *));
long long emptyDbStructure(serverDb *dbarray, int dbnum, int async, void(callback)(hashtable *));
long long emptyDbStructure(serverDb **dbarray, int dbnum, int async, void(callback)(hashtable *));
void flushAllDataAndResetRDB(int flags);
long long dbTotalServerKeyCount(void);
serverDb *initTempDb(void);
void discardTempDb(serverDb *tempDb);
serverDb *initTempDb(int id);
void discardTempDb(serverDb **tempDb);
int selectDb(client *c, int id);
void signalModifiedKey(client *c, serverDb *db, robj *key);
void signalFlushedDb(int dbid, int async);
@ -3918,7 +3918,10 @@ void commandAddSubcommand(struct serverCommand *parent, struct serverCommand *su
void debugDelay(int usec);
void killThreads(void);
void makeThreadKillable(void);
void swapMainDbWithTempDb(serverDb *tempDb);
serverDb *createDatabase(int id);
int databaseEmpty(int id);
void initDatabase(int id);
void swapMainDbWithTempDb(serverDb **tempDb);
sds getVersion(void);
void debugPauseProcess(void);