From a2d8079d49320fc73fccaaff20e6839c641c1d35 Mon Sep 17 00:00:00 2001 From: christianEQ Date: Wed, 17 Mar 2021 16:03:51 +0000 Subject: [PATCH] track each command separately in monkey Former-commit-id: e3accdaad985fd9f93490a5cc6704d9ecb909604 --- monkey/monkey.py | 44 +++++++++++++++++++------------------------- 1 file changed, 19 insertions(+), 25 deletions(-) diff --git a/monkey/monkey.py b/monkey/monkey.py index 5308a8276..e1b8fbb0e 100644 --- a/monkey/monkey.py +++ b/monkey/monkey.py @@ -9,7 +9,7 @@ import sys from pprint import pprint # Globals -ops=0 +ops = {} s = sched.scheduler(time.time, time.sleep) g_exit = False numclients = 0 @@ -170,52 +170,42 @@ class Client(asyncore.dispatcher): def getrandomkey(): return str(random.randrange(0, numkeys)) -def handle_lpush_response(c, resp, delay=0): +def handle_lpush_response(c, resp=None): global ops if resp != None: - ops = ops + 1 + ops['lpush'] += 1 assert(resp[0] == ord(':')) c.lpush("list_" + getrandomkey(), 'bardsklfjkldsjfdlsjflksdfjklsdjflksd kldsjflksd jlkdsjf lksdjklds jrfklsdjfklsdjfkl', handle_lpush_response) -def handle_blpop_response(c, resp): +def handle_blpop_response(c, resp=None): global ops if resp != None: - print("unblocked thread") - ops = ops + 1 + ops['blpop'] += 1 else: c.blpop("list_" + getrandomkey(), callback=handle_blpop_response) def handle_set_response(c, resp=None): global ops if resp != None: - ops = ops + 1 + ops['set'] += 1 assert(resp[0] == ord('+')) c.set("str_" + getrandomkey(), 'bardsklfjkldsjfdlsjflksdfjklsdjflksd kldsjflksd jlkdsjf lksdjklds jrfklsdjfklsdjfkl', handle_set_response) def handle_del_response(c, resp=None): global ops if resp != None: - ops = ops + 1 + ops['del'] += 1 c.delete("list_" + getrandomkey(), handle_del_response) def scan_callback(c, resp=None): global ops nextstart = int(resp[0]) c.scan(nextstart, count=500, callback=scan_callback) - ops = ops+1 + ops['scan'] += 1 -def unblock_clients(c, resp=None): - global clients +def clear_ops(): global ops - if resp != None: - ops = ops + 1 - time.sleep(1) - client_ids = list(map(lambda x: x.client_id, clients)) - for id in client_ids: - if id: - print(f"unblocking client {id}") - time.sleep(1) - c.unblock(id, unblock_clients) + ops = {'lpush': 0, 'blpop': 0, 'del': 0, 'scan': 0, 'set': 0, 'get': 0} def stats_thread(): global ops @@ -226,7 +216,7 @@ def stats_thread(): time.sleep(1) print("Ops per second: " + str(ops)) #print(f"Blocked threads: {len(list(filter(lambda x: x.blocked, clients)))}") - ops = 0 + clear_ops() i += 1 g_exit = True @@ -239,12 +229,14 @@ def flush_db_sync(): def init_blocking(): global clients + if numkeys > 5 * numclients: + print("WARNING: High ratio of keys to clients. Most lpushes will not be popped and unblocking will take a long time!") for i in range(numclients): clients.append(Client('127.0.0.1', 6379)) - handle_blpop_response(clients[-1], None) - - unblock_client = Client('127.0.0.1', 6379) - unblock_clients(unblock_client) + if i % 2: + handle_blpop_response(clients[-1]) + else: + handle_lpush_response(clients[-1]) def init_lpush(): global clients @@ -262,6 +254,8 @@ def init_lpush(): def main(test, flush): global g_exit + + clear_ops() if flush: flush_db_sync()