Merge branch 'keydbpro' into 'add_ci'

# Conflicts:
#   .gitlab-ci.yml

Former-commit-id: 828924d67d622483fccf570649fc81aa3d78673b
Malavan Sotheeswaran 2021-07-19 16:16:36 +00:00
commit 57c97e0b5b
28 changed files with 261 additions and 100 deletions

View File

@@ -11,6 +11,40 @@ CRITICAL: There is a critical bug affecting MOST USERS. Upgrade ASAP.
SECURITY: There are security fixes in the release.
--------------------------------------------------------------------------------
+ ================================================================================
+ Redis 6.2.3 Released Mon May 3 19:00:00 IST 2021
+ ================================================================================
+
+ Upgrade urgency: SECURITY, contains fixes to security issues that affect
+ authenticated client connections. LOW otherwise.
+
+ Integer overflow in STRALGO LCS command (CVE-2021-29477):
+ An integer overflow bug in Redis version 6.0 or newer could be exploited using
+ the STRALGO LCS command to corrupt the heap and potentially result in remote
+ code execution. The integer overflow bug exists in all versions of Redis
+ starting with 6.0.
+
+ Integer overflow in COPY command for large intsets (CVE-2021-29478):
+ An integer overflow bug in Redis 6.2 could be exploited to corrupt the heap and
+ potentially result in remote code execution. The vulnerability involves
+ changing the default set-max-intset-entries configuration value, creating a
+ large set key that consists of integer values and using the COPY command to
+ duplicate it. The integer overflow bug exists in all versions of Redis starting
+ with 2.6, where it could result in a corrupted RDB or DUMP payload, but it
+ cannot be exploited through COPY (which did not exist before 6.2).
+
+ Bug fixes that are only applicable to previous releases of Redis 6.2:
+ * Fix memory leak in moduleDefragGlobals (#8853)
+ * Fix memory leak when doing lazy freeing of the client tracking table (#8822)
+ * Block abusive replicas from sending commands that could assert and crash redis (#8868)
+
+ Other bug fixes:
+ * Use a monotonic clock to check for Lua script timeout (#8812)
+ * redis-cli: Do not use unix socket when we got redirected in cluster mode (#8870)
+
+ Modules:
+ * Fix RM_GetClusterNodeInfo() to correctly populate master id (#8846)
+
================================================================================
Redis 6.2.2 Released Mon April 19 19:00:00 IST 2021
================================================================================

View File

@@ -241,7 +241,7 @@ iget_defrag_hint(tsdn_t *tsdn, void* ptr) {
int free_in_slab = extent_nfree_get(slab);
if (free_in_slab) {
const bin_info_t *bin_info = &bin_infos[binind];
- int curslabs = binshard->stats.curslabs;
+ ssize_t curslabs = binshard->stats.curslabs;
size_t curregs = binshard->stats.curregs;
if (binshard->slabcur) {
/* remove slabcur from the overall utilization */

View File

@@ -140,6 +140,10 @@ DEBUG=-g -ggdb
ifneq ($(uname_S),Darwin)
FINAL_LIBS+=-latomic
endif
+ # Linux ARM32 needs -latomic at linking time
+ ifneq (,$(findstring armv,$(uname_M)))
+ FINAL_LIBS+=-latomic
+ endif
ifeq ($(uname_S),SunOS)

View File

@@ -25,6 +25,12 @@ StorageCache::StorageCache(IStorage *storage, bool fCache)
m_pdict = dictCreate(&dbStorageCacheType, nullptr);
}
+
+ StorageCache::~StorageCache()
+ {
+ if (m_pdict != nullptr)
+ dictRelease(m_pdict);
+ }
void StorageCache::clear()
{
std::unique_lock<fastlock> ul(m_lock);

View File

@@ -29,6 +29,8 @@ class StorageCache
}
public:
+ ~StorageCache();
+
static StorageCache *create(IStorageFactory *pfactory, int db, IStorageFactory::key_load_iterator fn, void *privdata) {
StorageCache *cache = new StorageCache(nullptr, pfactory->FSlow() /*fCache*/);
load_iter_data data = {cache, fn, privdata};

View File

@@ -5588,9 +5588,10 @@ try_again:
if (ttl < 1) ttl = 1;
}
- /* Relocate valid (non expired) keys into the array in successive
+ /* Relocate valid (non expired) keys and values into the array in successive
* positions to remove holes created by the keys that were present
* in the first lookup but are now expired after the second lookup. */
+ ov[non_expired] = ov[j];
kv[non_expired++] = kv[j];
serverAssertWithInfo(c,NULL,

View File

@@ -2470,6 +2470,7 @@ static int updateReplBacklogSize(long long val, long long prev, const char **err
* being able to tell when the size changes, so restore prev before calling it. */
UNUSED(err);
g_pserver->repl_backlog_size = prev;
+ g_pserver->repl_backlog_config_size = val;
resizeReplicationBacklog(val);
return 1;
}

View File

@@ -2018,7 +2018,7 @@ int keyIsExpired(const redisDbPersistentDataSnapshot *db, robj *key) {
* script execution, making propagation to slaves / AOF consistent.
* See issue #1525 on Github for more information. */
if (g_pserver->lua_caller) {
- now = g_pserver->lua_time_start;
+ now = g_pserver->lua_time_snapshot;
}
/* If we are in the middle of a command execution, we still want to use
* a reference time that does not change: in that case we just use the
@@ -2079,14 +2079,17 @@ int expireIfNeeded(redisDb *db, robj *key) {
if (checkClientPauseTimeoutAndReturnIfPaused()) return 1;
/* Delete the key */
+ if (g_pserver->lazyfree_lazy_expire) {
+ dbAsyncDelete(db,key);
+ } else {
+ dbSyncDelete(db,key);
+ }
g_pserver->stat_expiredkeys++;
propagateExpire(db,key,g_pserver->lazyfree_lazy_expire);
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",key,db->id);
- int retval = g_pserver->lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
- dbSyncDelete(db,key);
- if (retval) signalModifiedKey(NULL,db,key);
- return retval;
+ signalModifiedKey(NULL,db,key);
+ return 1;
}
/* -----------------------------------------------------------------------------
@@ -2960,13 +2963,13 @@ dict_iter redisDbPersistentData::random()
return dict_iter(m_pdict, de);
}
- size_t redisDbPersistentData::size() const
+ size_t redisDbPersistentData::size(bool fCachedOnly) const
{
- if (m_spstorage != nullptr && !m_fAllChanged)
+ if (m_spstorage != nullptr && !m_fAllChanged && !fCachedOnly)
return m_spstorage->count() + m_cnewKeysPending;
return dictSize(m_pdict)
- + (m_pdbSnapshot ? (m_pdbSnapshot->size() - dictSize(m_pdictTombstone)) : 0);
+ + (m_pdbSnapshot ? (m_pdbSnapshot->size(fCachedOnly) - dictSize(m_pdictTombstone)) : 0);
}
bool redisDbPersistentData::removeCachedValue(const char *key)

View File

@@ -281,7 +281,7 @@ uint32_t intsetLen(const intset *is) {
/* Return intset blob size in bytes. */
size_t intsetBlobLen(intset *is) {
- return sizeof(intset)+intrev32ifbe(is->length)*intrev32ifbe(is->encoding);
+ return sizeof(intset)+(size_t)intrev32ifbe(is->length)*intrev32ifbe(is->encoding);
}
/* Validate the integrity of the data structure.

View File

@@ -39,12 +39,11 @@ void lazyfreeFreeSlotsMap(void *args[]) {
atomicIncr(lazyfreed_objects,len);
}
- /* Release the rax mapping Redis Cluster keys to slots in the
- * lazyfree thread. */
+ /* Release the key tracking table. */
void lazyFreeTrackingTable(void *args[]) {
rax *rt = (rax*)args[0];
size_t len = rt->numele;
- raxFree(rt);
+ freeTrackingRadixTree(rt);
atomicDecr(lazyfree_objects,len);
atomicIncr(lazyfreed_objects,len);
}
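
The leak fixed here (#8822 in the release notes above) follows from the shape of the tracking table: it is a rax whose values are themselves rax pointers (one nested tree of client IDs per tracked key), so releasing it needs a per-value callback rather than a plain raxFree(). A minimal sketch of that idea, assuming this layout (helper names are hypothetical):

#include "rax.h"

/* Hypothetical helper: each value stored in the outer tracking table is
 * itself a rax of client IDs, so it has to be freed individually. */
static void freeNestedClientIds(void *nested) {
    raxFree((rax *)nested);
}

/* A plain raxFree(table) would reclaim only the outer tree and leak every
 * nested rax; freeTrackingRadixTree() avoids that by walking the values. */
static void freeTrackingTableSketch(rax *table) {
    raxFreeWithCallback(table, freeNestedClientIds);
}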

View File

@@ -94,8 +94,8 @@ lwCanvas *lwCreateCanvas(int width, int height, int bgcolor) {
lwCanvas *canvas = zmalloc(sizeof(*canvas));
canvas->width = width;
canvas->height = height;
- canvas->pixels = zmalloc(width*height);
- memset(canvas->pixels,bgcolor,width*height);
+ canvas->pixels = zmalloc((size_t)width*height);
+ memset(canvas->pixels,bgcolor,(size_t)width*height);
return canvas;
}

View File

@@ -71,7 +71,7 @@ void memtest_progress_start(char *title, int pass) {
printf("\x1b[H\x1b[2K"); /* Cursor home, clear current line. */
printf("%s [%d]\n", title, pass); /* Print title. */
progress_printed = 0;
- progress_full = ws.ws_col*(ws.ws_row-3);
+ progress_full = (size_t)ws.ws_col*(ws.ws_row-3);
fflush(stdout);
}

View File

@@ -6367,7 +6367,7 @@ int RM_GetClusterNodeInfo(RedisModuleCtx *ctx, const char *id, char *ip, char *m
/* If the information is not available, the function will set the
* field to zero bytes, so that when the field can't be populated the
* function kinda remains predictable. */
- if (node->flags & CLUSTER_NODE_MASTER && node->slaveof)
+ if (node->flags & CLUSTER_NODE_SLAVE && node->slaveof)
memcpy(master_id,node->slaveof->name,REDISMODULE_NODE_ID_LEN);
else
memset(master_id,0,REDISMODULE_NODE_ID_LEN);
@@ -9430,6 +9430,7 @@ long moduleDefragGlobals(void) {
module->defrag_cb(&defrag_ctx);
defragged += defrag_ctx.defragged;
}
+ dictReleaseIterator(di);
return defragged;
}

View File

@@ -902,7 +902,7 @@ size_t objectComputeSize(robj_roptr o, size_t sample_size) {
if (samples) asize += (double)elesize/samples*dictSize(d);
} else if (o->encoding == OBJ_ENCODING_INTSET) {
intset *is = (intset*)ptrFromObj(o);
- asize = sizeof(*o)+sizeof(*is)+is->encoding*is->length;
+ asize = sizeof(*o)+sizeof(*is)+(size_t)is->encoding*is->length;
} else {
serverPanic("Unknown set encoding");
}

View File

@@ -103,7 +103,6 @@ static struct config {
int randomkeys_keyspacelen;
int keepalive;
int pipeline;
- int showerrors;
long long start;
long long totlatency;
const char *title;
@@ -313,7 +312,9 @@ static redisContext *getRedisContext(const char *ip, int port,
fprintf(stderr, "Node %s:%d replied with error:\n%s\n", ip, port, reply->str);
else
fprintf(stderr, "Node %s replied with error:\n%s\n", hostsocket, reply->str);
- goto cleanup;
+ freeReplyObject(reply);
+ redisFree(ctx);
+ exit(1);
}
freeReplyObject(reply);
return ctx;
@@ -370,9 +371,15 @@ fail:
fprintf(stderr, "ERROR: failed to fetch CONFIG from ");
if (hostsocket == NULL) fprintf(stderr, "%s:%d\n", ip, port);
else fprintf(stderr, "%s\n", hostsocket);
+ int abort_test = 0;
+ if (!strncmp(reply->str,"NOAUTH",5) ||
+ !strncmp(reply->str,"WRONGPASS",9) ||
+ !strncmp(reply->str,"NOPERM",5))
+ abort_test = 1;
freeReplyObject(reply);
redisFree(c);
freeRedisConfig(cfg);
+ if (abort_test) exit(1);
return NULL;
}
static void freeRedisConfig(redisConfig *cfg) {
@@ -517,27 +524,13 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
exit(1);
}
redisReply *r = (redisReply*)reply;
- int is_err = (r->type == REDIS_REPLY_ERROR);
- if (is_err && config.showerrors) {
- /* TODO: static lasterr_time not thread-safe */
- static time_t lasterr_time = 0;
- time_t now = time(NULL);
- if (lasterr_time != now) {
- lasterr_time = now;
- if (c->cluster_node) {
- printf("Error from server %s:%d: %s\n",
- c->cluster_node->ip,
- c->cluster_node->port,
- r->str);
- } else printf("Error from server: %s\n", r->str);
- }
- }
+ if (r->type == REDIS_REPLY_ERROR) {
/* Try to update slots configuration if reply error is
* MOVED/ASK/CLUSTERDOWN and the key(s) used by the command
- * contain(s) the slot hash tag. */
- if (is_err && c->cluster_node && c->staglen) {
+ * contain(s) the slot hash tag.
+ * If the error is not topology-update related then we
+ * immediately exit to avoid false results. */
+ if (c->cluster_node && c->staglen) {
int fetch_slots = 0, do_wait = 0;
if (!strncmp(r->str,"MOVED",5) || !strncmp(r->str,"ASK",3))
fetch_slots = 1;
@@ -547,7 +540,7 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
* before requesting the new configuration. */
fetch_slots = 1;
do_wait = 1;
- printf("Error from server %s:%d: %s\n",
+ printf("Error from server %s:%d: %s.\n",
c->cluster_node->ip,
c->cluster_node->port,
r->str);
@@ -555,6 +548,15 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
if (do_wait) sleep(1);
if (fetch_slots && !fetchClusterSlotsConfiguration(c))
exit(1);
+ } else {
+ if (c->cluster_node) {
+ printf("Error from server %s:%d: %s\n",
+ c->cluster_node->ip,
+ c->cluster_node->port,
+ r->str);
+ } else printf("Error from server: %s\n", r->str);
+ exit(1);
+ }
}
freeReplyObject(reply);
@@ -1302,8 +1304,7 @@ static int fetchClusterSlotsConfiguration(client c) {
atomicGetIncr(config.is_fetching_slots, is_fetching_slots, 1);
if (is_fetching_slots) return -1; //TODO: use other codes || errno ?
atomicSet(config.is_fetching_slots, 1);
- if (config.showerrors)
- printf("Cluster slots configuration changed, fetching new one...\n");
+ printf("WARNING: Cluster slots configuration changed, fetching new one...\n");
const char *errmsg = "Failed to update cluster slots configuration";
static dictType dtype = {
dictSdsHash, /* hash function */
@@ -1479,7 +1480,8 @@ int parseOptions(int argc, const char **argv) {
} else if (!strcmp(argv[i],"-I")) {
config.idlemode = 1;
} else if (!strcmp(argv[i],"-e")) {
- config.showerrors = 1;
+ printf("WARNING: -e option has been deprecated. "
+ "We now immediatly exit on error to avoid false results.\n");
} else if (!strcmp(argv[i],"-t")) {
if (lastarg) goto invalid;
/* We get the list of tests to run as a string in the form
@@ -1582,8 +1584,6 @@ usage:
" is executed. Default tests use this to hit random keys in the\n"
" specified range.\n"
" -P <numreq> Pipeline <numreq> requests. Default 1 (no pipeline).\n"
- " -e If server replies with errors, show them on stdout.\n"
- " (no more than 1 error per second is displayed)\n"
" -q Quiet. Just show query/sec values\n"
" --precision Number of decimal places to display in latency output (default 0)\n"
" --csv Output in CSV format\n"
@@ -1711,7 +1711,6 @@ int main(int argc, const char **argv) {
config.keepalive = 1;
config.datasize = 3;
config.pipeline = 1;
- config.showerrors = 0;
config.randomkeys = 0;
config.randomkeys_keyspacelen = 0;
config.quiet = 0;
@@ -1794,9 +1793,10 @@ int main(int argc, const char **argv) {
} else {
config.redis_config =
getRedisConfig(config.hostip, config.hostport, config.hostsocket);
- if (config.redis_config == NULL)
+ if (config.redis_config == NULL) {
fprintf(stderr, "WARN: could not fetch server CONFIG\n");
}
+ }
if (config.num_threads > 0) {
pthread_mutex_init(&(config.liveclients_mutex), NULL);
pthread_mutex_init(&(config.is_updating_slots_mutex), NULL);
@@ -1958,8 +1958,8 @@ int main(int argc, const char **argv) {
}
if (test_is_selected("lrange") || test_is_selected("lrange_500")) {
- len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 449",tag);
- benchmark("LRANGE_500 (first 450 elements)",cmd,len);
+ len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 499",tag);
+ benchmark("LRANGE_500 (first 500 elements)",cmd,len);
free(cmd);
}
@@ -1987,6 +1987,7 @@ int main(int argc, const char **argv) {
} while(config.loop);
zfree(data);
+ zfree(data);
if (config.redis_config != NULL) freeRedisConfig(config.redis_config);
return 0;

View File

@@ -39,12 +39,14 @@
static char error[1044];
static off_t epos;
+ static long long line = 1;
int consumeNewline(char *buf) {
if (strncmp(buf,"\r\n",2) != 0) {
ERROR("Expected \\r\\n, got: %02x%02x",buf[0],buf[1]);
return 0;
}
+ line += 1;
return 1;
}
@@ -201,8 +203,8 @@ int redis_check_aof_main(int argc, char **argv) {
off_t pos = process(fp);
off_t diff = size-pos;
- printf("AOF analyzed: size=%lld, ok_up_to=%lld, diff=%lld\n",
- (long long) size, (long long) pos, (long long) diff);
+ printf("AOF analyzed: size=%lld, ok_up_to=%lld, ok_up_to_line=%lld, diff=%lld\n",
+ (long long) size, (long long) pos, line, (long long) diff);
if (diff > 0) {
if (fix) {
char buf[2];

View File

@@ -250,7 +250,7 @@ int redis_check_rdb(const char *rdbfilename, FILE *fp) {
rdbstate.doing = RDB_CHECK_DOING_READ_LEN;
if ((dbid = rdbLoadLen(&rdb,NULL)) == RDB_LENERR)
goto eoferr;
- rdbCheckInfo("Selecting DB ID %d", dbid);
+ rdbCheckInfo("Selecting DB ID %llu", (unsigned long long)dbid);
continue; /* Read type again. */
} else if (type == RDB_OPCODE_RESIZEDB) {
/* RESIZEDB: Hint about the size of the keys in the currently

View File

@@ -472,7 +472,7 @@ static void cliOutputHelp(int argc, char **argv) {
help = entry->org;
if (group == -1) {
/* Compare all arguments */
- if (argc == entry->argc) {
+ if (argc <= entry->argc) {
for (j = 0; j < argc; j++) {
if (strcasecmp(argv[j],entry->argv[j]) != 0) break;
}
@@ -653,7 +653,9 @@ static int cliConnect(int flags) {
cliRefreshPrompt();
}
- if (config.hostsocket == NULL) {
+ /* Do not use hostsocket when we got redirected in cluster mode */
+ if (config.hostsocket == NULL ||
+ (config.cluster_mode && config.cluster_reissue_command)) {
context = redisConnect(config.hostip,config.hostport);
} else {
context = redisConnectUnix(config.hostsocket);
@@ -4559,7 +4561,7 @@ static void clusterManagerNodeArrayReset(clusterManagerNodeArray *array) {
static void clusterManagerNodeArrayShift(clusterManagerNodeArray *array,
clusterManagerNode **nodeptr)
{
- assert(array->nodes < (array->nodes + array->len));
+ assert(array->len > 0);
/* If the first node to be shifted is not NULL, decrement count. */
if (*array->nodes != NULL) array->count--;
/* Store the first node to be shifted into 'nodeptr'. */
@@ -4572,7 +4574,7 @@ static void clusterManagerNodeArrayShift(clusterManagerNodeArray *array,
static void clusterManagerNodeArrayAdd(clusterManagerNodeArray *array,
clusterManagerNode *node)
{
- assert(array->nodes < (array->nodes + array->len));
+ assert(array->len > 0);
assert(node != NULL);
assert(array->count < array->len);
array->nodes[array->count++] = node;
@@ -5949,7 +5951,7 @@ void showLatencyDistSamples(struct distsamples *samples, long long tot) {
printf("\033[38;5;0m"); /* Set foreground color to black. */
for (j = 0; ; j++) {
int coloridx =
- ceil((float) samples[j].count / tot * (spectrum_palette_size-1));
+ ceil((double) samples[j].count / tot * (spectrum_palette_size-1));
int color = spectrum_palette[coloridx];
printf("\033[48;5;%dm%c", (int)color, samples[j].character);
samples[j].count = 0;

View File

@@ -254,9 +254,11 @@ void resizeReplicationBacklog(long long newsize) {
zfree(g_pserver->repl_backlog);
g_pserver->repl_backlog = backlog;
g_pserver->repl_backlog_idx = g_pserver->repl_backlog_histlen;
+ if (g_pserver->repl_batch_idxStart >= 0) {
g_pserver->repl_batch_idxStart -= earliest_idx;
if (g_pserver->repl_batch_idxStart < 0)
g_pserver->repl_batch_idxStart += g_pserver->repl_backlog_size;
+ }
g_pserver->repl_backlog_start = earliest_off;
} else {
zfree(g_pserver->repl_backlog);
@@ -301,19 +303,56 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
if (lower_bound == -1)
lower_bound = g_pserver->repl_batch_offStart;
long long minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1;
- if (minimumsize > g_pserver->repl_backlog_size) {
- flushReplBacklogToClients();
- lower_bound = g_pserver->repl_lowest_off.load(std::memory_order_seq_cst);
- if (lower_bound == -1)
- lower_bound = g_pserver->repl_batch_offStart;
- minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1;
if (minimumsize > g_pserver->repl_backlog_size) {
+ listIter li;
+ listNode *ln;
+ listRewind(g_pserver->slaves, &li);
+ long long maxClientBuffer = (long long)cserver.client_obuf_limits[CLIENT_TYPE_SLAVE].hard_limit_bytes;
+ if (maxClientBuffer <= 0)
+ maxClientBuffer = LLONG_MAX; // infinite essentially
+ long long min_offset = LLONG_MAX;
+ int listening_replicas = 0;
+ while ((ln = listNext(&li))) {
+ client *replica = (client*)listNodeValue(ln);
+ if (!canFeedReplicaReplBuffer(replica)) continue;
+ if (replica->flags & CLIENT_CLOSE_ASAP) continue;
+ std::unique_lock<fastlock> ul(replica->lock);
+ // Would this client overflow? If so close it
+ long long neededBuffer = g_pserver->master_repl_offset + len - replica->repl_curr_off + 1;
+ if (neededBuffer > maxClientBuffer) {
+ sds clientInfo = catClientInfoString(sdsempty(),replica);
+ freeClientAsync(replica);
+ serverLog(LL_WARNING,"Client %s scheduled to be closed ASAP due to exceeding output buffer hard limit.", clientInfo);
+ sdsfree(clientInfo);
+ continue;
+ }
+ min_offset = std::min(min_offset, replica->repl_curr_off);
+ ++listening_replicas;
+ }
+ if (min_offset == LLONG_MAX) {
+ min_offset = g_pserver->repl_batch_offStart;
+ g_pserver->repl_lowest_off = -1;
+ } else {
+ g_pserver->repl_lowest_off = min_offset;
+ }
+ minimumsize = g_pserver->master_repl_offset + len - min_offset + 1;
+ serverAssert(listening_replicas == 0 || minimumsize <= maxClientBuffer);
+ if (minimumsize > g_pserver->repl_backlog_size && listening_replicas) {
// This is an emergency overflow, we better resize to fit
long long newsize = std::max(g_pserver->repl_backlog_size*2, minimumsize);
- serverLog(LL_WARNING, "Replication backlog is too small, resizing to: %lld", newsize);
+ serverLog(LL_WARNING, "Replication backlog is too small, resizing to: %lld bytes", newsize);
resizeReplicationBacklog(newsize);
+ } else if (!listening_replicas) {
+ // We need to update a few variables or later asserts will notice we dropped data
+ g_pserver->repl_batch_offStart = g_pserver->master_repl_offset + len;
+ g_pserver->repl_lowest_off = -1;
}
}
}
@@ -4319,6 +4358,8 @@ void replicationCron(void) {
replicationStartPendingFork();
+ trimReplicationBacklog();
+
/* Remove the RDB file used for replication if Redis is not running
* with any persistence. */
removeRDBUsedToSyncReplicas();
@@ -5064,3 +5105,17 @@ void updateFailoverStatus(void) {
g_pserver->target_replica_port);
}
}
+
+ // If we automatically grew the backlog we need to trim it back to
+ // the config setting when possible
+ void trimReplicationBacklog() {
+ serverAssert(GlobalLocksAcquired());
+ serverAssert(g_pserver->repl_batch_offStart < 0); // we shouldn't be in a batch
+ if (g_pserver->repl_backlog_size <= g_pserver->repl_backlog_config_size)
+ return; // We're already a good size
+ if (g_pserver->repl_lowest_off > 0 && (g_pserver->master_repl_offset - g_pserver->repl_lowest_off + 1) > g_pserver->repl_backlog_config_size)
+ return; // There is untransmitted data we can't truncate
+
+ serverLog(LL_NOTICE, "Reclaiming %lld replication backlog bytes", g_pserver->repl_backlog_size - g_pserver->repl_backlog_config_size);
+ resizeReplicationBacklog(g_pserver->repl_backlog_config_size);
+ }
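
To make the trim condition above concrete, here is a small self-contained sketch with hypothetical numbers showing when a backlog that was grown during a burst can safely be shrunk back to the configured size (it mirrors the fields used by trimReplicationBacklog(), but is not KeyDB code):

#include <stdio.h>

int main(void) {
    /* Hypothetical values mirroring the fields used by trimReplicationBacklog(). */
    long long backlog_size        = 4LL * 1024 * 1024;   /* grew to 4 MB during a burst */
    long long backlog_config_size = 1LL * 1024 * 1024;   /* what the user configured */
    long long master_repl_offset  = 10000000;            /* latest byte produced */
    long long repl_lowest_off     = 9500000;             /* oldest byte a replica still needs */

    long long untransmitted = master_repl_offset - repl_lowest_off + 1;   /* 500001 bytes */

    /* Trim only if the backlog was grown past the configured size and
     * everything a replica still needs fits inside that configured size. */
    int can_trim = backlog_size > backlog_config_size &&
                   untransmitted <= backlog_config_size;

    printf("untransmitted=%lld can_trim=%d\n", untransmitted, can_trim);   /* -> 500001, 1 */
    return 0;
}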

View File

@@ -31,6 +31,7 @@
#include "sha1.h"
#include "rand.h"
#include "cluster.h"
+ #include "monotonic.h"
extern "C" {
#include <lua.h>
@@ -1437,7 +1438,7 @@ sds luaCreateFunction(client *c, lua_State *lua, robj *body) {
/* This is the Lua script "count" hook that we use to detect scripts timeout. */
void luaMaskCountHook(lua_State *lua, lua_Debug *ar) {
- long long elapsed = mstime() - g_pserver->lua_time_start;
+ long long elapsed = elapsedMs(g_pserver->lua_time_start);
UNUSED(ar);
UNUSED(lua);
@@ -1591,7 +1592,8 @@ void evalGenericCommand(client *c, int evalsha) {
serverTL->in_eval = 1;
g_pserver->lua_caller = c;
g_pserver->lua_cur_script = funcname + 2;
- g_pserver->lua_time_start = mstime();
+ g_pserver->lua_time_start = getMonotonicUs();
+ g_pserver->lua_time_snapshot = mstime();
g_pserver->lua_kill = 0;
if (g_pserver->lua_time_limit > 0 && ldb.active == 0) {
lua_sethook(lua,luaMaskCountHook,LUA_MASKCOUNT,100000);
@@ -2750,7 +2752,7 @@ void luaLdbLineHook(lua_State *lua, lua_Debug *ar) {
/* Check if a timeout occurred. */
if (ar->event == LUA_HOOKCOUNT && ldb.step == 0 && bp == 0) {
- mstime_t elapsed = mstime() - g_pserver->lua_time_start;
+ mstime_t elapsed = elapsedMs(g_pserver->lua_time_start);
mstime_t timelimit = g_pserver->lua_time_limit ?
g_pserver->lua_time_limit : 5000;
if (elapsed >= timelimit) {
@@ -2780,6 +2782,7 @@ void luaLdbLineHook(lua_State *lua, lua_Debug *ar) {
lua_pushstring(lua, "timeout during Lua debugging with client closing connection");
lua_error(lua);
}
- g_pserver->lua_time_start = mstime();
+ g_pserver->lua_time_start = getMonotonicUs();
+ g_pserver->lua_time_snapshot = mstime();
}
}
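
The switch above to getMonotonicUs()/elapsedMs() from monotonic.h means the timeout check can no longer be fooled by wall-clock adjustments (NTP steps, manual clock changes), while lua_time_snapshot keeps an mstime() value for logic that genuinely needs real time, such as expiration. A minimal sketch of the pattern, assuming monotonicInit() has already been called at startup as the server does:

#include "monotonic.h"   /* KeyDB/Redis monotonic clock helpers */

/* Sketch: decide whether a script has exceeded its time limit using the
 * monotonic clock, so clock jumps cannot trigger or suppress a timeout. */
static int scriptTimedOut(monotime start, long long limit_ms) {
    return (long long)elapsedMs(start) >= limit_ms;
}

/* Usage (hypothetical):
 *   monotime start = getMonotonicUs();
 *   ... run the script ...
 *   if (scriptTimedOut(start, 5000)) { ... kill or warn ... }
 */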

View File

@@ -4131,16 +4131,16 @@ void sentinelSetCommand(client *c) {
int numargs = j-old_j+1;
switch(numargs) {
case 2:
- sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s",ptrFromObj(c->argv[old_j]),
- ptrFromObj(c->argv[old_j+1]));
+ sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s",szFromObj(c->argv[old_j]),
+ szFromObj(c->argv[old_j+1]));
break;
case 3:
- sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s %s",ptrFromObj(c->argv[old_j]),
- ptrFromObj(c->argv[old_j+1]),
- ptrFromObj(c->argv[old_j+2]));
+ sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s %s",szFromObj(c->argv[old_j]),
+ szFromObj(c->argv[old_j+1]),
+ szFromObj(c->argv[old_j+2]));
break;
default:
- sentinelEvent(LL_WARNING,"+set",ri,"%@ %s",ptrFromObj(c->argv[old_j]));
+ sentinelEvent(LL_WARNING,"+set",ri,"%@ %s",szFromObj(c->argv[old_j]));
break;
}
}

View File

@@ -4661,6 +4661,8 @@ int processCommand(client *c, int callFlags) {
return C_OK;
}
+ int is_read_command = (c->cmd->flags & CMD_READONLY) ||
+ (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_READONLY));
int is_write_command = (c->cmd->flags & CMD_WRITE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
int is_denyoom_command = (c->cmd->flags & CMD_DENYOOM) ||
@@ -4891,6 +4893,14 @@ int processCommand(client *c, int callFlags) {
return C_OK;
}
+ /* Prevent a replica from sending commands that access the keyspace.
+ * The main objective here is to prevent abuse of client pause check
+ * from which replicas are exempt. */
+ if ((c->flags & CLIENT_SLAVE) && (is_may_replicate_command || is_write_command || is_read_command)) {
+ rejectCommandFormat(c, "Replica can't interract with the keyspace");
+ return C_OK;
+ }
+
/* If the server is paused, block the client until
* the pause has ended. Replicas are never paused. */
if (!(c->flags & CLIENT_SLAVE) &&
@@ -6086,10 +6096,11 @@ sds genRedisInfoString(const char *section) {
if (sections++) info = sdscat(info,"\r\n");
info = sdscatprintf(info, "# Keyspace\r\n");
for (j = 0; j < cserver.dbnum; j++) {
- long long keys, vkeys;
+ long long keys, vkeys, cachedKeys;
keys = g_pserver->db[j]->size();
vkeys = g_pserver->db[j]->expireSize();
+ cachedKeys = g_pserver->db[j]->size(true /* fCachedOnly */);
// Adjust TTL by the current time
mstime_t mstime;
@@ -6101,8 +6112,8 @@ sds genRedisInfoString(const char *section) {
if (keys || vkeys) {
info = sdscatprintf(info,
- "db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n",
- j, keys, vkeys, static_cast<long long>(g_pserver->db[j]->avg_ttl));
+ "db%d:keys=%lld,expires=%lld,avg_ttl=%lld,cached_keys=%lld\r\n",
+ j, keys, vkeys, static_cast<long long>(g_pserver->db[j]->avg_ttl), cachedKeys);
}
}
}
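
With the cached_keys field added above, a Keyspace entry in INFO now renders like this (values hypothetical, derived directly from the format string):

db0:keys=1203,expires=14,avg_ttl=0,cached_keys=257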
@@ -6120,7 +6131,11 @@ sds genRedisInfoString(const char *section) {
"variant:enterprise\r\n"
"license_status:%s\r\n"
"mvcc_depth:%d\r\n",
+ #ifdef NO_LICENSE_CHECK
+ "OK",
+ #else
cserver.license_key ? "OK" : "Trial",
+ #endif
mvcc_depth
);
}
@@ -7070,6 +7085,8 @@ static void validateConfiguration()
serverLog(LL_WARNING, "\tKeyDB will now exit. Please update your configuration file.");
exit(EXIT_FAILURE);
}
+
+ g_pserver->repl_backlog_config_size = g_pserver->repl_backlog_size; // this is normally set in the update logic, but not on initial config
}
int iAmMaster(void) {
int iAmMaster(void) { int iAmMaster(void) {

View File

@@ -1095,7 +1095,7 @@ public:
redisDbPersistentData(redisDbPersistentData &&) = default;
size_t slots() const { return dictSlots(m_pdict); }
- size_t size() const;
+ size_t size(bool fCachedOnly = false) const;
void expand(uint64_t slots) { dictExpand(m_pdict, slots); }
void trackkey(robj_roptr o, bool fUpdate)
@@ -2358,6 +2358,7 @@ struct redisServer {
int repl_ping_slave_period; /* Master pings the replica every N seconds */
char *repl_backlog; /* Replication backlog for partial syncs */
long long repl_backlog_size; /* Backlog circular buffer size */
+ long long repl_backlog_config_size; /* The repl backlog may grow but we want to know what the user set it to */
long long repl_backlog_histlen; /* Backlog actual data length */
long long repl_backlog_idx; /* Backlog circular buffer current offset,
that is the next byte will'll write to.*/
@@ -2483,7 +2484,8 @@ struct redisServer {
::dict *lua_scripts; /* A dictionary of SHA1 -> Lua scripts */
unsigned long long lua_scripts_mem; /* Cached scripts' memory + oh */
mstime_t lua_time_limit; /* Script timeout in milliseconds */
- mstime_t lua_time_start; /* Start time of script, milliseconds time */
+ monotime lua_time_start; /* monotonic timer to detect timed-out script */
+ mstime_t lua_time_snapshot; /* Snapshot of mstime when script is started */
int lua_write_dirty; /* True if a write command was called during the
execution of the current script. */
int lua_random_dirty; /* True if a random command was called during the
@@ -2873,6 +2875,7 @@ void disableTracking(client *c);
void trackingRememberKeys(client *c);
void trackingInvalidateKey(client *c, robj *keyobj);
void trackingInvalidateKeysOnFlush(int async);
+ void freeTrackingRadixTree(rax *rt);
void freeTrackingRadixTreeAsync(rax *rt);
void trackingLimitUsedSlots(void);
uint64_t trackingGetTotalItems(void);
@@ -3022,6 +3025,8 @@ void clearFailoverState(void);
void updateFailoverStatus(void);
void abortFailover(redisMaster *mi, const char *err);
const char *getFailoverStateString();
+ int canFeedReplicaReplBuffer(client *replica);
+ void trimReplicationBacklog();
/* Generic persistence functions */
void startLoadingFile(FILE* fp, const char * filename, int rdbflags);

View File

@@ -810,7 +810,7 @@ void stralgoLCS(client *c) {
/* Setup an uint32_t array to store at LCS[i,j] the length of the
* LCS A0..i-1, B0..j-1. Note that we have a linear array here, so
* we index it as LCS[j+(blen+1)*j] */
- uint32_t *lcs = (uint32_t*)zmalloc((alen+1)*(blen+1)*sizeof(uint32_t));
+ uint32_t *lcs = (uint32_t*)zmalloc((size_t)(alen+1)*(blen+1)*sizeof(uint32_t));
#define LCS(A,B) lcs[(B)+((A)*(blen+1))]
/* Start building the LCS table. */
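
For context on the CVE-2021-29477/29478 fixes in this commit (the (size_t) cast here and the similar casts in intsetBlobLen and objectComputeSize above): the underlying bug is that a product of 32-bit operands is evaluated in 32-bit arithmetic and only then widened for the allocator, so a large request wraps around and allocates far less than the code later writes. A minimal standalone sketch of the pattern, with hypothetical sizes, not KeyDB code:

#include <stdint.h>
#include <stdio.h>

int main(void) {
    uint32_t alen = 70000, blen = 70000;   /* hypothetical attacker-controlled lengths */

    /* Broken: (alen+1)*(blen+1) is computed in 32-bit unsigned arithmetic,
     * wraps around, and only then gets widened to size_t, so the allocation
     * is far smaller than the table the code subsequently fills. */
    size_t bad  = (alen + 1) * (blen + 1) * sizeof(uint32_t);

    /* Fixed (as in the diff): cast the first operand to size_t so the whole
     * expression is evaluated in 64-bit arithmetic. */
    size_t good = (size_t)(alen + 1) * (blen + 1) * sizeof(uint32_t);

    printf("wrapped=%zu correct=%zu\n", bad, good);
    return 0;
}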

View File

@@ -158,6 +158,18 @@ tags {"aof"} {
assert_match "*not valid*" $result
}
+
+ test "Short read: Utility should show the abnormal line num in AOF" {
+ create_aof {
+ append_to_aof [formatCommand set foo hello]
+ append_to_aof "!!!"
+ }
+
+ catch {
+ exec src/keydb-check-aof $aof_path
+ } result
+ assert_match "*ok_up_to_line=8*" $result
+ }
test "Short read: Utility should be able to fix the AOF" {
set result [exec src/keydb-check-aof --fix $aof_path << "y\n"]
assert_match "*Successfully truncated AOF*" $result

View File

@@ -39,6 +39,7 @@ proc show_cluster_status {} {
# all the lists are empty.
#
# regexp {^[0-9]+:[A-Z] [0-9]+ [A-z]+ [0-9]+ ([0-9:.]+) .*} $l - logdate
+ catch {
while 1 {
# Find the log with smallest time.
set empty 0
@@ -68,6 +69,7 @@ proc show_cluster_status {} {
set log($best) [lrange $log($best) 1 end]
}
}
+ }
}
start_server {tags {"psync2"}} {
start_server {tags {"psync2"}} { start_server {tags {"psync2"}} {

View File

@@ -25,7 +25,7 @@ test {CONFIG SET port number} {
test {CONFIG SET bind address} {
start_server {} {
# non-valid address
- catch {r CONFIG SET bind "some.wrong.bind.address"} e
+ catch {r CONFIG SET bind "999.999.999.999"} e
assert_match {*Failed to bind to specified addresses*} $e
# make sure server still bound to the previous address

View File

@@ -395,6 +395,17 @@ start_server {tags {"tracking network"}} {
assert {[lindex msg 2] eq {} }
}
+
+ test {Test ASYNC flushall} {
+ clean_all
+ r CLIENT TRACKING on REDIRECT $redir_id
+ r GET key1
+ r GET key2
+ assert_equal [s 0 tracking_total_keys] 2
+ $rd_sg FLUSHALL ASYNC
+ assert_equal [s 0 tracking_total_keys] 0
+ assert_equal [lindex [$rd_redirection read] 2] {}
+ }
# Keys are defined to be evicted 100 at a time by default.
# If after eviction the number of keys still surpasses the limit
# defined in tracking-table-max-keys, we increases eviction