futriix/src/dict.h
uriyage 94bc15cb71
Io thread work offload (#763)
### IO-Threads Work Offloading 

This PR is the 2nd of 3 PRs intended to achieve the goal of 1M requests
per second.
(1st PR: https://github.com/valkey-io/valkey/pull/758)

This PR offloads additional work to the I/O threads, beyond the current
read-parse/write operations, to better utilize the I/O threads and
reduce the load on the main thread.

It contains the following 3 commits:

### Poll Offload

Currently, the main thread is responsible for executing the poll-wait
system call, while the IO threads wait for tasks from the main thread.
The poll-wait operation is expensive and can consume up to 30% of the
main thread's time. We could have let the IO threads do the poll-wait by
themselves, with each thread listening to some of the clients and
notifying the main thread when a client's command is ready to execute.

However, the current approach, where the main thread listens for events
from the network, has several benefits. The main thread remains in
charge, allowing it to know the state of each client
(idle/read/write/close) at any given time. Additionally, it makes the
threads flexible, enabling us to drain an IO thread's job queue and stop
a thread when the load is light without modifying the event loop and
moving its clients to a different IO thread. Furthermore, with this
approach, the IO threads don't need to wait for both messages from the
network and messages from the main thread; instead, they wait only for
tasks from the main thread.

To enjoy the benefits of both the main thread remaining in charge and
the poll being offloaded, we propose offloading the poll-wait as a
single-time, non-blocking job to one of the IO threads. The IO thread
will perform a poll-wait non-blocking call while the main thread
processes the client commands. Later, in `aeProcessEvents`, instead of
sleeping on the poll, we check for the IO thread's poll-wait results.

The poll-wait will be offloaded in `beforeSleep` only when there are
ready events for the main thread to process. If no events are pending,
the main thread will revert to the current behavior and sleep on the
poll by itself.

**Implementation Details**

A new callback, `custompoll`, was added to `aeEventLoop`. When it is not
`NULL`, the event loop calls `custompoll` instead of `aeApiPoll`.

When the poll is offloaded, we set `custompoll` to
`getIOThreadPollResults` and send a poll job to the thread. The thread
takes a mutex and makes a non-blocking call (timeout 0) to `aePoll`,
which populates the fired events array. The IO thread then sets
`server.io_fired_events` to the returned `numevents`. Later, the main
thread's `custompoll` callback returns `server.io_fired_events` and sets
`custompoll` back to `NULL`.

To ensure thread safety when accessing `server.el`, all functions that
modify the event loop's events are wrapped with a mutex, so that the
events are never modified concurrently.
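
The callback swap described above can be sketched roughly as follows. This is a minimal, self-contained model and not the actual `ae.c` code: `toyEventLoop`, `toyApiPoll`, `ioThreadPollJob`, `getToyIOThreadPollResults`, `toyProcessEvents`, and `toyPollOffloadDemo` are hypothetical names, the kernel is replaced by a counter of pending events, and the main thread simply joins the IO thread at the point where the real code would keep processing client commands.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical, simplified stand-ins; not the real ae.c or server structures. */
typedef struct toyEventLoop toyEventLoop;
typedef int toyPollFn(toyEventLoop *el);

struct toyEventLoop {
    pthread_mutex_t poll_mutex; /* guards pending and fired[] */
    int pending;                /* stand-in for events the kernel would report */
    int fired[16];              /* stand-in for the fired events array */
    toyPollFn *custompoll;      /* when non-NULL, used instead of the default poll */
    int io_fired_events;        /* result published by the IO thread */
};

/* Stand-in for a non-blocking (timeout 0) poll: move pending events to fired[]. */
static int toyApiPoll(toyEventLoop *el) {
    int n = el->pending > 16 ? 16 : el->pending;
    for (int i = 0; i < n; i++) el->fired[i] = i;
    el->pending -= n;
    return n;
}

/* Poll job executed on the IO thread: poll under the mutex, publish the count. */
static void *ioThreadPollJob(void *arg) {
    toyEventLoop *el = arg;
    pthread_mutex_lock(&el->poll_mutex);
    el->io_fired_events = toyApiPoll(el);
    pthread_mutex_unlock(&el->poll_mutex);
    return NULL;
}

/* custompoll callback: restore the default poll and return the IO thread's result. */
static int getToyIOThreadPollResults(toyEventLoop *el) {
    el->custompoll = NULL;
    return el->io_fired_events;
}

/* One loop iteration: use custompoll if set, otherwise poll directly. */
static int toyProcessEvents(toyEventLoop *el) {
    if (el->custompoll) return el->custompoll(el);
    pthread_mutex_lock(&el->poll_mutex);
    int n = toyApiPoll(el);
    pthread_mutex_unlock(&el->poll_mutex);
    return n;
}

/* Offload one poll, then collect its result; returns the number of fired events. */
static int toyPollOffloadDemo(int pending) {
    toyEventLoop el = {.pending = pending};
    pthread_t io;
    pthread_mutex_init(&el.poll_mutex, NULL);
    el.custompoll = getToyIOThreadPollResults;
    if (pthread_create(&io, NULL, ioThreadPollJob, &el) != 0) return -1;
    /* ... the main thread would process client commands here ... */
    pthread_join(io, NULL); /* simplification: wait for the poll job to finish */
    int n = toyProcessEvents(&el);
    assert(el.custompoll == NULL); /* the callback resets itself after one use */
    pthread_mutex_destroy(&el.poll_mutex);
    return n;
}
```

Calling `toyPollOffloadDemo(3)` from a `main` function (built with `-pthread`) offloads a single poll and collects three fired events through `custompoll`. The join is only a convenience for the sketch; how the real code detects that the poll job has completed is not shown here.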

### Command Lookup Offload

As the IO thread parses the command from the client's `querybuf`, it can
also perform the command lookup in the commands dictionary. On the main
thread, this lookup can consume up to about 5% of the runtime.

**Implementation Details**
The IO thread stores the looked-up command in a new client field,
`io_parsed_cmd`. We can't use `c->cmd` for this, since `c->cmd` is used
to check whether a command was reprocessed.

To ensure thread safety when accessing the command dictionary, we make
sure the main thread isn't changing the dictionary while IO threads are
accessing it. This is accomplished by introducing a new `dictType` flag,
`no_incremental_rehash`, which is set for the commands dictionary. With
this flag set, `dictResize` rehashes the entire dictionary immediately
rather than deferring the work to later incremental steps.
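
The effect of the flag can be illustrated with a toy chained hash table. This is a sketch under stated assumptions, not the real `dict.c`: `toyDict`, `toyEntry`, `toyRehash`, `toyResize`, and `toyRehashPendingAfterResize` are hypothetical names, keys are small non-negative integers, and the hash is the key itself.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical toy table; not the real dict.c structures. */
typedef struct toyEntry {
    int key;
    struct toyEntry *next;
} toyEntry;

typedef struct toyDict {
    toyEntry **table[2];   /* table[1] is only used while rehashing */
    unsigned long size[2]; /* bucket counts, powers of two */
    long rehashidx;        /* -1 when no rehash is in progress */
    unsigned no_incremental_rehash : 1;
} toyDict;

/* Move up to n buckets from table[0] to table[1]; returns 1 if work remains. */
static int toyRehash(toyDict *d, int n) {
    while (n-- > 0 && d->rehashidx != -1) {
        toyEntry *e = d->table[0][d->rehashidx];
        while (e) {
            toyEntry *next = e->next;
            unsigned long idx = (unsigned long)e->key & (d->size[1] - 1);
            e->next = d->table[1][idx];
            d->table[1][idx] = e;
            e = next;
        }
        d->table[0][d->rehashidx] = NULL;
        d->rehashidx++;
        if ((unsigned long)d->rehashidx == d->size[0]) {
            /* Every bucket has moved: the new table becomes table[0]. */
            free(d->table[0]);
            d->table[0] = d->table[1];
            d->size[0] = d->size[1];
            d->table[1] = NULL;
            d->size[1] = 0;
            d->rehashidx = -1;
        }
    }
    return d->rehashidx != -1;
}

/* Start a resize to new_size buckets (a power of two); returns 0 on success. */
static int toyResize(toyDict *d, unsigned long new_size) {
    if (d->rehashidx != -1) return 1; /* a rehash is already in progress */
    d->table[1] = calloc(new_size, sizeof(toyEntry *));
    if (!d->table[1]) return 1;
    d->size[1] = new_size;
    d->rehashidx = 0;
    if (d->no_incremental_rehash) {
        /* Finish the whole migration now, so readers never see two tables. */
        while (toyRehash(d, 100)) {
        }
    }
    return 0;
}

/* Returns 1 if a rehash is still in progress right after the resize, else 0. */
static int toyRehashPendingAfterResize(int no_incremental_rehash) {
    toyDict d = {.rehashidx = -1, .no_incremental_rehash = no_incremental_rehash != 0};
    toyEntry entries[8];
    d.size[0] = 4;
    d.table[0] = calloc(d.size[0], sizeof(toyEntry *));
    assert(d.table[0] != NULL);
    for (int k = 0; k < 8; k++) {
        entries[k].key = k;
        entries[k].next = d.table[0][k & 3];
        d.table[0][k & 3] = &entries[k];
    }
    toyResize(&d, 16);
    int pending = d.rehashidx != -1;
    while (toyRehash(&d, 1)) { /* finish any deferred work before cleanup */
    }
    free(d.table[0]);
    return pending;
}
```

With the flag set, `toyRehashPendingAfterResize(1)` returns 0 because the table is fully migrated before `toyResize` returns. Without it, `toyRehashPendingAfterResize(0)` returns 1, meaning a reader could observe entries split across two tables.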

### Free Offload

Since the command arguments are allocated by the I/O thread, it would be
beneficial if they were also freed by the same thread. If the main
thread frees objects allocated by the I/O thread, two issues arise:

1. During the freeing process, the main thread needs to access the SDS
pointed to by the object to get its length.
2. With jemalloc, each thread manages a thread-local pool (`tcache`) of
buffers for quick reallocation without accessing the arena. If the main
thread constantly frees objects allocated by other threads, those
threads will have to frequently access the shared arena to obtain new
memory allocations.

**Implementation Details**
When freeing the client's argv, we will send the argv array to the
thread that allocated it. The thread will be identified by the client
ID. When freeing an object during `dbOverwrite`, we will offload the
object free as well. We will extend this to offload the free during
`dbDelete` in a future PR, as its effects on defrag/memory evictions
need to be studied.
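
A rough sketch of routing an argv free to a thread chosen from the client ID is shown below. Everything in it is illustrative: `toyIOThread`, `toyFreeJob`, `toyOffloadFreeArgv`, and `toyFreeOffloadDemo` are hypothetical names, the `client_id % TOY_IO_THREADS` mapping is an assumption about how the ID selects a thread, and the queue is a small mutex-protected ring buffer rather than the real job queue.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

#define TOY_IO_THREADS 2
#define TOY_QUEUE_LEN 8

/* Hypothetical free job: an argv array and its length. */
typedef struct toyFreeJob {
    char **argv;
    int argc;
} toyFreeJob;

/* Hypothetical IO thread with a small ring buffer of free jobs. */
typedef struct toyIOThread {
    pthread_t tid;
    pthread_mutex_t lock;
    pthread_cond_t nonempty;
    toyFreeJob jobs[TOY_QUEUE_LEN];
    int head, tail; /* ring buffer indices, guarded by lock */
    int stop;       /* set by the main thread to request shutdown */
    int freed_args; /* arguments freed so far; read by main only after join */
} toyIOThread;

static void *toyIOThreadMain(void *arg) {
    toyIOThread *t = arg;
    for (;;) {
        pthread_mutex_lock(&t->lock);
        while (t->head == t->tail && !t->stop) pthread_cond_wait(&t->nonempty, &t->lock);
        if (t->head == t->tail) { /* stop requested and queue drained */
            pthread_mutex_unlock(&t->lock);
            return NULL;
        }
        toyFreeJob job = t->jobs[t->head];
        t->head = (t->head + 1) % TOY_QUEUE_LEN;
        pthread_mutex_unlock(&t->lock);
        for (int i = 0; i < job.argc; i++) free(job.argv[i]);
        free(job.argv);
        t->freed_args += job.argc;
    }
}

/* Queue argv on the thread derived from the client ID; returns 0 if its queue is full. */
static int toyOffloadFreeArgv(toyIOThread *threads, unsigned long client_id, char **argv, int argc) {
    toyIOThread *t = &threads[client_id % TOY_IO_THREADS]; /* assumed ID-to-thread mapping */
    int queued = 0;
    pthread_mutex_lock(&t->lock);
    int next = (t->tail + 1) % TOY_QUEUE_LEN;
    if (next != t->head) {
        t->jobs[t->tail] = (toyFreeJob){argv, argc};
        t->tail = next;
        queued = 1;
        pthread_cond_signal(&t->nonempty);
    }
    pthread_mutex_unlock(&t->lock);
    return queued;
}

/* Offload five two-argument argv arrays; returns how many arguments thread `which` freed. */
static int toyFreeOffloadDemo(int which) {
    toyIOThread threads[TOY_IO_THREADS];
    memset(threads, 0, sizeof(threads));
    for (int i = 0; i < TOY_IO_THREADS; i++) {
        pthread_mutex_init(&threads[i].lock, NULL);
        pthread_cond_init(&threads[i].nonempty, NULL);
        if (pthread_create(&threads[i].tid, NULL, toyIOThreadMain, &threads[i]) != 0) abort();
    }
    for (unsigned long client_id = 0; client_id < 5; client_id++) {
        /* The demo allocates here for brevity; in the PR the IO thread allocates argv while parsing. */
        char **argv = malloc(2 * sizeof(char *));
        assert(argv != NULL);
        argv[0] = malloc(8);
        argv[1] = malloc(8);
        if (!toyOffloadFreeArgv(threads, client_id, argv, 2)) {
            free(argv[0]); /* fallback: free on the calling thread */
            free(argv[1]);
            free(argv);
        }
    }
    for (int i = 0; i < TOY_IO_THREADS; i++) {
        pthread_mutex_lock(&threads[i].lock);
        threads[i].stop = 1;
        pthread_cond_signal(&threads[i].nonempty);
        pthread_mutex_unlock(&threads[i].lock);
        pthread_join(threads[i].tid, NULL);
        pthread_mutex_destroy(&threads[i].lock);
        pthread_cond_destroy(&threads[i].nonempty);
    }
    return threads[which].freed_args;
}
```

In this demo, client IDs 0, 2, and 4 map to thread 0 and IDs 1 and 3 map to thread 1, so the two threads free six and four arguments respectively. Falling back to a local free when the queue is full is a design choice of the sketch, not something the commit message specifies. Build with `-pthread`.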

---------

Signed-off-by: Uri Yagelnik <uriy@amazon.com>
2024-07-18 19:21:45 -07:00


/* Hash Tables Implementation.
*
* This file implements in-memory hash tables with insert/del/replace/find/
* get-random-element operations. Hash tables will auto-resize if needed;
* tables of power of two in size are used, and collisions are handled by
* chaining. See the source code for more information... :)
*
* Copyright (c) 2006-2012, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef __DICT_H
#define __DICT_H
#include "mt19937-64.h"
#include <limits.h>
#include <stdint.h>
#include <stdlib.h>
#define DICT_OK 0
#define DICT_ERR 1
/* Hash table parameters */
#define HASHTABLE_MIN_FILL 8 /* Minimal hash table fill 12.5%(100/8) */
typedef struct dictEntry dictEntry; /* opaque */
typedef struct dict dict;
typedef struct dictType {
    /* Callbacks */
    uint64_t (*hashFunction)(const void *key);
    void *(*keyDup)(dict *d, const void *key);
    int (*keyCompare)(dict *d, const void *key1, const void *key2);
    void (*keyDestructor)(dict *d, void *key);
    void (*valDestructor)(dict *d, void *obj);
    int (*resizeAllowed)(size_t moreMem, double usedRatio);
    /* Invoked at the start of dict initialization/rehashing (old and new ht are already created) */
    void (*rehashingStarted)(dict *d);
    /* Invoked at the end of dict initialization/rehashing of all the entries from old to new ht. Both hts still
     * exist and are cleaned up after this callback. */
    void (*rehashingCompleted)(dict *d);
    /* Allow a dict to carry extra caller-defined metadata. The
     * extra memory is initialized to 0 when a dict is allocated. */
    size_t (*dictMetadataBytes)(dict *d);
    /* Method for copying a given key into a buffer of buf_len. Also used for
     * computing the length of the key + header when buf is NULL. */
    size_t (*embedKey)(unsigned char *buf, size_t buf_len, const void *key, unsigned char *header_size);
    /* Data */
    void *userdata;
    /* Flags */
    /* The 'no_value' flag, if set, indicates that values are not used, i.e. the
     * dict is a set. When this flag is set, it's not possible to access the
     * value of a dictEntry and it's also impossible to use dictSetKey(). Entry
     * metadata can also not be used. */
    unsigned int no_value : 1;
    /* If no_value = 1 and all keys are odd (LSB=1), setting keys_are_odd = 1
     * enables one more optimization: to store a key without an allocated
     * dictEntry. */
    unsigned int keys_are_odd : 1;
    /* If embedded_entry flag is set, it indicates that a copy of the key is created and the key is embedded
     * as part of the dict entry. */
    unsigned int embedded_entry : 1;
    /* If set, the whole table is rehashed during the resize itself instead of incrementally across multiple steps. */
    unsigned int no_incremental_rehash : 1;
} dictType;
#define DICTHT_SIZE(exp) ((exp) == -1 ? 0 : (unsigned long)1 << (exp))
#define DICTHT_SIZE_MASK(exp) ((exp) == -1 ? 0 : (DICTHT_SIZE(exp)) - 1)
struct dict {
    dictType *type;
    dictEntry **ht_table[2];
    unsigned long ht_used[2];
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */
    /* Keep small vars at end for optimal (minimal) struct padding */
    int16_t pauserehash;        /* If >0 rehashing is paused (<0 indicates coding error) */
    signed char ht_size_exp[2]; /* exponent of size. (size = 1<<exp) */
    int16_t pauseAutoResize;    /* If >0 automatic resizing is disallowed (<0 indicates coding error) */
    void *metadata[];
};
/* If safe is set to 1 this is a safe iterator, that means, you can call
* dictAdd, dictFind, and other functions against the dictionary even while
* iterating. Otherwise it is a non safe iterator, and only dictNext()
* should be called while iterating. */
typedef struct dictIterator {
    dict *d;
    long index;
    int table, safe;
    dictEntry *entry, *nextEntry;
    /* unsafe iterator fingerprint for misuse detection. */
    unsigned long long fingerprint;
} dictIterator;
typedef struct dictStats {
    int htidx;
    unsigned long buckets;
    unsigned long maxChainLen;
    unsigned long totalChainLen;
    unsigned long htSize;
    unsigned long htUsed;
    unsigned long *clvector;
} dictStats;
typedef void(dictScanFunction)(void *privdata, const dictEntry *de);
typedef void *(dictDefragAllocFunction)(void *ptr);
typedef void(dictDefragEntryCb)(void *privdata, void *ptr);
typedef struct {
    dictDefragAllocFunction *defragAlloc;   /* Used for entries etc. */
    dictDefragAllocFunction *defragKey;     /* Defrag-realloc keys (optional) */
    dictDefragAllocFunction *defragVal;     /* Defrag-realloc values (optional) */
    dictDefragEntryCb *defragEntryStartCb;  /* Callback invoked prior to the start of defrag of dictEntry. */
    dictDefragEntryCb *defragEntryFinishCb; /* Callback invoked after the defrag of dictEntry is tried. */
} dictDefragFunctions;
/* This is the initial size of every hash table */
#define DICT_HT_INITIAL_EXP 2
#define DICT_HT_INITIAL_SIZE (1 << (DICT_HT_INITIAL_EXP))
/* ------------------------------- Macros ------------------------------------*/
#define dictFreeVal(d, entry) \
    do { \
        if ((d)->type->valDestructor) (d)->type->valDestructor((d), dictGetVal(entry)); \
    } while (0)
/* Wrapped in do/while (0), like dictFreeVal, so the macro is safe inside if/else. */
#define dictFreeKey(d, entry) \
    do { \
        if ((d)->type->keyDestructor) (d)->type->keyDestructor((d), dictGetKey(entry)); \
    } while (0)
#define dictCompareKeys(d, key1, key2) \
    (((d)->type->keyCompare) ? (d)->type->keyCompare((d), key1, key2) : (key1) == (key2))
#define dictMetadata(d) (&(d)->metadata)
#define dictMetadataSize(d) ((d)->type->dictMetadataBytes ? (d)->type->dictMetadataBytes(d) : 0)
#define dictHashKey(d, key) ((d)->type->hashFunction(key))
#define dictBuckets(d) (DICTHT_SIZE((d)->ht_size_exp[0]) + DICTHT_SIZE((d)->ht_size_exp[1]))
#define dictSize(d) ((d)->ht_used[0] + (d)->ht_used[1])
#define dictIsEmpty(d) ((d)->ht_used[0] == 0 && (d)->ht_used[1] == 0)
#define dictIsRehashing(d) ((d)->rehashidx != -1)
#define dictPauseRehashing(d) ((d)->pauserehash++)
#define dictResumeRehashing(d) ((d)->pauserehash--)
#define dictIsRehashingPaused(d) ((d)->pauserehash > 0)
#define dictPauseAutoResize(d) ((d)->pauseAutoResize++)
#define dictResumeAutoResize(d) ((d)->pauseAutoResize--)
/* If our unsigned long type can store a 64 bit number, use a 64 bit PRNG. */
#if ULONG_MAX >= 0xffffffffffffffff
#define randomULong() ((unsigned long)genrand64_int64())
#else
#define randomULong() random()
#endif
typedef enum {
    DICT_RESIZE_ENABLE,
    DICT_RESIZE_AVOID,
    DICT_RESIZE_FORBID,
} dictResizeEnable;
/* API */
dict *dictCreate(dictType *type);
int dictExpand(dict *d, unsigned long size);
int dictTryExpand(dict *d, unsigned long size);
int dictShrink(dict *d, unsigned long size);
int dictAdd(dict *d, void *key, void *val);
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing);
void *dictFindPositionForInsert(dict *d, const void *key, dictEntry **existing);
dictEntry *dictInsertAtPosition(dict *d, void *key, void *position);
dictEntry *dictAddOrFind(dict *d, void *key);
int dictReplace(dict *d, void *key, void *val);
int dictDelete(dict *d, const void *key);
dictEntry *dictUnlink(dict *d, const void *key);
void dictFreeUnlinkedEntry(dict *d, dictEntry *he);
dictEntry *dictTwoPhaseUnlinkFind(dict *d, const void *key, dictEntry ***plink, int *table_index);
void dictTwoPhaseUnlinkFree(dict *d, dictEntry *he, dictEntry **plink, int table_index);
void dictRelease(dict *d);
dictEntry *dictFind(dict *d, const void *key);
void *dictFetchValue(dict *d, const void *key);
int dictShrinkIfNeeded(dict *d);
int dictExpandIfNeeded(dict *d);
void dictSetKey(dict *d, dictEntry *de, void *key);
void dictSetVal(dict *d, dictEntry *de, void *val);
void dictSetSignedIntegerVal(dictEntry *de, int64_t val);
void dictSetUnsignedIntegerVal(dictEntry *de, uint64_t val);
void dictSetDoubleVal(dictEntry *de, double val);
int64_t dictIncrSignedIntegerVal(dictEntry *de, int64_t val);
uint64_t dictIncrUnsignedIntegerVal(dictEntry *de, uint64_t val);
double dictIncrDoubleVal(dictEntry *de, double val);
void *dictGetKey(const dictEntry *de);
void *dictGetVal(const dictEntry *de);
int64_t dictGetSignedIntegerVal(const dictEntry *de);
uint64_t dictGetUnsignedIntegerVal(const dictEntry *de);
double dictGetDoubleVal(const dictEntry *de);
double *dictGetDoubleValPtr(dictEntry *de);
size_t dictMemUsage(const dict *d);
size_t dictEntryMemUsage(dictEntry *de);
dictIterator *dictGetIterator(dict *d);
dictIterator *dictGetSafeIterator(dict *d);
void dictInitIterator(dictIterator *iter, dict *d);
void dictInitSafeIterator(dictIterator *iter, dict *d);
void dictResetIterator(dictIterator *iter);
dictEntry *dictNext(dictIterator *iter);
void dictReleaseIterator(dictIterator *iter);
dictEntry *dictGetRandomKey(dict *d);
dictEntry *dictGetFairRandomKey(dict *d);
unsigned int dictGetSomeKeys(dict *d, dictEntry **des, unsigned int count);
void dictGetStats(char *buf, size_t bufsize, dict *d, int full);
uint64_t dictGenHashFunction(const void *key, size_t len);
uint64_t dictGenCaseHashFunction(const unsigned char *buf, size_t len);
void dictEmpty(dict *d, void(callback)(dict *));
void dictSetResizeEnabled(dictResizeEnable enable);
int dictRehash(dict *d, int n);
int dictRehashMicroseconds(dict *d, uint64_t us);
void dictSetHashFunctionSeed(uint8_t *seed);
uint8_t *dictGetHashFunctionSeed(void);
unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *privdata);
unsigned long
dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata);
uint64_t dictGetHash(dict *d, const void *key);
void dictRehashingInfo(dict *d, unsigned long long *from_size, unsigned long long *to_size);
size_t dictGetStatsMsg(char *buf, size_t bufsize, dictStats *stats, int full);
dictStats *dictGetStatsHt(dict *d, int htidx, int full);
void dictCombineStats(dictStats *from, dictStats *into);
void dictFreeStats(dictStats *stats);
#ifdef SERVER_TEST
int dictTest(int argc, char *argv[], int flags);
#endif
#endif /* __DICT_H */