Merge branch 'main' into keydbpro

This commit is contained in:
John Sully 2022-06-05 19:33:49 +00:00
commit d6cf39c485
5 changed files with 52 additions and 19 deletions

View File

@ -56,4 +56,18 @@ jobs:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
- name: make - name: make
run: |
sudo apt-get update
sudo apt-get -y install uuid-dev libcurl4-openssl-dev
make KEYDB_CFLAGS='-Werror' KEYDB_CXXFLAGS='-Werror' MALLOC=libc -j2
build-ubuntu-32bit:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: make
run: |
sudo dpkg --add-architecture i386
sudo apt-get update
sudo apt-get -y install gcc-multilib g++-multilib libc6-dev-i386 lib32z1 uuid-dev:i386 libcurl4-openssl-dev:i386
make KEYDB_CFLAGS='-Werror' KEYDB_CXXFLAGS='-Werror' 32bit -j2

View File

@ -2674,6 +2674,11 @@ void redisDbPersistentData::prepOverwriteForSnapshot(char *key)
auto itr = m_pdbSnapshot->find_cached_threadsafe(key); auto itr = m_pdbSnapshot->find_cached_threadsafe(key);
if (itr.key() != nullptr) if (itr.key() != nullptr)
{ {
if (itr.val()->FExpires()) {
// Note: I'm sure we could handle this, but its too risky at the moment.
// There are known bugs doing this with expires
return;
}
sds keyNew = sdsdupshared(itr.key()); sds keyNew = sdsdupshared(itr.key());
if (dictAdd(m_pdictTombstone, keyNew, (void*)dictHashKey(m_pdict, key)) != DICT_OK) if (dictAdd(m_pdictTombstone, keyNew, (void*)dictHashKey(m_pdict, key)) != DICT_OK)
sdsfree(keyNew); sdsfree(keyNew);

View File

@ -2,6 +2,7 @@
#include <vector> #include <vector>
#include <assert.h> #include <assert.h>
#include <unordered_set> #include <unordered_set>
#include <list>
struct ICollectable struct ICollectable
{ {
@ -45,14 +46,14 @@ public:
void shutdown() void shutdown()
{ {
std::unique_lock<fastlock> lock(m_lock); std::unique_lock<fastlock> lock(m_lock);
m_vecepochs.clear(); m_listepochs.clear();
m_setepochOutstanding.clear(); m_setepochOutstanding.clear();
} }
bool empty() const bool empty() const
{ {
std::unique_lock<fastlock> lock(m_lock); std::unique_lock<fastlock> lock(m_lock);
return m_vecepochs.empty(); return m_listepochs.empty();
} }
void endEpoch(uint64_t epoch, bool fNoFree = false) void endEpoch(uint64_t epoch, bool fNoFree = false)
@ -63,12 +64,12 @@ public:
m_setepochOutstanding.erase(epoch); m_setepochOutstanding.erase(epoch);
if (fNoFree) if (fNoFree)
return; return;
std::vector<EpochHolder> vecclean; std::list<EpochHolder> listclean;
// No outstanding epochs? // No outstanding epochs?
if (m_setepochOutstanding.empty()) if (m_setepochOutstanding.empty())
{ {
vecclean = std::move(m_vecepochs); // Everything goes! listclean = std::move(m_listepochs); // Everything goes!
} }
else else
{ {
@ -77,18 +78,20 @@ public:
return; // No available epochs to free return; // No available epochs to free
// Clean any epochs available (after the lock) // Clean any epochs available (after the lock)
for (size_t iepoch = 0; iepoch < m_vecepochs.size(); ++iepoch) for (auto itr = m_listepochs.begin(); itr != m_listepochs.end(); /* itr incremented in loop*/)
{ {
auto &e = m_vecepochs[iepoch]; auto &e = *itr;
auto itrNext = itr;
++itrNext;
if (e < minepoch) if (e < minepoch)
{ {
vecclean.emplace_back(std::move(e)); listclean.emplace_back(std::move(e));
m_vecepochs.erase(m_vecepochs.begin() + iepoch); m_listepochs.erase(itr);
--iepoch;
} }
itr = itrNext;
} }
assert(vecclean.empty() || fMinElement); assert(listclean.empty() || fMinElement);
} }
lock.unlock(); // don't hold it for the potentially long delete of vecclean lock.unlock(); // don't hold it for the potentially long delete of vecclean
@ -100,13 +103,13 @@ public:
serverAssert(m_setepochOutstanding.find(epoch) != m_setepochOutstanding.end()); serverAssert(m_setepochOutstanding.find(epoch) != m_setepochOutstanding.end());
serverAssert(sp->FWillFreeChildDebug() == false); serverAssert(sp->FWillFreeChildDebug() == false);
auto itr = std::find(m_vecepochs.begin(), m_vecepochs.end(), m_epochNext+1); auto itr = std::find(m_listepochs.begin(), m_listepochs.end(), m_epochNext+1);
if (itr == m_vecepochs.end()) if (itr == m_listepochs.end())
{ {
EpochHolder e; EpochHolder e;
e.tstamp = m_epochNext+1; e.tstamp = m_epochNext+1;
e.m_vecObjs.push_back(std::move(sp)); e.m_vecObjs.push_back(std::move(sp));
m_vecepochs.emplace_back(std::move(e)); m_listepochs.emplace_back(std::move(e));
} }
else else
{ {
@ -117,7 +120,7 @@ public:
private: private:
mutable fastlock m_lock { "Garbage Collector"}; mutable fastlock m_lock { "Garbage Collector"};
std::vector<EpochHolder> m_vecepochs; std::list<EpochHolder> m_listepochs;
std::unordered_set<uint64_t> m_setepochOutstanding; std::unordered_set<uint64_t> m_setepochOutstanding;
uint64_t m_epochNext = 0; uint64_t m_epochNext = 0;
}; };

View File

@ -2757,9 +2757,10 @@ void readQueryFromClient(connection *conn) {
parseClientCommandBuffer(c); parseClientCommandBuffer(c);
if (g_pserver->enable_async_commands && !serverTL->disable_async_commands && listLength(g_pserver->monitors) == 0 && (aeLockContention() || serverTL->rgdbSnapshot[c->db->id] || g_fTestMode)) { if (g_pserver->enable_async_commands && !serverTL->disable_async_commands && listLength(g_pserver->monitors) == 0 && (aeLockContention() || serverTL->rgdbSnapshot[c->db->id] || g_fTestMode)) {
// Frequent writers aren't good candidates for this optimization, they cause us to renew the snapshot too often // Frequent writers aren't good candidates for this optimization, they cause us to renew the snapshot too often
// so we exclude them unless the snapshot we need already exists // so we exclude them unless the snapshot we need already exists.
// Note: In test mode we want to create snapshots as often as possibl to excercise them - we don't care about perf
bool fSnapshotExists = c->db->mvccLastSnapshot >= c->mvccCheckpoint; bool fSnapshotExists = c->db->mvccLastSnapshot >= c->mvccCheckpoint;
bool fWriteTooRecent = (((getMvccTstamp() - c->mvccCheckpoint) >> MVCC_MS_SHIFT) < static_cast<uint64_t>(g_pserver->snapshot_slip)/2); bool fWriteTooRecent = !g_fTestMode && (((getMvccTstamp() - c->mvccCheckpoint) >> MVCC_MS_SHIFT) < static_cast<uint64_t>(g_pserver->snapshot_slip)/2);
// The check below avoids running async commands if this is a frequent writer unless a snapshot is already there to service it // The check below avoids running async commands if this is a frequent writer unless a snapshot is already there to service it
if (!fWriteTooRecent || fSnapshotExists) { if (!fWriteTooRecent || fSnapshotExists) {

View File

@ -1657,13 +1657,18 @@ int launchRdbSaveThread(pthread_t &child, rdbSaveInfo *rsi)
g_pserver->rdbThreadVars.tmpfileNum++; g_pserver->rdbThreadVars.tmpfileNum++;
g_pserver->rdbThreadVars.fRdbThreadCancel = false; g_pserver->rdbThreadVars.fRdbThreadCancel = false;
if (pthread_create(&child, NULL, rdbSaveThread, args)) { pthread_attr_t tattr;
pthread_attr_init(&tattr);
pthread_attr_setstacksize(&tattr, 1 << 23); // 8 MB
if (pthread_create(&child, &tattr, rdbSaveThread, args)) {
pthread_attr_destroy(&tattr);
for (int idb = 0; idb < cserver.dbnum; ++idb) for (int idb = 0; idb < cserver.dbnum; ++idb)
g_pserver->db[idb]->endSnapshot(args->rgpdb[idb]); g_pserver->db[idb]->endSnapshot(args->rgpdb[idb]);
args->~rdbSaveThreadArgs(); args->~rdbSaveThreadArgs();
zfree(args); zfree(args);
return C_ERR; return C_ERR;
} }
pthread_attr_destroy(&tattr);
g_pserver->child_type = CHILD_TYPE_RDB; g_pserver->child_type = CHILD_TYPE_RDB;
} }
return C_OK; return C_OK;
@ -3834,7 +3839,11 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
g_pserver->rdbThreadVars.tmpfileNum++; g_pserver->rdbThreadVars.tmpfileNum++;
g_pserver->rdbThreadVars.fRdbThreadCancel = false; g_pserver->rdbThreadVars.fRdbThreadCancel = false;
if (pthread_create(&child, nullptr, rdbSaveToSlavesSocketsThread, args)) { pthread_attr_t tattr;
pthread_attr_init(&tattr);
pthread_attr_setstacksize(&tattr, 1 << 23); // 8 MB
if (pthread_create(&child, &tattr, rdbSaveToSlavesSocketsThread, args)) {
pthread_attr_destroy(&tattr);
serverLog(LL_WARNING,"Can't save in background: fork: %s", serverLog(LL_WARNING,"Can't save in background: fork: %s",
strerror(errno)); strerror(errno));
@ -3860,6 +3869,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
closeChildInfoPipe(); closeChildInfoPipe();
return C_ERR; return C_ERR;
} }
pthread_attr_destroy(&tattr);
g_pserver->child_type = CHILD_TYPE_RDB; g_pserver->child_type = CHILD_TYPE_RDB;
serverLog(LL_NOTICE,"Background RDB transfer started"); serverLog(LL_NOTICE,"Background RDB transfer started");