Allow garbage collection of generic data

Former-commit-id: feadb7fb1845027422bcfca43dbcb6097409b8dc
This commit is contained in:
John Sully 2020-08-17 00:32:48 +00:00
parent 00934cd341
commit c0586b3aed
4 changed files with 66 additions and 8 deletions

View File

@ -45,7 +45,7 @@ void AsyncWorkQueue::WorkerThreadMain()
ProcessPendingAsyncWrites(); ProcessPendingAsyncWrites();
aeReleaseLock(); aeReleaseLock();
g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch); g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch);
serverTL->gcEpoch = 0; serverTL->gcEpoch.reset();
} }
listRelease(vars.clients_pending_asyncwrite); listRelease(vars.clients_pending_asyncwrite);

View File

@ -3,6 +3,12 @@
#include <assert.h> #include <assert.h>
#include <unordered_set> #include <unordered_set>
struct ICollectable
{
virtual ~ICollectable() {}
bool FWillFreeChildDebug() { return false; }
};
template<typename T> template<typename T>
class GarbageCollector class GarbageCollector
{ {

View File

@ -2505,17 +2505,17 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
latencyAddSampleIfNeeded("storage-commit", commit_latency); latencyAddSampleIfNeeded("storage-commit", commit_latency);
handleClientsWithPendingWrites(iel, aof_state); handleClientsWithPendingWrites(iel, aof_state);
if (serverTL->gcEpoch != 0) if (!serverTL->gcEpoch.isReset())
g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch, true /*fNoFree*/); g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch, true /*fNoFree*/);
serverTL->gcEpoch = 0; serverTL->gcEpoch.reset();
aeAcquireLock(); aeAcquireLock();
/* Close clients that need to be closed asynchronous */ /* Close clients that need to be closed asynchronous */
freeClientsInAsyncFreeQueue(iel); freeClientsInAsyncFreeQueue(iel);
if (serverTL->gcEpoch != 0) if (!serverTL->gcEpoch.isReset())
g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch, true /*fNoFree*/); g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch, true /*fNoFree*/);
serverTL->gcEpoch = 0; serverTL->gcEpoch.reset();
/* Before we are going to sleep, let the threads access the dataset by /* Before we are going to sleep, let the threads access the dataset by
* releasing the GIL. Redis main thread will not touch anything at this * releasing the GIL. Redis main thread will not touch anything at this
@ -2531,7 +2531,7 @@ void afterSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop); UNUSED(eventLoop);
if (moduleCount()) moduleAcquireGIL(TRUE /*fServerThread*/); if (moduleCount()) moduleAcquireGIL(TRUE /*fServerThread*/);
serverAssert(serverTL->gcEpoch == 0); serverAssert(serverTL->gcEpoch.isReset());
serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch(); serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
aeAcquireLock(); aeAcquireLock();
for (int idb = 0; idb < cserver.dbnum; ++idb) for (int idb = 0; idb < cserver.dbnum; ++idb)

View File

@ -1959,6 +1959,58 @@ struct clusterState;
#define MAX_EVENT_LOOPS 16 #define MAX_EVENT_LOOPS 16
#define IDX_EVENT_LOOP_MAIN 0 #define IDX_EVENT_LOOP_MAIN 0
class GarbageCollectorCollection
{
GarbageCollector<redisDbPersistentDataSnapshot> garbageCollectorSnapshot;
GarbageCollector<ICollectable> garbageCollectorGeneric;
public:
struct Epoch
{
uint64_t epochSnapshot = 0;
uint64_t epochGeneric = 0;
void reset() {
epochSnapshot = 0;
epochGeneric = 0;
}
bool isReset() const {
return epochSnapshot == 0 && epochGeneric == 0;
}
};
Epoch startEpoch()
{
Epoch e;
e.epochSnapshot = garbageCollectorSnapshot.startEpoch();
e.epochGeneric = garbageCollectorGeneric.startEpoch();
return e;
}
void endEpoch(Epoch e, bool fNoFree = false)
{
garbageCollectorSnapshot.endEpoch(e.epochSnapshot, fNoFree);
garbageCollectorGeneric.endEpoch(e.epochGeneric, fNoFree);
}
void shutdown()
{
garbageCollectorSnapshot.shutdown();
garbageCollectorGeneric.shutdown();
}
void enqueue(Epoch e, std::unique_ptr<redisDbPersistentDataSnapshot> &&sp)
{
garbageCollectorSnapshot.enqueue(e.epochSnapshot, std::move(sp));
}
void enqueue(Epoch e, std::unique_ptr<ICollectable> &&sp)
{
garbageCollectorGeneric.enqueue(e.epochGeneric, std::move(sp));
}
};
// Per-thread variabels that may be accessed without a lock // Per-thread variabels that may be accessed without a lock
struct redisServerThreadVars { struct redisServerThreadVars {
aeEventLoop *el; aeEventLoop *el;
@ -1980,7 +2032,7 @@ struct redisServerThreadVars {
struct fastlock lockPendingWrite { "thread pending write" }; struct fastlock lockPendingWrite { "thread pending write" };
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
long unsigned commandsExecuted = 0; long unsigned commandsExecuted = 0;
uint64_t gcEpoch = 0; GarbageCollectorCollection::Epoch gcEpoch;
const redisDbPersistentDataSnapshot **rgdbSnapshot = nullptr; const redisDbPersistentDataSnapshot **rgdbSnapshot = nullptr;
bool fRetrySetAofEvent = false; bool fRetrySetAofEvent = false;
@ -2434,7 +2486,7 @@ struct redisServer {
/* System hardware info */ /* System hardware info */
size_t system_memory_size; /* Total memory in system as reported by OS */ size_t system_memory_size; /* Total memory in system as reported by OS */
GarbageCollector<redisDbPersistentDataSnapshot> garbageCollector; GarbageCollectorCollection garbageCollector;
IStorageFactory *m_pstorageFactory = nullptr; IStorageFactory *m_pstorageFactory = nullptr;
int storage_flush_period; // The time between flushes in the CRON job int storage_flush_period; // The time between flushes in the CRON job