Merge branch 'keydbpro_collab' into multithread_load

Former-commit-id: 8016c20f1f9a648e658c816e2f6777c5718d5e19
This commit is contained in:
John Sully 2021-08-09 20:20:34 +00:00
commit c77ce968c5
37 changed files with 833 additions and 288 deletions

147
.gitlab-ci.yml Normal file
View File

@ -0,0 +1,147 @@
build:
rules:
- if: '$COVERAGE'
when: never
- if: '$ENDURANCE'
when: never
- when: always
tags:
- docker
stage: build
script:
- git submodule init && git submodule update
- make distclean
- make -j
make-test:
rules:
- if: '$COVERAGE'
when: never
- if: '$ENDURANCE'
when: never
- when: always
tags:
- docker
stage: test
script:
- git submodule init && git submodule update
- make distclean
- make -j
- make test -j
node-redis-test:
rules:
- if: '$COVERAGE'
when: never
- if: '$ENDURANCE'
when: never
- when: always
tags:
- docker
- ipv6
stage: test
script:
- git submodule init && git submodule update
- make distclean
- make -j
- make install
- git clone https://gitlab-ci-token:${CI_JOB_TOKEN}@gitlab.eqalpha.com/keydb-dev/node-redis.git
- cd node-redis
- npm install
- npm run test
jedis-test:
rules:
- if: '$COVERAGE'
when: never
- if: '$ENDURANCE'
when: never
- when: always
tags:
- docker
- ipv4
stage: test
script:
- git submodule init && git submodule update
- make distclean
- make -j
- make install
- git clone https://gitlab-ci-token:${CI_JOB_TOKEN}@gitlab.eqalpha.com/keydb-dev/jedis.git
- cd jedis
- make test
redis-rs-test:
rules:
- if: '$COVERAGE'
when: never
- if: '$ENDURANCE'
when: never
- when: always
tags:
- docker
stage: test
script:
- git submodule init && git submodule update
- make distclean
- make -j
- make install
- git clone https://gitlab-ci-token:${CI_JOB_TOKEN}@gitlab.eqalpha.com/keydb-dev/redis-rs.git
- cd redis-rs
- make test
endurance-test:
rules:
- if: '$ENDURANCE'
tags:
- docker
stage: test
script:
- git submodule init && git submodule update
- make distclean
- make -j
- ./runtest --loop --stop
coverage-test:
rules:
- if: '$COVERAGE'
tags:
- docker
stage: test
script:
- git submodule init && git submodule update
- make distclean
- make gcov -j
- make install
- ./runtest || true
- pkill keydb-server || true
- pkill stunnel || true
- ./runtest-cluster || true
- pkill keydb-server || true
- pkill stunnel || true
- ./runtest-sentinel || true
- pkill keydb-server || true
- pkill stunnel || true
- ./runtest-moduleapi || true
- pkill keydb-server || true
- pkill stunnel || true
- git clone https://gitlab-ci-token:${CI_JOB_TOKEN}@gitlab.eqalpha.com/keydb-dev/redis-rs.git
- cd redis-rs
- make test || true
- pkill keydb-server || true
- pkill stunnel || true
- cd ..
- git clone https://gitlab-ci-token:${CI_JOB_TOKEN}@gitlab.eqalpha.com/keydb-dev/jedis.git
- cd jedis
- make test || true
- pkill keydb-server || true
- pkill stunnel || true
- cd ..
- git clone https://gitlab-ci-token:${CI_JOB_TOKEN}@gitlab.eqalpha.com/keydb-dev/node-redis.git
- cd node-redis
- npm install
- npm run test || true
- pkill keydb-server || true
- pkill stunnel || true
- cd ..
- geninfo -o KeyDB.info --no-external .
- genhtml --legend -o lcov-html KeyDB.info

View File

@ -11,6 +11,40 @@ CRITICAL: There is a critical bug affecting MOST USERS. Upgrade ASAP.
SECURITY: There are security fixes in the release.
--------------------------------------------------------------------------------
================================================================================
Redis 6.2.3 Released Mon May 3 19:00:00 IST 2021
================================================================================
Upgrade urgency: SECURITY, Contains fixes to security issues that affect
authenticated client connections. LOW otherwise.
Integer overflow in STRALGO LCS command (CVE-2021-29477):
An integer overflow bug in Redis version 6.0 or newer could be exploited using
the STRALGO LCS command to corrupt the heap and potentially result in remote
code execution. The integer overflow bug exists in all versions of Redis
starting with 6.0.
Integer overflow in COPY command for large intsets (CVE-2021-29478):
An integer overflow bug in Redis 6.2 could be exploited to corrupt the heap and
potentially result with remote code execution. The vulnerability involves
changing the default set-max-intset-entries configuration value, creating a
large set key that consists of integer values and using the COPY command to
duplicate it. The integer overflow bug exists in all versions of Redis starting
with 2.6, where it could result with a corrupted RDB or DUMP payload, but not
exploited through COPY (which did not exist before 6.2).
Bug fixes that are only applicable to previous releases of Redis 6.2:
* Fix memory leak in moduleDefragGlobals (#8853)
* Fix memory leak when doing lazy freeing client tracking table (#8822)
* Block abusive replicas from sending command that could assert and crash redis (#8868)
Other bug fixes:
* Use a monotonic clock to check for Lua script timeout (#8812)
* redis-cli: Do not use unix socket when we got redirected in cluster mode (#8870)
Modules:
* Fix RM_GetClusterNodeInfo() to correctly populate master id (#8846)
================================================================================
Redis 6.2.2 Released Mon April 19 19:00:00 IST 2021
================================================================================

View File

@ -241,7 +241,7 @@ iget_defrag_hint(tsdn_t *tsdn, void* ptr) {
int free_in_slab = extent_nfree_get(slab);
if (free_in_slab) {
const bin_info_t *bin_info = &bin_infos[binind];
int curslabs = binshard->stats.curslabs;
ssize_t curslabs = binshard->stats.curslabs;
size_t curregs = binshard->stats.curregs;
if (binshard->slabcur) {
/* remove slabcur from the overall utilization */

View File

@ -1,21 +1,17 @@
import keydb
import random
import sched, time
import time
import socket
import asyncore
import threading
import sys
import argparse
from pprint import pprint
# Parameters
numclients = 50
#numkeys = 1000000
numkeys = 100000
# Globals
ops=0
s = sched.scheduler(time.time, time.sleep)
g_exit = False
ops = {}
numclients = 0
numkeys = 0
runtime = 0
clients = []
def _buildResp(*args):
result = "*" + str(len(args)) + "\r\n"
@ -32,6 +28,8 @@ class Client(asyncore.dispatcher):
self.buf = b''
self.inbuf = b''
self.callbacks = list()
self.client_id = 0
self.get_client_id()
def handle_connect(self):
pass
@ -55,6 +53,9 @@ class Client(asyncore.dispatcher):
endrange = self.inbuf[startpos+1:].find(ord('\r')) + 1 + startpos
assert(endrange > 0)
numargs = int(self.inbuf[startpos+1:endrange])
if numargs == -1: # Nil array, used in some returns
startpos = endrange + 2
return startpos, []
assert(numargs > 0)
args = list()
startpos = endrange + 2 # plus 1 gets us to the '\n' and the next gets us to the start char
@ -127,10 +128,18 @@ class Client(asyncore.dispatcher):
self.buf += _buildResp("lpush", key, val)
self.callbacks.append(callback)
def blpop(self, *keys, timeout=0, callback=default_result_handler):
self.buf += _buildResp("blpop", *keys, str(timeout))
self.callbacks.append(callback)
def delete(self, key, callback = default_result_handler):
self.buf += _buildResp("del", key)
self.callbacks.append(callback)
def unblock(self, client_id, callback=default_result_handler):
self.buf += _buildResp("client", "unblock", str(client_id))
self.callbacks.append(callback)
def scan(self, iter, match=None, count=None, callback = default_result_handler):
args = ["scan", str(iter)]
if match != None:
@ -142,68 +151,133 @@ class Client(asyncore.dispatcher):
self.buf += _buildResp(*args)
self.callbacks.append(callback)
def get_client_id(self):
self.buf += _buildResp("client", "id")
self.callbacks.append(self.store_client_id)
def store_client_id(self, c, resp):
assert(resp[0] == ord(':'))
self.client_id = int(resp[1:])
assert(self.client_id == c.client_id)
def get(self, key, callback = None):
return
def getrandomkey():
return str(random.randrange(0, numkeys))
def handle_lpush_response(c, resp):
def handle_lpush_response(c, resp=None):
global ops
if resp != None:
ops = ops + 1
ops['lpush'] += 1
assert(resp[0] == ord(':'))
c.lpush("list_" + getrandomkey(), 'bardsklfjkldsjfdlsjflksdfjklsdjflksd kldsjflksd jlkdsjf lksdjklds jrfklsdjfklsdjfkl', handle_lpush_response)
def handle_set_response(c, resp):
def handle_blpop_response(c, resp=None):
global ops
if resp != None:
ops = ops + 1
ops['blpop'] += 1
c.blpop("list_" + getrandomkey(), callback=handle_blpop_response)
def handle_set_response(c, resp=None):
global ops
if resp != None:
ops['set'] += 1
assert(resp[0] == ord('+'))
c.set("str_" + getrandomkey(), 'bardsklfjkldsjfdlsjflksdfjklsdjflksd kldsjflksd jlkdsjf lksdjklds jrfklsdjfklsdjfkl', handle_set_response)
def handle_del_response(c, resp):
def handle_del_response(c, resp=None):
global ops
if resp != None:
ops = ops + 1
ops['del'] += 1
c.delete("list_" + getrandomkey(), handle_del_response)
def scan_callback(c, resp):
def scan_callback(c, resp=None):
global ops
nextstart = int(resp[0])
c.scan(nextstart, count=500, callback=scan_callback)
ops = ops+1
ops['scan'] += 1
def clear_ops():
global ops
ops = {'lpush': 0, 'blpop': 0, 'del': 0, 'scan': 0, 'set': 0, 'get': 0}
def stats_thread():
global ops
global g_exit
while not g_exit:
global runtime
i = 0
while i < runtime or not runtime:
time.sleep(1)
print("Ops per second: " + str(ops))
ops = 0
print("Ops per second: " + str({k:v for (k,v) in ops.items() if v}))
clear_ops()
i += 1
asyncore.close_all()
def main():
global g_exit
clients = []
def flush_db_sync():
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.connect(('127.0.0.1', 6379))
server.send(_buildResp("flushdb"))
resp = server.recv(8192)
assert(resp[:3] == "+OK".encode('utf-8'))
def init_blocking():
global clients
if numkeys > 100 * numclients:
print("WARNING: High ratio of keys to clients. Most lpushes will not be popped and unblocking will take a long time!")
for i in range(numclients):
clients.append(Client('127.0.0.1', 6379))
if i % 2:
handle_blpop_response(clients[-1])
else:
handle_lpush_response(clients[-1])
def init_lpush():
global clients
for i in range(numclients):
clients.append(Client('127.0.0.1', 6379))
for i in range (10):
handle_lpush_response(clients[-1], None)
handle_lpush_response(clients[-1])
#handle_set_response(clients[-1], None)
scan_client = Client('127.0.0.1', 6379)
scan_client.scan(0, count=500, callback=scan_callback)
del_client = Client('127.0.0.1', 6379)
handle_del_response(del_client, None)
handle_del_response(del_client)
def main(test, flush):
clear_ops()
if flush:
flush_db_sync()
try:
globals()[f"init_{test}"]()
except KeyError:
print(f"Test \"{test}\" not found. Exiting...")
exit()
except ConnectionRefusedError:
print("Could not connect to server. Is it running?")
print("Exiting...")
exit()
threading.Thread(target=stats_thread).start()
asyncore.loop()
g_exit = True
sys.exit(0)
print("DONE")
print("Done.")
parser = argparse.ArgumentParser(description="Test use cases for KeyDB.")
parser.add_argument('test', choices=[x[5:] for x in filter(lambda name: name.startswith("init_"), globals().keys())], help="which test to run")
parser.add_argument('-c', '--clients', type=int, default=50, help="number of running clients to use")
parser.add_argument('-k', '--keys', type=int, default=100000, help="number of keys to choose from for random tests")
parser.add_argument('-t', '--runtime', type=int, default=0, help="how long to run the test for (default: 0 for infinite)")
parser.add_argument('-f', '--flush', action="store_true", help="flush the db before running the test")
if __name__ == "__main__":
main()
try:
args = parser.parse_args()
except:
exit()
numclients = args.clients
numkeys = args.keys
runtime = args.runtime
main(args.test, args.flush)

View File

@ -15,7 +15,7 @@
release_hdr := $(shell sh -c './mkreleasehdr.sh')
uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not')
uname_M := $(shell sh -c 'uname -m 2>/dev/null || echo not')
OPTIMIZATION?=-O2
OPTIMIZATION?=-O2 -flto
DEPENDENCY_TARGETS=hiredis linenoise lua hdr_histogram rocksdb
NODEPS:=clean distclean
@ -140,6 +140,10 @@ DEBUG=-g -ggdb
ifneq ($(uname_S),Darwin)
FINAL_LIBS+=-latomic
endif
# Linux ARM32 needs -latomic at linking time
ifneq (,$(findstring armv,$(uname_M)))
FINAL_LIBS+=-latomic
endif
ifeq ($(uname_S),SunOS)
@ -349,9 +353,9 @@ endif
REDIS_SERVER_NAME=keydb-server$(PROG_SUFFIX)
REDIS_SENTINEL_NAME=keydb-sentinel$(PROG_SUFFIX)
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o t_nhash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o connection.o tls.o sha256.o motd.o timeout.o setcpuaffinity.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o storage/rocksdbfactory.o storage/teststorageprovider.o keydbutils.o StorageCache.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ)
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o t_nhash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o connection.o tls.o sha256.o motd_server.o timeout.o setcpuaffinity.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o storage/rocksdbfactory.o storage/teststorageprovider.o keydbutils.o StorageCache.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ)
REDIS_CLI_NAME=keydb-cli$(PROG_SUFFIX)
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crcspeed.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o motd.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ)
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crcspeed.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o motd_client.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ)
REDIS_BENCHMARK_NAME=keydb-benchmark$(PROG_SUFFIX)
REDIS_BENCHMARK_OBJ=ae.o anet.o redis-benchmark.o adlist.o dict.o zmalloc.o release.o crcspeed.o crc64.o siphash.o redis-benchmark.o storage-lite.o fastlock.o new.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ)
REDIS_CHECK_RDB_NAME=keydb-check-rdb$(PROG_SUFFIX)
@ -435,6 +439,12 @@ DEP = $(REDIS_SERVER_OBJ:%.o=%.d) $(REDIS_CLI_OBJ:%.o=%.d) $(REDIS_BENCHMARK_OBJ
# Because the jemalloc.h header is generated as a part of the jemalloc build,
# building it should complete before building any other object. Instead of
# depending on a single artifact, build all dependencies first.
motd_client.o: motd.cpp .make-prerequisites
$(REDIS_CXX) -MMD -o motd_client.o -c $< -DCLIENT -fno-lto
motd_server.o: motd.cpp .make-prerequisites
$(REDIS_CXX) -MMD -o motd_server.o -c $< -DSERVER
%.o: %.c .make-prerequisites
$(REDIS_CC) -MMD -o $@ -c $<

View File

@ -25,6 +25,12 @@ StorageCache::StorageCache(IStorage *storage, bool fCache)
m_pdict = dictCreate(&dbStorageCacheType, nullptr);
}
StorageCache::~StorageCache()
{
if (m_pdict != nullptr)
dictRelease(m_pdict);
}
void StorageCache::clear()
{
std::unique_lock<fastlock> ul(m_lock);
@ -130,9 +136,10 @@ void StorageCache::retrieve(sds key, IStorage::callbackSingle fn) const
size_t StorageCache::count() const
{
std::unique_lock<fastlock> ul(m_lock);
std::unique_lock<fastlock> ul(m_lock, std::defer_lock);
bool fLocked = ul.try_lock();
size_t count = m_spstorage->count();
if (m_pdict != nullptr) {
if (m_pdict != nullptr && fLocked) {
serverAssert(bulkInsertsInProgress.load(std::memory_order_seq_cst) || count == (dictSize(m_pdict) + m_collisionCount));
}
return count;
@ -140,6 +147,5 @@ size_t StorageCache::count() const
void StorageCache::beginWriteBatch() {
serverAssert(GlobalLocksAcquired()); // Otherwise we deadlock
m_lock.lock();
m_spstorage->beginWriteBatch();
}

View File

@ -29,6 +29,8 @@ class StorageCache
}
public:
~StorageCache();
static StorageCache *create(IStorageFactory *pfactory, int db, IStorageFactory::key_load_iterator fn, void *privdata) {
StorageCache *cache = new StorageCache(nullptr, pfactory->FSlow() /*fCache*/);
load_iter_data data = {cache, fn, privdata};
@ -45,7 +47,7 @@ public:
bool enumerate(IStorage::callback fn) const { return m_spstorage->enumerate(fn); }
void beginWriteBatch();
void endWriteBatch() { m_spstorage->endWriteBatch(); m_lock.unlock(); }
void endWriteBatch() { m_spstorage->endWriteBatch(); }
void batch_lock() { return m_spstorage->batch_lock(); }
void batch_unlock() { return m_spstorage->batch_unlock(); }

View File

@ -5588,9 +5588,10 @@ try_again:
if (ttl < 1) ttl = 1;
}
/* Relocate valid (non expired) keys into the array in successive
/* Relocate valid (non expired) keys and values into the array in successive
* positions to remove holes created by the keys that were present
* in the first lookup but are now expired after the second lookup. */
ov[non_expired] = ov[j];
kv[non_expired++] = kv[j];
serverAssertWithInfo(c,NULL,

View File

@ -2470,6 +2470,7 @@ static int updateReplBacklogSize(long long val, long long prev, const char **err
* being able to tell when the size changes, so restore prev before calling it. */
UNUSED(err);
g_pserver->repl_backlog_size = prev;
g_pserver->repl_backlog_config_size = val;
resizeReplicationBacklog(val);
return 1;
}

View File

@ -2018,7 +2018,7 @@ int keyIsExpired(const redisDbPersistentDataSnapshot *db, robj *key) {
* script execution, making propagation to slaves / AOF consistent.
* See issue #1525 on Github for more information. */
if (g_pserver->lua_caller) {
now = g_pserver->lua_time_start;
now = g_pserver->lua_time_snapshot;
}
/* If we are in the middle of a command execution, we still want to use
* a reference time that does not change: in that case we just use the
@ -2079,14 +2079,17 @@ int expireIfNeeded(redisDb *db, robj *key) {
if (checkClientPauseTimeoutAndReturnIfPaused()) return 1;
/* Delete the key */
if (g_pserver->lazyfree_lazy_expire) {
dbAsyncDelete(db,key);
} else {
dbSyncDelete(db,key);
}
g_pserver->stat_expiredkeys++;
propagateExpire(db,key,g_pserver->lazyfree_lazy_expire);
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",key,db->id);
int retval = g_pserver->lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
dbSyncDelete(db,key);
if (retval) signalModifiedKey(NULL,db,key);
return retval;
signalModifiedKey(NULL,db,key);
return 1;
}
/* -----------------------------------------------------------------------------
@ -2726,6 +2729,8 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde)
serverAssert(sdsKey != nullptr);
serverAssert(FImplies(*pde != nullptr, dictGetVal(*pde) != nullptr)); // early versions set a NULL object, this is no longer valid
serverAssert(m_refCount == 0);
if (m_pdbSnapshot == nullptr && g_pserver->m_pstorageFactory == nullptr)
return;
std::unique_lock<fastlock> ul(g_expireLock);
// First see if the key can be obtained from a snapshot
@ -2986,13 +2991,13 @@ dict_iter redisDbPersistentData::random()
return dict_iter(m_pdict, de);
}
size_t redisDbPersistentData::size() const
size_t redisDbPersistentData::size(bool fCachedOnly) const
{
if (m_spstorage != nullptr && !m_fAllChanged)
if (m_spstorage != nullptr && !m_fAllChanged && !fCachedOnly)
return m_spstorage->count() + m_cnewKeysPending;
return dictSize(m_pdict)
+ (m_pdbSnapshot ? (m_pdbSnapshot->size() - dictSize(m_pdictTombstone)) : 0);
+ (m_pdbSnapshot ? (m_pdbSnapshot->size(fCachedOnly) - dictSize(m_pdictTombstone)) : 0);
}
bool redisDbPersistentData::removeCachedValue(const char *key, dictEntry **ppde)
@ -3047,13 +3052,13 @@ void redisDbPersistentData::removeAllCachedValues()
trackChanges(false);
}
if (m_pdict->pauserehash == 0) {
if (m_pdict->pauserehash == 0 && m_pdict->refcount == 1) {
dict *dT = m_pdict;
m_pdict = dictCreate(&dbDictType, this);
dictExpand(m_pdict, dictSize(dT)/2, false); // Make room for about half so we don't excessively rehash
g_pserver->asyncworkqueue->AddWorkFunction([dT]{
dictRelease(dT);
}, true);
}, false);
} else {
dictEmpty(m_pdict, nullptr);
}

View File

@ -535,7 +535,8 @@ dictAsyncRehashCtl::~dictAsyncRehashCtl() {
while (deGCList != nullptr) {
auto next = deGCList->next;
dictFreeKey(dict, deGCList);
dictFreeVal(dict, deGCList);
if (deGCList->v.val != nullptr)
dictFreeVal(dict, deGCList);
zfree(deGCList);
deGCList = next;
}
@ -694,6 +695,8 @@ static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
d->ht[table].table[idx] = he->next;
if (!nofree) {
if (table == 0 && d->asyncdata != nullptr && (ssize_t)idx < d->rehashidx) {
dictFreeVal(d, he);
he->v.val = nullptr;
he->next = d->asyncdata->deGCList;
d->asyncdata->deGCList = he;
} else {
@ -752,6 +755,8 @@ void dictFreeUnlinkedEntry(dict *d, dictEntry *he) {
if (he == NULL) return;
if (d->asyncdata) {
dictFreeVal(d, he);
he->v.val = nullptr;
he->next = d->asyncdata->deGCList;
d->asyncdata->deGCList = he;
} else {
@ -775,6 +780,8 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
while(he) {
nextHe = he->next;
if (d->asyncdata && (ssize_t)i < d->rehashidx) {
dictFreeVal(d, he);
he->v.val = nullptr;
he->next = d->asyncdata->deGCList;
d->asyncdata->deGCList = he;
} else {

View File

@ -355,6 +355,8 @@ unsigned long LFUDecrAndReturn(robj_roptr o) {
return counter;
}
unsigned long getClientReplicationBacklogSharedUsage(client *c);
/* We don't want to count AOF buffers and slaves output buffers as
* used memory: the eviction should use mostly data size. This function
* returns the sum of AOF and slaves buffer. */
@ -371,9 +373,15 @@ size_t freeMemoryGetNotCountedMemory(void) {
while((ln = listNext(&li))) {
client *replica = (client*)listNodeValue(ln);
std::unique_lock<fastlock>(replica->lock);
overhead += getClientOutputBufferMemoryUsage(replica);
/* we don't wish to multiple count the replication backlog shared usage */
overhead += (getClientOutputBufferMemoryUsage(replica) - getClientReplicationBacklogSharedUsage(replica));
}
}
/* also don't count the replication backlog memory
* that's where the replication clients get their memory from */
overhead += g_pserver->repl_backlog_size;
if (g_pserver->aof_state != AOF_OFF) {
overhead += sdsalloc(g_pserver->aof_buf)+aofRewriteBufferSize();
}
@ -470,11 +478,13 @@ public:
FreeMemoryLazyFree(FreeMemoryLazyFree&&) = default;
~FreeMemoryLazyFree() {
aeAcquireLock();
for (auto &pair : vecdictvecde) {
for (auto de : pair.second) {
dictFreeUnlinkedEntry(pair.first, de);
}
}
aeReleaseLock();
--s_clazyFreesInProgress;
}
@ -822,9 +832,9 @@ int performEvictions(bool fPreSnapshot) {
/* After some time, exit the loop early - even if memory limit
* hasn't been reached. If we suddenly need to free a lot of
* memory, don't want to spend too much time here. */
if (elapsedUs(evictionTimer) > eviction_time_limit_us) {
if (g_pserver->m_pstorageFactory == nullptr && elapsedUs(evictionTimer) > eviction_time_limit_us) {
// We still need to free memory - start eviction timer proc
if (!isEvictionProcRunning) {
if (!isEvictionProcRunning && serverTL->el != nullptr) {
isEvictionProcRunning = 1;
aeCreateTimeEvent(serverTL->el, 0,
evictionTimeProc, NULL, NULL);

View File

@ -281,7 +281,7 @@ uint32_t intsetLen(const intset *is) {
/* Return intset blob size in bytes. */
size_t intsetBlobLen(intset *is) {
return sizeof(intset)+intrev32ifbe(is->length)*intrev32ifbe(is->encoding);
return sizeof(intset)+(size_t)intrev32ifbe(is->length)*intrev32ifbe(is->encoding);
}
/* Validate the integrity of the data structure.

View File

@ -39,12 +39,11 @@ void lazyfreeFreeSlotsMap(void *args[]) {
atomicIncr(lazyfreed_objects,len);
}
/* Release the rax mapping Redis Cluster keys to slots in the
* lazyfree thread. */
/* Release the key tracking table. */
void lazyFreeTrackingTable(void *args[]) {
rax *rt = (rax*)args[0];
size_t len = rt->numele;
raxFree(rt);
freeTrackingRadixTree(rt);
atomicDecr(lazyfree_objects,len);
atomicIncr(lazyfreed_objects,len);
}

View File

@ -94,8 +94,8 @@ lwCanvas *lwCreateCanvas(int width, int height, int bgcolor) {
lwCanvas *canvas = zmalloc(sizeof(*canvas));
canvas->width = width;
canvas->height = height;
canvas->pixels = zmalloc(width*height);
memset(canvas->pixels,bgcolor,width*height);
canvas->pixels = zmalloc((size_t)width*height);
memset(canvas->pixels,bgcolor,(size_t)width*height);
return canvas;
}

View File

@ -71,7 +71,7 @@ void memtest_progress_start(char *title, int pass) {
printf("\x1b[H\x1b[2K"); /* Cursor home, clear current line. */
printf("%s [%d]\n", title, pass); /* Print title. */
progress_printed = 0;
progress_full = ws.ws_col*(ws.ws_row-3);
progress_full = (size_t)ws.ws_col*(ws.ws_row-3);
fflush(stdout);
}

View File

@ -6367,7 +6367,7 @@ int RM_GetClusterNodeInfo(RedisModuleCtx *ctx, const char *id, char *ip, char *m
/* If the information is not available, the function will set the
* field to zero bytes, so that when the field can't be populated the
* function kinda remains predictable. */
if (node->flags & CLUSTER_NODE_MASTER && node->slaveof)
if (node->flags & CLUSTER_NODE_SLAVE && node->slaveof)
memcpy(master_id,node->slaveof->name,REDISMODULE_NODE_ID_LEN);
else
memset(master_id,0,REDISMODULE_NODE_ID_LEN);
@ -9430,6 +9430,7 @@ long moduleDefragGlobals(void) {
module->defrag_cb(&defrag_ctx);
defragged += defrag_ctx.defragged;
}
dictReleaseIterator(di);
return defragged;
}

View File

@ -1,7 +1,11 @@
#ifdef CLIENT
extern "C" {
#include <sdscompat.h>
#include <sds.h>
}
#else
#include "sds.h"
#endif
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
@ -15,6 +19,7 @@ extern "C" {
#ifdef MOTD
#include <curl/curl.h>
#ifdef CLIENT
extern "C" {
__attribute__ ((weak)) hisds hi_sdscatlen(hisds s, const void *t, size_t len) {
return sdscatlen(s, t, len);
@ -23,6 +28,7 @@ __attribute__ ((weak)) hisds hi_sdscat(hisds s, const char *t) {
return sdscat(s, t);
}
}
#endif
static const char *szMotdCachePath()
{

View File

@ -136,6 +136,7 @@ client *createClient(connection *conn, int iel) {
client_id = g_pserver->next_client_id.fetch_add(1);
c->iel = iel;
c->id = client_id;
sprintf(c->lock.szName, "client %lu", client_id);
c->resp = 2;
c->conn = conn;
c->name = NULL;
@ -234,6 +235,7 @@ void clientInstallWriteHandler(client *c) {
/* Schedule the client to write the output buffers to the socket only
* if not already done and, for slaves, if the replica can actually receive
* writes at this stage. */
if (!(c->flags & CLIENT_PENDING_WRITE) &&
(c->replstate == REPL_STATE_NONE ||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
@ -315,7 +317,7 @@ int prepareClientToWrite(client *c) {
/* Schedule the client to write the output buffers to the socket, unless
* it should already be setup to do so (it has already pending data). */
if (!fAsync && !clientHasPendingReplies(c)) clientInstallWriteHandler(c);
if (!fAsync && (c->flags & CLIENT_SLAVE || !clientHasPendingReplies(c))) clientInstallWriteHandler(c);
if (fAsync && !(c->fPendingAsyncWrite)) clientInstallAsyncWriteHandler(c);
/* Authorize the caller to queue in the output buffer of this client. */
@ -1129,7 +1131,7 @@ void copyClientOutputBuffer(client *dst, client *src) {
/* Return true if the specified client has pending reply buffers to write to
* the socket. */
int clientHasPendingReplies(client *c) {
return (c->bufpos || listLength(c->reply));
return (c->bufpos || listLength(c->reply) || c->FPendingReplicaWrite());
}
static std::atomic<int> rgacceptsInFlight[MAX_EVENT_LOOPS];
@ -1764,6 +1766,8 @@ client *lookupClientByID(uint64_t id) {
return (c == raxNotFound) ? NULL : c;
}
long long getReplIndexFromOffset(long long offset);
/* Write data in output buffers to client. Return C_OK if the client
* is still valid after the call, C_ERR if it was freed because of some
* error. If handler_installed is set, it will attempt to clear the
@ -1782,61 +1786,98 @@ int writeToClient(client *c, int handler_installed) {
std::unique_lock<decltype(c->lock)> lock(c->lock);
while(clientHasPendingReplies(c)) {
if (c->bufpos > 0) {
nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
totwritten += nwritten;
/* We can only directly read from the replication backlog if the client
is a replica, so only attempt to do so if that's the case. */
if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR)) {
std::unique_lock<fastlock> repl_backlog_lock (g_pserver->repl_backlog_lock);
long long repl_end_idx = getReplIndexFromOffset(c->repl_end_off);
serverAssert(c->repl_curr_off != -1);
/* If the buffer was sent, set bufpos to zero to continue with
* the remainder of the reply. */
if ((int)c->sentlen == c->bufpos) {
c->bufpos = 0;
c->sentlen = 0;
}
} else {
o = (clientReplyBlock*)listNodeValue(listFirst(c->reply));
if (o->used == 0) {
c->reply_bytes -= o->size;
listDelNode(c->reply,listFirst(c->reply));
continue;
if (c->repl_curr_off != c->repl_end_off){
long long repl_curr_idx = getReplIndexFromOffset(c->repl_curr_off);
long long nwritten2ndStage = 0; /* How much was written from the start of the replication backlog
* in the event of a wrap around write */
/* normal case with no wrap around */
if (repl_end_idx >= repl_curr_idx){
nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, repl_end_idx - repl_curr_idx);
/* wrap around case */
} else {
nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, g_pserver->repl_backlog_size - repl_curr_idx);
/* only attempt wrapping if we write the correct number of bytes */
if (nwritten == g_pserver->repl_backlog_size - repl_curr_idx){
nwritten2ndStage = connWrite(c->conn, g_pserver->repl_backlog, repl_end_idx);
if (nwritten2ndStage != -1)
nwritten += nwritten2ndStage;
}
}
nwritten = connWrite(c->conn, o->buf() + c->sentlen, o->used - c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
totwritten += nwritten;
/* If we fully sent the object on head go to the next one */
if (c->sentlen == o->used) {
c->reply_bytes -= o->size;
listDelNode(c->reply,listFirst(c->reply));
c->sentlen = 0;
/* If there are no longer objects in the list, we expect
* the count of reply bytes to be exactly zero. */
if (listLength(c->reply) == 0)
serverAssert(c->reply_bytes == 0);
/* only increment bytes if an error didn't occur */
if (nwritten > 0){
totwritten += nwritten;
c->repl_curr_off += nwritten;
serverAssert(c->repl_curr_off <= c->repl_end_off);
}
/* If the second part of a write didn't go through, we still need to register that */
if (nwritten2ndStage == -1) nwritten = -1;
}
/* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
* bytes, in a single threaded server it's a good idea to serve
* other clients as well, even if a very large request comes from
* super fast link that is always able to accept data (in real world
* scenario think about 'KEYS *' against the loopback interface).
*
* However if we are over the maxmemory limit we ignore that and
* just deliver as much data as it is possible to deliver.
*
* Moreover, we also send as much as possible if the client is
* a replica or a monitor (otherwise, on high-speed traffic, the
* replication/output buffer will grow indefinitely) */
if (totwritten > NET_MAX_WRITES_PER_EVENT &&
(g_pserver->maxmemory == 0 ||
zmalloc_used_memory() < g_pserver->maxmemory) &&
!(c->flags & CLIENT_SLAVE)) break;
}
} else {
while(clientHasPendingReplies(c)) {
serverAssert(!(c->flags & CLIENT_SLAVE) || c->flags & CLIENT_MONITOR);
if (c->bufpos > 0) {
nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
totwritten += nwritten;
/* If the buffer was sent, set bufpos to zero to continue with
* the remainder of the reply. */
if ((int)c->sentlen == c->bufpos) {
c->bufpos = 0;
c->sentlen = 0;
}
} else {
o = (clientReplyBlock*)listNodeValue(listFirst(c->reply));
if (o->used == 0) {
c->reply_bytes -= o->size;
listDelNode(c->reply,listFirst(c->reply));
continue;
}
nwritten = connWrite(c->conn, o->buf() + c->sentlen, o->used - c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
totwritten += nwritten;
/* If we fully sent the object on head go to the next one */
if (c->sentlen == o->used) {
c->reply_bytes -= o->size;
listDelNode(c->reply,listFirst(c->reply));
c->sentlen = 0;
/* If there are no longer objects in the list, we expect
* the count of reply bytes to be exactly zero. */
if (listLength(c->reply) == 0)
serverAssert(c->reply_bytes == 0);
}
}
/* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
* bytes, in a single threaded server it's a good idea to serve
* other clients as well, even if a very large request comes from
* super fast link that is always able to accept data (in real world
* scenario think about 'KEYS *' against the loopback interface).
*
* However if we are over the maxmemory limit we ignore that and
* just deliver as much data as it is possible to deliver.
*
* Moreover, we also send as much as possible if the client is
* a replica or a monitor (otherwise, on high-speed traffic, the
* replication/output buffer will grow indefinitely) */
if (totwritten > NET_MAX_WRITES_PER_EVENT &&
(g_pserver->maxmemory == 0 ||
zmalloc_used_memory() < g_pserver->maxmemory) &&
!(c->flags & CLIENT_SLAVE)) break;
}
}
g_pserver->stat_net_output_bytes += totwritten;
if (nwritten == -1) {
if (connGetState(c->conn) != CONN_STATE_CONNECTED) {
@ -1898,27 +1939,37 @@ void ProcessPendingAsyncWrites()
serverAssert(c->fPendingAsyncWrite);
if (c->flags & (CLIENT_CLOSE_ASAP | CLIENT_CLOSE_AFTER_REPLY))
{
zfree(c->replyAsync);
c->replyAsync = nullptr;
if (c->replyAsync != nullptr){
zfree(c->replyAsync);
c->replyAsync = nullptr;
}
c->fPendingAsyncWrite = FALSE;
continue;
}
int size = c->replyAsync->used;
/* since writes from master to replica can come directly from the replication backlog,
* writes may have been signalled without having been copied to the replyAsync buffer,
* thus causing the buffer to be NULL */
if (c->replyAsync != nullptr){
int size = c->replyAsync->used;
if (listLength(c->reply) == 0 && size <= (PROTO_REPLY_CHUNK_BYTES - c->bufpos)) {
memcpy(c->buf + c->bufpos, c->replyAsync->buf(), size);
c->bufpos += size;
} else {
c->reply_bytes += c->replyAsync->size;
listAddNodeTail(c->reply, c->replyAsync);
if (listLength(c->reply) == 0 && size <= (PROTO_REPLY_CHUNK_BYTES - c->bufpos)) {
memcpy(c->buf + c->bufpos, c->replyAsync->buf(), size);
c->bufpos += size;
} else {
c->reply_bytes += c->replyAsync->size;
listAddNodeTail(c->reply, c->replyAsync);
c->replyAsync = nullptr;
}
zfree(c->replyAsync);
c->replyAsync = nullptr;
} else {
/* Only replicas should have empty async reply buffers */
serverAssert(c->flags & CLIENT_SLAVE);
}
zfree(c->replyAsync);
c->replyAsync = nullptr;
c->fPendingAsyncWrite = FALSE;
// Now install the write event handler
int ae_flags = AE_WRITABLE|AE_WRITE_THREADSAFE;
/* For the fsync=always policy, we want that a given FD is never
@ -1968,7 +2019,6 @@ void ProcessPendingAsyncWrites()
* need to use a syscall in order to install the writable event handler,
* get it called, and so forth. */
int handleClientsWithPendingWrites(int iel, int aof_state) {
std::unique_lock<fastlock> lockf(g_pserver->rgthreadvar[iel].lockPendingWrite);
int processed = 0;
serverAssert(iel == (serverTL - g_pserver->rgthreadvar));
@ -1991,7 +2041,9 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
ae_flags |= AE_BARRIER;
}
std::unique_lock<fastlock> lockf(g_pserver->rgthreadvar[iel].lockPendingWrite);
auto vec = std::move(g_pserver->rgthreadvar[iel].clients_pending_write);
lockf.unlock();
processed += (int)vec.size();
for (client *c : vec) {
@ -2025,8 +2077,9 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c)) {
if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_flags, true) == C_ERR)
if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_flags, true) == C_ERR) {
freeClientAsync(c);
}
}
}
@ -3647,6 +3700,12 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) {
}
}
/* In the case of a replica client, writes to said replica are using data from the replication backlog
* as opposed to it's own internal buffer, this number should keep track of that */
unsigned long getClientReplicationBacklogSharedUsage(client *c) {
return (!(c->flags & CLIENT_SLAVE) || !c->FPendingReplicaWrite() ) ? 0 : g_pserver->master_repl_offset - c->repl_curr_off;
}
/* This function returns the number of bytes that Redis is
* using to store the reply still not read by the client.
*
@ -3655,9 +3714,11 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) {
* enforcing the client output length limits. */
unsigned long getClientOutputBufferMemoryUsage(client *c) {
unsigned long list_item_size = sizeof(listNode) + sizeof(clientReplyBlock);
return c->reply_bytes + (list_item_size*listLength(c->reply)) + (c->replyAsync ? c->replyAsync->size : 0);
return c->reply_bytes + (list_item_size*listLength(c->reply)) + (c->replyAsync ? c->replyAsync->size : 0) + getClientReplicationBacklogSharedUsage(c);
}
/* Get the class of a client, used in order to enforce limits to different
* classes of clients.
*

View File

@ -902,7 +902,7 @@ size_t objectComputeSize(robj_roptr o, size_t sample_size) {
if (samples) asize += (double)elesize/samples*dictSize(d);
} else if (o->encoding == OBJ_ENCODING_INTSET) {
intset *is = (intset*)ptrFromObj(o);
asize = sizeof(*o)+sizeof(*is)+is->encoding*is->length;
asize = sizeof(*o)+sizeof(*is)+(size_t)is->encoding*is->length;
} else {
serverPanic("Unknown set encoding");
}

View File

@ -2788,6 +2788,8 @@ public:
if (fHighMemory)
performEvictions(false /* fPreSnapshot*/);
}
g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch);
serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
}
}
@ -2916,6 +2918,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
/* Key-specific attributes, set by opcodes before the key type. */
long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now;
long long lru_clock = 0;
unsigned long long ckeysLoaded = 0;
uint64_t mvcc_tstamp = OBJ_MVCC_INVALID;
now = mstime();
rdbAsyncWorkThread wqueue(rsi, rdbflags, now);
@ -3189,6 +3192,13 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
bool fExpiredKey = iAmMaster() && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now;
fLastKeyExpired = fStaleMvccKey || fExpiredKey;
ckeysLoaded++;
if (g_pserver->m_pstorageFactory && (ckeysLoaded % 128) == 0)
{
g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch);
serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
}
if (g_pserver->key_load_delay)
debugDelay(g_pserver->key_load_delay);

View File

@ -103,7 +103,6 @@ static struct config {
int randomkeys_keyspacelen;
int keepalive;
int pipeline;
int showerrors;
long long start;
long long totlatency;
const char *title;
@ -313,7 +312,9 @@ static redisContext *getRedisContext(const char *ip, int port,
fprintf(stderr, "Node %s:%d replied with error:\n%s\n", ip, port, reply->str);
else
fprintf(stderr, "Node %s replied with error:\n%s\n", hostsocket, reply->str);
goto cleanup;
freeReplyObject(reply);
redisFree(ctx);
exit(1);
}
freeReplyObject(reply);
return ctx;
@ -370,9 +371,15 @@ fail:
fprintf(stderr, "ERROR: failed to fetch CONFIG from ");
if (hostsocket == NULL) fprintf(stderr, "%s:%d\n", ip, port);
else fprintf(stderr, "%s\n", hostsocket);
int abort_test = 0;
if (!strncmp(reply->str,"NOAUTH",5) ||
!strncmp(reply->str,"WRONGPASS",9) ||
!strncmp(reply->str,"NOPERM",5))
abort_test = 1;
freeReplyObject(reply);
redisFree(c);
freeRedisConfig(cfg);
if (abort_test) exit(1);
return NULL;
}
static void freeRedisConfig(redisConfig *cfg) {
@ -517,44 +524,39 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
exit(1);
}
redisReply *r = (redisReply*)reply;
int is_err = (r->type == REDIS_REPLY_ERROR);
if (is_err && config.showerrors) {
/* TODO: static lasterr_time not thread-safe */
static time_t lasterr_time = 0;
time_t now = time(NULL);
if (lasterr_time != now) {
lasterr_time = now;
if (c->cluster_node) {
printf("Error from server %s:%d: %s\n",
if (r->type == REDIS_REPLY_ERROR) {
/* Try to update slots configuration if reply error is
* MOVED/ASK/CLUSTERDOWN and the key(s) used by the command
* contain(s) the slot hash tag.
* If the error is not topology-update related then we
* immediately exit to avoid false results. */
if (c->cluster_node && c->staglen) {
int fetch_slots = 0, do_wait = 0;
if (!strncmp(r->str,"MOVED",5) || !strncmp(r->str,"ASK",3))
fetch_slots = 1;
else if (!strncmp(r->str,"CLUSTERDOWN",11)) {
/* Usually the cluster is able to recover itself after
* a CLUSTERDOWN error, so try to sleep one second
* before requesting the new configuration. */
fetch_slots = 1;
do_wait = 1;
printf("Error from server %s:%d: %s.\n",
c->cluster_node->ip,
c->cluster_node->port,
r->str);
}
if (do_wait) sleep(1);
if (fetch_slots && !fetchClusterSlotsConfiguration(c))
exit(1);
} else {
if (c->cluster_node) {
printf("Error from server %s:%d: %s\n",
c->cluster_node->ip,
c->cluster_node->port,
r->str);
} else printf("Error from server: %s\n", r->str);
}
}
/* Try to update slots configuration if reply error is
* MOVED/ASK/CLUSTERDOWN and the key(s) used by the command
* contain(s) the slot hash tag. */
if (is_err && c->cluster_node && c->staglen) {
int fetch_slots = 0, do_wait = 0;
if (!strncmp(r->str,"MOVED",5) || !strncmp(r->str,"ASK",3))
fetch_slots = 1;
else if (!strncmp(r->str,"CLUSTERDOWN",11)) {
/* Usually the cluster is able to recover itself after
* a CLUSTERDOWN error, so try to sleep one second
* before requesting the new configuration. */
fetch_slots = 1;
do_wait = 1;
printf("Error from server %s:%d: %s\n",
c->cluster_node->ip,
c->cluster_node->port,
r->str);
}
if (do_wait) sleep(1);
if (fetch_slots && !fetchClusterSlotsConfiguration(c))
exit(1);
}
}
freeReplyObject(reply);
@ -1302,8 +1304,7 @@ static int fetchClusterSlotsConfiguration(client c) {
atomicGetIncr(config.is_fetching_slots, is_fetching_slots, 1);
if (is_fetching_slots) return -1; //TODO: use other codes || errno ?
atomicSet(config.is_fetching_slots, 1);
if (config.showerrors)
printf("Cluster slots configuration changed, fetching new one...\n");
printf("WARNING: Cluster slots configuration changed, fetching new one...\n");
const char *errmsg = "Failed to update cluster slots configuration";
static dictType dtype = {
dictSdsHash, /* hash function */
@ -1479,7 +1480,8 @@ int parseOptions(int argc, const char **argv) {
} else if (!strcmp(argv[i],"-I")) {
config.idlemode = 1;
} else if (!strcmp(argv[i],"-e")) {
config.showerrors = 1;
printf("WARNING: -e option has been deprecated. "
"We now immediatly exit on error to avoid false results.\n");
} else if (!strcmp(argv[i],"-t")) {
if (lastarg) goto invalid;
/* We get the list of tests to run as a string in the form
@ -1582,8 +1584,6 @@ usage:
" is executed. Default tests use this to hit random keys in the\n"
" specified range.\n"
" -P <numreq> Pipeline <numreq> requests. Default 1 (no pipeline).\n"
" -e If server replies with errors, show them on stdout.\n"
" (no more than 1 error per second is displayed)\n"
" -q Quiet. Just show query/sec values\n"
" --precision Number of decimal places to display in latency output (default 0)\n"
" --csv Output in CSV format\n"
@ -1711,7 +1711,6 @@ int main(int argc, const char **argv) {
config.keepalive = 1;
config.datasize = 3;
config.pipeline = 1;
config.showerrors = 0;
config.randomkeys = 0;
config.randomkeys_keyspacelen = 0;
config.quiet = 0;
@ -1794,8 +1793,9 @@ int main(int argc, const char **argv) {
} else {
config.redis_config =
getRedisConfig(config.hostip, config.hostport, config.hostsocket);
if (config.redis_config == NULL)
if (config.redis_config == NULL) {
fprintf(stderr, "WARN: could not fetch server CONFIG\n");
}
}
if (config.num_threads > 0) {
pthread_mutex_init(&(config.liveclients_mutex), NULL);
@ -1958,8 +1958,8 @@ int main(int argc, const char **argv) {
}
if (test_is_selected("lrange") || test_is_selected("lrange_500")) {
len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 449",tag);
benchmark("LRANGE_500 (first 450 elements)",cmd,len);
len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 499",tag);
benchmark("LRANGE_500 (first 500 elements)",cmd,len);
free(cmd);
}
@ -1987,6 +1987,7 @@ int main(int argc, const char **argv) {
} while(config.loop);
zfree(data);
zfree(data);
if (config.redis_config != NULL) freeRedisConfig(config.redis_config);
return 0;

View File

@ -39,12 +39,14 @@
static char error[1044];
static off_t epos;
static long long line = 1;
int consumeNewline(char *buf) {
if (strncmp(buf,"\r\n",2) != 0) {
ERROR("Expected \\r\\n, got: %02x%02x",buf[0],buf[1]);
return 0;
}
line += 1;
return 1;
}
@ -201,8 +203,8 @@ int redis_check_aof_main(int argc, char **argv) {
off_t pos = process(fp);
off_t diff = size-pos;
printf("AOF analyzed: size=%lld, ok_up_to=%lld, diff=%lld\n",
(long long) size, (long long) pos, (long long) diff);
printf("AOF analyzed: size=%lld, ok_up_to=%lld, ok_up_to_line=%lld, diff=%lld\n",
(long long) size, (long long) pos, line, (long long) diff);
if (diff > 0) {
if (fix) {
char buf[2];

View File

@ -250,7 +250,7 @@ int redis_check_rdb(const char *rdbfilename, FILE *fp) {
rdbstate.doing = RDB_CHECK_DOING_READ_LEN;
if ((dbid = rdbLoadLen(&rdb,NULL)) == RDB_LENERR)
goto eoferr;
rdbCheckInfo("Selecting DB ID %d", dbid);
rdbCheckInfo("Selecting DB ID %llu", (unsigned long long)dbid);
continue; /* Read type again. */
} else if (type == RDB_OPCODE_RESIZEDB) {
/* RESIZEDB: Hint about the size of the keys in the currently

View File

@ -472,7 +472,7 @@ static void cliOutputHelp(int argc, char **argv) {
help = entry->org;
if (group == -1) {
/* Compare all arguments */
if (argc == entry->argc) {
if (argc <= entry->argc) {
for (j = 0; j < argc; j++) {
if (strcasecmp(argv[j],entry->argv[j]) != 0) break;
}
@ -653,7 +653,9 @@ static int cliConnect(int flags) {
cliRefreshPrompt();
}
if (config.hostsocket == NULL) {
/* Do not use hostsocket when we got redirected in cluster mode */
if (config.hostsocket == NULL ||
(config.cluster_mode && config.cluster_reissue_command)) {
context = redisConnect(config.hostip,config.hostport);
} else {
context = redisConnectUnix(config.hostsocket);
@ -4559,7 +4561,7 @@ static void clusterManagerNodeArrayReset(clusterManagerNodeArray *array) {
static void clusterManagerNodeArrayShift(clusterManagerNodeArray *array,
clusterManagerNode **nodeptr)
{
assert(array->nodes < (array->nodes + array->len));
assert(array->len > 0);
/* If the first node to be shifted is not NULL, decrement count. */
if (*array->nodes != NULL) array->count--;
/* Store the first node to be shifted into 'nodeptr'. */
@ -4572,7 +4574,7 @@ static void clusterManagerNodeArrayShift(clusterManagerNodeArray *array,
static void clusterManagerNodeArrayAdd(clusterManagerNodeArray *array,
clusterManagerNode *node)
{
assert(array->nodes < (array->nodes + array->len));
assert(array->len > 0);
assert(node != NULL);
assert(array->count < array->len);
array->nodes[array->count++] = node;
@ -5949,7 +5951,7 @@ void showLatencyDistSamples(struct distsamples *samples, long long tot) {
printf("\033[38;5;0m"); /* Set foreground color to black. */
for (j = 0; ; j++) {
int coloridx =
ceil((float) samples[j].count / tot * (spectrum_palette_size-1));
ceil((double) samples[j].count / tot * (spectrum_palette_size-1));
int color = spectrum_palette[coloridx];
printf("\033[48;5;%dm%c", (int)color, samples[j].character);
samples[j].count = 0;

View File

@ -189,6 +189,7 @@ void createReplicationBacklog(void) {
g_pserver->repl_backlog = (char*)zmalloc(g_pserver->repl_backlog_size, MALLOC_LOCAL);
g_pserver->repl_backlog_histlen = 0;
g_pserver->repl_backlog_idx = 0;
g_pserver->repl_backlog_start = g_pserver->master_repl_offset;
/* We don't have any data inside our buffer, but virtually the first
* byte we have is the next byte that will be generated for the
@ -200,6 +201,15 @@ void createReplicationBacklog(void) {
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
}
/* Compute the corresponding index from a replication backlog offset
* Since this computation needs the size of the replication backlog,
* you need to have the repl_backlog_lock in order to call it */
long long getReplIndexFromOffset(long long offset){
serverAssert(g_pserver->repl_backlog_lock.fOwnLock());
long long index = (offset - g_pserver->repl_backlog_start) % g_pserver->repl_backlog_size;
return index;
}
/* This function is called when the user modifies the replication backlog
* size at runtime. It is up to the function to both update the
* g_pserver->repl_backlog_size and to resize the buffer and setup it so that
@ -211,6 +221,8 @@ void resizeReplicationBacklog(long long newsize) {
newsize = CONFIG_REPL_BACKLOG_MIN_SIZE;
if (g_pserver->repl_backlog_size == newsize) return;
std::unique_lock<fastlock> repl_backlog_lock (g_pserver->repl_backlog_lock);
if (g_pserver->repl_backlog != NULL) {
/* What we actually do is to flush the old buffer and realloc a new
* empty one. It will refill with new data incrementally.
@ -218,19 +230,23 @@ void resizeReplicationBacklog(long long newsize) {
* worse often we need to alloc additional space before freeing the
* old buffer. */
if (g_pserver->repl_batch_idxStart >= 0) {
// We need to keep critical data so we can't shrink less than the hot data in the buffer
newsize = std::max(newsize, g_pserver->master_repl_offset - g_pserver->repl_batch_offStart);
char *backlog = (char*)zmalloc(newsize);
g_pserver->repl_backlog_histlen = g_pserver->master_repl_offset - g_pserver->repl_batch_offStart;
/* get the critical client size, i.e. the size of the data unflushed to clients */
long long earliest_off = g_pserver->repl_lowest_off.load();
if (g_pserver->repl_backlog_idx >= g_pserver->repl_batch_idxStart) {
auto cbActiveBacklog = g_pserver->repl_backlog_idx - g_pserver->repl_batch_idxStart;
memcpy(backlog, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbActiveBacklog);
if (earliest_off != -1) {
// We need to keep critical data so we can't shrink less than the hot data in the buffer
newsize = std::max(newsize, g_pserver->master_repl_offset - earliest_off);
char *backlog = (char*)zmalloc(newsize);
g_pserver->repl_backlog_histlen = g_pserver->master_repl_offset - earliest_off;
long long earliest_idx = getReplIndexFromOffset(earliest_off);
if (g_pserver->repl_backlog_idx >= earliest_idx) {
auto cbActiveBacklog = g_pserver->repl_backlog_idx - earliest_idx;
memcpy(backlog, g_pserver->repl_backlog + earliest_idx, cbActiveBacklog);
serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog);
} else {
auto cbPhase1 = g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart;
memcpy(backlog, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbPhase1);
auto cbPhase1 = g_pserver->repl_backlog_size - earliest_idx;
memcpy(backlog, g_pserver->repl_backlog + earliest_idx, cbPhase1);
memcpy(backlog + cbPhase1, g_pserver->repl_backlog, g_pserver->repl_backlog_idx);
auto cbActiveBacklog = cbPhase1 + g_pserver->repl_backlog_idx;
serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog);
@ -238,7 +254,12 @@ void resizeReplicationBacklog(long long newsize) {
zfree(g_pserver->repl_backlog);
g_pserver->repl_backlog = backlog;
g_pserver->repl_backlog_idx = g_pserver->repl_backlog_histlen;
g_pserver->repl_batch_idxStart = 0;
if (g_pserver->repl_batch_idxStart >= 0) {
g_pserver->repl_batch_idxStart -= earliest_idx;
if (g_pserver->repl_batch_idxStart < 0)
g_pserver->repl_batch_idxStart += g_pserver->repl_backlog_size;
}
g_pserver->repl_backlog_start = earliest_off;
} else {
zfree(g_pserver->repl_backlog);
g_pserver->repl_backlog = (char*)zmalloc(newsize);
@ -246,11 +267,13 @@ void resizeReplicationBacklog(long long newsize) {
g_pserver->repl_backlog_idx = 0;
/* Next byte we have is... the next since the buffer is empty. */
g_pserver->repl_backlog_off = g_pserver->master_repl_offset+1;
g_pserver->repl_backlog_start = g_pserver->master_repl_offset;
}
}
g_pserver->repl_backlog_size = newsize;
}
void freeReplicationBacklog(void) {
serverAssert(GlobalLocksAcquired());
listIter li;
@ -273,18 +296,63 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
serverAssert(GlobalLocksAcquired());
const unsigned char *p = (const unsigned char*)ptr;
if (g_pserver->repl_batch_idxStart >= 0) {
long long minimumsize = g_pserver->master_repl_offset + len - g_pserver->repl_batch_offStart+1;
if (minimumsize > g_pserver->repl_backlog_size) {
flushReplBacklogToClients();
serverAssert(g_pserver->master_repl_offset == g_pserver->repl_batch_offStart);
minimumsize = g_pserver->master_repl_offset + len - g_pserver->repl_batch_offStart+1;
if (minimumsize > g_pserver->repl_backlog_size && minimumsize < (long long)cserver.client_obuf_limits[CLIENT_TYPE_SLAVE].hard_limit_bytes) {
if (g_pserver->repl_batch_idxStart >= 0) {
/* We are lower bounded by the lowest replica offset, or the batch offset start if not applicable */
long long lower_bound = g_pserver->repl_lowest_off.load(std::memory_order_seq_cst);
if (lower_bound == -1)
lower_bound = g_pserver->repl_batch_offStart;
long long minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1;
if (minimumsize > g_pserver->repl_backlog_size) {
listIter li;
listNode *ln;
listRewind(g_pserver->slaves, &li);
long long maxClientBuffer = (long long)cserver.client_obuf_limits[CLIENT_TYPE_SLAVE].hard_limit_bytes;
if (maxClientBuffer <= 0)
maxClientBuffer = LLONG_MAX; // infinite essentially
long long min_offset = LLONG_MAX;
int listening_replicas = 0;
while ((ln = listNext(&li))) {
client *replica = (client*)listNodeValue(ln);
if (!canFeedReplicaReplBuffer(replica)) continue;
if (replica->flags & CLIENT_CLOSE_ASAP) continue;
std::unique_lock<fastlock> ul(replica->lock);
// Would this client overflow? If so close it
long long neededBuffer = g_pserver->master_repl_offset + len - replica->repl_curr_off + 1;
if (neededBuffer > maxClientBuffer) {
sds clientInfo = catClientInfoString(sdsempty(),replica);
freeClientAsync(replica);
serverLog(LL_WARNING,"Client %s scheduled to be closed ASAP due to exceeding output buffer hard limit.", clientInfo);
sdsfree(clientInfo);
continue;
}
min_offset = std::min(min_offset, replica->repl_curr_off);
++listening_replicas;
}
if (min_offset == LLONG_MAX) {
min_offset = g_pserver->repl_batch_offStart;
g_pserver->repl_lowest_off = -1;
} else {
g_pserver->repl_lowest_off = min_offset;
}
minimumsize = g_pserver->master_repl_offset + len - min_offset + 1;
serverAssert(listening_replicas == 0 || minimumsize <= maxClientBuffer);
if (minimumsize > g_pserver->repl_backlog_size && listening_replicas) {
// This is an emergency overflow, we better resize to fit
long long newsize = std::max(g_pserver->repl_backlog_size*2, minimumsize);
serverLog(LL_WARNING, "Replication backlog is too small, resizing to: %lld", newsize);
serverLog(LL_WARNING, "Replication backlog is too small, resizing to: %lld bytes", newsize);
resizeReplicationBacklog(newsize);
} else if (!listening_replicas) {
// We need to update a few variables or later asserts will notice we dropped data
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset + len;
g_pserver->repl_lowest_off = -1;
}
}
}
@ -293,6 +361,7 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
/* This is a circular buffer, so write as much data we can at every
* iteration and rewind the "idx" index if we reach the limit. */
while(len) {
size_t thislen = g_pserver->repl_backlog_size - g_pserver->repl_backlog_idx;
if (thislen > len) thislen = len;
@ -479,7 +548,6 @@ void replicationFeedSlavesCore(list *slaves, int dictid, robj **argv, int argc)
if (fSendRaw)
{
char aux[LONG_STR_SIZE+3];
/* Add the multi bulk reply length. */
aux[0] = '*';
int multilen = ll2string(aux+1,sizeof(aux)-1,argc);
@ -653,15 +721,19 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv,
decrRefCount(cmdobj);
}
int prepareClientToWrite(client *c);
/* Feed the replica 'c' with the replication backlog starting from the
* specified 'offset' up to the end of the backlog. */
long long addReplyReplicationBacklog(client *c, long long offset) {
long long j, skip, len;
long long skip, len;
serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset);
if (g_pserver->repl_backlog_histlen == 0) {
serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero");
c->repl_curr_off = g_pserver->master_repl_offset;
c->repl_end_off = g_pserver->master_repl_offset;
return 0;
}
@ -678,31 +750,19 @@ long long addReplyReplicationBacklog(client *c, long long offset) {
skip = offset - g_pserver->repl_backlog_off;
serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip);
/* Point j to the oldest byte, that is actually our
* g_pserver->repl_backlog_off byte. */
j = (g_pserver->repl_backlog_idx +
(g_pserver->repl_backlog_size-g_pserver->repl_backlog_histlen)) %
g_pserver->repl_backlog_size;
serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j);
/* Discard the amount of data to seek to the specified 'offset'. */
j = (j + skip) % g_pserver->repl_backlog_size;
/* Feed replica with data. Since it is a circular buffer we have to
* split the reply in two parts if we are cross-boundary. */
len = g_pserver->repl_backlog_histlen - skip;
serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len);
while(len) {
long long thislen =
((g_pserver->repl_backlog_size - j) < len) ?
(g_pserver->repl_backlog_size - j) : len;
serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen);
addReplySds(c,sdsnewlen(g_pserver->repl_backlog + j, thislen));
len -= thislen;
j = 0;
}
return g_pserver->repl_backlog_histlen - skip;
/* Set the start and end offsets for the replica so that a future
* writeToClient will send the backlog from the given offset to
* the current end of the backlog to said replica */
c->repl_curr_off = offset - 1;
c->repl_end_off = g_pserver->master_repl_offset;
/* Force the partial sync to be queued */
prepareClientToWrite(c);
return len;
}
/* Return the offset to provide as reply to the PSYNC command received
@ -735,6 +795,10 @@ int replicationSetupSlaveForFullResync(client *replica, long long offset) {
replica->psync_initial_offset = offset;
replica->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
replica->repl_curr_off = offset;
replica->repl_end_off = g_pserver->master_repl_offset;
/* We are going to accumulate the incremental changes for this
* replica as well. Set replicaseldb to -1 in order to force to re-emit
* a SELECT statement in the replication stream. */
@ -1357,6 +1421,7 @@ void replconfCommand(client *c) {
* 4) Update the count of "good replicas". */
void putSlaveOnline(client *replica) {
replica->replstate = SLAVE_STATE_ONLINE;
replica->repl_put_online_on_ack = 0;
replica->repl_ack_time = g_pserver->unixtime; /* Prevent false timeout. */
@ -3059,6 +3124,11 @@ void syncWithMaster(connection *conn) {
if (psync_result == PSYNC_CONTINUE) {
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization.");
/* Reset the bulklen information in case it is lingering from the last connection
* The partial sync will start from the beginning of a command so these should be reset */
mi->master->reqtype = 0;
mi->master->multibulklen = 0;
mi->master->bulklen = -1;
if (cserver.supervised_mode == SUPERVISED_SYSTEMD) {
redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Partial Resynchronization accepted. Ready to accept connections in read-write mode.\n");
}
@ -4287,6 +4357,8 @@ void replicationCron(void) {
replicationStartPendingFork();
trimReplicationBacklog();
/* Remove the RDB file used for replication if Redis is not running
* with any persistence. */
removeRDBUsedToSyncReplicas();
@ -4898,15 +4970,19 @@ void replicateSubkeyExpire(redisDb *db, robj_roptr key, robj_roptr subkey, long
}
void _clientAsyncReplyBufferReserve(client *c, size_t len);
void flushReplBacklogToClients()
{
serverAssert(GlobalLocksAcquired());
/* If we have the repl backlog lock, we will deadlock */
serverAssert(!g_pserver->repl_backlog_lock.fOwnLock());
if (g_pserver->repl_batch_offStart < 0)
return;
if (g_pserver->repl_batch_offStart != g_pserver->master_repl_offset) {
bool fAsyncWrite = false;
long long min_offset = LONG_LONG_MAX;
// Ensure no overflow
serverAssert(g_pserver->repl_batch_offStart < g_pserver->master_repl_offset);
if (g_pserver->master_repl_offset - g_pserver->repl_batch_offStart > g_pserver->repl_backlog_size) {
// We overflowed
@ -4930,33 +5006,32 @@ void flushReplBacklogToClients()
listIter li;
listNode *ln;
listRewind(g_pserver->slaves, &li);
/* We don't actually write any data in this function since we send data
* directly from the replication backlog to replicas in writeToClient.
*
* What we do however, is set the end offset of each replica here. This way,
* future calls to writeToClient will know up to where in the replication
* backlog is valid for writing. */
while ((ln = listNext(&li))) {
client *replica = (client*)listNodeValue(ln);
if (!canFeedReplicaReplBuffer(replica)) continue;
if (replica->flags & CLIENT_CLOSE_ASAP) continue;
std::unique_lock<fastlock> ul(replica->lock, std::defer_lock);
if (FCorrectThread(replica))
ul.lock();
else
std::unique_lock<fastlock> ul(replica->lock);
if (!FCorrectThread(replica))
fAsyncWrite = true;
if (g_pserver->repl_backlog_idx >= g_pserver->repl_batch_idxStart) {
long long cbCopy = g_pserver->repl_backlog_idx - g_pserver->repl_batch_idxStart;
serverAssert((g_pserver->master_repl_offset - g_pserver->repl_batch_offStart) == cbCopy);
serverAssert((g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart) >= (cbCopy));
serverAssert((g_pserver->repl_batch_idxStart + cbCopy) <= g_pserver->repl_backlog_size);
/* We should have set the repl_curr_off when synchronizing, so it shouldn't be -1 here */
serverAssert(replica->repl_curr_off != -1);
addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbCopy);
} else {
auto cbPhase1 = g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart;
if (fAsyncWrite)
_clientAsyncReplyBufferReserve(replica, cbPhase1 + g_pserver->repl_backlog_idx);
addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbPhase1);
addReplyProto(replica, g_pserver->repl_backlog, g_pserver->repl_backlog_idx);
serverAssert((cbPhase1 + g_pserver->repl_backlog_idx) == (g_pserver->master_repl_offset - g_pserver->repl_batch_offStart));
}
min_offset = std::min(min_offset, replica->repl_curr_off);
replica->repl_end_off = g_pserver->master_repl_offset;
/* Only if the there isn't already a pending write do we prepare the client to write */
serverAssert(replica->repl_curr_off != g_pserver->master_repl_offset);
prepareClientToWrite(replica);
}
if (fAsyncWrite)
ProcessPendingAsyncWrites();
@ -4965,6 +5040,7 @@ LDone:
// This may be called multiple times per "frame" so update with our progress flushing to clients
g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
g_pserver->repl_lowest_off.store(min_offset == LONG_LONG_MAX ? -1 : min_offset, std::memory_order_seq_cst);
}
}
@ -5041,3 +5117,17 @@ void updateFailoverStatus(void) {
g_pserver->target_replica_port);
}
}
// If we automatically grew the backlog we need to trim it back to
// the config setting when possible
void trimReplicationBacklog() {
serverAssert(GlobalLocksAcquired());
serverAssert(g_pserver->repl_batch_offStart < 0); // we shouldn't be in a batch
if (g_pserver->repl_backlog_size <= g_pserver->repl_backlog_config_size)
return; // We're already a good size
if (g_pserver->repl_lowest_off > 0 && (g_pserver->master_repl_offset - g_pserver->repl_lowest_off + 1) > g_pserver->repl_backlog_config_size)
return; // There is untransmitted data we can't truncate
serverLog(LL_NOTICE, "Reclaiming %lld replication backlog bytes", g_pserver->repl_backlog_size - g_pserver->repl_backlog_config_size);
resizeReplicationBacklog(g_pserver->repl_backlog_config_size);
}

View File

@ -31,6 +31,7 @@
#include "sha1.h"
#include "rand.h"
#include "cluster.h"
#include "monotonic.h"
extern "C" {
#include <lua.h>
@ -1437,7 +1438,7 @@ sds luaCreateFunction(client *c, lua_State *lua, robj *body) {
/* This is the Lua script "count" hook that we use to detect scripts timeout. */
void luaMaskCountHook(lua_State *lua, lua_Debug *ar) {
long long elapsed = mstime() - g_pserver->lua_time_start;
long long elapsed = elapsedMs(g_pserver->lua_time_start);
UNUSED(ar);
UNUSED(lua);
@ -1591,7 +1592,8 @@ void evalGenericCommand(client *c, int evalsha) {
serverTL->in_eval = 1;
g_pserver->lua_caller = c;
g_pserver->lua_cur_script = funcname + 2;
g_pserver->lua_time_start = mstime();
g_pserver->lua_time_start = getMonotonicUs();
g_pserver->lua_time_snapshot = mstime();
g_pserver->lua_kill = 0;
if (g_pserver->lua_time_limit > 0 && ldb.active == 0) {
lua_sethook(lua,luaMaskCountHook,LUA_MASKCOUNT,100000);
@ -2750,7 +2752,7 @@ void luaLdbLineHook(lua_State *lua, lua_Debug *ar) {
/* Check if a timeout occurred. */
if (ar->event == LUA_HOOKCOUNT && ldb.step == 0 && bp == 0) {
mstime_t elapsed = mstime() - g_pserver->lua_time_start;
mstime_t elapsed = elapsedMs(g_pserver->lua_time_start);
mstime_t timelimit = g_pserver->lua_time_limit ?
g_pserver->lua_time_limit : 5000;
if (elapsed >= timelimit) {
@ -2780,6 +2782,7 @@ void luaLdbLineHook(lua_State *lua, lua_Debug *ar) {
lua_pushstring(lua, "timeout during Lua debugging with client closing connection");
lua_error(lua);
}
g_pserver->lua_time_start = mstime();
g_pserver->lua_time_start = getMonotonicUs();
g_pserver->lua_time_snapshot = mstime();
}
}

View File

@ -4131,16 +4131,16 @@ void sentinelSetCommand(client *c) {
int numargs = j-old_j+1;
switch(numargs) {
case 2:
sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s",ptrFromObj(c->argv[old_j]),
ptrFromObj(c->argv[old_j+1]));
sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s",szFromObj(c->argv[old_j]),
szFromObj(c->argv[old_j+1]));
break;
case 3:
sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s %s",ptrFromObj(c->argv[old_j]),
ptrFromObj(c->argv[old_j+1]),
ptrFromObj(c->argv[old_j+2]));
sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s %s",szFromObj(c->argv[old_j]),
szFromObj(c->argv[old_j+1]),
szFromObj(c->argv[old_j+2]));
break;
default:
sentinelEvent(LL_WARNING,"+set",ri,"%@ %s",ptrFromObj(c->argv[old_j]));
sentinelEvent(LL_WARNING,"+set",ri,"%@ %s",szFromObj(c->argv[old_j]));
break;
}
}

View File

@ -2021,7 +2021,6 @@ void clientsCron(int iel) {
while(listLength(g_pserver->clients) && iterations--) {
client *c;
listNode *head;
/* Rotate the list, take the current head, process.
* This way if the client must be removed from the list it's the
* first element and we don't incur into O(N) computation. */
@ -2125,7 +2124,7 @@ void databasesCron(bool fMainThread) {
::dict *dict = g_pserver->db[rehash_db]->dictUnsafeKeyOnly();
/* Are we async rehashing? And if so is it time to re-calibrate? */
/* The recalibration limit is a prime number to ensure balancing across threads */
if (rehashes_per_ms > 0 && async_rehashes < 131 && !cserver.active_defrag_enabled && cserver.cthreads > 1) {
if (rehashes_per_ms > 0 && async_rehashes < 131 && !cserver.active_defrag_enabled && cserver.cthreads > 1 && dictSize(dict) > 2048 && dictIsRehashing(dict) && !g_pserver->loading) {
serverTL->rehashCtl = dictRehashAsyncStart(dict, rehashes_per_ms);
++async_rehashes;
}
@ -3261,6 +3260,7 @@ void initServerConfig(void) {
g_pserver->enable_multimaster = CONFIG_DEFAULT_ENABLE_MULTIMASTER;
g_pserver->repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT;
g_pserver->master_repl_offset = 0;
g_pserver->repl_lowest_off.store(-1, std::memory_order_seq_cst);
/* Replication partial resync backlog */
g_pserver->repl_backlog = NULL;
@ -4677,6 +4677,8 @@ int processCommand(client *c, int callFlags) {
return C_OK;
}
int is_read_command = (c->cmd->flags & CMD_READONLY) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_READONLY));
int is_write_command = (c->cmd->flags & CMD_WRITE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
int is_denyoom_command = (c->cmd->flags & CMD_DENYOOM) ||
@ -4895,7 +4897,7 @@ int processCommand(client *c, int callFlags) {
c->cmd->proc != discardCommand &&
c->cmd->proc != watchCommand &&
c->cmd->proc != unwatchCommand &&
c->cmd->proc != resetCommand &&
c->cmd->proc != resetCommand &&
!(c->cmd->proc == shutdownCommand &&
c->argc == 2 &&
tolower(((char*)ptrFromObj(c->argv[1]))[0]) == 'n') &&
@ -4907,6 +4909,14 @@ int processCommand(client *c, int callFlags) {
return C_OK;
}
/* Prevent a replica from sending commands that access the keyspace.
* The main objective here is to prevent abuse of client pause check
* from which replicas are exempt. */
if ((c->flags & CLIENT_SLAVE) && (is_may_replicate_command || is_write_command || is_read_command)) {
rejectCommandFormat(c, "Replica can't interract with the keyspace");
return C_OK;
}
/* If the server is paused, block the client until
* the pause has ended. Replicas are never paused. */
if (!(c->flags & CLIENT_SLAVE) &&
@ -6102,10 +6112,11 @@ sds genRedisInfoString(const char *section) {
if (sections++) info = sdscat(info,"\r\n");
info = sdscatprintf(info, "# Keyspace\r\n");
for (j = 0; j < cserver.dbnum; j++) {
long long keys, vkeys;
long long keys, vkeys, cachedKeys;
keys = g_pserver->db[j]->size();
vkeys = g_pserver->db[j]->expireSize();
cachedKeys = g_pserver->db[j]->size(true /* fCachedOnly */);
// Adjust TTL by the current time
mstime_t mstime;
@ -6117,8 +6128,8 @@ sds genRedisInfoString(const char *section) {
if (keys || vkeys) {
info = sdscatprintf(info,
"db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n",
j, keys, vkeys, static_cast<long long>(g_pserver->db[j]->avg_ttl));
"db%d:keys=%lld,expires=%lld,avg_ttl=%lld,cached_keys=%lld\r\n",
j, keys, vkeys, static_cast<long long>(g_pserver->db[j]->avg_ttl), cachedKeys);
}
}
}
@ -6136,7 +6147,11 @@ sds genRedisInfoString(const char *section) {
"variant:enterprise\r\n"
"license_status:%s\r\n"
"mvcc_depth:%d\r\n",
#ifdef NO_LICENSE_CHECK
"OK",
#else
cserver.license_key ? "OK" : "Trial",
#endif
mvcc_depth
);
}
@ -7012,9 +7027,10 @@ void OnTerminate()
void wakeTimeThread() {
updateCachedTime();
std::lock_guard<std::mutex> lock(time_thread_mutex);
if (sleeping_threads >= cserver.cthreads)
time_thread_cv.notify_one();
sleeping_threads--;
serverAssert(sleeping_threads >= 0);
time_thread_cv.notify_one();
}
void *timeThreadMain(void*) {
@ -7085,6 +7101,8 @@ static void validateConfiguration()
serverLog(LL_WARNING, "\tKeyDB will now exit. Please update your configuration file.");
exit(EXIT_FAILURE);
}
g_pserver->repl_backlog_config_size = g_pserver->repl_backlog_size; // this is normally set in the update logic, but not on initial config
}
int iAmMaster(void) {

View File

@ -1095,7 +1095,7 @@ public:
redisDbPersistentData(redisDbPersistentData &&) = default;
size_t slots() const { return dictSlots(m_pdict); }
size_t size() const;
size_t size(bool fCachedOnly = false) const;
void expand(uint64_t slots) { dictExpand(m_pdict, slots); }
void trackkey(robj_roptr o, bool fUpdate)
@ -1591,6 +1591,12 @@ struct client {
long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
copying this replica output buffer
should use. */
long long repl_curr_off = -1;/* Replication offset of the replica, also where in the backlog we need to start from
* when sending data to this replica. */
long long repl_end_off = -1; /* Replication offset to write to, stored in the replica, as opposed to using the global offset
* to prevent needing the global lock */
char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
int slave_listening_port; /* As configured with: REPLCONF listening-port */
char *slave_addr; /* Optionally given by REPLCONF ip-address */
@ -1652,6 +1658,10 @@ struct client {
robj **argv;
size_t argv_len_sumActive = 0;
bool FPendingReplicaWrite() const {
return repl_curr_off != repl_end_off;
}
// post a function from a non-client thread to run on its client thread
bool postFunction(std::function<void(client *)> fn, bool fLock = true);
size_t argv_len_sum() const;
@ -1991,7 +2001,7 @@ public:
// Per-thread variabels that may be accessed without a lock
struct redisServerThreadVars {
aeEventLoop *el;
aeEventLoop *el = nullptr;
socketFds ipfd; /* TCP socket file descriptors */
socketFds tlsfd; /* TLS socket file descriptors */
int in_eval; /* Are we inside EVAL? */
@ -2355,11 +2365,15 @@ struct redisServer {
int repl_ping_slave_period; /* Master pings the replica every N seconds */
char *repl_backlog; /* Replication backlog for partial syncs */
long long repl_backlog_size; /* Backlog circular buffer size */
long long repl_backlog_config_size; /* The repl backlog may grow but we want to know what the user set it to */
long long repl_backlog_histlen; /* Backlog actual data length */
long long repl_backlog_idx; /* Backlog circular buffer current offset,
that is the next byte will'll write to.*/
long long repl_backlog_off; /* Replication "master offset" of first
byte in the replication backlog buffer.*/
long long repl_backlog_start; /* Used to compute indicies from offsets
basically, index = (offset - start) % size */
fastlock repl_backlog_lock {"replication backlog"};
time_t repl_backlog_time_limit; /* Time without slaves after the backlog
gets released. */
time_t repl_no_slaves_since; /* We have no slaves since that time.
@ -2371,6 +2385,8 @@ struct redisServer {
int repl_diskless_load; /* Slave parse RDB directly from the socket.
* see REPL_DISKLESS_LOAD_* enum */
int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */
std::atomic <long long> repl_lowest_off; /* The lowest offset amongst all replicas
-1 if there are no replicas */
/* Replication (replica) */
list *masters;
int enable_multimaster;
@ -2475,7 +2491,8 @@ struct redisServer {
::dict *lua_scripts; /* A dictionary of SHA1 -> Lua scripts */
unsigned long long lua_scripts_mem; /* Cached scripts' memory + oh */
mstime_t lua_time_limit; /* Script timeout in milliseconds */
mstime_t lua_time_start; /* Start time of script, milliseconds time */
monotime lua_time_start; /* monotonic timer to detect timed-out script */
mstime_t lua_time_snapshot; /* Snapshot of mstime when script is started */
int lua_write_dirty; /* True if a write command was called during the
execution of the current script. */
int lua_random_dirty; /* True if a random command was called during the
@ -2865,6 +2882,7 @@ void disableTracking(client *c);
void trackingRememberKeys(client *c);
void trackingInvalidateKey(client *c, robj *keyobj);
void trackingInvalidateKeysOnFlush(int async);
void freeTrackingRadixTree(rax *rt);
void freeTrackingRadixTreeAsync(rax *rt);
void trackingLimitUsedSlots(void);
uint64_t trackingGetTotalItems(void);
@ -3014,6 +3032,8 @@ void clearFailoverState(void);
void updateFailoverStatus(void);
void abortFailover(redisMaster *mi, const char *err);
const char *getFailoverStateString();
int canFeedReplicaReplBuffer(client *replica);
void trimReplicationBacklog();
/* Generic persistence functions */
void startLoadingFile(FILE* fp, const char * filename, int rdbflags);
@ -3716,6 +3736,8 @@ void mixDigest(unsigned char *digest, const void *ptr, size_t len);
void xorDigest(unsigned char *digest, const void *ptr, size_t len);
int populateCommandTableParseFlags(struct redisCommand *c, const char *strflags);
int moduleGILAcquiredByModule(void);
extern int g_fInCrash;
static inline int GlobalLocksAcquired(void) // Used in asserts to verify all global locks are correctly acquired for a server-thread to operate
@ -3783,6 +3805,7 @@ void tlsCleanup(void);
int tlsConfigure(redisTLSContextConfig *ctx_config);
class ShutdownException
{};
@ -3794,3 +3817,5 @@ class ShutdownException
int iAmMaster(void);
#endif

View File

@ -810,7 +810,7 @@ void stralgoLCS(client *c) {
/* Setup an uint32_t array to store at LCS[i,j] the length of the
* LCS A0..i-1, B0..j-1. Note that we have a linear array here, so
* we index it as LCS[j+(blen+1)*j] */
uint32_t *lcs = (uint32_t*)zmalloc((alen+1)*(blen+1)*sizeof(uint32_t));
uint32_t *lcs = (uint32_t*)zmalloc((size_t)(alen+1)*(blen+1)*sizeof(uint32_t));
#define LCS(A,B) lcs[(B)+((A)*(blen+1))]
/* Start building the LCS table. */

View File

@ -158,6 +158,18 @@ tags {"aof"} {
assert_match "*not valid*" $result
}
test "Short read: Utility should show the abnormal line num in AOF" {
create_aof {
append_to_aof [formatCommand set foo hello]
append_to_aof "!!!"
}
catch {
exec src/keydb-check-aof $aof_path
} result
assert_match "*ok_up_to_line=8*" $result
}
test "Short read: Utility should be able to fix the AOF" {
set result [exec src/keydb-check-aof --fix $aof_path << "y\n"]
assert_match "*Successfully truncated AOF*" $result

View File

@ -39,6 +39,7 @@ proc show_cluster_status {} {
# all the lists are empty.
#
# regexp {^[0-9]+:[A-Z] [0-9]+ [A-z]+ [0-9]+ ([0-9:.]+) .*} $l - logdate
catch {
while 1 {
# Find the log with smallest time.
set empty 0
@ -67,6 +68,7 @@ proc show_cluster_status {} {
puts "\[$best port $R_port($best)\] [lindex $log($best) 0]"
set log($best) [lrange $log($best) 1 end]
}
}
}
}

View File

@ -33,7 +33,8 @@ start_server {tags {"maxmemory"}} {
# Get the current memory limit and calculate a new limit.
# We just add 100k to the current memory size so that it is
# fast for us to reach that limit.
set used [s used_memory]
set overhead [s mem_not_counted_for_evict]
set used [expr [s used_memory] - $overhead]
set limit [expr {$used+100*1024}]
r config set maxmemory $limit
r config set maxmemory-policy $policy
@ -42,7 +43,7 @@ start_server {tags {"maxmemory"}} {
while 1 {
r setex [randomKey] 10000 x
incr numkeys
if {[s used_memory]+4096 > $limit} {
if {[expr {[s used_memory] - $overhead + 4096}] > $limit} {
assert {$numkeys > 10}
break
}
@ -52,7 +53,8 @@ start_server {tags {"maxmemory"}} {
for {set j 0} {$j < $numkeys} {incr j} {
r setex [randomKey] 10000 x
}
assert {[s used_memory] < ($limit+4096)}
set used_amt [expr [s used_memory] - $overhead]
assert {$used_amt < ($limit+4096)}
}
}
@ -65,7 +67,8 @@ start_server {tags {"maxmemory"}} {
# Get the current memory limit and calculate a new limit.
# We just add 100k to the current memory size so that it is
# fast for us to reach that limit.
set used [s used_memory]
set overhead [s mem_not_counted_for_evict]
set used [expr [s used_memory] - $overhead]
set limit [expr {$used+100*1024}]
r config set maxmemory $limit
r config set maxmemory-policy $policy
@ -74,7 +77,7 @@ start_server {tags {"maxmemory"}} {
while 1 {
r set [randomKey] x
incr numkeys
if {[s used_memory]+4096 > $limit} {
if {[expr [s used_memory] - $overhead]+4096 > $limit} {
assert {$numkeys > 10}
break
}
@ -91,7 +94,7 @@ start_server {tags {"maxmemory"}} {
}
}
if {[string match allkeys-* $policy]} {
assert {[s used_memory] < ($limit+4096)}
assert {[expr [s used_memory] - $overhead] < ($limit+4096)}
} else {
assert {$err == 1}
}
@ -107,7 +110,8 @@ start_server {tags {"maxmemory"}} {
# Get the current memory limit and calculate a new limit.
# We just add 100k to the current memory size so that it is
# fast for us to reach that limit.
set used [s used_memory]
set overhead [s mem_not_counted_for_evict]
set used [expr [s used_memory] - $overhead]
set limit [expr {$used+100*1024}]
r config set maxmemory $limit
r config set maxmemory-policy $policy
@ -121,7 +125,7 @@ start_server {tags {"maxmemory"}} {
} else {
r set "key:$numkeys" x
}
if {[s used_memory]+4096 > $limit} {
if {[expr [s used_memory] - $overhead]+4096 > $limit} {
assert {$numkeys > 10}
break
}
@ -135,7 +139,7 @@ start_server {tags {"maxmemory"}} {
catch {r setex "foo:$j" 10000 x}
}
# We should still be under the limit.
assert {[s used_memory] < ($limit+4096)}
assert {[expr [s used_memory] - $overhead] < ($limit+4096)}
# However all our non volatile keys should be here.
for {set j 0} {$j < $numkeys} {incr j 2} {
assert {[r exists "key:$j"]}
@ -305,7 +309,8 @@ start_server {tags {"maxmemory"} overrides {server-threads 1}} {
# we need to make sure to evict keynames of a total size of more than
# 16kb since the (PROTO_REPLY_CHUNK_BYTES), only after that the
# invalidation messages have a chance to trigger further eviction.
set used [s used_memory]
set overhead [s mem_not_counted_for_evict]
set used [expr [s used_memory] - $overhead]
set limit [expr {$used - 40000}]
r config set maxmemory $limit

View File

@ -395,7 +395,7 @@ start_server {tags {"defrag"} overrides {appendonly yes auto-aof-rewrite-percent
# if the current slab is lower in utilization the defragger would have ended up in stagnation,
# keept running and not move any allocation.
# this test is more consistent on a fresh server with no history
start_server {tags {"defrag"} overrides {save ""}} {
start_server {tags {"defrag"} overrides {save "" server-threads 1}} {
r flushdb
r config resetstat
r config set hz 100

View File

@ -25,7 +25,7 @@ test {CONFIG SET port number} {
test {CONFIG SET bind address} {
start_server {} {
# non-valid address
catch {r CONFIG SET bind "some.wrong.bind.address"} e
catch {r CONFIG SET bind "999.999.999.999"} e
assert_match {*Failed to bind to specified addresses*} $e
# make sure server still bound to the previous address

View File

@ -395,6 +395,17 @@ start_server {tags {"tracking network"}} {
assert {[lindex msg 2] eq {} }
}
test {Test ASYNC flushall} {
clean_all
r CLIENT TRACKING on REDIRECT $redir_id
r GET key1
r GET key2
assert_equal [s 0 tracking_total_keys] 2
$rd_sg FLUSHALL ASYNC
assert_equal [s 0 tracking_total_keys] 0
assert_equal [lindex [$rd_redirection read] 2] {}
}
# Keys are defined to be evicted 100 at a time by default.
# If after eviction the number of keys still surpasses the limit
# defined in tracking-table-max-keys, we increases eviction