Merge pull request #51 from jsully/keydbpro
KeyDB Pro Collab Bug Fixes and Mac Build
This commit is contained in:
commit
751ea96e41
2
deps/Makefile
vendored
2
deps/Makefile
vendored
@ -103,7 +103,7 @@ jemalloc: .make-prerequisites
|
||||
rocksdb: .make-prerequisites
|
||||
@printf '%b %b\n' $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR)
|
||||
ifeq ($(uname_M),x86_64)
|
||||
cd rocksdb && PORTABLE=1 USE_SSE=1 FORCE_SSE42=1 $(MAKE) static_lib
|
||||
cd rocksdb && CFLAGS=-Wno-error PORTABLE=1 USE_SSE=1 FORCE_SSE42=1 $(MAKE) static_lib
|
||||
else
|
||||
cd rocksdb && PORTABLE=1 $(MAKE) static_lib
|
||||
endif
|
||||
|
16
src/Makefile
16
src/Makefile
@ -90,7 +90,6 @@ endif
|
||||
|
||||
ifeq ($(COMPILER_NAME),clang)
|
||||
CXXFLAGS+= -stdlib=libc++
|
||||
LDFLAGS+= -latomic
|
||||
endif
|
||||
|
||||
# To get ARM stack traces if KeyDB crashes we need a special C flag.
|
||||
@ -104,12 +103,6 @@ ifneq (,$(findstring armv,$(uname_M)))
|
||||
endif
|
||||
endif
|
||||
|
||||
ifneq (,$(filter aarch64 armv,$(uname_M)))
|
||||
LICENSE_LIB_DIR=../deps/license/arm64/
|
||||
else
|
||||
LICENSE_LIB_DIR=../deps/license/x64/
|
||||
endif
|
||||
|
||||
# Backwards compatibility for selecting an allocator
|
||||
ifeq ($(USE_TCMALLOC),yes)
|
||||
MALLOC=tcmalloc
|
||||
@ -135,11 +128,11 @@ endif
|
||||
# Override default settings if possible
|
||||
-include .make-settings
|
||||
|
||||
DEBUG=-g -ggdb
|
||||
FINAL_CFLAGS=$(STD) $(WARN) $(OPT) $(DEBUG) $(CFLAGS) $(KEYDB_CFLAGS) $(REDIS_CFLAGS)
|
||||
FINAL_CXXFLAGS=$(CXX_STD) $(WARN) $(OPT) $(DEBUG) $(CXXFLAGS) $(KEYDB_CFLAGS) $(REDIS_CFLAGS)
|
||||
FINAL_LDFLAGS=$(LDFLAGS) $(KEYDB_LDFLAGS) $(DEBUG)
|
||||
FINAL_LIBS+=-lm -lz -latomic -L$(LICENSE_LIB_DIR) -lkey -lcrypto -lbz2 -lzstd -llz4 -lsnappy
|
||||
DEBUG=-g -ggdb
|
||||
FINAL_LIBS+=-lm -lz -lcrypto -lbz2 -lzstd -llz4 -lsnappy
|
||||
|
||||
ifneq ($(uname_S),Darwin)
|
||||
FINAL_LIBS+=-latomic
|
||||
@ -249,14 +242,15 @@ endif
|
||||
|
||||
ifdef OPENSSL_PREFIX
|
||||
OPENSSL_CFLAGS=-I$(OPENSSL_PREFIX)/include
|
||||
OPENSSL_CXXFLAGS=-I$(OPENSSL_PREFIX)/include
|
||||
OPENSSL_LDFLAGS=-L$(OPENSSL_PREFIX)/lib
|
||||
# Also export OPENSSL_PREFIX so it ends up in deps sub-Makefiles
|
||||
export OPENSSL_PREFIX
|
||||
endif
|
||||
|
||||
# Include paths to dependencies
|
||||
FINAL_CFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src -I../deps/hdr_histogram -I../deps/license/
|
||||
FINAL_CXXFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src -I../deps/hdr_histogram -I../deps/rocksdb/include/ -I../deps/license -I../deps/concurrentqueue
|
||||
FINAL_CFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src -I../deps/hdr_histogram
|
||||
FINAL_CXXFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src -I../deps/hdr_histogram -I../deps/rocksdb/include/ -I../deps/concurrentqueue
|
||||
|
||||
# Determine systemd support and/or build preference (defaulting to auto-detection)
|
||||
BUILD_WITH_SYSTEMD=no
|
||||
|
@ -121,9 +121,9 @@ void StorageCache::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, si
|
||||
* the element already exists. */
|
||||
long index;
|
||||
if ((index = _dictKeyIndex(m_pdict, de->key, (uint64_t)de->key, nullptr)) == -1) {
|
||||
dictEntry *de = dictFind(m_pdict, de->key);
|
||||
serverAssert(de != nullptr);
|
||||
de->v.s64++;
|
||||
dictEntry *deLocal = dictFind(m_pdict, de->key);
|
||||
serverAssert(deLocal != nullptr);
|
||||
deLocal->v.s64++;
|
||||
m_collisionCount++;
|
||||
zfree(de);
|
||||
} else {
|
||||
|
@ -35,9 +35,9 @@
|
||||
|
||||
#include <fcntl.h>
|
||||
#include <sys/stat.h>
|
||||
#ifdef __linux__
|
||||
#include <sys/sysinfo.h>
|
||||
#include "keycheck.h"
|
||||
|
||||
#endif
|
||||
|
||||
const char *KEYDB_SET_VERSION = KEYDB_REAL_VERSION;
|
||||
|
||||
@ -371,6 +371,7 @@ bool initializeStorageProvider(const char **err)
|
||||
// We need to set max memory to a sane default so keys are actually evicted properly
|
||||
if (g_pserver->maxmemory == 0 && g_pserver->maxmemory_policy == MAXMEMORY_NO_EVICTION)
|
||||
{
|
||||
#ifdef __linux__
|
||||
struct sysinfo sys;
|
||||
if (sysinfo(&sys) == 0)
|
||||
{
|
||||
@ -378,6 +379,9 @@ bool initializeStorageProvider(const char **err)
|
||||
g_pserver->maxmemory = static_cast<unsigned long long>(sys.totalram / 2.2);
|
||||
g_pserver->maxmemory_policy = MAXMEMORY_ALLKEYS_LRU;
|
||||
}
|
||||
#else
|
||||
serverLog(LL_WARNING, "Unable to dynamically set maxmemory, please set maxmemory and maxmemory-policy if you are using a storage provier");
|
||||
#endif
|
||||
}
|
||||
else if (g_pserver->maxmemory_policy == MAXMEMORY_NO_EVICTION)
|
||||
{
|
||||
@ -753,15 +757,6 @@ void loadServerConfigFromString(char *config) {
|
||||
g_sdsProvider = sdsdup(argv[1]);
|
||||
if (argc > 2)
|
||||
g_sdsArgs = sdsdup(argv[2]);
|
||||
} else if (!strcasecmp(argv[0],"enable-enterprise") && (argc == 1 || argc == 2)) {
|
||||
if (argc == 2)
|
||||
{
|
||||
if (!FValidKey(argv[1], strlen(argv[1]))) {
|
||||
err = "Invalid license key";
|
||||
goto loaderr;
|
||||
}
|
||||
cserver.license_key = zstrdup(argv[1]);
|
||||
}
|
||||
} else {
|
||||
err = "Bad directive or wrong number of arguments"; goto loaderr;
|
||||
}
|
||||
@ -1904,7 +1899,6 @@ int rewriteConfig(char *path, int force_all) {
|
||||
rewriteConfigClientoutputbufferlimitOption(state);
|
||||
rewriteConfigYesNoOption(state,"active-replica",g_pserver->fActiveReplica,CONFIG_DEFAULT_ACTIVE_REPLICA);
|
||||
rewriteConfigStringOption(state, "version-override",KEYDB_SET_VERSION,KEYDB_REAL_VERSION);
|
||||
rewriteConfigStringOption(state, "enable-enterprise", cserver.license_key, CONFIG_DEFAULT_LICENSE_KEY);
|
||||
rewriteConfigOOMScoreAdjValuesOption(state);
|
||||
|
||||
/* Rewrite Sentinel config if in Sentinel mode. */
|
||||
@ -2832,6 +2826,7 @@ standardConfig configs[] = {
|
||||
createLongLongConfig("stream-node-max-entries", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, g_pserver->stream_node_max_entries, 100, INTEGER_CONFIG, NULL, NULL),
|
||||
createLongLongConfig("repl-backlog-size", NULL, MODIFIABLE_CONFIG, 1, LLONG_MAX, g_pserver->repl_backlog_size, 1024*1024, MEMORY_CONFIG, NULL, updateReplBacklogSize), /* Default: 1mb */
|
||||
createLongLongConfig("repl-backlog-disk-reserve", NULL, IMMUTABLE_CONFIG, 0, LLONG_MAX, cserver.repl_backlog_disk_size, 0, MEMORY_CONFIG, NULL, NULL),
|
||||
createLongLongConfig("max-snapshot-slip", NULL, MODIFIABLE_CONFIG, 0, 5000, g_pserver->snapshot_slip, 400, 0, NULL, NULL),
|
||||
|
||||
/* Unsigned Long Long configs */
|
||||
createULongLongConfig("maxmemory", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, g_pserver->maxmemory, 0, MEMORY_CONFIG, NULL, updateMaxmemory),
|
||||
|
@ -2488,7 +2488,10 @@ void slotToKeyUpdateKeyCore(const char *key, size_t keylen, int add) {
|
||||
} else {
|
||||
fModified = raxRemove(g_pserver->cluster->slots_to_keys,indexed,keylen+2,NULL);
|
||||
}
|
||||
serverAssert(fModified);
|
||||
// This assert is disabled when a snapshot depth is >0 because prepOverwriteForSnapshot will add in a tombstone,
|
||||
// this prevents ensure from adding the key to the dictionary which means the caller isn't aware we're already tracking
|
||||
// the key.
|
||||
serverAssert(fModified || g_pserver->db[0]->snapshot_depth() > 0);
|
||||
if (indexed != buf) zfree(indexed);
|
||||
}
|
||||
|
||||
|
@ -575,7 +575,7 @@ int dictRehashMilliseconds(dict *d, int ms) {
|
||||
* dictionary so that the hash table automatically migrates from H1 to H2
|
||||
* while it is actively used. */
|
||||
static void _dictRehashStep(dict *d) {
|
||||
uint16_t pauserehash;
|
||||
int16_t pauserehash;
|
||||
__atomic_load(&d->pauserehash, &pauserehash, __ATOMIC_RELAXED);
|
||||
if (pauserehash == 0) dictRehash(d,1);
|
||||
}
|
||||
@ -1519,7 +1519,7 @@ void dictGetStats(char *buf, size_t bufsize, dict *d) {
|
||||
|
||||
void dictForceRehash(dict *d)
|
||||
{
|
||||
uint16_t pauserehash;
|
||||
int16_t pauserehash;
|
||||
__atomic_load(&d->pauserehash, &pauserehash, __ATOMIC_RELAXED);
|
||||
while (pauserehash == 0 && dictIsRehashing(d)) _dictRehashStep(d);
|
||||
}
|
||||
|
@ -4289,10 +4289,9 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
|
||||
}
|
||||
|
||||
{
|
||||
aeAcquireLock();
|
||||
AeLocker locker; locker.arm(nullptr);
|
||||
std::unique_lock<fastlock> ul(c->lock);
|
||||
call(c,call_flags);
|
||||
aeReleaseLock();
|
||||
}
|
||||
|
||||
g_pserver->replication_allowed = prev_replication_allowed;
|
||||
|
@ -145,7 +145,7 @@ client *createClient(connection *conn, int iel) {
|
||||
client_id = g_pserver->next_client_id.fetch_add(1);
|
||||
c->iel = iel;
|
||||
c->id = client_id;
|
||||
sprintf(c->lock.szName, "client %lu", client_id);
|
||||
sprintf(c->lock.szName, "client %" PRIu64, client_id);
|
||||
c->resp = 2;
|
||||
c->conn = conn;
|
||||
c->name = NULL;
|
||||
@ -208,7 +208,7 @@ client *createClient(connection *conn, int iel) {
|
||||
c->paused_list_node = NULL;
|
||||
c->client_tracking_redirection = 0;
|
||||
c->casyncOpsPending = 0;
|
||||
c->mvccCheckpoint = getMvccTstamp();
|
||||
c->mvccCheckpoint = 0;
|
||||
c->master_error = 0;
|
||||
memset(c->uuid, 0, UUID_BINARY_LEN);
|
||||
|
||||
@ -1345,7 +1345,7 @@ void acceptOnThread(connection *conn, int flags, char *cip)
|
||||
szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL);
|
||||
memcpy(szT, cip, NET_IP_STR_LEN);
|
||||
}
|
||||
int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [conn, flags, ielTarget, szT, fBootLoad] {
|
||||
int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [conn, flags, ielTarget, szT] {
|
||||
connMarshalThread(conn);
|
||||
acceptCommonHandler(conn,flags,szT,ielTarget);
|
||||
rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed);
|
||||
@ -2754,7 +2754,7 @@ void readQueryFromClient(connection *conn) {
|
||||
// Frequent writers aren't good candidates for this optimization, they cause us to renew the snapshot too often
|
||||
// so we exclude them unless the snapshot we need already exists
|
||||
bool fSnapshotExists = c->db->mvccLastSnapshot >= c->mvccCheckpoint;
|
||||
bool fWriteTooRecent = (((getMvccTstamp() - c->mvccCheckpoint) >> MVCC_MS_SHIFT) < redisDbPersistentDataSnapshot::msStaleThreshold/2);
|
||||
bool fWriteTooRecent = (((getMvccTstamp() - c->mvccCheckpoint) >> MVCC_MS_SHIFT) < static_cast<uint64_t>(g_pserver->snapshot_slip)/2);
|
||||
|
||||
// The check below avoids running async commands if this is a frequent writer unless a snapshot is already there to service it
|
||||
if (!fWriteTooRecent || fSnapshotExists) {
|
||||
@ -2766,9 +2766,9 @@ void readQueryFromClient(connection *conn) {
|
||||
} else {
|
||||
// If we're single threaded its actually better to just process the command here while the query is hot in the cache
|
||||
// multithreaded lock contention dominates and batching is better
|
||||
aeAcquireLock();
|
||||
AeLocker locker;
|
||||
locker.arm(c);
|
||||
runAndPropogateToReplicas(processInputBuffer, c, true /*fParse*/, CMD_CALL_FULL);
|
||||
aeReleaseLock();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1552,6 +1552,7 @@ struct rdbSaveThreadArgs
|
||||
|
||||
void *rdbSaveThread(void *vargs)
|
||||
{
|
||||
serverAssert(!g_pserver->rdbThreadVars.fDone);
|
||||
rdbSaveThreadArgs *args = reinterpret_cast<rdbSaveThreadArgs*>(vargs);
|
||||
serverAssert(serverTL == nullptr);
|
||||
redisServerThreadVars vars;
|
||||
@ -1577,6 +1578,7 @@ void *rdbSaveThread(void *vargs)
|
||||
"RDB",cbDiff/(1024*1024));
|
||||
}
|
||||
|
||||
g_pserver->rdbThreadVars.fDone = true;
|
||||
return (retval == C_OK) ? (void*)0 : (void*)1;
|
||||
}
|
||||
|
||||
@ -2945,6 +2947,7 @@ public:
|
||||
serverTL = &vars;
|
||||
aeSetThreadOwnsLockOverride(true);
|
||||
|
||||
#ifdef __linux__
|
||||
// We will inheret the server thread's affinity mask, clear it as we want to run on a different core.
|
||||
cpu_set_t *cpuset = CPU_ALLOC(std::thread::hardware_concurrency());
|
||||
if (cpuset != nullptr) {
|
||||
@ -2956,6 +2959,7 @@ public:
|
||||
pthread_setaffinity_np(pthread_self(), size, cpuset);
|
||||
CPU_FREE(cpuset);
|
||||
}
|
||||
#endif
|
||||
|
||||
for (;;) {
|
||||
if (queue.queueJobs.size_approx() == 0) {
|
||||
@ -3624,6 +3628,7 @@ struct rdbSaveSocketThreadArgs
|
||||
};
|
||||
void *rdbSaveToSlavesSocketsThread(void *vargs)
|
||||
{
|
||||
serverAssert(!g_pserver->rdbThreadVars.fDone);
|
||||
/* Child */
|
||||
serverAssert(serverTL == nullptr);
|
||||
rdbSaveSocketThreadArgs *args = (rdbSaveSocketThreadArgs*)vargs;
|
||||
@ -3664,6 +3669,7 @@ void *rdbSaveToSlavesSocketsThread(void *vargs)
|
||||
|
||||
close(args->safe_to_exit_pipe);
|
||||
zfree(args);
|
||||
g_pserver->rdbThreadVars.fDone = true;
|
||||
return (retval == C_OK) ? (void*)0 : (void*)1;
|
||||
}
|
||||
|
||||
|
@ -42,6 +42,7 @@
|
||||
#include <assert.h>
|
||||
#include <math.h>
|
||||
#include <pthread.h>
|
||||
#include <string>
|
||||
extern "C" {
|
||||
#include <sdscompat.h> /* Use hiredis' sds compat header that maps sds calls to their hi_ variants */
|
||||
#include <sds.h> /* Use hiredis sds. */
|
||||
|
@ -190,7 +190,11 @@ int bg_unlink(const char *filename) {
|
||||
bool createDiskBacklog() {
|
||||
// Lets create some disk backed pages and add them here
|
||||
std::string path = "./repl-backlog-temp" + std::to_string(gettid());
|
||||
#ifdef __APPLE__
|
||||
int fd = open(path.c_str(), O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
|
||||
#else
|
||||
int fd = open(path.c_str(), O_CREAT | O_RDWR | O_LARGEFILE, S_IRUSR | S_IWUSR);
|
||||
#endif
|
||||
if (fd < 0) {
|
||||
return false;
|
||||
}
|
||||
@ -991,6 +995,7 @@ need_full_resync:
|
||||
}
|
||||
|
||||
int checkClientOutputBufferLimits(client *c);
|
||||
void clientInstallAsyncWriteHandler(client *c);
|
||||
class replicationBuffer {
|
||||
std::vector<client *> replicas;
|
||||
clientReplyBlock *reply = nullptr;
|
||||
@ -1020,6 +1025,17 @@ public:
|
||||
if (reply == nullptr)
|
||||
return;
|
||||
size_t written = reply->used;
|
||||
|
||||
for (auto replica : replicas) {
|
||||
std::unique_lock<fastlock> ul(replica->lock);
|
||||
while (checkClientOutputBufferLimits(replica)
|
||||
&& (replica->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP) == 0) {
|
||||
ul.unlock();
|
||||
usleep(0);
|
||||
ul.lock();
|
||||
}
|
||||
}
|
||||
|
||||
aeAcquireLock();
|
||||
for (size_t ireplica = 0; ireplica < replicas.size(); ++ireplica) {
|
||||
auto replica = replicas[ireplica];
|
||||
@ -1027,18 +1043,23 @@ public:
|
||||
replica->replstate = REPL_STATE_NONE;
|
||||
continue;
|
||||
}
|
||||
|
||||
while (checkClientOutputBufferLimits(replica)
|
||||
&& (replica->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP) == 0) {
|
||||
aeReleaseLock();
|
||||
usleep(10);
|
||||
aeAcquireLock();
|
||||
}
|
||||
|
||||
std::unique_lock<fastlock> lock(replica->lock);
|
||||
addReplyProto(replica, reply->buf(), reply->used);
|
||||
if (ireplica == (replicas.size()-1) && replica->replyAsync == nullptr) {
|
||||
replica->replyAsync = reply;
|
||||
reply = nullptr;
|
||||
if (!(replica->fPendingAsyncWrite)) clientInstallAsyncWriteHandler(replica);
|
||||
} else {
|
||||
addReplyProto(replica, reply->buf(), reply->used);
|
||||
}
|
||||
}
|
||||
ProcessPendingAsyncWrites();
|
||||
for (auto c : replicas) {
|
||||
if (c->flags & CLIENT_CLOSE_ASAP) {
|
||||
std::unique_lock<fastlock> ul(c->lock);
|
||||
c->replstate = REPL_STATE_NONE; // otherwise the client can't be free'd
|
||||
}
|
||||
}
|
||||
replicas.erase(std::remove_if(replicas.begin(), replicas.end(), [](const client *c)->bool{ return c->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP;}), replicas.end());
|
||||
aeReleaseLock();
|
||||
if (reply != nullptr) {
|
||||
@ -1058,7 +1079,7 @@ public:
|
||||
}
|
||||
|
||||
if (reply == nullptr) {
|
||||
reply = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + std::max(size, (unsigned long)(PROTO_REPLY_CHUNK_BYTES*2)));
|
||||
reply = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + std::max(size, (unsigned long)(PROTO_REPLY_CHUNK_BYTES*64)));
|
||||
reply->size = zmalloc_usable_size(reply) - sizeof(clientReplyBlock);
|
||||
reply->used = 0;
|
||||
}
|
||||
@ -1183,7 +1204,7 @@ int rdbSaveSnapshotForReplication(struct rdbSaveInfo *rsi) {
|
||||
size_t snapshotDeclaredCount = spsnapshot->count();
|
||||
replBuf.addArrayLen(snapshotDeclaredCount);
|
||||
size_t count = 0;
|
||||
bool result = spsnapshot->enumerate([&replBuf, &count, &cbData, &lastLogTime, timeStart, &cbLastUpdate](const char *rgchKey, size_t cchKey, const void *rgchVal, size_t cchVal) -> bool{
|
||||
bool result = spsnapshot->enumerate([&replBuf, &count, &cbData, &lastLogTime, &cbLastUpdate](const char *rgchKey, size_t cchKey, const void *rgchVal, size_t cchVal) -> bool{
|
||||
replBuf.addArrayLen(2);
|
||||
|
||||
replBuf.addString(rgchKey, cchKey);
|
||||
@ -1557,16 +1578,9 @@ LError:
|
||||
return;
|
||||
}
|
||||
|
||||
void processReplconfLicense(client *c, robj *arg)
|
||||
void processReplconfLicense(client *c, robj *)
|
||||
{
|
||||
if (cserver.license_key != nullptr)
|
||||
{
|
||||
if (strcmp(cserver.license_key, szFromObj(arg)) == 0) {
|
||||
addReplyError(c, "Each replica must have a unique license key");
|
||||
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
|
||||
return;
|
||||
}
|
||||
}
|
||||
// Only for back-compat
|
||||
addReply(c, shared.ok);
|
||||
}
|
||||
|
||||
@ -3632,41 +3646,6 @@ retry_connect:
|
||||
}
|
||||
sdsfree(err);
|
||||
err = NULL;
|
||||
mi->repl_state = REPL_STATE_SEND_KEY;
|
||||
// fallthrough
|
||||
}
|
||||
|
||||
/* Send LICENSE Key */
|
||||
if (mi->repl_state == REPL_STATE_SEND_KEY)
|
||||
{
|
||||
if (cserver.license_key == nullptr)
|
||||
{
|
||||
mi->repl_state = REPL_STATE_SEND_PSYNC;
|
||||
}
|
||||
else
|
||||
{
|
||||
err = sendCommand(conn,"REPLCONF","license",cserver.license_key,NULL);
|
||||
if (err) goto write_error;
|
||||
mi->repl_state = REPL_STATE_KEY_ACK;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/* LICENSE Key Ack */
|
||||
if (mi->repl_state == REPL_STATE_KEY_ACK)
|
||||
{
|
||||
err = receiveSynchronousResponse(mi, conn);
|
||||
if (err[0] == '-') {
|
||||
if (err[1] == 'E' && err[2] == 'R' && err[3] == 'R') {
|
||||
// Replicating with non-enterprise
|
||||
serverLog(LL_WARNING, "Replicating with non-enterprise server.");
|
||||
} else {
|
||||
serverLog(LL_WARNING, "Recieved error from client: %s", err);
|
||||
sdsfree(err);
|
||||
goto error;
|
||||
}
|
||||
}
|
||||
sdsfree(err);
|
||||
mi->repl_state = REPL_STATE_SEND_PSYNC;
|
||||
// fallthrough
|
||||
}
|
||||
@ -5598,7 +5577,7 @@ void flushReplBacklogToClients()
|
||||
|
||||
if (g_pserver->repl_batch_offStart != g_pserver->master_repl_offset) {
|
||||
bool fAsyncWrite = false;
|
||||
long long min_offset = LONG_LONG_MAX;
|
||||
long long min_offset = LLONG_MAX;
|
||||
// Ensure no overflow
|
||||
serverAssert(g_pserver->repl_batch_offStart < g_pserver->master_repl_offset);
|
||||
if (g_pserver->master_repl_offset - g_pserver->repl_batch_offStart > g_pserver->repl_backlog_size) {
|
||||
@ -5657,7 +5636,7 @@ LDone:
|
||||
// This may be called multiple times per "frame" so update with our progress flushing to clients
|
||||
g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;
|
||||
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
|
||||
g_pserver->repl_lowest_off.store(min_offset == LONG_LONG_MAX ? -1 : min_offset, std::memory_order_seq_cst);
|
||||
g_pserver->repl_lowest_off.store(min_offset == LLONG_MAX ? -1 : min_offset, std::memory_order_seq_cst);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -65,7 +65,6 @@
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
#include "aelocker.h"
|
||||
#include "keycheck.h"
|
||||
#include "motd.h"
|
||||
#include "t_nhash.h"
|
||||
#include "readwritelock.h"
|
||||
@ -1178,6 +1177,10 @@ struct redisCommand redisCommandTable[] = {
|
||||
|
||||
{"failover",failoverCommand,-1,
|
||||
"admin no-script ok-stale",
|
||||
0,NULL,0,0,0,0,0,0},
|
||||
|
||||
{"lfence", lfenceCommand,1,
|
||||
"read-only random ok-stale",
|
||||
0,NULL,0,0,0,0,0,0}
|
||||
};
|
||||
|
||||
@ -2200,8 +2203,8 @@ void checkChildrenDone(void) {
|
||||
if (g_pserver->FRdbSaveInProgress() && !cserver.fForkBgSave)
|
||||
{
|
||||
void *rval = nullptr;
|
||||
int err;
|
||||
if ((err = pthread_tryjoin_np(g_pserver->rdbThreadVars.rdb_child_thread, &rval)))
|
||||
int err = EAGAIN;
|
||||
if (!g_pserver->rdbThreadVars.fDone || (err = pthread_join(g_pserver->rdbThreadVars.rdb_child_thread, &rval)))
|
||||
{
|
||||
if (err != EBUSY && err != EAGAIN)
|
||||
serverLog(LL_WARNING, "Error joining the background RDB save thread: %s\n", strerror(errno));
|
||||
@ -2211,6 +2214,7 @@ void checkChildrenDone(void) {
|
||||
int exitcode = (int)reinterpret_cast<ptrdiff_t>(rval);
|
||||
backgroundSaveDoneHandler(exitcode,g_pserver->rdbThreadVars.fRdbThreadCancel);
|
||||
g_pserver->rdbThreadVars.fRdbThreadCancel = false;
|
||||
g_pserver->rdbThreadVars.fDone = false;
|
||||
if (exitcode == 0) receiveChildInfo();
|
||||
}
|
||||
}
|
||||
@ -2776,7 +2780,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
||||
|
||||
/* end any snapshots created by fast async commands */
|
||||
for (int idb = 0; idb < cserver.dbnum; ++idb) {
|
||||
if (serverTL->rgdbSnapshot[idb] != nullptr) {
|
||||
if (serverTL->rgdbSnapshot[idb] != nullptr && serverTL->rgdbSnapshot[idb]->FStale()) {
|
||||
g_pserver->db[idb]->endSnapshot(serverTL->rgdbSnapshot[idb]);
|
||||
serverTL->rgdbSnapshot[idb] = nullptr;
|
||||
}
|
||||
@ -4661,6 +4665,11 @@ void rejectCommand(client *c, robj *reply, int severity = ERR_CRITICAL) {
|
||||
}
|
||||
}
|
||||
|
||||
void lfenceCommand(client *c) {
|
||||
c->mvccCheckpoint = getMvccTstamp();
|
||||
addReply(c, shared.ok);
|
||||
}
|
||||
|
||||
void rejectCommandFormat(client *c, const char *fmt, ...) {
|
||||
if (c->cmd) c->cmd->rejected_calls++;
|
||||
flagTransaction(c);
|
||||
@ -7513,13 +7522,6 @@ int main(int argc, char **argv) {
|
||||
serverLog(LL_WARNING, "Configuration loaded");
|
||||
}
|
||||
|
||||
#ifndef NO_LICENSE_CHECK
|
||||
if (!g_pserver->sentinel_mode && (cserver.license_key == nullptr || !FValidKey(cserver.license_key, strlen(cserver.license_key)))){
|
||||
serverLog(LL_WARNING, "Error: %s license key provided, exiting immediately.", cserver.license_key == nullptr ? "No" : "Invalid");
|
||||
exit(1);
|
||||
}
|
||||
#endif
|
||||
|
||||
validateConfiguration();
|
||||
|
||||
const char *err;
|
||||
|
14
src/server.h
14
src/server.h
@ -373,8 +373,6 @@ inline bool operator!=(const void *p, const robj_sharedptr &rhs)
|
||||
#define CONFIG_DEFAULT_ACTIVE_REPLICA 0
|
||||
#define CONFIG_DEFAULT_ENABLE_MULTIMASTER 0
|
||||
|
||||
#define CONFIG_DEFAULT_LICENSE_KEY ""
|
||||
|
||||
#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 64 /* Loopkups per loop. */
|
||||
#define ACTIVE_EXPIRE_CYCLE_SUBKEY_LOOKUPS_PER_LOOP 16384 /* Subkey loopkups per loop. */
|
||||
#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */
|
||||
@ -586,8 +584,6 @@ typedef enum {
|
||||
REPL_STATE_RECEIVE_IP_REPLY, /* Wait for REPLCONF reply */
|
||||
REPL_STATE_RECEIVE_CAPA_REPLY, /* Wait for REPLCONF reply */
|
||||
REPL_STATE_RECEIVE_UUID, /* they should ack with their UUID */
|
||||
REPL_STATE_SEND_KEY,
|
||||
REPL_STATE_KEY_ACK,
|
||||
REPL_STATE_SEND_PSYNC, /* Send PSYNC */
|
||||
REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */
|
||||
/* --- End of handshake states --- */
|
||||
@ -1110,7 +1106,8 @@ public:
|
||||
redisDbPersistentData();
|
||||
virtual ~redisDbPersistentData();
|
||||
|
||||
redisDbPersistentData(redisDbPersistentData &&) = default;
|
||||
redisDbPersistentData(const redisDbPersistentData &) = delete;
|
||||
redisDbPersistentData(redisDbPersistentData &&) = delete;
|
||||
|
||||
size_t slots() const { return dictSlots(m_pdict); }
|
||||
size_t size(bool fCachedOnly = false) const;
|
||||
@ -1290,8 +1287,6 @@ public:
|
||||
// These need to be fixed
|
||||
using redisDbPersistentData::size;
|
||||
using redisDbPersistentData::expireSize;
|
||||
|
||||
static const uint64_t msStaleThreshold = 500;
|
||||
};
|
||||
|
||||
/* Redis database representation. There are multiple databases identified
|
||||
@ -2179,7 +2174,6 @@ struct redisServerConst {
|
||||
|
||||
int enable_motd; /* Flag to retrieve the Message of today using CURL request*/
|
||||
|
||||
sds license_key = nullptr;
|
||||
int delete_on_evict = false; // Only valid when a storage provider is set
|
||||
int thread_min_client_threshold = 50;
|
||||
int multimaster_no_forward;
|
||||
@ -2371,6 +2365,7 @@ struct redisServer {
|
||||
struct _rdbThreadVars
|
||||
{
|
||||
std::atomic<bool> fRdbThreadCancel {false};
|
||||
std::atomic<bool> fDone {false};
|
||||
int tmpfileNum = 0;
|
||||
pthread_t rdb_child_thread;
|
||||
int fRdbThreadActive = false;
|
||||
@ -2609,6 +2604,8 @@ struct redisServer {
|
||||
IStorageFactory *m_pstorageFactory = nullptr;
|
||||
int storage_flush_period; // The time between flushes in the CRON job
|
||||
|
||||
long long snapshot_slip = 500; // The amount of time in milliseconds we let a snapshot be behind the current database
|
||||
|
||||
/* TLS Configuration */
|
||||
int tls_cluster;
|
||||
int tls_replication;
|
||||
@ -3788,6 +3785,7 @@ void hrenameCommand(client *c);
|
||||
void stralgoCommand(client *c);
|
||||
void resetCommand(client *c);
|
||||
void failoverCommand(client *c);
|
||||
void lfenceCommand(client *c);
|
||||
|
||||
|
||||
int FBrokenLinkToMaster();
|
||||
|
@ -626,7 +626,7 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe_core(std::function<bool(c
|
||||
__atomic_load(&m_pdbSnapshot, &psnapshot, __ATOMIC_ACQUIRE);
|
||||
if (fResult && psnapshot != nullptr)
|
||||
{
|
||||
std::function<bool(const char*, robj_roptr o)> fnNew = [this, &fn, &celem, dictTombstone](const char *key, robj_roptr o) {
|
||||
std::function<bool(const char*, robj_roptr o)> fnNew = [&fn, &celem, dictTombstone](const char *key, robj_roptr o) {
|
||||
dictEntry *deTombstone = dictFind(dictTombstone, key);
|
||||
if (deTombstone != nullptr)
|
||||
return true;
|
||||
@ -654,7 +654,7 @@ int redisDbPersistentDataSnapshot::snapshot_depth() const
|
||||
|
||||
bool redisDbPersistentDataSnapshot::FStale() const
|
||||
{
|
||||
return ((getMvccTstamp() - m_mvccCheckpoint) >> MVCC_MS_SHIFT) >= redisDbPersistentDataSnapshot::msStaleThreshold;
|
||||
return ((getMvccTstamp() - m_mvccCheckpoint) >> MVCC_MS_SHIFT) >= static_cast<uint64_t>(g_pserver->snapshot_slip);
|
||||
}
|
||||
|
||||
void dictGCAsyncFree(dictAsyncRehashCtl *async) {
|
||||
|
@ -44,7 +44,7 @@ public:
|
||||
|
||||
virtual void flush() override;
|
||||
|
||||
size_t count() const;
|
||||
size_t count() const override;
|
||||
|
||||
protected:
|
||||
bool FKeyExists(const char *key, size_t cchKey) const;
|
||||
|
@ -21,6 +21,16 @@ rocksdb::Options DefaultRocksDBOptions() {
|
||||
options.allow_mmap_reads = true;
|
||||
options.avoid_unnecessary_blocking_io = true;
|
||||
options.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(0));
|
||||
|
||||
rocksdb::BlockBasedTableOptions table_options;
|
||||
table_options.block_size = 16 * 1024;
|
||||
table_options.cache_index_and_filter_blocks = true;
|
||||
table_options.pin_l0_filter_and_index_blocks_in_cache = true;
|
||||
table_options.data_block_index_type = rocksdb::BlockBasedTableOptions::kDataBlockBinaryAndHash;
|
||||
table_options.checksum = rocksdb::kNoChecksum;
|
||||
table_options.format_version = 4;
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||
|
||||
return options;
|
||||
}
|
||||
|
||||
@ -51,19 +61,9 @@ RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, cons
|
||||
options.sst_file_manager = m_pfilemanager;
|
||||
options.create_if_missing = true;
|
||||
options.create_missing_column_families = true;
|
||||
options.info_log_level = rocksdb::ERROR_LEVEL;
|
||||
rocksdb::DB *db = nullptr;
|
||||
|
||||
rocksdb::BlockBasedTableOptions table_options;
|
||||
table_options.block_size = 16 * 1024;
|
||||
table_options.cache_index_and_filter_blocks = true;
|
||||
table_options.pin_l0_filter_and_index_blocks_in_cache = true;
|
||||
table_options.data_block_index_type = rocksdb::BlockBasedTableOptions::kDataBlockBinaryAndHash;
|
||||
table_options.checksum = rocksdb::kNoChecksum;
|
||||
table_options.format_version = 4;
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||
options.table_factory.reset(
|
||||
rocksdb::NewBlockBasedTableFactory(table_options));
|
||||
|
||||
for (int idb = 0; idb < dbnum; ++idb)
|
||||
{
|
||||
rocksdb::ColumnFamilyOptions cf_options(options);
|
||||
@ -169,12 +169,14 @@ IStorage *RocksDBStorageFactory::create(int db, key_load_iterator iter, void *pr
|
||||
std::unique_ptr<rocksdb::Iterator> it = std::unique_ptr<rocksdb::Iterator>(m_spdb->NewIterator(opts, spcolfamily.get()));
|
||||
|
||||
it->SeekToFirst();
|
||||
if (fUnclean && it->Valid())
|
||||
printf("\tDatabase was not shutdown cleanly, recomputing metrics\n");
|
||||
bool fFirstRealKey = true;
|
||||
|
||||
for (;it->Valid(); it->Next()) {
|
||||
if (FInternalKey(it->key().data(), it->key().size()))
|
||||
continue;
|
||||
if (fUnclean && it->Valid() && fFirstRealKey)
|
||||
printf("\tDatabase %d was not shutdown cleanly, recomputing metrics\n", db);
|
||||
fFirstRealKey = false;
|
||||
if (iter != nullptr)
|
||||
iter(it->key().data(), it->key().size(), privdata);
|
||||
++count;
|
||||
|
@ -8,7 +8,7 @@ class TestStorageFactory : public IStorageFactory
|
||||
virtual class IStorage *createMetadataDb() override;
|
||||
virtual const char *name() const override;
|
||||
virtual size_t totalDiskspaceUsed() const override { return 0; }
|
||||
virtual bool FSlow() const { return false; }
|
||||
virtual bool FSlow() const override { return false; }
|
||||
};
|
||||
|
||||
class TestStorageProvider final : public IStorage
|
||||
|
@ -110,6 +110,7 @@ proc wait_for_ofs_sync {r1 r2} {
|
||||
} else {
|
||||
fail "replica didn't sync in time"
|
||||
}
|
||||
$r2 lfence
|
||||
}
|
||||
|
||||
proc wait_done_loading r {
|
||||
|
Loading…
x
Reference in New Issue
Block a user