Merge branch 'unstable' into advanced_db
Former-commit-id: 7530d93cd5c4a9fc4e6466abcf96edbdd8daea9e
This commit is contained in:
commit
331f38b94e
@ -47,8 +47,6 @@ endif
|
||||
USEASM?=true
|
||||
|
||||
ifneq ($(SANITIZE),)
|
||||
CC=clang
|
||||
CXX=clang++
|
||||
CFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE
|
||||
CXXFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE
|
||||
LDFLAGS+= -fsanitize=$(SANITIZE)
|
||||
@ -190,7 +188,7 @@ endif
|
||||
|
||||
REDIS_CC=$(QUIET_CC)$(CC) $(FINAL_CFLAGS)
|
||||
REDIS_CXX=$(QUIET_CC)$(CC) $(FINAL_CXXFLAGS)
|
||||
REDIS_NASM=$(QUIET_CC)nasm -felf64
|
||||
KEYDB_AS=$(QUIET_CC) as --64 -g
|
||||
REDIS_LD=$(QUIET_LINK)$(CXX) $(FINAL_LDFLAGS)
|
||||
REDIS_INSTALL=$(QUIET_INSTALL)$(INSTALL)
|
||||
|
||||
@ -295,7 +293,7 @@ dict-benchmark: dict.cpp zmalloc.cpp sds.c siphash.c
|
||||
$(REDIS_CXX) -c $<
|
||||
|
||||
%.o: %.asm .make-prerequisites
|
||||
$(REDIS_NASM) $<
|
||||
$(KEYDB_AS) $< -o $@
|
||||
|
||||
clean:
|
||||
rm -rf $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_RDB_NAME) $(REDIS_CHECK_AOF_NAME) *.o *.gcda *.gcno *.gcov redis.info lcov-html Makefile.dep dict-benchmark
|
||||
|
@ -80,11 +80,11 @@ public:
|
||||
mutex_wrapper g_lock;
|
||||
|
||||
#else
|
||||
fastlock g_lock;
|
||||
fastlock g_lock("AE (global)");
|
||||
#endif
|
||||
thread_local aeEventLoop *g_eventLoopThisThread = NULL;
|
||||
|
||||
#define AE_ASSERT(x) if (!(x)) do { fprintf(stderr, "AE_ASSER FAILURE\n"); *((volatile int*)0) = 1; } while(0)
|
||||
#define AE_ASSERT(x) if (!(x)) do { fprintf(stderr, "AE_ASSERT FAILURE %s: %d\n", __FILE__, __LINE__); *((volatile int*)0) = 1; } while(0)
|
||||
|
||||
/* Include the best multiplexing layer supported by this system.
|
||||
* The following should be ordered by performances, descending. */
|
||||
@ -327,7 +327,7 @@ aeEventLoop *aeCreateEventLoop(int setsize) {
|
||||
for (i = 0; i < setsize; i++)
|
||||
eventLoop->events[i].mask = AE_NONE;
|
||||
|
||||
fastlock_init(&eventLoop->flock);
|
||||
fastlock_init(&eventLoop->flock, "event loop");
|
||||
int rgfd[2];
|
||||
if (pipe(rgfd) < 0)
|
||||
goto err;
|
||||
|
@ -678,7 +678,7 @@ client *createFakeClient(void) {
|
||||
c->puser = NULL;
|
||||
listSetFreeMethod(c->reply,freeClientReplyValue);
|
||||
listSetDupMethod(c->reply,dupClientReplyValue);
|
||||
fastlock_init(&c->lock);
|
||||
fastlock_init(&c->lock, "fake client");
|
||||
fastlock_lock(&c->lock);
|
||||
initClientMultiState(c);
|
||||
return c;
|
||||
|
125
src/cluster.cpp
125
src/cluster.cpp
@ -617,6 +617,17 @@ clusterLink *createClusterLink(clusterNode *node) {
|
||||
* This function will just make sure that the original node associated
|
||||
* with this link will have the 'link' field set to NULL. */
|
||||
void freeClusterLink(clusterLink *link) {
|
||||
if (ielFromEventLoop(serverTL->el) != IDX_EVENT_LOOP_MAIN)
|
||||
{
|
||||
// we can't perform this operation on this thread, queue it on the main thread
|
||||
if (link->node)
|
||||
link->node->link = NULL;
|
||||
link->node = nullptr;
|
||||
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [link]{
|
||||
freeClusterLink(link);
|
||||
});
|
||||
return;
|
||||
}
|
||||
if (link->fd != -1) {
|
||||
aeDeleteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, link->fd, AE_READABLE|AE_WRITABLE);
|
||||
}
|
||||
@ -2139,21 +2150,35 @@ void handleLinkIOError(clusterLink *link) {
|
||||
* consumed by write(). We don't try to optimize this for speed too much
|
||||
* as this is a very low traffic channel. */
|
||||
void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
serverAssert(ielFromEventLoop(el) == IDX_EVENT_LOOP_MAIN);
|
||||
clusterLink *link = (clusterLink*) privdata;
|
||||
ssize_t nwritten;
|
||||
UNUSED(el);
|
||||
UNUSED(mask);
|
||||
|
||||
nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf));
|
||||
// We're about to release the lock, so the link's sndbuf needs to be owned fully by us
|
||||
// allocate a new one in case anyone tries to write while we're waiting
|
||||
sds sndbuf = link->sndbuf;
|
||||
link->sndbuf = sdsempty();
|
||||
|
||||
aeReleaseLock();
|
||||
nwritten = write(fd, sndbuf, sdslen(sndbuf));
|
||||
aeAcquireLock();
|
||||
|
||||
if (nwritten <= 0) {
|
||||
serverLog(LL_DEBUG,"I/O error writing to node link: %s",
|
||||
(nwritten == -1) ? strerror(errno) : "short write");
|
||||
sdsfree(sndbuf);
|
||||
handleLinkIOError(link);
|
||||
return;
|
||||
}
|
||||
sdsrange(link->sndbuf,nwritten,-1);
|
||||
sdsrange(sndbuf,nwritten,-1);
|
||||
// Restore our send buffer, ensuring any unsent data is first
|
||||
sndbuf = sdscat(sndbuf, link->sndbuf);
|
||||
sdsfree(link->sndbuf);
|
||||
link->sndbuf = sndbuf;
|
||||
if (sdslen(link->sndbuf) == 0)
|
||||
aeDeleteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, link->fd, AE_WRITABLE);
|
||||
aeDeleteFileEvent(el, link->fd, AE_WRITABLE);
|
||||
}
|
||||
|
||||
/* Read data. Try to read the first field of the header first to check the
|
||||
@ -2228,9 +2253,10 @@ void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
* the link to be invalidated, so it is safe to call this function
|
||||
* from event handlers that will do stuff with the same link later. */
|
||||
void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
|
||||
serverAssert(GlobalLocksAcquired());
|
||||
if (sdslen(link->sndbuf) == 0 && msglen != 0)
|
||||
aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,link->fd,AE_WRITABLE|AE_BARRIER,
|
||||
clusterWriteHandler,link);
|
||||
aeCreateRemoteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,link->fd,AE_WRITABLE|AE_BARRIER,
|
||||
clusterWriteHandler,link,false);
|
||||
|
||||
link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
|
||||
|
||||
@ -3284,7 +3310,7 @@ void clusterHandleSlaveMigration(int max_slaves) {
|
||||
void resetManualFailover(void) {
|
||||
if (g_pserver->cluster->mf_end && clientsArePaused()) {
|
||||
g_pserver->clients_pause_end_time = 0;
|
||||
clientsArePaused(); /* Just use the side effect of the function. */
|
||||
unpauseClientsIfNecessary();
|
||||
}
|
||||
g_pserver->cluster->mf_end = 0; /* No manual failover in progress. */
|
||||
g_pserver->cluster->mf_can_start = 0;
|
||||
@ -4154,57 +4180,76 @@ void clusterReplyMultiBulkSlots(client *c) {
|
||||
dictIterator *di = dictGetSafeIterator(g_pserver->cluster->nodes);
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
clusterNode *node = (clusterNode*)dictGetVal(de);
|
||||
int j = 0, start = -1;
|
||||
int start = -1;
|
||||
|
||||
/* Skip slaves (that are iterated when producing the output of their
|
||||
* master) and masters not serving any slot. */
|
||||
if (!nodeIsMaster(node) || node->numslots == 0) continue;
|
||||
|
||||
for (j = 0; j < CLUSTER_SLOTS; j++) {
|
||||
int bit, i;
|
||||
static_assert((CLUSTER_SLOTS % (sizeof(uint32_t)*8)) == 0, "code below assumes the bitfield is a multiple of sizeof(unsinged)");
|
||||
|
||||
if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
|
||||
if (start == -1) start = j;
|
||||
for (unsigned iw = 0; iw < (CLUSTER_SLOTS/sizeof(uint32_t)/8); ++iw)
|
||||
{
|
||||
uint32_t wordCur = reinterpret_cast<uint32_t*>(node->slots)[iw];
|
||||
if (iw != ((CLUSTER_SLOTS/sizeof(uint32_t)/8)-1))
|
||||
{
|
||||
if (start == -1 && wordCur == 0)
|
||||
continue;
|
||||
if (start != -1 && (wordCur+1)==0)
|
||||
continue;
|
||||
}
|
||||
if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
|
||||
int nested_elements = 3; /* slots (2) + master addr (1). */
|
||||
void *nested_replylen = addReplyDeferredLen(c);
|
||||
|
||||
if (bit && j == CLUSTER_SLOTS-1) j++;
|
||||
unsigned ibitStartLoop = iw*sizeof(uint32_t)*8;
|
||||
|
||||
/* If slot exists in output map, add to it's list.
|
||||
* else, create a new output map for this slot */
|
||||
if (start == j-1) {
|
||||
addReplyLongLong(c, start); /* only one slot; low==high */
|
||||
addReplyLongLong(c, start);
|
||||
} else {
|
||||
addReplyLongLong(c, start); /* low */
|
||||
addReplyLongLong(c, j-1); /* high */
|
||||
for (unsigned j = ibitStartLoop; j < (iw+1)*sizeof(uint32_t)*8; j++) {
|
||||
int i;
|
||||
int bit = (int)(wordCur & 1);
|
||||
wordCur >>= 1;
|
||||
if (bit != 0) {
|
||||
if (start == -1) start = j;
|
||||
}
|
||||
start = -1;
|
||||
if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
|
||||
int nested_elements = 3; /* slots (2) + master addr (1). */
|
||||
void *nested_replylen = addReplyDeferredLen(c);
|
||||
|
||||
/* First node reply position is always the master */
|
||||
addReplyArrayLen(c, 3);
|
||||
addReplyBulkCString(c, node->ip);
|
||||
addReplyLongLong(c, node->port);
|
||||
addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);
|
||||
if (bit && j == CLUSTER_SLOTS-1) j++;
|
||||
|
||||
/* Remaining nodes in reply are replicas for slot range */
|
||||
for (i = 0; i < node->numslaves; i++) {
|
||||
/* This loop is copy/pasted from clusterGenNodeDescription()
|
||||
* with modifications for per-slot node aggregation */
|
||||
if (nodeFailed(node->slaves[i])) continue;
|
||||
/* If slot exists in output map, add to it's list.
|
||||
* else, create a new output map for this slot */
|
||||
if (start == j-1) {
|
||||
addReplyLongLong(c, start); /* only one slot; low==high */
|
||||
addReplyLongLong(c, start);
|
||||
} else {
|
||||
addReplyLongLong(c, start); /* low */
|
||||
addReplyLongLong(c, j-1); /* high */
|
||||
}
|
||||
start = -1;
|
||||
|
||||
/* First node reply position is always the master */
|
||||
addReplyArrayLen(c, 3);
|
||||
addReplyBulkCString(c, node->slaves[i]->ip);
|
||||
addReplyLongLong(c, node->slaves[i]->port);
|
||||
addReplyBulkCBuffer(c, node->slaves[i]->name, CLUSTER_NAMELEN);
|
||||
nested_elements++;
|
||||
addReplyBulkCString(c, node->ip);
|
||||
addReplyLongLong(c, node->port);
|
||||
addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);
|
||||
|
||||
/* Remaining nodes in reply are replicas for slot range */
|
||||
for (i = 0; i < node->numslaves; i++) {
|
||||
/* This loop is copy/pasted from clusterGenNodeDescription()
|
||||
* with modifications for per-slot node aggregation */
|
||||
if (nodeFailed(node->slaves[i])) continue;
|
||||
addReplyArrayLen(c, 3);
|
||||
addReplyBulkCString(c, node->slaves[i]->ip);
|
||||
addReplyLongLong(c, node->slaves[i]->port);
|
||||
addReplyBulkCBuffer(c, node->slaves[i]->name, CLUSTER_NAMELEN);
|
||||
nested_elements++;
|
||||
}
|
||||
setDeferredArrayLen(c, nested_replylen, nested_elements);
|
||||
num_masters++;
|
||||
}
|
||||
setDeferredArrayLen(c, nested_replylen, nested_elements);
|
||||
num_masters++;
|
||||
}
|
||||
}
|
||||
serverAssert(start == -1);
|
||||
}
|
||||
|
||||
dictReleaseIterator(di);
|
||||
setDeferredArrayLen(c, slot_replylen, num_masters);
|
||||
}
|
||||
|
@ -653,8 +653,6 @@ void keysCommand(client *c) {
|
||||
unsigned long numkeys = 0;
|
||||
void *replylen = addReplyDeferredLen(c);
|
||||
|
||||
aeReleaseLock();
|
||||
|
||||
allkeys = (pattern[0] == '*' && pattern[1] == '\0');
|
||||
c->db->iterate([&](const char *key, robj *)->bool {
|
||||
robj *keyobj;
|
||||
@ -670,12 +668,6 @@ void keysCommand(client *c) {
|
||||
return true;
|
||||
});
|
||||
setDeferredArrayLen(c,replylen,numkeys);
|
||||
|
||||
fastlock_unlock(&c->db->lock); // we must release the DB lock before acquiring the AE lock to prevent deadlocks
|
||||
AeLocker lock;
|
||||
lock.arm(c);
|
||||
fastlock_lock(&c->db->lock); // we still need the DB lock
|
||||
lock.release();
|
||||
}
|
||||
|
||||
/* This callback is used by scanGenericCommand in order to collect elements
|
||||
|
@ -55,7 +55,7 @@ typedef ucontext_t sigcontext_t;
|
||||
#endif
|
||||
#endif
|
||||
|
||||
bool g_fInCrash = false;
|
||||
int g_fInCrash = false;
|
||||
|
||||
/* ================================= Debugging ============================== */
|
||||
|
||||
|
110
src/fastlock.cpp
110
src/fastlock.cpp
@ -36,10 +36,13 @@
|
||||
#include <assert.h>
|
||||
#include <pthread.h>
|
||||
#include <limits.h>
|
||||
#include <map>
|
||||
#ifdef __linux__
|
||||
#include <linux/futex.h>
|
||||
#endif
|
||||
#include <string.h>
|
||||
#include <stdarg.h>
|
||||
#include <stdio.h>
|
||||
|
||||
#ifdef __APPLE__
|
||||
#include <TargetConditionals.h>
|
||||
@ -57,6 +60,7 @@
|
||||
#define UNUSED(x) ((void)x)
|
||||
#endif
|
||||
|
||||
extern int g_fInCrash;
|
||||
|
||||
/****************************************************
|
||||
*
|
||||
@ -125,6 +129,80 @@
|
||||
|
||||
#endif
|
||||
|
||||
#pragma weak _serverPanic
|
||||
extern "C" __attribute__((weak)) void _serverPanic(const char * /*file*/, int /*line*/, const char * /*msg*/, ...)
|
||||
{
|
||||
*((char*)-1) = 'x';
|
||||
}
|
||||
|
||||
#pragma weak serverLog
|
||||
__attribute__((weak)) void serverLog(int , const char *fmt, ...)
|
||||
{
|
||||
va_list args;
|
||||
va_start(args, fmt);
|
||||
vprintf(fmt, args);
|
||||
va_end(args);
|
||||
printf("\n");
|
||||
}
|
||||
|
||||
class DeadlockDetector
|
||||
{
|
||||
std::map<pid_t, fastlock *> m_mapwait;
|
||||
fastlock m_lock { "deadlock detector" };
|
||||
public:
|
||||
void registerwait(fastlock *lock, pid_t thispid)
|
||||
{
|
||||
if (lock == &m_lock || g_fInCrash)
|
||||
return;
|
||||
fastlock_lock(&m_lock);
|
||||
m_mapwait.insert(std::make_pair(thispid, lock));
|
||||
|
||||
// Detect cycles
|
||||
pid_t pidCheck = thispid;
|
||||
size_t cchecks = 0;
|
||||
for (;;)
|
||||
{
|
||||
auto itr = m_mapwait.find(pidCheck);
|
||||
if (itr == m_mapwait.end())
|
||||
break;
|
||||
pidCheck = itr->second->m_pidOwner;
|
||||
if (pidCheck == thispid)
|
||||
{
|
||||
// Deadlock detected, printout some debugging info and crash
|
||||
serverLog(3 /*LL_WARNING*/, "\n\n");
|
||||
serverLog(3 /*LL_WARNING*/, "!!! ERROR: Deadlock detected !!!");
|
||||
pidCheck = thispid;
|
||||
for (;;)
|
||||
{
|
||||
auto itr = m_mapwait.find(pidCheck);
|
||||
serverLog(3 /* LL_WARNING */, "\t%d: (%p) %s", pidCheck, itr->second, itr->second->szName);
|
||||
pidCheck = itr->second->m_pidOwner;
|
||||
if (pidCheck == thispid)
|
||||
break;
|
||||
}
|
||||
serverLog(3 /*LL_WARNING*/, "!!! KeyDB Will Now Crash !!!");
|
||||
_serverPanic(__FILE__, __LINE__, "Deadlock detected");
|
||||
}
|
||||
|
||||
if (cchecks > m_mapwait.size())
|
||||
break; // There is a cycle but we're not in it
|
||||
++cchecks;
|
||||
}
|
||||
fastlock_unlock(&m_lock);
|
||||
}
|
||||
|
||||
void clearwait(fastlock *lock, pid_t thispid)
|
||||
{
|
||||
if (lock == &m_lock || g_fInCrash)
|
||||
return;
|
||||
fastlock_lock(&m_lock);
|
||||
m_mapwait.erase(thispid);
|
||||
fastlock_unlock(&m_lock);
|
||||
}
|
||||
};
|
||||
|
||||
DeadlockDetector g_dlock;
|
||||
|
||||
static_assert(sizeof(pid_t) <= sizeof(fastlock::m_pidOwner), "fastlock::m_pidOwner not large enough");
|
||||
uint64_t g_longwaits = 0;
|
||||
|
||||
@ -135,7 +213,6 @@ uint64_t fastlock_getlongwaitcount()
|
||||
return rval;
|
||||
}
|
||||
|
||||
#ifndef ASM_SPINLOCK
|
||||
#ifdef __linux__
|
||||
static int futex(volatile unsigned *uaddr, int futex_op, int val,
|
||||
const struct timespec *timeout, int val3)
|
||||
@ -144,7 +221,6 @@ static int futex(volatile unsigned *uaddr, int futex_op, int val,
|
||||
timeout, uaddr, val3);
|
||||
}
|
||||
#endif
|
||||
#endif
|
||||
|
||||
extern "C" pid_t gettid()
|
||||
{
|
||||
@ -163,13 +239,29 @@ extern "C" pid_t gettid()
|
||||
return pidCache;
|
||||
}
|
||||
|
||||
extern "C" void fastlock_init(struct fastlock *lock)
|
||||
extern "C" void fastlock_sleep(fastlock *lock, pid_t pid, unsigned wake, unsigned mask)
|
||||
{
|
||||
#ifdef __linux__
|
||||
g_dlock.registerwait(lock, pid);
|
||||
__atomic_fetch_or(&lock->futex, mask, __ATOMIC_ACQUIRE);
|
||||
futex(&lock->m_ticket.u, FUTEX_WAIT_BITSET_PRIVATE, wake, nullptr, mask);
|
||||
__atomic_fetch_and(&lock->futex, ~mask, __ATOMIC_RELEASE);
|
||||
g_dlock.clearwait(lock, pid);
|
||||
#endif
|
||||
__atomic_fetch_add(&g_longwaits, 1, __ATOMIC_RELAXED);
|
||||
}
|
||||
|
||||
extern "C" void fastlock_init(struct fastlock *lock, const char *name)
|
||||
{
|
||||
lock->m_ticket.m_active = 0;
|
||||
lock->m_ticket.m_avail = 0;
|
||||
lock->m_depth = 0;
|
||||
lock->m_pidOwner = -1;
|
||||
lock->futex = 0;
|
||||
int cch = strlen(name);
|
||||
cch = std::min<int>(cch, sizeof(lock->szName)-1);
|
||||
memcpy(lock->szName, name, cch);
|
||||
lock->szName[cch] = '\0';
|
||||
ANNOTATE_RWLOCK_CREATE(lock);
|
||||
}
|
||||
|
||||
@ -184,12 +276,12 @@ extern "C" void fastlock_lock(struct fastlock *lock)
|
||||
return;
|
||||
}
|
||||
|
||||
int tid = gettid();
|
||||
unsigned myticket = __atomic_fetch_add(&lock->m_ticket.m_avail, 1, __ATOMIC_RELEASE);
|
||||
#ifdef __linux__
|
||||
unsigned mask = (1U << (myticket % 32));
|
||||
#endif
|
||||
int cloops = 0;
|
||||
ticket ticketT;
|
||||
|
||||
for (;;)
|
||||
{
|
||||
__atomic_load(&lock->m_ticket.u, &ticketT.u, __ATOMIC_ACQUIRE);
|
||||
@ -201,17 +293,11 @@ extern "C" void fastlock_lock(struct fastlock *lock)
|
||||
#endif
|
||||
if ((++cloops % 1024*1024) == 0)
|
||||
{
|
||||
#ifdef __linux__
|
||||
__atomic_fetch_or(&lock->futex, mask, __ATOMIC_ACQUIRE);
|
||||
futex(&lock->m_ticket.u, FUTEX_WAIT_BITSET_PRIVATE, ticketT.u, nullptr, mask);
|
||||
__atomic_fetch_and(&lock->futex, ~mask, __ATOMIC_RELEASE);
|
||||
#endif
|
||||
__atomic_fetch_add(&g_longwaits, 1, __ATOMIC_RELAXED);
|
||||
fastlock_sleep(lock, tid, ticketT.u, mask);
|
||||
}
|
||||
}
|
||||
|
||||
lock->m_depth = 1;
|
||||
int tid = gettid();
|
||||
__atomic_store(&lock->m_pidOwner, &tid, __ATOMIC_RELEASE);
|
||||
ANNOTATE_RWLOCK_ACQUIRED(lock, true);
|
||||
std::atomic_thread_fence(std::memory_order_acquire);
|
||||
|
@ -1,5 +1,6 @@
|
||||
#pragma once
|
||||
#include <inttypes.h>
|
||||
#include <stddef.h>
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
@ -7,7 +8,7 @@ extern "C" {
|
||||
|
||||
/* Begin C API */
|
||||
struct fastlock;
|
||||
void fastlock_init(struct fastlock *lock);
|
||||
void fastlock_init(struct fastlock *lock, const char *name);
|
||||
void fastlock_lock(struct fastlock *lock);
|
||||
int fastlock_trylock(struct fastlock *lock, int fWeak);
|
||||
void fastlock_unlock(struct fastlock *lock);
|
||||
@ -40,29 +41,31 @@ struct ticket
|
||||
|
||||
struct fastlock
|
||||
{
|
||||
volatile struct ticket m_ticket;
|
||||
|
||||
volatile int m_pidOwner;
|
||||
volatile int m_depth;
|
||||
char szName[56];
|
||||
/* Volatile data on seperate cache line */
|
||||
volatile struct ticket m_ticket;
|
||||
unsigned futex;
|
||||
char padding[56]; // ensure ticket and futex are on their own independent cache line
|
||||
|
||||
#ifdef __cplusplus
|
||||
fastlock()
|
||||
fastlock(const char *name)
|
||||
{
|
||||
fastlock_init(this);
|
||||
fastlock_init(this, name);
|
||||
}
|
||||
|
||||
void lock()
|
||||
inline void lock()
|
||||
{
|
||||
fastlock_lock(this);
|
||||
}
|
||||
|
||||
bool try_lock(bool fWeak = false)
|
||||
inline bool try_lock(bool fWeak = false)
|
||||
{
|
||||
return !!fastlock_trylock(this, fWeak);
|
||||
}
|
||||
|
||||
void unlock()
|
||||
inline void unlock()
|
||||
{
|
||||
fastlock_unlock(this);
|
||||
}
|
||||
@ -80,3 +83,5 @@ struct fastlock
|
||||
bool fOwnLock(); // true if this thread owns the lock, NOTE: not 100% reliable, use for debugging only
|
||||
#endif
|
||||
};
|
||||
|
||||
static_assert(offsetof(struct fastlock, m_ticket) == 64, "ensure padding is correct");
|
@ -1,156 +1,164 @@
|
||||
section .text
|
||||
.intel_syntax noprefix
|
||||
.text
|
||||
|
||||
extern gettid
|
||||
extern sched_yield
|
||||
extern g_longwaits
|
||||
.extern gettid
|
||||
.extern fastlock_sleep
|
||||
|
||||
; This is the first use of assembly in this codebase, a valid question is WHY?
|
||||
; The spinlock we implement here is performance critical, and simply put GCC
|
||||
; emits awful code. The original C code is left in fastlock.cpp for reference
|
||||
; and x-plat.
|
||||
# This is the first use of assembly in this codebase, a valid question is WHY?
|
||||
# The spinlock we implement here is performance critical, and simply put GCC
|
||||
# emits awful code. The original C code is left in fastlock.cpp for reference
|
||||
# and x-plat.
|
||||
|
||||
ALIGN 16
|
||||
global fastlock_lock
|
||||
.ALIGN 16
|
||||
.global fastlock_lock
|
||||
.type fastlock_lock,@function
|
||||
fastlock_lock:
|
||||
; RDI points to the struct:
|
||||
; uint16_t active
|
||||
; uint16_t avail
|
||||
; int32_t m_pidOwner
|
||||
; int32_t m_depth
|
||||
.cfi_startproc
|
||||
.cfi_def_cfa rsp, 8
|
||||
# RDI points to the struct:
|
||||
# int32_t m_pidOwner
|
||||
# int32_t m_depth
|
||||
# [rdi+64] ...
|
||||
# uint16_t active
|
||||
# uint16_t avail
|
||||
|
||||
; First get our TID and put it in ecx
|
||||
push rdi ; we need our struct pointer (also balance the stack for the call)
|
||||
call gettid ; get our thread ID (TLS is nasty in ASM so don't bother inlining)
|
||||
mov esi, eax ; back it up in esi
|
||||
pop rdi ; get our pointer back
|
||||
# First get our TID and put it in ecx
|
||||
push rdi # we need our struct pointer (also balance the stack for the call)
|
||||
.cfi_adjust_cfa_offset 8
|
||||
call gettid # get our thread ID (TLS is nasty in ASM so don't bother inlining)
|
||||
mov esi, eax # back it up in esi
|
||||
pop rdi # get our pointer back
|
||||
.cfi_adjust_cfa_offset -8
|
||||
|
||||
cmp [rdi+4], esi ; Is the TID we got back the owner of the lock?
|
||||
je .LLocked ; Don't spin in that case
|
||||
cmp [rdi], esi # Is the TID we got back the owner of the lock?
|
||||
je .LLocked # Don't spin in that case
|
||||
|
||||
xor eax, eax ; eliminate partial register dependency
|
||||
inc eax ; we want to add one
|
||||
lock xadd [rdi+2], ax ; do the xadd, ax contains the value before the addition
|
||||
; ax now contains the ticket
|
||||
ALIGN 16
|
||||
xor eax, eax # eliminate partial register dependency
|
||||
inc eax # we want to add one
|
||||
lock xadd [rdi+66], ax # do the xadd, ax contains the value before the addition
|
||||
# ax now contains the ticket
|
||||
# OK Start the wait loop
|
||||
xor ecx, ecx
|
||||
.ALIGN 16
|
||||
.LLoop:
|
||||
mov edx, [rdi]
|
||||
cmp dx, ax ; is our ticket up?
|
||||
je .LLocked ; leave the loop
|
||||
mov edx, [rdi+64]
|
||||
cmp dx, ax # is our ticket up?
|
||||
je .LLocked # leave the loop
|
||||
pause
|
||||
add ecx, 1000h ; Have we been waiting a long time? (oflow if we have)
|
||||
; 1000h is set so we overflow on the 1024*1024'th iteration (like the C code)
|
||||
jnc .LLoop ; If so, give up our timeslice to someone who's doing real work
|
||||
; Like the compiler, you're probably thinking: "Hey! I should take these pushs out of the loop"
|
||||
; But the compiler doesn't know that we rarely hit this, and when we do we know the lock is
|
||||
; taking a long time to be released anyways. We optimize for the common case of short
|
||||
; lock intervals. That's why we're using a spinlock in the first place
|
||||
; If we get here we're going to sleep in the kernel with a futex
|
||||
add ecx, 0x1000 # Have we been waiting a long time? (oflow if we have)
|
||||
# 1000h is set so we overflow on the 1024*1024'th iteration (like the C code)
|
||||
jnc .LLoop # If so, give up our timeslice to someone who's doing real work
|
||||
# Like the compiler, you're probably thinking: "Hey! I should take these pushs out of the loop"
|
||||
# But the compiler doesn't know that we rarely hit this, and when we do we know the lock is
|
||||
# taking a long time to be released anyways. We optimize for the common case of short
|
||||
# lock intervals. That's why we're using a spinlock in the first place
|
||||
# If we get here we're going to sleep in the kernel with a futex
|
||||
push rdi
|
||||
push rsi
|
||||
push rax
|
||||
; Setup the syscall args
|
||||
; rdi ARG1 futex (already in rdi)
|
||||
mov esi, (9 | 128) ; rsi ARG2 FUTEX_WAIT_BITSET_PRIVATE
|
||||
; rdx ARG3 ticketT.u (already in edx)
|
||||
xor r10d, r10d ; r10 ARG4 NULL
|
||||
mov r8, rdi ; r8 ARG5 dup rdi
|
||||
xor r9d, r9d
|
||||
bts r9d, eax ; r9 ARG6 mask
|
||||
mov eax, 202 ; sys_futex
|
||||
; Do the syscall
|
||||
lock or [rdi+12], r9d ; inform the unlocking thread we're waiting
|
||||
syscall ; wait for the futex
|
||||
not r9d ; convert our flag into a mask of bits not to touch
|
||||
lock and [rdi+12], r9d ; clear the flag in the futex control mask
|
||||
; cleanup and continue
|
||||
mov rcx, g_longwaits
|
||||
inc qword [rcx] ; increment our long wait counter
|
||||
.cfi_adjust_cfa_offset 24
|
||||
# Setup the syscall args
|
||||
|
||||
# rdi ARG1 futex (already in rdi)
|
||||
# rsi ARG2 tid (already in esi)
|
||||
# rdx ARG3 ticketT.u (already in edx)
|
||||
bts ecx, eax # rcx ARG4 mask
|
||||
call fastlock_sleep
|
||||
# cleanup and continue
|
||||
pop rax
|
||||
pop rsi
|
||||
xor ecx, ecx ; Reset our loop counter
|
||||
jmp .LLoop ; Get back in the game
|
||||
ALIGN 16
|
||||
pop rdi
|
||||
.cfi_adjust_cfa_offset -24
|
||||
xor ecx, ecx # Reset our loop counter
|
||||
jmp .LLoop # Get back in the game
|
||||
.ALIGN 16
|
||||
.LLocked:
|
||||
mov [rdi+4], esi ; lock->m_pidOwner = gettid()
|
||||
inc dword [rdi+8] ; lock->m_depth++
|
||||
mov [rdi], esi # lock->m_pidOwner = gettid()
|
||||
inc dword ptr [rdi+4] # lock->m_depth++
|
||||
ret
|
||||
.cfi_endproc
|
||||
|
||||
ALIGN 16
|
||||
global fastlock_trylock
|
||||
.ALIGN 16
|
||||
.global fastlock_trylock
|
||||
.type fastlock_trylock,@function
|
||||
fastlock_trylock:
|
||||
; RDI points to the struct:
|
||||
; uint16_t active
|
||||
; uint16_t avail
|
||||
; int32_t m_pidOwner
|
||||
; int32_t m_depth
|
||||
# RDI points to the struct:
|
||||
# int32_t m_pidOwner
|
||||
# int32_t m_depth
|
||||
# [rdi+64] ...
|
||||
# uint16_t active
|
||||
# uint16_t avail
|
||||
|
||||
; First get our TID and put it in ecx
|
||||
push rdi ; we need our struct pointer (also balance the stack for the call)
|
||||
call gettid ; get our thread ID (TLS is nasty in ASM so don't bother inlining)
|
||||
mov esi, eax ; back it up in esi
|
||||
pop rdi ; get our pointer back
|
||||
# First get our TID and put it in ecx
|
||||
push rdi # we need our struct pointer (also balance the stack for the call)
|
||||
call gettid # get our thread ID (TLS is nasty in ASM so don't bother inlining)
|
||||
mov esi, eax # back it up in esi
|
||||
pop rdi # get our pointer back
|
||||
|
||||
cmp [rdi+4], esi ; Is the TID we got back the owner of the lock?
|
||||
je .LRecursive ; Don't spin in that case
|
||||
cmp [rdi], esi # Is the TID we got back the owner of the lock?
|
||||
je .LRecursive # Don't spin in that case
|
||||
|
||||
mov eax, [rdi] ; get both active and avail counters
|
||||
mov ecx, eax ; duplicate in ecx
|
||||
ror ecx, 16 ; swap upper and lower 16-bits
|
||||
cmp eax, ecx ; are the upper and lower 16-bits the same?
|
||||
jnz .LAlreadyLocked ; If not return failure
|
||||
mov eax, [rdi+64] # get both active and avail counters
|
||||
mov ecx, eax # duplicate in ecx
|
||||
ror ecx, 16 # swap upper and lower 16-bits
|
||||
cmp eax, ecx # are the upper and lower 16-bits the same?
|
||||
jnz .LAlreadyLocked # If not return failure
|
||||
|
||||
; at this point we know eax+ecx have [avail][active] and they are both the same
|
||||
add ecx, 10000h ; increment avail, ecx is now our wanted value
|
||||
lock cmpxchg [rdi], ecx ; If rdi still contains the value in eax, put in ecx (inc avail)
|
||||
jnz .LAlreadyLocked ; If Z is not set then someone locked it while we were preparing
|
||||
# at this point we know eax+ecx have [avail][active] and they are both the same
|
||||
add ecx, 0x10000 # increment avail, ecx is now our wanted value
|
||||
lock cmpxchg [rdi+64], ecx # If rdi still contains the value in eax, put in ecx (inc avail)
|
||||
jnz .LAlreadyLocked # If Z is not set then someone locked it while we were preparing
|
||||
xor eax, eax
|
||||
inc eax ; return SUCCESS! (eax=1)
|
||||
mov [rdi+4], esi ; lock->m_pidOwner = gettid()
|
||||
mov dword [rdi+8], eax ; lock->m_depth = 1
|
||||
inc eax # return SUCCESS! (eax=1)
|
||||
mov [rdi], esi # lock->m_pidOwner = gettid()
|
||||
mov dword ptr [rdi+4], eax # lock->m_depth = 1
|
||||
ret
|
||||
ALIGN 16
|
||||
.ALIGN 16
|
||||
.LRecursive:
|
||||
xor eax, eax
|
||||
inc eax ; return SUCCESS! (eax=1)
|
||||
inc dword [rdi+8] ; lock->m_depth++
|
||||
inc eax # return SUCCESS! (eax=1)
|
||||
inc dword ptr [rdi+4] # lock->m_depth++
|
||||
ret
|
||||
ALIGN 16
|
||||
.ALIGN 16
|
||||
.LAlreadyLocked:
|
||||
xor eax, eax ; return 0;
|
||||
xor eax, eax # return 0
|
||||
ret
|
||||
|
||||
ALIGN 16
|
||||
global fastlock_unlock
|
||||
.ALIGN 16
|
||||
.global fastlock_unlock
|
||||
fastlock_unlock:
|
||||
; RDI points to the struct:
|
||||
; uint16_t active
|
||||
; uint16_t avail
|
||||
; int32_t m_pidOwner
|
||||
; int32_t m_depth
|
||||
# RDI points to the struct:
|
||||
# int32_t m_pidOwner
|
||||
# int32_t m_depth
|
||||
# [rdi+64] ...
|
||||
# uint16_t active
|
||||
# uint16_t avail
|
||||
push r11
|
||||
sub dword [rdi+8], 1 ; decrement m_depth, don't use dec because it partially writes the flag register and we don't know its state
|
||||
jnz .LDone ; if depth is non-zero this is a recursive unlock, and we still hold it
|
||||
mov dword [rdi+4], -1 ; pidOwner = -1 (we don't own it anymore)
|
||||
mov ecx, [rdi] ; get current active (this one)
|
||||
inc ecx ; bump it to the next thread
|
||||
mov [rdi], cx ; give up our ticket (note: lock is not required here because the spinlock itself guards this variable)
|
||||
; At this point the lock is removed, however we must wake up any pending futexs
|
||||
mov r9d, 1 ; eax is the bitmask for 2 threads
|
||||
rol r9d, cl ; place the mask in the right spot for the next 2 threads
|
||||
ALIGN 16
|
||||
sub dword ptr [rdi+4], 1 # decrement m_depth, don't use dec because it partially writes the flag register and we don't know its state
|
||||
jnz .LDone # if depth is non-zero this is a recursive unlock, and we still hold it
|
||||
mov dword ptr [rdi], -1 # pidOwner = -1 (we don't own it anymore)
|
||||
mov ecx, [rdi+64] # get current active (this one)
|
||||
inc ecx # bump it to the next thread
|
||||
mov [rdi+64], cx # give up our ticket (note: lock is not required here because the spinlock itself guards this variable)
|
||||
# At this point the lock is removed, however we must wake up any pending futexs
|
||||
mov r9d, 1 # eax is the bitmask for 2 threads
|
||||
rol r9d, cl # place the mask in the right spot for the next 2 threads
|
||||
add rdi, 64 # rdi now points to the token
|
||||
.ALIGN 16
|
||||
.LRetryWake:
|
||||
mov r11d, [rdi+12] ; load the futex mask
|
||||
and r11d, r9d ; are any threads waiting on a futex?
|
||||
jz .LDone ; if not we're done.
|
||||
; we have to wake the futexs
|
||||
; rdi ARG1 futex (already in rdi)
|
||||
mov esi, (10 | 128) ; rsi ARG2 FUTEX_WAKE_BITSET_PRIVATE
|
||||
mov edx, 0x7fffffff ; rdx ARG3 INT_MAX (number of threads to wake)
|
||||
xor r10d, r10d ; r10 ARG4 NULL
|
||||
mov r8, rdi ; r8 ARG5 dup rdi
|
||||
; r9 ARG6 mask (already set above)
|
||||
mov eax, 202 ; sys_futex
|
||||
mov r11d, [rdi+4] # load the futex mask
|
||||
and r11d, r9d # are any threads waiting on a futex?
|
||||
jz .LDone # if not we're done.
|
||||
# we have to wake the futexs
|
||||
# rdi ARG1 futex (already in rdi)
|
||||
mov esi, (10 | 128) # rsi ARG2 FUTEX_WAKE_BITSET_PRIVATE
|
||||
mov edx, 0x7fffffff # rdx ARG3 INT_MAX (number of threads to wake)
|
||||
xor r10d, r10d # r10 ARG4 NULL
|
||||
mov r8, rdi # r8 ARG5 dup rdi
|
||||
# r9 ARG6 mask (already set above)
|
||||
mov eax, 202 # sys_futex
|
||||
syscall
|
||||
cmp eax, 1 ; did we wake as many as we expected?
|
||||
cmp eax, 1 # did we wake as many as we expected?
|
||||
jnz .LRetryWake
|
||||
.LDone:
|
||||
pop r11
|
||||
|
@ -116,7 +116,7 @@ client *createClient(int fd, int iel) {
|
||||
uint64_t client_id;
|
||||
client_id = g_pserver->next_client_id.fetch_add(1);
|
||||
c->iel = iel;
|
||||
fastlock_init(&c->lock);
|
||||
fastlock_init(&c->lock, "client");
|
||||
c->id = client_id;
|
||||
c->resp = 2;
|
||||
c->fd = fd;
|
||||
@ -248,7 +248,11 @@ void clientInstallAsyncWriteHandler(client *c) {
|
||||
int prepareClientToWrite(client *c, bool fAsync) {
|
||||
fAsync = fAsync && !FCorrectThread(c); // Not async if we're on the right thread
|
||||
serverAssert(FCorrectThread(c) || fAsync);
|
||||
serverAssert(c->fd <= 0 || c->lock.fOwnLock());
|
||||
if (FCorrectThread(c)) {
|
||||
serverAssert(c->fd <= 0 || c->lock.fOwnLock());
|
||||
} else {
|
||||
serverAssert(GlobalLocksAcquired());
|
||||
}
|
||||
|
||||
if (c->flags & CLIENT_FORCE_REPLY) return C_OK; // FORCE REPLY means we're doing something else with the buffer.
|
||||
// do not install a write handler
|
||||
@ -1509,7 +1513,6 @@ int writeToClient(int fd, client *c, int handler_installed) {
|
||||
} else {
|
||||
serverLog(LL_VERBOSE,
|
||||
"Error writing to client: %s", strerror(errno));
|
||||
lock.unlock();
|
||||
freeClientAsync(c);
|
||||
|
||||
return C_ERR;
|
||||
@ -1528,7 +1531,6 @@ int writeToClient(int fd, client *c, int handler_installed) {
|
||||
|
||||
/* Close connection after entire reply has been sent. */
|
||||
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
|
||||
lock.unlock();
|
||||
freeClientAsync(c);
|
||||
return C_ERR;
|
||||
}
|
||||
@ -2519,9 +2521,17 @@ NULL
|
||||
close_this_client = 1;
|
||||
} else {
|
||||
if (FCorrectThread(client))
|
||||
{
|
||||
freeClient(client);
|
||||
}
|
||||
else
|
||||
{
|
||||
int iel = client->iel;
|
||||
freeClientAsync(client);
|
||||
aePostFunction(g_pserver->rgthreadvar[client->iel].el, [iel] {
|
||||
freeClientsInAsyncFreeQueue(iel);
|
||||
});
|
||||
}
|
||||
}
|
||||
killed++;
|
||||
}
|
||||
@ -2950,38 +2960,48 @@ void flushSlavesOutputBuffers(void) {
|
||||
* than the time left for the previous pause, no change is made to the
|
||||
* left duration. */
|
||||
void pauseClients(mstime_t end) {
|
||||
if (!g_pserver->clients_paused || end > g_pserver->clients_pause_end_time)
|
||||
serverAssert(GlobalLocksAcquired());
|
||||
if (!serverTL->clients_paused || end > g_pserver->clients_pause_end_time)
|
||||
g_pserver->clients_pause_end_time = end;
|
||||
g_pserver->clients_paused = 1;
|
||||
|
||||
for (int iel = 0; iel < cserver.cthreads; ++iel)
|
||||
{
|
||||
g_pserver->rgthreadvar[iel].clients_paused = true;
|
||||
}
|
||||
}
|
||||
|
||||
/* Return non-zero if clients are currently paused. As a side effect the
|
||||
* function checks if the pause time was reached and clear it. */
|
||||
int clientsArePaused(void) {
|
||||
if (g_pserver->clients_paused &&
|
||||
return serverTL->clients_paused;
|
||||
}
|
||||
|
||||
void unpauseClientsIfNecessary()
|
||||
{
|
||||
serverAssert(GlobalLocksAcquired());
|
||||
if (serverTL->clients_paused &&
|
||||
g_pserver->clients_pause_end_time < g_pserver->mstime)
|
||||
{
|
||||
aeAcquireLock();
|
||||
listNode *ln;
|
||||
listIter li;
|
||||
client *c;
|
||||
|
||||
g_pserver->clients_paused = 0;
|
||||
serverTL->clients_paused = 0;
|
||||
|
||||
/* Put all the clients in the unblocked clients queue in order to
|
||||
* force the re-processing of the input buffer if any. */
|
||||
listRewind(g_pserver->clients,&li);
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
c = (client*)listNodeValue(ln);
|
||||
if (!FCorrectThread(c))
|
||||
continue;
|
||||
|
||||
/* Don't touch slaves and blocked clients.
|
||||
* The latter pending requests will be processed when unblocked. */
|
||||
if (c->flags & (CLIENT_SLAVE|CLIENT_BLOCKED)) continue;
|
||||
queueClientForReprocessing(c);
|
||||
}
|
||||
aeReleaseLock();
|
||||
}
|
||||
return g_pserver->clients_paused;
|
||||
}
|
||||
|
||||
/* This function is called by Redis in order to process a few events from
|
||||
@ -3000,6 +3020,12 @@ int processEventsWhileBlocked(int iel) {
|
||||
int iterations = 4; /* See the function top-comment. */
|
||||
int count = 0;
|
||||
|
||||
client *c = serverTL->current_client;
|
||||
if (c != nullptr)
|
||||
{
|
||||
serverAssert(c->flags & CLIENT_PROTECTED);
|
||||
c->lock.unlock();
|
||||
}
|
||||
aeReleaseLock();
|
||||
while (iterations--) {
|
||||
int events = 0;
|
||||
@ -3008,7 +3034,11 @@ int processEventsWhileBlocked(int iel) {
|
||||
if (!events) break;
|
||||
count += events;
|
||||
}
|
||||
aeAcquireLock();
|
||||
AeLocker locker;
|
||||
if (c != nullptr)
|
||||
c->lock.lock();
|
||||
locker.arm(c);
|
||||
locker.release();
|
||||
return count;
|
||||
}
|
||||
|
||||
|
@ -301,9 +301,11 @@ int pubsubPublishMessage(robj *channel, robj *message) {
|
||||
client *c = reinterpret_cast<client*>(ln->value);
|
||||
if (c->flags & CLIENT_CLOSE_ASAP) // avoid blocking if the write will be ignored
|
||||
continue;
|
||||
fastlock_lock(&c->lock);
|
||||
if (FCorrectThread(c))
|
||||
fastlock_lock(&c->lock);
|
||||
addReplyPubsubMessage(c,channel,message);
|
||||
fastlock_unlock(&c->lock);
|
||||
if (FCorrectThread(c))
|
||||
fastlock_unlock(&c->lock);
|
||||
receivers++;
|
||||
}
|
||||
}
|
||||
@ -321,10 +323,12 @@ int pubsubPublishMessage(robj *channel, robj *message) {
|
||||
{
|
||||
if (pat->pclient->flags & CLIENT_CLOSE_ASAP)
|
||||
continue;
|
||||
fastlock_lock(&pat->pclient->lock);
|
||||
if (FCorrectThread(pat->pclient))
|
||||
fastlock_lock(&pat->pclient->lock);
|
||||
addReplyPubsubPatMessage(pat->pclient,
|
||||
pat->pattern,channel,message);
|
||||
fastlock_unlock(&pat->pclient->lock);
|
||||
if (FCorrectThread(pat->pclient))
|
||||
fastlock_unlock(&pat->pclient->lock);
|
||||
receivers++;
|
||||
}
|
||||
}
|
||||
|
@ -173,6 +173,8 @@ typedef struct redisConfig {
|
||||
sds appendonly;
|
||||
} redisConfig;
|
||||
|
||||
int g_fInCrash = false;
|
||||
|
||||
/* Prototypes */
|
||||
static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask);
|
||||
static void createMissingClients(client c);
|
||||
|
@ -90,6 +90,8 @@ int spectrum_palette_mono[] = {0,233,234,235,237,239,241,243,245,247,249,251,253
|
||||
int *spectrum_palette;
|
||||
int spectrum_palette_size;
|
||||
|
||||
int g_fInCrash = 0;
|
||||
|
||||
/*------------------------------------------------------------------------------
|
||||
* Utility functions
|
||||
*--------------------------------------------------------------------------- */
|
||||
|
@ -384,7 +384,11 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
||||
|
||||
/* Don't feed slaves that are still waiting for BGSAVE to start */
|
||||
if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
|
||||
std::unique_lock<decltype(replica->lock)> lock(replica->lock);
|
||||
if (replica->flags & CLIENT_CLOSE_ASAP) continue;
|
||||
std::unique_lock<decltype(replica->lock)> lock(replica->lock, std::defer_lock);
|
||||
// When writing to clients on other threads the global lock is sufficient provided we only use AddReply*Async()
|
||||
if (FCorrectThread(replica))
|
||||
lock.lock();
|
||||
if (serverTL->current_client && FSameHost(serverTL->current_client, replica))
|
||||
{
|
||||
replica->reploff_skipped += g_pserver->master_repl_offset - master_repl_offset_start;
|
||||
@ -433,7 +437,9 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle
|
||||
|
||||
while((ln = listNext(&li))) {
|
||||
client *replica = (client*)ln->value;
|
||||
std::lock_guard<decltype(replica->lock)> ulock(replica->lock);
|
||||
std::unique_lock<decltype(replica->lock)> ulock(replica->lock, std::defer_lock);
|
||||
if (FCorrectThread(replica))
|
||||
ulock.lock();
|
||||
if (FMasterHost(replica))
|
||||
continue; // Active Active case, don't feed back
|
||||
|
||||
@ -482,7 +488,10 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv,
|
||||
listRewind(monitors,&li);
|
||||
while((ln = listNext(&li))) {
|
||||
client *monitor = (client*)ln->value;
|
||||
std::lock_guard<decltype(monitor->lock)> lock(monitor->lock);
|
||||
std::unique_lock<decltype(monitor->lock)> lock(monitor->lock, std::defer_lock);
|
||||
// When writing to clients on other threads the global lock is sufficient provided we only use AddReply*Async()
|
||||
if (FCorrectThread(c))
|
||||
lock.lock();
|
||||
addReplyAsync(monitor,cmdobj);
|
||||
}
|
||||
decrRefCount(cmdobj);
|
||||
@ -1205,7 +1214,21 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type)
|
||||
}
|
||||
else
|
||||
{
|
||||
aePostFunction(g_pserver->rgthreadvar[replica->iel].el, [replica]{
|
||||
aePostFunction(g_pserver->rgthreadvar[replica->iel].el, [replica] {
|
||||
// Because the client could have been closed while the lambda waited to run we need to
|
||||
// verify the replica is still connected
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
listRewind(g_pserver->slaves,&li);
|
||||
bool fFound = false;
|
||||
while ((ln = listNext(&li))) {
|
||||
if (listNodeValue(ln) == replica) {
|
||||
fFound = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!fFound)
|
||||
return;
|
||||
aeDeleteFileEvent(g_pserver->rgthreadvar[replica->iel].el,replica->fd,AE_WRITABLE);
|
||||
if (aeCreateFileEvent(g_pserver->rgthreadvar[replica->iel].el, replica->fd, AE_WRITABLE, sendBulkToSlave, replica) == AE_ERR) {
|
||||
freeClient(replica);
|
||||
|
@ -1715,6 +1715,9 @@ void clientsCron(int iel) {
|
||||
fastlock_unlock(&c->lock);
|
||||
}
|
||||
}
|
||||
|
||||
/* Free any pending clients */
|
||||
freeClientsInAsyncFreeQueue(iel);
|
||||
}
|
||||
|
||||
/* This function handles 'background' operations we are required to do
|
||||
@ -1835,6 +1838,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
||||
/* Update the time cache. */
|
||||
updateCachedTime();
|
||||
|
||||
/* Unpause clients if enough time has elapsed */
|
||||
unpauseClientsIfNecessary();
|
||||
|
||||
g_pserver->hz = g_pserver->config_hz;
|
||||
/* Adapt the g_pserver->hz value to the number of configured clients. If we have
|
||||
* many clients, we want to call serverCron() with an higher frequency. */
|
||||
@ -1842,7 +1848,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
||||
while (listLength(g_pserver->clients) / g_pserver->hz >
|
||||
MAX_CLIENTS_PER_CLOCK_TICK)
|
||||
{
|
||||
g_pserver->hz *= 2;
|
||||
g_pserver->hz += g_pserver->hz; // *= 2
|
||||
if (g_pserver->hz > CONFIG_MAX_HZ) {
|
||||
g_pserver->hz = CONFIG_MAX_HZ;
|
||||
break;
|
||||
@ -2055,9 +2061,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
||||
flushAppendOnlyFile(0);
|
||||
}
|
||||
|
||||
/* Clear the paused clients flag if needed. */
|
||||
clientsArePaused(); /* Don't check return value, just use the side effect.*/
|
||||
|
||||
/* Replication cron function -- used to reconnect to master,
|
||||
* detect transfer failures, start background RDB transfers and so forth. */
|
||||
run_with_period(1000) replicationCron();
|
||||
@ -2115,6 +2118,9 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData
|
||||
processUnblockedClients(iel);
|
||||
}
|
||||
|
||||
/* Unpause clients if enough time has elapsed */
|
||||
unpauseClientsIfNecessary();
|
||||
|
||||
ProcessPendingAsyncWrites(); // A bug but leave for now, events should clean up after themselves
|
||||
clientsCron(iel);
|
||||
|
||||
@ -2907,6 +2913,7 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain)
|
||||
pvar->cclients = 0;
|
||||
pvar->el = aeCreateEventLoop(g_pserver->maxclients+CONFIG_FDSET_INCR);
|
||||
pvar->current_client = nullptr;
|
||||
pvar->clients_paused = 0;
|
||||
if (pvar->el == NULL) {
|
||||
serverLog(LL_WARNING,
|
||||
"Failed creating the event loop. Error message: '%s'",
|
||||
@ -2914,7 +2921,7 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain)
|
||||
exit(1);
|
||||
}
|
||||
|
||||
fastlock_init(&pvar->lockPendingWrite);
|
||||
fastlock_init(&pvar->lockPendingWrite, "lockPendingWrite");
|
||||
|
||||
if (!fMain)
|
||||
{
|
||||
@ -2961,8 +2968,6 @@ void initServer(void) {
|
||||
signal(SIGPIPE, SIG_IGN);
|
||||
setupSignalHandlers();
|
||||
|
||||
fastlock_init(&g_pserver->flock);
|
||||
|
||||
g_pserver->db = (redisDb*)zmalloc(sizeof(redisDb)*cserver.dbnum, MALLOC_LOCAL);
|
||||
|
||||
/* Create the Redis databases, and initialize other internal state. */
|
||||
@ -2996,7 +3001,6 @@ void initServer(void) {
|
||||
g_pserver->ready_keys = listCreate();
|
||||
g_pserver->clients_waiting_acks = listCreate();
|
||||
g_pserver->get_ack_from_slaves = 0;
|
||||
g_pserver->clients_paused = 0;
|
||||
cserver.system_memory_size = zmalloc_get_memory_size();
|
||||
|
||||
createSharedObjects();
|
||||
@ -3733,7 +3737,6 @@ int processCommand(client *c, int callFlags) {
|
||||
queueMultiCommand(c);
|
||||
addReply(c,shared.queued);
|
||||
} else {
|
||||
std::unique_lock<decltype(c->db->lock)> ulock(c->db->lock);
|
||||
for (int idb = 0; idb < cserver.dbnum; ++idb)
|
||||
g_pserver->db[idb].trackChanges();
|
||||
call(c,callFlags);
|
||||
@ -4122,7 +4125,7 @@ sds genRedisInfoString(const char *section) {
|
||||
g_pserver->port,
|
||||
(intmax_t)uptime,
|
||||
(intmax_t)(uptime/(3600*24)),
|
||||
g_pserver->hz,
|
||||
g_pserver->hz.load(),
|
||||
g_pserver->config_hz,
|
||||
(unsigned long) lruclock,
|
||||
cserver.executable ? cserver.executable : "",
|
||||
|
13
src/server.h
13
src/server.h
@ -1253,8 +1253,6 @@ public:
|
||||
long long last_expire_set; /* when the last expire was set */
|
||||
double avg_ttl; /* Average TTL, just for stats */
|
||||
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
|
||||
|
||||
fastlock lock;
|
||||
} redisDb;
|
||||
|
||||
/* Client MULTI/EXEC state */
|
||||
@ -1627,6 +1625,7 @@ struct redisServerThreadVars {
|
||||
aeEventLoop *el;
|
||||
int ipfd[CONFIG_BINDADDR_MAX]; /* TCP socket file descriptors */
|
||||
int ipfd_count; /* Used slots in ipfd[] */
|
||||
int clients_paused; /* True if clients are currently paused */
|
||||
std::vector<client*> clients_pending_write; /* There is to write or install handler. */
|
||||
list *unblocked_clients; /* list of clients to unblock before next loop NOT THREADSAFE */
|
||||
list *clients_pending_asyncwrite;
|
||||
@ -1636,7 +1635,7 @@ struct redisServerThreadVars {
|
||||
client blocked on a module command needs
|
||||
to be processed. */
|
||||
client *lua_client = nullptr; /* The "fake client" to query Redis from Lua */
|
||||
struct fastlock lockPendingWrite;
|
||||
struct fastlock lockPendingWrite { "thread pending write" };
|
||||
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
|
||||
long unsigned commandsExecuted = 0;
|
||||
};
|
||||
@ -1719,7 +1718,7 @@ struct redisServer {
|
||||
int config_hz; /* Configured HZ value. May be different than
|
||||
the actual 'hz' field value if dynamic-hz
|
||||
is enabled. */
|
||||
int hz; /* serverCron() calls frequency in hertz */
|
||||
std::atomic<int> hz; /* serverCron() calls frequency in hertz */
|
||||
redisDb *db;
|
||||
dict *commands; /* Command table */
|
||||
dict *orig_commands; /* Command table before command renaming. */
|
||||
@ -1754,7 +1753,6 @@ struct redisServer {
|
||||
list *clients_to_close; /* Clients to close asynchronously */
|
||||
list *slaves, *monitors; /* List of slaves and MONITORs */
|
||||
rax *clients_index; /* Active clients dictionary by client ID. */
|
||||
int clients_paused; /* True if clients are currently paused */
|
||||
mstime_t clients_pause_end_time; /* Time when we undo clients_paused */
|
||||
dict *migrate_cached_sockets;/* MIGRATE cached sockets */
|
||||
std::atomic<uint64_t> next_client_id; /* Next client unique ID. Incremental. */
|
||||
@ -2024,8 +2022,6 @@ struct redisServer {
|
||||
|
||||
int fActiveReplica; /* Can this replica also be a master? */
|
||||
|
||||
struct fastlock flock;
|
||||
|
||||
// Format:
|
||||
// Lower 20 bits: a counter incrementing for each command executed in the same millisecond
|
||||
// Upper 44 bits: mstime (least significant 44-bits) enough for ~500 years before rollover from date of addition
|
||||
@ -2255,6 +2251,7 @@ void disconnectSlavesExcept(unsigned char *uuid);
|
||||
int listenToPort(int port, int *fds, int *count, int fReusePort, int fFirstListen);
|
||||
void pauseClients(mstime_t duration);
|
||||
int clientsArePaused(void);
|
||||
void unpauseClientsIfNecessary();
|
||||
int processEventsWhileBlocked(int iel);
|
||||
int handleClientsWithPendingWrites(int iel);
|
||||
int clientHasPendingReplies(client *c);
|
||||
@ -3010,7 +3007,7 @@ void xorDigest(unsigned char *digest, const void *ptr, size_t len);
|
||||
int populateCommandTableParseFlags(struct redisCommand *c, const char *strflags);
|
||||
|
||||
int moduleGILAcquiredByModule(void);
|
||||
extern bool g_fInCrash;
|
||||
extern int g_fInCrash;
|
||||
static inline int GlobalLocksAcquired(void) // Used in asserts to verify all global locks are correctly acquired for a server-thread to operate
|
||||
{
|
||||
return aeThreadOwnsLock() || moduleGILAcquiredByModule() || g_fInCrash;
|
||||
|
@ -14,6 +14,8 @@ proc main {} {
|
||||
spawn_instance redis $::redis_base_port $::instances_count {
|
||||
"cluster-enabled yes"
|
||||
"appendonly yes"
|
||||
"testmode yes"
|
||||
"server-threads 3"
|
||||
}
|
||||
run_tests
|
||||
cleanup
|
||||
|
@ -6,7 +6,7 @@ if {$system_name eq {linux} || $system_name eq {darwin}} {
|
||||
test "Server is able to generate a stack trace on selected systems" {
|
||||
r config set watchdog-period 200
|
||||
r debug sleep 1
|
||||
set pattern "*debugCommand*"
|
||||
set pattern "*watchdogSignalHandler*"
|
||||
set retry 10
|
||||
while {$retry} {
|
||||
set result [exec tail -100 < [srv 0 stdout]]
|
||||
|
@ -238,7 +238,7 @@ start_server {tags {"bitops"}} {
|
||||
r set a "abcdefg"
|
||||
r bitop lshift x a 8
|
||||
r get x
|
||||
} "\x00abcdefg"
|
||||
} "\000abcdefg"
|
||||
|
||||
test {BITOP lshift char} {
|
||||
r set a "\xAA"
|
||||
|
Loading…
x
Reference in New Issue
Block a user