Cleaned up code a bit, need to rewrite some comments to reflect new behaviour
Former-commit-id: 850ec766cd71614ce9e61c12414545cd212d3878
This commit is contained in:
parent
9e344f2577
commit
f7d8a0950d
@ -522,7 +522,6 @@ int freeMemoryIfNeeded(bool fQuickCycle, bool fPreSnapshot) {
|
|||||||
if (g_pserver->maxmemory_policy == MAXMEMORY_NO_EVICTION)
|
if (g_pserver->maxmemory_policy == MAXMEMORY_NO_EVICTION)
|
||||||
goto cant_free; /* We need to free memory, but policy forbids. */
|
goto cant_free; /* We need to free memory, but policy forbids. */
|
||||||
|
|
||||||
serverLog(LL_NOTICE, "evicting i guess lol, the overhead was %ld, the repl_backlog_size, %lld", freeMemoryGetNotCountedMemory(), g_pserver->repl_backlog_size);
|
|
||||||
while (mem_freed < mem_tofree) {
|
while (mem_freed < mem_tofree) {
|
||||||
int j, k, i;
|
int j, k, i;
|
||||||
static unsigned int next_db = 0;
|
static unsigned int next_db = 0;
|
||||||
|
@ -223,9 +223,6 @@ void clientInstallWriteHandler(client *c) {
|
|||||||
* if not already done and, for slaves, if the replica can actually receive
|
* if not already done and, for slaves, if the replica can actually receive
|
||||||
* writes at this stage. */
|
* writes at this stage. */
|
||||||
|
|
||||||
if (c->flags & CLIENT_SLAVE)
|
|
||||||
serverLog(LL_NOTICE, "installing write handler");
|
|
||||||
|
|
||||||
if (!(c->flags & CLIENT_PENDING_WRITE) &&
|
if (!(c->flags & CLIENT_PENDING_WRITE) &&
|
||||||
(c->replstate == REPL_STATE_NONE ||
|
(c->replstate == REPL_STATE_NONE ||
|
||||||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
|
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
|
||||||
@ -277,10 +274,6 @@ void clientInstallAsyncWriteHandler(client *c) {
|
|||||||
int prepareClientToWrite(client *c) {
|
int prepareClientToWrite(client *c) {
|
||||||
bool fAsync = !FCorrectThread(c); // Not async if we're on the right thread
|
bool fAsync = !FCorrectThread(c); // Not async if we're on the right thread
|
||||||
|
|
||||||
if (c->flags & CLIENT_SLAVE){
|
|
||||||
serverLog(LL_NOTICE, "got into prepareClientToWrite");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!fAsync) {
|
if (!fAsync) {
|
||||||
serverAssert(c->conn == nullptr || c->lock.fOwnLock());
|
serverAssert(c->conn == nullptr || c->lock.fOwnLock());
|
||||||
} else {
|
} else {
|
||||||
@ -1695,10 +1688,6 @@ int writeToClient(client *c, int handler_installed) {
|
|||||||
serverAssertDebug(FCorrectThread(c));
|
serverAssertDebug(FCorrectThread(c));
|
||||||
|
|
||||||
std::unique_lock<decltype(c->lock)> lock(c->lock);
|
std::unique_lock<decltype(c->lock)> lock(c->lock);
|
||||||
// serverLog(LL_NOTICE, "acq client");
|
|
||||||
|
|
||||||
if (c->flags & CLIENT_SLAVE)
|
|
||||||
serverLog(LL_NOTICE, "writeToClient has happened");
|
|
||||||
|
|
||||||
while(clientHasPendingReplies(c)) {
|
while(clientHasPendingReplies(c)) {
|
||||||
serverAssert(!(c->flags & CLIENT_SLAVE) || c->flags & CLIENT_MONITOR);
|
serverAssert(!(c->flags & CLIENT_SLAVE) || c->flags & CLIENT_MONITOR);
|
||||||
@ -1710,7 +1699,6 @@ int writeToClient(client *c, int handler_installed) {
|
|||||||
|
|
||||||
/* If the buffer was sent, set bufpos to zero to continue with
|
/* If the buffer was sent, set bufpos to zero to continue with
|
||||||
* the remainder of the reply. */
|
* the remainder of the reply. */
|
||||||
// serverLog(LL_NOTICE, "buf pos: %d, sentlen: %ld", c->bufpos, c->sentlen);
|
|
||||||
if ((int)c->sentlen == c->bufpos) {
|
if ((int)c->sentlen == c->bufpos) {
|
||||||
c->bufpos = 0;
|
c->bufpos = 0;
|
||||||
c->sentlen = 0;
|
c->sentlen = 0;
|
||||||
@ -1764,33 +1752,24 @@ int writeToClient(client *c, int handler_installed) {
|
|||||||
* We always read from the replication backlog directly */
|
* We always read from the replication backlog directly */
|
||||||
std::unique_lock<fastlock> repl_backlog_lock (g_pserver->repl_backlog_lock);
|
std::unique_lock<fastlock> repl_backlog_lock (g_pserver->repl_backlog_lock);
|
||||||
|
|
||||||
/* Right now, we're bringing in the offStart into the scope
|
long long repl_end_idx = getReplIndexFromOffset(c->repl_end_off);
|
||||||
* If repl_batch_offStart is equal to -1, that means the mechanism is disabled
|
|
||||||
* which implies there is no data to flush and that the global offset is accurate */
|
|
||||||
// long long offStart = g_pserver->repl_batch_offStart == -1 ? g_pserver->master_repl_offset : g_pserver->repl_batch_offStart;
|
|
||||||
long long offStart = c->repl_end_off;
|
|
||||||
long long idxStart = getReplIndexFromOffset(offStart);
|
|
||||||
|
|
||||||
serverAssert(c->repl_curr_off != -1);
|
serverAssert(c->repl_curr_off != -1);
|
||||||
if (c->repl_curr_off != offStart){
|
if (c->repl_curr_off != c->repl_end_off){
|
||||||
serverLog(LL_NOTICE, "printing the stats for client %lu: c->repl_curr_off: %lld, repl_batch_offStart: %lld, nwritten: %ld, offStart: %lld",
|
long long repl_curr_idx = getReplIndexFromOffset(c->repl_curr_off);
|
||||||
c->id, c->repl_curr_off, g_pserver->repl_batch_offStart, nwritten, offStart);
|
long long nwritten2ndStage = 0; /* How much was written from the start of the replication backlog
|
||||||
|
* in the event of a wrap around write */
|
||||||
long long curr_idx = getReplIndexFromOffset(c->repl_curr_off);
|
|
||||||
long long nwrittenPart2 = 0;
|
|
||||||
/* normal case with no wrap around */
|
/* normal case with no wrap around */
|
||||||
if (idxStart >= curr_idx){
|
if (repl_end_idx >= repl_curr_idx){
|
||||||
nwritten = connWrite(c->conn, g_pserver->repl_backlog + curr_idx, idxStart - curr_idx);
|
nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, repl_end_idx - repl_curr_idx);
|
||||||
/* wrap around case, v. rare */
|
/* wrap around case */
|
||||||
/* also v. buggy so there's that */
|
|
||||||
} else {
|
} else {
|
||||||
serverLog(LL_NOTICE, "ROAD OF RESISTANCE");
|
nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, g_pserver->repl_backlog_size - repl_curr_idx);
|
||||||
nwritten = connWrite(c->conn, g_pserver->repl_backlog + curr_idx, g_pserver->repl_backlog_size - curr_idx);
|
|
||||||
/* only attempt wrapping if we write the correct number of bytes */
|
/* only attempt wrapping if we write the correct number of bytes */
|
||||||
if (nwritten == g_pserver->repl_backlog_size - curr_idx){
|
if (nwritten == g_pserver->repl_backlog_size - repl_curr_idx){
|
||||||
long long nwrittenPart2 = connWrite(c->conn, g_pserver->repl_backlog, idxStart);
|
nwritten2ndStage = connWrite(c->conn, g_pserver->repl_backlog, repl_end_idx);
|
||||||
if (nwrittenPart2 != -1)
|
if (nwritten2ndStage != -1)
|
||||||
nwritten += nwrittenPart2;
|
nwritten += nwritten2ndStage;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1798,31 +1777,19 @@ int writeToClient(client *c, int handler_installed) {
|
|||||||
if (nwritten > 0){
|
if (nwritten > 0){
|
||||||
totwritten += nwritten;
|
totwritten += nwritten;
|
||||||
c->repl_curr_off += nwritten;
|
c->repl_curr_off += nwritten;
|
||||||
if (1){
|
serverAssert(c->repl_curr_off <= c->repl_end_off);
|
||||||
serverLog(LL_NOTICE, "printing the stats for client %lu: c->repl_curr_off: %lld, repl_batch_offStart: %lld, nwritten: %ld, offStart: %lld",
|
|
||||||
c->id, c->repl_curr_off, g_pserver->repl_batch_offStart, nwritten, offStart);
|
|
||||||
}
|
|
||||||
serverAssert(c->repl_curr_off <= offStart);
|
|
||||||
/* If the client offset matches the global offset, we wrote all we needed to,
|
/* If the client offset matches the global offset, we wrote all we needed to,
|
||||||
* in which case, there is no pending write */
|
* in which case, there is no pending write */
|
||||||
if (c->repl_curr_off == offStart){
|
if (c->repl_curr_off == c->repl_end_off){
|
||||||
serverLog(LL_NOTICE, "good, %lld", offStart);
|
|
||||||
c->fPendingReplicaWrite = false;
|
c->fPendingReplicaWrite = false;
|
||||||
} else {
|
|
||||||
serverLog(LL_NOTICE, "mismatch between repl_curr_off (%lld) and offStart (%lld)", c->repl_curr_off, offStart);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* If the second part of a write didn't go through, we still need to register that */
|
/* If the second part of a write didn't go through, we still need to register that */
|
||||||
if (nwrittenPart2 == -1) nwritten = -1;
|
if (nwritten2ndStage == -1) nwritten = -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// if (c->flags & CLIENT_SLAVE && handler_installed)
|
|
||||||
// serverLog(LL_NOTICE, "Total bytes written, %ld, write handler installed?: %d", totwritten, handler_installed);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// serverLog(LL_NOTICE, "rel client");
|
|
||||||
g_pserver->stat_net_output_bytes += totwritten;
|
g_pserver->stat_net_output_bytes += totwritten;
|
||||||
if (nwritten == -1) {
|
if (nwritten == -1) {
|
||||||
if (connGetState(c->conn) == CONN_STATE_CONNECTED) {
|
if (connGetState(c->conn) == CONN_STATE_CONNECTED) {
|
||||||
@ -1843,11 +1810,6 @@ int writeToClient(client *c, int handler_installed) {
|
|||||||
if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = g_pserver->unixtime;
|
if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = g_pserver->unixtime;
|
||||||
}
|
}
|
||||||
if (!clientHasPendingReplies(c) && !c->fPendingReplicaWrite) {
|
if (!clientHasPendingReplies(c) && !c->fPendingReplicaWrite) {
|
||||||
// if(c->flags & CLIENT_SLAVE && handler_installed){
|
|
||||||
// serverLog(LL_NOTICE, "Uninstalling handler");
|
|
||||||
// serverLog(LL_NOTICE, "repl_backlog_size: %lld", g_pserver->repl_backlog_size);
|
|
||||||
// serverLog(LL_NOTICE, "handler repl_curr_off: %lld, master_repl_offset: %lld", c->repl_curr_off, g_pserver->master_repl_offset);
|
|
||||||
// }
|
|
||||||
c->sentlen = 0;
|
c->sentlen = 0;
|
||||||
if (handler_installed) connSetWriteHandler(c->conn, NULL);
|
if (handler_installed) connSetWriteHandler(c->conn, NULL);
|
||||||
|
|
||||||
@ -1863,7 +1825,6 @@ int writeToClient(client *c, int handler_installed) {
|
|||||||
/* Write event handler. Just send data to the client. */
|
/* Write event handler. Just send data to the client. */
|
||||||
void sendReplyToClient(connection *conn) {
|
void sendReplyToClient(connection *conn) {
|
||||||
client *c = (client*)connGetPrivateData(conn);
|
client *c = (client*)connGetPrivateData(conn);
|
||||||
// serverLog(LL_NOTICE, "called the sendreplytoclient");
|
|
||||||
if (writeToClient(c,1) == C_ERR)
|
if (writeToClient(c,1) == C_ERR)
|
||||||
{
|
{
|
||||||
AeLocker ae;
|
AeLocker ae;
|
||||||
@ -1997,7 +1958,6 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
|
|||||||
|
|
||||||
auto vec = std::move(g_pserver->rgthreadvar[iel].clients_pending_write);
|
auto vec = std::move(g_pserver->rgthreadvar[iel].clients_pending_write);
|
||||||
processed += (int)vec.size();
|
processed += (int)vec.size();
|
||||||
// serverLog(LL_NOTICE, "entered handleClientsWithPendingWrites");
|
|
||||||
|
|
||||||
for (client *c : vec) {
|
for (client *c : vec) {
|
||||||
serverAssertDebug(FCorrectThread(c));
|
serverAssertDebug(FCorrectThread(c));
|
||||||
@ -2013,12 +1973,6 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
|
|||||||
/* Don't write to clients that are going to be closed anyway. */
|
/* Don't write to clients that are going to be closed anyway. */
|
||||||
if (c->flags & CLIENT_CLOSE_ASAP) continue;
|
if (c->flags & CLIENT_CLOSE_ASAP) continue;
|
||||||
|
|
||||||
// if (c->flags & CLIENT_SLAVE){
|
|
||||||
// if(clientHasPendingReplies(c))
|
|
||||||
// serverLog(LL_NOTICE, "somehow the client buffer has these values: %s", c->buf);
|
|
||||||
// serverLog(LL_NOTICE, "LOL");
|
|
||||||
// }
|
|
||||||
|
|
||||||
/* Try to write buffers to the client socket. */
|
/* Try to write buffers to the client socket. */
|
||||||
if (writeToClient(c,0) == C_ERR)
|
if (writeToClient(c,0) == C_ERR)
|
||||||
{
|
{
|
||||||
@ -2216,34 +2170,6 @@ static void setProtocolError(const char *errstr, client *c) {
|
|||||||
c->flags |= (CLIENT_CLOSE_AFTER_REPLY|CLIENT_PROTOCOL_ERROR);
|
c->flags |= (CLIENT_CLOSE_AFTER_REPLY|CLIENT_PROTOCOL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void printQueryBuffer(client *c) {
|
|
||||||
if (cserver.verbosity <= LL_VERBOSE || c->flags & CLIENT_MASTER) {
|
|
||||||
sds client = catClientInfoString(sdsempty(),c);
|
|
||||||
|
|
||||||
/* Sample some protocol to given an idea about what was inside. */
|
|
||||||
char buf[PROTO_DUMP_LEN*2];
|
|
||||||
if (sdslen(c->querybuf)-c->qb_pos < PROTO_DUMP_LEN) {
|
|
||||||
snprintf(buf,sizeof(buf),"%s", c->querybuf+c->qb_pos);
|
|
||||||
} else {
|
|
||||||
snprintf(buf,sizeof(buf),"%.*s (... more %zu bytes ...) %.*s", PROTO_DUMP_LEN/2, c->querybuf+c->qb_pos, sdslen(c->querybuf)-c->qb_pos-PROTO_DUMP_LEN, PROTO_DUMP_LEN/2, c->querybuf+sdslen(c->querybuf)-PROTO_DUMP_LEN/2);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Remove non printable chars. */
|
|
||||||
char *p = buf;
|
|
||||||
while (*p != '\0') {
|
|
||||||
if (!isprint(*p)) *p = '.';
|
|
||||||
p++;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Log all the client and protocol info. */
|
|
||||||
int loglevel = (c->flags & CLIENT_MASTER) ? LL_WARNING :
|
|
||||||
LL_VERBOSE;
|
|
||||||
serverLog(loglevel,
|
|
||||||
"Query buffer from client %lu: %s. %s", c->id, client, buf);
|
|
||||||
sdsfree(client);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Process the query buffer for client 'c', setting up the client argument
|
/* Process the query buffer for client 'c', setting up the client argument
|
||||||
* vector for command execution. Returns C_OK if after running the function
|
* vector for command execution. Returns C_OK if after running the function
|
||||||
* the client has a well-formed ready to be processed command, otherwise
|
* the client has a well-formed ready to be processed command, otherwise
|
||||||
@ -2498,8 +2424,6 @@ void parseClientCommandBuffer(client *c) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
size_t cqueriesStart = c->vecqueuedcmd.size();
|
size_t cqueriesStart = c->vecqueuedcmd.size();
|
||||||
// if (c->flags & CLIENT_MASTER)
|
|
||||||
// printQueryBuffer(c);
|
|
||||||
if (c->reqtype == PROTO_REQ_INLINE) {
|
if (c->reqtype == PROTO_REQ_INLINE) {
|
||||||
if (processInlineBuffer(c) != C_OK) break;
|
if (processInlineBuffer(c) != C_OK) break;
|
||||||
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
|
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
|
||||||
|
@ -60,7 +60,6 @@ static void propagateMasterStaleKeys();
|
|||||||
void updateLowestOffsetAmongReplicas(){
|
void updateLowestOffsetAmongReplicas(){
|
||||||
serverAssert(GlobalLocksAcquired());
|
serverAssert(GlobalLocksAcquired());
|
||||||
serverAssert(!g_pserver->repl_backlog_lock.fOwnLock());
|
serverAssert(!g_pserver->repl_backlog_lock.fOwnLock());
|
||||||
// serverLog(LL_NOTICE, "off- have repl");
|
|
||||||
long long min_offset = LONG_LONG_MAX;
|
long long min_offset = LONG_LONG_MAX;
|
||||||
listIter li;
|
listIter li;
|
||||||
listNode *ln;
|
listNode *ln;
|
||||||
@ -73,14 +72,13 @@ void updateLowestOffsetAmongReplicas(){
|
|||||||
if (replica->flags & CLIENT_CLOSE_ASAP) continue;
|
if (replica->flags & CLIENT_CLOSE_ASAP) continue;
|
||||||
|
|
||||||
std::unique_lock<fastlock> ul(replica->lock);
|
std::unique_lock<fastlock> ul(replica->lock);
|
||||||
// serverLog(LL_NOTICE, "off- acq client");
|
|
||||||
|
|
||||||
min_offset = std::min(min_offset, replica->repl_curr_off);
|
min_offset = std::min(min_offset, replica->repl_curr_off);
|
||||||
// serverLog(LL_NOTICE, "off- rel client");
|
|
||||||
}
|
}
|
||||||
/* return -1 if no other minimum was found */
|
/* return -1 if no other minimum was found */
|
||||||
g_pserver->repl_lowest_off.store(min_offset == LONG_LONG_MAX ? -1 : min_offset, std::memory_order_seq_cst);
|
g_pserver->repl_lowest_off.store(min_offset == LONG_LONG_MAX ? -1 : min_offset, std::memory_order_seq_cst);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* We take a global flag to remember if this instance generated an RDB
|
/* We take a global flag to remember if this instance generated an RDB
|
||||||
* because of replication, so that we can remove the RDB file in case
|
* because of replication, so that we can remove the RDB file in case
|
||||||
* the instance is configured to have no persistence. */
|
* the instance is configured to have no persistence. */
|
||||||
@ -232,6 +230,8 @@ void createReplicationBacklog(void) {
|
|||||||
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
|
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
long long getReplIndexFromOffset(long long offset);
|
||||||
|
|
||||||
/* This function is called when the user modifies the replication backlog
|
/* This function is called when the user modifies the replication backlog
|
||||||
* size at runtime. It is up to the function to both update the
|
* size at runtime. It is up to the function to both update the
|
||||||
* g_pserver->repl_backlog_size and to resize the buffer and setup it so that
|
* g_pserver->repl_backlog_size and to resize the buffer and setup it so that
|
||||||
@ -243,8 +243,6 @@ void resizeReplicationBacklog(long long newsize) {
|
|||||||
newsize = CONFIG_REPL_BACKLOG_MIN_SIZE;
|
newsize = CONFIG_REPL_BACKLOG_MIN_SIZE;
|
||||||
if (g_pserver->repl_backlog_size == newsize) return;
|
if (g_pserver->repl_backlog_size == newsize) return;
|
||||||
|
|
||||||
serverLog(LL_NOTICE, "WE HAD TO RESIZE from %lld to %lld", g_pserver->repl_backlog_size, newsize);
|
|
||||||
|
|
||||||
if (g_pserver->repl_backlog != NULL) {
|
if (g_pserver->repl_backlog != NULL) {
|
||||||
/* What we actually do is to flush the old buffer and realloc a new
|
/* What we actually do is to flush the old buffer and realloc a new
|
||||||
* empty one. It will refill with new data incrementally.
|
* empty one. It will refill with new data incrementally.
|
||||||
@ -252,60 +250,9 @@ void resizeReplicationBacklog(long long newsize) {
|
|||||||
* worse often we need to alloc additional space before freeing the
|
* worse often we need to alloc additional space before freeing the
|
||||||
* old buffer. */
|
* old buffer. */
|
||||||
|
|
||||||
if (g_pserver->repl_batch_idxStart >= 0) {
|
|
||||||
// We need to keep critical data so we can't shrink less than the hot data in the buffer
|
|
||||||
newsize = std::max(newsize, g_pserver->master_repl_offset - g_pserver->repl_batch_offStart);
|
|
||||||
char *backlog = (char*)zmalloc(newsize);
|
|
||||||
g_pserver->repl_backlog_histlen = g_pserver->master_repl_offset - g_pserver->repl_batch_offStart;
|
|
||||||
|
|
||||||
if (g_pserver->repl_backlog_idx >= g_pserver->repl_batch_idxStart) {
|
|
||||||
auto cbActiveBacklog = g_pserver->repl_backlog_idx - g_pserver->repl_batch_idxStart;
|
|
||||||
memcpy(backlog, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbActiveBacklog);
|
|
||||||
serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog);
|
|
||||||
} else {
|
|
||||||
auto cbPhase1 = g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart;
|
|
||||||
memcpy(backlog, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbPhase1);
|
|
||||||
memcpy(backlog + cbPhase1, g_pserver->repl_backlog, g_pserver->repl_backlog_idx);
|
|
||||||
auto cbActiveBacklog = cbPhase1 + g_pserver->repl_backlog_idx;
|
|
||||||
serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog);
|
|
||||||
}
|
|
||||||
zfree(g_pserver->repl_backlog);
|
|
||||||
g_pserver->repl_backlog = backlog;
|
|
||||||
g_pserver->repl_backlog_idx = g_pserver->repl_backlog_histlen;
|
|
||||||
g_pserver->repl_batch_idxStart = 0;
|
|
||||||
g_pserver->repl_backlog_start = g_pserver->master_repl_offset;
|
|
||||||
} else {
|
|
||||||
zfree(g_pserver->repl_backlog);
|
|
||||||
g_pserver->repl_backlog = (char*)zmalloc(newsize);
|
|
||||||
g_pserver->repl_backlog_histlen = 0;
|
|
||||||
g_pserver->repl_backlog_idx = 0;
|
|
||||||
/* Next byte we have is... the next since the buffer is empty. */
|
|
||||||
g_pserver->repl_backlog_off = g_pserver->master_repl_offset+1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
g_pserver->repl_backlog_size = newsize;
|
|
||||||
}
|
|
||||||
|
|
||||||
long long getReplIndexFromOffset(long long offset);
|
|
||||||
|
|
||||||
/* The above but for when clients need extra replication backlog because ??? */
|
|
||||||
void resizeReplicationBacklogForClients(long long newsize) {
|
|
||||||
if (newsize < CONFIG_REPL_BACKLOG_MIN_SIZE)
|
|
||||||
newsize = CONFIG_REPL_BACKLOG_MIN_SIZE;
|
|
||||||
if (g_pserver->repl_backlog_size == newsize) return;
|
|
||||||
|
|
||||||
serverLog(LL_NOTICE, "WE HAVE TO RESIZE from %lld to %lld", g_pserver->repl_backlog_size, newsize);
|
|
||||||
/* get the critical client size, i.e. the size of the data unflushed to clients */
|
/* get the critical client size, i.e. the size of the data unflushed to clients */
|
||||||
long long earliest_off = g_pserver->repl_lowest_off.load();
|
long long earliest_off = g_pserver->repl_lowest_off.load();
|
||||||
|
|
||||||
|
|
||||||
if (g_pserver->repl_backlog != NULL) {
|
|
||||||
/* What we actually do is to flush the old buffer and realloc a new
|
|
||||||
* empty one. It will refill with new data incrementally.
|
|
||||||
* The reason is that copying a few gigabytes adds latency and even
|
|
||||||
* worse often we need to alloc additional space before freeing the
|
|
||||||
* old buffer. */
|
|
||||||
|
|
||||||
if (earliest_off != -1) {
|
if (earliest_off != -1) {
|
||||||
// We need to keep critical data so we can't shrink less than the hot data in the buffer
|
// We need to keep critical data so we can't shrink less than the hot data in the buffer
|
||||||
newsize = std::max(newsize, g_pserver->master_repl_offset - earliest_off);
|
newsize = std::max(newsize, g_pserver->master_repl_offset - earliest_off);
|
||||||
@ -316,8 +263,6 @@ void resizeReplicationBacklogForClients(long long newsize) {
|
|||||||
if (g_pserver->repl_backlog_idx >= earliest_idx) {
|
if (g_pserver->repl_backlog_idx >= earliest_idx) {
|
||||||
auto cbActiveBacklog = g_pserver->repl_backlog_idx - earliest_idx;
|
auto cbActiveBacklog = g_pserver->repl_backlog_idx - earliest_idx;
|
||||||
memcpy(backlog, g_pserver->repl_backlog + earliest_idx, cbActiveBacklog);
|
memcpy(backlog, g_pserver->repl_backlog + earliest_idx, cbActiveBacklog);
|
||||||
serverLog(LL_NOTICE, "g_pserver->master_repl_offset: %lld, earliest_off: %lld, g_pserver->repl_backlog_idx: %lld, earliest_idx: %lld, repl_backlog_start: %lld",
|
|
||||||
g_pserver->master_repl_offset, earliest_off, g_pserver->repl_backlog_idx, earliest_idx, g_pserver->repl_backlog_start);
|
|
||||||
serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog);
|
serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog);
|
||||||
} else {
|
} else {
|
||||||
auto cbPhase1 = g_pserver->repl_backlog_size - earliest_idx;
|
auto cbPhase1 = g_pserver->repl_backlog_size - earliest_idx;
|
||||||
@ -344,14 +289,9 @@ void resizeReplicationBacklogForClients(long long newsize) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
g_pserver->repl_backlog_size = newsize;
|
g_pserver->repl_backlog_size = newsize;
|
||||||
|
|
||||||
serverLog(LL_NOTICE, "We are ending with: master_repl_offset: %lld, repl_batch_offStart: %lld, new_off: %lld, "
|
|
||||||
"repl_backlog_idx: %lld, repl_batch_idxStart: %lld, new_idx: %lld, repl_backlog_size: %lld",
|
|
||||||
g_pserver->master_repl_offset, g_pserver->repl_batch_offStart, 0LL,
|
|
||||||
g_pserver->repl_backlog_idx, g_pserver->repl_batch_idxStart, 0LL, g_pserver->repl_backlog_size
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void freeReplicationBacklog(void) {
|
void freeReplicationBacklog(void) {
|
||||||
serverAssert(GlobalLocksAcquired());
|
serverAssert(GlobalLocksAcquired());
|
||||||
listIter li;
|
listIter li;
|
||||||
@ -391,17 +331,11 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
|
|||||||
// This is an emergency overflow, we better resize to fit
|
// This is an emergency overflow, we better resize to fit
|
||||||
long long newsize = std::max(g_pserver->repl_backlog_size*2, minimumsize);
|
long long newsize = std::max(g_pserver->repl_backlog_size*2, minimumsize);
|
||||||
serverLog(LL_WARNING, "Replication backlog is too small, resizing to: %lld", newsize);
|
serverLog(LL_WARNING, "Replication backlog is too small, resizing to: %lld", newsize);
|
||||||
resizeReplicationBacklogForClients(newsize);
|
resizeReplicationBacklog(newsize);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// serverLog(LL_NOTICE, "Pt2 start with: master_repl_offset: %lld, repl_batch_offStart: %lld, "
|
|
||||||
// "repl_backlog_idx: %lld, repl_batch_idxStart: %lld, repl_backlog_size: %lld",
|
|
||||||
// g_pserver->master_repl_offset, g_pserver->repl_batch_offStart,
|
|
||||||
// g_pserver->repl_backlog_idx, g_pserver->repl_batch_idxStart, g_pserver->repl_backlog_size
|
|
||||||
// );
|
|
||||||
|
|
||||||
g_pserver->master_repl_offset += len;
|
g_pserver->master_repl_offset += len;
|
||||||
|
|
||||||
/* This is a circular buffer, so write as much data we can at every
|
/* This is a circular buffer, so write as much data we can at every
|
||||||
@ -423,12 +357,6 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
|
|||||||
/* Set the offset of the first byte we have in the backlog. */
|
/* Set the offset of the first byte we have in the backlog. */
|
||||||
g_pserver->repl_backlog_off = g_pserver->master_repl_offset -
|
g_pserver->repl_backlog_off = g_pserver->master_repl_offset -
|
||||||
g_pserver->repl_backlog_histlen + 1;
|
g_pserver->repl_backlog_histlen + 1;
|
||||||
|
|
||||||
// serverLog(LL_NOTICE, "Pt2 end with: master_repl_offset: %lld, repl_batch_offStart: %lld, "
|
|
||||||
// "repl_backlog_idx: %lld, repl_batch_idxStart: %lld, repl_backlog_size: %lld",
|
|
||||||
// g_pserver->master_repl_offset, g_pserver->repl_batch_offStart,
|
|
||||||
// g_pserver->repl_backlog_idx, g_pserver->repl_batch_idxStart, g_pserver->repl_backlog_size
|
|
||||||
// );
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Wrapper for feedReplicationBacklog() that takes Redis string objects
|
/* Wrapper for feedReplicationBacklog() that takes Redis string objects
|
||||||
@ -578,9 +506,7 @@ void replicationFeedSlavesCore(list *slaves, int dictid, robj **argv, int argc)
|
|||||||
|
|
||||||
/* Add the SELECT command into the backlog. */
|
/* Add the SELECT command into the backlog. */
|
||||||
/* We don't do this for advanced replication because this will be done later when it adds the whole RREPLAY command */
|
/* We don't do this for advanced replication because this will be done later when it adds the whole RREPLAY command */
|
||||||
if (g_pserver->repl_backlog && fSendRaw) {
|
if (g_pserver->repl_backlog && fSendRaw) feedReplicationBacklogWithObject(selectcmd);
|
||||||
feedReplicationBacklogWithObject(selectcmd);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
|
if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
|
||||||
decrRefCount(selectcmd);
|
decrRefCount(selectcmd);
|
||||||
@ -678,7 +604,7 @@ void replicationFeedSlaves(list *replicas, int dictid, robj **argv, int argc) {
|
|||||||
void showLatestBacklog(void) {
|
void showLatestBacklog(void) {
|
||||||
if (g_pserver->repl_backlog == NULL) return;
|
if (g_pserver->repl_backlog == NULL) return;
|
||||||
|
|
||||||
long long dumplen = 1024;
|
long long dumplen = 256;
|
||||||
if (g_pserver->repl_backlog_histlen < dumplen)
|
if (g_pserver->repl_backlog_histlen < dumplen)
|
||||||
dumplen = g_pserver->repl_backlog_histlen;
|
dumplen = g_pserver->repl_backlog_histlen;
|
||||||
|
|
||||||
@ -769,7 +695,9 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv,
|
|||||||
}
|
}
|
||||||
decrRefCount(cmdobj);
|
decrRefCount(cmdobj);
|
||||||
}
|
}
|
||||||
#define BYPASS_PSYNC
|
|
||||||
|
int prepareClientToWrite(client *c);
|
||||||
|
|
||||||
/* Feed the replica 'c' with the replication backlog starting from the
|
/* Feed the replica 'c' with the replication backlog starting from the
|
||||||
* specified 'offset' up to the end of the backlog. */
|
* specified 'offset' up to the end of the backlog. */
|
||||||
long long addReplyReplicationBacklog(client *c, long long offset) {
|
long long addReplyReplicationBacklog(client *c, long long offset) {
|
||||||
@ -809,26 +737,14 @@ long long addReplyReplicationBacklog(client *c, long long offset) {
|
|||||||
* split the reply in two parts if we are cross-boundary. */
|
* split the reply in two parts if we are cross-boundary. */
|
||||||
len = g_pserver->repl_backlog_histlen - skip;
|
len = g_pserver->repl_backlog_histlen - skip;
|
||||||
serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len);
|
serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len);
|
||||||
#ifdef BYPASS_PSYNC
|
|
||||||
c->repl_curr_off = offset - 1;
|
c->repl_curr_off = offset - 1;
|
||||||
c->repl_end_off = g_pserver->master_repl_offset;
|
c->repl_end_off = g_pserver->master_repl_offset;
|
||||||
serverLog(LL_NOTICE, "This client %lu at addr %s synchronized to %lld", c->id, getClientPeerId(c), c->repl_curr_off);
|
|
||||||
|
|
||||||
/* Force the partial sync to be queued */
|
/* Force the partial sync to be queued */
|
||||||
prepareClientToWrite(c);
|
prepareClientToWrite(c);
|
||||||
c->fPendingReplicaWrite = true;
|
c->fPendingReplicaWrite = true;
|
||||||
#else
|
|
||||||
while(len) {
|
|
||||||
long long thislen =
|
|
||||||
((g_pserver->repl_backlog_size - j) < len) ?
|
|
||||||
(g_pserver->repl_backlog_size - j) : len;
|
|
||||||
|
|
||||||
serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen);
|
|
||||||
addReplySds(c,sdsnewlen(g_pserver->repl_backlog + j, thislen));
|
|
||||||
len -= thislen;
|
|
||||||
j = 0;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
return g_pserver->repl_backlog_histlen - skip;
|
return g_pserver->repl_backlog_histlen - skip;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -866,15 +782,11 @@ int replicationSetupSlaveForFullResync(client *replica, long long offset) {
|
|||||||
replica->repl_curr_off = offset;
|
replica->repl_curr_off = offset;
|
||||||
replica->repl_end_off = g_pserver->master_repl_offset;
|
replica->repl_end_off = g_pserver->master_repl_offset;
|
||||||
|
|
||||||
serverLog(LL_NOTICE, "This client %lu at addr %s synchronized to %lld", replica->id, getClientPeerId(replica), replica->repl_curr_off);
|
|
||||||
|
|
||||||
/* We are going to accumulate the incremental changes for this
|
/* We are going to accumulate the incremental changes for this
|
||||||
* replica as well. Set replicaseldb to -1 in order to force to re-emit
|
* replica as well. Set replicaseldb to -1 in order to force to re-emit
|
||||||
* a SELECT statement in the replication stream. */
|
* a SELECT statement in the replication stream. */
|
||||||
g_pserver->replicaseldb = -1;
|
g_pserver->replicaseldb = -1;
|
||||||
|
|
||||||
serverLog(LL_NOTICE, "We are setting up here lad");
|
|
||||||
|
|
||||||
/* Don't send this reply to slaves that approached us with
|
/* Don't send this reply to slaves that approached us with
|
||||||
* the old SYNC command. */
|
* the old SYNC command. */
|
||||||
if (!(replica->flags & CLIENT_PRE_PSYNC)) {
|
if (!(replica->flags & CLIENT_PRE_PSYNC)) {
|
||||||
@ -1179,7 +1091,6 @@ void syncCommand(client *c) {
|
|||||||
if (g_pserver->FRdbSaveInProgress() &&
|
if (g_pserver->FRdbSaveInProgress() &&
|
||||||
g_pserver->rdb_child_type == RDB_CHILD_TYPE_DISK)
|
g_pserver->rdb_child_type == RDB_CHILD_TYPE_DISK)
|
||||||
{
|
{
|
||||||
serverLog(LL_NOTICE, "case 1");
|
|
||||||
/* Ok a background save is in progress. Let's check if it is a good
|
/* Ok a background save is in progress. Let's check if it is a good
|
||||||
* one for replication, i.e. if there is another replica that is
|
* one for replication, i.e. if there is another replica that is
|
||||||
* registering differences since the server forked to save. */
|
* registering differences since the server forked to save. */
|
||||||
@ -1211,7 +1122,6 @@ void syncCommand(client *c) {
|
|||||||
} else if (g_pserver->FRdbSaveInProgress() &&
|
} else if (g_pserver->FRdbSaveInProgress() &&
|
||||||
g_pserver->rdb_child_type == RDB_CHILD_TYPE_SOCKET)
|
g_pserver->rdb_child_type == RDB_CHILD_TYPE_SOCKET)
|
||||||
{
|
{
|
||||||
serverLog(LL_NOTICE, "case 2");
|
|
||||||
/* There is an RDB child process but it is writing directly to
|
/* There is an RDB child process but it is writing directly to
|
||||||
* children sockets. We need to wait for the next BGSAVE
|
* children sockets. We need to wait for the next BGSAVE
|
||||||
* in order to synchronize. */
|
* in order to synchronize. */
|
||||||
@ -1219,7 +1129,6 @@ void syncCommand(client *c) {
|
|||||||
|
|
||||||
/* CASE 3: There is no BGSAVE is progress. */
|
/* CASE 3: There is no BGSAVE is progress. */
|
||||||
} else {
|
} else {
|
||||||
serverLog(LL_NOTICE, "case 3");
|
|
||||||
if (g_pserver->repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
|
if (g_pserver->repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
|
||||||
/* Diskless replication RDB child is created inside
|
/* Diskless replication RDB child is created inside
|
||||||
* replicationCron() since we want to delay its start a
|
* replicationCron() since we want to delay its start a
|
||||||
@ -4606,10 +4515,11 @@ void _clientAsyncReplyBufferReserve(client *c, size_t len);
|
|||||||
void flushReplBacklogToClients()
|
void flushReplBacklogToClients()
|
||||||
{
|
{
|
||||||
serverAssert(GlobalLocksAcquired());
|
serverAssert(GlobalLocksAcquired());
|
||||||
|
/* If we have the repl backlog lock, we will deadlock */
|
||||||
|
serverAssert(!g_pserver->repl_backlog_lock.fOwnLock());
|
||||||
if (g_pserver->repl_batch_offStart < 0)
|
if (g_pserver->repl_batch_offStart < 0)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
|
||||||
if (g_pserver->repl_batch_offStart != g_pserver->master_repl_offset) {
|
if (g_pserver->repl_batch_offStart != g_pserver->master_repl_offset) {
|
||||||
bool fAsyncWrite = false;
|
bool fAsyncWrite = false;
|
||||||
// Ensure no overflow
|
// Ensure no overflow
|
||||||
@ -4617,34 +4527,18 @@ void flushReplBacklogToClients()
|
|||||||
serverAssert(g_pserver->master_repl_offset - g_pserver->repl_batch_offStart <= g_pserver->repl_backlog_size);
|
serverAssert(g_pserver->master_repl_offset - g_pserver->repl_batch_offStart <= g_pserver->repl_backlog_size);
|
||||||
serverAssert(g_pserver->repl_batch_idxStart != g_pserver->repl_backlog_idx);
|
serverAssert(g_pserver->repl_batch_idxStart != g_pserver->repl_backlog_idx);
|
||||||
|
|
||||||
serverLog(LL_NOTICE, "the master repl offset is %lld", g_pserver->master_repl_offset);
|
|
||||||
showLatestBacklog();
|
|
||||||
listIter li;
|
listIter li;
|
||||||
listNode *ln;
|
listNode *ln;
|
||||||
listRewind(g_pserver->slaves, &li);
|
listRewind(g_pserver->slaves, &li);
|
||||||
while ((ln = listNext(&li))) {
|
while ((ln = listNext(&li))) {
|
||||||
client *replica = (client*)listNodeValue(ln);
|
client *replica = (client*)listNodeValue(ln);
|
||||||
|
|
||||||
// serverLog(LL_NOTICE, "client %lu is in the party", replica->id);
|
|
||||||
|
|
||||||
// serverLog(LL_NOTICE, "is there a write pending for %lu, %d", replica->id, replica->fPendingReplicaWrite);
|
|
||||||
if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
|
if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
|
||||||
if (replica->flags & CLIENT_CLOSE_ASAP) continue;
|
if (replica->flags & CLIENT_CLOSE_ASAP) continue;
|
||||||
|
|
||||||
std::unique_lock<fastlock> ul(replica->lock, std::defer_lock);
|
std::unique_lock<fastlock> ul(replica->lock);
|
||||||
if (FCorrectThread(replica))
|
|
||||||
ul.lock();
|
|
||||||
else
|
|
||||||
fAsyncWrite = true;
|
|
||||||
|
|
||||||
|
|
||||||
/* If we are online and the RDB has been sent, there is no need to feed the client buffer
|
|
||||||
* We will send our replies directly from the replication backlog instead */
|
|
||||||
#ifdef BYPASS_BUFFER
|
|
||||||
{
|
|
||||||
std::unique_lock<fastlock> asyncUl(replica->lock, std::defer_lock);
|
|
||||||
if (!FCorrectThread(replica))
|
if (!FCorrectThread(replica))
|
||||||
asyncUl.lock();
|
fAsyncWrite = true;
|
||||||
|
|
||||||
/* We should have set the repl_curr_off when synchronizing, so it shouldn't be -1 here */
|
/* We should have set the repl_curr_off when synchronizing, so it shouldn't be -1 here */
|
||||||
serverAssert(replica->repl_curr_off != -1);
|
serverAssert(replica->repl_curr_off != -1);
|
||||||
@ -4658,25 +4552,6 @@ void flushReplBacklogToClients()
|
|||||||
replica->fPendingReplicaWrite = true;
|
replica->fPendingReplicaWrite = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
if (g_pserver->repl_backlog_idx >= g_pserver->repl_batch_idxStart) {
|
|
||||||
long long cbCopy = g_pserver->repl_backlog_idx - g_pserver->repl_batch_idxStart;
|
|
||||||
serverAssert((g_pserver->master_repl_offset - g_pserver->repl_batch_offStart) == cbCopy);
|
|
||||||
serverAssert((g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart) >= (cbCopy));
|
|
||||||
serverAssert((g_pserver->repl_batch_idxStart + cbCopy) <= g_pserver->repl_backlog_size);
|
|
||||||
|
|
||||||
addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbCopy);
|
|
||||||
} else {
|
|
||||||
auto cbPhase1 = g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart;
|
|
||||||
if (fAsyncWrite)
|
|
||||||
_clientAsyncReplyBufferReserve(replica, cbPhase1 + g_pserver->repl_backlog_idx);
|
|
||||||
addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbPhase1);
|
|
||||||
addReplyProto(replica, g_pserver->repl_backlog, g_pserver->repl_backlog_idx);
|
|
||||||
|
|
||||||
serverAssert((cbPhase1 + g_pserver->repl_backlog_idx) == (g_pserver->master_repl_offset - g_pserver->repl_batch_offStart));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (fAsyncWrite)
|
if (fAsyncWrite)
|
||||||
ProcessPendingAsyncWrites();
|
ProcessPendingAsyncWrites();
|
||||||
|
@ -1796,7 +1796,6 @@ int clientsCronTrackClientsMemUsage(client *c) {
|
|||||||
mem += zmalloc_size(c);
|
mem += zmalloc_size(c);
|
||||||
mem += c->argv_len_sum();
|
mem += c->argv_len_sum();
|
||||||
if (c->argv) mem += zmalloc_size(c->argv);
|
if (c->argv) mem += zmalloc_size(c->argv);
|
||||||
// serverLog(LL_NOTICE, "Mem here is : %lu", mem);
|
|
||||||
/* Now that we have the memory used by the client, remove the old
|
/* Now that we have the memory used by the client, remove the old
|
||||||
* value from the old category, and add it back. */
|
* value from the old category, and add it back. */
|
||||||
g_pserver->stat_clients_type_memory[c->client_cron_last_memory_type] -=
|
g_pserver->stat_clients_type_memory[c->client_cron_last_memory_type] -=
|
||||||
@ -1855,7 +1854,6 @@ void clientsCron(int iel) {
|
|||||||
while(listLength(g_pserver->clients) && iterations--) {
|
while(listLength(g_pserver->clients) && iterations--) {
|
||||||
client *c;
|
client *c;
|
||||||
listNode *head;
|
listNode *head;
|
||||||
// serverLog(LL_NOTICE, "we are at iteration: %d", iterations);
|
|
||||||
/* Rotate the list, take the current head, process.
|
/* Rotate the list, take the current head, process.
|
||||||
* This way if the client must be removed from the list it's the
|
* This way if the client must be removed from the list it's the
|
||||||
* first element and we don't incur into O(N) computation. */
|
* first element and we don't incur into O(N) computation. */
|
||||||
|
@ -3540,7 +3540,6 @@ void tlsInit(void);
|
|||||||
void tlsInitThread();
|
void tlsInitThread();
|
||||||
int tlsConfigure(redisTLSContextConfig *ctx_config);
|
int tlsConfigure(redisTLSContextConfig *ctx_config);
|
||||||
|
|
||||||
int prepareClientToWrite(client *c);
|
|
||||||
|
|
||||||
|
|
||||||
class ShutdownException
|
class ShutdownException
|
||||||
|
Loading…
x
Reference in New Issue
Block a user