Fix TSAN warnings on the repl backlog
commit ba1275f653
parent 20766a02b8

@@ -2079,8 +2079,6 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
          * that may trigger write error or recreate handler. */
         if ((flags & CLIENT_PROTECTED) && !(flags & CLIENT_SLAVE)) continue;
 
-        //std::unique_lock<decltype(c->lock)> lock(c->lock);
-
         /* Don't write to clients that are going to be closed anyway. */
         if (c->flags & CLIENT_CLOSE_ASAP) continue;
 
@@ -2098,6 +2096,7 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
 
         /* If after the synchronous writes above we still have data to
          * output to the client, we need to install the writable handler. */
+        std::unique_lock<decltype(c->lock)> lock(c->lock);
         if (clientHasPendingReplies(c)) {
             if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_flags, true) == C_ERR) {
                 freeClientAsync(c);
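Note: the two hunks above remove the commented-out early lock and instead take the client lock with a scoped std::unique_lock immediately before checking for pending replies and installing the write handler. Below is a minimal standalone sketch of that narrowed scoped-lock pattern; the Client struct, std::mutex, and installWriteHandlerSketch name are illustrative stand-ins, not KeyDB's actual fastlock-based types.

#include <mutex>

struct Client {                          // hypothetical stand-in for KeyDB's client struct
    std::mutex lock;                     // KeyDB uses fastlock; std::mutex keeps the sketch self-contained
    bool has_pending_replies = false;
};

// Take the client lock only around the pending-reply check and the
// (elided) write-handler installation, mirroring the hunks above.
bool installWriteHandlerSketch(Client &c) {
    std::unique_lock<decltype(c.lock)> lock(c.lock);
    if (!c.has_pending_replies)
        return false;
    /* connSetWriteHandlerWithBarrier(...) would be called here */
    return true;
}
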
@@ -264,9 +264,8 @@ void resizeReplicationBacklog(long long newsize) {
         newsize = CONFIG_REPL_BACKLOG_MIN_SIZE;
     if (g_pserver->repl_backlog_size == newsize) return;
 
-    std::unique_lock<fastlock> repl_backlog_lock (g_pserver->repl_backlog_lock);
-
     if (g_pserver->repl_backlog != NULL) {
+        std::unique_lock<fastlock> repl_backlog_lock(g_pserver->repl_backlog_lock);
         /* What we actually do is to flush the old buffer and realloc a new
          * empty one. It will refill with new data incrementally.
          * The reason is that copying a few gigabytes adds latency and even
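Note: the hunk above narrows the backlog lock so it is held only while the existing buffer is flushed and reallocated, rather than for the whole resize call. A minimal sketch of that narrowed scope, assuming placeholder globals in place of g_pserver's fields:

#include <cstdlib>
#include <mutex>

static std::mutex backlog_lock;          // stand-in for g_pserver->repl_backlog_lock
static char *backlog = nullptr;          // stand-in for g_pserver->repl_backlog
static long long backlog_size = 0;       // stand-in for g_pserver->repl_backlog_size

void resizeBacklogSketch(long long newsize) {
    if (backlog_size == newsize) return; // no shared buffer touched, no lock needed
    if (backlog != nullptr) {
        // Hold the lock only while the old buffer is discarded and replaced,
        // since that is the data a concurrent reader could still be scanning.
        std::unique_lock<std::mutex> lock(backlog_lock);
        std::free(backlog);
        backlog = static_cast<char*>(std::malloc(newsize));
    }
    backlog_size = newsize;
}
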
@@ -357,7 +356,7 @@ void freeReplicationBacklog(void) {
 void feedReplicationBacklog(const void *ptr, size_t len) {
     serverAssert(GlobalLocksAcquired());
     const unsigned char *p = (const unsigned char*)ptr;
-
+    std::unique_lock<fastlock> repl_backlog_lock(g_pserver->repl_backlog_lock, std::defer_lock);
 
     if (g_pserver->repl_batch_idxStart >= 0) {
         /* We are lower bounded by the lowest replica offset, or the batch offset start if not applicable */
@@ -417,6 +416,8 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
             // We need to update a few variables or later asserts will notice we dropped data
             g_pserver->repl_batch_offStart = g_pserver->master_repl_offset + len;
             g_pserver->repl_lowest_off = -1;
+            if (!repl_backlog_lock.owns_lock())
+                repl_backlog_lock.lock(); // we need to acquire the lock if we'll be overwriting data that writeToClient may be reading
         }
     }
 }
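Note: the last two hunks make the backlog lock deferred: it is constructed with std::defer_lock in feedReplicationBacklog and acquired only on the path that overwrites data writeToClient may still be reading. Below is a minimal standalone sketch of that deferred-lock pattern; the names and the boolean condition are hypothetical stand-ins.

#include <mutex>

static std::mutex backlog_lock;          // stand-in for g_pserver->repl_backlog_lock

void feedBacklogSketch(bool overwrites_readable_region) {
    // Construct the guard without taking the lock yet.
    std::unique_lock<std::mutex> lock(backlog_lock, std::defer_lock);
    if (overwrites_readable_region && !lock.owns_lock())
        lock.lock();                     // acquire only when the write could race with a reader
    /* ... append bytes into the circular backlog buffer ... */
}                                        // the lock, if it was taken, is released here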