diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 847abcf02..cc4991606 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -3,11 +3,8 @@ name: CI on: [push, pull_request] jobs: - build-ubuntu: - strategy: - matrix: - platform: [ubuntu-latest, ubuntu-16.04] - runs-on: ${{ matrix.platform }} + test-ubuntu-latest: + runs-on: ubuntu-latest steps: - uses: actions/checkout@v1 - name: make @@ -15,13 +12,17 @@ jobs: - name: test run: | sudo apt-get install tcl8.5 - make test + ./runtest --clients 2 --verbose - build-macos-latest: - strategy: - matrix: - platform: [macos-latest, macOS-10.14] - runs-on: ${{ matrix.platform }} + build-ubuntu-old: + runs-on: ubuntu-16.04 + steps: + - uses: actions/checkout@v1 + - name: make + run: make + + build-macos-latest: + runs-on: macos-latest steps: - uses: actions/checkout@v1 - name: make diff --git a/src/debug.cpp b/src/debug.cpp index 4e0f5f123..5ffa8df47 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -372,6 +372,7 @@ void debugCommand(client *c) { "LOADAOF -- Flush the AOF buffers on disk and reload the AOF in memory.", "LUA-ALWAYS-REPLICATE-COMMANDS <0|1> -- Setting it to 1 makes Lua replication defaulting to replicating single commands, without the script having to enable effects replication.", "OBJECT -- Show low level info about key and associated value.", +"OOM -- Crash the server simulating an out-of-memory error.", "PANIC -- Crash the server simulating a panic.", "POPULATE [prefix] [size] -- Create string keys named key:. If a prefix is specified is used instead of the 'key' prefix.", "RELOAD -- Save the RDB on disk and reload it back in memory.", diff --git a/src/module.cpp b/src/module.cpp index 82c06f1bb..a27bf95ff 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -4370,12 +4370,15 @@ void unblockClientFromModule(client *c) { * We must call moduleUnblockClient in order to free privdata and * RedisModuleBlockedClient. * - * Note that clients implementing threads and working with private data, - * should make sure to stop the threads or protect the private data - * in some other way in the disconnection and timeout callback, because - * here we are going to free the private data associated with the - * blocked client. */ - if (!bc->unblocked) + * Note that we only do that for clients that are blocked on keys, for which + * the contract is that the module should not call RM_UnblockClient under + * normal circumstances. + * Clients implementing threads and working with private data should be + * aware that calling RM_UnblockClient for every blocked client is their + * responsibility, and if they fail to do so memory may leak. Ideally they + * should implement the disconnect and timeout callbacks and call + * RM_UnblockClient, but any other way is also acceptable. */ + if (bc->blocked_on_keys && !bc->unblocked) moduleUnblockClient(c); bc->client = NULL; @@ -4489,6 +4492,10 @@ int moduleTryServeClientBlockedOnKey(client *c, robj *key) { * * free_privdata: called in order to free the private data that is passed * by RedisModule_UnblockClient() call. + * + * Note: RedisModule_UnblockClient should be called for every blocked client, + * even if client was killed, timed-out or disconnected. Failing to do so + * will result in memory leaks. 
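+ *
+ * As a minimal illustration only (a sketch, not part of this change; the
+ * worker thread and its privdata are hypothetical), a module command
+ * usually blocks the client and hands the handle over to a thread:
+ *
+ *     RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,
+ *         reply_callback, timeout_callback, free_privdata, 1000);
+ *     pthread_create(&tid, NULL, worker, bc);
+ *
+ * and the worker, once it is done (even if the client meanwhile
+ * disconnected or timed out), must still call:
+ *
+ *     RedisModule_UnblockClient(bc, privdata);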
*/ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) { return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL); @@ -4543,7 +4550,15 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc * freed using the free_privdata callback provided by the user. * * However the reply callback will be able to access the argument vector of - * the command, so the private data is often not needed. */ + * the command, so the private data is often not needed. + * + * Note: Under normal circumstances RedisModule_UnblockClient should not be + * called for clients that are blocked on keys (Either the key will + * become ready or a timeout will occur). If for some reason you do want + * to call RedisModule_UnblockClient it is possible: Client will be + * handled as if it were timed-out (You must implement the timeout + * callback in that case). + */ RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) { return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,privdata); } @@ -4821,9 +4836,9 @@ int RM_BlockedClientDisconnected(RedisModuleCtx *ctx) { * * To call non-reply APIs, the thread safe context must be prepared with: * - * RedisModule_ThreadSafeCallStart(ctx); + * RedisModule_ThreadSafeContextLock(ctx); * ... make your call here ... - * RedisModule_ThreadSafeCallStop(ctx); + * RedisModule_ThreadSafeContextUnlock(ctx); * * This is not needed when using `RedisModule_Reply*` functions, assuming * that a blocked client was used when the context was created, otherwise diff --git a/src/networking.cpp b/src/networking.cpp index 1c8d561e6..5c1519c1c 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1919,6 +1919,12 @@ void resetClient(client *c) { if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand) c->flags &= ~CLIENT_ASKING; + /* We do the same for the CACHING command as well. It also affects + * the next command or transaction executed, in a way very similar + * to ASKING. */ + if (!(c->flags & CLIENT_MULTI) && prevcmd != clientCommand) + c->flags &= ~CLIENT_TRACKING_CACHING; + /* Remove the CLIENT_REPLY_SKIP flag if any so that the reply * to the next command will be sent, but set the flag if the command * we just processed was "CLIENT REPLY SKIP". */ @@ -2601,7 +2607,7 @@ void clientCommand(client *c) { "REPLY (on|off|skip) -- Control the replies sent to the current connection.", "SETNAME -- Assign the name to the current connection.", "UNBLOCK [TIMEOUT|ERROR] -- Unblock the specified blocked client.", -"TRACKING (on|off) [REDIRECT ] -- Enable client keys tracking for client side caching.", +"TRACKING (on|off) [REDIRECT ] [BCAST] [PREFIX first] [PREFIX second] [OPTIN] [OPTOUT]... -- Enable client keys tracking for client side caching.", "GETREDIR -- Return the client ID we are redirecting to when tracking is enabled.", NULL }; @@ -2789,9 +2795,9 @@ NULL addReply(c,shared.ok); } else if (!strcasecmp(szFromObj(c->argv[1]),"tracking") && c->argc >= 3) { /* CLIENT TRACKING (on|off) [REDIRECT ] [BCAST] [PREFIX first] - * [PREFIX second] ... */ + * [PREFIX second] [OPTIN] [OPTOUT] ... 
*/ long long redir = 0; - int bcast = 0; + uint64_t options = 0; robj **prefix = NULL; size_t numprefix = 0; @@ -2801,18 +2807,34 @@ NULL if (!strcasecmp(szFromObj(c->argv[j]),"redirect") && moreargs) { j++; + if (redir != 0) { + addReplyError(c,"A client can only redirect to a single " + "other client"); + zfree(prefix); + return; + } + if (getLongLongFromObjectOrReply(c,c->argv[j],&redir,NULL) != - C_OK) return; + C_OK) + { + zfree(prefix); + return; + } /* We will require the client with the specified ID to exist * right now, even if it is possible that it gets disconnected * later. Still a valid sanity check. */ if (lookupClientByID(redir) == NULL) { addReplyError(c,"The client ID you want redirect to " "does not exist"); + zfree(prefix); return; } } else if (!strcasecmp(szFromObj(c->argv[j]),"bcast")) { - bcast++; + options |= CLIENT_TRACKING_BCAST; + } else if (!strcasecmp(szFromObj(c->argv[j]),"optin")) { + options |= CLIENT_TRACKING_OPTIN; + } else if (!strcasecmp(szFromObj(c->argv[j]),"optout")) { + options |= CLIENT_TRACKING_OPTOUT; } else if (!strcasecmp(szFromObj(c->argv[j]),"prefix") && moreargs) { j++; prefix = (robj**)zrealloc(prefix,sizeof(robj*)*(numprefix+1), MALLOC_LOCAL); @@ -2828,7 +2850,7 @@ NULL if (!strcasecmp(szFromObj(c->argv[2]),"on")) { /* Before enabling tracking, make sure options are compatible * among each other and with the current state of the client. */ - if (!bcast && numprefix) { + if (!(options & CLIENT_TRACKING_BCAST) && numprefix) { addReplyError(c, "PREFIX option requires BCAST mode to be enabled"); zfree(prefix); @@ -2837,7 +2859,8 @@ NULL if (c->flags & CLIENT_TRACKING) { int oldbcast = !!(c->flags & CLIENT_TRACKING_BCAST); - if (oldbcast != bcast) { + int newbcast = !!(options & CLIENT_TRACKING_BCAST); + if (oldbcast != newbcast) { addReplyError(c, "You can't switch BCAST mode on/off before disabling " "tracking for this client, and then re-enabling it with " @@ -2846,7 +2869,17 @@ NULL return; } } - enableTracking(c,redir,bcast,prefix,numprefix); + + if (options & CLIENT_TRACKING_BCAST && + options & (CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT)) + { + addReplyError(c, + "OPTIN and OPTOUT are not compatible with BCAST"); + zfree(prefix); + return; + } + + enableTracking(c,redir,options,prefix,numprefix); } else if (!strcasecmp(szFromObj(c->argv[2]),"off")) { disableTracking(c); } else { @@ -2856,6 +2889,36 @@ NULL } zfree(prefix); addReply(c,shared.ok); + } else if (!strcasecmp(szFromObj(c->argv[1]),"caching") && c->argc >= 3) { + if (!(c->flags & CLIENT_TRACKING)) { + addReplyError(c,"CLIENT CACHING can be called only when the " + "client is in tracking mode with OPTIN or " + "OPTOUT mode enabled"); + return; + } + + char *opt = szFromObj(c->argv[2]); + if (!strcasecmp(opt,"yes")) { + if (c->flags & CLIENT_TRACKING_OPTIN) { + c->flags |= CLIENT_TRACKING_CACHING; + } else { + addReplyError(c,"CLIENT CACHING YES is only valid when tracking is enabled in OPTIN mode."); + return; + } + } else if (!strcasecmp(opt,"no")) { + if (c->flags & CLIENT_TRACKING_OPTOUT) { + c->flags |= CLIENT_TRACKING_CACHING; + } else { + addReplyError(c,"CLIENT CACHING NO is only valid when tracking is enabled in OPTOUT mode."); + return; + } + } else { + addReply(c,shared.syntaxerr); + return; + } + + /* Common reply for when we succeeded. 
*/ + addReply(c,shared.ok); } else if (!strcasecmp(szFromObj(c->argv[1]),"getredir") && c->argc == 2) { /* CLIENT GETREDIR */ if (c->flags & CLIENT_TRACKING) { diff --git a/src/replication.cpp b/src/replication.cpp index a0db6481e..2d38ab006 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -1718,7 +1718,7 @@ void disklessLoadRestoreBackups(redisDb *backup, int restore, int empty_db_flags /* Asynchronously read the SYNC payload we receive from a master */ #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */ void readSyncBulkPayload(connection *conn) { - char buf[4096]; + char buf[PROTO_IOBUF_LEN]; ssize_t nread, readlen, nwritten; int use_diskless_load = useDisklessLoad(); redisDb *diskless_load_backup = NULL; diff --git a/src/server.h b/src/server.h index bb3f65c25..9056611ef 100644 --- a/src/server.h +++ b/src/server.h @@ -411,8 +411,12 @@ public: #define CLIENT_TRACKING (1ULL<<31) /* Client enabled keys tracking in order to perform client side caching. */ #define CLIENT_TRACKING_BROKEN_REDIR (1ULL<<32) /* Target client is invalid. */ -#define CLIENT_FORCE_REPLY (1ULL<<33) /* Should addReply be forced to write the text? */ -#define CLIENT_TRACKING_BCAST (1ULL<<34) /* Tracking in BCAST mode. */ +#define CLIENT_TRACKING_BCAST (1ULL<<33) /* Tracking in BCAST mode. */ +#define CLIENT_TRACKING_OPTIN (1ULL<<34) /* Tracking in opt-in mode. */ +#define CLIENT_TRACKING_OPTOUT (1ULL<<35) /* Tracking in opt-out mode. */ +#define CLIENT_TRACKING_CACHING (1ULL<<36) /* CACHING yes/no was given, + depending on optin/optout mode. */ +#define CLIENT_FORCE_REPLY (1ULL<<37) /* Should addReply be forced to write the text? */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ @@ -2227,7 +2231,7 @@ void addReplyStatusFormat(client *c, const char *fmt, ...); #endif /* Client side caching (tracking mode) */ -void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix); +void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix); void disableTracking(client *c); void trackingRememberKeys(client *c); void trackingInvalidateKey(robj *keyobj); diff --git a/src/t_stream.cpp b/src/t_stream.cpp index cd69dbe93..51b5f55cf 100644 --- a/src/t_stream.cpp +++ b/src/t_stream.cpp @@ -853,7 +853,7 @@ void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupnam argv[11] = createStringObject("JUSTID",6); argv[12] = createStringObject("LASTID",6); argv[13] = createObjectFromStreamID(&group->last_id); - propagate(cserver.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL); + alsoPropagate(cserver.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL); decrRefCount(argv[0]); decrRefCount(argv[3]); decrRefCount(argv[4]); @@ -880,7 +880,7 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna argv[2] = key; argv[3] = groupname; argv[4] = createObjectFromStreamID(&group->last_id); - propagate(cserver.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL); + alsoPropagate(cserver.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL); decrRefCount(argv[0]); decrRefCount(argv[1]); decrRefCount(argv[4]); diff --git a/src/tracking.cpp b/src/tracking.cpp index 50f03c946..e82d358cc 100644 --- a/src/tracking.cpp +++ b/src/tracking.cpp @@ -44,7 +44,7 @@ rax *TrackingTable = NULL; rax *PrefixTable = NULL; uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across - the whole tracking table. 
This givesn + the whole tracking table. This gives an hint about the total memory we are using server side for CSC. */ robj *TrackingChannelName; @@ -93,7 +93,8 @@ void disableTracking(client *c) { if (c->flags & CLIENT_TRACKING) { g_pserver->tracking_clients--; c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR| - CLIENT_TRACKING_BCAST); + CLIENT_TRACKING_BCAST|CLIENT_TRACKING_OPTIN| + CLIENT_TRACKING_OPTOUT); } } @@ -124,10 +125,11 @@ void enableBcastTrackingForPrefix(client *c, const char *prefix, size_t plen) { * eventually get freed, we'll send a message to the original client to * inform it of the condition. Multiple clients can redirect the invalidation * messages to the same client ID. */ -void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix) { +void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix) { if (!(c->flags & CLIENT_TRACKING)) g_pserver->tracking_clients++; c->flags |= CLIENT_TRACKING; - c->flags &= ~(CLIENT_TRACKING_BROKEN_REDIR|CLIENT_TRACKING_BCAST); + c->flags &= ~(CLIENT_TRACKING_BROKEN_REDIR|CLIENT_TRACKING_BCAST| + CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT); c->client_tracking_redirection = redirect_to; if (TrackingTable == NULL) { TrackingTable = raxNew(); @@ -135,7 +137,7 @@ void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, s TrackingChannelName = createStringObject("__redis__:invalidate",20); } - if (bcast) { + if (options & CLIENT_TRACKING_BCAST) { c->flags |= CLIENT_TRACKING_BCAST; if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0); for (size_t j = 0; j < numprefix; j++) { @@ -143,14 +145,23 @@ void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, s enableBcastTrackingForPrefix(c,sdsprefix,sdslen(sdsprefix)); } } + c->flags |= options & (CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT); } -/* This function is called after the excution of a readonly command in the - * case the client 'c' has keys tracking enabled. It will populate the - * tracking ivalidation table according to the keys the user fetched, so that - * Redis will know what are the clients that should receive an invalidation - * message with certain groups of keys are modified. */ +/* This function is called after the execution of a readonly command in the + * case the client 'c' has keys tracking enabled and the tracking is not + * in BCAST mode. It will populate the tracking invalidation table according + * to the keys the user fetched, so that Redis will know what are the clients + * that should receive an invalidation message with certain groups of keys + * are modified. */ void trackingRememberKeys(client *c) { + /* Return if we are in optin/out mode and the right CACHING command + * was/wasn't given in order to modify the default behavior. */ + uint64_t optin = c->flags & CLIENT_TRACKING_OPTIN; + uint64_t optout = c->flags & CLIENT_TRACKING_OPTOUT; + uint64_t caching_given = c->flags & CLIENT_TRACKING_CACHING; + if ((optin && !caching_given) || (optout && caching_given)) return; + int numkeys; int *keys = getKeysFromCommand(c->cmd,c->argv,c->argc,&numkeys); if (keys == NULL) return; @@ -292,19 +303,12 @@ void trackingInvalidateKey(robj *keyobj) { } /* This function is called when one or all the Redis databases are flushed - * (dbid == -1 in case of FLUSHALL). Caching slots are not specific for - * each DB but are global: currently what we do is sending a special + * (dbid == -1 in case of FLUSHALL). 
Caching keys are not specific for + * each DB but are global: currently what we do is send a special * notification to clients with tracking enabled, invalidating the caching - * slot "-1", which means, "all the keys", in order to avoid flooding clients + * key "", which means, "all the keys", in order to avoid flooding clients * with many invalidation messages for all the keys they may hold. - * - * However trying to flush the tracking table here is very costly: - * we need scanning 16 million caching slots in the table to check - * if they are used, this introduces a big delay. So what we do is to really - * flush the table in the case of FLUSHALL. When a FLUSHDB is called instead - * we just send the invalidation message to all the clients, but don't - * flush the table: it will slowly get garbage collected as more keys - * are modified in the used caching slots. */ + */ void freeTrackingRadixTree(void *rt) { raxFree((rax*)rt); } @@ -325,6 +329,7 @@ void trackingInvalidateKeysOnFlush(int dbid) { /* In case of FLUSHALL, reclaim all the memory used by tracking. */ if (dbid == -1 && TrackingTable) { raxFreeWithCallback(TrackingTable,freeTrackingRadixTree); + TrackingTable = raxNew(); TrackingTableTotalItems = 0; } } diff --git a/tests/modules/fork.c b/tests/modules/fork.c index 1a139ef1b..0443d9ef0 100644 --- a/tests/modules/fork.c +++ b/tests/modules/fork.c @@ -42,7 +42,7 @@ int fork_create(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) /* child */ RedisModule_Log(ctx, "notice", "fork child started"); - usleep(200000); + usleep(500000); RedisModule_Log(ctx, "notice", "fork child exiting"); RedisModule_ExitFromChild(code_to_exit_with); /* unreachable */ diff --git a/tests/modules/misc.c b/tests/modules/misc.c index 41bec06ed..1048d5065 100644 --- a/tests/modules/misc.c +++ b/tests/modules/misc.c @@ -74,6 +74,7 @@ int test_ld_conv(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { RedisModule_ReplyWithError(ctx, err); goto final; } + /* Make sure we can't convert a string that has \0 in it */ char buf[4] = "123"; buf[1] = '\0'; @@ -81,8 +82,11 @@ int test_ld_conv(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { long double ld3; if (RedisModule_StringToLongDouble(s3, &ld3) == REDISMODULE_OK) { RedisModule_ReplyWithError(ctx, "Invalid string successfully converted to long double"); + RedisModule_FreeString(ctx, s3); goto final; } + RedisModule_FreeString(ctx, s3); + RedisModule_ReplyWithLongDouble(ctx, ld2); final: RedisModule_FreeString(ctx, s1); diff --git a/tests/support/server.tcl b/tests/support/server.tcl index a9dcc754d..bf97124a9 100644 --- a/tests/support/server.tcl +++ b/tests/support/server.tcl @@ -53,6 +53,7 @@ proc kill_server config { } # kill server and wait for the process to be totally exited + send_data_packet $::test_server_fd server-killing $pid catch {exec kill $pid} if {$::valgrind} { set max_wait 60000 @@ -140,6 +141,18 @@ proc tags {tags code} { uplevel 1 $code set ::tags [lrange $::tags 0 end-[llength $tags]] } + +# Write the configuration in the dictionary 'config' in the specified +# file name. 
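+#
+# For example (illustrative values only), a config dict such as
+#   {port 21111 dir /tmp/server.1}
+# is written out as the two lines "port 21111" and "dir /tmp/server.1".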
+proc create_server_config_file {filename config} { + set fp [open $filename w+] + foreach directive [dict keys $config] { + puts -nonewline $fp "$directive " + puts $fp [dict get $config $directive] + } + close $fp +} + proc start_server {options {code undefined}} { # If we are running against an external server, we just push the # host/port pair in the stack the first time @@ -221,56 +234,91 @@ proc start_server {options {code undefined}} { # write new configuration to temporary file set config_file [tmpfile redis.conf] - set fp [open $config_file w+] - foreach directive [dict keys $config] { - puts -nonewline $fp "$directive " - puts $fp [dict get $config $directive] - } - close $fp + create_server_config_file $config_file $config set stdout [format "%s/%s" [dict get $config "dir"] "stdout"] set stderr [format "%s/%s" [dict get $config "dir"] "stderr"] - if {$::valgrind} { - set pid [exec valgrind --track-origins=yes --suppressions=src/valgrind.sup --show-reachable=no --show-possibly-lost=no --leak-check=full src/keydb-server $config_file > $stdout 2> $stderr &] - } elseif ($::stack_logging) { - set pid [exec /usr/bin/env MallocStackLogging=1 MallocLogFile=/tmp/malloc_log.txt src/keydb-server $config_file > $stdout 2> $stderr &] - } else { - set pid [exec src/keydb-server $config_file > $stdout 2> $stderr &] - } + # We need a loop here to retry with different ports. + set server_started 0 + while {$server_started == 0} { + if {$::verbose} { + puts -nonewline "=== ($tags) Starting server ${::host}:${::port} " + } - # Tell the test server about this new instance. - send_data_packet $::test_server_fd server-spawned $pid + send_data_packet $::test_server_fd "server-spawning" "port $::port" - # check that the server actually started - # ugly but tries to be as fast as possible... - if {$::valgrind} {set retrynum 1000} else {set retrynum 100} + if {$::valgrind} { + set pid [exec valgrind --track-origins=yes --suppressions=src/valgrind.sup --show-reachable=no --show-possibly-lost=no --leak-check=full src/keydb-server $config_file > $stdout 2> $stderr &] + } elseif ($::stack_logging) { + set pid [exec /usr/bin/env MallocStackLogging=1 MallocLogFile=/tmp/malloc_log.txt src/keydb-server $config_file > $stdout 2> $stderr &] + } else { + set pid [exec src/keydb-server $config_file > $stdout 2> $stderr &] + } - if {$::verbose} { - puts -nonewline "=== ($tags) Starting server ${::host}:${::port} " - } + # Tell the test server about this new instance. + send_data_packet $::test_server_fd server-spawned $pid - if {$code ne "undefined"} { - set serverisup [server_is_up $::host $::port $retrynum] - } else { - set serverisup 1 - } + # check that the server actually started + # ugly but tries to be as fast as possible... + if {$::valgrind} {set retrynum 1000} else {set retrynum 100} - if {$::verbose} { - puts "" - } + # Wait for actual startup + set checkperiod 100; # Milliseconds + set maxiter [expr {120*1000/100}] ; # Wait up to 2 minutes. + set port_busy 0 + while {![info exists _pid]} { + regexp {PID:\s(\d+)} [exec cat $stdout] _ _pid + after $checkperiod + incr maxiter -1 + if {$maxiter == 0} { + start_server_error $config_file "No PID detected in log $stdout" + puts "--- LOG CONTENT ---" + puts [exec cat $stdout] + puts "-------------------" + break + } - if {!$serverisup} { - set err {} - append err [exec cat $stdout] "\n" [exec cat $stderr] - start_server_error $config_file $err - return - } + # Check if the port is actually busy and the server failed + # for this reason. 
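+            # (When it cannot bind the port, the server logs a line starting
+            # with "Could not create server TCP listening socket", which is
+            # what the pattern below matches.)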
+ if {[regexp {Could not create server TCP} [exec cat $stdout]]} { + set port_busy 1 + break + } + } - # Wait for actual startup - while {![info exists _pid]} { - regexp {PID:\s(\d+)} [exec cat $stdout] _ _pid - after 100 + # Sometimes we have to try a different port, even if we checked + # for availability. Other test clients may grab the port before we + # are able to do it for example. + if {$port_busy} { + puts "Port $::port was already busy, trying another port..." + set ::port [find_available_port [expr {$::port+1}]] + if {$::tls} { + dict set config "tls-port" $::port + } else { + dict set config port $::port + } + create_server_config_file $config_file $config + continue; # Try again + } + + if {$code ne "undefined"} { + set serverisup [server_is_up $::host $::port $retrynum] + } else { + set serverisup 1 + } + + if {$::verbose} { + puts "" + } + + if {!$serverisup} { + set err {} + append err [exec cat $stdout] "\n" [exec cat $stderr] + start_server_error $config_file $err + return + } + set server_started 1 } # setup properties to be able to initialize a client object diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index dfcdb8e35..5731a6eec 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -294,7 +294,7 @@ proc read_from_test_client fd { puts "\[$completed_tests_count/$all_tests_count [colorstr yellow $status]\]: $data ($elapsed seconds)" lappend ::clients_time_history $elapsed $data signal_idle_client $fd - set ::active_clients_task($fd) DONE + set ::active_clients_task($fd) "(DONE) $data" } elseif {$status eq {ok}} { if {!$::quiet} { puts "\[[colorstr green $status]\]: $data" @@ -325,10 +325,16 @@ proc read_from_test_client fd { exit 1 } elseif {$status eq {testing}} { set ::active_clients_task($fd) "(IN PROGRESS) $data" + } elseif {$status eq {server-spawning}} { + set ::active_clients_task($fd) "(SPAWNING SERVER) $data" } elseif {$status eq {server-spawned}} { lappend ::active_servers $data + set ::active_clients_task($fd) "(SPAWNED SERVER) pid:$data" + } elseif {$status eq {server-killing}} { + set ::active_clients_task($fd) "(KILLING SERVER) pid:$data" } elseif {$status eq {server-killed}} { set ::active_servers [lsearch -all -inline -not -exact $::active_servers $data] + set ::active_clients_task($fd) "(KILLED SERVER) pid:$data" } else { if {!$::quiet} { puts "\[$status\]: $data" @@ -338,7 +344,7 @@ proc read_from_test_client fd { proc show_clients_state {} { # The following loop is only useful for debugging tests that may - # enter an infinite loop. Commented out normally. + # enter an infinite loop. foreach x $::active_clients { if {[info exist ::active_clients_task($x)]} { puts "$x => $::active_clients_task($x)" @@ -368,8 +374,6 @@ proc signal_idle_client fd { set ::active_clients \ [lsearch -all -inline -not -exact $::active_clients $fd] - if 0 {show_clients_state} - # New unit to process? 
if {$::next_test != [llength $::all_tests]} { if {!$::quiet} { @@ -385,6 +389,7 @@ proc signal_idle_client fd { } } else { lappend ::idle_clients $fd + set ::active_clients_task($fd) "SLEEPING, no more units to assign" if {[llength $::active_clients] == 0} { the_end } diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index ec80c7384..06b0e07d7 100644 --- a/tests/unit/memefficiency.tcl +++ b/tests/unit/memefficiency.tcl @@ -39,6 +39,8 @@ start_server {tags {"memefficiency"}} { start_server {tags {"defrag"}} { if {[string match {*jemalloc*} [s mem_allocator]]} { test "Active defrag" { + r config set save "" ;# prevent bgsave from interfereing with save below + r config set hz 100 r config set activedefrag no r config set active-defrag-threshold-lower 5 r config set active-defrag-cycle-min 65 @@ -46,8 +48,8 @@ start_server {tags {"defrag"}} { r config set active-defrag-ignore-bytes 2mb r config set maxmemory 100mb r config set maxmemory-policy allkeys-lru - r debug populate 700000 asdf 150 - r debug populate 170000 asdf 300 + r debug populate 700000 asdf1 150 + r debug populate 170000 asdf2 300 r ping ;# trigger eviction following the previous population after 120 ;# serverCron only updates the info once in 100ms set frag [s allocator_frag_ratio] @@ -55,6 +57,11 @@ start_server {tags {"defrag"}} { puts "frag $frag" } assert {$frag >= 1.4} + + r config set latency-monitor-threshold 5 + r latency reset + r config set maxmemory 110mb ;# prevent further eviction (not to fail the digest test) + set digest [r debug digest] catch {r config set activedefrag yes} e if {![string match {DISABLED*} $e]} { # Wait for the active defrag to start working (decision once a @@ -78,19 +85,37 @@ start_server {tags {"defrag"}} { # Test the the fragmentation is lower. 
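+            # (Reasoning for the latency assertions below: with hz 100 each
+            # serverCron period is 10ms, and active-defrag-cycle-max 75 lets
+            # defrag consume up to ~75% of it, i.e. roughly 7.5ms per cycle.)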
after 120 ;# serverCron only updates the info once in 100ms set frag [s allocator_frag_ratio] + set max_latency 0 + foreach event [r latency latest] { + lassign $event eventname time latency max + if {$eventname == "active-defrag-cycle"} { + set max_latency $max + } + } if {$::verbose} { puts "frag $frag" + puts "max latency $max_latency" + puts [r latency latest] + puts [r latency history active-defrag-cycle] } assert {$frag < 1.1} + # due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75, + # we expect max latency to be not much higher than 7.5ms but due to rare slowness threshold is set higher + assert {$max_latency <= 30} } else { set _ "" } - } {} + # verify the data isn't corrupted or changed + set newdigest [r debug digest] + assert {$digest eq $newdigest} + r save ;# saving an rdb iterates over all the data / pointers + } {OK} test "Active defrag big keys" { r flushdb r config resetstat r config set save "" ;# prevent bgsave from interfereing with save below + r config set hz 100 r config set activedefrag no r config set active-defrag-max-scan-fields 1000 r config set active-defrag-threshold-lower 5 @@ -142,7 +167,7 @@ start_server {tags {"defrag"}} { for {set j 0} {$j < 500000} {incr j} { $rd read ; # Discard replies } - assert {[r dbsize] == 500010} + assert_equal [r dbsize] 500010 # create some fragmentation for {set j 0} {$j < 500000} {incr j 2} { @@ -151,7 +176,7 @@ start_server {tags {"defrag"}} { for {set j 0} {$j < 500000} {incr j 2} { $rd read ; # Discard replies } - assert {[r dbsize] == 250010} + assert_equal [r dbsize] 250010 # start defrag after 120 ;# serverCron only updates the info once in 100ms @@ -200,9 +225,9 @@ start_server {tags {"defrag"}} { puts [r latency history active-defrag-cycle] } assert {$frag < 1.1} - # due to high fragmentation, 10hz, and active-defrag-cycle-max set to 75, - # we expect max latency to be not much higher than 75ms - assert {$max_latency <= 120} + # due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75, + # we expect max latency to be not much higher than 7.5ms but due to rare slowness threshold is set higher + assert {$max_latency <= 30} } # verify the data isn't corrupted or changed set newdigest [r debug digest] @@ -292,8 +317,8 @@ start_server {tags {"defrag"}} { } assert {$frag < 1.1} # due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75, - # we expect max latency to be not much higher than 7.5ms - assert {$max_latency <= 12} + # we expect max latency to be not much higher than 7.5ms but due to rare slowness threshold is set higher + assert {$max_latency <= 30} } # verify the data isn't corrupted or changed set newdigest [r debug digest] diff --git a/tests/unit/moduleapi/fork.tcl b/tests/unit/moduleapi/fork.tcl index f7d7e47d5..8535a3382 100644 --- a/tests/unit/moduleapi/fork.tcl +++ b/tests/unit/moduleapi/fork.tcl @@ -20,9 +20,8 @@ start_server {tags {"modules"}} { test {Module fork kill} { r fork.create 3 - after 20 + after 250 r fork.kill - after 100 assert {[count_log_message "fork child started"] eq "2"} assert {[count_log_message "Received SIGUSR1 in child"] eq "1"}
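
Illustrative note, not part of the patch above: with the new opt-in tracking
mode, a client session would look roughly like the following, where only the
read issued immediately after CLIENT CACHING yes is remembered for
invalidation (key names are hypothetical):

    CLIENT TRACKING on OPTIN          -> +OK
    CLIENT CACHING yes                -> +OK
    GET user:1                        (remembered; an invalidation message
                                       will be sent if the key changes)
    GET user:2                        (not remembered; resetClient() cleared
                                       the CACHING flag after the previous
                                       command)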