diff --git a/deps/Makefile b/deps/Makefile index d02882b1b..190a648a8 100644 --- a/deps/Makefile +++ b/deps/Makefile @@ -103,7 +103,7 @@ jemalloc: .make-prerequisites rocksdb: .make-prerequisites @printf '%b %b\n' $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR) ifeq ($(uname_M),x86_64) - cd rocksdb && PORTABLE=1 USE_SSE=1 FORCE_SSE42=1 $(MAKE) static_lib + cd rocksdb && CFLAGS=-Wno-error PORTABLE=1 USE_SSE=1 FORCE_SSE42=1 $(MAKE) static_lib else cd rocksdb && PORTABLE=1 $(MAKE) static_lib endif diff --git a/src/Makefile b/src/Makefile index f48e93821..d432c6fc5 100644 --- a/src/Makefile +++ b/src/Makefile @@ -90,7 +90,6 @@ endif ifeq ($(COMPILER_NAME),clang) CXXFLAGS+= -stdlib=libc++ - LDFLAGS+= -latomic endif # To get ARM stack traces if KeyDB crashes we need a special C flag. @@ -104,12 +103,6 @@ ifneq (,$(findstring armv,$(uname_M))) endif endif -ifneq (,$(filter aarch64 armv,$(uname_M))) - LICENSE_LIB_DIR=../deps/license/arm64/ -else - LICENSE_LIB_DIR=../deps/license/x64/ -endif - # Backwards compatibility for selecting an allocator ifeq ($(USE_TCMALLOC),yes) MALLOC=tcmalloc @@ -135,11 +128,11 @@ endif # Override default settings if possible -include .make-settings +DEBUG=-g -ggdb FINAL_CFLAGS=$(STD) $(WARN) $(OPT) $(DEBUG) $(CFLAGS) $(KEYDB_CFLAGS) $(REDIS_CFLAGS) FINAL_CXXFLAGS=$(CXX_STD) $(WARN) $(OPT) $(DEBUG) $(CXXFLAGS) $(KEYDB_CFLAGS) $(REDIS_CFLAGS) FINAL_LDFLAGS=$(LDFLAGS) $(KEYDB_LDFLAGS) $(DEBUG) -FINAL_LIBS+=-lm -lz -latomic -L$(LICENSE_LIB_DIR) -lkey -lcrypto -lbz2 -lzstd -llz4 -lsnappy -DEBUG=-g -ggdb +FINAL_LIBS+=-lm -lz -lcrypto -lbz2 -lzstd -llz4 -lsnappy ifneq ($(uname_S),Darwin) FINAL_LIBS+=-latomic @@ -249,14 +242,15 @@ endif ifdef OPENSSL_PREFIX OPENSSL_CFLAGS=-I$(OPENSSL_PREFIX)/include + OPENSSL_CXXFLAGS=-I$(OPENSSL_PREFIX)/include OPENSSL_LDFLAGS=-L$(OPENSSL_PREFIX)/lib # Also export OPENSSL_PREFIX so it ends up in deps sub-Makefiles export OPENSSL_PREFIX endif # Include paths to dependencies -FINAL_CFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src -I../deps/hdr_histogram -I../deps/license/ -FINAL_CXXFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src -I../deps/hdr_histogram -I../deps/rocksdb/include/ -I../deps/license -I../deps/concurrentqueue +FINAL_CFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src -I../deps/hdr_histogram +FINAL_CXXFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src -I../deps/hdr_histogram -I../deps/rocksdb/include/ -I../deps/concurrentqueue # Determine systemd support and/or build preference (defaulting to auto-detection) BUILD_WITH_SYSTEMD=no diff --git a/src/StorageCache.cpp b/src/StorageCache.cpp index cdc380105..2c6dd42b9 100644 --- a/src/StorageCache.cpp +++ b/src/StorageCache.cpp @@ -121,9 +121,9 @@ void StorageCache::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, si * the element already exists. */ long index; if ((index = _dictKeyIndex(m_pdict, de->key, (uint64_t)de->key, nullptr)) == -1) { - dictEntry *de = dictFind(m_pdict, de->key); - serverAssert(de != nullptr); - de->v.s64++; + dictEntry *deLocal = dictFind(m_pdict, de->key); + serverAssert(deLocal != nullptr); + deLocal->v.s64++; m_collisionCount++; zfree(de); } else { diff --git a/src/config.cpp b/src/config.cpp index 870d2b6c3..08da538be 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -35,9 +35,9 @@ #include #include +#ifdef __linux__ #include -#include "keycheck.h" - +#endif const char *KEYDB_SET_VERSION = KEYDB_REAL_VERSION; @@ -371,6 +371,7 @@ bool initializeStorageProvider(const char **err) // We need to set max memory to a sane default so keys are actually evicted properly if (g_pserver->maxmemory == 0 && g_pserver->maxmemory_policy == MAXMEMORY_NO_EVICTION) { +#ifdef __linux__ struct sysinfo sys; if (sysinfo(&sys) == 0) { @@ -378,6 +379,9 @@ bool initializeStorageProvider(const char **err) g_pserver->maxmemory = static_cast(sys.totalram / 2.2); g_pserver->maxmemory_policy = MAXMEMORY_ALLKEYS_LRU; } +#else + serverLog(LL_WARNING, "Unable to dynamically set maxmemory, please set maxmemory and maxmemory-policy if you are using a storage provier"); +#endif } else if (g_pserver->maxmemory_policy == MAXMEMORY_NO_EVICTION) { @@ -753,15 +757,6 @@ void loadServerConfigFromString(char *config) { g_sdsProvider = sdsdup(argv[1]); if (argc > 2) g_sdsArgs = sdsdup(argv[2]); - } else if (!strcasecmp(argv[0],"enable-enterprise") && (argc == 1 || argc == 2)) { - if (argc == 2) - { - if (!FValidKey(argv[1], strlen(argv[1]))) { - err = "Invalid license key"; - goto loaderr; - } - cserver.license_key = zstrdup(argv[1]); - } } else { err = "Bad directive or wrong number of arguments"; goto loaderr; } @@ -1904,7 +1899,6 @@ int rewriteConfig(char *path, int force_all) { rewriteConfigClientoutputbufferlimitOption(state); rewriteConfigYesNoOption(state,"active-replica",g_pserver->fActiveReplica,CONFIG_DEFAULT_ACTIVE_REPLICA); rewriteConfigStringOption(state, "version-override",KEYDB_SET_VERSION,KEYDB_REAL_VERSION); - rewriteConfigStringOption(state, "enable-enterprise", cserver.license_key, CONFIG_DEFAULT_LICENSE_KEY); rewriteConfigOOMScoreAdjValuesOption(state); /* Rewrite Sentinel config if in Sentinel mode. */ @@ -2832,6 +2826,7 @@ standardConfig configs[] = { createLongLongConfig("stream-node-max-entries", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, g_pserver->stream_node_max_entries, 100, INTEGER_CONFIG, NULL, NULL), createLongLongConfig("repl-backlog-size", NULL, MODIFIABLE_CONFIG, 1, LLONG_MAX, g_pserver->repl_backlog_size, 1024*1024, MEMORY_CONFIG, NULL, updateReplBacklogSize), /* Default: 1mb */ createLongLongConfig("repl-backlog-disk-reserve", NULL, IMMUTABLE_CONFIG, 0, LLONG_MAX, cserver.repl_backlog_disk_size, 0, MEMORY_CONFIG, NULL, NULL), + createLongLongConfig("max-snapshot-slip", NULL, MODIFIABLE_CONFIG, 0, 5000, g_pserver->snapshot_slip, 400, 0, NULL, NULL), /* Unsigned Long Long configs */ createULongLongConfig("maxmemory", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, g_pserver->maxmemory, 0, MEMORY_CONFIG, NULL, updateMaxmemory), diff --git a/src/db.cpp b/src/db.cpp index e55c887a3..a115c887b 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -2488,7 +2488,10 @@ void slotToKeyUpdateKeyCore(const char *key, size_t keylen, int add) { } else { fModified = raxRemove(g_pserver->cluster->slots_to_keys,indexed,keylen+2,NULL); } - serverAssert(fModified); + // This assert is disabled when a snapshot depth is >0 because prepOverwriteForSnapshot will add in a tombstone, + // this prevents ensure from adding the key to the dictionary which means the caller isn't aware we're already tracking + // the key. + serverAssert(fModified || g_pserver->db[0]->snapshot_depth() > 0); if (indexed != buf) zfree(indexed); } diff --git a/src/dict.cpp b/src/dict.cpp index d94f17701..34efe1e82 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -575,7 +575,7 @@ int dictRehashMilliseconds(dict *d, int ms) { * dictionary so that the hash table automatically migrates from H1 to H2 * while it is actively used. */ static void _dictRehashStep(dict *d) { - uint16_t pauserehash; + int16_t pauserehash; __atomic_load(&d->pauserehash, &pauserehash, __ATOMIC_RELAXED); if (pauserehash == 0) dictRehash(d,1); } @@ -1519,7 +1519,7 @@ void dictGetStats(char *buf, size_t bufsize, dict *d) { void dictForceRehash(dict *d) { - uint16_t pauserehash; + int16_t pauserehash; __atomic_load(&d->pauserehash, &pauserehash, __ATOMIC_RELAXED); while (pauserehash == 0 && dictIsRehashing(d)) _dictRehashStep(d); } diff --git a/src/module.cpp b/src/module.cpp index 906493f86..6f4315c32 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -4289,10 +4289,9 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch } { - aeAcquireLock(); + AeLocker locker; locker.arm(nullptr); std::unique_lock ul(c->lock); call(c,call_flags); - aeReleaseLock(); } g_pserver->replication_allowed = prev_replication_allowed; diff --git a/src/networking.cpp b/src/networking.cpp index 98b6aa735..266119343 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -145,7 +145,7 @@ client *createClient(connection *conn, int iel) { client_id = g_pserver->next_client_id.fetch_add(1); c->iel = iel; c->id = client_id; - sprintf(c->lock.szName, "client %lu", client_id); + sprintf(c->lock.szName, "client %" PRIu64, client_id); c->resp = 2; c->conn = conn; c->name = NULL; @@ -208,7 +208,7 @@ client *createClient(connection *conn, int iel) { c->paused_list_node = NULL; c->client_tracking_redirection = 0; c->casyncOpsPending = 0; - c->mvccCheckpoint = getMvccTstamp(); + c->mvccCheckpoint = 0; c->master_error = 0; memset(c->uuid, 0, UUID_BINARY_LEN); @@ -1345,7 +1345,7 @@ void acceptOnThread(connection *conn, int flags, char *cip) szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); memcpy(szT, cip, NET_IP_STR_LEN); } - int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [conn, flags, ielTarget, szT, fBootLoad] { + int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [conn, flags, ielTarget, szT] { connMarshalThread(conn); acceptCommonHandler(conn,flags,szT,ielTarget); rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); @@ -2754,7 +2754,7 @@ void readQueryFromClient(connection *conn) { // Frequent writers aren't good candidates for this optimization, they cause us to renew the snapshot too often // so we exclude them unless the snapshot we need already exists bool fSnapshotExists = c->db->mvccLastSnapshot >= c->mvccCheckpoint; - bool fWriteTooRecent = (((getMvccTstamp() - c->mvccCheckpoint) >> MVCC_MS_SHIFT) < redisDbPersistentDataSnapshot::msStaleThreshold/2); + bool fWriteTooRecent = (((getMvccTstamp() - c->mvccCheckpoint) >> MVCC_MS_SHIFT) < static_cast(g_pserver->snapshot_slip)/2); // The check below avoids running async commands if this is a frequent writer unless a snapshot is already there to service it if (!fWriteTooRecent || fSnapshotExists) { @@ -2766,9 +2766,9 @@ void readQueryFromClient(connection *conn) { } else { // If we're single threaded its actually better to just process the command here while the query is hot in the cache // multithreaded lock contention dominates and batching is better - aeAcquireLock(); + AeLocker locker; + locker.arm(c); runAndPropogateToReplicas(processInputBuffer, c, true /*fParse*/, CMD_CALL_FULL); - aeReleaseLock(); } } diff --git a/src/rdb.cpp b/src/rdb.cpp index 31fe905a6..4d6cde7d3 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1552,6 +1552,7 @@ struct rdbSaveThreadArgs void *rdbSaveThread(void *vargs) { + serverAssert(!g_pserver->rdbThreadVars.fDone); rdbSaveThreadArgs *args = reinterpret_cast(vargs); serverAssert(serverTL == nullptr); redisServerThreadVars vars; @@ -1577,6 +1578,7 @@ void *rdbSaveThread(void *vargs) "RDB",cbDiff/(1024*1024)); } + g_pserver->rdbThreadVars.fDone = true; return (retval == C_OK) ? (void*)0 : (void*)1; } @@ -2945,6 +2947,7 @@ public: serverTL = &vars; aeSetThreadOwnsLockOverride(true); +#ifdef __linux__ // We will inheret the server thread's affinity mask, clear it as we want to run on a different core. cpu_set_t *cpuset = CPU_ALLOC(std::thread::hardware_concurrency()); if (cpuset != nullptr) { @@ -2956,6 +2959,7 @@ public: pthread_setaffinity_np(pthread_self(), size, cpuset); CPU_FREE(cpuset); } +#endif for (;;) { if (queue.queueJobs.size_approx() == 0) { @@ -3624,6 +3628,7 @@ struct rdbSaveSocketThreadArgs }; void *rdbSaveToSlavesSocketsThread(void *vargs) { + serverAssert(!g_pserver->rdbThreadVars.fDone); /* Child */ serverAssert(serverTL == nullptr); rdbSaveSocketThreadArgs *args = (rdbSaveSocketThreadArgs*)vargs; @@ -3664,6 +3669,7 @@ void *rdbSaveToSlavesSocketsThread(void *vargs) close(args->safe_to_exit_pipe); zfree(args); + g_pserver->rdbThreadVars.fDone = true; return (retval == C_OK) ? (void*)0 : (void*)1; } diff --git a/src/redis-benchmark.cpp b/src/redis-benchmark.cpp index 48465df9c..397b557c6 100644 --- a/src/redis-benchmark.cpp +++ b/src/redis-benchmark.cpp @@ -42,6 +42,7 @@ #include #include #include +#include extern "C" { #include /* Use hiredis' sds compat header that maps sds calls to their hi_ variants */ #include /* Use hiredis sds. */ diff --git a/src/replication.cpp b/src/replication.cpp index 1b095f031..7cb9851d2 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -190,7 +190,11 @@ int bg_unlink(const char *filename) { bool createDiskBacklog() { // Lets create some disk backed pages and add them here std::string path = "./repl-backlog-temp" + std::to_string(gettid()); +#ifdef __APPLE__ + int fd = open(path.c_str(), O_CREAT | O_RDWR, S_IRUSR | S_IWUSR); +#else int fd = open(path.c_str(), O_CREAT | O_RDWR | O_LARGEFILE, S_IRUSR | S_IWUSR); +#endif if (fd < 0) { return false; } @@ -991,6 +995,7 @@ need_full_resync: } int checkClientOutputBufferLimits(client *c); +void clientInstallAsyncWriteHandler(client *c); class replicationBuffer { std::vector replicas; clientReplyBlock *reply = nullptr; @@ -1020,6 +1025,17 @@ public: if (reply == nullptr) return; size_t written = reply->used; + + for (auto replica : replicas) { + std::unique_lock ul(replica->lock); + while (checkClientOutputBufferLimits(replica) + && (replica->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP) == 0) { + ul.unlock(); + usleep(0); + ul.lock(); + } + } + aeAcquireLock(); for (size_t ireplica = 0; ireplica < replicas.size(); ++ireplica) { auto replica = replicas[ireplica]; @@ -1027,18 +1043,23 @@ public: replica->replstate = REPL_STATE_NONE; continue; } - - while (checkClientOutputBufferLimits(replica) - && (replica->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP) == 0) { - aeReleaseLock(); - usleep(10); - aeAcquireLock(); - } std::unique_lock lock(replica->lock); - addReplyProto(replica, reply->buf(), reply->used); + if (ireplica == (replicas.size()-1) && replica->replyAsync == nullptr) { + replica->replyAsync = reply; + reply = nullptr; + if (!(replica->fPendingAsyncWrite)) clientInstallAsyncWriteHandler(replica); + } else { + addReplyProto(replica, reply->buf(), reply->used); + } } ProcessPendingAsyncWrites(); + for (auto c : replicas) { + if (c->flags & CLIENT_CLOSE_ASAP) { + std::unique_lock ul(c->lock); + c->replstate = REPL_STATE_NONE; // otherwise the client can't be free'd + } + } replicas.erase(std::remove_if(replicas.begin(), replicas.end(), [](const client *c)->bool{ return c->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP;}), replicas.end()); aeReleaseLock(); if (reply != nullptr) { @@ -1058,7 +1079,7 @@ public: } if (reply == nullptr) { - reply = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + std::max(size, (unsigned long)(PROTO_REPLY_CHUNK_BYTES*2))); + reply = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + std::max(size, (unsigned long)(PROTO_REPLY_CHUNK_BYTES*64))); reply->size = zmalloc_usable_size(reply) - sizeof(clientReplyBlock); reply->used = 0; } @@ -1183,7 +1204,7 @@ int rdbSaveSnapshotForReplication(struct rdbSaveInfo *rsi) { size_t snapshotDeclaredCount = spsnapshot->count(); replBuf.addArrayLen(snapshotDeclaredCount); size_t count = 0; - bool result = spsnapshot->enumerate([&replBuf, &count, &cbData, &lastLogTime, timeStart, &cbLastUpdate](const char *rgchKey, size_t cchKey, const void *rgchVal, size_t cchVal) -> bool{ + bool result = spsnapshot->enumerate([&replBuf, &count, &cbData, &lastLogTime, &cbLastUpdate](const char *rgchKey, size_t cchKey, const void *rgchVal, size_t cchVal) -> bool{ replBuf.addArrayLen(2); replBuf.addString(rgchKey, cchKey); @@ -1557,16 +1578,9 @@ LError: return; } -void processReplconfLicense(client *c, robj *arg) +void processReplconfLicense(client *c, robj *) { - if (cserver.license_key != nullptr) - { - if (strcmp(cserver.license_key, szFromObj(arg)) == 0) { - addReplyError(c, "Each replica must have a unique license key"); - c->flags |= CLIENT_CLOSE_AFTER_REPLY; - return; - } - } + // Only for back-compat addReply(c, shared.ok); } @@ -3632,41 +3646,6 @@ retry_connect: } sdsfree(err); err = NULL; - mi->repl_state = REPL_STATE_SEND_KEY; - // fallthrough - } - - /* Send LICENSE Key */ - if (mi->repl_state == REPL_STATE_SEND_KEY) - { - if (cserver.license_key == nullptr) - { - mi->repl_state = REPL_STATE_SEND_PSYNC; - } - else - { - err = sendCommand(conn,"REPLCONF","license",cserver.license_key,NULL); - if (err) goto write_error; - mi->repl_state = REPL_STATE_KEY_ACK; - return; - } - } - - /* LICENSE Key Ack */ - if (mi->repl_state == REPL_STATE_KEY_ACK) - { - err = receiveSynchronousResponse(mi, conn); - if (err[0] == '-') { - if (err[1] == 'E' && err[2] == 'R' && err[3] == 'R') { - // Replicating with non-enterprise - serverLog(LL_WARNING, "Replicating with non-enterprise server."); - } else { - serverLog(LL_WARNING, "Recieved error from client: %s", err); - sdsfree(err); - goto error; - } - } - sdsfree(err); mi->repl_state = REPL_STATE_SEND_PSYNC; // fallthrough } @@ -5598,7 +5577,7 @@ void flushReplBacklogToClients() if (g_pserver->repl_batch_offStart != g_pserver->master_repl_offset) { bool fAsyncWrite = false; - long long min_offset = LONG_LONG_MAX; + long long min_offset = LLONG_MAX; // Ensure no overflow serverAssert(g_pserver->repl_batch_offStart < g_pserver->master_repl_offset); if (g_pserver->master_repl_offset - g_pserver->repl_batch_offStart > g_pserver->repl_backlog_size) { @@ -5657,7 +5636,7 @@ LDone: // This may be called multiple times per "frame" so update with our progress flushing to clients g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx; g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; - g_pserver->repl_lowest_off.store(min_offset == LONG_LONG_MAX ? -1 : min_offset, std::memory_order_seq_cst); + g_pserver->repl_lowest_off.store(min_offset == LLONG_MAX ? -1 : min_offset, std::memory_order_seq_cst); } } diff --git a/src/server.cpp b/src/server.cpp index 119f91bb4..dd24a84ef 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -65,7 +65,6 @@ #include #include #include "aelocker.h" -#include "keycheck.h" #include "motd.h" #include "t_nhash.h" #include "readwritelock.h" @@ -1178,6 +1177,10 @@ struct redisCommand redisCommandTable[] = { {"failover",failoverCommand,-1, "admin no-script ok-stale", + 0,NULL,0,0,0,0,0,0}, + + {"lfence", lfenceCommand,1, + "read-only random ok-stale", 0,NULL,0,0,0,0,0,0} }; @@ -2200,8 +2203,8 @@ void checkChildrenDone(void) { if (g_pserver->FRdbSaveInProgress() && !cserver.fForkBgSave) { void *rval = nullptr; - int err; - if ((err = pthread_tryjoin_np(g_pserver->rdbThreadVars.rdb_child_thread, &rval))) + int err = EAGAIN; + if (!g_pserver->rdbThreadVars.fDone || (err = pthread_join(g_pserver->rdbThreadVars.rdb_child_thread, &rval))) { if (err != EBUSY && err != EAGAIN) serverLog(LL_WARNING, "Error joining the background RDB save thread: %s\n", strerror(errno)); @@ -2211,6 +2214,7 @@ void checkChildrenDone(void) { int exitcode = (int)reinterpret_cast(rval); backgroundSaveDoneHandler(exitcode,g_pserver->rdbThreadVars.fRdbThreadCancel); g_pserver->rdbThreadVars.fRdbThreadCancel = false; + g_pserver->rdbThreadVars.fDone = false; if (exitcode == 0) receiveChildInfo(); } } @@ -2776,7 +2780,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* end any snapshots created by fast async commands */ for (int idb = 0; idb < cserver.dbnum; ++idb) { - if (serverTL->rgdbSnapshot[idb] != nullptr) { + if (serverTL->rgdbSnapshot[idb] != nullptr && serverTL->rgdbSnapshot[idb]->FStale()) { g_pserver->db[idb]->endSnapshot(serverTL->rgdbSnapshot[idb]); serverTL->rgdbSnapshot[idb] = nullptr; } @@ -4661,6 +4665,11 @@ void rejectCommand(client *c, robj *reply, int severity = ERR_CRITICAL) { } } +void lfenceCommand(client *c) { + c->mvccCheckpoint = getMvccTstamp(); + addReply(c, shared.ok); +} + void rejectCommandFormat(client *c, const char *fmt, ...) { if (c->cmd) c->cmd->rejected_calls++; flagTransaction(c); @@ -7513,13 +7522,6 @@ int main(int argc, char **argv) { serverLog(LL_WARNING, "Configuration loaded"); } -#ifndef NO_LICENSE_CHECK - if (!g_pserver->sentinel_mode && (cserver.license_key == nullptr || !FValidKey(cserver.license_key, strlen(cserver.license_key)))){ - serverLog(LL_WARNING, "Error: %s license key provided, exiting immediately.", cserver.license_key == nullptr ? "No" : "Invalid"); - exit(1); - } -#endif - validateConfiguration(); const char *err; diff --git a/src/server.h b/src/server.h index 4a0b9f55c..fea3e8889 100644 --- a/src/server.h +++ b/src/server.h @@ -373,8 +373,6 @@ inline bool operator!=(const void *p, const robj_sharedptr &rhs) #define CONFIG_DEFAULT_ACTIVE_REPLICA 0 #define CONFIG_DEFAULT_ENABLE_MULTIMASTER 0 -#define CONFIG_DEFAULT_LICENSE_KEY "" - #define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 64 /* Loopkups per loop. */ #define ACTIVE_EXPIRE_CYCLE_SUBKEY_LOOKUPS_PER_LOOP 16384 /* Subkey loopkups per loop. */ #define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */ @@ -586,8 +584,6 @@ typedef enum { REPL_STATE_RECEIVE_IP_REPLY, /* Wait for REPLCONF reply */ REPL_STATE_RECEIVE_CAPA_REPLY, /* Wait for REPLCONF reply */ REPL_STATE_RECEIVE_UUID, /* they should ack with their UUID */ - REPL_STATE_SEND_KEY, - REPL_STATE_KEY_ACK, REPL_STATE_SEND_PSYNC, /* Send PSYNC */ REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */ /* --- End of handshake states --- */ @@ -1110,7 +1106,8 @@ public: redisDbPersistentData(); virtual ~redisDbPersistentData(); - redisDbPersistentData(redisDbPersistentData &&) = default; + redisDbPersistentData(const redisDbPersistentData &) = delete; + redisDbPersistentData(redisDbPersistentData &&) = delete; size_t slots() const { return dictSlots(m_pdict); } size_t size(bool fCachedOnly = false) const; @@ -1290,8 +1287,6 @@ public: // These need to be fixed using redisDbPersistentData::size; using redisDbPersistentData::expireSize; - - static const uint64_t msStaleThreshold = 500; }; /* Redis database representation. There are multiple databases identified @@ -2179,7 +2174,6 @@ struct redisServerConst { int enable_motd; /* Flag to retrieve the Message of today using CURL request*/ - sds license_key = nullptr; int delete_on_evict = false; // Only valid when a storage provider is set int thread_min_client_threshold = 50; int multimaster_no_forward; @@ -2371,6 +2365,7 @@ struct redisServer { struct _rdbThreadVars { std::atomic fRdbThreadCancel {false}; + std::atomic fDone {false}; int tmpfileNum = 0; pthread_t rdb_child_thread; int fRdbThreadActive = false; @@ -2609,6 +2604,8 @@ struct redisServer { IStorageFactory *m_pstorageFactory = nullptr; int storage_flush_period; // The time between flushes in the CRON job + long long snapshot_slip = 500; // The amount of time in milliseconds we let a snapshot be behind the current database + /* TLS Configuration */ int tls_cluster; int tls_replication; @@ -3788,6 +3785,7 @@ void hrenameCommand(client *c); void stralgoCommand(client *c); void resetCommand(client *c); void failoverCommand(client *c); +void lfenceCommand(client *c); int FBrokenLinkToMaster(); diff --git a/src/snapshot.cpp b/src/snapshot.cpp index a13f5d9f5..de3398818 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -626,7 +626,7 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe_core(std::function fnNew = [this, &fn, &celem, dictTombstone](const char *key, robj_roptr o) { + std::function fnNew = [&fn, &celem, dictTombstone](const char *key, robj_roptr o) { dictEntry *deTombstone = dictFind(dictTombstone, key); if (deTombstone != nullptr) return true; @@ -654,7 +654,7 @@ int redisDbPersistentDataSnapshot::snapshot_depth() const bool redisDbPersistentDataSnapshot::FStale() const { - return ((getMvccTstamp() - m_mvccCheckpoint) >> MVCC_MS_SHIFT) >= redisDbPersistentDataSnapshot::msStaleThreshold; + return ((getMvccTstamp() - m_mvccCheckpoint) >> MVCC_MS_SHIFT) >= static_cast(g_pserver->snapshot_slip); } void dictGCAsyncFree(dictAsyncRehashCtl *async) { diff --git a/src/storage/rocksdb.h b/src/storage/rocksdb.h index 0936c585a..a35fea056 100644 --- a/src/storage/rocksdb.h +++ b/src/storage/rocksdb.h @@ -44,7 +44,7 @@ public: virtual void flush() override; - size_t count() const; + size_t count() const override; protected: bool FKeyExists(const char *key, size_t cchKey) const; diff --git a/src/storage/rocksdbfactory.cpp b/src/storage/rocksdbfactory.cpp index c7f45b977..8e6ecd0d5 100644 --- a/src/storage/rocksdbfactory.cpp +++ b/src/storage/rocksdbfactory.cpp @@ -21,6 +21,16 @@ rocksdb::Options DefaultRocksDBOptions() { options.allow_mmap_reads = true; options.avoid_unnecessary_blocking_io = true; options.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(0)); + + rocksdb::BlockBasedTableOptions table_options; + table_options.block_size = 16 * 1024; + table_options.cache_index_and_filter_blocks = true; + table_options.pin_l0_filter_and_index_blocks_in_cache = true; + table_options.data_block_index_type = rocksdb::BlockBasedTableOptions::kDataBlockBinaryAndHash; + table_options.checksum = rocksdb::kNoChecksum; + table_options.format_version = 4; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + return options; } @@ -51,19 +61,9 @@ RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, cons options.sst_file_manager = m_pfilemanager; options.create_if_missing = true; options.create_missing_column_families = true; + options.info_log_level = rocksdb::ERROR_LEVEL; rocksdb::DB *db = nullptr; - rocksdb::BlockBasedTableOptions table_options; - table_options.block_size = 16 * 1024; - table_options.cache_index_and_filter_blocks = true; - table_options.pin_l0_filter_and_index_blocks_in_cache = true; - table_options.data_block_index_type = rocksdb::BlockBasedTableOptions::kDataBlockBinaryAndHash; - table_options.checksum = rocksdb::kNoChecksum; - table_options.format_version = 4; - options.table_factory.reset(NewBlockBasedTableFactory(table_options)); - options.table_factory.reset( - rocksdb::NewBlockBasedTableFactory(table_options)); - for (int idb = 0; idb < dbnum; ++idb) { rocksdb::ColumnFamilyOptions cf_options(options); @@ -169,12 +169,14 @@ IStorage *RocksDBStorageFactory::create(int db, key_load_iterator iter, void *pr std::unique_ptr it = std::unique_ptr(m_spdb->NewIterator(opts, spcolfamily.get())); it->SeekToFirst(); - if (fUnclean && it->Valid()) - printf("\tDatabase was not shutdown cleanly, recomputing metrics\n"); + bool fFirstRealKey = true; for (;it->Valid(); it->Next()) { if (FInternalKey(it->key().data(), it->key().size())) continue; + if (fUnclean && it->Valid() && fFirstRealKey) + printf("\tDatabase %d was not shutdown cleanly, recomputing metrics\n", db); + fFirstRealKey = false; if (iter != nullptr) iter(it->key().data(), it->key().size(), privdata); ++count; diff --git a/src/storage/teststorageprovider.h b/src/storage/teststorageprovider.h index 2b2b1a38d..f21732256 100644 --- a/src/storage/teststorageprovider.h +++ b/src/storage/teststorageprovider.h @@ -8,7 +8,7 @@ class TestStorageFactory : public IStorageFactory virtual class IStorage *createMetadataDb() override; virtual const char *name() const override; virtual size_t totalDiskspaceUsed() const override { return 0; } - virtual bool FSlow() const { return false; } + virtual bool FSlow() const override { return false; } }; class TestStorageProvider final : public IStorage diff --git a/tests/support/util.tcl b/tests/support/util.tcl index dd843110d..156f6f811 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -110,6 +110,7 @@ proc wait_for_ofs_sync {r1 r2} { } else { fail "replica didn't sync in time" } + $r2 lfence } proc wait_done_loading r {