Merge branch 'keydbpro' into PRO_RELEASE_6

Former-commit-id: c03c45a1a1bf3c92a75804e123e34ea394adad07
commit 7596290472

monkey/monkey.py (new file, 209 lines)
@@ -0,0 +1,209 @@
import keydb
import random
import sched, time
import socket
import asyncore
import threading
import sys
from pprint import pprint

# Parameters
numclients = 50
#numkeys = 1000000
numkeys = 100000

# Globals
ops=0
s = sched.scheduler(time.time, time.sleep)
g_exit = False

def _buildResp(*args):
    result = "*" + str(len(args)) + "\r\n"
    for v in args:
        result = result + "$" + str(len(v)) + "\r\n"
        result = result + v + "\r\n"
    return result.encode('utf-8')

class Client(asyncore.dispatcher):
    def __init__(self, host, port):
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect((host, port))
        self.buf = b''
        self.inbuf = b''
        self.callbacks = list()

    def handle_connect(self):
        pass

    def handle_read(self):
        self.inbuf += self.recv(8192)
        self.parse_response()

    def handle_write(self):
        sent = self.send(self.buf)
        self.buf = self.buf[sent:]

    def handle_close(self):
        self.close()

    def writable(self):
        return len(self.buf) > 0

    def parse_array(self, startpos):
        assert(self.inbuf[startpos] == ord('*'))
        endrange = self.inbuf[startpos+1:].find(ord('\r')) + 1 + startpos
        assert(endrange > 0)
        numargs = int(self.inbuf[startpos+1:endrange])
        assert(numargs > 0)
        args = list()
        startpos = endrange + 2  # plus 1 gets us to the '\n' and the next gets us to the start char

        while len(args) < numargs:
            # We're parsing entries of the form "$N\r\nnnnnnn\r\n"
            if startpos >= len(self.inbuf):
                return  # Not the full response
            if self.inbuf[startpos] == ord('*'):
                startpos, arr = self.parse_array(startpos)
                args.append(arr)
            else:
                assert(self.inbuf[startpos] == ord('$'))
                startpos = startpos + 1
                endrange = self.inbuf[startpos:].find(b'\r')
                if endrange < 0:
                    return
                endrange += startpos
                assert(endrange <= len(self.inbuf))
                length = int(self.inbuf[startpos:endrange])
                if length < 0:
                    return
                startpos = endrange + 2
                assert((startpos + length) <= len(self.inbuf))
                assert(self.inbuf[startpos+length] == ord('\r'))
                assert(self.inbuf[startpos+length+1] == ord('\n'))
                args.append(self.inbuf[startpos:(startpos+length)])
                startpos += length + 2
        assert(len(args) == numargs)
        return startpos, args

    def parse_response(self):
        if len(self.inbuf) == 0:
            return

        while len(self.inbuf) > 0:
            if self.inbuf[0] == ord('+') or self.inbuf[0] == ord('-') or self.inbuf[0] == ord(':'):
                # This is a single line response
                endpos = self.inbuf.find(b'\n')
                if endpos < 0:
                    return  # incomplete response
                self.callbacks[0](self, self.inbuf[0:endpos-1])
                self.callbacks.pop(0)
                self.inbuf = self.inbuf[endpos+1:]

            elif self.inbuf[0] == ord('*'):
                # RESP array response
                try:
                    startpos, args = self.parse_array(0)
                except:
                    return  # Not all data here yet
                self.callbacks[0](self, args)
                self.callbacks.pop(0)
                self.inbuf = self.inbuf[startpos:]

            else:
                print("ERROR: Unknown response:")
                pprint(self.inbuf)
                assert(False)

    def default_result_handler(self, result):
        pprint(result)

    # Public Methods
    def set(self, key, val, callback = default_result_handler):
        self.buf += _buildResp("set", key, val)
        self.callbacks.append(callback)

    def lpush(self, key, val, callback = default_result_handler):
        self.buf += _buildResp("lpush", key, val)
        self.callbacks.append(callback)

    def delete(self, key, callback = default_result_handler):
        self.buf += _buildResp("del", key)
        self.callbacks.append(callback)

    def scan(self, iter, match=None, count=None, callback = default_result_handler):
        args = ["scan", str(iter)]
        if match != None:
            args.append("MATCH")
            args.append(match)
        if count != None:
            args.append("COUNT")
            args.append(str(count))
        self.buf += _buildResp(*args)
        self.callbacks.append(callback)

    def get(self, key, callback = None):
        return

def getrandomkey():
    return str(random.randrange(0, numkeys))

def handle_lpush_response(c, resp):
    global ops
    if resp != None:
        ops = ops + 1
        assert(resp[0] == ord(':'))
    c.lpush("list_" + getrandomkey(), 'bardsklfjkldsjfdlsjflksdfjklsdjflksd kldsjflksd jlkdsjf lksdjklds jrfklsdjfklsdjfkl', handle_lpush_response)

def handle_set_response(c, resp):
    global ops
    if resp != None:
        ops = ops + 1
        assert(resp[0] == ord('+'))
    c.set("str_" + getrandomkey(), 'bardsklfjkldsjfdlsjflksdfjklsdjflksd kldsjflksd jlkdsjf lksdjklds jrfklsdjfklsdjfkl', handle_set_response)

def handle_del_response(c, resp):
    global ops
    if resp != None:
        ops = ops + 1
    c.delete("list_" + getrandomkey(), handle_del_response)

def scan_callback(c, resp):
    global ops
    nextstart = int(resp[0])
    c.scan(nextstart, count=500, callback=scan_callback)
    ops = ops+1

def stats_thread():
    global ops
    global g_exit
    while not g_exit:
        time.sleep(1)
        print("Ops per second: " + str(ops))
        ops = 0

def main():
    global g_exit
    clients = []

    for i in range(numclients):
        clients.append(Client('127.0.0.1', 6379))
        for i in range (10):
            handle_lpush_response(clients[-1], None)
            #handle_set_response(clients[-1], None)

    scan_client = Client('127.0.0.1', 6379)
    scan_client.scan(0, count=500, callback=scan_callback)

    del_client = Client('127.0.0.1', 6379)
    handle_del_response(del_client, None)

    threading.Thread(target=stats_thread).start()
    asyncore.loop()
    g_exit = True
    sys.exit(0)
    print("DONE")

if __name__ == "__main__":
    main()
@@ -49,6 +49,11 @@ void AsyncWorkQueue::WorkerThreadMain()
            }
        }

        listRelease(vars.clients_pending_asyncwrite);
+
+        std::unique_lock<fastlock> lockf(serverTL->lockPendingWrite);
+        serverTL->vecclientsProcess.clear();
+        serverTL->clients_pending_write.clear();
+        std::atomic_thread_fence(std::memory_order_seq_cst);
    }
}

bool AsyncWorkQueue::removeClientAsyncWrites(client *c)
@@ -55,7 +55,9 @@ ifneq ($(strip $(SANITIZE)),)
 	USEASM=false
 endif

+ifeq ($(CHECKED),true)
+	CXXFLAGS+= -DCHECKED_BUILD
+endif

 # Do we use our assembly spinlock? X64 only
 ifeq ($(uname_S),Linux)
src/cron.cpp (11 lines changed)
@@ -31,7 +31,8 @@ void cronCommand(client *c)
     if (getLongLongFromObjectOrReply(c, c->argv[ARG_EXPIRE], &interval, "missing expire time") != C_OK)
         return;

-    long long base = g_pserver->mstime;
+    long long base;
+    __atomic_load(&g_pserver->mstime, &base, __ATOMIC_ACQUIRE);
     if (getLongLongFromObject(c->argv[ARG_EXPIRE+1], &base) == C_OK) {
         arg_offset++;
         std::swap(base, interval);
@@ -120,18 +121,20 @@ void executeCronJobExpireHook(const char *key, robj *o)
     else
     {
         job->startTime += job->interval;
-        if (job->startTime < (uint64_t)g_pserver->mstime)
+        mstime_t mstime;
+        __atomic_load(&g_pserver->mstime, &mstime, __ATOMIC_ACQUIRE);
+        if (job->startTime < (uint64_t)mstime)
         {
             // If we are more than one interval in the past then fast forward to
             // the first interval still in the future. If startTime wasn't zero align
             // this to the original startTime, if it was zero align to now
             if (job->startTime == job->interval)
             {   // startTime was 0
-                job->startTime = g_pserver->mstime + job->interval;
+                job->startTime = mstime + job->interval;
             }
             else
             {
-                auto delta = g_pserver->mstime - job->startTime;
+                auto delta = mstime - job->startTime;
                 auto multiple = (delta / job->interval)+1;
                 job->startTime += job->interval * multiple;
             }
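
Note on the pattern above: the cron.cpp hunks stop reading g_pserver->mstime directly and instead take one acquire-ordered snapshot into a local that is used for the rest of the computation. A minimal standalone sketch of that read/write pairing, using a hypothetical g_mstime global rather than KeyDB's real server struct:

#include <cstdint>
#include <cstdio>

// Hypothetical stand-in for g_pserver->mstime: a millisecond clock written by
// one thread (a time-update cron) and read by many worker threads.
static int64_t g_mstime;

// Writer side: publish the new time with release semantics.
void update_clock(int64_t now_ms) {
    __atomic_store(&g_mstime, &now_ms, __ATOMIC_RELEASE);
}

// Reader side: take one acquire-ordered snapshot into a local and use the
// local consistently, instead of re-reading the shared field mid-computation.
bool job_is_due(uint64_t startTime) {
    int64_t mstime;
    __atomic_load(&g_mstime, &mstime, __ATOMIC_ACQUIRE);
    return startTime < (uint64_t)mstime;
}

int main() {
    update_clock(1000);
    std::printf("due: %d\n", job_is_due(500));   // prints "due: 1"
}

Reading the clock once into a local also keeps a single calculation from observing two different times, which is why the later hunks reuse the same local for every comparison.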
src/db.cpp (16 lines changed)
@@ -1713,7 +1713,8 @@ void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when)

     /* Update TTL stats (exponential moving average) */
     /* Note: We never have to update this on expiry since we reduce it by the current elapsed time here */
-    long long now = g_pserver->mstime;
+    mstime_t now;
+    __atomic_load(&g_pserver->mstime, &now, __ATOMIC_ACQUIRE);
     db->avg_ttl -= (now - db->last_expire_set); // reduce the TTL by the time that has elapsed
     if (db->expireSize() == 0)
         db->avg_ttl = 0;
@@ -1894,7 +1895,7 @@ int keyIsExpired(const redisDbPersistentDataSnapshot *db, robj *key) {
      * open object in a next call, if the next call will see the key expired,
      * while the first did not. */
     else if (serverTL->fixed_time_expire > 0) {
-        now = g_pserver->mstime;
+        __atomic_load(&g_pserver->mstime, &now, __ATOMIC_ACQUIRE);
     }
     /* For the other cases, we want to use the most fresh time we have. */
     else {
@@ -2588,6 +2589,7 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde)
 {
     serverAssert(sdsKey != nullptr);
     serverAssert(FImplies(*pde != nullptr, dictGetVal(*pde) != nullptr));    // early versions set a NULL object, this is no longer valid
+    serverAssert(m_refCount == 0);
     std::unique_lock<fastlock> ul(g_expireLock);

     // First see if the key can be obtained from a snapshot
@@ -2781,7 +2783,7 @@ redisDbPersistentData::~redisDbPersistentData()
     if (m_spdbSnapshotHOLDER != nullptr)
         endSnapshot(m_spdbSnapshotHOLDER.get());

-    //serverAssert(m_pdbSnapshot == nullptr);
+    serverAssert(m_pdbSnapshot == nullptr);
     serverAssert(m_refCount == 0);
     //serverAssert(m_pdict->iterators == 0);
     serverAssert(m_pdictTombstone == nullptr || m_pdictTombstone->iterators == 0);
@@ -2853,16 +2855,16 @@ bool redisDbPersistentData::removeCachedValue(const char *key)
     return true;
 }

+redisDbPersistentData::redisDbPersistentData() {
+    m_dictChanged = dictCreate(&dictChangeDescType, nullptr);
+}
+
 void redisDbPersistentData::trackChanges(bool fBulk, size_t sizeHint)
 {
     m_fTrackingChanges.fetch_add(1, std::memory_order_relaxed);
     if (fBulk)
         m_fAllChanged.fetch_add(1, std::memory_order_acq_rel);

-    if (m_dictChanged == nullptr) {
-        m_dictChanged = dictCreate(&dictChangeDescType, nullptr);
-    }
-
     if (sizeHint > 0)
         dictExpand(m_dictChanged, sizeHint, false);
 }
src/dict.cpp (56 lines changed)
@@ -129,6 +129,7 @@ int _dictInit(dict *d, dictType *type,
     d->rehashidx = -1;
     d->iterators = 0;
     d->asyncdata = nullptr;
+    d->refcount = 1;
     return DICT_OK;
 }

@@ -195,7 +196,7 @@ int dictMerge(dict *dst, dict *src)
     }

     size_t expectedSize = dictSize(src) + dictSize(dst);
-    if (dictSize(src) > dictSize(dst))
+    if (dictSize(src) > dictSize(dst) && src->asyncdata == nullptr && dst->asyncdata == nullptr)
     {
         std::swap(*dst, *src);
         std::swap(dst->iterators, src->iterators);
@@ -369,8 +370,14 @@ int dictRehash(dict *d, int n) {
     return 1;
 }

+dictAsyncRehashCtl::dictAsyncRehashCtl(struct dict *d, dictAsyncRehashCtl *next) : dict(d), next(next) {
+    queue.reserve(c_targetQueueSize);
+    __atomic_fetch_add(&d->refcount, 1, __ATOMIC_RELEASE);
+}
+
 dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets) {
-    if (!dictIsRehashing(d)) return 0;
+    assert(d->type->asyncfree != nullptr);
+    if (!dictIsRehashing(d) || d->iterators != 0) return nullptr;

     d->asyncdata = new dictAsyncRehashCtl(d, d->asyncdata);

@@ -454,7 +461,7 @@ void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl, bool fFree) {
         }
     }

-    if (fUnlinked && !ctl->release) {
+    if (fUnlinked && !ctl->abondon) {
         if (d->ht[0].table != nullptr) { // can be null if we're cleared during the rehash
             for (auto &wi : ctl->queue) {
                 // We need to remove it from the source hash table, and store it in the dest.
@@ -487,23 +494,10 @@ void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl, bool fFree) {
     }

     if (fFree) {
-        while (ctl->deGCList != nullptr) {
-            auto next = ctl->deGCList->next;
-            dictFreeKey(d, ctl->deGCList);
-            dictFreeVal(d, ctl->deGCList);
-            zfree(ctl->deGCList);
-            ctl->deGCList = next;
-        }
-
-        // Was the dictionary free'd while we were in flight?
-        if (ctl->release) {
-            if (d->asyncdata != nullptr)
-                d->asyncdata->release = true;
-            else
-                dictRelease(d);
-        }
-
-        delete ctl;
+        d->type->asyncfree(ctl);
+
+        // Remove our reference
+        dictRelease(d);
     }
 }

@@ -514,6 +508,16 @@ long long timeInMilliseconds(void) {
     return (((long long)tv.tv_sec)*1000)+(tv.tv_usec/1000);
 }

+dictAsyncRehashCtl::~dictAsyncRehashCtl() {
+    while (deGCList != nullptr) {
+        auto next = deGCList->next;
+        dictFreeKey(dict, deGCList);
+        dictFreeVal(dict, deGCList);
+        zfree(deGCList);
+        deGCList = next;
+    }
+}
+
 /* Rehash in ms+"delta" milliseconds. The value of "delta" is larger
  * than 0, and is smaller than 1 in most cases. The exact upper bound
  * depends on the running time of dictRehash(d,100).*/
@@ -537,7 +541,7 @@ int dictRehashMilliseconds(dict *d, int ms) {
  * dictionary so that the hash table automatically migrates from H1 to H2
  * while it is actively used. */
 static void _dictRehashStep(dict *d) {
-    unsigned long iterators;
+    unsigned iterators;
     __atomic_load(&d->iterators, &iterators, __ATOMIC_RELAXED);
     if (iterators == 0) dictRehash(d,2);
 }
@@ -766,13 +770,11 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
 /* Clear & Release the hash table */
 void dictRelease(dict *d)
 {
-    if (d->asyncdata) {
-        d->asyncdata->release = true;
-        return;
+    if (__atomic_sub_fetch(&d->refcount, 1, __ATOMIC_ACQ_REL) == 0) {
+        _dictClear(d,&d->ht[0],NULL);
+        _dictClear(d,&d->ht[1],NULL);
+        zfree(d);
     }
-    _dictClear(d,&d->ht[0],NULL);
-    _dictClear(d,&d->ht[1],NULL);
-    zfree(d);
 }

 dictEntry *dictFindWithPrev(dict *d, const void *key, uint64_t h, dictEntry ***dePrevPtr, dictht **pht, bool fShallowCompare)
@@ -1460,7 +1462,7 @@ void dictGetStats(char *buf, size_t bufsize, dict *d) {

 void dictForceRehash(dict *d)
 {
-    unsigned long iterators;
+    unsigned iterators;
     __atomic_load(&d->iterators, &iterators, __ATOMIC_RELAXED);
     while (iterators == 0 && dictIsRehashing(d)) _dictRehashStep(d);
 }
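
Note on the dict.cpp changes: each dict now carries a refcount; every in-flight dictAsyncRehashCtl takes a reference in its constructor, and dictRelease only frees the tables once the count drops to zero. A simplified sketch of that ownership scheme, using a hypothetical struct rather than KeyDB's real dict:

#include <cstdlib>

// Hypothetical, trimmed-down table with an intrusive reference count.
struct table {
    void *slots;
    unsigned refcount;   // starts at 1 for the owner; async work adds references
};

table *table_create() {
    table *t = (table *)calloc(1, sizeof(table));
    t->refcount = 1;
    return t;
}

// An async rehash job pins the table so it cannot be freed mid-flight.
void async_job_start(table *t) {
    __atomic_fetch_add(&t->refcount, 1, __ATOMIC_RELEASE);
}

// Both the owner and the async job release through the same path; whichever
// drops the last reference actually frees the storage.
void table_release(table *t) {
    if (__atomic_sub_fetch(&t->refcount, 1, __ATOMIC_ACQ_REL) == 0) {
        free(t->slots);
        free(t);
    }
}

This replaces the old scheme where dictRelease parked a "release" flag on the async control block and hoped the rehash thread would finish the free later.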
src/dict.h (12 lines changed)
@@ -53,6 +53,7 @@ extern "C" {

 /* Unused arguments generate annoying warnings... */
 #define DICT_NOTUSED(V) ((void) V)
+struct dictAsyncRehashCtl;

 typedef struct dictEntry {
     void *key;
@@ -72,6 +73,7 @@ typedef struct dictType {
     int (*keyCompare)(void *privdata, const void *key1, const void *key2);
     void (*keyDestructor)(void *privdata, void *key);
     void (*valDestructor)(void *privdata, void *obj);
+    void (*asyncfree)(dictAsyncRehashCtl *);
 } dictType;

 /* This is our hash table structure. Every dictionary has two of this as we
@@ -98,13 +100,12 @@ struct dictAsyncRehashCtl {
     struct dict *dict = nullptr;
     std::vector<workItem> queue;
     size_t hashIdx = 0;
-    bool release = false;
     dictAsyncRehashCtl *next = nullptr;
     std::atomic<bool> done { false };
+    std::atomic<bool> abondon { false };

-    dictAsyncRehashCtl(struct dict *d, dictAsyncRehashCtl *next) : dict(d), next(next) {
-        queue.reserve(c_targetQueueSize);
-    }
+    dictAsyncRehashCtl(struct dict *d, dictAsyncRehashCtl *next);
+    ~dictAsyncRehashCtl();
 };
 #else
 struct dictAsyncRehashCtl;
@@ -115,7 +116,8 @@ typedef struct dict {
     void *privdata;
     dictht ht[2];
     long rehashidx; /* rehashing not in progress if rehashidx == -1 */
-    unsigned long iterators; /* number of iterators currently running */
+    unsigned iterators; /* number of iterators currently running */
+    unsigned refcount;
     dictAsyncRehashCtl *asyncdata;
 } dict;
@@ -433,7 +433,7 @@ extern "C" void fastlock_unlock(struct fastlock *lock)
     serverAssert(pidT >= 0);    // unlock after free
     int t = -1;
     __atomic_store(&lock->m_pidOwner, &t, __ATOMIC_RELEASE);
-    std::atomic_thread_fence(std::memory_order_release);
+    std::atomic_thread_fence(std::memory_order_acq_rel);
     ANNOTATE_RWLOCK_RELEASED(lock, true);
     uint16_t activeNew = __atomic_add_fetch(&lock->m_ticket.m_active, 1, __ATOMIC_RELEASE); // on x86 the atomic is not required here, but ASM handles that case
 #ifdef __linux__
src/gc.h (6 lines changed)
@@ -29,6 +29,11 @@ class GarbageCollector
     };

 public:
+    ~GarbageCollector() {
+        // Silence TSAN errors
+        m_lock.lock();
+    }
+
     uint64_t startEpoch()
     {
         std::unique_lock<fastlock> lock(m_lock);
@@ -41,6 +46,7 @@ public:
     {
         std::unique_lock<fastlock> lock(m_lock);
         m_vecepochs.clear();
+        m_setepochOutstanding.clear();
     }

     void endEpoch(uint64_t epoch, bool fNoFree = false)
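
Note on the gc.h hunk: the new ~GarbageCollector() takes m_lock so teardown serializes behind any thread still inside a locked member function, which is what quiets TSAN here. A small sketch of the same idea with a hypothetical class and std::mutex rather than KeyDB's fastlock:

#include <cstdint>
#include <mutex>
#include <set>

// Hypothetical stand-in for the collector: the destructor acquires the
// object's own lock so destruction waits for any late caller to drain.
class EpochTracker {
    std::mutex m_lock;
    std::set<uint64_t> m_outstanding;
public:
    ~EpochTracker() {
        // Serialize teardown behind in-flight critical sections.
        std::lock_guard<std::mutex> guard(m_lock);
    }
    void endEpoch(uint64_t e) {
        std::lock_guard<std::mutex> guard(m_lock);
        m_outstanding.erase(e);
    }
};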
src/rdb.cpp (19 lines changed)
@@ -2370,21 +2370,6 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
     }
 }

-class EvictionPolicyCleanup
-{
-    int oldpolicy;
-
-public:
-    EvictionPolicyCleanup() {
-        oldpolicy = g_pserver->maxmemory_policy;
-        g_pserver->maxmemory_policy = MAXMEMORY_ALLKEYS_RANDOM;
-    }
-
-    ~EvictionPolicyCleanup() {
-        g_pserver->maxmemory_policy = oldpolicy;
-    }
-};
-
 /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
  * otherwise C_ERR is returned and 'errno' is set accordingly. */
 int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
@@ -2401,10 +2386,6 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
     sds key = nullptr;
     bool fLastKeyExpired = false;

-    // If we're running flash we may evict during load. We want a fast eviction function
-    // because there isn't any difference in use times between keys anyways
-    EvictionPolicyCleanup ecleanup;
-
     for (int idb = 0; idb < cserver.dbnum; ++idb)
     {
         g_pserver->db[idb]->trackChanges(true, 1024);
@@ -195,7 +195,7 @@ void sdsfree(const char *s) {
     if ((flags & SDS_TYPE_MASK) == SDS_TYPE_REFCOUNTED)
     {
         SDS_HDR_VAR_REFCOUNTED(s);
-        if (__atomic_fetch_sub(&sh->refcount, 1, __ATOMIC_RELAXED) > 1)
+        if (__atomic_fetch_sub(&sh->refcount, 1, __ATOMIC_ACQ_REL) > 1)
             return;
     }
     s_free((char*)s-sdsHdrSize(s[-1]));
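
Note on the sdsfree hunk: the refcount decrement is strengthened from relaxed to acquire-release, the usual rule when the last owner frees shared data — the freeing thread must observe every prior owner's writes before reclaiming the memory. A generic std::atomic sketch of that rule (not KeyDB's refcounted sds header):

#include <atomic>
#include <string>

// Hypothetical shared buffer with an intrusive reference count.
struct SharedBuf {
    std::atomic<int> refcount{1};
    std::string data;
};

void retain(SharedBuf *b) {
    // Taking a reference can be relaxed; it does not publish data by itself.
    b->refcount.fetch_add(1, std::memory_order_relaxed);
}

void release(SharedBuf *b) {
    // The decrement needs acq_rel (or release plus an acquire fence before
    // the delete) so the deleting thread sees every prior owner's writes.
    if (b->refcount.fetch_sub(1, std::memory_order_acq_rel) == 1)
        delete b;
}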
@@ -1359,6 +1359,8 @@ uint64_t dictEncObjHash(const void *key) {
     }
 }

+void dictGCAsyncFree(dictAsyncRehashCtl *async);
+
 /* Generic hash table type where keys are Redis Objects, Values
  * dummy pointers. */
 dictType objectKeyPointerValueDictType = {
@@ -1407,8 +1409,9 @@ dictType dbDictType = {
     NULL,                   /* key dup */
     NULL,                   /* val dup */
     dictSdsKeyCompare,      /* key compare */
     dictDbKeyDestructor,    /* key destructor */
-    dictObjectDestructor    /* val destructor */
+    dictObjectDestructor,   /* val destructor */
+    dictGCAsyncFree         /* async free destructor */
 };

 /* db->pdict, keys are sds strings, vals are Redis objects. */
@@ -2424,18 +2427,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
         }
     }

-    run_with_period(100) {
-        bool fAnySnapshots = false;
-        for (int idb = 0; idb < cserver.dbnum && !fAnySnapshots; ++idb)
-            fAnySnapshots = fAnySnapshots || g_pserver->db[0]->FSnapshot();
-        if (fAnySnapshots)
-        {
-            g_pserver->asyncworkqueue->AddWorkFunction([]{
-                g_pserver->db[0]->consolidate_snapshot();
-            }, true /*HiPri*/);
-        }
-    }
-
     /* Fire the cron loop modules event. */
     RedisModuleCronLoopV1 ei = {REDISMODULE_CRON_LOOP_VERSION,g_pserver->hz};
     moduleFireServerEvent(REDISMODULE_EVENT_CRON_LOOP,
@@ -3926,7 +3917,7 @@ void call(client *c, int flags) {
     dirty = g_pserver->dirty;
     incrementMvccTstamp();
     __atomic_load(&g_pserver->ustime, &start, __ATOMIC_SEQ_CST);
-    start = g_pserver->ustime;
     try {
         c->cmd->proc(c);
     } catch (robj_roptr o) {
@@ -5403,10 +5394,12 @@ sds genRedisInfoString(const char *section) {
             vkeys = g_pserver->db[j]->expireSize();

             // Adjust TTL by the current time
-            g_pserver->db[j]->avg_ttl -= (g_pserver->mstime - g_pserver->db[j]->last_expire_set);
+            mstime_t mstime;
+            __atomic_load(&g_pserver->mstime, &mstime, __ATOMIC_ACQUIRE);
+            g_pserver->db[j]->avg_ttl -= (mstime - g_pserver->db[j]->last_expire_set);
             if (g_pserver->db[j]->avg_ttl < 0)
                 g_pserver->db[j]->avg_ttl = 0;
-            g_pserver->db[j]->last_expire_set = g_pserver->mstime;
+            g_pserver->db[j]->last_expire_set = mstime;

             if (keys || vkeys) {
                 info = sdscatprintf(info,
@@ -1045,9 +1045,9 @@ class redisDbPersistentData
     friend class redisDbPersistentDataSnapshot;

 public:
+    redisDbPersistentData();
     virtual ~redisDbPersistentData();

-    redisDbPersistentData() = default;
     redisDbPersistentData(redisDbPersistentData &&) = default;

     size_t slots() const { return dictSlots(m_pdict); }
@@ -1130,8 +1130,6 @@ public:
     void endSnapshotAsync(const redisDbPersistentDataSnapshot *psnapshot);
     void restoreSnapshot(const redisDbPersistentDataSnapshot *psnapshot);

-    void consolidate_snapshot();
-
     bool FStorageProvider() { return m_spstorage != nullptr; }
     bool removeCachedValue(const char *key);
     void removeAllCachedValues();
@@ -1187,7 +1185,6 @@ private:

 protected:
     static void gcDisposeSnapshot(redisDbPersistentDataSnapshot *psnapshot);
-    void consolidate_children(redisDbPersistentData *pdbPrimary, bool fForce);
     bool freeTombstoneObjects(int depth);

 public:
@@ -1289,7 +1286,6 @@ struct redisDb : public redisDbPersistentDataSnapshot
     using redisDbPersistentData::createSnapshot;
     using redisDbPersistentData::endSnapshot;
     using redisDbPersistentData::restoreSnapshot;
-    using redisDbPersistentData::consolidate_snapshot;
     using redisDbPersistentData::removeAllCachedValues;
     using redisDbPersistentData::dictUnsafeKeyOnly;
     using redisDbPersistentData::resortExpire;
src/snapshot.cpp (191 lines changed)
@@ -26,6 +26,17 @@ public:
     std::vector<dictEntry*> vecde;
 };

+void discontinueAsyncRehash(dict *d) {
+    if (d->asyncdata != nullptr) {
+        auto adata = d->asyncdata;
+        while (adata != nullptr) {
+            adata->abondon = true;
+            adata = adata->next;
+        }
+        d->rehashidx = 0;
+    }
+}
+
 const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint64_t mvccCheckpoint, bool fOptional)
 {
     serverAssert(GlobalLocksAcquired());
@@ -56,34 +67,6 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
         }
     }

-    if (m_pdbSnapshot != nullptr && m_pdbSnapshot == m_pdbSnapshotASYNC && m_spdbSnapshotHOLDER->m_refCount == 1 && dictSize(m_pdictTombstone) < c_elementsSmallLimit)
-    {
-        serverLog(LL_WARNING, "Reusing old snapshot");
-        // is there an existing snapshot only owned by us?
-
-        dictIterator *di = dictGetIterator(m_pdictTombstone);
-        dictEntry *de;
-        while ((de = dictNext(di)) != nullptr)
-        {
-            if (dictDelete(m_pdbSnapshot->m_pdict, dictGetKey(de)) != DICT_OK)
-                dictAdd(m_spdbSnapshotHOLDER->m_pdictTombstone, sdsdupshared((sds)dictGetKey(de)), nullptr);
-        }
-        dictReleaseIterator(di);
-
-        dictForceRehash(m_spdbSnapshotHOLDER->m_pdictTombstone);
-        dictMerge(m_pdbSnapshot->m_pdict, m_pdict);
-        dictEmpty(m_pdictTombstone, nullptr);
-        {
-            std::unique_lock<fastlock> ul(g_expireLock);
-            (*m_spdbSnapshotHOLDER->m_setexpire) = *m_setexpire;
-        }
-
-        m_pdbSnapshotASYNC = nullptr;
-        serverAssert(m_pdbSnapshot->m_pdict->iterators == 1);
-        serverAssert(m_spdbSnapshotHOLDER->m_refCount == 1);
-        return m_pdbSnapshot;
-    }
-
     // See if we have too many levels and can bail out of this to reduce load
     if (fOptional && (levels >= 6))
     {
@@ -95,14 +78,8 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6

     // We can't have async rehash modifying these. Setting the asyncdata list to null
     // will cause us to throw away the async work rather than modify the tables in flight
-    if (m_pdict->asyncdata != nullptr) {
-        m_pdict->asyncdata = nullptr;
-        m_pdict->rehashidx = 0;
-    }
-    if (m_pdictTombstone->asyncdata != nullptr) {
-        m_pdictTombstone->rehashidx = 0;
-        m_pdictTombstone->asyncdata = nullptr;
-    }
+    discontinueAsyncRehash(m_pdict);
+    discontinueAsyncRehash(m_pdictTombstone);

     spdb->m_fAllChanged = false;
     spdb->m_fTrackingChanges = 0;
@@ -125,7 +102,7 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
     }

     if (dictIsRehashing(spdb->m_pdict) || dictIsRehashing(spdb->m_pdictTombstone)) {
-        serverLog(LL_NOTICE, "NOTICE: Suboptimal snapshot");
+        serverLog(LL_VERBOSE, "NOTICE: Suboptimal snapshot");
     }

     m_pdict = dictCreate(&dbDictType,this);
@@ -152,6 +129,7 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
         m_pdbSnapshotASYNC = nullptr;
     }

+    std::atomic_thread_fence(std::memory_order_seq_cst);
     return m_pdbSnapshot;
 }

@@ -189,7 +167,7 @@ void redisDbPersistentData::recursiveFreeSnapshots(redisDbPersistentDataSnapshot

         //psnapshot->m_pdict->iterators--;
         psnapshot->m_spdbSnapshotHOLDER.release();
-        //psnapshot->m_pdbSnapshot = nullptr;
+        psnapshot->m_pdbSnapshot = nullptr;
         g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::unique_ptr<redisDbPersistentDataSnapshot>(psnapshot));
         serverLog(LL_VERBOSE, "Garbage collected snapshot");
     }
@@ -275,7 +253,6 @@ void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot
         aeReleaseLock();
         return;
     }
-    const_cast<redisDbPersistentDataSnapshot*>(psnapshotT)->consolidate_children(this, true);

     // Final Cleanup
     aeAcquireLock(); latencyStartMonitor(latency);
@@ -347,6 +324,12 @@ bool redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth)

     dictForceRehash(dictTombstoneNew);
     aeAcquireLock();
+    if (m_pdbSnapshot->m_pdict->asyncdata != nullptr) {
+        // In this case we use the asyncdata to free us, not our own lazy free
+        for (auto de : splazy->vecde)
+            dictFreeUnlinkedEntry(m_pdbSnapshot->m_pdict, de);
+        splazy->vecde.clear();
+    }
     dict *dT = m_pdbSnapshot->m_pdict;
     splazy->vecdictLazyFree.push_back(m_pdictTombstone);
     __atomic_store(&m_pdictTombstone, &dictTombstoneNew, __ATOMIC_RELEASE);
@@ -421,7 +404,7 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn
 #ifdef CHECKED_BUILD
             serverAssert(m_spdbSnapshotHOLDER->m_pdbSnapshot->find_cached_threadsafe((const char*)dictGetKey(de)) != nullptr);
 #endif
-            dictAdd(m_spdbSnapshotHOLDER->m_pdictTombstone, sdsdupshared((sds)dictGetKey(de)), nullptr);
+            dictAdd(m_spdbSnapshotHOLDER->m_pdictTombstone, sdsdupshared((sds)dictGetKey(de)), dictGetVal(de));
             continue;
         }
         else if (deSnapshot == nullptr)
@@ -431,8 +414,14 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn
         }

         // Delete the object from the source dict, we don't use dictDelete to avoid a second search
-        splazy->vecde.push_back(deSnapshot);
-        *dePrev = deSnapshot->next;
+        *dePrev = deSnapshot->next; // Unlink it first
+        if (deSnapshot != nullptr) {
+            if (m_spdbSnapshotHOLDER->m_pdict->asyncdata != nullptr) {
+                dictFreeUnlinkedEntry(m_spdbSnapshotHOLDER->m_pdict, deSnapshot);
+            } else {
+                splazy->vecde.push_back(deSnapshot);
+            }
+        }
         ht->used--;
     }

@@ -454,12 +443,12 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn
     if (m_pdbSnapshot != nullptr && m_spdbSnapshotHOLDER->m_pdbSnapshot != nullptr)
     {
         m_pdbSnapshot = m_spdbSnapshotHOLDER->m_pdbSnapshot;
-        m_spdbSnapshotHOLDER->m_pdbSnapshot = nullptr;
     }
     else
     {
         m_pdbSnapshot = nullptr;
     }
+    m_spdbSnapshotHOLDER->m_pdbSnapshot = nullptr;

     // Fixup the about to free'd snapshots iterator count so the dtor doesn't complain
     if (m_refCount)
@@ -663,111 +652,23 @@ int redisDbPersistentDataSnapshot::snapshot_depth() const
     return 0;
 }

-
-void redisDbPersistentData::consolidate_snapshot()
-{
-    aeAcquireLock();
-    auto psnapshot = (m_pdbSnapshot != nullptr) ? m_spdbSnapshotHOLDER.get() : nullptr;
-    if (psnapshot == nullptr || psnapshot->snapshot_depth() == 0)
-    {
-        aeReleaseLock();
-        return;
-    }
-
-    psnapshot->m_refCount++;    // ensure it's not free'd
-    aeReleaseLock();
-    psnapshot->consolidate_children(this, false /* fForce */);
-    aeAcquireLock();
-    endSnapshot(psnapshot);
-    aeReleaseLock();
-}
-
-// only call this on the "real" database to consolidate the first child
-void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData *pdbPrimary, bool fForce)
-{
-    std::unique_lock<fastlock> lock(s_lock, std::defer_lock);
-    if (!lock.try_lock())
-        return; // this is a best effort function
-
-    if (!fForce && snapshot_depth() < 2)
-        return;
-
-    auto spdb = std::unique_ptr<redisDbPersistentDataSnapshot>(new (MALLOC_LOCAL) redisDbPersistentDataSnapshot());
-    spdb->initialize();
-    dictExpand(spdb->m_pdict, m_pdbSnapshot->size());
-
-    volatile size_t skipped = 0;
-    m_pdbSnapshot->iterate_threadsafe([&](const char *key, robj_roptr o) {
-        if (o != nullptr || !m_spstorage) {
-            dictAdd(spdb->m_pdict, sdsdupshared(key), o.unsafe_robjcast());
-            if (o != nullptr) {
-                incrRefCount(o);
-            }
-        } else {
-            ++skipped;
-        }
-        return true;
-    }, true /*fKeyOnly*/, true /*fCacheOnly*/);
-    spdb->m_spstorage = m_pdbSnapshot->m_spstorage;
-    {
-        std::unique_lock<fastlock> ul(g_expireLock);
-        delete spdb->m_setexpire;
-        spdb->m_setexpire = new (MALLOC_LOCAL) expireset(*m_pdbSnapshot->m_setexpire);
-    }
-
-    spdb->m_pdict->iterators++;
-
-    if (m_spstorage) {
-        serverAssert(spdb->size() == m_pdbSnapshot->size());
-    } else {
-        serverAssert((spdb->size()+skipped) == m_pdbSnapshot->size());
-    }
-
-    // Now wire us in (Acquire the LOCK)
-    AeLocker locker;
-    locker.arm(nullptr);
-
-    int depth = 0;
-    redisDbPersistentDataSnapshot *psnapshotT = pdbPrimary->m_spdbSnapshotHOLDER.get();
-    while (psnapshotT != nullptr)
-    {
-        ++depth;
-        if (psnapshotT == this)
-            break;
-        psnapshotT = psnapshotT->m_spdbSnapshotHOLDER.get();
-    }
-    if (psnapshotT != this)
-    {
-        locker.disarm(); // don't run spdb's dtor in the lock
-        return; // we were unlinked and this was a waste of time
-    }
-
-    serverLog(LL_VERBOSE, "cleaned %d snapshots", snapshot_depth()-1);
-    spdb->m_refCount = depth;
-    // Drop our refs from this snapshot and its children
-    psnapshotT = this;
-    std::vector<redisDbPersistentDataSnapshot*> vecT;
-    while ((psnapshotT = psnapshotT->m_spdbSnapshotHOLDER.get()) != nullptr)
-    {
-        vecT.push_back(psnapshotT);
-    }
-    for (auto itr = vecT.rbegin(); itr != vecT.rend(); ++itr)
-    {
-        psnapshotT = *itr;
-        psnapshotT->m_refCount -= (depth-1); // -1 because dispose will sub another
-        gcDisposeSnapshot(psnapshotT);
-    }
-    std::atomic_thread_fence(std::memory_order_seq_cst);
-    m_spdbSnapshotHOLDER.release(); // GC has responsibility for it now
-    m_spdbSnapshotHOLDER = std::move(spdb);
-    const redisDbPersistentDataSnapshot *ptrT = m_spdbSnapshotHOLDER.get();
-    __atomic_store(&m_pdbSnapshot, &ptrT, __ATOMIC_SEQ_CST);
-    locker.disarm(); // ensure we're not locked for any dtors
-}
-
 bool redisDbPersistentDataSnapshot::FStale() const
 {
     // 0.5 seconds considered stale;
     static const uint64_t msStale = 500;
     return ((getMvccTstamp() - m_mvccCheckpoint) >> MVCC_MS_SHIFT) >= msStale;
 }
+
+void dictGCAsyncFree(dictAsyncRehashCtl *async) {
+    if (async->deGCList != nullptr && serverTL != nullptr && !serverTL->gcEpoch.isReset()) {
+        auto splazy = std::make_unique<LazyFree>();
+        auto *de = async->deGCList;
+        while (de != nullptr) {
+            splazy->vecde.push_back(de);
+            de = de->next;
+        }
+        async->deGCList = nullptr;
+        g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::move(splazy));
+    }
+    delete async;
+}
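
Note on dictGCAsyncFree above: instead of freeing unlinked entries inline, the rehash control block's garbage list is handed to the server's epoch-based garbage collector and destroyed only after concurrent readers have drained. A toy sketch of that deferral, with hypothetical types rather than KeyDB's GarbageCollector API:

#include <memory>
#include <mutex>
#include <vector>

// Hypothetical unlinked table entry whose memory must outlive concurrent readers.
struct Entry { char payload[32]; };

// Deferred work: entries removed from a table but possibly still visible to
// readers that started before the unlink.
struct LazyFreeBatch {
    std::vector<Entry*> entries;
    ~LazyFreeBatch() { for (Entry *e : entries) delete e; }
};

// Toy epoch-based collector: batches queued in the current epoch are only
// destroyed once every reader that could still see them has finished.
class EpochGC {
    std::mutex lock;
    std::vector<std::unique_ptr<LazyFreeBatch>> pending;
public:
    void enqueue(std::unique_ptr<LazyFreeBatch> batch) {
        std::lock_guard<std::mutex> g(lock);
        pending.push_back(std::move(batch));
    }
    // Called once no reader remains in the old epoch; the destructors run
    // here, not at the unlink site, which is the point of the pattern.
    void endEpoch() {
        std::lock_guard<std::mutex> g(lock);
        pending.clear();
    }
};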