Fix snapshot consolidation bugs and reduce log noise

Former-commit-id: 7f3cb2981529277d3f02dceb2f60e7aa8385847d
John Sully 2020-01-12 01:22:44 -05:00
parent bddcdd856e
commit 1a70e322dd
3 changed files with 105 additions and 13 deletions

View File

@@ -2703,6 +2703,61 @@ int restartServer(int flags, mstime_t delay) {
     return C_ERR; /* Never reached. */
 }
 
+bool getCommandAsync(client *c)
+{
+    if (c->argc != 2)
+        return false;
+    if (g_pserver->m_pstorageFactory)
+        return false;   // BUG! But we don't have time to fix right now, fails tests
+
+    int idb;
+    for (idb = 0; idb < cserver.dbnum; ++idb)
+    {
+        if (c->db == g_pserver->db[idb])
+            break;
+    }
+    if (idb >= cserver.dbnum)
+        return false;
+
+    if (serverTL->rgdbSnapshot[idb] == nullptr || serverTL->rgdbSnapshot[idb]->mvccCheckpoint() < c->mvccCheckpoint || serverTL->rgdbSnapshot[idb]->FStale()) {
+        AeLocker locker;
+        locker.arm(c);
+        if (serverTL->rgdbSnapshot[idb] != nullptr)
+        {
+            g_pserver->db[idb]->endSnapshot(serverTL->rgdbSnapshot[idb]);
+            serverTL->rgdbSnapshot[idb] = nullptr;
+        }
+        serverTL->rgdbSnapshot[idb] = c->db->createSnapshot(c->mvccCheckpoint, true);
+    }
+
+    if (serverTL->rgdbSnapshot[idb] == nullptr)
+        return false;
+
+    auto itr = serverTL->rgdbSnapshot[idb]->find_cached_threadsafe(szFromObj(c->argv[1]));
+    if (itr == serverTL->rgdbSnapshot[idb]->end())
+    {
+        if (g_pserver->m_pstorageFactory)
+            return false;   // no cached result doesn't mean the value doesn't exist
+        addReply(c, shared.null[c->resp]);
+        return true;
+    }
+
+    // Are we expired?
+    const expireEntry *expire = serverTL->rgdbSnapshot[idb]->getExpire(c->argv[1]);
+    long long when;
+    if (expire && expire->FGetPrimaryExpire(&when) && when > 0) {
+        if (g_pserver->mstime >= when) {
+            addReply(c, shared.null[c->resp]);
+            return true;
+        }
+    }
+
+    addReplyBulk(c,itr.val());
+    return true;
+}
+
 /* This function will try to raise the max number of open files accordingly to
  * the configured max number of clients. It also reserves a number of file
  * descriptors (CONFIG_MIN_RESERVED_FDS) for extra operations of

@@ -3048,6 +3103,13 @@ void initServer(void) {
         g_pserver->db[j]->initialize(j);
     }
 
+    for (int i = 0; i < MAX_EVENT_LOOPS; ++i)
+    {
+        g_pserver->rgthreadvar[i].rgdbSnapshot = (const redisDbPersistentDataSnapshot**)zcalloc(sizeof(redisDbPersistentDataSnapshot*)*cserver.dbnum, MALLOC_LOCAL);
+        serverAssert(g_pserver->rgthreadvar[i].rgdbSnapshot != nullptr);
+    }
+    serverAssert(g_pserver->rgthreadvar[0].rgdbSnapshot != nullptr);
+
     /* Fixup Master Client Database */
     listIter li;
     listNode *ln;

