Perform GET command inline

Former-commit-id: 5623936d99e334ab103f3dc1541b145c125d0ee8
This commit is contained in:
John Sully 2021-03-23 03:44:20 +00:00
parent 44603c8227
commit c6fc1bcfe3
4 changed files with 55 additions and 35 deletions

View File

@ -2998,10 +2998,10 @@ int dbnumFromDb(redisDb *db)
serverPanic("invalid database pointer");
}
void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command)
bool redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command, bool fExecOK)
{
if (m_spstorage == nullptr)
return;
return false;
AeLocker lock;
@ -3010,7 +3010,7 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command
getKeysResult result = GETKEYS_RESULT_INIT;
auto cmd = lookupCommand(szFromObj(command.argv[0]));
if (cmd == nullptr)
return; // Bad command? It's not for us to judge, just bail
return false; // Bad command? It's not for us to judge, just bail
int numkeys = getKeysFromCommand(cmd, command.argv, command.argc, &result);
for (int ikey = 0; ikey < numkeys; ++ikey)
{
@ -3042,41 +3042,53 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command
}
}
lock.arm(c);
for (auto &tuple : vecInserts)
{
sds sharedKey = std::get<0>(tuple);
robj *o = std::get<1>(tuple);
std::unique_ptr<expireEntry> spexpire = std::move(std::get<2>(tuple));
if (o != nullptr)
if (!vecInserts.empty()) {
lock.arm(c);
for (auto &tuple : vecInserts)
{
if (this->find_cached_threadsafe(sharedKey) != nullptr)
sds sharedKey = std::get<0>(tuple);
robj *o = std::get<1>(tuple);
std::unique_ptr<expireEntry> spexpire = std::move(std::get<2>(tuple));
if (o != nullptr)
{
// While unlocked this was already ensured
decrRefCount(o);
sdsfree(sharedKey);
if (this->find_cached_threadsafe(sharedKey) != nullptr)
{
// While unlocked this was already ensured
decrRefCount(o);
sdsfree(sharedKey);
}
else
{
dictAdd(m_pdict, sharedKey, o);
o->SetFExpires(spexpire != nullptr);
if (spexpire != nullptr)
{
auto itr = m_setexpire->find(sharedKey);
if (itr != m_setexpire->end())
m_setexpire->erase(itr);
m_setexpire->insert(std::move(*spexpire));
serverAssert(m_setexpire->find(sharedKey) != m_setexpire->end());
}
serverAssert(o->FExpires() == (m_setexpire->find(sharedKey) != m_setexpire->end()));
}
}
else
{
dictAdd(m_pdict, sharedKey, o);
o->SetFExpires(spexpire != nullptr);
if (spexpire != nullptr)
{
auto itr = m_setexpire->find(sharedKey);
if (itr != m_setexpire->end())
m_setexpire->erase(itr);
m_setexpire->insert(std::move(*spexpire));
serverAssert(m_setexpire->find(sharedKey) != m_setexpire->end());
}
serverAssert(o->FExpires() == (m_setexpire->find(sharedKey) != m_setexpire->end()));
if (sharedKey != nullptr)
sdsfree(sharedKey); // BUG but don't bother crashing
}
}
else
{
if (sharedKey != nullptr)
sdsfree(sharedKey); // BUG but don't bother crashing
lock.disarm();
}
if (fExecOK && cmd->proc == getCommand && !vecInserts.empty()) {
robj *o = std::get<1>(vecInserts[0]);
if (o != nullptr) {
addReplyBulk(c, o);
return true;
}
}
return false;
}

View File

@ -2343,7 +2343,7 @@ void parseClientCommandBuffer(client *c) {
}
}
size_t cqueries = c->vecqueuedcmd.size();
size_t cqueriesStart = c->vecqueuedcmd.size();
if (c->reqtype == PROTO_REQ_INLINE) {
if (processInlineBuffer(c) != C_OK) break;
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
@ -2359,10 +2359,12 @@ void parseClientCommandBuffer(client *c) {
}
/* Prefetch if we have a storage provider and we're not in the global lock */
if (cqueries < c->vecqueuedcmd.size() && g_pserver->m_pstorageFactory != nullptr && !GlobalLocksAcquired()) {
if (cqueriesStart < c->vecqueuedcmd.size() && g_pserver->m_pstorageFactory != nullptr && !GlobalLocksAcquired()) {
auto &query = c->vecqueuedcmd.back();
if (query.argc > 0 && query.argc == query.argcMax) {
c->db->prefetchKeysAsync(c, query);
if (c->db->prefetchKeysAsync(c, query, c->vecqueuedcmd.size() == 1)) {
c->vecqueuedcmd.erase(c->vecqueuedcmd.begin());
}
}
}
c->reqtype = 0;

View File

@ -2168,6 +2168,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
aeAcquireLock();
}
if (g_pserver->maxmemory && g_pserver->m_pstorageFactory)
freeMemoryIfNeededAndSafe(false, false);
/* If another threads unblocked one of our clients, and this thread has been idle
then beforeSleep won't have a chance to process the unblocking. So we also
process them here in the cron job to ensure they don't starve.
@ -2455,6 +2458,9 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData
aeAcquireLock();
}
if (g_pserver->maxmemory && g_pserver->m_pstorageFactory)
freeMemoryIfNeededAndSafe(false, false);
int iel = ielFromEventLoop(eventLoop);
serverAssert(iel != IDX_EVENT_LOOP_MAIN);

View File

@ -1134,7 +1134,7 @@ public:
bool removeCachedValue(const char *key);
void removeAllCachedValues();
void prefetchKeysAsync(client *c, struct parsed_command &command);
bool prefetchKeysAsync(client *c, struct parsed_command &command, bool fExecOK);
bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; }