Merge branch 'keydbpro' into PRO_RELEASE_6

Former-commit-id: 219d7f3499fd826f039a1490ed3317791b084f94
This commit is contained in:
John Sully 2020-05-08 00:24:58 -04:00
commit e03a2bbb7f
7 changed files with 26 additions and 17 deletions

View File

@ -324,8 +324,8 @@ bool initializeStorageProvider(const char **err)
struct sysinfo sys; struct sysinfo sys;
if (sysinfo(&sys) == 0) if (sysinfo(&sys) == 0)
{ {
// By default it's half the memory. This gives sufficient room for background saving // By default it's a little under half the memory. This gives sufficient room for background saving
g_pserver->maxmemory = sys.totalram / 2; g_pserver->maxmemory = static_cast<unsigned long long>(sys.totalram / 2.2);
g_pserver->maxmemory_policy = MAXMEMORY_ALLKEYS_LRU; g_pserver->maxmemory_policy = MAXMEMORY_ALLKEYS_LRU;
} }
} }
@ -2176,7 +2176,7 @@ static int updateMaxmemory(long long val, long long prev, const char **err) {
if ((unsigned long long)val < used) { if ((unsigned long long)val < used) {
serverLog(LL_WARNING,"WARNING: the new maxmemory value set via CONFIG SET (%llu) is smaller than the current memory usage (%zu). This will result in key eviction and/or the inability to accept new write commands depending on the maxmemory-policy.", g_pserver->maxmemory, used); serverLog(LL_WARNING,"WARNING: the new maxmemory value set via CONFIG SET (%llu) is smaller than the current memory usage (%zu). This will result in key eviction and/or the inability to accept new write commands depending on the maxmemory-policy.", g_pserver->maxmemory, used);
} }
freeMemoryIfNeededAndSafe(); freeMemoryIfNeededAndSafe(false /*fPreSnapshot*/);
} }
return 1; return 1;
} }

View File

@ -422,7 +422,7 @@ size_t freeMemoryGetNotCountedMemory(void) {
* limit. * limit.
* (Populated both for C_ERR and C_OK) * (Populated both for C_ERR and C_OK)
*/ */
int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level) { int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level, bool fPreSnapshot) {
size_t mem_reported, mem_used, mem_tofree; size_t mem_reported, mem_used, mem_tofree;
/* Check if we are over the memory usage limit. If we are not, no need /* Check if we are over the memory usage limit. If we are not, no need
@ -430,8 +430,10 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev
mem_reported = zmalloc_used_memory(); mem_reported = zmalloc_used_memory();
if (total) *total = mem_reported; if (total) *total = mem_reported;
size_t maxmemory = g_pserver->maxmemory; size_t maxmemory = g_pserver->maxmemory;
if (fPreSnapshot)
maxmemory = static_cast<size_t>(maxmemory * 0.9); // derate memory by 10% since we won't be able to free during snapshot
if (g_pserver->FRdbSaveInProgress()) if (g_pserver->FRdbSaveInProgress())
maxmemory *= 2; maxmemory = static_cast<size_t>(maxmemory*1.5);
/* We may return ASAP if there is no need to compute the level. */ /* We may return ASAP if there is no need to compute the level. */
int return_ok_asap = !maxmemory || mem_reported <= maxmemory; int return_ok_asap = !maxmemory || mem_reported <= maxmemory;
@ -479,12 +481,12 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev
* were over the limit, but the attempt to free memory was successful. * were over the limit, but the attempt to free memory was successful.
* Otehrwise if we are over the memory limit, but not enough memory * Otehrwise if we are over the memory limit, but not enough memory
* was freed to return back under the limit, the function returns C_ERR. */ * was freed to return back under the limit, the function returns C_ERR. */
int freeMemoryIfNeeded(void) { int freeMemoryIfNeeded(bool fPreSnapshot) {
serverAssert(GlobalLocksAcquired()); serverAssert(GlobalLocksAcquired());
/* By default replicas should ignore maxmemory /* By default replicas should ignore maxmemory
* and just be masters exact copies. */ * and just be masters exact copies. */
if (listLength(g_pserver->masters) && g_pserver->repl_slave_ignore_maxmemory && !g_pserver->fActiveReplica) return C_OK; if (g_pserver->m_pstorageFactory == nullptr && listLength(g_pserver->masters) && g_pserver->repl_slave_ignore_maxmemory && !g_pserver->fActiveReplica) return C_OK;
size_t mem_reported, mem_tofree, mem_freed; size_t mem_reported, mem_tofree, mem_freed;
mstime_t latency, eviction_latency; mstime_t latency, eviction_latency;
@ -497,7 +499,7 @@ int freeMemoryIfNeeded(void) {
* POV of clients not being able to write, but also from the POV of * POV of clients not being able to write, but also from the POV of
* expires and evictions of keys not being performed. */ * expires and evictions of keys not being performed. */
if (clientsArePaused()) return C_OK; if (clientsArePaused()) return C_OK;
if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK) if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL,fPreSnapshot) == C_OK)
return C_OK; return C_OK;
mem_freed = 0; mem_freed = 0;
@ -714,7 +716,7 @@ cant_free:
* - Nor we are loading data right now. * - Nor we are loading data right now.
* *
*/ */
int freeMemoryIfNeededAndSafe(void) { int freeMemoryIfNeededAndSafe(bool fPreSnapshot) {
if (g_pserver->lua_timedout || g_pserver->loading) return C_OK; if (g_pserver->lua_timedout || g_pserver->loading) return C_OK;
return freeMemoryIfNeeded(); return freeMemoryIfNeeded(fPreSnapshot);
} }

View File

@ -2494,7 +2494,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
g_pserver->db[idb]->commitChanges(); g_pserver->db[idb]->commitChanges();
g_pserver->db[idb]->trackChanges(false); g_pserver->db[idb]->trackChanges(false);
} }
freeMemoryIfNeeded(); freeMemoryIfNeeded(false /* fPreSnapshot*/);
} }
} }

