From 6fb90adf4be1c3b703040c9f744bc9cf17d3d577 Mon Sep 17 00:00:00 2001
From: nitaicaro <42576749+nitaicaro@users.noreply.github.com>
Date: Thu, 30 May 2024 22:55:00 +0300
Subject: [PATCH] =?UTF-8?q?Fix=20crash=20where=20command=20duration=20is?=
 =?UTF-8?q?=20not=20reset=20when=20client=20is=20blocked=20=E2=80=A6=20(#5?=
 =?UTF-8?q?26)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

In #11012, we changed the way command durations are computed to handle
the same command being executed multiple times. In #11970, we added an
assert that fires if the duration is not properly reset, which may
indicate that a call to report statistics was missed.

I found an edge case where this happens. It is easily reproduced by
blocking a client on `XREADGROUP` and migrating the stream's slot. This
causes the engine to process the `XREADGROUP` command twice:

1. The first time, the client blocks on the stream, so processing stops
   until the client is unblocked. In most cases, when we come back to
   the command after the unblock, we process it normally, which includes
   recording the duration and then resetting it.

2. In the edge case, by the time we come back to the command after the
   unblock, the slot has already been migrated to another node, so we
   return a `MOVED` response. On that path we don’t reset the duration
   field.

Fix: also reset the duration when returning a `MOVED` response. I think
this is right, because the client should redirect the command to the
right node, which in turn will calculate the execution duration.

I also wrote a test that reproduces this. It fails without the fix and
passes with it.
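
To illustrate the invariant, here is a minimal standalone C sketch. It is
not the engine's code: `toy_client`, `toy_process`, and the other `toy_*`
names are hypothetical, and the assumption that a blocked attempt leaves a
non-zero duration on the client is a simplification of the two steps
described above.

```c
#include <assert.h>

/* Hypothetical stand-in for the client struct; only the duration field is modeled. */
typedef struct {
    long long duration; /* time attributed to the command currently in flight */
} toy_client;

/* The three ways a processing attempt can end in this toy model. */
typedef enum { TOY_BLOCKED, TOY_EXECUTED, TOY_REDIRECTED } toy_outcome;

/* Mirrors the idea of the assert from #11970: a new command must start from zero. */
static void toy_start_next_command(const toy_client *c) {
    assert(c->duration == 0);
}

/* One processing attempt; reset_on_redirect toggles the fix from this patch. */
static void toy_process(toy_client *c, toy_outcome outcome, long long elapsed,
                        int reset_on_redirect) {
    switch (outcome) {
    case TOY_BLOCKED:
        /* Assumption: time spent before blocking stays on the client. */
        c->duration += elapsed;
        break;
    case TOY_EXECUTED:
        /* Normal path: account for the time (stats reporting would go here), then reset. */
        c->duration += elapsed;
        c->duration = 0;
        break;
    case TOY_REDIRECTED:
        /* MOVED path: nothing is executed on this node; the fix is the reset. */
        if (reset_on_redirect) c->duration = 0;
        break;
    }
}

/* Blocks once, then is redirected on the retry; returns the leftover duration. */
static long long toy_block_then_redirect(int reset_on_redirect) {
    toy_client c = {0};
    toy_process(&c, TOY_BLOCKED, 5, reset_on_redirect);
    toy_process(&c, TOY_REDIRECTED, 0, reset_on_redirect);
    return c.duration;
}

/* Blocks once, then executes normally on the retry; the next command can start. */
static long long toy_block_then_execute(void) {
    toy_client c = {0};
    toy_process(&c, TOY_BLOCKED, 5, 1);
    toy_process(&c, TOY_EXECUTED, 3, 1);
    toy_start_next_command(&c);
    return c.duration;
}
```

With `reset_on_redirect` set to 0, `toy_block_then_redirect` returns the
stale value, which is the state the assert would trip on when the next
command starts. With it set to 1, the function returns 0.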
---------

Signed-off-by: Nitai Caro
Co-authored-by: Nitai Caro
---
 src/server.c                          |  1 +
 tests/unit/cluster/slot-migration.tcl | 21 +++++++++++++++++++++
 2 files changed, 22 insertions(+)

diff --git a/src/server.c b/src/server.c
index f87193b74..bf4967c10 100644
--- a/src/server.c
+++ b/src/server.c
@@ -3883,6 +3883,7 @@ int processCommand(client *c) {
                 flagTransaction(c);
             }
             clusterRedirectClient(c, n, c->slot, error_code);
+            c->duration = 0;
             c->cmd->rejected_calls++;
             return C_OK;
         }
diff --git a/tests/unit/cluster/slot-migration.tcl b/tests/unit/cluster/slot-migration.tcl
index 008e97e03..d4f0d43b3 100644
--- a/tests/unit/cluster/slot-migration.tcl
+++ b/tests/unit/cluster/slot-migration.tcl
@@ -422,3 +422,24 @@ start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-allow-replica
         resume_process [srv -3 pid]
     }
 }
+
+start_cluster 2 0 {tags {external:skip cluster regression} overrides {cluster-allow-replica-migration no cluster-node-timeout 1000} } {
+    # Issue #563 regression test
+    test "Client blocked on XREADGROUP while stream's slot is migrated" {
+        set stream_name aga
+        set slot 609
+
+        # Start a deferring client to simulate a blocked client on XREADGROUP
+        R 0 XGROUP CREATE $stream_name mygroup $ MKSTREAM
+        set rd [valkey_deferring_client]
+        $rd xreadgroup GROUP mygroup consumer BLOCK 0 streams $stream_name >
+        wait_for_blocked_client
+
+        # Migrate the slot to the target node
+        R 0 CLUSTER SETSLOT $slot MIGRATING [dict get [cluster_get_myself 1] id]
+        R 1 CLUSTER SETSLOT $slot IMPORTING [dict get [cluster_get_myself 0] id]
+
+        # This line should cause the crash
+        R 0 MIGRATE 127.0.0.1 [lindex [R 1 CONFIG GET port] 1] $stream_name 0 5000
+    }
+}