Merge branch 'keydbpro' of https://gitlab.eqalpha.com/external-collab/keydb-pro-6 into keydbpro_collab

Former-commit-id: e077756b140855cb5a7e60567abf845dbfbc162d
This commit is contained in:
John Sully 2021-04-07 15:14:59 +00:00
commit d832ebb221
8 changed files with 60 additions and 35 deletions

View File

@ -2535,6 +2535,7 @@ standardConfig configs[] = {
createBoolConfig("use-fork", NULL, IMMUTABLE_CONFIG, cserver.fForkBgSave, 0, NULL, NULL), createBoolConfig("use-fork", NULL, IMMUTABLE_CONFIG, cserver.fForkBgSave, 0, NULL, NULL),
createBoolConfig("allow-write-during-load", NULL, MODIFIABLE_CONFIG, g_pserver->fWriteDuringActiveLoad, 0, NULL, NULL), createBoolConfig("allow-write-during-load", NULL, MODIFIABLE_CONFIG, g_pserver->fWriteDuringActiveLoad, 0, NULL, NULL),
createBoolConfig("io-threads-do-reads", NULL, IMMUTABLE_CONFIG, fDummy, 0, NULL, NULL), createBoolConfig("io-threads-do-reads", NULL, IMMUTABLE_CONFIG, fDummy, 0, NULL, NULL),
createBoolConfig("time-thread-priority", NULL, IMMUTABLE_CONFIG, cserver.time_thread_priority, 0, NULL, NULL),
/* String Configs */ /* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, g_pserver->acl_filename, "", NULL, NULL), createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, g_pserver->acl_filename, "", NULL, NULL),

View File

@ -3005,7 +3005,7 @@ int dbnumFromDb(redisDb *db)
serverPanic("invalid database pointer"); serverPanic("invalid database pointer");
} }
void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command) bool redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command, bool fExecOK)
{ {
if (m_spstorage == nullptr) { if (m_spstorage == nullptr) {
#if defined(__x86_64__) || defined(__i386__) #if defined(__x86_64__) || defined(__i386__)
@ -3039,7 +3039,7 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command
getKeysResult result = GETKEYS_RESULT_INIT; getKeysResult result = GETKEYS_RESULT_INIT;
auto cmd = lookupCommand(szFromObj(command.argv[0])); auto cmd = lookupCommand(szFromObj(command.argv[0]));
if (cmd == nullptr) if (cmd == nullptr)
return; // Bad command? It's not for us to judge, just bail return false; // Bad command? It's not for us to judge, just bail
int numkeys = getKeysFromCommand(cmd, command.argv, command.argc, &result); int numkeys = getKeysFromCommand(cmd, command.argv, command.argc, &result);
for (int ikey = 0; ikey < numkeys; ++ikey) for (int ikey = 0; ikey < numkeys; ++ikey)
{ {
@ -3071,41 +3071,53 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command
} }
} }
lock.arm(c); if (!vecInserts.empty()) {
for (auto &tuple : vecInserts) lock.arm(c);
{ for (auto &tuple : vecInserts)
sds sharedKey = std::get<0>(tuple);
robj *o = std::get<1>(tuple);
std::unique_ptr<expireEntry> spexpire = std::move(std::get<2>(tuple));
if (o != nullptr)
{ {
if (this->find_cached_threadsafe(sharedKey) != nullptr) sds sharedKey = std::get<0>(tuple);
robj *o = std::get<1>(tuple);
std::unique_ptr<expireEntry> spexpire = std::move(std::get<2>(tuple));
if (o != nullptr)
{ {
// While unlocked this was already ensured if (this->find_cached_threadsafe(sharedKey) != nullptr)
decrRefCount(o); {
sdsfree(sharedKey); // While unlocked this was already ensured
decrRefCount(o);
sdsfree(sharedKey);
}
else
{
dictAdd(m_pdict, sharedKey, o);
o->SetFExpires(spexpire != nullptr);
if (spexpire != nullptr)
{
auto itr = m_setexpire->find(sharedKey);
if (itr != m_setexpire->end())
m_setexpire->erase(itr);
m_setexpire->insert(std::move(*spexpire));
serverAssert(m_setexpire->find(sharedKey) != m_setexpire->end());
}
serverAssert(o->FExpires() == (m_setexpire->find(sharedKey) != m_setexpire->end()));
}
} }
else else
{ {
dictAdd(m_pdict, sharedKey, o); if (sharedKey != nullptr)
o->SetFExpires(spexpire != nullptr); sdsfree(sharedKey); // BUG but don't bother crashing
if (spexpire != nullptr)
{
auto itr = m_setexpire->find(sharedKey);
if (itr != m_setexpire->end())
m_setexpire->erase(itr);
m_setexpire->insert(std::move(*spexpire));
serverAssert(m_setexpire->find(sharedKey) != m_setexpire->end());
}
serverAssert(o->FExpires() == (m_setexpire->find(sharedKey) != m_setexpire->end()));
} }
} }
else lock.disarm();
{ }
if (sharedKey != nullptr)
sdsfree(sharedKey); // BUG but don't bother crashing if (fExecOK && cmd->proc == getCommand && !vecInserts.empty()) {
robj *o = std::get<1>(vecInserts[0]);
if (o != nullptr) {
addReplyBulk(c, o);
return true;
} }
} }
return false;
} }

View File

@ -587,7 +587,7 @@ int freeMemoryIfNeeded(bool fQuickCycle, bool fPreSnapshot) {
/* volatile-random and allkeys-random policy */ /* volatile-random and allkeys-random policy */
if (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM || if (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM
|| fEvictToStorage) || fFallback)
{ {
/* When evicting a random key, we try to evict a key for /* When evicting a random key, we try to evict a key for
* each DB, so we use the static 'next_db' variable to * each DB, so we use the static 'next_db' variable to

View File

@ -2353,7 +2353,7 @@ void parseClientCommandBuffer(client *c) {
} }
} }
size_t cqueries = c->vecqueuedcmd.size(); size_t cqueriesStart = c->vecqueuedcmd.size();
if (c->reqtype == PROTO_REQ_INLINE) { if (c->reqtype == PROTO_REQ_INLINE) {
if (processInlineBuffer(c) != C_OK) break; if (processInlineBuffer(c) != C_OK) break;
} else if (c->reqtype == PROTO_REQ_MULTIBULK) { } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
@ -2372,7 +2372,9 @@ void parseClientCommandBuffer(client *c) {
if (cqueries < c->vecqueuedcmd.size() && !GlobalLocksAcquired()) { if (cqueries < c->vecqueuedcmd.size() && !GlobalLocksAcquired()) {
auto &query = c->vecqueuedcmd.back(); auto &query = c->vecqueuedcmd.back();
if (query.argc > 0 && query.argc == query.argcMax) { if (query.argc > 0 && query.argc == query.argcMax) {
c->db->prefetchKeysAsync(c, query); if (c->db->prefetchKeysAsync(c, query, c->vecqueuedcmd.size() == 1)) {
c->vecqueuedcmd.erase(c->vecqueuedcmd.begin());
}
} }
} }
c->reqtype = 0; c->reqtype = 0;

View File

@ -2168,6 +2168,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
aeAcquireLock(); aeAcquireLock();
} }
if (g_pserver->maxmemory && g_pserver->m_pstorageFactory)
freeMemoryIfNeededAndSafe(false, false);
/* If another threads unblocked one of our clients, and this thread has been idle /* If another threads unblocked one of our clients, and this thread has been idle
then beforeSleep won't have a chance to process the unblocking. So we also then beforeSleep won't have a chance to process the unblocking. So we also
process them here in the cron job to ensure they don't starve. process them here in the cron job to ensure they don't starve.
@ -2455,6 +2458,9 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData
aeAcquireLock(); aeAcquireLock();
} }
if (g_pserver->maxmemory && g_pserver->m_pstorageFactory)
freeMemoryIfNeededAndSafe(false, false);
int iel = ielFromEventLoop(eventLoop); int iel = ielFromEventLoop(eventLoop);
serverAssert(iel != IDX_EVENT_LOOP_MAIN); serverAssert(iel != IDX_EVENT_LOOP_MAIN);
@ -6443,9 +6449,11 @@ int main(int argc, char **argv) {
serverAssert(cserver.cthreads > 0 && cserver.cthreads <= MAX_EVENT_LOOPS); serverAssert(cserver.cthreads > 0 && cserver.cthreads <= MAX_EVENT_LOOPS);
pthread_create(&cserver.time_thread_id, nullptr, timeThreadMain, nullptr); pthread_create(&cserver.time_thread_id, nullptr, timeThreadMain, nullptr);
if (cserver.time_thread_priority) {
struct sched_param time_thread_priority; struct sched_param time_thread_priority;
time_thread_priority.sched_priority = sched_get_priority_max(SCHED_FIFO); time_thread_priority.sched_priority = sched_get_priority_max(SCHED_FIFO);
pthread_setschedparam(cserver.time_thread_id, SCHED_FIFO, &time_thread_priority); pthread_setschedparam(cserver.time_thread_id, SCHED_FIFO, &time_thread_priority);
}
pthread_attr_t tattr; pthread_attr_t tattr;
pthread_attr_init(&tattr); pthread_attr_init(&tattr);

View File

@ -1134,7 +1134,7 @@ public:
bool removeCachedValue(const char *key); bool removeCachedValue(const char *key);
void removeAllCachedValues(); void removeAllCachedValues();
void prefetchKeysAsync(client *c, struct parsed_command &command); bool prefetchKeysAsync(client *c, struct parsed_command &command, bool fExecOK);
bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; } bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; }
@ -2000,6 +2000,7 @@ struct redisServerConst {
int storage_memory_model = STORAGE_WRITETHROUGH; int storage_memory_model = STORAGE_WRITETHROUGH;
char *storage_conf = nullptr; char *storage_conf = nullptr;
int fForkBgSave = false; int fForkBgSave = false;
int time_thread_priority = false;
}; };
struct redisServer { struct redisServer {

View File

@ -67,7 +67,6 @@ RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, cons
options.max_background_flushes = 2; options.max_background_flushes = 2;
options.bytes_per_sync = 1048576; options.bytes_per_sync = 1048576;
options.compaction_pri = rocksdb::kMinOverlappingRatio; options.compaction_pri = rocksdb::kMinOverlappingRatio;
//options.compression = rocksdb::kLZ4Compression;
options.compression = rocksdb::kNoCompression; options.compression = rocksdb::kNoCompression;
options.enable_pipelined_write = true; options.enable_pipelined_write = true;
options.sst_file_manager = m_pfilemanager; options.sst_file_manager = m_pfilemanager;
@ -77,6 +76,7 @@ RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, cons
table_options.pin_l0_filter_and_index_blocks_in_cache = true; table_options.pin_l0_filter_and_index_blocks_in_cache = true;
table_options.data_block_index_type = rocksdb::BlockBasedTableOptions::kDataBlockBinaryAndHash; table_options.data_block_index_type = rocksdb::BlockBasedTableOptions::kDataBlockBinaryAndHash;
table_options.checksum = rocksdb::kNoChecksum; table_options.checksum = rocksdb::kNoChecksum;
table_options.format_version = 4;
options.table_factory.reset(NewBlockBasedTableFactory(table_options)); options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.table_factory.reset( options.table_factory.reset(
rocksdb::NewBlockBasedTableFactory(table_options)); rocksdb::NewBlockBasedTableFactory(table_options));

View File

@ -90,6 +90,7 @@ start_server {tags {"introspection"}} {
server_cpulist server_cpulist
bio_cpulist bio_cpulist
aof_rewrite_cpulist aof_rewrite_cpulist
time-thread-priority
bgsave_cpulist bgsave_cpulist
storage-cache-mode storage-cache-mode
storage-provider-options storage-provider-options