From 78256076734d8b859e744cc6d84074a50b625df1 Mon Sep 17 00:00:00 2001 From: christianEQ Date: Tue, 16 Mar 2021 23:19:29 +0000 Subject: [PATCH 1/8] parameterized monkey options Former-commit-id: eda1055689587c25dca3e889dd3a74d3f9c9378f --- monkey/monkey.py | 68 ++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 54 insertions(+), 14 deletions(-) diff --git a/monkey/monkey.py b/monkey/monkey.py index 653970728..67e426486 100644 --- a/monkey/monkey.py +++ b/monkey/monkey.py @@ -4,18 +4,17 @@ import sched, time import socket import asyncore import threading +import argparse import sys from pprint import pprint -# Parameters -numclients = 50 -#numkeys = 1000000 -numkeys = 100000 - # Globals ops=0 s = sched.scheduler(time.time, time.sleep) g_exit = False +numclients = 0 +numkeys = 0 +runtime = 0 def _buildResp(*args): result = "*" + str(len(args)) + "\r\n" @@ -127,6 +126,10 @@ class Client(asyncore.dispatcher): self.buf += _buildResp("lpush", key, val) self.callbacks.append(callback) + def blpop(self, *keys, timeout=0, callback=default_result_handler): + self.buf += _buildResp("blpop", *keys, str(timeout)) + self.callbacks.append(callback) + def delete(self, key, callback = default_result_handler): self.buf += _buildResp("del", key) self.callbacks.append(callback) @@ -149,13 +152,19 @@ class Client(asyncore.dispatcher): def getrandomkey(): return str(random.randrange(0, numkeys)) -def handle_lpush_response(c, resp): +def handle_lpush_response(c, resp, delay=0): global ops if resp != None: ops = ops + 1 assert(resp[0] == ord(':')) c.lpush("list_" + getrandomkey(), 'bardsklfjkldsjfdlsjflksdfjklsdjflksd kldsjflksd jlkdsjf lksdjklds jrfklsdjfklsdjfkl', handle_lpush_response) +def handle_blpop_response(c, resp): + global ops + if resp != None: + ops = ops + 1 + c.blpop("list_" + getrandomkey(), callback=handle_blpop_response) + def handle_set_response(c, resp): global ops if resp != None: @@ -178,19 +187,28 @@ def scan_callback(c, resp): def stats_thread(): global ops global g_exit - while not g_exit: + global runtime + i = 0 + while not g_exit and not (runtime and i > runtime): time.sleep(1) print("Ops per second: " + str(ops)) ops = 0 + i += 1 + g_exit = True -def main(): - global g_exit - clients = [] - +def init_blocking(): for i in range(numclients): - clients.append(Client('127.0.0.1', 6379)) + c = Client('127.0.0.1', 6379) + if i % 2: + handle_lpush_response(c, None, delay=1) + else: + handle_blpop_response(c, None) + +def init_lpush(): + for i in range(numclients): + c = Client('127.0.0.1', 6379) for i in range (10): - handle_lpush_response(clients[-1], None) + handle_lpush_response(c, None) #handle_set_response(clients[-1], None) scan_client = Client('127.0.0.1', 6379) @@ -199,11 +217,33 @@ def main(): del_client = Client('127.0.0.1', 6379) handle_del_response(del_client, None) +def main(test): + global g_exit + + try: + globals()[f"init_{test}"]() + except KeyError: + print(f"Test \"{test}\" not found. Exiting...") + exit() + threading.Thread(target=stats_thread).start() asyncore.loop() g_exit = True sys.exit(0) print("DONE") +parser = argparse.ArgumentParser(description="Test use cases for KeyDB.") +parser.add_argument('test', choices=[x[5:] for x in filter(lambda name: name.startswith("init_"), globals().keys())]) +parser.add_argument('-c', '--clients', type=int, default=50) +parser.add_argument('-k', '--keys', type=int, default=100000) +parser.add_argument('-t', '--runtime', type=int, default=0) + if __name__ == "__main__": - main() + try: + args = parser.parse_args() + except: + exit() + numclients = args.clients + numkeys = args.keys + runtime = args.runtime + main(args.test) From de2bd40afb55c1f6539d758d8e22e395b8a03579 Mon Sep 17 00:00:00 2001 From: christianEQ Date: Wed, 17 Mar 2021 01:40:38 +0000 Subject: [PATCH 2/8] unblocking test to monkey, works with 1 client Former-commit-id: 9fbe8cf6a8aeb141d4a502532a456e4256f4daf8 --- monkey/monkey.py | 85 +++++++++++++++++++++++++++++++++++++----------- 1 file changed, 66 insertions(+), 19 deletions(-) diff --git a/monkey/monkey.py b/monkey/monkey.py index 67e426486..5308a8276 100644 --- a/monkey/monkey.py +++ b/monkey/monkey.py @@ -15,6 +15,7 @@ g_exit = False numclients = 0 numkeys = 0 runtime = 0 +clients = [] def _buildResp(*args): result = "*" + str(len(args)) + "\r\n" @@ -31,6 +32,8 @@ class Client(asyncore.dispatcher): self.buf = b'' self.inbuf = b'' self.callbacks = list() + self.client_id = 0 + self.get_client_id() def handle_connect(self): pass @@ -54,6 +57,9 @@ class Client(asyncore.dispatcher): endrange = self.inbuf[startpos+1:].find(ord('\r')) + 1 + startpos assert(endrange > 0) numargs = int(self.inbuf[startpos+1:endrange]) + if numargs == -1: # Nil array, used in some returns + startpos = endrange + 2 + return startpos, [] assert(numargs > 0) args = list() startpos = endrange + 2 # plus 1 gets us to the '\n' and the next gets us to the start char @@ -134,6 +140,10 @@ class Client(asyncore.dispatcher): self.buf += _buildResp("del", key) self.callbacks.append(callback) + def unblock(self, client_id, callback=default_result_handler): + self.buf += _buildResp("client", "unblock", str(client_id)) + self.callbacks.append(callback) + def scan(self, iter, match=None, count=None, callback = default_result_handler): args = ["scan", str(iter)] if match != None: @@ -145,10 +155,18 @@ class Client(asyncore.dispatcher): self.buf += _buildResp(*args) self.callbacks.append(callback) + def get_client_id(self): + self.buf += _buildResp("client", "id") + self.callbacks.append(self.store_client_id) + + def store_client_id(self, c, resp): + assert(resp[0] == ord(':')) + self.client_id = int(resp[1:]) + assert(self.client_id == c.client_id) + def get(self, key, callback = None): return - def getrandomkey(): return str(random.randrange(0, numkeys)) @@ -162,28 +180,43 @@ def handle_lpush_response(c, resp, delay=0): def handle_blpop_response(c, resp): global ops if resp != None: + print("unblocked thread") ops = ops + 1 - c.blpop("list_" + getrandomkey(), callback=handle_blpop_response) + else: + c.blpop("list_" + getrandomkey(), callback=handle_blpop_response) -def handle_set_response(c, resp): +def handle_set_response(c, resp=None): global ops if resp != None: ops = ops + 1 assert(resp[0] == ord('+')) c.set("str_" + getrandomkey(), 'bardsklfjkldsjfdlsjflksdfjklsdjflksd kldsjflksd jlkdsjf lksdjklds jrfklsdjfklsdjfkl', handle_set_response) -def handle_del_response(c, resp): +def handle_del_response(c, resp=None): global ops if resp != None: ops = ops + 1 c.delete("list_" + getrandomkey(), handle_del_response) -def scan_callback(c, resp): +def scan_callback(c, resp=None): global ops nextstart = int(resp[0]) c.scan(nextstart, count=500, callback=scan_callback) ops = ops+1 +def unblock_clients(c, resp=None): + global clients + global ops + if resp != None: + ops = ops + 1 + time.sleep(1) + client_ids = list(map(lambda x: x.client_id, clients)) + for id in client_ids: + if id: + print(f"unblocking client {id}") + time.sleep(1) + c.unblock(id, unblock_clients) + def stats_thread(): global ops global g_exit @@ -192,34 +225,47 @@ def stats_thread(): while not g_exit and not (runtime and i > runtime): time.sleep(1) print("Ops per second: " + str(ops)) + #print(f"Blocked threads: {len(list(filter(lambda x: x.blocked, clients)))}") ops = 0 i += 1 g_exit = True +def flush_db_sync(): + server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server.connect(('127.0.0.1', 6379)) + server.send(_buildResp("flushdb")) + resp = server.recv(8192) + assert(resp[:3] == "+OK".encode('utf-8')) + def init_blocking(): + global clients for i in range(numclients): - c = Client('127.0.0.1', 6379) - if i % 2: - handle_lpush_response(c, None, delay=1) - else: - handle_blpop_response(c, None) + clients.append(Client('127.0.0.1', 6379)) + handle_blpop_response(clients[-1], None) + + unblock_client = Client('127.0.0.1', 6379) + unblock_clients(unblock_client) def init_lpush(): + global clients for i in range(numclients): - c = Client('127.0.0.1', 6379) + clients.append(Client('127.0.0.1', 6379)) for i in range (10): - handle_lpush_response(c, None) + handle_lpush_response(c) #handle_set_response(clients[-1], None) scan_client = Client('127.0.0.1', 6379) scan_client.scan(0, count=500, callback=scan_callback) del_client = Client('127.0.0.1', 6379) - handle_del_response(del_client, None) + handle_del_response(del_client) -def main(test): +def main(test, flush): global g_exit + if flush: + flush_db_sync() + try: globals()[f"init_{test}"]() except KeyError: @@ -233,10 +279,11 @@ def main(test): print("DONE") parser = argparse.ArgumentParser(description="Test use cases for KeyDB.") -parser.add_argument('test', choices=[x[5:] for x in filter(lambda name: name.startswith("init_"), globals().keys())]) -parser.add_argument('-c', '--clients', type=int, default=50) -parser.add_argument('-k', '--keys', type=int, default=100000) -parser.add_argument('-t', '--runtime', type=int, default=0) +parser.add_argument('test', choices=[x[5:] for x in filter(lambda name: name.startswith("init_"), globals().keys())], help="which test to run") +parser.add_argument('-c', '--clients', type=int, default=50, help="number of running clients to use") +parser.add_argument('-k', '--keys', type=int, default=100000, help="number of keys to choose from for random tests") +parser.add_argument('-t', '--runtime', type=int, default=0, help="how long to run the test for (default: 0 for infinite)") +parser.add_argument('-f', '--flush', action="store_true", help="flush the db before running the test") if __name__ == "__main__": try: @@ -246,4 +293,4 @@ if __name__ == "__main__": numclients = args.clients numkeys = args.keys runtime = args.runtime - main(args.test) + main(args.test, args.flush) From a58597493210a691960b2501b36c44ea799adfee Mon Sep 17 00:00:00 2001 From: christianEQ Date: Wed, 17 Mar 2021 16:03:51 +0000 Subject: [PATCH 3/8] track each command separately in monkey Former-commit-id: e3accdaad985fd9f93490a5cc6704d9ecb909604 --- monkey/monkey.py | 44 +++++++++++++++++++------------------------- 1 file changed, 19 insertions(+), 25 deletions(-) diff --git a/monkey/monkey.py b/monkey/monkey.py index 5308a8276..e1b8fbb0e 100644 --- a/monkey/monkey.py +++ b/monkey/monkey.py @@ -9,7 +9,7 @@ import sys from pprint import pprint # Globals -ops=0 +ops = {} s = sched.scheduler(time.time, time.sleep) g_exit = False numclients = 0 @@ -170,52 +170,42 @@ class Client(asyncore.dispatcher): def getrandomkey(): return str(random.randrange(0, numkeys)) -def handle_lpush_response(c, resp, delay=0): +def handle_lpush_response(c, resp=None): global ops if resp != None: - ops = ops + 1 + ops['lpush'] += 1 assert(resp[0] == ord(':')) c.lpush("list_" + getrandomkey(), 'bardsklfjkldsjfdlsjflksdfjklsdjflksd kldsjflksd jlkdsjf lksdjklds jrfklsdjfklsdjfkl', handle_lpush_response) -def handle_blpop_response(c, resp): +def handle_blpop_response(c, resp=None): global ops if resp != None: - print("unblocked thread") - ops = ops + 1 + ops['blpop'] += 1 else: c.blpop("list_" + getrandomkey(), callback=handle_blpop_response) def handle_set_response(c, resp=None): global ops if resp != None: - ops = ops + 1 + ops['set'] += 1 assert(resp[0] == ord('+')) c.set("str_" + getrandomkey(), 'bardsklfjkldsjfdlsjflksdfjklsdjflksd kldsjflksd jlkdsjf lksdjklds jrfklsdjfklsdjfkl', handle_set_response) def handle_del_response(c, resp=None): global ops if resp != None: - ops = ops + 1 + ops['del'] += 1 c.delete("list_" + getrandomkey(), handle_del_response) def scan_callback(c, resp=None): global ops nextstart = int(resp[0]) c.scan(nextstart, count=500, callback=scan_callback) - ops = ops+1 + ops['scan'] += 1 -def unblock_clients(c, resp=None): - global clients +def clear_ops(): global ops - if resp != None: - ops = ops + 1 - time.sleep(1) - client_ids = list(map(lambda x: x.client_id, clients)) - for id in client_ids: - if id: - print(f"unblocking client {id}") - time.sleep(1) - c.unblock(id, unblock_clients) + ops = {'lpush': 0, 'blpop': 0, 'del': 0, 'scan': 0, 'set': 0, 'get': 0} def stats_thread(): global ops @@ -226,7 +216,7 @@ def stats_thread(): time.sleep(1) print("Ops per second: " + str(ops)) #print(f"Blocked threads: {len(list(filter(lambda x: x.blocked, clients)))}") - ops = 0 + clear_ops() i += 1 g_exit = True @@ -239,12 +229,14 @@ def flush_db_sync(): def init_blocking(): global clients + if numkeys > 5 * numclients: + print("WARNING: High ratio of keys to clients. Most lpushes will not be popped and unblocking will take a long time!") for i in range(numclients): clients.append(Client('127.0.0.1', 6379)) - handle_blpop_response(clients[-1], None) - - unblock_client = Client('127.0.0.1', 6379) - unblock_clients(unblock_client) + if i % 2: + handle_blpop_response(clients[-1]) + else: + handle_lpush_response(clients[-1]) def init_lpush(): global clients @@ -262,6 +254,8 @@ def init_lpush(): def main(test, flush): global g_exit + + clear_ops() if flush: flush_db_sync() From 91a18c2313771cb3e1c027b10b0773234465df60 Mon Sep 17 00:00:00 2001 From: christianEQ Date: Wed, 17 Mar 2021 16:10:06 +0000 Subject: [PATCH 4/8] working blpop test in monkey Former-commit-id: 21af13c2a38741ad9e036fad42b23ea2454ac33c --- monkey/monkey.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/monkey/monkey.py b/monkey/monkey.py index e1b8fbb0e..07f157961 100644 --- a/monkey/monkey.py +++ b/monkey/monkey.py @@ -181,8 +181,7 @@ def handle_blpop_response(c, resp=None): global ops if resp != None: ops['blpop'] += 1 - else: - c.blpop("list_" + getrandomkey(), callback=handle_blpop_response) + c.blpop("list_" + getrandomkey(), callback=handle_blpop_response) def handle_set_response(c, resp=None): global ops @@ -214,7 +213,7 @@ def stats_thread(): i = 0 while not g_exit and not (runtime and i > runtime): time.sleep(1) - print("Ops per second: " + str(ops)) + print("Ops per second: " + str({k:v for (k,v) in ops.items() if v})) #print(f"Blocked threads: {len(list(filter(lambda x: x.blocked, clients)))}") clear_ops() i += 1 @@ -229,7 +228,7 @@ def flush_db_sync(): def init_blocking(): global clients - if numkeys > 5 * numclients: + if numkeys > 100 * numclients: print("WARNING: High ratio of keys to clients. Most lpushes will not be popped and unblocking will take a long time!") for i in range(numclients): clients.append(Client('127.0.0.1', 6379)) @@ -265,6 +264,10 @@ def main(test, flush): except KeyError: print(f"Test \"{test}\" not found. Exiting...") exit() + except ConnectionRefusedError: + print("Could not connect to server. Is it running?") + print("Exiting...") + exit() threading.Thread(target=stats_thread).start() asyncore.loop() From 31f8c6b63da5cdc1003d6bfcb9e65410a7046420 Mon Sep 17 00:00:00 2001 From: christianEQ Date: Wed, 17 Mar 2021 19:50:28 +0000 Subject: [PATCH 5/8] fixed runtime arg for monkey Former-commit-id: b3e6e1f13b3d24c92d9f7e8441831232f47ff053 --- monkey/monkey.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/monkey/monkey.py b/monkey/monkey.py index 07f157961..3c23874a8 100644 --- a/monkey/monkey.py +++ b/monkey/monkey.py @@ -211,13 +211,14 @@ def stats_thread(): global g_exit global runtime i = 0 - while not g_exit and not (runtime and i > runtime): + while not g_exit and not (runtime and i >= runtime): time.sleep(1) print("Ops per second: " + str({k:v for (k,v) in ops.items() if v})) #print(f"Blocked threads: {len(list(filter(lambda x: x.blocked, clients)))}") clear_ops() i += 1 g_exit = True + asyncore.close_all() def flush_db_sync(): server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -253,7 +254,7 @@ def init_lpush(): def main(test, flush): global g_exit - + clear_ops() if flush: @@ -272,8 +273,7 @@ def main(test, flush): threading.Thread(target=stats_thread).start() asyncore.loop() g_exit = True - sys.exit(0) - print("DONE") + print("Done.") parser = argparse.ArgumentParser(description="Test use cases for KeyDB.") parser.add_argument('test', choices=[x[5:] for x in filter(lambda name: name.startswith("init_"), globals().keys())], help="which test to run") From 1fc5fdd52551542b90ea078cbaf923d64a3d70cd Mon Sep 17 00:00:00 2001 From: christianEQ Date: Wed, 17 Mar 2021 20:17:52 +0000 Subject: [PATCH 6/8] removed unused stuff from monkey Former-commit-id: be55daa6f7bcbf137b37269d1ee7a5fcacc14ff0 --- monkey/monkey.py | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/monkey/monkey.py b/monkey/monkey.py index 3c23874a8..a31b35f9b 100644 --- a/monkey/monkey.py +++ b/monkey/monkey.py @@ -1,17 +1,13 @@ -import keydb import random -import sched, time +import time import socket import asyncore import threading import argparse -import sys from pprint import pprint # Globals ops = {} -s = sched.scheduler(time.time, time.sleep) -g_exit = False numclients = 0 numkeys = 0 runtime = 0 @@ -208,16 +204,13 @@ def clear_ops(): def stats_thread(): global ops - global g_exit global runtime i = 0 - while not g_exit and not (runtime and i >= runtime): + while i < runtime or not runtime: time.sleep(1) print("Ops per second: " + str({k:v for (k,v) in ops.items() if v})) - #print(f"Blocked threads: {len(list(filter(lambda x: x.blocked, clients)))}") clear_ops() i += 1 - g_exit = True asyncore.close_all() def flush_db_sync(): @@ -253,8 +246,6 @@ def init_lpush(): handle_del_response(del_client) def main(test, flush): - global g_exit - clear_ops() if flush: @@ -272,7 +263,6 @@ def main(test, flush): threading.Thread(target=stats_thread).start() asyncore.loop() - g_exit = True print("Done.") parser = argparse.ArgumentParser(description="Test use cases for KeyDB.") From 8858ffd2a911e39f96c0737bcc3402024a22916d Mon Sep 17 00:00:00 2001 From: christianEQ Date: Wed, 17 Mar 2021 20:22:01 +0000 Subject: [PATCH 7/8] fix old reference to c variable (monkey) Former-commit-id: 0b9b6413c70d4ba71bbabcd0b22fb004d804958f --- monkey/monkey.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/monkey/monkey.py b/monkey/monkey.py index a31b35f9b..37e5e4440 100644 --- a/monkey/monkey.py +++ b/monkey/monkey.py @@ -236,7 +236,7 @@ def init_lpush(): for i in range(numclients): clients.append(Client('127.0.0.1', 6379)) for i in range (10): - handle_lpush_response(c) + handle_lpush_response(clients[-1]) #handle_set_response(clients[-1], None) scan_client = Client('127.0.0.1', 6379) From d6fbca3794f2a5016f3af758e64f2dafb44eae62 Mon Sep 17 00:00:00 2001 From: malavan Date: Wed, 26 May 2021 17:01:51 +0000 Subject: [PATCH 8/8] fix leak caused by dict entry not being added to GCList Former-commit-id: d8c1b3b6ec64f63fdff04d53102e4563c2c6764a --- src/dict.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dict.cpp b/src/dict.cpp index 88ad116bb..b37b35a6a 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -673,7 +673,7 @@ static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) { if (!nofree) { if (table == 0 && d->asyncdata != nullptr && (ssize_t)idx < d->rehashidx) { he->next = d->asyncdata->deGCList; - d->asyncdata->deGCList = he->next; + d->asyncdata->deGCList = he; } else { dictFreeKey(d, he); dictFreeVal(d, he);