From f4159146c81102c53ebf0ef01dfceddb0b80792f Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 30 Apr 2021 18:45:37 +0000 Subject: [PATCH] Fix majority of test issues with multithread load Former-commit-id: 4db88176e33e3615ffb90852b49e76b12d5b4622 --- src/ae.cpp | 8 ++++- src/ae.h | 1 + src/rdb.cpp | 92 +++++++++++++++++++++++++++++++---------------------- src/rio.cpp | 5 +++ src/rio.h | 1 + 5 files changed, 68 insertions(+), 39 deletions(-) diff --git a/src/ae.cpp b/src/ae.cpp index f96ef4f6c..29d687077 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -846,6 +846,7 @@ void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep, } thread_local spin_worker tl_worker = nullptr; +thread_local bool fOwnLockOverride = false; void setAeLockSetThreadSpinWorker(spin_worker worker) { tl_worker = worker; @@ -866,9 +867,14 @@ void aeReleaseLock() g_lock.unlock(); } +void aeSetThreadOwnsLockOverride(bool fOverride) +{ + fOwnLockOverride = fOverride; +} + int aeThreadOwnsLock() { - return g_lock.fOwnLock(); + return fOwnLockOverride || g_lock.fOwnLock(); } int aeLockContested(int threshold) diff --git a/src/ae.h b/src/ae.h index aec1df154..9d8821143 100644 --- a/src/ae.h +++ b/src/ae.h @@ -169,6 +169,7 @@ void aeAcquireLock(); int aeTryAcquireLock(int fWeak); void aeReleaseLock(); int aeThreadOwnsLock(); +void aeSetThreadOwnsLockOverride(bool fOverride); int aeLockContested(int threshold); int aeLockContention(); // returns the number of instantaneous threads waiting on the lock diff --git a/src/rdb.cpp b/src/rdb.cpp index a9d701933..ad2823197 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2336,41 +2336,6 @@ void stopSaving(int success) { NULL); } -/* Track loading progress in order to serve client's from time to time - and if needed calculate rdb checksum */ -void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { - if (g_pserver->rdb_checksum) - rioGenericUpdateChecksum(r, buf, len); - - if ((g_pserver->loading_process_events_interval_bytes && - 
(r->processed_bytes + len)/g_pserver->loading_process_events_interval_bytes > r->processed_bytes/g_pserver->loading_process_events_interval_bytes) || - (g_pserver->loading_process_events_interval_keys && - (r->keys_since_last_callback >= g_pserver->loading_process_events_interval_keys))) - { - listIter li; - listNode *ln; - listRewind(g_pserver->masters, &li); - while ((ln = listNext(&li))) - { - struct redisMaster *mi = (struct redisMaster*)listNodeValue(ln); - if (mi->repl_state == REPL_STATE_TRANSFER) - replicationSendNewlineToMaster(mi); - } - loadingProgress(r->processed_bytes); - processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar); - processModuleLoadingProgressEvent(0); - - robj *ping_argv[1]; - - ping_argv[0] = createStringObject("PING",4); - replicationFeedSlaves(g_pserver->slaves, g_pserver->replicaseldb, ping_argv, 1); - decrRefCount(ping_argv[0]); - - r->keys_since_last_callback = 0; - } -} - - struct rdbInsertJob { redisDb *db; @@ -2393,6 +2358,7 @@ class rdbAsyncWorkThread bool fExit = false; std::atomic ckeysLoaded; std::thread m_thread; + list *clients_pending_async_write = nullptr; public: @@ -2405,10 +2371,14 @@ public: ~rdbAsyncWorkThread() { if (!fExit && m_thread.joinable()) endWork(); + if (clients_pending_async_write) + listRelease(clients_pending_async_write); } void start() { - m_thread = std::thread(&rdbAsyncWorkThread::loadWorkerThreadMain, this); + if (clients_pending_async_write == nullptr) + clients_pending_async_write = listCreate(); + m_thread = std::thread(&rdbAsyncWorkThread::loadWorkerThreadMain, this, clients_pending_async_write); } void enqueue(rdbInsertJob &job) { @@ -2435,18 +2405,24 @@ public: cv.notify_one(); l.unlock(); m_thread.join(); + listJoin(serverTL->clients_pending_asyncwrite, clients_pending_async_write); + ProcessPendingAsyncWrites(); return ckeysLoaded; } - static void loadWorkerThreadMain(rdbAsyncWorkThread *pqueue) { + static void loadWorkerThreadMain(rdbAsyncWorkThread *pqueue, list 
*clients_pending_asyncwrite) { rdbAsyncWorkThread &queue = *pqueue; + redisServerThreadVars vars; + vars.clients_pending_asyncwrite = clients_pending_asyncwrite; + serverTL = &vars; + aeSetThreadOwnsLockOverride(true); for (;;) { std::unique_lock lock(queue.mutex); if (queue.queuejobs.empty() && queue.queuefn.empty()) { if (queue.fExit) break; queue.cv.wait(lock); - if (queue.fExit) + if (queue.queuejobs.empty() && queue.queuefn.empty() && queue.fExit) break; } @@ -2518,9 +2494,48 @@ public: } } } + aeSetThreadOwnsLockOverride(false); } }; +/* Track loading progress in order to serve client's from time to time + and if needed calculate rdb checksum */ +void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { + if (g_pserver->rdb_checksum) + rioGenericUpdateChecksum(r, buf, len); + + if ((g_pserver->loading_process_events_interval_bytes && + (r->processed_bytes + len)/g_pserver->loading_process_events_interval_bytes > r->processed_bytes/g_pserver->loading_process_events_interval_bytes) || + (g_pserver->loading_process_events_interval_keys && + (r->keys_since_last_callback >= g_pserver->loading_process_events_interval_keys))) + { + rdbAsyncWorkThread *pwthread = reinterpret_cast(r->chksum_arg); + pwthread->endWork(); // We can't have the work queue modifying the database while processEventsWhileBlocked does its thing + listIter li; + listNode *ln; + listRewind(g_pserver->masters, &li); + while ((ln = listNext(&li))) + { + struct redisMaster *mi = (struct redisMaster*)listNodeValue(ln); + if (mi->repl_state == REPL_STATE_TRANSFER) + replicationSendNewlineToMaster(mi); + } + loadingProgress(r->processed_bytes); + processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar); + processModuleLoadingProgressEvent(0); + + robj *ping_argv[1]; + + ping_argv[0] = createStringObject("PING",4); + replicationFeedSlaves(g_pserver->slaves, g_pserver->replicaseldb, ping_argv, 1); + decrRefCount(ping_argv[0]); + pwthread->start(); + + r->keys_since_last_callback = 0; + } 
+} + + /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, * otherwise C_ERR is returned and 'errno' is set accordingly. */ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { @@ -2543,6 +2558,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { } rdb->update_cksum = rdbLoadProgressCallback; + rdb->chksum_arg = &wqueue; rdb->max_processing_chunk = g_pserver->loading_process_events_interval_bytes; if (rioRead(rdb,buf,9) == 0) goto eoferr; buf[9] = '\0'; diff --git a/src/rio.cpp b/src/rio.cpp index 99c8e98c6..82a3969f3 100644 --- a/src/rio.cpp +++ b/src/rio.cpp @@ -99,6 +99,7 @@ static const rio rioBufferIO = { rioBufferTell, rioBufferFlush, NULL, /* update_checksum */ + NULL, /* update checksum arg */ 0, /* current checksum */ 0, /* flags */ 0, /* bytes read or written */ @@ -113,6 +114,7 @@ static const rio rioConstBufferIO = { rioBufferTell, rioBufferFlush, NULL, /* update_checksum */ + NULL, /* update checksum arg */ 0, /* current checksum */ 0, /* flags */ 0, /* bytes read or written */ @@ -176,6 +178,7 @@ static const rio rioFileIO = { rioFileTell, rioFileFlush, NULL, /* update_checksum */ + NULL, /* update checksum arg */ 0, /* current checksum */ 0, /* flags */ 0, /* bytes read or written */ @@ -272,6 +275,7 @@ static const rio rioConnIO = { rioConnTell, rioConnFlush, NULL, /* update_checksum */ + NULL, /* update checksum arg */ 0, /* current checksum */ 0, /* flags */ 0, /* bytes read or written */ @@ -391,6 +395,7 @@ static const rio rioFdIO = { rioFdTell, rioFdFlush, NULL, /* update_checksum */ + NULL, /* update checksum arg */ 0, /* current checksum */ 0, /* flags */ 0, /* bytes read or written */ diff --git a/src/rio.h b/src/rio.h index d48474fcb..86f3fa465 100644 --- a/src/rio.h +++ b/src/rio.h @@ -58,6 +58,7 @@ struct _rio { * and len fields pointing to the new block of data to add to the checksum * computation. 
*/ void (*update_cksum)(struct _rio *, const void *buf, size_t len); + void *chksum_arg; /* opaque argument available to update_cksum (set alongside it; see rdbLoadRio) */ /* The current checksum and flags (see RIO_FLAG_*) */ uint64_t cksum, flags;