From 762fbcb687c6823d3403cfe1ea485aced0ba4409 Mon Sep 17 00:00:00 2001 From: Madelyn Olson Date: Sun, 16 Feb 2020 05:16:51 -0800 Subject: [PATCH 01/16] Minor CSC fixes and fixed documentation --- src/networking.c | 11 ++++++++--- src/tracking.c | 22 ++++++++-------------- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/src/networking.c b/src/networking.c index 69350eed1..dad61904d 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2044,7 +2044,7 @@ void clientCommand(client *c) { "REPLY (on|off|skip) -- Control the replies sent to the current connection.", "SETNAME -- Assign the name to the current connection.", "UNBLOCK [TIMEOUT|ERROR] -- Unblock the specified blocked client.", -"TRACKING (on|off) [REDIRECT ] -- Enable client keys tracking for client side caching.", +"TRACKING (on|off) [REDIRECT ] [BCAST] [PREFIX first] [PREFIX second] ... -- Enable client keys tracking for client side caching.", "GETREDIR -- Return the client ID we are redirecting to when tracking is enabled.", NULL }; @@ -2234,17 +2234,22 @@ NULL if (!strcasecmp(c->argv[j]->ptr,"redirect") && moreargs) { j++; if (getLongLongFromObjectOrReply(c,c->argv[j],&redir,NULL) != - C_OK) return; + C_OK) + { + zfree(prefix); + return; + } /* We will require the client with the specified ID to exist * right now, even if it is possible that it gets disconnected * later. Still a valid sanity check. 
*/ if (lookupClientByID(redir) == NULL) { addReplyError(c,"The client ID you want redirect to " "does not exist"); + zfree(prefix); return; } } else if (!strcasecmp(c->argv[j]->ptr,"bcast")) { - bcast++; + bcast = 1; } else if (!strcasecmp(c->argv[j]->ptr,"prefix") && moreargs) { j++; prefix = zrealloc(prefix,sizeof(robj*)*(numprefix+1)); diff --git a/src/tracking.c b/src/tracking.c index 7179a54f8..619148f2f 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -44,7 +44,7 @@ rax *TrackingTable = NULL; rax *PrefixTable = NULL; uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across - the whole tracking table. This givesn + the whole tracking table. This gives an hint about the total memory we are using server side for CSC. */ robj *TrackingChannelName; @@ -145,9 +145,9 @@ void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, s } } -/* This function is called after the excution of a readonly command in the +/* This function is called after the execution of a readonly command in the * case the client 'c' has keys tracking enabled. It will populate the - * tracking ivalidation table according to the keys the user fetched, so that + * tracking invalidation table according to the keys the user fetched, so that * Redis will know what are the clients that should receive an invalidation * message with certain groups of keys are modified. */ void trackingRememberKeys(client *c) { @@ -292,19 +292,12 @@ void trackingInvalidateKey(robj *keyobj) { } /* This function is called when one or all the Redis databases are flushed - * (dbid == -1 in case of FLUSHALL). Caching slots are not specific for - * each DB but are global: currently what we do is sending a special + * (dbid == -1 in case of FLUSHALL). 
Caching keys are not specific for + * each DB but are global: currently what we do is send a special * notification to clients with tracking enabled, invalidating the caching - * slot "-1", which means, "all the keys", in order to avoid flooding clients + * key "", which means, "all the keys", in order to avoid flooding clients * with many invalidation messages for all the keys they may hold. - * - * However trying to flush the tracking table here is very costly: - * we need scanning 16 million caching slots in the table to check - * if they are used, this introduces a big delay. So what we do is to really - * flush the table in the case of FLUSHALL. When a FLUSHDB is called instead - * we just send the invalidation message to all the clients, but don't - * flush the table: it will slowly get garbage collected as more keys - * are modified in the used caching slots. */ + */ void freeTrackingRadixTree(void *rt) { raxFree(rt); } @@ -325,6 +318,7 @@ void trackingInvalidateKeysOnFlush(int dbid) { /* In case of FLUSHALL, reclaim all the memory used by tracking. 
*/ if (dbid == -1 && TrackingTable) { raxFreeWithCallback(TrackingTable,freeTrackingRadixTree); + TrackingTable = raxNew(); TrackingTableTotalItems = 0; } } From d1f22eaca422039e89b593e8c951439a538a4764 Mon Sep 17 00:00:00 2001 From: Madelyn Olson Date: Sun, 16 Feb 2020 05:41:39 -0800 Subject: [PATCH 02/16] Give an error message if you specify redirect twice --- src/networking.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/networking.c b/src/networking.c index dad61904d..5b1229fde 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2233,6 +2233,13 @@ NULL if (!strcasecmp(c->argv[j]->ptr,"redirect") && moreargs) { j++; + if (redir != 0) { + addReplyError(c,"A client can only redirect to a single " + "other client"); + zfree(prefix); + return; + } + if (getLongLongFromObjectOrReply(c,c->argv[j],&redir,NULL) != C_OK) { From 5d0890c0a83e4f3c2398a3ff7381c72ef36602df Mon Sep 17 00:00:00 2001 From: Guy Benoish Date: Thu, 6 Feb 2020 18:36:21 +0530 Subject: [PATCH 03/16] Fix memory leak in test_ld_conv --- tests/modules/misc.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/modules/misc.c b/tests/modules/misc.c index 41bec06ed..1048d5065 100644 --- a/tests/modules/misc.c +++ b/tests/modules/misc.c @@ -74,6 +74,7 @@ int test_ld_conv(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { RedisModule_ReplyWithError(ctx, err); goto final; } + /* Make sure we can't convert a string that has \0 in it */ char buf[4] = "123"; buf[1] = '\0'; @@ -81,8 +82,11 @@ int test_ld_conv(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { long double ld3; if (RedisModule_StringToLongDouble(s3, &ld3) == REDISMODULE_OK) { RedisModule_ReplyWithError(ctx, "Invalid string successfully converted to long double"); + RedisModule_FreeString(ctx, s3); goto final; } + RedisModule_FreeString(ctx, s3); + RedisModule_ReplyWithLongDouble(ctx, ld2); final: RedisModule_FreeString(ctx, s1); From ba0270799ed6b347c5e096a5ebbf9c7e1bc5c370 Mon Sep 17 00:00:00 2001 From: 
hwware Date: Mon, 17 Feb 2020 23:40:24 -0500 Subject: [PATCH 04/16] add missing subcommand description for debug oom --- src/debug.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/debug.c b/src/debug.c index b910d2d2d..dd96ad416 100644 --- a/src/debug.c +++ b/src/debug.c @@ -363,6 +363,7 @@ void debugCommand(client *c) { "LOADAOF -- Flush the AOF buffers on disk and reload the AOF in memory.", "LUA-ALWAYS-REPLICATE-COMMANDS <0|1> -- Setting it to 1 makes Lua replication defaulting to replicating single commands, without the script having to enable effects replication.", "OBJECT -- Show low level info about key and associated value.", +"OOM -- Crash the server simulating an out-of-memory error.", "PANIC -- Crash the server simulating a panic.", "POPULATE [prefix] [size] -- Create string keys named key:. If a prefix is specified is used instead of the 'key' prefix.", "RELOAD -- Save the RDB on disk and reload it back in memory.", From 294c9af469b1bc883dc5b8d6d6b8ce7ec695f29a Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 21 Feb 2020 17:08:45 +0100 Subject: [PATCH 05/16] Test engine: better tracking of what workers are doing. 
--- tests/support/server.tcl | 3 +++ tests/test_helper.tcl | 13 +++++++++---- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/tests/support/server.tcl b/tests/support/server.tcl index b20f1ad36..174b05852 100644 --- a/tests/support/server.tcl +++ b/tests/support/server.tcl @@ -53,6 +53,7 @@ proc kill_server config { } # kill server and wait for the process to be totally exited + send_data_packet $::test_server_fd server-killing $pid catch {exec kill $pid} if {$::valgrind} { set max_wait 60000 @@ -231,6 +232,8 @@ proc start_server {options {code undefined}} { set stdout [format "%s/%s" [dict get $config "dir"] "stdout"] set stderr [format "%s/%s" [dict get $config "dir"] "stderr"] + send_data_packet $::test_server_fd "server-spawning" "port $::port" + if {$::valgrind} { set pid [exec valgrind --track-origins=yes --suppressions=src/valgrind.sup --show-reachable=no --show-possibly-lost=no --leak-check=full src/redis-server $config_file > $stdout 2> $stderr &] } elseif ($::stack_logging) { diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index b266bc56d..fa5579669 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -289,7 +289,7 @@ proc read_from_test_client fd { puts "\[$completed_tests_count/$all_tests_count [colorstr yellow $status]\]: $data ($elapsed seconds)" lappend ::clients_time_history $elapsed $data signal_idle_client $fd - set ::active_clients_task($fd) DONE + set ::active_clients_task($fd) "(DONE) $data" } elseif {$status eq {ok}} { if {!$::quiet} { puts "\[[colorstr green $status]\]: $data" @@ -320,10 +320,16 @@ proc read_from_test_client fd { exit 1 } elseif {$status eq {testing}} { set ::active_clients_task($fd) "(IN PROGRESS) $data" + } elseif {$status eq {server-spawning}} { + set ::active_clients_task($fd) "(SPAWNING SERVER) $data" } elseif {$status eq {server-spawned}} { lappend ::active_servers $data + set ::active_clients_task($fd) "(SPAWNED SERVER) pid:$data" + } elseif {$status eq {server-killing}} { + set 
::active_clients_task($fd) "(KILLING SERVER) pid:$data" } elseif {$status eq {server-killed}} { set ::active_servers [lsearch -all -inline -not -exact $::active_servers $data] + set ::active_clients_task($fd) "(KILLED SERVER) pid:$data" } else { if {!$::quiet} { puts "\[$status\]: $data" @@ -333,7 +339,7 @@ proc read_from_test_client fd { proc show_clients_state {} { # The following loop is only useful for debugging tests that may - # enter an infinite loop. Commented out normally. + # enter an infinite loop. foreach x $::active_clients { if {[info exist ::active_clients_task($x)]} { puts "$x => $::active_clients_task($x)" @@ -363,8 +369,6 @@ proc signal_idle_client fd { set ::active_clients \ [lsearch -all -inline -not -exact $::active_clients $fd] - if 0 {show_clients_state} - # New unit to process? if {$::next_test != [llength $::all_tests]} { if {!$::quiet} { @@ -380,6 +384,7 @@ proc signal_idle_client fd { } } else { lappend ::idle_clients $fd + set ::active_clients_task($fd) "SLEEPING, no more units to assign" if {[llength $::active_clients] == 0} { the_end } From 72c053519c282a8fea2a6c10578ca9dacc183cac Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 21 Feb 2020 18:55:56 +0100 Subject: [PATCH 06/16] Test engine: detect timeout when checking for Redis startup. --- tests/support/server.tcl | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/tests/support/server.tcl b/tests/support/server.tcl index 174b05852..eec43e485 100644 --- a/tests/support/server.tcl +++ b/tests/support/server.tcl @@ -271,9 +271,19 @@ proc start_server {options {code undefined}} { } # Wait for actual startup + set checkperiod 100; # Milliseconds + set maxiter [expr {120*1000/100}] ; # Wait up to 2 minutes. 
while {![info exists _pid]} { regexp {PID:\s(\d+)} [exec cat $stdout] _ _pid - after 100 + after $checkperiod + incr maxiter -1 + if {$maxiter == 0} { + start_server_error $config_file "No PID detected in log $stdout" + puts "--- LOG CONTENT ---" + puts [exec cat $stdout] + puts "-------------------" + break + } } # setup properties to be able to initialize a client object From ef3551d1499f5ea4bfc75d33c6ff1979b0aa9b07 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 24 Feb 2020 10:46:23 +0100 Subject: [PATCH 07/16] Test engine: experimental change to avoid busy port problems. --- tests/support/server.tcl | 149 ++++++++++++++++++++++++--------------- 1 file changed, 92 insertions(+), 57 deletions(-) diff --git a/tests/support/server.tcl b/tests/support/server.tcl index eec43e485..d086366dc 100644 --- a/tests/support/server.tcl +++ b/tests/support/server.tcl @@ -141,6 +141,18 @@ proc tags {tags code} { uplevel 1 $code set ::tags [lrange $::tags 0 end-[llength $tags]] } + +# Write the configuration in the dictionary 'config' in the specified +# file name. 
+proc create_server_config_file {filename config} { + set fp [open $filename w+] + foreach directive [dict keys $config] { + puts -nonewline $fp "$directive " + puts $fp [dict get $config $directive] + } + close $fp +} + proc start_server {options {code undefined}} { # If we are running against an external server, we just push the # host/port pair in the stack the first time @@ -222,68 +234,91 @@ proc start_server {options {code undefined}} { # write new configuration to temporary file set config_file [tmpfile redis.conf] - set fp [open $config_file w+] - foreach directive [dict keys $config] { - puts -nonewline $fp "$directive " - puts $fp [dict get $config $directive] - } - close $fp + create_server_config_file $config_file $config set stdout [format "%s/%s" [dict get $config "dir"] "stdout"] set stderr [format "%s/%s" [dict get $config "dir"] "stderr"] - send_data_packet $::test_server_fd "server-spawning" "port $::port" - - if {$::valgrind} { - set pid [exec valgrind --track-origins=yes --suppressions=src/valgrind.sup --show-reachable=no --show-possibly-lost=no --leak-check=full src/redis-server $config_file > $stdout 2> $stderr &] - } elseif ($::stack_logging) { - set pid [exec /usr/bin/env MallocStackLogging=1 MallocLogFile=/tmp/malloc_log.txt src/redis-server $config_file > $stdout 2> $stderr &] - } else { - set pid [exec src/redis-server $config_file > $stdout 2> $stderr &] - } - - # Tell the test server about this new instance. - send_data_packet $::test_server_fd server-spawned $pid - - # check that the server actually started - # ugly but tries to be as fast as possible... 
- if {$::valgrind} {set retrynum 1000} else {set retrynum 100} - - if {$::verbose} { - puts -nonewline "=== ($tags) Starting server ${::host}:${::port} " - } - - if {$code ne "undefined"} { - set serverisup [server_is_up $::host $::port $retrynum] - } else { - set serverisup 1 - } - - if {$::verbose} { - puts "" - } - - if {!$serverisup} { - set err {} - append err [exec cat $stdout] "\n" [exec cat $stderr] - start_server_error $config_file $err - return - } - - # Wait for actual startup - set checkperiod 100; # Milliseconds - set maxiter [expr {120*1000/100}] ; # Wait up to 2 minutes. - while {![info exists _pid]} { - regexp {PID:\s(\d+)} [exec cat $stdout] _ _pid - after $checkperiod - incr maxiter -1 - if {$maxiter == 0} { - start_server_error $config_file "No PID detected in log $stdout" - puts "--- LOG CONTENT ---" - puts [exec cat $stdout] - puts "-------------------" - break + # We need a loop here to retry with different ports. + set server_started 0 + while {$server_started == 0} { + if {$::verbose} { + puts -nonewline "=== ($tags) Starting server ${::host}:${::port} " } + + send_data_packet $::test_server_fd "server-spawning" "port $::port" + + if {$::valgrind} { + set pid [exec valgrind --track-origins=yes --suppressions=src/valgrind.sup --show-reachable=no --show-possibly-lost=no --leak-check=full src/redis-server $config_file > $stdout 2> $stderr &] + } elseif ($::stack_logging) { + set pid [exec /usr/bin/env MallocStackLogging=1 MallocLogFile=/tmp/malloc_log.txt src/redis-server $config_file > $stdout 2> $stderr &] + } else { + set pid [exec src/redis-server $config_file > $stdout 2> $stderr &] + } + + # Tell the test server about this new instance. + send_data_packet $::test_server_fd server-spawned $pid + + # check that the server actually started + # ugly but tries to be as fast as possible... 
+ if {$::valgrind} {set retrynum 1000} else {set retrynum 100} + + # Wait for actual startup + set checkperiod 100; # Milliseconds + set maxiter [expr {120*1000/100}] ; # Wait up to 2 minutes. + set port_busy 0 + while {![info exists _pid]} { + regexp {PID:\s(\d+)} [exec cat $stdout] _ _pid + after $checkperiod + incr maxiter -1 + if {$maxiter == 0} { + start_server_error $config_file "No PID detected in log $stdout" + puts "--- LOG CONTENT ---" + puts [exec cat $stdout] + puts "-------------------" + break + } + + # Check if the port is actually busy and the server failed + # for this reason. + if {[regexp {Could not create server TCP} [exec cat $stdout]]} { + set port_busy 1 + break + } + } + + # Sometimes we have to try a different port, even if we checked + # for availability. Other test clients may grab the port before we + # are able to do it for example. + if {$port_busy} { + puts "Port $::port was already busy, trying another port..." + set ::port [find_available_port [expr {$::port+1}]] + if {$::tls} { + dict set config "tls-port" $::port + } else { + dict set config port $::port + } + create_server_config_file $config_file $config + continue; # Try again + } + + if {$code ne "undefined"} { + set serverisup [server_is_up $::host $::port $retrynum] + } else { + set serverisup 1 + } + + if {$::verbose} { + puts "" + } + + if {!$serverisup} { + set err {} + append err [exec cat $stdout] "\n" [exec cat $stderr] + start_server_error $config_file $err + return + } + set server_started 1 } # setup properties to be able to initialize a client object From b43954260452fb600d9cc5c30aa96c17804bc1b5 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 21 Feb 2020 16:39:42 +0100 Subject: [PATCH 08/16] Tracking: optin/out implemented. 
--- src/networking.c | 65 ++++++++++++++++++++++++++++++++++++++++++------ src/server.h | 6 ++++- src/tracking.c | 27 ++++++++++++++------ 3 files changed, 82 insertions(+), 16 deletions(-) diff --git a/src/networking.c b/src/networking.c index 5b1229fde..4c394af70 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1365,6 +1365,12 @@ void resetClient(client *c) { if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand) c->flags &= ~CLIENT_ASKING; + /* We do the same for the CACHING command as well. It also affects + * the next command or transaction executed, in a way very similar + * to ASKING. */ + if (!(c->flags & CLIENT_MULTI) && prevcmd != clientCommand) + c->flags &= ~CLIENT_TRACKING_CACHING; + /* Remove the CLIENT_REPLY_SKIP flag if any so that the reply * to the next command will be sent, but set the flag if the command * we just processed was "CLIENT REPLY SKIP". */ @@ -2044,7 +2050,7 @@ void clientCommand(client *c) { "REPLY (on|off|skip) -- Control the replies sent to the current connection.", "SETNAME -- Assign the name to the current connection.", "UNBLOCK [TIMEOUT|ERROR] -- Unblock the specified blocked client.", -"TRACKING (on|off) [REDIRECT ] [BCAST] [PREFIX first] [PREFIX second] ... -- Enable client keys tracking for client side caching.", +"TRACKING (on|off) [REDIRECT ] [BCAST] [PREFIX first] [PREFIX second] [OPTIN] [OPTOUT]... -- Enable client keys tracking for client side caching.", "GETREDIR -- Return the client ID we are redirecting to when tracking is enabled.", NULL }; @@ -2221,9 +2227,9 @@ NULL addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"tracking") && c->argc >= 3) { /* CLIENT TRACKING (on|off) [REDIRECT ] [BCAST] [PREFIX first] - * [PREFIX second] ... */ + * [PREFIX second] [OPTIN] [OPTOUT] ... 
*/ long long redir = 0; - int bcast = 0; + uint64_t options = 0; robj **prefix = NULL; size_t numprefix = 0; @@ -2256,7 +2262,11 @@ NULL return; } } else if (!strcasecmp(c->argv[j]->ptr,"bcast")) { - bcast = 1; + options |= CLIENT_TRACKING_BCAST; + } else if (!strcasecmp(c->argv[j]->ptr,"optin")) { + options |= CLIENT_TRACKING_OPTIN; + } else if (!strcasecmp(c->argv[j]->ptr,"optout")) { + options |= CLIENT_TRACKING_OPTOUT; } else if (!strcasecmp(c->argv[j]->ptr,"prefix") && moreargs) { j++; prefix = zrealloc(prefix,sizeof(robj*)*(numprefix+1)); @@ -2272,7 +2282,7 @@ NULL if (!strcasecmp(c->argv[2]->ptr,"on")) { /* Before enabling tracking, make sure options are compatible * among each other and with the current state of the client. */ - if (!bcast && numprefix) { + if (!(options & CLIENT_TRACKING_BCAST) && numprefix) { addReplyError(c, "PREFIX option requires BCAST mode to be enabled"); zfree(prefix); @@ -2281,7 +2291,8 @@ NULL if (c->flags & CLIENT_TRACKING) { int oldbcast = !!(c->flags & CLIENT_TRACKING_BCAST); - if (oldbcast != bcast) { + int newbcast = !!(options & CLIENT_TRACKING_BCAST); + if (oldbcast != newbcast) { addReplyError(c, "You can't switch BCAST mode on/off before disabling " "tracking for this client, and then re-enabling it with " @@ -2290,7 +2301,17 @@ NULL return; } } - enableTracking(c,redir,bcast,prefix,numprefix); + + if (options & CLIENT_TRACKING_BCAST && + options & (CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT)) + { + addReplyError(c, + "OPTIN and OPTOUT are not compatible with BCAST"); + zfree(prefix); + return; + } + + enableTracking(c,redir,options,prefix,numprefix); } else if (!strcasecmp(c->argv[2]->ptr,"off")) { disableTracking(c); } else { @@ -2300,6 +2321,36 @@ NULL } zfree(prefix); addReply(c,shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr,"caching") && c->argc >= 3) { + if (!(c->flags & CLIENT_TRACKING)) { + addReplyError(c,"CLIENT CACHING can be called only when the " + "client is in tracking mode with OPTIN or " + "OPTOUT 
mode enabled"); + return; + } + + char *opt = c->argv[2]->ptr; + if (!strcasecmp(opt,"yes")) { + if (c->flags & CLIENT_TRACKING_OPTIN) { + c->flags |= CLIENT_TRACKING_CACHING; + } else { + addReplyError(c,"CLIENT CACHING YES is only valid when tracking is enabled in OPTIN mode."); + return; + } + } else if (!strcasecmp(opt,"no")) { + if (c->flags & CLIENT_TRACKING_OPTOUT) { + c->flags |= CLIENT_TRACKING_CACHING; + } else { + addReplyError(c,"CLIENT CACHING NO is only valid when tracking is enabled in OPTOUT mode."); + return; + } + } else { + addReply(c,shared.syntaxerr); + return; + } + + /* Common reply for when we succeeded. */ + addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"getredir") && c->argc == 2) { /* CLIENT GETREDIR */ if (c->flags & CLIENT_TRACKING) { diff --git a/src/server.h b/src/server.h index bb40ca4e1..87c293c26 100644 --- a/src/server.h +++ b/src/server.h @@ -248,6 +248,10 @@ typedef long long ustime_t; /* microsecond time type. */ perform client side caching. */ #define CLIENT_TRACKING_BROKEN_REDIR (1ULL<<32) /* Target client is invalid. */ #define CLIENT_TRACKING_BCAST (1ULL<<33) /* Tracking in BCAST mode. */ +#define CLIENT_TRACKING_OPTIN (1ULL<<34) /* Tracking in opt-in mode. */ +#define CLIENT_TRACKING_OPTOUT (1ULL<<35) /* Tracking in opt-out mode. */ +#define CLIENT_TRACKING_CACHING (1ULL<<36) /* CACHING yes/no was given, + depending on optin/optout mode. */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. 
*/ @@ -1651,7 +1655,7 @@ void addReplyStatusFormat(client *c, const char *fmt, ...); #endif /* Client side caching (tracking mode) */ -void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix); +void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix); void disableTracking(client *c); void trackingRememberKeys(client *c); void trackingInvalidateKey(robj *keyobj); diff --git a/src/tracking.c b/src/tracking.c index 619148f2f..45f83103a 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -93,7 +93,8 @@ void disableTracking(client *c) { if (c->flags & CLIENT_TRACKING) { server.tracking_clients--; c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR| - CLIENT_TRACKING_BCAST); + CLIENT_TRACKING_BCAST|CLIENT_TRACKING_OPTIN| + CLIENT_TRACKING_OPTOUT); } } @@ -124,10 +125,11 @@ void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) { * eventually get freed, we'll send a message to the original client to * inform it of the condition. Multiple clients can redirect the invalidation * messages to the same client ID. 
*/ -void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix) { +void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix) { if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++; c->flags |= CLIENT_TRACKING; - c->flags &= ~(CLIENT_TRACKING_BROKEN_REDIR|CLIENT_TRACKING_BCAST); + c->flags &= ~(CLIENT_TRACKING_BROKEN_REDIR|CLIENT_TRACKING_BCAST| + CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT); c->client_tracking_redirection = redirect_to; if (TrackingTable == NULL) { TrackingTable = raxNew(); @@ -135,7 +137,7 @@ void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, s TrackingChannelName = createStringObject("__redis__:invalidate",20); } - if (bcast) { + if (options & CLIENT_TRACKING_BCAST) { c->flags |= CLIENT_TRACKING_BCAST; if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0); for (size_t j = 0; j < numprefix; j++) { @@ -143,14 +145,23 @@ void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, s enableBcastTrackingForPrefix(c,sdsprefix,sdslen(sdsprefix)); } } + c->flags |= options & (CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT); } /* This function is called after the execution of a readonly command in the - * case the client 'c' has keys tracking enabled. It will populate the - * tracking invalidation table according to the keys the user fetched, so that - * Redis will know what are the clients that should receive an invalidation - * message with certain groups of keys are modified. */ + * case the client 'c' has keys tracking enabled and the tracking is not + * in BCAST mode. It will populate the tracking invalidation table according + * to the keys the user fetched, so that Redis will know what are the clients + * that should receive an invalidation message with certain groups of keys + * are modified. 
*/ void trackingRememberKeys(client *c) { + /* Return if we are in optin/out mode and the right CACHING command + * was/wasn't given in order to modify the default behavior. */ + uint64_t optin = c->flags & CLIENT_TRACKING_OPTIN; + uint64_t optout = c->flags & CLIENT_TRACKING_OPTOUT; + uint64_t caching_given = c->flags & CLIENT_TRACKING_CACHING; + if ((optin && !caching_given) || (optout && caching_given)) return; + int numkeys; int *keys = getKeysFromCommand(c->cmd,c->argv,c->argc,&numkeys); if (keys == NULL) return; From 60096bc1a1d24e87a4ab9b4bcc671e1a9925476a Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Sun, 23 Feb 2020 12:46:14 +0200 Subject: [PATCH 09/16] Fix latency sensitivity of new defrag test I saw that the new defrag test for list was failing in CI recently, so I raised its threshold from 12 to 60. Besides that, I add / improve the latency test for the other two defrag tests (add a sensitive latency and digest / save checks) and fix bad usage of debug populate (it can't override existing keys). this was the original intention, which creates higher fragmentation. 
--- tests/unit/memefficiency.tcl | 40 ++++++++++++++++++++++++++++-------- 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index ec80c7384..468825a47 100644 --- a/tests/unit/memefficiency.tcl +++ b/tests/unit/memefficiency.tcl @@ -39,6 +39,8 @@ start_server {tags {"memefficiency"}} { start_server {tags {"defrag"}} { if {[string match {*jemalloc*} [s mem_allocator]]} { test "Active defrag" { + r config set save "" ;# prevent bgsave from interfereing with save below + r config set hz 100 r config set activedefrag no r config set active-defrag-threshold-lower 5 r config set active-defrag-cycle-min 65 @@ -46,8 +48,8 @@ start_server {tags {"defrag"}} { r config set active-defrag-ignore-bytes 2mb r config set maxmemory 100mb r config set maxmemory-policy allkeys-lru - r debug populate 700000 asdf 150 - r debug populate 170000 asdf 300 + r debug populate 700000 asdf1 150 + r debug populate 170000 asdf2 300 r ping ;# trigger eviction following the previous population after 120 ;# serverCron only updates the info once in 100ms set frag [s allocator_frag_ratio] @@ -55,6 +57,10 @@ start_server {tags {"defrag"}} { puts "frag $frag" } assert {$frag >= 1.4} + + r config set latency-monitor-threshold 5 + r latency reset + set digest [r debug digest] catch {r config set activedefrag yes} e if {![string match {DISABLED*} $e]} { # Wait for the active defrag to start working (decision once a @@ -78,19 +84,37 @@ start_server {tags {"defrag"}} { # Test the the fragmentation is lower. 
after 120 ;# serverCron only updates the info once in 100ms set frag [s allocator_frag_ratio] + set max_latency 0 + foreach event [r latency latest] { + lassign $event eventname time latency max + if {$eventname == "active-defrag-cycle"} { + set max_latency $max + } + } if {$::verbose} { puts "frag $frag" + puts "max latency $max_latency" + puts [r latency latest] + puts [r latency history active-defrag-cycle] } assert {$frag < 1.1} + # due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75, + # we expect max latency to be not much higher than 7.5ms but due to rare slowness threshold is set higher + assert {$max_latency <= 60} } else { set _ "" } - } {} + # verify the data isn't corrupted or changed + set newdigest [r debug digest] + assert {$digest eq $newdigest} + r save ;# saving an rdb iterates over all the data / pointers + } {OK} test "Active defrag big keys" { r flushdb r config resetstat r config set save "" ;# prevent bgsave from interfereing with save below + r config set hz 100 r config set activedefrag no r config set active-defrag-max-scan-fields 1000 r config set active-defrag-threshold-lower 5 @@ -200,9 +224,9 @@ start_server {tags {"defrag"}} { puts [r latency history active-defrag-cycle] } assert {$frag < 1.1} - # due to high fragmentation, 10hz, and active-defrag-cycle-max set to 75, - # we expect max latency to be not much higher than 75ms - assert {$max_latency <= 120} + # due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75, + # we expect max latency to be not much higher than 7.5ms but due to rare slowness threshold is set higher + assert {$max_latency <= 60} } # verify the data isn't corrupted or changed set newdigest [r debug digest] @@ -292,8 +316,8 @@ start_server {tags {"defrag"}} { } assert {$frag < 1.1} # due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75, - # we expect max latency to be not much higher than 7.5ms - assert {$max_latency <= 12} + # we expect max latency to be not 
much higher than 7.5ms but due to rare slowness threshold is set higher + assert {$max_latency <= 60} } # verify the data isn't corrupted or changed set newdigest [r debug digest] From 0b988fa9ece7e92bd84d509ea281b03c120db74f Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Tue, 25 Feb 2020 13:01:52 +0200 Subject: [PATCH 10/16] fix github actions failing latency test for active defrag seems that github actions are slow, using just one client to reduce false positives. also adding verbose, testing only on latest ubuntu, and building on older one. when doing that, i can reduce the test threshold back to something saner --- .github/workflows/ci.yml | 23 ++++++++++++----------- tests/unit/memefficiency.tcl | 6 +++--- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 847abcf02..559ae61d8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -3,11 +3,8 @@ name: CI on: [push, pull_request] jobs: - build-ubuntu: - strategy: - matrix: - platform: [ubuntu-latest, ubuntu-16.04] - runs-on: ${{ matrix.platform }} + test-ubuntu-latest: + runs-on: ubuntu-latest steps: - uses: actions/checkout@v1 - name: make @@ -15,13 +12,17 @@ jobs: - name: test run: | sudo apt-get install tcl8.5 - make test + ./runtest --clients 1 --verbose - build-macos-latest: - strategy: - matrix: - platform: [macos-latest, macOS-10.14] - runs-on: ${{ matrix.platform }} + test-ubuntu-old: + runs-on: ubuntu-16.04 + steps: + - uses: actions/checkout@v1 + - name: make + run: make + + build-macos-latest: + runs-on: macos-latest steps: - uses: actions/checkout@v1 - name: make diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index 468825a47..c899103fd 100644 --- a/tests/unit/memefficiency.tcl +++ b/tests/unit/memefficiency.tcl @@ -100,7 +100,7 @@ start_server {tags {"defrag"}} { assert {$frag < 1.1} # due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75, # we expect max latency to be 
not much higher than 7.5ms but due to rare slowness threshold is set higher - assert {$max_latency <= 60} + assert {$max_latency <= 30} } else { set _ "" } @@ -226,7 +226,7 @@ start_server {tags {"defrag"}} { assert {$frag < 1.1} # due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75, # we expect max latency to be not much higher than 7.5ms but due to rare slowness threshold is set higher - assert {$max_latency <= 60} + assert {$max_latency <= 30} } # verify the data isn't corrupted or changed set newdigest [r debug digest] @@ -317,7 +317,7 @@ start_server {tags {"defrag"}} { assert {$frag < 1.1} # due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75, # we expect max latency to be not much higher than 7.5ms but due to rare slowness threshold is set higher - assert {$max_latency <= 60} + assert {$max_latency <= 30} } # verify the data isn't corrupted or changed set newdigest [r debug digest] From 635321d47e4a7fe70ab6a38a66a7b87d16455adb Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Wed, 26 Feb 2020 08:12:07 +0200 Subject: [PATCH 11/16] fix github actions failing latency test for active defrag - part 2 it seems that running two clients at a time is ok too, reduces action time from 20 minutes to 10. we'll use this for now, and if one day it won't be enough we'll have to run just the sensitive tests one by one separately from the others. this commit also fixes an issue with the defrag test that appears to be very rare.
--- .github/workflows/ci.yml | 4 ++-- tests/unit/memefficiency.tcl | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 559ae61d8..cc4991606 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -12,9 +12,9 @@ jobs: - name: test run: | sudo apt-get install tcl8.5 - ./runtest --clients 1 --verbose + ./runtest --clients 2 --verbose - test-ubuntu-old: + build-ubuntu-old: runs-on: ubuntu-16.04 steps: - uses: actions/checkout@v1 diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index c899103fd..06b0e07d7 100644 --- a/tests/unit/memefficiency.tcl +++ b/tests/unit/memefficiency.tcl @@ -60,6 +60,7 @@ start_server {tags {"defrag"}} { r config set latency-monitor-threshold 5 r latency reset + r config set maxmemory 110mb ;# prevent further eviction (not to fail the digest test) set digest [r debug digest] catch {r config set activedefrag yes} e if {![string match {DISABLED*} $e]} { @@ -166,7 +167,7 @@ start_server {tags {"defrag"}} { for {set j 0} {$j < 500000} {incr j} { $rd read ; # Discard replies } - assert {[r dbsize] == 500010} + assert_equal [r dbsize] 500010 # create some fragmentation for {set j 0} {$j < 500000} {incr j 2} { @@ -175,7 +176,7 @@ start_server {tags {"defrag"}} { for {set j 0} {$j < 500000} {incr j 2} { $rd read ; # Discard replies } - assert {[r dbsize] == 250010} + assert_equal [r dbsize] 250010 # start defrag after 120 ;# serverCron only updates the info once in 100ms From 2ecab0b63a632672f7f60a83b578a8aba398487e Mon Sep 17 00:00:00 2001 From: Guy Benoish Date: Wed, 19 Feb 2020 13:24:50 +0530 Subject: [PATCH 12/16] Modules: Do not auto-unblock clients if not blocked on keys --- src/module.c | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/src/module.c b/src/module.c index aae15d74b..f37d987e0 100644 --- a/src/module.c +++ b/src/module.c @@ -4290,12 +4290,15 @@ void 
unblockClientFromModule(client *c) { * We must call moduleUnblockClient in order to free privdata and * RedisModuleBlockedClient. * - * Note that clients implementing threads and working with private data, - * should make sure to stop the threads or protect the private data - * in some other way in the disconnection and timeout callback, because - * here we are going to free the private data associated with the - * blocked client. */ - if (!bc->unblocked) + * Note that we only do that for clients that are blocked on keys, for which + * the contract is that the module should not call RM_UnblockClient under + * normal circumstances. + * Clients implementing threads and working with private data should be + * aware that calling RM_UnblockClient for every blocked client is their + * responsibility, and if they fail to do so memory may leak. Ideally they + * should implement the disconnect and timeout callbacks and call + * RM_UnblockClient, but any other way is also acceptable. */ + if (bc->blocked_on_keys && !bc->unblocked) moduleUnblockClient(c); bc->client = NULL; @@ -4409,6 +4412,10 @@ int moduleTryServeClientBlockedOnKey(client *c, robj *key) { * * free_privdata: called in order to free the private data that is passed * by RedisModule_UnblockClient() call. + * + * Note: RedisModule_UnblockClient should be called for every blocked client, + * even if client was killed, timed-out or disconnected. Failing to do so + * will result in memory leaks. */ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) { return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL); @@ -4463,7 +4470,15 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc * freed using the free_privdata callback provided by the user. 
* * However the reply callback will be able to access the argument vector of - * the command, so the private data is often not needed. */ + * the command, so the private data is often not needed. + * + * Note: Under normal circumstances RedisModule_UnblockClient should not be + * called for clients that are blocked on keys (Either the key will + * become ready or a timeout will occur). If for some reason you do want + * to call RedisModule_UnblockClient it is possible: Client will be + * handled as if it were timed-out (You must implement the timeout + * callback in that case). + */ RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) { return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,privdata); } From 12626ce9bb58bbc9f32d7ae94e343a827bd81663 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Thu, 20 Feb 2020 17:56:52 +0200 Subject: [PATCH 13/16] fix race in module api test for fork in some cases we were trying to kill the fork before it got created --- tests/modules/fork.c | 2 +- tests/unit/moduleapi/fork.tcl | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/modules/fork.c b/tests/modules/fork.c index 1a139ef1b..0443d9ef0 100644 --- a/tests/modules/fork.c +++ b/tests/modules/fork.c @@ -42,7 +42,7 @@ int fork_create(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) /* child */ RedisModule_Log(ctx, "notice", "fork child started"); - usleep(200000); + usleep(500000); RedisModule_Log(ctx, "notice", "fork child exiting"); RedisModule_ExitFromChild(code_to_exit_with); /* unreachable */ diff --git a/tests/unit/moduleapi/fork.tcl b/tests/unit/moduleapi/fork.tcl index f7d7e47d5..8535a3382 100644 --- a/tests/unit/moduleapi/fork.tcl +++ b/tests/unit/moduleapi/fork.tcl @@ -20,9 +20,8 @@ 
start_server {tags {"modules"}} { test {Module fork kill} { r fork.create 3 - after 20 + after 250 r fork.kill - after 100 assert {[count_log_message "fork child started"] eq "2"} assert {[count_log_message "Received SIGUSR1 in child"] eq "1"} From 4d12c37c54d81a89ccbbea3c23f783477fb97c51 Mon Sep 17 00:00:00 2001 From: Guy Benoish Date: Sun, 23 Feb 2020 19:13:09 +0530 Subject: [PATCH 14/16] XREADGROUP should propagate XCLAIM/SETID in MULTI/EXEC Use built-in alsoPropagate mechanism that wraps commands in MULTI/EXEC before sending them to replica/AOF --- src/t_stream.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/t_stream.c b/src/t_stream.c index 900fa3a17..e1efc6bca 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -848,7 +848,7 @@ void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupnam argv[11] = createStringObject("JUSTID",6); argv[12] = createStringObject("LASTID",6); argv[13] = createObjectFromStreamID(&group->last_id); - propagate(server.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL); + alsoPropagate(server.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL); decrRefCount(argv[0]); decrRefCount(argv[3]); decrRefCount(argv[4]); @@ -875,7 +875,7 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna argv[2] = key; argv[3] = groupname; argv[4] = createObjectFromStreamID(&group->last_id); - propagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL); + alsoPropagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL); decrRefCount(argv[0]); decrRefCount(argv[1]); decrRefCount(argv[4]); From 15ea13245ae388a34277595efaf17e94cb6b6bc1 Mon Sep 17 00:00:00 2001 From: Ariel Date: Sat, 22 Feb 2020 23:49:23 +0200 Subject: [PATCH 15/16] fix ThreadSafeContext lock/unlock function names --- src/module.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/module.c b/src/module.c index f37d987e0..8c61f60e2 100644
--- a/src/module.c +++ b/src/module.c @@ -4740,9 +4740,9 @@ int RM_BlockedClientDisconnected(RedisModuleCtx *ctx) { * * To call non-reply APIs, the thread safe context must be prepared with: * - * RedisModule_ThreadSafeCallStart(ctx); + * RedisModule_ThreadSafeContextLock(ctx); * ... make your call here ... - * RedisModule_ThreadSafeCallStop(ctx); + * RedisModule_ThreadSafeContextUnlock(ctx); * * This is not needed when using `RedisModule_Reply*` functions, assuming * that a blocked client was used when the context was created, otherwise From 973297336fc05a601e17be70aba88e5dca6480ae Mon Sep 17 00:00:00 2001 From: Hengjian Tang Date: Tue, 25 Feb 2020 15:55:28 +0800 Subject: [PATCH 16/16] modify the read buf size according to the write buf size PROTO_IOBUF_LEN defined before --- src/replication.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/replication.c b/src/replication.c index 4843f97d5..c497051c8 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1352,7 +1352,7 @@ void disklessLoadRestoreBackups(redisDb *backup, int restore, int empty_db_flags /* Asynchronously read the SYNC payload we receive from a master */ #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */ void readSyncBulkPayload(connection *conn) { - char buf[4096]; + char buf[PROTO_IOBUF_LEN]; ssize_t nread, readlen, nwritten; int use_diskless_load = useDisklessLoad(); redisDb *diskless_load_backup = NULL;