Merge remote-tracking branch 'oss/unstable' into module_rdb_load_errors
This commit is contained in:
commit
e5187ad2ae
@ -406,7 +406,7 @@ replicas, or to continue the replication after a disconnection.
|
||||
Other C files
|
||||
---
|
||||
|
||||
* `t_hash.c`, `t_list.c`, `t_set.c`, `t_string.c` and `t_zset.c` contains the implementation of the Redis data types. They implement both an API to access a given data type, and the client commands implementations for these data types.
|
||||
* `t_hash.c`, `t_list.c`, `t_set.c`, `t_string.c`, `t_zset.c` and `t_stream.c` contains the implementation of the Redis data types. They implement both an API to access a given data type, and the client commands implementations for these data types.
|
||||
* `ae.c` implements the Redis event loop, it's a self contained library which is simple to read and understand.
|
||||
* `sds.c` is the Redis string library, check http://github.com/antirez/sds for more information.
|
||||
* `anet.c` is a library to use POSIX networking in a simpler way compared to the raw interface exposed by the kernel.
|
||||
|
121
redis.conf
121
redis.conf
@ -336,13 +336,11 @@ replica-read-only yes
|
||||
|
||||
# Replication SYNC strategy: disk or socket.
|
||||
#
|
||||
# -------------------------------------------------------
|
||||
# WARNING: DISKLESS REPLICATION IS EXPERIMENTAL CURRENTLY
|
||||
# -------------------------------------------------------
|
||||
# New replicas and reconnecting replicas that are not able to continue the
|
||||
# replication process just receiving differences, need to do what is called a
|
||||
# "full synchronization". An RDB file is transmitted from the master to the
|
||||
# replicas.
|
||||
#
|
||||
# New replicas and reconnecting replicas that are not able to continue the replication
|
||||
# process just receiving differences, need to do what is called a "full
|
||||
# synchronization". An RDB file is transmitted from the master to the replicas.
|
||||
# The transmission can happen in two different ways:
|
||||
#
|
||||
# 1) Disk-backed: The Redis master creates a new process that writes the RDB
|
||||
@ -352,14 +350,14 @@ replica-read-only yes
|
||||
# RDB file to replica sockets, without touching the disk at all.
|
||||
#
|
||||
# With disk-backed replication, while the RDB file is generated, more replicas
|
||||
# can be queued and served with the RDB file as soon as the current child producing
|
||||
# the RDB file finishes its work. With diskless replication instead once
|
||||
# the transfer starts, new replicas arriving will be queued and a new transfer
|
||||
# will start when the current one terminates.
|
||||
# can be queued and served with the RDB file as soon as the current child
|
||||
# producing the RDB file finishes its work. With diskless replication instead
|
||||
# once the transfer starts, new replicas arriving will be queued and a new
|
||||
# transfer will start when the current one terminates.
|
||||
#
|
||||
# When diskless replication is used, the master waits a configurable amount of
|
||||
# time (in seconds) before starting the transfer in the hope that multiple replicas
|
||||
# will arrive and the transfer can be parallelized.
|
||||
# time (in seconds) before starting the transfer in the hope that multiple
|
||||
# replicas will arrive and the transfer can be parallelized.
|
||||
#
|
||||
# With slow disks and fast (large bandwidth) networks, diskless replication
|
||||
# works better.
|
||||
@ -370,22 +368,32 @@ repl-diskless-sync no
|
||||
# to the replicas.
|
||||
#
|
||||
# This is important since once the transfer starts, it is not possible to serve
|
||||
# new replicas arriving, that will be queued for the next RDB transfer, so the server
|
||||
# waits a delay in order to let more replicas arrive.
|
||||
# new replicas arriving, that will be queued for the next RDB transfer, so the
|
||||
# server waits a delay in order to let more replicas arrive.
|
||||
#
|
||||
# The delay is specified in seconds, and by default is 5 seconds. To disable
|
||||
# it entirely just set it to 0 seconds and the transfer will start ASAP.
|
||||
repl-diskless-sync-delay 5
|
||||
|
||||
# Replica can load the rdb it reads from the replication link directly from the
|
||||
# socket, or store the rdb to a file and read that file after it was completely
|
||||
# -----------------------------------------------------------------------------
|
||||
# WARNING: RDB diskless load is experimental. Since in this setup the replica
|
||||
# does not immediately store an RDB on disk, it may cause data loss during
|
||||
# failovers. RDB diskless load + Redis modules not handling I/O reads may also
|
||||
# cause Redis to abort in case of I/O errors during the initial synchronization
|
||||
# stage with the master. Use only if your do what you are doing.
|
||||
# -----------------------------------------------------------------------------
|
||||
#
|
||||
# Replica can load the RDB it reads from the replication link directly from the
|
||||
# socket, or store the RDB to a file and read that file after it was completely
|
||||
# recived from the master.
|
||||
#
|
||||
# In many cases the disk is slower than the network, and storing and loading
|
||||
# the rdb file may increase replication time (and even increase the master's
|
||||
# the RDB file may increase replication time (and even increase the master's
|
||||
# Copy on Write memory and salve buffers).
|
||||
# However, parsing the rdb file directly from the socket may mean that we have
|
||||
# to flush the contents of the current database before the full rdb was received.
|
||||
# for this reason we have the following options:
|
||||
# However, parsing the RDB file directly from the socket may mean that we have
|
||||
# to flush the contents of the current database before the full rdb was
|
||||
# received. For this reason we have the following options:
|
||||
#
|
||||
# "disabled" - Don't use diskless load (store the rdb file to the disk first)
|
||||
# "on-empty-db" - Use diskless load only when it is completely safe.
|
||||
# "swapdb" - Keep a copy of the current db contents in RAM while parsing
|
||||
@ -393,9 +401,9 @@ repl-diskless-sync-delay 5
|
||||
# sufficient memory, if you don't have it, you risk an OOM kill.
|
||||
repl-diskless-load disabled
|
||||
|
||||
# Replicas send PINGs to server in a predefined interval. It's possible to change
|
||||
# this interval with the repl_ping_replica_period option. The default value is 10
|
||||
# seconds.
|
||||
# Replicas send PINGs to server in a predefined interval. It's possible to
|
||||
# change this interval with the repl_ping_replica_period option. The default
|
||||
# value is 10 seconds.
|
||||
#
|
||||
# repl-ping-replica-period 10
|
||||
|
||||
@ -427,10 +435,10 @@ repl-diskless-load disabled
|
||||
repl-disable-tcp-nodelay no
|
||||
|
||||
# Set the replication backlog size. The backlog is a buffer that accumulates
|
||||
# replica data when replicas are disconnected for some time, so that when a replica
|
||||
# wants to reconnect again, often a full resync is not needed, but a partial
|
||||
# resync is enough, just passing the portion of data the replica missed while
|
||||
# disconnected.
|
||||
# replica data when replicas are disconnected for some time, so that when a
|
||||
# replica wants to reconnect again, often a full resync is not needed, but a
|
||||
# partial resync is enough, just passing the portion of data the replica
|
||||
# missed while disconnected.
|
||||
#
|
||||
# The bigger the replication backlog, the longer the time the replica can be
|
||||
# disconnected and later be able to perform a partial resynchronization.
|
||||
@ -452,13 +460,13 @@ repl-disable-tcp-nodelay no
|
||||
#
|
||||
# repl-backlog-ttl 3600
|
||||
|
||||
# The replica priority is an integer number published by Redis in the INFO output.
|
||||
# It is used by Redis Sentinel in order to select a replica to promote into a
|
||||
# master if the master is no longer working correctly.
|
||||
# The replica priority is an integer number published by Redis in the INFO
|
||||
# output. It is used by Redis Sentinel in order to select a replica to promote
|
||||
# into a master if the master is no longer working correctly.
|
||||
#
|
||||
# A replica with a low priority number is considered better for promotion, so
|
||||
# for instance if there are three replicas with priority 10, 100, 25 Sentinel will
|
||||
# pick the one with priority 10, that is the lowest.
|
||||
# for instance if there are three replicas with priority 10, 100, 25 Sentinel
|
||||
# will pick the one with priority 10, that is the lowest.
|
||||
#
|
||||
# However a special priority of 0 marks the replica as not able to perform the
|
||||
# role of master, so a replica with priority of 0 will never be selected by
|
||||
@ -518,6 +526,39 @@ replica-priority 100
|
||||
# replica-announce-ip 5.5.5.5
|
||||
# replica-announce-port 1234
|
||||
|
||||
############################### KEYS TRACKING #################################
|
||||
|
||||
# Redis implements server assisted support for client side caching of values.
|
||||
# This is implemented using an invalidation table that remembers, using
|
||||
# 16 millions of slots, what clients may have certain subsets of keys. In turn
|
||||
# this is used in order to send invalidation messages to clients. Please
|
||||
# to understand more about the feature check this page:
|
||||
#
|
||||
# https://redis.io/topics/client-side-caching
|
||||
#
|
||||
# When tracking is enabled for a client, all the read only queries are assumed
|
||||
# to be cached: this will force Redis to store information in the invalidation
|
||||
# table. When keys are modified, such information is flushed away, and
|
||||
# invalidation messages are sent to the clients. However if the workload is
|
||||
# heavily dominated by reads, Redis could use more and more memory in order
|
||||
# to track the keys fetched by many clients.
|
||||
#
|
||||
# For this reason it is possible to configure a maximum fill value for the
|
||||
# invalidation table. By default it is set to 10%, and once this limit is
|
||||
# reached, Redis will start to evict caching slots in the invalidation table
|
||||
# even if keys are not modified, just to reclaim memory: this will in turn
|
||||
# force the clients to invalidate the cached values. Basically the table
|
||||
# maximum fill rate is a trade off between the memory you want to spend server
|
||||
# side to track information about who cached what, and the ability of clients
|
||||
# to retain cached objects in memory.
|
||||
#
|
||||
# If you set the value to 0, it means there are no limits, and all the 16
|
||||
# millions of caching slots can be used at the same time. In the "stats"
|
||||
# INFO section, you can find information about the amount of caching slots
|
||||
# used at every given moment.
|
||||
#
|
||||
# tracking-table-max-fill 10
|
||||
|
||||
################################## SECURITY ###################################
|
||||
|
||||
# Warning: since Redis is pretty fast an outside user can try up to
|
||||
@ -747,17 +788,17 @@ replica-priority 100
|
||||
# DEL commands to the replica as keys evict in the master side.
|
||||
#
|
||||
# This behavior ensures that masters and replicas stay consistent, and is usually
|
||||
# what you want, however if your replica is writable, or you want the replica to have
|
||||
# a different memory setting, and you are sure all the writes performed to the
|
||||
# replica are idempotent, then you may change this default (but be sure to understand
|
||||
# what you are doing).
|
||||
# what you want, however if your replica is writable, or you want the replica
|
||||
# to have a different memory setting, and you are sure all the writes performed
|
||||
# to the replica are idempotent, then you may change this default (but be sure
|
||||
# to understand what you are doing).
|
||||
#
|
||||
# Note that since the replica by default does not evict, it may end using more
|
||||
# memory than the one set via maxmemory (there are certain buffers that may
|
||||
# be larger on the replica, or data structures may sometimes take more memory and so
|
||||
# forth). So make sure you monitor your replicas and make sure they have enough
|
||||
# memory to never hit a real out-of-memory condition before the master hits
|
||||
# the configured maxmemory setting.
|
||||
# be larger on the replica, or data structures may sometimes take more memory
|
||||
# and so forth). So make sure you monitor your replicas and make sure they
|
||||
# have enough memory to never hit a real out-of-memory condition before the
|
||||
# master hits the configured maxmemory setting.
|
||||
#
|
||||
# replica-ignore-maxmemory yes
|
||||
|
||||
|
@ -13,4 +13,4 @@ then
|
||||
fi
|
||||
|
||||
make -C tests/modules && \
|
||||
$TCLSH tests/test_helper.tcl --single unit/moduleapi/commandfilter "${@}"
|
||||
$TCLSH tests/test_helper.tcl --single unit/moduleapi/commandfilter --single unit/moduleapi/testrdb "${@}"
|
||||
|
@ -303,9 +303,7 @@ ssize_t aofWrite(int fd, const char *buf, size_t len) {
|
||||
nwritten = write(fd, buf, len);
|
||||
|
||||
if (nwritten < 0) {
|
||||
if (errno == EINTR) {
|
||||
continue;
|
||||
}
|
||||
if (errno == EINTR) continue;
|
||||
return totwritten ? totwritten : -1;
|
||||
}
|
||||
|
||||
|
@ -4251,12 +4251,7 @@ NULL
|
||||
}
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
|
||||
/* CLUSTER NODES */
|
||||
robj *o;
|
||||
sds ci = clusterGenNodesDescription(0);
|
||||
|
||||
o = createObject(OBJ_STRING,ci);
|
||||
addReplyBulk(c,o);
|
||||
decrRefCount(o);
|
||||
addReplyBulkSds(c,clusterGenNodesDescription(0));
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) {
|
||||
/* CLUSTER MYID */
|
||||
addReplyBulkCBuffer(c,myself->name, CLUSTER_NAMELEN);
|
||||
@ -4832,7 +4827,7 @@ int verifyDumpPayload(unsigned char *p, size_t len) {
|
||||
* DUMP is actually not used by Redis Cluster but it is the obvious
|
||||
* complement of RESTORE and can be useful for different applications. */
|
||||
void dumpCommand(client *c) {
|
||||
robj *o, *dumpobj;
|
||||
robj *o;
|
||||
rio payload;
|
||||
|
||||
/* Check if the key is here. */
|
||||
@ -4845,9 +4840,7 @@ void dumpCommand(client *c) {
|
||||
createDumpPayload(&payload,o,c->argv[1]);
|
||||
|
||||
/* Transfer to the client */
|
||||
dumpobj = createObject(OBJ_STRING,payload.io.buffer.ptr);
|
||||
addReplyBulk(c,dumpobj);
|
||||
decrRefCount(dumpobj);
|
||||
addReplyBulkSds(c,payload.io.buffer.ptr);
|
||||
return;
|
||||
}
|
||||
|
||||
|
24
src/config.c
24
src/config.c
@ -686,6 +686,17 @@ void loadServerConfigFromString(char *config) {
|
||||
}
|
||||
} else if (!strcasecmp(argv[0],"slowlog-max-len") && argc == 2) {
|
||||
server.slowlog_max_len = strtoll(argv[1],NULL,10);
|
||||
} else if (!strcasecmp(argv[0],"tracking-table-max-fill") &&
|
||||
argc == 2)
|
||||
{
|
||||
server.tracking_table_max_fill = strtoll(argv[1],NULL,10);
|
||||
if (server.tracking_table_max_fill > 100 ||
|
||||
server.tracking_table_max_fill < 0)
|
||||
{
|
||||
err = "The tracking table fill percentage must be an "
|
||||
"integer between 0 and 100";
|
||||
goto loaderr;
|
||||
}
|
||||
} else if (!strcasecmp(argv[0],"client-output-buffer-limit") &&
|
||||
argc == 5)
|
||||
{
|
||||
@ -1133,6 +1144,8 @@ void configSetCommand(client *c) {
|
||||
"slowlog-max-len",ll,0,LONG_MAX) {
|
||||
/* Cast to unsigned. */
|
||||
server.slowlog_max_len = (unsigned long)ll;
|
||||
} config_set_numerical_field(
|
||||
"tracking-table-max-fill",server.tracking_table_max_fill,0,100) {
|
||||
} config_set_numerical_field(
|
||||
"latency-monitor-threshold",server.latency_monitor_threshold,0,LLONG_MAX){
|
||||
} config_set_numerical_field(
|
||||
@ -1338,8 +1351,8 @@ void configGetCommand(client *c) {
|
||||
server.slowlog_log_slower_than);
|
||||
config_get_numerical_field("latency-monitor-threshold",
|
||||
server.latency_monitor_threshold);
|
||||
config_get_numerical_field("slowlog-max-len",
|
||||
server.slowlog_max_len);
|
||||
config_get_numerical_field("slowlog-max-len", server.slowlog_max_len);
|
||||
config_get_numerical_field("tracking-table-max-fill", server.tracking_table_max_fill);
|
||||
config_get_numerical_field("port",server.port);
|
||||
config_get_numerical_field("cluster-announce-port",server.cluster_announce_port);
|
||||
config_get_numerical_field("cluster-announce-bus-port",server.cluster_announce_bus_port);
|
||||
@ -1470,12 +1483,10 @@ void configGetCommand(client *c) {
|
||||
matches++;
|
||||
}
|
||||
if (stringmatch(pattern,"notify-keyspace-events",1)) {
|
||||
robj *flagsobj = createObject(OBJ_STRING,
|
||||
keyspaceEventsFlagsToString(server.notify_keyspace_events));
|
||||
sds flags = keyspaceEventsFlagsToString(server.notify_keyspace_events);
|
||||
|
||||
addReplyBulkCString(c,"notify-keyspace-events");
|
||||
addReplyBulk(c,flagsobj);
|
||||
decrRefCount(flagsobj);
|
||||
addReplyBulkSds(c,flags);
|
||||
matches++;
|
||||
}
|
||||
if (stringmatch(pattern,"bind",1)) {
|
||||
@ -2167,6 +2178,7 @@ int rewriteConfig(char *path) {
|
||||
rewriteConfigNumericalOption(state,"slowlog-log-slower-than",server.slowlog_log_slower_than,CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN);
|
||||
rewriteConfigNumericalOption(state,"latency-monitor-threshold",server.latency_monitor_threshold,CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD);
|
||||
rewriteConfigNumericalOption(state,"slowlog-max-len",server.slowlog_max_len,CONFIG_DEFAULT_SLOWLOG_MAX_LEN);
|
||||
rewriteConfigNumericalOption(state,"tracking-table-max-fill",server.tracking_table_max_fill,CONFIG_DEFAULT_TRACKING_TABLE_MAX_FILL);
|
||||
rewriteConfigNotifykeyspaceeventsOption(state);
|
||||
rewriteConfigNumericalOption(state,"hash-max-ziplist-entries",server.hash_max_ziplist_entries,OBJ_HASH_MAX_ZIPLIST_ENTRIES);
|
||||
rewriteConfigNumericalOption(state,"hash-max-ziplist-value",server.hash_max_ziplist_value,OBJ_HASH_MAX_ZIPLIST_VALUE);
|
||||
|
10
src/db.c
10
src/db.c
@ -353,6 +353,11 @@ long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* Make sure the WATCHed keys are affected by the FLUSH* commands.
|
||||
* Note that we need to call the function while the keys are still
|
||||
* there. */
|
||||
signalFlushedDb(dbnum);
|
||||
|
||||
int startdb, enddb;
|
||||
if (dbnum == -1) {
|
||||
startdb = 0;
|
||||
@ -412,11 +417,12 @@ long long dbTotalServerKeyCount() {
|
||||
|
||||
void signalModifiedKey(redisDb *db, robj *key) {
|
||||
touchWatchedKey(db,key);
|
||||
if (server.tracking_clients) trackingInvalidateKey(key);
|
||||
trackingInvalidateKey(key);
|
||||
}
|
||||
|
||||
void signalFlushedDb(int dbid) {
|
||||
touchWatchedKeysOnFlush(dbid);
|
||||
trackingInvalidateKeysOnFlush(dbid);
|
||||
}
|
||||
|
||||
/*-----------------------------------------------------------------------------
|
||||
@ -452,7 +458,6 @@ void flushdbCommand(client *c) {
|
||||
int flags;
|
||||
|
||||
if (getFlushCommandFlags(c,&flags) == C_ERR) return;
|
||||
signalFlushedDb(c->db->id);
|
||||
server.dirty += emptyDb(c->db->id,flags,NULL);
|
||||
addReply(c,shared.ok);
|
||||
}
|
||||
@ -464,7 +469,6 @@ void flushallCommand(client *c) {
|
||||
int flags;
|
||||
|
||||
if (getFlushCommandFlags(c,&flags) == C_ERR) return;
|
||||
signalFlushedDb(-1);
|
||||
server.dirty += emptyDb(-1,flags,NULL);
|
||||
addReply(c,shared.ok);
|
||||
if (server.rdb_child_pid != -1) killRDBChild();
|
||||
|
@ -64,7 +64,7 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
|
||||
dbSyncDelete(db,keyobj);
|
||||
notifyKeyspaceEvent(NOTIFY_EXPIRED,
|
||||
"expired",keyobj,db->id);
|
||||
if (server.tracking_clients) trackingInvalidateKey(keyobj);
|
||||
trackingInvalidateKey(keyobj);
|
||||
decrRefCount(keyobj);
|
||||
server.stat_expiredkeys++;
|
||||
return 1;
|
||||
|
@ -701,6 +701,7 @@ int hllSparseSet(robj *o, long index, uint8_t count) {
|
||||
first += span;
|
||||
}
|
||||
if (span == 0) return -1; /* Invalid format. */
|
||||
if (span >= end) return -1; /* Invalid format. */
|
||||
|
||||
next = HLL_SPARSE_IS_XZERO(p) ? p+2 : p+1;
|
||||
if (next >= end) next = NULL;
|
||||
|
51
src/module.c
51
src/module.c
@ -29,6 +29,7 @@
|
||||
|
||||
#include "server.h"
|
||||
#include "cluster.h"
|
||||
#include "rdb.h"
|
||||
#include <dlfcn.h>
|
||||
|
||||
#define REDISMODULE_CORE 1
|
||||
@ -3092,6 +3093,11 @@ moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver,
|
||||
moduleTypeMemUsageFunc mem_usage;
|
||||
moduleTypeDigestFunc digest;
|
||||
moduleTypeFreeFunc free;
|
||||
struct {
|
||||
moduleTypeAuxLoadFunc aux_load;
|
||||
moduleTypeAuxSaveFunc aux_save;
|
||||
int aux_save_triggers;
|
||||
} v2;
|
||||
} *tms = (struct typemethods*) typemethods_ptr;
|
||||
|
||||
moduleType *mt = zcalloc(sizeof(*mt));
|
||||
@ -3103,6 +3109,11 @@ moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver,
|
||||
mt->mem_usage = tms->mem_usage;
|
||||
mt->digest = tms->digest;
|
||||
mt->free = tms->free;
|
||||
if (tms->version >= 2) {
|
||||
mt->aux_load = tms->v2.aux_load;
|
||||
mt->aux_save = tms->v2.aux_save;
|
||||
mt->aux_save_triggers = tms->v2.aux_save_triggers;
|
||||
}
|
||||
memcpy(mt->name,name,sizeof(mt->name));
|
||||
listAddNodeTail(ctx->module->types,mt);
|
||||
return mt;
|
||||
@ -3405,6 +3416,36 @@ loaderr:
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Iterate over modules, and trigger rdb aux saving for the ones modules types
|
||||
* who asked for it. */
|
||||
ssize_t rdbSaveModulesAux(rio *rdb, int when) {
|
||||
size_t total_written = 0;
|
||||
dictIterator *di = dictGetIterator(modules);
|
||||
dictEntry *de;
|
||||
|
||||
while ((de = dictNext(di)) != NULL) {
|
||||
struct RedisModule *module = dictGetVal(de);
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
|
||||
listRewind(module->types,&li);
|
||||
while((ln = listNext(&li))) {
|
||||
moduleType *mt = ln->value;
|
||||
if (!mt->aux_save || !(mt->aux_save_triggers & when))
|
||||
continue;
|
||||
ssize_t ret = rdbSaveSingleModuleAux(rdb, when, mt);
|
||||
if (ret==-1) {
|
||||
dictReleaseIterator(di);
|
||||
return -1;
|
||||
}
|
||||
total_written += ret;
|
||||
}
|
||||
}
|
||||
|
||||
dictReleaseIterator(di);
|
||||
return total_written;
|
||||
}
|
||||
|
||||
/* --------------------------------------------------------------------------
|
||||
* Key digest API (DEBUG DIGEST interface for modules types)
|
||||
* -------------------------------------------------------------------------- */
|
||||
@ -3565,7 +3606,7 @@ void RM_LogRaw(RedisModule *module, const char *levelstr, const char *fmt, va_li
|
||||
|
||||
if (level < server.verbosity) return;
|
||||
|
||||
name_len = snprintf(msg, sizeof(msg),"<%s> ", module->name);
|
||||
name_len = snprintf(msg, sizeof(msg),"<%s> ", module? module->name: "module");
|
||||
vsnprintf(msg + name_len, sizeof(msg) - name_len, fmt, ap);
|
||||
serverLogRaw(level,msg);
|
||||
}
|
||||
@ -3583,13 +3624,15 @@ void RM_LogRaw(RedisModule *module, const char *levelstr, const char *fmt, va_li
|
||||
* There is a fixed limit to the length of the log line this function is able
|
||||
* to emit, this limit is not specified but is guaranteed to be more than
|
||||
* a few lines of text.
|
||||
*
|
||||
* The ctx argument may be NULL if cannot be provided in the context of the
|
||||
* caller for instance threads or callbacks, in which case a generic "module"
|
||||
* will be used instead of the module name.
|
||||
*/
|
||||
void RM_Log(RedisModuleCtx *ctx, const char *levelstr, const char *fmt, ...) {
|
||||
if (!ctx->module) return; /* Can only log if module is initialized */
|
||||
|
||||
va_list ap;
|
||||
va_start(ap, fmt);
|
||||
RM_LogRaw(ctx->module,levelstr,fmt,ap);
|
||||
RM_LogRaw(ctx? ctx->module: NULL,levelstr,fmt,ap);
|
||||
va_end(ap);
|
||||
}
|
||||
|
||||
|
254
src/rdb.c
254
src/rdb.c
@ -42,31 +42,35 @@
|
||||
#include <sys/stat.h>
|
||||
#include <sys/param.h>
|
||||
|
||||
#define rdbExitReportCorruptRDB(...) rdbCheckThenExit(__LINE__,__VA_ARGS__)
|
||||
/* This macro is called when the internal RDB stracture is corrupt */
|
||||
#define rdbExitReportCorruptRDB(...) rdbReportError(1, __LINE__,__VA_ARGS__)
|
||||
/* This macro is called when RDB read failed (possibly a short read) */
|
||||
#define rdbReportReadError(...) rdbReportError(0, __LINE__,__VA_ARGS__)
|
||||
|
||||
char* rdbFileBeingLoaded = NULL; /* used for rdb checking on read error */
|
||||
extern int rdbCheckMode;
|
||||
void rdbCheckError(const char *fmt, ...);
|
||||
void rdbCheckSetError(const char *fmt, ...);
|
||||
|
||||
void rdbCheckThenExit(int linenum, char *reason, ...) {
|
||||
void rdbReportError(int corruption_error, int linenum, char *reason, ...) {
|
||||
va_list ap;
|
||||
char msg[1024];
|
||||
int len;
|
||||
|
||||
len = snprintf(msg,sizeof(msg),
|
||||
"Internal error in RDB reading function at rdb.c:%d -> ", linenum);
|
||||
"Internal error in RDB reading offset %llu, function at rdb.c:%d -> ",
|
||||
(unsigned long long)server.loading_loaded_bytes, linenum);
|
||||
va_start(ap,reason);
|
||||
vsnprintf(msg+len,sizeof(msg)-len,reason,ap);
|
||||
va_end(ap);
|
||||
|
||||
if (!rdbCheckMode) {
|
||||
serverLog(LL_WARNING, "%s", msg);
|
||||
if (rdbFileBeingLoaded) {
|
||||
if (rdbFileBeingLoaded || corruption_error) {
|
||||
serverLog(LL_WARNING, "%s", msg);
|
||||
char *argv[2] = {"",rdbFileBeingLoaded};
|
||||
redis_check_rdb_main(2,argv,NULL);
|
||||
} else {
|
||||
serverLog(LL_WARNING, "Failure loading rdb format from socket, assuming connection error, resuming operation.");
|
||||
serverLog(LL_WARNING, "%s. Failure loading rdb format from socket, assuming connection error, resuming operation.", msg);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
@ -82,18 +86,6 @@ static int rdbWriteRaw(rio *rdb, void *p, size_t len) {
|
||||
return len;
|
||||
}
|
||||
|
||||
/* This is just a wrapper for the low level function rioRead() that will
|
||||
* automatically abort if it is not possible to read the specified amount
|
||||
* of bytes. */
|
||||
void rdbLoadRaw(rio *rdb, void *buf, uint64_t len) {
|
||||
if (rioRead(rdb,buf,len) == 0) {
|
||||
rdbExitReportCorruptRDB(
|
||||
"Impossible to read %llu bytes in rdbLoadRaw()",
|
||||
(unsigned long long) len);
|
||||
return; /* Not reached. */
|
||||
}
|
||||
}
|
||||
|
||||
int rdbSaveType(rio *rdb, unsigned char type) {
|
||||
return rdbWriteRaw(rdb,&type,1);
|
||||
}
|
||||
@ -109,10 +101,12 @@ int rdbLoadType(rio *rdb) {
|
||||
|
||||
/* This is only used to load old databases stored with the RDB_OPCODE_EXPIRETIME
|
||||
* opcode. New versions of Redis store using the RDB_OPCODE_EXPIRETIME_MS
|
||||
* opcode. */
|
||||
* opcode. On error -1 is returned, however this could be a valid time, so
|
||||
* to check for loading errors the caller should call rioGetReadError() after
|
||||
* calling this function. */
|
||||
time_t rdbLoadTime(rio *rdb) {
|
||||
int32_t t32;
|
||||
rdbLoadRaw(rdb,&t32,4);
|
||||
if (rioRead(rdb,&t32,4) == 0) return -1;
|
||||
return (time_t)t32;
|
||||
}
|
||||
|
||||
@ -132,10 +126,14 @@ int rdbSaveMillisecondTime(rio *rdb, long long t) {
|
||||
* after upgrading to Redis version 5 they will no longer be able to load their
|
||||
* own old RDB files. Because of that, we instead fix the function only for new
|
||||
* RDB versions, and load older RDB versions as we used to do in the past,
|
||||
* allowing big endian systems to load their own old RDB files. */
|
||||
* allowing big endian systems to load their own old RDB files.
|
||||
*
|
||||
* On I/O error the function returns LLONG_MAX, however if this is also a
|
||||
* valid stored value, the caller should use rioGetReadError() to check for
|
||||
* errors after calling this function. */
|
||||
long long rdbLoadMillisecondTime(rio *rdb, int rdbver) {
|
||||
int64_t t64;
|
||||
rdbLoadRaw(rdb,&t64,8);
|
||||
if (rioRead(rdb,&t64,8) == 0) return LLONG_MAX;
|
||||
if (rdbver >= 9) /* Check the top comment of this function. */
|
||||
memrev64ifbe(&t64); /* Convert in big endian if the system is BE. */
|
||||
return (long long)t64;
|
||||
@ -284,8 +282,8 @@ void *rdbLoadIntegerObject(rio *rdb, int enctype, int flags, size_t *lenptr) {
|
||||
v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
|
||||
val = (int32_t)v;
|
||||
} else {
|
||||
val = 0; /* anti-warning */
|
||||
rdbExitReportCorruptRDB("Unknown RDB integer encoding type %d",enctype);
|
||||
return NULL; /* Never reached. */
|
||||
}
|
||||
if (plain || sds) {
|
||||
char buf[LONG_STR_SIZE], *p;
|
||||
@ -388,8 +386,7 @@ void *rdbLoadLzfStringObject(rio *rdb, int flags, size_t *lenptr) {
|
||||
/* Load the compressed representation and uncompress it to target. */
|
||||
if (rioRead(rdb,c,clen) == 0) goto err;
|
||||
if (lzf_decompress(c,clen,val,len) == 0) {
|
||||
if (rdbCheckMode) rdbCheckSetError("Invalid LZF compressed string");
|
||||
goto err;
|
||||
rdbExitReportCorruptRDB("Invalid LZF compressed string");
|
||||
}
|
||||
zfree(c);
|
||||
|
||||
@ -503,6 +500,7 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) {
|
||||
return rdbLoadLzfStringObject(rdb,flags,lenptr);
|
||||
default:
|
||||
rdbExitReportCorruptRDB("Unknown RDB string encoding type %d",len);
|
||||
return NULL; /* Never reached. */
|
||||
}
|
||||
}
|
||||
|
||||
@ -973,7 +971,6 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {
|
||||
RedisModuleIO io;
|
||||
moduleValue *mv = o->ptr;
|
||||
moduleType *mt = mv->type;
|
||||
moduleInitIOContext(io,mt,rdb,key);
|
||||
|
||||
/* Write the "module" identifier as prefix, so that we'll be able
|
||||
* to call the right module during loading. */
|
||||
@ -982,10 +979,13 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {
|
||||
io.bytes += retval;
|
||||
|
||||
/* Then write the module-specific representation + EOF marker. */
|
||||
moduleInitIOContext(io,mt,rdb,key);
|
||||
mt->rdb_save(&io,mv->value);
|
||||
retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF);
|
||||
if (retval == -1) return -1;
|
||||
io.bytes += retval;
|
||||
if (retval == -1)
|
||||
io.error = 1;
|
||||
else
|
||||
io.bytes += retval;
|
||||
|
||||
if (io.ctx) {
|
||||
moduleFreeContext(io.ctx);
|
||||
@ -1103,6 +1103,40 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) {
|
||||
/* Save a module-specific aux value. */
|
||||
RedisModuleIO io;
|
||||
int retval = rdbSaveType(rdb, RDB_OPCODE_MODULE_AUX);
|
||||
|
||||
/* Write the "module" identifier as prefix, so that we'll be able
|
||||
* to call the right module during loading. */
|
||||
retval = rdbSaveLen(rdb,mt->id);
|
||||
if (retval == -1) return -1;
|
||||
io.bytes += retval;
|
||||
|
||||
/* write the 'when' so that we can provide it on loading */
|
||||
retval = rdbSaveLen(rdb,when);
|
||||
if (retval == -1) return -1;
|
||||
io.bytes += retval;
|
||||
|
||||
/* Then write the module-specific representation + EOF marker. */
|
||||
moduleInitIOContext(io,mt,rdb,NULL);
|
||||
mt->aux_save(&io,when);
|
||||
retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF);
|
||||
if (retval == -1)
|
||||
io.error = 1;
|
||||
else
|
||||
io.bytes += retval;
|
||||
|
||||
if (io.ctx) {
|
||||
moduleFreeContext(io.ctx);
|
||||
zfree(io.ctx);
|
||||
}
|
||||
if (io.error)
|
||||
return -1;
|
||||
return io.bytes;
|
||||
}
|
||||
|
||||
/* Produces a dump of the database in RDB format sending it to the specified
|
||||
* Redis I/O channel. On success C_OK is returned, otherwise C_ERR
|
||||
* is returned and part of the output, or all the output, can be
|
||||
@ -1124,6 +1158,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
|
||||
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
|
||||
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
|
||||
if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
|
||||
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
|
||||
|
||||
for (j = 0; j < server.dbnum; j++) {
|
||||
redisDb *db = server.db+j;
|
||||
@ -1185,6 +1220,8 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
|
||||
di = NULL; /* So that we don't release it again on error. */
|
||||
}
|
||||
|
||||
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr;
|
||||
|
||||
/* EOF opcode */
|
||||
if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
|
||||
|
||||
@ -1644,6 +1681,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
|
||||
hashTypeConvert(o, OBJ_ENCODING_HT);
|
||||
break;
|
||||
default:
|
||||
/* totally unreachable */
|
||||
rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
|
||||
break;
|
||||
}
|
||||
@ -1651,6 +1689,11 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
|
||||
o = createStreamObject();
|
||||
stream *s = o->ptr;
|
||||
uint64_t listpacks = rdbLoadLen(rdb,NULL);
|
||||
if (listpacks == RDB_LENERR) {
|
||||
rdbReportReadError("Stream listpacks len loading failed.");
|
||||
decrRefCount(o);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
while(listpacks--) {
|
||||
/* Get the master ID, the one we'll use as key of the radix tree
|
||||
@ -1658,7 +1701,9 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
|
||||
* relatively to this ID. */
|
||||
sds nodekey = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
|
||||
if (nodekey == NULL) {
|
||||
rdbExitReportCorruptRDB("Stream master ID loading failed: invalid encoding or I/O error.");
|
||||
rdbReportReadError("Stream master ID loading failed: invalid encoding or I/O error.");
|
||||
decrRefCount(o);
|
||||
return NULL;
|
||||
}
|
||||
if (sdslen(nodekey) != sizeof(streamID)) {
|
||||
rdbExitReportCorruptRDB("Stream node key entry is not the "
|
||||
@ -1668,7 +1713,12 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
|
||||
/* Load the listpack. */
|
||||
unsigned char *lp =
|
||||
rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL);
|
||||
if (lp == NULL) return NULL;
|
||||
if (lp == NULL) {
|
||||
rdbReportReadError("Stream listpacks loading failed.");
|
||||
sdsfree(nodekey);
|
||||
decrRefCount(o);
|
||||
return NULL;
|
||||
}
|
||||
unsigned char *first = lpFirst(lp);
|
||||
if (first == NULL) {
|
||||
/* Serialized listpacks should never be empty, since on
|
||||
@ -1686,12 +1736,24 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
|
||||
}
|
||||
/* Load total number of items inside the stream. */
|
||||
s->length = rdbLoadLen(rdb,NULL);
|
||||
|
||||
/* Load the last entry ID. */
|
||||
s->last_id.ms = rdbLoadLen(rdb,NULL);
|
||||
s->last_id.seq = rdbLoadLen(rdb,NULL);
|
||||
|
||||
if (rioGetReadError(rdb)) {
|
||||
rdbReportReadError("Stream object metadata loading failed.");
|
||||
decrRefCount(o);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Consumer groups loading */
|
||||
size_t cgroups_count = rdbLoadLen(rdb,NULL);
|
||||
uint64_t cgroups_count = rdbLoadLen(rdb,NULL);
|
||||
if (cgroups_count == RDB_LENERR) {
|
||||
rdbReportReadError("Stream cgroup count loading failed.");
|
||||
decrRefCount(o);
|
||||
return NULL;
|
||||
}
|
||||
while(cgroups_count--) {
|
||||
/* Get the consumer group name and ID. We can then create the
|
||||
* consumer group ASAP and populate its structure as
|
||||
@ -1699,11 +1761,21 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
|
||||
streamID cg_id;
|
||||
sds cgname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
|
||||
if (cgname == NULL) {
|
||||
rdbExitReportCorruptRDB(
|
||||
rdbReportReadError(
|
||||
"Error reading the consumer group name from Stream");
|
||||
decrRefCount(o);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
cg_id.ms = rdbLoadLen(rdb,NULL);
|
||||
cg_id.seq = rdbLoadLen(rdb,NULL);
|
||||
if (rioGetReadError(rdb)) {
|
||||
rdbReportReadError("Stream cgroup ID loading failed.");
|
||||
sdsfree(cgname);
|
||||
decrRefCount(o);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id);
|
||||
if (cgroup == NULL)
|
||||
rdbExitReportCorruptRDB("Duplicated consumer group name %s",
|
||||
@ -1715,13 +1787,28 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
|
||||
* owner, since consumers for this group and their messages will
|
||||
* be read as a next step. So for now leave them not resolved
|
||||
* and later populate it. */
|
||||
size_t pel_size = rdbLoadLen(rdb,NULL);
|
||||
uint64_t pel_size = rdbLoadLen(rdb,NULL);
|
||||
if (pel_size == RDB_LENERR) {
|
||||
rdbReportReadError("Stream PEL size loading failed.");
|
||||
decrRefCount(o);
|
||||
return NULL;
|
||||
}
|
||||
while(pel_size--) {
|
||||
unsigned char rawid[sizeof(streamID)];
|
||||
rdbLoadRaw(rdb,rawid,sizeof(rawid));
|
||||
if (rioRead(rdb,rawid,sizeof(rawid)) == 0) {
|
||||
rdbReportReadError("Stream PEL ID loading failed.");
|
||||
decrRefCount(o);
|
||||
return NULL;
|
||||
}
|
||||
streamNACK *nack = streamCreateNACK(NULL);
|
||||
nack->delivery_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
|
||||
nack->delivery_count = rdbLoadLen(rdb,NULL);
|
||||
if (rioGetReadError(rdb)) {
|
||||
rdbReportReadError("Stream PEL NACK loading failed.");
|
||||
decrRefCount(o);
|
||||
streamFreeNACK(nack);
|
||||
return NULL;
|
||||
}
|
||||
if (!raxInsert(cgroup->pel,rawid,sizeof(rawid),nack,NULL))
|
||||
rdbExitReportCorruptRDB("Duplicated gobal PEL entry "
|
||||
"loading stream consumer group");
|
||||
@ -1729,24 +1816,47 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
|
||||
|
||||
/* Now that we loaded our global PEL, we need to load the
|
||||
* consumers and their local PELs. */
|
||||
size_t consumers_num = rdbLoadLen(rdb,NULL);
|
||||
uint64_t consumers_num = rdbLoadLen(rdb,NULL);
|
||||
if (consumers_num == RDB_LENERR) {
|
||||
rdbReportReadError("Stream consumers num loading failed.");
|
||||
decrRefCount(o);
|
||||
return NULL;
|
||||
}
|
||||
while(consumers_num--) {
|
||||
sds cname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
|
||||
if (cname == NULL) {
|
||||
rdbExitReportCorruptRDB(
|
||||
"Error reading the consumer name from Stream group");
|
||||
rdbReportReadError(
|
||||
"Error reading the consumer name from Stream group.");
|
||||
decrRefCount(o);
|
||||
return NULL;
|
||||
}
|
||||
streamConsumer *consumer = streamLookupConsumer(cgroup,cname,
|
||||
1);
|
||||
sdsfree(cname);
|
||||
consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
|
||||
if (rioGetReadError(rdb)) {
|
||||
rdbReportReadError("Stream short read reading seen time.");
|
||||
decrRefCount(o);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Load the PEL about entries owned by this specific
|
||||
* consumer. */
|
||||
pel_size = rdbLoadLen(rdb,NULL);
|
||||
if (pel_size == RDB_LENERR) {
|
||||
rdbReportReadError(
|
||||
"Stream consumer PEL num loading failed.");
|
||||
decrRefCount(o);
|
||||
return NULL;
|
||||
}
|
||||
while(pel_size--) {
|
||||
unsigned char rawid[sizeof(streamID)];
|
||||
rdbLoadRaw(rdb,rawid,sizeof(rawid));
|
||||
if (rioRead(rdb,rawid,sizeof(rawid)) == 0) {
|
||||
rdbReportReadError(
|
||||
"Stream short read reading PEL streamID.");
|
||||
decrRefCount(o);
|
||||
return NULL;
|
||||
}
|
||||
streamNACK *nack = raxFind(cgroup->pel,rawid,sizeof(rawid));
|
||||
if (nack == raxNotFound)
|
||||
rdbExitReportCorruptRDB("Consumer entry not found in "
|
||||
@ -1765,6 +1875,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
|
||||
}
|
||||
} else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) {
|
||||
uint64_t moduleid = rdbLoadLen(rdb,NULL);
|
||||
if (rioGetReadError(rdb)) return NULL;
|
||||
moduleType *mt = moduleTypeLookupModuleByID(moduleid);
|
||||
char name[10];
|
||||
|
||||
@ -1792,6 +1903,11 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
|
||||
/* Module v2 serialization has an EOF mark at the end. */
|
||||
if (io.ver == 2) {
|
||||
uint64_t eof = rdbLoadLen(rdb,NULL);
|
||||
if (eof == RDB_LENERR) {
|
||||
o = createModuleObject(mt,ptr); /* creating just in order to easily destroy */
|
||||
decrRefCount(o);
|
||||
return NULL;
|
||||
}
|
||||
if (eof != RDB_MODULE_OPCODE_EOF) {
|
||||
serverLog(LL_WARNING,"The RDB file contains module data for the module '%s' that is not terminated by the proper module value EOF marker", name);
|
||||
exit(1);
|
||||
@ -1805,7 +1921,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
|
||||
}
|
||||
o = createModuleObject(mt,ptr);
|
||||
} else {
|
||||
rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
|
||||
rdbReportReadError("Unknown RDB encoding type %d",rdbtype);
|
||||
return NULL;
|
||||
}
|
||||
return o;
|
||||
}
|
||||
@ -1904,11 +2021,13 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
|
||||
* load the actual type, and continue. */
|
||||
expiretime = rdbLoadTime(rdb);
|
||||
expiretime *= 1000;
|
||||
if (rioGetReadError(rdb)) goto eoferr;
|
||||
continue; /* Read next opcode. */
|
||||
} else if (type == RDB_OPCODE_EXPIRETIME_MS) {
|
||||
/* EXPIRETIME_MS: milliseconds precision expire times introduced
|
||||
* with RDB v3. Like EXPIRETIME but no with more precision. */
|
||||
expiretime = rdbLoadMillisecondTime(rdb,rdbver);
|
||||
if (rioGetReadError(rdb)) goto eoferr;
|
||||
continue; /* Read next opcode. */
|
||||
} else if (type == RDB_OPCODE_FREQ) {
|
||||
/* FREQ: LFU frequency. */
|
||||
@ -2009,15 +2128,12 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
|
||||
decrRefCount(auxval);
|
||||
continue; /* Read type again. */
|
||||
} else if (type == RDB_OPCODE_MODULE_AUX) {
|
||||
/* This is just for compatibility with the future: we have plans
|
||||
* to add the ability for modules to store anything in the RDB
|
||||
* file, like data that is not related to the Redis key space.
|
||||
* Such data will potentially be stored both before and after the
|
||||
* RDB keys-values section. For this reason since RDB version 9,
|
||||
* we have the ability to read a MODULE_AUX opcode followed by an
|
||||
* identifier of the module, and a serialized value in "MODULE V2"
|
||||
* format. */
|
||||
/* Load module data that is not related to the Redis key space.
|
||||
* Such data can be potentially be stored both before and after the
|
||||
* RDB keys-values section. */
|
||||
uint64_t moduleid = rdbLoadLen(rdb,NULL);
|
||||
int when = rdbLoadLen(rdb,NULL);
|
||||
if (rioGetReadError(rdb)) goto eoferr;
|
||||
moduleType *mt = moduleTypeLookupModuleByID(moduleid);
|
||||
char name[10];
|
||||
moduleTypeNameByID(name,moduleid);
|
||||
@ -2027,14 +2143,37 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
|
||||
serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load: no matching module '%s'", name);
|
||||
exit(1);
|
||||
} else if (!rdbCheckMode && mt != NULL) {
|
||||
/* This version of Redis actually does not know what to do
|
||||
* with modules AUX data... */
|
||||
serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load for the module '%s'. Probably you want to use a newer version of Redis which implements aux data callbacks", name);
|
||||
exit(1);
|
||||
if (!mt->aux_load) {
|
||||
/* Module doesn't support AUX. */
|
||||
serverLog(LL_WARNING,"The RDB file contains module AUX data, but the module '%s' doesn't seem to support it.", name);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
RedisModuleIO io;
|
||||
moduleInitIOContext(io,mt,rdb,NULL);
|
||||
io.ver = 2;
|
||||
/* Call the rdb_load method of the module providing the 10 bit
|
||||
* encoding version in the lower 10 bits of the module ID. */
|
||||
if (mt->aux_load(&io,moduleid&1023, when) || io.error) {
|
||||
moduleTypeNameByID(name,moduleid);
|
||||
serverLog(LL_WARNING,"The RDB file contains module AUX data for the module type '%s', that the responsible module is not able to load. Check for modules log above for additional clues.", name);
|
||||
exit(1);
|
||||
}
|
||||
if (io.ctx) {
|
||||
moduleFreeContext(io.ctx);
|
||||
zfree(io.ctx);
|
||||
}
|
||||
uint64_t eof = rdbLoadLen(rdb,NULL);
|
||||
if (eof != RDB_MODULE_OPCODE_EOF) {
|
||||
serverLog(LL_WARNING,"The RDB file contains module AUX data for the module '%s' that is not terminated by the proper module value EOF marker", name);
|
||||
exit(1);
|
||||
}
|
||||
continue;
|
||||
} else {
|
||||
/* RDB check mode. */
|
||||
robj *aux = rdbLoadCheckModuleValue(rdb,name);
|
||||
decrRefCount(aux);
|
||||
continue; /* Read next opcode. */
|
||||
}
|
||||
}
|
||||
|
||||
@ -2088,10 +2227,15 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
|
||||
}
|
||||
return C_OK;
|
||||
|
||||
eoferr: /* unexpected end of file is handled here with a fatal exit */
|
||||
serverLog(LL_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
|
||||
rdbExitReportCorruptRDB("Unexpected EOF reading RDB file");
|
||||
return C_ERR; /* Just to avoid warning */
|
||||
/* Unexpected end of file is handled here calling rdbReportReadError():
|
||||
* this will in turn either abort Redis in most cases, or if we are loading
|
||||
* the RDB file from a socket during initial SYNC (diskless replica mode),
|
||||
* we'll report the error to the caller, so that we can retry. */
|
||||
eoferr:
|
||||
serverLog(LL_WARNING,
|
||||
"Short read or OOM loading DB. Unrecoverable error, aborting now.");
|
||||
rdbReportReadError("Unexpected EOF reading RDB file");
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
/* Like rdbLoadRio() but takes a filename instead of a rio stream. The
|
||||
|
@ -145,6 +145,7 @@ size_t rdbSavedObjectLen(robj *o);
|
||||
robj *rdbLoadObject(int type, rio *rdb, robj *key);
|
||||
void backgroundSaveDoneHandler(int exitcode, int bysignal);
|
||||
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime);
|
||||
ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt);
|
||||
robj *rdbLoadStringObject(rio *rdb);
|
||||
ssize_t rdbSaveStringObject(rio *rdb, robj *obj);
|
||||
ssize_t rdbSaveRawString(rio *rdb, unsigned char *s, size_t len);
|
||||
|
@ -104,6 +104,7 @@ static struct config {
|
||||
int is_fetching_slots;
|
||||
int is_updating_slots;
|
||||
int slots_last_update;
|
||||
int enable_tracking;
|
||||
/* Thread mutexes to be used as fallbacks by atomicvar.h */
|
||||
pthread_mutex_t requests_issued_mutex;
|
||||
pthread_mutex_t requests_finished_mutex;
|
||||
@ -255,7 +256,7 @@ static redisConfig *getRedisConfig(const char *ip, int port,
|
||||
goto fail;
|
||||
}
|
||||
|
||||
if(config.auth){
|
||||
if(config.auth) {
|
||||
void *authReply = NULL;
|
||||
redisAppendCommand(c, "AUTH %s", config.auth);
|
||||
if (REDIS_OK != redisGetReply(c, &authReply)) goto fail;
|
||||
@ -633,6 +634,14 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
|
||||
c->prefix_pending++;
|
||||
}
|
||||
|
||||
if (config.enable_tracking) {
|
||||
char *buf = NULL;
|
||||
int len = redisFormatCommand(&buf, "CLIENT TRACKING on");
|
||||
c->obuf = sdscatlen(c->obuf, buf, len);
|
||||
free(buf);
|
||||
c->prefix_pending++;
|
||||
}
|
||||
|
||||
/* If a DB number different than zero is selected, prefix our request
|
||||
* buffer with the SELECT command, that will be discarded the first
|
||||
* time the replies are received, so if the client is reused the
|
||||
@ -1350,6 +1359,8 @@ int parseOptions(int argc, const char **argv) {
|
||||
} else if (config.num_threads < 0) config.num_threads = 0;
|
||||
} else if (!strcmp(argv[i],"--cluster")) {
|
||||
config.cluster_mode = 1;
|
||||
} else if (!strcmp(argv[i],"--enable-tracking")) {
|
||||
config.enable_tracking = 1;
|
||||
} else if (!strcmp(argv[i],"--help")) {
|
||||
exit_status = 0;
|
||||
goto usage;
|
||||
@ -1380,6 +1391,7 @@ usage:
|
||||
" --dbnum <db> SELECT the specified db number (default 0)\n"
|
||||
" --threads <num> Enable multi-thread mode.\n"
|
||||
" --cluster Enable cluster mode.\n"
|
||||
" --enable-tracking Send CLIENT TRACKING on before starting benchmark.\n"
|
||||
" -k <boolean> 1=keep alive 0=reconnect (default 1)\n"
|
||||
" -r <keyspacelen> Use random keys for SET/GET/INCR, random values for SADD\n"
|
||||
" Using this option the benchmark will expand the string __rand_int__\n"
|
||||
@ -1504,6 +1516,7 @@ int main(int argc, const char **argv) {
|
||||
config.is_fetching_slots = 0;
|
||||
config.is_updating_slots = 0;
|
||||
config.slots_last_update = 0;
|
||||
config.enable_tracking = 0;
|
||||
|
||||
i = parseOptions(argc,argv);
|
||||
argc -= i;
|
||||
|
@ -216,14 +216,16 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
|
||||
/* EXPIRETIME: load an expire associated with the next key
|
||||
* to load. Note that after loading an expire we need to
|
||||
* load the actual type, and continue. */
|
||||
if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr;
|
||||
expiretime = rdbLoadTime(&rdb);
|
||||
expiretime *= 1000;
|
||||
if (rioGetReadError(&rdb)) goto eoferr;
|
||||
continue; /* Read next opcode. */
|
||||
} else if (type == RDB_OPCODE_EXPIRETIME_MS) {
|
||||
/* EXPIRETIME_MS: milliseconds precision expire times introduced
|
||||
* with RDB v3. Like EXPIRETIME but no with more precision. */
|
||||
rdbstate.doing = RDB_CHECK_DOING_READ_EXPIRE;
|
||||
if ((expiretime = rdbLoadMillisecondTime(&rdb, rdbver)) == -1) goto eoferr;
|
||||
expiretime = rdbLoadMillisecondTime(&rdb, rdbver);
|
||||
if (rioGetReadError(&rdb)) goto eoferr;
|
||||
continue; /* Read next opcode. */
|
||||
} else if (type == RDB_OPCODE_FREQ) {
|
||||
/* FREQ: LFU frequency. */
|
||||
|
@ -129,6 +129,10 @@
|
||||
|
||||
#define REDISMODULE_NOT_USED(V) ((void) V)
|
||||
|
||||
/* Bit flags for aux_save_triggers and the aux_load and aux_save callbacks */
|
||||
#define REDISMODULE_AUX_BEFORE_RDB (1<<0)
|
||||
#define REDISMODULE_AUX_AFTER_RDB (1<<1)
|
||||
|
||||
/* This type represents a timer handle, and is returned when a timer is
|
||||
* registered and used in order to invalidate a timer. It's just a 64 bit
|
||||
* number, because this is how each timer is represented inside the radix tree
|
||||
@ -169,6 +173,8 @@ typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlocke
|
||||
typedef int (*RedisModuleNotificationFunc)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
|
||||
typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver);
|
||||
typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value);
|
||||
typedef int (*RedisModuleTypeAuxLoadFunc)(RedisModuleIO *rdb, int encver, int when);
|
||||
typedef void (*RedisModuleTypeAuxSaveFunc)(RedisModuleIO *rdb, int when);
|
||||
typedef void (*RedisModuleTypeRewriteFunc)(RedisModuleIO *aof, RedisModuleString *key, void *value);
|
||||
typedef size_t (*RedisModuleTypeMemUsageFunc)(const void *value);
|
||||
typedef void (*RedisModuleTypeDigestFunc)(RedisModuleDigest *digest, void *value);
|
||||
@ -177,7 +183,7 @@ typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const cha
|
||||
typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data);
|
||||
typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter);
|
||||
|
||||
#define REDISMODULE_TYPE_METHOD_VERSION 1
|
||||
#define REDISMODULE_TYPE_METHOD_VERSION 2
|
||||
typedef struct RedisModuleTypeMethods {
|
||||
uint64_t version;
|
||||
RedisModuleTypeLoadFunc rdb_load;
|
||||
@ -186,6 +192,9 @@ typedef struct RedisModuleTypeMethods {
|
||||
RedisModuleTypeMemUsageFunc mem_usage;
|
||||
RedisModuleTypeDigestFunc digest;
|
||||
RedisModuleTypeFreeFunc free;
|
||||
RedisModuleTypeAuxLoadFunc aux_load;
|
||||
RedisModuleTypeAuxSaveFunc aux_save;
|
||||
int aux_save_triggers;
|
||||
} RedisModuleTypeMethods;
|
||||
|
||||
#define REDISMODULE_GET_API(name) \
|
||||
|
@ -92,6 +92,7 @@ static const rio rioBufferIO = {
|
||||
rioBufferFlush,
|
||||
NULL, /* update_checksum */
|
||||
0, /* current checksum */
|
||||
0, /* flags */
|
||||
0, /* bytes read or written */
|
||||
0, /* read/write chunk size */
|
||||
{ { NULL, 0 } } /* union for io-specific vars */
|
||||
@ -145,6 +146,7 @@ static const rio rioFileIO = {
|
||||
rioFileFlush,
|
||||
NULL, /* update_checksum */
|
||||
0, /* current checksum */
|
||||
0, /* flags */
|
||||
0, /* bytes read or written */
|
||||
0, /* read/write chunk size */
|
||||
{ { NULL, 0 } } /* union for io-specific vars */
|
||||
@ -239,6 +241,7 @@ static const rio rioFdIO = {
|
||||
rioFdFlush,
|
||||
NULL, /* update_checksum */
|
||||
0, /* current checksum */
|
||||
0, /* flags */
|
||||
0, /* bytes read or written */
|
||||
0, /* read/write chunk size */
|
||||
{ { NULL, 0 } } /* union for io-specific vars */
|
||||
@ -374,6 +377,7 @@ static const rio rioFdsetIO = {
|
||||
rioFdsetFlush,
|
||||
NULL, /* update_checksum */
|
||||
0, /* current checksum */
|
||||
0, /* flags */
|
||||
0, /* bytes read or written */
|
||||
0, /* read/write chunk size */
|
||||
{ { NULL, 0 } } /* union for io-specific vars */
|
||||
|
35
src/rio.h
35
src/rio.h
@ -1,6 +1,6 @@
|
||||
/*
|
||||
* Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com>
|
||||
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
|
||||
* Copyright (c) 2009-2019, Salvatore Sanfilippo <antirez at gmail dot com>
|
||||
* All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
@ -36,6 +36,9 @@
|
||||
#include <stdint.h>
|
||||
#include "sds.h"
|
||||
|
||||
#define RIO_FLAG_READ_ERROR (1<<0)
|
||||
#define RIO_FLAG_WRITE_ERROR (1<<1)
|
||||
|
||||
struct _rio {
|
||||
/* Backend functions.
|
||||
* Since this functions do not tolerate short writes or reads the return
|
||||
@ -51,8 +54,8 @@ struct _rio {
|
||||
* computation. */
|
||||
void (*update_cksum)(struct _rio *, const void *buf, size_t len);
|
||||
|
||||
/* The current checksum */
|
||||
uint64_t cksum;
|
||||
/* The current checksum and flags (see RIO_FLAG_*) */
|
||||
uint64_t cksum, flags;
|
||||
|
||||
/* number of bytes read or written */
|
||||
size_t processed_bytes;
|
||||
@ -99,11 +102,14 @@ typedef struct _rio rio;
|
||||
* if needed. */
|
||||
|
||||
static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
|
||||
if (r->flags & RIO_FLAG_WRITE_ERROR) return 0;
|
||||
while (len) {
|
||||
size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
|
||||
if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write);
|
||||
if (r->write(r,buf,bytes_to_write) == 0)
|
||||
if (r->write(r,buf,bytes_to_write) == 0) {
|
||||
r->flags |= RIO_FLAG_WRITE_ERROR;
|
||||
return 0;
|
||||
}
|
||||
buf = (char*)buf + bytes_to_write;
|
||||
len -= bytes_to_write;
|
||||
r->processed_bytes += bytes_to_write;
|
||||
@ -112,10 +118,13 @@ static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
|
||||
}
|
||||
|
||||
static inline size_t rioRead(rio *r, void *buf, size_t len) {
|
||||
if (r->flags & RIO_FLAG_READ_ERROR) return 0;
|
||||
while (len) {
|
||||
size_t bytes_to_read = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
|
||||
if (r->read(r,buf,bytes_to_read) == 0)
|
||||
if (r->read(r,buf,bytes_to_read) == 0) {
|
||||
r->flags |= RIO_FLAG_READ_ERROR;
|
||||
return 0;
|
||||
}
|
||||
if (r->update_cksum) r->update_cksum(r,buf,bytes_to_read);
|
||||
buf = (char*)buf + bytes_to_read;
|
||||
len -= bytes_to_read;
|
||||
@ -132,6 +141,22 @@ static inline int rioFlush(rio *r) {
|
||||
return r->flush(r);
|
||||
}
|
||||
|
||||
/* This function allows to know if there was a read error in any past
|
||||
* operation, since the rio stream was created or since the last call
|
||||
* to rioClearError(). */
|
||||
static inline int rioGetReadError(rio *r) {
|
||||
return (r->flags & RIO_FLAG_READ_ERROR) != 0;
|
||||
}
|
||||
|
||||
/* Like rioGetReadError() but for write errors. */
|
||||
static inline int rioGetWriteError(rio *r) {
|
||||
return (r->flags & RIO_FLAG_READ_ERROR) != 0;
|
||||
}
|
||||
|
||||
static inline void rioClearErrors(rio *r) {
|
||||
r->flags &= ~(RIO_FLAG_READ_ERROR|RIO_FLAG_WRITE_ERROR);
|
||||
}
|
||||
|
||||
void rioInitWithFile(rio *r, FILE *fp);
|
||||
void rioInitWithBuffer(rio *r, sds s);
|
||||
void rioInitWithFd(rio *r, int fd, size_t read_limit);
|
||||
|
13
src/server.c
13
src/server.c
@ -2403,6 +2403,9 @@ void initServerConfig(void) {
|
||||
/* Latency monitor */
|
||||
server.latency_monitor_threshold = CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD;
|
||||
|
||||
/* Tracking. */
|
||||
server.tracking_table_max_fill = CONFIG_DEFAULT_TRACKING_TABLE_MAX_FILL;
|
||||
|
||||
/* Debugging */
|
||||
server.assert_failed = "<no assertion failed>";
|
||||
server.assert_file = "<no file>";
|
||||
@ -3401,6 +3404,10 @@ int processCommand(client *c) {
|
||||
}
|
||||
}
|
||||
|
||||
/* Make sure to use a reasonable amount of memory for client side
|
||||
* caching metadata. */
|
||||
if (server.tracking_clients) trackingLimitUsedSlots();
|
||||
|
||||
/* Don't accept write commands if there are problems persisting on disk
|
||||
* and if this is a master instance. */
|
||||
int deny_write_type = writeCommandsDeniedByDiskError();
|
||||
@ -4140,7 +4147,8 @@ sds genRedisInfoString(char *section) {
|
||||
"active_defrag_hits:%lld\r\n"
|
||||
"active_defrag_misses:%lld\r\n"
|
||||
"active_defrag_key_hits:%lld\r\n"
|
||||
"active_defrag_key_misses:%lld\r\n",
|
||||
"active_defrag_key_misses:%lld\r\n"
|
||||
"tracking_used_slots:%lld\r\n",
|
||||
server.stat_numconnections,
|
||||
server.stat_numcommands,
|
||||
getInstantaneousMetric(STATS_METRIC_COMMAND),
|
||||
@ -4166,7 +4174,8 @@ sds genRedisInfoString(char *section) {
|
||||
server.stat_active_defrag_hits,
|
||||
server.stat_active_defrag_misses,
|
||||
server.stat_active_defrag_key_hits,
|
||||
server.stat_active_defrag_key_misses);
|
||||
server.stat_active_defrag_key_misses,
|
||||
trackingGetUsedSlots());
|
||||
}
|
||||
|
||||
/* Replication */
|
||||
|
15
src/server.h
15
src/server.h
@ -171,6 +171,7 @@ typedef long long mstime_t; /* millisecond time type. */
|
||||
#define CONFIG_DEFAULT_DEFRAG_CYCLE_MAX 75 /* 75% CPU max (at upper threshold) */
|
||||
#define CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS 1000 /* keys with more than 1000 fields will be processed separately */
|
||||
#define CONFIG_DEFAULT_PROTO_MAX_BULK_LEN (512ll*1024*1024) /* Bulk request max size */
|
||||
#define CONFIG_DEFAULT_TRACKING_TABLE_MAX_FILL 10 /* 10% tracking table max fill. */
|
||||
|
||||
#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */
|
||||
#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */
|
||||
@ -536,6 +537,10 @@ typedef long long mstime_t; /* millisecond time type. */
|
||||
#define REDISMODULE_TYPE_ENCVER(id) (id & REDISMODULE_TYPE_ENCVER_MASK)
|
||||
#define REDISMODULE_TYPE_SIGN(id) ((id & ~((uint64_t)REDISMODULE_TYPE_ENCVER_MASK)) >>REDISMODULE_TYPE_ENCVER_BITS)
|
||||
|
||||
/* Bit flags for moduleTypeAuxSaveFunc */
|
||||
#define REDISMODULE_AUX_BEFORE_RDB (1<<0)
|
||||
#define REDISMODULE_AUX_AFTER_RDB (1<<1)
|
||||
|
||||
struct RedisModule;
|
||||
struct RedisModuleIO;
|
||||
struct RedisModuleDigest;
|
||||
@ -548,6 +553,8 @@ struct redisObject;
|
||||
* is deleted. */
|
||||
typedef void *(*moduleTypeLoadFunc)(struct RedisModuleIO *io, int encver);
|
||||
typedef void (*moduleTypeSaveFunc)(struct RedisModuleIO *io, void *value);
|
||||
typedef int (*moduleTypeAuxLoadFunc)(struct RedisModuleIO *rdb, int encver, int when);
|
||||
typedef void (*moduleTypeAuxSaveFunc)(struct RedisModuleIO *rdb, int when);
|
||||
typedef void (*moduleTypeRewriteFunc)(struct RedisModuleIO *io, struct redisObject *key, void *value);
|
||||
typedef void (*moduleTypeDigestFunc)(struct RedisModuleDigest *digest, void *value);
|
||||
typedef size_t (*moduleTypeMemUsageFunc)(const void *value);
|
||||
@ -564,6 +571,9 @@ typedef struct RedisModuleType {
|
||||
moduleTypeMemUsageFunc mem_usage;
|
||||
moduleTypeDigestFunc digest;
|
||||
moduleTypeFreeFunc free;
|
||||
moduleTypeAuxLoadFunc aux_load;
|
||||
moduleTypeAuxSaveFunc aux_save;
|
||||
int aux_save_triggers;
|
||||
char name[10]; /* 9 bytes name + null term. Charset: A-Z a-z 0-9 _- */
|
||||
} moduleType;
|
||||
|
||||
@ -1316,6 +1326,7 @@ struct redisServer {
|
||||
list *ready_keys; /* List of readyList structures for BLPOP & co */
|
||||
/* Client side caching. */
|
||||
unsigned int tracking_clients; /* # of clients with tracking enabled.*/
|
||||
int tracking_table_max_fill; /* Max fill percentage. */
|
||||
/* Sort parameters - qsort_r() is only available under BSD so we
|
||||
* have to take this state global, in order to pass it to sortCompare() */
|
||||
int sort_desc;
|
||||
@ -1528,6 +1539,7 @@ void moduleAcquireGIL(void);
|
||||
void moduleReleaseGIL(void);
|
||||
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid);
|
||||
void moduleCallCommandFilters(client *c);
|
||||
ssize_t rdbSaveModulesAux(rio *rdb, int when);
|
||||
int moduleAllDatatypesHandleErrors();
|
||||
|
||||
/* Utils */
|
||||
@ -1639,6 +1651,9 @@ void enableTracking(client *c, uint64_t redirect_to);
|
||||
void disableTracking(client *c);
|
||||
void trackingRememberKeys(client *c);
|
||||
void trackingInvalidateKey(robj *keyobj);
|
||||
void trackingInvalidateKeysOnFlush(int dbid);
|
||||
void trackingLimitUsedSlots(void);
|
||||
unsigned long long trackingGetUsedSlots(void);
|
||||
|
||||
/* List data type */
|
||||
void listTypeTryConversion(robj *subject, robj *value);
|
||||
|
@ -109,5 +109,6 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id);
|
||||
streamNACK *streamCreateNACK(streamConsumer *consumer);
|
||||
void streamDecodeID(void *buf, streamID *id);
|
||||
int streamCompareID(streamID *a, streamID *b);
|
||||
void streamFreeNACK(streamNACK *na);
|
||||
|
||||
#endif
|
||||
|
@ -1357,9 +1357,8 @@ int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
|
||||
/* Optimize: check if the element is too large or the list
|
||||
* becomes too long *before* executing zzlInsert. */
|
||||
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
|
||||
if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
|
||||
zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
|
||||
if (sdslen(ele) > server.zset_max_ziplist_value)
|
||||
if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries ||
|
||||
sdslen(ele) > server.zset_max_ziplist_value)
|
||||
zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
|
||||
if (newscore) *newscore = score;
|
||||
*flags |= ZADD_ADDED;
|
||||
|
211
src/tracking.c
211
src/tracking.c
@ -60,6 +60,7 @@
|
||||
* use the most significant bits instead of the full 24 bits. */
|
||||
#define TRACKING_TABLE_SIZE (1<<24)
|
||||
rax **TrackingTable = NULL;
|
||||
unsigned long TrackingTableUsedSlots = 0;
|
||||
robj *TrackingChannelName;
|
||||
|
||||
/* Remove the tracking state from the client 'c'. Note that there is not much
|
||||
@ -109,67 +110,187 @@ void trackingRememberKeys(client *c) {
|
||||
sds sdskey = c->argv[idx]->ptr;
|
||||
uint64_t hash = crc64(0,
|
||||
(unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1);
|
||||
if (TrackingTable[hash] == NULL)
|
||||
if (TrackingTable[hash] == NULL) {
|
||||
TrackingTable[hash] = raxNew();
|
||||
TrackingTableUsedSlots++;
|
||||
}
|
||||
raxTryInsert(TrackingTable[hash],
|
||||
(unsigned char*)&c->id,sizeof(c->id),NULL,NULL);
|
||||
}
|
||||
getKeysFreeResult(keys);
|
||||
}
|
||||
|
||||
/* This function is called from signalModifiedKey() or other places in Redis
|
||||
* when a key changes value. In the context of keys tracking, our task here is
|
||||
* to send a notification to every client that may have keys about such . */
|
||||
void trackingInvalidateKey(robj *keyobj) {
|
||||
sds sdskey = keyobj->ptr;
|
||||
uint64_t hash = crc64(0,
|
||||
(unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1);
|
||||
if (TrackingTable == NULL || TrackingTable[hash] == NULL) return;
|
||||
void sendTrackingMessage(client *c, long long hash) {
|
||||
int using_redirection = 0;
|
||||
if (c->client_tracking_redirection) {
|
||||
client *redir = lookupClientByID(c->client_tracking_redirection);
|
||||
if (!redir) {
|
||||
/* We need to signal to the original connection that we
|
||||
* are unable to send invalidation messages to the redirected
|
||||
* connection, because the client no longer exist. */
|
||||
if (c->resp > 2) {
|
||||
addReplyPushLen(c,3);
|
||||
addReplyBulkCBuffer(c,"tracking-redir-broken",21);
|
||||
addReplyLongLong(c,c->client_tracking_redirection);
|
||||
}
|
||||
return;
|
||||
}
|
||||
c = redir;
|
||||
using_redirection = 1;
|
||||
}
|
||||
|
||||
/* Only send such info for clients in RESP version 3 or more. However
|
||||
* if redirection is active, and the connection we redirect to is
|
||||
* in Pub/Sub mode, we can support the feature with RESP 2 as well,
|
||||
* by sending Pub/Sub messages in the __redis__:invalidate channel. */
|
||||
if (c->resp > 2) {
|
||||
addReplyPushLen(c,2);
|
||||
addReplyBulkCBuffer(c,"invalidate",10);
|
||||
addReplyLongLong(c,hash);
|
||||
} else if (using_redirection && c->flags & CLIENT_PUBSUB) {
|
||||
robj *msg = createStringObjectFromLongLong(hash);
|
||||
addReplyPubsubMessage(c,TrackingChannelName,msg);
|
||||
decrRefCount(msg);
|
||||
}
|
||||
}
|
||||
|
||||
/* Invalidates a caching slot: this is actually the low level implementation
|
||||
* of the API that Redis calls externally, that is trackingInvalidateKey(). */
|
||||
void trackingInvalidateSlot(uint64_t slot) {
|
||||
if (TrackingTable == NULL || TrackingTable[slot] == NULL) return;
|
||||
|
||||
raxIterator ri;
|
||||
raxStart(&ri,TrackingTable[hash]);
|
||||
raxStart(&ri,TrackingTable[slot]);
|
||||
raxSeek(&ri,"^",NULL,0);
|
||||
while(raxNext(&ri)) {
|
||||
uint64_t id;
|
||||
memcpy(&id,ri.key,ri.key_len);
|
||||
client *c = lookupClientByID(id);
|
||||
if (c == NULL) continue;
|
||||
int using_redirection = 0;
|
||||
if (c->client_tracking_redirection) {
|
||||
client *redir = lookupClientByID(c->client_tracking_redirection);
|
||||
if (!redir) {
|
||||
/* We need to signal to the original connection that we
|
||||
* are unable to send invalidation messages to the redirected
|
||||
* connection, because the client no longer exist. */
|
||||
if (c->resp > 2) {
|
||||
addReplyPushLen(c,3);
|
||||
addReplyBulkCBuffer(c,"tracking-redir-broken",21);
|
||||
addReplyLongLong(c,c->client_tracking_redirection);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
c = redir;
|
||||
using_redirection = 1;
|
||||
}
|
||||
|
||||
/* Only send such info for clients in RESP version 3 or more. However
|
||||
* if redirection is active, and the connection we redirect to is
|
||||
* in Pub/Sub mode, we can support the feature with RESP 2 as well,
|
||||
* by sending Pub/Sub messages in the __redis__:invalidate channel. */
|
||||
if (c->resp > 2) {
|
||||
addReplyPushLen(c,2);
|
||||
addReplyBulkCBuffer(c,"invalidate",10);
|
||||
addReplyLongLong(c,hash);
|
||||
} else if (using_redirection && c->flags & CLIENT_PUBSUB) {
|
||||
robj *msg = createStringObjectFromLongLong(hash);
|
||||
addReplyPubsubMessage(c,TrackingChannelName,msg);
|
||||
decrRefCount(msg);
|
||||
}
|
||||
if (c == NULL || !(c->flags & CLIENT_TRACKING)) continue;
|
||||
sendTrackingMessage(c,slot);
|
||||
}
|
||||
raxStop(&ri);
|
||||
|
||||
/* Free the tracking table: we'll create the radix tree and populate it
|
||||
* again if more keys will be modified in this hash slot. */
|
||||
raxFree(TrackingTable[hash]);
|
||||
TrackingTable[hash] = NULL;
|
||||
* again if more keys will be modified in this caching slot. */
|
||||
raxFree(TrackingTable[slot]);
|
||||
TrackingTable[slot] = NULL;
|
||||
TrackingTableUsedSlots--;
|
||||
}
|
||||
|
||||
/* This function is called from signalModifiedKey() or other places in Redis
|
||||
* when a key changes value. In the context of keys tracking, our task here is
|
||||
* to send a notification to every client that may have keys about such caching
|
||||
* slot. */
|
||||
void trackingInvalidateKey(robj *keyobj) {
|
||||
if (TrackingTable == NULL || TrackingTableUsedSlots == 0) return;
|
||||
|
||||
sds sdskey = keyobj->ptr;
|
||||
uint64_t hash = crc64(0,
|
||||
(unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1);
|
||||
trackingInvalidateSlot(hash);
|
||||
}
|
||||
|
||||
/* This function is called when one or all the Redis databases are flushed
|
||||
* (dbid == -1 in case of FLUSHALL). Caching slots are not specific for
|
||||
* each DB but are global: currently what we do is sending a special
|
||||
* notification to clients with tracking enabled, invalidating the caching
|
||||
* slot "-1", which means, "all the keys", in order to avoid flooding clients
|
||||
* with many invalidation messages for all the keys they may hold.
|
||||
*
|
||||
* However trying to flush the tracking table here is very costly:
|
||||
* we need scanning 16 million caching slots in the table to check
|
||||
* if they are used, this introduces a big delay. So what we do is to really
|
||||
* flush the table in the case of FLUSHALL. When a FLUSHDB is called instead
|
||||
* we just send the invalidation message to all the clients, but don't
|
||||
* flush the table: it will slowly get garbage collected as more keys
|
||||
* are modified in the used caching slots. */
|
||||
void trackingInvalidateKeysOnFlush(int dbid) {
|
||||
if (server.tracking_clients) {
|
||||
listNode *ln;
|
||||
listIter li;
|
||||
listRewind(server.clients,&li);
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
client *c = listNodeValue(ln);
|
||||
if (c->flags & CLIENT_TRACKING) {
|
||||
sendTrackingMessage(c,-1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* In case of FLUSHALL, reclaim all the memory used by tracking. */
|
||||
if (dbid == -1 && TrackingTable) {
|
||||
for (int j = 0; j < TRACKING_TABLE_SIZE && TrackingTableUsedSlots > 0; j++) {
|
||||
if (TrackingTable[j] != NULL) {
|
||||
raxFree(TrackingTable[j]);
|
||||
TrackingTable[j] = NULL;
|
||||
TrackingTableUsedSlots--;
|
||||
}
|
||||
}
|
||||
|
||||
/* If there are no clients with tracking enabled, we can even
|
||||
* reclaim the memory used by the table itself. The code assumes
|
||||
* the table is allocated only if there is at least one client alive
|
||||
* with tracking enabled. */
|
||||
if (server.tracking_clients == 0) {
|
||||
zfree(TrackingTable);
|
||||
TrackingTable = NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Tracking forces Redis to remember information about which client may have
|
||||
* keys about certian caching slots. In workloads where there are a lot of
|
||||
* reads, but keys are hardly modified, the amount of information we have
|
||||
* to remember server side could be a lot: for each 16 millions of caching
|
||||
* slots we may end with a radix tree containing many entries.
|
||||
*
|
||||
* So Redis allows the user to configure a maximum fill rate for the
|
||||
* invalidation table. This function makes sure that we don't go over the
|
||||
* specified fill rate: if we are over, we can just evict informations about
|
||||
* random caching slots, and send invalidation messages to clients like if
|
||||
* the key was modified. */
|
||||
void trackingLimitUsedSlots(void) {
|
||||
static unsigned int timeout_counter = 0;
|
||||
|
||||
if (server.tracking_table_max_fill == 0) return; /* No limits set. */
|
||||
unsigned int max_slots =
|
||||
(TRACKING_TABLE_SIZE/100) * server.tracking_table_max_fill;
|
||||
if (TrackingTableUsedSlots <= max_slots) {
|
||||
timeout_counter = 0;
|
||||
return; /* Limit not reached. */
|
||||
}
|
||||
|
||||
/* We have to invalidate a few slots to reach the limit again. The effort
|
||||
* we do here is proportional to the number of times we entered this
|
||||
* function and found that we are still over the limit. */
|
||||
int effort = 100 * (timeout_counter+1);
|
||||
|
||||
/* Let's start at a random position, and perform linear probing, in order
|
||||
* to improve cache locality. However once we are able to find an used
|
||||
* slot, jump again randomly, in order to avoid creating big holes in the
|
||||
* table (that will make this funciton use more resourced later). */
|
||||
while(effort > 0) {
|
||||
unsigned int idx = rand() % TRACKING_TABLE_SIZE;
|
||||
do {
|
||||
effort--;
|
||||
idx = (idx+1) % TRACKING_TABLE_SIZE;
|
||||
if (TrackingTable[idx] != NULL) {
|
||||
trackingInvalidateSlot(idx);
|
||||
if (TrackingTableUsedSlots <= max_slots) {
|
||||
timeout_counter = 0;
|
||||
return; /* Return ASAP: we are again under the limit. */
|
||||
} else {
|
||||
break; /* Jump to next random position. */
|
||||
}
|
||||
}
|
||||
} while(effort > 0);
|
||||
}
|
||||
timeout_counter++;
|
||||
}
|
||||
|
||||
/* This is just used in order to access the amount of used slots in the
|
||||
* tracking table. */
|
||||
unsigned long long trackingGetUsedSlots(void) {
|
||||
return TrackingTableUsedSlots;
|
||||
}
|
||||
|
@ -192,12 +192,6 @@ foreach mdl {no yes} {
|
||||
set master_host [srv 0 host]
|
||||
set master_port [srv 0 port]
|
||||
set slaves {}
|
||||
set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000000]
|
||||
set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000000]
|
||||
set load_handle2 [start_bg_complex_data $master_host $master_port 12 100000000]
|
||||
set load_handle3 [start_write_load $master_host $master_port 8]
|
||||
set load_handle4 [start_write_load $master_host $master_port 4]
|
||||
after 5000 ;# wait for some data to accumulate so that we have RDB part for the fork
|
||||
start_server {} {
|
||||
lappend slaves [srv 0 client]
|
||||
start_server {} {
|
||||
@ -205,6 +199,14 @@ foreach mdl {no yes} {
|
||||
start_server {} {
|
||||
lappend slaves [srv 0 client]
|
||||
test "Connect multiple replicas at the same time (issue #141), master diskless=$mdl, replica diskless=$sdl" {
|
||||
# start load handles only inside the test, so that the test can be skipped
|
||||
set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000000]
|
||||
set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000000]
|
||||
set load_handle2 [start_bg_complex_data $master_host $master_port 12 100000000]
|
||||
set load_handle3 [start_write_load $master_host $master_port 8]
|
||||
set load_handle4 [start_write_load $master_host $master_port 4]
|
||||
after 5000 ;# wait for some data to accumulate so that we have RDB part for the fork
|
||||
|
||||
# Send SLAVEOF commands to slaves
|
||||
[lindex $slaves 0] config set repl-diskless-load $sdl
|
||||
[lindex $slaves 1] config set repl-diskless-load $sdl
|
||||
@ -278,9 +280,9 @@ start_server {tags {"repl"}} {
|
||||
set master [srv 0 client]
|
||||
set master_host [srv 0 host]
|
||||
set master_port [srv 0 port]
|
||||
set load_handle0 [start_write_load $master_host $master_port 3]
|
||||
start_server {} {
|
||||
test "Master stream is correctly processed while the replica has a script in -BUSY state" {
|
||||
set load_handle0 [start_write_load $master_host $master_port 3]
|
||||
set slave [srv 0 client]
|
||||
$slave config set lua-time-limit 500
|
||||
$slave slaveof $master_host $master_port
|
||||
@ -383,3 +385,84 @@ test {slave fails full sync and diskless load swapdb recoveres it} {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test {diskless loading short read} {
|
||||
start_server {tags {"repl"}} {
|
||||
set replica [srv 0 client]
|
||||
set replica_host [srv 0 host]
|
||||
set replica_port [srv 0 port]
|
||||
start_server {} {
|
||||
set master [srv 0 client]
|
||||
set master_host [srv 0 host]
|
||||
set master_port [srv 0 port]
|
||||
|
||||
# Set master and replica to use diskless replication
|
||||
$master config set repl-diskless-sync yes
|
||||
$master config set rdbcompression no
|
||||
$replica config set repl-diskless-load swapdb
|
||||
# Try to fill the master with all types of data types / encodings
|
||||
for {set k 0} {$k < 3} {incr k} {
|
||||
for {set i 0} {$i < 10} {incr i} {
|
||||
r set "$k int_$i" [expr {int(rand()*10000)}]
|
||||
r expire "$k int_$i" [expr {int(rand()*10000)}]
|
||||
r set "$k string_$i" [string repeat A [expr {int(rand()*1000000)}]]
|
||||
r hset "$k hash_small" [string repeat A [expr {int(rand()*10)}]] 0[string repeat A [expr {int(rand()*10)}]]
|
||||
r hset "$k hash_large" [string repeat A [expr {int(rand()*10000)}]] [string repeat A [expr {int(rand()*1000000)}]]
|
||||
r sadd "$k set_small" [string repeat A [expr {int(rand()*10)}]]
|
||||
r sadd "$k set_large" [string repeat A [expr {int(rand()*1000000)}]]
|
||||
r zadd "$k zset_small" [expr {rand()}] [string repeat A [expr {int(rand()*10)}]]
|
||||
r zadd "$k zset_large" [expr {rand()}] [string repeat A [expr {int(rand()*1000000)}]]
|
||||
r lpush "$k list_small" [string repeat A [expr {int(rand()*10)}]]
|
||||
r lpush "$k list_large" [string repeat A [expr {int(rand()*1000000)}]]
|
||||
for {set j 0} {$j < 10} {incr j} {
|
||||
r xadd "$k stream" * foo "asdf" bar "1234"
|
||||
}
|
||||
r xgroup create "$k stream" "mygroup_$i" 0
|
||||
r xreadgroup GROUP "mygroup_$i" Alice COUNT 1 STREAMS "$k stream" >
|
||||
}
|
||||
}
|
||||
|
||||
# Start the replication process...
|
||||
$master config set repl-diskless-sync-delay 0
|
||||
$replica replicaof $master_host $master_port
|
||||
|
||||
# kill the replication at various points
|
||||
set attempts 3
|
||||
if {$::accurate} { set attempts 10 }
|
||||
for {set i 0} {$i < $attempts} {incr i} {
|
||||
# wait for the replica to start reading the rdb
|
||||
# using the log file since the replica only responds to INFO once in 2mb
|
||||
wait_for_log_message -1 "*Loading DB in memory*" 5 2000 1
|
||||
|
||||
# add some additional random sleep so that we kill the master on a different place each time
|
||||
after [expr {int(rand()*100)}]
|
||||
|
||||
# kill the replica connection on the master
|
||||
set killed [$master client kill type replica]
|
||||
|
||||
if {[catch {
|
||||
set res [wait_for_log_message -1 "*Internal error in RDB*" 5 100 10]
|
||||
if {$::verbose} {
|
||||
puts $res
|
||||
}
|
||||
}]} {
|
||||
puts "failed triggering short read"
|
||||
# force the replica to try another full sync
|
||||
$master client kill type replica
|
||||
$master set asdf asdf
|
||||
# the side effect of resizing the backlog is that it is flushed (16k is the min size)
|
||||
$master config set repl-backlog-size [expr {16384 + $i}]
|
||||
}
|
||||
# wait for loading to stop (fail)
|
||||
wait_for_condition 100 10 {
|
||||
[s -1 loading] eq 0
|
||||
} else {
|
||||
fail "Replica didn't disconnect"
|
||||
}
|
||||
}
|
||||
# enable fast shutdown
|
||||
$master config set rdb-key-save-delay 0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -13,12 +13,16 @@ endif
|
||||
|
||||
.SUFFIXES: .c .so .xo .o
|
||||
|
||||
all: commandfilter.so
|
||||
all: commandfilter.so testrdb.so
|
||||
|
||||
.c.xo:
|
||||
$(CC) -I../../src $(CFLAGS) $(SHOBJ_CFLAGS) -fPIC -c $< -o $@
|
||||
|
||||
commandfilter.xo: ../../src/redismodule.h
|
||||
testrdb.xo: ../../src/redismodule.h
|
||||
|
||||
commandfilter.so: commandfilter.xo
|
||||
$(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc
|
||||
|
||||
testrdb.so: testrdb.xo
|
||||
$(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc
|
||||
|
229
tests/modules/testrdb.c
Normal file
229
tests/modules/testrdb.c
Normal file
@ -0,0 +1,229 @@
|
||||
#include "redismodule.h"
|
||||
|
||||
#include <string.h>
|
||||
#include <assert.h>
|
||||
|
||||
/* Module configuration, save aux or not? */
|
||||
long long conf_aux_count = 0;
|
||||
|
||||
/* Registered type */
|
||||
RedisModuleType *testrdb_type = NULL;
|
||||
|
||||
/* Global values to store and persist to aux */
|
||||
RedisModuleString *before_str = NULL;
|
||||
RedisModuleString *after_str = NULL;
|
||||
|
||||
void *testrdb_type_load(RedisModuleIO *rdb, int encver) {
|
||||
int count = RedisModule_LoadSigned(rdb);
|
||||
assert(count==1);
|
||||
assert(encver==1);
|
||||
RedisModuleString *str = RedisModule_LoadString(rdb);
|
||||
return str;
|
||||
}
|
||||
|
||||
void testrdb_type_save(RedisModuleIO *rdb, void *value) {
|
||||
RedisModuleString *str = (RedisModuleString*)value;
|
||||
RedisModule_SaveSigned(rdb, 1);
|
||||
RedisModule_SaveString(rdb, str);
|
||||
}
|
||||
|
||||
void testrdb_aux_save(RedisModuleIO *rdb, int when) {
|
||||
if (conf_aux_count==1) assert(when == REDISMODULE_AUX_AFTER_RDB);
|
||||
if (conf_aux_count==0) assert(0);
|
||||
if (when == REDISMODULE_AUX_BEFORE_RDB) {
|
||||
if (before_str) {
|
||||
RedisModule_SaveSigned(rdb, 1);
|
||||
RedisModule_SaveString(rdb, before_str);
|
||||
} else {
|
||||
RedisModule_SaveSigned(rdb, 0);
|
||||
}
|
||||
} else {
|
||||
if (after_str) {
|
||||
RedisModule_SaveSigned(rdb, 1);
|
||||
RedisModule_SaveString(rdb, after_str);
|
||||
} else {
|
||||
RedisModule_SaveSigned(rdb, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int testrdb_aux_load(RedisModuleIO *rdb, int encver, int when) {
|
||||
assert(encver == 1);
|
||||
if (conf_aux_count==1) assert(when == REDISMODULE_AUX_AFTER_RDB);
|
||||
if (conf_aux_count==0) assert(0);
|
||||
RedisModuleCtx *ctx = RedisModule_GetContextFromIO(rdb);
|
||||
if (when == REDISMODULE_AUX_BEFORE_RDB) {
|
||||
if (before_str)
|
||||
RedisModule_FreeString(ctx, before_str);
|
||||
before_str = NULL;
|
||||
int count = RedisModule_LoadSigned(rdb);
|
||||
if (count)
|
||||
before_str = RedisModule_LoadString(rdb);
|
||||
} else {
|
||||
if (after_str)
|
||||
RedisModule_FreeString(ctx, after_str);
|
||||
after_str = NULL;
|
||||
int count = RedisModule_LoadSigned(rdb);
|
||||
if (count)
|
||||
after_str = RedisModule_LoadString(rdb);
|
||||
}
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
void testrdb_type_free(void *value) {
|
||||
RedisModule_FreeString(NULL, (RedisModuleString*)value);
|
||||
}
|
||||
|
||||
int testrdb_set_before(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
{
|
||||
if (argc != 2) {
|
||||
RedisModule_WrongArity(ctx);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
if (before_str)
|
||||
RedisModule_FreeString(ctx, before_str);
|
||||
before_str = argv[1];
|
||||
RedisModule_RetainString(ctx, argv[1]);
|
||||
RedisModule_ReplyWithLongLong(ctx, 1);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
int testrdb_get_before(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
{
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
if (argc != 1){
|
||||
RedisModule_WrongArity(ctx);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
if (before_str)
|
||||
RedisModule_ReplyWithString(ctx, before_str);
|
||||
else
|
||||
RedisModule_ReplyWithStringBuffer(ctx, "", 0);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
int testrdb_set_after(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
{
|
||||
if (argc != 2){
|
||||
RedisModule_WrongArity(ctx);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
if (after_str)
|
||||
RedisModule_FreeString(ctx, after_str);
|
||||
after_str = argv[1];
|
||||
RedisModule_RetainString(ctx, argv[1]);
|
||||
RedisModule_ReplyWithLongLong(ctx, 1);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
int testrdb_get_after(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
{
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
if (argc != 1){
|
||||
RedisModule_WrongArity(ctx);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
if (after_str)
|
||||
RedisModule_ReplyWithString(ctx, after_str);
|
||||
else
|
||||
RedisModule_ReplyWithStringBuffer(ctx, "", 0);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
int testrdb_set_key(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
{
|
||||
if (argc != 3){
|
||||
RedisModule_WrongArity(ctx);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
|
||||
RedisModuleString *str = RedisModule_ModuleTypeGetValue(key);
|
||||
if (str)
|
||||
RedisModule_FreeString(ctx, str);
|
||||
RedisModule_ModuleTypeSetValue(key, testrdb_type, argv[2]);
|
||||
RedisModule_RetainString(ctx, argv[2]);
|
||||
RedisModule_CloseKey(key);
|
||||
RedisModule_ReplyWithLongLong(ctx, 1);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
int testrdb_get_key(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
{
|
||||
if (argc != 2){
|
||||
RedisModule_WrongArity(ctx);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
|
||||
RedisModuleString *str = RedisModule_ModuleTypeGetValue(key);
|
||||
RedisModule_CloseKey(key);
|
||||
RedisModule_ReplyWithString(ctx, str);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
REDISMODULE_NOT_USED(argc);
|
||||
|
||||
if (RedisModule_Init(ctx,"testrdb",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (argc > 0)
|
||||
RedisModule_StringToLongLong(argv[0], &conf_aux_count);
|
||||
|
||||
if (conf_aux_count==0) {
|
||||
RedisModuleTypeMethods datatype_methods = {
|
||||
.version = 1,
|
||||
.rdb_load = testrdb_type_load,
|
||||
.rdb_save = testrdb_type_save,
|
||||
.aof_rewrite = NULL,
|
||||
.digest = NULL,
|
||||
.free = testrdb_type_free,
|
||||
};
|
||||
|
||||
testrdb_type = RedisModule_CreateDataType(ctx, "test__rdb", 1, &datatype_methods);
|
||||
if (testrdb_type == NULL)
|
||||
return REDISMODULE_ERR;
|
||||
} else {
|
||||
RedisModuleTypeMethods datatype_methods = {
|
||||
.version = REDISMODULE_TYPE_METHOD_VERSION,
|
||||
.rdb_load = testrdb_type_load,
|
||||
.rdb_save = testrdb_type_save,
|
||||
.aof_rewrite = NULL,
|
||||
.digest = NULL,
|
||||
.free = testrdb_type_free,
|
||||
.aux_load = testrdb_aux_load,
|
||||
.aux_save = testrdb_aux_save,
|
||||
.aux_save_triggers = (conf_aux_count == 1 ?
|
||||
REDISMODULE_AUX_AFTER_RDB :
|
||||
REDISMODULE_AUX_BEFORE_RDB | REDISMODULE_AUX_AFTER_RDB)
|
||||
};
|
||||
|
||||
testrdb_type = RedisModule_CreateDataType(ctx, "test__rdb", 1, &datatype_methods);
|
||||
if (testrdb_type == NULL)
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
|
||||
if (RedisModule_CreateCommand(ctx,"testrdb.set.before", testrdb_set_before,"deny-oom",0,0,0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx,"testrdb.get.before", testrdb_get_before,"",0,0,0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx,"testrdb.set.after", testrdb_set_after,"deny-oom",0,0,0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx,"testrdb.get.after", testrdb_get_after,"",0,0,0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx,"testrdb.set.key", testrdb_set_key,"deny-oom",1,1,1) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx,"testrdb.get.key", testrdb_get_key,"",1,1,1) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
return REDISMODULE_OK;
|
||||
}
|
@ -99,6 +99,25 @@ proc wait_for_ofs_sync {r1 r2} {
|
||||
}
|
||||
}
|
||||
|
||||
proc wait_for_log_message {srv_idx pattern last_lines maxtries delay} {
|
||||
set retry $maxtries
|
||||
set stdout [srv $srv_idx stdout]
|
||||
while {$retry} {
|
||||
set result [exec tail -$last_lines < $stdout]
|
||||
set result [split $result "\n"]
|
||||
foreach line $result {
|
||||
if {[string match $pattern $line]} {
|
||||
return $line
|
||||
}
|
||||
}
|
||||
incr retry -1
|
||||
after $delay
|
||||
}
|
||||
if {$retry == 0} {
|
||||
fail "log message of '$pattern' not found"
|
||||
}
|
||||
}
|
||||
|
||||
# Random integer between 0 and max (excluded).
|
||||
proc randomInt {max} {
|
||||
expr {int(rand()*$max)}
|
||||
|
@ -61,6 +61,7 @@ set regression_vectors {
|
||||
{939895 151 59.149620271823181 65.204186651485145}
|
||||
{1412 156 149.29737817929004 15.95807862745508}
|
||||
{564862 149 84.062063109158544 -65.685403922426232}
|
||||
{1546032440391 16751 -1.8175081637769495 20.665668878082954}
|
||||
}
|
||||
set rv_idx 0
|
||||
|
||||
@ -274,8 +275,19 @@ start_server {tags {"geo"}} {
|
||||
foreach place $diff {
|
||||
set mydist [geo_distance $lon $lat $search_lon $search_lat]
|
||||
set mydist [expr $mydist/1000]
|
||||
if {($mydist / $radius_km) > 0.999} {incr rounding_errors}
|
||||
if {($mydist / $radius_km) > 0.999} {
|
||||
incr rounding_errors
|
||||
continue
|
||||
}
|
||||
if {$mydist < $radius_m} {
|
||||
# This is a false positive for redis since given the
|
||||
# same points the higher precision calculation provided
|
||||
# by TCL shows the point within range
|
||||
incr rounding_errors
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
# Make sure this is a real error and not a rounidng issue.
|
||||
if {[llength $diff] == $rounding_errors} {
|
||||
set res $res2; # Error silenced
|
||||
|
62
tests/unit/moduleapi/testrdb.tcl
Normal file
62
tests/unit/moduleapi/testrdb.tcl
Normal file
@ -0,0 +1,62 @@
|
||||
set testmodule [file normalize tests/modules/testrdb.so]
|
||||
|
||||
proc restart_and_wait {} {
|
||||
catch {
|
||||
r debug restart
|
||||
}
|
||||
|
||||
# wait for the server to come back up
|
||||
set retry 50
|
||||
while {$retry} {
|
||||
if {[catch { r ping }]} {
|
||||
after 100
|
||||
} else {
|
||||
break
|
||||
}
|
||||
incr retry -1
|
||||
}
|
||||
}
|
||||
|
||||
tags "modules" {
|
||||
start_server [list overrides [list loadmodule "$testmodule"]] {
|
||||
test {modules are able to persist types} {
|
||||
r testrdb.set.key key1 value1
|
||||
assert_equal "value1" [r testrdb.get.key key1]
|
||||
r debug reload
|
||||
assert_equal "value1" [r testrdb.get.key key1]
|
||||
}
|
||||
|
||||
test {modules global are lost without aux} {
|
||||
r testrdb.set.before global1
|
||||
assert_equal "global1" [r testrdb.get.before]
|
||||
restart_and_wait
|
||||
assert_equal "" [r testrdb.get.before]
|
||||
}
|
||||
}
|
||||
|
||||
start_server [list overrides [list loadmodule "$testmodule 2"]] {
|
||||
test {modules are able to persist globals before and after} {
|
||||
r testrdb.set.before global1
|
||||
r testrdb.set.after global2
|
||||
assert_equal "global1" [r testrdb.get.before]
|
||||
assert_equal "global2" [r testrdb.get.after]
|
||||
restart_and_wait
|
||||
assert_equal "global1" [r testrdb.get.before]
|
||||
assert_equal "global2" [r testrdb.get.after]
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
start_server [list overrides [list loadmodule "$testmodule 1"]] {
|
||||
test {modules are able to persist globals just after} {
|
||||
r testrdb.set.after global2
|
||||
assert_equal "global2" [r testrdb.get.after]
|
||||
restart_and_wait
|
||||
assert_equal "global2" [r testrdb.get.after]
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
# TODO: test short read handling
|
||||
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user