View File

@ -1472,6 +1472,9 @@ void evalGenericCommand(client *c, int evalsha) {
long long initial_server_dirty = g_pserver->dirty; long long initial_server_dirty = g_pserver->dirty;
int delhook = 0, err; int delhook = 0, err;
if (g_pserver->m_pstorageFactory != nullptr)
freeMemoryIfNeededAndSafe(true);
/* When we replicate whole scripts, we want the same PRNG sequence at /* When we replicate whole scripts, we want the same PRNG sequence at
* every call so that our PRNG is not affected by external state. */ * every call so that our PRNG is not affected by external state. */
redisSrand48(0); redisSrand48(0);

View File

@ -3871,7 +3871,7 @@ int processCommand(client *c, int callFlags) {
* propagation of DELs due to eviction. */ * propagation of DELs due to eviction. */
if (g_pserver->maxmemory && !g_pserver->lua_timedout) { if (g_pserver->maxmemory && !g_pserver->lua_timedout) {
locker.arm(c); locker.arm(c);
int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR; int out_of_memory = freeMemoryIfNeededAndSafe(false /*fPreSnapshot*/) == C_ERR;
/* freeMemoryIfNeeded may flush replica output buffers. This may result /* freeMemoryIfNeeded may flush replica output buffers. This may result
* into a replica, that may be the active client, to be freed. */ * into a replica, that may be the active client, to be freed. */
if (serverTL->current_client == NULL) return C_ERR; if (serverTL->current_client == NULL) return C_ERR;
@ -4014,7 +4014,7 @@ int processCommand(client *c, int callFlags) {
queueMultiCommand(c); queueMultiCommand(c);
addReply(c,shared.queued); addReply(c,shared.queued);
} else { } else {
if (cserver.cthreads >= 2 && listLength(g_pserver->monitors) == 0 && c->cmd->proc == getCommand) if (cserver.cthreads >= 2 && g_pserver->m_pstorageFactory == nullptr && listLength(g_pserver->monitors) == 0 && c->cmd->proc == getCommand)
{ {
if (getCommandAsync(c)) if (getCommandAsync(c))
return C_OK; return C_OK;

View File

@ -1422,7 +1422,7 @@ struct redisDb : public redisDbPersistentDataSnapshot
friend void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e); friend void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e);
friend int evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct evictionPoolEntry *pool); friend int evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct evictionPoolEntry *pool);
friend void activeDefragCycle(void); friend void activeDefragCycle(void);
friend int freeMemoryIfNeeded(void); friend int freeMemoryIfNeeded(bool);
friend void activeExpireCycle(int); friend void activeExpireCycle(int);
friend void expireSlaveKeys(void); friend void expireSlaveKeys(void);
@ -2921,10 +2921,10 @@ int zslLexValueGteMin(sds value, zlexrangespec *spec);
int zslLexValueLteMax(sds value, zlexrangespec *spec); int zslLexValueLteMax(sds value, zlexrangespec *spec);
/* Core functions */ /* Core functions */
int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level); int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level, bool fPreSnapshot=false);
size_t freeMemoryGetNotCountedMemory(); size_t freeMemoryGetNotCountedMemory();
int freeMemoryIfNeeded(void); int freeMemoryIfNeeded(bool fPreSnapshot);
int freeMemoryIfNeededAndSafe(void); int freeMemoryIfNeededAndSafe(bool fPreSnapshot);
int processCommand(client *c, int callFlags); int processCommand(client *c, int callFlags);
void setupSignalHandlers(void); void setupSignalHandlers(void);
struct redisCommand *lookupCommand(sds name); struct redisCommand *lookupCommand(sds name);

View File

@ -8,6 +8,8 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
serverAssert(GlobalLocksAcquired()); serverAssert(GlobalLocksAcquired());
serverAssert(m_refCount == 0); // do not call this on a snapshot serverAssert(m_refCount == 0); // do not call this on a snapshot
freeMemoryIfNeededAndSafe(true /*fPreSnapshot*/);
int levels = 1; int levels = 1;
redisDbPersistentDataSnapshot *psnapshot = m_spdbSnapshotHOLDER.get(); redisDbPersistentDataSnapshot *psnapshot = m_spdbSnapshotHOLDER.get();
while (psnapshot != nullptr) while (psnapshot != nullptr)
@ -331,6 +333,8 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn
serverAssert((m_refCount == 0 && m_pdict->iterators == 0) || (m_refCount != 0 && m_pdict->iterators == 1)); serverAssert((m_refCount == 0 && m_pdict->iterators == 0) || (m_refCount != 0 && m_pdict->iterators == 1));
serverAssert(m_spdbSnapshotHOLDER != nullptr || dictSize(m_pdictTombstone) == 0); serverAssert(m_spdbSnapshotHOLDER != nullptr || dictSize(m_pdictTombstone) == 0);
serverAssert(sizeStart == size()); serverAssert(sizeStart == size());
freeMemoryIfNeededAndSafe(false);
} }
dict_iter redisDbPersistentDataSnapshot::random_cache_threadsafe(bool fPrimaryOnly) const dict_iter redisDbPersistentDataSnapshot::random_cache_threadsafe(bool fPrimaryOnly) const