Merge commit '973297336fc05a601e17be70aba88e5dca6480ae' into redis_6_merge

Former-commit-id: ef1236b6009ebd7b00f6dd2f43df57ad95e51253
This commit is contained in:
John Sully 2020-04-14 20:19:48 -04:00
commit f27524674a
14 changed files with 281 additions and 111 deletions

View File

@ -3,11 +3,8 @@ name: CI
on: [push, pull_request] on: [push, pull_request]
jobs: jobs:
build-ubuntu: test-ubuntu-latest:
strategy: runs-on: ubuntu-latest
matrix:
platform: [ubuntu-latest, ubuntu-16.04]
runs-on: ${{ matrix.platform }}
steps: steps:
- uses: actions/checkout@v1 - uses: actions/checkout@v1
- name: make - name: make
@ -15,13 +12,17 @@ jobs:
- name: test - name: test
run: | run: |
sudo apt-get install tcl8.5 sudo apt-get install tcl8.5
make test ./runtest --clients 2 --verbose
build-macos-latest: build-ubuntu-old:
strategy: runs-on: ubuntu-16.04
matrix: steps:
platform: [macos-latest, macOS-10.14] - uses: actions/checkout@v1
runs-on: ${{ matrix.platform }} - name: make
run: make
build-macos-latest:
runs-on: macos-latest
steps: steps:
- uses: actions/checkout@v1 - uses: actions/checkout@v1
- name: make - name: make

View File

