diff --git a/src/SnapshotPayloadParseState.cpp b/src/SnapshotPayloadParseState.cpp index ef28e54f1..83bea31eb 100644 --- a/src/SnapshotPayloadParseState.cpp +++ b/src/SnapshotPayloadParseState.cpp @@ -136,14 +136,33 @@ void SnapshotPayloadParseState::flushQueuedKeys() { int idb = current_database; serverAssert(vecqueuedKeys.size() == vecqueuedVals.size()); auto sizePrev = vecqueuedKeys.size(); - (*insertsInFlight)++; - std::weak_ptr> insertsInFlightTmp = insertsInFlight; // C++ GRRRRRRRRRRRRRRRR, we don't want to capute "this" because that's dangerous if (current_database < cserver.dbnum) { - g_pserver->asyncworkqueue->AddWorkFunction([idb, vecqueuedKeys = std::move(this->vecqueuedKeys), vecqueuedKeysCb = std::move(this->vecqueuedKeysCb), vecqueuedVals = std::move(this->vecqueuedVals), vecqueuedValsCb = std::move(this->vecqueuedValsCb), insertsInFlightTmp, pallocator = m_spallocator.release()]() mutable { - g_pserver->db[idb]->bulkDirectStorageInsert(vecqueuedKeys.data(), vecqueuedKeysCb.data(), vecqueuedVals.data(), vecqueuedValsCb.data(), vecqueuedKeys.size()); - (*(insertsInFlightTmp.lock()))--; - delete pallocator; - }); + if (g_pserver->m_pstorageFactory) { + (*insertsInFlight)++; + std::weak_ptr> insertsInFlightTmp = insertsInFlight; // C++ GRRRRRRRRRRRRRRRR, we don't want to capute "this" because that's dangerous + g_pserver->asyncworkqueue->AddWorkFunction([idb, vecqueuedKeys = std::move(this->vecqueuedKeys), vecqueuedKeysCb = std::move(this->vecqueuedKeysCb), vecqueuedVals = std::move(this->vecqueuedVals), vecqueuedValsCb = std::move(this->vecqueuedValsCb), insertsInFlightTmp, pallocator = m_spallocator.release()]() mutable { + g_pserver->db[idb]->bulkDirectStorageInsert(vecqueuedKeys.data(), vecqueuedKeysCb.data(), vecqueuedVals.data(), vecqueuedValsCb.data(), vecqueuedKeys.size()); + (*(insertsInFlightTmp.lock()))--; + delete pallocator; + }); + } else { + for (size_t ival = 0; ival < vecqueuedKeys.size(); ++ival) { + size_t offset = 0; + auto spexpire = deserializeExpire(vecqueuedVals[ival], vecqueuedValsCb[ival], &offset); + auto o = deserializeStoredObject(vecqueuedVals[ival] + offset, vecqueuedValsCb[ival] - offset); + sds sdsKey = sdsnewlen(vecqueuedKeys[ival], -static_cast(vecqueuedKeysCb[ival])); + if (dbMerge(g_pserver->db[idb], sdsKey, o, false)) { + if (spexpire != nullptr) + g_pserver->db[idb]->setExpire(sdsKey, std::move(*spexpire)); + } else { + sdsfree(sdsKey); + } + } + vecqueuedKeys.clear(); + vecqueuedKeysCb.clear(); + vecqueuedVals.clear(); + vecqueuedValsCb.clear(); + } } else { // else drop the data vecqueuedKeys.clear(); diff --git a/src/config.cpp b/src/config.cpp index 75502ca3e..2b33f405b 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -2983,6 +2983,7 @@ standardConfig configs[] = { createIntConfig("overload-protect-percent", NULL, MODIFIABLE_CONFIG, 0, 200, g_pserver->overload_protect_threshold, 0, INTEGER_CONFIG, NULL, NULL), createIntConfig("force-eviction-percent", NULL, MODIFIABLE_CONFIG, 0, 100, g_pserver->force_eviction_percent, 0, INTEGER_CONFIG, NULL, NULL), createBoolConfig("enable-async-rehash", NULL, MODIFIABLE_CONFIG, g_pserver->enable_async_rehash, 1, NULL, NULL), + createBoolConfig("enable-keydb-fastsync", NULL, MODIFIABLE_CONFIG, g_pserver->fEnableFastSync, 1, NULL, NULL), #ifdef USE_OPENSSL createIntConfig("tls-port", NULL, MODIFIABLE_CONFIG, 0, 65535, g_pserver->tls_port, 0, INTEGER_CONFIG, NULL, updateTLSPort), /* TCP port. */ diff --git a/src/db.cpp b/src/db.cpp index 75a543f01..3e29801a7 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -56,7 +56,6 @@ int expireIfNeeded(redisDb *db, robj *key, robj *o); void slotToKeyUpdateKeyCore(const char *key, size_t keylen, int add); std::unique_ptr deserializeExpire(const char *str, size_t cch, size_t *poffset); -sds serializeStoredObjectAndExpire(robj_roptr o); dictType dictChangeDescType { dictSdsHash, /* hash function */ diff --git a/src/replication.cpp b/src/replication.cpp index ecf8bcd16..f3552f9d1 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -1019,8 +1019,6 @@ public: void addReplica(client *replica) { replicas.push_back(replica); replicationSetupSlaveForFullResync(replica,getPsyncInitialOffset()); - // Optimize the socket for bulk transfer - //connDisableTcpNoDelay(replica->conn); } bool isActive() const { return !replicas.empty(); } @@ -1093,7 +1091,7 @@ public: reply->used += size; } - void addLongWithPrefix(long val, char prefix) { + void addLongLongWithPrefix(long long val, char prefix) { char buf[128]; int len; @@ -1105,15 +1103,19 @@ public: } void addArrayLen(int len) { - addLongWithPrefix(len, '*'); + addLongLongWithPrefix(len, '*'); } void addLong(long val) { - addLongWithPrefix(val, ':'); + addLongLongWithPrefix(val, ':'); + } + + void addLongLong(long long val) { + addLongLongWithPrefix(val, ':'); } void addString(const char *s, unsigned long len) { - addLongWithPrefix(len, '$'); + addLongLongWithPrefix(len, '$'); addData(s, len); addData("\r\n", 2); } @@ -1149,22 +1151,19 @@ int rdbSaveSnapshotForReplication(rdbSaveInfo *rsi) { // TODO: This needs to be on a background thread int retval = C_OK; serverAssert(GlobalLocksAcquired()); - serverLog(LL_NOTICE, "Starting storage provider fast full sync with target: %s", "disk"); + serverLog(LL_NOTICE, "Starting fast full sync with target: %s", "disk"); std::shared_ptr spreplBuf = std::make_shared(); listNode *ln; listIter li; - client *replica = nullptr; listRewind(g_pserver->slaves, &li); - while (replica == nullptr && (ln = listNext(&li))) { + while ((ln = listNext(&li))) { client *replicaCur = (client*)listNodeValue(ln); if ((replicaCur->slave_capa & SLAVE_CAPA_KEYDB_FASTSYNC) && (replicaCur->replstate == SLAVE_STATE_WAIT_BGSAVE_START)) { - replica = replicaCur; - spreplBuf->addReplica(replica); + spreplBuf->addReplica(replicaCur); replicaCur->replstate = SLAVE_STATE_FASTSYNC_TX; } } - serverAssert(replica != nullptr); spreplBuf->addArrayLen(2); // Two sections: Metadata and databases @@ -1179,7 +1178,7 @@ int rdbSaveSnapshotForReplication(rdbSaveInfo *rsi) { spreplBuf->addString(rsi->repl_id, CONFIG_RUN_ID_SIZE); spreplBuf->addArrayLen(2); spreplBuf->addString("repl-offset", 11); - spreplBuf->addLong(rsi->master_repl_offset); + spreplBuf->addLongLong(rsi->master_repl_offset); if (dictSize(g_pserver->lua_scripts)) { dictEntry *de; @@ -1195,13 +1194,13 @@ int rdbSaveSnapshotForReplication(rdbSaveInfo *rsi) { di = NULL; /* So that we don't release it again on error. */ } - std::shared_ptr>> spvecspsnapshot = std::make_shared>>(); + std::shared_ptr> spvecsnapshot = std::make_shared>(); for (int idb = 0; idb < cserver.dbnum; ++idb) { - spvecspsnapshot->emplace_back(g_pserver->db[idb]->CloneStorageCache()); + spvecsnapshot->emplace_back(g_pserver->db[idb]->createSnapshot(getMvccTstamp(), false)); } aeReleaseLock(); - g_pserver->asyncworkqueue->AddWorkFunction([spreplBuf = std::move(spreplBuf), spvecspsnapshot = std::move(spvecspsnapshot)]{ + g_pserver->asyncworkqueue->AddWorkFunction([spreplBuf = std::move(spreplBuf), spvecsnapshot = std::move(spvecsnapshot)]{ int retval = C_OK; auto timeStart = ustime(); auto lastLogTime = timeStart; @@ -1211,15 +1210,16 @@ int rdbSaveSnapshotForReplication(rdbSaveInfo *rsi) { // Databases replBuf.addArrayLen(cserver.dbnum); for (int idb = 0; idb < cserver.dbnum; ++idb) { - auto &spsnapshot = (*spvecspsnapshot)[idb]; - size_t snapshotDeclaredCount = spsnapshot->count(); + auto &spsnapshot = (*spvecsnapshot)[idb]; + size_t snapshotDeclaredCount = spsnapshot->size(); replBuf.addArrayLen(snapshotDeclaredCount); size_t count = 0; - bool result = spsnapshot->enumerate([&replBuf, &count, &cbData, &lastLogTime, &cbLastUpdate](const char *rgchKey, size_t cchKey, const void *rgchVal, size_t cchVal) -> bool{ + bool result = spsnapshot->iterate_threadsafe([&replBuf, &count, &cbData, &lastLogTime, &cbLastUpdate](const char *strKey, robj_roptr o) -> bool{ replBuf.addArrayLen(2); - replBuf.addString(rgchKey, cchKey); - replBuf.addString((const char *)rgchVal, cchVal); + replBuf.addString(strKey, sdslen(strKey)); + sds strT = serializeStoredObjectAndExpire(o); + replBuf.addString(strT, sdslen(strT)); ++count; if ((count % 8092) == 0) { auto curTime = ustime(); @@ -1230,7 +1230,8 @@ int rdbSaveSnapshotForReplication(rdbSaveInfo *rsi) { lastLogTime = ustime(); } } - cbData += cchKey + cchVal; + cbData += sdslen(strKey) + sdslen(strT); + sdsfree(strT); return replBuf.isActive(); }); @@ -1302,7 +1303,7 @@ int startBgsaveForReplication(int mincapa) { /* Only do rdbSave* when rsiptr is not NULL, * otherwise replica will miss repl-stream-db. */ if (rsiptr) { - if (mincapa & SLAVE_CAPA_KEYDB_FASTSYNC && g_pserver->m_pstorageFactory) + if (mincapa & SLAVE_CAPA_KEYDB_FASTSYNC && FFastSyncEnabled()) retval = rdbSaveSnapshotForReplication(rsiptr); else if (socket_target) retval = rdbSaveToSlavesSockets(rsiptr); @@ -1481,7 +1482,7 @@ void syncCommand(client *c) { } /* CASE 0: Fast Sync */ - if ((c->slave_capa & SLAVE_CAPA_KEYDB_FASTSYNC) && g_pserver->m_pstorageFactory) { + if (c->slave_capa & SLAVE_CAPA_KEYDB_FASTSYNC && FFastSyncEnabled()) { serverLog(LL_NOTICE,"Fast SYNC on next replication cycle"); /* CASE 1: BGSAVE is in progress, with disk target. */ } else if (g_pserver->FRdbSaveInProgress() && @@ -1672,7 +1673,7 @@ void replconfCommand(client *c) { c->slave_capa |= SLAVE_CAPA_PSYNC2; else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "activeExpire")) c->slave_capa |= SLAVE_CAPA_ACTIVE_EXPIRE; - else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "rocksdb-snapshot-load")) + else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "keydb-fastsync")) c->slave_capa |= SLAVE_CAPA_KEYDB_FASTSYNC; fCapaCommand = true; @@ -1738,10 +1739,14 @@ void replconfCommand(client *c) { if (fCapaCommand) { sds reply = sdsnew("+OK"); - if (g_pserver->fActiveReplica) + if (g_pserver->fActiveReplica) { reply = sdscat(reply, " active-replica"); - if (g_pserver->m_pstorageFactory && (c->slave_capa & SLAVE_CAPA_KEYDB_FASTSYNC) && !g_pserver->fActiveReplica) - reply = sdscat(reply, " rocksdb-snapshot-save"); + } + if ((c->slave_capa & SLAVE_CAPA_KEYDB_FASTSYNC) && FFastSyncEnabled()) { + reply = sdscat(reply, " keydb-fastsync-save"); + } else { + c->slave_capa = (c->slave_capa & (~SLAVE_CAPA_KEYDB_FASTSYNC)); // never try to fast sync for this as they won't expect it + } reply = sdscat(reply, "\r\n"); addReplySds(c, reply); } else { @@ -2515,6 +2520,7 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi } } + serverAssert(mi->parseState != nullptr); for (int iter = 0; iter < 10; ++iter) { if (mi->parseState->shouldThrottle()) return false; @@ -2525,11 +2531,16 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi auto nread = connRead(conn, mi->bulkreadBuffer+qblen, readlen); if (nread <= 0) { - if (connGetState(conn) != CONN_STATE_CONNECTED) + if (connGetState(conn) != CONN_STATE_CONNECTED) { + serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s", + (nread == -1) ? strerror(errno) : "connection lost"); cancelReplicationHandshake(mi, true); + } return false; } + g_pserver->stat_net_input_bytes += nread; mi->repl_transfer_lastio = g_pserver->unixtime; + mi->repl_transfer_read += nread; sdsIncrLen(mi->bulkreadBuffer,nread); size_t offset = 0; @@ -2601,8 +2612,9 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi rsi.repl_stream_db = mi->parseState->getMetaDataLongLong("repl-stream-db"); rsi.repl_offset = mi->parseState->getMetaDataLongLong("repl-offset"); sds str = mi->parseState->getMetaDataStr("repl-id"); - if (sdslen(str) == CONFIG_RUN_ID_SIZE+1) { + if (sdslen(str) == CONFIG_RUN_ID_SIZE) { memcpy(rsi.repl_id, str, CONFIG_RUN_ID_SIZE+1); + rsi.repl_id_is_set = 1; } fFinished = true; @@ -3018,13 +3030,11 @@ void readSyncBulkPayload(connection *conn) { /* We need to handle the case where the initial querybuf data was read by fast sync */ /* This should match the work readQueryFromClient would do for a master client */ mi->master->querybuf = sdscatsds(mi->master->querybuf, mi->bulkreadBuffer); + mi->master->pending_querybuf = sdscatsds(mi->master->pending_querybuf, mi->bulkreadBuffer); + mi->master->read_reploff += sdslen(mi->bulkreadBuffer); + sdsfree(mi->bulkreadBuffer); mi->bulkreadBuffer = nullptr; - - mi->master->pending_querybuf = sdscatlen(mi->master->pending_querybuf, - mi->master->querybuf,sdslen(mi->master->querybuf)); - - mi->master->read_reploff += sdslen(mi->master->querybuf); } mi->repl_transfer_s = nullptr; mi->repl_state = REPL_STATE_CONNECTED; @@ -3402,7 +3412,7 @@ void parseMasterCapa(redisMaster *mi, sds strcapa) // Parse the word if (strncmp(szStart, "active-replica", pchEnd - szStart) == 0) { mi->isActive = true; - } else if (strncmp(szStart, "rocksdb-snapshot-save", pchEnd - szStart) == 0) { + } else if (strncmp(szStart, "keydb-fastsync-save", pchEnd - szStart) == 0) { mi->isKeydbFastsync = true; } szStart = pchEnd + 1; @@ -3550,9 +3560,9 @@ retry_connect: "capa","psync2", "capa","activeExpire", }; - if (g_pserver->m_pstorageFactory && !g_pserver->fActiveReplica && g_pserver->repl_diskless_load != REPL_DISKLESS_LOAD_SWAPDB) { + if (FFastSyncEnabled() && g_pserver->repl_diskless_load != REPL_DISKLESS_LOAD_SWAPDB) { veccapabilities.push_back("capa"); - veccapabilities.push_back("rocksdb-snapshot-load"); + veccapabilities.push_back("keydb-fastsync"); } err = sendCommandArgv(conn, veccapabilities.size(), veccapabilities.data(), nullptr); @@ -3864,6 +3874,10 @@ int cancelReplicationHandshake(redisMaster *mi, int reconnect) { delete mi->parseState; mi->parseState = nullptr; } + if (mi->bulkreadBuffer) { + sdsfree(mi->bulkreadBuffer); + mi->bulkreadBuffer = nullptr; + } if (mi->repl_state == REPL_STATE_TRANSFER) { replicationAbortSyncTransfer(mi); diff --git a/src/sds.h b/src/sds.h index 1c983cdc1..88683668c 100644 --- a/src/sds.h +++ b/src/sds.h @@ -418,7 +418,10 @@ public: sdsstring &operator=(const sdsstring &other) { sdsfree(m_str); - m_str = sdsdup(other.m_str); + if (other.m_str != nullptr) + m_str = sdsdup(other.m_str); + else + m_str = nullptr; return *this; } diff --git a/src/server.h b/src/server.h index b26163a08..d3d69d07f 100644 --- a/src/server.h +++ b/src/server.h @@ -2688,6 +2688,7 @@ struct redisServer { int fActiveReplica; /* Can this replica also be a master? */ int fWriteDuringActiveLoad; /* Can this active-replica write during an RDB load? */ + int fEnableFastSync = true; // Format: // Lower 20 bits: a counter incrementing for each command executed in the same millisecond @@ -3182,6 +3183,7 @@ void trimStringObjectIfNeeded(robj *o); robj *deserializeStoredObject(const void *data, size_t cb); std::unique_ptr deserializeExpire(const char *str, size_t cch, size_t *poffset); sds serializeStoredObject(robj_roptr o, sds sdsPrefix = nullptr); +sds serializeStoredObjectAndExpire(robj_roptr o); #define sdsEncodedObject(objptr) (objptr->encoding == OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR) @@ -3971,6 +3973,10 @@ inline int ielFromEventLoop(const aeEventLoop *eventLoop) return iel; } +inline bool FFastSyncEnabled() { + return g_pserver->fEnableFastSync && !g_pserver->fActiveReplica; +} + inline int FCorrectThread(client *c) { return (c->conn == nullptr) diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index 2f6148fb5..fa84ac9d5 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -584,6 +584,7 @@ start_server {tags {"repl"}} { set master [srv 0 client] $master config set repl-diskless-sync yes $master config set repl-diskless-sync-delay 1 + $master config set enable-keydb-fastsync no set master_host [srv 0 host] set master_port [srv 0 port] set master_pid [srv 0 pid] @@ -727,6 +728,8 @@ start_server {tags {"repl"}} { } } } + + $master config set enable-keydb-fastsync yes } if 0 { @@ -867,8 +870,10 @@ test {replicaof right after disconnection} { test {Kill rdb child process if its dumping RDB is not useful} { start_server {tags {"repl"}} { set slave1 [srv 0 client] + $slave1 config set enable-keydb-fastsync no start_server {} { set slave2 [srv 0 client] + $slave2 config set enable-keydb-fastsync no start_server {} { set master [srv 0 client] set master_host [srv 0 host]