Multiple threads should be able to get the same snapshot provided its not too old
Former-commit-id: 054331098ee18dfb1887fd2b0a67688ef894823e
This commit is contained in:
parent
4084c04112
commit
e25a4e80e7
@ -676,6 +676,7 @@ client *createFakeClient(void) {
|
||||
c->peerid = NULL;
|
||||
c->resp = 2;
|
||||
c->puser = NULL;
|
||||
c->mvccCheckpoint = 0;
|
||||
listSetFreeMethod(c->reply,freeClientReplyValue);
|
||||
listSetDupMethod(c->reply,dupClientReplyValue);
|
||||
fastlock_init(&c->lock, "fake client");
|
||||
|
19
src/db.cpp
19
src/db.cpp
@ -724,6 +724,7 @@ void keysCommand(client *c) {
|
||||
}
|
||||
return true;
|
||||
});
|
||||
|
||||
setDeferredArrayLen(c,replylen,numkeys);
|
||||
}
|
||||
|
||||
@ -2077,10 +2078,18 @@ void redisDbPersistentData::processChanges()
|
||||
m_setchanged.clear();
|
||||
}
|
||||
|
||||
redisDbPersistentData *redisDbPersistentData::createSnapshot()
|
||||
redisDbPersistentData *redisDbPersistentData::createSnapshot(uint64_t mvccCheckpoint)
|
||||
{
|
||||
serverAssert(GlobalLocksAcquired());
|
||||
serverAssert(m_spdbSnapshotHOLDER == nullptr);
|
||||
if (m_spdbSnapshotHOLDER != nullptr)
|
||||
{
|
||||
if (mvccCheckpoint <= m_spdbSnapshotHOLDER->mvccCheckpoint)
|
||||
{
|
||||
++m_snapshotRefcount;
|
||||
return m_spdbSnapshotHOLDER.get();
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
auto spdb = std::make_unique<redisDbPersistentData>();
|
||||
|
||||
spdb->m_fAllChanged = false;
|
||||
@ -2095,6 +2104,7 @@ redisDbPersistentData *redisDbPersistentData::createSnapshot()
|
||||
|
||||
m_spdbSnapshotHOLDER = std::move(spdb);
|
||||
m_pdbSnapshot = m_spdbSnapshotHOLDER.get();
|
||||
++m_snapshotRefcount;
|
||||
return m_pdbSnapshot;
|
||||
}
|
||||
|
||||
@ -2103,6 +2113,11 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentData *psnapshot)
|
||||
if (!GlobalLocksAcquired())
|
||||
serverLog(LL_WARNING, "Global locks not acquired");
|
||||
serverAssert(m_spdbSnapshotHOLDER.get() == psnapshot);
|
||||
|
||||
--m_snapshotRefcount;
|
||||
if (m_snapshotRefcount > 0)
|
||||
return;
|
||||
|
||||
m_spdbSnapshotHOLDER->m_pdict->iterators--;
|
||||
|
||||
if (m_pdbSnapshot == nullptr)
|
||||
|
@ -176,6 +176,7 @@ client *createClient(int fd, int iel) {
|
||||
c->bufposAsync = 0;
|
||||
c->client_tracking_redirection = 0;
|
||||
c->casyncOpsPending = 0;
|
||||
c->mvccCheckpoint = 0;
|
||||
memset(c->uuid, 0, UUID_BINARY_LEN);
|
||||
|
||||
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
|
||||
|
@ -1403,7 +1403,7 @@ int launchRdbSaveThread(pthread_t &child, rdbSaveInfo *rsi)
|
||||
args->rsi.master_repl_offset = g_pserver->master_repl_offset;
|
||||
|
||||
for (int idb = 0; idb < cserver.dbnum; ++idb)
|
||||
args->rgpdb[idb] = g_pserver->db[idb].createSnapshot();
|
||||
args->rgpdb[idb] = g_pserver->db[idb].createSnapshot(getMvccTstamp());
|
||||
|
||||
g_pserver->rdbThreadVars.tmpfileNum++;
|
||||
g_pserver->rdbThreadVars.fRdbThreadCancel = false;
|
||||
@ -2585,7 +2585,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
|
||||
start = ustime();
|
||||
|
||||
for (int idb = 0; idb < cserver.dbnum; ++idb)
|
||||
args->rgpdb[idb] = g_pserver->db[idb].createSnapshot();
|
||||
args->rgpdb[idb] = g_pserver->db[idb].createSnapshot(getMvccTstamp());
|
||||
|
||||
g_pserver->rdbThreadVars.tmpfileNum++;
|
||||
g_pserver->rdbThreadVars.fRdbThreadCancel = false;
|
||||
|
@ -3385,6 +3385,9 @@ void call(client *c, int flags) {
|
||||
dirty = g_pserver->dirty-dirty;
|
||||
if (dirty < 0) dirty = 0;
|
||||
|
||||
if (dirty)
|
||||
c->mvccCheckpoint = getMvccTstamp();
|
||||
|
||||
/* When EVAL is called loading the AOF we don't want commands called
|
||||
* from Lua to go into the slowlog or to populate statistics. */
|
||||
if (g_pserver->loading && c->flags & CLIENT_LUA)
|
||||
|
@ -1211,7 +1211,7 @@ public:
|
||||
expireset *setexpireUnsafe() { return m_setexpire; }
|
||||
const expireset *setexpire() { return m_setexpire; }
|
||||
|
||||
redisDbPersistentData *createSnapshot();
|
||||
redisDbPersistentData *createSnapshot(uint64_t mvccCheckpoint);
|
||||
void endSnapshot(const redisDbPersistentData *psnapshot);
|
||||
|
||||
private:
|
||||
@ -1227,6 +1227,7 @@ private:
|
||||
bool m_fAllChanged = false;
|
||||
std::set<std::string> m_setchanged;
|
||||
IStorage *m_pstorage = nullptr;
|
||||
uint64_t mvccCheckpoint = 0;
|
||||
|
||||
// Expire
|
||||
expireset *m_setexpire = nullptr;
|
||||
@ -1236,6 +1237,7 @@ private:
|
||||
// in a snapshot
|
||||
redisDbPersistentData *m_pdbSnapshot = nullptr;
|
||||
std::unique_ptr<redisDbPersistentData> m_spdbSnapshotHOLDER;
|
||||
int m_snapshotRefcount = 0;
|
||||
};
|
||||
|
||||
/* Redis database representation. There are multiple databases identified
|
||||
@ -1472,6 +1474,8 @@ typedef struct client {
|
||||
int buflenAsync;
|
||||
char *bufAsync;
|
||||
|
||||
uint64_t mvccCheckpoint = 0; // the MVCC checkpoint of our last write
|
||||
|
||||
int iel; /* the event loop index we're registered with */
|
||||
struct fastlock lock;
|
||||
} client;
|
||||
|
Loading…
x
Reference in New Issue
Block a user