Merge branch 'keydbpro' of https://github.com/JohnSully/KeyDB-Pro into keydbpro

Former-commit-id: 770941923d183c0dfed8d5bf8f2299fc4fadf9c6
This commit is contained in:
John Sully 2020-02-16 18:38:15 -05:00
commit f76e237d24
15 changed files with 173 additions and 78 deletions

2
.gitignore vendored
View File

@ -19,7 +19,7 @@ keydb-cli
redis-sentinel redis-sentinel
keydb-sentinel keydb-sentinel
redis-server redis-server
keydb-server keydb-pro-server
doc-tools doc-tools
release release
misc/* misc/*

View File

@ -158,19 +158,19 @@ Running KeyDB
To run KeyDB with the default configuration just type: To run KeyDB with the default configuration just type:
% cd src % cd src
% ./keydb-server % ./keydb-pro-server
If you want to provide your keydb.conf, you have to run it using an additional If you want to provide your keydb.conf, you have to run it using an additional
parameter (the path of the configuration file): parameter (the path of the configuration file):
% cd src % cd src
% ./keydb-server /path/to/keydb.conf % ./keydb-pro-server /path/to/keydb.conf
It is possible to alter the KeyDB configuration by passing parameters directly It is possible to alter the KeyDB configuration by passing parameters directly
as options using the command line. Examples: as options using the command line. Examples:
% ./keydb-server --port 9999 --replicaof 127.0.0.1 6379 % ./keydb-pro-server --port 9999 --replicaof 127.0.0.1 6379
% ./keydb-server /etc/keydb/6379.conf --loglevel debug % ./keydb-pro-server /etc/keydb/6379.conf --loglevel debug
All the options in keydb.conf are also supported as options using the command All the options in keydb.conf are also supported as options using the command
line, with exactly the same name. line, with exactly the same name.
@ -178,7 +178,7 @@ line, with exactly the same name.
Playing with KeyDB Playing with KeyDB
------------------ ------------------
You can use keydb-cli to play with KeyDB. Start a keydb-server instance, You can use keydb-cli to play with KeyDB. Start a keydb-pro-server instance,
then in another terminal try the following: then in another terminal try the following:
% cd src % cd src
@ -243,7 +243,7 @@ Simply make a directory you would like to have the latest binaries dumped in, th
``` ```
$ docker run -it --rm -v /path-to-dump-binaries:/keydb_bin eqalpha/keydb-build-bin $ docker run -it --rm -v /path-to-dump-binaries:/keydb_bin eqalpha/keydb-build-bin
``` ```
You should receive the following files: keydb-benchmark, keydb-check-aof, keydb-check-rdb, keydb-cli, keydb-sentinel, keydb-server You should receive the following files: keydb-benchmark, keydb-check-aof, keydb-check-rdb, keydb-cli, keydb-sentinel, keydb-pro-server
If you are looking to enable flash support with the build (make MALLOC=memkind) then use the following command: If you are looking to enable flash support with the build (make MALLOC=memkind) then use the following command:
``` ```

View File

@ -29,9 +29,9 @@ INSTALL_INCLUDE_PATH= $(DESTDIR)$(PREFIX)/$(INCLUDE_PATH)
INSTALL_LIBRARY_PATH= $(DESTDIR)$(PREFIX)/$(LIBRARY_PATH) INSTALL_LIBRARY_PATH= $(DESTDIR)$(PREFIX)/$(LIBRARY_PATH)
INSTALL_PKGCONF_PATH= $(INSTALL_LIBRARY_PATH)/$(PKGCONF_PATH) INSTALL_PKGCONF_PATH= $(INSTALL_LIBRARY_PATH)/$(PKGCONF_PATH)
# keydb-server configuration used for testing # keydb-pro-server configuration used for testing
REDIS_PORT=56379 REDIS_PORT=56379
REDIS_SERVER=keydb-server REDIS_SERVER=keydb-pro-server
define REDIS_TEST_CONFIG define REDIS_TEST_CONFIG
daemonize yes daemonize yes
pidfile /tmp/hiredis-test-redis.pid pidfile /tmp/hiredis-test-redis.pid

View File

@ -3,7 +3,7 @@
# Note that in order to read the configuration file, KeyDB must be # Note that in order to read the configuration file, KeyDB must be
# started with the file path as first argument: # started with the file path as first argument:
# #
# ./keydb-server /path/to/keydb.conf # ./keydb-pro-server /path/to/keydb.conf
# Note on units: when memory size is needed, it is possible to specify # Note on units: when memory size is needed, it is possible to specify
# it in the usual form of 1k 5GB 4M and so forth: # it in the usual form of 1k 5GB 4M and so forth:

View File

@ -319,7 +319,7 @@ else
endif endif
@touch $@ @touch $@
# keydb-server # keydb-pro-server
$(REDIS_SERVER_NAME): $(REDIS_SERVER_OBJ) $(REDIS_SERVER_NAME): $(REDIS_SERVER_OBJ)
$(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/lua/src/liblua.a ../deps/rocksdb/librocksdb.a $(FINAL_LIBS) $(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/lua/src/liblua.a ../deps/rocksdb/librocksdb.a $(FINAL_LIBS)

View File

@ -1497,6 +1497,14 @@ void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when)
rememberSlaveKeyWithExpire(db,key); rememberSlaveKeyWithExpire(db,key);
} }
redisDb::~redisDb()
{
dictRelease(watched_keys);
dictRelease(ready_keys);
dictRelease(blocking_keys);
listRelease(defrag_later);
}
void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e) void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e)
{ {
serverAssert(GlobalLocksAcquired()); serverAssert(GlobalLocksAcquired());

View File

@ -1560,7 +1560,7 @@ void sigsegvHandler(int sig, siginfo_t *info, void *secret) {
"\n=== KEYDB BUG REPORT END. Make sure to include from START to END. ===\n\n" "\n=== KEYDB BUG REPORT END. Make sure to include from START to END. ===\n\n"
" Please report the crash by opening an issue on github:\n\n" " Please report the crash by opening an issue on github:\n\n"
" https://github.com/JohnSully/KeyDB/issues\n\n" " https://github.com/JohnSully/KeyDB/issues\n\n"
" Suspect RAM error? Use keydb-server --test-memory to verify it.\n\n" " Suspect RAM error? Use keydb-pro-server --test-memory to verify it.\n\n"
); );
/* free(messages); Don't call free() with possibly corrupted memory. */ /* free(messages); Don't call free() with possibly corrupted memory. */

