track each command separately in monkey

Former-commit-id: e3accdaad985fd9f93490a5cc6704d9ecb909604
This commit is contained in:
christianEQ 2021-03-17 16:03:51 +00:00
parent 6119273c5e
commit a2d8079d49

View File

@ -9,7 +9,7 @@ import sys
from pprint import pprint from pprint import pprint
# Globals # Globals
ops=0 ops = {}
s = sched.scheduler(time.time, time.sleep) s = sched.scheduler(time.time, time.sleep)
g_exit = False g_exit = False
numclients = 0 numclients = 0
@ -170,52 +170,42 @@ class Client(asyncore.dispatcher):
def getrandomkey(): def getrandomkey():
return str(random.randrange(0, numkeys)) return str(random.randrange(0, numkeys))
def handle_lpush_response(c, resp, delay=0): def handle_lpush_response(c, resp=None):
global ops global ops
if resp != None: if resp != None:
ops = ops + 1 ops['lpush'] += 1
assert(resp[0] == ord(':')) assert(resp[0] == ord(':'))
c.lpush("list_" + getrandomkey(), 'bardsklfjkldsjfdlsjflksdfjklsdjflksd kldsjflksd jlkdsjf lksdjklds jrfklsdjfklsdjfkl', handle_lpush_response) c.lpush("list_" + getrandomkey(), 'bardsklfjkldsjfdlsjflksdfjklsdjflksd kldsjflksd jlkdsjf lksdjklds jrfklsdjfklsdjfkl', handle_lpush_response)
def handle_blpop_response(c, resp): def handle_blpop_response(c, resp=None):
global ops global ops
if resp != None: if resp != None:
print("unblocked thread") ops['blpop'] += 1
ops = ops + 1
else: else:
c.blpop("list_" + getrandomkey(), callback=handle_blpop_response) c.blpop("list_" + getrandomkey(), callback=handle_blpop_response)
def handle_set_response(c, resp=None): def handle_set_response(c, resp=None):
global ops global ops
if resp != None: if resp != None:
ops = ops + 1 ops['set'] += 1
assert(resp[0] == ord('+')) assert(resp[0] == ord('+'))
c.set("str_" + getrandomkey(), 'bardsklfjkldsjfdlsjflksdfjklsdjflksd kldsjflksd jlkdsjf lksdjklds jrfklsdjfklsdjfkl', handle_set_response) c.set("str_" + getrandomkey(), 'bardsklfjkldsjfdlsjflksdfjklsdjflksd kldsjflksd jlkdsjf lksdjklds jrfklsdjfklsdjfkl', handle_set_response)
def handle_del_response(c, resp=None): def handle_del_response(c, resp=None):
global ops global ops
if resp != None: if resp != None:
ops = ops + 1 ops['del'] += 1
c.delete("list_" + getrandomkey(), handle_del_response) c.delete("list_" + getrandomkey(), handle_del_response)
def scan_callback(c, resp=None): def scan_callback(c, resp=None):
global ops global ops
nextstart = int(resp[0]) nextstart = int(resp[0])
c.scan(nextstart, count=500, callback=scan_callback) c.scan(nextstart, count=500, callback=scan_callback)
ops = ops+1 ops['scan'] += 1
def unblock_clients(c, resp=None): def clear_ops():
global clients
global ops global ops
if resp != None: ops = {'lpush': 0, 'blpop': 0, 'del': 0, 'scan': 0, 'set': 0, 'get': 0}
ops = ops + 1
time.sleep(1)
client_ids = list(map(lambda x: x.client_id, clients))
for id in client_ids:
if id:
print(f"unblocking client {id}")
time.sleep(1)
c.unblock(id, unblock_clients)
def stats_thread(): def stats_thread():
global ops global ops
@ -226,7 +216,7 @@ def stats_thread():
time.sleep(1) time.sleep(1)
print("Ops per second: " + str(ops)) print("Ops per second: " + str(ops))
#print(f"Blocked threads: {len(list(filter(lambda x: x.blocked, clients)))}") #print(f"Blocked threads: {len(list(filter(lambda x: x.blocked, clients)))}")
ops = 0 clear_ops()
i += 1 i += 1
g_exit = True g_exit = True
@ -239,12 +229,14 @@ def flush_db_sync():
def init_blocking(): def init_blocking():
global clients global clients
if numkeys > 5 * numclients:
print("WARNING: High ratio of keys to clients. Most lpushes will not be popped and unblocking will take a long time!")
for i in range(numclients): for i in range(numclients):
clients.append(Client('127.0.0.1', 6379)) clients.append(Client('127.0.0.1', 6379))
handle_blpop_response(clients[-1], None) if i % 2:
handle_blpop_response(clients[-1])
unblock_client = Client('127.0.0.1', 6379) else:
unblock_clients(unblock_client) handle_lpush_response(clients[-1])
def init_lpush(): def init_lpush():
global clients global clients
@ -263,6 +255,8 @@ def init_lpush():
def main(test, flush): def main(test, flush):
global g_exit global g_exit
clear_ops()
if flush: if flush:
flush_db_sync() flush_db_sync()