Don't be in tracking mode during load as processChangesAsync works outside the normal system
Former-commit-id: 8d31ce6eafea1cea2f9f4ea25e44306efef28fa3
This commit is contained in:
parent
14ab4823f0
commit
f3fb4e3209
@ -2902,6 +2902,7 @@ bool redisDbPersistentData::processChanges(bool fSnapshot)
|
|||||||
void redisDbPersistentData::processChangesAsync(std::atomic<int> &pendingJobs)
|
void redisDbPersistentData::processChangesAsync(std::atomic<int> &pendingJobs)
|
||||||
{
|
{
|
||||||
++pendingJobs;
|
++pendingJobs;
|
||||||
|
serverAssert(!m_fAllChanged);
|
||||||
dictEmpty(m_dictChanged, nullptr);
|
dictEmpty(m_dictChanged, nullptr);
|
||||||
dict *dictNew = dictCreate(&dbDictType, nullptr);
|
dict *dictNew = dictCreate(&dbDictType, nullptr);
|
||||||
std::swap(dictNew, m_pdict);
|
std::swap(dictNew, m_pdict);
|
||||||
|
23
src/rdb.cpp
23
src/rdb.cpp
@ -2927,6 +2927,16 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
|
|||||||
bool fLastKeyExpired = false;
|
bool fLastKeyExpired = false;
|
||||||
std::unique_ptr<rdbInsertJob> spjob;
|
std::unique_ptr<rdbInsertJob> spjob;
|
||||||
|
|
||||||
|
// If we're tracking changes we need to reset this
|
||||||
|
bool fTracking = g_pserver->db[0]->FTrackingChanges();
|
||||||
|
if (fTracking) {
|
||||||
|
// We don't want to track here because processChangesAsync is outside the normal scope handling
|
||||||
|
for (int idb = 0; idb < cserver.dbnum; ++idb) {
|
||||||
|
if (g_pserver->db[idb]->processChanges(false))
|
||||||
|
g_pserver->db[idb]->commitChanges();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
rdb->update_cksum = rdbLoadProgressCallback;
|
rdb->update_cksum = rdbLoadProgressCallback;
|
||||||
rdb->chksum_arg = &wqueue;
|
rdb->chksum_arg = &wqueue;
|
||||||
rdb->max_processing_chunk = g_pserver->loading_process_events_interval_bytes;
|
rdb->max_processing_chunk = g_pserver->loading_process_events_interval_bytes;
|
||||||
@ -3249,6 +3259,12 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
wqueue.endWork();
|
wqueue.endWork();
|
||||||
|
if (fTracking) {
|
||||||
|
// Reset track changes
|
||||||
|
for (int idb = 0; idb < cserver.dbnum; ++idb) {
|
||||||
|
g_pserver->db[idb]->trackChanges(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return C_OK;
|
return C_OK;
|
||||||
|
|
||||||
@ -3257,6 +3273,13 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
|
|||||||
* the RDB file from a socket during initial SYNC (diskless replica mode),
|
* the RDB file from a socket during initial SYNC (diskless replica mode),
|
||||||
* we'll report the error to the caller, so that we can retry. */
|
* we'll report the error to the caller, so that we can retry. */
|
||||||
eoferr:
|
eoferr:
|
||||||
|
if (fTracking) {
|
||||||
|
// Reset track changes
|
||||||
|
for (int idb = 0; idb < cserver.dbnum; ++idb) {
|
||||||
|
g_pserver->db[idb]->trackChanges(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
wqueue.endWork();
|
wqueue.endWork();
|
||||||
if (key != nullptr)
|
if (key != nullptr)
|
||||||
{
|
{
|
||||||
|
@ -1154,6 +1154,7 @@ public:
|
|||||||
void setStorageProvider(StorageCache *pstorage);
|
void setStorageProvider(StorageCache *pstorage);
|
||||||
|
|
||||||
void trackChanges(bool fBulk, size_t sizeHint = 0);
|
void trackChanges(bool fBulk, size_t sizeHint = 0);
|
||||||
|
bool FTrackingChanges() const { return !!m_fTrackingChanges; }
|
||||||
|
|
||||||
// Process and commit changes for secondary storage. Note that process and commit are seperated
|
// Process and commit changes for secondary storage. Note that process and commit are seperated
|
||||||
// to allow you to release the global lock before commiting. To prevent deadlocks you *must*
|
// to allow you to release the global lock before commiting. To prevent deadlocks you *must*
|
||||||
@ -1338,6 +1339,7 @@ struct redisDb : public redisDbPersistentDataSnapshot
|
|||||||
using redisDbPersistentData::prefetchKeysAsync;
|
using redisDbPersistentData::prefetchKeysAsync;
|
||||||
using redisDbPersistentData::prepOverwriteForSnapshot;
|
using redisDbPersistentData::prepOverwriteForSnapshot;
|
||||||
using redisDbPersistentData::FRehashing;
|
using redisDbPersistentData::FRehashing;
|
||||||
|
using redisDbPersistentData::FTrackingChanges;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
expireset::setiter expireitr;
|
expireset::setiter expireitr;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user