#include "server.h"
#include "aelocker.h"

static const size_t c_elementsSmallLimit = 500000;

static fastlock s_lock {"consolidate_children"};    // this lock ensures only one thread is consolidating at a time

class LazyFree : public ICollectable
{
public:
    virtual ~LazyFree()
    {
        for (auto *de : vecde)
        {
            dbDictType.keyDestructor(nullptr, dictGetKey(de));
            dbDictType.valDestructor(nullptr, dictGetVal(de));
            zfree(de);
        }
        for (robj *o : vecobjLazyFree)
            decrRefCount(o);
        for (dict *d : vecdictLazyFree)
            dictRelease(d);
    }

    std::vector<dict*> vecdictLazyFree;
    std::vector<robj*> vecobjLazyFree;
    std::vector<dictEntry*> vecde;
};
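
// Illustrative sketch (not a real call site): code that unlinks entries while holding
// the lock hands them to a LazyFree so the actual destruction happens later on the GC
// thread, following the same pattern used by endSnapshot() and dictGCAsyncFree() below.
// `deUnlinked` is a hypothetical entry already unlinked from its dict:
//
//     auto splazy = std::make_unique<LazyFree>();
//     splazy->vecde.push_back(deUnlinked);
//     g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::move(splazy));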

void discontinueAsyncRehash(dict *d) {
    if (d->asyncdata != nullptr) {
        auto adata = d->asyncdata;
        while (adata != nullptr) {
            adata->abondon = true;
            adata = adata->next;
        }
        d->rehashidx = 0;
    }
}

const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint64_t mvccCheckpoint, bool fOptional)
{
    serverAssert(GlobalLocksAcquired());
    serverAssert(m_refCount == 0);  // do not call this on a snapshot

    if (performEvictions(true /*fPreSnapshot*/) != C_OK && fOptional)
        return nullptr; // can't create snapshot due to OOM

    int levels = 1;
    redisDbPersistentDataSnapshot *psnapshot = m_spdbSnapshotHOLDER.get();
    while (psnapshot != nullptr)
    {
        ++levels;
        psnapshot = psnapshot->m_spdbSnapshotHOLDER.get();
    }

    if (m_spdbSnapshotHOLDER != nullptr)
    {
        // If possible reuse an existing snapshot (we want to minimize nesting)
        if (mvccCheckpoint <= m_spdbSnapshotHOLDER->m_mvccCheckpoint)
        {
            if (!m_spdbSnapshotHOLDER->FStale())
            {
                m_spdbSnapshotHOLDER->m_refCount++;
                return m_spdbSnapshotHOLDER.get();
            }
            serverLog(LL_VERBOSE, "Existing snapshot too old, creating a new one");
        }
    }

    // See if we have too many levels and can bail out of this to reduce load
    if (fOptional && (levels >= 6))
    {
        serverLog(LL_DEBUG, "Snapshot nesting too deep, abandoning");
        return nullptr;
    }

    auto spdb = std::unique_ptr<redisDbPersistentDataSnapshot>(new (MALLOC_LOCAL) redisDbPersistentDataSnapshot());

    // We can't have async rehash modifying these. Setting the asyncdata list to null
    // will cause us to throw away the async work rather than modify the tables in flight
    discontinueAsyncRehash(m_pdict);
    discontinueAsyncRehash(m_pdictTombstone);

    spdb->m_fAllChanged = false;
    spdb->m_fTrackingChanges = 0;
    spdb->m_pdict = m_pdict;
    spdb->m_pdictTombstone = m_pdictTombstone;
    // Add a fake iterator so the dicts don't rehash (they need to be read only)
    dictPauseRehashing(spdb->m_pdict);
    dictForceRehash(spdb->m_pdictTombstone);    // prevent rehashing by finishing the rehash now
    spdb->m_spdbSnapshotHOLDER = std::move(m_spdbSnapshotHOLDER);
    if (m_spstorage != nullptr)
        spdb->m_spstorage = std::shared_ptr<StorageCache>(const_cast<StorageCache*>(m_spstorage->clone()));
    spdb->m_pdbSnapshot = m_pdbSnapshot;
    spdb->m_refCount = 1;
    spdb->m_mvccCheckpoint = getMvccTstamp();
    if (m_setexpire != nullptr)
    {
        std::unique_lock<fastlock> ul(g_expireLock);
        spdb->m_setexpire = new (MALLOC_LOCAL) expireset(*m_setexpire);
        spdb->m_setexpire->pause_rehash();  // needs to be const
    }

    if (dictIsRehashing(spdb->m_pdict) || dictIsRehashing(spdb->m_pdictTombstone)) {
        serverLog(LL_VERBOSE, "NOTICE: Suboptimal snapshot");
    }

    m_pdict = dictCreate(&dbDictType, this);
    dictExpand(m_pdict, 1024);  // minimize rehash overhead
    m_pdictTombstone = dictCreate(&dbTombstoneDictType, this);

    serverAssert(spdb->m_pdict->pauserehash == 1);

    m_spdbSnapshotHOLDER = std::move(spdb);
    m_pdbSnapshot = m_spdbSnapshotHOLDER.get();

    // Finally we need to take a ref on all our children snapshots. This ensures they aren't free'd before we are
    redisDbPersistentData *pdbSnapshotNext = m_pdbSnapshot->m_spdbSnapshotHOLDER.get();
    while (pdbSnapshotNext != nullptr)
    {
        pdbSnapshotNext->m_refCount++;
        pdbSnapshotNext = pdbSnapshotNext->m_spdbSnapshotHOLDER.get();
    }

    if (m_pdbSnapshotASYNC != nullptr)
    {
        // free the async snapshot, it's done its job
        endSnapshot(m_pdbSnapshotASYNC);    // should be just a dec ref (FAST)
        m_pdbSnapshotASYNC = nullptr;
    }

    std::atomic_thread_fence(std::memory_order_seq_cst);

    return m_pdbSnapshot;
}
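
// Minimal usage sketch (assumes the caller holds the global lock and `db` is a
// redisDbPersistentData*; the lambda body is illustrative only):
//
//     const redisDbPersistentDataSnapshot *psnap = db->createSnapshot(getMvccTstamp(), true /*fOptional*/);
//     if (psnap != nullptr) {
//         psnap->iterate_threadsafe([](const char *key, robj_roptr o) {
//             return true;    // visit each key; return false to stop early
//         }, true /*fKeyOnly*/, false /*fCacheOnly*/);
//         db->endSnapshot(psnap); // every successful createSnapshot() must be balanced by endSnapshot()
//     }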

void redisDbPersistentData::recursiveFreeSnapshots(redisDbPersistentDataSnapshot *psnapshot)
{
    std::vector<redisDbPersistentDataSnapshot*> stackSnapshots;
    // gather a stack of snapshots; we do this so we can free them in reverse order
    // Note: we don't touch the incoming psnapshot since the parent is free'ing that one
    while ((psnapshot = psnapshot->m_spdbSnapshotHOLDER.get()) != nullptr)
    {
        stackSnapshots.push_back(psnapshot);
    }
    for (auto itr = stackSnapshots.rbegin(); itr != stackSnapshots.rend(); ++itr)
    {
        endSnapshot(*itr);
    }
}

/* static */ void redisDbPersistentDataSnapshot::gcDisposeSnapshot(redisDbPersistentDataSnapshot *psnapshot)
{
    psnapshot->m_refCount--;
    if (psnapshot->m_refCount <= 0)
    {
        serverAssert(psnapshot->m_refCount == 0);
        // Remove our ref from any children and dispose them too
        redisDbPersistentDataSnapshot *psnapshotChild = psnapshot;
        std::vector<redisDbPersistentDataSnapshot*> vecClean;
        while ((psnapshotChild = psnapshotChild->m_spdbSnapshotHOLDER.get()) != nullptr)
            vecClean.push_back(psnapshotChild);

        for (auto psnapshotChild : vecClean)
            gcDisposeSnapshot(psnapshotChild);

        //psnapshot->m_pdict->iterators--;
        psnapshot->m_spdbSnapshotHOLDER.release();
        psnapshot->m_pdbSnapshot = nullptr;
        g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::unique_ptr<redisDbPersistentDataSnapshot>(psnapshot));
        serverLog(LL_VERBOSE, "Garbage collected snapshot");
    }
}
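
// Note on the release() above: it deliberately drops ownership of the child snapshot
// without freeing it. Each child was already handed to the garbage collector by the
// recursive gcDisposeSnapshot() calls, so letting the unique_ptr delete it here would
// be a double free.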

void redisDbPersistentData::restoreSnapshot(const redisDbPersistentDataSnapshot *psnapshot)
{
    serverAssert(psnapshot->m_refCount == 1);
    serverAssert(m_spdbSnapshotHOLDER.get() == psnapshot);

    m_pdbSnapshot = psnapshot; // if it was deleted restore it
    size_t expectedSize = psnapshot->size();
    dictEmpty(m_pdict, nullptr);
    dictEmpty(m_pdictTombstone, nullptr);
    {
        std::unique_lock<fastlock> ul(g_expireLock);
        delete m_setexpire;
        m_setexpire = new (MALLOC_LOCAL) expireset(*psnapshot->m_setexpire);
    }
    endSnapshot(psnapshot);
    serverAssert(size() == expectedSize);
}

// This function is all about minimizing the amount of work done under the global lock.
// When there have been lots of changes since snapshot creation, a naive endSnapshot()
// will block for a very long time and cause latency spikes.
//
// Note that this function uses a lot more CPU time than a simple endSnapshot(), so we
// have some internal heuristics to fall back to a synchronous endSnapshot() when that
// makes sense.
void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot *psnapshot)
{
    mstime_t latency;

    aeAcquireLock();
    while (dictIsRehashing(m_pdict) || dictIsRehashing(m_pdictTombstone)) {
        dictRehashMilliseconds(m_pdict, 1);
        dictRehashMilliseconds(m_pdictTombstone, 1);
        // Give someone else a chance
        aeReleaseLock();
        usleep(300);
        aeAcquireLock();
    }
    latencyStartMonitor(latency);

    if (m_pdbSnapshotASYNC && m_pdbSnapshotASYNC->m_mvccCheckpoint <= psnapshot->m_mvccCheckpoint)
    {
        // Free a stale async snapshot so consolidate_children can clean it up later
        endSnapshot(m_pdbSnapshotASYNC);    // FAST: just a ref decrement
        m_pdbSnapshotASYNC = nullptr;
    }

    size_t elements = dictSize(m_pdictTombstone);
    // if neither dict is rehashing then the merge is O(1) so don't count the size
    if (dictIsRehashing(psnapshot->m_pdict) || dictIsRehashing(m_pdict))
        elements += dictSize(m_pdict);
    if (elements < c_elementsSmallLimit || psnapshot != m_spdbSnapshotHOLDER.get())  // heuristic
    {
        // For small snapshots it makes more sense just to merge it directly
        endSnapshot(psnapshot);
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("end-snapshot-async-synchronous-path", latency);
        aeReleaseLock();
        return;
    }

    // OK this is a big snapshot so let's do the merge work outside the lock
    auto psnapshotT = createSnapshot(LLONG_MAX, false);
    endSnapshot(psnapshot); // this will just dec the ref count since our new snapshot has a ref
    psnapshot = nullptr;
    latencyEndMonitor(latency);
    latencyAddSampleIfNeeded("end-snapshot-async-phase-1", latency);
    aeReleaseLock();

    // do the expensive work of merging snapshots outside the ref
    if (const_cast<redisDbPersistentDataSnapshot*>(psnapshotT)->freeTombstoneObjects(1))    // depth is one because we just created it
    {
        aeAcquireLock();
        if (m_pdbSnapshotASYNC != nullptr)
            endSnapshot(m_pdbSnapshotASYNC);
        m_pdbSnapshotASYNC = nullptr;
        endSnapshot(psnapshotT);
        aeReleaseLock();
        return;
    }

    // Final Cleanup
    aeAcquireLock();
    latencyStartMonitor(latency);
    if (m_pdbSnapshotASYNC == nullptr)
        m_pdbSnapshotASYNC = psnapshotT;
    else
        endSnapshot(psnapshotT);    // finally clean up our temp snapshot
    latencyEndMonitor(latency);
    latencyAddSampleIfNeeded("end-snapshot-async-phase-2", latency);
    aeReleaseLock();
}
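
// In outline, the async path above: (1) drains any in-flight rehash so the later merge
// is cheap, (2) falls back to a synchronous endSnapshot() for small or non-head
// snapshots, and (3) otherwise takes a fresh wrapper snapshot, drops the caller's ref,
// and prunes tombstones outside the lock via freeTombstoneObjects(). If pruning
// succeeds the wrapper is ended immediately; otherwise it is parked in
// m_pdbSnapshotASYNC for a later createSnapshot() to reap.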

bool redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth)
{
    if (m_pdbSnapshot == nullptr)
    {
        serverAssert(dictSize(m_pdictTombstone) == 0);
        return true;
    }

    if (!const_cast<redisDbPersistentDataSnapshot*>(m_pdbSnapshot)->freeTombstoneObjects(depth + 1))
        return false;

    {
        AeLocker ae;
        ae.arm(nullptr);
        if (m_pdbSnapshot->m_refCount != depth && (m_pdbSnapshot->m_refCount != (m_refCount + 1)))
            return false;
        ae.disarm();
    }

    std::unique_lock<fastlock> lock(s_lock, std::defer_lock);
    if (!lock.try_lock())
        return false;   // this is a best effort function

    std::unique_ptr<LazyFree> splazy = std::make_unique<LazyFree>();

    dict *dictTombstoneNew = dictCreate(&dbTombstoneDictType, nullptr);
    dictIterator *di = dictGetIterator(m_pdictTombstone);
    dictEntry *de;
    std::vector<dictEntry*> vecdeFree;
    vecdeFree.reserve(dictSize(m_pdictTombstone));
    unsigned rgcremoved[2] = {0};
    while ((de = dictNext(di)) != nullptr)
    {
        dictEntry **dePrev = nullptr;
        dictht *ht = nullptr;
        sds key = (sds)dictGetKey(de);
        // BUG BUG: Why can't we do a shallow search here?
        dictEntry *deObj = dictFindWithPrev(m_pdbSnapshot->m_pdict, key, (uint64_t)dictGetVal(de), &dePrev, &ht, false);

        if (deObj != nullptr)
        {
            // Now unlink the DE
            __atomic_store(dePrev, &deObj->next, __ATOMIC_RELEASE);
            if (ht == &m_pdbSnapshot->m_pdict->ht[0])
                rgcremoved[0]++;
            else
                rgcremoved[1]++;
            splazy->vecde.push_back(deObj);
        } else {
            serverAssert(dictFind(m_pdbSnapshot->m_pdict, key) == nullptr);
            serverAssert(m_pdbSnapshot->find_cached_threadsafe(key) != nullptr);
            dictAdd(dictTombstoneNew, sdsdupshared((sds)dictGetKey(de)), dictGetVal(de));
        }
    }
    dictReleaseIterator(di);

    dictForceRehash(dictTombstoneNew);
    aeAcquireLock();
    if (m_pdbSnapshot->m_pdict->asyncdata != nullptr) {
        // In this case we use the asyncdata to free us, not our own lazy free
        for (auto de : splazy->vecde)
            dictFreeUnlinkedEntry(m_pdbSnapshot->m_pdict, de);
        splazy->vecde.clear();
    }
    dict *dT = m_pdbSnapshot->m_pdict;
    splazy->vecdictLazyFree.push_back(m_pdictTombstone);
    __atomic_store(&m_pdictTombstone, &dictTombstoneNew, __ATOMIC_RELEASE);
    __atomic_fetch_sub(&dT->ht[0].used, rgcremoved[0], __ATOMIC_RELEASE);
    __atomic_fetch_sub(&dT->ht[1].used, rgcremoved[1], __ATOMIC_RELEASE);
    serverLog(LL_WARNING, "tombstones removed: %u, remain: %lu", rgcremoved[0] + rgcremoved[1], dictSize(m_pdictTombstone));
    g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::move(splazy));
    aeReleaseLock();

    return true;
}
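
// Memory-ordering note: the __ATOMIC_RELEASE store of m_pdictTombstone above pairs
// with the __ATOMIC_ACQUIRE load in find_cached_threadsafe() below, so a concurrent
// reader sees either the old tombstone dict or the fully-built replacement, never a
// torn pointer or a half-populated table.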

void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psnapshot)
{
    serverAssert(GlobalLocksAcquired());

    if (m_spdbSnapshotHOLDER.get() != psnapshot)
    {
        if (m_spdbSnapshotHOLDER == nullptr)
        {
            // This is an orphaned snapshot
            redisDbPersistentDataSnapshot::gcDisposeSnapshot(const_cast<redisDbPersistentDataSnapshot*>(psnapshot));
            return;
        }
        m_spdbSnapshotHOLDER->endSnapshot(psnapshot);
        return;
    }

    mstime_t latency_endsnapshot;
    latencyStartMonitor(latency_endsnapshot);

    // Alright we're ready to be free'd, but first dump all the refs on our child snapshots
    if (m_spdbSnapshotHOLDER->m_refCount == 1)
        recursiveFreeSnapshots(m_spdbSnapshotHOLDER.get());

    m_spdbSnapshotHOLDER->m_refCount--;
    if (m_spdbSnapshotHOLDER->m_refCount > 0)
        return;

    size_t sizeStart = size();
    serverAssert(m_spdbSnapshotHOLDER->m_refCount == 0);
    serverAssert((m_refCount == 0 && m_pdict->pauserehash == 0) || (m_refCount != 0 && m_pdict->pauserehash == 1));

    serverAssert(m_spdbSnapshotHOLDER->m_pdict->pauserehash == 1);  // All iterators should have been free'd except the fake one from createSnapshot
    if (m_refCount == 0)
    {
        dictResumeRehashing(m_spdbSnapshotHOLDER->m_pdict);
    }

    if (m_pdbSnapshot == nullptr)
    {
        // the database was cleared so we don't need to recover the snapshot
        dictEmpty(m_pdictTombstone, nullptr);
        m_spdbSnapshotHOLDER = std::move(m_spdbSnapshotHOLDER->m_spdbSnapshotHOLDER);
        return;
    }

    // Stage 1: Loop through all the tracked deletes and remove them from the snapshot DB
    dictIterator *di = dictGetIterator(m_pdictTombstone);
    dictEntry *de;
    dictPauseRehashing(m_spdbSnapshotHOLDER->m_pdict);
    auto splazy = std::make_unique<LazyFree>();
    while ((de = dictNext(di)) != NULL)
    {
        dictEntry **dePrev;
        dictht *ht;
        // BUG BUG: Why not a shallow search?
        dictEntry *deSnapshot = dictFindWithPrev(m_spdbSnapshotHOLDER->m_pdict, dictGetKey(de), (uint64_t)dictGetVal(de), &dePrev, &ht, false /*!!sdsisshared((sds)dictGetKey(de))*/);
        if (deSnapshot == nullptr && m_spdbSnapshotHOLDER->m_pdbSnapshot)
        {
            // The tombstone is for a grandchild, propagate it (or possibly in the storage provider - but an extra tombstone won't hurt)
#ifdef CHECKED_BUILD
            serverAssert(m_spdbSnapshotHOLDER->m_pdbSnapshot->find_cached_threadsafe((const char*)dictGetKey(de)) != nullptr);
#endif
            dictAdd(m_spdbSnapshotHOLDER->m_pdictTombstone, sdsdupshared((sds)dictGetKey(de)), dictGetVal(de));
            continue;
        }
        else if (deSnapshot == nullptr)
        {
            serverAssert(m_spdbSnapshotHOLDER->m_spstorage != nullptr); // the only case where we can have a tombstone without a snapshot child is if a storage engine is set
            continue;
        }

        // Delete the object from the source dict, we don't use dictDelete to avoid a second search
        *dePrev = deSnapshot->next; // Unlink it first
        if (deSnapshot != nullptr) {
            if (m_spdbSnapshotHOLDER->m_pdict->asyncdata != nullptr) {
                dictFreeUnlinkedEntry(m_spdbSnapshotHOLDER->m_pdict, deSnapshot);
            } else {
                splazy->vecde.push_back(deSnapshot);
            }
        }
        ht->used--;
    }

    dictResumeRehashing(m_spdbSnapshotHOLDER->m_pdict);
    dictReleaseIterator(di);
    splazy->vecdictLazyFree.push_back(m_pdictTombstone);
    m_pdictTombstone = dictCreate(&dbTombstoneDictType, nullptr);

    // Stage 2: Move all new keys to the snapshot DB
    dictMerge(m_spdbSnapshotHOLDER->m_pdict, m_pdict);

    // Stage 3: Swap the databases with the snapshot
    std::swap(m_pdict, m_spdbSnapshotHOLDER->m_pdict);
    if (m_spdbSnapshotHOLDER->m_pdbSnapshot != nullptr)
        std::swap(m_pdictTombstone, m_spdbSnapshotHOLDER->m_pdictTombstone);

    // Finally free the snapshot
    if (m_pdbSnapshot != nullptr && m_spdbSnapshotHOLDER->m_pdbSnapshot != nullptr)
    {
        m_pdbSnapshot = m_spdbSnapshotHOLDER->m_pdbSnapshot;
    }
    else
    {
        m_pdbSnapshot = nullptr;
    }
    m_spdbSnapshotHOLDER->m_pdbSnapshot = nullptr;

    // Fixup the about-to-be-free'd snapshot's iterator count so the dtor doesn't complain
    if (m_refCount)
    {
        dictResumeRehashing(m_spdbSnapshotHOLDER->m_pdict);
    }

    auto spsnapshotFree = std::move(m_spdbSnapshotHOLDER);
    m_spdbSnapshotHOLDER = std::move(spsnapshotFree->m_spdbSnapshotHOLDER);
    if (serverTL != nullptr) {
        g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::move(spsnapshotFree));
        g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::move(splazy));
    }

    // Sanity Checks
    serverAssert(m_spdbSnapshotHOLDER != nullptr || m_pdbSnapshot == nullptr);
    serverAssert(m_pdbSnapshot == m_spdbSnapshotHOLDER.get() || m_pdbSnapshot == nullptr);
    serverAssert((m_refCount == 0 && m_pdict->pauserehash == 0) || (m_refCount != 0 && m_pdict->pauserehash == 1));
    serverAssert(m_spdbSnapshotHOLDER != nullptr || dictSize(m_pdictTombstone) == 0);
    serverAssert(sizeStart == size());

    latencyEndMonitor(latency_endsnapshot);
    latencyAddSampleIfNeeded("end-mvcc-snapshot", latency_endsnapshot);

    performEvictions(false);
}

dict_iter redisDbPersistentDataSnapshot::random_cache_threadsafe(bool fPrimaryOnly) const
{
    if (size() == 0)
        return dict_iter(nullptr);
    if (!fPrimaryOnly && m_pdbSnapshot != nullptr && m_pdbSnapshot->size() > 0)
    {
        dict_iter iter(nullptr);
        double pctInSnapshot = (double)m_pdbSnapshot->size() / (size() + m_pdbSnapshot->size());
        double randval = (double)rand() / RAND_MAX;
        if (randval <= pctInSnapshot)
        {
            return m_pdbSnapshot->random_cache_threadsafe();
        }
    }

    if (dictSize(m_pdict) == 0)
        return dict_iter(nullptr);
    dictEntry *de = dictGetRandomKey(m_pdict);
    return dict_iter(m_pdict, de);
}
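
// Sampling note: a key is drawn from the snapshot with probability
// |snapshot| / (|primary| + |snapshot|), so the pick stays approximately uniform
// across the union of both keyspaces rather than biased toward the smaller dict.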

dict_iter redisDbPersistentData::find_cached_threadsafe(const char *key) const
{
    dict *dictTombstone;
    __atomic_load(&m_pdictTombstone, &dictTombstone, __ATOMIC_ACQUIRE);

    dictEntry *de = dictFind(m_pdict, key);
    if (de == nullptr && m_pdbSnapshot != nullptr && dictFind(dictTombstone, key) == nullptr)
    {
        auto itr = m_pdbSnapshot->find_cached_threadsafe(key);
        if (itr != nullptr)
            return itr;
    }
    return dict_iter(m_pdict, de);
}

struct scan_callback_data
{
    dict *dictTombstone;
    sds type;
    list *keys;
};

void snapshot_scan_callback(void *privdata, const dictEntry *de)
{
    scan_callback_data *data = (scan_callback_data*)privdata;
    if (data->dictTombstone != nullptr && dictFind(data->dictTombstone, dictGetKey(de)) != nullptr)
        return;

    sds sdskey = (sds)dictGetKey(de);
    if (data->type != nullptr)
    {
        if (strcasecmp(data->type, getObjectTypeName((robj*)dictGetVal(de))) != 0)
            return;
    }
    listAddNodeHead(data->keys, createStringObject(sdskey, sdslen(sdskey)));
}

unsigned long redisDbPersistentDataSnapshot::scan_threadsafe(unsigned long iterator, long count, sds type, list *keys) const
{
    unsigned long iteratorReturn = 0;

    scan_callback_data data;
    data.dictTombstone = m_pdictTombstone;
    data.keys = keys;
    data.type = type;

    const redisDbPersistentDataSnapshot *psnapshot;
    __atomic_load(&m_pdbSnapshot, &psnapshot, __ATOMIC_ACQUIRE);
    if (psnapshot != nullptr)
    {
        // Always process the snapshot first as we assume it's bigger than we are
        iteratorReturn = psnapshot->scan_threadsafe(iterator, count, type, keys);

        // Just catch up with our snapshot
        do
        {
            iterator = dictScan(m_pdict, iterator, snapshot_scan_callback, nullptr, &data);
        } while (iterator != 0 && (iterator < iteratorReturn || iteratorReturn == 0));
    }
    else
    {
        long maxiterations = count * 10;    // allow more iterations than keys for sparse tables
        iteratorReturn = iterator;
        do {
            iteratorReturn = dictScan(m_pdict, iteratorReturn, snapshot_scan_callback, NULL, &data);
        } while (iteratorReturn &&
                 maxiterations-- &&
                 listLength(keys) < (unsigned long)count);
    }

    return iteratorReturn;
}
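
// Cursor note: when a nested snapshot exists we scan it first, then advance our own
// dict's cursor until it catches up with the value the snapshot scan returned. Since
// both scans use dictScan()'s reverse-binary cursor, the single returned value can
// represent resumable progress through both tables on the next SCAN call.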

bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn, bool fKeyOnly, bool fCacheOnly) const
{
    return iterate_threadsafe_core(fn, fKeyOnly, fCacheOnly, true);
}

bool redisDbPersistentDataSnapshot::iterate_threadsafe_core(std::function<bool(const char*, robj_roptr o)> &fn, bool fKeyOnly, bool fCacheOnly, bool fFirst) const
{
    // Take the size so we can ensure we visited every element exactly once.
    // Use volatile to ensure it's not checked too late. This makes it more
    // likely we'll detect races (but it won't guarantee it)
    aeAcquireLock();
    dict *dictTombstone;
    __atomic_load(&m_pdictTombstone, &dictTombstone, __ATOMIC_ACQUIRE);
    volatile ssize_t celem = (ssize_t)size();
    aeReleaseLock();

    dictEntry *de = nullptr;
    bool fResult = true;

    dictIterator *di = dictGetSafeIterator(m_pdict);
    while (fResult && ((de = dictNext(di)) != nullptr))
    {
        --celem;
        robj *o = (robj*)dictGetVal(de);
        if (!fn((const char*)dictGetKey(de), o))
            fResult = false;
    }
    dictReleaseIterator(di);

    if (m_spstorage != nullptr && !fCacheOnly)
    {
        bool fSawAll = fResult && m_spstorage->enumerate([&](const char *key, size_t cchKey, const void *data, size_t cbData) {
            sds sdsKey = sdsnewlen(key, cchKey);
            dictEntry *de = dictFind(m_pdict, sdsKey);
            bool fContinue = true;
            if (de == nullptr)
            {
                robj *o = nullptr;
                if (!fKeyOnly)
                {
                    size_t offset = 0;
                    deserializeExpire(sdsKey, (const char*)data, cbData, &offset);
                    o = deserializeStoredObject(this, sdsKey, reinterpret_cast<const char*>(data) + offset, cbData - offset);
                }
                fContinue = fn(sdsKey, o);
                if (o != nullptr)
                    decrRefCount(o);
            }
            sdsfree(sdsKey);
            return fContinue;
        });
        return fSawAll;
    }

    const redisDbPersistentDataSnapshot *psnapshot;
    __atomic_load(&m_pdbSnapshot, &psnapshot, __ATOMIC_ACQUIRE);
    if (fResult && psnapshot != nullptr)
    {
        std::function<bool(const char*, robj_roptr o)> fnNew = [&fn, &celem, dictTombstone](const char *key, robj_roptr o) {
            dictEntry *deTombstone = dictFind(dictTombstone, key);
            if (deTombstone != nullptr)
                return true;

            // Alright it's a key in the in-use keyspace, count it and then pass it off
            --celem;
            return fn(key, o);
        };
        fResult = psnapshot->iterate_threadsafe_core(fnNew, fKeyOnly, fCacheOnly, false);
    }

    // we should have hit all keys or had a good reason not to
    if (!(!fResult || celem == 0 || (m_spstorage && fCacheOnly)))
        serverLog(LL_WARNING, "celem: %ld", celem);
    serverAssert(!fResult || celem == 0 || (m_spstorage && fCacheOnly) || !fFirst);
    return fResult;
}
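
// Accounting note: celem starts at size() and is decremented once per visited key,
// including keys surfaced from nested snapshots (tombstoned keys are skipped before
// the decrement). If every key was seen exactly once it ends at zero, which the assert
// above checks on the outermost (fFirst) level only, since an inner level cannot see
// keys that exist only in its parents.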

int redisDbPersistentDataSnapshot::snapshot_depth() const
{
    if (m_pdbSnapshot)
        return m_pdbSnapshot->snapshot_depth() + 1;
    return 0;
}

bool redisDbPersistentDataSnapshot::FStale() const
{
    return ((getMvccTstamp() - m_mvccCheckpoint) >> MVCC_MS_SHIFT) >= static_cast<uint64_t>(g_pserver->snapshot_slip);
}
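
// A snapshot is considered stale once its age exceeds the configured snapshot-slip.
// Assuming the MVCC timestamp layout carries milliseconds above MVCC_MS_SHIFT (with a
// counter in the low bits), shifting the delta right converts it to elapsed
// milliseconds for the comparison.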

void dictGCAsyncFree(dictAsyncRehashCtl *async) {
    if (async->deGCList != nullptr && serverTL != nullptr && !serverTL->gcEpoch.isReset()) {
        auto splazy = std::make_unique<LazyFree>();
        auto *de = async->deGCList;
        while (de != nullptr) {
            splazy->vecde.push_back(de);
            de = de->next;
        }
        async->deGCList = nullptr;
        g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::move(splazy));
    }
    delete async;
}