Merge branch 'keydbpro' into PRO_RELEASE_6

Former-commit-id: bc4b0ff8c18410ae1d1b029410580c96c430510f
This commit is contained in:
John Sully 2020-02-17 19:56:38 -05:00
commit a6bc30fc09
29 changed files with 587 additions and 180 deletions

2
.gitignore vendored
View File

@ -19,7 +19,7 @@ keydb-cli
redis-sentinel
keydb-sentinel
redis-server
keydb-server
keydb-pro-server
doc-tools
release
misc/*

View File

@ -158,19 +158,19 @@ Running KeyDB
To run KeyDB with the default configuration just type:
% cd src
% ./keydb-server
% ./keydb-pro-server
If you want to provide your keydb.conf, you have to run it using an additional
parameter (the path of the configuration file):
% cd src
% ./keydb-server /path/to/keydb.conf
% ./keydb-pro-server /path/to/keydb.conf
It is possible to alter the KeyDB configuration by passing parameters directly
as options using the command line. Examples:
% ./keydb-server --port 9999 --replicaof 127.0.0.1 6379
% ./keydb-server /etc/keydb/6379.conf --loglevel debug
% ./keydb-pro-server --port 9999 --replicaof 127.0.0.1 6379
% ./keydb-pro-server /etc/keydb/6379.conf --loglevel debug
All the options in keydb.conf are also supported as options using the command
line, with exactly the same name.
@ -178,7 +178,7 @@ line, with exactly the same name.
Playing with KeyDB
------------------
You can use keydb-cli to play with KeyDB. Start a keydb-server instance,
You can use keydb-cli to play with KeyDB. Start a keydb-pro-server instance,
then in another terminal try the following:
% cd src
@ -243,7 +243,7 @@ Simply make a directory you would like to have the latest binaries dumped in, th
```
$ docker run -it --rm -v /path-to-dump-binaries:/keydb_bin eqalpha/keydb-build-bin
```
You should receive the following files: keydb-benchmark, keydb-check-aof, keydb-check-rdb, keydb-cli, keydb-sentinel, keydb-server
You should receive the following files: keydb-benchmark, keydb-check-aof, keydb-check-rdb, keydb-cli, keydb-sentinel, keydb-pro-server
If you are looking to enable flash support with the build (make MALLOC=memkind) then use the following command:
```

View File

@ -29,9 +29,9 @@ INSTALL_INCLUDE_PATH= $(DESTDIR)$(PREFIX)/$(INCLUDE_PATH)
INSTALL_LIBRARY_PATH= $(DESTDIR)$(PREFIX)/$(LIBRARY_PATH)
INSTALL_PKGCONF_PATH= $(INSTALL_LIBRARY_PATH)/$(PKGCONF_PATH)
# keydb-server configuration used for testing
# keydb-pro-server configuration used for testing
REDIS_PORT=56379
REDIS_SERVER=keydb-server
REDIS_SERVER=keydb-pro-server
define REDIS_TEST_CONFIG
daemonize yes
pidfile /tmp/hiredis-test-redis.pid

View File

@ -3,7 +3,7 @@
# Note that in order to read the configuration file, KeyDB must be
# started with the file path as first argument:
#
# ./keydb-server /path/to/keydb.conf
# ./keydb-pro-server /path/to/keydb.conf
# Note on units: when memory size is needed, it is possible to specify
# it in the usual form of 1k 5GB 4M and so forth:

View File

@ -319,7 +319,7 @@ else
endif
@touch $@
# keydb-server
# keydb-pro-server
$(REDIS_SERVER_NAME): $(REDIS_SERVER_OBJ)
$(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/lua/src/liblua.a ../deps/rocksdb/librocksdb.a $(FINAL_LIBS)

View File

@ -276,7 +276,8 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg)
cmd.proc = proc;
cmd.clientData = arg;
auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
AE_ASSERT(size == sizeof(cmd));
if (size != sizeof(cmd))
return AE_ERR;
return AE_OK;
}
@ -402,10 +403,18 @@ extern "C" void aeDeleteEventLoop(aeEventLoop *eventLoop) {
aeApiFree(eventLoop);
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
fastlock_free(&eventLoop->flock);
close(eventLoop->fdCmdRead);
close(eventLoop->fdCmdWrite);
auto *te = eventLoop->timeEventHead;
while (te)
{
auto *teNext = te->next;
zfree(te);
te = teNext;
}
zfree(eventLoop);
}
extern "C" void aeStop(aeEventLoop *eventLoop) {

View File

@ -124,6 +124,21 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
}
}
void installAofRewriteEvent()
{
serverTL->fRetrySetAofEvent = false;
if (!g_pserver->aof_rewrite_pending) {
g_pserver->aof_rewrite_pending = true;
int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] {
g_pserver->aof_rewrite_pending = false;
if (g_pserver->aof_pipe_write_data_to_child >= 0)
aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL);
});
if (res != AE_OK)
serverTL->fRetrySetAofEvent = true;
}
}
/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
listNode *ln = listLast(g_pserver->aof_rewrite_buf_blocks);
@ -165,14 +180,7 @@ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
/* Install a file event to send data to the rewrite child if there is
* not one already. */
if (!g_pserver->aof_rewrite_pending) {
g_pserver->aof_rewrite_pending = true;
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] {
g_pserver->aof_rewrite_pending = false;
if (g_pserver->aof_pipe_write_data_to_child >= 0)
aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL);
});
}
installAofRewriteEvent();
}
/* Write the buffer (possibly composed of multiple blocks) into the specified
@ -346,6 +354,9 @@ void flushAppendOnlyFile(int force) {
int sync_in_progress = 0;
mstime_t latency;
if (serverTL->fRetrySetAofEvent)
installAofRewriteEvent();
if (sdslen(g_pserver->aof_buf) == 0) {
/* Check if we need to do fsync even the aof buffer is empty,
* because previously in AOF_FSYNC_EVERYSEC mode, fsync is
@ -1595,16 +1606,18 @@ error:
void aofClosePipes(void) {
int fdAofAckPipe = g_pserver->aof_pipe_read_ack_from_child;
aePostFunction(g_pserver->el_alf_pip_read_ack_from_child, [fdAofAckPipe]{
int res = aePostFunction(g_pserver->el_alf_pip_read_ack_from_child, [fdAofAckPipe]{
aeDeleteFileEventAsync(serverTL->el,fdAofAckPipe,AE_READABLE);
close (fdAofAckPipe);
});
serverAssert(res == AE_OK);
int fdAofWritePipe = g_pserver->aof_pipe_write_data_to_child;
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fdAofWritePipe]{
res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fdAofWritePipe]{
aeDeleteFileEventAsync(serverTL->el,fdAofWritePipe,AE_WRITABLE);
close(fdAofWritePipe);
});
serverAssert(res == AE_OK);
g_pserver->aof_pipe_write_data_to_child = -1;
close(g_pserver->aof_pipe_read_data_from_parent);

View File

@ -296,6 +296,15 @@ int clusterLoadConfig(char *filename) {
if (clusterGetMaxEpoch() > g_pserver->cluster->currentEpoch) {
g_pserver->cluster->currentEpoch = clusterGetMaxEpoch();
}
if (dictSize(g_pserver->cluster->nodes) > 1 && cserver.thread_min_client_threshold < 100)
{
// Because we expect the individual load of a client to be much less in a cluster (it will spread over multiple server)
// we can increase the grouping of clients on a single thread within reason
cserver.thread_min_client_threshold *= dictSize(g_pserver->cluster->nodes);
cserver.thread_min_client_threshold = std::min(cserver.thread_min_client_threshold, 200);
serverLog(LL_NOTICE, "Expanding min-clients-per-thread to %d due to cluster", cserver.thread_min_client_threshold);
}
return C_OK;
fmterr:
@ -624,9 +633,10 @@ void freeClusterLink(clusterLink *link) {
if (link->node)
link->node->link = NULL;
link->node = nullptr;
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [link]{
int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [link]{
freeClusterLink(link);
});
serverAssert(res == AE_OK);
return;
}
if (link->conn) {

View File

@ -189,9 +189,8 @@ robj_roptr lookupKeyRead(redisDb *db, robj *key) {
* Returns the linked value object if the key exists or NULL if the key
* does not exist in the specified DB. */
robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) {
expireIfNeeded(db,key);
robj *o = lookupKey(db,key,flags|LOOKUP_UPDATEMVCC);
if (expireIfNeeded(db,key))
o = nullptr;
return o;
}
@ -412,7 +411,9 @@ bool redisDbPersistentData::syncDelete(robj *key)
auto itr = m_pdbSnapshot->find_cached_threadsafe(szFromObj(key));
if (itr != nullptr)
{
dictAdd(m_pdictTombstone, sdsdup(szFromObj(key)), nullptr);
sds keyTombstone = sdsdup(szFromObj(key));
if (dictAdd(m_pdictTombstone, keyTombstone, nullptr) != DICT_OK)
sdsfree(keyTombstone);
}
}
if (g_pserver->cluster_enabled) slotToKeyDel(key);
@ -1197,7 +1198,7 @@ void shutdownCommand(client *c) {
* Also when in Sentinel mode clear the SAVE flag and force NOSAVE. */
if (g_pserver->loading || g_pserver->sentinel_mode)
flags = (flags & ~SHUTDOWN_SAVE) | SHUTDOWN_NOSAVE;
if (prepareForShutdown(flags) == C_OK) exit(0);
if (prepareForShutdown(flags) == C_OK) throw ShutdownException();
addReplyError(c,"Errors trying to SHUTDOWN. Check logs.");
}
@ -1496,6 +1497,14 @@ void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when)
rememberSlaveKeyWithExpire(db,key);
}
redisDb::~redisDb()
{
dictRelease(watched_keys);
dictRelease(ready_keys);
dictRelease(blocking_keys);
listRelease(defrag_later);
}
void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e)
{
serverAssert(GlobalLocksAcquired());
@ -2256,7 +2265,6 @@ redisDbPersistentData::changelist redisDbPersistentData::processChanges()
serverAssert(m_fTrackingChanges >= 0);
changelist vecRet;
fastlock_lock(&m_lockStorage);
if (m_spstorage != nullptr)
{
m_spstorage->beginWriteBatch();
@ -2296,7 +2304,6 @@ void redisDbPersistentData::commitChanges(const changelist &vec)
}
if (m_spstorage != nullptr)
m_spstorage->endWriteBatch();
fastlock_unlock(&m_lockStorage);
}
redisDbPersistentData::~redisDbPersistentData()
@ -2456,6 +2463,9 @@ std::unique_ptr<expireEntry> deserializeExpire(sds key, const char *str, size_t
spexpire = std::make_unique<expireEntry>(key, subkey, when);
else
spexpire->update(subkey, when);
if (subkey)
sdsfree(subkey);
}
*poffset = offset;

