Merge branch 'multithread_load' into 'keydbpro'

Multithread load

See merge request external-collab/keydb-pro-6!5

Former-commit-id: 20e712244071028b0f75ccad477308efd139261f
jsully 2021-10-08 17:55:55 +00:00
commit 5e4dec1a16
21 changed files with 5304 additions and 125 deletions

View File

@ -0,0 +1,583 @@
// Provides an efficient blocking version of moodycamel::ConcurrentQueue.
// ©2015-2020 Cameron Desrochers. Distributed under the terms of the simplified
// BSD license, available at the top of concurrentqueue.h.
// Also dual-licensed under the Boost Software License (see LICENSE.md)
// Uses Jeff Preshing's semaphore implementation (under the terms of its
// separate zlib license, see lightweightsemaphore.h).
#pragma once
#include "concurrentqueue.h"
#include "lightweightsemaphore.h"
#include <type_traits>
#include <cerrno>
#include <memory>
#include <chrono>
#include <ctime>
namespace moodycamel
{
// This is a blocking version of the queue. It has an almost identical interface to
// the normal non-blocking version, with the addition of various wait_dequeue() methods
// and the removal of producer-specific dequeue methods.
template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
class BlockingConcurrentQueue
{
private:
typedef ::moodycamel::ConcurrentQueue<T, Traits> ConcurrentQueue;
typedef ::moodycamel::LightweightSemaphore LightweightSemaphore;
public:
typedef typename ConcurrentQueue::producer_token_t producer_token_t;
typedef typename ConcurrentQueue::consumer_token_t consumer_token_t;
typedef typename ConcurrentQueue::index_t index_t;
typedef typename ConcurrentQueue::size_t size_t;
typedef typename std::make_signed<size_t>::type ssize_t;
static const size_t BLOCK_SIZE = ConcurrentQueue::BLOCK_SIZE;
static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = ConcurrentQueue::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD;
static const size_t EXPLICIT_INITIAL_INDEX_SIZE = ConcurrentQueue::EXPLICIT_INITIAL_INDEX_SIZE;
static const size_t IMPLICIT_INITIAL_INDEX_SIZE = ConcurrentQueue::IMPLICIT_INITIAL_INDEX_SIZE;
static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = ConcurrentQueue::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE;
static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = ConcurrentQueue::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE;
static const size_t MAX_SUBQUEUE_SIZE = ConcurrentQueue::MAX_SUBQUEUE_SIZE;
public:
// Creates a queue with at least `capacity` element slots; note that the
// actual number of elements that can be inserted without additional memory
// allocation depends on the number of producers and the block size (e.g. if
// the block size is equal to `capacity`, only a single block will be allocated
// up-front, which means only a single producer will be able to enqueue elements
// without an extra allocation -- blocks aren't shared between producers).
// This method is not thread safe -- it is up to the user to ensure that the
// queue is fully constructed before it starts being used by other threads (this
// includes making the memory effects of construction visible, possibly with a
// memory barrier).
explicit BlockingConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
: inner(capacity), sema(create<LightweightSemaphore, ssize_t, int>(0, (int)Traits::MAX_SEMA_SPINS), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
{
assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
if (!sema) {
MOODYCAMEL_THROW(std::bad_alloc());
}
}
BlockingConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
: inner(minCapacity, maxExplicitProducers, maxImplicitProducers), sema(create<LightweightSemaphore, ssize_t, int>(0, (int)Traits::MAX_SEMA_SPINS), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
{
assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
if (!sema) {
MOODYCAMEL_THROW(std::bad_alloc());
}
}
// Disable copying and copy assignment
BlockingConcurrentQueue(BlockingConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
BlockingConcurrentQueue& operator=(BlockingConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
// Moving is supported, but note that it is *not* a thread-safe operation.
// Nobody can use the queue while it's being moved, and the memory effects
// of that move must be propagated to other threads before they can use it.
// Note: When a queue is moved, its tokens are still valid but can only be
// used with the destination queue (i.e. semantically they are moved along
// with the queue itself).
BlockingConcurrentQueue(BlockingConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
: inner(std::move(other.inner)), sema(std::move(other.sema))
{ }
inline BlockingConcurrentQueue& operator=(BlockingConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
{
return swap_internal(other);
}
// Swaps this queue's state with the other's. Not thread-safe.
// Swapping two queues does not invalidate their tokens, however
// the tokens that were created for one queue must be used with
// only the swapped queue (i.e. the tokens are tied to the
// queue's movable state, not the object itself).
inline void swap(BlockingConcurrentQueue& other) MOODYCAMEL_NOEXCEPT
{
swap_internal(other);
}
private:
BlockingConcurrentQueue& swap_internal(BlockingConcurrentQueue& other)
{
if (this == &other) {
return *this;
}
inner.swap(other.inner);
sema.swap(other.sema);
return *this;
}
public:
// Enqueues a single item (by copying it).
// Allocates memory if required. Only fails if memory allocation fails (or implicit
// production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
// or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
// Thread-safe.
inline bool enqueue(T const& item)
{
if ((details::likely)(inner.enqueue(item))) {
sema->signal();
return true;
}
return false;
}
// Enqueues a single item (by moving it, if possible).
// Allocates memory if required. Only fails if memory allocation fails (or implicit
// production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
// or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
// Thread-safe.
inline bool enqueue(T&& item)
{
if ((details::likely)(inner.enqueue(std::move(item)))) {
sema->signal();
return true;
}
return false;
}
// Enqueues a single item (by copying it) using an explicit producer token.
// Allocates memory if required. Only fails if memory allocation fails (or
// Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
// Thread-safe.
inline bool enqueue(producer_token_t const& token, T const& item)
{
if ((details::likely)(inner.enqueue(token, item))) {
sema->signal();
return true;
}
return false;
}
// Enqueues a single item (by moving it, if possible) using an explicit producer token.
// Allocates memory if required. Only fails if memory allocation fails (or
// Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
// Thread-safe.
inline bool enqueue(producer_token_t const& token, T&& item)
{
if ((details::likely)(inner.enqueue(token, std::move(item)))) {
sema->signal();
return true;
}
return false;
}
// Enqueues several items.
// Allocates memory if required. Only fails if memory allocation fails (or
// implicit production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
// is 0, or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
// Note: Use std::make_move_iterator if the elements should be moved instead of copied.
// Thread-safe.
template<typename It>
inline bool enqueue_bulk(It itemFirst, size_t count)
{
if ((details::likely)(inner.enqueue_bulk(std::forward<It>(itemFirst), count))) {
sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
return true;
}
return false;
}
// Enqueues several items using an explicit producer token.
// Allocates memory if required. Only fails if memory allocation fails
// (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
// Note: Use std::make_move_iterator if the elements should be moved
// instead of copied.
// Thread-safe.
template<typename It>
inline bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
{
if ((details::likely)(inner.enqueue_bulk(token, std::forward<It>(itemFirst), count))) {
sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
return true;
}
return false;
}
// Enqueues a single item (by copying it).
// Does not allocate memory. Fails if not enough room to enqueue (or implicit
// production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
// is 0).
// Thread-safe.
inline bool try_enqueue(T const& item)
{
if (inner.try_enqueue(item)) {
sema->signal();
return true;
}
return false;
}
// Enqueues a single item (by moving it, if possible).
// Does not allocate memory (except for one-time implicit producer).
// Fails if not enough room to enqueue (or implicit production is
// disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
// Thread-safe.
inline bool try_enqueue(T&& item)
{
if (inner.try_enqueue(std::move(item))) {
sema->signal();
return true;
}
return false;
}
// Enqueues a single item (by copying it) using an explicit producer token.
// Does not allocate memory. Fails if not enough room to enqueue.
// Thread-safe.
inline bool try_enqueue(producer_token_t const& token, T const& item)
{
if (inner.try_enqueue(token, item)) {
sema->signal();
return true;
}
return false;
}
// Enqueues a single item (by moving it, if possible) using an explicit producer token.
// Does not allocate memory. Fails if not enough room to enqueue.
// Thread-safe.
inline bool try_enqueue(producer_token_t const& token, T&& item)
{
if (inner.try_enqueue(token, std::move(item))) {
sema->signal();
return true;
}
return false;
}
// Enqueues several items.
// Does not allocate memory (except for one-time implicit producer).
// Fails if not enough room to enqueue (or implicit production is
// disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
// Note: Use std::make_move_iterator if the elements should be moved
// instead of copied.
// Thread-safe.
template<typename It>
inline bool try_enqueue_bulk(It itemFirst, size_t count)
{
if (inner.try_enqueue_bulk(std::forward<It>(itemFirst), count)) {
sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
return true;
}
return false;
}
// Enqueues several items using an explicit producer token.
// Does not allocate memory. Fails if not enough room to enqueue.
// Note: Use std::make_move_iterator if the elements should be moved
// instead of copied.
// Thread-safe.
template<typename It>
inline bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
{
if (inner.try_enqueue_bulk(token, std::forward<It>(itemFirst), count)) {
sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
return true;
}
return false;
}
// Attempts to dequeue from the queue.
// Returns false if all producer streams appeared empty at the time they
// were checked (so, the queue is likely but not guaranteed to be empty).
// Never allocates. Thread-safe.
template<typename U>
inline bool try_dequeue(U& item)
{
if (sema->tryWait()) {
while (!inner.try_dequeue(item)) {
continue;
}
return true;
}
return false;
}
// Attempts to dequeue from the queue using an explicit consumer token.
// Returns false if all producer streams appeared empty at the time they
// were checked (so, the queue is likely but not guaranteed to be empty).
// Never allocates. Thread-safe.
template<typename U>
inline bool try_dequeue(consumer_token_t& token, U& item)
{
if (sema->tryWait()) {
while (!inner.try_dequeue(token, item)) {
continue;
}
return true;
}
return false;
}
// Attempts to dequeue several elements from the queue.
// Returns the number of items actually dequeued.
// Returns 0 if all producer streams appeared empty at the time they
// were checked (so, the queue is likely but not guaranteed to be empty).
// Never allocates. Thread-safe.
template<typename It>
inline size_t try_dequeue_bulk(It itemFirst, size_t max)
{
size_t count = 0;
max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
}
return count;
}
// Attempts to dequeue several elements from the queue using an explicit consumer token.
// Returns the number of items actually dequeued.
// Returns 0 if all producer streams appeared empty at the time they
// were checked (so, the queue is likely but not guaranteed to be empty).
// Never allocates. Thread-safe.
template<typename It>
inline size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
{
size_t count = 0;
max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
}
return count;
}
// Blocks the current thread until there's something to dequeue, then
// dequeues it.
// Never allocates. Thread-safe.
template<typename U>
inline void wait_dequeue(U& item)
{
while (!sema->wait()) {
continue;
}
while (!inner.try_dequeue(item)) {
continue;
}
}
// Blocks the current thread until either there's something to dequeue
// or the timeout (specified in microseconds) expires. Returns false
// without setting `item` if the timeout expires, otherwise assigns
// to `item` and returns true.
// Using a negative timeout indicates an indefinite timeout,
// and is thus functionally equivalent to calling wait_dequeue.
// Never allocates. Thread-safe.
template<typename U>
inline bool wait_dequeue_timed(U& item, std::int64_t timeout_usecs)
{
if (!sema->wait(timeout_usecs)) {
return false;
}
while (!inner.try_dequeue(item)) {
continue;
}
return true;
}
// Blocks the current thread until either there's something to dequeue
// or the timeout expires. Returns false without setting `item` if the
// timeout expires, otherwise assigns to `item` and returns true.
// Never allocates. Thread-safe.
template<typename U, typename Rep, typename Period>
inline bool wait_dequeue_timed(U& item, std::chrono::duration<Rep, Period> const& timeout)
{
return wait_dequeue_timed(item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
}
// Blocks the current thread until there's something to dequeue, then
// dequeues it using an explicit consumer token.
// Never allocates. Thread-safe.
template<typename U>
inline void wait_dequeue(consumer_token_t& token, U& item)
{
while (!sema->wait()) {
continue;
}
while (!inner.try_dequeue(token, item)) {
continue;
}
}
// Blocks the current thread until either there's something to dequeue
// or the timeout (specified in microseconds) expires. Returns false
// without setting `item` if the timeout expires, otherwise assigns
// to `item` and returns true.
// Using a negative timeout indicates an indefinite timeout,
// and is thus functionally equivalent to calling wait_dequeue.
// Never allocates. Thread-safe.
template<typename U>
inline bool wait_dequeue_timed(consumer_token_t& token, U& item, std::int64_t timeout_usecs)
{
if (!sema->wait(timeout_usecs)) {
return false;
}
while (!inner.try_dequeue(token, item)) {
continue;
}
return true;
}
// Blocks the current thread until either there's something to dequeue
// or the timeout expires. Returns false without setting `item` if the
// timeout expires, otherwise assigns to `item` and returns true.
// Never allocates. Thread-safe.
template<typename U, typename Rep, typename Period>
inline bool wait_dequeue_timed(consumer_token_t& token, U& item, std::chrono::duration<Rep, Period> const& timeout)
{
return wait_dequeue_timed(token, item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
}
// Attempts to dequeue several elements from the queue.
// Returns the number of items actually dequeued, which will
// always be at least one (this method blocks until the queue
// is non-empty) and at most max.
// Never allocates. Thread-safe.
template<typename It>
inline size_t wait_dequeue_bulk(It itemFirst, size_t max)
{
size_t count = 0;
max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
}
return count;
}
// Attempts to dequeue several elements from the queue.
// Returns the number of items actually dequeued, which can
// be 0 if the timeout expires while waiting for elements,
// and at most max.
// Using a negative timeout indicates an indefinite timeout,
// and is thus functionally equivalent to calling wait_dequeue_bulk.
// Never allocates. Thread-safe.
template<typename It>
inline size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::int64_t timeout_usecs)
{
size_t count = 0;
max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs);
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
}
return count;
}
// Attempts to dequeue several elements from the queue.
// Returns the number of items actually dequeued, which can
// be 0 if the timeout expires while waiting for elements,
// and at most max.
// Never allocates. Thread-safe.
template<typename It, typename Rep, typename Period>
inline size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::chrono::duration<Rep, Period> const& timeout)
{
return wait_dequeue_bulk_timed<It&>(itemFirst, max, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
}
// Attempts to dequeue several elements from the queue using an explicit consumer token.
// Returns the number of items actually dequeued, which will
// always be at least one (this method blocks until the queue
// is non-empty) and at most max.
// Never allocates. Thread-safe.
template<typename It>
inline size_t wait_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
{
size_t count = 0;
max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
}
return count;
}
// Attempts to dequeue several elements from the queue using an explicit consumer token.
// Returns the number of items actually dequeued, which can
// be 0 if the timeout expires while waiting for elements,
// and at most max.
// Using a negative timeout indicates an indefinite timeout,
// and is thus functionally equivalent to calling wait_dequeue_bulk.
// Never allocates. Thread-safe.
template<typename It>
inline size_t wait_dequeue_bulk_timed(consumer_token_t& token, It itemFirst, size_t max, std::int64_t timeout_usecs)
{
size_t count = 0;
max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs);
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
}
return count;
}
// Attempts to dequeue several elements from the queue using an explicit consumer token.
// Returns the number of items actually dequeued, which can
// be 0 if the timeout expires while waiting for elements,
// and at most max.
// Never allocates. Thread-safe.
template<typename It, typename Rep, typename Period>
inline size_t wait_dequeue_bulk_timed(consumer_token_t& token, It itemFirst, size_t max, std::chrono::duration<Rep, Period> const& timeout)
{
return wait_dequeue_bulk_timed<It&>(token, itemFirst, max, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
}
// Returns an estimate of the total number of elements currently in the queue. This
// estimate is only accurate if the queue has completely stabilized before it is called
// (i.e. all enqueue and dequeue operations have completed and their memory effects are
// visible on the calling thread, and no further operations start while this method is
// being called).
// Thread-safe.
inline size_t size_approx() const
{
return (size_t)sema->availableApprox();
}
// Returns true if the underlying atomic variables used by
// the queue are lock-free (they should be on most platforms).
// Thread-safe.
static bool is_lock_free()
{
return ConcurrentQueue::is_lock_free();
}
private:
template<typename U, typename A1, typename A2>
static inline U* create(A1&& a1, A2&& a2)
{
void* p = (Traits::malloc)(sizeof(U));
return p != nullptr ? new (p) U(std::forward<A1>(a1), std::forward<A2>(a2)) : nullptr;
}
template<typename U>
static inline void destroy(U* p)
{
if (p != nullptr) {
p->~U();
}
(Traits::free)(p);
}
private:
ConcurrentQueue inner;
std::unique_ptr<LightweightSemaphore, void (*)(LightweightSemaphore*)> sema;
};
template<typename T, typename Traits>
inline void swap(BlockingConcurrentQueue<T, Traits>& a, BlockingConcurrentQueue<T, Traits>& b) MOODYCAMEL_NOEXCEPT
{
a.swap(b);
}
} // end namespace moodycamel
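
For context, a minimal sketch of how the blocking API above is typically consumed -- one producer thread enqueueing while a consumer blocks with a timeout, which mirrors how the RDB loader below feeds its worker thread. This example is illustrative only (not part of the commit) and assumes just the vendored headers plus the standard library:

// Illustrative usage sketch, not part of this commit.
#include "blockingconcurrentqueue.h"
#include <chrono>
#include <cstdio>
#include <thread>

int main()
{
    moodycamel::BlockingConcurrentQueue<int> q;

    std::thread producer([&] {
        for (int i = 0; i < 100; ++i)
            q.enqueue(i);               // each enqueue signals the internal semaphore
    });

    int consumed = 0;
    int item;
    while (consumed < 100) {
        // Blocks for up to 5ms; returns false on timeout without touching `item`.
        if (q.wait_dequeue_timed(item, std::chrono::milliseconds(5)))
            ++consumed;
    }
    producer.join();
    std::printf("consumed %d items\n", consumed);
    return 0;
}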

deps/concurrentqueue/concurrentqueue.h (vendored, 3743 additions)

File diff suppressed because it is too large

View File

@ -0,0 +1,412 @@
// Provides an efficient implementation of a semaphore (LightweightSemaphore).
// This is an extension of Jeff Preshing's semaphore implementation (licensed
// under the terms of its separate zlib license) that has been adapted and
// extended by Cameron Desrochers.
#pragma once
#include <cstddef> // For std::size_t
#include <atomic>
#include <type_traits> // For std::make_signed<T>
#if defined(_WIN32)
// Avoid including windows.h in a header; we only need a handful of
// items, so we'll redeclare them here (this is relatively safe since
// the API generally has to remain stable between Windows versions).
// I know this is an ugly hack but it still beats polluting the global
// namespace with thousands of generic names or adding a .cpp for nothing.
extern "C" {
struct _SECURITY_ATTRIBUTES;
__declspec(dllimport) void* __stdcall CreateSemaphoreW(_SECURITY_ATTRIBUTES* lpSemaphoreAttributes, long lInitialCount, long lMaximumCount, const wchar_t* lpName);
__declspec(dllimport) int __stdcall CloseHandle(void* hObject);
__declspec(dllimport) unsigned long __stdcall WaitForSingleObject(void* hHandle, unsigned long dwMilliseconds);
__declspec(dllimport) int __stdcall ReleaseSemaphore(void* hSemaphore, long lReleaseCount, long* lpPreviousCount);
}
#elif defined(__MACH__)
#include <mach/mach.h>
#elif defined(__unix__)
#include <semaphore.h>
#endif
namespace moodycamel
{
namespace details
{
// Code in the mpmc_sema namespace below is an adaptation of Jeff Preshing's
// portable + lightweight semaphore implementations, originally from
// https://github.com/preshing/cpp11-on-multicore/blob/master/common/sema.h
// LICENSE:
// Copyright (c) 2015 Jeff Preshing
//
// This software is provided 'as-is', without any express or implied
// warranty. In no event will the authors be held liable for any damages
// arising from the use of this software.
//
// Permission is granted to anyone to use this software for any purpose,
// including commercial applications, and to alter it and redistribute it
// freely, subject to the following restrictions:
//
// 1. The origin of this software must not be misrepresented; you must not
// claim that you wrote the original software. If you use this software
// in a product, an acknowledgement in the product documentation would be
// appreciated but is not required.
// 2. Altered source versions must be plainly marked as such, and must not be
// misrepresented as being the original software.
// 3. This notice may not be removed or altered from any source distribution.
#if defined(_WIN32)
class Semaphore
{
private:
void* m_hSema;
Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
public:
Semaphore(int initialCount = 0)
{
assert(initialCount >= 0);
const long maxLong = 0x7fffffff;
m_hSema = CreateSemaphoreW(nullptr, initialCount, maxLong, nullptr);
assert(m_hSema);
}
~Semaphore()
{
CloseHandle(m_hSema);
}
bool wait()
{
const unsigned long infinite = 0xffffffff;
return WaitForSingleObject(m_hSema, infinite) == 0;
}
bool try_wait()
{
return WaitForSingleObject(m_hSema, 0) == 0;
}
bool timed_wait(std::uint64_t usecs)
{
return WaitForSingleObject(m_hSema, (unsigned long)(usecs / 1000)) == 0;
}
void signal(int count = 1)
{
while (!ReleaseSemaphore(m_hSema, count, nullptr));
}
};
#elif defined(__MACH__)
//---------------------------------------------------------
// Semaphore (Apple iOS and OSX)
// Can't use POSIX semaphores due to http://lists.apple.com/archives/darwin-kernel/2009/Apr/msg00010.html
//---------------------------------------------------------
class Semaphore
{
private:
semaphore_t m_sema;
Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
public:
Semaphore(int initialCount = 0)
{
assert(initialCount >= 0);
kern_return_t rc = semaphore_create(mach_task_self(), &m_sema, SYNC_POLICY_FIFO, initialCount);
assert(rc == KERN_SUCCESS);
(void)rc;
}
~Semaphore()
{
semaphore_destroy(mach_task_self(), m_sema);
}
bool wait()
{
return semaphore_wait(m_sema) == KERN_SUCCESS;
}
bool try_wait()
{
return timed_wait(0);
}
bool timed_wait(std::uint64_t timeout_usecs)
{
mach_timespec_t ts;
ts.tv_sec = static_cast<unsigned int>(timeout_usecs / 1000000);
ts.tv_nsec = static_cast<int>((timeout_usecs % 1000000) * 1000);
// added in OSX 10.10: https://developer.apple.com/library/prerelease/mac/documentation/General/Reference/APIDiffsMacOSX10_10SeedDiff/modules/Darwin.html
kern_return_t rc = semaphore_timedwait(m_sema, ts);
return rc == KERN_SUCCESS;
}
void signal()
{
while (semaphore_signal(m_sema) != KERN_SUCCESS);
}
void signal(int count)
{
while (count-- > 0)
{
while (semaphore_signal(m_sema) != KERN_SUCCESS);
}
}
};
#elif defined(__unix__)
//---------------------------------------------------------
// Semaphore (POSIX, Linux)
//---------------------------------------------------------
class Semaphore
{
private:
sem_t m_sema;
Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
public:
Semaphore(int initialCount = 0)
{
assert(initialCount >= 0);
int rc = sem_init(&m_sema, 0, static_cast<unsigned int>(initialCount));
assert(rc == 0);
(void)rc;
}
~Semaphore()
{
sem_destroy(&m_sema);
}
bool wait()
{
// http://stackoverflow.com/questions/2013181/gdb-causes-sem-wait-to-fail-with-eintr-error
int rc;
do {
rc = sem_wait(&m_sema);
} while (rc == -1 && errno == EINTR);
return rc == 0;
}
bool try_wait()
{
int rc;
do {
rc = sem_trywait(&m_sema);
} while (rc == -1 && errno == EINTR);
return rc == 0;
}
bool timed_wait(std::uint64_t usecs)
{
struct timespec ts;
const int usecs_in_1_sec = 1000000;
const int nsecs_in_1_sec = 1000000000;
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += (time_t)(usecs / usecs_in_1_sec);
ts.tv_nsec += (long)(usecs % usecs_in_1_sec) * 1000;
// sem_timedwait bombs if you have more than 1e9 in tv_nsec
// so we have to clean things up before passing it in
if (ts.tv_nsec >= nsecs_in_1_sec) {
ts.tv_nsec -= nsecs_in_1_sec;
++ts.tv_sec;
}
int rc;
do {
rc = sem_timedwait(&m_sema, &ts);
} while (rc == -1 && errno == EINTR);
return rc == 0;
}
void signal()
{
while (sem_post(&m_sema) == -1);
}
void signal(int count)
{
while (count-- > 0)
{
while (sem_post(&m_sema) == -1);
}
}
};
#else
#error Unsupported platform! (No semaphore wrapper available)
#endif
} // end namespace details
//---------------------------------------------------------
// LightweightSemaphore
//---------------------------------------------------------
class LightweightSemaphore
{
public:
typedef std::make_signed<std::size_t>::type ssize_t;
private:
std::atomic<ssize_t> m_count;
details::Semaphore m_sema;
int m_maxSpins;
bool waitWithPartialSpinning(std::int64_t timeout_usecs = -1)
{
ssize_t oldCount;
int spin = m_maxSpins;
while (--spin >= 0)
{
oldCount = m_count.load(std::memory_order_relaxed);
if ((oldCount > 0) && m_count.compare_exchange_strong(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed))
return true;
std::atomic_signal_fence(std::memory_order_acquire); // Prevent the compiler from collapsing the loop.
}
oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
if (oldCount > 0)
return true;
if (timeout_usecs < 0)
{
if (m_sema.wait())
return true;
}
if (timeout_usecs > 0 && m_sema.timed_wait((std::uint64_t)timeout_usecs))
return true;
// At this point, we've timed out waiting for the semaphore, but the
// count is still decremented indicating we may still be waiting on
// it. So we have to re-adjust the count, but only if the semaphore
// wasn't signaled enough times for us too since then. If it was, we
// need to release the semaphore too.
while (true)
{
oldCount = m_count.load(std::memory_order_acquire);
if (oldCount >= 0 && m_sema.try_wait())
return true;
if (oldCount < 0 && m_count.compare_exchange_strong(oldCount, oldCount + 1, std::memory_order_relaxed, std::memory_order_relaxed))
return false;
}
}
ssize_t waitManyWithPartialSpinning(ssize_t max, std::int64_t timeout_usecs = -1)
{
assert(max > 0);
ssize_t oldCount;
int spin = m_maxSpins;
while (--spin >= 0)
{
oldCount = m_count.load(std::memory_order_relaxed);
if (oldCount > 0)
{
ssize_t newCount = oldCount > max ? oldCount - max : 0;
if (m_count.compare_exchange_strong(oldCount, newCount, std::memory_order_acquire, std::memory_order_relaxed))
return oldCount - newCount;
}
std::atomic_signal_fence(std::memory_order_acquire);
}
oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
if (oldCount <= 0)
{
if ((timeout_usecs == 0) || (timeout_usecs < 0 && !m_sema.wait()) || (timeout_usecs > 0 && !m_sema.timed_wait((std::uint64_t)timeout_usecs)))
{
while (true)
{
oldCount = m_count.load(std::memory_order_acquire);
if (oldCount >= 0 && m_sema.try_wait())
break;
if (oldCount < 0 && m_count.compare_exchange_strong(oldCount, oldCount + 1, std::memory_order_relaxed, std::memory_order_relaxed))
return 0;
}
}
}
if (max > 1)
return 1 + tryWaitMany(max - 1);
return 1;
}
public:
LightweightSemaphore(ssize_t initialCount = 0, int maxSpins = 10000) : m_count(initialCount), m_maxSpins(maxSpins)
{
assert(initialCount >= 0);
assert(maxSpins >= 0);
}
bool tryWait()
{
ssize_t oldCount = m_count.load(std::memory_order_relaxed);
while (oldCount > 0)
{
if (m_count.compare_exchange_weak(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed))
return true;
}
return false;
}
bool wait()
{
return tryWait() || waitWithPartialSpinning();
}
bool wait(std::int64_t timeout_usecs)
{
return tryWait() || waitWithPartialSpinning(timeout_usecs);
}
// Acquires between 0 and (greedily) max, inclusive
ssize_t tryWaitMany(ssize_t max)
{
assert(max >= 0);
ssize_t oldCount = m_count.load(std::memory_order_relaxed);
while (oldCount > 0)
{
ssize_t newCount = oldCount > max ? oldCount - max : 0;
if (m_count.compare_exchange_weak(oldCount, newCount, std::memory_order_acquire, std::memory_order_relaxed))
return oldCount - newCount;
}
return 0;
}
// Acquires at least one, and (greedily) at most max
ssize_t waitMany(ssize_t max, std::int64_t timeout_usecs)
{
assert(max >= 0);
ssize_t result = tryWaitMany(max);
if (result == 0 && max > 0)
result = waitManyWithPartialSpinning(max, timeout_usecs);
return result;
}
ssize_t waitMany(ssize_t max)
{
ssize_t result = waitMany(max, -1);
assert(result > 0);
return result;
}
void signal(ssize_t count = 1)
{
assert(count >= 0);
ssize_t oldCount = m_count.fetch_add(count, std::memory_order_release);
ssize_t toRelease = -oldCount < count ? -oldCount : count;
if (toRelease > 0)
{
m_sema.signal((int)toRelease);
}
}
std::size_t availableApprox() const
{
ssize_t count = m_count.load(std::memory_order_relaxed);
return count > 0 ? static_cast<std::size_t>(count) : 0;
}
};
} // end namespace moodycamel
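
As a quick standalone illustration of the count accounting above (not part of the commit): signal(n) adds n to m_count and only forwards to the OS semaphore the portion needed to wake already-blocked waiters, while tryWaitMany() greedily takes whatever is available without blocking. Assumes only the vendored headers:

// Illustrative sketch, not part of this commit. blockingconcurrentqueue.h is
// included only because it pulls in lightweightsemaphore.h with the required
// helper macros already defined.
#include "blockingconcurrentqueue.h"
#include <cassert>

int main()
{
    moodycamel::LightweightSemaphore sema;   // count starts at 0

    sema.signal(3);                          // count becomes 3; no waiters to wake
    assert(sema.availableApprox() == 3);

    auto acquired = sema.tryWaitMany(5);     // greedy: takes all 3, not 5
    assert(acquired == 3);

    assert(!sema.tryWait());                 // count is back to 0
    return 0;
}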

View File

@ -246,7 +246,7 @@ endif
endif
# Include paths to dependencies
FINAL_CFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src -I../deps/hdr_histogram -I../deps/license/
FINAL_CXXFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src -I../deps/hdr_histogram -I../deps/rocksdb/include/ -I../deps/license
FINAL_CXXFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src -I../deps/hdr_histogram -I../deps/rocksdb/include/ -I../deps/license -I../deps/concurrentqueue
# Determine systemd support and/or build preference (defaulting to auto-detection)
BUILD_WITH_SYSTEMD=no

View File

@ -148,4 +148,14 @@ size_t StorageCache::count() const
void StorageCache::beginWriteBatch() {
serverAssert(GlobalLocksAcquired()); // Otherwise we deadlock
m_spstorage->beginWriteBatch();
}
void StorageCache::emergencyFreeCache() {
dict *d = m_pdict;
m_pdict = nullptr;
if (d != nullptr) {
g_pserver->asyncworkqueue->AddWorkFunction([d]{
dictRelease(d);
});
}
}

View File

@ -43,6 +43,8 @@ public:
void bulkInsert(sds *rgkeys, sds *rgvals, size_t celem);
void retrieve(sds key, IStorage::callbackSingle fn) const;
bool erase(sds key);
void emergencyFreeCache();
bool keycacheIsEnabled() const { return m_pdict != nullptr; }
bool enumerate(IStorage::callback fn) const { return m_spstorage->enumerate(fn); }

View File

@ -787,6 +787,7 @@ void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep,
}
thread_local spin_worker tl_worker = nullptr;
thread_local bool fOwnLockOverride = false;
void setAeLockSetThreadSpinWorker(spin_worker worker)
{
tl_worker = worker;
@ -807,9 +808,14 @@ void aeReleaseLock()
g_lock.unlock();
}
void aeSetThreadOwnsLockOverride(int fOverride)
{
fOwnLockOverride = fOverride;
}
int aeThreadOwnsLock()
{
return g_lock.fOwnLock();
return fOwnLockOverride || g_lock.fOwnLock();
}
int aeLockContested(int threshold)

View File

@ -168,6 +168,7 @@ void aeAcquireLock();
int aeTryAcquireLock(int fWeak);
void aeReleaseLock();
int aeThreadOwnsLock();
void aeSetThreadOwnsLockOverride(int fOverride);
int aeLockContested(int threshold);
int aeLockContention(); // returns the number of instantaneous threads waiting on the lock

View File

@ -2725,6 +2725,7 @@ standardConfig configs[] = {
createBoolConfig("io-threads-do-reads", NULL, IMMUTABLE_CONFIG, fDummy, 0, NULL, NULL),
createBoolConfig("time-thread-priority", NULL, IMMUTABLE_CONFIG, cserver.time_thread_priority, 0, NULL, NULL),
createBoolConfig("prefetch-enabled", NULL, MODIFIABLE_CONFIG, g_pserver->prefetch_enabled, 1, NULL, NULL),
createBoolConfig("allow-rdb-resize-op", NULL, MODIFIABLE_CONFIG, g_pserver->allowRdbResizeOp, 1, NULL, NULL),
createBoolConfig("crash-log-enabled", NULL, MODIFIABLE_CONFIG, g_pserver->crashlog_enabled, 1, NULL, updateSighandlerEnabled),
createBoolConfig("crash-memcheck-enabled", NULL, MODIFIABLE_CONFIG, g_pserver->memcheck_enabled, 1, NULL, NULL),
createBoolConfig("use-exit-on-panic", NULL, MODIFIABLE_CONFIG, g_pserver->use_exit_on_panic, 0, NULL, NULL),

View File

@ -2910,6 +2910,35 @@ bool redisDbPersistentData::processChanges(bool fSnapshot)
return (m_spstorage != nullptr);
}
void redisDbPersistentData::processChangesAsync(std::atomic<int> &pendingJobs)
{
++pendingJobs;
serverAssert(!m_fAllChanged);
dictEmpty(m_dictChanged, nullptr);
dict *dictNew = dictCreate(&dbDictType, nullptr);
std::swap(dictNew, m_pdict);
m_cnewKeysPending = 0;
g_pserver->asyncworkqueue->AddWorkFunction([dictNew, this, &pendingJobs]{
dictIterator *di = dictGetIterator(dictNew);
dictEntry *de;
std::vector<sds> veckeys;
std::vector<sds> vecvals;
while ((de = dictNext(di)) != nullptr)
{
robj *o = (robj*)dictGetVal(de);
sds temp = serializeStoredObjectAndExpire(this, (const char*) dictGetKey(de), o);
veckeys.push_back((sds)dictGetKey(de));
vecvals.push_back(temp);
}
m_spstorage->bulkInsert(veckeys.data(), vecvals.data(), veckeys.size());
for (auto val : vecvals)
sdsfree(val);
dictReleaseIterator(di);
dictRelease(dictNew);
--pendingJobs;
});
}
void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree)
{
if (m_pdbSnapshotStorageFlush)
@ -3047,6 +3076,20 @@ void redisDbPersistentData::removeAllCachedValues()
}
}
void redisDbPersistentData::disableKeyCache()
{
if (m_spstorage == nullptr)
return;
m_spstorage->emergencyFreeCache();
}
bool redisDbPersistentData::keycacheIsEnabled()
{
if (m_spstorage == nullptr)
return false;
return m_spstorage->keycacheIsEnabled();
}
void redisDbPersistentData::trackkey(const char *key, bool fUpdate)
{
if (m_fTrackingChanges && !m_fAllChanged && m_spstorage) {

View File

@ -131,19 +131,6 @@ int _dictInit(dict *d, dictType *type,
return DICT_OK;
}
/* Resize the table to the minimal size that contains all the elements,
* but with the invariant of a USED/BUCKETS ratio near to <= 1 */
int dictResize(dict *d)
{
unsigned long minimal;
if (!dict_can_resize || dictIsRehashing(d)) return DICT_ERR;
minimal = d->ht[0].used;
if (minimal < DICT_HT_INITIAL_SIZE)
minimal = DICT_HT_INITIAL_SIZE;
return dictExpand(d, minimal, false /*fShrink*/);
}
/* Expand or create the hash table,
* when malloc_failed is non-NULL, it'll avoid panic if malloc fails (in which case it'll be set to 1).
* Returns DICT_OK if expand was performed, and DICT_ERR if skipped. */
@ -189,6 +176,19 @@ int _dictExpand(dict *d, unsigned long size, bool fShrink, int* malloc_failed)
return DICT_OK;
}
/* Resize the table to the minimal size that contains all the elements,
* but with the invariant of a USED/BUCKETS ratio near to <= 1 */
int dictResize(dict *d)
{
unsigned long minimal;
if (!dict_can_resize || dictIsRehashing(d)) return DICT_ERR;
minimal = d->ht[0].used;
if (minimal < DICT_HT_INITIAL_SIZE)
minimal = DICT_HT_INITIAL_SIZE;
return _dictExpand(d, minimal, false /*fShrink*/, nullptr);
}
int dictMerge(dict *dst, dict *src)
{
#define MERGE_BLOCK_SIZE 4
@ -273,7 +273,7 @@ int dictMerge(dict *dst, dict *src)
return DICT_OK;
}
dictExpand(dst, dictSize(dst)+dictSize(src), false /* fShrink */); // start dst rehashing if necessary
_dictExpand(dst, dictSize(dst)+dictSize(src), false /* fShrink */, nullptr); // start dst rehashing if necessary
auto &htDst = dictIsRehashing(dst) ? dst->ht[1] : dst->ht[0];
for (int iht = 0; iht < 2; ++iht)
{
@ -328,12 +328,16 @@ int dictMerge(dict *dst, dict *src)
/* return DICT_ERR if expand was not performed */
int dictExpand(dict *d, unsigned long size, bool fShrink) {
// External expand likely means mass insertion, and we don't want to shrink during that
d->noshrink = true;
return _dictExpand(d, size, fShrink, NULL);
}
/* return DICT_ERR if expand failed due to memory allocation failure */
int dictTryExpand(dict *d, unsigned long size, bool fShrink) {
int malloc_failed;
// External expand likely means mass insertion, and we don't want to shrink during that
d->noshrink = true;
_dictExpand(d, size, fShrink, &malloc_failed);
return malloc_failed? DICT_ERR : DICT_OK;
}
@ -677,6 +681,9 @@ static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
dictEntry *he, *prevHe;
int table;
// if we are deleting elements we probably aren't mass inserting anymore and it is safe to shrink
d->noshrink = false;
if (d->ht[0].used == 0 && d->ht[1].used == 0) return NULL;
if (dictIsRehashing(d)) _dictRehashStep(d);
@ -715,6 +722,7 @@ static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
if (!dictIsRehashing(d)) break;
}
_dictExpandIfNeeded(d);
return NULL; /* not found */
}
@ -1317,7 +1325,7 @@ static int _dictExpandIfNeeded(dict *d)
if (dictIsRehashing(d)) return DICT_OK;
/* If the hash table is empty expand it to the initial size. */
if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE, false /*fShrink*/);
if (d->ht[0].size == 0) return _dictExpand(d, DICT_HT_INITIAL_SIZE, false /*fShrink*/, nullptr);
/* If we reached the 1:1 ratio, and we are allowed to resize the hash
* table (global setting) or we should avoid it but the ratio between
@ -1328,12 +1336,12 @@ static int _dictExpandIfNeeded(dict *d)
d->ht[0].used/d->ht[0].size > dict_force_resize_ratio) &&
dictTypeExpandAllowed(d))
{
return dictExpand(d, d->ht[0].used + 1, false /*fShrink*/);
return _dictExpand(d, d->ht[0].used + 1, false /*fShrink*/, nullptr);
}
else if (d->ht[0].used > 0 && d->ht[0].size >= (1024*SHRINK_FACTOR) && (d->ht[0].used * 16) < d->ht[0].size && dict_can_resize)
else if (d->ht[0].used > 0 && d->ht[0].size >= (1024*SHRINK_FACTOR) && (d->ht[0].used * 16) < d->ht[0].size && dict_can_resize && !d->noshrink)
{
// If the dictionary has shrunk a lot we'll need to shrink the hash table instead
return dictExpand(d, d->ht[0].size/SHRINK_FACTOR, true /*fShrink*/);
return _dictExpand(d, d->ht[0].size/SHRINK_FACTOR, true /*fShrink*/, nullptr);
}
return DICT_OK;
}

View File

@ -124,6 +124,7 @@ typedef struct dict {
unsigned refcount;
dictAsyncRehashCtl *asyncdata;
int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */
uint8_t noshrink = false;
} dict;
/* If safe is set to 1 this is a safe iterator, that means, you can call

View File

@ -868,10 +868,15 @@ cant_free:
redisDb *db = g_pserver->db[idb];
if (db->FStorageProvider())
{
serverLog(LL_WARNING, "Failed to evict keys, falling back to flushing entire cache. Consider increasing maxmemory-samples.");
db->removeAllCachedValues();
if (((mem_reported - zmalloc_used_memory()) + mem_freed) >= mem_tofree)
result = EVICT_OK;
if (db->size() != 0 && db->size(true /*fcachedOnly*/) == 0 && db->keycacheIsEnabled()) {
serverLog(LL_WARNING, "Key cache exceeds maxmemory, freeing - performance may be affected increase maxmemory if possible");
db->disableKeyCache();
} else if (db->size(true /*fCachedOnly*/)) {
serverLog(LL_WARNING, "Failed to evict keys, falling back to flushing entire cache. Consider increasing maxmemory-samples.");
db->removeAllCachedValues();
if (((mem_reported - zmalloc_used_memory()) + mem_freed) >= mem_tofree)
result = EVICT_OK;
}
}
}
}

View File

@ -2563,6 +2563,352 @@ void stopSaving(int success) {
NULL);
}
class JobBase
{
public:
enum class JobType {
Function,
Insert
};
JobType type;
JobBase(JobType type)
: type(type)
{}
virtual ~JobBase() = default;
};
struct rdbInsertJob : public JobBase
{
redisDb *db = nullptr;
sds key = nullptr;
robj *val = nullptr;
long long lru_clock;
long long expiretime;
long long lru_idle;
long long lfu_freq;
std::vector<std::pair<robj_sharedptr, long long>> vecsubexpires;
void addSubexpireKey(robj *subkey, long long when) {
vecsubexpires.push_back(std::make_pair(robj_sharedptr(subkey), when));
decrRefCount(subkey);
}
rdbInsertJob()
: JobBase(JobBase::JobType::Insert)
{}
rdbInsertJob(rdbInsertJob &&src)
: JobBase(JobBase::JobType::Insert)
{
db = src.db;
src.db = nullptr;
key = src.key;
src.key = nullptr;
val = src.val;
src.val = nullptr;
lru_clock = src.lru_clock;
expiretime = src.expiretime;
lru_idle = src.lru_idle;
lfu_freq = src.lfu_freq;
vecsubexpires = std::move(src.vecsubexpires);
}
~rdbInsertJob() {
if (key)
sdsfree(key);
if (val)
decrRefCount(val);
}
};
struct rdbFunctionJob : public JobBase
{
public:
std::function<void()> m_fn;
rdbFunctionJob(std::function<void()> &&fn)
: JobBase(JobBase::JobType::Function), m_fn(fn)
{}
};
class rdbAsyncWorkThread
{
rdbSaveInfo *rsi;
int rdbflags;
moodycamel::BlockingConcurrentQueue<JobBase*> queueJobs;
fastlock m_lockPause { "rdbAsyncWork-Pause"};
bool fLaunched = false;
std::atomic<int> fExit {false};
std::atomic<size_t> ckeysLoaded;
std::atomic<int> cstorageWritesInFlight;
std::atomic<bool> workerThreadDone;
std::thread m_thread;
std::vector<JobBase*> vecbatch;
long long now;
long long lastPing = -1;
static void listFreeMethod(const void *v) {
delete reinterpret_cast<const JobBase*>(v);
}
public:
rdbAsyncWorkThread(rdbSaveInfo *rsi, int rdbflags, long long now)
: rsi(rsi), rdbflags(rdbflags), now(now)
{
ckeysLoaded = 0;
cstorageWritesInFlight = 0;
}
~rdbAsyncWorkThread() {
if (m_thread.joinable())
endWork();
}
void start() {
serverAssert(!fLaunched);
m_thread = std::thread(&rdbAsyncWorkThread::loadWorkerThreadMain, this);
fLaunched = true;
}
void throttle() {
if (g_pserver->m_pstorageFactory && (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK)) {
while ((cstorageWritesInFlight.load(std::memory_order_relaxed) || queueJobs.size_approx()) && (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK)) {
usleep(1);
pauseExecution();
ProcessWhileBlocked();
resumeExecution();
}
if ((getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK)) {
for (int idb = 0; idb < cserver.dbnum; ++idb) {
redisDb *db = g_pserver->db[idb];
if (db->size() > 0 && db->keycacheIsEnabled()) {
serverLog(LL_WARNING, "Key cache %d exceeds maxmemory during load, freeing - performance may be affected increase maxmemory if possible", idb);
db->disableKeyCache();
}
}
}
}
}
void enqueue(std::unique_ptr<rdbInsertJob> &spjob) {
vecbatch.push_back(spjob.release());
if (vecbatch.size() >= 64) {
queueJobs.enqueue_bulk(vecbatch.data(), vecbatch.size());
vecbatch.clear();
throttle();
}
}
void pauseExecution() {
m_lockPause.lock();
}
void resumeExecution() {
m_lockPause.unlock();
}
void enqueue(std::function<void()> &&fn) {
std::unique_ptr<JobBase> spjob = std::make_unique<rdbFunctionJob>(std::move(fn));
queueJobs.enqueue(spjob.release());
throttle();
}
void ProcessWhileBlocked() {
if ((mstime() - lastPing) > 1000) { // Ping if it's been a second or longer
listIter li;
listNode *ln;
listRewind(g_pserver->masters, &li);
while ((ln = listNext(&li)))
{
struct redisMaster *mi = (struct redisMaster*)listNodeValue(ln);
if (mi->masterhost && mi->repl_state == REPL_STATE_TRANSFER)
replicationSendNewlineToMaster(mi);
}
lastPing = mstime();
}
processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar);
}
size_t ckeys() { return ckeysLoaded; }
size_t endWork() {
if (!vecbatch.empty()) {
queueJobs.enqueue_bulk(vecbatch.data(), vecbatch.size());
vecbatch.clear();
}
std::atomic_thread_fence(std::memory_order_seq_cst); // Ensure the enqueued jobs are visible to the consumer before we set fExit
serverAssert(fLaunched);
fExit = true;
if (g_pserver->m_pstorageFactory) {
// If we have a storage provider it can take some time to complete and we want to process events in the meantime
while (!workerThreadDone) {
usleep(10);
pauseExecution();
ProcessWhileBlocked();
resumeExecution();
}
}
m_thread.join();
while (cstorageWritesInFlight.load(std::memory_order_seq_cst)) {
usleep(10);
ProcessWhileBlocked();
}
fLaunched = false;
fExit = false;
serverAssert(queueJobs.size_approx() == 0);
return ckeysLoaded;
}
void processJob(rdbInsertJob &job) {
redisObjectStack keyobj;
initStaticStringObject(keyobj,job.key);
bool f1024thKey = false;
bool fStaleMvccKey = (this->rsi) ? mvccFromObj(job.val) < this->rsi->mvccMinThreshold : false;
/* Check if the key already expired. This function is used when loading
* an RDB file from disk, either at startup, or when an RDB was
* received from the master. In the latter case, the master is
* responsible for key expiry. If we would expire keys here, the
* snapshot taken by the master may not be reflected on the replica. */
bool fExpiredKey = iAmMaster() && !(this->rdbflags&RDBFLAGS_AOF_PREAMBLE) && job.expiretime != -1 && job.expiretime < this->now;
if (fStaleMvccKey || fExpiredKey) {
if (fStaleMvccKey && !fExpiredKey && this->rsi != nullptr && this->rsi->mi != nullptr && this->rsi->mi->staleKeyMap != nullptr && lookupKeyRead(job.db, &keyobj) == nullptr) {
// We have a key that we've already deleted and is not back in our database.
// We'll need to inform the sending master of the delete if it is also a replica of us
robj_sharedptr objKeyDup(createStringObject(job.key, sdslen(job.key)));
this->rsi->mi->staleKeyMap->operator[](job.db->id).push_back(objKeyDup);
}
sdsfree(job.key);
job.key = nullptr;
decrRefCount(job.val);
job.val = nullptr;
} else {
/* Add the new object in the hash table */
int fInserted = dbMerge(job.db, job.key, job.val, (this->rsi && this->rsi->fForceSetKey) || (this->rdbflags & RDBFLAGS_ALLOW_DUP)); // Note: dbMerge will incrRef
if (fInserted)
{
auto ckeys = this->ckeysLoaded.fetch_add(1, std::memory_order_relaxed);
f1024thKey = (ckeys % 1024) == 0;
/* Set the expire time if needed */
if (job.expiretime != -1)
{
setExpire(NULL,job.db,&keyobj,nullptr,job.expiretime);
}
/* Set usage information (for eviction). */
objectSetLRUOrLFU(job.val,job.lfu_freq,job.lru_idle,job.lru_clock,1000);
/* call key space notification on key loaded for modules only */
moduleNotifyKeyspaceEvent(NOTIFY_LOADED, "loaded", &keyobj, job.db->id);
replicationNotifyLoadedKey(job.db, &keyobj, job.val, job.expiretime);
for (auto &pair : job.vecsubexpires)
{
setExpire(NULL, job.db, &keyobj, pair.first, pair.second);
replicateSubkeyExpire(job.db, &keyobj, pair.first.get(), pair.second);
}
job.val = nullptr; // don't free this as we moved ownership to the DB
}
}
/* If we have a storage provider, check if we need to evict some keys to stay under our memory limit;
do this every 1024 keys to limit the perf impact */
if (g_pserver->m_pstorageFactory && f1024thKey)
{
bool fHighMemory = (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK);
if (fHighMemory || f1024thKey)
{
for (int idb = 0; idb < cserver.dbnum; ++idb)
{
if (g_pserver->m_pstorageFactory) {
g_pserver->db[idb]->processChangesAsync(this->cstorageWritesInFlight);
fHighMemory = false;
}
}
if (fHighMemory)
performEvictions(false /* fPreSnapshot*/);
}
g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch);
serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
}
}
static void loadWorkerThreadMain(rdbAsyncWorkThread *pqueue) {
rdbAsyncWorkThread &queue = *pqueue;
redisServerThreadVars vars = {};
vars.clients_pending_asyncwrite = listCreate();
serverTL = &vars;
aeSetThreadOwnsLockOverride(true);
// We will inherit the server thread's affinity mask; clear it since we want to be able to run on a different core.
cpu_set_t *cpuset = CPU_ALLOC(std::thread::hardware_concurrency());
if (cpuset != nullptr) {
size_t size = CPU_ALLOC_SIZE(std::thread::hardware_concurrency());
CPU_ZERO_S(size, cpuset);
for (unsigned i = 0; i < std::thread::hardware_concurrency(); ++i) {
CPU_SET_S(i, size, cpuset);
}
pthread_setaffinity_np(pthread_self(), size, cpuset);
CPU_FREE(cpuset);
}
for (;;) {
if (queue.queueJobs.size_approx() == 0) {
if (queue.fExit.load(std::memory_order_relaxed))
break;
}
if (queue.fExit.load(std::memory_order_seq_cst) && queue.queueJobs.size_approx() == 0)
break;
vars.gcEpoch = g_pserver->garbageCollector.startEpoch();
JobBase *rgjob[64];
int cjobs = 0;
while ((cjobs = pqueue->queueJobs.wait_dequeue_bulk_timed(rgjob, 64, std::chrono::milliseconds(5))) > 0) {
std::unique_lock<fastlock> ulPause(pqueue->m_lockPause);
for (int ijob = 0; ijob < cjobs; ++ijob) {
JobBase *pjob = rgjob[ijob];
switch (pjob->type)
{
case JobBase::JobType::Insert:
pqueue->processJob(*static_cast<rdbInsertJob*>(pjob));
break;
case JobBase::JobType::Function:
static_cast<rdbFunctionJob*>(pjob)->m_fn();
break;
}
delete pjob;
}
}
g_pserver->garbageCollector.endEpoch(vars.gcEpoch);
}
if (g_pserver->m_pstorageFactory) {
for (int idb = 0; idb < cserver.dbnum; ++idb)
g_pserver->db[idb]->processChangesAsync(queue.cstorageWritesInFlight);
}
queue.workerThreadDone = true;
ProcessPendingAsyncWrites();
listRelease(vars.clients_pending_asyncwrite);
aeSetThreadOwnsLockOverride(false);
}
};
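
The class above is a standard batched producer/consumer built on the vendored blocking queue: the load thread buffers jobs into vecbatch and flushes them with enqueue_bulk, while the worker drains up to 64 jobs at a time with wait_dequeue_bulk_timed until the exit flag is raised and the queue is empty. A stripped-down, standalone sketch of that pattern (illustrative names only, no KeyDB internals):

// Standalone sketch, not part of this commit: the batching pattern used by
// rdbAsyncWorkThread, minus storage throttling, locking, and GC epochs.
#include "blockingconcurrentqueue.h"
#include <atomic>
#include <chrono>
#include <cstdio>
#include <thread>
#include <vector>

struct Job { int payload; };

int main()
{
    moodycamel::BlockingConcurrentQueue<Job*> queue;
    std::atomic<bool> fExit {false};
    long processed = 0;

    std::thread worker([&] {
        Job *rgjob[64];
        for (;;) {
            // Exit only once the flag is set and the queue has drained.
            if (fExit.load(std::memory_order_seq_cst) && queue.size_approx() == 0)
                break;
            size_t cjobs;
            while ((cjobs = queue.wait_dequeue_bulk_timed(rgjob, 64, std::chrono::milliseconds(5))) > 0) {
                for (size_t i = 0; i < cjobs; ++i) {
                    ++processed;            // stand-in for processJob()
                    delete rgjob[i];
                }
            }
        }
    });

    std::vector<Job*> vecbatch;
    for (int i = 0; i < 10000; ++i) {
        vecbatch.push_back(new Job {i});
        if (vecbatch.size() >= 64) {        // flush a full batch
            queue.enqueue_bulk(vecbatch.data(), vecbatch.size());
            vecbatch.clear();
        }
    }
    if (!vecbatch.empty())                  // flush the remainder, as endWork() does
        queue.enqueue_bulk(vecbatch.data(), vecbatch.size());

    fExit = true;                           // worker drains the queue, then exits
    worker.join();
    std::printf("processed %ld jobs\n", processed);
    return 0;
}
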
/* Track loading progress in order to serve client's from time to time
and if needed calculate rdb checksum */
void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
@ -2574,6 +2920,8 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
(g_pserver->loading_process_events_interval_keys &&
(r->keys_since_last_callback >= g_pserver->loading_process_events_interval_keys)))
{
rdbAsyncWorkThread *pwthread = reinterpret_cast<rdbAsyncWorkThread*>(r->chksum_arg);
listIter li;
listNode *ln;
listRewind(g_pserver->masters, &li);
@ -2584,7 +2932,13 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
replicationSendNewlineToMaster(mi);
}
loadingProgress(r->processed_bytes);
if (pwthread)
pwthread->pauseExecution();
processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar);
if (pwthread)
pwthread->resumeExecution();
processModuleLoadingProgressEvent(0);
robj *ping_argv[1];
@ -2597,28 +2951,38 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
}
}
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned and 'errno' is set accordingly. */
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
uint64_t dbid = 0;
int type, rdbver;
redisDb *db = g_pserver->db[dbid];
redisDb *dbCur = g_pserver->db[dbid];
char buf[1024];
/* Key-specific attributes, set by opcodes before the key type. */
long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now;
long long lru_clock = 0;
unsigned long long ckeysLoaded = 0;
uint64_t mvcc_tstamp = OBJ_MVCC_INVALID;
size_t ckeysLoaded = 0;
now = mstime();
rdbAsyncWorkThread wqueue(rsi, rdbflags, now);
robj *subexpireKey = nullptr;
sds key = nullptr;
bool fLastKeyExpired = false;
std::unique_ptr<rdbInsertJob> spjob;
for (int idb = 0; idb < cserver.dbnum; ++idb)
{
g_pserver->db[idb]->trackChanges(true, 1024);
// If we're tracking changes we need to reset this
bool fTracking = g_pserver->db[0]->FTrackingChanges();
if (fTracking) {
// We don't want to track here because processChangesAsync is outside the normal scope handling
for (int idb = 0; idb < cserver.dbnum; ++idb) {
if (g_pserver->db[idb]->processChanges(false))
g_pserver->db[idb]->commitChanges();
}
}
rdb->update_cksum = rdbLoadProgressCallback;
rdb->chksum_arg = &wqueue;
rdb->max_processing_chunk = g_pserver->loading_process_events_interval_bytes;
if (rioRead(rdb,buf,9) == 0) goto eoferr;
buf[9] = '\0';
@ -2634,8 +2998,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
return C_ERR;
}
now = mstime();
lru_clock = LRU_CLOCK();
wqueue.start();
while(1) {
robj *val;
@ -2683,7 +3047,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
"databases. Exiting\n", cserver.dbnum);
exit(1);
}
db = g_pserver->db[dbid];
dbCur = g_pserver->db[dbid];
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_RESIZEDB) {
/* RESIZEDB: Hint about the size of the keys in the currently
@ -2693,7 +3057,11 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
goto eoferr;
if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
goto eoferr;
db->expand(db_size);
if (g_pserver->allowRdbResizeOp && !g_pserver->m_pstorageFactory) {
wqueue.enqueue([dbCur, db_size]{
dbCur->expand(db_size);
});
}
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_AUX) {
/* AUX: generic string-string fields. Use to add state to RDB
@ -2768,12 +3136,10 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
}
}
else {
redisObjectStack keyobj;
initStaticStringObject(keyobj,key);
long long expireT = strtoll(szFromObj(auxval), nullptr, 10);
setExpire(NULL, db, &keyobj, subexpireKey, expireT);
replicateSubkeyExpire(db, &keyobj, subexpireKey, expireT);
decrRefCount(subexpireKey);
serverAssert(spjob != nullptr);
serverAssert(sdscmp(key, spjob->key) == 0);
spjob->addSubexpireKey(subexpireKey, expireT);
subexpireKey = nullptr;
}
} else {
@ -2846,7 +3212,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
key = nullptr;
}
if ((key = (sds)rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL)
if ((key = (sds)rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS_SHARED,NULL)) == NULL)
goto eoferr;
/* Read value */
if ((val = rdbLoadObject(type,rdb,key,mvcc_tstamp)) == NULL) {
@ -2854,7 +3220,20 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
key = nullptr;
goto eoferr;
}
bool fStaleMvccKey = (rsi) ? mvccFromObj(val) < rsi->mvccMinThreshold : false;
if (spjob != nullptr)
wqueue.enqueue(spjob);
spjob = std::make_unique<rdbInsertJob>();
spjob->db = dbCur;
spjob->key = sdsdupshared(key);
spjob->val = val;
spjob->lru_clock = lru_clock;
spjob->expiretime = expiretime;
spjob->lru_idle = lru_idle;
spjob->lfu_freq = lfu_freq;
val = nullptr;
/* Check if the key already expired. This function is used when loading
* an RDB file from disk, either at startup, or when an RDB was
@ -2864,75 +3243,15 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
* Similarly if the RDB is the preamble of an AOF file, we want to
* load all the keys as they are, since the log of operations later
* assume to work in an exact keyspace state. */
redisObjectStack keyobj;
initStaticStringObject(keyobj,key);
bool fExpiredKey = iAmMaster() && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now;
if (fStaleMvccKey || fExpiredKey) {
if (fStaleMvccKey && !fExpiredKey && rsi != nullptr && rsi->mi != nullptr && rsi->mi->staleKeyMap != nullptr && lookupKeyRead(db, &keyobj) == nullptr) {
// We have a key that we've already deleted and is not back in our database.
// We'll need to inform the sending master of the delete if it is also a replica of us
robj_sharedptr objKeyDup(createStringObject(key, sdslen(key)));
rsi->mi->staleKeyMap->operator[](db->id).push_back(objKeyDup);
}
fLastKeyExpired = true;
sdsfree(key);
key = nullptr;
decrRefCount(val);
val = nullptr;
} else {
/* If we have a storage provider check if we need to evict some keys to stay under our memory limit,
do this every 16 keys to limit the perf impact */
if (g_pserver->m_pstorageFactory && (ckeysLoaded % 128) == 0)
{
fLastKeyExpired = fStaleMvccKey || fExpiredKey;
ckeysLoaded++;
if (g_pserver->m_pstorageFactory && (ckeysLoaded % 128) == 0)
{
if (!serverTL->gcEpoch.isReset()) {
g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch);
serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
bool fHighMemory = (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK);
if (fHighMemory || (ckeysLoaded % (1024)) == 0)
{
for (int idb = 0; idb < cserver.dbnum; ++idb)
{
if (g_pserver->db[idb]->processChanges(false))
g_pserver->db[idb]->commitChanges();
if (fHighMemory && !(rsi && rsi->fForceSetKey)) {
g_pserver->db[idb]->removeAllCachedValues(); // During load we don't go through the normal eviction unless we're merging (i.e. an active replica)
fHighMemory = false; // we took care of it
}
g_pserver->db[idb]->trackChanges(false, 1024);
}
if (fHighMemory)
performEvictions(false /* fPreSnapshot*/);
}
}
redisObjectStack keyobj;
initStaticStringObject(keyobj,key);
/* Add the new object in the hash table */
int fInserted = dbMerge(db, key, val, (rsi && rsi->fForceSetKey) || (rdbflags & RDBFLAGS_ALLOW_DUP)); // Note: dbMerge will incrRef
fLastKeyExpired = false;
if (fInserted)
{
++ckeysLoaded;
/* Set the expire time if needed */
if (expiretime != -1)
{
setExpire(NULL,db,&keyobj,nullptr,expiretime);
}
/* Set usage information (for eviction). */
objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock,1000);
/* call key space notification on key loaded for modules only */
moduleNotifyKeyspaceEvent(NOTIFY_LOADED, "loaded", &keyobj, db->id);
replicationNotifyLoadedKey(db, &keyobj, val, expiretime);
}
else
{
decrRefCount(val);
val = nullptr;
}
}
@ -2948,6 +3267,9 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
lru_idle = -1;
}
if (spjob != nullptr)
wqueue.enqueue(spjob);
if (key != nullptr)
{
sdsfree(key);
@ -2980,11 +3302,14 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
}
}
for (int idb = 0; idb < cserver.dbnum; ++idb)
{
if (g_pserver->db[idb]->processChanges(false))
g_pserver->db[idb]->commitChanges();
wqueue.endWork();
if (fTracking) {
// Reset track changes
for (int idb = 0; idb < cserver.dbnum; ++idb) {
g_pserver->db[idb]->trackChanges(false);
}
}
return C_OK;
/* Unexpected end of file is handled here calling rdbReportReadError():
@ -2992,6 +3317,14 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
* the RDB file from a socket during initial SYNC (diskless replica mode),
* we'll report the error to the caller, so that we can retry. */
eoferr:
if (fTracking) {
// Reset track changes
for (int idb = 0; idb < cserver.dbnum; ++idb) {
g_pserver->db[idb]->trackChanges(false);
}
}
wqueue.endWork();
if (key != nullptr)
{
sdsfree(key);
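The hunks above replace the old inline dbMerge() path with per-key rdbInsertJob objects handed to wqueue (wqueue.enqueue(spjob)) and a final wqueue.endWork() on both the success and eoferr exits. The concrete type of wqueue is not part of this file's diff, so the following is only a standalone sketch of that producer/consumer shape built directly on moodycamel::BlockingConcurrentQueue (which server.h begins including later in this merge request); LoadJob and its fDone sentinel are illustrative stand-ins, not names taken from the code.

// Sketch only: a worker thread drains load jobs off the parsing thread, the way the
// async load path above hands rdbInsertJob objects to its work queue.
#include <blockingconcurrentqueue.h>
#include <memory>
#include <thread>

struct LoadJob {            // stand-in for rdbInsertJob (key, val, expiretime, ...)
    bool fDone = false;     // sentinel flag standing in for a real endWork() signal
};

int main() {
    moodycamel::BlockingConcurrentQueue<std::unique_ptr<LoadJob>> queue;

    // Consumer: performs the actual database inserts away from the parser thread.
    std::thread worker([&queue]{
        std::unique_ptr<LoadJob> job;
        for (;;) {
            queue.wait_dequeue(job);   // blocks until a job is available
            if (job->fDone) break;     // equivalent of endWork()
            // ... insert the job's key/value into the target db here ...
        }
    });

    // Producer: the RDB parser builds one job per key and hands it off.
    for (int i = 0; i < 100; ++i)
        queue.enqueue(std::make_unique<LoadJob>());

    auto done = std::make_unique<LoadJob>();
    done->fDone = true;
    queue.enqueue(std::move(done));    // signal completion to the worker
    worker.join();
    return 0;
}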

View File

@ -119,10 +119,11 @@
#define RDB_MODULE_OPCODE_STRING 5 /* String. */
/* rdbLoad...() functions flags. */
#define RDB_LOAD_NONE 0
#define RDB_LOAD_ENC (1<<0)
#define RDB_LOAD_PLAIN (1<<1)
#define RDB_LOAD_SDS (1<<2)
#define RDB_LOAD_NONE 0
#define RDB_LOAD_ENC (1<<0)
#define RDB_LOAD_PLAIN (1<<1)
#define RDB_LOAD_SDS (1<<2)
#define RDB_LOAD_SDS_SHARED ((1 << 3) | RDB_LOAD_SDS)
/* flags on the purpose of rdb save or load */
#define RDBFLAGS_NONE 0 /* No special RDB loading. */

View File

@ -99,6 +99,7 @@ static const rio rioBufferIO = {
rioBufferTell,
rioBufferFlush,
NULL, /* update_checksum */
NULL, /* update checksum arg */
0, /* current checksum */
0, /* flags */
0, /* bytes read or written */
@ -113,6 +114,7 @@ static const rio rioConstBufferIO = {
rioBufferTell,
rioBufferFlush,
NULL, /* update_checksum */
NULL, /* update checksum arg */
0, /* current checksum */
0, /* flags */
0, /* bytes read or written */
@ -176,6 +178,7 @@ static const rio rioFileIO = {
rioFileTell,
rioFileFlush,
NULL, /* update_checksum */
NULL, /* update checksum arg */
0, /* current checksum */
0, /* flags */
0, /* bytes read or written */
@ -275,6 +278,7 @@ static const rio rioConnIO = {
rioConnTell,
rioConnFlush,
NULL, /* update_checksum */
NULL, /* update checksum arg */
0, /* current checksum */
0, /* flags */
0, /* bytes read or written */
@ -394,6 +398,7 @@ static const rio rioFdIO = {
rioFdTell,
rioFdFlush,
NULL, /* update_checksum */
NULL, /* update checksum arg */
0, /* current checksum */
0, /* flags */
0, /* bytes read or written */

View File

@ -58,6 +58,7 @@ struct _rio {
* and len fields pointing to the new block of data to add to the checksum
* computation. */
void (*update_cksum)(struct _rio *, const void *buf, size_t len);
void *chksum_arg;
/* The current checksum and flags (see RIO_FLAG_*) */
uint64_t cksum, flags;
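For context, the new chksum_arg field gives the existing update_cksum hook a place to carry per-load state without globals; rdbLoadRio above points it at the load work queue (rdb->chksum_arg = &wqueue). A minimal self-contained sketch of that plumbing, using a cut-down MiniRio stand-in rather than the real struct _rio:

#include <cstddef>
#include <cstdint>
#include <cstdio>

// MiniRio mimics only the fields this sketch needs from struct _rio.
struct MiniRio {
    void (*update_cksum)(MiniRio *r, const void *buf, size_t len);
    void *chksum_arg;      // new field: opaque pointer handed back to the callback
    uint64_t cksum;
};

// Progress-style callback: pulls its state back out of chksum_arg.
static void progressCallback(MiniRio *r, const void *, size_t len) {
    auto *bytesSeen = static_cast<uint64_t *>(r->chksum_arg);
    *bytesSeen += len;     // e.g. drive progress accounting or pump a work queue here
}

int main() {
    uint64_t bytesSeen = 0;
    MiniRio rdb{};
    rdb.update_cksum = progressCallback;   // mirrors rdb->update_cksum = rdbLoadProgressCallback
    rdb.chksum_arg = &bytesSeen;           // mirrors rdb->chksum_arg = &wqueue
    char buf[9] = {};
    rdb.update_cksum(&rdb, buf, sizeof(buf));
    printf("%llu bytes processed\n", (unsigned long long)bytesSeen);
    return 0;
}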

View File

@ -2612,7 +2612,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
g_pserver->rdb_bgsave_scheduled = 0;
}
if (cserver.storage_memory_model == STORAGE_WRITEBACK && g_pserver->m_pstorageFactory) {
if (cserver.storage_memory_model == STORAGE_WRITEBACK && g_pserver->m_pstorageFactory && !g_pserver->loading) {
run_with_period(g_pserver->storage_flush_period) {
flushStorageWeak();
}
@ -2888,7 +2888,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
static thread_local bool fFirstRun = true;
// note: we also copy the DB pointer in case a DB swap is done while the lock is released
std::vector<redisDb*> vecdb; // note we cache the database pointer in case a dbswap is done while the lock is released
if (cserver.storage_memory_model == STORAGE_WRITETHROUGH && g_pserver->m_pstorageFactory != nullptr)
if (cserver.storage_memory_model == STORAGE_WRITETHROUGH && g_pserver->m_pstorageFactory != nullptr && !g_pserver->loading)
{
if (!fFirstRun) {
mstime_t storage_process_latency;

View File

@ -39,6 +39,9 @@
#include "rio.h"
#include "atomicvar.h"
#include <concurrentqueue.h>
#include <blockingconcurrentqueue.h>
#include <stdio.h>
#include <stdlib.h>
#include <cmath>
@ -1157,12 +1160,14 @@ public:
void setStorageProvider(StorageCache *pstorage);
void trackChanges(bool fBulk, size_t sizeHint = 0);
bool FTrackingChanges() const { return !!m_fTrackingChanges; }
// Process and commit changes for secondary storage. Note that process and commit are separated
// to allow you to release the global lock before committing. To prevent deadlocks you *must*
// either release the global lock or keep the same global lock between the two functions, as
// a second lock is kept to ensure writes to secondary storage are ordered
bool processChanges(bool fSnapshot);
void processChangesAsync(std::atomic<int> &pendingJobs);
void commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree = nullptr);
// This should only be used if you look at the key, we do not fixup
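A hedged usage sketch of the process/commit split documented in the comment above: track, mutate, and process under the lock, then drop the lock before the potentially slow commit. StubDb and the std::mutex are compile-only stand-ins for redisDbPersistentData and KeyDB's global lock, not real API.

#include <cstddef>
#include <mutex>

struct StubDb {                        // stand-in for redisDbPersistentData
    bool dirty = false;
    void trackChanges(bool /*fBulk*/, size_t /*sizeHint*/ = 0) {}
    bool processChanges(bool /*fSnapshot*/) { return dirty; }   // stage pending writes
    void commitChanges() { dirty = false; }                     // push them to secondary storage
};

std::mutex g_globalLock;               // stand-in for the real global lock

void flushDbChanges(StubDb &db) {
    std::unique_lock<std::mutex> lock(g_globalLock);
    db.trackChanges(false /*fBulk*/);
    db.dirty = true;                   // ... writes to the db happen here, under the lock ...
    bool fHasChanges = db.processChanges(false /*fSnapshot*/);
    lock.unlock();                     // release before committing, as the comment requires
    if (fHasChanges)
        db.commitChanges();            // write ordering is protected by the internal second lock
}

int main() { StubDb db; flushDbChanges(db); return 0; }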
@ -1180,6 +1185,8 @@ public:
bool FStorageProvider() { return m_spstorage != nullptr; }
bool removeCachedValue(const char *key, dictEntry **ppde = nullptr);
void removeAllCachedValues();
void disableKeyCache();
bool keycacheIsEnabled();
bool prefetchKeysAsync(client *c, struct parsed_command &command, bool fExecOK);
@ -1327,6 +1334,7 @@ struct redisDb : public redisDbPersistentDataSnapshot
using redisDbPersistentData::setExpire;
using redisDbPersistentData::trackChanges;
using redisDbPersistentData::processChanges;
using redisDbPersistentData::processChangesAsync;
using redisDbPersistentData::commitChanges;
using redisDbPersistentData::setexpireUnsafe;
using redisDbPersistentData::setexpire;
@ -1334,11 +1342,14 @@ struct redisDb : public redisDbPersistentDataSnapshot
using redisDbPersistentData::endSnapshot;
using redisDbPersistentData::restoreSnapshot;
using redisDbPersistentData::removeAllCachedValues;
using redisDbPersistentData::disableKeyCache;
using redisDbPersistentData::keycacheIsEnabled;
using redisDbPersistentData::dictUnsafeKeyOnly;
using redisDbPersistentData::resortExpire;
using redisDbPersistentData::prefetchKeysAsync;
using redisDbPersistentData::prepOverwriteForSnapshot;
using redisDbPersistentData::FRehashing;
using redisDbPersistentData::FTrackingChanges;
public:
expireset::setiter expireitr;
@ -2300,6 +2311,7 @@ struct redisServer {
sds aof_child_diff; /* AOF diff accumulator child side. */
int aof_rewrite_pending = 0; /* is a call to aofChildWriteDiffData already queued? */
/* RDB persistence */
int allowRdbResizeOp; /* In debug situations we may want rehash to be occurring, so ignore the resize hint */
long long dirty; /* Changes to DB from the last save */
long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */
struct _rdbThreadVars

View File

@ -53,17 +53,6 @@ RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, cons
options.create_missing_column_families = true;
rocksdb::DB *db = nullptr;
if (rgchConfig != nullptr)
{
std::string options_string(rgchConfig, cchConfig);
rocksdb::Status status;
if (!(status = rocksdb::GetDBOptionsFromString(options, options_string, &options)).ok())
{
fprintf(stderr, "Failed to parse FLASH options: %s\r\n", status.ToString().c_str());
exit(EXIT_FAILURE);
}
}
options.max_background_compactions = 4;
options.max_background_flushes = 2;
options.bytes_per_sync = 1048576;
@ -90,6 +79,17 @@ RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, cons
cf_options.level_compaction_dynamic_level_bytes = true;
veccoldesc.push_back(rocksdb::ColumnFamilyDescriptor(std::to_string(idb), cf_options));
}
if (rgchConfig != nullptr)
{
std::string options_string(rgchConfig, cchConfig);
rocksdb::Status status;
if (!(status = rocksdb::GetDBOptionsFromString(options, options_string, &options)).ok())
{
fprintf(stderr, "Failed to parse FLASH options: %s\r\n", status.ToString().c_str());
exit(EXIT_FAILURE);
}
}
std::vector<rocksdb::ColumnFamilyHandle*> handles;
status = rocksdb::DB::Open(options, dbfile, veccoldesc, &handles, &db);
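The change above moves the GetDBOptionsFromString() call from before the hard-coded tuning defaults to after them, so the user-supplied FLASH option string is applied on top of those defaults rather than being clobbered by them (presumably the intent of the reorder). A standalone sketch of that ordering, with hypothetical option values:

#include <rocksdb/convenience.h>
#include <rocksdb/options.h>
#include <cstdio>
#include <string>

int main() {
    rocksdb::Options options;
    // Programmatic defaults are set first...
    options.max_background_compactions = 4;
    options.max_background_flushes = 2;
    options.bytes_per_sync = 1048576;
    // ...and the user's option string is parsed last, so its settings take precedence.
    std::string options_string = "max_background_compactions=8;bytes_per_sync=2097152";  // hypothetical values
    rocksdb::Status status = rocksdb::GetDBOptionsFromString(options, options_string, &options);
    if (!status.ok()) {
        fprintf(stderr, "Failed to parse FLASH options: %s\r\n", status.ToString().c_str());
        return 1;
    }
    return 0;
}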

View File

@ -202,6 +202,18 @@ test {client freed during loading} {
}
}
test {repeated load} {
start_server [list overrides [list server-threads 3 allow-rdb-resize-op no]] {
r debug populate 500000 key 1000
set digest [r debug digest]
for {set j 0} {$j < 10} {incr j} {
r debug reload
assert_equal $digest [r debug digest]
}
}
}
# Our COW metrics (Private_Dirty) work only on Linux
set system_name [string tolower [exec uname -s]]
if {$system_name eq {linux}} {