Add tag for dual-channel logs (#999)

This PR introduces a consistent tagging system for dual-channel logs.
The goal is to improve log readability and filterability, making it
easier for operators to manage and analyze log entries.

Resolves https://github.com/valkey-io/valkey/issues/986

---------

Signed-off-by: naglera <anagler123@gmail.com>
This commit is contained in:
Amit Nagler 2024-11-26 16:51:52 +02:00 committed by GitHub
parent 469d41fb37
commit 9305b49145
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 70 additions and 60 deletions

View File

@ -1713,10 +1713,10 @@ void freeClient(client *c) {
/* Log link disconnection with replica */
if (getClientType(c) == CLIENT_TYPE_REPLICA) {
serverLog(LL_NOTICE,
c->flag.repl_rdb_channel ? "Replica %s rdb channel disconnected."
: "Connection with replica %s lost.",
replicationGetReplicaName(c));
if (c->flag.repl_rdb_channel)
dualChannelServerLog(LL_NOTICE, "Replica %s rdb channel disconnected.", replicationGetReplicaName(c));
else
serverLog(LL_NOTICE, "Connection with replica %s lost.", replicationGetReplicaName(c));
}
/* Free the query buffer */
@ -1963,14 +1963,15 @@ int freeClientsInAsyncFreeQueue(void) {
if (!c->rdb_client_disconnect_time) {
if (c->conn) connSetReadHandler(c->conn, NULL);
c->rdb_client_disconnect_time = server.unixtime;
serverLog(LL_VERBOSE, "Postpone RDB client id=%llu (%s) free for %d seconds", (unsigned long long)c->id,
replicationGetReplicaName(c), server.wait_before_rdb_client_free);
dualChannelServerLog(LL_VERBOSE, "Postpone RDB client id=%llu (%s) free for %d seconds",
(unsigned long long)c->id, replicationGetReplicaName(c), server.wait_before_rdb_client_free);
}
if (server.unixtime - c->rdb_client_disconnect_time <= server.wait_before_rdb_client_free) continue;
serverLog(LL_NOTICE,
"Replica main channel failed to establish PSYNC within the grace period (%ld seconds). "
"Freeing RDB client %llu.",
(long int)(server.unixtime - c->rdb_client_disconnect_time), (unsigned long long)c->id);
dualChannelServerLog(
LL_NOTICE,
"Replica main channel failed to establish PSYNC within the grace period (%ld seconds). "
"Freeing RDB client %llu.",
(long int)(server.unixtime - c->rdb_client_disconnect_time), (unsigned long long)c->id);
c->flag.protected_rdb_channel = 0;
}

View File

@ -227,9 +227,9 @@ void addRdbReplicaToPsyncWait(client *replica_rdb_client) {
tail->refcount++;
}
}
serverLog(LL_DEBUG, "Add rdb replica %s to waiting psync, with cid %llu, %s ",
replicationGetReplicaName(replica_rdb_client), (unsigned long long)replica_rdb_client->id,
tail ? "tracking repl-backlog tail" : "no repl-backlog to track");
dualChannelServerLog(LL_DEBUG, "Add rdb replica %s to waiting psync, with cid %llu, %s ",
replicationGetReplicaName(replica_rdb_client), (unsigned long long)replica_rdb_client->id,
tail ? "tracking repl-backlog tail" : "no repl-backlog to track");
replica_rdb_client->ref_repl_buf_node = tail ? ln : NULL;
/* Prevent rdb client from being freed before psync is established. */
replica_rdb_client->flag.protected_rdb_channel = 1;
@ -252,8 +252,8 @@ void backfillRdbReplicasToPsyncWait(void) {
if (replica_rdb_client->ref_repl_buf_node) continue;
replica_rdb_client->ref_repl_buf_node = ln;
head->refcount++;
serverLog(LL_DEBUG, "Attach replica rdb client %llu to repl buf block",
(long long unsigned int)replica_rdb_client->id);
dualChannelServerLog(LL_DEBUG, "Attach replica rdb client %llu to repl buf block",
(long long unsigned int)replica_rdb_client->id);
}
raxStop(&iter);
}
@ -271,10 +271,10 @@ void removeReplicaFromPsyncWait(client *replica_main_client) {
}
replica_rdb_client->ref_repl_buf_node = NULL;
replica_rdb_client->flag.protected_rdb_channel = 0;
serverLog(LL_DEBUG, "Remove psync waiting replica %s with cid %llu, repl buffer block %s",
replicationGetReplicaName(replica_main_client),
(long long unsigned int)replica_main_client->associated_rdb_client_id,
o ? "ref count decreased" : "doesn't exist");
dualChannelServerLog(LL_DEBUG, "Remove psync waiting replica %s with cid %llu, repl buffer block %s",
replicationGetReplicaName(replica_main_client),
(long long unsigned int)replica_main_client->associated_rdb_client_id,
o ? "ref count decreased" : "doesn't exist");
uint64_t id = htonu64(replica_rdb_client->id);
raxRemove(server.replicas_waiting_psync, (unsigned char *)&id, sizeof(id), NULL);
}
@ -391,8 +391,8 @@ void freeReplicaReferencedReplBuffer(client *replica) {
if (replica->flag.repl_rdb_channel) {
uint64_t rdb_cid = htonu64(replica->id);
if (raxRemove(server.replicas_waiting_psync, (unsigned char *)&rdb_cid, sizeof(rdb_cid), NULL)) {
serverLog(LL_DEBUG, "Remove psync waiting replica %s with cid %llu from replicas rax.",
replicationGetReplicaName(replica), (long long unsigned int)replica->id);
dualChannelServerLog(LL_DEBUG, "Remove psync waiting replica %s with cid %llu from replicas rax.",
replicationGetReplicaName(replica), (long long unsigned int)replica->id);
}
}
if (replica->ref_repl_buf_node != NULL) {
@ -1121,10 +1121,11 @@ void syncCommand(client *c) {
* resync. */
if (primary_replid[0] != '?') server.stat_sync_partial_err++;
if (c->replica_capa & REPLICA_CAPA_DUAL_CHANNEL) {
serverLog(LL_NOTICE,
"Replica %s is capable of dual channel synchronization, and partial sync isn't possible. "
"Full sync will continue with dedicated RDB channel.",
replicationGetReplicaName(c));
dualChannelServerLog(LL_NOTICE,
"Replica %s is capable of dual channel synchronization, and partial sync "
"isn't possible. "
"Full sync will continue with dedicated RDB channel.",
replicationGetReplicaName(c));
const char *buf = "+DUALCHANNELSYNC\r\n";
if (connWrite(c->conn, buf, strlen(buf)) != (int)strlen(buf)) {
freeClientAsync(c);
@ -2565,7 +2566,7 @@ void freePendingReplDataBuf(void) {
* provisional primary struct, and free local replication buffer. */
void replicationAbortDualChannelSyncTransfer(void) {
serverAssert(server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE);
serverLog(LL_NOTICE, "Aborting dual channel sync");
dualChannelServerLog(LL_NOTICE, "Aborting dual channel sync");
if (server.repl_rdb_transfer_s) {
connClose(server.repl_rdb_transfer_s);
server.repl_rdb_transfer_s = NULL;
@ -2594,8 +2595,9 @@ int sendCurrentOffsetToReplica(client *replica) {
int buflen;
buflen = snprintf(buf, sizeof(buf), "$ENDOFF:%lld %s %d %llu\r\n", server.primary_repl_offset, server.replid,
server.db->id, (long long unsigned int)replica->id);
serverLog(LL_NOTICE, "Sending to replica %s RDB end offset %lld and client-id %llu",
replicationGetReplicaName(replica), server.primary_repl_offset, (long long unsigned int)replica->id);
dualChannelServerLog(LL_NOTICE, "Sending to replica %s RDB end offset %lld and client-id %llu",
replicationGetReplicaName(replica), server.primary_repl_offset,
(long long unsigned int)replica->id);
if (connSyncWrite(replica->conn, buf, buflen, server.repl_syncio_timeout * 1000) != buflen) {
freeClientAsync(replica);
return C_ERR;
@ -2604,7 +2606,7 @@ int sendCurrentOffsetToReplica(client *replica) {
}
static int dualChannelReplHandleHandshake(connection *conn, sds *err) {
serverLog(LL_DEBUG, "Received first reply from primary using rdb connection.");
dualChannelServerLog(LL_DEBUG, "Received first reply from primary using rdb connection.");
/* AUTH with the primary if required. */
if (server.primary_auth) {
char *args[] = {"AUTH", NULL, NULL};
@ -2620,7 +2622,7 @@ static int dualChannelReplHandleHandshake(connection *conn, sds *err) {
argc++;
*err = sendCommandArgv(conn, argc, args, lens);
if (*err) {
serverLog(LL_WARNING, "Sending command to primary in dual channel replication handshake: %s", *err);
dualChannelServerLog(LL_WARNING, "Sending command to primary in dual channel replication handshake: %s", *err);
return C_ERR;
}
}
@ -2630,14 +2632,14 @@ static int dualChannelReplHandleHandshake(connection *conn, sds *err) {
NULL);
sdsfree(portstr);
if (*err) {
serverLog(LL_WARNING, "Sending command to primary in dual channel replication handshake: %s", *err);
dualChannelServerLog(LL_WARNING, "Sending command to primary in dual channel replication handshake: %s", *err);
return C_ERR;
}
if (connSetReadHandler(conn, dualChannelFullSyncWithPrimary) == C_ERR) {
char conninfo[CONN_INFO_LEN];
serverLog(LL_WARNING, "Can't create readable event for SYNC: %s (%s)", strerror(errno),
connGetInfo(conn, conninfo, sizeof(conninfo)));
dualChannelServerLog(LL_WARNING, "Can't create readable event for SYNC: %s (%s)", strerror(errno),
connGetInfo(conn, conninfo, sizeof(conninfo)));
return C_ERR;
}
return C_OK;
@ -2646,11 +2648,11 @@ static int dualChannelReplHandleHandshake(connection *conn, sds *err) {
static int dualChannelReplHandleAuthReply(connection *conn, sds *err) {
*err = receiveSynchronousResponse(conn);
if (*err == NULL) {
serverLog(LL_WARNING, "Primary did not respond to auth command during SYNC handshake");
dualChannelServerLog(LL_WARNING, "Primary did not respond to auth command during SYNC handshake");
return C_ERR;
}
if ((*err)[0] == '-') {
serverLog(LL_WARNING, "Unable to AUTH to Primary: %s", *err);
dualChannelServerLog(LL_WARNING, "Unable to AUTH to Primary: %s", *err);
return C_ERR;
}
server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_REPLCONF_REPLY;
@ -2660,17 +2662,17 @@ static int dualChannelReplHandleAuthReply(connection *conn, sds *err) {
static int dualChannelReplHandleReplconfReply(connection *conn, sds *err) {
*err = receiveSynchronousResponse(conn);
if (*err == NULL) {
serverLog(LL_WARNING, "Primary did not respond to replconf command during SYNC handshake");
dualChannelServerLog(LL_WARNING, "Primary did not respond to replconf command during SYNC handshake");
return C_ERR;
}
if (*err[0] == '-') {
serverLog(LL_NOTICE, "Server does not support sync with offset, dual channel sync approach cannot be used: %s",
*err);
dualChannelServerLog(LL_NOTICE, "Server does not support sync with offset, dual channel sync approach cannot be used: %s",
*err);
return C_ERR;
}
if (connSyncWrite(conn, "SYNC\r\n", 6, server.repl_syncio_timeout * 1000) == -1) {
serverLog(LL_WARNING, "I/O error writing to Primary: %s", connGetLastError(conn));
dualChannelServerLog(LL_WARNING, "I/O error writing to Primary: %s", connGetLastError(conn));
return C_ERR;
}
return C_OK;
@ -2684,7 +2686,7 @@ static int dualChannelReplHandleEndOffsetResponse(connection *conn, sds *err) {
}
if (*err[0] == '\0') {
/* Retry again later */
serverLog(LL_DEBUG, "Received empty $ENDOFF response");
dualChannelServerLog(LL_DEBUG, "Received empty $ENDOFF response");
return C_RETRY;
}
long long reploffset;
@ -2693,7 +2695,7 @@ static int dualChannelReplHandleEndOffsetResponse(connection *conn, sds *err) {
/* Parse end offset response */
char *endoff_format = "$ENDOFF:%lld %40s %d %llu";
if (sscanf(*err, endoff_format, &reploffset, primary_replid, &dbid, &rdb_client_id) != 4) {
serverLog(LL_WARNING, "Received unexpected $ENDOFF response: %s", *err);
dualChannelServerLog(LL_WARNING, "Received unexpected $ENDOFF response: %s", *err);
return C_ERR;
}
server.rdb_client_id = rdb_client_id;
@ -2741,7 +2743,8 @@ static void dualChannelFullSyncWithPrimary(connection *conn) {
/* Check for errors in the socket: after a non blocking connect() we
* may find that the socket is in error state. */
if (connGetState(conn) != CONN_STATE_CONNECTED) {
serverLog(LL_WARNING, "Error condition on socket for dual channel replication: %s", connGetLastError(conn));
dualChannelServerLog(LL_WARNING, "Error condition on socket for dual channel replication: %s",
connGetLastError(conn));
goto error;
}
switch (server.repl_rdb_channel_state) {
@ -2830,13 +2833,13 @@ int readIntoReplDataBlock(connection *conn, replDataBufBlock *data_block, size_t
int nread = connRead(conn, data_block->buf + data_block->used, read);
if (nread == -1) {
if (connGetState(conn) != CONN_STATE_CONNECTED) {
serverLog(LL_NOTICE, "Error reading from primary: %s", connGetLastError(conn));
dualChannelServerLog(LL_NOTICE, "Error reading from primary: %s", connGetLastError(conn));
cancelReplicationHandshake(1);
}
return C_ERR;
}
if (nread == 0) {
serverLog(LL_VERBOSE, "Provisional primary closed connection");
dualChannelServerLog(LL_VERBOSE, "Provisional primary closed connection");
cancelReplicationHandshake(1);
return C_ERR;
}
@ -2865,7 +2868,7 @@ void bufferReplData(connection *conn) {
if (readlen && remaining_bytes == 0) {
if (server.client_obuf_limits[CLIENT_TYPE_REPLICA].hard_limit_bytes &&
server.pending_repl_data.len > server.client_obuf_limits[CLIENT_TYPE_REPLICA].hard_limit_bytes) {
serverLog(LL_NOTICE, "Replication buffer limit reached, stopping buffering.");
dualChannelServerLog(LL_NOTICE, "Replication buffer limit reached, stopping buffering.");
/* Stop accumulating primary commands. */
connSetReadHandler(conn, NULL);
break;
@ -2938,7 +2941,7 @@ void dualChannelSyncSuccess(void) {
/* Wait for the accumulated buffer to be processed before reading any more replication updates */
if (server.pending_repl_data.blocks && streamReplDataBufToDb(server.primary) == C_ERR) {
/* Sync session aborted during repl data streaming. */
serverLog(LL_WARNING, "Failed to stream local replication buffer into memory");
dualChannelServerLog(LL_WARNING, "Failed to stream local replication buffer into memory");
/* Verify sync is still in progress */
if (server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE) {
replicationAbortDualChannelSyncTransfer();
@ -2947,7 +2950,7 @@ void dualChannelSyncSuccess(void) {
return;
}
freePendingReplDataBuf();
serverLog(LL_NOTICE, "Successfully streamed replication data into memory");
dualChannelServerLog(LL_NOTICE, "Successfully streamed replication data into memory");
/* We can resume reading from the primary connection once the local replication buffer has been loaded. */
replicationSteadyStateInit();
replicationSendAck(); /* Send ACK to notify primary that replica is synced */
@ -2963,7 +2966,7 @@ int dualChannelSyncHandlePsync(void) {
if (server.repl_rdb_channel_state < REPL_DUAL_CHANNEL_RDB_LOADED) {
/* RDB is still loading */
if (connSetReadHandler(server.repl_provisional_primary.conn, bufferReplData) == C_ERR) {
serverLog(LL_WARNING, "Error while setting readable handler: %s", strerror(errno));
dualChannelServerLog(LL_WARNING, "Error while setting readable handler: %s", strerror(errno));
cancelReplicationHandshake(1);
return C_ERR;
}
@ -2972,7 +2975,7 @@ int dualChannelSyncHandlePsync(void) {
}
serverAssert(server.repl_rdb_channel_state == REPL_DUAL_CHANNEL_RDB_LOADED);
/* RDB is loaded */
serverLog(LL_DEBUG, "Dual channel sync - psync established after rdb load");
dualChannelServerLog(LL_DEBUG, "Psync established after rdb load");
dualChannelSyncSuccess();
return C_OK;
}
@ -3066,8 +3069,9 @@ int replicaTryPartialResynchronization(connection *conn, int read_reply) {
/* While in dual channel replication, we should use our prepared repl id and offset. */
psync_replid = server.repl_provisional_primary.replid;
snprintf(psync_offset, sizeof(psync_offset), "%lld", server.repl_provisional_primary.reploff + 1);
serverLog(LL_NOTICE, "Trying a partial resynchronization using main channel (request %s:%s).", psync_replid,
psync_offset);
dualChannelServerLog(LL_NOTICE,
"Trying a partial resynchronization using main channel (request %s:%s).",
psync_replid, psync_offset);
} else if (server.cached_primary) {
psync_replid = server.cached_primary->replid;
snprintf(psync_offset, sizeof(psync_offset), "%lld", server.cached_primary->reploff + 1);
@ -3214,7 +3218,7 @@ int replicaTryPartialResynchronization(connection *conn, int read_reply) {
/* A response of +DUALCHANNELSYNC from the primary implies that partial
* synchronization is not possible and that the primary supports full
* sync using dedicated RDB channel. Full sync will continue that way. */
serverLog(LL_NOTICE, "PSYNC is not possible, initialize RDB channel.");
dualChannelServerLog(LL_NOTICE, "PSYNC is not possible, initialize RDB channel.");
sdsfree(reply);
return PSYNC_FULLRESYNC_DUAL_CHANNEL;
}
@ -3258,7 +3262,7 @@ int dualChannelReplMainConnRecvCapaReply(connection *conn, sds *err) {
*err = receiveSynchronousResponse(conn);
if (*err == NULL) return C_ERR;
if ((*err)[0] == '-') {
serverLog(LL_NOTICE, "Primary does not understand REPLCONF identify: %s", *err);
dualChannelServerLog(LL_NOTICE, "Primary does not understand REPLCONF identify: %s", *err);
return C_ERR;
}
return C_OK;
@ -3267,7 +3271,7 @@ int dualChannelReplMainConnRecvCapaReply(connection *conn, sds *err) {
int dualChannelReplMainConnSendPsync(connection *conn, sds *err) {
if (server.debug_pause_after_fork) debugPauseProcess();
if (replicaTryPartialResynchronization(conn, 0) == PSYNC_WRITE_ERROR) {
serverLog(LL_WARNING, "Aborting dual channel sync. Write error.");
dualChannelServerLog(LL_WARNING, "Aborting dual channel sync. Write error.");
*err = sdsnew(connGetLastError(conn));
return C_ERR;
}
@ -3279,8 +3283,8 @@ int dualChannelReplMainConnRecvPsyncReply(connection *conn, sds *err) {
if (psync_result == PSYNC_WAIT_REPLY) return C_OK; /* Try again later... */
if (psync_result == PSYNC_CONTINUE) {
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Primary accepted a Partial Resynchronization%s",
server.repl_rdb_transfer_s != NULL ? ", RDB load in background." : ".");
dualChannelServerLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Primary accepted a Partial Resynchronization%s",
server.repl_rdb_transfer_s != NULL ? ", RDB load in background." : ".");
if (server.supervised_mode == SUPERVISED_SYSTEMD) {
serverCommunicateSystemd("STATUS=PRIMARY <-> REPLICA sync: Partial Resynchronization accepted. Ready to "
"accept connections in read-write mode.\n");
@ -3328,7 +3332,7 @@ void dualChannelSetupMainConnForPsync(connection *conn) {
}
if (ret == C_ERR) {
serverLog(LL_WARNING, "Aborting dual channel sync. Main channel psync result %d %s", ret, err ? err : "");
dualChannelServerLog(LL_WARNING, "Aborting dual channel sync. Main channel psync result %d %s", ret, err ? err : "");
cancelReplicationHandshake(1);
}
sdsfree(err);
@ -3717,8 +3721,8 @@ void syncWithPrimary(connection *conn) {
}
if (connSetReadHandler(conn, NULL) == C_ERR) {
char conninfo[CONN_INFO_LEN];
serverLog(LL_WARNING, "Can't clear main connection handler: %s (%s)", strerror(errno),
connGetInfo(conn, conninfo, sizeof(conninfo)));
dualChannelServerLog(LL_WARNING, "Can't clear main connection handler: %s (%s)", strerror(errno),
connGetInfo(conn, conninfo, sizeof(conninfo)));
goto error;
}
server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_SEND_HANDSHAKE;

View File

@ -4044,6 +4044,11 @@ void debugPauseProcess(void);
_serverLog(level, __VA_ARGS__); \
} while (0)
/* dualChannelServerLog - Log messages related to dual-channel operations
* This macro wraps the serverLog function, prepending "<Dual Channel>"
* to the log message. */
#define dualChannelServerLog(level, ...) serverLog(level, "<Dual Channel> " __VA_ARGS__)
#define serverDebug(fmt, ...) printf("DEBUG %s:%d > " fmt "\n", __FILE__, __LINE__, __VA_ARGS__)
#define serverDebugMark() printf("-- MARK %s:%d --\n", __FILE__, __LINE__)

View File

@ -485,7 +485,7 @@ start_server {tags {"dual-channel-replication external:skip"}} {
}
wait_for_value_to_propegate_to_replica $primary $replica "key1"
# Confirm the occurrence of a race condition.
wait_for_log_messages -1 {"*Dual channel sync - psync established after rdb load*"} 0 2000 1
wait_for_log_messages -1 {"*<Dual Channel> Psync established after rdb load*"} 0 2000 1
}
}
}