diff --git a/src/blocked.c b/src/blocked.c index c467ebf2c..ad7113d52 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -161,10 +161,12 @@ void queueClientForReprocessing(client *c) { /* The client may already be into the unblocked list because of a previous * blocking operation, don't add back it into the list multiple times. */ serverAssert(aeThreadOwnsLock()); + fastlock_lock(&c->lock); if (!(c->flags & CLIENT_UNBLOCKED)) { c->flags |= CLIENT_UNBLOCKED; listAddNodeTail(server.rgthreadvar[c->iel].unblocked_clients,c); } + fastlock_unlock(&c->lock); } /* Unblock a client calling the right function depending on the kind @@ -258,6 +260,7 @@ void disconnectAllBlockedClients(void) { * be used only for a single type, like virtually any Redis application will * do, the function is already fair. */ void handleClientsBlockedOnKeys(void) { + serverAssert(aeThreadOwnsLock()); while(listLength(server.ready_keys) != 0) { list *l; diff --git a/src/fastlock.cpp b/src/fastlock.cpp index 4108c8e42..4a4fb2962 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -33,6 +33,7 @@ #include #include #include +#include /**************************************************** * @@ -56,6 +57,7 @@ extern "C" void fastlock_init(struct fastlock *lock) lock->m_ticket.m_active = 0; lock->m_ticket.m_avail = 0; lock->m_depth = 0; + lock->m_pidOwner = -1; } extern "C" void fastlock_lock(struct fastlock *lock) @@ -111,6 +113,7 @@ extern "C" void fastlock_unlock(struct fastlock *lock) --lock->m_depth; if (lock->m_depth == 0) { + assert((int)__atomic_load_4(&lock->m_pidOwner, __ATOMIC_RELAXED) >= 0); // unlock after free lock->m_pidOwner = -1; std::atomic_thread_fence(std::memory_order_acquire); __atomic_fetch_add(&lock->m_ticket.m_active, 1, __ATOMIC_ACQ_REL); @@ -120,7 +123,9 @@ extern "C" void fastlock_unlock(struct fastlock *lock) extern "C" void fastlock_free(struct fastlock *lock) { // NOP - (void)lock; + assert((lock->m_ticket.m_active == lock->m_ticket.m_avail) // Asser the lock is unlocked + || (lock->m_pidOwner == gettid() && (lock->m_ticket.m_active == lock->m_ticket.m_avail-1))); // OR we own the lock and nobody else is waiting + lock->m_pidOwner = -2; // sentinal value indicating free } diff --git a/src/module.c b/src/module.c index ea5f3271c..54a36e0c1 100644 --- a/src/module.c +++ b/src/module.c @@ -3642,6 +3642,7 @@ void moduleHandleBlockedClients(void) { if (c) { AssertCorrectThread(c); + fastlock_lock(&c->lock); } /* Release the lock during the loop, as long as we don't @@ -3708,6 +3709,7 @@ void moduleHandleBlockedClients(void) { /* Free 'bc' only after unblocking the client, since it is * referenced in the client blocking context, and must be valid * when calling unblockClient(). */ + fastlock_unlock(&c->lock); zfree(bc); /* Lock again before to iterate the loop. */ diff --git a/src/networking.cpp b/src/networking.cpp index 435241a7b..6379ffd09 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -55,12 +55,23 @@ public: { serverAssert(!m_fArmed); serverAssert(c->lock.fOwnLock()); + + bool fClientLocked = true; while (!aeTryAcquireLock()) { - c->lock.unlock(); - // give a chance for the global lock to progress if they were waiting on the client - c->lock.lock(); + if (fClientLocked) c->lock.unlock(); + fClientLocked = false; + aeAcquireLock(); + if (!c->lock.try_lock()) + { + aeReleaseLock(); + } + else + { + break; + } } + m_fArmed = true; } else if (!m_fArmed) @@ -239,6 +250,7 @@ void clientInstallWriteHandler(client *c) { (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack))) { AssertCorrectThread(c); + serverAssert(c->lock.fOwnLock()); /* Here instead of installing the write handler, we just flag the * client and put it into a list of clients that have something * to write to the socket. This way before re-entering the event @@ -324,6 +336,7 @@ int _addReplyToBuffer(client *c, const char *s, size_t len, bool fAsync) { int minsize = len + c->bufposAsync; c->buflenAsync = std::max(minsize, c->buflenAsync*2 - c->buflenAsync); c->bufAsync = (char*)zrealloc(c->bufAsync, c->buflenAsync, MALLOC_LOCAL); + c->buflenAsync = zmalloc_usable(c->bufAsync); } memcpy(c->bufAsync+c->bufposAsync,s,len); c->bufposAsync += len; @@ -1185,6 +1198,7 @@ void unlinkClient(client *c) { listNode *ln; AssertCorrectThread(c); serverAssert(aeThreadOwnsLock()); + serverAssert(c->lock.fOwnLock()); /* If this is marked as current client unset it. */ if (server.current_client == c) server.current_client = NULL; @@ -1227,15 +1241,17 @@ void unlinkClient(client *c) { if (c->fPendingAsyncWrite) { ln = NULL; - int iel = 0; - for (; iel < server.cthreads; ++iel) + bool fFound = false; + for (int iel = 0; iel < server.cthreads; ++iel) { ln = listSearchKey(server.rgthreadvar[iel].clients_pending_asyncwrite,c); if (ln) - break; + { + fFound = true; + listDelNode(server.rgthreadvar[iel].clients_pending_asyncwrite,ln); + } } - serverAssert(ln != NULL); - listDelNode(server.rgthreadvar[iel].clients_pending_asyncwrite,ln); + serverAssert(fFound); c->fPendingAsyncWrite = FALSE; } } @@ -1244,6 +1260,7 @@ void freeClient(client *c) { listNode *ln; serverAssert(aeThreadOwnsLock()); AssertCorrectThread(c); + std::unique_locklock)> ulock(c->lock); /* If a client is protected, yet we need to free it right now, make sure * to at least use asynchronous freeing. */ @@ -1340,6 +1357,7 @@ void freeClient(client *c) { zfree(c->argv); freeClientMultiState(c); sdsfree(c->peerid); + ulock.unlock(); fastlock_free(&c->lock); zfree(c); } @@ -1352,6 +1370,7 @@ void freeClientAsync(client *c) { if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return; AeLocker lock; lock.arm(nullptr); + std::lock_guardlock)> clientlock(c->lock); c->flags |= CLIENT_CLOSE_ASAP; listAddNodeTail(server.clients_to_close,c); } @@ -1456,6 +1475,7 @@ int writeToClient(int fd, client *c, int handler_installed) { } else { serverLog(LL_VERBOSE, "Error writing to client: %s", strerror(errno)); + lock.unlock(); if (aeTryAcquireLock()) { freeClient(c); @@ -1483,6 +1503,7 @@ int writeToClient(int fd, client *c, int handler_installed) { /* Close connection after entire reply has been sent. */ if (c->flags & CLIENT_CLOSE_AFTER_REPLY) { + lock.unlock(); if (aeTryAcquireLock()) { freeClient(c); @@ -1574,7 +1595,7 @@ int handleClientsWithPendingWrites(int iel) { listRewind(list,&li); while((ln = listNext(&li))) { client *c = (client*)listNodeValue(ln); - std::lock_guardlock)> lock(c->lock); + std::unique_locklock)> lock(c->lock); c->flags &= ~CLIENT_PENDING_WRITE; listDelNode(list,ln); @@ -1585,7 +1606,10 @@ int handleClientsWithPendingWrites(int iel) { if (c->flags & CLIENT_PROTECTED) continue; /* Try to write buffers to the client socket. */ - if (writeToClient(c->fd,c,0) == C_ERR) continue; + if (writeToClient(c->fd,c,0) == C_ERR) { + lock.release(); // client is free'd + continue; + } /* If after the synchronous writes above we still have data to * output to the client, we need to install the writable handler. */ @@ -1956,14 +1980,15 @@ void processInputBuffer(client *c) { } else { serverPanic("Unknown request type"); } - AeLocker locker; - locker.arm(c); - server.current_client = c; /* Multibulk processing could see a <= 0 length. */ if (c->argc == 0) { resetClient(c); } else { + AeLocker locker; + locker.arm(c); + server.current_client = c; + /* Only reset the client when the command was executed. */ if (processCommand(c) == C_OK) { if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { @@ -1982,6 +2007,7 @@ void processInputBuffer(client *c) { * result into a slave, that may be the active client, to be * freed. */ if (server.current_client == NULL) break; + server.current_client = NULL; } } @@ -1990,8 +2016,6 @@ void processInputBuffer(client *c) { sdsrange(c->querybuf,c->qb_pos,-1); c->qb_pos = 0; } - - server.current_client = NULL; } /* This is a wrapper for processInputBuffer that also cares about handling @@ -2058,13 +2082,15 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { return; } else { serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno)); - aelock.arm(c); + lock.unlock(); + aelock.arm(nullptr); freeClient(c); return; } } else if (nread == 0) { serverLog(LL_VERBOSE, "Client closed connection"); - aelock.arm(c); + lock.unlock(); + aelock.arm(nullptr); freeClient(c); return; } else if (c->flags & CLIENT_MASTER) { @@ -2086,7 +2112,8 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); sdsfree(ci); sdsfree(bytes); - aelock.arm(c); + lock.unlock(); + aelock.arm(nullptr); freeClient(c); return; } @@ -2098,7 +2125,7 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { * corresponding part of the replication stream, will be propagated to * the sub-slaves and to the replication backlog. */ processInputBufferAndReplicate(c); - aelock.arm(c); + aelock.arm(nullptr); ProcessPendingAsyncWrites(); } diff --git a/src/object.c b/src/object.c index 88efba4d5..5ceb386ec 100644 --- a/src/object.c +++ b/src/object.c @@ -82,10 +82,10 @@ robj *createRawStringObject(const char *ptr, size_t len) { * an object where the sds string is actually an unmodifiable string * allocated in the same chunk as the object itself. */ robj *createEmbeddedStringObject(const char *ptr, size_t len) { - size_t alloclen = len; - if (len < sizeof(void*)) - alloclen = sizeof(void*); - robj *o = zmalloc(sizeof(robj)+sizeof(struct sdshdr8)+alloclen+1-sizeof(o->m_ptr), MALLOC_SHARED); + size_t allocsize = sizeof(struct sdshdr8)+len+1; + if (allocsize < sizeof(void*)) + allocsize = sizeof(void*); + robj *o = zmalloc(sizeof(robj)+allocsize-sizeof(o->m_ptr), MALLOC_SHARED); struct sdshdr8 *sh = (void*)(&o->m_ptr); o->type = OBJ_STRING; diff --git a/src/replication.cpp b/src/replication.cpp index 7b4409e50..81ec857f8 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -188,13 +188,6 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { * master replication history and has the same backlog and offsets). */ if (server.masterhost != NULL) return; - /* If the instance is not a top level master, return ASAP: we'll just proxy - * the stream of data we receive from our master instead, in order to - * propagate *identical* replication stream. In this way this slave can - * advertise the same replication ID as the master (since it shares the - * master replication history and has the same backlog and offsets). */ - if (server.masterhost != NULL) return; - /* If there aren't slaves, and there is no backlog buffer to populate, * we can return ASAP. */ if (server.repl_backlog == NULL && listLength(slaves) == 0) return; @@ -889,7 +882,7 @@ void putSlaveOnline(client *slave) { slave->replstate = SLAVE_STATE_ONLINE; slave->repl_put_online_on_ack = 0; slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */ - //AssertCorrectThread(slave); + AssertCorrectThread(slave); if (aeCreateFileEvent(server.rgthreadvar[slave->iel].el, slave->fd, AE_WRITABLE|AE_WRITE_THREADSAFE, sendReplyToClient, slave) == AE_ERR) { serverLog(LL_WARNING,"Unable to register writable event for replica bulk transfer: %s", strerror(errno)); @@ -2042,7 +2035,12 @@ void replicationUnsetMaster(void) { * used as secondary ID up to the current offset, and a new replication * ID is created to continue with a new replication history. */ shiftReplicationId(); - if (server.master) freeClientAsync(server.master); + if (server.master) { + if (FCorrectThread(server.master)) + freeClient(server.master); + else + freeClientAsync(server.master); + } replicationDiscardCachedMaster(); cancelReplicationHandshake(); /* Disconnecting all the slaves is required: we need to inform slaves @@ -2216,6 +2214,7 @@ void replicationCacheMaster(client *c) { serverAssert(server.master != NULL && server.cached_master == NULL); serverLog(LL_NOTICE,"Caching the disconnected master state."); AssertCorrectThread(c); + std::lock_guardlock)> clientlock(c->lock); /* Unlink the client from the server structures. */ unlinkClient(c); @@ -2265,6 +2264,7 @@ void replicationCacheMasterUsingMyself(void) { * the new master will start its replication stream with SELECT. */ server.master_initial_offset = server.master_repl_offset; replicationCreateMasterClient(-1,-1); + std::lock_guardlock)> lock(server.master->lock); /* Use our own ID / offset. */ memcpy(server.master->replid, server.replid, sizeof(server.replid)); @@ -2283,7 +2283,10 @@ void replicationDiscardCachedMaster(void) { serverLog(LL_NOTICE,"Discarding previously cached master state."); server.cached_master->flags &= ~CLIENT_MASTER; - freeClientAsync(server.cached_master); + if (FCorrectThread(server.cached_master)) + freeClient(server.cached_master); + else + freeClientAsync(server.cached_master); server.cached_master = NULL; } @@ -2705,7 +2708,10 @@ void replicationCron(void) { { serverLog(LL_WARNING, "Disconnecting timedout replica: %s", replicationGetSlaveName(slave)); - freeClientAsync(slave); + if (FCorrectThread(slave)) + freeClient(slave); + else + freeClientAsync(slave); } } } diff --git a/src/server.c b/src/server.c index 6b49c5809..457eaee66 100644 --- a/src/server.c +++ b/src/server.c @@ -2109,9 +2109,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { flushAppendOnlyFile(0); /* Handle writes with pending output buffers. */ - aeReleaseLock(); handleClientsWithPendingWrites(IDX_EVENT_LOOP_MAIN); - aeAcquireLock(); /* Before we are going to sleep, let the threads access the dataset by * releasing the GIL. Redis main thread will not touch anything at this @@ -2128,10 +2126,10 @@ void beforeSleepLite(struct aeEventLoop *eventLoop) if (listLength(server.rgthreadvar[iel].unblocked_clients)) { processUnblockedClients(iel); } - aeReleaseLock(); /* Handle writes with pending output buffers. */ handleClientsWithPendingWrites(iel); + aeReleaseLock(); } /* This function is called immadiately after the event loop multiplexing @@ -4065,6 +4063,7 @@ sds genRedisInfoString(char *section) { bytesToHuman(maxmemory_hmem,server.maxmemory); if (sections++) info = sdscat(info,"\r\n"); + serverLog(LL_WARNING, "OOM max sent used_memory: %zu", zmalloc_used); info = sdscatprintf(info, "# Memory\r\n" "used_memory:%zu\r\n" @@ -4499,6 +4498,7 @@ void infoCommand(client *c) { return; } addReplyBulkSds(c, genRedisInfoString(section)); + serverLog(LL_WARNING, "OOM max info command %zu", zmalloc_used_memory()); } void monitorCommand(client *c) { @@ -5038,8 +5038,8 @@ int main(int argc, char **argv) { initServer(); - server.cthreads = 2; //testing - initNetworking(1 /* fReusePort */); + server.cthreads = 1; //testing + initNetworking(0 /* fReusePort */); if (background || server.pidfile) createPidFile(); redisSetProcTitle(argv[0]); @@ -5080,12 +5080,11 @@ int main(int argc, char **argv) { serverAssert(server.cthreads > 0 && server.cthreads <= MAX_EVENT_LOOPS); pthread_t rgthread[MAX_EVENT_LOOPS]; - for (int iel = 0; iel < server.cthreads; ++iel) + for (int iel = 1; iel < server.cthreads; ++iel) { pthread_create(rgthread + iel, NULL, workerThreadMain, (void*)((int64_t)iel)); } - void *pretT; - pthread_join(rgthread[IDX_EVENT_LOOP_MAIN], &pretT); + workerThreadMain((void*)((int64_t)IDX_EVENT_LOOP_MAIN)); return 0; } diff --git a/src/zmalloc.c b/src/zmalloc.c index c1e1ab081..090009485 100644 --- a/src/zmalloc.c +++ b/src/zmalloc.c @@ -74,6 +74,10 @@ void zlibc_free(void *ptr) { #define free(ptr) je_free(ptr) #define mallocx(size,flags) je_mallocx(size,flags) #define dallocx(ptr,flags) je_dallocx(ptr,flags) +#else +#define malloc(size, type) malloc(size) +#define calloc(count,size,type) calloc(count,size) +#define realloc(ptr,size,type) realloc(ptr,size) #endif #define update_zmalloc_stat_alloc(__n) do { \ diff --git a/src/zmalloc.h b/src/zmalloc.h index 4f917717a..a7b980025 100644 --- a/src/zmalloc.h +++ b/src/zmalloc.h @@ -40,9 +40,6 @@ #define ZMALLOC_LIB ("memkind") #undef USE_JEMALLOC #define USE_MALLOC_CLASS 1 - // Even though memkind supports malloc_usable_size we don't use it for performance reasons - //#define HAVE_MALLOC_SIZE 0 - //#define zmalloc_size(p) salloc_usable_size(p) #elif defined(USE_TCMALLOC) #define ZMALLOC_LIB ("tcmalloc-" __xstr(TC_VERSION_MAJOR) "." __xstr(TC_VERSION_MINOR)) #include