Merge branch 'unstable' into keydbpro

Former-commit-id: d89c15518f984c1d4d4e7638a4e8ac5aa499632a
This commit is contained in:
John Sully 2020-05-11 00:53:38 -04:00
commit d1948ab944
21 changed files with 154 additions and 65 deletions

View File

@ -14,24 +14,21 @@ jobs:
git submodule init
git submodule update
sudo apt remove libzstd
sudo apt-get install uuid-dev libcurl4-openssl-dev libbz2-dev zlib1g-dev libsnappy-dev liblz4-dev libzstd-dev libgflags-dev
sudo apt-get -y install uuid-dev libcurl4-openssl-dev libbz2-dev zlib1g-dev libsnappy-dev liblz4-dev libzstd-dev libgflags-dev
sudo apt-get -y install uuid-dev libcurl4-openssl-dev
make -j2
- name: test
run: |
sudo apt-get install tcl8.5
sudo apt-get -y install tcl8.5
./runtest --clients 2 --verbose
- name: cluster-test
run: |
./runtest-cluster
- name: sentinel test
run: |
./runtest-sentinel
- name: module tests
run: |
./runtest-moduleapi
build-ubuntu-old:
runs-on: ubuntu-16.04
steps:
- uses: actions/checkout@v1
with:
submodules: true
- name: make
run: |
git submodule init
git submodule update
sudo apt remove libzstd
sudo apt-get install uuid-dev libcurl4-openssl-dev libbz2-dev zlib1g-dev libsnappy-dev liblz4-dev libzstd-dev libgflags-dev
make -j2

3
.gitmodules vendored
View File

@ -1,3 +1,6 @@
[submodule "deps/rocksdb"]
path = deps/rocksdb
url = https://github.com/facebook/rocksdb.git
[submodule "deps/depot_tools"]
path = deps/depot_tools
url = https://chromium.googlesource.com/chromium/tools/depot_tools.git

View File