View File

@ -736,8 +736,26 @@ NULL
} else if (!strcasecmp(szFromObj(c->argv[1]),"stringmatch-test") && c->argc == 2) {
stringmatchlen_fuzz_test();
addReplyStatus(c,"Apparently Redis did not crash: test passed");
} else if (!strcasecmp(szFromObj(c->argv[1]), "force-master") && c->argc == 2) {
} else if (!strcasecmp(szFromObj(c->argv[1]), "force-master") && c->argc == 3) {
c->flags |= CLIENT_MASTER | CLIENT_MASTER_FORCE_REPLY;
if (!strcasecmp(szFromObj(c->argv[2]), "yes"))
{
redisMaster *mi = (redisMaster*)zcalloc(sizeof(redisMaster), MALLOC_LOCAL);
mi->master = c;
listAddNodeHead(g_pserver->masters, mi);
}
else if (strcasecmp(szFromObj(c->argv[2]), "flagonly")) // if we didn't set flagonly assume its an unset
{
serverAssert(c->flags & CLIENT_MASTER);
if (listLength(g_pserver->masters))
{
redisMaster *mi = (redisMaster*)listNodeValue(listFirst(g_pserver->masters));
serverAssert(mi->master == c);
listDelNode(g_pserver->masters, listFirst(g_pserver->masters));
zfree(mi);
}
c->flags &= ~(CLIENT_MASTER | CLIENT_MASTER_FORCE_REPLY);
}
addReply(c, shared.ok);
#ifdef USE_JEMALLOC
} else if(!strcasecmp(szFromObj(c->argv[1]),"mallctl") && c->argc >= 3) {
@ -1542,7 +1560,7 @@ void sigsegvHandler(int sig, siginfo_t *info, void *secret) {
"\n=== KEYDB BUG REPORT END. Make sure to include from START to END. ===\n\n"
" Please report the crash by opening an issue on github:\n\n"
" https://github.com/JohnSully/KeyDB/issues\n\n"
" Suspect RAM error? Use keydb-server --test-memory to verify it.\n\n"
" Suspect RAM error? Use keydb-pro-server --test-memory to verify it.\n\n"
);
/* free(messages); Don't call free() with possibly corrupted memory. */

View File

@ -74,6 +74,10 @@ extern int g_fInCrash;
#define __has_feature(x) 0
#endif
#ifdef __linux__
extern "C" void unlock_futex(struct fastlock *lock, uint16_t ifutex);
#endif
#if __has_feature(thread_sanitizer)
/* Report that a lock has been created at address "lock". */
@ -206,6 +210,11 @@ DeadlockDetector g_dlock;
static_assert(sizeof(pid_t) <= sizeof(fastlock::m_pidOwner), "fastlock::m_pidOwner not large enough");
uint64_t g_longwaits = 0;
extern "C" void fastlock_panic(struct fastlock *lock)
{
_serverPanic(__FILE__, __LINE__, "fastlock lock/unlock mismatch for: %s", lock->szName);
}
uint64_t fastlock_getlongwaitcount()
{
uint64_t rval;
@ -337,31 +346,6 @@ extern "C" int fastlock_trylock(struct fastlock *lock, int fWeak)
return false;
}
#ifdef __linux__
#define ROL32(v, shift) ((v << shift) | (v >> (32-shift)))
void unlock_futex(struct fastlock *lock, uint16_t ifutex)
{
unsigned mask = (1U << (ifutex % 32));
unsigned futexT;
__atomic_load(&lock->futex, &futexT, __ATOMIC_RELAXED);
futexT &= mask;
if (futexT == 0)
return;
for (;;)
{
__atomic_load(&lock->futex, &futexT, __ATOMIC_ACQUIRE);
futexT &= mask;
if (!futexT)
break;
if (futex(&lock->m_ticket.u, FUTEX_WAKE_BITSET_PRIVATE, INT_MAX, nullptr, mask) == 1)
break;
}
}
#endif
extern "C" void fastlock_unlock(struct fastlock *lock)
{
--lock->m_depth;
@ -384,6 +368,26 @@ extern "C" void fastlock_unlock(struct fastlock *lock)
}
#endif
#ifdef __linux__
#define ROL32(v, shift) ((v << shift) | (v >> (32-shift)))
extern "C" void unlock_futex(struct fastlock *lock, uint16_t ifutex)
{
unsigned mask = (1U << (ifutex % 32));
unsigned futexT;
for (;;)
{
__atomic_load(&lock->futex, &futexT, __ATOMIC_ACQUIRE);
futexT &= mask;
if (!futexT)
break;
if (futex(&lock->m_ticket.u, FUTEX_WAKE_BITSET_PRIVATE, INT_MAX, nullptr, mask) == 1)
break;
}
}
#endif
extern "C" void fastlock_free(struct fastlock *lock)
{
// NOP

View File

@ -126,6 +126,7 @@ fastlock_trylock:
.ALIGN 16
.global fastlock_unlock
.type fastlock_unlock,@function
fastlock_unlock:
# RDI points to the struct:
# int32_t m_pidOwner
@ -133,34 +134,19 @@ fastlock_unlock:
# [rdi+64] ...
# uint16_t active
# uint16_t avail
push r11
sub dword ptr [rdi+4], 1 # decrement m_depth, don't use dec because it partially writes the flag register and we don't know its state
jnz .LDone # if depth is non-zero this is a recursive unlock, and we still hold it
mov dword ptr [rdi], -1 # pidOwner = -1 (we don't own it anymore)
mov ecx, [rdi+64] # get current active (this one)
inc ecx # bump it to the next thread
mov [rdi+64], cx # give up our ticket (note: lock is not required here because the spinlock itself guards this variable)
mov esi, [rdi+64] # get current active (this one)
inc esi # bump it to the next thread
mov word ptr [rdi+64], si # give up our ticket (note: lock is not required here because the spinlock itself guards this variable)
mfence # sync other threads
# At this point the lock is removed, however we must wake up any pending futexs
mov r9d, 1 # eax is the bitmask for 2 threads
rol r9d, cl # place the mask in the right spot for the next 2 threads
add rdi, 64 # rdi now points to the token
mov edx, [rdi+64+4] # load the futex mask
bt edx, esi # is the next thread waiting on a futex?
jc unlock_futex # unlock the futex if necessary
ret # if not we're done.
.ALIGN 16
.LRetryWake:
mov r11d, [rdi+4] # load the futex mask
and r11d, r9d # are any threads waiting on a futex?
jz .LDone # if not we're done.
# we have to wake the futexs
# rdi ARG1 futex (already in rdi)
mov esi, (10 | 128) # rsi ARG2 FUTEX_WAKE_BITSET_PRIVATE
mov edx, 0x7fffffff # rdx ARG3 INT_MAX (number of threads to wake)
xor r10d, r10d # r10 ARG4 NULL
mov r8, rdi # r8 ARG5 dup rdi
# r9 ARG6 mask (already set above)
mov eax, 202 # sys_futex
syscall
cmp eax, 1 # did we wake as many as we expected?
jnz .LRetryWake
.LDone:
pop r11
js fastlock_panic # panic if we made m_depth negative
ret

View File

@ -30,6 +30,12 @@ public:
return m_epochNext;
}
void shutdown()
{
std::unique_lock<fastlock> lock(m_lock);
m_vecepochs.clear();
}
void endEpoch(uint64_t epoch, bool fNoFree = false)
{
std::unique_lock<fastlock> lock(m_lock);

View File

@ -30,6 +30,7 @@
#include "server.h"
#include "cluster.h"
#include "rdb.h"
#include "aelocker.h"
#include <dlfcn.h>
#include <mutex>
#include <condition_variable>
@ -1276,7 +1277,10 @@ client *moduleGetReplyClient(RedisModuleCtx *ctx) {
int RM_ReplyWithLongLong(RedisModuleCtx *ctx, long long ll) {
client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK;
addReplyLongLong(c,ll);
AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyLongLongAsync(c,ll);
return REDISMODULE_OK;
}
@ -1286,9 +1290,12 @@ int RM_ReplyWithLongLong(RedisModuleCtx *ctx, long long ll) {
int replyWithStatus(RedisModuleCtx *ctx, const char *msg, const char *prefix) {
client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK;
addReplyProto(c,prefix,strlen(prefix));
addReplyProto(c,msg,strlen(msg));
addReplyProto(c,"\r\n",2);
AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyProtoAsync(c,prefix,strlen(prefix));
addReplyProtoAsync(c,msg,strlen(msg));
addReplyProtoAsync(c,"\r\n",2);
return REDISMODULE_OK;
}
@ -1332,15 +1339,19 @@ int RM_ReplyWithSimpleString(RedisModuleCtx *ctx, const char *msg) {
* The function always returns REDISMODULE_OK. */
int RM_ReplyWithArray(RedisModuleCtx *ctx, long len) {
client *c = moduleGetReplyClient(ctx);
AeLocker locker;
if (c == NULL) return REDISMODULE_OK;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
if (len == REDISMODULE_POSTPONED_ARRAY_LEN) {
ctx->postponed_arrays = (void**)zrealloc(ctx->postponed_arrays,sizeof(void*)*
(ctx->postponed_arrays_count+1), MALLOC_LOCAL);
ctx->postponed_arrays[ctx->postponed_arrays_count] =
addReplyDeferredLen(c);
addReplyDeferredLenAsync(c);
ctx->postponed_arrays_count++;
} else {
addReplyArrayLen(c,len);
addReplyArrayLenAsync(c,len);
}
return REDISMODULE_OK;
}
@ -1352,7 +1363,10 @@ int RM_ReplyWithArray(RedisModuleCtx *ctx, long len) {
int RM_ReplyWithNullArray(RedisModuleCtx *ctx) {
client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK;
addReplyNullArray(c);
AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyNullArrayAsync(c);
return REDISMODULE_OK;
}
@ -1362,7 +1376,10 @@ int RM_ReplyWithNullArray(RedisModuleCtx *ctx) {
int RM_ReplyWithEmptyArray(RedisModuleCtx *ctx) {
client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK;
addReply(c,shared.emptyarray);
AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyAsync(c,shared.emptyarray);
return REDISMODULE_OK;
}
@ -1395,6 +1412,9 @@ int RM_ReplyWithEmptyArray(RedisModuleCtx *ctx) {
void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) {
client *c = moduleGetReplyClient(ctx);
if (c == NULL) return;
AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
if (ctx->postponed_arrays_count == 0) {
serverLog(LL_WARNING,
"API misuse detected in module %s: "
@ -1404,7 +1424,7 @@ void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) {
return;
}
ctx->postponed_arrays_count--;
setDeferredArrayLen(c,
setDeferredArrayLenAsync(c,
ctx->postponed_arrays[ctx->postponed_arrays_count],
len);
if (ctx->postponed_arrays_count == 0) {
@ -1419,7 +1439,10 @@ void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) {
int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) {
client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK;
addReplyBulkCBuffer(c,(char*)buf,len);
AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyBulkCBufferAsync(c,(char*)buf,len);
return REDISMODULE_OK;
}
@ -1430,7 +1453,10 @@ int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) {
int RM_ReplyWithCString(RedisModuleCtx *ctx, const char *buf) {
client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK;
addReplyBulkCString(c,(char*)buf);
AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyBulkCStringAsync(c,(char*)buf);
return REDISMODULE_OK;
}
@ -1440,7 +1466,10 @@ int RM_ReplyWithCString(RedisModuleCtx *ctx, const char *buf) {
int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) {
client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK;
addReplyBulk(c,str);
AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyBulkAsync(c,str);
return REDISMODULE_OK;
}
@ -1450,7 +1479,10 @@ int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) {
int RM_ReplyWithEmptyString(RedisModuleCtx *ctx) {
client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK;
addReply(c,shared.emptybulk);
AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyAsync(c,shared.emptybulk);
return REDISMODULE_OK;
}
@ -1461,7 +1493,10 @@ int RM_ReplyWithEmptyString(RedisModuleCtx *ctx) {
int RM_ReplyWithVerbatimString(RedisModuleCtx *ctx, const char *buf, size_t len) {
client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK;
addReplyVerbatim(c, buf, len, "txt");
AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyVerbatimAsync(c, buf, len, "txt");
return REDISMODULE_OK;
}
@ -1471,7 +1506,10 @@ int RM_ReplyWithVerbatimString(RedisModuleCtx *ctx, const char *buf, size_t len)
int RM_ReplyWithNull(RedisModuleCtx *ctx) {
client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK;
addReplyNull(c);
AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyNullAsync(c);
return REDISMODULE_OK;
}
@ -1484,8 +1522,11 @@ int RM_ReplyWithNull(RedisModuleCtx *ctx) {
int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) {
client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK;
AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
sds proto = sdsnewlen(reply->proto, reply->protolen);
addReplySds(c,proto);
addReplySdsAsync(c,proto);
return REDISMODULE_OK;
}
@ -1498,7 +1539,10 @@ int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) {
int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) {
client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK;
addReplyDouble(c,d);
AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyDoubleAsync(c,d);
return REDISMODULE_OK;
}
@ -1513,7 +1557,10 @@ int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) {
int RM_ReplyWithLongDouble(RedisModuleCtx *ctx, long double ld) {
client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK;
addReplyHumanLongDouble(c, ld);
AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyHumanLongDoubleAsync(c, ld);
return REDISMODULE_OK;
}

View File

@ -28,6 +28,7 @@
*/
#include "server.h"
bool FInReplicaReplay();
/* ================================ MULTI/EXEC ============================== */
@ -174,7 +175,7 @@ void execCommand(client *c) {
* This way we'll deliver the MULTI/..../EXEC block as a whole and
* both the AOF and the replication link will have the same consistency
* and atomicity guarantees. */
if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) {
if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN)) && !(FInReplicaReplay())) {
execCommandPropagateMulti(c);
must_propagate = 1;
}
@ -190,7 +191,10 @@ void execCommand(client *c) {
"no permission to execute the command or subcommand" :
"no permission to touch the specified keys");
} else {
call(c,g_pserver->loading ? CMD_CALL_NONE : CMD_CALL_FULL);
int flags = g_pserver->loading ? CMD_CALL_NONE : CMD_CALL_FULL;
if (FInReplicaReplay())
flags &= ~CMD_CALL_PROPAGATE;
call(c,flags);
}
/* Commands may alter argc/argv, restore mstate. */

View File

@ -663,23 +663,33 @@ void addReplyDoubleAsync(client *c, double d) {
addReplyDoubleCore(c, d, true);
}
void addReplyBulkCore(client *c, robj_roptr obj, bool fAsync);
/* Add a long double as a bulk reply, but uses a human readable formatting
* of the double instead of exposing the crude behavior of doubles to the
* dear user. */
void addReplyHumanLongDouble(client *c, long double d) {
void addReplyHumanLongDoubleCore(client *c, long double d, bool fAsync) {
if (c->resp == 2) {
robj *o = createStringObjectFromLongDouble(d,1);
addReplyBulk(c,o);
addReplyBulkCore(c,o,fAsync);
decrRefCount(o);
} else {
char buf[MAX_LONG_DOUBLE_CHARS];
int len = ld2string(buf,sizeof(buf),d,LD_STR_HUMAN);
addReplyProto(c,",",1);
addReplyProto(c,buf,len);
addReplyProto(c,"\r\n",2);
addReplyProtoCore(c,",",1,fAsync);
addReplyProtoCore(c,buf,len,fAsync);
addReplyProtoCore(c,"\r\n",2,fAsync);
}
}
void addReplyHumanLongDouble(client *c, long double d) {
addReplyHumanLongDoubleCore(c, d, false);
}
void addReplyHumanLongDoubleAsync(client *c, long double d) {
addReplyHumanLongDoubleCore(c, d, true);
}
/* Add a long long as integer reply or bulk len / multi bulk count.
* Basically this is used to output <prefix><long long><crlf>. */
void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync) {
@ -914,6 +924,10 @@ void addReplyBulkCString(client *c, const char *s) {
addReplyBulkCStringCore(c, s, false);
}
void addReplyBulkCStringAsync(client *c, const char *s) {
addReplyBulkCStringCore(c, s, true);
}
/* Add a long long as a bulk reply */
void addReplyBulkLongLong(client *c, long long ll) {
char buf[64];
@ -932,9 +946,9 @@ void addReplyBulkLongLong(client *c, long long ll) {
* three first characters of the extension are used, and if the
* provided one is shorter than that, the remaining is filled with
* spaces. */
void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) {
void addReplyVerbatimCore(client *c, const char *s, size_t len, const char *ext, bool fAsync) {
if (c->resp == 2) {
addReplyBulkCBuffer(c,s,len);
addReplyBulkCBufferCore(c,s,len,fAsync);
} else {
char buf[32];
size_t preflen = snprintf(buf,sizeof(buf),"=%zu\r\nxxx:",len+4);
@ -946,12 +960,20 @@ void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) {
p[i] = *ext++;
}
}
addReplyProto(c,buf,preflen);
addReplyProto(c,s,len);
addReplyProto(c,"\r\n",2);
addReplyProtoCore(c,buf,preflen,fAsync);
addReplyProtoCore(c,s,len,fAsync);
addReplyProtoCore(c,"\r\n",2,fAsync);
}
}
void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) {
addReplyVerbatimCore(c, s, len, ext, false);
}
void addReplyVerbatimAsync(client *c, const char *s, size_t len, const char *ext) {
addReplyVerbatimCore(c, s, len, ext, true);
}
/* Add an array of C strings as status replies with a heading.
* This function is typically invoked by from commands that support
* subcommands in response to the 'help' subcommand. The help array
@ -1015,6 +1037,27 @@ int clientHasPendingReplies(client *c) {
return (c->bufpos || listLength(c->reply)) && !(c->flags & CLIENT_CLOSE_ASAP);
}
static std::atomic<int> rgacceptsInFlight[MAX_EVENT_LOOPS];
int chooseBestThreadForAccept()
{
int ielMinLoad = 0;
int cclientsMin = INT_MAX;
for (int iel = 0; iel < cserver.cthreads; ++iel)
{
int cclientsThread;
atomicGet(g_pserver->rgthreadvar[iel].cclients, cclientsThread);
cclientsThread += rgacceptsInFlight[iel].load(std::memory_order_relaxed);
if (cclientsThread < cserver.thread_min_client_threshold)
return iel;
if (cclientsThread < cclientsMin)
{
cclientsMin = cclientsThread;
ielMinLoad = iel;
}
}
return ielMinLoad;
}
void clientAcceptHandler(connection *conn) {
client *c = (client*)connGetPrivateData(conn);
@ -1156,7 +1199,25 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
if (!g_fTestMode)
{
// We always accept on the same thread
{
int ielTarget = chooseBestThreadForAccept();
rgacceptsInFlight[ielTarget].fetch_add(1, std::memory_order_relaxed);
if (ielTarget != ielCur)
{
char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL);
memcpy(szT, cip, NET_IP_STR_LEN);
int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget, szT]{
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,szT,ielTarget);
rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed);
zfree(szT);
});
if (res == AE_OK)
continue;
}
rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed);
}
LLocalThread:
aeAcquireLock();
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip,ielCur);
@ -1173,10 +1234,15 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
goto LLocalThread;
char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL);
memcpy(szT, cip, NET_IP_STR_LEN);
aePostFunction(g_pserver->rgthreadvar[iel].el, [cfd, iel, szT]{
int res = aePostFunction(g_pserver->rgthreadvar[iel].el, [cfd, iel, szT]{
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,szT,iel);
zfree(szT);
});
if (res != AE_OK)
{
zfree(szT);
goto LLocalThread;
}
}
}
}
@ -1220,8 +1286,25 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
return;
}
serverLog(LL_VERBOSE,"Accepted connection to %s", g_pserver->unixsocket);
aeAcquireLock();
int ielTarget = rand() % cserver.cthreads;
if (ielTarget == iel)
{
LLocalThread:
acceptCommonHandler(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL,iel);
}
else
{
int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget]{
acceptCommonHandler(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL,ielTarget);
});
if (res != AE_OK)
goto LLocalThread;
}
aeReleaseLock();
}
}
static void freeClientArgv(client *c) {
@ -2602,7 +2685,7 @@ NULL
{
int iel = client->iel;
freeClientAsync(client);
aePostFunction(g_pserver->rgthreadvar[client->iel].el, [iel] {
aePostFunction(g_pserver->rgthreadvar[client->iel].el, [iel] { // note: failure is OK
freeClientsInAsyncFreeQueue(iel);
});
}
@ -3108,6 +3191,7 @@ void unpauseClientsIfNecessary()
*
* The function returns the total number of events processed. */
int processEventsWhileBlocked(int iel) {
serverAssert(GlobalLocksAcquired());
int iterations = 4; /* See the function top-comment. */
int count = 0;
@ -3117,8 +3201,11 @@ int processEventsWhileBlocked(int iel) {
serverAssert(c->flags & CLIENT_PROTECTED);
c->lock.unlock();
}
aeReleaseLock();
serverAssertDebug(!GlobalLocksAcquired());
try
{
while (iterations--) {
int events = 0;
events += aeProcessEvents(g_pserver->rgthreadvar[iel].el, AE_FILE_EVENTS|AE_DONT_WAIT);
@ -3126,6 +3213,18 @@ int processEventsWhileBlocked(int iel) {
if (!events) break;
count += events;
}
}
catch (...)
{
// Caller expects us to be locked so fix and rethrow
AeLocker locker;
if (c != nullptr)
c->lock.lock();
locker.arm(c);
locker.release();
throw;
}
AeLocker locker;
if (c != nullptr)
c->lock.lock();

View File

@ -1676,9 +1676,15 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key, uint64_t mvcc_tstamp) {
== NULL) return NULL;
if (rdbtype == RDB_TYPE_ZSET_2) {
if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) return NULL;
if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) {
sdsfree(sdsele);
return NULL;
}
} else {
if (rdbLoadDoubleValue(rdb,&score) == -1) return NULL;
if (rdbLoadDoubleValue(rdb,&score) == -1) {
sdsfree(sdsele);
return NULL;
}
}
/* Don't care about integer-encoded strings. */
@ -2487,6 +2493,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
decrRefCount(val);
val = nullptr;
}
decrRefCount(key);
key = nullptr;
}
if (g_pserver->key_load_delay)
usleep(g_pserver->key_load_delay);

View File

@ -43,6 +43,8 @@
#include <algorithm>
#include <uuid/uuid.h>
#include <chrono>
#include <unordered_map>
#include <string>
void replicationDiscardCachedMaster(redisMaster *mi);
void replicationResurrectCachedMaster(redisMaster *mi, connection *conn);
@ -354,6 +356,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
cchDbNum = std::min<int>(cchDbNum, sizeof(szDbNum)); // snprintf is tricky like that
char szMvcc[128];
incrementMvccTstamp();
uint64_t mvccTstamp = getMvccTstamp();
int cchMvccNum = snprintf(szMvcc, sizeof(szMvcc), "%lu", mvccTstamp);
int cchMvcc = snprintf(szMvcc, sizeof(szMvcc), "$%d\r\n%lu\r\n", cchMvccNum, mvccTstamp);
@ -437,6 +440,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply);
addReplyProtoAsync(replica, reply->buf(), reply->used);
}
if (!fSendRaw)
{
addReplyAsync(replica,shared.crlf);
@ -2871,6 +2875,8 @@ void freeMasterInfo(redisMaster *mi)
zfree(mi->masteruser);
if (mi->repl_transfer_tmpfile)
zfree(mi->repl_transfer_tmpfile);
if (mi->clientFake)
freeClient(mi->clientFake);
delete mi->staleKeyMap;
if (mi->cached_master != nullptr)
freeClientAsync(mi->cached_master);
@ -2949,6 +2955,11 @@ void replicationHandleMasterDisconnection(redisMaster *mi) {
mi->master = NULL;
mi->repl_state = REPL_STATE_CONNECT;
mi->repl_down_since = g_pserver->unixtime;
if (mi->clientFake) {
freeClient(mi->clientFake);
mi->clientFake = nullptr;
}
/* We lost connection with our master, don't disconnect slaves yet,
* maybe we'll be able to PSYNC with our master later. We'll disconnect
* the slaves only if we'll have to do a full resync with our master. */
@ -3823,16 +3834,35 @@ public:
return m_cnesting == 1;
}
redisMaster *getMi(client *c)
{
if (m_mi == nullptr)
m_mi = MasterInfoFromClient(c);
return m_mi;
}
int nesting() const { return m_cnesting; }
private:
int m_cnesting = 0;
bool m_fCancelled = false;
redisMaster *m_mi = nullptr;
};
static thread_local std::unique_ptr<ReplicaNestState> s_pstate;
bool FInReplicaReplay()
{
return s_pstate != nullptr && s_pstate->nesting() > 0;
}
static std::unordered_map<std::string, uint64_t> g_mapmvcc;
void replicaReplayCommand(client *c)
{
static thread_local ReplicaNestState *s_pstate = nullptr;
if (s_pstate == nullptr)
s_pstate = new (MALLOC_LOCAL) ReplicaNestState;
s_pstate = std::make_unique<ReplicaNestState>();
// the replay command contains two arguments:
// 1: The UUID of the source
@ -3854,9 +3884,10 @@ void replicaReplayCommand(client *c)
return;
}
unsigned char uuid[UUID_BINARY_LEN];
std::string uuid;
uuid.resize(UUID_BINARY_LEN);
if (c->argv[1]->type != OBJ_STRING || sdslen((sds)ptrFromObj(c->argv[1])) != 36
|| uuid_parse((sds)ptrFromObj(c->argv[1]), uuid) != 0)
|| uuid_parse((sds)ptrFromObj(c->argv[1]), (unsigned char*)uuid.data()) != 0)
{
addReplyError(c, "Expected UUID arg1");
s_pstate->Cancel();
@ -3892,7 +3923,7 @@ void replicaReplayCommand(client *c)
}
}
if (FSameUuidNoNil(uuid, cserver.uuid))
if (FSameUuidNoNil((unsigned char*)uuid.data(), cserver.uuid))
{
addReply(c, shared.ok);
s_pstate->Cancel();
@ -3902,33 +3933,57 @@ void replicaReplayCommand(client *c)
if (!s_pstate->FPush())
return;
redisMaster *mi = s_pstate->getMi(c);
client *cFake = mi->clientFake;
if (mi->clientFakeNesting != s_pstate->nesting())
cFake = nullptr;
serverAssert(mi != nullptr);
if (mvcc != 0 && g_mapmvcc[uuid] >= mvcc)
{
s_pstate->Cancel();
s_pstate->Pop();
return;
}
// OK We've recieved a command lets execute
client *current_clientSave = serverTL->current_client;
client *cFake = createClient(nullptr, c->iel);
if (cFake == nullptr)
cFake = createClient(nullptr, c->iel);
cFake->lock.lock();
cFake->authenticated = c->authenticated;
cFake->puser = c->puser;
cFake->querybuf = sdscatsds(cFake->querybuf,(sds)ptrFromObj(c->argv[2]));
selectDb(cFake, c->db->id);
auto ccmdPrev = serverTL->commandsExecuted;
cFake->flags |= CLIENT_MASTER | CLIENT_PREVENT_REPL_PROP;
processInputBuffer(cFake, (CMD_CALL_FULL & (~CMD_CALL_PROPAGATE)));
cFake->flags &= ~(CLIENT_MASTER | CLIENT_PREVENT_REPL_PROP);
bool fExec = ccmdPrev != serverTL->commandsExecuted;
cFake->lock.unlock();
if (fExec)
if (fExec || cFake->flags & CLIENT_MULTI)
{
addReply(c, shared.ok);
selectDb(c, cFake->db->id);
redisMaster *mi = MasterInfoFromClient(c);
if (mi != nullptr) // this should never be null but I'd prefer not to crash
{
mi->mvccLastSync = mvcc;
}
if (mvcc > g_mapmvcc[uuid])
g_mapmvcc[uuid] = mvcc;
}
else
{
serverLog(LL_WARNING, "Command didn't execute: %s", cFake->buf);
addReplyError(c, "command did not execute");
}
serverAssert(sdslen(cFake->querybuf) == 0);
if (cFake->flags & CLIENT_MULTI)
{
mi->clientFake = cFake;
mi->clientFakeNesting = s_pstate->nesting();
}
else
{
if (mi->clientFake == cFake)
mi->clientFake = nullptr;
freeClient(cFake);
}
serverTL->current_client = current_clientSave;
// call() will not propogate this for us, so we do so here

View File

@ -1744,7 +1744,7 @@ void clientsCron(int iel) {
/* The following functions do different service checks on the client.
* The protocol is that they return non-zero if the client was
* terminated. */
if (clientsCronHandleTimeout(c,now)) goto LContinue;
if (clientsCronHandleTimeout(c,now)) continue; // Client free'd so don't release the lock
if (clientsCronResizeQueryBuffer(c)) goto LContinue;
if (clientsCronTrackExpansiveClients(c)) goto LContinue;
LContinue:
@ -1826,7 +1826,7 @@ void updateCachedTime(int update_daylight_info) {
t /= 1000;
__atomic_store(&g_pserver->mstime, &t, __ATOMIC_RELAXED);
t /= 1000;
__atomic_store(&g_pserver->unixtime, &t, __ATOMIC_RELAXED);
g_pserver->unixtime = t;
/* To get information about daylight saving time, we need to call
* localtime_r and cache the result. However calling localtime_r in this
@ -2027,7 +2027,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
/* We received a SIGTERM, shutting down here in a safe way, as it is
* not ok doing so inside the signal handler. */
if (g_pserver->shutdown_asap) {
if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) throw ShutdownException();
serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
g_pserver->shutdown_asap = 0;
}
@ -3018,6 +3018,7 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain)
pvar->el = aeCreateEventLoop(g_pserver->maxclients+CONFIG_FDSET_INCR);
pvar->current_client = nullptr;
pvar->clients_paused = 0;
pvar->fRetrySetAofEvent = false;
if (pvar->el == NULL) {
serverLog(LL_WARNING,
"Failed creating the event loop. Error message: '%s'",
@ -4026,23 +4027,16 @@ int prepareForShutdown(int flags) {
/* Close the listening sockets. Apparently this allows faster restarts. */
closeListeningSockets(1);
for (int ithread = 0; ithread < MAX_EVENT_LOOPS; ++ithread) {
for (int idb = 0; idb < cserver.dbnum; ++idb) {
if (g_pserver->rgthreadvar[ithread].rgdbSnapshot[idb] != nullptr)
g_pserver->db[idb]->endSnapshot(g_pserver->rgthreadvar[ithread].rgdbSnapshot[idb]);
for (int iel = 0; iel < cserver.cthreads; ++iel)
{
aePostFunction(g_pserver->rgthreadvar[iel].el, [iel]{
g_pserver->rgthreadvar[iel].el->stop = 1;
});
}
}
/* free our databases */
for (int idb = 0; idb < cserver.dbnum; ++idb) {
delete g_pserver->db[idb];
g_pserver->db[idb] = nullptr;
}
delete g_pserver->m_pstorageFactory;
serverLog(LL_WARNING,"%s is now ready to exit, bye bye...",
g_pserver->sentinel_mode ? "Sentinel" : "KeyDB");
return C_OK;
}
@ -4969,19 +4963,19 @@ void version(void) {
}
void usage(void) {
fprintf(stderr,"Usage: ./keydb-server [/path/to/keydb.conf] [options]\n");
fprintf(stderr," ./keydb-server - (read config from stdin)\n");
fprintf(stderr," ./keydb-server -v or --version\n");
fprintf(stderr," ./keydb-server -h or --help\n");
fprintf(stderr," ./keydb-server --test-memory <megabytes>\n\n");
fprintf(stderr,"Usage: ./keydb-pro-server [/path/to/keydb.conf] [options]\n");
fprintf(stderr," ./keydb-pro-server - (read config from stdin)\n");
fprintf(stderr," ./keydb-pro-server -v or --version\n");
fprintf(stderr," ./keydb-pro-server -h or --help\n");
fprintf(stderr," ./keydb-pro-server --test-memory <megabytes>\n\n");
fprintf(stderr,"Examples:\n");
fprintf(stderr," ./keydb-server (run the server with default conf)\n");
fprintf(stderr," ./keydb-server /etc/redis/6379.conf\n");
fprintf(stderr," ./keydb-server --port 7777\n");
fprintf(stderr," ./keydb-server --port 7777 --replicaof 127.0.0.1 8888\n");
fprintf(stderr," ./keydb-server /etc/mykeydb.conf --loglevel verbose\n\n");
fprintf(stderr," ./keydb-pro-server (run the server with default conf)\n");
fprintf(stderr," ./keydb-pro-server /etc/redis/6379.conf\n");
fprintf(stderr," ./keydb-pro-server --port 7777\n");
fprintf(stderr," ./keydb-pro-server --port 7777 --replicaof 127.0.0.1 8888\n");
fprintf(stderr," ./keydb-pro-server /etc/mykeydb.conf --loglevel verbose\n\n");
fprintf(stderr,"Sentinel mode:\n");
fprintf(stderr," ./keydb-server /etc/sentinel.conf --sentinel\n");
fprintf(stderr," ./keydb-pro-server /etc/sentinel.conf --sentinel\n");
exit(1);
}
@ -5351,8 +5345,23 @@ void *workerThreadMain(void *parg)
aeEventLoop *el = g_pserver->rgthreadvar[iel].el;
aeSetBeforeSleepProc(el, beforeSleep, AE_SLEEP_THREADSAFE);
aeSetAfterSleepProc(el, afterSleep, AE_SLEEP_THREADSAFE);
try
{
aeMain(el);
}
catch (ShutdownException)
{
}
serverAssert(!GlobalLocksAcquired());
aeDeleteEventLoop(el);
aeAcquireLock();
for (int idb = 0; idb < cserver.dbnum; ++idb) {
if (g_pserver->rgthreadvar[iel].rgdbSnapshot[idb] != nullptr)
g_pserver->db[idb]->endSnapshot(g_pserver->rgthreadvar[iel].rgdbSnapshot[idb]);
}
aeReleaseLock();
return NULL;
}
@ -5475,7 +5484,7 @@ int main(int argc, char **argv) {
exit(0);
} else {
fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");
fprintf(stderr,"Example: ./keydb-server --test-memory 4096\n\n");
fprintf(stderr,"Example: ./keydb-pro-server --test-memory 4096\n\n");
exit(1);
}
}
@ -5664,7 +5673,18 @@ int main(int argc, char **argv) {
/* The main thread sleeps until all the workers are done.
this is so that all worker threads are orthogonal in their startup/shutdown */
void *pvRet;
pthread_join(rgthread[IDX_EVENT_LOOP_MAIN], &pvRet);
for (int iel = 0; iel < cserver.cthreads; ++iel)
pthread_join(rgthread[iel], &pvRet);
/* free our databases */
for (int idb = 0; idb < cserver.dbnum; ++idb) {
delete g_pserver->db[idb];
g_pserver->db[idb] = nullptr;
}
g_pserver->garbageCollector.shutdown();
delete g_pserver->m_pstorageFactory;
return 0;
}

View File

@ -1226,7 +1226,7 @@ class redisDbPersistentData
friend class redisDbPersistentDataSnapshot;
public:
~redisDbPersistentData();
virtual ~redisDbPersistentData();
redisDbPersistentData() = default;
redisDbPersistentData(redisDbPersistentData &&) = default;
@ -1358,7 +1358,6 @@ private:
std::unique_ptr<redisDbPersistentDataSnapshot> m_spdbSnapshotHOLDER;
const redisDbPersistentDataSnapshot *m_pdbSnapshotASYNC = nullptr;
int m_refCount = 0;
fastlock m_lockStorage { "storage" };
};
class redisDbPersistentDataSnapshot : protected redisDbPersistentData
@ -1400,7 +1399,7 @@ public:
/* Redis database representation. There are multiple databases identified
* by integers from 0 (the default database) up to the max configured
* database. The database number is the 'id' field in the structure. */
typedef struct redisDb : public redisDbPersistentDataSnapshot
struct redisDb : public redisDbPersistentDataSnapshot
{
// Legacy C API, Do not add more
friend void tryResizeHashTables(int);
@ -1424,7 +1423,9 @@ typedef struct redisDb : public redisDbPersistentDataSnapshot
redisDb()
: expireitr(nullptr)
{}
void initialize(int id);
virtual ~redisDb();
void dbOverwriteCore(redisDb::iter itr, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire);
@ -1477,7 +1478,7 @@ public:
long long last_expire_set; /* when the last expire was set */
double avg_ttl; /* Average TTL, just for stats */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;
};
/* Client MULTI/EXEC state */
typedef struct multiCmd {
@ -1904,6 +1905,7 @@ struct redisServerThreadVars {
long unsigned commandsExecuted = 0;
uint64_t gcEpoch = 0;
const redisDbPersistentDataSnapshot **rgdbSnapshot = nullptr;
bool fRetrySetAofEvent = false;
};
struct redisMaster {
@ -1913,6 +1915,8 @@ struct redisMaster {
int masterport; /* Port of master */
client *cached_master; /* Cached master to be reused for PSYNC. */
client *master;
client *clientFake;
int clientFakeNesting;
/* The following two fields is where we store master PSYNC replid/offset
* while the PSYNC is in progress. At the end we'll copy the fields into
* the server->master client structure. */
@ -1986,6 +1990,7 @@ struct redisServerConst {
sds license_key = nullptr;
int trial_timeout = 120;
int delete_on_evict = false; // Only valid when a storage provider is set
int thread_min_client_threshold = 50;
};
struct redisServer {
@ -2528,10 +2533,12 @@ void addReplyNullArray(client *c);
void addReplyNullArrayAsync(client *c);
void addReplyBool(client *c, int b);
void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext);
void addReplyVerbatimAsync(client *c, const char *s, size_t len, const char *ext);
void addReplyProto(client *c, const char *s, size_t len);
void addReplyBulk(client *c, robj_roptr obj);
void AddReplyFromClient(client *c, client *src);
void addReplyBulkCString(client *c, const char *s);
void addReplyBulkCStringAsync(client *c, const char *s);
void addReplyBulkCBuffer(client *c, const void *p, size_t len);
void addReplyBulkLongLong(client *c, long long ll);
void addReply(client *c, robj_roptr obj);
@ -2541,6 +2548,7 @@ void addReplyError(client *c, const char *err);
void addReplyStatus(client *c, const char *status);
void addReplyDouble(client *c, double d);
void addReplyHumanLongDouble(client *c, long double d);
void addReplyHumanLongDoubleAsync(client *c, long double d);
void addReplyLongLong(client *c, long long ll);
#ifdef __cplusplus
void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync);
@ -3396,6 +3404,10 @@ void tlsInit(void);
void tlsInitThread();
int tlsConfigure(redisTLSContextConfig *ctx_config);
class ShutdownException
{};
#define redisDebug(fmt, ...) \
printf("DEBUG %s:%d > " fmt "\n", __FILE__, __LINE__, __VA_ARGS__)
#define redisDebugMark() \

View File

@ -65,7 +65,7 @@ test "Slave #5 is reachable and alive" {
test "Slave #5 should not be able to failover" {
after 10000
assert {[RI 5 role] eq {slave}}
assert_equal {slave} [RI 5 role]
}
test "Cluster should be down" {

View File

@ -59,6 +59,24 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} {
}
}
test {Active replicas propogate transaction} {
$master set testkey 0
$master multi
$master incr testkey
$master incr testkey
after 5000
$master get testkey
$master exec
assert_equal 2 [$master get testkey]
after 500
wait_for_condition 50 500 {
[string match "2" [$slave get testkey]]
} else {
fail "Transaction failed to replicate"
}
$master flushall
}
test {Active replicas WAIT} {
# Test that wait succeeds since replicas should be syncronized
$master set testkey foo

View File

@ -0,0 +1,74 @@
foreach topology {mesh ring} {
start_server {tags {"multi-master"} overrides {hz 500 active-replica yes multi-master yes}} {
start_server {overrides {hz 500 active-replica yes multi-master yes}} {
start_server {overrides {hz 500 active-replica yes multi-master yes}} {
start_server {overrides {hz 500 active-replica yes multi-master yes}} {
for {set j 0} {$j < 4} {incr j} {
set R($j) [srv [expr 0-$j] client]
set R_host($j) [srv [expr 0-$j] host]
set R_port($j) [srv [expr 0-$j] port]
}
# Initialize as mesh
if [string equal $topology "mesh"] {
for {set j 0} {$j < 4} {incr j} {
for {set k 0} {$k < 4} {incr k} {
if $j!=$k {
$R($j) replicaof $R_host($k) $R_port($k)
after 100
}
}
}}
#Else Ring
if [string equal $topology "ring"] {
$R(0) replicaof $R_host(3) $R_port(3)
after 100
$R(1) replicaof $R_host(0) $R_port(0)
after 100
$R(2) replicaof $R_host(1) $R_port(1)
after 100
$R(3) replicaof $R_host(2) $R_port(2)
}
after 2000
test "$topology replicates to all nodes" {
$R(0) set testkey foo
after 500
assert_equal foo [$R(1) get testkey] "replicates to 1"
assert_equal foo [$R(2) get testkey] "replicates to 2"
}
test "$topology replicates only once" {
$R(0) set testkey 1
after 500
$R(1) incr testkey
after 500
$R(2) incr testkey
after 500
assert_equal 3 [$R(0) get testkey]
assert_equal 3 [$R(1) get testkey]
assert_equal 3 [$R(2) get testkey]
assert_equal 3 [$R(3) get testkey]
}
test "$topology transaction replicates only once" {
for {set j 0} {$j < 1000} {incr j} {
$R(0) set testkey 1
$R(0) multi
$R(0) incr testkey
$R(0) incr testkey
$R(0) exec
after 1
assert_equal 3 [$R(0) get testkey] "node 0"
assert_equal 3 [$R(1) get testkey] "node 1"
assert_equal 3 [$R(2) get testkey] "node 2"
assert_equal 3 [$R(3) get testkey] "node 3"
}
}
}
}
}
}
}

View File

@ -37,6 +37,7 @@ set ::all_tests {
unit/acl
unit/rreplay
unit/cron
unit/replication
integration/block-repl
integration/replication
integration/replication-2
@ -44,6 +45,7 @@ set ::all_tests {
integration/replication-4
integration/replication-psync
integration/replication-active
integration/replication-multimaster
integration/aof
integration/rdb
integration/convert-zipmap-hash-on-load

View File

@ -0,0 +1,12 @@
start_server {tags {"repl"}} {
test "incr of expired key on replica doesn't cause a crash" {
r debug force-master yes
r set testkey 1
r pexpire testkey 1
after 500
r incr testkey
r incr testkey
r debug force-master no
}
}

View File

@ -1,7 +1,7 @@
start_server {tags {"rreplay"}} {
start_server {tags {"rreplay"} overrides {active-replica yes}} {
test {RREPLAY use current db} {
r debug force-master
r debug force-master yes
r select 4
r set dbnum invalid
r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$5\r\ndbnum\r\n\$4\r\nfour\r\n"
@ -10,7 +10,7 @@ start_server {tags {"rreplay"}} {
reconnect
test {RREPLAY db different} {
r debug force-master
r debug force-master yes
r select 4
r set testkey four
r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$7\r\ntestkey\r\n\$4\r\nbebe\r\n" 2

View File

@ -37,7 +37,7 @@
# REDIS_CONFIG_FILE=/etc/redis/1234.conf \
# REDIS_LOG_FILE=/var/log/redis_1234.log \
# REDIS_DATA_DIR=/var/lib/redis/1234 \
# REDIS_EXECUTABLE=`command -v keydb-server` ./utils/install_server.sh
# REDIS_EXECUTABLE=`command -v keydb-pro-server` ./utils/install_server.sh
#
# This generates a redis config file and an /etc/init.d script, and installs them.
#
@ -129,7 +129,7 @@ fi
if [ ! -x "$REDIS_EXECUTABLE" ] ; then
_MANUAL_EXECUTION=true
#get the redis executable path
_REDIS_EXECUTABLE=`command -v keydb-server`
_REDIS_EXECUTABLE=`command -v keydb-pro-server`
read -p "Please select the redis executable path [$_REDIS_EXECUTABLE] " REDIS_EXECUTABLE
if [ ! -x "$REDIS_EXECUTABLE" ] ; then
REDIS_EXECUTABLE=$_REDIS_EXECUTABLE

View File

@ -12,7 +12,7 @@
### END INIT INFO
REDISPORT=6379
EXEC=/usr/local/bin/keydb-server
EXEC=/usr/local/bin/keydb-pro-server
CLIEXEC=/usr/local/bin/keydb-cli
PIDFILE=/var/run/redis_${REDISPORT}.pid

View File

@ -27,8 +27,8 @@ proc run-tests branches {
}
# Start the Redis server
puts " starting the server... [exec ./keydb-server -v]"
set pids [exec echo "port $::port\nloglevel warning\n" | ./keydb-server - > /dev/null 2> /dev/null &]
puts " starting the server... [exec ./keydb-pro-server -v]"
set pids [exec echo "port $::port\nloglevel warning\n" | ./keydb-pro-server - > /dev/null 2> /dev/null &]
puts " pids: $pids"
after 1000
puts " running the benchmark"