@ -372,6 +372,7 @@ void debugCommand(client *c) {
"LOADAOF -- Flush the AOF buffers on disk and reload the AOF in memory.", "LOADAOF -- Flush the AOF buffers on disk and reload the AOF in memory.",
"LUA-ALWAYS-REPLICATE-COMMANDS <0|1> -- Setting it to 1 makes Lua replication defaulting to replicating single commands, without the script having to enable effects replication.", "LUA-ALWAYS-REPLICATE-COMMANDS <0|1> -- Setting it to 1 makes Lua replication defaulting to replicating single commands, without the script having to enable effects replication.",
"OBJECT <key> -- Show low level info about key and associated value.", "OBJECT <key> -- Show low level info about key and associated value.",
"OOM -- Crash the server simulating an out-of-memory error.",
"PANIC -- Crash the server simulating a panic.", "PANIC -- Crash the server simulating a panic.",
"POPULATE <count> [prefix] [size] -- Create <count> string keys named key:<num>. If a prefix is specified is used instead of the 'key' prefix.", "POPULATE <count> [prefix] [size] -- Create <count> string keys named key:<num>. If a prefix is specified is used instead of the 'key' prefix.",
"RELOAD -- Save the RDB on disk and reload it back in memory.", "RELOAD -- Save the RDB on disk and reload it back in memory.",

View File

@ -4370,12 +4370,15 @@ void unblockClientFromModule(client *c) {
* We must call moduleUnblockClient in order to free privdata and * We must call moduleUnblockClient in order to free privdata and
* RedisModuleBlockedClient. * RedisModuleBlockedClient.
* *
* Note that clients implementing threads and working with private data, * Note that we only do that for clients that are blocked on keys, for which
* should make sure to stop the threads or protect the private data * the contract is that the module should not call RM_UnblockClient under
* in some other way in the disconnection and timeout callback, because * normal circumstances.
* here we are going to free the private data associated with the * Clients implementing threads and working with private data should be
* blocked client. */ * aware that calling RM_UnblockClient for every blocked client is their
if (!bc->unblocked) * responsibility, and if they fail to do so memory may leak. Ideally they
* should implement the disconnect and timeout callbacks and call
* RM_UnblockClient, but any other way is also acceptable. */
if (bc->blocked_on_keys && !bc->unblocked)
moduleUnblockClient(c); moduleUnblockClient(c);
bc->client = NULL; bc->client = NULL;
@ -4489,6 +4492,10 @@ int moduleTryServeClientBlockedOnKey(client *c, robj *key) {
* *
* free_privdata: called in order to free the private data that is passed * free_privdata: called in order to free the private data that is passed
* by RedisModule_UnblockClient() call. * by RedisModule_UnblockClient() call.
*
* Note: RedisModule_UnblockClient should be called for every blocked client,
* even if client was killed, timed-out or disconnected. Failing to do so
* will result in memory leaks.
*/ */
RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) { RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) {
return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL); return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL);
@ -4543,7 +4550,15 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc
* freed using the free_privdata callback provided by the user. * freed using the free_privdata callback provided by the user.
* *
* However the reply callback will be able to access the argument vector of * However the reply callback will be able to access the argument vector of
* the command, so the private data is often not needed. */ * the command, so the private data is often not needed.
*
* Note: Under normal circumstances RedisModule_UnblockClient should not be
* called for clients that are blocked on keys (Either the key will
* become ready or a timeout will occur). If for some reason you do want
* to call RedisModule_UnblockClient it is possible: Client will be
* handled as if it were timed-out (You must implement the timeout
* callback in that case).
*/
RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) { RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) {
return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,privdata); return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,privdata);
} }
@ -4821,9 +4836,9 @@ int RM_BlockedClientDisconnected(RedisModuleCtx *ctx) {
* *
* To call non-reply APIs, the thread safe context must be prepared with: * To call non-reply APIs, the thread safe context must be prepared with:
* *
* RedisModule_ThreadSafeCallStart(ctx); * RedisModule_ThreadSafeContextLock(ctx);
* ... make your call here ... * ... make your call here ...
* RedisModule_ThreadSafeCallStop(ctx); * RedisModule_ThreadSafeContextUnlock(ctx);
* *
* This is not needed when using `RedisModule_Reply*` functions, assuming * This is not needed when using `RedisModule_Reply*` functions, assuming
* that a blocked client was used when the context was created, otherwise * that a blocked client was used when the context was created, otherwise

View File

@ -1919,6 +1919,12 @@ void resetClient(client *c) {
if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand) if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand)
c->flags &= ~CLIENT_ASKING; c->flags &= ~CLIENT_ASKING;
/* We do the same for the CACHING command as well. It also affects
* the next command or transaction executed, in a way very similar
* to ASKING. */
if (!(c->flags & CLIENT_MULTI) && prevcmd != clientCommand)
c->flags &= ~CLIENT_TRACKING_CACHING;
/* Remove the CLIENT_REPLY_SKIP flag if any so that the reply /* Remove the CLIENT_REPLY_SKIP flag if any so that the reply
* to the next command will be sent, but set the flag if the command * to the next command will be sent, but set the flag if the command
* we just processed was "CLIENT REPLY SKIP". */ * we just processed was "CLIENT REPLY SKIP". */
@ -2601,7 +2607,7 @@ void clientCommand(client *c) {
"REPLY (on|off|skip) -- Control the replies sent to the current connection.", "REPLY (on|off|skip) -- Control the replies sent to the current connection.",
"SETNAME <name> -- Assign the name <name> to the current connection.", "SETNAME <name> -- Assign the name <name> to the current connection.",
"UNBLOCK <clientid> [TIMEOUT|ERROR] -- Unblock the specified blocked client.", "UNBLOCK <clientid> [TIMEOUT|ERROR] -- Unblock the specified blocked client.",
"TRACKING (on|off) [REDIRECT <id>] -- Enable client keys tracking for client side caching.", "TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first] [PREFIX second] [OPTIN] [OPTOUT]... -- Enable client keys tracking for client side caching.",
"GETREDIR -- Return the client ID we are redirecting to when tracking is enabled.", "GETREDIR -- Return the client ID we are redirecting to when tracking is enabled.",
NULL NULL
}; };
@ -2789,9 +2795,9 @@ NULL
addReply(c,shared.ok); addReply(c,shared.ok);
} else if (!strcasecmp(szFromObj(c->argv[1]),"tracking") && c->argc >= 3) { } else if (!strcasecmp(szFromObj(c->argv[1]),"tracking") && c->argc >= 3) {
/* CLIENT TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first] /* CLIENT TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first]
* [PREFIX second] ... */ * [PREFIX second] [OPTIN] [OPTOUT] ... */
long long redir = 0; long long redir = 0;
int bcast = 0; uint64_t options = 0;
robj **prefix = NULL; robj **prefix = NULL;
size_t numprefix = 0; size_t numprefix = 0;
@ -2801,18 +2807,34 @@ NULL
if (!strcasecmp(szFromObj(c->argv[j]),"redirect") && moreargs) { if (!strcasecmp(szFromObj(c->argv[j]),"redirect") && moreargs) {
j++; j++;
if (redir != 0) {
addReplyError(c,"A client can only redirect to a single "
"other client");
zfree(prefix);
return;
}
if (getLongLongFromObjectOrReply(c,c->argv[j],&redir,NULL) != if (getLongLongFromObjectOrReply(c,c->argv[j],&redir,NULL) !=
C_OK) return; C_OK)
{
zfree(prefix);
return;
}
/* We will require the client with the specified ID to exist /* We will require the client with the specified ID to exist
* right now, even if it is possible that it gets disconnected * right now, even if it is possible that it gets disconnected
* later. Still a valid sanity check. */ * later. Still a valid sanity check. */
if (lookupClientByID(redir) == NULL) { if (lookupClientByID(redir) == NULL) {
addReplyError(c,"The client ID you want redirect to " addReplyError(c,"The client ID you want redirect to "
"does not exist"); "does not exist");
zfree(prefix);
return; return;
} }
} else if (!strcasecmp(szFromObj(c->argv[j]),"bcast")) { } else if (!strcasecmp(szFromObj(c->argv[j]),"bcast")) {
bcast++; options |= CLIENT_TRACKING_BCAST;
} else if (!strcasecmp(szFromObj(c->argv[j]),"optin")) {
options |= CLIENT_TRACKING_OPTIN;
} else if (!strcasecmp(szFromObj(c->argv[j]),"optout")) {
options |= CLIENT_TRACKING_OPTOUT;
} else if (!strcasecmp(szFromObj(c->argv[j]),"prefix") && moreargs) { } else if (!strcasecmp(szFromObj(c->argv[j]),"prefix") && moreargs) {
j++; j++;
prefix = (robj**)zrealloc(prefix,sizeof(robj*)*(numprefix+1), MALLOC_LOCAL); prefix = (robj**)zrealloc(prefix,sizeof(robj*)*(numprefix+1), MALLOC_LOCAL);
@ -2828,7 +2850,7 @@ NULL
if (!strcasecmp(szFromObj(c->argv[2]),"on")) { if (!strcasecmp(szFromObj(c->argv[2]),"on")) {
/* Before enabling tracking, make sure options are compatible /* Before enabling tracking, make sure options are compatible
* among each other and with the current state of the client. */ * among each other and with the current state of the client. */
if (!bcast && numprefix) { if (!(options & CLIENT_TRACKING_BCAST) && numprefix) {
addReplyError(c, addReplyError(c,
"PREFIX option requires BCAST mode to be enabled"); "PREFIX option requires BCAST mode to be enabled");
zfree(prefix); zfree(prefix);
@ -2837,7 +2859,8 @@ NULL
if (c->flags & CLIENT_TRACKING) { if (c->flags & CLIENT_TRACKING) {
int oldbcast = !!(c->flags & CLIENT_TRACKING_BCAST); int oldbcast = !!(c->flags & CLIENT_TRACKING_BCAST);
if (oldbcast != bcast) { int newbcast = !!(options & CLIENT_TRACKING_BCAST);
if (oldbcast != newbcast) {
addReplyError(c, addReplyError(c,
"You can't switch BCAST mode on/off before disabling " "You can't switch BCAST mode on/off before disabling "
"tracking for this client, and then re-enabling it with " "tracking for this client, and then re-enabling it with "
@ -2846,7 +2869,17 @@ NULL
return; return;
} }
} }
enableTracking(c,redir,bcast,prefix,numprefix);
if (options & CLIENT_TRACKING_BCAST &&
options & (CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT))
{
addReplyError(c,
"OPTIN and OPTOUT are not compatible with BCAST");
zfree(prefix);
return;
}
enableTracking(c,redir,options,prefix,numprefix);
} else if (!strcasecmp(szFromObj(c->argv[2]),"off")) { } else if (!strcasecmp(szFromObj(c->argv[2]),"off")) {
disableTracking(c); disableTracking(c);
} else { } else {
@ -2856,6 +2889,36 @@ NULL
} }
zfree(prefix); zfree(prefix);
addReply(c,shared.ok); addReply(c,shared.ok);
} else if (!strcasecmp(szFromObj(c->argv[1]),"caching") && c->argc >= 3) {
if (!(c->flags & CLIENT_TRACKING)) {
addReplyError(c,"CLIENT CACHING can be called only when the "
"client is in tracking mode with OPTIN or "
"OPTOUT mode enabled");
return;
}
char *opt = szFromObj(c->argv[2]);
if (!strcasecmp(opt,"yes")) {
if (c->flags & CLIENT_TRACKING_OPTIN) {
c->flags |= CLIENT_TRACKING_CACHING;
} else {
addReplyError(c,"CLIENT CACHING YES is only valid when tracking is enabled in OPTIN mode.");
return;
}
} else if (!strcasecmp(opt,"no")) {
if (c->flags & CLIENT_TRACKING_OPTOUT) {
c->flags |= CLIENT_TRACKING_CACHING;
} else {
addReplyError(c,"CLIENT CACHING NO is only valid when tracking is enabled in OPTOUT mode.");
return;
}
} else {
addReply(c,shared.syntaxerr);
return;
}
/* Common reply for when we succeeded. */
addReply(c,shared.ok);
} else if (!strcasecmp(szFromObj(c->argv[1]),"getredir") && c->argc == 2) { } else if (!strcasecmp(szFromObj(c->argv[1]),"getredir") && c->argc == 2) {
/* CLIENT GETREDIR */ /* CLIENT GETREDIR */
if (c->flags & CLIENT_TRACKING) { if (c->flags & CLIENT_TRACKING) {

View File

@ -1718,7 +1718,7 @@ void disklessLoadRestoreBackups(redisDb *backup, int restore, int empty_db_flags
/* Asynchronously read the SYNC payload we receive from a master */ /* Asynchronously read the SYNC payload we receive from a master */
#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */ #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
void readSyncBulkPayload(connection *conn) { void readSyncBulkPayload(connection *conn) {
char buf[4096]; char buf[PROTO_IOBUF_LEN];
ssize_t nread, readlen, nwritten; ssize_t nread, readlen, nwritten;
int use_diskless_load = useDisklessLoad(); int use_diskless_load = useDisklessLoad();
redisDb *diskless_load_backup = NULL; redisDb *diskless_load_backup = NULL;

View File

@ -411,8 +411,12 @@ public:
#define CLIENT_TRACKING (1ULL<<31) /* Client enabled keys tracking in order to #define CLIENT_TRACKING (1ULL<<31) /* Client enabled keys tracking in order to
perform client side caching. */ perform client side caching. */
#define CLIENT_TRACKING_BROKEN_REDIR (1ULL<<32) /* Target client is invalid. */ #define CLIENT_TRACKING_BROKEN_REDIR (1ULL<<32) /* Target client is invalid. */
#define CLIENT_FORCE_REPLY (1ULL<<33) /* Should addReply be forced to write the text? */ #define CLIENT_TRACKING_BCAST (1ULL<<33) /* Tracking in BCAST mode. */
#define CLIENT_TRACKING_BCAST (1ULL<<34) /* Tracking in BCAST mode. */ #define CLIENT_TRACKING_OPTIN (1ULL<<34) /* Tracking in opt-in mode. */
#define CLIENT_TRACKING_OPTOUT (1ULL<<35) /* Tracking in opt-out mode. */
#define CLIENT_TRACKING_CACHING (1ULL<<36) /* CACHING yes/no was given,
depending on optin/optout mode. */
#define CLIENT_FORCE_REPLY (1ULL<<37) /* Should addReply be forced to write the text? */
/* Client block type (btype field in client structure) /* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */ * if CLIENT_BLOCKED flag is set. */
@ -2227,7 +2231,7 @@ void addReplyStatusFormat(client *c, const char *fmt, ...);
#endif #endif
/* Client side caching (tracking mode) */ /* Client side caching (tracking mode) */
void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix); void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix);
void disableTracking(client *c); void disableTracking(client *c);
void trackingRememberKeys(client *c); void trackingRememberKeys(client *c);
void trackingInvalidateKey(robj *keyobj); void trackingInvalidateKey(robj *keyobj);

View File

@ -853,7 +853,7 @@ void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupnam
argv[11] = createStringObject("JUSTID",6); argv[11] = createStringObject("JUSTID",6);
argv[12] = createStringObject("LASTID",6); argv[12] = createStringObject("LASTID",6);
argv[13] = createObjectFromStreamID(&group->last_id); argv[13] = createObjectFromStreamID(&group->last_id);
propagate(cserver.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL); alsoPropagate(cserver.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(argv[0]); decrRefCount(argv[0]);
decrRefCount(argv[3]); decrRefCount(argv[3]);
decrRefCount(argv[4]); decrRefCount(argv[4]);
@ -880,7 +880,7 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna
argv[2] = key; argv[2] = key;
argv[3] = groupname; argv[3] = groupname;
argv[4] = createObjectFromStreamID(&group->last_id); argv[4] = createObjectFromStreamID(&group->last_id);
propagate(cserver.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL); alsoPropagate(cserver.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(argv[0]); decrRefCount(argv[0]);
decrRefCount(argv[1]); decrRefCount(argv[1]);
decrRefCount(argv[4]); decrRefCount(argv[4]);

View File

@ -44,7 +44,7 @@
rax *TrackingTable = NULL; rax *TrackingTable = NULL;
rax *PrefixTable = NULL; rax *PrefixTable = NULL;
uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across
the whole tracking table. This givesn the whole tracking table. This gives
an hint about the total memory we an hint about the total memory we
are using server side for CSC. */ are using server side for CSC. */
robj *TrackingChannelName; robj *TrackingChannelName;
@ -93,7 +93,8 @@ void disableTracking(client *c) {
if (c->flags & CLIENT_TRACKING) { if (c->flags & CLIENT_TRACKING) {
g_pserver->tracking_clients--; g_pserver->tracking_clients--;
c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR| c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR|
CLIENT_TRACKING_BCAST); CLIENT_TRACKING_BCAST|CLIENT_TRACKING_OPTIN|
CLIENT_TRACKING_OPTOUT);
} }
} }
@ -124,10 +125,11 @@ void enableBcastTrackingForPrefix(client *c, const char *prefix, size_t plen) {
* eventually get freed, we'll send a message to the original client to * eventually get freed, we'll send a message to the original client to
* inform it of the condition. Multiple clients can redirect the invalidation * inform it of the condition. Multiple clients can redirect the invalidation
* messages to the same client ID. */ * messages to the same client ID. */
void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix) { void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix) {
if (!(c->flags & CLIENT_TRACKING)) g_pserver->tracking_clients++; if (!(c->flags & CLIENT_TRACKING)) g_pserver->tracking_clients++;
c->flags |= CLIENT_TRACKING; c->flags |= CLIENT_TRACKING;
c->flags &= ~(CLIENT_TRACKING_BROKEN_REDIR|CLIENT_TRACKING_BCAST); c->flags &= ~(CLIENT_TRACKING_BROKEN_REDIR|CLIENT_TRACKING_BCAST|
CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT);
c->client_tracking_redirection = redirect_to; c->client_tracking_redirection = redirect_to;
if (TrackingTable == NULL) { if (TrackingTable == NULL) {
TrackingTable = raxNew(); TrackingTable = raxNew();
@ -135,7 +137,7 @@ void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, s
TrackingChannelName = createStringObject("__redis__:invalidate",20); TrackingChannelName = createStringObject("__redis__:invalidate",20);
} }
if (bcast) { if (options & CLIENT_TRACKING_BCAST) {
c->flags |= CLIENT_TRACKING_BCAST; c->flags |= CLIENT_TRACKING_BCAST;
if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0); if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0);
for (size_t j = 0; j < numprefix; j++) { for (size_t j = 0; j < numprefix; j++) {
@ -143,14 +145,23 @@ void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, s
enableBcastTrackingForPrefix(c,sdsprefix,sdslen(sdsprefix)); enableBcastTrackingForPrefix(c,sdsprefix,sdslen(sdsprefix));
} }
} }
c->flags |= options & (CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT);
} }
/* This function is called after the excution of a readonly command in the /* This function is called after the execution of a readonly command in the
* case the client 'c' has keys tracking enabled. It will populate the * case the client 'c' has keys tracking enabled and the tracking is not
* tracking ivalidation table according to the keys the user fetched, so that * in BCAST mode. It will populate the tracking invalidation table according
* Redis will know what are the clients that should receive an invalidation * to the keys the user fetched, so that Redis will know what are the clients
* message with certain groups of keys are modified. */ * that should receive an invalidation message with certain groups of keys
* are modified. */
void trackingRememberKeys(client *c) { void trackingRememberKeys(client *c) {
/* Return if we are in optin/out mode and the right CACHING command
* was/wasn't given in order to modify the default behavior. */
uint64_t optin = c->flags & CLIENT_TRACKING_OPTIN;
uint64_t optout = c->flags & CLIENT_TRACKING_OPTOUT;
uint64_t caching_given = c->flags & CLIENT_TRACKING_CACHING;
if ((optin && !caching_given) || (optout && caching_given)) return;
int numkeys; int numkeys;
int *keys = getKeysFromCommand(c->cmd,c->argv,c->argc,&numkeys); int *keys = getKeysFromCommand(c->cmd,c->argv,c->argc,&numkeys);
if (keys == NULL) return; if (keys == NULL) return;
@ -292,19 +303,12 @@ void trackingInvalidateKey(robj *keyobj) {
} }
/* This function is called when one or all the Redis databases are flushed /* This function is called when one or all the Redis databases are flushed
* (dbid == -1 in case of FLUSHALL). Caching slots are not specific for * (dbid == -1 in case of FLUSHALL). Caching keys are not specific for
* each DB but are global: currently what we do is sending a special * each DB but are global: currently what we do is send a special
* notification to clients with tracking enabled, invalidating the caching * notification to clients with tracking enabled, invalidating the caching
* slot "-1", which means, "all the keys", in order to avoid flooding clients * key "", which means, "all the keys", in order to avoid flooding clients
* with many invalidation messages for all the keys they may hold. * with many invalidation messages for all the keys they may hold.
* */
* However trying to flush the tracking table here is very costly:
* we need scanning 16 million caching slots in the table to check
* if they are used, this introduces a big delay. So what we do is to really
* flush the table in the case of FLUSHALL. When a FLUSHDB is called instead
* we just send the invalidation message to all the clients, but don't
* flush the table: it will slowly get garbage collected as more keys
* are modified in the used caching slots. */
void freeTrackingRadixTree(void *rt) { void freeTrackingRadixTree(void *rt) {
raxFree((rax*)rt); raxFree((rax*)rt);
} }
@ -325,6 +329,7 @@ void trackingInvalidateKeysOnFlush(int dbid) {
/* In case of FLUSHALL, reclaim all the memory used by tracking. */ /* In case of FLUSHALL, reclaim all the memory used by tracking. */
if (dbid == -1 && TrackingTable) { if (dbid == -1 && TrackingTable) {
raxFreeWithCallback(TrackingTable,freeTrackingRadixTree); raxFreeWithCallback(TrackingTable,freeTrackingRadixTree);
TrackingTable = raxNew();
TrackingTableTotalItems = 0; TrackingTableTotalItems = 0;
} }
} }

View File

@ -42,7 +42,7 @@ int fork_create(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
/* child */ /* child */
RedisModule_Log(ctx, "notice", "fork child started"); RedisModule_Log(ctx, "notice", "fork child started");
usleep(200000); usleep(500000);
RedisModule_Log(ctx, "notice", "fork child exiting"); RedisModule_Log(ctx, "notice", "fork child exiting");
RedisModule_ExitFromChild(code_to_exit_with); RedisModule_ExitFromChild(code_to_exit_with);
/* unreachable */ /* unreachable */

View File

@ -74,6 +74,7 @@ int test_ld_conv(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_ReplyWithError(ctx, err); RedisModule_ReplyWithError(ctx, err);
goto final; goto final;
} }
/* Make sure we can't convert a string that has \0 in it */ /* Make sure we can't convert a string that has \0 in it */
char buf[4] = "123"; char buf[4] = "123";
buf[1] = '\0'; buf[1] = '\0';
@ -81,8 +82,11 @@ int test_ld_conv(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
long double ld3; long double ld3;
if (RedisModule_StringToLongDouble(s3, &ld3) == REDISMODULE_OK) { if (RedisModule_StringToLongDouble(s3, &ld3) == REDISMODULE_OK) {
RedisModule_ReplyWithError(ctx, "Invalid string successfully converted to long double"); RedisModule_ReplyWithError(ctx, "Invalid string successfully converted to long double");
RedisModule_FreeString(ctx, s3);
goto final; goto final;
} }
RedisModule_FreeString(ctx, s3);
RedisModule_ReplyWithLongDouble(ctx, ld2); RedisModule_ReplyWithLongDouble(ctx, ld2);
final: final:
RedisModule_FreeString(ctx, s1); RedisModule_FreeString(ctx, s1);

View File

@ -53,6 +53,7 @@ proc kill_server config {
} }
# kill server and wait for the process to be totally exited # kill server and wait for the process to be totally exited
send_data_packet $::test_server_fd server-killing $pid
catch {exec kill $pid} catch {exec kill $pid}
if {$::valgrind} { if {$::valgrind} {
set max_wait 60000 set max_wait 60000
@ -140,6 +141,18 @@ proc tags {tags code} {
uplevel 1 $code uplevel 1 $code
set ::tags [lrange $::tags 0 end-[llength $tags]] set ::tags [lrange $::tags 0 end-[llength $tags]]
} }
# Write the configuration in the dictionary 'config' in the specified
# file name.
proc create_server_config_file {filename config} {
set fp [open $filename w+]
foreach directive [dict keys $config] {
puts -nonewline $fp "$directive "
puts $fp [dict get $config $directive]
}
close $fp
}
proc start_server {options {code undefined}} { proc start_server {options {code undefined}} {
# If we are running against an external server, we just push the # If we are running against an external server, we just push the
# host/port pair in the stack the first time # host/port pair in the stack the first time
@ -221,56 +234,91 @@ proc start_server {options {code undefined}} {
# write new configuration to temporary file # write new configuration to temporary file
set config_file [tmpfile redis.conf] set config_file [tmpfile redis.conf]
set fp [open $config_file w+] create_server_config_file $config_file $config
foreach directive [dict keys $config] {
puts -nonewline $fp "$directive "
puts $fp [dict get $config $directive]
}
close $fp
set stdout [format "%s/%s" [dict get $config "dir"] "stdout"] set stdout [format "%s/%s" [dict get $config "dir"] "stdout"]
set stderr [format "%s/%s" [dict get $config "dir"] "stderr"] set stderr [format "%s/%s" [dict get $config "dir"] "stderr"]
if {$::valgrind} { # We need a loop here to retry with different ports.
set pid [exec valgrind --track-origins=yes --suppressions=src/valgrind.sup --show-reachable=no --show-possibly-lost=no --leak-check=full src/keydb-server $config_file > $stdout 2> $stderr &] set server_started 0
} elseif ($::stack_logging) { while {$server_started == 0} {
set pid [exec /usr/bin/env MallocStackLogging=1 MallocLogFile=/tmp/malloc_log.txt src/keydb-server $config_file > $stdout 2> $stderr &] if {$::verbose} {
} else { puts -nonewline "=== ($tags) Starting server ${::host}:${::port} "
set pid [exec src/keydb-server $config_file > $stdout 2> $stderr &] }
}
# Tell the test server about this new instance. send_data_packet $::test_server_fd "server-spawning" "port $::port"
send_data_packet $::test_server_fd server-spawned $pid
# check that the server actually started if {$::valgrind} {
# ugly but tries to be as fast as possible... set pid [exec valgrind --track-origins=yes --suppressions=src/valgrind.sup --show-reachable=no --show-possibly-lost=no --leak-check=full src/keydb-server $config_file > $stdout 2> $stderr &]
if {$::valgrind} {set retrynum 1000} else {set retrynum 100} } elseif ($::stack_logging) {
set pid [exec /usr/bin/env MallocStackLogging=1 MallocLogFile=/tmp/malloc_log.txt src/keydb-server $config_file > $stdout 2> $stderr &]
} else {
set pid [exec src/keydb-server $config_file > $stdout 2> $stderr &]
}
if {$::verbose} { # Tell the test server about this new instance.
puts -nonewline "=== ($tags) Starting server ${::host}:${::port} " send_data_packet $::test_server_fd server-spawned $pid
}
if {$code ne "undefined"} { # check that the server actually started
set serverisup [server_is_up $::host $::port $retrynum] # ugly but tries to be as fast as possible...
} else { if {$::valgrind} {set retrynum 1000} else {set retrynum 100}
set serverisup 1
}
if {$::verbose} { # Wait for actual startup
puts "" set checkperiod 100; # Milliseconds
} set maxiter [expr {120*1000/100}] ; # Wait up to 2 minutes.
set port_busy 0
while {![info exists _pid]} {
regexp {PID:\s(\d+)} [exec cat $stdout] _ _pid
after $checkperiod
incr maxiter -1
if {$maxiter == 0} {
start_server_error $config_file "No PID detected in log $stdout"
puts "--- LOG CONTENT ---"
puts [exec cat $stdout]
puts "-------------------"
break
}
if {!$serverisup} { # Check if the port is actually busy and the server failed
set err {} # for this reason.
append err [exec cat $stdout] "\n" [exec cat $stderr] if {[regexp {Could not create server TCP} [exec cat $stdout]]} {
start_server_error $config_file $err set port_busy 1
return break
} }
}
# Wait for actual startup # Sometimes we have to try a different port, even if we checked
while {![info exists _pid]} { # for availability. Other test clients may grab the port before we
regexp {PID:\s(\d+)} [exec cat $stdout] _ _pid # are able to do it for example.
after 100 if {$port_busy} {
puts "Port $::port was already busy, trying another port..."
set ::port [find_available_port [expr {$::port+1}]]
if {$::tls} {
dict set config "tls-port" $::port
} else {
dict set config port $::port
}
create_server_config_file $config_file $config
continue; # Try again
}
if {$code ne "undefined"} {
set serverisup [server_is_up $::host $::port $retrynum]
} else {
set serverisup 1
}
if {$::verbose} {
puts ""
}
if {!$serverisup} {
set err {}
append err [exec cat $stdout] "\n" [exec cat $stderr]
start_server_error $config_file $err
return
}
set server_started 1
} }
# setup properties to be able to initialize a client object # setup properties to be able to initialize a client object

View File

@ -294,7 +294,7 @@ proc read_from_test_client fd {
puts "\[$completed_tests_count/$all_tests_count [colorstr yellow $status]\]: $data ($elapsed seconds)" puts "\[$completed_tests_count/$all_tests_count [colorstr yellow $status]\]: $data ($elapsed seconds)"
lappend ::clients_time_history $elapsed $data lappend ::clients_time_history $elapsed $data
signal_idle_client $fd signal_idle_client $fd
set ::active_clients_task($fd) DONE set ::active_clients_task($fd) "(DONE) $data"
} elseif {$status eq {ok}} { } elseif {$status eq {ok}} {
if {!$::quiet} { if {!$::quiet} {
puts "\[[colorstr green $status]\]: $data" puts "\[[colorstr green $status]\]: $data"
@ -325,10 +325,16 @@ proc read_from_test_client fd {
exit 1 exit 1
} elseif {$status eq {testing}} { } elseif {$status eq {testing}} {
set ::active_clients_task($fd) "(IN PROGRESS) $data" set ::active_clients_task($fd) "(IN PROGRESS) $data"
} elseif {$status eq {server-spawning}} {
set ::active_clients_task($fd) "(SPAWNING SERVER) $data"
} elseif {$status eq {server-spawned}} { } elseif {$status eq {server-spawned}} {
lappend ::active_servers $data lappend ::active_servers $data
set ::active_clients_task($fd) "(SPAWNED SERVER) pid:$data"
} elseif {$status eq {server-killing}} {
set ::active_clients_task($fd) "(KILLING SERVER) pid:$data"
} elseif {$status eq {server-killed}} { } elseif {$status eq {server-killed}} {
set ::active_servers [lsearch -all -inline -not -exact $::active_servers $data] set ::active_servers [lsearch -all -inline -not -exact $::active_servers $data]
set ::active_clients_task($fd) "(KILLED SERVER) pid:$data"
} else { } else {
if {!$::quiet} { if {!$::quiet} {
puts "\[$status\]: $data" puts "\[$status\]: $data"
@ -338,7 +344,7 @@ proc read_from_test_client fd {
proc show_clients_state {} { proc show_clients_state {} {
# The following loop is only useful for debugging tests that may # The following loop is only useful for debugging tests that may
# enter an infinite loop. Commented out normally. # enter an infinite loop.
foreach x $::active_clients { foreach x $::active_clients {
if {[info exist ::active_clients_task($x)]} { if {[info exist ::active_clients_task($x)]} {
puts "$x => $::active_clients_task($x)" puts "$x => $::active_clients_task($x)"
@ -368,8 +374,6 @@ proc signal_idle_client fd {
set ::active_clients \ set ::active_clients \
[lsearch -all -inline -not -exact $::active_clients $fd] [lsearch -all -inline -not -exact $::active_clients $fd]
if 0 {show_clients_state}
# New unit to process? # New unit to process?
if {$::next_test != [llength $::all_tests]} { if {$::next_test != [llength $::all_tests]} {
if {!$::quiet} { if {!$::quiet} {
@ -385,6 +389,7 @@ proc signal_idle_client fd {
} }
} else { } else {
lappend ::idle_clients $fd lappend ::idle_clients $fd
set ::active_clients_task($fd) "SLEEPING, no more units to assign"
if {[llength $::active_clients] == 0} { if {[llength $::active_clients] == 0} {
the_end the_end
} }

View File

@ -39,6 +39,8 @@ start_server {tags {"memefficiency"}} {
start_server {tags {"defrag"}} { start_server {tags {"defrag"}} {
if {[string match {*jemalloc*} [s mem_allocator]]} { if {[string match {*jemalloc*} [s mem_allocator]]} {
test "Active defrag" { test "Active defrag" {
r config set save "" ;# prevent bgsave from interfereing with save below
r config set hz 100
r config set activedefrag no r config set activedefrag no
r config set active-defrag-threshold-lower 5 r config set active-defrag-threshold-lower 5
r config set active-defrag-cycle-min 65 r config set active-defrag-cycle-min 65
@ -46,8 +48,8 @@ start_server {tags {"defrag"}} {
r config set active-defrag-ignore-bytes 2mb r config set active-defrag-ignore-bytes 2mb
r config set maxmemory 100mb r config set maxmemory 100mb
r config set maxmemory-policy allkeys-lru r config set maxmemory-policy allkeys-lru
r debug populate 700000 asdf 150 r debug populate 700000 asdf1 150
r debug populate 170000 asdf 300 r debug populate 170000 asdf2 300
r ping ;# trigger eviction following the previous population r ping ;# trigger eviction following the previous population
after 120 ;# serverCron only updates the info once in 100ms after 120 ;# serverCron only updates the info once in 100ms
set frag [s allocator_frag_ratio] set frag [s allocator_frag_ratio]
@ -55,6 +57,11 @@ start_server {tags {"defrag"}} {
puts "frag $frag" puts "frag $frag"
} }
assert {$frag >= 1.4} assert {$frag >= 1.4}
r config set latency-monitor-threshold 5
r latency reset
r config set maxmemory 110mb ;# prevent further eviction (not to fail the digest test)
set digest [r debug digest]
catch {r config set activedefrag yes} e catch {r config set activedefrag yes} e
if {![string match {DISABLED*} $e]} { if {![string match {DISABLED*} $e]} {
# Wait for the active defrag to start working (decision once a # Wait for the active defrag to start working (decision once a
@ -78,19 +85,37 @@ start_server {tags {"defrag"}} {
# Test the the fragmentation is lower. # Test the the fragmentation is lower.
after 120 ;# serverCron only updates the info once in 100ms after 120 ;# serverCron only updates the info once in 100ms
set frag [s allocator_frag_ratio] set frag [s allocator_frag_ratio]
set max_latency 0
foreach event [r latency latest] {
lassign $event eventname time latency max
if {$eventname == "active-defrag-cycle"} {
set max_latency $max
}
}
if {$::verbose} { if {$::verbose} {
puts "frag $frag" puts "frag $frag"
puts "max latency $max_latency"
puts [r latency latest]
puts [r latency history active-defrag-cycle]
} }
assert {$frag < 1.1} assert {$frag < 1.1}
# due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75,
# we expect max latency to be not much higher than 7.5ms but due to rare slowness threshold is set higher
assert {$max_latency <= 30}
} else { } else {
set _ "" set _ ""
} }
} {} # verify the data isn't corrupted or changed
set newdigest [r debug digest]
assert {$digest eq $newdigest}
r save ;# saving an rdb iterates over all the data / pointers
} {OK}
test "Active defrag big keys" { test "Active defrag big keys" {
r flushdb r flushdb
r config resetstat r config resetstat
r config set save "" ;# prevent bgsave from interfereing with save below r config set save "" ;# prevent bgsave from interfereing with save below
r config set hz 100
r config set activedefrag no r config set activedefrag no
r config set active-defrag-max-scan-fields 1000 r config set active-defrag-max-scan-fields 1000
r config set active-defrag-threshold-lower 5 r config set active-defrag-threshold-lower 5
@ -142,7 +167,7 @@ start_server {tags {"defrag"}} {
for {set j 0} {$j < 500000} {incr j} { for {set j 0} {$j < 500000} {incr j} {
$rd read ; # Discard replies $rd read ; # Discard replies
} }
assert {[r dbsize] == 500010} assert_equal [r dbsize] 500010
# create some fragmentation # create some fragmentation
for {set j 0} {$j < 500000} {incr j 2} { for {set j 0} {$j < 500000} {incr j 2} {
@ -151,7 +176,7 @@ start_server {tags {"defrag"}} {
for {set j 0} {$j < 500000} {incr j 2} { for {set j 0} {$j < 500000} {incr j 2} {
$rd read ; # Discard replies $rd read ; # Discard replies
} }
assert {[r dbsize] == 250010} assert_equal [r dbsize] 250010
# start defrag # start defrag
after 120 ;# serverCron only updates the info once in 100ms after 120 ;# serverCron only updates the info once in 100ms
@ -200,9 +225,9 @@ start_server {tags {"defrag"}} {
puts [r latency history active-defrag-cycle] puts [r latency history active-defrag-cycle]
} }
assert {$frag < 1.1} assert {$frag < 1.1}
# due to high fragmentation, 10hz, and active-defrag-cycle-max set to 75, # due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75,
# we expect max latency to be not much higher than 75ms # we expect max latency to be not much higher than 7.5ms but due to rare slowness threshold is set higher
assert {$max_latency <= 120} assert {$max_latency <= 30}
} }
# verify the data isn't corrupted or changed # verify the data isn't corrupted or changed
set newdigest [r debug digest] set newdigest [r debug digest]
@ -292,8 +317,8 @@ start_server {tags {"defrag"}} {
} }
assert {$frag < 1.1} assert {$frag < 1.1}
# due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75, # due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75,
# we expect max latency to be not much higher than 7.5ms # we expect max latency to be not much higher than 7.5ms but due to rare slowness threshold is set higher
assert {$max_latency <= 12} assert {$max_latency <= 30}
} }
# verify the data isn't corrupted or changed # verify the data isn't corrupted or changed
set newdigest [r debug digest] set newdigest [r debug digest]

View File

@ -20,9 +20,8 @@ start_server {tags {"modules"}} {
test {Module fork kill} { test {Module fork kill} {
r fork.create 3 r fork.create 3
after 20 after 250
r fork.kill r fork.kill
after 100
assert {[count_log_message "fork child started"] eq "2"} assert {[count_log_message "fork child started"] eq "2"}
assert {[count_log_message "Received SIGUSR1 in child"] eq "1"} assert {[count_log_message "Received SIGUSR1 in child"] eq "1"}