@@ -3820,6 +3882,11 @@ int processCommand(client *c, int callFlags) {
         queueMultiCommand(c);
         addReply(c,shared.queued);
     } else {
+        if (listLength(g_pserver->monitors) == 0 && c->cmd->proc == getCommand)
+        {
+            if (getCommandAsync(c))
+                return C_OK;
+        }
         locker.arm(c);
         call(c,callFlags);
         c->woff = g_pserver->master_repl_offset;

@@ -3917,11 +3984,19 @@ int prepareForShutdown(int flags) {
     /* Close the listening sockets. Apparently this allows faster restarts. */
     closeListeningSockets(1);
 
+    for (int ithread = 0; ithread < MAX_EVENT_LOOPS; ++ithread) {
+        for (int idb = 0; idb < cserver.dbnum; ++idb) {
+            if (g_pserver->rgthreadvar[ithread].rgdbSnapshot[idb] != nullptr)
+                g_pserver->db[idb]->endSnapshot(g_pserver->rgthreadvar[ithread].rgdbSnapshot[idb]);
+        }
+    }
+
     /* free our databases */
     for (int idb = 0; idb < cserver.dbnum; ++idb) {
         delete g_pserver->db[idb];
         g_pserver->db[idb] = nullptr;
     }
 
     delete g_pserver->m_pstorageFactory;
 
     serverLog(LL_WARNING,"%s is now ready to exit, bye bye...",

View File

@@ -1356,6 +1356,9 @@ public:
     bool removeCachedValue(const char *key);
     void removeAllCachedValues();
 
+protected:
+    uint64_t m_mvccCheckpoint = 0;
+
 private:
     struct changedescCmp
     {

@@ -1379,7 +1382,6 @@ private:
     std::set<changedesc, changedescCmp> m_setchanged;
     size_t m_cnewKeysPending = 0;
     std::shared_ptr<IStorage> m_spstorage = nullptr;
-    uint64_t mvccCheckpoint = 0;
 
     // Expire
     expireset *m_setexpire = nullptr;

@@ -1418,6 +1420,10 @@ public:
     const expireEntry *getExpire(const char *key) const;
     const expireEntry *getExpire(robj_roptr key) const { return getExpire(szFromObj(key)); }
 
+    uint64_t mvccCheckpoint() const { return m_mvccCheckpoint; }
+    bool FStale() const;
+
     // These need to be fixed
     using redisDbPersistentData::size;
     using redisDbPersistentData::expireSize;

@@ -1892,6 +1898,7 @@ struct redisServerThreadVars {
     char neterr[ANET_ERR_LEN];   /* Error buffer for anet.c */
     long unsigned commandsExecuted = 0;
     uint64_t gcEpoch = 0;
+    const redisDbPersistentDataSnapshot **rgdbSnapshot = nullptr;
 };
 
 struct redisMaster {

View File

@@ -6,7 +6,6 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
     serverAssert(GlobalLocksAcquired());
     serverAssert(m_refCount == 0);  // do not call this on a snapshot
 
-    // First see if we have too many levels and can bail out of this to reduce load
     int levels = 1;
     redisDbPersistentDataSnapshot *psnapshot = m_spdbSnapshotHOLDER.get();
     while (psnapshot != nullptr)

@@ -14,23 +13,25 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
         ++levels;
         psnapshot = psnapshot->m_spdbSnapshotHOLDER.get();
     }
-    if (fOptional && (levels > 8))
-        return nullptr;
 
     if (m_spdbSnapshotHOLDER != nullptr)
     {
         // If possible reuse an existing snapshot (we want to minimize nesting)
-        if (mvccCheckpoint <= m_spdbSnapshotHOLDER->mvccCheckpoint)
+        if (mvccCheckpoint <= m_spdbSnapshotHOLDER->m_mvccCheckpoint)
         {
-            if (((getMvccTstamp() - m_spdbSnapshotHOLDER->mvccCheckpoint) >> MVCC_MS_SHIFT) < 1*1000)
+            if (!m_spdbSnapshotHOLDER->FStale())
            {
                 m_spdbSnapshotHOLDER->m_refCount++;
                 return m_spdbSnapshotHOLDER.get();
             }
-            serverLog(LL_WARNING, "Existing snapshot too old, creating a new one");
+            serverLog(LL_VERBOSE, "Existing snapshot too old, creating a new one");
         }
+        serverLog(levels > 5 ? LL_NOTICE : LL_VERBOSE, "Nested snapshot created: %d levels", levels);
     }
 
+    // See if we have too many levels and can bail out of this to reduce load
+    if (fOptional && (levels >= 4))
+        return nullptr;
+
     auto spdb = std::unique_ptr<redisDbPersistentDataSnapshot>(new (MALLOC_LOCAL) redisDbPersistentDataSnapshot());
     spdb->m_fAllChanged = false;

@@ -45,7 +46,7 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
         spdb->m_spstorage = std::shared_ptr<IStorage>(const_cast<IStorage*>(m_spstorage->clone()));
     spdb->m_pdbSnapshot = m_pdbSnapshot;
     spdb->m_refCount = 1;
-    spdb->mvccCheckpoint = getMvccTstamp();
+    spdb->m_mvccCheckpoint = getMvccTstamp();
     if (m_setexpire != nullptr)
     {
         spdb->m_setexpire = new (MALLOC_LOCAL) expireset(*m_setexpire);

@@ -107,7 +108,7 @@ void redisDbPersistentData::recursiveFreeSnapshots(redisDbPersistentDataSnapshot
         psnapshot->m_spdbSnapshotHOLDER.release();
         //psnapshot->m_pdbSnapshot = nullptr;
         g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::unique_ptr<redisDbPersistentDataSnapshot>(psnapshot));
-        serverLog(LL_WARNING, "Garbage collected snapshot");
+        serverLog(LL_VERBOSE, "Garbage collected snapshot");
     }
 }

@@ -268,7 +269,7 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const
     while(fResult && ((de = dictNext(di)) != nullptr))
     {
         --celem;
-        robj *o = fKeyOnly ? nullptr : (robj*)dictGetVal(de);
+        robj *o = (robj*)dictGetVal(de);
         if (!fn((const char*)dictGetKey(de), o))
             fResult = false;
     }

@@ -368,8 +369,10 @@ void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData *
     dictExpand(spdb->m_pdict, m_pdbSnapshot->size());
     m_pdbSnapshot->iterate_threadsafe([&](const char *key, robj_roptr o){
-        if (o != nullptr)
+        if (o != nullptr) {
             dictAdd(spdb->m_pdict, sdsdupshared(key), o.unsafe_robjcast());
+            incrRefCount(o);
+        }
         return true;
     }, true /*fKeyOnly*/);
     spdb->m_spstorage = m_pdbSnapshot->m_spstorage;

@@ -397,7 +400,7 @@ void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData *
         return; // we were unlinked and this was a waste of time
     }
-    serverLog(LL_WARNING, "cleaned %d snapshots", snapshot_depth()-1);
+    serverLog(LL_VERBOSE, "cleaned %d snapshots", snapshot_depth()-1);
     spdb->m_refCount = depth;
     spdb->m_fConsolidated = true;
     // Drop our refs from this snapshot and its children

@@ -419,4 +422,11 @@ void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData *
     const redisDbPersistentDataSnapshot *ptrT = m_spdbSnapshotHOLDER.get();
     __atomic_store(&m_pdbSnapshot, &ptrT, __ATOMIC_SEQ_CST);
     locker.disarm(); // ensure we're not locked for any dtors
 }
+
+bool redisDbPersistentDataSnapshot::FStale() const
+{
+    // 0.5 seconds considered stale;
+    static const uint64_t msStale = 500;
+    return ((getMvccTstamp() - m_mvccCheckpoint) >> MVCC_MS_SHIFT) >= msStale;
+}