Merge remote-tracking branch 'opensource/RELEASE_6' into keydbpro

Former-commit-id: 439c655a543f4d3224d90bcdeb21ba43c2ab8ab7
This commit is contained in:
christianEQ 2022-01-14 22:19:42 +00:00
commit df51a33413
14 changed files with 334 additions and 134 deletions

View File

@ -6,6 +6,7 @@ Build-Depends:
debhelper (>= 9~),
dpkg-dev (>= 1.17.5),
systemd,
libsystemd-dev <!nocheck>,
procps <!nocheck>,
pkg-config <!nocheck>,
build-essential <!nocheck>,

View File

@ -4,13 +4,91 @@ Upstream-Name: keydb-enterprise
Source: https://gitlab.eqalpha.com/keydb-dev/KeyDB-Pro
Files: *
Copyright: © 2006-2014 Salvatore Sanfilippo <antirez@gmail.com>
Copyright © 2019, John Sully
License: Proprietary
Copyright:
© 2006-2014 Salvatore Sanfilippo <antirez@gmail.com>
© 2019-2021 John Sully
© 2019-2021 EQ Alpha Technology Ltd.
License: BSD-3-clause
Files: debian/*
Copyright: © 2009 Chris Lamb <lamby@debian.org>
Files:
src/rio.*
src/t_zset.c
src/ziplist.h
src/intset.*
src/redis-check-aof.c
deps/hiredis/*
deps/linenoise/*
Copyright:
© 2009-2012 Pieter Noordhuis <pcnoordhuis@gmail.com>
© 2009-2012 Salvatore Sanfilippo <antirez@gmail.com>
License: BSD-3-clause
Files:
src/lzf.h
src/lzfP.h
src/lzf_d.c
src/lzf_c.c
Copyright:
© 2000-2007 Marc Alexander Lehmann <schmorp@schmorp.de>
© 2009-2012 Salvatore Sanfilippo <antirez@gmail.com>
License: BSD-2-clause
Files: src/setproctitle.c
Copyright:
© 2010 William Ahern
© 2013 Salvatore Sanfilippo
© 2013 Stam He
License: BSD-3-clause
Files: src/ae_evport.c
Copyright: © 2012 Joyent, Inc.
License: BSD-3-clause
Files: src/ae_kqueue.c
Copyright: © 2009 Harish Mallipeddi <harish.mallipeddi@gmail.com>
License: BSD-3-clause
Files: utils/install_server.sh
Copyright: © 2011 Dvir Volk <dvirsk@gmail.com>
License: BSD-3-clause
Files: deps/jemalloc/*
Copyright:
© 2002-2012 Jason Evans <jasone@canonware.com>
© 2007-2012 Mozilla Foundation
© 2009-2012 Facebook, Inc.
License: BSD-3-clause
Files: src/pqsort.*
Copyright: © 1992-1993 The Regents of the University of California
License: BSD-3-clause
Files: deps/lua/*
Copyright: © 1994-2012 Lua.org, PUC-Ri
License: MIT
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
.
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
Files: pkg/deb/debian/*
Copyright:
© 2009 Chris Lamb <lamby@debian.org>
© 2020-2021 EQ Alpha Technology Ltd. <support@eqalpha.com>
License: BSD-3-clause
License: BSD-2-clause

View File

@ -3,6 +3,7 @@
include /usr/share/dpkg/buildflags.mk
export BUILD_TLS=yes
export USE_SYSTEMD=yes
export CFLAGS CPPFLAGS LDFLAGS
export DEB_BUILD_MAINT_OPTIONS = hardening=+all
export DEB_LDFLAGS_MAINT_APPEND = -ldl -latomic $(LUA_LDFLAGS)

View File

@ -11,6 +11,7 @@ Build-Depends:
# liblua5.1-dev,
# lua-bitop-dev,
# lua-cjson-dev,
libsystemd-dev <!nocheck>,
procps <!nocheck>,
build-essential <!nocheck>,
tcl <!nocheck>,

View File

@ -9,8 +9,10 @@ Copyright © 2019, John Sully
License: Proprietary
Files: debian/*
Copyright: © 2009 Chris Lamb <lamby@debian.org>
Files: pkg/deb/debian_dh9/*
Copyright:
© 2009 Chris Lamb <lamby@debian.org>
© 2020-2021 EQ Alpha Technology Ltd. <support@eqalpha.com>
License: BSD-3-clause
License: BSD-2-clause

View File

@ -9,6 +9,7 @@ include /usr/share/dpkg/buildflags.mk
#LUA_LDFLAGS = $(addprefix -llua5.1-,$(LUA_LIBS_DEBIAN)) $(addprefix ../deps/lua/src/,$(LUA_OBJECTS))
export BUILD_TLS=yes
export USE_SYSTEMD=yes
export CFLAGS CPPFLAGS LDFLAGS
export DEB_BUILD_MAINT_OPTIONS = hardening=+all
export DEB_LDFLAGS_MAINT_APPEND = -ldl -latomic $(LUA_LDFLAGS)

View File

@ -2447,6 +2447,9 @@ void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
if (sdslen(link->sndbuf) == 0 && msglen != 0)
{
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [link] {
/* The connection could be timed out before this posted function executes (thanks to TCP keepalive).
* So check that the connection is still there before setting the write handler, otherwise you segfault */
if (link->conn != nullptr)
connSetWriteHandlerWithBarrier(link->conn, clusterWriteHandler, 1);
});
}

