Fix majority of test issues with multithread load

Former-commit-id: 4db88176e33e3615ffb90852b49e76b12d5b4622
This commit is contained in:
John Sully 2021-04-30 18:45:37 +00:00
parent 3023bf4e6e
commit f4159146c8
5 changed files with 68 additions and 39 deletions

View File

@ -846,6 +846,7 @@ void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep,
} }
thread_local spin_worker tl_worker = nullptr; thread_local spin_worker tl_worker = nullptr;
thread_local bool fOwnLockOverride = false;
void setAeLockSetThreadSpinWorker(spin_worker worker) void setAeLockSetThreadSpinWorker(spin_worker worker)
{ {
tl_worker = worker; tl_worker = worker;
@ -866,9 +867,14 @@ void aeReleaseLock()
g_lock.unlock(); g_lock.unlock();
} }
void aeSetThreadOwnsLockOverride(bool fOverride)
{
fOwnLockOverride = fOverride;
}
int aeThreadOwnsLock() int aeThreadOwnsLock()
{ {
return g_lock.fOwnLock(); return fOwnLockOverride || g_lock.fOwnLock();
} }
int aeLockContested(int threshold) int aeLockContested(int threshold)

View File

@ -169,6 +169,7 @@ void aeAcquireLock();
int aeTryAcquireLock(int fWeak); int aeTryAcquireLock(int fWeak);
void aeReleaseLock(); void aeReleaseLock();
int aeThreadOwnsLock(); int aeThreadOwnsLock();
void aeSetThreadOwnsLockOverride(bool fOverride);
int aeLockContested(int threshold); int aeLockContested(int threshold);
int aeLockContention(); // returns the number of instantaneous threads waiting on the lock int aeLockContention(); // returns the number of instantaneous threads waiting on the lock

View File

@ -2336,41 +2336,6 @@ void stopSaving(int success) {
NULL); NULL);
} }
/* Track loading progress in order to serve client's from time to time
and if needed calculate rdb checksum */
void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
if (g_pserver->rdb_checksum)
rioGenericUpdateChecksum(r, buf, len);
if ((g_pserver->loading_process_events_interval_bytes &&
(r->processed_bytes + len)/g_pserver->loading_process_events_interval_bytes > r->processed_bytes/g_pserver->loading_process_events_interval_bytes) ||
(g_pserver->loading_process_events_interval_keys &&
(r->keys_since_last_callback >= g_pserver->loading_process_events_interval_keys)))
{
listIter li;
listNode *ln;
listRewind(g_pserver->masters, &li);
while ((ln = listNext(&li)))
{
struct redisMaster *mi = (struct redisMaster*)listNodeValue(ln);
if (mi->repl_state == REPL_STATE_TRANSFER)
replicationSendNewlineToMaster(mi);
}
loadingProgress(r->processed_bytes);
processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar);
processModuleLoadingProgressEvent(0);
robj *ping_argv[1];
ping_argv[0] = createStringObject("PING",4);
replicationFeedSlaves(g_pserver->slaves, g_pserver->replicaseldb, ping_argv, 1);
decrRefCount(ping_argv[0]);
r->keys_since_last_callback = 0;
}
}
struct rdbInsertJob struct rdbInsertJob
{ {
redisDb *db; redisDb *db;
@ -2393,6 +2358,7 @@ class rdbAsyncWorkThread
bool fExit = false; bool fExit = false;
std::atomic<size_t> ckeysLoaded; std::atomic<size_t> ckeysLoaded;
std::thread m_thread; std::thread m_thread;
list *clients_pending_async_write = nullptr;
public: public:
@ -2405,10 +2371,14 @@ public:
~rdbAsyncWorkThread() { ~rdbAsyncWorkThread() {
if (!fExit && m_thread.joinable()) if (!fExit && m_thread.joinable())
endWork(); endWork();
if (clients_pending_async_write)
listRelease(clients_pending_async_write);
} }
void start() { void start() {
m_thread = std::thread(&rdbAsyncWorkThread::loadWorkerThreadMain, this); if (clients_pending_async_write == nullptr)
clients_pending_async_write = listCreate();
m_thread = std::thread(&rdbAsyncWorkThread::loadWorkerThreadMain, this, clients_pending_async_write);
} }
void enqueue(rdbInsertJob &job) { void enqueue(rdbInsertJob &job) {
@ -2435,18 +2405,24 @@ public:
cv.notify_one(); cv.notify_one();
l.unlock(); l.unlock();
m_thread.join(); m_thread.join();
listJoin(serverTL->clients_pending_asyncwrite, clients_pending_async_write);
ProcessPendingAsyncWrites();
return ckeysLoaded; return ckeysLoaded;
} }
static void loadWorkerThreadMain(rdbAsyncWorkThread *pqueue) { static void loadWorkerThreadMain(rdbAsyncWorkThread *pqueue, list *clients_pending_asyncwrite) {
rdbAsyncWorkThread &queue = *pqueue; rdbAsyncWorkThread &queue = *pqueue;
redisServerThreadVars vars;
vars.clients_pending_asyncwrite = clients_pending_asyncwrite;
serverTL = &vars;
aeSetThreadOwnsLockOverride(true);
for (;;) { for (;;) {
std::unique_lock<std::mutex> lock(queue.mutex); std::unique_lock<std::mutex> lock(queue.mutex);
if (queue.queuejobs.empty() && queue.queuefn.empty()) { if (queue.queuejobs.empty() && queue.queuefn.empty()) {
if (queue.fExit) if (queue.fExit)
break; break;
queue.cv.wait(lock); queue.cv.wait(lock);
if (queue.fExit) if (queue.queuejobs.empty() && queue.queuefn.empty() && queue.fExit)
break; break;
} }
@ -2518,9 +2494,48 @@ public:
} }
} }
} }
aeSetThreadOwnsLockOverride(false);
} }
}; };
/* Track loading progress in order to serve client's from time to time
and if needed calculate rdb checksum */
void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
if (g_pserver->rdb_checksum)
rioGenericUpdateChecksum(r, buf, len);
if ((g_pserver->loading_process_events_interval_bytes &&
(r->processed_bytes + len)/g_pserver->loading_process_events_interval_bytes > r->processed_bytes/g_pserver->loading_process_events_interval_bytes) ||
(g_pserver->loading_process_events_interval_keys &&
(r->keys_since_last_callback >= g_pserver->loading_process_events_interval_keys)))
{
rdbAsyncWorkThread *pwthread = reinterpret_cast<rdbAsyncWorkThread*>(r->chksum_arg);
pwthread->endWork(); // We can't have the work queue modifying the database while processEventsWhileBlocked does its thing
listIter li;
listNode *ln;
listRewind(g_pserver->masters, &li);
while ((ln = listNext(&li)))
{
struct redisMaster *mi = (struct redisMaster*)listNodeValue(ln);
if (mi->repl_state == REPL_STATE_TRANSFER)
replicationSendNewlineToMaster(mi);
}
loadingProgress(r->processed_bytes);
processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar);
processModuleLoadingProgressEvent(0);
robj *ping_argv[1];
ping_argv[0] = createStringObject("PING",4);
replicationFeedSlaves(g_pserver->slaves, g_pserver->replicaseldb, ping_argv, 1);
decrRefCount(ping_argv[0]);
pwthread->start();
r->keys_since_last_callback = 0;
}
}
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned and 'errno' is set accordingly. */ * otherwise C_ERR is returned and 'errno' is set accordingly. */
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
@ -2543,6 +2558,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
} }
rdb->update_cksum = rdbLoadProgressCallback; rdb->update_cksum = rdbLoadProgressCallback;
rdb->chksum_arg = &wqueue;
rdb->max_processing_chunk = g_pserver->loading_process_events_interval_bytes; rdb->max_processing_chunk = g_pserver->loading_process_events_interval_bytes;
if (rioRead(rdb,buf,9) == 0) goto eoferr; if (rioRead(rdb,buf,9) == 0) goto eoferr;
buf[9] = '\0'; buf[9] = '\0';

