Merge branch 'flash_read_perf' into 'keydbpro'

Flash read perf

See merge request external-collab/keydb-pro-6!1

Former-commit-id: f6d7277f307b78f079dd99bf532cd26a7b62018d
This commit is contained in:
jsully 2021-04-07 06:12:55 +00:00
commit c168c8fbbf
5 changed files with 54 additions and 34 deletions

View File

@ -3005,7 +3005,7 @@ int dbnumFromDb(redisDb *db)
serverPanic("invalid database pointer"); serverPanic("invalid database pointer");
} }
void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command) bool redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command, bool fExecOK)
{ {
if (m_spstorage == nullptr) { if (m_spstorage == nullptr) {
#if defined(__x86_64__) || defined(__i386__) #if defined(__x86_64__) || defined(__i386__)
@ -3039,7 +3039,7 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command
getKeysResult result = GETKEYS_RESULT_INIT; getKeysResult result = GETKEYS_RESULT_INIT;
auto cmd = lookupCommand(szFromObj(command.argv[0])); auto cmd = lookupCommand(szFromObj(command.argv[0]));
if (cmd == nullptr) if (cmd == nullptr)
return; // Bad command? It's not for us to judge, just bail return false; // Bad command? It's not for us to judge, just bail
int numkeys = getKeysFromCommand(cmd, command.argv, command.argc, &result); int numkeys = getKeysFromCommand(cmd, command.argv, command.argc, &result);
for (int ikey = 0; ikey < numkeys; ++ikey) for (int ikey = 0; ikey < numkeys; ++ikey)
{ {
@ -3071,6 +3071,7 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command
} }
} }
if (!vecInserts.empty()) {
lock.arm(c); lock.arm(c);
for (auto &tuple : vecInserts) for (auto &tuple : vecInserts)
{ {
@ -3108,4 +3109,15 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command
sdsfree(sharedKey); // BUG but don't bother crashing sdsfree(sharedKey); // BUG but don't bother crashing
} }
} }
lock.disarm();
}
if (fExecOK && cmd->proc == getCommand && !vecInserts.empty()) {
robj *o = std::get<1>(vecInserts[0]);
if (o != nullptr) {
addReplyBulk(c, o);
return true;
}
}
return false;
} }

View File

@ -587,7 +587,7 @@ int freeMemoryIfNeeded(bool fQuickCycle, bool fPreSnapshot) {
/* volatile-random and allkeys-random policy */ /* volatile-random and allkeys-random policy */
if (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM || if (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM
|| fEvictToStorage) || fFallback)
{ {
/* When evicting a random key, we try to evict a key for /* When evicting a random key, we try to evict a key for
* each DB, so we use the static 'next_db' variable to * each DB, so we use the static 'next_db' variable to

View File

@ -2353,7 +2353,7 @@ void parseClientCommandBuffer(client *c) {
} }
} }
size_t cqueries = c->vecqueuedcmd.size(); size_t cqueriesStart = c->vecqueuedcmd.size();
if (c->reqtype == PROTO_REQ_INLINE) { if (c->reqtype == PROTO_REQ_INLINE) {
if (processInlineBuffer(c) != C_OK) break; if (processInlineBuffer(c) != C_OK) break;
} else if (c->reqtype == PROTO_REQ_MULTIBULK) { } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
@ -2372,7 +2372,9 @@ void parseClientCommandBuffer(client *c) {
if (cqueries < c->vecqueuedcmd.size() && !GlobalLocksAcquired()) { if (cqueries < c->vecqueuedcmd.size() && !GlobalLocksAcquired()) {
auto &query = c->vecqueuedcmd.back(); auto &query = c->vecqueuedcmd.back();
if (query.argc > 0 && query.argc == query.argcMax) { if (query.argc > 0 && query.argc == query.argcMax) {
c->db->prefetchKeysAsync(c, query); if (c->db->prefetchKeysAsync(c, query, c->vecqueuedcmd.size() == 1)) {
c->vecqueuedcmd.erase(c->vecqueuedcmd.begin());
}
} }
} }
c->reqtype = 0; c->reqtype = 0;

View File

@ -2168,6 +2168,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
aeAcquireLock(); aeAcquireLock();
} }
if (g_pserver->maxmemory && g_pserver->m_pstorageFactory)
freeMemoryIfNeededAndSafe(false, false);
/* If another threads unblocked one of our clients, and this thread has been idle /* If another threads unblocked one of our clients, and this thread has been idle
then beforeSleep won't have a chance to process the unblocking. So we also then beforeSleep won't have a chance to process the unblocking. So we also
process them here in the cron job to ensure they don't starve. process them here in the cron job to ensure they don't starve.
@ -2455,6 +2458,9 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData
aeAcquireLock(); aeAcquireLock();
} }
if (g_pserver->maxmemory && g_pserver->m_pstorageFactory)
freeMemoryIfNeededAndSafe(false, false);
int iel = ielFromEventLoop(eventLoop); int iel = ielFromEventLoop(eventLoop);
serverAssert(iel != IDX_EVENT_LOOP_MAIN); serverAssert(iel != IDX_EVENT_LOOP_MAIN);

View File

@ -1134,7 +1134,7 @@ public:
bool removeCachedValue(const char *key); bool removeCachedValue(const char *key);
void removeAllCachedValues(); void removeAllCachedValues();
void prefetchKeysAsync(client *c, struct parsed_command &command); bool prefetchKeysAsync(client *c, struct parsed_command &command, bool fExecOK);
bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; } bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; }