View File

@ -365,11 +365,7 @@ typedef struct RedisModuleCommandFilter {
static list *moduleCommandFilters;
/* Module GIL Variables */
static int s_cAcquisitionsServer = 0;
static int s_cAcquisitionsModule = 0;
static std::mutex s_mutex;
static std::condition_variable s_cv;
static std::recursive_mutex s_mutexModule;
static readWriteLock s_moduleGIL;
thread_local bool g_fModuleThread = false;
typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data);
@ -5971,95 +5967,58 @@ void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) {
// as the server thread acquisition is sufficient. If we did try to lock we would deadlock
static bool FModuleCallBackLock(bool fServerThread)
{
return !fServerThread && aeThreadOwnsLock() && !g_fModuleThread && s_cAcquisitionsServer > 0;
return !fServerThread && aeThreadOwnsLock() && !g_fModuleThread && s_moduleGIL.hasReader();
}
void moduleAcquireGIL(int fServerThread, int fExclusive) {
std::unique_lock<std::mutex> lock(s_mutex);
int *pcheck = fServerThread ? &s_cAcquisitionsModule : &s_cAcquisitionsServer;
if (FModuleCallBackLock(fServerThread)) {
return;
}
while (*pcheck > 0)
s_cv.wait(lock);
if (fServerThread)
{
++s_cAcquisitionsServer;
s_moduleGIL.acquireRead();
}
else
{
// only try to acquire the mutexModule in exclusive mode
if (fExclusive){
// It is possible that another module thread holds the GIL (and s_mutexModule as a result).
// When said thread goes to release the GIL, it will wait for s_mutex, which this thread owns.
// This thread is however waiting for the GIL (and s_mutexModule) that the other thread owns.
// As a result, a deadlock has occured.
// We release the lock on s_mutex and wait until we are able to safely acquire the GIL
// in order to prevent this deadlock from occuring.
while (!s_mutexModule.try_lock())
s_cv.wait(lock);
}
++s_cAcquisitionsModule;
fModuleGILWlocked++;
s_moduleGIL.acquireWrite(fExclusive);
}
}
int moduleTryAcquireGIL(bool fServerThread, int fExclusive) {
std::unique_lock<std::mutex> lock(s_mutex, std::defer_lock);
if (!lock.try_lock())
return 1;
int *pcheck = fServerThread ? &s_cAcquisitionsModule : &s_cAcquisitionsServer;
if (FModuleCallBackLock(fServerThread)) {
return 0;
}
if (*pcheck > 0)
return 1;
if (fServerThread)
{
++s_cAcquisitionsServer;
if (!s_moduleGIL.tryAcquireRead())
return 1;
}
else
{
// only try to acquire the mutexModule in exclusive mode
if (fExclusive){
if (!s_mutexModule.try_lock())
if (!s_moduleGIL.tryAcquireWrite(fExclusive))
return 1;
}
++s_cAcquisitionsModule;
fModuleGILWlocked++;
}
return 0;
}
void moduleReleaseGIL(int fServerThread, int fExclusive) {
std::unique_lock<std::mutex> lock(s_mutex);
if (FModuleCallBackLock(fServerThread)) {
return;
}
if (fServerThread)
{
--s_cAcquisitionsServer;
s_moduleGIL.releaseRead();
}
else
{
// only try to release the mutexModule in exclusive mode
if (fExclusive)
s_mutexModule.unlock();
--s_cAcquisitionsModule;
fModuleGILWlocked--;
s_moduleGIL.releaseWrite(fExclusive);
}
s_cv.notify_all();
}
int moduleGILAcquiredByModule(void) {
return fModuleGILWlocked > 0;
return s_moduleGIL.hasWriter();
}

View File

@ -2034,7 +2034,9 @@ void ProcessPendingAsyncWrites()
bool fResult = c->postFunction([](client *c) {
c->fPendingAsyncWriteHandler = false;
clientInstallWriteHandler(c);
c->lock.unlock();
handleClientsWithPendingWrites(c->iel, g_pserver->aof_state);
c->lock.lock();
}, false);
if (!fResult)
@ -4042,37 +4044,10 @@ int checkClientPauseTimeoutAndReturnIfPaused(void) {
*
* The function returns the total number of events processed. */
void processEventsWhileBlocked(int iel) {
serverAssert(GlobalLocksAcquired());
int eventsCount = 0;
executeWithoutGlobalLock([&](){
int iterations = 4; /* See the function top-comment. */
std::vector<client*> vecclients;
listIter li;
listNode *ln;
listRewind(g_pserver->clients, &li);
// All client locks must be acquired *after* the global lock is reacquired to prevent deadlocks
// so unlock here, and save them for reacquisition later
while ((ln = listNext(&li)) != nullptr)
{
client *c = (client*)listNodeValue(ln);
if (c->lock.fOwnLock()) {
serverAssert(c->flags & CLIENT_PROTECTED); // If the client is not protected we have no gurantee they won't be free'd in the event loop
c->lock.unlock();
vecclients.push_back(c);
}
}
/* Since we're about to release our lock we need to flush the repl backlog queue */
bool fReplBacklog = g_pserver->repl_batch_offStart >= 0;
if (fReplBacklog) {
flushReplBacklogToClients();
g_pserver->repl_batch_idxStart = -1;
g_pserver->repl_batch_offStart = -1;
}
long long eventsCount = 0;
aeReleaseLock();
serverAssert(!GlobalLocksAcquired());
try
{
ProcessingEventsWhileBlocked = 1;
@ -4091,19 +4066,10 @@ void processEventsWhileBlocked(int iel) {
}
catch (...)
{
// Caller expects us to be locked so fix and rethrow
ProcessingEventsWhileBlocked = 0;
AeLocker locker;
locker.arm(nullptr);
locker.release();
for (client *c : vecclients)
c->lock.lock();
throw;
}
AeLocker locker;
locker.arm(nullptr);
locker.release();
});
// Try to complete any async rehashes (this would normally happen in dbCron, but that won't run here)
for (int idb = 0; idb < cserver.dbnum; ++idb) {
@ -4118,15 +4084,6 @@ void processEventsWhileBlocked(int iel) {
whileBlockedCron();
// Restore it so the calling code is not confused
if (fReplBacklog && !serverTL->el->stop) {
g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
}
for (client *c : vecclients)
c->lock.lock();
// If a different thread processed the shutdown we need to abort the lua command or we will hang
if (serverTL->el->stop)
throw ShutdownException();

113
src/readwritelock.h Normal file
View File

@ -0,0 +1,113 @@
#pragma once
#include <condition_variable>
class readWriteLock {
std::mutex m_readLock;
std::recursive_mutex m_writeLock;
std::condition_variable m_cv;
int m_readCount = 0;
int m_writeCount = 0;
bool m_writeWaiting = false;
public:
void acquireRead() {
std::unique_lock<std::mutex> rm(m_readLock);
while (m_writeCount > 0 || m_writeWaiting)
m_cv.wait(rm);
m_readCount++;
}
bool tryAcquireRead() {
std::unique_lock<std::mutex> rm(m_readLock, std::defer_lock);
if (!rm.try_lock())
return false;
if (m_writeCount > 0 || m_writeWaiting)
return false;
m_readCount++;
return true;
}
void acquireWrite(bool exclusive = true) {
std::unique_lock<std::mutex> rm(m_readLock);
m_writeWaiting = true;
while (m_readCount > 0)
m_cv.wait(rm);
if (exclusive) {
/* Another thread might have the write lock while we have the read lock
but won't be able to release it until they can acquire the read lock
so release the read lock and try again instead of waiting to avoid deadlock */
while(!m_writeLock.try_lock())
m_cv.wait(rm);
}
m_writeCount++;
m_writeWaiting = false;
}
void upgradeWrite(bool exclusive = true) {
std::unique_lock<std::mutex> rm(m_readLock);
m_writeWaiting = true;
while (m_readCount > 1)
m_cv.wait(rm);
if (exclusive) {
/* Another thread might have the write lock while we have the read lock
but won't be able to release it until they can acquire the read lock
so release the read lock and try again instead of waiting to avoid deadlock */
while(!m_writeLock.try_lock())
m_cv.wait(rm);
}
m_writeCount++;
m_readCount--;
m_writeWaiting = false;
}
bool tryAcquireWrite(bool exclusive = true) {
std::unique_lock<std::mutex> rm(m_readLock, std::defer_lock);
if (!rm.try_lock())
return false;
if (m_readCount > 0)
return false;
if (exclusive)
if (!m_writeLock.try_lock())
return false;
m_writeCount++;
return true;
}
void releaseRead() {
std::unique_lock<std::mutex> rm(m_readLock);
serverAssert(m_readCount > 0);
m_readCount--;
m_cv.notify_all();
}
void releaseWrite(bool exclusive = true) {
std::unique_lock<std::mutex> rm(m_readLock);
serverAssert(m_writeCount > 0);
if (exclusive)
m_writeLock.unlock();
m_writeCount--;
m_cv.notify_all();
}
void downgradeWrite(bool exclusive = true) {
std::unique_lock<std::mutex> rm(m_readLock);
serverAssert(m_writeCount > 0);
if (exclusive)
m_writeLock.unlock();
m_writeCount--;
while (m_writeCount > 0 || m_writeWaiting)
m_cv.wait(rm);
m_readCount++;
}
bool hasReader() {
return m_readCount > 0;
}
bool hasWriter() {
return m_writeCount > 0;
}
bool writeWaiting() {
return m_writeWaiting;
}
};

View File

@ -2036,7 +2036,6 @@ int main(int argc, const char **argv) {
} while(config.loop);
zfree(data);
zfree(data);
if (config.redis_config != NULL) freeRedisConfig(config.redis_config);
return 0;

View File

@ -68,6 +68,7 @@
#include "keycheck.h"
#include "motd.h"
#include "t_nhash.h"
#include "readwritelock.h"
#ifdef __linux__
#include <sys/prctl.h>
#include <sys/mman.h>
@ -92,8 +93,10 @@ double R_Zero, R_PosInf, R_NegInf, R_Nan;
/* Global vars */
namespace GlobalHidden {
struct redisServer server; /* Server global state */
readWriteLock forkLock;
}
redisServer *g_pserver = &GlobalHidden::server;
readWriteLock *g_forkLock = &GlobalHidden::forkLock;
struct redisServerConst cserver;
thread_local struct redisServerThreadVars *serverTL = NULL; // thread local server vars
std::mutex time_thread_mutex;
@ -6756,6 +6759,63 @@ void closeChildUnusedResourceAfterFork() {
cserver.pidfile = NULL;
}
void executeWithoutGlobalLock(std::function<void()> func) {
serverAssert(GlobalLocksAcquired());
std::vector<client*> vecclients;
listIter li;
listNode *ln;
listRewind(g_pserver->clients, &li);
// All client locks must be acquired *after* the global lock is reacquired to prevent deadlocks
// so unlock here, and save them for reacquisition later
while ((ln = listNext(&li)) != nullptr)
{
client *c = (client*)listNodeValue(ln);
if (c->lock.fOwnLock()) {
serverAssert(c->flags & CLIENT_PROTECTED || c->flags & CLIENT_EXECUTING_COMMAND); // If the client is not protected we have no gurantee they won't be free'd in the event loop
c->lock.unlock();
vecclients.push_back(c);
}
}
/* Since we're about to release our lock we need to flush the repl backlog queue */
bool fReplBacklog = g_pserver->repl_batch_offStart >= 0;
if (fReplBacklog) {
flushReplBacklogToClients();
g_pserver->repl_batch_idxStart = -1;
g_pserver->repl_batch_offStart = -1;
}
aeReleaseLock();
serverAssert(!GlobalLocksAcquired());
try {
func();
}
catch (...) {
// Caller expects us to be locked so fix and rethrow
AeLocker locker;
locker.arm(nullptr);
locker.release();
for (client *c : vecclients)
c->lock.lock();
throw;
}
AeLocker locker;
locker.arm(nullptr);
locker.release();
// Restore it so the calling code is not confused
if (fReplBacklog) {
g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
}
for (client *c : vecclients)
c->lock.lock();
}
/* purpose is one of CHILD_TYPE_ types */
int redisFork(int purpose) {
int childpid;
@ -6767,7 +6827,9 @@ int redisFork(int purpose) {
openChildInfoPipe();
}
long long startWriteLock = ustime();
g_forkLock->acquireWrite();
latencyAddSampleIfNeeded("fork-lock",(ustime()-startWriteLock)/1000);
if ((childpid = fork()) == 0) {
/* Child */
g_pserver->in_fork_child = purpose;
@ -6776,6 +6838,7 @@ int redisFork(int purpose) {
closeChildUnusedResourceAfterFork();
} else {
/* Parent */
g_forkLock->releaseWrite();
g_pserver->stat_total_forks++;
g_pserver->stat_fork_time = ustime()-start;
g_pserver->stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / g_pserver->stat_fork_time / (1024*1024*1024); /* GB per second. */
@ -7131,16 +7194,32 @@ void *timeThreadMain(void*) {
timespec delay;
delay.tv_sec = 0;
delay.tv_nsec = 100;
int cycle_count = 0;
g_forkLock->acquireRead();
while (true) {
{
std::unique_lock<std::mutex> lock(time_thread_mutex);
if (sleeping_threads >= cserver.cthreads) {
g_forkLock->releaseRead();
time_thread_cv.wait(lock);
g_forkLock->acquireRead();
cycle_count = 0;
}
}
updateCachedTime();
clock_nanosleep(CLOCK_MONOTONIC, 0, &delay, NULL);
if (cycle_count == MAX_CYCLES_TO_HOLD_FORK_LOCK) {
g_forkLock->releaseRead();
g_forkLock->acquireRead();
cycle_count = 0;
}
#if defined(__APPLE__)
nanosleep(&delay, nullptr);
#else
clock_nanosleep(CLOCK_MONOTONIC, 0, &delay, NULL);
#endif
cycle_count++;
}
g_forkLock->releaseRead();
}
void *workerThreadMain(void *parg)

View File

@ -101,6 +101,7 @@ typedef long long ustime_t; /* microsecond time type. */
#include "connection.h" /* Connection abstraction */
#include "serverassert.h"
#include "expire.h"
#include "readwritelock.h"
#define REDISMODULE_CORE 1
#include "redismodule.h" /* Redis modules API defines. */
@ -813,6 +814,9 @@ typedef enum {
#define REDISMODULE_AUX_BEFORE_RDB (1<<0)
#define REDISMODULE_AUX_AFTER_RDB (1<<1)
/* Number of cycles before time thread gives up fork lock */
#define MAX_CYCLES_TO_HOLD_FORK_LOCK 10
struct RedisModule;
struct RedisModuleIO;
struct RedisModuleDigest;
@ -2751,6 +2755,7 @@ typedef struct {
*----------------------------------------------------------------------------*/
//extern struct redisServer server;
extern readWriteLock *g_forkLock;
extern struct redisServerConst cserver;
extern thread_local struct redisServerThreadVars *serverTL; // thread local server vars
extern struct sharedObjectsStruct shared;
@ -3146,6 +3151,7 @@ void sendChildInfo(childInfoType info_type, size_t keys, const char *pname);
void receiveChildInfo(void);
/* Fork helpers */
void executeWithoutGlobalLock(std::function<void()> func);
int redisFork(int type);
int hasActiveChildProcess();
void resetChildState();
@ -3288,7 +3294,7 @@ void resetErrorTableStats(void);
void adjustOpenFilesLimit(void);
void incrementErrorCount(const char *fullerr, size_t namelen);
void closeListeningSockets(int unlink_unix_socket);
void updateCachedTime();
void updateCachedTime(void);
void resetServerStats(void);
void activeDefragCycle(void);
unsigned int getLRUClock(void);

View File

@ -1,5 +1,5 @@
#define KEYDB_REAL_VERSION "6.2.1"
#define KEYDB_VERSION_NUM 0x00060201
#define KEYDB_REAL_VERSION "255.255.255"
#define KEYDB_VERSION_NUM 0x00ffffff
extern const char *KEYDB_SET_VERSION; // Unlike real version, this can be overriden by the config
enum VersionCompareResult