From e25a4e80e767351dd7a0f6716167d2dfa2a23603 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 19 Nov 2019 17:40:35 -0500 Subject: [PATCH] Multiple threads should be able to get the same snapshot provided its not too old Former-commit-id: 054331098ee18dfb1887fd2b0a67688ef894823e --- src/aof.cpp | 1 + src/db.cpp | 19 +++++++++++++++++-- src/networking.cpp | 1 + src/rdb.cpp | 4 ++-- src/server.cpp | 3 +++ src/server.h | 6 +++++- 6 files changed, 29 insertions(+), 5 deletions(-) diff --git a/src/aof.cpp b/src/aof.cpp index d35431d73..86f03f48f 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -676,6 +676,7 @@ client *createFakeClient(void) { c->peerid = NULL; c->resp = 2; c->puser = NULL; + c->mvccCheckpoint = 0; listSetFreeMethod(c->reply,freeClientReplyValue); listSetDupMethod(c->reply,dupClientReplyValue); fastlock_init(&c->lock, "fake client"); diff --git a/src/db.cpp b/src/db.cpp index 107ad2044..9cba03a2a 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -724,6 +724,7 @@ void keysCommand(client *c) { } return true; }); + setDeferredArrayLen(c,replylen,numkeys); } @@ -2077,10 +2078,18 @@ void redisDbPersistentData::processChanges() m_setchanged.clear(); } -redisDbPersistentData *redisDbPersistentData::createSnapshot() +redisDbPersistentData *redisDbPersistentData::createSnapshot(uint64_t mvccCheckpoint) { serverAssert(GlobalLocksAcquired()); - serverAssert(m_spdbSnapshotHOLDER == nullptr); + if (m_spdbSnapshotHOLDER != nullptr) + { + if (mvccCheckpoint <= m_spdbSnapshotHOLDER->mvccCheckpoint) + { + ++m_snapshotRefcount; + return m_spdbSnapshotHOLDER.get(); + } + return nullptr; + } auto spdb = std::make_unique(); spdb->m_fAllChanged = false; @@ -2095,6 +2104,7 @@ redisDbPersistentData *redisDbPersistentData::createSnapshot() m_spdbSnapshotHOLDER = std::move(spdb); m_pdbSnapshot = m_spdbSnapshotHOLDER.get(); + ++m_snapshotRefcount; return m_pdbSnapshot; } @@ -2103,6 +2113,11 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentData *psnapshot) if (!GlobalLocksAcquired()) serverLog(LL_WARNING, "Global locks not acquired"); serverAssert(m_spdbSnapshotHOLDER.get() == psnapshot); + + --m_snapshotRefcount; + if (m_snapshotRefcount > 0) + return; + m_spdbSnapshotHOLDER->m_pdict->iterators--; if (m_pdbSnapshot == nullptr) diff --git a/src/networking.cpp b/src/networking.cpp index ef3ee2683..905a64693 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -176,6 +176,7 @@ client *createClient(int fd, int iel) { c->bufposAsync = 0; c->client_tracking_redirection = 0; c->casyncOpsPending = 0; + c->mvccCheckpoint = 0; memset(c->uuid, 0, UUID_BINARY_LEN); listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid); diff --git a/src/rdb.cpp b/src/rdb.cpp index 12ed78627..cb6ce3948 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1403,7 +1403,7 @@ int launchRdbSaveThread(pthread_t &child, rdbSaveInfo *rsi) args->rsi.master_repl_offset = g_pserver->master_repl_offset; for (int idb = 0; idb < cserver.dbnum; ++idb) - args->rgpdb[idb] = g_pserver->db[idb].createSnapshot(); + args->rgpdb[idb] = g_pserver->db[idb].createSnapshot(getMvccTstamp()); g_pserver->rdbThreadVars.tmpfileNum++; g_pserver->rdbThreadVars.fRdbThreadCancel = false; @@ -2585,7 +2585,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { start = ustime(); for (int idb = 0; idb < cserver.dbnum; ++idb) - args->rgpdb[idb] = g_pserver->db[idb].createSnapshot(); + args->rgpdb[idb] = g_pserver->db[idb].createSnapshot(getMvccTstamp()); g_pserver->rdbThreadVars.tmpfileNum++; g_pserver->rdbThreadVars.fRdbThreadCancel = false; diff --git a/src/server.cpp b/src/server.cpp index 3c49fa30b..20a91a763 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3385,6 +3385,9 @@ void call(client *c, int flags) { dirty = g_pserver->dirty-dirty; if (dirty < 0) dirty = 0; + if (dirty) + c->mvccCheckpoint = getMvccTstamp(); + /* When EVAL is called loading the AOF we don't want commands called * from Lua to go into the slowlog or to populate statistics. */ if (g_pserver->loading && c->flags & CLIENT_LUA) diff --git a/src/server.h b/src/server.h index 26b5b9f4f..77455228c 100644 --- a/src/server.h +++ b/src/server.h @@ -1211,7 +1211,7 @@ public: expireset *setexpireUnsafe() { return m_setexpire; } const expireset *setexpire() { return m_setexpire; } - redisDbPersistentData *createSnapshot(); + redisDbPersistentData *createSnapshot(uint64_t mvccCheckpoint); void endSnapshot(const redisDbPersistentData *psnapshot); private: @@ -1227,6 +1227,7 @@ private: bool m_fAllChanged = false; std::set m_setchanged; IStorage *m_pstorage = nullptr; + uint64_t mvccCheckpoint = 0; // Expire expireset *m_setexpire = nullptr; @@ -1236,6 +1237,7 @@ private: // in a snapshot redisDbPersistentData *m_pdbSnapshot = nullptr; std::unique_ptr m_spdbSnapshotHOLDER; + int m_snapshotRefcount = 0; }; /* Redis database representation. There are multiple databases identified @@ -1472,6 +1474,8 @@ typedef struct client { int buflenAsync; char *bufAsync; + uint64_t mvccCheckpoint = 0; // the MVCC checkpoint of our last write + int iel; /* the event loop index we're registered with */ struct fastlock lock; } client;