@ -1,5 +1,6 @@
![Current Release](https://img.shields.io/github/release/JohnSully/KeyDB.svg)
[![Build Status](https://travis-ci.org/JohnSully/KeyDB.svg?branch=unstable)](https://travis-ci.org/JohnSully/KeyDB) [![Join the chat at https://gitter.im/KeyDB/community](https://badges.gitter.im/KeyDB/community.svg)](https://gitter.im/KeyDB/community?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
![CI](https://github.com/JohnSully/KeyDB/workflows/CI/badge.svg?branch=unstable)
[![Join the chat at https://gitter.im/KeyDB/community](https://badges.gitter.im/KeyDB/community.svg)](https://gitter.im/KeyDB/community?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
[![StackShare](http://img.shields.io/badge/tech-stack-0690fa.svg?style=flat)](https://stackshare.io/eq-alpha-technology-inc/eq-alpha-technology-inc)
##### Need Help? Check out our extensive [documentation](https://docs.keydb.dev).

1
deps/depot_tools vendored Submodule

@ -0,0 +1 @@
Subproject commit aaf566999558aa8ead38811228cd539a6e6e2fda

View File

@ -72,6 +72,7 @@ endif
ifeq ($(COMPILER_NAME),clang)
CXXFLAGS+= -stdlib=libc++
LDFLAGS+= -latomic
endif
# To get ARM stack traces if Redis crashes we need a special C flag.
@ -91,10 +92,6 @@ else
LICENSE_LIB_DIR=../deps/license/x64/
endif
ifneq (,$(findstring armv,$(uname_M)))
FINAL_LIBS+=-latomic
endif
# Backwards compatibility for selecting an allocator
ifeq ($(USE_TCMALLOC),yes)
MALLOC=tcmalloc
@ -123,18 +120,14 @@ endif
FINAL_CFLAGS=$(STD) $(WARN) $(OPT) $(DEBUG) $(CFLAGS) $(REDIS_CFLAGS)
FINAL_CXXFLAGS=$(CXX_STD) $(WARN) $(OPT) $(DEBUG) $(CXXFLAGS) $(REDIS_CFLAGS)
FINAL_LDFLAGS=$(LDFLAGS) $(REDIS_LDFLAGS) $(DEBUG)
FINAL_LIBS+=-lm -lcurl -lz -latomic -L$(LICENSE_LIB_DIR) -lkey -lcrypto
FINAL_LIBS+=-lm -lcurl -lz -latomic -L$(LICENSE_LIB_DIR) -lkey -lcrypto -lbz2
DEBUG=-g -ggdb
# Linux ARM needs -latomic at linking time
ifneq (,$(filter aarch64 armv,$(uname_M)))
FINAL_LIBS+=-latomic
else
ifneq (,$(findstring armv,$(uname_M)))
FINAL_LIBS+=-latomic
endif
ifneq ($(uname_S),Darwin)
FINAL_LIBS+=-latomic
endif
ifeq ($(uname_S),SunOS)
# SunOS
ifneq ($(@@),32bit)

View File

@ -4246,7 +4246,7 @@ void clusterReplyMultiBulkSlots(client *c) {
static_assert((CLUSTER_SLOTS % (sizeof(uint32_t)*8)) == 0, "code below assumes the bitfield is a multiple of sizeof(unsinged)");
for (unsigned iw = 0; iw < (CLUSTER_SLOTS/sizeof(uint32_t)/8); ++iw)
for (int iw = 0; iw < (CLUSTER_SLOTS/(int)sizeof(uint32_t)/8); ++iw)
{
uint32_t wordCur = reinterpret_cast<uint32_t*>(node->slots)[iw];
if (iw != ((CLUSTER_SLOTS/sizeof(uint32_t)/8)-1))
@ -4259,7 +4259,7 @@ void clusterReplyMultiBulkSlots(client *c) {
unsigned ibitStartLoop = iw*sizeof(uint32_t)*8;
for (unsigned j = ibitStartLoop; j < (iw+1)*sizeof(uint32_t)*8; j++) {
for (int j = ibitStartLoop; j < (iw+1)*(int)sizeof(uint32_t)*8; j++) {
int i;
int bit = (int)(wordCur & 1);
wordCur >>= 1;

View File

@ -66,7 +66,6 @@ void updateExpire(redisDb *db, sds key, robj *valOld, robj *valNew)
return;
}
static void lookupKeyUpdateObj(robj *val, int flags)
{
/* Update the access time for the ageing algorithm.
@ -285,6 +284,7 @@ void dbOverwrite(redisDb *db, robj *key, robj *val, bool fRemoveExpire) {
auto itr = db->find(key);
serverAssertWithInfo(NULL,key,itr != nullptr);
lookupKeyUpdateObj(val, LOOKUP_NONE);
db->dbOverwriteCore(itr, key, val, !!g_pserver->fActiveReplica, fRemoveExpire);
}

View File

@ -45,6 +45,7 @@
#include <stdarg.h>
#include <stdio.h>
#include "config.h"
#include "serverassert.h"
#ifdef __APPLE__
#include <TargetConditionals.h>
@ -140,13 +141,11 @@ extern "C" void unlock_futex(struct fastlock *lock, uint16_t ifutex);
#endif
#pragma weak _serverPanic
extern "C" __attribute__((weak)) void _serverPanic(const char * /*file*/, int /*line*/, const char * /*msg*/, ...)
{
*((char*)-1) = 'x';
}
#pragma weak serverLog
__attribute__((weak)) void serverLog(int , const char *fmt, ...)
{
va_list args;
@ -166,7 +165,7 @@ extern "C" pid_t gettid()
if (pidCache == -1) {
uint64_t tidT;
pthread_threadid_np(nullptr, &tidT);
assert(tidT < UINT_MAX);
serverAssert(tidT < UINT_MAX);
pidCache = (int)tidT;
}
#endif
@ -395,7 +394,7 @@ extern "C" void fastlock_unlock(struct fastlock *lock)
{
int pidT;
__atomic_load(&lock->m_pidOwner, &pidT, __ATOMIC_RELAXED);
assert(pidT >= 0); // unlock after free
serverAssert(pidT >= 0); // unlock after free
int t = -1;
__atomic_store(&lock->m_pidOwner, &t, __ATOMIC_RELEASE);
std::atomic_thread_fence(std::memory_order_release);
@ -433,7 +432,7 @@ extern "C" void unlock_futex(struct fastlock *lock, uint16_t ifutex)
extern "C" void fastlock_free(struct fastlock *lock)
{
// NOP
assert((lock->m_ticket.m_active == lock->m_ticket.m_avail) // Asser the lock is unlocked
serverAssert((lock->m_ticket.m_active == lock->m_ticket.m_avail) // Asser the lock is unlocked
|| (lock->m_pidOwner == gettid() && (lock->m_ticket.m_active == lock->m_ticket.m_avail-1))); // OR we own the lock and nobody else is waiting
lock->m_pidOwner = -2; // sentinal value indicating free
ANNOTATE_RWLOCK_DESTROY(lock);

View File

@ -84,4 +84,4 @@ struct fastlock
#endif
};
static_assert(offsetof(struct fastlock, m_ticket) == 64, "ensure padding is correct");
static_assert(offsetof(struct fastlock, m_ticket) == 64, "ensure padding is correct");

View File

@ -50,9 +50,6 @@
const double EARTH_RADIUS_IN_METERS = 6372797.560856;
const double MERCATOR_MAX = 20037726.37;
#if 0 //unused
const double MERCATOR_MIN = -20037726.37;
#endif
static inline double deg_rad(double ang) { return ang * D_R; }
static inline double rad_deg(double ang) { return ang / D_R; }

View File

@ -1173,7 +1173,12 @@ struct commandHelp {
"name [single/repeat] [optional: start] delay script numkeys [key N] [arg N]",
"Run a specified script after start + delay, optionally repeating every delay interval. The job may be cancelled by deleting the key associated with the job (name parameter)",
10,
"6.5.2"
"6.5.2"},
{ "KEYDB.HRENAME",
"key [src hash key] [dst hash key]",
"Rename a hash key, copying the value.",
4,
"6.5.3"
}
};

View File

@ -1775,15 +1775,15 @@ int modulePopulateReplicationInfoStructure(void *ri, int structver) {
memset(ri1,0,sizeof(*ri1));
ri1->version = structver;
ri1->master = listLength(g_pserver->masters) == 0;
if (ri1->master)
if (!ri1->master)
{
redisMaster *mi = (redisMaster*)listFirst(g_pserver->masters);
redisMaster *mi = (redisMaster*)listNodeValue(listFirst(g_pserver->masters));
ri1->masterhost = (char*)(mi->masterhost? mi->masterhost: "");
ri1->masterport = mi->masterport;
}
else
{
ri1->masterhost = nullptr;
ri1->masterhost = "";
ri1->masterport = -1;
}
ri1->repl1_offset = g_pserver->master_repl_offset;
@ -4701,13 +4701,14 @@ void moduleHandleBlockedClients(int iel) {
if ((c != nullptr) && (iel != c->iel))
continue;
std::unique_lock<fastlock> ul;
listDelNode(moduleUnblockedClients,ln);
pthread_mutex_unlock(&moduleUnblockedClientsMutex);
if (c)
{
AssertCorrectThread(c);
fastlock_lock(&c->lock);
ul = std::unique_lock<fastlock>(c->lock);
}
/* Release the lock during the loop, as long as we don't
@ -4773,7 +4774,6 @@ void moduleHandleBlockedClients(int iel) {
/* Free 'bc' only after unblocking the client, since it is
* referenced in the client blocking context, and must be valid
* when calling unblockClient(). */
fastlock_unlock(&c->lock);
bc->module->blocked_clients--;
zfree(bc);
@ -7765,8 +7765,12 @@ int RM_GetLFU(RedisModuleKey *key, long long *lfu_freq) {
*lfu_freq = -1;
if (!key->value)
return REDISMODULE_ERR;
serverLog(LL_WARNING, "MAXMEMORY_POLICY: %X", g_pserver->maxmemory_policy);
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU)
{
*lfu_freq = LFUDecrAndReturn(key->value);
serverLog(LL_WARNING, "lfu_freq: %lld", lfu_freq);
}
return REDISMODULE_OK;
}

View File

@ -1084,7 +1084,7 @@ void copyClientOutputBuffer(client *dst, client *src) {
/* Return true if the specified client has pending reply buffers to write to
* the socket. */
int clientHasPendingReplies(client *c) {
return (c->bufpos || listLength(c->reply)) && !(c->flags & CLIENT_CLOSE_ASAP);
return (c->bufpos || listLength(c->reply));
}
static std::atomic<int> rgacceptsInFlight[MAX_EVENT_LOOPS];
@ -1597,6 +1597,8 @@ bool freeClient(client *c) {
return true;
}
fastlock lockasyncfree {"async free lock"};
/* Schedule a client to free it at a safe time in the serverCron() function.
* This function is useful when we need to terminate a client but we are in
* a context where calling freeClient() is not possible, because the client
@ -1613,6 +1615,7 @@ void freeClientAsync(client *c) {
lock.arm(c);
if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return; // race condition after we acquire the lock
c->flags |= CLIENT_CLOSE_ASAP;
std::unique_lock<fastlock> ul(lockasyncfree);
listAddNodeTail(g_pserver->clients_to_close,c);
}
@ -1624,6 +1627,7 @@ void freeClientsInAsyncFreeQueue(int iel) {
// Store the clients in a temp vector since freeClient will modify this list
std::vector<client*> vecclientsFree;
std::unique_lock<fastlock> ul(lockasyncfree);
while((ln = listNext(&li)))
{
client *c = (client*)listNodeValue(ln);
@ -1633,6 +1637,7 @@ void freeClientsInAsyncFreeQueue(int iel) {
listDelNode(g_pserver->clients_to_close, ln);
}
}
ul.unlock();
for (client *c : vecclientsFree)
{
@ -2401,11 +2406,13 @@ void readQueryFromClient(connection *conn) {
return;
} else {
serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
aelock.arm(c);
freeClientAsync(c);
return;
}
} else if (nread == 0) {
serverLog(LL_VERBOSE, "Client closed connection");
aelock.arm(c);
freeClientAsync(c);
return;
} else if (c->flags & CLIENT_MASTER) {
@ -2787,6 +2794,7 @@ NULL
!= C_OK) return;
struct client *target = lookupClientByID(id);
if (target && target->flags & CLIENT_BLOCKED) {
std::unique_lock<fastlock> ul(target->lock);
if (unblock_error)
addReplyError(target,
"-UNBLOCKED client unblocked via CLIENT UNBLOCK");

View File

@ -127,9 +127,10 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) {
* OBJ_ENCODING_EMBSTR_SIZE_LIMIT, otherwise the RAW encoding is
* used.
*
* The current limit of 44 is chosen so that the biggest string object
* The current limit of 52 is chosen so that the biggest string object
* we allocate as EMBSTR will still fit into the 64 byte arena of jemalloc. */
#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 44
#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 48
static_assert((sizeof(redisObject)+OBJ_ENCODING_EMBSTR_SIZE_LIMIT-8) == 64, "Max EMBSTR obj should be 64 bytes total");
robj *createStringObject(const char *ptr, size_t len) {
if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT)
return createEmbeddedStringObject(ptr,len);

View File

@ -324,7 +324,7 @@ typedef struct RedisModuleReplicationInfo {
from the module to the core right now. Here
for future compatibility. */
int master; /* true if master, false if replica */
char *masterhost; /* master instance hostname for NOW_REPLICA */
const char *masterhost; /* master instance hostname for NOW_REPLICA */
int masterport; /* master instance port for NOW_REPLICA */
char *replid1; /* Main replication ID */
char *replid2; /* Secondary replication ID */

View File

@ -1043,6 +1043,10 @@ struct redisCommand redisCommandTable[] = {
{"keydb.cron",cronCommand,-5,
"write use-memory",
0,NULL,1,1,1,0,0,0},
{"keydb.hrename", hrenameCommand, 4,
"write fast @hash",
0,NULL,0,0,0,0,0,0}
};
/*============================ Utility functions ============================ */

View File

@ -94,6 +94,7 @@ typedef long long ustime_t; /* microsecond time type. */
#include "uuid.h"
#include "semiorderedset.h"
#include "connection.h" /* Connection abstraction */
#include "serverassert.h"
#define REDISMODULE_CORE 1
#include "redismodule.h" /* Redis modules API defines. */
@ -271,11 +272,21 @@ public:
return m_ptr == other.m_ptr;
}
bool operator==(const void *p) const
{
return m_ptr == p;
}
bool operator!=(const robj_sharedptr &other) const
{
return m_ptr != other.m_ptr;
}
bool operator!=(const void *p) const
{
return m_ptr != p;
}
redisObject* operator->() const
{
return m_ptr;
@ -666,16 +677,6 @@ public:
* The actual resolution depends on g_pserver->hz. */
#define run_with_period(_ms_) if ((_ms_ <= 1000/g_pserver->hz) || !(g_pserver->cronloops%((_ms_)/(1000/g_pserver->hz))))
/* We can print the stacktrace, so our assert is defined this way: */
#define serverAssertWithInfo(_c,_o,_e) ((_e)?(void)0 : (_serverAssertWithInfo(_c,_o,#_e,__FILE__,__LINE__),_exit(1)))
#define serverAssert(_e) ((_e)?(void)0 : (_serverAssert(#_e,__FILE__,__LINE__),_exit(1)))
#ifdef _DEBUG
#define serverAssertDebug(_e) serverAssert(_e)
#else
#define serverAssertDebug(_e)
#endif
#define serverPanic(...) _serverPanic(__FILE__,__LINE__,__VA_ARGS__),_exit(1)
/*-----------------------------------------------------------------------------
* Data types
*----------------------------------------------------------------------------*/
@ -3375,6 +3376,7 @@ void xdelCommand(client *c);
void xtrimCommand(client *c);
void aclCommand(client *c);
void replicaReplayCommand(client *c);
void hrenameCommand(client *c);
int FBrokenLinkToMaster();
int FActiveMaster(client *c);
@ -3396,9 +3398,6 @@ void *realloc(void *ptr, size_t size);
#endif
/* Debugging stuff */
void _serverAssertWithInfo(const client *c, robj_roptr o, const char *estr, const char *file, int line);
extern "C" void _serverAssert(const char *estr, const char *file, int line);
extern "C" void _serverPanic(const char *file, int line, const char *msg, ...);
void bugReportStart(void);
void serverLogObjectDebugInfo(robj_roptr o);
void sigsegvHandler(int sig, siginfo_t *info, void *secret);

15
src/serverassert.h Normal file
View File

@ -0,0 +1,15 @@
#pragma once
void _serverAssertWithInfo(const struct client *c, class robj_roptr o, const char *estr, const char *file, int line);
extern "C" void _serverAssert(const char *estr, const char *file, int line);
extern "C" void _serverPanic(const char *file, int line, const char *msg, ...);
/* We can print the stacktrace, so our assert is defined this way: */
#define serverAssertWithInfo(_c,_o,_e) ((_e)?(void)0 : (_serverAssertWithInfo(_c,_o,#_e,__FILE__,__LINE__),_exit(1)))
#define serverAssert(_e) ((_e)?(void)0 : (_serverAssert(#_e,__FILE__,__LINE__),_exit(1)))
#ifdef _DEBUG
#define serverAssertDebug(_e) serverAssert(_e)
#else
#define serverAssertDebug(_e)
#endif
#define serverPanic(...) _serverPanic(__FILE__,__LINE__,__VA_ARGS__),_exit(1)

View File

@ -1,7 +1,7 @@
#ifndef __STORAGE_H__
#define __STORAGE_H__
#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 44 // Note: also defined in object.c - should always match
#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 48 // Note: also defined in object.c - should always match
#ifdef __cplusplus
extern "C" {

View File

@ -832,3 +832,33 @@ void hscanCommand(client *c) {
checkType(c,o,OBJ_HASH)) return;
scanGenericCommand(c,o,cursor);
}
void hrenameCommand(client *c) {
robj *o = nullptr;
const unsigned char *vstr;
unsigned int vlen;
long long ll;
if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.null[c->resp])) == nullptr ||
checkType(c,o,OBJ_HASH)) return;
if (hashTypeGetValue(o, szFromObj(c->argv[2]), &vstr, &vlen, &ll) != C_OK)
{
addReplyError(c, "hash key doesn't exist");
return;
}
sds sdsT = nullptr;
if (vstr != nullptr)
{
sdsT = sdsnewlen(vstr, vlen);
}
else
{
sdsT = sdsfromlonglong(ll);
}
hashTypeDelete(o, szFromObj(c->argv[2]));
hashTypeSet(o, szFromObj(c->argv[3]), sdsT, HASH_SET_TAKE_VALUE);
addReplyLongLong(c, 1);
}

View File

@ -445,6 +445,38 @@ start_server {tags {"hash"}} {
}
}
test {KEYDB.HRENAME basic} {
r flushall
r hset testkey foo bar
r keydb.hrename testkey foo foo1
assert_equal [r hlen testkey] {1}
assert_equal [r hget testkey foo1] {bar}
}
test {KEYDB.HRENAME same name} {
r flushall
r hset testkey foo bar
r keydb.hrename testkey foo foo
assert_equal [r hlen testkey] {1}
assert_equal [r hget testkey foo] {bar}
}
test {KEYDB.HRENAME overwrite dest} {
r flushall
r hset testkey foo bar foo1 baz
r keydb.hrename testkey foo foo1
assert_equal [r hlen testkey] {1}
assert_equal [r hget testkey foo1] {bar}
}
test {KEYDB.HRENAME integer basic} {
r flushall
r hset testkey foo 1
r keydb.hrename testkey foo bar
assert_equal [r hlen testkey] {1}
assert_equal [r hget testkey bar] {1}
}
test {Hash ziplist regression test for large keys} {
r hset hash kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkk a
r hset hash kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkk b