diff --git a/src/db.cpp b/src/db.cpp index 20b70fe02..70fd2d1dc 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -2998,10 +2998,10 @@ int dbnumFromDb(redisDb *db) serverPanic("invalid database pointer"); } -void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command) +bool redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command, bool fExecOK) { if (m_spstorage == nullptr) - return; + return false; AeLocker lock; @@ -3010,7 +3010,7 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command getKeysResult result = GETKEYS_RESULT_INIT; auto cmd = lookupCommand(szFromObj(command.argv[0])); if (cmd == nullptr) - return; // Bad command? It's not for us to judge, just bail + return false; // Bad command? It's not for us to judge, just bail int numkeys = getKeysFromCommand(cmd, command.argv, command.argc, &result); for (int ikey = 0; ikey < numkeys; ++ikey) { @@ -3042,41 +3042,53 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command } } - lock.arm(c); - for (auto &tuple : vecInserts) - { - sds sharedKey = std::get<0>(tuple); - robj *o = std::get<1>(tuple); - std::unique_ptr spexpire = std::move(std::get<2>(tuple)); - - if (o != nullptr) + if (!vecInserts.empty()) { + lock.arm(c); + for (auto &tuple : vecInserts) { - if (this->find_cached_threadsafe(sharedKey) != nullptr) + sds sharedKey = std::get<0>(tuple); + robj *o = std::get<1>(tuple); + std::unique_ptr spexpire = std::move(std::get<2>(tuple)); + + if (o != nullptr) { - // While unlocked this was already ensured - decrRefCount(o); - sdsfree(sharedKey); + if (this->find_cached_threadsafe(sharedKey) != nullptr) + { + // While unlocked this was already ensured + decrRefCount(o); + sdsfree(sharedKey); + } + else + { + dictAdd(m_pdict, sharedKey, o); + o->SetFExpires(spexpire != nullptr); + + if (spexpire != nullptr) + { + auto itr = m_setexpire->find(sharedKey); + if (itr != m_setexpire->end()) + m_setexpire->erase(itr); + m_setexpire->insert(std::move(*spexpire)); + serverAssert(m_setexpire->find(sharedKey) != m_setexpire->end()); + } + serverAssert(o->FExpires() == (m_setexpire->find(sharedKey) != m_setexpire->end())); + } } else { - dictAdd(m_pdict, sharedKey, o); - o->SetFExpires(spexpire != nullptr); - - if (spexpire != nullptr) - { - auto itr = m_setexpire->find(sharedKey); - if (itr != m_setexpire->end()) - m_setexpire->erase(itr); - m_setexpire->insert(std::move(*spexpire)); - serverAssert(m_setexpire->find(sharedKey) != m_setexpire->end()); - } - serverAssert(o->FExpires() == (m_setexpire->find(sharedKey) != m_setexpire->end())); + if (sharedKey != nullptr) + sdsfree(sharedKey); // BUG but don't bother crashing } } - else - { - if (sharedKey != nullptr) - sdsfree(sharedKey); // BUG but don't bother crashing + lock.disarm(); + } + + if (fExecOK && cmd->proc == getCommand && !vecInserts.empty()) { + robj *o = std::get<1>(vecInserts[0]); + if (o != nullptr) { + addReplyBulk(c, o); + return true; } } + return false; } \ No newline at end of file diff --git a/src/networking.cpp b/src/networking.cpp index 15aa6f43a..e9959c263 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -2343,7 +2343,7 @@ void parseClientCommandBuffer(client *c) { } } - size_t cqueries = c->vecqueuedcmd.size(); + size_t cqueriesStart = c->vecqueuedcmd.size(); if (c->reqtype == PROTO_REQ_INLINE) { if (processInlineBuffer(c) != C_OK) break; } else if (c->reqtype == PROTO_REQ_MULTIBULK) { @@ -2359,10 +2359,12 @@ void parseClientCommandBuffer(client *c) { } /* Prefetch if we have a storage provider and we're not in the global lock */ - if (cqueries < c->vecqueuedcmd.size() && g_pserver->m_pstorageFactory != nullptr && !GlobalLocksAcquired()) { + if (cqueriesStart < c->vecqueuedcmd.size() && g_pserver->m_pstorageFactory != nullptr && !GlobalLocksAcquired()) { auto &query = c->vecqueuedcmd.back(); if (query.argc > 0 && query.argc == query.argcMax) { - c->db->prefetchKeysAsync(c, query); + if (c->db->prefetchKeysAsync(c, query, c->vecqueuedcmd.size() == 1)) { + c->vecqueuedcmd.erase(c->vecqueuedcmd.begin()); + } } } c->reqtype = 0; diff --git a/src/server.cpp b/src/server.cpp index 96d62e4ea..814909bf6 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2168,6 +2168,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { aeAcquireLock(); } + if (g_pserver->maxmemory && g_pserver->m_pstorageFactory) + freeMemoryIfNeededAndSafe(false, false); + /* If another threads unblocked one of our clients, and this thread has been idle then beforeSleep won't have a chance to process the unblocking. So we also process them here in the cron job to ensure they don't starve. @@ -2455,6 +2458,9 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData aeAcquireLock(); } + if (g_pserver->maxmemory && g_pserver->m_pstorageFactory) + freeMemoryIfNeededAndSafe(false, false); + int iel = ielFromEventLoop(eventLoop); serverAssert(iel != IDX_EVENT_LOOP_MAIN); diff --git a/src/server.h b/src/server.h index 07f025c46..9ffccd6a0 100644 --- a/src/server.h +++ b/src/server.h @@ -1134,7 +1134,7 @@ public: bool removeCachedValue(const char *key); void removeAllCachedValues(); - void prefetchKeysAsync(client *c, struct parsed_command &command); + bool prefetchKeysAsync(client *c, struct parsed_command &command, bool fExecOK); bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; }