From ec6378767d3cc031ecb84f9e6f4cc8f61711e27a Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 4 May 2022 01:37:40 +0000 Subject: [PATCH] Fix PSYNC test crashes --- src/object.cpp | 12 +++------- src/rdb.cpp | 22 ++++++++++++++----- src/server.cpp | 4 ++-- src/server.h | 5 ++++- .../replication-psync-multimaster.tcl | 4 ++-- tests/test_helper.tcl | 1 + 6 files changed, 28 insertions(+), 20 deletions(-) diff --git a/src/object.cpp b/src/object.cpp index eb8118d11..44dcbd6e9 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -97,6 +97,7 @@ robj *createRawStringObject(const char *ptr, size_t len) { * allocated in the same chunk as the object itself. */ robj *createEmbeddedStringObject(const char *ptr, size_t len) { serverAssert(len <= UINT8_MAX); + // Note: If the size changes update serializeStoredStringObject size_t allocsize = sizeof(struct sdshdr8)+len+1; if (allocsize < sizeof(void*)) allocsize = sizeof(void*); @@ -1585,20 +1586,13 @@ sds serializeStoredStringObject(sds str, robj_roptr o) static_assert((sizeof(robj) + sizeof(mvcc)) == sizeof(redisObjectStack), ""); switch (o->encoding) { + case OBJ_ENCODING_EMBSTR: case OBJ_ENCODING_RAW: str = sdscatsds(str, (sds)szFromObj(o)); break; case OBJ_ENCODING_INT: break; //nop - - case OBJ_ENCODING_EMBSTR: - size_t cb = zmalloc_size(o.unsafe_robjcast()); - if (cb > sizeof(robj)) - { - str = sdscatlen(str, o.unsafe_robjcast() + 1, cb - sizeof(robj)); - } - break; } return str; @@ -1618,7 +1612,7 @@ robj *deserializeStoredStringObject(const char *data, size_t cb) break; case OBJ_ENCODING_EMBSTR: - newObject = createEmbeddedStringObject(szFromObj(oT), sdslen(szFromObj(oT))); + newObject = createEmbeddedStringObject(data+sizeof(robj)+sizeof(mvcc), cb-sizeof(robj)-sizeof(uint64_t)); break; case OBJ_ENCODING_RAW: diff --git a/src/rdb.cpp b/src/rdb.cpp index 320ab918b..9c74914d2 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1580,6 +1580,8 @@ void *rdbSaveThread(void *vargs) ssize_t cbStart = zmalloc_used_memory(); for (int idb = 0; idb < cserver.dbnum; ++idb) g_pserver->db[idb]->endSnapshotAsync(args->rgpdb[idb]); + + args->~rdbSaveThreadArgs(); zfree(args); ssize_t cbDiff = (cbStart - (ssize_t)zmalloc_used_memory()); g_pserver->garbageCollector.endEpoch(vars.gcEpoch); @@ -1640,7 +1642,7 @@ int launchRdbSaveThread(pthread_t &child, rdbSaveInfo *rsi) } else { rdbSaveThreadArgs *args = (rdbSaveThreadArgs*)zcalloc(sizeof(rdbSaveThreadArgs) + ((cserver.dbnum-1)*sizeof(redisDbPersistentDataSnapshot*)), MALLOC_LOCAL); - // Placement new + args = new (args) rdbSaveThreadArgs(); rdbSaveInfo rsiT; if (rsi == nullptr) rsi = &rsiT; @@ -1656,6 +1658,7 @@ int launchRdbSaveThread(pthread_t &child, rdbSaveInfo *rsi) if (pthread_create(&child, NULL, rdbSaveThread, args)) { for (int idb = 0; idb < cserver.dbnum; ++idb) g_pserver->db[idb]->endSnapshot(args->rgpdb[idb]); + args->~rdbSaveThreadArgs(); zfree(args); return C_ERR; } @@ -3201,7 +3204,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { * are required to skip AUX fields they don't understand. * * An AUX field is composed of two strings: key and value. */ - robj *auxkey, *auxval; + robj *auxkey = nullptr, *auxval = nullptr; if ((auxkey = rdbLoadStringObject(rdb)) == NULL) goto eoferr; if ((auxval = rdbLoadStringObject(rdb)) == NULL) goto eoferr; @@ -3221,12 +3224,16 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { } } else if (!strcasecmp(szFromObj(auxkey),"repl-masters")) { if (rsi) { - MasterSaveInfo msi; char *masters = szFromObj(auxval); char *saveptr; char *entry = strtok_r(masters, ":", &saveptr); while (entry != NULL) { - memcpy(msi.master_replid, entry, sizeof(msi.master_replid)); + MasterSaveInfo msi; + bool fSet = true; + if (strlen(entry) == sizeof(msi.master_replid)-1) + memcpy(msi.master_replid, entry, sizeof(msi.master_replid)); + else + fSet = false; entry = strtok_r(NULL, ":", &saveptr); if (entry == nullptr) break; msi.master_initial_offset = atoll(entry); @@ -3240,7 +3247,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { if (entry == nullptr) break; msi.selected_db = atoi(entry); entry = strtok_r(NULL, ":", &saveptr); - rsi->addMaster(msi); + if (fSet) + rsi->addMaster(msi); } } } else if (!strcasecmp(szFromObj(auxkey),"repl-offset")) { @@ -3752,6 +3760,7 @@ void *rdbSaveToSlavesSocketsThread(void *vargs) aeThreadOffline(); close(args->safe_to_exit_pipe); + args->rsi.~rdbSaveInfo(); zfree(args); g_pserver->rdbThreadVars.fDone = true; return (retval == C_OK) ? (void*)0 : (void*)1; @@ -3784,7 +3793,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { args->rdb_pipe_write = pipefds[1]; /* write end */ anetNonBlock(NULL, g_pserver->rdb_pipe_read); - args->rsi = *(new (args) rdbSaveInfo(*rsi)); + args->rsi = *(new (&args->rsi) rdbSaveInfo(*rsi)); memcpy(&args->rsi.repl_id, g_pserver->replid, sizeof(g_pserver->replid)); args->rsi.master_repl_offset = g_pserver->master_repl_offset; @@ -3842,6 +3851,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { g_pserver->rdb_pipe_conns = NULL; g_pserver->rdb_pipe_numconns = 0; g_pserver->rdb_pipe_numconns_writing = 0; + args->rsi.~rdbSaveInfo(); zfree(args); closeChildInfoPipe(); return C_ERR; diff --git a/src/server.cpp b/src/server.cpp index 373b20337..f356bb96e 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -6694,10 +6694,10 @@ static void sigShutdownHandler(int sig) { serverLogFromHandler(LL_WARNING, "You insist... exiting now."); rdbRemoveTempFile(g_pserver->rdbThreadVars.tmpfileNum, 1); g_pserver->garbageCollector.shutdown(); - exit(1); /* Exit with an error since this was not a clean shutdown. */ + _Exit(1); /* Exit with an error since this was not a clean shutdown. */ } else if (g_pserver->loading) { serverLogFromHandler(LL_WARNING, "Received shutdown signal during loading, exiting now."); - exit(0); + _Exit(0); // calling dtors is undesirable, exit immediately } serverLogFromHandler(LL_WARNING, msg); diff --git a/src/server.h b/src/server.h index f96482612..386f8e754 100644 --- a/src/server.h +++ b/src/server.h @@ -1888,7 +1888,9 @@ struct redisMaster { }; struct MasterSaveInfo { - MasterSaveInfo() = default; + MasterSaveInfo() { + memcpy(master_replid, "0000000000000000000000000000000000000000", sizeof(master_replid)); + } MasterSaveInfo(const redisMaster &mi) { memcpy(master_replid, mi.master_replid, sizeof(mi.master_replid)); if (mi.master) { @@ -1935,6 +1937,7 @@ public: repl_stream_db = -1; repl_id_is_set = 0; memcpy(repl_id, "0000000000000000000000000000000000000000", sizeof(repl_id)); + repl_id[CONFIG_RUN_ID_SIZE] = '\0'; repl_offset = -1; fForceSetKey = TRUE; mvccMinThreshold = 0; diff --git a/tests/integration/replication-psync-multimaster.tcl b/tests/integration/replication-psync-multimaster.tcl index 90c3c0711..ada3f8cec 100644 --- a/tests/integration/replication-psync-multimaster.tcl +++ b/tests/integration/replication-psync-multimaster.tcl @@ -103,7 +103,7 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reco [lindex [$replica role] 3] eq {connected} } else { fail "replica still not connected after some time" - } + } wait_for_condition 100 100 { [$master debug digest] == [$replica debug digest] @@ -147,4 +147,4 @@ foreach mdl {no yes} { assert {[s -1 sync_partial_err] > 0} } $mdl $sdl 1 } -} \ No newline at end of file +} diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 77fc11a24..82be834db 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -61,6 +61,7 @@ set ::all_tests { integration/keydb-cli integration/keydb-benchmark integration/replication-fast + integration/replication-psync-multimaster unit/pubsub unit/slowlog unit/scripting