Merge branch 'keydbpro' into 'flash_read_perf'

# Conflicts:
#   src/db.cpp
#   src/networking.cpp

Former-commit-id: e16c846b2b9e70f20981172287b19e585f81d73d
This commit is contained in:
jsully 2021-04-01 17:25:31 +00:00
commit c6cea9d84f
11 changed files with 128 additions and 22 deletions

View File

@ -15,7 +15,7 @@
release_hdr := $(shell sh -c './mkreleasehdr.sh')
uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not')
uname_M := $(shell sh -c 'uname -m 2>/dev/null || echo not')
OPTIMIZATION?=-O2
OPTIMIZATION?=-O2 -flto
DEPENDENCY_TARGETS=hiredis linenoise lua rocksdb
NODEPS:=clean distclean

View File

@ -35,6 +35,11 @@
#include <signal.h>
#include <ctype.h>
// Needed for prefetch
#if defined(__x86_64__) || defined(__i386__)
#include <xmmintrin.h>
#endif
/* Database backup. */
struct dbBackup {
const redisDbPersistentDataSnapshot **dbarray;
@ -2581,6 +2586,8 @@ void redisDbPersistentData::updateValue(dict_iter itr, robj *val)
void redisDbPersistentData::ensure(const char *key)
{
if (m_pdbSnapshot == nullptr && m_spstorage == nullptr)
return;
dictEntry *de = dictFind(m_pdict, key);
ensure(key, &de);
}
@ -3000,8 +3007,30 @@ int dbnumFromDb(redisDb *db)
bool redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command, bool fExecOK)
{
if (m_spstorage == nullptr)
return false;
if (m_spstorage == nullptr) {
#if defined(__x86_64__) || defined(__i386__)
// We do a quick 'n dirty check for set & get. Anything else is too slow.
// Should the user do something weird like remap them then the worst that will
// happen is we don't prefetch or we prefetch wrong data. A mild perf hit, but
// not dangerous
if (command.argc >= 2) {
const char *cmd = szFromObj(command.argv[0]);
if (!strcasecmp(cmd, "set") || !strcasecmp(cmd, "get")) {
auto h = dictSdsHash(szFromObj(command.argv[1]));
for (int iht = 0; iht < 2; ++iht) {
auto hT = h & c->db->m_pdict->ht[iht].sizemask;
dictEntry **table;
__atomic_load(&c->db->m_pdict->ht[iht].table, &table, __ATOMIC_RELAXED);
if (table != nullptr)
_mm_prefetch(table[hT], _MM_HINT_T2);
if (!dictIsRehashing(c->db->m_pdict))
break;
}
}
}
#endif
return;
}
AeLocker lock;

View File

@ -74,6 +74,8 @@ static int _dictInit(dict *ht, dictType *type, void *privDataPtr);
static uint8_t dict_hash_function_seed[16];
extern "C" void asyncFreeDictTable(dictEntry **de);
void dictSetHashFunctionSeed(uint8_t *seed) {
memcpy(dict_hash_function_seed,seed,sizeof(dict_hash_function_seed));
}
@ -359,7 +361,7 @@ int dictRehash(dict *d, int n) {
/* Check if we already rehashed the whole table... */
if (d->ht[0].used == 0) {
zfree(d->ht[0].table);
asyncFreeDictTable(d->ht[0].table);
d->ht[0] = d->ht[1];
_dictReset(&d->ht[1]);
d->rehashidx = -1;
@ -487,7 +489,7 @@ void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl, bool fFree) {
/* Check if we already rehashed the whole table... */
if (d->ht[0].used == 0 && d->asyncdata == nullptr) {
zfree(d->ht[0].table);
asyncFreeDictTable(d->ht[0].table);
d->ht[0] = d->ht[1];
_dictReset(&d->ht[1]);
d->rehashidx = -1;
@ -544,7 +546,7 @@ int dictRehashMilliseconds(dict *d, int ms) {
static void _dictRehashStep(dict *d) {
unsigned iterators;
__atomic_load(&d->iterators, &iterators, __ATOMIC_RELAXED);
if (iterators == 0) dictRehash(d,2);
if (iterators == 0) dictRehash(d,1);
}
/* Add an element to the target hash table */
@ -762,7 +764,7 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
}
}
/* Free the table and the allocated cache structure */
zfree(ht->table);
asyncFreeDictTable(ht->table);
/* Re-initialize the table */
_dictReset(ht);
return DICT_OK; /* never fails */

View File

@ -52,7 +52,7 @@ public:
void endEpoch(uint64_t epoch, bool fNoFree = false)
{
std::unique_lock<fastlock> lock(m_lock);
assert(m_setepochOutstanding.find(epoch) != m_setepochOutstanding.end());
serverAssert(m_setepochOutstanding.find(epoch) != m_setepochOutstanding.end());
bool fMinElement = *std::min_element(m_setepochOutstanding.begin(), m_setepochOutstanding.end());
m_setepochOutstanding.erase(epoch);
if (fNoFree)
@ -91,8 +91,8 @@ public:
void enqueue(uint64_t epoch, std::unique_ptr<T> &&sp)
{
std::unique_lock<fastlock> lock(m_lock);
assert(m_setepochOutstanding.find(epoch) != m_setepochOutstanding.end());
assert(sp->FWillFreeChildDebug() == false);
serverAssert(m_setepochOutstanding.find(epoch) != m_setepochOutstanding.end());
serverAssert(sp->FWillFreeChildDebug() == false);
auto itr = std::find(m_vecepochs.begin(), m_vecepochs.end(), m_epochNext+1);
if (itr == m_vecepochs.end())

View File

@ -312,6 +312,16 @@ int prepareClientToWrite(client *c) {
* Low level functions to add more data to output buffers.
* -------------------------------------------------------------------------- */
void _clientAsyncReplyBufferReserve(client *c, size_t len) {
if (c->replyAsync != nullptr)
return;
size_t newsize = std::max(len, (size_t)PROTO_ASYNC_REPLY_CHUNK_BYTES);
clientReplyBlock *replyNew = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + newsize);
replyNew->size = zmalloc_usable(replyNew) - sizeof(clientReplyBlock);
replyNew->used = 0;
c->replyAsync = replyNew;
}
/* Attempts to add the reply to the static buffer in the client struct.
* Returns C_ERR if the buffer is full, or the reply list is not empty,
* in which case the reply must be added to the reply list. */
@ -1662,7 +1672,7 @@ int writeToClient(client *c, int handler_installed) {
ssize_t nwritten = 0, totwritten = 0;
clientReplyBlock *o;
AssertCorrectThread(c);
serverAssertDebug(FCorrectThread(c));
std::unique_lock<decltype(c->lock)> lock(c->lock);
@ -1881,7 +1891,7 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
processed += (int)vec.size();
for (client *c : vec) {
AssertCorrectThread(c);
serverAssertDebug(FCorrectThread(c));
uint64_t flags = c->flags.fetch_and(~CLIENT_PENDING_WRITE, std::memory_order_relaxed);
@ -2358,8 +2368,8 @@ void parseClientCommandBuffer(client *c) {
serverAssert(c->vecqueuedcmd.back().reploff >= 0);
}
/* Prefetch if we have a storage provider and we're not in the global lock */
if (cqueriesStart < c->vecqueuedcmd.size() && g_pserver->m_pstorageFactory != nullptr && !GlobalLocksAcquired()) {
/* Prefetch outside the lock for better perf */
if (cqueries < c->vecqueuedcmd.size() && !GlobalLocksAcquired()) {
auto &query = c->vecqueuedcmd.back();
if (query.argc > 0 && query.argc == query.argcMax) {
if (c->db->prefetchKeysAsync(c, query, c->vecqueuedcmd.size() == 1)) {
@ -2430,8 +2440,8 @@ void readQueryFromClient(connection *conn) {
int nread, readlen;
size_t qblen;
serverAssert(FCorrectThread(c));
serverAssert(!GlobalLocksAcquired());
serverAssertDebug(FCorrectThread(c) sdfsdf);
serverAssertDebug(!GlobalLocksAcquired());
AeLocker aelock;
AssertCorrectThread(c);

View File

@ -813,6 +813,10 @@ static int ipow(int base, int exp) {
return result;
}
extern "C" void asyncFreeDictTable(dictEntry **de) {
zfree(de);
}
static void showLatencyReport(void) {
int i, curlat = 0;
int usbetweenlat = ipow(10, MAX_LATENCY_PRECISION-config.precision);

View File

@ -144,6 +144,11 @@ static void cliRefreshPrompt(void) {
sdsfree(prompt);
}
struct dictEntry;
void asyncFreeDictTable(struct dictEntry **de) {
zfree(de);
}
/* Return the name of the dotfile for the specified 'dotfilename'.
* Normally it just concatenates user $HOME to the file specified
* in 'dotfilename'. However if the environment variable 'envoverride'

View File

@ -4449,6 +4449,7 @@ void replicateSubkeyExpire(redisDb *db, robj_roptr key, robj_roptr subkey, long
sdsfree(szFromObj(&objTtl));
}
void _clientAsyncReplyBufferReserve(client *c, size_t len);
void flushReplBacklogToClients()
{
serverAssert(GlobalLocksAcquired());
@ -4486,6 +4487,8 @@ void flushReplBacklogToClients()
addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbCopy);
} else {
auto cbPhase1 = g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart;
if (fAsyncWrite)
_clientAsyncReplyBufferReserve(replica, cbPhase1 + g_pserver->repl_backlog_idx);
addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbPhase1);
addReplyProto(replica, g_pserver->repl_backlog, g_pserver->repl_backlog_idx);
serverAssert((cbPhase1 + g_pserver->repl_backlog_idx) == (g_pserver->master_repl_offset - g_pserver->repl_batch_offStart));

View File

@ -2485,6 +2485,15 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData
return 1000/g_pserver->hz;
}
extern "C" void asyncFreeDictTable(dictEntry **de)
{
if (de == nullptr || serverTL == nullptr || serverTL->gcEpoch.isReset()) {
zfree(de);
} else {
g_pserver->garbageCollector.enqueueCPtr(serverTL->gcEpoch, de);
}
}
extern int ProcessingEventsWhileBlocked;
void processClients();

View File

@ -1786,6 +1786,25 @@ class GarbageCollectorCollection
GarbageCollector<redisDbPersistentDataSnapshot> garbageCollectorSnapshot;
GarbageCollector<ICollectable> garbageCollectorGeneric;
class CPtrCollectable : public ICollectable
{
void *m_pv;
public:
CPtrCollectable(void *pv)
: m_pv(pv)
{}
CPtrCollectable(CPtrCollectable &&move) {
m_pv = move.m_pv;
move.m_pv = nullptr;
}
virtual ~CPtrCollectable() {
zfree(m_pv);
}
};
public:
struct Epoch
{
@ -1797,6 +1816,20 @@ public:
epochGeneric = 0;
}
Epoch() = default;
Epoch (const Epoch &other) {
epochSnapshot = other.epochSnapshot;
epochGeneric = other.epochGeneric;
}
Epoch &operator=(const Epoch &other) {
serverAssert(isReset());
epochSnapshot = other.epochSnapshot;
epochGeneric = other.epochGeneric;
return *this;
}
bool isReset() const {
return epochSnapshot == 0 && epochGeneric == 0;
}
@ -1810,10 +1843,13 @@ public:
return e;
}
void endEpoch(Epoch e, bool fNoFree = false)
void endEpoch(Epoch &e, bool fNoFree = false)
{
garbageCollectorSnapshot.endEpoch(e.epochSnapshot, fNoFree);
garbageCollectorGeneric.endEpoch(e.epochGeneric, fNoFree);
auto epochSnapshot = e.epochSnapshot;
auto epochGeneric = e.epochGeneric;
e.reset(); // We must do this early as GC'd dtors can themselves try to enqueue more data
garbageCollectorSnapshot.endEpoch(epochSnapshot, fNoFree);
garbageCollectorGeneric.endEpoch(epochGeneric, fNoFree);
}
void shutdown()
@ -1831,6 +1867,13 @@ public:
{
garbageCollectorGeneric.enqueue(e.epochGeneric, std::move(sp));
}
template<typename T>
void enqueueCPtr(Epoch e, T p)
{
auto sp = std::make_unique<CPtrCollectable>(reinterpret_cast<void*>(p));
enqueue(e, std::move(sp));
}
};
// Per-thread variabels that may be accessed without a lock

View File

@ -154,7 +154,7 @@ start_server {tags {"defrag"} overrides {server-threads 1} } {
$rd read ; # Discard replies
}
set expected_frag 1.7
set expected_frag 1.5
if {$::accurate} {
# scale the hash to 1m fields in order to have a measurable the latency
for {set j 10000} {$j < 1000000} {incr j} {
@ -265,7 +265,7 @@ start_server {tags {"defrag"} overrides {server-threads 1} } {
# create big keys with 10k items
set rd [redis_deferring_client]
set expected_frag 1.7
set expected_frag 1.5
# add a mass of list nodes to two lists (allocations are interlaced)
set val [string repeat A 100] ;# 5 items of 100 bytes puts us in the 640 bytes bin, which has 32 regs, so high potential for fragmentation
set elements 500000
@ -544,3 +544,4 @@ start_server {tags {"defrag"} overrides {server-threads 1 active-replica yes} }
}
}
} ;# run solo