Merge branch 'keydbpro' into PRO_RELEASE_6

Former-commit-id: 86eb3f5c5ea3c17d798e74126e08114a2dd2449e
This commit is contained in:
John Sully 2020-07-13 18:16:05 +00:00
commit c9ff0292d8
10 changed files with 34 additions and 26 deletions

View File

View File

View File

View File

@ -1460,9 +1460,9 @@ int rewriteAppendOnlyFileRio(rio *aof) {
serverPanic("Unknown object type");
}
/* Save the expire time */
std::unique_lock<fastlock> ul(g_expireLock);
expireEntry *pexpire = db->getExpire(&key);
if (pexpire != nullptr) {
if (o->FExpires()) {
std::unique_lock<fastlock> ul(g_expireLock);
expireEntry *pexpire = db->getExpire(&key);
for (auto &subExpire : *pexpire) {
if (subExpire.subkey() == nullptr)
{
@ -1480,7 +1480,6 @@ int rewriteAppendOnlyFileRio(rio *aof) {
if (rioWriteBulkLongLong(aof,subExpire.when()) == 0) return false; // common
}
}
ul.unlock();
/* Read some diff from the parent process from time to time. */
if (aof->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) {

View File

@ -1738,15 +1738,15 @@ void propagateSubkeyExpire(redisDb *db, int type, robj *key, robj *subkey)
/* Check if the key is expired. Note, this does not check subexpires */
int keyIsExpired(const redisDbPersistentDataSnapshot *db, robj *key) {
/* Don't expire anything while loading. It will be done later. */
if (g_pserver->loading) return 0;
std::unique_lock<fastlock> ul(g_expireLock);
const expireEntry *pexpire = db->getExpire(key);
mstime_t now;
if (pexpire == nullptr) return 0; /* No expire for this key */
/* Don't expire anything while loading. It will be done later. */
if (g_pserver->loading) return 0;
long long when = -1;
for (auto &exp : *pexpire)
{

View File

@ -1,6 +1,7 @@
#pragma once
#include <vector>
#include <assert.h>
#include <unordered_set>
template<typename T>
class GarbageCollector
@ -99,6 +100,6 @@ private:
fastlock m_lock { "Garbage Collector"};
std::vector<EpochHolder> m_vecepochs;
std::set<uint64_t> m_setepochOutstanding;
std::unordered_set<uint64_t> m_setepochOutstanding;
uint64_t m_epochNext = 0;
};

View File

@ -1150,11 +1150,14 @@ int saveKey(rio *rdb, const redisDbPersistentDataSnapshot *db, int flags, size_t
robj key;
initStaticStringObject(key,(char*)keystr);
std::unique_lock<fastlock> ul(g_expireLock);
const expireEntry *pexpire = db->getExpire(&key);
serverAssert((o->FExpires() && pexpire != nullptr) || (!o->FExpires() && pexpire == nullptr));
if (pexpire == nullptr)
ul.unlock(); // no need to hold the lock if we're not saving the expire
std::unique_lock<fastlock> ul(g_expireLock, std::defer_lock);
const expireEntry *pexpire = nullptr;
if (o->FExpires())
{
ul.lock();
pexpire = db->getExpire(&key);
serverAssert((o->FExpires() && pexpire != nullptr) || (!o->FExpires() && pexpire == nullptr));
}
if (rdbSaveKeyValuePair(rdb,&key,o,pexpire) == -1)
return 0;

View File

@ -2312,9 +2312,15 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
}
}
g_pserver->asyncworkqueue->AddWorkFunction([]{
g_pserver->db[0]->consolidate_snapshot();
}, true /*HiPri*/);
bool fAnySnapshots = false;
for (int idb = 0; idb < cserver.dbnum && !fAnySnapshots; ++idb)
fAnySnapshots = fAnySnapshots || g_pserver->db[0]->FSnapshot();
if (fAnySnapshots)
{
g_pserver->asyncworkqueue->AddWorkFunction([]{
g_pserver->db[0]->consolidate_snapshot();
}, true /*HiPri*/);
}
/* Fire the cron loop modules event. */
RedisModuleCronLoopV1 ei = {REDISMODULE_CRON_LOOP_VERSION,g_pserver->hz};
@ -2441,7 +2447,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
static thread_local bool fFirstRun = true;
// note: we also copy the DB pointer in case a DB swap is done while the lock is released
std::vector<redisDb*> vecdb; // note we cache the database pointer in case a dbswap is done while the lock is released
if (cserver.storage_memory_model == STORAGE_WRITETHROUGH)
if (cserver.storage_memory_model == STORAGE_WRITETHROUGH && g_pserver->m_pstorageFactory != nullptr)
{
if (!fFirstRun) {
mstime_t storage_process_latency;
@ -2462,8 +2468,11 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
mstime_t commit_latency;
latencyStartMonitor(commit_latency);
for (redisDb *db : vecdb)
db->commitChanges();
if (g_pserver->m_pstorageFactory != nullptr)
{
for (redisDb *db : vecdb)
db->commitChanges();
}
latencyEndMonitor(commit_latency);
latencyAddSampleIfNeeded("storage-commit", commit_latency);
@ -4150,13 +4159,6 @@ int processCommand(client *c, int callFlags, AeLocker &locker) {
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
#if 0
if (cserver.cthreads >= 2 && !g_fTestMode && g_pserver->m_pstorageFactory == nullptr && listLength(g_pserver->monitors) == 0 && c->cmd->proc == getCommand)
{
if (getCommandAsync(c))
return C_OK;
}
#endif
locker.arm(c);
incrementMvccTstamp();
call(c,callFlags);

View File

@ -1350,6 +1350,8 @@ public:
bool removeCachedValue(const char *key);
void removeAllCachedValues();
bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; }
dict_iter find_cached_threadsafe(const char *key) const;
protected:
@ -1425,6 +1427,7 @@ public:
using redisDbPersistentData::endSnapshotAsync;
using redisDbPersistentData::end;
using redisDbPersistentData::find_cached_threadsafe;
using redisDbPersistentData::FSnapshot;
dict_iter random_cache_threadsafe(bool fPrimaryOnly = false) const;