Second implementation of nested hashes

Former-commit-id: ba950a3d1f5708ed986f9b348eafcace6a7c22b9
This commit is contained in:
John Sully 2020-11-06 19:24:48 +00:00
parent 7db922f44b
commit e085772d01
9 changed files with 406 additions and 12 deletions

View File

@ -276,7 +276,7 @@ endif
REDIS_SERVER_NAME=keydb-server
REDIS_SENTINEL_NAME=keydb-sentinel
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o connection.o tls.o sha256.o motd.o timeout.o setcpuaffinity.o $(ASM_OBJ)
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o t_nhash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o connection.o tls.o sha256.o motd.o timeout.o setcpuaffinity.o $(ASM_OBJ)
REDIS_CLI_NAME=keydb-cli
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crcspeed.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o new.o motd.o $(ASM_OBJ)
REDIS_BENCHMARK_NAME=keydb-benchmark

View File

@ -424,6 +424,10 @@ void addReplyProto(client *c, const char *s, size_t len) {
_addReplyProtoToList(c,s,len);
}
void addReplyProtoCString(client *c, const char *s) {
addReplyProto(c, s, strlen(s));
}
std::string escapeString(sds str)
{
std::string newstr;

View File

@ -30,6 +30,7 @@
#include "server.h"
#include "cron.h"
#include "t_nhash.h"
#include <math.h>
#include <ctype.h>
#include <mutex>
@ -395,6 +396,7 @@ void decrRefCount(robj_roptr o) {
case OBJ_MODULE: freeModuleObject(o); break;
case OBJ_STREAM: freeStreamObject(o); break;
case OBJ_CRON: freeCronObject(o); break;
case OBJ_NESTEDHASH: freeNestedHashObject(o); break;
default: serverPanic("Unknown object type"); break;
}
if (g_pserver->fActiveReplica) {

View File

@ -390,6 +390,10 @@ public:
: sdsview(sdsdup(other.m_str))
{}
sdsstring(const char *rgch, size_t cch)
: sdsview(sdsnewlen(rgch, cch))
{}
sdsstring(sdsstring &&other)
: sdsview(other.m_str)
{
@ -410,6 +414,12 @@ public:
return *this;
}
sds release() {
sds sdsT = m_str;
m_str = nullptr;
return sdsT;
}
~sdsstring()
{
sdsfree(m_str);

View File

@ -63,6 +63,7 @@
#include <mutex>
#include "aelocker.h"
#include "motd.h"
#include "t_nhash.h"
#ifdef __linux__
#include <sys/prctl.h>
#endif
@ -1060,7 +1061,15 @@ struct redisCommand redisCommandTable[] = {
{"stralgo",stralgoCommand,-2,
"read-only @string",
0,lcsGetKeys,0,0,0,0,0,0}
0,lcsGetKeys,0,0,0,0,0,0},
{"keydb.nhget",nhgetCommand,-2,
"read-only fast @hash",
0,NULL,1,1,1,0,0,0},
{"keydb.nhset",nhsetCommand,-3,
"read-only fast @hash",
0,NULL,1,1,1,0,0,0},
};
/*============================ Utility functions ============================ */
@ -3558,7 +3567,15 @@ void call(client *c, int flags) {
updateCachedTime(0);
incrementMvccTstamp();
start = g_pserver->ustime;
c->cmd->proc(c);
try {
c->cmd->proc(c);
} catch (robj_roptr o) {
addReply(c, o);
} catch (robj *o) {
addReply(c, o);
} catch (const char *sz) {
addReplyError(c, sz);
}
serverTL->commandsExecuted++;
duration = ustime()-start;
dirty = g_pserver->dirty-dirty;

View File

@ -654,11 +654,11 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
/* A redis object, that is a type able to hold a string / list / set */
/* The actual Redis Object */
#define OBJ_STRING 0 /* String object. */
#define OBJ_LIST 1 /* List object. */
#define OBJ_SET 2 /* Set object. */
#define OBJ_ZSET 3 /* Sorted set object. */
#define OBJ_HASH 4 /* Hash object. */
#define OBJ_STRING 0 /* String object. */
#define OBJ_LIST 1 /* List object. */
#define OBJ_SET 2 /* Set object. */
#define OBJ_ZSET 3 /* Sorted set object. */
#define OBJ_HASH 4 /* Hash object. */
/* The "module" object type is a special one that signals that the object
* is one directly managed by a Redis module. In this case the value points
@ -671,10 +671,10 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
* by a 64 bit module type ID, which has a 54 bits module-specific signature
* in order to dispatch the loading to the right module, plus a 10 bits
* encoding version. */
#define OBJ_MODULE 5 /* Module object. */
#define OBJ_STREAM 6 /* Stream object. */
#define OBJ_CRON 7 /* CRON job */
#define OBJ_MODULE 5 /* Module object. */
#define OBJ_STREAM 6 /* Stream object. */
#define OBJ_CRON 7 /* CRON job */
#define OBJ_NESTEDHASH 8 /* Nested Hash Object */
/* Extract encver / signature from a module type ID. */
#define REDISMODULE_TYPE_ENCVER_BITS 10
@ -2009,6 +2009,7 @@ void addReplyNullArray(client *c);
void addReplyBool(client *c, int b);
void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext);
void addReplyProto(client *c, const char *s, size_t len);
void addReplyProtoCString(client *c, const char *s);
void addReplyBulk(client *c, robj_roptr obj);
void AddReplyFromClient(client *c, client *src);
void addReplyBulkCString(client *c, const char *s);

353
src/t_nhash.cpp Normal file
View File

@ -0,0 +1,353 @@
/*
* Copyright (c) 2020, EQ Alpha Technology Ltd. <john at eqalpha dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "server.h"
#include <math.h>
void dictObjectDestructor(void *privdata, void *val);
dictType nestedHashDictType {
dictSdsHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
dictSdsDestructor, /* key destructor */
dictObjectDestructor, /* val destructor */
};
robj *createNestHashBucket() {
dict *d = dictCreate(&nestedHashDictType, nullptr);
return createObject(OBJ_NESTEDHASH, d);
}
void freeNestedHashObject(robj_roptr o) {
dictRelease((dict*)ptrFromObj(o));
}
robj *fetchFromKey(redisDb *db, robj_roptr key) {
const char *pchCur = szFromObj(key);
const char *pchStart = pchCur;
const char *pchMax = pchCur + sdslen(pchCur);
robj *o = nullptr;
while (pchCur <= pchMax) {
if (pchCur == pchMax || *pchCur == '.') {
// WARNING: Don't deref pchCur as it may be pchMax
// New word
if ((pchCur - pchStart) < 1) {
throw shared.syntaxerr; // malformed
}
dict *d = nullptr;
if (o == nullptr)
d = db->pdict;
else
d = (dict*)ptrFromObj(o);
sdsstring str(pchStart, pchCur - pchStart);
dictEntry *de = dictFind(d, str.get());
o = (de != nullptr) ? (robj*)dictGetVal(de) : nullptr;
if (o == nullptr) throw shared.nokeyerr; // Not Found
serverAssert(o->type == OBJ_NESTEDHASH || o->type == OBJ_STRING || o->type == OBJ_LIST);
if (o->type == OBJ_STRING && pchCur != pchMax)
throw shared.nokeyerr; // Past the end
pchStart = pchCur + 1;
}
++pchCur;
}
return o;
}
// Returns one if we overwrote a value
bool setWithKey(redisDb *db, robj_roptr key, robj *val, bool fCreateBuckets) {
const char *pchCur = szFromObj(key);
const char *pchStart = pchCur;
const char *pchMax = pchCur + sdslen(pchCur);
robj *o = nullptr;
while (pchCur <= pchMax) {
if (pchCur == pchMax || *pchCur == '.') {
// WARNING: Don't deref pchCur as it may be pchMax
// New word
if ((pchCur - pchStart) < 1) {
throw shared.syntaxerr; // malformed
}
dict *d = nullptr;
if (o == nullptr)
d = db->pdict;
else
d = (dict*)ptrFromObj(o);
sdsstring str(pchStart, pchCur - pchStart);
dictEntry *de = dictFind(d, str.get());
if (pchCur == pchMax) {
val->addref();
if (de != nullptr) {
decrRefCount((robj*)dictGetVal(de));
dictSetVal(d, de, val);
return true;
} else {
dictAdd(d, str.release(), val);
return false;
}
} else {
o = (de != nullptr) ? (robj*)dictGetVal(de) : nullptr;
if (o == nullptr) {
if (!fCreateBuckets)
throw shared.nokeyerr; // Not Found
o = createNestHashBucket();
serverAssert(dictAdd(d, str.release(), o) == DICT_OK);
} else if (o->type != OBJ_NESTEDHASH) {
decrRefCount(o);
o = createNestHashBucket();
de->v.val = o;
}
}
pchStart = pchCur + 1;
}
++pchCur;
}
throw "Internal Error";
}
void writeNestedHashToClient(client *c, robj_roptr o) {
if (o == nullptr) {
addReply(c, shared.null[c->resp]);
} else if (o->type == OBJ_STRING) {
addReplyBulk(c, o);
} else if (o->type == OBJ_LIST) {
unsigned char *zl = (unsigned char*)ptrFromObj(o);
addReplyArrayLen(c, ziplistLen(zl));
unsigned char *p = ziplistIndex(zl, ZIPLIST_HEAD);
while (p != nullptr) {
unsigned char *str;
unsigned int len;
long long lval;
if (ziplistGet(p, &str, &len, &lval)) {
char rgT[128];
if (str == nullptr) {
len = ll2string(rgT, 128, lval);
str = (unsigned char*)rgT;
}
addReplyBulkCBuffer(c, (const char*)str, len);
}
p = ziplistNext(zl, p);
}
} else {
serverAssert(o->type == OBJ_NESTEDHASH );
dict *d = (dict*)ptrFromObj(o);
if (dictSize(d) > 1)
addReplyArrayLen(c, dictSize(d));
dictIterator *di = dictGetIterator(d);
dictEntry *de;
while ((de = dictNext(di))) {
robj_roptr oT = (robj*)dictGetVal(de);
addReplyArrayLen(c, 2);
addReplyBulkCBuffer(c, (sds)dictGetKey(de), sdslen((sds)dictGetKey(de)));
if (oT->type == OBJ_STRING) {
addReplyBulk(c, oT);
} else {
writeNestedHashToClient(c, oT);
}
}
dictReleaseIterator(di);
}
}
inline bool FSimpleJsonEscapeCh(char ch) {
return (ch == '"' || ch == '\\');
}
inline bool FExtendedJsonEscapeCh(char ch) {
return ch <= 0x1F;
}
sds writeJsonValue(sds output, const char *valIn, size_t cchIn) {
const char *val = valIn;
size_t cch = cchIn;
int cchEscapeExtra = 0;
// First scan for escaped chars
for (size_t ich = 0; ich < cchIn; ++ich) {
if (FSimpleJsonEscapeCh(valIn[ich])) {
++cchEscapeExtra;
} else if (FExtendedJsonEscapeCh(valIn[ich])) {
cchEscapeExtra += 5;
}
}
if (cchEscapeExtra > 0) {
size_t ichDst = 0;
sds dst = sdsnewlen(SDS_NOINIT, cchIn+cchEscapeExtra);
for (size_t ich = 0; ich < cchIn; ++ich) {
switch (valIn[ich]) {
case '"':
dst[ichDst++] = '\\'; dst[ichDst++] = '"';
break;
case '\\':
dst[ichDst++] = '\\'; dst[ichDst++] = '\\';
break;
default:
serverAssert(!FSimpleJsonEscapeCh(valIn[ich]));
if (FExtendedJsonEscapeCh(valIn[ich])) {
dst[ichDst++] = '\\'; dst[ichDst++] = 'u';
sprintf(dst + ichDst, "%4x", valIn[ich]);
ichDst += 4;
} else {
dst[ichDst++] = valIn[ich];
}
break;
}
}
val = (const char*)dst;
serverAssert(ichDst == (cchIn+cchEscapeExtra));
cch = ichDst;
}
output = sdscat(output, "\"");
output = sdscatlen(output, val, cch);
output = sdscat(output, "\"");
if (val != valIn)
sdsfree(val);
return output;
}
sds writeJsonValue(sds output, sds val) {
return writeJsonValue(output, (const char*)val, sdslen(val));
}
sds writeNestedHashAsJson(sds output, robj_roptr o) {
if (o->type == OBJ_STRING) {
output = writeJsonValue(output, (sds)szFromObj(o));
} else if (o->type == OBJ_LIST) {
unsigned char *zl = (unsigned char*)ptrFromObj(o);
output = sdscat(output, "[");
unsigned char *p = ziplistIndex(zl, ZIPLIST_HEAD);
bool fFirst = true;
while (p != nullptr) {
unsigned char *str;
unsigned int len;
long long lval;
if (ziplistGet(p, &str, &len, &lval)) {
char rgT[128];
if (str == nullptr) {
len = ll2string(rgT, 128, lval);
str = (unsigned char*)rgT;
}
if (!fFirst)
output = sdscat(output, ",");
fFirst = false;
output = writeJsonValue(output, (const char*)str, len);
}
p = ziplistNext(zl, p);
}
output = sdscat(output, "]");
} else {
output = sdscat(output, "{");
dictIterator *di = dictGetIterator((dict*)ptrFromObj(o));
dictEntry *de;
bool fFirst = true;
while ((de = dictNext(di))) {
robj_roptr oT = (robj*)dictGetVal(de);
if (!fFirst)
output = sdscat(output, ",");
fFirst = false;
output = writeJsonValue(output, (sds)dictGetKey(de));
output = sdscat(output, " : ");
output = writeNestedHashAsJson(output, oT);
}
dictReleaseIterator(di);
output = sdscat(output, "}");
}
return output;
}
void nhsetCommand(client *c) {
if (c->argc < 3)
throw shared.syntaxerr;
robj *val = c->argv[2];
if (c->argc > 3) {
// Its a list, we'll store as a ziplist
val = createZiplistObject();
for (int iarg = 2; iarg < c->argc; ++iarg) {
sds arg = (sds)szFromObj(c->argv[iarg]);
val->m_ptr = ziplistPush((unsigned char*)ptrFromObj(val), (unsigned char*)arg, sdslen(arg), ZIPLIST_TAIL);
}
}
try {
if (setWithKey(c->db, c->argv[1], val, true)) {
addReplyLongLong(c, 1); // we replaced a value
} else {
addReplyLongLong(c, 0); // we added a new value
}
} catch (...) {
if (val != c->argv[2])
decrRefCount(val);
throw;
}
if (val != c->argv[2])
decrRefCount(val);
}
void nhgetCommand(client *c) {
if (c->argc != 2 && c->argc != 3)
throw shared.syntaxerr;
bool fJson = false;
int argOffset = 0;
if (c->argc == 3) {
argOffset++;
if (strcasecmp(szFromObj(c->argv[1]), "json") == 0) {
fJson = true;
} else if (strcasecmp(szFromObj(c->argv[1]), "resp") != 0) {
throw shared.syntaxerr;
}
}
robj *o = fetchFromKey(c->db, c->argv[argOffset + 1]);
if (fJson) {
sds val = writeNestedHashAsJson(sdsnew(nullptr), o);
addReplyBulkSds(c, val);
} else {
writeNestedHashToClient(c, o);
}
}

6
src/t_nhash.h Normal file
View File

@ -0,0 +1,6 @@
#pragma once
void freeNestedHashObject(robj_roptr o);
void nhsetCommand(client *c);
void nhgetCommand(client *c);

View File

@ -28,6 +28,7 @@ set ::all_tests {
unit/type/hash
unit/type/stream
unit/type/stream-cgroups
unit/type/nestedhash
unit/sort
unit/expire
unit/other