Merge branch 'keydbpro' of https://gitlab.eqalpha.com/keydb-dev/KeyDB-Pro into keydbpro

Former-commit-id: 6eb1417295ecc6ce23677e9a8e1c756579177629
This commit is contained in:
John Sully 2021-06-02 02:11:29 +00:00
commit c034037cb7
2 changed files with 108 additions and 34 deletions

View File

@ -1,21 +1,17 @@
import keydb
import random import random
import sched, time import time
import socket import socket
import asyncore import asyncore
import threading import threading
import sys import argparse
from pprint import pprint from pprint import pprint
# Parameters
numclients = 50
#numkeys = 1000000
numkeys = 100000
# Globals # Globals
ops=0 ops = {}
s = sched.scheduler(time.time, time.sleep) numclients = 0
g_exit = False numkeys = 0
runtime = 0
clients = []
def _buildResp(*args): def _buildResp(*args):
result = "*" + str(len(args)) + "\r\n" result = "*" + str(len(args)) + "\r\n"
@ -32,6 +28,8 @@ class Client(asyncore.dispatcher):
self.buf = b'' self.buf = b''
self.inbuf = b'' self.inbuf = b''
self.callbacks = list() self.callbacks = list()
self.client_id = 0
self.get_client_id()
def handle_connect(self): def handle_connect(self):
pass pass
@ -55,6 +53,9 @@ class Client(asyncore.dispatcher):
endrange = self.inbuf[startpos+1:].find(ord('\r')) + 1 + startpos endrange = self.inbuf[startpos+1:].find(ord('\r')) + 1 + startpos
assert(endrange > 0) assert(endrange > 0)
numargs = int(self.inbuf[startpos+1:endrange]) numargs = int(self.inbuf[startpos+1:endrange])
if numargs == -1: # Nil array, used in some returns
startpos = endrange + 2
return startpos, []
assert(numargs > 0) assert(numargs > 0)
args = list() args = list()
startpos = endrange + 2 # plus 1 gets us to the '\n' and the next gets us to the start char startpos = endrange + 2 # plus 1 gets us to the '\n' and the next gets us to the start char
@ -127,10 +128,18 @@ class Client(asyncore.dispatcher):
self.buf += _buildResp("lpush", key, val) self.buf += _buildResp("lpush", key, val)
self.callbacks.append(callback) self.callbacks.append(callback)
def blpop(self, *keys, timeout=0, callback=default_result_handler):
self.buf += _buildResp("blpop", *keys, str(timeout))
self.callbacks.append(callback)
def delete(self, key, callback = default_result_handler): def delete(self, key, callback = default_result_handler):
self.buf += _buildResp("del", key) self.buf += _buildResp("del", key)
self.callbacks.append(callback) self.callbacks.append(callback)
def unblock(self, client_id, callback=default_result_handler):
self.buf += _buildResp("client", "unblock", str(client_id))
self.callbacks.append(callback)
def scan(self, iter, match=None, count=None, callback = default_result_handler): def scan(self, iter, match=None, count=None, callback = default_result_handler):
args = ["scan", str(iter)] args = ["scan", str(iter)]
if match != None: if match != None:
@ -142,68 +151,133 @@ class Client(asyncore.dispatcher):
self.buf += _buildResp(*args) self.buf += _buildResp(*args)
self.callbacks.append(callback) self.callbacks.append(callback)
def get_client_id(self):
self.buf += _buildResp("client", "id")
self.callbacks.append(self.store_client_id)
def store_client_id(self, c, resp):
assert(resp[0] == ord(':'))
self.client_id = int(resp[1:])
assert(self.client_id == c.client_id)
def get(self, key, callback = None): def get(self, key, callback = None):
return return
def getrandomkey(): def getrandomkey():
return str(random.randrange(0, numkeys)) return str(random.randrange(0, numkeys))
def handle_lpush_response(c, resp): def handle_lpush_response(c, resp=None):
global ops global ops
if resp != None: if resp != None:
ops = ops + 1 ops['lpush'] += 1
assert(resp[0] == ord(':')) assert(resp[0] == ord(':'))
c.lpush("list_" + getrandomkey(), 'bardsklfjkldsjfdlsjflksdfjklsdjflksd kldsjflksd jlkdsjf lksdjklds jrfklsdjfklsdjfkl', handle_lpush_response) c.lpush("list_" + getrandomkey(), 'bardsklfjkldsjfdlsjflksdfjklsdjflksd kldsjflksd jlkdsjf lksdjklds jrfklsdjfklsdjfkl', handle_lpush_response)
def handle_set_response(c, resp): def handle_blpop_response(c, resp=None):
global ops global ops
if resp != None: if resp != None:
ops = ops + 1 ops['blpop'] += 1
c.blpop("list_" + getrandomkey(), callback=handle_blpop_response)
def handle_set_response(c, resp=None):
global ops
if resp != None:
ops['set'] += 1
assert(resp[0] == ord('+')) assert(resp[0] == ord('+'))
c.set("str_" + getrandomkey(), 'bardsklfjkldsjfdlsjflksdfjklsdjflksd kldsjflksd jlkdsjf lksdjklds jrfklsdjfklsdjfkl', handle_set_response) c.set("str_" + getrandomkey(), 'bardsklfjkldsjfdlsjflksdfjklsdjflksd kldsjflksd jlkdsjf lksdjklds jrfklsdjfklsdjfkl', handle_set_response)
def handle_del_response(c, resp): def handle_del_response(c, resp=None):
global ops global ops
if resp != None: if resp != None:
ops = ops + 1 ops['del'] += 1
c.delete("list_" + getrandomkey(), handle_del_response) c.delete("list_" + getrandomkey(), handle_del_response)
def scan_callback(c, resp): def scan_callback(c, resp=None):
global ops global ops
nextstart = int(resp[0]) nextstart = int(resp[0])
c.scan(nextstart, count=500, callback=scan_callback) c.scan(nextstart, count=500, callback=scan_callback)
ops = ops+1 ops['scan'] += 1
def clear_ops():
global ops
ops = {'lpush': 0, 'blpop': 0, 'del': 0, 'scan': 0, 'set': 0, 'get': 0}
def stats_thread(): def stats_thread():
global ops global ops
global g_exit global runtime
while not g_exit: i = 0
while i < runtime or not runtime:
time.sleep(1) time.sleep(1)
print("Ops per second: " + str(ops)) print("Ops per second: " + str({k:v for (k,v) in ops.items() if v}))
ops = 0 clear_ops()
i += 1
asyncore.close_all()
def main(): def flush_db_sync():
global g_exit server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
clients = [] server.connect(('127.0.0.1', 6379))
server.send(_buildResp("flushdb"))
resp = server.recv(8192)
assert(resp[:3] == "+OK".encode('utf-8'))
def init_blocking():
global clients
if numkeys > 100 * numclients:
print("WARNING: High ratio of keys to clients. Most lpushes will not be popped and unblocking will take a long time!")
for i in range(numclients):
clients.append(Client('127.0.0.1', 6379))
if i % 2:
handle_blpop_response(clients[-1])
else:
handle_lpush_response(clients[-1])
def init_lpush():
global clients
for i in range(numclients): for i in range(numclients):
clients.append(Client('127.0.0.1', 6379)) clients.append(Client('127.0.0.1', 6379))
for i in range (10): for i in range (10):
handle_lpush_response(clients[-1], None) handle_lpush_response(clients[-1])
#handle_set_response(clients[-1], None) #handle_set_response(clients[-1], None)
scan_client = Client('127.0.0.1', 6379) scan_client = Client('127.0.0.1', 6379)
scan_client.scan(0, count=500, callback=scan_callback) scan_client.scan(0, count=500, callback=scan_callback)
del_client = Client('127.0.0.1', 6379) del_client = Client('127.0.0.1', 6379)
handle_del_response(del_client, None) handle_del_response(del_client)
def main(test, flush):
clear_ops()
if flush:
flush_db_sync()
try:
globals()[f"init_{test}"]()
except KeyError:
print(f"Test \"{test}\" not found. Exiting...")
exit()
except ConnectionRefusedError:
print("Could not connect to server. Is it running?")
print("Exiting...")
exit()
threading.Thread(target=stats_thread).start() threading.Thread(target=stats_thread).start()
asyncore.loop() asyncore.loop()
g_exit = True print("Done.")
sys.exit(0)
print("DONE") parser = argparse.ArgumentParser(description="Test use cases for KeyDB.")
parser.add_argument('test', choices=[x[5:] for x in filter(lambda name: name.startswith("init_"), globals().keys())], help="which test to run")
parser.add_argument('-c', '--clients', type=int, default=50, help="number of running clients to use")
parser.add_argument('-k', '--keys', type=int, default=100000, help="number of keys to choose from for random tests")
parser.add_argument('-t', '--runtime', type=int, default=0, help="how long to run the test for (default: 0 for infinite)")
parser.add_argument('-f', '--flush', action="store_true", help="flush the db before running the test")
if __name__ == "__main__": if __name__ == "__main__":
main() try:
args = parser.parse_args()
except:
exit()
numclients = args.clients
numkeys = args.keys
runtime = args.runtime
main(args.test, args.flush)

View File

@ -673,7 +673,7 @@ static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
if (!nofree) { if (!nofree) {
if (table == 0 && d->asyncdata != nullptr && (ssize_t)idx < d->rehashidx) { if (table == 0 && d->asyncdata != nullptr && (ssize_t)idx < d->rehashidx) {
he->next = d->asyncdata->deGCList; he->next = d->asyncdata->deGCList;
d->asyncdata->deGCList = he->next; d->asyncdata->deGCList = he;
} else { } else {
dictFreeKey(d, he); dictFreeKey(d, he);
dictFreeVal(d, he); dictFreeVal(d, he);