Allow garbage collection of generic data

Former-commit-id: feadb7fb1845027422bcfca43dbcb6097409b8dc
This commit is contained in:
John Sully 2020-08-17 00:32:48 +00:00
parent e97e4f1a35
commit b71f16471b
4 changed files with 66 additions and 8 deletions

View File

@ -45,7 +45,7 @@ void AsyncWorkQueue::WorkerThreadMain()
ProcessPendingAsyncWrites();
aeReleaseLock();
g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch);
serverTL->gcEpoch = 0;
serverTL->gcEpoch.reset();
}
listRelease(vars.clients_pending_asyncwrite);

View File

@ -3,6 +3,12 @@
#include <assert.h>
#include <unordered_set>
struct ICollectable
{
virtual ~ICollectable() {}
bool FWillFreeChildDebug() { return false; }
};
template<typename T>
class GarbageCollector
{

View File

@ -2505,17 +2505,17 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
latencyAddSampleIfNeeded("storage-commit", commit_latency);
handleClientsWithPendingWrites(iel, aof_state);
if (serverTL->gcEpoch != 0)
if (!serverTL->gcEpoch.isReset())
g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch, true /*fNoFree*/);
serverTL->gcEpoch = 0;
serverTL->gcEpoch.reset();
aeAcquireLock();
/* Close clients that need to be closed asynchronous */
freeClientsInAsyncFreeQueue(iel);
if (serverTL->gcEpoch != 0)
if (!serverTL->gcEpoch.isReset())
g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch, true /*fNoFree*/);
serverTL->gcEpoch = 0;
serverTL->gcEpoch.reset();
/* Before we are going to sleep, let the threads access the dataset by
* releasing the GIL. Redis main thread will not touch anything at this
@ -2531,7 +2531,7 @@ void afterSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop);
if (moduleCount()) moduleAcquireGIL(TRUE /*fServerThread*/);
serverAssert(serverTL->gcEpoch == 0);
serverAssert(serverTL->gcEpoch.isReset());
serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
aeAcquireLock();
for (int idb = 0; idb < cserver.dbnum; ++idb)

View File

@ -1959,6 +1959,58 @@ struct clusterState;
#define MAX_EVENT_LOOPS 16
#define IDX_EVENT_LOOP_MAIN 0
class GarbageCollectorCollection
{
GarbageCollector<redisDbPersistentDataSnapshot> garbageCollectorSnapshot;
GarbageCollector<ICollectable> garbageCollectorGeneric;
public:
struct Epoch
{
uint64_t epochSnapshot = 0;
uint64_t epochGeneric = 0;
void reset() {
epochSnapshot = 0;
epochGeneric = 0;
}
bool isReset() const {
return epochSnapshot == 0 && epochGeneric == 0;
}
};
Epoch startEpoch()
{
Epoch e;
e.epochSnapshot = garbageCollectorSnapshot.startEpoch();
e.epochGeneric = garbageCollectorGeneric.startEpoch();
return e;
}
void endEpoch(Epoch e, bool fNoFree = false)
{
garbageCollectorSnapshot.endEpoch(e.epochSnapshot, fNoFree);
garbageCollectorGeneric.endEpoch(e.epochGeneric, fNoFree);
}
void shutdown()
{
garbageCollectorSnapshot.shutdown();
garbageCollectorGeneric.shutdown();
}
void enqueue(Epoch e, std::unique_ptr<redisDbPersistentDataSnapshot> &&sp)
{
garbageCollectorSnapshot.enqueue(e.epochSnapshot, std::move(sp));
}
void enqueue(Epoch e, std::unique_ptr<ICollectable> &&sp)
{
garbageCollectorGeneric.enqueue(e.epochGeneric, std::move(sp));
}
};
// Per-thread variabels that may be accessed without a lock
struct redisServerThreadVars {
aeEventLoop *el;
@ -1980,7 +2032,7 @@ struct redisServerThreadVars {
struct fastlock lockPendingWrite { "thread pending write" };
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
long unsigned commandsExecuted = 0;
uint64_t gcEpoch = 0;
GarbageCollectorCollection::Epoch gcEpoch;
const redisDbPersistentDataSnapshot **rgdbSnapshot = nullptr;
bool fRetrySetAofEvent = false;
@ -2434,7 +2486,7 @@ struct redisServer {
/* System hardware info */
size_t system_memory_size; /* Total memory in system as reported by OS */
GarbageCollector<redisDbPersistentDataSnapshot> garbageCollector;
GarbageCollectorCollection garbageCollector;
IStorageFactory *m_pstorageFactory = nullptr;
int storage_flush_period; // The time between flushes in the CRON job