diff --git a/src/networking.cpp b/src/networking.cpp index 08eebedba..5b1fe5894 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -2079,8 +2079,6 @@ int handleClientsWithPendingWrites(int iel, int aof_state) { * that may trigger write error or recreate handler. */ if ((flags & CLIENT_PROTECTED) && !(flags & CLIENT_SLAVE)) continue; - //std::unique_locklock)> lock(c->lock); - /* Don't write to clients that are going to be closed anyway. */ if (c->flags & CLIENT_CLOSE_ASAP) continue; @@ -2098,6 +2096,7 @@ int handleClientsWithPendingWrites(int iel, int aof_state) { /* If after the synchronous writes above we still have data to * output to the client, we need to install the writable handler. */ + std::unique_locklock)> lock(c->lock); if (clientHasPendingReplies(c)) { if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_flags, true) == C_ERR) { freeClientAsync(c); diff --git a/src/replication.cpp b/src/replication.cpp index a71c1560f..0ef43485b 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -264,9 +264,8 @@ void resizeReplicationBacklog(long long newsize) { newsize = CONFIG_REPL_BACKLOG_MIN_SIZE; if (g_pserver->repl_backlog_size == newsize) return; - std::unique_lock repl_backlog_lock (g_pserver->repl_backlog_lock); - if (g_pserver->repl_backlog != NULL) { + std::unique_lock repl_backlog_lock(g_pserver->repl_backlog_lock); /* What we actually do is to flush the old buffer and realloc a new * empty one. It will refill with new data incrementally. * The reason is that copying a few gigabytes adds latency and even @@ -357,7 +356,7 @@ void freeReplicationBacklog(void) { void feedReplicationBacklog(const void *ptr, size_t len) { serverAssert(GlobalLocksAcquired()); const unsigned char *p = (const unsigned char*)ptr; - + std::unique_lock repl_backlog_lock(g_pserver->repl_backlog_lock, std::defer_lock); if (g_pserver->repl_batch_idxStart >= 0) { /* We are lower bounded by the lowest replica offset, or the batch offset start if not applicable */ @@ -417,6 +416,8 @@ void feedReplicationBacklog(const void *ptr, size_t len) { // We need to update a few variables or later asserts will notice we dropped data g_pserver->repl_batch_offStart = g_pserver->master_repl_offset + len; g_pserver->repl_lowest_off = -1; + if (!repl_backlog_lock.owns_lock()) + repl_backlog_lock.lock(); // we need to acquire the lock if we'll be overwriting data that writeToClient may be reading } } }