diff --git a/monkey/monkey.py b/monkey/monkey.py index 67e426486..5308a8276 100644 --- a/monkey/monkey.py +++ b/monkey/monkey.py @@ -15,6 +15,7 @@ g_exit = False numclients = 0 numkeys = 0 runtime = 0 +clients = [] def _buildResp(*args): result = "*" + str(len(args)) + "\r\n" @@ -31,6 +32,8 @@ class Client(asyncore.dispatcher): self.buf = b'' self.inbuf = b'' self.callbacks = list() + self.client_id = 0 + self.get_client_id() def handle_connect(self): pass @@ -54,6 +57,9 @@ class Client(asyncore.dispatcher): endrange = self.inbuf[startpos+1:].find(ord('\r')) + 1 + startpos assert(endrange > 0) numargs = int(self.inbuf[startpos+1:endrange]) + if numargs == -1: # Nil array, used in some returns + startpos = endrange + 2 + return startpos, [] assert(numargs > 0) args = list() startpos = endrange + 2 # plus 1 gets us to the '\n' and the next gets us to the start char @@ -134,6 +140,10 @@ class Client(asyncore.dispatcher): self.buf += _buildResp("del", key) self.callbacks.append(callback) + def unblock(self, client_id, callback=default_result_handler): + self.buf += _buildResp("client", "unblock", str(client_id)) + self.callbacks.append(callback) + def scan(self, iter, match=None, count=None, callback = default_result_handler): args = ["scan", str(iter)] if match != None: @@ -145,10 +155,18 @@ class Client(asyncore.dispatcher): self.buf += _buildResp(*args) self.callbacks.append(callback) + def get_client_id(self): + self.buf += _buildResp("client", "id") + self.callbacks.append(self.store_client_id) + + def store_client_id(self, c, resp): + assert(resp[0] == ord(':')) + self.client_id = int(resp[1:]) + assert(self.client_id == c.client_id) + def get(self, key, callback = None): return - def getrandomkey(): return str(random.randrange(0, numkeys)) @@ -162,28 +180,43 @@ def handle_lpush_response(c, resp, delay=0): def handle_blpop_response(c, resp): global ops if resp != None: + print("unblocked thread") ops = ops + 1 - c.blpop("list_" + getrandomkey(), callback=handle_blpop_response) + else: + c.blpop("list_" + getrandomkey(), callback=handle_blpop_response) -def handle_set_response(c, resp): +def handle_set_response(c, resp=None): global ops if resp != None: ops = ops + 1 assert(resp[0] == ord('+')) c.set("str_" + getrandomkey(), 'bardsklfjkldsjfdlsjflksdfjklsdjflksd kldsjflksd jlkdsjf lksdjklds jrfklsdjfklsdjfkl', handle_set_response) -def handle_del_response(c, resp): +def handle_del_response(c, resp=None): global ops if resp != None: ops = ops + 1 c.delete("list_" + getrandomkey(), handle_del_response) -def scan_callback(c, resp): +def scan_callback(c, resp=None): global ops nextstart = int(resp[0]) c.scan(nextstart, count=500, callback=scan_callback) ops = ops+1 +def unblock_clients(c, resp=None): + global clients + global ops + if resp != None: + ops = ops + 1 + time.sleep(1) + client_ids = list(map(lambda x: x.client_id, clients)) + for id in client_ids: + if id: + print(f"unblocking client {id}") + time.sleep(1) + c.unblock(id, unblock_clients) + def stats_thread(): global ops global g_exit @@ -192,34 +225,47 @@ def stats_thread(): while not g_exit and not (runtime and i > runtime): time.sleep(1) print("Ops per second: " + str(ops)) + #print(f"Blocked threads: {len(list(filter(lambda x: x.blocked, clients)))}") ops = 0 i += 1 g_exit = True +def flush_db_sync(): + server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server.connect(('127.0.0.1', 6379)) + server.send(_buildResp("flushdb")) + resp = server.recv(8192) + assert(resp[:3] == "+OK".encode('utf-8')) + def init_blocking(): + global clients for i in range(numclients): - c = Client('127.0.0.1', 6379) - if i % 2: - handle_lpush_response(c, None, delay=1) - else: - handle_blpop_response(c, None) + clients.append(Client('127.0.0.1', 6379)) + handle_blpop_response(clients[-1], None) + + unblock_client = Client('127.0.0.1', 6379) + unblock_clients(unblock_client) def init_lpush(): + global clients for i in range(numclients): - c = Client('127.0.0.1', 6379) + clients.append(Client('127.0.0.1', 6379)) for i in range (10): - handle_lpush_response(c, None) + handle_lpush_response(c) #handle_set_response(clients[-1], None) scan_client = Client('127.0.0.1', 6379) scan_client.scan(0, count=500, callback=scan_callback) del_client = Client('127.0.0.1', 6379) - handle_del_response(del_client, None) + handle_del_response(del_client) -def main(test): +def main(test, flush): global g_exit + if flush: + flush_db_sync() + try: globals()[f"init_{test}"]() except KeyError: @@ -233,10 +279,11 @@ def main(test): print("DONE") parser = argparse.ArgumentParser(description="Test use cases for KeyDB.") -parser.add_argument('test', choices=[x[5:] for x in filter(lambda name: name.startswith("init_"), globals().keys())]) -parser.add_argument('-c', '--clients', type=int, default=50) -parser.add_argument('-k', '--keys', type=int, default=100000) -parser.add_argument('-t', '--runtime', type=int, default=0) +parser.add_argument('test', choices=[x[5:] for x in filter(lambda name: name.startswith("init_"), globals().keys())], help="which test to run") +parser.add_argument('-c', '--clients', type=int, default=50, help="number of running clients to use") +parser.add_argument('-k', '--keys', type=int, default=100000, help="number of keys to choose from for random tests") +parser.add_argument('-t', '--runtime', type=int, default=0, help="how long to run the test for (default: 0 for infinite)") +parser.add_argument('-f', '--flush', action="store_true", help="flush the db before running the test") if __name__ == "__main__": try: @@ -246,4 +293,4 @@ if __name__ == "__main__": numclients = args.clients numkeys = args.keys runtime = args.runtime - main(args.test) + main(args.test, args.flush)