View File

@ -30,6 +30,7 @@
#include "server.h" #include "server.h"
#include "cluster.h" #include "cluster.h"
#include "rdb.h" #include "rdb.h"
#include "aelocker.h"
#include <dlfcn.h> #include <dlfcn.h>
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
@ -1276,7 +1277,10 @@ client *moduleGetReplyClient(RedisModuleCtx *ctx) {
int RM_ReplyWithLongLong(RedisModuleCtx *ctx, long long ll) { int RM_ReplyWithLongLong(RedisModuleCtx *ctx, long long ll) {
client *c = moduleGetReplyClient(ctx); client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK; if (c == NULL) return REDISMODULE_OK;
addReplyLongLong(c,ll); AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyLongLongAsync(c,ll);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -1286,9 +1290,12 @@ int RM_ReplyWithLongLong(RedisModuleCtx *ctx, long long ll) {
int replyWithStatus(RedisModuleCtx *ctx, const char *msg, const char *prefix) { int replyWithStatus(RedisModuleCtx *ctx, const char *msg, const char *prefix) {
client *c = moduleGetReplyClient(ctx); client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK; if (c == NULL) return REDISMODULE_OK;
addReplyProto(c,prefix,strlen(prefix)); AeLocker locker;
addReplyProto(c,msg,strlen(msg)); std::unique_lock<fastlock> lock(c->lock);
addReplyProto(c,"\r\n",2); locker.arm(c);
addReplyProtoAsync(c,prefix,strlen(prefix));
addReplyProtoAsync(c,msg,strlen(msg));
addReplyProtoAsync(c,"\r\n",2);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -1332,15 +1339,19 @@ int RM_ReplyWithSimpleString(RedisModuleCtx *ctx, const char *msg) {
* The function always returns REDISMODULE_OK. */ * The function always returns REDISMODULE_OK. */
int RM_ReplyWithArray(RedisModuleCtx *ctx, long len) { int RM_ReplyWithArray(RedisModuleCtx *ctx, long len) {
client *c = moduleGetReplyClient(ctx); client *c = moduleGetReplyClient(ctx);
AeLocker locker;
if (c == NULL) return REDISMODULE_OK; if (c == NULL) return REDISMODULE_OK;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
if (len == REDISMODULE_POSTPONED_ARRAY_LEN) { if (len == REDISMODULE_POSTPONED_ARRAY_LEN) {
ctx->postponed_arrays = (void**)zrealloc(ctx->postponed_arrays,sizeof(void*)* ctx->postponed_arrays = (void**)zrealloc(ctx->postponed_arrays,sizeof(void*)*
(ctx->postponed_arrays_count+1), MALLOC_LOCAL); (ctx->postponed_arrays_count+1), MALLOC_LOCAL);
ctx->postponed_arrays[ctx->postponed_arrays_count] = ctx->postponed_arrays[ctx->postponed_arrays_count] =
addReplyDeferredLen(c); addReplyDeferredLenAsync(c);
ctx->postponed_arrays_count++; ctx->postponed_arrays_count++;
} else { } else {
addReplyArrayLen(c,len); addReplyArrayLenAsync(c,len);
} }
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -1352,7 +1363,10 @@ int RM_ReplyWithArray(RedisModuleCtx *ctx, long len) {
int RM_ReplyWithNullArray(RedisModuleCtx *ctx) { int RM_ReplyWithNullArray(RedisModuleCtx *ctx) {
client *c = moduleGetReplyClient(ctx); client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK; if (c == NULL) return REDISMODULE_OK;
addReplyNullArray(c); AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyNullArrayAsync(c);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -1362,7 +1376,10 @@ int RM_ReplyWithNullArray(RedisModuleCtx *ctx) {
int RM_ReplyWithEmptyArray(RedisModuleCtx *ctx) { int RM_ReplyWithEmptyArray(RedisModuleCtx *ctx) {
client *c = moduleGetReplyClient(ctx); client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK; if (c == NULL) return REDISMODULE_OK;
addReply(c,shared.emptyarray); AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyAsync(c,shared.emptyarray);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -1395,6 +1412,9 @@ int RM_ReplyWithEmptyArray(RedisModuleCtx *ctx) {
void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) { void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) {
client *c = moduleGetReplyClient(ctx); client *c = moduleGetReplyClient(ctx);
if (c == NULL) return; if (c == NULL) return;
AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
if (ctx->postponed_arrays_count == 0) { if (ctx->postponed_arrays_count == 0) {
serverLog(LL_WARNING, serverLog(LL_WARNING,
"API misuse detected in module %s: " "API misuse detected in module %s: "
@ -1404,7 +1424,7 @@ void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) {
return; return;
} }
ctx->postponed_arrays_count--; ctx->postponed_arrays_count--;
setDeferredArrayLen(c, setDeferredArrayLenAsync(c,
ctx->postponed_arrays[ctx->postponed_arrays_count], ctx->postponed_arrays[ctx->postponed_arrays_count],
len); len);
if (ctx->postponed_arrays_count == 0) { if (ctx->postponed_arrays_count == 0) {
@ -1419,7 +1439,10 @@ void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) {
int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) { int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) {
client *c = moduleGetReplyClient(ctx); client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK; if (c == NULL) return REDISMODULE_OK;
addReplyBulkCBuffer(c,(char*)buf,len); AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyBulkCBufferAsync(c,(char*)buf,len);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -1430,7 +1453,10 @@ int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) {
int RM_ReplyWithCString(RedisModuleCtx *ctx, const char *buf) { int RM_ReplyWithCString(RedisModuleCtx *ctx, const char *buf) {
client *c = moduleGetReplyClient(ctx); client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK; if (c == NULL) return REDISMODULE_OK;
addReplyBulkCString(c,(char*)buf); AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyBulkCStringAsync(c,(char*)buf);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -1440,7 +1466,10 @@ int RM_ReplyWithCString(RedisModuleCtx *ctx, const char *buf) {
int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) { int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) {
client *c = moduleGetReplyClient(ctx); client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK; if (c == NULL) return REDISMODULE_OK;
addReplyBulk(c,str); AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyBulkAsync(c,str);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -1450,7 +1479,10 @@ int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) {
int RM_ReplyWithEmptyString(RedisModuleCtx *ctx) { int RM_ReplyWithEmptyString(RedisModuleCtx *ctx) {
client *c = moduleGetReplyClient(ctx); client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK; if (c == NULL) return REDISMODULE_OK;
addReply(c,shared.emptybulk); AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyAsync(c,shared.emptybulk);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -1461,7 +1493,10 @@ int RM_ReplyWithEmptyString(RedisModuleCtx *ctx) {
int RM_ReplyWithVerbatimString(RedisModuleCtx *ctx, const char *buf, size_t len) { int RM_ReplyWithVerbatimString(RedisModuleCtx *ctx, const char *buf, size_t len) {
client *c = moduleGetReplyClient(ctx); client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK; if (c == NULL) return REDISMODULE_OK;
addReplyVerbatim(c, buf, len, "txt"); AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyVerbatimAsync(c, buf, len, "txt");
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -1471,7 +1506,10 @@ int RM_ReplyWithVerbatimString(RedisModuleCtx *ctx, const char *buf, size_t len)
int RM_ReplyWithNull(RedisModuleCtx *ctx) { int RM_ReplyWithNull(RedisModuleCtx *ctx) {
client *c = moduleGetReplyClient(ctx); client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK; if (c == NULL) return REDISMODULE_OK;
addReplyNull(c); AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyNullAsync(c);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -1484,8 +1522,11 @@ int RM_ReplyWithNull(RedisModuleCtx *ctx) {
int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) { int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) {
client *c = moduleGetReplyClient(ctx); client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK; if (c == NULL) return REDISMODULE_OK;
AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
sds proto = sdsnewlen(reply->proto, reply->protolen); sds proto = sdsnewlen(reply->proto, reply->protolen);
addReplySds(c,proto); addReplySdsAsync(c,proto);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -1498,7 +1539,10 @@ int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) {
int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) { int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) {
client *c = moduleGetReplyClient(ctx); client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK; if (c == NULL) return REDISMODULE_OK;
addReplyDouble(c,d); AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyDoubleAsync(c,d);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -1513,7 +1557,10 @@ int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) {
int RM_ReplyWithLongDouble(RedisModuleCtx *ctx, long double ld) { int RM_ReplyWithLongDouble(RedisModuleCtx *ctx, long double ld) {
client *c = moduleGetReplyClient(ctx); client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK; if (c == NULL) return REDISMODULE_OK;
addReplyHumanLongDouble(c, ld); AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyHumanLongDoubleAsync(c, ld);
return REDISMODULE_OK; return REDISMODULE_OK;
} }

View File

@ -663,23 +663,33 @@ void addReplyDoubleAsync(client *c, double d) {
addReplyDoubleCore(c, d, true); addReplyDoubleCore(c, d, true);
} }
void addReplyBulkCore(client *c, robj_roptr obj, bool fAsync);
/* Add a long double as a bulk reply, but uses a human readable formatting /* Add a long double as a bulk reply, but uses a human readable formatting
* of the double instead of exposing the crude behavior of doubles to the * of the double instead of exposing the crude behavior of doubles to the
* dear user. */ * dear user. */
void addReplyHumanLongDouble(client *c, long double d) { void addReplyHumanLongDoubleCore(client *c, long double d, bool fAsync) {
if (c->resp == 2) { if (c->resp == 2) {
robj *o = createStringObjectFromLongDouble(d,1); robj *o = createStringObjectFromLongDouble(d,1);
addReplyBulk(c,o); addReplyBulkCore(c,o,fAsync);
decrRefCount(o); decrRefCount(o);
} else { } else {
char buf[MAX_LONG_DOUBLE_CHARS]; char buf[MAX_LONG_DOUBLE_CHARS];
int len = ld2string(buf,sizeof(buf),d,LD_STR_HUMAN); int len = ld2string(buf,sizeof(buf),d,LD_STR_HUMAN);
addReplyProto(c,",",1); addReplyProtoCore(c,",",1,fAsync);
addReplyProto(c,buf,len); addReplyProtoCore(c,buf,len,fAsync);
addReplyProto(c,"\r\n",2); addReplyProtoCore(c,"\r\n",2,fAsync);
} }
} }
void addReplyHumanLongDouble(client *c, long double d) {
addReplyHumanLongDoubleCore(c, d, false);
}
void addReplyHumanLongDoubleAsync(client *c, long double d) {
addReplyHumanLongDoubleCore(c, d, true);
}
/* Add a long long as integer reply or bulk len / multi bulk count. /* Add a long long as integer reply or bulk len / multi bulk count.
* Basically this is used to output <prefix><long long><crlf>. */ * Basically this is used to output <prefix><long long><crlf>. */
void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync) { void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync) {
@ -914,6 +924,10 @@ void addReplyBulkCString(client *c, const char *s) {
addReplyBulkCStringCore(c, s, false); addReplyBulkCStringCore(c, s, false);
} }
void addReplyBulkCStringAsync(client *c, const char *s) {
addReplyBulkCStringCore(c, s, true);
}
/* Add a long long as a bulk reply */ /* Add a long long as a bulk reply */
void addReplyBulkLongLong(client *c, long long ll) { void addReplyBulkLongLong(client *c, long long ll) {
char buf[64]; char buf[64];
@ -932,9 +946,9 @@ void addReplyBulkLongLong(client *c, long long ll) {
* three first characters of the extension are used, and if the * three first characters of the extension are used, and if the
* provided one is shorter than that, the remaining is filled with * provided one is shorter than that, the remaining is filled with
* spaces. */ * spaces. */
void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) { void addReplyVerbatimCore(client *c, const char *s, size_t len, const char *ext, bool fAsync) {
if (c->resp == 2) { if (c->resp == 2) {
addReplyBulkCBuffer(c,s,len); addReplyBulkCBufferCore(c,s,len,fAsync);
} else { } else {
char buf[32]; char buf[32];
size_t preflen = snprintf(buf,sizeof(buf),"=%zu\r\nxxx:",len+4); size_t preflen = snprintf(buf,sizeof(buf),"=%zu\r\nxxx:",len+4);
@ -946,12 +960,20 @@ void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) {
p[i] = *ext++; p[i] = *ext++;
} }
} }
addReplyProto(c,buf,preflen); addReplyProtoCore(c,buf,preflen,fAsync);
addReplyProto(c,s,len); addReplyProtoCore(c,s,len,fAsync);
addReplyProto(c,"\r\n",2); addReplyProtoCore(c,"\r\n",2,fAsync);
} }
} }
void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) {
addReplyVerbatimCore(c, s, len, ext, false);
}
void addReplyVerbatimAsync(client *c, const char *s, size_t len, const char *ext) {
addReplyVerbatimCore(c, s, len, ext, true);
}
/* Add an array of C strings as status replies with a heading. /* Add an array of C strings as status replies with a heading.
* This function is typically invoked by from commands that support * This function is typically invoked by from commands that support
* subcommands in response to the 'help' subcommand. The help array * subcommands in response to the 'help' subcommand. The help array
@ -1015,30 +1037,24 @@ int clientHasPendingReplies(client *c) {
return (c->bufpos || listLength(c->reply)) && !(c->flags & CLIENT_CLOSE_ASAP); return (c->bufpos || listLength(c->reply)) && !(c->flags & CLIENT_CLOSE_ASAP);
} }
static std::atomic<int> rgacceptsInFlight[MAX_EVENT_LOOPS];
int chooseBestThreadForAccept() int chooseBestThreadForAccept()
{ {
listIter li;
listNode *ln;
int rgcclients[MAX_EVENT_LOOPS] = {0};
listRewind(g_pserver->clients, &li);
while ((ln = listNext(&li)) != nullptr)
{
client *c = (client*)listNodeValue(ln);
if (c->iel < 0)
continue;
rgcclients[c->iel]++;
}
int ielMinLoad = 0; int ielMinLoad = 0;
int cclientsMin = INT_MAX;
for (int iel = 0; iel < cserver.cthreads; ++iel) for (int iel = 0; iel < cserver.cthreads; ++iel)
{ {
if (rgcclients[iel] < cserver.thread_min_client_threshold) int cclientsThread;
atomicGet(g_pserver->rgthreadvar[iel].cclients, cclientsThread);
cclientsThread += rgacceptsInFlight[iel].load(std::memory_order_relaxed);
if (cclientsThread < cserver.thread_min_client_threshold)
return iel; return iel;
if (rgcclients[iel] < rgcclients[ielMinLoad]) if (cclientsThread < cclientsMin)
{
cclientsMin = cclientsThread;
ielMinLoad = iel; ielMinLoad = iel;
} }
}
return ielMinLoad; return ielMinLoad;
} }
@ -1185,18 +1201,21 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
{ {
{ {
int ielTarget = chooseBestThreadForAccept(); int ielTarget = chooseBestThreadForAccept();
rgacceptsInFlight[ielTarget].fetch_add(1, std::memory_order_relaxed);
if (ielTarget != ielCur) if (ielTarget != ielCur)
{ {
char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL);
memcpy(szT, cip, NET_IP_STR_LEN); memcpy(szT, cip, NET_IP_STR_LEN);
int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget, szT]{ int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget, szT]{
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,szT, ielTarget); acceptCommonHandler(connCreateAcceptedSocket(cfd),0,szT,ielTarget);
rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed);
zfree(szT); zfree(szT);
}); });
if (res == AE_OK) if (res == AE_OK)
continue; continue;
} }
rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed);
} }
LLocalThread: LLocalThread:
@ -1267,8 +1286,25 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
return; return;
} }
serverLog(LL_VERBOSE,"Accepted connection to %s", g_pserver->unixsocket); serverLog(LL_VERBOSE,"Accepted connection to %s", g_pserver->unixsocket);
aeAcquireLock();
int ielTarget = rand() % cserver.cthreads;
if (ielTarget == iel)
{
LLocalThread:
acceptCommonHandler(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL,iel); acceptCommonHandler(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL,iel);
} }
else
{
int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget]{
acceptCommonHandler(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL,ielTarget);
});
if (res != AE_OK)
goto LLocalThread;
}
aeReleaseLock();
}
} }
static void freeClientArgv(client *c) { static void freeClientArgv(client *c) {

View File

@ -4970,19 +4970,19 @@ void version(void) {
} }
void usage(void) { void usage(void) {
fprintf(stderr,"Usage: ./keydb-server [/path/to/keydb.conf] [options]\n"); fprintf(stderr,"Usage: ./keydb-pro-server [/path/to/keydb.conf] [options]\n");
fprintf(stderr," ./keydb-server - (read config from stdin)\n"); fprintf(stderr," ./keydb-pro-server - (read config from stdin)\n");
fprintf(stderr," ./keydb-server -v or --version\n"); fprintf(stderr," ./keydb-pro-server -v or --version\n");
fprintf(stderr," ./keydb-server -h or --help\n"); fprintf(stderr," ./keydb-pro-server -h or --help\n");
fprintf(stderr," ./keydb-server --test-memory <megabytes>\n\n"); fprintf(stderr," ./keydb-pro-server --test-memory <megabytes>\n\n");
fprintf(stderr,"Examples:\n"); fprintf(stderr,"Examples:\n");
fprintf(stderr," ./keydb-server (run the server with default conf)\n"); fprintf(stderr," ./keydb-pro-server (run the server with default conf)\n");
fprintf(stderr," ./keydb-server /etc/redis/6379.conf\n"); fprintf(stderr," ./keydb-pro-server /etc/redis/6379.conf\n");
fprintf(stderr," ./keydb-server --port 7777\n"); fprintf(stderr," ./keydb-pro-server --port 7777\n");
fprintf(stderr," ./keydb-server --port 7777 --replicaof 127.0.0.1 8888\n"); fprintf(stderr," ./keydb-pro-server --port 7777 --replicaof 127.0.0.1 8888\n");
fprintf(stderr," ./keydb-server /etc/mykeydb.conf --loglevel verbose\n\n"); fprintf(stderr," ./keydb-pro-server /etc/mykeydb.conf --loglevel verbose\n\n");
fprintf(stderr,"Sentinel mode:\n"); fprintf(stderr,"Sentinel mode:\n");
fprintf(stderr," ./keydb-server /etc/sentinel.conf --sentinel\n"); fprintf(stderr," ./keydb-pro-server /etc/sentinel.conf --sentinel\n");
exit(1); exit(1);
} }
@ -5476,7 +5476,7 @@ int main(int argc, char **argv) {
exit(0); exit(0);
} else { } else {
fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n"); fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");
fprintf(stderr,"Example: ./keydb-server --test-memory 4096\n\n"); fprintf(stderr,"Example: ./keydb-pro-server --test-memory 4096\n\n");
exit(1); exit(1);
} }
} }

View File

@ -1400,7 +1400,7 @@ public:
/* Redis database representation. There are multiple databases identified /* Redis database representation. There are multiple databases identified
* by integers from 0 (the default database) up to the max configured * by integers from 0 (the default database) up to the max configured
* database. The database number is the 'id' field in the structure. */ * database. The database number is the 'id' field in the structure. */
typedef struct redisDb : public redisDbPersistentDataSnapshot struct redisDb : public redisDbPersistentDataSnapshot
{ {
// Legacy C API, Do not add more // Legacy C API, Do not add more
friend void tryResizeHashTables(int); friend void tryResizeHashTables(int);
@ -1425,6 +1425,7 @@ typedef struct redisDb : public redisDbPersistentDataSnapshot
: expireitr(nullptr) : expireitr(nullptr)
{} {}
void initialize(int id); void initialize(int id);
virtual ~redisDb();
void dbOverwriteCore(redisDb::iter itr, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire); void dbOverwriteCore(redisDb::iter itr, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire);
@ -1477,7 +1478,7 @@ public:
long long last_expire_set; /* when the last expire was set */ long long last_expire_set; /* when the last expire was set */
double avg_ttl; /* Average TTL, just for stats */ double avg_ttl; /* Average TTL, just for stats */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */ list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} redisDb; };
/* Client MULTI/EXEC state */ /* Client MULTI/EXEC state */
typedef struct multiCmd { typedef struct multiCmd {
@ -2532,10 +2533,12 @@ void addReplyNullArray(client *c);
void addReplyNullArrayAsync(client *c); void addReplyNullArrayAsync(client *c);
void addReplyBool(client *c, int b); void addReplyBool(client *c, int b);
void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext); void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext);
void addReplyVerbatimAsync(client *c, const char *s, size_t len, const char *ext);
void addReplyProto(client *c, const char *s, size_t len); void addReplyProto(client *c, const char *s, size_t len);
void addReplyBulk(client *c, robj_roptr obj); void addReplyBulk(client *c, robj_roptr obj);
void AddReplyFromClient(client *c, client *src); void AddReplyFromClient(client *c, client *src);
void addReplyBulkCString(client *c, const char *s); void addReplyBulkCString(client *c, const char *s);
void addReplyBulkCStringAsync(client *c, const char *s);
void addReplyBulkCBuffer(client *c, const void *p, size_t len); void addReplyBulkCBuffer(client *c, const void *p, size_t len);
void addReplyBulkLongLong(client *c, long long ll); void addReplyBulkLongLong(client *c, long long ll);
void addReply(client *c, robj_roptr obj); void addReply(client *c, robj_roptr obj);
@ -2545,6 +2548,7 @@ void addReplyError(client *c, const char *err);
void addReplyStatus(client *c, const char *status); void addReplyStatus(client *c, const char *status);
void addReplyDouble(client *c, double d); void addReplyDouble(client *c, double d);
void addReplyHumanLongDouble(client *c, long double d); void addReplyHumanLongDouble(client *c, long double d);
void addReplyHumanLongDoubleAsync(client *c, long double d);
void addReplyLongLong(client *c, long long ll); void addReplyLongLong(client *c, long long ll);
#ifdef __cplusplus #ifdef __cplusplus
void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync); void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync);

View File

@ -65,7 +65,7 @@ test "Slave #5 is reachable and alive" {
test "Slave #5 should not be able to failover" { test "Slave #5 should not be able to failover" {
after 10000 after 10000
assert {[RI 5 role] eq {slave}} assert_equal {slave} [RI 5 role]
} }
test "Cluster should be down" { test "Cluster should be down" {

View File

@ -37,7 +37,7 @@
# REDIS_CONFIG_FILE=/etc/redis/1234.conf \ # REDIS_CONFIG_FILE=/etc/redis/1234.conf \
# REDIS_LOG_FILE=/var/log/redis_1234.log \ # REDIS_LOG_FILE=/var/log/redis_1234.log \
# REDIS_DATA_DIR=/var/lib/redis/1234 \ # REDIS_DATA_DIR=/var/lib/redis/1234 \
# REDIS_EXECUTABLE=`command -v keydb-server` ./utils/install_server.sh # REDIS_EXECUTABLE=`command -v keydb-pro-server` ./utils/install_server.sh
# #
# This generates a redis config file and an /etc/init.d script, and installs them. # This generates a redis config file and an /etc/init.d script, and installs them.
# #
@ -129,7 +129,7 @@ fi
if [ ! -x "$REDIS_EXECUTABLE" ] ; then if [ ! -x "$REDIS_EXECUTABLE" ] ; then
_MANUAL_EXECUTION=true _MANUAL_EXECUTION=true
#get the redis executable path #get the redis executable path
_REDIS_EXECUTABLE=`command -v keydb-server` _REDIS_EXECUTABLE=`command -v keydb-pro-server`
read -p "Please select the redis executable path [$_REDIS_EXECUTABLE] " REDIS_EXECUTABLE read -p "Please select the redis executable path [$_REDIS_EXECUTABLE] " REDIS_EXECUTABLE
if [ ! -x "$REDIS_EXECUTABLE" ] ; then if [ ! -x "$REDIS_EXECUTABLE" ] ; then
REDIS_EXECUTABLE=$_REDIS_EXECUTABLE REDIS_EXECUTABLE=$_REDIS_EXECUTABLE

View File

@ -12,7 +12,7 @@
### END INIT INFO ### END INIT INFO
REDISPORT=6379 REDISPORT=6379
EXEC=/usr/local/bin/keydb-server EXEC=/usr/local/bin/keydb-pro-server
CLIEXEC=/usr/local/bin/keydb-cli CLIEXEC=/usr/local/bin/keydb-cli
PIDFILE=/var/run/redis_${REDISPORT}.pid PIDFILE=/var/run/redis_${REDISPORT}.pid

View File

@ -27,8 +27,8 @@ proc run-tests branches {
} }
# Start the Redis server # Start the Redis server
puts " starting the server... [exec ./keydb-server -v]" puts " starting the server... [exec ./keydb-pro-server -v]"
set pids [exec echo "port $::port\nloglevel warning\n" | ./keydb-server - > /dev/null 2> /dev/null &] set pids [exec echo "port $::port\nloglevel warning\n" | ./keydb-pro-server - > /dev/null 2> /dev/null &]
puts " pids: $pids" puts " pids: $pids"
after 1000 after 1000
puts " running the benchmark" puts " running the benchmark"