View File

@ -99,6 +99,7 @@ static const rio rioBufferIO = {
rioBufferTell, rioBufferTell,
rioBufferFlush, rioBufferFlush,
NULL, /* update_checksum */ NULL, /* update_checksum */
NULL, /* update checksum arg */
0, /* current checksum */ 0, /* current checksum */
0, /* flags */ 0, /* flags */
0, /* bytes read or written */ 0, /* bytes read or written */
@ -113,6 +114,7 @@ static const rio rioConstBufferIO = {
rioBufferTell, rioBufferTell,
rioBufferFlush, rioBufferFlush,
NULL, /* update_checksum */ NULL, /* update_checksum */
NULL, /* update checksum arg */
0, /* current checksum */ 0, /* current checksum */
0, /* flags */ 0, /* flags */
0, /* bytes read or written */ 0, /* bytes read or written */
@ -176,6 +178,7 @@ static const rio rioFileIO = {
rioFileTell, rioFileTell,
rioFileFlush, rioFileFlush,
NULL, /* update_checksum */ NULL, /* update_checksum */
NULL, /* update checksum arg */
0, /* current checksum */ 0, /* current checksum */
0, /* flags */ 0, /* flags */
0, /* bytes read or written */ 0, /* bytes read or written */
@ -272,6 +275,7 @@ static const rio rioConnIO = {
rioConnTell, rioConnTell,
rioConnFlush, rioConnFlush,
NULL, /* update_checksum */ NULL, /* update_checksum */
NULL, /* update checksum arg */
0, /* current checksum */ 0, /* current checksum */
0, /* flags */ 0, /* flags */
0, /* bytes read or written */ 0, /* bytes read or written */
@ -391,6 +395,7 @@ static const rio rioFdIO = {
rioFdTell, rioFdTell,
rioFdFlush, rioFdFlush,
NULL, /* update_checksum */ NULL, /* update_checksum */
NULL, /* update checksum arg */
0, /* current checksum */ 0, /* current checksum */
0, /* flags */ 0, /* flags */
0, /* bytes read or written */ 0, /* bytes read or written */

View File

@ -58,6 +58,7 @@ struct _rio {
* and len fields pointing to the new block of data to add to the checksum * and len fields pointing to the new block of data to add to the checksum
* computation. */ * computation. */
void (*update_cksum)(struct _rio *, const void *buf, size_t len); void (*update_cksum)(struct _rio *, const void *buf, size_t len);
void *chksum_arg;
/* The current checksum and flags (see RIO_FLAG_*) */ /* The current checksum and flags (see RIO_FLAG_*) */
uint64_t cksum, flags; uint64_t cksum, flags;