diff --git a/src/server.cpp b/src/server.cpp
index 2700daf1c..83146a03e 100644
--- a/src/server.cpp
+++ b/src/server.cpp
@@ -2703,6 +2703,80 @@ int restartServer(int flags, mstime_t delay) {
     return C_ERR; /* Never reached. */
 }
 
+/* Fast path for GET: answer from this thread's snapshot of the client's
+ * database so the command does not have to go through call().
+ *
+ * Returns true when a reply has been added to the client; the caller must
+ * then skip normal execution. Returns false, without replying, when the
+ * request cannot be served from a snapshot and the regular command path has
+ * to handle it. */
+bool getCommandAsync(client *c)
+{
+    if (c->argc != 2)
+        return false;   // only the plain "GET key" form is handled here
+
+    if (g_pserver->m_pstorageFactory)
+        return false;   // BUG! But we don't have time to fix right now, fails tests
+
+    // Find the index of the client's database
+    int idb;
+    for (idb = 0; idb < cserver.dbnum; ++idb)
+    {
+        if (c->db == g_pserver->db[idb])
+            break;
+    }
+    if (idb >= cserver.dbnum)
+        return false;
+
+    // This thread's cached snapshot slot for that database
+    const redisDbPersistentDataSnapshot *&psnapshot = serverTL->rgdbSnapshot[idb];
+
+    // Take a fresh snapshot when there is none, when it predates the
+    // client's checkpoint, or when it has gone stale
+    if (psnapshot == nullptr || psnapshot->mvccCheckpoint() < c->mvccCheckpoint || psnapshot->FStale()) {
+        AeLocker locker;
+        locker.arm(c);  // createSnapshot() asserts GlobalLocksAcquired()
+        if (psnapshot != nullptr)
+        {
+            g_pserver->db[idb]->endSnapshot(psnapshot);
+            psnapshot = nullptr;
+        }
+
+        // fOptional = true: createSnapshot() may decline and return nullptr
+        psnapshot = c->db->createSnapshot(c->mvccCheckpoint, true);
+    }
+
+    if (psnapshot == nullptr)
+        return false;
+
+    auto itr = psnapshot->find_cached_threadsafe(szFromObj(c->argv[1]));
+    if (itr == psnapshot->end())
+    {
+        if (g_pserver->m_pstorageFactory)
+            return false;   // no cached result doesn't mean the value doesn't exist
+        addReply(c, shared.null[c->resp]);
+        return true;
+    }
+
+    // Are we expired?
+    const expireEntry *expire = psnapshot->getExpire(c->argv[1]);
+    long long when = 0;
+    if (expire && expire->FGetPrimaryExpire(&when) && when > 0) {
+        if (g_pserver->mstime >= when) {
+            addReply(c, shared.null[c->resp]);
+            return true;
+        }
+    }
+
+    // GET only serves strings. addReplyBulk() must not be handed any other
+    // type, so leave those to the regular path (GET's documented WRONGTYPE error).
+    if (itr.val()->type != OBJ_STRING)
+        return false;
+
+    addReplyBulk(c,itr.val());
+    return true;
+}
+
 /* This function will try to raise the max number of open files accordingly to
  * the configured max number of clients.
It also reserves a number of file * descriptors (CONFIG_MIN_RESERVED_FDS) for extra operations of @@ -3048,6 +3103,13 @@ void initServer(void) { g_pserver->db[j]->initialize(j); } + for (int i = 0; i < MAX_EVENT_LOOPS; ++i) + { + g_pserver->rgthreadvar[i].rgdbSnapshot = (const redisDbPersistentDataSnapshot**)zcalloc(sizeof(redisDbPersistentDataSnapshot*)*cserver.dbnum, MALLOC_LOCAL); + serverAssert(g_pserver->rgthreadvar[i].rgdbSnapshot != nullptr); + } + serverAssert(g_pserver->rgthreadvar[0].rgdbSnapshot != nullptr); + /* Fixup Master Client Database */ listIter li; listNode *ln; @@ -3820,6 +3882,11 @@ int processCommand(client *c, int callFlags) { queueMultiCommand(c); addReply(c,shared.queued); } else { + if (listLength(g_pserver->monitors) == 0 && c->cmd->proc == getCommand) + { + if (getCommandAsync(c)) + return C_OK; + } locker.arm(c); call(c,callFlags); c->woff = g_pserver->master_repl_offset; @@ -3917,11 +3984,19 @@ int prepareForShutdown(int flags) { /* Close the listening sockets. Apparently this allows faster restarts. 
*/ closeListeningSockets(1); + for (int ithread = 0; ithread < MAX_EVENT_LOOPS; ++ithread) { + for (int idb = 0; idb < cserver.dbnum; ++idb) { + if (g_pserver->rgthreadvar[ithread].rgdbSnapshot[idb] != nullptr) + g_pserver->db[idb]->endSnapshot(g_pserver->rgthreadvar[ithread].rgdbSnapshot[idb]); + } + } + /* free our databases */ for (int idb = 0; idb < cserver.dbnum; ++idb) { delete g_pserver->db[idb]; g_pserver->db[idb] = nullptr; } + delete g_pserver->m_pstorageFactory; serverLog(LL_WARNING,"%s is now ready to exit, bye bye...", diff --git a/src/server.h b/src/server.h index b467ea4a7..fcca68ee9 100644 --- a/src/server.h +++ b/src/server.h @@ -1356,6 +1356,9 @@ public: bool removeCachedValue(const char *key); void removeAllCachedValues(); +protected: + uint64_t m_mvccCheckpoint = 0; + private: struct changedescCmp { @@ -1379,7 +1382,6 @@ private: std::set m_setchanged; size_t m_cnewKeysPending = 0; std::shared_ptr m_spstorage = nullptr; - uint64_t mvccCheckpoint = 0; // Expire expireset *m_setexpire = nullptr; @@ -1418,6 +1420,10 @@ public: const expireEntry *getExpire(const char *key) const; const expireEntry *getExpire(robj_roptr key) const { return getExpire(szFromObj(key)); } + uint64_t mvccCheckpoint() const { return m_mvccCheckpoint; } + + bool FStale() const; + // These need to be fixed using redisDbPersistentData::size; using redisDbPersistentData::expireSize; @@ -1892,6 +1898,7 @@ struct redisServerThreadVars { char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ long unsigned commandsExecuted = 0; uint64_t gcEpoch = 0; + const redisDbPersistentDataSnapshot **rgdbSnapshot = nullptr; }; struct redisMaster { diff --git a/src/snapshot.cpp b/src/snapshot.cpp index d0f81e7ef..3c338ffbf 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -6,7 +6,6 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 serverAssert(GlobalLocksAcquired()); serverAssert(m_refCount == 0); // do not call this on a snapshot - // First see if we 
have too many levels and can bail out of this to reduce load int levels = 1; redisDbPersistentDataSnapshot *psnapshot = m_spdbSnapshotHOLDER.get(); while (psnapshot != nullptr) @@ -14,23 +13,25 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 ++levels; psnapshot = psnapshot->m_spdbSnapshotHOLDER.get(); } - if (fOptional && (levels > 8)) - return nullptr; if (m_spdbSnapshotHOLDER != nullptr) { // If possible reuse an existing snapshot (we want to minimize nesting) - if (mvccCheckpoint <= m_spdbSnapshotHOLDER->mvccCheckpoint) + if (mvccCheckpoint <= m_spdbSnapshotHOLDER->m_mvccCheckpoint) { - if (((getMvccTstamp() - m_spdbSnapshotHOLDER->mvccCheckpoint) >> MVCC_MS_SHIFT) < 1*1000) + if (!m_spdbSnapshotHOLDER->FStale()) { m_spdbSnapshotHOLDER->m_refCount++; return m_spdbSnapshotHOLDER.get(); } - serverLog(LL_WARNING, "Existing snapshot too old, creating a new one"); + serverLog(LL_VERBOSE, "Existing snapshot too old, creating a new one"); } - serverLog(levels > 5 ? 
LL_NOTICE : LL_VERBOSE, "Nested snapshot created: %d levels", levels); } + + // See if we have too many levels and can bail out of this to reduce load + if (fOptional && (levels >= 4)) + return nullptr; + auto spdb = std::unique_ptr(new (MALLOC_LOCAL) redisDbPersistentDataSnapshot()); spdb->m_fAllChanged = false; @@ -45,7 +46,7 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 spdb->m_spstorage = std::shared_ptr(const_cast(m_spstorage->clone())); spdb->m_pdbSnapshot = m_pdbSnapshot; spdb->m_refCount = 1; - spdb->mvccCheckpoint = getMvccTstamp(); + spdb->m_mvccCheckpoint = getMvccTstamp(); if (m_setexpire != nullptr) { spdb->m_setexpire = new (MALLOC_LOCAL) expireset(*m_setexpire); @@ -107,7 +108,7 @@ void redisDbPersistentData::recursiveFreeSnapshots(redisDbPersistentDataSnapshot psnapshot->m_spdbSnapshotHOLDER.release(); //psnapshot->m_pdbSnapshot = nullptr; g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::unique_ptr(psnapshot)); - serverLog(LL_WARNING, "Garbage collected snapshot"); + serverLog(LL_VERBOSE, "Garbage collected snapshot"); } } @@ -268,7 +269,7 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::functionm_pdict, m_pdbSnapshot->size()); m_pdbSnapshot->iterate_threadsafe([&](const char *key, robj_roptr o){ - if (o != nullptr) + if (o != nullptr) { dictAdd(spdb->m_pdict, sdsdupshared(key), o.unsafe_robjcast()); + incrRefCount(o); + } return true; }, true /*fKeyOnly*/); spdb->m_spstorage = m_pdbSnapshot->m_spstorage; @@ -397,7 +400,7 @@ void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData * return; // we were unlinked and this was a waste of time } - serverLog(LL_WARNING, "cleaned %d snapshots", snapshot_depth()-1); + serverLog(LL_VERBOSE, "cleaned %d snapshots", snapshot_depth()-1); spdb->m_refCount = depth; spdb->m_fConsolidated = true; // Drop our refs from this snapshot and its children @@ -419,4 +422,11 @@ void 
redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData *
     const redisDbPersistentDataSnapshot *ptrT = m_spdbSnapshotHOLDER.get();
     __atomic_store(&m_pdbSnapshot, &ptrT, __ATOMIC_SEQ_CST);
     locker.disarm(); // ensure we're not locked for any dtors
+}
+
+bool redisDbPersistentDataSnapshot::FStale() const
+{
+    static constexpr uint64_t msStaleAfter = 500;   // a snapshot this many ms past its checkpoint (0.5 seconds) is stale
+    const auto msSinceCheckpoint = (getMvccTstamp() - m_mvccCheckpoint) >> MVCC_MS_SHIFT;
+    return msSinceCheckpoint >= msStaleAfter;
 }
\ No newline at end of file