unblocking test to monkey, works with 1 client
Former-commit-id: 9fbe8cf6a8aeb141d4a502532a456e4256f4daf8
This commit is contained in:
parent
a93234accc
commit
6119273c5e
@ -15,6 +15,7 @@ g_exit = False
|
||||
numclients = 0
|
||||
numkeys = 0
|
||||
runtime = 0
|
||||
clients = []
|
||||
|
||||
def _buildResp(*args):
|
||||
result = "*" + str(len(args)) + "\r\n"
|
||||
@ -31,6 +32,8 @@ class Client(asyncore.dispatcher):
|
||||
self.buf = b''
|
||||
self.inbuf = b''
|
||||
self.callbacks = list()
|
||||
self.client_id = 0
|
||||
self.get_client_id()
|
||||
|
||||
def handle_connect(self):
|
||||
pass
|
||||
@ -54,6 +57,9 @@ class Client(asyncore.dispatcher):
|
||||
endrange = self.inbuf[startpos+1:].find(ord('\r')) + 1 + startpos
|
||||
assert(endrange > 0)
|
||||
numargs = int(self.inbuf[startpos+1:endrange])
|
||||
if numargs == -1: # Nil array, used in some returns
|
||||
startpos = endrange + 2
|
||||
return startpos, []
|
||||
assert(numargs > 0)
|
||||
args = list()
|
||||
startpos = endrange + 2 # plus 1 gets us to the '\n' and the next gets us to the start char
|
||||
@ -134,6 +140,10 @@ class Client(asyncore.dispatcher):
|
||||
self.buf += _buildResp("del", key)
|
||||
self.callbacks.append(callback)
|
||||
|
||||
def unblock(self, client_id, callback=default_result_handler):
|
||||
self.buf += _buildResp("client", "unblock", str(client_id))
|
||||
self.callbacks.append(callback)
|
||||
|
||||
def scan(self, iter, match=None, count=None, callback = default_result_handler):
|
||||
args = ["scan", str(iter)]
|
||||
if match != None:
|
||||
@ -145,10 +155,18 @@ class Client(asyncore.dispatcher):
|
||||
self.buf += _buildResp(*args)
|
||||
self.callbacks.append(callback)
|
||||
|
||||
def get_client_id(self):
|
||||
self.buf += _buildResp("client", "id")
|
||||
self.callbacks.append(self.store_client_id)
|
||||
|
||||
def store_client_id(self, c, resp):
|
||||
assert(resp[0] == ord(':'))
|
||||
self.client_id = int(resp[1:])
|
||||
assert(self.client_id == c.client_id)
|
||||
|
||||
def get(self, key, callback = None):
|
||||
return
|
||||
|
||||
|
||||
def getrandomkey():
|
||||
return str(random.randrange(0, numkeys))
|
||||
|
||||
@ -162,28 +180,43 @@ def handle_lpush_response(c, resp, delay=0):
|
||||
def handle_blpop_response(c, resp):
|
||||
global ops
|
||||
if resp != None:
|
||||
print("unblocked thread")
|
||||
ops = ops + 1
|
||||
c.blpop("list_" + getrandomkey(), callback=handle_blpop_response)
|
||||
else:
|
||||
c.blpop("list_" + getrandomkey(), callback=handle_blpop_response)
|
||||
|
||||
def handle_set_response(c, resp):
|
||||
def handle_set_response(c, resp=None):
|
||||
global ops
|
||||
if resp != None:
|
||||
ops = ops + 1
|
||||
assert(resp[0] == ord('+'))
|
||||
c.set("str_" + getrandomkey(), 'bardsklfjkldsjfdlsjflksdfjklsdjflksd kldsjflksd jlkdsjf lksdjklds jrfklsdjfklsdjfkl', handle_set_response)
|
||||
|
||||
def handle_del_response(c, resp):
|
||||
def handle_del_response(c, resp=None):
|
||||
global ops
|
||||
if resp != None:
|
||||
ops = ops + 1
|
||||
c.delete("list_" + getrandomkey(), handle_del_response)
|
||||
|
||||
def scan_callback(c, resp):
|
||||
def scan_callback(c, resp=None):
|
||||
global ops
|
||||
nextstart = int(resp[0])
|
||||
c.scan(nextstart, count=500, callback=scan_callback)
|
||||
ops = ops+1
|
||||
|
||||
def unblock_clients(c, resp=None):
|
||||
global clients
|
||||
global ops
|
||||
if resp != None:
|
||||
ops = ops + 1
|
||||
time.sleep(1)
|
||||
client_ids = list(map(lambda x: x.client_id, clients))
|
||||
for id in client_ids:
|
||||
if id:
|
||||
print(f"unblocking client {id}")
|
||||
time.sleep(1)
|
||||
c.unblock(id, unblock_clients)
|
||||
|
||||
def stats_thread():
|
||||
global ops
|
||||
global g_exit
|
||||
@ -192,34 +225,47 @@ def stats_thread():
|
||||
while not g_exit and not (runtime and i > runtime):
|
||||
time.sleep(1)
|
||||
print("Ops per second: " + str(ops))
|
||||
#print(f"Blocked threads: {len(list(filter(lambda x: x.blocked, clients)))}")
|
||||
ops = 0
|
||||
i += 1
|
||||
g_exit = True
|
||||
|
||||
def flush_db_sync():
|
||||
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
server.connect(('127.0.0.1', 6379))
|
||||
server.send(_buildResp("flushdb"))
|
||||
resp = server.recv(8192)
|
||||
assert(resp[:3] == "+OK".encode('utf-8'))
|
||||
|
||||
def init_blocking():
|
||||
global clients
|
||||
for i in range(numclients):
|
||||
c = Client('127.0.0.1', 6379)
|
||||
if i % 2:
|
||||
handle_lpush_response(c, None, delay=1)
|
||||
else:
|
||||
handle_blpop_response(c, None)
|
||||
clients.append(Client('127.0.0.1', 6379))
|
||||
handle_blpop_response(clients[-1], None)
|
||||
|
||||
unblock_client = Client('127.0.0.1', 6379)
|
||||
unblock_clients(unblock_client)
|
||||
|
||||
def init_lpush():
|
||||
global clients
|
||||
for i in range(numclients):
|
||||
c = Client('127.0.0.1', 6379)
|
||||
clients.append(Client('127.0.0.1', 6379))
|
||||
for i in range (10):
|
||||
handle_lpush_response(c, None)
|
||||
handle_lpush_response(c)
|
||||
#handle_set_response(clients[-1], None)
|
||||
|
||||
scan_client = Client('127.0.0.1', 6379)
|
||||
scan_client.scan(0, count=500, callback=scan_callback)
|
||||
|
||||
del_client = Client('127.0.0.1', 6379)
|
||||
handle_del_response(del_client, None)
|
||||
handle_del_response(del_client)
|
||||
|
||||
def main(test):
|
||||
def main(test, flush):
|
||||
global g_exit
|
||||
|
||||
if flush:
|
||||
flush_db_sync()
|
||||
|
||||
try:
|
||||
globals()[f"init_{test}"]()
|
||||
except KeyError:
|
||||
@ -233,10 +279,11 @@ def main(test):
|
||||
print("DONE")
|
||||
|
||||
parser = argparse.ArgumentParser(description="Test use cases for KeyDB.")
|
||||
parser.add_argument('test', choices=[x[5:] for x in filter(lambda name: name.startswith("init_"), globals().keys())])
|
||||
parser.add_argument('-c', '--clients', type=int, default=50)
|
||||
parser.add_argument('-k', '--keys', type=int, default=100000)
|
||||
parser.add_argument('-t', '--runtime', type=int, default=0)
|
||||
parser.add_argument('test', choices=[x[5:] for x in filter(lambda name: name.startswith("init_"), globals().keys())], help="which test to run")
|
||||
parser.add_argument('-c', '--clients', type=int, default=50, help="number of running clients to use")
|
||||
parser.add_argument('-k', '--keys', type=int, default=100000, help="number of keys to choose from for random tests")
|
||||
parser.add_argument('-t', '--runtime', type=int, default=0, help="how long to run the test for (default: 0 for infinite)")
|
||||
parser.add_argument('-f', '--flush', action="store_true", help="flush the db before running the test")
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
@ -246,4 +293,4 @@ if __name__ == "__main__":
|
||||
numclients = args.clients
|
||||
numkeys = args.keys
|
||||
runtime = args.runtime
|
||||
main(args.test)
|
||||
main(args.test, args.flush)
|
||||
|
Loading…
x
Reference in New Issue
Block a user