Merge remote-tracking branch 'origin/keydbpro' into repl_backlog_rework

Former-commit-id: e9bfcef5429c747fd9f26a829fed2281c5ec2b25
VivekSainiEQ 2021-06-16 16:40:06 +00:00
commit 03709d475a
9 changed files with 167 additions and 43 deletions

View File

@@ -1,21 +1,17 @@
import keydb
import random
import sched, time
import time
import socket
import asyncore
import threading
import sys
import argparse
from pprint import pprint
# Parameters
numclients = 50
#numkeys = 1000000
numkeys = 100000
# Globals
ops=0
s = sched.scheduler(time.time, time.sleep)
g_exit = False
ops = {}
numclients = 0
numkeys = 0
runtime = 0
clients = []
def _buildResp(*args):
result = "*" + str(len(args)) + "\r\n"
@@ -32,6 +28,8 @@ class Client(asyncore.dispatcher):
self.buf = b''
self.inbuf = b''
self.callbacks = list()
self.client_id = 0
self.get_client_id()
def handle_connect(self):
pass
@@ -55,6 +53,9 @@ class Client(asyncore.dispatcher):
endrange = self.inbuf[startpos+1:].find(ord('\r')) + 1 + startpos
assert(endrange > 0)
numargs = int(self.inbuf[startpos+1:endrange])
if numargs == -1: # Nil array, used in some returns
startpos = endrange + 2
return startpos, []
assert(numargs > 0)
args = list()
startpos = endrange + 2 # plus 1 gets us to the '\n' and the next gets us to the start char
@@ -127,10 +128,18 @@ class Client(asyncore.dispatcher):
self.buf += _buildResp("lpush", key, val)
self.callbacks.append(callback)
def blpop(self, *keys, timeout=0, callback=default_result_handler):
self.buf += _buildResp("blpop", *keys, str(timeout))
self.callbacks.append(callback)
def delete(self, key, callback = default_result_handler):
self.buf += _buildResp("del", key)
self.callbacks.append(callback)
def unblock(self, client_id, callback=default_result_handler):
self.buf += _buildResp("client", "unblock", str(client_id))
self.callbacks.append(callback)
def scan(self, iter, match=None, count=None, callback = default_result_handler):
args = ["scan", str(iter)]
if match != None:
@@ -142,68 +151,133 @@ class Client(asyncore.dispatcher):
self.buf += _buildResp(*args)
self.callbacks.append(callback)
def get_client_id(self):
self.buf += _buildResp("client", "id")
self.callbacks.append(self.store_client_id)
def store_client_id(self, c, resp):
assert(resp[0] == ord(':'))
self.client_id = int(resp[1:])
assert(self.client_id == c.client_id)
def get(self, key, callback = None):
return
def getrandomkey():
return str(random.randrange(0, numkeys))
def handle_lpush_response(c, resp):
def handle_lpush_response(c, resp=None):
global ops
if resp != None:
ops = ops + 1
ops['lpush'] += 1
assert(resp[0] == ord(':'))
c.lpush("list_" + getrandomkey(), 'bardsklfjkldsjfdlsjflksdfjklsdjflksd kldsjflksd jlkdsjf lksdjklds jrfklsdjfklsdjfkl', handle_lpush_response)
def handle_set_response(c, resp):
def handle_blpop_response(c, resp=None):
global ops
if resp != None:
ops = ops + 1
ops['blpop'] += 1
c.blpop("list_" + getrandomkey(), callback=handle_blpop_response)
def handle_set_response(c, resp=None):
global ops
if resp != None:
ops['set'] += 1
assert(resp[0] == ord('+'))
c.set("str_" + getrandomkey(), 'bardsklfjkldsjfdlsjflksdfjklsdjflksd kldsjflksd jlkdsjf lksdjklds jrfklsdjfklsdjfkl', handle_set_response)
def handle_del_response(c, resp):
def handle_del_response(c, resp=None):
global ops
if resp != None:
ops = ops + 1
ops['del'] += 1
c.delete("list_" + getrandomkey(), handle_del_response)
def scan_callback(c, resp):
def scan_callback(c, resp=None):
global ops
nextstart = int(resp[0])
c.scan(nextstart, count=500, callback=scan_callback)
ops = ops+1
ops['scan'] += 1
def clear_ops():
global ops
ops = {'lpush': 0, 'blpop': 0, 'del': 0, 'scan': 0, 'set': 0, 'get': 0}
def stats_thread():
global ops
global g_exit
while not g_exit:
global runtime
i = 0
while i < runtime or not runtime:
time.sleep(1)
print("Ops per second: " + str(ops))
ops = 0
print("Ops per second: " + str({k:v for (k,v) in ops.items() if v}))
clear_ops()
i += 1
asyncore.close_all()
def main():
global g_exit
clients = []
def flush_db_sync():
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.connect(('127.0.0.1', 6379))
server.send(_buildResp("flushdb"))
resp = server.recv(8192)
assert(resp[:3] == "+OK".encode('utf-8'))
def init_blocking():
global clients
if numkeys > 100 * numclients:
print("WARNING: High ratio of keys to clients. Most lpushes will not be popped and unblocking will take a long time!")
for i in range(numclients):
clients.append(Client('127.0.0.1', 6379))
if i % 2:
handle_blpop_response(clients[-1])
else:
handle_lpush_response(clients[-1])
def init_lpush():
global clients
for i in range(numclients):
clients.append(Client('127.0.0.1', 6379))
for i in range (10):
handle_lpush_response(clients[-1], None)
handle_lpush_response(clients[-1])
#handle_set_response(clients[-1], None)
scan_client = Client('127.0.0.1', 6379)
scan_client.scan(0, count=500, callback=scan_callback)
del_client = Client('127.0.0.1', 6379)
handle_del_response(del_client, None)
handle_del_response(del_client)
def main(test, flush):
clear_ops()
if flush:
flush_db_sync()
try:
globals()[f"init_{test}"]()
except KeyError:
print(f"Test \"{test}\" not found. Exiting...")
exit()
except ConnectionRefusedError:
print("Could not connect to server. Is it running?")
print("Exiting...")
exit()
threading.Thread(target=stats_thread).start()
asyncore.loop()
g_exit = True
sys.exit(0)
print("DONE")
print("Done.")
parser = argparse.ArgumentParser(description="Test use cases for KeyDB.")
parser.add_argument('test', choices=[x[5:] for x in filter(lambda name: name.startswith("init_"), globals().keys())], help="which test to run")
parser.add_argument('-c', '--clients', type=int, default=50, help="number of running clients to use")
parser.add_argument('-k', '--keys', type=int, default=100000, help="number of keys to choose from for random tests")
parser.add_argument('-t', '--runtime', type=int, default=0, help="how long to run the test for (default: 0 for infinite)")
parser.add_argument('-f', '--flush', action="store_true", help="flush the db before running the test")
if __name__ == "__main__":
main()
try:
args = parser.parse_args()
except:
exit()
numclients = args.clients
numkeys = args.keys
runtime = args.runtime
main(args.test, args.flush)
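
For reference, the test script above talks to the server in raw RESP: each command goes out as an array header `*<argc>` followed by one bulk string `$<len>\r\n<data>\r\n` per argument, which is what the script's `_buildResp` helper assembles (only the array-header line is visible in the hunk above). A minimal sketch of that framing, written in C++ to match the server-side examples below; it is illustrative only and not part of the commit:

```cpp
#include <string>
#include <vector>

// Builds the same wire format as the script's _buildResp helper:
// "*<argc>\r\n" followed by "$<len>\r\n<arg>\r\n" for every argument.
std::string buildResp(const std::vector<std::string> &args)
{
    std::string result = "*" + std::to_string(args.size()) + "\r\n";
    for (const std::string &arg : args) {
        result += "$" + std::to_string(arg.size()) + "\r\n" + arg + "\r\n";
    }
    return result;
}

// buildResp({"client", "unblock", "7"}) yields
// "*3\r\n$7\r\nclient\r\n$7\r\nunblock\r\n$1\r\n7\r\n"
```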

View File

@@ -18,14 +18,15 @@ void AsyncWorkQueue::WorkerThreadMain()
vars.clients_pending_asyncwrite = listCreate();
aeAcquireLock();
m_mutex.lock();
m_vecpthreadVars.push_back(&vars);
aeReleaseLock();
m_mutex.unlock();
while (!m_fQuitting)
{
std::unique_lock<std::mutex> lock(m_mutex);
m_cvWakeup.wait(lock);
if (m_workqueue.empty())
m_cvWakeup.wait(lock);
while (!m_workqueue.empty())
{
@@ -41,9 +42,11 @@ void AsyncWorkQueue::WorkerThreadMain()
lock.unlock();
serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
aeAcquireLock();
ProcessPendingAsyncWrites();
aeReleaseLock();
if (listLength(serverTL->clients_pending_asyncwrite)) {
aeAcquireLock();
ProcessPendingAsyncWrites();
aeReleaseLock();
}
g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch);
serverTL->gcEpoch.reset();
}
@@ -60,6 +63,7 @@ bool AsyncWorkQueue::removeClientAsyncWrites(client *c)
{
bool fFound = false;
aeAcquireLock();
m_mutex.lock();
for (auto pvars : m_vecpthreadVars)
{
listIter li;
@@ -74,6 +78,7 @@ bool AsyncWorkQueue::removeClientAsyncWrites(client *c)
}
}
}
m_mutex.unlock();
aeReleaseLock();
return fFound;
}
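
The wakeup change above checks for queued work before blocking, so a task submitted just before the worker reaches the wait is not missed. The same guarantee is commonly written with the predicate overload of std::condition_variable::wait, which re-checks the condition under the lock on every wakeup. A minimal sketch of that pattern (the names here are illustrative, not KeyDB's):

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>

// Worker loop that only sleeps while the queue is empty. The predicate is
// re-evaluated under the lock on every wakeup, so work enqueued before the
// worker blocks (or a spurious wakeup) cannot be lost.
struct WorkQueue {
    std::mutex mtx;
    std::condition_variable cv;
    std::deque<std::function<void()>> tasks;
    bool quitting = false;

    void workerLoop() {
        std::unique_lock<std::mutex> lock(mtx);
        while (!quitting) {
            cv.wait(lock, [&] { return quitting || !tasks.empty(); });
            while (!tasks.empty()) {
                auto task = std::move(tasks.front());
                tasks.pop_front();
                lock.unlock();   // run the task without holding the queue lock
                task();
                lock.lock();
            }
        }
    }
};
```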

View File

@@ -1,5 +1,6 @@
#pragma once
#include <functional>
#include "sds.h"
class IStorageFactory
{
@@ -28,6 +29,14 @@ public:
virtual bool enumerate(callback fn) const = 0;
virtual size_t count() const = 0;
virtual void bulkInsert(sds *rgkeys, sds *rgvals, size_t celem) {
beginWriteBatch();
for (size_t ielem = 0; ielem < celem; ++ielem) {
insert(rgkeys[ielem], sdslen(rgkeys[ielem]), rgvals[ielem], sdslen(rgvals[ielem]), false);
}
endWriteBatch();
}
virtual void beginWriteBatch() {} // NOP
virtual void endWriteBatch() {} // NOP
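
The default bulkInsert above simply brackets per-key insert calls with beginWriteBatch/endWriteBatch, so providers that implement those as no-ops still work, while providers such as the RocksDB one further down can override it with a real batch. A minimal caller-side sketch, assuming the caller keeps ownership of the sds strings and that the IStorage interface is in scope; the helper name and ownership convention are illustrative, not part of the change:

```cpp
#include <string>
#include <utility>
#include <vector>
#include "sds.h"

// Hypothetical helper (not from the commit): stage key/value pairs as sds
// strings and hand them to bulkInsert() so the provider can commit them as a
// single batch instead of one insert() per key. Assumes the provider does not
// take ownership of the sds strings.
void loadBatch(IStorage *storage, const std::vector<std::pair<std::string, std::string>> &kvs)
{
    std::vector<sds> keys, vals;
    keys.reserve(kvs.size());
    vals.reserve(kvs.size());
    for (const auto &kv : kvs) {
        keys.push_back(sdsnewlen(kv.first.data(), kv.first.size()));
        vals.push_back(sdsnewlen(kv.second.data(), kv.second.size()));
    }
    storage->bulkInsert(keys.data(), vals.data(), keys.size());
    for (size_t i = 0; i < keys.size(); ++i) {
        sdsfree(keys[i]);
        sdsfree(vals[i]);
    }
}
```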

View File

@@ -91,6 +91,20 @@ void StorageCache::insert(sds key, const void *data, size_t cbdata, bool fOverwr
m_spstorage->insert(key, sdslen(key), (void*)data, cbdata, fOverwrite);
}
void StorageCache::bulkInsert(sds *rgkeys, sds *rgvals, size_t celem)
{
std::unique_lock<fastlock> ul(m_lock);
bulkInsertsInProgress++;
if (m_pdict != nullptr) {
for (size_t ielem = 0; ielem < celem; ++ielem) {
cacheKey(rgkeys[ielem]);
}
}
ul.unlock();
m_spstorage->bulkInsert(rgkeys, rgvals, celem);
bulkInsertsInProgress--;
}
const StorageCache *StorageCache::clone()
{
std::unique_lock<fastlock> ul(m_lock);
@@ -119,7 +133,7 @@ size_t StorageCache::count() const
std::unique_lock<fastlock> ul(m_lock);
size_t count = m_spstorage->count();
if (m_pdict != nullptr) {
serverAssert(count == (dictSize(m_pdict) + m_collisionCount));
serverAssert(bulkInsertsInProgress.load(std::memory_order_seq_cst) || count == (dictSize(m_pdict) + m_collisionCount));
}
return count;
}

View File

@@ -7,6 +7,7 @@ class StorageCache
dict *m_pdict = nullptr;
int m_collisionCount = 0;
mutable fastlock m_lock {"StorageCache"};
std::atomic<int> bulkInsertsInProgress;
StorageCache(IStorage *storage, bool fNoCache);
@@ -37,6 +38,7 @@ public:
void clear();
void insert(sds key, const void *data, size_t cbdata, bool fOverwrite);
void bulkInsert(sds *rgkeys, sds *rgvals, size_t celem);
void retrieve(sds key, IStorage::callbackSingle fn) const;
bool erase(sds key);
@@ -50,4 +52,4 @@ public:
size_t count() const;
const StorageCache *clone();
};
};

View File

@@ -535,7 +535,8 @@ dictAsyncRehashCtl::~dictAsyncRehashCtl() {
while (deGCList != nullptr) {
auto next = deGCList->next;
dictFreeKey(dict, deGCList);
dictFreeVal(dict, deGCList);
if (deGCList->v.val != nullptr)
dictFreeVal(dict, deGCList);
zfree(deGCList);
deGCList = next;
}
@@ -694,6 +695,8 @@ static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
d->ht[table].table[idx] = he->next;
if (!nofree) {
if (table == 0 && d->asyncdata != nullptr && (ssize_t)idx < d->rehashidx) {
dictFreeVal(d, he);
he->v.val = nullptr;
he->next = d->asyncdata->deGCList;
d->asyncdata->deGCList = he;
} else {
@@ -752,6 +755,8 @@ void dictFreeUnlinkedEntry(dict *d, dictEntry *he) {
if (he == NULL) return;
if (d->asyncdata) {
dictFreeVal(d, he);
he->v.val = nullptr;
he->next = d->asyncdata->deGCList;
d->asyncdata->deGCList = he;
} else {
@@ -775,6 +780,8 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
while(he) {
nextHe = he->next;
if (d->asyncdata && (ssize_t)i < d->rehashidx) {
dictFreeVal(d, he);
he->v.val = nullptr;
he->next = d->asyncdata->deGCList;
d->asyncdata->deGCList = he;
} else {

View File

@@ -38,6 +38,17 @@ void RocksDBStorageProvider::insert(const char *key, size_t cchKey, void *data,
++m_count;
}
void RocksDBStorageProvider::bulkInsert(sds *rgkeys, sds *rgvals, size_t celem)
{
auto spbatch = std::make_unique<rocksdb::WriteBatch>();
for (size_t ielem = 0; ielem < celem; ++ielem) {
spbatch->Put(m_spcolfamily.get(), rocksdb::Slice(rgkeys[ielem], sdslen(rgkeys[ielem])), rocksdb::Slice(rgvals[ielem], sdslen(rgvals[ielem])));
}
m_spdb->Write(WriteOptions(), spbatch.get());
std::unique_lock<fastlock> l(m_lock);
m_count += celem;
}
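
The implementation above follows the standard rocksdb::WriteBatch pattern: every Put is staged in memory and then committed by a single atomic Write(), which is far cheaper than one storage write per key. In isolation it looks like this (a minimal sketch; the database and column family handles are assumed to be opened elsewhere):

```cpp
#include <cassert>
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>

// Stage several Puts, then commit them atomically with one Write() call.
void writeMany(rocksdb::DB *db, rocksdb::ColumnFamilyHandle *cf)
{
    rocksdb::WriteBatch batch;
    batch.Put(cf, "key1", "val1");
    batch.Put(cf, "key2", "val2");
    rocksdb::Status s = db->Write(rocksdb::WriteOptions(), &batch);
    assert(s.ok());
}
```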
bool RocksDBStorageProvider::erase(const char *key, size_t cchKey)
{
rocksdb::Status status;

View File

@@ -34,6 +34,8 @@ public:
virtual void beginWriteBatch() override;
virtual void endWriteBatch() override;
virtual void bulkInsert(sds *rgkeys, sds *rgvals, size_t celem) override;
virtual void batch_lock() override;
virtual void batch_unlock() override;

View File

@@ -395,7 +395,7 @@ start_server {tags {"defrag"} overrides {appendonly yes auto-aof-rewrite-percent
# if the current slab is lower in utilization the defragger would have ended up in stagnation,
# kept running and not moved any allocation.
# this test is more consistent on a fresh server with no history
start_server {tags {"defrag"} overrides {save ""}} {
start_server {tags {"defrag"} overrides {save "" server-threads 1}} {
r flushdb
r config resetstat
r config set hz 100