Merge commit 'df83892760d19b1b7e92d76e72daf4834ad2df6c' into redis_6_merge
Former-commit-id: cde199a7973ad63317b68f581df607321e12bf46
This commit is contained in:
commit
12999c17dd
61
README.md
61
README.md
@ -88,6 +88,11 @@ Compiling is as simple as:
|
|||||||
|
|
||||||
% make
|
% make
|
||||||
|
|
||||||
|
To build with TLS support, you'll need OpenSSL development libraries (e.g.
|
||||||
|
libssl-dev on Debian/Ubuntu) and run:
|
||||||
|
|
||||||
|
% make BUILD_TLS=yes
|
||||||
|
|
||||||
You can enable flash support with:
|
You can enable flash support with:
|
||||||
|
|
||||||
% make MALLOC=memkind
|
% make MALLOC=memkind
|
||||||
@ -95,6 +100,13 @@ You can enable flash support with:
|
|||||||
***Note that the following dependencies may be needed:
|
***Note that the following dependencies may be needed:
|
||||||
% sudo apt-get install autoconf autotools-dev libnuma-dev libtool
|
% sudo apt-get install autoconf autotools-dev libnuma-dev libtool
|
||||||
|
|
||||||
|
If TLS is built, running the tests with TLS enabled (you will need `tcl-tls`
|
||||||
|
installed):
|
||||||
|
|
||||||
|
% ./utils/gen-test-certs.sh
|
||||||
|
% ./runtest --tls
|
||||||
|
|
||||||
|
|
||||||
Fixing build problems with dependencies or cached build options
|
Fixing build problems with dependencies or cached build options
|
||||||
---------
|
---------
|
||||||
|
|
||||||
@ -177,6 +189,14 @@ as options using the command line. Examples:
|
|||||||
All the options in keydb.conf are also supported as options using the command
|
All the options in keydb.conf are also supported as options using the command
|
||||||
line, with exactly the same name.
|
line, with exactly the same name.
|
||||||
|
|
||||||
|
|
||||||
|
Running Redis with TLS:
|
||||||
|
------------------
|
||||||
|
|
||||||
|
Please consult the [TLS.md](TLS.md) file for more information on
|
||||||
|
how to use Redis with TLS.
|
||||||
|
|
||||||
|
|
||||||
Playing with KeyDB
|
Playing with KeyDB
|
||||||
------------------
|
------------------
|
||||||
|
|
||||||
@ -253,49 +273,8 @@ $ docker run -it --rm -v /path-to-dump-binaries:/keydb_bin eqalpha/keydb-build-b
|
|||||||
```
|
```
|
||||||
Please note that you will need libcurl4-openssl-dev in order to run keydb. With flash version you may need libnuma-dev and libtool installed in order to run the binaries. Keep this in mind especially when running in a container. For a copy of all our Dockerfiles, please see them on [docs]( https://docs.keydb.dev/docs/dockerfiles/).
|
Please note that you will need libcurl4-openssl-dev in order to run keydb. With flash version you may need libnuma-dev and libtool installed in order to run the binaries. Keep this in mind especially when running in a container. For a copy of all our Dockerfiles, please see them on [docs]( https://docs.keydb.dev/docs/dockerfiles/).
|
||||||
|
|
||||||
<<<<<<< HEAD
|
|
||||||
Code contributions
|
Code contributions
|
||||||
-----------------
|
-----------------
|
||||||
=======
|
|
||||||
One of the most important functions inside this file is `replicationFeedSlaves()` that writes commands to the clients representing replica instances connected
|
|
||||||
to our master, so that the replicas can get the writes performed by the clients:
|
|
||||||
this way their data set will remain synchronized with the one in the master.
|
|
||||||
|
|
||||||
This file also implements both the `SYNC` and `PSYNC` commands that are
|
|
||||||
used in order to perform the first synchronization between masters and
|
|
||||||
replicas, or to continue the replication after a disconnection.
|
|
||||||
|
|
||||||
Other C files
|
|
||||||
---
|
|
||||||
|
|
||||||
* `t_hash.c`, `t_list.c`, `t_set.c`, `t_string.c`, `t_zset.c` and `t_stream.c` contains the implementation of the Redis data types. They implement both an API to access a given data type, and the client commands implementations for these data types.
|
|
||||||
* `ae.c` implements the Redis event loop, it's a self contained library which is simple to read and understand.
|
|
||||||
* `sds.c` is the Redis string library, check http://github.com/antirez/sds for more information.
|
|
||||||
* `anet.c` is a library to use POSIX networking in a simpler way compared to the raw interface exposed by the kernel.
|
|
||||||
* `dict.c` is an implementation of a non-blocking hash table which rehashes incrementally.
|
|
||||||
* `scripting.c` implements Lua scripting. It is completely self contained from the rest of the Redis implementation and is simple enough to understand if you are familar with the Lua API.
|
|
||||||
* `cluster.c` implements the Redis Cluster. Probably a good read only after being very familiar with the rest of the Redis code base. If you want to read `cluster.c` make sure to read the [Redis Cluster specification][3].
|
|
||||||
|
|
||||||
[3]: http://redis.io/topics/cluster-spec
|
|
||||||
|
|
||||||
Anatomy of a Redis command
|
|
||||||
---
|
|
||||||
|
|
||||||
All the Redis commands are defined in the following way:
|
|
||||||
|
|
||||||
void foobarCommand(client *c) {
|
|
||||||
printf("%s",c->argv[1]->ptr); /* Do something with the argument. */
|
|
||||||
addReply(c,shared.ok); /* Reply something to the client. */
|
|
||||||
}
|
|
||||||
|
|
||||||
The command is then referenced inside `server.c` in the command table:
|
|
||||||
|
|
||||||
{"foobar",foobarCommand,2,"rtF",0,NULL,0,0,0,0,0},
|
|
||||||
|
|
||||||
In the above example `2` is the number of arguments the command takes,
|
|
||||||
while `"rtF"` are the command flags, as documented in the command table
|
|
||||||
top comment inside `server.c`.
|
|
||||||
>>>>>>> redis/6.0
|
|
||||||
|
|
||||||
Note: by contributing code to the KeyDB project in any form, including sending
|
Note: by contributing code to the KeyDB project in any form, including sending
|
||||||
a pull request via Github, a code fragment or patch via private email or
|
a pull request via Github, a code fragment or patch via private email or
|
||||||
|
45
TLS.md
45
TLS.md
@ -1,8 +1,5 @@
|
|||||||
TLS Support -- Work In Progress
|
TLS Support
|
||||||
===============================
|
===========
|
||||||
|
|
||||||
This is a brief note to capture current thoughts/ideas and track pending action
|
|
||||||
items.
|
|
||||||
|
|
||||||
Getting Started
|
Getting Started
|
||||||
---------------
|
---------------
|
||||||
@ -69,37 +66,23 @@ probably not be so hard. For cluster keys migration it might be more difficult,
|
|||||||
but there are probably other good reasons to improve that part anyway.
|
but there are probably other good reasons to improve that part anyway.
|
||||||
|
|
||||||
To-Do List
|
To-Do List
|
||||||
==========
|
----------
|
||||||
|
|
||||||
Additional TLS Features
|
- [ ] Add session caching support. Check if/how it's handled by clients to
|
||||||
-----------------------
|
assess how useful/important it is.
|
||||||
|
- [ ] redis-benchmark support. The current implementation is a mix of using
|
||||||
|
hiredis for parsing and basic networking (establishing connections), but
|
||||||
|
directly manipulating sockets for most actions. This will need to be cleaned
|
||||||
|
up for proper TLS support. The best approach is probably to migrate to hiredis
|
||||||
|
async mode.
|
||||||
|
- [ ] redis-cli `--slave` and `--rdb` support.
|
||||||
|
|
||||||
1. Add metrics to INFO?
|
Multi-port
|
||||||
2. Add session caching support. Check if/how it's handled by clients to assess
|
----------
|
||||||
how useful/important it is.
|
|
||||||
|
|
||||||
redis-benchmark
|
|
||||||
---------------
|
|
||||||
|
|
||||||
The current implementation is a mix of using hiredis for parsing and basic
|
|
||||||
networking (establishing connections), but directly manipulating sockets for
|
|
||||||
most actions.
|
|
||||||
|
|
||||||
This will need to be cleaned up for proper TLS support. The best approach is
|
|
||||||
probably to migrate to hiredis async mode.
|
|
||||||
|
|
||||||
redis-cli
|
|
||||||
---------
|
|
||||||
|
|
||||||
1. Add support for TLS in --slave and --rdb modes.
|
|
||||||
|
|
||||||
Others
|
|
||||||
------
|
|
||||||
|
|
||||||
Consider the implications of allowing TLS to be configured on a separate port,
|
Consider the implications of allowing TLS to be configured on a separate port,
|
||||||
making Redis listening on multiple ports.
|
making Redis listening on multiple ports:
|
||||||
|
|
||||||
This impacts many things, like
|
|
||||||
1. Startup banner port notification
|
1. Startup banner port notification
|
||||||
2. Proctitle
|
2. Proctitle
|
||||||
3. How slaves announce themselves
|
3. How slaves announce themselves
|
||||||
|
10
deps/lua/src/lua_struct.c
vendored
10
deps/lua/src/lua_struct.c
vendored
@ -89,12 +89,14 @@ typedef struct Header {
|
|||||||
} Header;
|
} Header;
|
||||||
|
|
||||||
|
|
||||||
static int getnum (const char **fmt, int df) {
|
static int getnum (lua_State *L, const char **fmt, int df) {
|
||||||
if (!isdigit(**fmt)) /* no number? */
|
if (!isdigit(**fmt)) /* no number? */
|
||||||
return df; /* return default value */
|
return df; /* return default value */
|
||||||
else {
|
else {
|
||||||
int a = 0;
|
int a = 0;
|
||||||
do {
|
do {
|
||||||
|
if (a > (INT_MAX / 10) || a * 10 > (INT_MAX - (**fmt - '0')))
|
||||||
|
luaL_error(L, "integral size overflow");
|
||||||
a = a*10 + *((*fmt)++) - '0';
|
a = a*10 + *((*fmt)++) - '0';
|
||||||
} while (isdigit(**fmt));
|
} while (isdigit(**fmt));
|
||||||
return a;
|
return a;
|
||||||
@ -115,9 +117,9 @@ static size_t optsize (lua_State *L, char opt, const char **fmt) {
|
|||||||
case 'f': return sizeof(float);
|
case 'f': return sizeof(float);
|
||||||
case 'd': return sizeof(double);
|
case 'd': return sizeof(double);
|
||||||
case 'x': return 1;
|
case 'x': return 1;
|
||||||
case 'c': return getnum(fmt, 1);
|
case 'c': return getnum(L, fmt, 1);
|
||||||
case 'i': case 'I': {
|
case 'i': case 'I': {
|
||||||
int sz = getnum(fmt, sizeof(int));
|
int sz = getnum(L, fmt, sizeof(int));
|
||||||
if (sz > MAXINTSIZE)
|
if (sz > MAXINTSIZE)
|
||||||
luaL_error(L, "integral size %d is larger than limit of %d",
|
luaL_error(L, "integral size %d is larger than limit of %d",
|
||||||
sz, MAXINTSIZE);
|
sz, MAXINTSIZE);
|
||||||
@ -150,7 +152,7 @@ static void controloptions (lua_State *L, int opt, const char **fmt,
|
|||||||
case '>': h->endian = BIG; return;
|
case '>': h->endian = BIG; return;
|
||||||
case '<': h->endian = LITTLE; return;
|
case '<': h->endian = LITTLE; return;
|
||||||
case '!': {
|
case '!': {
|
||||||
int a = getnum(fmt, MAXALIGN);
|
int a = getnum(L, fmt, MAXALIGN);
|
||||||
if (!isp2(a))
|
if (!isp2(a))
|
||||||
luaL_error(L, "alignment %d is not a power of 2", a);
|
luaL_error(L, "alignment %d is not a power of 2", a);
|
||||||
h->align = a;
|
h->align = a;
|
||||||
|
@ -1361,7 +1361,11 @@ latency-monitor-threshold 0
|
|||||||
# z Sorted set commands
|
# z Sorted set commands
|
||||||
# x Expired events (events generated every time a key expires)
|
# x Expired events (events generated every time a key expires)
|
||||||
# e Evicted events (events generated when a key is evicted for maxmemory)
|
# e Evicted events (events generated when a key is evicted for maxmemory)
|
||||||
# A Alias for g$lshzxe, so that the "AKE" string means all the events.
|
# t Stream commands
|
||||||
|
# m Key-miss events (Note: It is not included in the 'A' class)
|
||||||
|
# A Alias for g$lshzxet, so that the "AKE" string means all the events
|
||||||
|
# (Except key-miss events which are excluded from 'A' due to their
|
||||||
|
# unique nature).
|
||||||
#
|
#
|
||||||
# The "notify-keyspace-events" takes as argument a string that is composed
|
# The "notify-keyspace-events" takes as argument a string that is composed
|
||||||
# of zero or multiple characters. The empty string means that notifications
|
# of zero or multiple characters. The empty string means that notifications
|
||||||
|
@ -358,6 +358,10 @@ void loadServerConfigFromString(char *config) {
|
|||||||
if (addresses > CONFIG_BINDADDR_MAX) {
|
if (addresses > CONFIG_BINDADDR_MAX) {
|
||||||
err = "Too many bind addresses specified"; goto loaderr;
|
err = "Too many bind addresses specified"; goto loaderr;
|
||||||
}
|
}
|
||||||
|
/* Free old bind addresses */
|
||||||
|
for (j = 0; j < g_pserver->bindaddr_count; j++) {
|
||||||
|
zfree(g_pserver->bindaddr[j]);
|
||||||
|
}
|
||||||
for (j = 0; j < addresses; j++)
|
for (j = 0; j < addresses; j++)
|
||||||
g_pserver->bindaddr[j] = zstrdup(argv[j+1]);
|
g_pserver->bindaddr[j] = zstrdup(argv[j+1]);
|
||||||
g_pserver->bindaddr_count = addresses;
|
g_pserver->bindaddr_count = addresses;
|
||||||
@ -2083,8 +2087,9 @@ static int updateMaxmemory(long long val, long long prev, const char **err) {
|
|||||||
UNUSED(prev);
|
UNUSED(prev);
|
||||||
UNUSED(err);
|
UNUSED(err);
|
||||||
if (val) {
|
if (val) {
|
||||||
if ((unsigned long long)val < zmalloc_used_memory()) {
|
size_t used = zmalloc_used_memory()-freeMemoryGetNotCountedMemory();
|
||||||
serverLog(LL_WARNING,"WARNING: the new maxmemory value set via CONFIG SET is smaller than the current memory usage. This will result in key eviction and/or the inability to accept new write commands depending on the maxmemory-policy.");
|
if ((unsigned long long)val < used) {
|
||||||
|
serverLog(LL_WARNING,"WARNING: the new maxmemory value set via CONFIG SET (%llu) is smaller than the current memory usage (%zu). This will result in key eviction and/or the inability to accept new write commands depending on the maxmemory-policy.", g_pserver->maxmemory, used);
|
||||||
}
|
}
|
||||||
freeMemoryIfNeededAndSafe();
|
freeMemoryIfNeededAndSafe();
|
||||||
}
|
}
|
||||||
|
@ -728,9 +728,9 @@ void RM_KeyAtPos(RedisModuleCtx *ctx, int pos) {
|
|||||||
* flags into the command flags used by the Redis core.
|
* flags into the command flags used by the Redis core.
|
||||||
*
|
*
|
||||||
* It returns the set of flags, or -1 if unknown flags are found. */
|
* It returns the set of flags, or -1 if unknown flags are found. */
|
||||||
int commandFlagsFromString(char *s) {
|
int64_t commandFlagsFromString(char *s) {
|
||||||
int count, j;
|
int count, j;
|
||||||
int flags = 0;
|
int64_t flags = 0;
|
||||||
sds *tokens = sdssplitlen(s,strlen(s)," ",1,&count);
|
sds *tokens = sdssplitlen(s,strlen(s)," ",1,&count);
|
||||||
for (j = 0; j < count; j++) {
|
for (j = 0; j < count; j++) {
|
||||||
char *t = tokens[j];
|
char *t = tokens[j];
|
||||||
@ -744,6 +744,7 @@ int commandFlagsFromString(char *s) {
|
|||||||
else if (!strcasecmp(t,"random")) flags |= CMD_RANDOM;
|
else if (!strcasecmp(t,"random")) flags |= CMD_RANDOM;
|
||||||
else if (!strcasecmp(t,"allow-stale")) flags |= CMD_STALE;
|
else if (!strcasecmp(t,"allow-stale")) flags |= CMD_STALE;
|
||||||
else if (!strcasecmp(t,"no-monitor")) flags |= CMD_SKIP_MONITOR;
|
else if (!strcasecmp(t,"no-monitor")) flags |= CMD_SKIP_MONITOR;
|
||||||
|
else if (!strcasecmp(t,"no-slowlog")) flags |= CMD_SKIP_SLOWLOG;
|
||||||
else if (!strcasecmp(t,"fast")) flags |= CMD_FAST;
|
else if (!strcasecmp(t,"fast")) flags |= CMD_FAST;
|
||||||
else if (!strcasecmp(t,"no-auth")) flags |= CMD_NO_AUTH;
|
else if (!strcasecmp(t,"no-auth")) flags |= CMD_NO_AUTH;
|
||||||
else if (!strcasecmp(t,"getkeys-api")) flags |= CMD_MODULE_GETKEYS;
|
else if (!strcasecmp(t,"getkeys-api")) flags |= CMD_MODULE_GETKEYS;
|
||||||
@ -795,6 +796,8 @@ int commandFlagsFromString(char *s) {
|
|||||||
* this means.
|
* this means.
|
||||||
* * **"no-monitor"**: Don't propagate the command on monitor. Use this if
|
* * **"no-monitor"**: Don't propagate the command on monitor. Use this if
|
||||||
* the command has sensible data among the arguments.
|
* the command has sensible data among the arguments.
|
||||||
|
* * **"no-slowlog"**: Don't log this command in the slowlog. Use this if
|
||||||
|
* the command has sensible data among the arguments.
|
||||||
* * **"fast"**: The command time complexity is not greater
|
* * **"fast"**: The command time complexity is not greater
|
||||||
* than O(log(N)) where N is the size of the collection or
|
* than O(log(N)) where N is the size of the collection or
|
||||||
* anything else representing the normal scalability
|
* anything else representing the normal scalability
|
||||||
@ -812,7 +815,7 @@ int commandFlagsFromString(char *s) {
|
|||||||
* to authenticate a client.
|
* to authenticate a client.
|
||||||
*/
|
*/
|
||||||
int RM_CreateCommand(RedisModuleCtx *ctx, const char *name, RedisModuleCmdFunc cmdfunc, const char *strflags, int firstkey, int lastkey, int keystep) {
|
int RM_CreateCommand(RedisModuleCtx *ctx, const char *name, RedisModuleCmdFunc cmdfunc, const char *strflags, int firstkey, int lastkey, int keystep) {
|
||||||
int flags = strflags ? commandFlagsFromString((char*)strflags) : 0;
|
int64_t flags = strflags ? commandFlagsFromString((char*)strflags) : 0;
|
||||||
if (flags == -1) return REDISMODULE_ERR;
|
if (flags == -1) return REDISMODULE_ERR;
|
||||||
if ((flags & CMD_MODULE_NO_CLUSTER) && g_pserver->cluster_enabled)
|
if ((flags & CMD_MODULE_NO_CLUSTER) && g_pserver->cluster_enabled)
|
||||||
return REDISMODULE_ERR;
|
return REDISMODULE_ERR;
|
||||||
@ -904,7 +907,8 @@ void RM_SetModuleOptions(RedisModuleCtx *ctx, int options) {
|
|||||||
ctx->module->options = options;
|
ctx->module->options = options;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Signals that the key is modified from user's perspective (i.e. invalidate WATCH). */
|
/* Signals that the key is modified from user's perspective (i.e. invalidate WATCH
|
||||||
|
* and client side caching). */
|
||||||
int RM_SignalModifiedKey(RedisModuleCtx *ctx, RedisModuleString *keyname) {
|
int RM_SignalModifiedKey(RedisModuleCtx *ctx, RedisModuleString *keyname) {
|
||||||
signalModifiedKey(ctx->client->db,keyname);
|
signalModifiedKey(ctx->client->db,keyname);
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
@ -3725,14 +3729,15 @@ void moduleRDBLoadError(RedisModuleIO *io) {
|
|||||||
io->error = 1;
|
io->error = 1;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
serverLog(LL_WARNING,
|
serverPanic(
|
||||||
"Error loading data from RDB (short read or EOF). "
|
"Error loading data from RDB (short read or EOF). "
|
||||||
"Read performed by module '%s' about type '%s' "
|
"Read performed by module '%s' about type '%s' "
|
||||||
"after reading '%llu' bytes of a value.",
|
"after reading '%llu' bytes of a value "
|
||||||
|
"for key named: '%s'.",
|
||||||
io->type->module->name,
|
io->type->module->name,
|
||||||
io->type->name,
|
io->type->name,
|
||||||
(unsigned long long)io->bytes);
|
(unsigned long long)io->bytes,
|
||||||
exit(1);
|
io->key? szFromObj(io->key): "(null)");
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Returns 0 if there's at least one registered data type that did not declare
|
/* Returns 0 if there's at least one registered data type that did not declare
|
||||||
@ -4934,7 +4939,8 @@ int moduleGILAcquiredByModule(void) {
|
|||||||
* - REDISMODULE_NOTIFY_EXPIRED: Expiration events
|
* - REDISMODULE_NOTIFY_EXPIRED: Expiration events
|
||||||
* - REDISMODULE_NOTIFY_EVICTED: Eviction events
|
* - REDISMODULE_NOTIFY_EVICTED: Eviction events
|
||||||
* - REDISMODULE_NOTIFY_STREAM: Stream events
|
* - REDISMODULE_NOTIFY_STREAM: Stream events
|
||||||
* - REDISMODULE_NOTIFY_ALL: All events
|
* - REDISMODULE_NOTIFY_KEYMISS: Key-miss events
|
||||||
|
* - REDISMODULE_NOTIFY_ALL: All events (Excluding REDISMODULE_NOTIFY_KEYMISS)
|
||||||
*
|
*
|
||||||
* We do not distinguish between key events and keyspace events, and it is up
|
* We do not distinguish between key events and keyspace events, and it is up
|
||||||
* to the module to filter the actions taken based on the key.
|
* to the module to filter the actions taken based on the key.
|
||||||
|
@ -82,10 +82,10 @@ sds keyspaceEventsFlagsToString(int flags) {
|
|||||||
if (flags & NOTIFY_EXPIRED) res = sdscatlen(res,"x",1);
|
if (flags & NOTIFY_EXPIRED) res = sdscatlen(res,"x",1);
|
||||||
if (flags & NOTIFY_EVICTED) res = sdscatlen(res,"e",1);
|
if (flags & NOTIFY_EVICTED) res = sdscatlen(res,"e",1);
|
||||||
if (flags & NOTIFY_STREAM) res = sdscatlen(res,"t",1);
|
if (flags & NOTIFY_STREAM) res = sdscatlen(res,"t",1);
|
||||||
if (flags & NOTIFY_KEY_MISS) res = sdscatlen(res,"m",1);
|
|
||||||
}
|
}
|
||||||
if (flags & NOTIFY_KEYSPACE) res = sdscatlen(res,"K",1);
|
if (flags & NOTIFY_KEYSPACE) res = sdscatlen(res,"K",1);
|
||||||
if (flags & NOTIFY_KEYEVENT) res = sdscatlen(res,"E",1);
|
if (flags & NOTIFY_KEYEVENT) res = sdscatlen(res,"E",1);
|
||||||
|
if (flags & NOTIFY_KEY_MISS) res = sdscatlen(res,"m",1);
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1766,6 +1766,7 @@ int raxRandomWalk(raxIterator *it, size_t steps) {
|
|||||||
if (n->iskey) steps--;
|
if (n->iskey) steps--;
|
||||||
}
|
}
|
||||||
it->node = n;
|
it->node = n;
|
||||||
|
it->data = raxGetData(it->node);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1975,7 +1975,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key, uint64_t mvcc_tstamp) {
|
|||||||
}
|
}
|
||||||
} else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) {
|
} else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) {
|
||||||
uint64_t moduleid = rdbLoadLen(rdb,NULL);
|
uint64_t moduleid = rdbLoadLen(rdb,NULL);
|
||||||
if (rioGetReadError(rdb)) return NULL;
|
if (rioGetReadError(rdb)) {
|
||||||
|
rdbReportReadError("Short read module id");
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
moduleType *mt = moduleTypeLookupModuleByID(moduleid);
|
moduleType *mt = moduleTypeLookupModuleByID(moduleid);
|
||||||
char name[10];
|
char name[10];
|
||||||
|
|
||||||
|
@ -1407,15 +1407,15 @@ static int parseOptions(int argc, char **argv) {
|
|||||||
#ifdef USE_OPENSSL
|
#ifdef USE_OPENSSL
|
||||||
} else if (!strcmp(argv[i],"--tls")) {
|
} else if (!strcmp(argv[i],"--tls")) {
|
||||||
config.tls = 1;
|
config.tls = 1;
|
||||||
} else if (!strcmp(argv[i],"--sni")) {
|
} else if (!strcmp(argv[i],"--sni") && !lastarg) {
|
||||||
config.sni = argv[++i];
|
config.sni = argv[++i];
|
||||||
} else if (!strcmp(argv[i],"--cacertdir")) {
|
} else if (!strcmp(argv[i],"--cacertdir") && !lastarg) {
|
||||||
config.cacertdir = argv[++i];
|
config.cacertdir = argv[++i];
|
||||||
} else if (!strcmp(argv[i],"--cacert")) {
|
} else if (!strcmp(argv[i],"--cacert") && !lastarg) {
|
||||||
config.cacert = argv[++i];
|
config.cacert = argv[++i];
|
||||||
} else if (!strcmp(argv[i],"--cert")) {
|
} else if (!strcmp(argv[i],"--cert") && !lastarg) {
|
||||||
config.cert = argv[++i];
|
config.cert = argv[++i];
|
||||||
} else if (!strcmp(argv[i],"--key")) {
|
} else if (!strcmp(argv[i],"--key") && !lastarg) {
|
||||||
config.key = argv[++i];
|
config.key = argv[++i];
|
||||||
#endif
|
#endif
|
||||||
} else if (!strcmp(argv[i],"-v") || !strcmp(argv[i], "--version")) {
|
} else if (!strcmp(argv[i],"-v") || !strcmp(argv[i], "--version")) {
|
||||||
@ -1500,8 +1500,8 @@ static void usage(void) {
|
|||||||
" You can also use the " REDIS_CLI_AUTH_ENV " environment\n"
|
" You can also use the " REDIS_CLI_AUTH_ENV " environment\n"
|
||||||
" variable to pass this password more safely\n"
|
" variable to pass this password more safely\n"
|
||||||
" (if both are used, this argument takes predecence).\n"
|
" (if both are used, this argument takes predecence).\n"
|
||||||
" -user <username> Used to send ACL style 'AUTH username pass'. Needs -a.\n"
|
" --user <username> Used to send ACL style 'AUTH username pass'. Needs -a.\n"
|
||||||
" -pass <password> Alias of -a for consistency with the new --user option.\n"
|
" --pass <password> Alias of -a for consistency with the new --user option.\n"
|
||||||
" -u <uri> Server URI.\n"
|
" -u <uri> Server URI.\n"
|
||||||
" -r <repeat> Execute specified command N times.\n"
|
" -r <repeat> Execute specified command N times.\n"
|
||||||
" -i <interval> When -r is used, waits <interval> seconds per command.\n"
|
" -i <interval> When -r is used, waits <interval> seconds per command.\n"
|
||||||
@ -1513,12 +1513,13 @@ static void usage(void) {
|
|||||||
" -c Enable cluster mode (follow -ASK and -MOVED redirections).\n"
|
" -c Enable cluster mode (follow -ASK and -MOVED redirections).\n"
|
||||||
#ifdef USE_OPENSSL
|
#ifdef USE_OPENSSL
|
||||||
" --tls Establish a secure TLS connection.\n"
|
" --tls Establish a secure TLS connection.\n"
|
||||||
" --cacert CA Certificate file to verify with.\n"
|
" --sni <host> Server name indication for TLS.\n"
|
||||||
" --cacertdir Directory where trusted CA certificates are stored.\n"
|
" --cacert <file> CA Certificate file to verify with.\n"
|
||||||
|
" --cacertdir <dir> Directory where trusted CA certificates are stored.\n"
|
||||||
" If neither cacert nor cacertdir are specified, the default\n"
|
" If neither cacert nor cacertdir are specified, the default\n"
|
||||||
" system-wide trusted root certs configuration will apply.\n"
|
" system-wide trusted root certs configuration will apply.\n"
|
||||||
" --cert Client certificate to authenticate with.\n"
|
" --cert <file> Client certificate to authenticate with.\n"
|
||||||
" --key Private key file to authenticate with.\n"
|
" --key <file> Private key file to authenticate with.\n"
|
||||||
#endif
|
#endif
|
||||||
" --raw Use raw formatting for replies (default when STDOUT is\n"
|
" --raw Use raw formatting for replies (default when STDOUT is\n"
|
||||||
" not a tty).\n"
|
" not a tty).\n"
|
||||||
|
@ -131,8 +131,8 @@ extern "C" {
|
|||||||
#define REDISMODULE_NOTIFY_EXPIRED (1<<8) /* x */
|
#define REDISMODULE_NOTIFY_EXPIRED (1<<8) /* x */
|
||||||
#define REDISMODULE_NOTIFY_EVICTED (1<<9) /* e */
|
#define REDISMODULE_NOTIFY_EVICTED (1<<9) /* e */
|
||||||
#define REDISMODULE_NOTIFY_STREAM (1<<10) /* t */
|
#define REDISMODULE_NOTIFY_STREAM (1<<10) /* t */
|
||||||
#define REDISMODULE_NOTIFY_KEY_MISS (1<<11) /* m */
|
#define REDISMODULE_NOTIFY_KEY_MISS (1<<11) /* m (Note: This one is excluded from REDISMODULE_NOTIFY_ALL on purpose) */
|
||||||
#define REDISMODULE_NOTIFY_ALL (REDISMODULE_NOTIFY_GENERIC | REDISMODULE_NOTIFY_STRING | REDISMODULE_NOTIFY_LIST | REDISMODULE_NOTIFY_SET | REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_ZSET | REDISMODULE_NOTIFY_EXPIRED | REDISMODULE_NOTIFY_EVICTED | REDISMODULE_NOTIFY_STREAM | REDISMODULE_NOTIFY_KEY_MISS) /* A */
|
#define REDISMODULE_NOTIFY_ALL (REDISMODULE_NOTIFY_GENERIC | REDISMODULE_NOTIFY_STRING | REDISMODULE_NOTIFY_LIST | REDISMODULE_NOTIFY_SET | REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_ZSET | REDISMODULE_NOTIFY_EXPIRED | REDISMODULE_NOTIFY_EVICTED | REDISMODULE_NOTIFY_STREAM) /* A */
|
||||||
|
|
||||||
|
|
||||||
/* A special pointer that we can use between the core and the module to signal
|
/* A special pointer that we can use between the core and the module to signal
|
||||||
|
@ -2927,6 +2927,10 @@ void replicationUnsetMaster(redisMaster *mi) {
|
|||||||
moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
|
moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
|
||||||
REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER,
|
REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER,
|
||||||
NULL);
|
NULL);
|
||||||
|
|
||||||
|
/* Restart the AOF subsystem in case we shut it down during a sync when
|
||||||
|
* we were still a slave. */
|
||||||
|
if (g_pserver->aof_enabled && g_pserver->aof_state == AOF_OFF) restartAOFAfterSYNC();
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This function is called when the replica lose the connection with the
|
/* This function is called when the replica lose the connection with the
|
||||||
@ -2969,9 +2973,6 @@ void replicaofCommand(client *c) {
|
|||||||
serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
|
serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
|
||||||
client);
|
client);
|
||||||
sdsfree(client);
|
sdsfree(client);
|
||||||
/* Restart the AOF subsystem in case we shut it down during a sync when
|
|
||||||
* we were still a slave. */
|
|
||||||
if (g_pserver->aof_enabled && g_pserver->aof_state == AOF_OFF) restartAOFAfterSYNC();
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
long port;
|
long port;
|
||||||
|
@ -600,7 +600,7 @@ struct redisCommand redisCommandTable[] = {
|
|||||||
0,NULL,0,0,0,0,0,0},
|
0,NULL,0,0,0,0,0,0},
|
||||||
|
|
||||||
{"select",selectCommand,2,
|
{"select",selectCommand,2,
|
||||||
"ok-loading fast @keyspace",
|
"ok-loading fast ok-stale @keyspace",
|
||||||
0,NULL,0,0,0,0,0,0},
|
0,NULL,0,0,0,0,0,0},
|
||||||
|
|
||||||
{"swapdb",swapdbCommand,3,
|
{"swapdb",swapdbCommand,3,
|
||||||
@ -689,7 +689,7 @@ struct redisCommand redisCommandTable[] = {
|
|||||||
0,NULL,0,0,0,0,0,0},
|
0,NULL,0,0,0,0,0,0},
|
||||||
|
|
||||||
{"lastsave",lastsaveCommand,1,
|
{"lastsave",lastsaveCommand,1,
|
||||||
"read-only random fast @admin @dangerous",
|
"read-only random fast ok-loading ok-stale @admin @dangerous",
|
||||||
0,NULL,0,0,0,0,0,0},
|
0,NULL,0,0,0,0,0,0},
|
||||||
|
|
||||||
{"type",typeCommand,2,
|
{"type",typeCommand,2,
|
||||||
@ -737,7 +737,7 @@ struct redisCommand redisCommandTable[] = {
|
|||||||
0,NULL,0,0,0,0,0,0},
|
0,NULL,0,0,0,0,0,0},
|
||||||
|
|
||||||
{"monitor",monitorCommand,1,
|
{"monitor",monitorCommand,1,
|
||||||
"admin no-script",
|
"admin no-script ok-loading ok-stale",
|
||||||
0,NULL,0,0,0,0,0,0},
|
0,NULL,0,0,0,0,0,0},
|
||||||
|
|
||||||
{"ttl",ttlCommand,-2,
|
{"ttl",ttlCommand,-2,
|
||||||
@ -769,7 +769,7 @@ struct redisCommand redisCommandTable[] = {
|
|||||||
0,NULL,0,0,0,0,0,0},
|
0,NULL,0,0,0,0,0,0},
|
||||||
|
|
||||||
{"debug",debugCommand,-2,
|
{"debug",debugCommand,-2,
|
||||||
"admin no-script",
|
"admin no-script ok-loading ok-stale",
|
||||||
0,NULL,0,0,0,0,0,0},
|
0,NULL,0,0,0,0,0,0},
|
||||||
|
|
||||||
{"config",configCommand,-2,
|
{"config",configCommand,-2,
|
||||||
@ -849,11 +849,11 @@ struct redisCommand redisCommandTable[] = {
|
|||||||
0,memoryGetKeys,0,0,0,0,0,0},
|
0,memoryGetKeys,0,0,0,0,0,0},
|
||||||
|
|
||||||
{"client",clientCommand,-2,
|
{"client",clientCommand,-2,
|
||||||
"admin no-script random @connection",
|
"admin no-script random ok-loading ok-stale @connection",
|
||||||
0,NULL,0,0,0,0,0,0},
|
0,NULL,0,0,0,0,0,0},
|
||||||
|
|
||||||
{"hello",helloCommand,-2,
|
{"hello",helloCommand,-2,
|
||||||
"no-auth no-script fast no-monitor no-slowlog @connection",
|
"no-auth no-script fast no-monitor ok-loading ok-stale no-slowlog @connection",
|
||||||
0,NULL,0,0,0,0,0,0},
|
0,NULL,0,0,0,0,0,0},
|
||||||
|
|
||||||
/* EVAL can modify the dataset, however it is not flagged as a write
|
/* EVAL can modify the dataset, however it is not flagged as a write
|
||||||
@ -867,7 +867,7 @@ struct redisCommand redisCommandTable[] = {
|
|||||||
0,evalGetKeys,0,0,0,0,0,0},
|
0,evalGetKeys,0,0,0,0,0,0},
|
||||||
|
|
||||||
{"slowlog",slowlogCommand,-2,
|
{"slowlog",slowlogCommand,-2,
|
||||||
"admin random",
|
"admin random ok-loading ok-stale",
|
||||||
0,NULL,0,0,0,0,0,0},
|
0,NULL,0,0,0,0,0,0},
|
||||||
|
|
||||||
{"script",scriptCommand,-2,
|
{"script",scriptCommand,-2,
|
||||||
@ -875,7 +875,7 @@ struct redisCommand redisCommandTable[] = {
|
|||||||
0,NULL,0,0,0,0,0,0},
|
0,NULL,0,0,0,0,0,0},
|
||||||
|
|
||||||
{"time",timeCommand,1,
|
{"time",timeCommand,1,
|
||||||
"read-only random fast",
|
"read-only random fast ok-loading ok-stale",
|
||||||
0,NULL,0,0,0,0,0,0},
|
0,NULL,0,0,0,0,0,0},
|
||||||
|
|
||||||
{"bitop",bitopCommand,-4,
|
{"bitop",bitopCommand,-4,
|
||||||
@ -4477,7 +4477,7 @@ sds genRedisInfoString(const char *section) {
|
|||||||
"active_defrag_misses:%lld\r\n"
|
"active_defrag_misses:%lld\r\n"
|
||||||
"active_defrag_key_hits:%lld\r\n"
|
"active_defrag_key_hits:%lld\r\n"
|
||||||
"active_defrag_key_misses:%lld\r\n"
|
"active_defrag_key_misses:%lld\r\n"
|
||||||
"tracking_used_slots:%llu\r\n",
|
"tracking_total_items:%llu\r\n",
|
||||||
g_pserver->stat_numconnections,
|
g_pserver->stat_numconnections,
|
||||||
g_pserver->stat_numcommands,
|
g_pserver->stat_numcommands,
|
||||||
getInstantaneousMetric(STATS_METRIC_COMMAND),
|
getInstantaneousMetric(STATS_METRIC_COMMAND),
|
||||||
@ -4505,7 +4505,7 @@ sds genRedisInfoString(const char *section) {
|
|||||||
g_pserver->stat_active_defrag_misses,
|
g_pserver->stat_active_defrag_misses,
|
||||||
g_pserver->stat_active_defrag_key_hits,
|
g_pserver->stat_active_defrag_key_hits,
|
||||||
g_pserver->stat_active_defrag_key_misses,
|
g_pserver->stat_active_defrag_key_misses,
|
||||||
trackingGetUsedSlots());
|
(unsigned long long) trackingGetTotalItems());
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Replication */
|
/* Replication */
|
||||||
|
@ -581,8 +581,8 @@ public:
|
|||||||
#define NOTIFY_EXPIRED (1<<8) /* x */
|
#define NOTIFY_EXPIRED (1<<8) /* x */
|
||||||
#define NOTIFY_EVICTED (1<<9) /* e */
|
#define NOTIFY_EVICTED (1<<9) /* e */
|
||||||
#define NOTIFY_STREAM (1<<10) /* t */
|
#define NOTIFY_STREAM (1<<10) /* t */
|
||||||
#define NOTIFY_KEY_MISS (1<<11) /* m */
|
#define NOTIFY_KEY_MISS (1<<11) /* m (Note: This one is excluded from NOTIFY_ALL on purpose) */
|
||||||
#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED | NOTIFY_STREAM | NOTIFY_KEY_MISS) /* A flag */
|
#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED | NOTIFY_STREAM) /* A flag */
|
||||||
|
|
||||||
/* Get the first bind addr or NULL */
|
/* Get the first bind addr or NULL */
|
||||||
#define NET_FIRST_BIND_ADDR (g_pserver->bindaddr_count ? g_pserver->bindaddr[0] : NULL)
|
#define NET_FIRST_BIND_ADDR (g_pserver->bindaddr_count ? g_pserver->bindaddr[0] : NULL)
|
||||||
@ -2230,7 +2230,7 @@ void trackingRememberKeys(client *c);
|
|||||||
void trackingInvalidateKey(robj *keyobj);
|
void trackingInvalidateKey(robj *keyobj);
|
||||||
void trackingInvalidateKeysOnFlush(int dbid);
|
void trackingInvalidateKeysOnFlush(int dbid);
|
||||||
void trackingLimitUsedSlots(void);
|
void trackingLimitUsedSlots(void);
|
||||||
unsigned long long trackingGetUsedSlots(void);
|
uint64_t trackingGetTotalItems(void);
|
||||||
|
|
||||||
/* List data type */
|
/* List data type */
|
||||||
void listTypeTryConversion(robj *subject, robj *value);
|
void listTypeTryConversion(robj *subject, robj *value);
|
||||||
|
200
src/tracking.cpp
200
src/tracking.cpp
@ -30,37 +30,22 @@
|
|||||||
|
|
||||||
#include "server.h"
|
#include "server.h"
|
||||||
|
|
||||||
/* The tracking table is constituted by 2^24 radix trees (each tree, and the
|
/* The tracking table is constituted by a radix tree of keys, each pointing
|
||||||
* table itself, are allocated in a lazy way only when needed) tracking
|
* to a radix tree of client IDs, used to track the clients that may have
|
||||||
* clients that may have certain keys in their local, client side, cache.
|
* certain keys in their local, client side, cache.
|
||||||
*
|
|
||||||
* Keys are grouped into 2^24 slots, in a way similar to Redis Cluster hash
|
|
||||||
* slots, however here the function we use is crc64, taking the least
|
|
||||||
* significant 24 bits of the output.
|
|
||||||
*
|
*
|
||||||
* When a client enables tracking with "CLIENT TRACKING on", each key served to
|
* When a client enables tracking with "CLIENT TRACKING on", each key served to
|
||||||
* the client is hashed to one of such slots, and Redis will remember what
|
* the client is remembered in the table mapping the keys to the client IDs.
|
||||||
* client may have keys about such slot. Later, when a key in a given slot is
|
* Later, when a key is modified, all the clients that may have local copy
|
||||||
* modified, all the clients that may have local copies of keys in that slot
|
* of such key will receive an invalidation message.
|
||||||
* will receive an invalidation message. There is no distinction of database
|
|
||||||
* number: a single table is used.
|
|
||||||
*
|
*
|
||||||
* Clients will normally take frequently requested objects in memory, removing
|
* Clients will normally take frequently requested objects in memory, removing
|
||||||
* them when invalidation messages are received. A strategy clients may use is
|
* them when invalidation messages are received. */
|
||||||
* to just cache objects in a dictionary, associating to each cached object
|
rax *TrackingTable = NULL;
|
||||||
* some incremental epoch, or just a timestamp. When invalidation messages are
|
uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across
|
||||||
* received clients may store, in a different table, the timestamp (or epoch)
|
the whole tracking table. This givesn
|
||||||
* of the invalidation of such given slot: later when accessing objects, the
|
an hint about the total memory we
|
||||||
* eviction of stale objects may be performed in a lazy way by checking if the
|
are using server side for CSC. */
|
||||||
* cached object timestamp is older than the invalidation timestamp for such
|
|
||||||
* objects.
|
|
||||||
*
|
|
||||||
* The output of the 24 bit hash function is very large (more than 16 million
|
|
||||||
* possible slots), so clients that may want to use less resources may only
|
|
||||||
* use the most significant bits instead of the full 24 bits. */
|
|
||||||
#define TRACKING_TABLE_SIZE (1<<24)
|
|
||||||
rax **TrackingTable = NULL;
|
|
||||||
unsigned long TrackingTableUsedSlots = 0;
|
|
||||||
robj *TrackingChannelName;
|
robj *TrackingChannelName;
|
||||||
|
|
||||||
/* Remove the tracking state from the client 'c'. Note that there is not much
|
/* Remove the tracking state from the client 'c'. Note that there is not much
|
||||||
@ -90,7 +75,7 @@ void enableTracking(client *c, uint64_t redirect_to) {
|
|||||||
c->client_tracking_redirection = redirect_to;
|
c->client_tracking_redirection = redirect_to;
|
||||||
g_pserver->tracking_clients++;
|
g_pserver->tracking_clients++;
|
||||||
if (TrackingTable == NULL) {
|
if (TrackingTable == NULL) {
|
||||||
TrackingTable = (rax**)zcalloc(sizeof(rax*) * TRACKING_TABLE_SIZE, MALLOC_LOCAL);
|
TrackingTable = raxNew();
|
||||||
TrackingChannelName = createStringObject("__redis__:invalidate",20);
|
TrackingChannelName = createStringObject("__redis__:invalidate",20);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -107,20 +92,21 @@ void trackingRememberKeys(client *c) {
|
|||||||
|
|
||||||
for(int j = 0; j < numkeys; j++) {
|
for(int j = 0; j < numkeys; j++) {
|
||||||
int idx = keys[j];
|
int idx = keys[j];
|
||||||
sds sdskey = (sds)ptrFromObj(c->argv[idx]);
|
sds sdskey = szFromObj(c->argv[idx]);
|
||||||
uint64_t hash = crc64(0,
|
rax *ids = (rax*)raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey));
|
||||||
(unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1);
|
if (ids == raxNotFound) {
|
||||||
if (TrackingTable[hash] == NULL) {
|
ids = raxNew();
|
||||||
TrackingTable[hash] = raxNew();
|
int inserted = raxTryInsert(TrackingTable,(unsigned char*)sdskey,
|
||||||
TrackingTableUsedSlots++;
|
sdslen(sdskey),ids, NULL);
|
||||||
|
serverAssert(inserted == 1);
|
||||||
}
|
}
|
||||||
raxTryInsert(TrackingTable[hash],
|
if (raxTryInsert(ids,(unsigned char*)&c->id,sizeof(c->id),NULL,NULL))
|
||||||
(unsigned char*)&c->id,sizeof(c->id),NULL,NULL);
|
TrackingTableTotalItems++;
|
||||||
}
|
}
|
||||||
getKeysFreeResult(keys);
|
getKeysFreeResult(keys);
|
||||||
}
|
}
|
||||||
|
|
||||||
void sendTrackingMessage(client *c, long long hash) {
|
void sendTrackingMessage(client *c, const char *keyname, size_t keylen) {
|
||||||
int using_redirection = 0;
|
int using_redirection = 0;
|
||||||
if (c->client_tracking_redirection) {
|
if (c->client_tracking_redirection) {
|
||||||
client *redir = lookupClientByID(c->client_tracking_redirection);
|
client *redir = lookupClientByID(c->client_tracking_redirection);
|
||||||
@ -146,49 +132,44 @@ void sendTrackingMessage(client *c, long long hash) {
|
|||||||
if (c->resp > 2) {
|
if (c->resp > 2) {
|
||||||
addReplyPushLen(c,2);
|
addReplyPushLen(c,2);
|
||||||
addReplyBulkCBuffer(c,"invalidate",10);
|
addReplyBulkCBuffer(c,"invalidate",10);
|
||||||
addReplyLongLong(c,hash);
|
addReplyBulkCBuffer(c,keyname,keylen);
|
||||||
} else if (using_redirection && c->flags & CLIENT_PUBSUB) {
|
} else if (using_redirection && c->flags & CLIENT_PUBSUB) {
|
||||||
robj *msg = createStringObjectFromLongLong(hash);
|
/* We use a static object to speedup things, however we assume
|
||||||
addReplyPubsubMessage(c,TrackingChannelName,msg);
|
* that addReplyPubsubMessage() will not take a reference. */
|
||||||
decrRefCount(msg);
|
robj keyobj;
|
||||||
|
initStaticStringObject(keyobj,(char*)keyname);
|
||||||
|
addReplyPubsubMessage(c,TrackingChannelName,&keyobj);
|
||||||
|
serverAssert(keyobj.getrefcount(std::memory_order_relaxed) == 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Invalidates a caching slot: this is actually the low level implementation
|
|
||||||
* of the API that Redis calls externally, that is trackingInvalidateKey(). */
|
|
||||||
void trackingInvalidateSlot(uint64_t slot) {
|
|
||||||
if (TrackingTable == NULL || TrackingTable[slot] == NULL) return;
|
|
||||||
|
|
||||||
raxIterator ri;
|
|
||||||
raxStart(&ri,TrackingTable[slot]);
|
|
||||||
raxSeek(&ri,"^",NULL,0);
|
|
||||||
while(raxNext(&ri)) {
|
|
||||||
uint64_t id;
|
|
||||||
memcpy(&id,ri.key,sizeof(id));
|
|
||||||
client *c = lookupClientByID(id);
|
|
||||||
if (c == NULL || !(c->flags & CLIENT_TRACKING)) continue;
|
|
||||||
sendTrackingMessage(c,slot);
|
|
||||||
}
|
|
||||||
raxStop(&ri);
|
|
||||||
|
|
||||||
/* Free the tracking table: we'll create the radix tree and populate it
|
|
||||||
* again if more keys will be modified in this caching slot. */
|
|
||||||
raxFree(TrackingTable[slot]);
|
|
||||||
TrackingTable[slot] = NULL;
|
|
||||||
TrackingTableUsedSlots--;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* This function is called from signalModifiedKey() or other places in Redis
|
/* This function is called from signalModifiedKey() or other places in Redis
|
||||||
* when a key changes value. In the context of keys tracking, our task here is
|
* when a key changes value. In the context of keys tracking, our task here is
|
||||||
* to send a notification to every client that may have keys about such caching
|
* to send a notification to every client that may have keys about such caching
|
||||||
* slot. */
|
* slot. */
|
||||||
void trackingInvalidateKey(robj *keyobj) {
|
void trackingInvalidateKey(robj *keyobj) {
|
||||||
if (TrackingTable == NULL || TrackingTableUsedSlots == 0) return;
|
if (TrackingTable == NULL) return;
|
||||||
|
sds sdskey = szFromObj(keyobj);
|
||||||
|
rax *ids = (rax*)raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey));
|
||||||
|
if (ids == raxNotFound) return;;
|
||||||
|
|
||||||
sds sdskey = (sds)ptrFromObj(keyobj);
|
raxIterator ri;
|
||||||
uint64_t hash = crc64(0,
|
raxStart(&ri,ids);
|
||||||
(unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1);
|
raxSeek(&ri,"^",NULL,0);
|
||||||
trackingInvalidateSlot(hash);
|
while(raxNext(&ri)) {
|
||||||
|
uint64_t id;
|
||||||
|
memcpy(&id,ri.key,sizeof(id));
|
||||||
|
client *c = lookupClientByID(id);
|
||||||
|
if (c == NULL || !(c->flags & CLIENT_TRACKING)) continue;
|
||||||
|
sendTrackingMessage(c,sdskey,sdslen(sdskey));
|
||||||
|
}
|
||||||
|
raxStop(&ri);
|
||||||
|
|
||||||
|
/* Free the tracking table: we'll create the radix tree and populate it
|
||||||
|
* again if more keys will be modified in this caching slot. */
|
||||||
|
TrackingTableTotalItems -= raxSize(ids);
|
||||||
|
raxFree(ids);
|
||||||
|
raxRemove(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey),NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This function is called when one or all the Redis databases are flushed
|
/* This function is called when one or all the Redis databases are flushed
|
||||||
@ -205,6 +186,10 @@ void trackingInvalidateKey(robj *keyobj) {
|
|||||||
* we just send the invalidation message to all the clients, but don't
|
* we just send the invalidation message to all the clients, but don't
|
||||||
* flush the table: it will slowly get garbage collected as more keys
|
* flush the table: it will slowly get garbage collected as more keys
|
||||||
* are modified in the used caching slots. */
|
* are modified in the used caching slots. */
|
||||||
|
void freeTrackingRadixTree(void *rt) {
|
||||||
|
raxFree((rax*)rt);
|
||||||
|
}
|
||||||
|
|
||||||
void trackingInvalidateKeysOnFlush(int dbid) {
|
void trackingInvalidateKeysOnFlush(int dbid) {
|
||||||
if (g_pserver->tracking_clients) {
|
if (g_pserver->tracking_clients) {
|
||||||
listNode *ln;
|
listNode *ln;
|
||||||
@ -213,84 +198,69 @@ void trackingInvalidateKeysOnFlush(int dbid) {
|
|||||||
while ((ln = listNext(&li)) != NULL) {
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
client *c = (client*)listNodeValue(ln);
|
client *c = (client*)listNodeValue(ln);
|
||||||
if (c->flags & CLIENT_TRACKING) {
|
if (c->flags & CLIENT_TRACKING) {
|
||||||
sendTrackingMessage(c,-1);
|
sendTrackingMessage(c,"",1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* In case of FLUSHALL, reclaim all the memory used by tracking. */
|
/* In case of FLUSHALL, reclaim all the memory used by tracking. */
|
||||||
if (dbid == -1 && TrackingTable) {
|
if (dbid == -1 && TrackingTable) {
|
||||||
for (int j = 0; j < TRACKING_TABLE_SIZE && TrackingTableUsedSlots > 0; j++) {
|
raxFreeWithCallback(TrackingTable,freeTrackingRadixTree);
|
||||||
if (TrackingTable[j] != NULL) {
|
TrackingTableTotalItems = 0;
|
||||||
raxFree(TrackingTable[j]);
|
|
||||||
TrackingTable[j] = NULL;
|
|
||||||
TrackingTableUsedSlots--;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* If there are no clients with tracking enabled, we can even
|
|
||||||
* reclaim the memory used by the table itself. The code assumes
|
|
||||||
* the table is allocated only if there is at least one client alive
|
|
||||||
* with tracking enabled. */
|
|
||||||
if (g_pserver->tracking_clients == 0) {
|
|
||||||
zfree(TrackingTable);
|
|
||||||
TrackingTable = NULL;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Tracking forces Redis to remember information about which client may have
|
/* Tracking forces Redis to remember information about which client may have
|
||||||
* keys about certian caching slots. In workloads where there are a lot of
|
* certain keys. In workloads where there are a lot of reads, but keys are
|
||||||
* reads, but keys are hardly modified, the amount of information we have
|
* hardly modified, the amount of information we have to remember server side
|
||||||
* to remember server side could be a lot: for each 16 millions of caching
|
* could be a lot, with the number of keys being totally not bound.
|
||||||
* slots we may end with a radix tree containing many entries.
|
|
||||||
*
|
*
|
||||||
* So Redis allows the user to configure a maximum fill rate for the
|
* So Redis allows the user to configure a maximum number of keys for the
|
||||||
* invalidation table. This function makes sure that we don't go over the
|
* invalidation table. This function makes sure that we don't go over the
|
||||||
* specified fill rate: if we are over, we can just evict informations about
|
* specified fill rate: if we are over, we can just evict informations about
|
||||||
* random caching slots, and send invalidation messages to clients like if
|
* a random key, and send invalidation messages to clients like if the key was
|
||||||
* the key was modified. */
|
* modified. */
|
||||||
void trackingLimitUsedSlots(void) {
|
void trackingLimitUsedSlots(void) {
|
||||||
static unsigned int timeout_counter = 0;
|
static unsigned int timeout_counter = 0;
|
||||||
|
if (TrackingTable == NULL) return;
|
||||||
if (g_pserver->tracking_table_max_fill == 0) return; /* No limits set. */
|
if (g_pserver->tracking_table_max_fill == 0) return; /* No limits set. */
|
||||||
unsigned int max_slots =
|
size_t max_keys = g_pserver->tracking_table_max_fill;
|
||||||
(TRACKING_TABLE_SIZE/100) * g_pserver->tracking_table_max_fill;
|
if (raxSize(TrackingTable) <= max_keys) {
|
||||||
if (TrackingTableUsedSlots <= max_slots) {
|
|
||||||
timeout_counter = 0;
|
timeout_counter = 0;
|
||||||
return; /* Limit not reached. */
|
return; /* Limit not reached. */
|
||||||
}
|
}
|
||||||
|
|
||||||
/* We have to invalidate a few slots to reach the limit again. The effort
|
/* We have to invalidate a few keys to reach the limit again. The effort
|
||||||
* we do here is proportional to the number of times we entered this
|
* we do here is proportional to the number of times we entered this
|
||||||
* function and found that we are still over the limit. */
|
* function and found that we are still over the limit. */
|
||||||
int effort = 100 * (timeout_counter+1);
|
int effort = 100 * (timeout_counter+1);
|
||||||
|
|
||||||
/* Let's start at a random position, and perform linear probing, in order
|
/* We just remove one key after another by using a random walk. */
|
||||||
* to improve cache locality. However once we are able to find an used
|
raxIterator ri;
|
||||||
* slot, jump again randomly, in order to avoid creating big holes in the
|
raxStart(&ri,TrackingTable);
|
||||||
* table (that will make this funciton use more resourced later). */
|
|
||||||
while(effort > 0) {
|
while(effort > 0) {
|
||||||
unsigned int idx = rand() % TRACKING_TABLE_SIZE;
|
|
||||||
do {
|
|
||||||
effort--;
|
effort--;
|
||||||
idx = (idx+1) % TRACKING_TABLE_SIZE;
|
raxSeek(&ri,"^",NULL,0);
|
||||||
if (TrackingTable[idx] != NULL) {
|
raxRandomWalk(&ri,0);
|
||||||
trackingInvalidateSlot(idx);
|
rax *ids = (rax*)ri.data;
|
||||||
if (TrackingTableUsedSlots <= max_slots) {
|
TrackingTableTotalItems -= raxSize(ids);
|
||||||
|
raxFree(ids);
|
||||||
|
raxRemove(TrackingTable,ri.key,ri.key_len,NULL);
|
||||||
|
if (raxSize(TrackingTable) <= max_keys) {
|
||||||
timeout_counter = 0;
|
timeout_counter = 0;
|
||||||
|
raxStop(&ri);
|
||||||
return; /* Return ASAP: we are again under the limit. */
|
return; /* Return ASAP: we are again under the limit. */
|
||||||
} else {
|
|
||||||
break; /* Jump to next random position. */
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} while(effort > 0);
|
|
||||||
}
|
/* If we reach this point, we were not able to go under the configured
|
||||||
|
* limit using the maximum effort we had for this run. */
|
||||||
|
raxStop(&ri);
|
||||||
timeout_counter++;
|
timeout_counter++;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This is just used in order to access the amount of used slots in the
|
/* This is just used in order to access the amount of used slots in the
|
||||||
* tracking table. */
|
* tracking table. */
|
||||||
unsigned long long trackingGetUsedSlots(void) {
|
uint64_t trackingGetTotalItems(void) {
|
||||||
return TrackingTableUsedSlots;
|
return TrackingTableTotalItems;
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user