diff --git a/monkey/monkey.py b/monkey/monkey.py
index 653970728..37e5e4440 100644
--- a/monkey/monkey.py
+++ b/monkey/monkey.py
@@ -1,21 +1,17 @@
-import keydb
 import random
-import sched, time
+import time
 import socket
 import asyncore
 import threading
-import sys
+import argparse
 from pprint import pprint

-# Parameters
-numclients = 50
-#numkeys = 1000000
-numkeys = 100000
-
 # Globals
-ops=0
-s = sched.scheduler(time.time, time.sleep)
-g_exit = False
+ops = {}
+numclients = 0
+numkeys = 0
+runtime = 0
+clients = []

 def _buildResp(*args):
     result = "*" + str(len(args)) + "\r\n"
@@ -32,6 +28,8 @@ class Client(asyncore.dispatcher):
         self.buf = b''
         self.inbuf = b''
         self.callbacks = list()
+        self.client_id = 0
+        self.get_client_id()

     def handle_connect(self):
         pass
@@ -55,6 +53,9 @@ class Client(asyncore.dispatcher):
         endrange = self.inbuf[startpos+1:].find(ord('\r')) + 1 + startpos
         assert(endrange > 0)
         numargs = int(self.inbuf[startpos+1:endrange])
+        if numargs == -1:  # Nil array, used in some returns
+            startpos = endrange + 2
+            return startpos, []
         assert(numargs > 0)
         args = list()
         startpos = endrange + 2 # plus 1 gets us to the '\n' and the next gets us to the start char
@@ -127,10 +128,18 @@ class Client(asyncore.dispatcher):
         self.buf += _buildResp("lpush", key, val)
         self.callbacks.append(callback)

+    def blpop(self, *keys, timeout=0, callback=default_result_handler):
+        self.buf += _buildResp("blpop", *keys, str(timeout))
+        self.callbacks.append(callback)
+
     def delete(self, key, callback = default_result_handler):
         self.buf += _buildResp("del", key)
         self.callbacks.append(callback)

+    def unblock(self, client_id, callback=default_result_handler):
+        self.buf += _buildResp("client", "unblock", str(client_id))
+        self.callbacks.append(callback)
+
     def scan(self, iter, match=None, count=None, callback = default_result_handler):
         args = ["scan", str(iter)]
         if match != None:
@@ -142,68 +151,133 @@ class Client(asyncore.dispatcher):
         self.buf += _buildResp(*args)
         self.callbacks.append(callback)

+    def get_client_id(self):
+        self.buf += _buildResp("client", "id")
+        self.callbacks.append(self.store_client_id)
+
+    def store_client_id(self, c, resp):
+        assert(resp[0] == ord(':'))
+        self.client_id = int(resp[1:])
+        assert(self.client_id == c.client_id)
+
     def get(self, key, callback = None):
         return
-
 def getrandomkey():
     return str(random.randrange(0, numkeys))

-def handle_lpush_response(c, resp):
+def handle_lpush_response(c, resp=None):
     global ops
     if resp != None:
-        ops = ops + 1
+        ops['lpush'] += 1
         assert(resp[0] == ord(':'))
     c.lpush("list_" + getrandomkey(), 'bardsklfjkldsjfdlsjflksdfjklsdjflksd kldsjflksd jlkdsjf lksdjklds jrfklsdjfklsdjfkl', handle_lpush_response)

-def handle_set_response(c, resp):
+def handle_blpop_response(c, resp=None):
     global ops
     if resp != None:
-        ops = ops + 1
+        ops['blpop'] += 1
+    c.blpop("list_" + getrandomkey(), callback=handle_blpop_response)
+
+def handle_set_response(c, resp=None):
+    global ops
+    if resp != None:
+        ops['set'] += 1
         assert(resp[0] == ord('+'))
     c.set("str_" + getrandomkey(), 'bardsklfjkldsjfdlsjflksdfjklsdjflksd kldsjflksd jlkdsjf lksdjklds jrfklsdjfklsdjfkl', handle_set_response)

-def handle_del_response(c, resp):
+def handle_del_response(c, resp=None):
     global ops
     if resp != None:
-        ops = ops + 1
+        ops['del'] += 1
     c.delete("list_" + getrandomkey(), handle_del_response)

-def scan_callback(c, resp):
+def scan_callback(c, resp=None):
     global ops
     nextstart = int(resp[0])
     c.scan(nextstart, count=500, callback=scan_callback)
-    ops = ops+1
+    ops['scan'] += 1
+
+def clear_ops():
+    global ops
+    ops = {'lpush': 0, 'blpop': 0, 'del': 0, 'scan': 0, 'set': 0, 'get': 0}

 def stats_thread():
     global ops
-    global g_exit
-    while not g_exit:
+    global runtime
+    i = 0
+    while i < runtime or not runtime:
         time.sleep(1)
-        print("Ops per second: " + str(ops))
-        ops = 0
+        print("Ops per second: " + str({k:v for (k,v) in ops.items() if v}))
+        clear_ops()
+        i += 1
+    asyncore.close_all()

-def main():
-    global g_exit
-    clients = []
+def flush_db_sync():
+    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    server.connect(('127.0.0.1', 6379))
+    server.send(_buildResp("flushdb"))
+    resp = server.recv(8192)
+    assert(resp[:3] == "+OK".encode('utf-8'))

+def init_blocking():
+    global clients
+    if numkeys > 100 * numclients:
+        print("WARNING: High ratio of keys to clients. Most lpushes will not be popped and unblocking will take a long time!")
+    for i in range(numclients):
+        clients.append(Client('127.0.0.1', 6379))
+        if i % 2:
+            handle_blpop_response(clients[-1])
+        else:
+            handle_lpush_response(clients[-1])
+
+def init_lpush():
+    global clients
     for i in range(numclients):
         clients.append(Client('127.0.0.1', 6379))
         for i in range (10):
-            handle_lpush_response(clients[-1], None)
+            handle_lpush_response(clients[-1])
             #handle_set_response(clients[-1], None)

     scan_client = Client('127.0.0.1', 6379)
     scan_client.scan(0, count=500, callback=scan_callback)
     del_client = Client('127.0.0.1', 6379)
-    handle_del_response(del_client, None)
+    handle_del_response(del_client)
+
+def main(test, flush):
+    clear_ops()
+
+    if flush:
+        flush_db_sync()
+
+    try:
+        globals()[f"init_{test}"]()
+    except KeyError:
+        print(f"Test \"{test}\" not found. Exiting...")
+        exit()
+    except ConnectionRefusedError:
+        print("Could not connect to server. Is it running?")
+        print("Exiting...")
+        exit()

     threading.Thread(target=stats_thread).start()
     asyncore.loop()
-    g_exit = True
-    sys.exit(0)
-    print("DONE")
+    print("Done.")
+
+parser = argparse.ArgumentParser(description="Test use cases for KeyDB.")
+parser.add_argument('test', choices=[x[5:] for x in filter(lambda name: name.startswith("init_"), globals().keys())], help="which test to run")
+parser.add_argument('-c', '--clients', type=int, default=50, help="number of running clients to use")
+parser.add_argument('-k', '--keys', type=int, default=100000, help="number of keys to choose from for random tests")
+parser.add_argument('-t', '--runtime', type=int, default=0, help="how long to run the test for (default: 0 for infinite)")
+parser.add_argument('-f', '--flush', action="store_true", help="flush the db before running the test")

 if __name__ == "__main__":
-    main()
+    try:
+        args = parser.parse_args()
+    except:
+        exit()
+    numclients = args.clients
+    numkeys = args.keys
+    runtime = args.runtime
+    main(args.test, args.flush)
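Note on the harness above: the monkey.py rewrite replaces the fixed lpush-only workload with named tests discovered by their init_ prefix (currently lpush and blocking), so the run is selected entirely from the command line. Under the argparse definition shown, a blocking-test run would look something like `python monkey.py blocking -c 100 -t 60 -f` (a hypothetical invocation inferred from the parser: 100 clients, 60-second run, flush first), not an example taken from the patch itself.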
diff --git a/src/AsyncWorkQueue.cpp b/src/AsyncWorkQueue.cpp
index dae36cc06..fe02e5212 100644
--- a/src/AsyncWorkQueue.cpp
+++ b/src/AsyncWorkQueue.cpp
@@ -18,14 +18,15 @@ void AsyncWorkQueue::WorkerThreadMain()

     vars.clients_pending_asyncwrite = listCreate();

-    aeAcquireLock();
+    m_mutex.lock();
     m_vecpthreadVars.push_back(&vars);
-    aeReleaseLock();
+    m_mutex.unlock();

     while (!m_fQuitting)
     {
         std::unique_lock<std::mutex> lock(m_mutex);
-        m_cvWakeup.wait(lock);
+        if (m_workqueue.empty())
+            m_cvWakeup.wait(lock);

         while (!m_workqueue.empty())
         {
@@ -41,9 +42,11 @@ void AsyncWorkQueue::WorkerThreadMain()
             lock.unlock();

             serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
-            aeAcquireLock();
-            ProcessPendingAsyncWrites();
-            aeReleaseLock();
+            if (listLength(serverTL->clients_pending_asyncwrite)) {
+                aeAcquireLock();
+                ProcessPendingAsyncWrites();
+                aeReleaseLock();
+            }
             g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch);
             serverTL->gcEpoch.reset();
         }
@@ -60,6 +63,7 @@ bool AsyncWorkQueue::removeClientAsyncWrites(client *c)
 {
     bool fFound = false;
     aeAcquireLock();
+    m_mutex.lock();
     for (auto pvars : m_vecpthreadVars)
     {
         listIter li;
@@ -74,6 +78,7 @@ bool AsyncWorkQueue::removeClientAsyncWrites(client *c)
             }
         }
     }
+    m_mutex.unlock();
     aeReleaseLock();
     return fFound;
 }
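For context on the AsyncWorkQueue change above: waiting on m_cvWakeup unconditionally can miss a notification that fires before the worker re-enters wait(), leaving queued work stranded until the next notify. The patch guards the wait with an emptiness check under the lock; an `if` suffices here because the inner drain loop re-checks the queue, whereas the textbook form of the pattern uses a predicate loop. A minimal self-contained sketch of that textbook pattern, with illustrative names rather than KeyDB's classes:

    // Guarded-wait sketch (illustrative names, not KeyDB's actual classes).
    // A consumer only sleeps when the queue is empty, so a notify_one() that
    // fired before the consumer took the lock cannot strand a queued item.
    #include <condition_variable>
    #include <mutex>
    #include <queue>

    static std::mutex m;
    static std::condition_variable cv;
    static std::queue<int> work;

    int take_one() {
        std::unique_lock<std::mutex> lock(m);
        while (work.empty())      // re-check after every wakeup (also covers spurious wakeups)
            cv.wait(lock);
        int item = work.front();
        work.pop();
        return item;
    }

    void give_one(int item) {
        {
            std::lock_guard<std::mutex> guard(m);
            work.push(item);
        }
        cv.notify_one();          // safe even if the consumer was not yet waiting
    }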
diff --git a/src/IStorage.h b/src/IStorage.h
index 69611de13..d5808c4c1 100644
--- a/src/IStorage.h
+++ b/src/IStorage.h
@@ -1,5 +1,6 @@
 #pragma once
 #include <functional>
+#include "sds.h"

 class IStorageFactory
 {
@@ -28,6 +29,14 @@ public:
     virtual bool enumerate(callback fn) const = 0;
     virtual size_t count() const = 0;

+    virtual void bulkInsert(sds *rgkeys, sds *rgvals, size_t celem) {
+        beginWriteBatch();
+        for (size_t ielem = 0; ielem < celem; ++ielem) {
+            insert(rgkeys[ielem], sdslen(rgkeys[ielem]), rgvals[ielem], sdslen(rgvals[ielem]), false);
+        }
+        endWriteBatch();
+    }
+
     virtual void beginWriteBatch() {} // NOP
     virtual void endWriteBatch() {} // NOP

diff --git a/src/StorageCache.cpp b/src/StorageCache.cpp
index 7f86b595e..29908c7f5 100644
--- a/src/StorageCache.cpp
+++ b/src/StorageCache.cpp
@@ -91,6 +91,20 @@ void StorageCache::insert(sds key, const void *data, size_t cbdata, bool fOverwr
     m_spstorage->insert(key, sdslen(key), (void*)data, cbdata, fOverwrite);
 }

+void StorageCache::bulkInsert(sds *rgkeys, sds *rgvals, size_t celem)
+{
+    std::unique_lock<fastlock> ul(m_lock);
+    bulkInsertsInProgress++;
+    if (m_pdict != nullptr) {
+        for (size_t ielem = 0; ielem < celem; ++ielem) {
+            cacheKey(rgkeys[ielem]);
+        }
+    }
+    ul.unlock();
+    m_spstorage->bulkInsert(rgkeys, rgvals, celem);
+    bulkInsertsInProgress--;
+}
+
 const StorageCache *StorageCache::clone()
 {
     std::unique_lock<fastlock> ul(m_lock);
@@ -119,7 +133,7 @@ size_t StorageCache::count() const
     std::unique_lock<fastlock> ul(m_lock);
     size_t count = m_spstorage->count();
     if (m_pdict != nullptr) {
-        serverAssert(count == (dictSize(m_pdict) + m_collisionCount));
+        serverAssert(bulkInsertsInProgress.load(std::memory_order_seq_cst) || count == (dictSize(m_pdict) + m_collisionCount));
     }
     return count;
 }
diff --git a/src/StorageCache.h b/src/StorageCache.h
index c2170b7d0..ed868e74b 100644
--- a/src/StorageCache.h
+++ b/src/StorageCache.h
@@ -7,6 +7,7 @@ class StorageCache
     dict *m_pdict = nullptr;
     int m_collisionCount = 0;
     mutable fastlock m_lock {"StorageCache"};
+    std::atomic<int> bulkInsertsInProgress;

     StorageCache(IStorage *storage, bool fNoCache);

@@ -37,6 +38,7 @@ public:
     void clear();

     void insert(sds key, const void *data, size_t cbdata, bool fOverwrite);
+    void bulkInsert(sds *rgkeys, sds *rgvals, size_t celem);
     void retrieve(sds key, IStorage::callbackSingle fn) const;
     bool erase(sds key);

@@ -50,4 +52,4 @@ public:
     size_t count() const;

     const StorageCache *clone();
-};
\ No newline at end of file
+};
diff --git a/src/dict.cpp b/src/dict.cpp
index 57872b5c4..d16fd77f7 100644
--- a/src/dict.cpp
+++ b/src/dict.cpp
@@ -535,7 +535,8 @@ dictAsyncRehashCtl::~dictAsyncRehashCtl() {
     while (deGCList != nullptr) {
         auto next = deGCList->next;
         dictFreeKey(dict, deGCList);
-        dictFreeVal(dict, deGCList);
+        if (deGCList->v.val != nullptr)
+            dictFreeVal(dict, deGCList);
         zfree(deGCList);
         deGCList = next;
     }
@@ -694,6 +695,8 @@ static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
                 d->ht[table].table[idx] = he->next;
             if (!nofree) {
                 if (table == 0 && d->asyncdata != nullptr && (ssize_t)idx < d->rehashidx) {
+                    dictFreeVal(d, he);
+                    he->v.val = nullptr;
                     he->next = d->asyncdata->deGCList;
                     d->asyncdata->deGCList = he;
                 } else {
@@ -752,6 +755,8 @@
 void dictFreeUnlinkedEntry(dict *d, dictEntry *he) {
     if (he == NULL) return;
     if (d->asyncdata) {
+        dictFreeVal(d, he);
+        he->v.val = nullptr;
         he->next = d->asyncdata->deGCList;
         d->asyncdata->deGCList = he;
     } else {
@@ -775,6 +780,8 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
         while(he) {
             nextHe = he->next;
             if (d->asyncdata && (ssize_t)i < d->rehashidx) {
+                dictFreeVal(d, he);
+                he->v.val = nullptr;
                 he->next = d->asyncdata->deGCList;
                 d->asyncdata->deGCList = he;
             } else {
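The dict.cpp hunks above all make the same change: when an entry is unlinked while an async rehash holds a deferred GC list, its value is now freed immediately and nulled, so only the key and the entry node wait for deferred GC, and the destructor's new null check avoids a double free. A reduced sketch of that shape, with illustrative types rather than KeyDB's dictEntry:

    // Deferred-free sketch (illustrative types, not KeyDB's dictEntry).
    #include <cstdlib>

    struct Entry {
        void *key;
        void *val;
        Entry *next;
    };

    static Entry *gcList = nullptr;   // entries parked until the async rehash completes

    void unlinkDuringRehash(Entry *e) {
        free(e->val);        // release the potentially large value right away
        e->val = nullptr;    // mark it freed so deferred GC skips it
        e->next = gcList;    // park key + node for later
        gcList = e;
    }

    void runDeferredGC() {
        while (gcList != nullptr) {
            Entry *next = gcList->next;
            free(gcList->key);
            if (gcList->val != nullptr)   // mirrors the ~dictAsyncRehashCtl() check
                free(gcList->val);
            free(gcList);
            gcList = next;
        }
    }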
diff --git a/src/storage/rocksdb.cpp b/src/storage/rocksdb.cpp
index 1d6f943b4..a598c2a35 100644
--- a/src/storage/rocksdb.cpp
+++ b/src/storage/rocksdb.cpp
@@ -38,6 +38,17 @@ void RocksDBStorageProvider::insert(const char *key, size_t cchKey, void *data,
     ++m_count;
 }

+void RocksDBStorageProvider::bulkInsert(sds *rgkeys, sds *rgvals, size_t celem)
+{
+    auto spbatch = std::make_unique<rocksdb::WriteBatch>();
+    for (size_t ielem = 0; ielem < celem; ++ielem) {
+        spbatch->Put(m_spcolfamily.get(), rocksdb::Slice(rgkeys[ielem], sdslen(rgkeys[ielem])), rocksdb::Slice(rgvals[ielem], sdslen(rgvals[ielem])));
+    }
+    m_spdb->Write(WriteOptions(), spbatch.get());
+    std::unique_lock<fastlock> l(m_lock);
+    m_count += celem;
+}
+
 bool RocksDBStorageProvider::erase(const char *key, size_t cchKey)
 {
     rocksdb::Status status;
diff --git a/src/storage/rocksdb.h b/src/storage/rocksdb.h
index 4a0665b6e..48be6984f 100644
--- a/src/storage/rocksdb.h
+++ b/src/storage/rocksdb.h
@@ -34,6 +34,8 @@ public:
     virtual void beginWriteBatch() override;
     virtual void endWriteBatch() override;

+    virtual void bulkInsert(sds *rgkeys, sds *rgvals, size_t celem) override;
+
     virtual void batch_lock() override;
     virtual void batch_unlock() override;

diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl
index 4ee6fdbdb..db18e7128 100644
--- a/tests/unit/memefficiency.tcl
+++ b/tests/unit/memefficiency.tcl
@@ -395,7 +395,7 @@ start_server {tags {"defrag"} overrides {appendonly yes auto-aof-rewrite-percent
     # if the current slab is lower in utilization the defragger would have ended up in stagnation,
     # keept running and not move any allocation.
     # this test is more consistent on a fresh server with no history
-    start_server {tags {"defrag"} overrides {save ""}} {
+    start_server {tags {"defrag"} overrides {save "" server-threads 1}} {
         r flushdb
         r config resetstat
         r config set hz 100
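Returning to the RocksDB provider above: bulkInsert buffers every Put in a single WriteBatch and commits it with one Write call, the standard RocksDB idiom for atomic multi-key writes, which avoids per-key write overhead (this is also why the count() assertion in StorageCache is relaxed while a bulk insert is in flight). A standalone sketch of the idiom against the public RocksDB API, with an assumed database path used purely for illustration:

    // WriteBatch sketch against the public RocksDB API; the path below is an
    // assumption for illustration only.
    #include <rocksdb/db.h>
    #include <rocksdb/write_batch.h>
    #include <cassert>

    int main() {
        rocksdb::DB *db = nullptr;
        rocksdb::Options options;
        options.create_if_missing = true;
        rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/writebatch_demo", &db);
        assert(s.ok());

        rocksdb::WriteBatch batch;
        batch.Put("key1", "val1");   // buffered in memory, not yet applied
        batch.Put("key2", "val2");
        s = db->Write(rocksdb::WriteOptions(), &batch);  // one atomic commit
        assert(s.ok());

        delete db;
        return 0;
    }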