Merge branch 'keydbpro' into async_commands

Former-commit-id: 9eaddb8ca1424ff3225dac9c144d23848228c7d2
Author: Malavan Sotheeswaran
Date: 2021-11-30 11:47:51 -05:00
Commit: 54f3436250
27 changed files with 5770 additions and 310 deletions

View File

@ -0,0 +1,583 @@
// Provides an efficient blocking version of moodycamel::ConcurrentQueue.
// ©2015-2020 Cameron Desrochers. Distributed under the terms of the simplified
// BSD license, available at the top of concurrentqueue.h.
// Also dual-licensed under the Boost Software License (see LICENSE.md)
// Uses Jeff Preshing's semaphore implementation (under the terms of its
// separate zlib license, see lightweightsemaphore.h).
#pragma once
#include "concurrentqueue.h"
#include "lightweightsemaphore.h"
#include <type_traits>
#include <cerrno>
#include <memory>
#include <chrono>
#include <ctime>
namespace moodycamel
{
// This is a blocking version of the queue. It has an almost identical interface to
// the normal non-blocking version, with the addition of various wait_dequeue() methods
// and the removal of producer-specific dequeue methods.
template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
class BlockingConcurrentQueue
{
private:
typedef ::moodycamel::ConcurrentQueue<T, Traits> ConcurrentQueue;
typedef ::moodycamel::LightweightSemaphore LightweightSemaphore;
public:
typedef typename ConcurrentQueue::producer_token_t producer_token_t;
typedef typename ConcurrentQueue::consumer_token_t consumer_token_t;
typedef typename ConcurrentQueue::index_t index_t;
typedef typename ConcurrentQueue::size_t size_t;
typedef typename std::make_signed<size_t>::type ssize_t;
static const size_t BLOCK_SIZE = ConcurrentQueue::BLOCK_SIZE;
static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = ConcurrentQueue::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD;
static const size_t EXPLICIT_INITIAL_INDEX_SIZE = ConcurrentQueue::EXPLICIT_INITIAL_INDEX_SIZE;
static const size_t IMPLICIT_INITIAL_INDEX_SIZE = ConcurrentQueue::IMPLICIT_INITIAL_INDEX_SIZE;
static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = ConcurrentQueue::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE;
static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = ConcurrentQueue::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE;
static const size_t MAX_SUBQUEUE_SIZE = ConcurrentQueue::MAX_SUBQUEUE_SIZE;
public:
// Creates a queue with at least `capacity` element slots; note that the
// actual number of elements that can be inserted without additional memory
// allocation depends on the number of producers and the block size (e.g. if
// the block size is equal to `capacity`, only a single block will be allocated
// up-front, which means only a single producer will be able to enqueue elements
// without an extra allocation -- blocks aren't shared between producers).
// This method is not thread safe -- it is up to the user to ensure that the
// queue is fully constructed before it starts being used by other threads (this
// includes making the memory effects of construction visible, possibly with a
// memory barrier).
explicit BlockingConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
: inner(capacity), sema(create<LightweightSemaphore, ssize_t, int>(0, (int)Traits::MAX_SEMA_SPINS), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
{
assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
if (!sema) {
MOODYCAMEL_THROW(std::bad_alloc());
}
}
BlockingConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
: inner(minCapacity, maxExplicitProducers, maxImplicitProducers), sema(create<LightweightSemaphore, ssize_t, int>(0, (int)Traits::MAX_SEMA_SPINS), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
{
assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
if (!sema) {
MOODYCAMEL_THROW(std::bad_alloc());
}
}
// Disable copying and copy assignment
BlockingConcurrentQueue(BlockingConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
BlockingConcurrentQueue& operator=(BlockingConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
// Moving is supported, but note that it is *not* a thread-safe operation.
// Nobody can use the queue while it's being moved, and the memory effects
// of that move must be propagated to other threads before they can use it.
// Note: When a queue is moved, its tokens are still valid but can only be
// used with the destination queue (i.e. semantically they are moved along
// with the queue itself).
BlockingConcurrentQueue(BlockingConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
: inner(std::move(other.inner)), sema(std::move(other.sema))
{ }
inline BlockingConcurrentQueue& operator=(BlockingConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
{
return swap_internal(other);
}
// Swaps this queue's state with the other's. Not thread-safe.
// Swapping two queues does not invalidate their tokens, however
// the tokens that were created for one queue must be used with
// only the swapped queue (i.e. the tokens are tied to the
// queue's movable state, not the object itself).
inline void swap(BlockingConcurrentQueue& other) MOODYCAMEL_NOEXCEPT
{
swap_internal(other);
}
private:
BlockingConcurrentQueue& swap_internal(BlockingConcurrentQueue& other)
{
if (this == &other) {
return *this;
}
inner.swap(other.inner);
sema.swap(other.sema);
return *this;
}
public:
// Enqueues a single item (by copying it).
// Allocates memory if required. Only fails if memory allocation fails (or implicit
// production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
// or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
// Thread-safe.
inline bool enqueue(T const& item)
{
if ((details::likely)(inner.enqueue(item))) {
sema->signal();
return true;
}
return false;
}
// Enqueues a single item (by moving it, if possible).
// Allocates memory if required. Only fails if memory allocation fails (or implicit
// production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
// or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
// Thread-safe.
inline bool enqueue(T&& item)
{
if ((details::likely)(inner.enqueue(std::move(item)))) {
sema->signal();
return true;
}
return false;
}
// Enqueues a single item (by copying it) using an explicit producer token.
// Allocates memory if required. Only fails if memory allocation fails (or
// Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
// Thread-safe.
inline bool enqueue(producer_token_t const& token, T const& item)
{
if ((details::likely)(inner.enqueue(token, item))) {
sema->signal();
return true;
}
return false;
}
// Enqueues a single item (by moving it, if possible) using an explicit producer token.
// Allocates memory if required. Only fails if memory allocation fails (or
// Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
// Thread-safe.
inline bool enqueue(producer_token_t const& token, T&& item)
{
if ((details::likely)(inner.enqueue(token, std::move(item)))) {
sema->signal();
return true;
}
return false;
}
// Enqueues several items.
// Allocates memory if required. Only fails if memory allocation fails (or
// implicit production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
// is 0, or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
// Note: Use std::make_move_iterator if the elements should be moved instead of copied.
// Thread-safe.
template<typename It>
inline bool enqueue_bulk(It itemFirst, size_t count)
{
if ((details::likely)(inner.enqueue_bulk(std::forward<It>(itemFirst), count))) {
sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
return true;
}
return false;
}
// Enqueues several items using an explicit producer token.
// Allocates memory if required. Only fails if memory allocation fails
// (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
// Note: Use std::make_move_iterator if the elements should be moved
// instead of copied.
// Thread-safe.
template<typename It>
inline bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
{
if ((details::likely)(inner.enqueue_bulk(token, std::forward<It>(itemFirst), count))) {
sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
return true;
}
return false;
}
// Enqueues a single item (by copying it).
// Does not allocate memory. Fails if not enough room to enqueue (or implicit
// production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
// is 0).
// Thread-safe.
inline bool try_enqueue(T const& item)
{
if (inner.try_enqueue(item)) {
sema->signal();
return true;
}
return false;
}
// Enqueues a single item (by moving it, if possible).
// Does not allocate memory (except for one-time implicit producer).
// Fails if not enough room to enqueue (or implicit production is
// disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
// Thread-safe.
inline bool try_enqueue(T&& item)
{
if (inner.try_enqueue(std::move(item))) {
sema->signal();
return true;
}
return false;
}
// Enqueues a single item (by copying it) using an explicit producer token.
// Does not allocate memory. Fails if not enough room to enqueue.
// Thread-safe.
inline bool try_enqueue(producer_token_t const& token, T const& item)
{
if (inner.try_enqueue(token, item)) {
sema->signal();
return true;
}
return false;
}
// Enqueues a single item (by moving it, if possible) using an explicit producer token.
// Does not allocate memory. Fails if not enough room to enqueue.
// Thread-safe.
inline bool try_enqueue(producer_token_t const& token, T&& item)
{
if (inner.try_enqueue(token, std::move(item))) {
sema->signal();
return true;
}
return false;
}
// Enqueues several items.
// Does not allocate memory (except for one-time implicit producer).
// Fails if not enough room to enqueue (or implicit production is
// disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
// Note: Use std::make_move_iterator if the elements should be moved
// instead of copied.
// Thread-safe.
template<typename It>
inline bool try_enqueue_bulk(It itemFirst, size_t count)
{
if (inner.try_enqueue_bulk(std::forward<It>(itemFirst), count)) {
sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
return true;
}
return false;
}
// Enqueues several items using an explicit producer token.
// Does not allocate memory. Fails if not enough room to enqueue.
// Note: Use std::make_move_iterator if the elements should be moved
// instead of copied.
// Thread-safe.
template<typename It>
inline bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
{
if (inner.try_enqueue_bulk(token, std::forward<It>(itemFirst), count)) {
sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
return true;
}
return false;
}
// Attempts to dequeue from the queue.
// Returns false if all producer streams appeared empty at the time they
// were checked (so, the queue is likely but not guaranteed to be empty).
// Never allocates. Thread-safe.
template<typename U>
inline bool try_dequeue(U& item)
{
if (sema->tryWait()) {
while (!inner.try_dequeue(item)) {
continue;
}
return true;
}
return false;
}
// Attempts to dequeue from the queue using an explicit consumer token.
// Returns false if all producer streams appeared empty at the time they
// were checked (so, the queue is likely but not guaranteed to be empty).
// Never allocates. Thread-safe.
template<typename U>
inline bool try_dequeue(consumer_token_t& token, U& item)
{
if (sema->tryWait()) {
while (!inner.try_dequeue(token, item)) {
continue;
}
return true;
}
return false;
}
// Attempts to dequeue several elements from the queue.
// Returns the number of items actually dequeued.
// Returns 0 if all producer streams appeared empty at the time they
// were checked (so, the queue is likely but not guaranteed to be empty).
// Never allocates. Thread-safe.
template<typename It>
inline size_t try_dequeue_bulk(It itemFirst, size_t max)
{
size_t count = 0;
max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
}
return count;
}
// Attempts to dequeue several elements from the queue using an explicit consumer token.
// Returns the number of items actually dequeued.
// Returns 0 if all producer streams appeared empty at the time they
// were checked (so, the queue is likely but not guaranteed to be empty).
// Never allocates. Thread-safe.
template<typename It>
inline size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
{
size_t count = 0;
max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
}
return count;
}
// Blocks the current thread until there's something to dequeue, then
// dequeues it.
// Never allocates. Thread-safe.
template<typename U>
inline void wait_dequeue(U& item)
{
while (!sema->wait()) {
continue;
}
while (!inner.try_dequeue(item)) {
continue;
}
}
// Blocks the current thread until either there's something to dequeue
// or the timeout (specified in microseconds) expires. Returns false
// without setting `item` if the timeout expires, otherwise assigns
// to `item` and returns true.
// Using a negative timeout indicates an indefinite timeout,
// and is thus functionally equivalent to calling wait_dequeue.
// Never allocates. Thread-safe.
template<typename U>
inline bool wait_dequeue_timed(U& item, std::int64_t timeout_usecs)
{
if (!sema->wait(timeout_usecs)) {
return false;
}
while (!inner.try_dequeue(item)) {
continue;
}
return true;
}
// Blocks the current thread until either there's something to dequeue
// or the timeout expires. Returns false without setting `item` if the
// timeout expires, otherwise assigns to `item` and returns true.
// Never allocates. Thread-safe.
template<typename U, typename Rep, typename Period>
inline bool wait_dequeue_timed(U& item, std::chrono::duration<Rep, Period> const& timeout)
{
return wait_dequeue_timed(item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
}
// Blocks the current thread until there's something to dequeue, then
// dequeues it using an explicit consumer token.
// Never allocates. Thread-safe.
template<typename U>
inline void wait_dequeue(consumer_token_t& token, U& item)
{
while (!sema->wait()) {
continue;
}
while (!inner.try_dequeue(token, item)) {
continue;
}
}
// Blocks the current thread until either there's something to dequeue
// or the timeout (specified in microseconds) expires. Returns false
// without setting `item` if the timeout expires, otherwise assigns
// to `item` and returns true.
// Using a negative timeout indicates an indefinite timeout,
// and is thus functionally equivalent to calling wait_dequeue.
// Never allocates. Thread-safe.
template<typename U>
inline bool wait_dequeue_timed(consumer_token_t& token, U& item, std::int64_t timeout_usecs)
{
if (!sema->wait(timeout_usecs)) {
return false;
}
while (!inner.try_dequeue(token, item)) {
continue;
}
return true;
}
// Blocks the current thread until either there's something to dequeue
// or the timeout expires. Returns false without setting `item` if the
// timeout expires, otherwise assigns to `item` and returns true.
// Never allocates. Thread-safe.
template<typename U, typename Rep, typename Period>
inline bool wait_dequeue_timed(consumer_token_t& token, U& item, std::chrono::duration<Rep, Period> const& timeout)
{
return wait_dequeue_timed(token, item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
}
// Attempts to dequeue several elements from the queue.
// Returns the number of items actually dequeued, which will
// always be at least one (this method blocks until the queue
// is non-empty) and at most max.
// Never allocates. Thread-safe.
template<typename It>
inline size_t wait_dequeue_bulk(It itemFirst, size_t max)
{
size_t count = 0;
max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
}
return count;
}
// Attempts to dequeue several elements from the queue.
// Returns the number of items actually dequeued, which can
// be 0 if the timeout expires while waiting for elements,
// and at most max.
// Using a negative timeout indicates an indefinite timeout,
// and is thus functionally equivalent to calling wait_dequeue_bulk.
// Never allocates. Thread-safe.
template<typename It>
inline size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::int64_t timeout_usecs)
{
size_t count = 0;
max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs);
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
}
return count;
}
// Attempts to dequeue several elements from the queue.
// Returns the number of items actually dequeued, which can
// be 0 if the timeout expires while waiting for elements,
// and at most max.
// Never allocates. Thread-safe.
template<typename It, typename Rep, typename Period>
inline size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::chrono::duration<Rep, Period> const& timeout)
{
return wait_dequeue_bulk_timed<It&>(itemFirst, max, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
}
// Attempts to dequeue several elements from the queue using an explicit consumer token.
// Returns the number of items actually dequeued, which will
// always be at least one (this method blocks until the queue
// is non-empty) and at most max.
// Never allocates. Thread-safe.
template<typename It>
inline size_t wait_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
{
size_t count = 0;
max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
}
return count;
}
// Attempts to dequeue several elements from the queue using an explicit consumer token.
// Returns the number of items actually dequeued, which can
// be 0 if the timeout expires while waiting for elements,
// and at most max.
// Using a negative timeout indicates an indefinite timeout,
// and is thus functionally equivalent to calling wait_dequeue_bulk.
// Never allocates. Thread-safe.
template<typename It>
inline size_t wait_dequeue_bulk_timed(consumer_token_t& token, It itemFirst, size_t max, std::int64_t timeout_usecs)
{
size_t count = 0;
max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs);
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
}
return count;
}
// Attempts to dequeue several elements from the queue using an explicit consumer token.
// Returns the number of items actually dequeued, which can
// be 0 if the timeout expires while waiting for elements,
// and at most max.
// Never allocates. Thread-safe.
template<typename It, typename Rep, typename Period>
inline size_t wait_dequeue_bulk_timed(consumer_token_t& token, It itemFirst, size_t max, std::chrono::duration<Rep, Period> const& timeout)
{
return wait_dequeue_bulk_timed<It&>(token, itemFirst, max, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
}
// Returns an estimate of the total number of elements currently in the queue. This
// estimate is only accurate if the queue has completely stabilized before it is called
// (i.e. all enqueue and dequeue operations have completed and their memory effects are
// visible on the calling thread, and no further operations start while this method is
// being called).
// Thread-safe.
inline size_t size_approx() const
{
return (size_t)sema->availableApprox();
}
// Returns true if the underlying atomic variables used by
// the queue are lock-free (they should be on most platforms).
// Thread-safe.
static bool is_lock_free()
{
return ConcurrentQueue::is_lock_free();
}
private:
template<typename U, typename A1, typename A2>
static inline U* create(A1&& a1, A2&& a2)
{
void* p = (Traits::malloc)(sizeof(U));
return p != nullptr ? new (p) U(std::forward<A1>(a1), std::forward<A2>(a2)) : nullptr;
}
template<typename U>
static inline void destroy(U* p)
{
if (p != nullptr) {
p->~U();
}
(Traits::free)(p);
}
private:
ConcurrentQueue inner;
std::unique_ptr<LightweightSemaphore, void (*)(LightweightSemaphore*)> sema;
};
template<typename T, typename Traits>
inline void swap(BlockingConcurrentQueue<T, Traits>& a, BlockingConcurrentQueue<T, Traits>& b) MOODYCAMEL_NOEXCEPT
{
a.swap(b);
}
} // end namespace moodycamel
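For orientation, a minimal usage sketch of the blocking queue above (illustrative only, not part of this commit; the thread and loop shape are hypothetical):
// Hypothetical usage sketch; not part of this commit.
#include "blockingconcurrentqueue.h"
#include <thread>
#include <cstdio>
int main()
{
    // Capacity is only a hint; per the constructor comment above, blocks are
    // not shared between producers, so the effective capacity varies.
    moodycamel::BlockingConcurrentQueue<int> q(256);
    std::thread consumer([&] {
        int item;
        for (int i = 0; i < 100; ++i) {
            q.wait_dequeue(item);            // blocks until something is enqueued
            std::printf("got %d\n", item);
        }
    });
    for (int i = 0; i < 100; ++i)
        q.enqueue(i);                        // signals the internal semaphore on success
    consumer.join();
    return 0;
}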

deps/concurrentqueue/concurrentqueue.h (vendored, 3743 lines): diff suppressed because it is too large.

View File

@ -0,0 +1,412 @@
// Provides an efficient implementation of a semaphore (LightweightSemaphore).
// This is an extension of Jeff Preshing's semaphore implementation (licensed
// under the terms of its separate zlib license) that has been adapted and
// extended by Cameron Desrochers.
#pragma once
#include <cstddef> // For std::size_t
#include <atomic>
#include <type_traits> // For std::make_signed<T>
#if defined(_WIN32)
// Avoid including windows.h in a header; we only need a handful of
// items, so we'll redeclare them here (this is relatively safe since
// the API generally has to remain stable between Windows versions).
// I know this is an ugly hack but it still beats polluting the global
// namespace with thousands of generic names or adding a .cpp for nothing.
extern "C" {
struct _SECURITY_ATTRIBUTES;
__declspec(dllimport) void* __stdcall CreateSemaphoreW(_SECURITY_ATTRIBUTES* lpSemaphoreAttributes, long lInitialCount, long lMaximumCount, const wchar_t* lpName);
__declspec(dllimport) int __stdcall CloseHandle(void* hObject);
__declspec(dllimport) unsigned long __stdcall WaitForSingleObject(void* hHandle, unsigned long dwMilliseconds);
__declspec(dllimport) int __stdcall ReleaseSemaphore(void* hSemaphore, long lReleaseCount, long* lpPreviousCount);
}
#elif defined(__MACH__)
#include <mach/mach.h>
#elif defined(__unix__)
#include <semaphore.h>
#endif
namespace moodycamel
{
namespace details
{
// Code in the mpmc_sema namespace below is an adaptation of Jeff Preshing's
// portable + lightweight semaphore implementations, originally from
// https://github.com/preshing/cpp11-on-multicore/blob/master/common/sema.h
// LICENSE:
// Copyright (c) 2015 Jeff Preshing
//
// This software is provided 'as-is', without any express or implied
// warranty. In no event will the authors be held liable for any damages
// arising from the use of this software.
//
// Permission is granted to anyone to use this software for any purpose,
// including commercial applications, and to alter it and redistribute it
// freely, subject to the following restrictions:
//
// 1. The origin of this software must not be misrepresented; you must not
// claim that you wrote the original software. If you use this software
// in a product, an acknowledgement in the product documentation would be
// appreciated but is not required.
// 2. Altered source versions must be plainly marked as such, and must not be
// misrepresented as being the original software.
// 3. This notice may not be removed or altered from any source distribution.
#if defined(_WIN32)
class Semaphore
{
private:
void* m_hSema;
Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
public:
Semaphore(int initialCount = 0)
{
assert(initialCount >= 0);
const long maxLong = 0x7fffffff;
m_hSema = CreateSemaphoreW(nullptr, initialCount, maxLong, nullptr);
assert(m_hSema);
}
~Semaphore()
{
CloseHandle(m_hSema);
}
bool wait()
{
const unsigned long infinite = 0xffffffff;
return WaitForSingleObject(m_hSema, infinite) == 0;
}
bool try_wait()
{
return WaitForSingleObject(m_hSema, 0) == 0;
}
bool timed_wait(std::uint64_t usecs)
{
return WaitForSingleObject(m_hSema, (unsigned long)(usecs / 1000)) == 0;
}
void signal(int count = 1)
{
while (!ReleaseSemaphore(m_hSema, count, nullptr));
}
};
#elif defined(__MACH__)
//---------------------------------------------------------
// Semaphore (Apple iOS and OSX)
// Can't use POSIX semaphores due to http://lists.apple.com/archives/darwin-kernel/2009/Apr/msg00010.html
//---------------------------------------------------------
class Semaphore
{
private:
semaphore_t m_sema;
Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
public:
Semaphore(int initialCount = 0)
{
assert(initialCount >= 0);
kern_return_t rc = semaphore_create(mach_task_self(), &m_sema, SYNC_POLICY_FIFO, initialCount);
assert(rc == KERN_SUCCESS);
(void)rc;
}
~Semaphore()
{
semaphore_destroy(mach_task_self(), m_sema);
}
bool wait()
{
return semaphore_wait(m_sema) == KERN_SUCCESS;
}
bool try_wait()
{
return timed_wait(0);
}
bool timed_wait(std::uint64_t timeout_usecs)
{
mach_timespec_t ts;
ts.tv_sec = static_cast<unsigned int>(timeout_usecs / 1000000);
ts.tv_nsec = static_cast<int>((timeout_usecs % 1000000) * 1000);
// added in OSX 10.10: https://developer.apple.com/library/prerelease/mac/documentation/General/Reference/APIDiffsMacOSX10_10SeedDiff/modules/Darwin.html
kern_return_t rc = semaphore_timedwait(m_sema, ts);
return rc == KERN_SUCCESS;
}
void signal()
{
while (semaphore_signal(m_sema) != KERN_SUCCESS);
}
void signal(int count)
{
while (count-- > 0)
{
while (semaphore_signal(m_sema) != KERN_SUCCESS);
}
}
};
#elif defined(__unix__)
//---------------------------------------------------------
// Semaphore (POSIX, Linux)
//---------------------------------------------------------
class Semaphore
{
private:
sem_t m_sema;
Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
public:
Semaphore(int initialCount = 0)
{
assert(initialCount >= 0);
int rc = sem_init(&m_sema, 0, static_cast<unsigned int>(initialCount));
assert(rc == 0);
(void)rc;
}
~Semaphore()
{
sem_destroy(&m_sema);
}
bool wait()
{
// http://stackoverflow.com/questions/2013181/gdb-causes-sem-wait-to-fail-with-eintr-error
int rc;
do {
rc = sem_wait(&m_sema);
} while (rc == -1 && errno == EINTR);
return rc == 0;
}
bool try_wait()
{
int rc;
do {
rc = sem_trywait(&m_sema);
} while (rc == -1 && errno == EINTR);
return rc == 0;
}
bool timed_wait(std::uint64_t usecs)
{
struct timespec ts;
const int usecs_in_1_sec = 1000000;
const int nsecs_in_1_sec = 1000000000;
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += (time_t)(usecs / usecs_in_1_sec);
ts.tv_nsec += (long)(usecs % usecs_in_1_sec) * 1000;
// sem_timedwait bombs if you have more than 1e9 in tv_nsec
// so we have to clean things up before passing it in
if (ts.tv_nsec >= nsecs_in_1_sec) {
ts.tv_nsec -= nsecs_in_1_sec;
++ts.tv_sec;
}
int rc;
do {
rc = sem_timedwait(&m_sema, &ts);
} while (rc == -1 && errno == EINTR);
return rc == 0;
}
void signal()
{
while (sem_post(&m_sema) == -1);
}
void signal(int count)
{
while (count-- > 0)
{
while (sem_post(&m_sema) == -1);
}
}
};
#else
#error Unsupported platform! (No semaphore wrapper available)
#endif
} // end namespace details
//---------------------------------------------------------
// LightweightSemaphore
//---------------------------------------------------------
class LightweightSemaphore
{
public:
typedef std::make_signed<std::size_t>::type ssize_t;
private:
std::atomic<ssize_t> m_count;
details::Semaphore m_sema;
int m_maxSpins;
bool waitWithPartialSpinning(std::int64_t timeout_usecs = -1)
{
ssize_t oldCount;
int spin = m_maxSpins;
while (--spin >= 0)
{
oldCount = m_count.load(std::memory_order_relaxed);
if ((oldCount > 0) && m_count.compare_exchange_strong(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed))
return true;
std::atomic_signal_fence(std::memory_order_acquire); // Prevent the compiler from collapsing the loop.
}
oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
if (oldCount > 0)
return true;
if (timeout_usecs < 0)
{
if (m_sema.wait())
return true;
}
if (timeout_usecs > 0 && m_sema.timed_wait((std::uint64_t)timeout_usecs))
return true;
// At this point, we've timed out waiting for the semaphore, but the
// count is still decremented indicating we may still be waiting on
// it. So we have to re-adjust the count, but only if the semaphore
// wasn't signaled enough times for us too since then. If it was, we
// need to release the semaphore too.
while (true)
{
oldCount = m_count.load(std::memory_order_acquire);
if (oldCount >= 0 && m_sema.try_wait())
return true;
if (oldCount < 0 && m_count.compare_exchange_strong(oldCount, oldCount + 1, std::memory_order_relaxed, std::memory_order_relaxed))
return false;
}
}
ssize_t waitManyWithPartialSpinning(ssize_t max, std::int64_t timeout_usecs = -1)
{
assert(max > 0);
ssize_t oldCount;
int spin = m_maxSpins;
while (--spin >= 0)
{
oldCount = m_count.load(std::memory_order_relaxed);
if (oldCount > 0)
{
ssize_t newCount = oldCount > max ? oldCount - max : 0;
if (m_count.compare_exchange_strong(oldCount, newCount, std::memory_order_acquire, std::memory_order_relaxed))
return oldCount - newCount;
}
std::atomic_signal_fence(std::memory_order_acquire);
}
oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
if (oldCount <= 0)
{
if ((timeout_usecs == 0) || (timeout_usecs < 0 && !m_sema.wait()) || (timeout_usecs > 0 && !m_sema.timed_wait((std::uint64_t)timeout_usecs)))
{
while (true)
{
oldCount = m_count.load(std::memory_order_acquire);
if (oldCount >= 0 && m_sema.try_wait())
break;
if (oldCount < 0 && m_count.compare_exchange_strong(oldCount, oldCount + 1, std::memory_order_relaxed, std::memory_order_relaxed))
return 0;
}
}
}
if (max > 1)
return 1 + tryWaitMany(max - 1);
return 1;
}
public:
LightweightSemaphore(ssize_t initialCount = 0, int maxSpins = 10000) : m_count(initialCount), m_maxSpins(maxSpins)
{
assert(initialCount >= 0);
assert(maxSpins >= 0);
}
bool tryWait()
{
ssize_t oldCount = m_count.load(std::memory_order_relaxed);
while (oldCount > 0)
{
if (m_count.compare_exchange_weak(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed))
return true;
}
return false;
}
bool wait()
{
return tryWait() || waitWithPartialSpinning();
}
bool wait(std::int64_t timeout_usecs)
{
return tryWait() || waitWithPartialSpinning(timeout_usecs);
}
// Acquires between 0 and (greedily) max, inclusive
ssize_t tryWaitMany(ssize_t max)
{
assert(max >= 0);
ssize_t oldCount = m_count.load(std::memory_order_relaxed);
while (oldCount > 0)
{
ssize_t newCount = oldCount > max ? oldCount - max : 0;
if (m_count.compare_exchange_weak(oldCount, newCount, std::memory_order_acquire, std::memory_order_relaxed))
return oldCount - newCount;
}
return 0;
}
// Acquires at least one, and (greedily) at most max
ssize_t waitMany(ssize_t max, std::int64_t timeout_usecs)
{
assert(max >= 0);
ssize_t result = tryWaitMany(max);
if (result == 0 && max > 0)
result = waitManyWithPartialSpinning(max, timeout_usecs);
return result;
}
ssize_t waitMany(ssize_t max)
{
ssize_t result = waitMany(max, -1);
assert(result > 0);
return result;
}
void signal(ssize_t count = 1)
{
assert(count >= 0);
ssize_t oldCount = m_count.fetch_add(count, std::memory_order_release);
ssize_t toRelease = -oldCount < count ? -oldCount : count;
if (toRelease > 0)
{
m_sema.signal((int)toRelease);
}
}
std::size_t availableApprox() const
{
ssize_t count = m_count.load(std::memory_order_relaxed);
return count > 0 ? static_cast<std::size_t>(count) : 0;
}
};
} // end namespace moodycamel
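For reference, a small sketch of how the semaphore above behaves (illustrative only, not part of this commit; the counts are arbitrary):
// Hypothetical usage sketch; not part of this commit.
#include "lightweightsemaphore.h"
#include <cassert>
int main()
{
    moodycamel::LightweightSemaphore sema;       // count starts at 0
    sema.signal(3);                              // make 3 units available
    assert(sema.tryWait());                      // takes 1 without blocking
    assert(sema.tryWaitMany(10) == 2);           // greedily takes the remaining 2
    // Nothing left: the timed wait spins briefly, then blocks on the OS
    // semaphore and returns false after roughly 1ms.
    assert(!sema.wait(1000 /* usecs */));
    return 0;
}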

View File

@ -2041,3 +2041,14 @@ server-threads 2
# Enable FLASH support? (Enterprise Only)
# storage-provider flash /path/to/flash/db
# KeyDB will attempt to balance clients across threads evenly; however, replica clients
# are usually much more expensive than a normal client, so KeyDB will try to assign
# fewer clients to threads with a replica. The weighting factor below is intended to help tune
# this behavior. A replica weighting factor of 2 means we treat a replica as the equivalent
# of two normal clients. Adjusting this value may improve performance when replication is
# used. The best weighting is workload-specific - e.g. read-heavy workloads should set
# this to 1. Very write-heavy workloads may benefit from higher numbers.
#
# By default KeyDB sets this to 2.
replica-weighting-factor 2
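As a rough sketch of how this factor feeds into thread selection (names simplified; the real computation is in chooseBestThreadForAccept() in the networking.cpp hunk later in this commit):
// Illustrative sketch only; simplified from chooseBestThreadForAccept().
int effectiveLoad(int cclients, int cclientsReplica, int replicaWeightingFactor)
{
    // cclients already counts each replica once, so only the extra weight is added.
    return cclients + cclientsReplica * (replicaWeightingFactor - 1);
}
// Example: a thread with 10 clients, 2 of them replicas, and the default
// factor of 2 is treated as if it had 12 clients when accepting new connections.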

View File

@ -12,6 +12,7 @@ public:
virtual const char *name() const = 0;
virtual size_t totalDiskspaceUsed() const = 0;
virtual bool FSlow() const = 0;
virtual size_t filedsRequired() const { return 0; }
};
class IStorage

View File

@ -246,7 +246,7 @@ endif
endif
# Include paths to dependencies
FINAL_CFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src -I../deps/hdr_histogram -I../deps/license/
FINAL_CXXFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src -I../deps/hdr_histogram -I../deps/rocksdb/include/ -I../deps/license
FINAL_CXXFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src -I../deps/hdr_histogram -I../deps/rocksdb/include/ -I../deps/license -I../deps/concurrentqueue
# Determine systemd support and/or build preference (defaulting to auto-detection)
BUILD_WITH_SYSTEMD=no

View File

@ -136,9 +136,10 @@ void StorageCache::retrieve(sds key, IStorage::callbackSingle fn) const
size_t StorageCache::count() const
{
std::unique_lock<fastlock> ul(m_lock);
std::unique_lock<fastlock> ul(m_lock, std::defer_lock);
bool fLocked = ul.try_lock();
size_t count = m_spstorage->count();
if (m_pdict != nullptr) {
if (m_pdict != nullptr && fLocked) {
serverAssert(bulkInsertsInProgress.load(std::memory_order_seq_cst) || count == (dictSize(m_pdict) + m_collisionCount));
}
return count;
@ -147,4 +148,14 @@ size_t StorageCache::count() const
void StorageCache::beginWriteBatch() {
serverAssert(GlobalLocksAcquired()); // Otherwise we deadlock
m_spstorage->beginWriteBatch();
}
void StorageCache::emergencyFreeCache() {
dict *d = m_pdict;
m_pdict = nullptr;
if (d != nullptr) {
g_pserver->asyncworkqueue->AddWorkFunction([d]{
dictRelease(d);
});
}
}

View File

@ -43,6 +43,8 @@ public:
void bulkInsert(sds *rgkeys, sds *rgvals, size_t celem);
void retrieve(sds key, IStorage::callbackSingle fn) const;
bool erase(sds key);
void emergencyFreeCache();
bool keycacheIsEnabled() const { return m_pdict != nullptr; }
bool enumerate(IStorage::callback fn) const { return m_spstorage->enumerate(fn); }

View File

@ -787,6 +787,7 @@ void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep,
}
thread_local spin_worker tl_worker = nullptr;
thread_local bool fOwnLockOverride = false;
void setAeLockSetThreadSpinWorker(spin_worker worker)
{
tl_worker = worker;
@ -807,9 +808,14 @@ void aeReleaseLock()
g_lock.unlock();
}
void aeSetThreadOwnsLockOverride(int fOverride)
{
fOwnLockOverride = fOverride;
}
int aeThreadOwnsLock()
{
return g_lock.fOwnLock();
return fOwnLockOverride || g_lock.fOwnLock();
}
int aeLockContested(int threshold)

View File

@ -168,6 +168,7 @@ void aeAcquireLock();
int aeTryAcquireLock(int fWeak);
void aeReleaseLock();
int aeThreadOwnsLock();
void aeSetThreadOwnsLockOverride(int fOverride);
int aeLockContested(int threshold);
int aeLockContention(); // returns the number of instantaneous threads waiting on the lock
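A hedged sketch of how the new override could be used by a background thread that must satisfy aeThreadOwnsLock() assertions while doing work out of band (the RAII guard here is hypothetical and not part of this commit):
// Hypothetical RAII helper; not part of this commit.
struct AeLockOwnershipOverride {
    AeLockOwnershipOverride()  { aeSetThreadOwnsLockOverride(1); }
    ~AeLockOwnershipOverride() { aeSetThreadOwnsLockOverride(0); }
};
void backgroundWork()
{
    AeLockOwnershipOverride guard;   // aeThreadOwnsLock() now returns true on this thread
    // ... run code that asserts lock ownership ...
}                                    // override cleared when the guard is destroyed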

View File

@ -357,6 +357,7 @@ bool initializeStorageProvider(const char **err)
{
// Create The Storage Factory (if necessary)
serverLog(LL_NOTICE, "Initializing FLASH storage provider (this may take a long time)");
adjustOpenFilesLimit();
g_pserver->m_pstorageFactory = CreateRocksDBStorageFactory(g_sdsArgs, cserver.dbnum, cserver.storage_conf, cserver.storage_conf ? strlen(cserver.storage_conf) : 0);
}
else if (!strcasecmp(g_sdsProvider, "test") && g_sdsArgs == nullptr)
@ -2468,7 +2469,10 @@ static int updateJemallocBgThread(int val, int prev, const char **err) {
static int updateReplBacklogSize(long long val, long long prev, const char **err) {
/* resizeReplicationBacklog sets g_pserver->repl_backlog_size, and relies on
* being able to tell when the size changes, so restore prev before calling it. */
UNUSED(err);
if (cserver.repl_backlog_disk_size) {
*err = "Unable to dynamically resize the backlog because disk backlog is enabled";
return 0;
}
g_pserver->repl_backlog_size = prev;
g_pserver->repl_backlog_config_size = val;
resizeReplicationBacklog(val);
@ -2723,7 +2727,9 @@ standardConfig configs[] = {
createBoolConfig("delete-on-evict", NULL, MODIFIABLE_CONFIG, cserver.delete_on_evict, 0, NULL, NULL),
createBoolConfig("use-fork", NULL, IMMUTABLE_CONFIG, cserver.fForkBgSave, 0, NULL, NULL),
createBoolConfig("io-threads-do-reads", NULL, IMMUTABLE_CONFIG, fDummy, 0, NULL, NULL),
createBoolConfig("time-thread-priority", NULL, IMMUTABLE_CONFIG, cserver.time_thread_priority, 0, NULL, NULL),
createBoolConfig("prefetch-enabled", NULL, MODIFIABLE_CONFIG, g_pserver->prefetch_enabled, 1, NULL, NULL),
createBoolConfig("allow-rdb-resize-op", NULL, MODIFIABLE_CONFIG, g_pserver->allowRdbResizeOp, 1, NULL, NULL),
createBoolConfig("crash-log-enabled", NULL, MODIFIABLE_CONFIG, g_pserver->crashlog_enabled, 1, NULL, updateSighandlerEnabled),
createBoolConfig("crash-memcheck-enabled", NULL, MODIFIABLE_CONFIG, g_pserver->memcheck_enabled, 1, NULL, NULL),
createBoolConfig("use-exit-on-panic", NULL, MODIFIABLE_CONFIG, g_pserver->use_exit_on_panic, 0, NULL, NULL),
@ -2802,6 +2808,7 @@ standardConfig configs[] = {
createIntConfig("min-clients-per-thread", NULL, MODIFIABLE_CONFIG, 0, 400, cserver.thread_min_client_threshold, 20, INTEGER_CONFIG, NULL, NULL),
createIntConfig("storage-flush-period", NULL, MODIFIABLE_CONFIG, 1, 10000, g_pserver->storage_flush_period, 500, INTEGER_CONFIG, NULL, NULL),
createIntConfig("replica-quorum", NULL, MODIFIABLE_CONFIG, -1, INT_MAX, g_pserver->repl_quorum, -1, INTEGER_CONFIG, NULL, NULL),
createIntConfig("replica-weighting-factor", NULL, MODIFIABLE_CONFIG, 1, INT_MAX, g_pserver->replicaIsolationFactor, 2, INTEGER_CONFIG, NULL, NULL),
/* Unsigned int configs */
createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, g_pserver->maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients),
createUIntConfig("loading-process-events-interval-keys", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, g_pserver->loading_process_events_interval_keys, 8192, MEMORY_CONFIG, NULL, NULL),
@ -2819,6 +2826,7 @@ standardConfig configs[] = {
createLongLongConfig("proto-max-bulk-len", NULL, MODIFIABLE_CONFIG, 1024*1024, LLONG_MAX, g_pserver->proto_max_bulk_len, 512ll*1024*1024, MEMORY_CONFIG, NULL, NULL), /* Bulk request max size */
createLongLongConfig("stream-node-max-entries", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, g_pserver->stream_node_max_entries, 100, INTEGER_CONFIG, NULL, NULL),
createLongLongConfig("repl-backlog-size", NULL, MODIFIABLE_CONFIG, 1, LLONG_MAX, g_pserver->repl_backlog_size, 1024*1024, MEMORY_CONFIG, NULL, updateReplBacklogSize), /* Default: 1mb */
createLongLongConfig("repl-backlog-disk-reserve", NULL, IMMUTABLE_CONFIG, 0, LLONG_MAX, cserver.repl_backlog_disk_size, 0, MEMORY_CONFIG, NULL, NULL),
/* Unsigned Long Long configs */
createULongLongConfig("maxmemory", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, g_pserver->maxmemory, 0, MEMORY_CONFIG, NULL, updateMaxmemory),

View File

@ -274,7 +274,7 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
return o;
}
bool dbAddCore(redisDb *db, sds key, robj *val, bool fUpdateMvcc, bool fAssumeNew = false) {
bool dbAddCore(redisDb *db, sds key, robj *val, bool fUpdateMvcc, bool fAssumeNew = false, dict_iter *piterExisting = nullptr) {
serverAssert(!val->FExpires());
sds copy = sdsdupshared(key);
@ -283,7 +283,7 @@ bool dbAddCore(redisDb *db, sds key, robj *val, bool fUpdateMvcc, bool fAssumeNe
setMvccTstamp(val, mvcc);
}
bool fInserted = db->insert(copy, val, fAssumeNew);
bool fInserted = db->insert(copy, val, fAssumeNew, piterExisting);
if (fInserted)
{
@ -353,8 +353,12 @@ void redisDb::dbOverwriteCore(redisDb::iter itr, sds keySds, robj *val, bool fUp
* This function does not modify the expire time of the existing key.
*
* The program is aborted if the key was not already present. */
void dbOverwrite(redisDb *db, robj *key, robj *val, bool fRemoveExpire) {
auto itr = db->find(key);
void dbOverwrite(redisDb *db, robj *key, robj *val, bool fRemoveExpire, dict_iter *pitrExisting) {
redisDb::iter itr;
if (pitrExisting != nullptr)
itr = *pitrExisting;
else
itr = db->find(key);
serverAssertWithInfo(NULL,key,itr != nullptr);
lookupKeyUpdateObj(itr.val(), LOOKUP_NONE);
@ -398,8 +402,9 @@ int dbMerge(redisDb *db, sds key, robj *val, int fReplace)
* in a context where there is no clear client performing the operation. */
void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, int signal) {
db->prepOverwriteForSnapshot(szFromObj(key));
if (!dbAddCore(db, szFromObj(key), val, true /* fUpdateMvcc */)) {
dbOverwrite(db, key, val, !keepttl);
dict_iter iter;
if (!dbAddCore(db, szFromObj(key), val, true /* fUpdateMvcc */, false /*fAssumeNew*/, &iter)) {
dbOverwrite(db, key, val, !keepttl, &iter);
}
incrRefCount(val);
if (signal) signalModifiedKey(c,db,key);
@ -2577,6 +2582,12 @@ void redisDbPersistentData::setStorageProvider(StorageCache *pstorage)
m_spstorage = std::unique_ptr<StorageCache>(pstorage);
}
void redisDbPersistentData::endStorageProvider()
{
serverAssert(m_spstorage != nullptr);
m_spstorage.reset();
}
void clusterStorageLoadCallback(const char *rgchkey, size_t cch, void *)
{
slotToKeyUpdateKeyCore(rgchkey, cch, true /*add*/);
@ -2605,11 +2616,20 @@ void redisDb::storageProviderInitialize()
}
}
bool redisDbPersistentData::insert(char *key, robj *o, bool fAssumeNew)
void redisDb::storageProviderDelete()
{
if (!fAssumeNew)
if (g_pserver->m_pstorageFactory != nullptr)
{
this->endStorageProvider();
}
}
bool redisDbPersistentData::insert(char *key, robj *o, bool fAssumeNew, dict_iter *piterExisting)
{
if (!fAssumeNew && (g_pserver->m_pstorageFactory != nullptr || m_pdbSnapshot != nullptr))
ensure(key);
int res = dictAdd(m_pdict, key, o);
dictEntry *de;
int res = dictAdd(m_pdict, key, o, &de);
serverAssert(FImplies(fAssumeNew, res == DICT_OK));
if (res == DICT_OK)
{
@ -2621,6 +2641,11 @@ bool redisDbPersistentData::insert(char *key, robj *o, bool fAssumeNew)
#endif
trackkey(key, false /* fUpdate */);
}
else
{
if (piterExisting)
*piterExisting = dict_iter(m_pdict, de);
}
return (res == DICT_OK);
}
@ -2912,6 +2937,35 @@ bool redisDbPersistentData::processChanges(bool fSnapshot)
return (m_spstorage != nullptr);
}
void redisDbPersistentData::processChangesAsync(std::atomic<int> &pendingJobs)
{
++pendingJobs;
serverAssert(!m_fAllChanged);
dictEmpty(m_dictChanged, nullptr);
dict *dictNew = dictCreate(&dbDictType, nullptr);
std::swap(dictNew, m_pdict);
m_cnewKeysPending = 0;
g_pserver->asyncworkqueue->AddWorkFunction([dictNew, this, &pendingJobs]{
dictIterator *di = dictGetIterator(dictNew);
dictEntry *de;
std::vector<sds> veckeys;
std::vector<sds> vecvals;
while ((de = dictNext(di)) != nullptr)
{
robj *o = (robj*)dictGetVal(de);
sds temp = serializeStoredObjectAndExpire(this, (const char*) dictGetKey(de), o);
veckeys.push_back((sds)dictGetKey(de));
vecvals.push_back(temp);
}
m_spstorage->bulkInsert(veckeys.data(), vecvals.data(), veckeys.size());
for (auto val : vecvals)
sdsfree(val);
dictReleaseIterator(di);
dictRelease(dictNew);
--pendingJobs;
});
}
void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree)
{
if (m_pdbSnapshotStorageFlush)
@ -2985,7 +3039,7 @@ size_t redisDbPersistentData::size(bool fCachedOnly) const
+ (m_pdbSnapshot ? (m_pdbSnapshot->size(fCachedOnly) - dictSize(m_pdictTombstone)) : 0);
}
bool redisDbPersistentData::removeCachedValue(const char *key)
bool redisDbPersistentData::removeCachedValue(const char *key, dictEntry **ppde)
{
serverAssert(m_spstorage != nullptr);
// First ensure its not a pending key
@ -3001,7 +3055,11 @@ bool redisDbPersistentData::removeCachedValue(const char *key)
}
// since we write ASAP the database already has a valid copy so safe to delete
dictDelete(m_pdict, key);
if (ppde != nullptr) {
*ppde = dictUnlink(m_pdict, key);
} else {
dictDelete(m_pdict, key);
}
if (m_spstorage != nullptr)
m_spstorage->batch_unlock();
@ -3045,6 +3103,20 @@ void redisDbPersistentData::removeAllCachedValues()
}
}
void redisDbPersistentData::disableKeyCache()
{
if (m_spstorage == nullptr)
return;
m_spstorage->emergencyFreeCache();
}
bool redisDbPersistentData::keycacheIsEnabled()
{
if (m_spstorage == nullptr)
return false;
return m_spstorage->keycacheIsEnabled();
}
void redisDbPersistentData::trackkey(const char *key, bool fUpdate)
{
if (m_fTrackingChanges && !m_fAllChanged && m_spstorage) {
@ -3152,7 +3224,7 @@ int dbnumFromDb(redisDb *db)
serverPanic("invalid database pointer");
}
void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command)
bool redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command, bool fExecOK)
{
if (m_spstorage == nullptr) {
#if defined(__x86_64__) || defined(__i386__)
@ -3183,7 +3255,7 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command
}
}
#endif
return;
return false;
}
AeLocker lock;
@ -3193,7 +3265,7 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command
getKeysResult result = GETKEYS_RESULT_INIT;
auto cmd = lookupCommand(szFromObj(command.argv[0]));
if (cmd == nullptr)
return; // Bad command? It's not for us to judge, just bail
return false; // Bad command? It's not for us to judge, just bail
int numkeys = getKeysFromCommand(cmd, command.argv, command.argc, &result);
for (int ikey = 0; ikey < numkeys; ++ikey)
{
@ -3225,41 +3297,61 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command
}
}
lock.arm(c);
for (auto &tuple : vecInserts)
{
sds sharedKey = std::get<0>(tuple);
robj *o = std::get<1>(tuple);
std::unique_ptr<expireEntry> spexpire = std::move(std::get<2>(tuple));
if (o != nullptr)
bool fNoInsert = false;
if (!vecInserts.empty()) {
lock.arm(c);
for (auto &tuple : vecInserts)
{
if (this->find_cached_threadsafe(sharedKey) != nullptr)
sds sharedKey = std::get<0>(tuple);
robj *o = std::get<1>(tuple);
std::unique_ptr<expireEntry> spexpire = std::move(std::get<2>(tuple));
if (o != nullptr)
{
// While unlocked this was already ensured
decrRefCount(o);
sdsfree(sharedKey);
if (this->find_cached_threadsafe(sharedKey) != nullptr)
{
// While unlocked this was already ensured
decrRefCount(o);
sdsfree(sharedKey);
fNoInsert = true;
}
else
{
if (spexpire != nullptr) {
if (spexpire->when() < mstime()) {
fNoInsert = true;
break;
}
}
dictAdd(m_pdict, sharedKey, o);
o->SetFExpires(spexpire != nullptr);
if (spexpire != nullptr)
{
auto itr = m_setexpire->find(sharedKey);
if (itr != m_setexpire->end())
m_setexpire->erase(itr);
m_setexpire->insert(std::move(*spexpire));
serverAssert(m_setexpire->find(sharedKey) != m_setexpire->end());
}
serverAssert(o->FExpires() == (m_setexpire->find(sharedKey) != m_setexpire->end()));
}
}
else
{
dictAdd(m_pdict, sharedKey, o);
o->SetFExpires(spexpire != nullptr);
if (spexpire != nullptr)
{
auto itr = m_setexpire->find(sharedKey);
if (itr != m_setexpire->end())
m_setexpire->erase(itr);
m_setexpire->insert(std::move(*spexpire));
serverAssert(m_setexpire->find(sharedKey) != m_setexpire->end());
}
serverAssert(o->FExpires() == (m_setexpire->find(sharedKey) != m_setexpire->end()));
if (sharedKey != nullptr)
sdsfree(sharedKey); // BUG but don't bother crashing
}
}
else
{
if (sharedKey != nullptr)
sdsfree(sharedKey); // BUG but don't bother crashing
lock.disarm();
}
if (fExecOK && !fNoInsert && cmd->proc == getCommand && !vecInserts.empty()) {
robj *o = std::get<1>(vecInserts[0]);
if (o != nullptr) {
addReplyBulk(c, o);
return true;
}
}
return false;
}

View File

@ -408,7 +408,7 @@ dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets) {
d->asyncdata = new dictAsyncRehashCtl(d, d->asyncdata);
int empty_visits = buckets;
int empty_visits = buckets*10;
while (d->asyncdata->queue.size() < (size_t)buckets && (size_t)d->rehashidx < d->ht[0].size) {
dictEntry *de;
@ -577,9 +577,9 @@ static void _dictRehashStep(dict *d) {
}
/* Add an element to the target hash table */
int dictAdd(dict *d, void *key, void *val)
int dictAdd(dict *d, void *key, void *val, dictEntry **existing)
{
dictEntry *entry = dictAddRaw(d,key,NULL);
dictEntry *entry = dictAddRaw(d,key,existing);
if (!entry) return DICT_ERR;
dictSetVal(d, entry, val);
@ -713,6 +713,7 @@ static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
}
}
d->ht[table].used--;
_dictExpandIfNeeded(d);
return he;
}
prevHe = he;

View File

@ -206,7 +206,7 @@ typedef void (dictScanBucketFunction)(void *privdata, dictEntry **bucketref);
dict *dictCreate(dictType *type, void *privDataPtr);
int dictExpand(dict *d, unsigned long size, bool fShrink = false);
int dictTryExpand(dict *d, unsigned long size, bool fShrink);
int dictAdd(dict *d, void *key, void *val);
int dictAdd(dict *d, void *key, void *val, dictEntry **existing = nullptr);
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing);
dictEntry *dictAddOrFind(dict *d, void *key);
int dictReplace(dict *d, void *key, void *val);
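A brief sketch of the new out-parameter in use (illustrative only; it mirrors the pattern redisDbPersistentData::insert() adopts in this commit, where a failed insert hands back the existing entry instead of forcing a second lookup):
// Illustrative only; not part of this commit.
dictEntry *de = nullptr;
if (dictAdd(d, key, val, &de) == DICT_ERR) {
    // The key was already present; 'de' now points at the existing entry,
    // so the caller can inspect or overwrite it without another lookup.
    robj *old = (robj*)dictGetVal(de);
    (void)old;
}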

View File

@ -34,6 +34,7 @@
#include "bio.h"
#include "atomicvar.h"
#include <mutex>
#include <map>
#include <math.h>
/* ----------------------------------------------------------------------------
@ -461,6 +462,69 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev
return C_ERR;
}
class FreeMemoryLazyFree : public ICollectable
{
ssize_t m_cb = 0;
std::vector<std::pair<dict*, std::vector<dictEntry*>>> vecdictvecde;
public:
static std::atomic<int> s_clazyFreesInProgress;
FreeMemoryLazyFree() {
s_clazyFreesInProgress++;
}
FreeMemoryLazyFree(const FreeMemoryLazyFree&) = delete;
FreeMemoryLazyFree(FreeMemoryLazyFree&&) = default;
~FreeMemoryLazyFree() {
aeAcquireLock();
for (auto &pair : vecdictvecde) {
for (auto de : pair.second) {
dictFreeUnlinkedEntry(pair.first, de);
}
}
aeReleaseLock();
--s_clazyFreesInProgress;
}
ssize_t addEntry(dict *d, dictEntry *de) {
ssize_t cbFreedNow = 0;
ssize_t cb = sizeof(dictEntry);
cb += sdsAllocSize((sds)dictGetKey(de));
robj *o = (robj*)dictGetVal(de);
switch (o->type) {
case OBJ_STRING:
cb += getStringObjectSdsUsedMemory(o)+sizeof(robj);
break;
default:
// If we don't know about it we can't accurately track the memory so free now
cbFreedNow = zmalloc_used_memory();
decrRefCount(o);
cbFreedNow -= zmalloc_used_memory();
de->v.val = nullptr;
}
auto itr = std::lower_bound(vecdictvecde.begin(), vecdictvecde.end(), d,
[](const std::pair<dict*, std::vector<dictEntry*>> &a, dict *d) -> bool {
return a.first < d;
}
);
if (itr == vecdictvecde.end() || itr->first != d) {
itr = vecdictvecde.insert(itr, std::make_pair(d, std::vector<dictEntry*>()));
}
serverAssert(itr->first == d);
itr->second.push_back(de);
m_cb += cb;
return cb + cbFreedNow;
}
size_t memory_queued() { return m_cb; }
};
std::atomic<int> FreeMemoryLazyFree::s_clazyFreesInProgress {0};
/* Return 1 if used memory is more than maxmemory after allocating more memory,
* return 0 if not. Redis may reject user's requests or evict some keys if used
* memory exceeds maxmemory, especially, when we allocate huge memory at once. */
@ -509,6 +573,9 @@ static int isSafeToPerformEvictions(void) {
* and just be masters exact copies. */
if (g_pserver->m_pstorageFactory == nullptr && listLength(g_pserver->masters) && g_pserver->repl_slave_ignore_maxmemory && !g_pserver->fActiveReplica) return 0;
/* If we have a lazy free obj pending, our amounts will be off, wait for it to go away */
if (FreeMemoryLazyFree::s_clazyFreesInProgress > 0) return 0;
/* When clients are paused the dataset should be static not just from the
* POV of clients not being able to write, but also from the POV of
* expires and evictions of keys not being performed. */
@ -573,6 +640,8 @@ int performEvictions(bool fPreSnapshot) {
int result = EVICT_FAIL;
int ckeysFailed = 0;
std::unique_ptr<FreeMemoryLazyFree> splazy = std::make_unique<FreeMemoryLazyFree>();
if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL,false,fPreSnapshot) == C_OK)
return EVICT_OK;
@ -662,7 +731,7 @@ int performEvictions(bool fPreSnapshot) {
/* volatile-random and allkeys-random policy */
if (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM
|| fEvictToStorage)
|| fFallback)
{
/* When evicting a random key, we try to evict a key for
* each DB, so we use the static 'next_db' variable to
@ -698,9 +767,9 @@ int performEvictions(bool fPreSnapshot) {
if (fEvictToStorage)
{
// This key is in the storage so we only need to free the object
delta = (long long) zmalloc_used_memory();
if (db->removeCachedValue(bestkey)) {
delta -= (long long) zmalloc_used_memory();
dictEntry *deT;
if (db->removeCachedValue(bestkey, &deT)) {
mem_freed += splazy->addEntry(db->dictUnsafeKeyOnly(), deT);
ckeysFailed = 0;
}
else {
@ -709,7 +778,6 @@ int performEvictions(bool fPreSnapshot) {
if (ckeysFailed > 1024)
goto cant_free;
}
mem_freed += delta;
}
else
{
@ -764,7 +832,7 @@ int performEvictions(bool fPreSnapshot) {
/* After some time, exit the loop early - even if memory limit
* hasn't been reached. If we suddenly need to free a lot of
* memory, don't want to spend too much time here. */
if (elapsedUs(evictionTimer) > eviction_time_limit_us) {
if (g_pserver->m_pstorageFactory == nullptr && elapsedUs(evictionTimer) > eviction_time_limit_us) {
// We still need to free memory - start eviction timer proc
if (!isEvictionProcRunning && serverTL->el != nullptr) {
isEvictionProcRunning = 1;
@ -781,6 +849,10 @@ int performEvictions(bool fPreSnapshot) {
/* at this point, the memory is OK, or we have reached the time limit */
result = (isEvictionProcRunning) ? EVICT_RUNNING : EVICT_OK;
if (splazy != nullptr && splazy->memory_queued() > 0 && !serverTL->gcEpoch.isReset()) {
g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::move(splazy));
}
cant_free:
if (g_pserver->m_pstorageFactory)
{
@ -796,10 +868,15 @@ cant_free:
redisDb *db = g_pserver->db[idb];
if (db->FStorageProvider())
{
serverLog(LL_WARNING, "Failed to evict keys, falling back to flushing entire cache. Consider increasing maxmemory-samples.");
db->removeAllCachedValues();
if (((mem_reported - zmalloc_used_memory()) + mem_freed) >= mem_tofree)
result = EVICT_OK;
if (db->size() != 0 && db->size(true /*fcachedOnly*/) == 0 && db->keycacheIsEnabled()) {
serverLog(LL_WARNING, "Key cache exceeds maxmemory, freeing - performance may be affected increase maxmemory if possible");
db->disableKeyCache();
} else if (db->size(true /*fCachedOnly*/)) {
serverLog(LL_WARNING, "Failed to evict keys, falling back to flushing entire cache. Consider increasing maxmemory-samples.");
db->removeAllCachedValues();
if (((mem_reported - zmalloc_used_memory()) + mem_freed) >= mem_tofree)
result = EVICT_OK;
}
}
}
}
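To make the deferred-free flow easier to follow across the hunks above, here is a hedged condensation of how performEvictions() uses FreeMemoryLazyFree (simplified from this commit's own code, not new behavior):
// Simplified sketch of the deferred-free flow in performEvictions() above.
auto splazy = std::make_unique<FreeMemoryLazyFree>();
dictEntry *deT;
if (db->removeCachedValue(bestkey, &deT)) {
    // The entry is unlinked but not freed yet; its size is accounted for now...
    mem_freed += splazy->addEntry(db->dictUnsafeKeyOnly(), deT);
}
// ...and the actual dictFreeUnlinkedEntry() calls happen later, when the
// garbage collector destroys the FreeMemoryLazyFree object.
if (splazy->memory_queued() > 0 && !serverTL->gcEpoch.isReset())
    g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::move(splazy));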

View File

@ -158,7 +158,6 @@ client *createClient(connection *conn, int iel) {
c->flags = 0;
c->fPendingAsyncWrite = FALSE;
c->fPendingAsyncWriteHandler = FALSE;
c->fPendingReplicaWrite = FALSE;
c->ctime = c->lastinteraction = g_pserver->unixtime;
/* If the default user does not require authentication, the user is
* directly authenticated. */
@ -318,7 +317,7 @@ int prepareClientToWrite(client *c) {
/* Schedule the client to write the output buffers to the socket, unless
* it should already be setup to do so (it has already pending data). */
if (!fAsync && !clientHasPendingReplies(c) && !c->fPendingReplicaWrite) clientInstallWriteHandler(c);
if (!fAsync && (c->flags & CLIENT_SLAVE || !clientHasPendingReplies(c))) clientInstallWriteHandler(c);
if (fAsync && !(c->fPendingAsyncWrite)) clientInstallAsyncWriteHandler(c);
/* Authorize the caller to queue in the output buffer of this client. */
@ -1132,7 +1131,7 @@ void copyClientOutputBuffer(client *dst, client *src) {
/* Return true if the specified client has pending reply buffers to write to
* the socket. */
int clientHasPendingReplies(client *c) {
return (c->bufpos || listLength(c->reply));
return (c->bufpos || listLength(c->reply) || c->FPendingReplicaWrite());
}
static std::atomic<int> rgacceptsInFlight[MAX_EVENT_LOOPS];
@ -1145,6 +1144,8 @@ int chooseBestThreadForAccept()
int cclientsThread;
atomicGet(g_pserver->rgthreadvar[iel].cclients, cclientsThread);
cclientsThread += rgacceptsInFlight[iel].load(std::memory_order_relaxed);
// Note: It's the replica weighting factor less one because cclients also includes replicas, so we don't want to double count
cclientsThread += (g_pserver->rgthreadvar[iel].cclientsReplica) * (g_pserver->replicaIsolationFactor-1);
if (cclientsThread < cserver.thread_min_client_threshold)
return iel;
if (cclientsThread < cclientsMin)
@ -1313,7 +1314,9 @@ void acceptOnThread(connection *conn, int flags, char *cip)
}
else
{
ielTarget = chooseBestThreadForAccept();
// Cluster connections are more transient, so it's not worth the cost to balance them;
// we can trust that SO_REUSEPORT is doing its job of distributing connections
ielTarget = g_pserver->cluster_enabled ? ielCur : chooseBestThreadForAccept();
}
rgacceptsInFlight[ielTarget].fetch_add(1, std::memory_order_relaxed);
@ -1667,6 +1670,7 @@ bool freeClient(client *c) {
ln = listSearchKey(l,c);
serverAssert(ln != NULL);
listDelNode(l,ln);
g_pserver->rgthreadvar[c->iel].cclientsReplica--;
/* We need to remember the time when we started to have zero
* attached slaves, as after some time we'll free the replication
* backlog. */
@ -1785,104 +1789,105 @@ int writeToClient(client *c, int handler_installed) {
std::unique_lock<decltype(c->lock)> lock(c->lock);
while(clientHasPendingReplies(c)) {
serverAssert(!(c->flags & CLIENT_SLAVE) || c->flags & CLIENT_MONITOR);
if (c->bufpos > 0) {
nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
totwritten += nwritten;
/* If the buffer was sent, set bufpos to zero to continue with
* the remainder of the reply. */
if ((int)c->sentlen == c->bufpos) {
c->bufpos = 0;
c->sentlen = 0;
}
} else {
o = (clientReplyBlock*)listNodeValue(listFirst(c->reply));
if (o->used == 0) {
c->reply_bytes -= o->size;
listDelNode(c->reply,listFirst(c->reply));
continue;
}
nwritten = connWrite(c->conn, o->buf() + c->sentlen, o->used - c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
totwritten += nwritten;
/* If we fully sent the object on head go to the next one */
if (c->sentlen == o->used) {
c->reply_bytes -= o->size;
listDelNode(c->reply,listFirst(c->reply));
c->sentlen = 0;
/* If there are no longer any objects in the list, we expect
* the count of reply bytes to be exactly zero. */
if (listLength(c->reply) == 0)
serverAssert(c->reply_bytes == 0);
}
}
/* Note that we avoid sending more than NET_MAX_WRITES_PER_EVENT
* bytes; in a single-threaded server it's a good idea to serve
* other clients as well, even if a very large request comes from a
* super fast link that is always able to accept data (in a real-world
* scenario think about 'KEYS *' against the loopback interface).
*
* However if we are over the maxmemory limit we ignore that and
* just deliver as much data as it is possible to deliver.
*
* Moreover, we also send as much as possible if the client is
* a replica or a monitor (otherwise, on high-speed traffic, the
* replication/output buffer will grow indefinitely) */
if (totwritten > NET_MAX_WRITES_PER_EVENT &&
(g_pserver->maxmemory == 0 ||
zmalloc_used_memory() < g_pserver->maxmemory) &&
!(c->flags & CLIENT_SLAVE)) break;
}
/* We can only directly read from the replication backlog if the client
is a replica, so only attempt to do so if that's the case. */
if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR)) {
std::unique_lock<fastlock> repl_backlog_lock (g_pserver->repl_backlog_lock);
long long repl_end_idx = getReplIndexFromOffset(c->repl_end_off);
serverAssert(c->repl_curr_off != -1);
if (c->repl_curr_off != c->repl_end_off){
long long repl_curr_idx = getReplIndexFromOffset(c->repl_curr_off);
long long nwritten2ndStage = 0; /* How much was written from the start of the replication backlog
* in the event of a wrap around write */
/* normal case with no wrap around */
if (repl_end_idx >= repl_curr_idx){
nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, repl_end_idx - repl_curr_idx);
/* wrap around case */
while (clientHasPendingReplies(c)) {
long long repl_end_idx = getReplIndexFromOffset(c->repl_end_off);
serverAssert(c->repl_curr_off != -1);
if (c->repl_curr_off != c->repl_end_off){
long long repl_curr_idx = getReplIndexFromOffset(c->repl_curr_off);
long long nwritten2ndStage = 0; /* How much was written from the start of the replication backlog
* in the event of a wrap around write */
/* normal case with no wrap around */
if (repl_end_idx >= repl_curr_idx){
nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, repl_end_idx - repl_curr_idx);
/* wrap around case */
} else {
nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, g_pserver->repl_backlog_size - repl_curr_idx);
/* only attempt wrapping if we write the correct number of bytes */
if (nwritten == g_pserver->repl_backlog_size - repl_curr_idx){
nwritten2ndStage = connWrite(c->conn, g_pserver->repl_backlog, repl_end_idx);
if (nwritten2ndStage != -1)
nwritten += nwritten2ndStage;
}
}
/* only increment bytes if an error didn't occur */
if (nwritten > 0){
totwritten += nwritten;
c->repl_curr_off += nwritten;
serverAssert(c->repl_curr_off <= c->repl_end_off);
}
/* If the second part of a write didn't go through, we still need to register that */
if (nwritten2ndStage == -1) nwritten = -1;
if (nwritten == -1)
break;
} else {
nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, g_pserver->repl_backlog_size - repl_curr_idx);
/* only attempt wrapping if we write the correct number of bytes */
if (nwritten == g_pserver->repl_backlog_size - repl_curr_idx){
nwritten2ndStage = connWrite(c->conn, g_pserver->repl_backlog, repl_end_idx);
if (nwritten2ndStage != -1)
nwritten += nwritten2ndStage;
}
break;
}
/* only increment bytes if an error didn't occur */
if (nwritten > 0){
}
} else {
while(clientHasPendingReplies(c)) {
serverAssert(!(c->flags & CLIENT_SLAVE) || c->flags & CLIENT_MONITOR);
if (c->bufpos > 0) {
nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
totwritten += nwritten;
c->repl_curr_off += nwritten;
serverAssert(c->repl_curr_off <= c->repl_end_off);
/* If the client's current offset matches the last offset it can read from, there is no pending write */
if (c->repl_curr_off == c->repl_end_off){
c->fPendingReplicaWrite = false;
/* If the buffer was sent, set bufpos to zero to continue with
* the remainder of the reply. */
if ((int)c->sentlen == c->bufpos) {
c->bufpos = 0;
c->sentlen = 0;
}
} else {
o = (clientReplyBlock*)listNodeValue(listFirst(c->reply));
if (o->used == 0) {
c->reply_bytes -= o->size;
listDelNode(c->reply,listFirst(c->reply));
continue;
}
nwritten = connWrite(c->conn, o->buf() + c->sentlen, o->used - c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
totwritten += nwritten;
/* If we fully sent the object on head go to the next one */
if (c->sentlen == o->used) {
c->reply_bytes -= o->size;
listDelNode(c->reply,listFirst(c->reply));
c->sentlen = 0;
/* If there are no longer any objects in the list, we expect
* the count of reply bytes to be exactly zero. */
if (listLength(c->reply) == 0)
serverAssert(c->reply_bytes == 0);
}
}
/* If the second part of a write didn't go through, we still need to register that */
if (nwritten2ndStage == -1) nwritten = -1;
/* Note that we avoid sending more than NET_MAX_WRITES_PER_EVENT
* bytes; in a single-threaded server it's a good idea to serve
* other clients as well, even if a very large request comes from a
* super fast link that is always able to accept data (in a real-world
* scenario think about 'KEYS *' against the loopback interface).
*
* However if we are over the maxmemory limit we ignore that and
* just deliver as much data as it is possible to deliver.
*
* Moreover, we also send as much as possible if the client is
* a replica or a monitor (otherwise, on high-speed traffic, the
* replication/output buffer will grow indefinitely) */
if (totwritten > NET_MAX_WRITES_PER_EVENT &&
(g_pserver->maxmemory == 0 ||
zmalloc_used_memory() < g_pserver->maxmemory) &&
!(c->flags & CLIENT_SLAVE)) break;
}
}
g_pserver->stat_net_output_bytes += totwritten;
if (nwritten == -1) {
if (connGetState(c->conn) != CONN_STATE_CONNECTED) {
@ -1900,7 +1905,7 @@ int writeToClient(client *c, int handler_installed) {
* We just rely on data / pings received for timeout detection. */
if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = g_pserver->unixtime;
}
if (!clientHasPendingReplies(c) && !c->fPendingReplicaWrite) {
if (!clientHasPendingReplies(c)) {
c->sentlen = 0;
if (handler_installed) connSetWriteHandler(c->conn, NULL);
@ -2024,7 +2029,6 @@ void ProcessPendingAsyncWrites()
* need to use a syscall in order to install the writable event handler,
* get it called, and so forth. */
int handleClientsWithPendingWrites(int iel, int aof_state) {
std::unique_lock<fastlock> lockf(g_pserver->rgthreadvar[iel].lockPendingWrite);
int processed = 0;
serverAssert(iel == (serverTL - g_pserver->rgthreadvar));
@ -2047,7 +2051,9 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
ae_flags |= AE_BARRIER;
}
std::unique_lock<fastlock> lockf(g_pserver->rgthreadvar[iel].lockPendingWrite);
auto vec = std::move(g_pserver->rgthreadvar[iel].clients_pending_write);
lockf.unlock();
processed += (int)vec.size();
for (client *c : vec) {
@ -2064,7 +2070,7 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
/* Don't write to clients that are going to be closed anyway. */
if (c->flags & CLIENT_CLOSE_ASAP) continue;
/* Try to write buffers to the client socket. */
/* Try to write buffers to the client socket, unless it's a replica in multithreaded mode */
if (writeToClient(c,0) == C_ERR)
{
if (c->flags & CLIENT_CLOSE_ASAP)
@ -2080,7 +2086,7 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c) || c->fPendingReplicaWrite) {
if (clientHasPendingReplies(c)) {
if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_flags, true) == C_ERR) {
freeClientAsync(c);
}
@ -2551,11 +2557,13 @@ void parseClientCommandBuffer(client *c) {
}
/* Prefetch outside the lock for better perf */
if (g_pserver->prefetch_enabled && cserver.cthreads > 1 && cqueriesStart < c->vecqueuedcmd.size() &&
if (g_pserver->prefetch_enabled && (cserver.cthreads > 1 || g_pserver->m_pstorageFactory) && cqueriesStart < c->vecqueuedcmd.size() &&
(g_pserver->m_pstorageFactory || aeLockContested(cserver.cthreads/2) || cserver.cthreads == 1) && !GlobalLocksAcquired()) {
auto &query = c->vecqueuedcmd.back();
if (query.argc > 0 && query.argc == query.argcMax) {
c->db->prefetchKeysAsync(c, query);
if (c->db->prefetchKeysAsync(c, query, c->vecqueuedcmd.size() == 1)) {
c->vecqueuedcmd.erase(c->vecqueuedcmd.begin());
}
}
}
c->reqtype = 0;
@ -2710,7 +2718,7 @@ void readQueryFromClient(connection *conn) {
return;
}
if (cserver.cthreads > 1) {
if (cserver.cthreads > 1 || g_pserver->m_pstorageFactory) {
parseClientCommandBuffer(c);
if (g_pserver->enable_async_commands && !serverTL->disable_async_commands && listLength(g_pserver->monitors) == 0 && (aeLockContention() || serverTL->rgdbSnapshot[c->db->id] || g_fTestMode)) {
// Frequent writers aren't good candidates for this optimization, they cause us to renew the snapshot too often
@ -3738,7 +3746,7 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) {
/* In the case of a replica client, writes to said replica use data from the replication backlog
* as opposed to its own internal buffer; this number should keep track of that */
unsigned long getClientReplicationBacklogSharedUsage(client *c) {
return (!(c->flags & CLIENT_SLAVE) || !c->fPendingReplicaWrite ) ? 0 : g_pserver->master_repl_offset - c->repl_curr_off;
return (!(c->flags & CLIENT_SLAVE) || !c->FPendingReplicaWrite() ) ? 0 : g_pserver->master_repl_offset - c->repl_curr_off;
}
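The replica write path above no longer tracks a separate pending flag: a replica has data to send exactly when repl_curr_off != repl_end_off, and the bytes come straight from the circular replication backlog, so a single logical write may have to be split in two when the region wraps past the end of the buffer. A hedged sketch of that wrap-around split, with a hypothetical writeFn standing in for connWrite:

// Minimal sketch of a wrap-around write from a circular buffer (hypothetical helper;
// writeFn may perform a short write and returns -1 on error, like connWrite).
template <typename WriteFn>
long long writeCircular(WriteFn writeFn, const char *backlog, long long backlogSize,
                        long long curIdx, long long endIdx) {
    if (endIdx >= curIdx)                                           // contiguous region, no wrap
        return writeFn(backlog + curIdx, endIdx - curIdx);
    long long n = writeFn(backlog + curIdx, backlogSize - curIdx);  // tail of the buffer
    if (n != backlogSize - curIdx)                                  // short write or error: stop here
        return n;
    long long n2 = writeFn(backlog, endIdx);                        // wrapped portion from the start
    return (n2 == -1) ? -1 : n + n2;
}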
/* This function returns the number of bytes that Redis is

View File

@ -27,14 +27,17 @@ void *operator new(std::size_t size, const std::nothrow_t &) noexcept
return zmalloc(size, MALLOC_LOCAL);
}
// Need to do null checks for delete since the compiler can optimize out null checks in zfree
void operator delete(void * p) noexcept
{
zfree(p);
if (p != nullptr)
zfree(p);
}
void operator delete(void *p, std::size_t) noexcept
{
zfree(p);
if (p != nullptr)
zfree(p);
}
#endif
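The reason these guards matter: zfree() is not a plain free(); in some build configurations it walks back to a size header stored just before the allocation, so a null pointer handed to it would be dereferenced, while the C++ standard requires delete nullptr to be a no-op. A simplified illustration of that header layout (hypothetical, not the actual zmalloc code):

#include <cstdlib>
#include <cstddef>

// Hedged sketch: a free() wrapper that reads an allocation-size header placed
// just before the user pointer. Passing nullptr here would be a wild read,
// which is why operator delete above checks for nullptr before calling zfree.
static void zfree_like(void *p) {
    size_t *header = (size_t*)p - 1;   // would fault (or worse) if p == nullptr
    std::free(header);                 // the real block starts at the header
}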

View File

@ -2563,6 +2563,355 @@ void stopSaving(int success) {
NULL);
}
class JobBase
{
public:
enum class JobType {
Function,
Insert
};
JobType type;
JobBase(JobType type)
: type(type)
{}
virtual ~JobBase() = default;
};
struct rdbInsertJob : public JobBase
{
redisDb *db = nullptr;
sds key = nullptr;
robj *val = nullptr;
long long lru_clock;
long long expiretime;
long long lru_idle;
long long lfu_freq;
std::vector<std::pair<robj_sharedptr, long long>> vecsubexpires;
void addSubexpireKey(robj *subkey, long long when) {
vecsubexpires.push_back(std::make_pair(robj_sharedptr(subkey), when));
decrRefCount(subkey);
}
rdbInsertJob()
: JobBase(JobBase::JobType::Insert)
{}
rdbInsertJob(rdbInsertJob &&src)
: JobBase(JobBase::JobType::Insert)
{
db = src.db;
src.db = nullptr;
key = src.key;
src.key = nullptr;
val = src.val;
src.val = nullptr;
lru_clock = src.lru_clock;
expiretime = src.expiretime;
lru_idle = src.lru_idle;
lfu_freq = src.lfu_freq;
vecsubexpires = std::move(src.vecsubexpires);
}
~rdbInsertJob() {
if (key)
sdsfree(key);
if (val)
decrRefCount(val);
}
};
struct rdbFunctionJob : public JobBase
{
public:
std::function<void()> m_fn;
rdbFunctionJob(std::function<void()> &&fn)
: JobBase(JobBase::JobType::Function), m_fn(fn)
{}
};
class rdbAsyncWorkThread
{
rdbSaveInfo *rsi;
int rdbflags;
moodycamel::BlockingConcurrentQueue<JobBase*> queueJobs;
fastlock m_lockPause { "rdbAsyncWork-Pause"};
bool fLaunched = false;
std::atomic<int> fExit {false};
std::atomic<size_t> ckeysLoaded;
std::atomic<int> cstorageWritesInFlight;
std::atomic<bool> workerThreadDone;
std::thread m_thread;
std::vector<JobBase*> vecbatch;
long long now;
long long lastPing = -1;
static void listFreeMethod(const void *v) {
delete reinterpret_cast<const JobBase*>(v);
}
public:
rdbAsyncWorkThread(rdbSaveInfo *rsi, int rdbflags, long long now)
: rsi(rsi), rdbflags(rdbflags), now(now)
{
ckeysLoaded = 0;
cstorageWritesInFlight = 0;
}
~rdbAsyncWorkThread() {
fExit = true;
while (m_lockPause.fOwnLock())
m_lockPause.unlock();
if (m_thread.joinable())
endWork();
}
void start() {
serverAssert(!fLaunched);
m_thread = std::thread(&rdbAsyncWorkThread::loadWorkerThreadMain, this);
fLaunched = true;
}
void throttle() {
if (g_pserver->m_pstorageFactory && (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK)) {
while ((cstorageWritesInFlight.load(std::memory_order_relaxed) || queueJobs.size_approx()) && (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK)) {
usleep(1);
pauseExecution();
ProcessWhileBlocked();
resumeExecution();
}
if ((getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK)) {
for (int idb = 0; idb < cserver.dbnum; ++idb) {
redisDb *db = g_pserver->db[idb];
if (db->size() > 0 && db->keycacheIsEnabled()) {
serverLog(LL_WARNING, "Key cache %d exceeds maxmemory during load, freeing - performance may be affected increase maxmemory if possible", idb);
db->disableKeyCache();
}
}
}
}
}
void enqueue(std::unique_ptr<rdbInsertJob> &spjob) {
vecbatch.push_back(spjob.release());
if (vecbatch.size() >= 64) {
queueJobs.enqueue_bulk(vecbatch.data(), vecbatch.size());
vecbatch.clear();
throttle();
}
}
void pauseExecution() {
m_lockPause.lock();
}
void resumeExecution() {
m_lockPause.unlock();
}
void enqueue(std::function<void()> &&fn) {
std::unique_ptr<JobBase> spjob = std::make_unique<rdbFunctionJob>(std::move(fn));
queueJobs.enqueue(spjob.release());
throttle();
}
void ProcessWhileBlocked() {
if ((mstime() - lastPing) > 1000) { // Ping if it's been a second or longer
listIter li;
listNode *ln;
listRewind(g_pserver->masters, &li);
while ((ln = listNext(&li)))
{
struct redisMaster *mi = (struct redisMaster*)listNodeValue(ln);
if (mi->masterhost && mi->repl_state == REPL_STATE_TRANSFER)
replicationSendNewlineToMaster(mi);
}
lastPing = mstime();
}
processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar);
}
size_t ckeys() { return ckeysLoaded; }
size_t endWork() {
if (!vecbatch.empty()) {
queueJobs.enqueue_bulk(vecbatch.data(), vecbatch.size());
vecbatch.clear();
}
std::atomic_thread_fence(std::memory_order_seq_cst); // The queue must have transferred to the consumer before we set fExit
serverAssert(fLaunched);
fExit = true;
if (g_pserver->m_pstorageFactory) {
// If we have a storage provider it can take some time to complete and we want to process events in the meantime
while (!workerThreadDone) {
usleep(10);
pauseExecution();
ProcessWhileBlocked();
resumeExecution();
}
}
m_thread.join();
while (cstorageWritesInFlight.load(std::memory_order_seq_cst)) {
usleep(10);
ProcessWhileBlocked();
}
fLaunched = false;
fExit = false;
serverAssert(queueJobs.size_approx() == 0);
return ckeysLoaded;
}
void processJob(rdbInsertJob &job) {
redisObjectStack keyobj;
initStaticStringObject(keyobj,job.key);
bool f1024thKey = false;
bool fStaleMvccKey = (this->rsi) ? mvccFromObj(job.val) < this->rsi->mvccMinThreshold : false;
/* Check if the key already expired. This function is used when loading
* an RDB file from disk, either at startup, or when an RDB was
* received from the master. In the latter case, the master is
* responsible for key expiry. If we would expire keys here, the
* snapshot taken by the master may not be reflected on the replica. */
bool fExpiredKey = iAmMaster() && !(this->rdbflags&RDBFLAGS_AOF_PREAMBLE) && job.expiretime != -1 && job.expiretime < this->now;
if (fStaleMvccKey || fExpiredKey) {
if (fStaleMvccKey && !fExpiredKey && this->rsi != nullptr && this->rsi->mi != nullptr && this->rsi->mi->staleKeyMap != nullptr && lookupKeyRead(job.db, &keyobj) == nullptr) {
// We have a key that we've already deleted and is not back in our database.
// We'll need to inform the sending master of the delete if it is also a replica of us
robj_sharedptr objKeyDup(createStringObject(job.key, sdslen(job.key)));
this->rsi->mi->staleKeyMap->operator[](job.db->id).push_back(objKeyDup);
}
sdsfree(job.key);
job.key = nullptr;
decrRefCount(job.val);
job.val = nullptr;
} else {
/* Add the new object in the hash table */
int fInserted = dbMerge(job.db, job.key, job.val, (this->rsi && this->rsi->fForceSetKey) || (this->rdbflags & RDBFLAGS_ALLOW_DUP)); // Note: dbMerge will incrRef
if (fInserted)
{
auto ckeys = this->ckeysLoaded.fetch_add(1, std::memory_order_relaxed);
f1024thKey = (ckeys % 1024) == 0;
/* Set the expire time if needed */
if (job.expiretime != -1)
{
setExpire(NULL,job.db,&keyobj,nullptr,job.expiretime);
}
/* Set usage information (for eviction). */
objectSetLRUOrLFU(job.val,job.lfu_freq,job.lru_idle,job.lru_clock,1000);
/* call key space notification on key loaded for modules only */
moduleNotifyKeyspaceEvent(NOTIFY_LOADED, "loaded", &keyobj, job.db->id);
replicationNotifyLoadedKey(job.db, &keyobj, job.val, job.expiretime);
for (auto &pair : job.vecsubexpires)
{
setExpire(NULL, job.db, &keyobj, pair.first, pair.second);
replicateSubkeyExpire(job.db, &keyobj, pair.first.get(), pair.second);
}
job.val = nullptr; // don't free this as we moved ownership to the DB
}
}
/* If we have a storage provider, check if we need to evict some keys to stay under our memory limit;
do this every 1024 keys to limit the perf impact */
if (g_pserver->m_pstorageFactory && f1024thKey)
{
bool fHighMemory = (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK);
if (fHighMemory || f1024thKey)
{
for (int idb = 0; idb < cserver.dbnum; ++idb)
{
if (g_pserver->m_pstorageFactory) {
g_pserver->db[idb]->processChangesAsync(this->cstorageWritesInFlight);
fHighMemory = false;
}
}
if (fHighMemory)
performEvictions(false /* fPreSnapshot*/);
}
g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch);
serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
}
}
static void loadWorkerThreadMain(rdbAsyncWorkThread *pqueue) {
rdbAsyncWorkThread &queue = *pqueue;
redisServerThreadVars vars = {};
vars.clients_pending_asyncwrite = listCreate();
serverTL = &vars;
aeSetThreadOwnsLockOverride(true);
// We will inherit the server thread's affinity mask; clear it as we want to run on a different core.
cpu_set_t *cpuset = CPU_ALLOC(std::thread::hardware_concurrency());
if (cpuset != nullptr) {
size_t size = CPU_ALLOC_SIZE(std::thread::hardware_concurrency());
CPU_ZERO_S(size, cpuset);
for (unsigned i = 0; i < std::thread::hardware_concurrency(); ++i) {
CPU_SET_S(i, size, cpuset);
}
pthread_setaffinity_np(pthread_self(), size, cpuset);
CPU_FREE(cpuset);
}
for (;;) {
if (queue.queueJobs.size_approx() == 0) {
if (queue.fExit.load(std::memory_order_relaxed))
break;
}
if (queue.fExit.load(std::memory_order_seq_cst) && queue.queueJobs.size_approx() == 0)
break;
vars.gcEpoch = g_pserver->garbageCollector.startEpoch();
JobBase *rgjob[64];
int cjobs = 0;
while ((cjobs = pqueue->queueJobs.wait_dequeue_bulk_timed(rgjob, 64, std::chrono::milliseconds(5))) > 0) {
std::unique_lock<fastlock> ulPause(pqueue->m_lockPause);
for (int ijob = 0; ijob < cjobs; ++ijob) {
JobBase *pjob = rgjob[ijob];
switch (pjob->type)
{
case JobBase::JobType::Insert:
pqueue->processJob(*static_cast<rdbInsertJob*>(pjob));
break;
case JobBase::JobType::Function:
static_cast<rdbFunctionJob*>(pjob)->m_fn();
break;
}
delete pjob;
}
}
g_pserver->garbageCollector.endEpoch(vars.gcEpoch);
}
if (g_pserver->m_pstorageFactory) {
for (int idb = 0; idb < cserver.dbnum; ++idb)
g_pserver->db[idb]->processChangesAsync(queue.cstorageWritesInFlight);
}
queue.workerThreadDone = true;
ProcessPendingAsyncWrites();
listRelease(vars.clients_pending_asyncwrite);
aeSetThreadOwnsLockOverride(false);
}
};
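Taken together, the class above gives rdbLoadRio a simple producer/consumer contract: construct it, call start(), enqueue rdbInsertJob batches (plus occasional ordered functions), and finally call endWork() to flush the batch, drain the queue and join the worker. A hedged usage sketch, assuming the usual rdbLoadRio locals (rsi, rdbflags, dbCur, key, val, expiretime) are in scope:

// Usage sketch only -- mirrors how rdbLoadRio drives the worker below, not a complete load path.
rdbAsyncWorkThread wqueue(rsi, rdbflags, mstime());
wqueue.start();

auto spjob = std::make_unique<rdbInsertJob>();
spjob->db = dbCur;
spjob->key = sdsdupshared(key);       // ownership of key/val passes to the job
spjob->val = val;
spjob->lru_clock = LRU_CLOCK();
spjob->expiretime = expiretime;
spjob->lru_idle = -1;
spjob->lfu_freq = -1;
wqueue.enqueue(spjob);                // batched in groups of 64; throttle() may block under memory pressure

wqueue.enqueue([dbCur] { dbCur->expand(1024); });   // functions execute in order with the inserts

size_t keysLoaded = wqueue.endWork(); // flush the last batch, drain the queue, join the worker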
/* Track loading progress in order to serve clients from time to time
and if needed calculate the rdb checksum */
void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
@ -2574,6 +2923,8 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
(g_pserver->loading_process_events_interval_keys &&
(r->keys_since_last_callback >= g_pserver->loading_process_events_interval_keys)))
{
rdbAsyncWorkThread *pwthread = reinterpret_cast<rdbAsyncWorkThread*>(r->chksum_arg);
listIter li;
listNode *ln;
listRewind(g_pserver->masters, &li);
@ -2584,7 +2935,13 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
replicationSendNewlineToMaster(mi);
}
loadingProgress(r->processed_bytes);
if (pwthread)
pwthread->pauseExecution();
processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar);
if (pwthread)
pwthread->resumeExecution();
processModuleLoadingProgressEvent(0);
robj *ping_argv[1];
@ -2597,28 +2954,38 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
}
}
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned and 'errno' is set accordingly. */
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
uint64_t dbid = 0;
int type, rdbver;
redisDb *db = g_pserver->db[dbid];
redisDb *dbCur = g_pserver->db[dbid];
char buf[1024];
/* Key-specific attributes, set by opcodes before the key type. */
long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now;
long long lru_clock = 0;
unsigned long long ckeysLoaded = 0;
uint64_t mvcc_tstamp = OBJ_MVCC_INVALID;
size_t ckeysLoaded = 0;
now = mstime();
rdbAsyncWorkThread wqueue(rsi, rdbflags, now);
robj *subexpireKey = nullptr;
sds key = nullptr;
bool fLastKeyExpired = false;
std::unique_ptr<rdbInsertJob> spjob;
for (int idb = 0; idb < cserver.dbnum; ++idb)
{
g_pserver->db[idb]->trackChanges(true, 1024);
// If we're tracking changes we need to reset this
bool fTracking = g_pserver->db[0]->FTrackingChanges();
if (fTracking) {
// We don't want to track here because processChangesAsync is outside the normal scope handling
for (int idb = 0; idb < cserver.dbnum; ++idb) {
if (g_pserver->db[idb]->processChanges(false))
g_pserver->db[idb]->commitChanges();
}
}
rdb->update_cksum = rdbLoadProgressCallback;
rdb->chksum_arg = &wqueue;
rdb->max_processing_chunk = g_pserver->loading_process_events_interval_bytes;
if (rioRead(rdb,buf,9) == 0) goto eoferr;
buf[9] = '\0';
@ -2634,8 +3001,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
return C_ERR;
}
now = mstime();
lru_clock = LRU_CLOCK();
wqueue.start();
while(1) {
robj *val;
@ -2683,7 +3050,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
"databases. Exiting\n", cserver.dbnum);
exit(1);
}
db = g_pserver->db[dbid];
dbCur = g_pserver->db[dbid];
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_RESIZEDB) {
/* RESIZEDB: Hint about the size of the keys in the currently
@ -2693,7 +3060,11 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
goto eoferr;
if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
goto eoferr;
db->expand(db_size);
if (g_pserver->allowRdbResizeOp && !g_pserver->m_pstorageFactory) {
wqueue.enqueue([dbCur, db_size]{
dbCur->expand(db_size);
});
}
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_AUX) {
/* AUX: generic string-string fields. Use to add state to RDB
@ -2768,12 +3139,10 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
}
}
else {
redisObjectStack keyobj;
initStaticStringObject(keyobj,key);
long long expireT = strtoll(szFromObj(auxval), nullptr, 10);
setExpire(NULL, db, &keyobj, subexpireKey, expireT);
replicateSubkeyExpire(db, &keyobj, subexpireKey, expireT);
decrRefCount(subexpireKey);
serverAssert(spjob != nullptr);
serverAssert(sdscmp(key, spjob->key) == 0);
spjob->addSubexpireKey(subexpireKey, expireT);
subexpireKey = nullptr;
}
} else {
@ -2846,7 +3215,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
key = nullptr;
}
if ((key = (sds)rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL)
if ((key = (sds)rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS_SHARED,NULL)) == NULL)
goto eoferr;
/* Read value */
if ((val = rdbLoadObject(type,rdb,key,mvcc_tstamp)) == NULL) {
@ -2854,7 +3223,20 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
key = nullptr;
goto eoferr;
}
bool fStaleMvccKey = (rsi) ? mvccFromObj(val) < rsi->mvccMinThreshold : false;
if (spjob != nullptr)
wqueue.enqueue(spjob);
spjob = std::make_unique<rdbInsertJob>();
spjob->db = dbCur;
spjob->key = sdsdupshared(key);
spjob->val = val;
spjob->lru_clock = lru_clock;
spjob->expiretime = expiretime;
spjob->lru_idle = lru_idle;
spjob->lfu_freq = lfu_freq;
val = nullptr;
/* Check if the key already expired. This function is used when loading
* an RDB file from disk, either at startup, or when an RDB was
@ -2864,75 +3246,15 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
* Similarly if the RDB is the preamble of an AOF file, we want to
* load all the keys as they are, since the log of operations later
* assume to work in an exact keyspace state. */
redisObjectStack keyobj;
initStaticStringObject(keyobj,key);
bool fExpiredKey = iAmMaster() && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now;
if (fStaleMvccKey || fExpiredKey) {
if (fStaleMvccKey && !fExpiredKey && rsi != nullptr && rsi->mi != nullptr && rsi->mi->staleKeyMap != nullptr && lookupKeyRead(db, &keyobj) == nullptr) {
// We have a key that we've already deleted and is not back in our database.
// We'll need to inform the sending master of the delete if it is also a replica of us
robj_sharedptr objKeyDup(createStringObject(key, sdslen(key)));
rsi->mi->staleKeyMap->operator[](db->id).push_back(objKeyDup);
}
fLastKeyExpired = true;
sdsfree(key);
key = nullptr;
decrRefCount(val);
val = nullptr;
} else {
/* If we have a storage provider, check if we need to evict some keys to stay under our memory limit;
do this every 128 keys to limit the perf impact */
if (g_pserver->m_pstorageFactory && (ckeysLoaded % 128) == 0)
{
fLastKeyExpired = fStaleMvccKey || fExpiredKey;
ckeysLoaded++;
if (g_pserver->m_pstorageFactory && (ckeysLoaded % 128) == 0)
{
if (!serverTL->gcEpoch.isReset()) {
g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch);
serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
bool fHighMemory = (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK);
if (fHighMemory || (ckeysLoaded % (1024)) == 0)
{
for (int idb = 0; idb < cserver.dbnum; ++idb)
{
if (g_pserver->db[idb]->processChanges(false))
g_pserver->db[idb]->commitChanges();
if (fHighMemory && !(rsi && rsi->fForceSetKey)) {
g_pserver->db[idb]->removeAllCachedValues(); // During load we don't go through the normal eviction unless we're merging (i.e. an active replica)
fHighMemory = false; // we took care of it
}
g_pserver->db[idb]->trackChanges(false, 1024);
}
if (fHighMemory)
performEvictions(false /* fPreSnapshot*/);
}
}
redisObjectStack keyobj;
initStaticStringObject(keyobj,key);
/* Add the new object in the hash table */
int fInserted = dbMerge(db, key, val, (rsi && rsi->fForceSetKey) || (rdbflags & RDBFLAGS_ALLOW_DUP)); // Note: dbMerge will incrRef
fLastKeyExpired = false;
if (fInserted)
{
++ckeysLoaded;
/* Set the expire time if needed */
if (expiretime != -1)
{
setExpire(NULL,db,&keyobj,nullptr,expiretime);
}
/* Set usage information (for eviction). */
objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock,1000);
/* call key space notification on key loaded for modules only */
moduleNotifyKeyspaceEvent(NOTIFY_LOADED, "loaded", &keyobj, db->id);
replicationNotifyLoadedKey(db, &keyobj, val, expiretime);
}
else
{
decrRefCount(val);
val = nullptr;
}
}
@ -2948,6 +3270,9 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
lru_idle = -1;
}
if (spjob != nullptr)
wqueue.enqueue(spjob);
if (key != nullptr)
{
sdsfree(key);
@ -2980,11 +3305,14 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
}
}
for (int idb = 0; idb < cserver.dbnum; ++idb)
{
if (g_pserver->db[idb]->processChanges(false))
g_pserver->db[idb]->commitChanges();
wqueue.endWork();
if (fTracking) {
// Reset track changes
for (int idb = 0; idb < cserver.dbnum; ++idb) {
g_pserver->db[idb]->trackChanges(false);
}
}
return C_OK;
/* Unexpected end of file is handled here calling rdbReportReadError():
@ -2992,6 +3320,14 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
* the RDB file from a socket during initial SYNC (diskless replica mode),
* we'll report the error to the caller, so that we can retry. */
eoferr:
if (fTracking) {
// Reset track changes
for (int idb = 0; idb < cserver.dbnum; ++idb) {
g_pserver->db[idb]->trackChanges(false);
}
}
wqueue.endWork();
if (key != nullptr)
{
sdsfree(key);

View File

@ -119,10 +119,11 @@
#define RDB_MODULE_OPCODE_STRING 5 /* String. */
/* rdbLoad...() functions flags. */
#define RDB_LOAD_NONE 0
#define RDB_LOAD_ENC (1<<0)
#define RDB_LOAD_PLAIN (1<<1)
#define RDB_LOAD_SDS (1<<2)
#define RDB_LOAD_NONE 0
#define RDB_LOAD_ENC (1<<0)
#define RDB_LOAD_PLAIN (1<<1)
#define RDB_LOAD_SDS (1<<2)
#define RDB_LOAD_SDS_SHARED ((1 << 3) | RDB_LOAD_SDS)
/* flags on the purpose of rdb save or load */
#define RDBFLAGS_NONE 0 /* No special RDB loading. */

View File

@ -46,6 +46,7 @@
#include <chrono>
#include <unordered_map>
#include <string>
#include <sys/mman.h>
void replicationDiscardCachedMaster(redisMaster *mi);
void replicationResurrectCachedMaster(redisMaster *mi, connection *conn);
@ -184,8 +185,39 @@ int bg_unlink(const char *filename) {
/* ---------------------------------- MASTER -------------------------------- */
bool createDiskBacklog() {
// Let's create some disk-backed pages and add them here
std::string path = "./repl-backlog-temp" + std::to_string(gettid());
int fd = open(path.c_str(), O_CREAT | O_RDWR | O_LARGEFILE, S_IRUSR | S_IWUSR);
if (fd < 0) {
return false;
}
size_t alloc = cserver.repl_backlog_disk_size;
int result = truncate(path.c_str(), alloc);
unlink(path.c_str()); // ensure the fd is the only ref
if (result == -1) {
close (fd);
return false;
}
g_pserver->repl_backlog_disk = (char*)mmap(nullptr, alloc, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
close(fd);
if (g_pserver->repl_backlog_disk == MAP_FAILED) {
g_pserver->repl_backlog_disk = nullptr;
return false;
}
serverLog(LL_VERBOSE, "Disk Backed Replication Allocated");
return true;
}
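The function above uses a common anonymous-scratch-file pattern: create a file, size it, unlink it immediately so the mapping holds the only reference (the kernel reclaims the space once the mapping goes away), mmap it shared, and close the descriptor. A hedged standalone sketch of the same pattern (hypothetical helper, using ftruncate instead of truncate):

#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <string>

// Hedged sketch, not the KeyDB function above: map a throw-away file of 'len' bytes.
static char *mapScratchFile(const std::string &path, size_t len) {
    int fd = open(path.c_str(), O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
    if (fd < 0) return nullptr;
    if (ftruncate(fd, (off_t)len) == -1) { unlink(path.c_str()); close(fd); return nullptr; }
    unlink(path.c_str());                               // the mapping becomes the only reference
    void *p = mmap(nullptr, len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    close(fd);                                          // the mapping keeps the file alive
    return p == MAP_FAILED ? nullptr : (char*)p;
}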
void createReplicationBacklog(void) {
serverAssert(g_pserver->repl_backlog == NULL);
if (cserver.repl_backlog_disk_size) {
if (!createDiskBacklog()) {
serverLog(LL_WARNING, "Failed to create disk backlog, will use memory only");
}
}
g_pserver->repl_backlog = (char*)zmalloc(g_pserver->repl_backlog_size, MALLOC_LOCAL);
g_pserver->repl_backlog_histlen = 0;
g_pserver->repl_backlog_idx = 0;
@ -234,9 +266,22 @@ void resizeReplicationBacklog(long long newsize) {
long long earliest_off = g_pserver->repl_lowest_off.load();
if (earliest_off != -1) {
// We need to keep critical data so we can't shrink less than the hot data in the buffer
char *backlog = nullptr;
newsize = std::max(newsize, g_pserver->master_repl_offset - earliest_off);
char *backlog = (char*)zmalloc(newsize);
if (cserver.repl_backlog_disk_size != 0) {
if (newsize > g_pserver->repl_backlog_config_size) {
if (g_pserver->repl_backlog == g_pserver->repl_backlog_disk)
return; // Can't do anything more
serverLog(LL_NOTICE, "Switching to disk backed replication backlog due to exceeding memory limits");
backlog = g_pserver->repl_backlog_disk;
newsize = cserver.repl_backlog_disk_size;
}
}
// We need to keep critical data so we can't shrink less than the hot data in the buffer
if (backlog == nullptr)
backlog = (char*)zmalloc(newsize);
g_pserver->repl_backlog_histlen = g_pserver->master_repl_offset - earliest_off;
long long earliest_idx = getReplIndexFromOffset(earliest_off);
@ -251,7 +296,10 @@ void resizeReplicationBacklog(long long newsize) {
auto cbActiveBacklog = cbPhase1 + g_pserver->repl_backlog_idx;
serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog);
}
zfree(g_pserver->repl_backlog);
if (g_pserver->repl_backlog != g_pserver->repl_backlog_disk)
zfree(g_pserver->repl_backlog);
else
serverLog(LL_NOTICE, "Returning to memory backed replication backlog");
g_pserver->repl_backlog = backlog;
g_pserver->repl_backlog_idx = g_pserver->repl_backlog_histlen;
if (g_pserver->repl_batch_idxStart >= 0) {
@ -261,7 +309,10 @@ void resizeReplicationBacklog(long long newsize) {
}
g_pserver->repl_backlog_start = earliest_off;
} else {
zfree(g_pserver->repl_backlog);
if (g_pserver->repl_backlog != g_pserver->repl_backlog_disk)
zfree(g_pserver->repl_backlog);
else
serverLog(LL_NOTICE, "Returning to memory backed replication backlog");
g_pserver->repl_backlog = (char*)zmalloc(newsize);
g_pserver->repl_backlog_histlen = 0;
g_pserver->repl_backlog_idx = 0;
@ -311,6 +362,8 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
long long maxClientBuffer = (long long)cserver.client_obuf_limits[CLIENT_TYPE_SLAVE].hard_limit_bytes;
if (maxClientBuffer <= 0)
maxClientBuffer = LLONG_MAX; // infinite essentially
if (cserver.repl_backlog_disk_size)
maxClientBuffer = std::max(g_pserver->repl_backlog_size, cserver.repl_backlog_disk_size);
long long min_offset = LLONG_MAX;
int listening_replicas = 0;
while ((ln = listNext(&li))) {
@ -761,7 +814,6 @@ long long addReplyReplicationBacklog(client *c, long long offset) {
/* Force the partial sync to be queued */
prepareClientToWrite(c);
c->fPendingReplicaWrite = true;
return len;
}
@ -890,6 +942,7 @@ int masterTryPartialResynchronization(client *c) {
c->repl_ack_time = g_pserver->unixtime;
c->repl_put_online_on_ack = 0;
listAddNodeTail(g_pserver->slaves,c);
g_pserver->rgthreadvar[c->iel].cclientsReplica++;
/* We can't use the connection buffers since they are used to accumulate
* new commands at this stage. But we are sure the socket send buffer is
@ -993,6 +1046,7 @@ int startBgsaveForReplication(int mincapa) {
replica->replstate = REPL_STATE_NONE;
replica->flags &= ~CLIENT_SLAVE;
listDelNode(g_pserver->slaves,ln);
g_pserver->rgthreadvar[replica->iel].cclientsReplica--;
addReplyError(replica,
"BGSAVE failed, replication can't continue");
replica->flags |= CLIENT_CLOSE_AFTER_REPLY;
@ -1122,6 +1176,7 @@ void syncCommand(client *c) {
c->repldbfd = -1;
c->flags |= CLIENT_SLAVE;
listAddNodeTail(g_pserver->slaves,c);
g_pserver->rgthreadvar[c->iel].cclientsReplica++;
/* Create the replication backlog if needed. */
if (listLength(g_pserver->slaves) == 1 && g_pserver->repl_backlog == NULL) {
@ -4992,6 +5047,22 @@ void flushReplBacklogToClients()
long long min_offset = LONG_LONG_MAX;
// Ensure no overflow
serverAssert(g_pserver->repl_batch_offStart < g_pserver->master_repl_offset);
if (g_pserver->master_repl_offset - g_pserver->repl_batch_offStart > g_pserver->repl_backlog_size) {
// We overflowed
listIter li;
listNode *ln;
listRewind(g_pserver->slaves, &li);
while ((ln = listNext(&li))) {
client *c = (client*)listNodeValue(ln);
sds sdsClient = catClientInfoString(sdsempty(),c);
freeClientAsync(c);
serverLog(LL_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", sdsClient);
sdsfree(sdsClient);
}
goto LDone;
}
// Ensure no overflow if we get here
serverAssert(g_pserver->master_repl_offset - g_pserver->repl_batch_offStart <= g_pserver->repl_backlog_size);
serverAssert(g_pserver->repl_batch_idxStart != g_pserver->repl_backlog_idx);
@ -5022,16 +5093,13 @@ void flushReplBacklogToClients()
replica->repl_end_off = g_pserver->master_repl_offset;
/* Only if there isn't already a pending write do we prepare the client to write */
if (!replica->fPendingReplicaWrite){
serverAssert(replica->repl_curr_off != g_pserver->master_repl_offset);
prepareClientToWrite(replica);
replica->fPendingReplicaWrite = true;
}
serverAssert(replica->repl_curr_off != g_pserver->master_repl_offset);
prepareClientToWrite(replica);
}
if (fAsyncWrite)
ProcessPendingAsyncWrites();
LDone:
// This may be called multiple times per "frame" so update with our progress flushing to clients
g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;

View File

@ -99,6 +99,7 @@ static const rio rioBufferIO = {
rioBufferTell,
rioBufferFlush,
NULL, /* update_checksum */
NULL, /* update checksum arg */
0, /* current checksum */
0, /* flags */
0, /* bytes read or written */
@ -113,6 +114,7 @@ static const rio rioConstBufferIO = {
rioBufferTell,
rioBufferFlush,
NULL, /* update_checksum */
NULL, /* update checksum arg */
0, /* current checksum */
0, /* flags */
0, /* bytes read or written */
@ -176,6 +178,7 @@ static const rio rioFileIO = {
rioFileTell,
rioFileFlush,
NULL, /* update_checksum */
NULL, /* update checksum arg */
0, /* current checksum */
0, /* flags */
0, /* bytes read or written */
@ -275,6 +278,7 @@ static const rio rioConnIO = {
rioConnTell,
rioConnFlush,
NULL, /* update_checksum */
NULL, /* update checksum arg */
0, /* current checksum */
0, /* flags */
0, /* bytes read or written */
@ -394,6 +398,7 @@ static const rio rioFdIO = {
rioFdTell,
rioFdFlush,
NULL, /* update_checksum */
NULL, /* update checksum arg */
0, /* current checksum */
0, /* flags */
0, /* bytes read or written */

View File

@ -58,6 +58,7 @@ struct _rio {
* and len fields pointing to the new block of data to add to the checksum
* computation. */
void (*update_cksum)(struct _rio *, const void *buf, size_t len);
void *chksum_arg;
/* The current checksum and flags (see RIO_FLAG_*) */
uint64_t cksum, flags;
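chksum_arg gives the update_cksum hook a side channel for caller state: rdbLoadRio stores its rdbAsyncWorkThread pointer there, and rdbLoadProgressCallback casts it back so it can pause the worker around processEventsWhileBlocked(). A hedged sketch of a hook using the same slot (MyState is a hypothetical consumer-defined type):

#include "rio.h"   // assumed include path for struct _rio / rio

struct MyState { size_t bytes_seen = 0; };   // hypothetical per-load state

// Hedged sketch of an update_cksum hook: recover caller state from chksum_arg.
static void myProgressHook(rio *r, const void *buf, size_t len) {
    MyState *st = reinterpret_cast<MyState*>(r->chksum_arg);
    if (st) st->bytes_seen += len;
    (void)buf;   // a real hook could also fold buf into r->cksum here
}

// Wiring, as rdbLoadRio does with rdbLoadProgressCallback and its work thread:
//   rdb.update_cksum = myProgressHook;
//   rdb.chksum_arg   = &myState;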

View File

@ -1707,12 +1707,12 @@ void tryResizeHashTables(int dbid) {
* is returned. */
int redisDbPersistentData::incrementallyRehash() {
/* Keys dictionary */
if (dictIsRehashing(m_pdict) || dictIsRehashing(m_pdictTombstone)) {
int result = dictRehashMilliseconds(m_pdict,1);
result += dictRehashMilliseconds(m_pdictTombstone,1);
return result; /* already used our millisecond for this loop... */
}
return 0;
int result = 0;
if (dictIsRehashing(m_pdict))
result += dictRehashMilliseconds(m_pdict,1);
if (dictIsRehashing(m_pdictTombstone))
dictRehashMilliseconds(m_pdictTombstone,1); // don't count this
return result; /* already used our millisecond for this loop... */
}
/* This function is called once a background process of some kind terminates,
@ -2115,12 +2115,17 @@ void databasesCron(bool fMainThread) {
if (g_pserver->activerehashing) {
for (j = 0; j < dbs_per_call; j++) {
if (serverTL->rehashCtl != nullptr) {
if (dictRehashSomeAsync(serverTL->rehashCtl, 5)) {
break;
} else {
dictCompleteRehashAsync(serverTL->rehashCtl, true /*fFree*/);
serverTL->rehashCtl = nullptr;
if (!serverTL->rehashCtl->done.load(std::memory_order_relaxed)) {
aeReleaseLock();
if (dictRehashSomeAsync(serverTL->rehashCtl, rehashes_per_ms)) {
aeAcquireLock();
break;
}
aeAcquireLock();
}
dictCompleteRehashAsync(serverTL->rehashCtl, true /*fFree*/);
serverTL->rehashCtl = nullptr;
}
serverAssert(serverTL->rehashCtl == nullptr);
@ -2128,22 +2133,27 @@ void databasesCron(bool fMainThread) {
/* Are we async rehashing? And if so is it time to re-calibrate? */
/* The recalibration limit is a prime number to ensure balancing across threads */
if (rehashes_per_ms > 0 && async_rehashes < 131 && !cserver.active_defrag_enabled && cserver.cthreads > 1 && dictSize(dict) > 2048 && dictIsRehashing(dict) && !g_pserver->loading) {
serverTL->rehashCtl = dictRehashAsyncStart(dict, rehashes_per_ms);
++async_rehashes;
serverTL->rehashCtl = dictRehashAsyncStart(dict, rehashes_per_ms * ((1000 / g_pserver->hz) / 10)); // Estimate 10% CPU time spent in lock contention
if (serverTL->rehashCtl)
++async_rehashes;
}
if (serverTL->rehashCtl)
break;
// Before starting anything new, can we end the rehash of a blocked thread?
if (dict->asyncdata != nullptr) {
while (dict->asyncdata != nullptr) {
auto asyncdata = dict->asyncdata;
if (asyncdata->done) {
dictCompleteRehashAsync(asyncdata, false /*fFree*/); // Don't free because we don't own the pointer
serverAssert(dict->asyncdata != asyncdata);
break; // completion can be expensive, don't do anything else
} else {
break;
}
}
if (dict->asyncdata)
break;
rehashes_per_ms = g_pserver->db[rehash_db]->incrementallyRehash();
async_rehashes = 0;
if (rehashes_per_ms > 0) {
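On the budget passed to dictRehashAsyncStart above: rehashes_per_ms is the rate measured by incrementallyRehash, and the multiplier ((1000 / g_pserver->hz) / 10) scales it to roughly 10% of one cron period, the slice assumed to be lost to lock contention anyway. A worked example under the assumption hz = 10 (illustrative values only):

// hz = 10                         -> cron period = 1000 / 10 = 100 ms
// multiplier = (1000 / 10) / 10   -> 10 ms worth of rehash work per start
// measured rehashes_per_ms = 500  -> budget = 500 * 10 = 5000 buckets handed to the async rehash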
@ -2364,13 +2374,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
UNUSED(id);
UNUSED(clientData);
if (serverTL->rehashCtl != nullptr && !serverTL->rehashCtl->done) {
aeReleaseLock();
// If there is not enough lock contention we may not have made enough progress on the async
// rehash. Ensure we finish it outside the lock.
dictRehashSomeAsync(serverTL->rehashCtl, serverTL->rehashCtl->queue.size());
aeAcquireLock();
}
if (g_pserver->maxmemory && g_pserver->m_pstorageFactory)
performEvictions(false);
/* If another threads unblocked one of our clients, and this thread has been idle
then beforeSleep won't have a chance to process the unblocking. So we also
@ -2614,7 +2619,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
g_pserver->rdb_bgsave_scheduled = 0;
}
if (cserver.storage_memory_model == STORAGE_WRITEBACK && g_pserver->m_pstorageFactory) {
if (cserver.storage_memory_model == STORAGE_WRITEBACK && g_pserver->m_pstorageFactory && !g_pserver->loading) {
run_with_period(g_pserver->storage_flush_period) {
flushStorageWeak();
}
@ -2660,13 +2665,8 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData
UNUSED(id);
UNUSED(clientData);
if (serverTL->rehashCtl != nullptr && !serverTL->rehashCtl->done) {
aeReleaseLock();
// If there is not enough lock contention we may not have made enough progress on the async
// rehash. Ensure we finish it outside the lock.
dictRehashSomeAsync(serverTL->rehashCtl, serverTL->rehashCtl->queue.size());
aeAcquireLock();
}
if (g_pserver->maxmemory && g_pserver->m_pstorageFactory)
performEvictions(false);
int iel = ielFromEventLoop(eventLoop);
serverAssert(iel != IDX_EVENT_LOOP_MAIN);
@ -2895,7 +2895,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
static thread_local bool fFirstRun = true;
// note: we also copy the DB pointer in case a DB swap is done while the lock is released
std::vector<redisDb*> vecdb; // note we cache the database pointer in case a dbswap is done while the lock is released
if (cserver.storage_memory_model == STORAGE_WRITETHROUGH && g_pserver->m_pstorageFactory != nullptr)
if (cserver.storage_memory_model == STORAGE_WRITETHROUGH && g_pserver->m_pstorageFactory != nullptr && !g_pserver->loading)
{
if (!fFirstRun) {
mstime_t storage_process_latency;
@ -2967,6 +2967,16 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
serverAssert(sleeping_threads <= cserver.cthreads);
}
if (!g_pserver->garbageCollector.empty()) {
// Server threads don't free the GC, but if we don't have
// a bgsave or some other async task then we'll hold onto the
// data for too long
g_pserver->asyncworkqueue->AddWorkFunction([]{
auto epoch = g_pserver->garbageCollector.startEpoch();
g_pserver->garbageCollector.endEpoch(epoch);
}, true /*fHiPri*/);
}
/* Determine whether the modules are enabled before sleeping, and use that result
both here, and after wakeup to avoid double acquire or release of the GIL */
serverTL->modulesEnabledThisAeLoop = !!moduleCount();
@ -3477,6 +3487,8 @@ int setOOMScoreAdj(int process_class) {
* g_pserver->maxclients to the value that we can actually handle. */
void adjustOpenFilesLimit(void) {
rlim_t maxfiles = g_pserver->maxclients+CONFIG_MIN_RESERVED_FDS;
if (g_pserver->m_pstorageFactory)
maxfiles += g_pserver->m_pstorageFactory->filedsRequired();
struct rlimit limit;
if (getrlimit(RLIMIT_NOFILE,&limit) == -1) {
@ -3795,8 +3807,6 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain)
pvar->in_eval = 0;
pvar->in_exec = 0;
pvar->el = aeCreateEventLoop(g_pserver->maxclients+CONFIG_FDSET_INCR);
aeSetBeforeSleepProc(pvar->el, beforeSleep, AE_SLEEP_THREADSAFE);
aeSetAfterSleepProc(pvar->el, afterSleep, AE_SLEEP_THREADSAFE);
pvar->current_client = nullptr;
pvar->fRetrySetAofEvent = false;
if (pvar->el == NULL) {
@ -3805,6 +3815,8 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain)
strerror(errno));
exit(1);
}
aeSetBeforeSleepProc(pvar->el, beforeSleep, AE_SLEEP_THREADSAFE);
aeSetAfterSleepProc(pvar->el, afterSleep, AE_SLEEP_THREADSAFE);
fastlock_init(&pvar->lockPendingWrite, "lockPendingWrite");
@ -4010,6 +4022,9 @@ void InitServerLast() {
g_pserver->initial_memory_usage = zmalloc_used_memory();
g_pserver->asyncworkqueue = new (MALLOC_LOCAL) AsyncWorkQueue(cserver.cthreads);
// Allocate the repl backlog
}
/* Parse the flags string description 'strflags' and set them to the
@ -6637,6 +6652,7 @@ static void sigShutdownHandler(int sig) {
if (g_pserver->shutdown_asap && sig == SIGINT) {
serverLogFromHandler(LL_WARNING, "You insist... exiting now.");
rdbRemoveTempFile(g_pserver->rdbThreadVars.tmpfileNum, 1);
g_pserver->garbageCollector.shutdown();
exit(1); /* Exit with an error since this was not a clean shutdown. */
} else if (g_pserver->loading) {
serverLogFromHandler(LL_WARNING, "Received shutdown signal during loading, exiting now.");
@ -6818,6 +6834,7 @@ void loadDataFromDisk(void) {
serverLog(LL_NOTICE, "Loading the RDB even though we have a storage provider because the database is empty");
}
serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
if (g_pserver->aof_state == AOF_ON) {
if (loadAppendOnlyFile(g_pserver->aof_filename) == C_OK)
serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
@ -6863,6 +6880,8 @@ void loadDataFromDisk(void) {
exit(1);
}
}
g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch);
serverTL->gcEpoch.reset();
}
void redisOutOfMemoryHandler(size_t allocation_size) {
@ -7137,13 +7156,6 @@ void *workerThreadMain(void *parg)
serverAssert(!GlobalLocksAcquired());
aeDeleteEventLoop(el);
aeAcquireLock();
for (int idb = 0; idb < cserver.dbnum; ++idb) {
if (g_pserver->rgthreadvar[iel].rgdbSnapshot[idb] != nullptr)
g_pserver->db[idb]->endSnapshot(g_pserver->rgthreadvar[iel].rgdbSnapshot[idb]);
}
aeReleaseLock();
return NULL;
}
@ -7463,7 +7475,13 @@ int main(int argc, char **argv) {
}
InitServerLast();
loadDataFromDisk();
try {
loadDataFromDisk();
} catch (ShutdownException) {
exit(EXIT_SUCCESS);
}
if (g_pserver->cluster_enabled) {
if (verifyClusterConfigWithData() == C_ERR) {
serverLog(LL_WARNING,
@ -7519,6 +7537,11 @@ int main(int argc, char **argv) {
serverAssert(cserver.cthreads > 0 && cserver.cthreads <= MAX_EVENT_LOOPS);
pthread_create(&cserver.time_thread_id, nullptr, timeThreadMain, nullptr);
if (cserver.time_thread_priority) {
struct sched_param time_thread_priority;
time_thread_priority.sched_priority = sched_get_priority_max(SCHED_FIFO);
pthread_setschedparam(cserver.time_thread_id, SCHED_FIFO, &time_thread_priority);
}
pthread_attr_t tattr;
pthread_attr_init(&tattr);
@ -7560,8 +7583,7 @@ int main(int argc, char **argv) {
if (!fLockAcquired)
g_fInCrash = true; // We don't actually crash right away, because we want to sync any storage providers
for (int idb = 0; idb < cserver.dbnum; ++idb) {
delete g_pserver->db[idb];
g_pserver->db[idb] = nullptr;
g_pserver->db[idb]->storageProviderDelete();
}
// If we couldn't acquire the global lock it means something wasn't shutdown and we'll probably deadlock
serverAssert(fLockAcquired);

View File

@ -39,6 +39,9 @@
#include "rio.h"
#include "atomicvar.h"
#include <concurrentqueue.h>
#include <blockingconcurrentqueue.h>
#include <stdio.h>
#include <stdlib.h>
#include <cmath>
@ -1070,6 +1073,9 @@ class dict_iter : public dict_const_iter
{
dict *m_dict = nullptr;
public:
dict_iter()
: dict_const_iter(nullptr)
{}
explicit dict_iter(nullptr_t)
: dict_const_iter(nullptr)
{}
@ -1134,7 +1140,7 @@ public:
void getStats(char *buf, size_t bufsize) { dictGetStats(buf, bufsize, m_pdict); }
void getExpireStats(char *buf, size_t bufsize) { m_setexpire->getstats(buf, bufsize); }
bool insert(char *k, robj *o, bool fAssumeNew = false);
bool insert(char *k, robj *o, bool fAssumeNew = false, dict_iter *existing = nullptr);
void tryResize();
int incrementallyRehash();
void updateValue(dict_iter itr, robj *val);
@ -1156,14 +1162,17 @@ public:
bool FRehashing() const { return dictIsRehashing(m_pdict) || dictIsRehashing(m_pdictTombstone); }
void setStorageProvider(StorageCache *pstorage);
void endStorageProvider();
void trackChanges(bool fBulk, size_t sizeHint = 0);
bool FTrackingChanges() const { return !!m_fTrackingChanges; }
// Process and commit changes for secondary storage. Note that process and commit are separated
// to allow you to release the global lock before committing. To prevent deadlocks you *must*
// either release the global lock or keep the same global lock between the two functions, as
// a second lock is kept to ensure writes to secondary storage are ordered
bool processChanges(bool fSnapshot);
void processChangesAsync(std::atomic<int> &pendingJobs);
void commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree = nullptr);
// This should only be used if you look at the key, we do not fixup
@ -1179,10 +1188,12 @@ public:
void restoreSnapshot(const redisDbPersistentDataSnapshot *psnapshot);
bool FStorageProvider() { return m_spstorage != nullptr; }
bool removeCachedValue(const char *key);
bool removeCachedValue(const char *key, dictEntry **ppde = nullptr);
void removeAllCachedValues();
void disableKeyCache();
bool keycacheIsEnabled();
void prefetchKeysAsync(client *c, struct parsed_command &command);
bool prefetchKeysAsync(client *c, struct parsed_command &command, bool fExecOK);
bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; }
@ -1297,6 +1308,7 @@ struct redisDb : public redisDbPersistentDataSnapshot
void initialize(int id);
void storageProviderInitialize();
void storageProviderDelete();
virtual ~redisDb();
void dbOverwriteCore(redisDb::iter itr, sds keySds, robj *val, bool fUpdateMvcc, bool fRemoveExpire);
@ -1330,17 +1342,21 @@ struct redisDb : public redisDbPersistentDataSnapshot
using redisDbPersistentData::setExpire;
using redisDbPersistentData::trackChanges;
using redisDbPersistentData::processChanges;
using redisDbPersistentData::processChangesAsync;
using redisDbPersistentData::commitChanges;
using redisDbPersistentData::setexpireUnsafe;
using redisDbPersistentData::setexpire;
using redisDbPersistentData::endSnapshot;
using redisDbPersistentData::restoreSnapshot;
using redisDbPersistentData::removeAllCachedValues;
using redisDbPersistentData::disableKeyCache;
using redisDbPersistentData::keycacheIsEnabled;
using redisDbPersistentData::dictUnsafeKeyOnly;
using redisDbPersistentData::resortExpire;
using redisDbPersistentData::prefetchKeysAsync;
using redisDbPersistentData::prepOverwriteForSnapshot;
using redisDbPersistentData::FRehashing;
using redisDbPersistentData::FTrackingChanges;
public:
const redisDbPersistentDataSnapshot *createSnapshot(uint64_t mvccCheckpoint, bool fOptional) {
@ -1607,7 +1623,6 @@ struct client {
* when sending data to this replica. */
long long repl_end_off = -1; /* Replication offset to write to, stored in the replica, as opposed to using the global offset
* to prevent needing the global lock */
int fPendingReplicaWrite; /* Is there a write queued for this replica? */
char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
int slave_listening_port; /* As configured with: REPLCONF listening-port */
@ -1670,6 +1685,10 @@ struct client {
robj **argv;
size_t argv_len_sumActive = 0;
bool FPendingReplicaWrite() const {
return repl_curr_off != repl_end_off;
}
// post a function from a non-client thread to run on its client thread
bool postFunction(std::function<void(client *)> fn, bool fLock = true);
size_t argv_len_sum() const;
@ -2011,7 +2030,7 @@ public:
// Per-thread variables that may be accessed without a lock
struct redisServerThreadVars {
aeEventLoop *el;
aeEventLoop *el = nullptr;
socketFds ipfd; /* TCP socket file descriptors */
socketFds tlsfd; /* TLS socket file descriptors */
int in_eval; /* Are we inside EVAL? */
@ -2020,6 +2039,7 @@ struct redisServerThreadVars {
list *unblocked_clients; /* list of clients to unblock before next loop NOT THREADSAFE */
list *clients_pending_asyncwrite;
int cclients;
int cclientsReplica = 0;
client *current_client; /* Current client */
long fixed_time_expire = 0; /* If > 0, expire keys against server.mstime. */
client *lua_client = nullptr; /* The "fake client" to query Redis from Lua */
@ -2139,6 +2159,8 @@ struct redisServerConst {
int storage_memory_model = STORAGE_WRITETHROUGH;
char *storage_conf = nullptr;
int fForkBgSave = false;
int time_thread_priority = false;
long long repl_backlog_disk_size = 0;
};
struct redisServer {
@ -2215,6 +2237,8 @@ struct redisServer {
int active_expire_enabled; /* Can be disabled for testing purposes. */
int replicaIsolationFactor = 1;
/* Fields used only for stats */
long long stat_numcommands; /* Number of processed commands */
long long stat_numconnections; /* Number of connections received */
@ -2312,6 +2336,7 @@ struct redisServer {
sds aof_child_diff; /* AOF diff accumulator child side. */
int aof_rewrite_pending = 0; /* is a call to aofChildWriteDiffData already queued? */
/* RDB persistence */
int allowRdbResizeOp; /* In debug situations we may want rehashing to be occurring, so ignore resize */
long long dirty; /* Changes to DB from the last save */
long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */
struct _rdbThreadVars
@ -2376,6 +2401,7 @@ struct redisServer {
int replicaseldb; /* Last SELECTed DB in replication output */
int repl_ping_slave_period; /* Master pings the replica every N seconds */
char *repl_backlog; /* Replication backlog for partial syncs */
char *repl_backlog_disk = nullptr;
long long repl_backlog_size; /* Backlog circular buffer size */
long long repl_backlog_config_size; /* The repl backlog may grow but we want to know what the user set it to */
long long repl_backlog_histlen; /* Backlog actual data length */
@ -3343,7 +3369,7 @@ int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
#define LOOKUP_NONOTIFY (1<<1)
#define LOOKUP_UPDATEMVCC (1<<2)
void dbAdd(redisDb *db, robj *key, robj *val);
void dbOverwrite(redisDb *db, robj *key, robj *val, bool fRemoveExpire = false);
void dbOverwrite(redisDb *db, robj *key, robj *val, bool fRemoveExpire = false, dict_iter *pitrExisting = nullptr);
int dbMerge(redisDb *db, sds key, robj *val, int fReplace);
void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, int signal);
void setKey(client *c, redisDb *db, robj *key, robj *val);

View File

@ -202,6 +202,18 @@ test {client freed during loading} {
}
}
test {repeated load} {
start_server [list overrides [list server-threads 3 allow-rdb-resize-op no]] {
r debug populate 500000 key 1000
set digest [r debug digest]
for {set j 0} {$j < 10} {incr j} {
r debug reload
assert_equal $digest [r debug digest]
}
}
}
# Our COW metrics (Private_Dirty) work only on Linux
set system_name [string tolower [exec uname -s]]
if {$system_name eq {linux}} {

View File

@ -88,5 +88,28 @@ start_server {tags {"repl"}} {
}
assert_equal [r debug digest] [r -1 debug digest]
}
test {REPL Backlog handles large value} {
# initialize bigval to 64-bytes
r flushall
r config set repl-backlog-size 1K
r config set client-output-buffer-limit "replica 1024 1024 0"
set bigval "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
for {set i 0} { $i < 20 } { incr i } {
append bigval $bigval
}
r set bigkey $bigval
# We expect the replication to be disconnected so wait a bit
wait_for_condition 50 100 {
[s -1 master_link_status] eq {down}
} else {
fail "Memory limit exceeded but not detected"
}
wait_for_condition 50 100 {
[r debug digest] eq [r -1 debug digest]
} else {
fail "Replica did not reconnect"
}
}
}
}

View File

@ -44,6 +44,11 @@ start_server {tags {"introspection"}} {
set e
} {ERR*}
test {replica-weighting-factor does not accept values less than 1} {
catch {r config set replica-weighting-factor 0} e
set e
} {ERR*}
test {CLIENT SETNAME can assign a name to this connection} {
assert_equal [r client setname myname] {OK}
r client list
@ -109,6 +114,7 @@ start_server {tags {"introspection"}} {
server_cpulist
bio_cpulist
aof_rewrite_cpulist
time-thread-priority
bgsave_cpulist
storage-cache-mode
storage-provider-options
@ -117,6 +123,7 @@ start_server {tags {"introspection"}} {
active-replica
bind
set-proc-title
repl-backlog-disk-reserve
}
if {!$::tls} {