initial build

This commit is contained in:
Kenneth Cheng 2020-04-30 19:51:09 +08:00
parent 40a63c08b3
commit e13d9a4f58
30 changed files with 4453 additions and 0 deletions

5
README.md Normal file
View File

@ -0,0 +1,5 @@
# An Example Redis Module
This project is a simple redis module demonstrating basic API usage and `librmutil`.
You can treat it as a basic module template. See the project's [README](../README.md) for more details.

509
redismodule.h Normal file
View File

@ -0,0 +1,509 @@
#ifndef REDISMODULE_H
#define REDISMODULE_H
#include <sys/types.h>
#include <stdint.h>
#include <stdio.h>
/* ---------------- Defines common between core and modules --------------- */
/* Error status return values. */
#define REDISMODULE_OK 0
#define REDISMODULE_ERR 1
/* API versions. */
#define REDISMODULE_APIVER_1 1
/* API flags and constants */
#define REDISMODULE_READ (1<<0)
#define REDISMODULE_WRITE (1<<1)
#define REDISMODULE_LIST_HEAD 0
#define REDISMODULE_LIST_TAIL 1
/* Key types. */
#define REDISMODULE_KEYTYPE_EMPTY 0
#define REDISMODULE_KEYTYPE_STRING 1
#define REDISMODULE_KEYTYPE_LIST 2
#define REDISMODULE_KEYTYPE_HASH 3
#define REDISMODULE_KEYTYPE_SET 4
#define REDISMODULE_KEYTYPE_ZSET 5
#define REDISMODULE_KEYTYPE_MODULE 6
/* Reply types. */
#define REDISMODULE_REPLY_UNKNOWN -1
#define REDISMODULE_REPLY_STRING 0
#define REDISMODULE_REPLY_ERROR 1
#define REDISMODULE_REPLY_INTEGER 2
#define REDISMODULE_REPLY_ARRAY 3
#define REDISMODULE_REPLY_NULL 4
/* Postponed array length. */
#define REDISMODULE_POSTPONED_ARRAY_LEN -1
/* Expire */
#define REDISMODULE_NO_EXPIRE -1
/* Sorted set API flags. */
#define REDISMODULE_ZADD_XX (1<<0)
#define REDISMODULE_ZADD_NX (1<<1)
#define REDISMODULE_ZADD_ADDED (1<<2)
#define REDISMODULE_ZADD_UPDATED (1<<3)
#define REDISMODULE_ZADD_NOP (1<<4)
/* Hash API flags. */
#define REDISMODULE_HASH_NONE 0
#define REDISMODULE_HASH_NX (1<<0)
#define REDISMODULE_HASH_XX (1<<1)
#define REDISMODULE_HASH_CFIELDS (1<<2)
#define REDISMODULE_HASH_EXISTS (1<<3)
/* Context Flags: Info about the current context returned by
* RM_GetContextFlags(). */
/* The command is running in the context of a Lua script */
#define REDISMODULE_CTX_FLAGS_LUA (1<<0)
/* The command is running inside a Redis transaction */
#define REDISMODULE_CTX_FLAGS_MULTI (1<<1)
/* The instance is a master */
#define REDISMODULE_CTX_FLAGS_MASTER (1<<2)
/* The instance is a slave */
#define REDISMODULE_CTX_FLAGS_SLAVE (1<<3)
/* The instance is read-only (usually meaning it's a slave as well) */
#define REDISMODULE_CTX_FLAGS_READONLY (1<<4)
/* The instance is running in cluster mode */
#define REDISMODULE_CTX_FLAGS_CLUSTER (1<<5)
/* The instance has AOF enabled */
#define REDISMODULE_CTX_FLAGS_AOF (1<<6)
/* The instance has RDB enabled */
#define REDISMODULE_CTX_FLAGS_RDB (1<<7)
/* The instance has Maxmemory set */
#define REDISMODULE_CTX_FLAGS_MAXMEMORY (1<<8)
/* Maxmemory is set and has an eviction policy that may delete keys */
#define REDISMODULE_CTX_FLAGS_EVICT (1<<9)
/* Redis is out of memory according to the maxmemory flag. */
#define REDISMODULE_CTX_FLAGS_OOM (1<<10)
/* Less than 25% of memory available according to maxmemory. */
#define REDISMODULE_CTX_FLAGS_OOM_WARNING (1<<11)
#define REDISMODULE_NOTIFY_GENERIC (1<<2) /* g */
#define REDISMODULE_NOTIFY_STRING (1<<3) /* $ */
#define REDISMODULE_NOTIFY_LIST (1<<4) /* l */
#define REDISMODULE_NOTIFY_SET (1<<5) /* s */
#define REDISMODULE_NOTIFY_HASH (1<<6) /* h */
#define REDISMODULE_NOTIFY_ZSET (1<<7) /* z */
#define REDISMODULE_NOTIFY_EXPIRED (1<<8) /* x */
#define REDISMODULE_NOTIFY_EVICTED (1<<9) /* e */
#define REDISMODULE_NOTIFY_STREAM (1<<10) /* t */
#define REDISMODULE_NOTIFY_ALL (REDISMODULE_NOTIFY_GENERIC | REDISMODULE_NOTIFY_STRING | REDISMODULE_NOTIFY_LIST | REDISMODULE_NOTIFY_SET | REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_ZSET | REDISMODULE_NOTIFY_EXPIRED | REDISMODULE_NOTIFY_EVICTED | REDISMODULE_NOTIFY_STREAM) /* A */
/* A special pointer that we can use between the core and the module to signal
* field deletion, and that is impossible to be a valid pointer. */
#define REDISMODULE_HASH_DELETE ((RedisModuleString*)(long)1)
/* Error messages. */
#define REDISMODULE_ERRORMSG_WRONGTYPE "WRONGTYPE Operation against a key holding the wrong kind of value"
#define REDISMODULE_POSITIVE_INFINITE (1.0/0.0)
#define REDISMODULE_NEGATIVE_INFINITE (-1.0/0.0)
/* Cluster API defines. */
#define REDISMODULE_NODE_ID_LEN 40
#define REDISMODULE_NODE_MYSELF (1<<0)
#define REDISMODULE_NODE_MASTER (1<<1)
#define REDISMODULE_NODE_SLAVE (1<<2)
#define REDISMODULE_NODE_PFAIL (1<<3)
#define REDISMODULE_NODE_FAIL (1<<4)
#define REDISMODULE_NODE_NOFAILOVER (1<<5)
#define REDISMODULE_CLUSTER_FLAG_NONE 0
#define REDISMODULE_CLUSTER_FLAG_NO_FAILOVER (1<<1)
#define REDISMODULE_CLUSTER_FLAG_NO_REDIRECTION (1<<2)
#define REDISMODULE_NOT_USED(V) ((void) V)
/* This type represents a timer handle, and is returned when a timer is
* registered and used in order to invalidate a timer. It's just a 64 bit
* number, because this is how each timer is represented inside the radix tree
* of timers that are going to expire, sorted by expire time. */
typedef uint64_t RedisModuleTimerID;
/* ------------------------- End of common defines ------------------------ */
#ifndef REDISMODULE_CORE
typedef long long mstime_t;
/* Incomplete structures for compiler checks but opaque access. */
typedef struct RedisModuleCtx RedisModuleCtx;
typedef struct RedisModuleKey RedisModuleKey;
typedef struct RedisModuleString RedisModuleString;
typedef struct RedisModuleCallReply RedisModuleCallReply;
typedef struct RedisModuleIO RedisModuleIO;
typedef struct RedisModuleType RedisModuleType;
typedef struct RedisModuleDigest RedisModuleDigest;
typedef struct RedisModuleBlockedClient RedisModuleBlockedClient;
typedef struct RedisModuleClusterInfo RedisModuleClusterInfo;
typedef struct RedisModuleDict RedisModuleDict;
typedef struct RedisModuleDictIter RedisModuleDictIter;
typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc);
typedef int (*RedisModuleNotificationFunc)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver);
typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value);
typedef void (*RedisModuleTypeRewriteFunc)(RedisModuleIO *aof, RedisModuleString *key, void *value);
typedef size_t (*RedisModuleTypeMemUsageFunc)(const void *value);
typedef void (*RedisModuleTypeDigestFunc)(RedisModuleDigest *digest, void *value);
typedef void (*RedisModuleTypeFreeFunc)(void *value);
typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len);
typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data);
#define REDISMODULE_TYPE_METHOD_VERSION 1
typedef struct RedisModuleTypeMethods {
uint64_t version;
RedisModuleTypeLoadFunc rdb_load;
RedisModuleTypeSaveFunc rdb_save;
RedisModuleTypeRewriteFunc aof_rewrite;
RedisModuleTypeMemUsageFunc mem_usage;
RedisModuleTypeDigestFunc digest;
RedisModuleTypeFreeFunc free;
} RedisModuleTypeMethods;
#define REDISMODULE_GET_API(name) \
RedisModule_GetApi("RedisModule_" #name, ((void **)&RedisModule_ ## name))
#define REDISMODULE_API_FUNC(x) (*x)
void *REDISMODULE_API_FUNC(RedisModule_Alloc)(size_t bytes);
void *REDISMODULE_API_FUNC(RedisModule_Realloc)(void *ptr, size_t bytes);
void REDISMODULE_API_FUNC(RedisModule_Free)(void *ptr);
void *REDISMODULE_API_FUNC(RedisModule_Calloc)(size_t nmemb, size_t size);
char *REDISMODULE_API_FUNC(RedisModule_Strdup)(const char *str);
int REDISMODULE_API_FUNC(RedisModule_GetApi)(const char *, void *);
int REDISMODULE_API_FUNC(RedisModule_CreateCommand)(RedisModuleCtx *ctx, const char *name, RedisModuleCmdFunc cmdfunc, const char *strflags, int firstkey, int lastkey, int keystep);
void REDISMODULE_API_FUNC(RedisModule_SetModuleAttribs)(RedisModuleCtx *ctx, const char *name, int ver, int apiver);
int REDISMODULE_API_FUNC(RedisModule_IsModuleNameBusy)(const char *name);
int REDISMODULE_API_FUNC(RedisModule_WrongArity)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_ReplyWithLongLong)(RedisModuleCtx *ctx, long long ll);
int REDISMODULE_API_FUNC(RedisModule_GetSelectedDb)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_SelectDb)(RedisModuleCtx *ctx, int newid);
void *REDISMODULE_API_FUNC(RedisModule_OpenKey)(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode);
void REDISMODULE_API_FUNC(RedisModule_CloseKey)(RedisModuleKey *kp);
int REDISMODULE_API_FUNC(RedisModule_KeyType)(RedisModuleKey *kp);
size_t REDISMODULE_API_FUNC(RedisModule_ValueLength)(RedisModuleKey *kp);
int REDISMODULE_API_FUNC(RedisModule_ListPush)(RedisModuleKey *kp, int where, RedisModuleString *ele);
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_ListPop)(RedisModuleKey *key, int where);
RedisModuleCallReply *REDISMODULE_API_FUNC(RedisModule_Call)(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...);
const char *REDISMODULE_API_FUNC(RedisModule_CallReplyProto)(RedisModuleCallReply *reply, size_t *len);
void REDISMODULE_API_FUNC(RedisModule_FreeCallReply)(RedisModuleCallReply *reply);
int REDISMODULE_API_FUNC(RedisModule_CallReplyType)(RedisModuleCallReply *reply);
long long REDISMODULE_API_FUNC(RedisModule_CallReplyInteger)(RedisModuleCallReply *reply);
size_t REDISMODULE_API_FUNC(RedisModule_CallReplyLength)(RedisModuleCallReply *reply);
RedisModuleCallReply *REDISMODULE_API_FUNC(RedisModule_CallReplyArrayElement)(RedisModuleCallReply *reply, size_t idx);
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateString)(RedisModuleCtx *ctx, const char *ptr, size_t len);
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromLongLong)(RedisModuleCtx *ctx, long long ll);
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromString)(RedisModuleCtx *ctx, const RedisModuleString *str);
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringPrintf)(RedisModuleCtx *ctx, const char *fmt, ...);
void REDISMODULE_API_FUNC(RedisModule_FreeString)(RedisModuleCtx *ctx, RedisModuleString *str);
const char *REDISMODULE_API_FUNC(RedisModule_StringPtrLen)(const RedisModuleString *str, size_t *len);
int REDISMODULE_API_FUNC(RedisModule_ReplyWithError)(RedisModuleCtx *ctx, const char *err);
int REDISMODULE_API_FUNC(RedisModule_ReplyWithSimpleString)(RedisModuleCtx *ctx, const char *msg);
int REDISMODULE_API_FUNC(RedisModule_ReplyWithArray)(RedisModuleCtx *ctx, long len);
void REDISMODULE_API_FUNC(RedisModule_ReplySetArrayLength)(RedisModuleCtx *ctx, long len);
int REDISMODULE_API_FUNC(RedisModule_ReplyWithStringBuffer)(RedisModuleCtx *ctx, const char *buf, size_t len);
int REDISMODULE_API_FUNC(RedisModule_ReplyWithString)(RedisModuleCtx *ctx, RedisModuleString *str);
int REDISMODULE_API_FUNC(RedisModule_ReplyWithNull)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_ReplyWithDouble)(RedisModuleCtx *ctx, double d);
int REDISMODULE_API_FUNC(RedisModule_ReplyWithCallReply)(RedisModuleCtx *ctx, RedisModuleCallReply *reply);
int REDISMODULE_API_FUNC(RedisModule_StringToLongLong)(const RedisModuleString *str, long long *ll);
int REDISMODULE_API_FUNC(RedisModule_StringToDouble)(const RedisModuleString *str, double *d);
void REDISMODULE_API_FUNC(RedisModule_AutoMemory)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_Replicate)(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...);
int REDISMODULE_API_FUNC(RedisModule_ReplicateVerbatim)(RedisModuleCtx *ctx);
const char *REDISMODULE_API_FUNC(RedisModule_CallReplyStringPtr)(RedisModuleCallReply *reply, size_t *len);
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromCallReply)(RedisModuleCallReply *reply);
int REDISMODULE_API_FUNC(RedisModule_DeleteKey)(RedisModuleKey *key);
int REDISMODULE_API_FUNC(RedisModule_UnlinkKey)(RedisModuleKey *key);
int REDISMODULE_API_FUNC(RedisModule_StringSet)(RedisModuleKey *key, RedisModuleString *str);
char *REDISMODULE_API_FUNC(RedisModule_StringDMA)(RedisModuleKey *key, size_t *len, int mode);
int REDISMODULE_API_FUNC(RedisModule_StringTruncate)(RedisModuleKey *key, size_t newlen);
mstime_t REDISMODULE_API_FUNC(RedisModule_GetExpire)(RedisModuleKey *key);
int REDISMODULE_API_FUNC(RedisModule_SetExpire)(RedisModuleKey *key, mstime_t expire);
int REDISMODULE_API_FUNC(RedisModule_ZsetAdd)(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr);
int REDISMODULE_API_FUNC(RedisModule_ZsetIncrby)(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr, double *newscore);
int REDISMODULE_API_FUNC(RedisModule_ZsetScore)(RedisModuleKey *key, RedisModuleString *ele, double *score);
int REDISMODULE_API_FUNC(RedisModule_ZsetRem)(RedisModuleKey *key, RedisModuleString *ele, int *deleted);
void REDISMODULE_API_FUNC(RedisModule_ZsetRangeStop)(RedisModuleKey *key);
int REDISMODULE_API_FUNC(RedisModule_ZsetFirstInScoreRange)(RedisModuleKey *key, double min, double max, int minex, int maxex);
int REDISMODULE_API_FUNC(RedisModule_ZsetLastInScoreRange)(RedisModuleKey *key, double min, double max, int minex, int maxex);
int REDISMODULE_API_FUNC(RedisModule_ZsetFirstInLexRange)(RedisModuleKey *key, RedisModuleString *min, RedisModuleString *max);
int REDISMODULE_API_FUNC(RedisModule_ZsetLastInLexRange)(RedisModuleKey *key, RedisModuleString *min, RedisModuleString *max);
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_ZsetRangeCurrentElement)(RedisModuleKey *key, double *score);
int REDISMODULE_API_FUNC(RedisModule_ZsetRangeNext)(RedisModuleKey *key);
int REDISMODULE_API_FUNC(RedisModule_ZsetRangePrev)(RedisModuleKey *key);
int REDISMODULE_API_FUNC(RedisModule_ZsetRangeEndReached)(RedisModuleKey *key);
int REDISMODULE_API_FUNC(RedisModule_HashSet)(RedisModuleKey *key, int flags, ...);
int REDISMODULE_API_FUNC(RedisModule_HashGet)(RedisModuleKey *key, int flags, ...);
int REDISMODULE_API_FUNC(RedisModule_IsKeysPositionRequest)(RedisModuleCtx *ctx);
void REDISMODULE_API_FUNC(RedisModule_KeyAtPos)(RedisModuleCtx *ctx, int pos);
unsigned long long REDISMODULE_API_FUNC(RedisModule_GetClientId)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_GetContextFlags)(RedisModuleCtx *ctx);
void *REDISMODULE_API_FUNC(RedisModule_PoolAlloc)(RedisModuleCtx *ctx, size_t bytes);
RedisModuleType *REDISMODULE_API_FUNC(RedisModule_CreateDataType)(RedisModuleCtx *ctx, const char *name, int encver, RedisModuleTypeMethods *typemethods);
int REDISMODULE_API_FUNC(RedisModule_ModuleTypeSetValue)(RedisModuleKey *key, RedisModuleType *mt, void *value);
RedisModuleType *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetType)(RedisModuleKey *key);
void *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetValue)(RedisModuleKey *key);
void REDISMODULE_API_FUNC(RedisModule_SaveUnsigned)(RedisModuleIO *io, uint64_t value);
uint64_t REDISMODULE_API_FUNC(RedisModule_LoadUnsigned)(RedisModuleIO *io);
void REDISMODULE_API_FUNC(RedisModule_SaveSigned)(RedisModuleIO *io, int64_t value);
int64_t REDISMODULE_API_FUNC(RedisModule_LoadSigned)(RedisModuleIO *io);
void REDISMODULE_API_FUNC(RedisModule_EmitAOF)(RedisModuleIO *io, const char *cmdname, const char *fmt, ...);
void REDISMODULE_API_FUNC(RedisModule_SaveString)(RedisModuleIO *io, RedisModuleString *s);
void REDISMODULE_API_FUNC(RedisModule_SaveStringBuffer)(RedisModuleIO *io, const char *str, size_t len);
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_LoadString)(RedisModuleIO *io);
char *REDISMODULE_API_FUNC(RedisModule_LoadStringBuffer)(RedisModuleIO *io, size_t *lenptr);
void REDISMODULE_API_FUNC(RedisModule_SaveDouble)(RedisModuleIO *io, double value);
double REDISMODULE_API_FUNC(RedisModule_LoadDouble)(RedisModuleIO *io);
void REDISMODULE_API_FUNC(RedisModule_SaveFloat)(RedisModuleIO *io, float value);
float REDISMODULE_API_FUNC(RedisModule_LoadFloat)(RedisModuleIO *io);
void REDISMODULE_API_FUNC(RedisModule_Log)(RedisModuleCtx *ctx, const char *level, const char *fmt, ...);
void REDISMODULE_API_FUNC(RedisModule_LogIOError)(RedisModuleIO *io, const char *levelstr, const char *fmt, ...);
int REDISMODULE_API_FUNC(RedisModule_StringAppendBuffer)(RedisModuleCtx *ctx, RedisModuleString *str, const char *buf, size_t len);
void REDISMODULE_API_FUNC(RedisModule_RetainString)(RedisModuleCtx *ctx, RedisModuleString *str);
int REDISMODULE_API_FUNC(RedisModule_StringCompare)(RedisModuleString *a, RedisModuleString *b);
RedisModuleCtx *REDISMODULE_API_FUNC(RedisModule_GetContextFromIO)(RedisModuleIO *io);
long long REDISMODULE_API_FUNC(RedisModule_Milliseconds)(void);
void REDISMODULE_API_FUNC(RedisModule_DigestAddStringBuffer)(RedisModuleDigest *md, unsigned char *ele, size_t len);
void REDISMODULE_API_FUNC(RedisModule_DigestAddLongLong)(RedisModuleDigest *md, long long ele);
void REDISMODULE_API_FUNC(RedisModule_DigestEndSequence)(RedisModuleDigest *md);
RedisModuleDict *REDISMODULE_API_FUNC(RedisModule_CreateDict)(RedisModuleCtx *ctx);
void REDISMODULE_API_FUNC(RedisModule_FreeDict)(RedisModuleCtx *ctx, RedisModuleDict *d);
uint64_t REDISMODULE_API_FUNC(RedisModule_DictSize)(RedisModuleDict *d);
int REDISMODULE_API_FUNC(RedisModule_DictSetC)(RedisModuleDict *d, void *key, size_t keylen, void *ptr);
int REDISMODULE_API_FUNC(RedisModule_DictReplaceC)(RedisModuleDict *d, void *key, size_t keylen, void *ptr);
int REDISMODULE_API_FUNC(RedisModule_DictSet)(RedisModuleDict *d, RedisModuleString *key, void *ptr);
int REDISMODULE_API_FUNC(RedisModule_DictReplace)(RedisModuleDict *d, RedisModuleString *key, void *ptr);
void *REDISMODULE_API_FUNC(RedisModule_DictGetC)(RedisModuleDict *d, void *key, size_t keylen, int *nokey);
void *REDISMODULE_API_FUNC(RedisModule_DictGet)(RedisModuleDict *d, RedisModuleString *key, int *nokey);
int REDISMODULE_API_FUNC(RedisModule_DictDelC)(RedisModuleDict *d, void *key, size_t keylen, void *oldval);
int REDISMODULE_API_FUNC(RedisModule_DictDel)(RedisModuleDict *d, RedisModuleString *key, void *oldval);
RedisModuleDictIter *REDISMODULE_API_FUNC(RedisModule_DictIteratorStartC)(RedisModuleDict *d, const char *op, void *key, size_t keylen);
RedisModuleDictIter *REDISMODULE_API_FUNC(RedisModule_DictIteratorStart)(RedisModuleDict *d, const char *op, RedisModuleString *key);
void REDISMODULE_API_FUNC(RedisModule_DictIteratorStop)(RedisModuleDictIter *di);
int REDISMODULE_API_FUNC(RedisModule_DictIteratorReseekC)(RedisModuleDictIter *di, const char *op, void *key, size_t keylen);
int REDISMODULE_API_FUNC(RedisModule_DictIteratorReseek)(RedisModuleDictIter *di, const char *op, RedisModuleString *key);
void *REDISMODULE_API_FUNC(RedisModule_DictNextC)(RedisModuleDictIter *di, size_t *keylen, void **dataptr);
void *REDISMODULE_API_FUNC(RedisModule_DictPrevC)(RedisModuleDictIter *di, size_t *keylen, void **dataptr);
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_DictNext)(RedisModuleCtx *ctx, RedisModuleDictIter *di, void **dataptr);
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_DictPrev)(RedisModuleCtx *ctx, RedisModuleDictIter *di, void **dataptr);
int REDISMODULE_API_FUNC(RedisModule_DictCompareC)(RedisModuleDictIter *di, const char *op, void *key, size_t keylen);
int REDISMODULE_API_FUNC(RedisModule_DictCompare)(RedisModuleDictIter *di, const char *op, RedisModuleString *key);
/* Experimental APIs */
#ifdef REDISMODULE_EXPERIMENTAL_API
#define REDISMODULE_EXPERIMENTAL_API_VERSION 3
RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClient)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms);
int REDISMODULE_API_FUNC(RedisModule_UnblockClient)(RedisModuleBlockedClient *bc, void *privdata);
int REDISMODULE_API_FUNC(RedisModule_IsBlockedReplyRequest)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_IsBlockedTimeoutRequest)(RedisModuleCtx *ctx);
void *REDISMODULE_API_FUNC(RedisModule_GetBlockedClientPrivateData)(RedisModuleCtx *ctx);
RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_GetBlockedClientHandle)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_AbortBlock)(RedisModuleBlockedClient *bc);
RedisModuleCtx *REDISMODULE_API_FUNC(RedisModule_GetThreadSafeContext)(RedisModuleBlockedClient *bc);
void REDISMODULE_API_FUNC(RedisModule_FreeThreadSafeContext)(RedisModuleCtx *ctx);
void REDISMODULE_API_FUNC(RedisModule_ThreadSafeContextLock)(RedisModuleCtx *ctx);
void REDISMODULE_API_FUNC(RedisModule_ThreadSafeContextUnlock)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_SubscribeToKeyspaceEvents)(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc cb);
int REDISMODULE_API_FUNC(RedisModule_BlockedClientDisconnected)(RedisModuleCtx *ctx);
void REDISMODULE_API_FUNC(RedisModule_RegisterClusterMessageReceiver)(RedisModuleCtx *ctx, uint8_t type, RedisModuleClusterMessageReceiver callback);
int REDISMODULE_API_FUNC(RedisModule_SendClusterMessage)(RedisModuleCtx *ctx, char *target_id, uint8_t type, unsigned char *msg, uint32_t len);
int REDISMODULE_API_FUNC(RedisModule_GetClusterNodeInfo)(RedisModuleCtx *ctx, const char *id, char *ip, char *master_id, int *port, int *flags);
char **REDISMODULE_API_FUNC(RedisModule_GetClusterNodesList)(RedisModuleCtx *ctx, size_t *numnodes);
void REDISMODULE_API_FUNC(RedisModule_FreeClusterNodesList)(char **ids);
RedisModuleTimerID REDISMODULE_API_FUNC(RedisModule_CreateTimer)(RedisModuleCtx *ctx, mstime_t period, RedisModuleTimerProc callback, void *data);
int REDISMODULE_API_FUNC(RedisModule_StopTimer)(RedisModuleCtx *ctx, RedisModuleTimerID id, void **data);
int REDISMODULE_API_FUNC(RedisModule_GetTimerInfo)(RedisModuleCtx *ctx, RedisModuleTimerID id, uint64_t *remaining, void **data);
const char *REDISMODULE_API_FUNC(RedisModule_GetMyClusterID)(void);
size_t REDISMODULE_API_FUNC(RedisModule_GetClusterSize)(void);
void REDISMODULE_API_FUNC(RedisModule_GetRandomBytes)(unsigned char *dst, size_t len);
void REDISMODULE_API_FUNC(RedisModule_GetRandomHexChars)(char *dst, size_t len);
void REDISMODULE_API_FUNC(RedisModule_SetDisconnectCallback)(RedisModuleBlockedClient *bc, RedisModuleDisconnectFunc callback);
void REDISMODULE_API_FUNC(RedisModule_SetClusterFlags)(RedisModuleCtx *ctx, uint64_t flags);
#endif
/* This is included inline inside each Redis module. */
static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int apiver) __attribute__((unused));
static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int apiver) {
void *getapifuncptr = ((void**)ctx)[0];
RedisModule_GetApi = (int (*)(const char *, void *)) (unsigned long)getapifuncptr;
REDISMODULE_GET_API(Alloc);
REDISMODULE_GET_API(Calloc);
REDISMODULE_GET_API(Free);
REDISMODULE_GET_API(Realloc);
REDISMODULE_GET_API(Strdup);
REDISMODULE_GET_API(CreateCommand);
REDISMODULE_GET_API(SetModuleAttribs);
REDISMODULE_GET_API(IsModuleNameBusy);
REDISMODULE_GET_API(WrongArity);
REDISMODULE_GET_API(ReplyWithLongLong);
REDISMODULE_GET_API(ReplyWithError);
REDISMODULE_GET_API(ReplyWithSimpleString);
REDISMODULE_GET_API(ReplyWithArray);
REDISMODULE_GET_API(ReplySetArrayLength);
REDISMODULE_GET_API(ReplyWithStringBuffer);
REDISMODULE_GET_API(ReplyWithString);
REDISMODULE_GET_API(ReplyWithNull);
REDISMODULE_GET_API(ReplyWithCallReply);
REDISMODULE_GET_API(ReplyWithDouble);
REDISMODULE_GET_API(ReplySetArrayLength);
REDISMODULE_GET_API(GetSelectedDb);
REDISMODULE_GET_API(SelectDb);
REDISMODULE_GET_API(OpenKey);
REDISMODULE_GET_API(CloseKey);
REDISMODULE_GET_API(KeyType);
REDISMODULE_GET_API(ValueLength);
REDISMODULE_GET_API(ListPush);
REDISMODULE_GET_API(ListPop);
REDISMODULE_GET_API(StringToLongLong);
REDISMODULE_GET_API(StringToDouble);
REDISMODULE_GET_API(Call);
REDISMODULE_GET_API(CallReplyProto);
REDISMODULE_GET_API(FreeCallReply);
REDISMODULE_GET_API(CallReplyInteger);
REDISMODULE_GET_API(CallReplyType);
REDISMODULE_GET_API(CallReplyLength);
REDISMODULE_GET_API(CallReplyArrayElement);
REDISMODULE_GET_API(CallReplyStringPtr);
REDISMODULE_GET_API(CreateStringFromCallReply);
REDISMODULE_GET_API(CreateString);
REDISMODULE_GET_API(CreateStringFromLongLong);
REDISMODULE_GET_API(CreateStringFromString);
REDISMODULE_GET_API(CreateStringPrintf);
REDISMODULE_GET_API(FreeString);
REDISMODULE_GET_API(StringPtrLen);
REDISMODULE_GET_API(AutoMemory);
REDISMODULE_GET_API(Replicate);
REDISMODULE_GET_API(ReplicateVerbatim);
REDISMODULE_GET_API(DeleteKey);
REDISMODULE_GET_API(UnlinkKey);
REDISMODULE_GET_API(StringSet);
REDISMODULE_GET_API(StringDMA);
REDISMODULE_GET_API(StringTruncate);
REDISMODULE_GET_API(GetExpire);
REDISMODULE_GET_API(SetExpire);
REDISMODULE_GET_API(ZsetAdd);
REDISMODULE_GET_API(ZsetIncrby);
REDISMODULE_GET_API(ZsetScore);
REDISMODULE_GET_API(ZsetRem);
REDISMODULE_GET_API(ZsetRangeStop);
REDISMODULE_GET_API(ZsetFirstInScoreRange);
REDISMODULE_GET_API(ZsetLastInScoreRange);
REDISMODULE_GET_API(ZsetFirstInLexRange);
REDISMODULE_GET_API(ZsetLastInLexRange);
REDISMODULE_GET_API(ZsetRangeCurrentElement);
REDISMODULE_GET_API(ZsetRangeNext);
REDISMODULE_GET_API(ZsetRangePrev);
REDISMODULE_GET_API(ZsetRangeEndReached);
REDISMODULE_GET_API(HashSet);
REDISMODULE_GET_API(HashGet);
REDISMODULE_GET_API(IsKeysPositionRequest);
REDISMODULE_GET_API(KeyAtPos);
REDISMODULE_GET_API(GetClientId);
REDISMODULE_GET_API(GetContextFlags);
REDISMODULE_GET_API(PoolAlloc);
REDISMODULE_GET_API(CreateDataType);
REDISMODULE_GET_API(ModuleTypeSetValue);
REDISMODULE_GET_API(ModuleTypeGetType);
REDISMODULE_GET_API(ModuleTypeGetValue);
REDISMODULE_GET_API(SaveUnsigned);
REDISMODULE_GET_API(LoadUnsigned);
REDISMODULE_GET_API(SaveSigned);
REDISMODULE_GET_API(LoadSigned);
REDISMODULE_GET_API(SaveString);
REDISMODULE_GET_API(SaveStringBuffer);
REDISMODULE_GET_API(LoadString);
REDISMODULE_GET_API(LoadStringBuffer);
REDISMODULE_GET_API(SaveDouble);
REDISMODULE_GET_API(LoadDouble);
REDISMODULE_GET_API(SaveFloat);
REDISMODULE_GET_API(LoadFloat);
REDISMODULE_GET_API(EmitAOF);
REDISMODULE_GET_API(Log);
REDISMODULE_GET_API(LogIOError);
REDISMODULE_GET_API(StringAppendBuffer);
REDISMODULE_GET_API(RetainString);
REDISMODULE_GET_API(StringCompare);
REDISMODULE_GET_API(GetContextFromIO);
REDISMODULE_GET_API(Milliseconds);
REDISMODULE_GET_API(DigestAddStringBuffer);
REDISMODULE_GET_API(DigestAddLongLong);
REDISMODULE_GET_API(DigestEndSequence);
REDISMODULE_GET_API(CreateDict);
REDISMODULE_GET_API(FreeDict);
REDISMODULE_GET_API(DictSize);
REDISMODULE_GET_API(DictSetC);
REDISMODULE_GET_API(DictReplaceC);
REDISMODULE_GET_API(DictSet);
REDISMODULE_GET_API(DictReplace);
REDISMODULE_GET_API(DictGetC);
REDISMODULE_GET_API(DictGet);
REDISMODULE_GET_API(DictDelC);
REDISMODULE_GET_API(DictDel);
REDISMODULE_GET_API(DictIteratorStartC);
REDISMODULE_GET_API(DictIteratorStart);
REDISMODULE_GET_API(DictIteratorStop);
REDISMODULE_GET_API(DictIteratorReseekC);
REDISMODULE_GET_API(DictIteratorReseek);
REDISMODULE_GET_API(DictNextC);
REDISMODULE_GET_API(DictPrevC);
REDISMODULE_GET_API(DictNext);
REDISMODULE_GET_API(DictPrev);
REDISMODULE_GET_API(DictCompare);
REDISMODULE_GET_API(DictCompareC);
#ifdef REDISMODULE_EXPERIMENTAL_API
REDISMODULE_GET_API(GetThreadSafeContext);
REDISMODULE_GET_API(FreeThreadSafeContext);
REDISMODULE_GET_API(ThreadSafeContextLock);
REDISMODULE_GET_API(ThreadSafeContextUnlock);
REDISMODULE_GET_API(BlockClient);
REDISMODULE_GET_API(UnblockClient);
REDISMODULE_GET_API(IsBlockedReplyRequest);
REDISMODULE_GET_API(IsBlockedTimeoutRequest);
REDISMODULE_GET_API(GetBlockedClientPrivateData);
REDISMODULE_GET_API(GetBlockedClientHandle);
REDISMODULE_GET_API(AbortBlock);
REDISMODULE_GET_API(SetDisconnectCallback);
REDISMODULE_GET_API(SubscribeToKeyspaceEvents);
REDISMODULE_GET_API(BlockedClientDisconnected);
REDISMODULE_GET_API(RegisterClusterMessageReceiver);
REDISMODULE_GET_API(SendClusterMessage);
REDISMODULE_GET_API(GetClusterNodeInfo);
REDISMODULE_GET_API(GetClusterNodesList);
REDISMODULE_GET_API(FreeClusterNodesList);
REDISMODULE_GET_API(CreateTimer);
REDISMODULE_GET_API(StopTimer);
REDISMODULE_GET_API(GetTimerInfo);
REDISMODULE_GET_API(GetMyClusterID);
REDISMODULE_GET_API(GetClusterSize);
REDISMODULE_GET_API(GetRandomBytes);
REDISMODULE_GET_API(GetRandomHexChars);
REDISMODULE_GET_API(SetClusterFlags);
#endif
if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR;
RedisModule_SetModuleAttribs(ctx,name,ver,apiver);
return REDISMODULE_OK;
}
#else
/* Things only defined for the modules core, not exported to modules
* including this file. */
#define RedisModuleString robj
#endif /* REDISMODULE_CORE */
#endif /* REDISMOUDLE_H */

31
rmutil/Makefile Normal file
View File

@ -0,0 +1,31 @@
# set environment variable RM_INCLUDE_DIR to the location of redismodule.h
ifndef RM_INCLUDE_DIR
RM_INCLUDE_DIR=../
endif
CFLAGS ?= -g -fPIC -O3 -std=gnu99 -Wall -Wno-unused-function
CFLAGS += -I$(RM_INCLUDE_DIR)
CC=gcc
OBJS=util.o strings.o sds.o vector.o alloc.o periodic.o
all: librmutil.a
clean:
rm -rf *.o *.a
librmutil.a: $(OBJS)
ar rcs $@ $^
test_vector: test_vector.o vector.o
$(CC) -Wall -o $@ $^ -lc -lpthread -O0
@(sh -c ./$@)
.PHONY: test_vector
test_periodic: test_periodic.o periodic.o
$(CC) -Wall -o $@ $^ -lc -lpthread -O0
@(sh -c ./$@)
.PHONY: test_periodic
test: test_periodic test_vector
.PHONY: test

32
rmutil/alloc.c Normal file
View File

@ -0,0 +1,32 @@
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include "alloc.h"
/* A patched implementation of strdup that will use our patched calloc */
char *rmalloc_strndup(const char *s, size_t n) {
char *ret = calloc(n + 1, sizeof(char));
if (ret)
memcpy(ret, s, n);
return ret;
}
/*
* Re-patching RedisModule_Alloc and friends to the original malloc functions
*
* This function should be called if you are working with malloc-patched code
* outside of redis, usually for unit tests. Call it once when entering your unit
* tests' main().
*
* Since including "alloc.h" while defining REDIS_MODULE_TARGET
* replaces all malloc functions in redis with the RM_Alloc family of functions,
* when running that code outside of redis, your app will crash. This function
* patches the RM_Alloc functions back to the original mallocs. */
void RMUTil_InitAlloc() {
RedisModule_Alloc = malloc;
RedisModule_Realloc = realloc;
RedisModule_Calloc = calloc;
RedisModule_Free = free;
RedisModule_Strdup = strdup;
}

51
rmutil/alloc.h Normal file
View File

@ -0,0 +1,51 @@
#ifndef __RMUTIL_ALLOC__
#define __RMUTIL_ALLOC__
/* Automatic Redis Module Allocation functions monkey-patching.
*
* Including this file while REDIS_MODULE_TARGET is defined, will explicitly
* override malloc, calloc, realloc & free with RedisModule_Alloc,
* RedisModule_Callc, etc implementations, that allow Redis better control and
* reporting over allocations per module.
*
* You should include this file in all c files AS THE LAST INCLUDED FILE
*
* This only has effect when when compiling with the macro REDIS_MODULE_TARGET
* defined. The idea is that for unit tests it will not be defined, but for the
* module build target it will be.
*
*/
#include <stdlib.h>
#include <redismodule.h>
char *rmalloc_strndup(const char *s, size_t n);
#ifdef REDIS_MODULE_TARGET /* Set this when compiling your code as a module */
#define malloc(size) RedisModule_Alloc(size)
#define calloc(count, size) RedisModule_Calloc(count, size)
#define realloc(ptr, size) RedisModule_Realloc(ptr, size)
#define free(ptr) RedisModule_Free(ptr)
#ifdef strdup
#undef strdup
#endif
#define strdup(ptr) RedisModule_Strdup(ptr)
/* More overriding */
// needed to avoid calling strndup->malloc
#ifdef strndup
#undef strndup
#endif
#define strndup(s, n) rmalloc_strndup(s, n)
#else
#endif /* REDIS_MODULE_TARGET */
/* This function should be called if you are working with malloc-patched code
* outside of redis, usually for unit tests. Call it once when entering your unit
* tests' main() */
void RMUTil_InitAlloc();
#endif /* __RMUTIL_ALLOC__ */

107
rmutil/heap.c Normal file
View File

@ -0,0 +1,107 @@
#include "heap.h"
/* Byte-wise swap two items of size SIZE. */
#define SWAP(a, b, size) \
do \
{ \
register size_t __size = (size); \
register char *__a = (a), *__b = (b); \
do \
{ \
char __tmp = *__a; \
*__a++ = *__b; \
*__b++ = __tmp; \
} while (--__size > 0); \
} while (0)
inline char *__vector_GetPtr(Vector *v, size_t pos) {
return v->data + (pos * v->elemSize);
}
void __sift_up(Vector *v, size_t first, size_t last, int (*cmp)(void *, void *)) {
size_t len = last - first;
if (len > 1) {
len = (len - 2) / 2;
size_t ptr = first + len;
if (cmp(__vector_GetPtr(v, ptr), __vector_GetPtr(v, --last)) < 0) {
char t[v->elemSize];
memcpy(t, __vector_GetPtr(v, last), v->elemSize);
do {
memcpy(__vector_GetPtr(v, last), __vector_GetPtr(v, ptr), v->elemSize);
last = ptr;
if (len == 0)
break;
len = (len - 1) / 2;
ptr = first + len;
} while (cmp(__vector_GetPtr(v, ptr), t) < 0);
memcpy(__vector_GetPtr(v, last), t, v->elemSize);
}
}
}
void __sift_down(Vector *v, size_t first, size_t last, int (*cmp)(void *, void *), size_t start) {
// left-child of __start is at 2 * __start + 1
// right-child of __start is at 2 * __start + 2
size_t len = last - first;
size_t child = start - first;
if (len < 2 || (len - 2) / 2 < child)
return;
child = 2 * child + 1;
if ((child + 1) < len && cmp(__vector_GetPtr(v, first + child), __vector_GetPtr(v, first + child + 1)) < 0) {
// right-child exists and is greater than left-child
++child;
}
// check if we are in heap-order
if (cmp(__vector_GetPtr(v, first + child), __vector_GetPtr(v, start)) < 0)
// we are, __start is larger than it's largest child
return;
char top[v->elemSize];
memcpy(top, __vector_GetPtr(v, start), v->elemSize);
do {
// we are not in heap-order, swap the parent with it's largest child
memcpy(__vector_GetPtr(v, start), __vector_GetPtr(v, first + child), v->elemSize);
start = first + child;
if ((len - 2) / 2 < child)
break;
// recompute the child based off of the updated parent
child = 2 * child + 1;
if ((child + 1) < len && cmp(__vector_GetPtr(v, first + child), __vector_GetPtr(v, first + child + 1)) < 0) {
// right-child exists and is greater than left-child
++child;
}
// check if we are in heap-order
} while (cmp(__vector_GetPtr(v, first + child), top) >= 0);
memcpy(__vector_GetPtr(v, start), top, v->elemSize);
}
void Make_Heap(Vector *v, size_t first, size_t last, int (*cmp)(void *, void *)) {
if (last - first > 1) {
// start from the first parent, there is no need to consider children
for (int start = (last - first - 2) / 2; start >= 0; --start) {
__sift_down(v, first, last, cmp, first + start);
}
}
}
inline void Heap_Push(Vector *v, size_t first, size_t last, int (*cmp)(void *, void *)) {
__sift_up(v, first, last, cmp);
}
inline void Heap_Pop(Vector *v, size_t first, size_t last, int (*cmp)(void *, void *)) {
if (last - first > 1) {
SWAP(__vector_GetPtr(v, first), __vector_GetPtr(v, --last), v->elemSize);
__sift_down(v, first, last, cmp, first);
}
}

38
rmutil/heap.h Normal file
View File

@ -0,0 +1,38 @@
#ifndef __HEAP_H__
#define __HEAP_H__
#include "vector.h"
/* Make heap from range
* Rearranges the elements in the range [first,last) in such a way that they form a heap.
* A heap is a way to organize the elements of a range that allows for fast retrieval of the element with the highest
* value at any moment (with pop_heap), even repeatedly, while allowing for fast insertion of new elements (with
* push_heap).
* The element with the highest value is always pointed by first. The order of the other elements depends on the
* particular implementation, but it is consistent throughout all heap-related functions of this header.
* The elements are compared using cmp.
*/
void Make_Heap(Vector *v, size_t first, size_t last, int (*cmp)(void *, void *));
/* Push element into heap range
* Given a heap in the range [first,last-1), this function extends the range considered a heap to [first,last) by
* placing the value in (last-1) into its corresponding location within it.
* A range can be organized into a heap by calling make_heap. After that, its heap properties are preserved if elements
* are added and removed from it using push_heap and pop_heap, respectively.
*/
void Heap_Push(Vector *v, size_t first, size_t last, int (*cmp)(void *, void *));
/* Pop element from heap range
* Rearranges the elements in the heap range [first,last) in such a way that the part considered a heap is shortened
* by one: The element with the highest value is moved to (last-1).
* While the element with the highest value is moved from first to (last-1) (which now is out of the heap), the other
* elements are reorganized in such a way that the range [first,last-1) preserves the properties of a heap.
* A range can be organized into a heap by calling make_heap. After that, its heap properties are preserved if elements
* are added and removed from it using push_heap and pop_heap, respectively.
*/
void Heap_Pop(Vector *v, size_t first, size_t last, int (*cmp)(void *, void *));
#endif //__HEAP_H__

11
rmutil/logging.h Normal file
View File

@ -0,0 +1,11 @@
#ifndef __RMUTIL_LOGGING_H__
#define __RMUTIL_LOGGING_H__
/* Convenience macros for redis logging */
#define RM_LOG_DEBUG(ctx, ...) RedisModule_Log(ctx, "debug", __VA_ARGS__)
#define RM_LOG_VERBOSE(ctx, ...) RedisModule_Log(ctx, "verbose", __VA_ARGS__)
#define RM_LOG_NOTICE(ctx, ...) RedisModule_Log(ctx, "notice", __VA_ARGS__)
#define RM_LOG_WARNING(ctx, ...) RedisModule_Log(ctx, "warning", __VA_ARGS__)
#endif

88
rmutil/periodic.c Normal file
View File

@ -0,0 +1,88 @@
#define REDISMODULE_EXPERIMENTAL_API
#include "periodic.h"
#include <pthread.h>
#include <stdlib.h>
#include <errno.h>
typedef struct RMUtilTimer {
RMutilTimerFunc cb;
RMUtilTimerTerminationFunc onTerm;
void *privdata;
struct timespec interval;
pthread_t thread;
pthread_mutex_t lock;
pthread_cond_t cond;
} RMUtilTimer;
static struct timespec timespecAdd(struct timespec *a, struct timespec *b) {
struct timespec ret;
ret.tv_sec = a->tv_sec + b->tv_sec;
long long ns = a->tv_nsec + b->tv_nsec;
ret.tv_sec += ns / 1000000000;
ret.tv_nsec = ns % 1000000000;
return ret;
}
static void *rmutilTimer_Loop(void *ctx) {
RMUtilTimer *tm = ctx;
int rc = ETIMEDOUT;
struct timespec ts;
pthread_mutex_lock(&tm->lock);
while (rc != 0) {
clock_gettime(CLOCK_REALTIME, &ts);
struct timespec timeout = timespecAdd(&ts, &tm->interval);
if ((rc = pthread_cond_timedwait(&tm->cond, &tm->lock, &timeout)) == ETIMEDOUT) {
// Create a thread safe context if we're running inside redis
RedisModuleCtx *rctx = NULL;
if (RedisModule_GetThreadSafeContext) rctx = RedisModule_GetThreadSafeContext(NULL);
// call our callback...
tm->cb(rctx, tm->privdata);
// If needed - free the thread safe context.
// It's up to the user to decide whether automemory is active there
if (rctx) RedisModule_FreeThreadSafeContext(rctx);
}
if (rc == EINVAL) {
perror("Error waiting for condition");
break;
}
}
// call the termination callback if needed
if (tm->onTerm != NULL) {
tm->onTerm(tm->privdata);
}
// free resources associated with the timer
pthread_cond_destroy(&tm->cond);
free(tm);
return NULL;
}
/* set a new frequency for the timer. This will take effect AFTER the next trigger */
void RMUtilTimer_SetInterval(struct RMUtilTimer *t, struct timespec newInterval) {
t->interval = newInterval;
}
RMUtilTimer *RMUtil_NewPeriodicTimer(RMutilTimerFunc cb, RMUtilTimerTerminationFunc onTerm,
void *privdata, struct timespec interval) {
RMUtilTimer *ret = malloc(sizeof(*ret));
*ret = (RMUtilTimer){
.privdata = privdata, .interval = interval, .cb = cb, .onTerm = onTerm,
};
pthread_cond_init(&ret->cond, NULL);
pthread_mutex_init(&ret->lock, NULL);
pthread_create(&ret->thread, NULL, rmutilTimer_Loop, ret);
return ret;
}
int RMUtilTimer_Terminate(struct RMUtilTimer *t) {
return pthread_cond_signal(&t->cond);
}

46
rmutil/periodic.h Normal file
View File

@ -0,0 +1,46 @@
#ifndef RMUTIL_PERIODIC_H_
#define RMUTIL_PERIODIC_H_
#include <time.h>
#include <redismodule.h>
/** periodic.h - Utility periodic timer running a task repeatedly every given time interval */
/* RMUtilTimer - opaque context for the timer */
struct RMUtilTimer;
/* RMutilTimerFunc - callback type for timer tasks. The ctx is a thread-safe redis module context
* that should be locked/unlocked by the callback when running stuff against redis. privdata is
* pre-existing private data */
typedef void (*RMutilTimerFunc)(RedisModuleCtx *ctx, void *privdata);
typedef void (*RMUtilTimerTerminationFunc)(void *privdata);
/* Create and start a new periodic timer. Each timer has its own thread and can only be run and
* stopped once. The timer runs `cb` every `interval` with `privdata` passed to the callback. */
struct RMUtilTimer *RMUtil_NewPeriodicTimer(RMutilTimerFunc cb, RMUtilTimerTerminationFunc onTerm,
void *privdata, struct timespec interval);
/* set a new frequency for the timer. This will take effect AFTER the next trigger */
void RMUtilTimer_SetInterval(struct RMUtilTimer *t, struct timespec newInterval);
/* Stop the timer loop, call the termination callbck to free up any resources linked to the timer,
* and free the timer after stopping.
*
* This function doesn't wait for the thread to terminate, as it may cause a race condition if the
* timer's callback is waiting for the redis global lock.
* Instead you should make sure any resources are freed by the callback after the thread loop is
* finished.
*
* The timer is freed automatically, so the callback doesn't need to do anything about it.
* The callback gets the timer's associated privdata as its argument.
*
* If no callback is specified we do not free up privdata. If privdata is NULL we still call the
* callback, as it may log stuff or free global resources.
*/
int RMUtilTimer_Terminate(struct RMUtilTimer *t);
/* DEPRECATED - do not use this function (well now you can't), use terminate instead
Free the timer context. The caller should be responsible for freeing the private data at this
* point */
// void RMUtilTimer_Free(struct RMUtilTimer *t);
#endif

36
rmutil/priority_queue.c Normal file
View File

@ -0,0 +1,36 @@
#include "priority_queue.h"
#include "heap.h"
PriorityQueue *__newPriorityQueueSize(size_t elemSize, size_t cap, int (*cmp)(void *, void *)) {
PriorityQueue *pq = malloc(sizeof(PriorityQueue));
pq->v = __newVectorSize(elemSize, cap);
pq->cmp = cmp;
return pq;
}
inline size_t Priority_Queue_Size(PriorityQueue *pq) {
return Vector_Size(pq->v);
}
inline int Priority_Queue_Top(PriorityQueue *pq, void *ptr) {
return Vector_Get(pq->v, 0, ptr);
}
inline size_t __priority_Queue_PushPtr(PriorityQueue *pq, void *elem) {
size_t top = __vector_PushPtr(pq->v, elem);
Heap_Push(pq->v, 0, top, pq->cmp);
return top;
}
inline void Priority_Queue_Pop(PriorityQueue *pq) {
if (pq->v->top == 0) {
return;
}
Heap_Pop(pq->v, 0, pq->v->top, pq->cmp);
pq->v->top--;
}
void Priority_Queue_Free(PriorityQueue *pq) {
Vector_Free(pq->v);
free(pq);
}

55
rmutil/priority_queue.h Normal file
View File

@ -0,0 +1,55 @@
#ifndef __PRIORITY_QUEUE_H__
#define __PRIORITY_QUEUE_H__
#include "vector.h"
/* Priority queue
* Priority queues are designed such that its first element is always the greatest of the elements it contains.
* This context is similar to a heap, where elements can be inserted at any moment, and only the max heap element can be
* retrieved (the one at the top in the priority queue).
* Priority queues are implemented as Vectors. Elements are popped from the "back" of Vector, which is known as the top
* of the priority queue.
*/
typedef struct {
Vector *v;
int (*cmp)(void *, void *);
} PriorityQueue;
/* Construct priority queue
* Constructs a priority_queue container adaptor object.
*/
PriorityQueue *__newPriorityQueueSize(size_t elemSize, size_t cap, int (*cmp)(void *, void *));
#define NewPriorityQueue(type, cap, cmp) __newPriorityQueueSize(sizeof(type), cap, cmp)
/* Return size
* Returns the number of elements in the priority_queue.
*/
size_t Priority_Queue_Size(PriorityQueue *pq);
/* Access top element
* Copy the top element in the priority_queue to ptr.
* The top element is the element that compares higher in the priority_queue.
*/
int Priority_Queue_Top(PriorityQueue *pq, void *ptr);
/* Insert element
* Inserts a new element in the priority_queue.
*/
size_t __priority_Queue_PushPtr(PriorityQueue *pq, void *elem);
#define Priority_Queue_Push(pq, elem) __priority_Queue_PushPtr(pq, &(typeof(elem)){elem})
/* Remove top element
* Removes the element on top of the priority_queue, effectively reducing its size by one. The element removed is the
* one with the highest value.
* The value of this element can be retrieved before being popped by calling Priority_Queue_Top.
*/
void Priority_Queue_Pop(PriorityQueue *pq);
/* free the priority queue and the underlying data. Does not release its elements if
* they are pointers */
void Priority_Queue_Free(PriorityQueue *pq);
#endif //__PRIORITY_QUEUE_H__

1274
rmutil/sds.c Normal file

File diff suppressed because it is too large Load Diff

273
rmutil/sds.h Normal file
View File

@ -0,0 +1,273 @@
/* SDSLib 2.0 -- A C dynamic strings library
*
* Copyright (c) 2006-2015, Salvatore Sanfilippo <antirez at gmail dot com>
* Copyright (c) 2015, Oran Agra
* Copyright (c) 2015, Redis Labs, Inc
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef __SDS_H
#define __SDS_H
#define SDS_MAX_PREALLOC (1024*1024)
#include <sys/types.h>
#include <stdarg.h>
#include <stdint.h>
typedef char *sds;
/* Note: sdshdr5 is never used, we just access the flags byte directly.
* However is here to document the layout of type 5 SDS strings. */
struct __attribute__ ((__packed__)) sdshdr5 {
unsigned char flags; /* 3 lsb of type, and 5 msb of string length */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr8 {
uint8_t len; /* used */
uint8_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr16 {
uint16_t len; /* used */
uint16_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr32 {
uint32_t len; /* used */
uint32_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr64 {
uint64_t len; /* used */
uint64_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
#define SDS_TYPE_5 0
#define SDS_TYPE_8 1
#define SDS_TYPE_16 2
#define SDS_TYPE_32 3
#define SDS_TYPE_64 4
#define SDS_TYPE_MASK 7
#define SDS_TYPE_BITS 3
#define SDS_HDR_VAR(T,s) struct sdshdr##T *sh = (void*)((s)-(sizeof(struct sdshdr##T)));
#define SDS_HDR(T,s) ((struct sdshdr##T *)((s)-(sizeof(struct sdshdr##T))))
#define SDS_TYPE_5_LEN(f) ((f)>>SDS_TYPE_BITS)
static inline size_t sdslen(const sds s) {
unsigned char flags = s[-1];
switch(flags&SDS_TYPE_MASK) {
case SDS_TYPE_5:
return SDS_TYPE_5_LEN(flags);
case SDS_TYPE_8:
return SDS_HDR(8,s)->len;
case SDS_TYPE_16:
return SDS_HDR(16,s)->len;
case SDS_TYPE_32:
return SDS_HDR(32,s)->len;
case SDS_TYPE_64:
return SDS_HDR(64,s)->len;
}
return 0;
}
static inline size_t sdsavail(const sds s) {
unsigned char flags = s[-1];
switch(flags&SDS_TYPE_MASK) {
case SDS_TYPE_5: {
return 0;
}
case SDS_TYPE_8: {
SDS_HDR_VAR(8,s);
return sh->alloc - sh->len;
}
case SDS_TYPE_16: {
SDS_HDR_VAR(16,s);
return sh->alloc - sh->len;
}
case SDS_TYPE_32: {
SDS_HDR_VAR(32,s);
return sh->alloc - sh->len;
}
case SDS_TYPE_64: {
SDS_HDR_VAR(64,s);
return sh->alloc - sh->len;
}
}
return 0;
}
static inline void sdssetlen(sds s, size_t newlen) {
unsigned char flags = s[-1];
switch(flags&SDS_TYPE_MASK) {
case SDS_TYPE_5:
{
unsigned char *fp = ((unsigned char*)s)-1;
*fp = SDS_TYPE_5 | (newlen << SDS_TYPE_BITS);
}
break;
case SDS_TYPE_8:
SDS_HDR(8,s)->len = newlen;
break;
case SDS_TYPE_16:
SDS_HDR(16,s)->len = newlen;
break;
case SDS_TYPE_32:
SDS_HDR(32,s)->len = newlen;
break;
case SDS_TYPE_64:
SDS_HDR(64,s)->len = newlen;
break;
}
}
static inline void sdsinclen(sds s, size_t inc) {
unsigned char flags = s[-1];
switch(flags&SDS_TYPE_MASK) {
case SDS_TYPE_5:
{
unsigned char *fp = ((unsigned char*)s)-1;
unsigned char newlen = SDS_TYPE_5_LEN(flags)+inc;
*fp = SDS_TYPE_5 | (newlen << SDS_TYPE_BITS);
}
break;
case SDS_TYPE_8:
SDS_HDR(8,s)->len += inc;
break;
case SDS_TYPE_16:
SDS_HDR(16,s)->len += inc;
break;
case SDS_TYPE_32:
SDS_HDR(32,s)->len += inc;
break;
case SDS_TYPE_64:
SDS_HDR(64,s)->len += inc;
break;
}
}
/* sdsalloc() = sdsavail() + sdslen() */
static inline size_t sdsalloc(const sds s) {
unsigned char flags = s[-1];
switch(flags&SDS_TYPE_MASK) {
case SDS_TYPE_5:
return SDS_TYPE_5_LEN(flags);
case SDS_TYPE_8:
return SDS_HDR(8,s)->alloc;
case SDS_TYPE_16:
return SDS_HDR(16,s)->alloc;
case SDS_TYPE_32:
return SDS_HDR(32,s)->alloc;
case SDS_TYPE_64:
return SDS_HDR(64,s)->alloc;
}
return 0;
}
static inline void sdssetalloc(sds s, size_t newlen) {
unsigned char flags = s[-1];
switch(flags&SDS_TYPE_MASK) {
case SDS_TYPE_5:
/* Nothing to do, this type has no total allocation info. */
break;
case SDS_TYPE_8:
SDS_HDR(8,s)->alloc = newlen;
break;
case SDS_TYPE_16:
SDS_HDR(16,s)->alloc = newlen;
break;
case SDS_TYPE_32:
SDS_HDR(32,s)->alloc = newlen;
break;
case SDS_TYPE_64:
SDS_HDR(64,s)->alloc = newlen;
break;
}
}
sds sdsnewlen(const void *init, size_t initlen);
sds sdsnew(const char *init);
sds sdsempty(void);
sds sdsdup(const sds s);
void sdsfree(sds s);
sds sdsgrowzero(sds s, size_t len);
sds sdscatlen(sds s, const void *t, size_t len);
sds sdscat(sds s, const char *t);
sds sdscatsds(sds s, const sds t);
sds sdscpylen(sds s, const char *t, size_t len);
sds sdscpy(sds s, const char *t);
sds sdscatvprintf(sds s, const char *fmt, va_list ap);
#ifdef __GNUC__
sds sdscatprintf(sds s, const char *fmt, ...)
__attribute__((format(printf, 2, 3)));
#else
sds sdscatprintf(sds s, const char *fmt, ...);
#endif
sds sdscatfmt(sds s, char const *fmt, ...);
sds sdstrim(sds s, const char *cset);
void sdsrange(sds s, int start, int end);
void sdsupdatelen(sds s);
void sdsclear(sds s);
int sdscmp(const sds s1, const sds s2);
sds *sdssplitlen(const char *s, int len, const char *sep, int seplen, int *count);
void sdsfreesplitres(sds *tokens, int count);
void sdstolower(sds s);
void sdstoupper(sds s);
sds sdsfromlonglong(long long value);
sds sdscatrepr(sds s, const char *p, size_t len);
sds *sdssplitargs(const char *line, int *argc);
sds sdsmapchars(sds s, const char *from, const char *to, size_t setlen);
sds sdsjoin(char **argv, int argc, char *sep);
sds sdsjoinsds(sds *argv, int argc, const char *sep, size_t seplen);
/* Low level functions exposed to the user API */
sds sdsMakeRoomFor(sds s, size_t addlen);
void sdsIncrLen(sds s, int incr);
sds sdsRemoveFreeSpace(sds s);
size_t sdsAllocSize(sds s);
void *sdsAllocPtr(sds s);
/* Export the allocator used by SDS to the program using SDS.
* Sometimes the program SDS is linked to, may use a different set of
* allocators, but may want to allocate or free things that SDS will
* respectively free or allocate. */
void *sds_malloc(size_t size);
void *sds_realloc(void *ptr, size_t size);
void sds_free(void *ptr);
#ifdef REDIS_TEST
int sdsTest(int argc, char *argv[]);
#endif
#endif

47
rmutil/sdsalloc.h Normal file
View File

@ -0,0 +1,47 @@
/* SDSLib 2.0 -- A C dynamic strings library
*
* Copyright (c) 2006-2015, Salvatore Sanfilippo <antirez at gmail dot com>
* Copyright (c) 2015, Redis Labs, Inc
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/* SDS allocator selection.
*
* This file is used in order to change the SDS allocator at compile time.
* Just define the following defines to what you want to use. Also add
* the include of your alternate allocator if needed (not needed in order
* to use the default libc allocator). */
#if defined(__MACH__)
#include <stdlib.h>
#else
#include <malloc.h>
#endif
//#include "zmalloc.h"
#define s_malloc malloc
#define s_realloc realloc
#define s_free free

81
rmutil/strings.c Normal file
View File

@ -0,0 +1,81 @@
#include <string.h>
#include <sys/param.h>
#include <ctype.h>
#include "strings.h"
#include "alloc.h"
#include "sds.h"
// RedisModuleString *RMUtil_CreateFormattedString(RedisModuleCtx *ctx, const char *fmt, ...) {
// sds s = sdsempty();
// va_list ap;
// va_start(ap, fmt);
// s = sdscatvprintf(s, fmt, ap);
// va_end(ap);
// RedisModuleString *ret = RedisModule_CreateString(ctx, (const char *)s, sdslen(s));
// sdsfree(s);
// return ret;
// }
int RMUtil_StringEquals(RedisModuleString *s1, RedisModuleString *s2) {
const char *c1, *c2;
size_t l1, l2;
c1 = RedisModule_StringPtrLen(s1, &l1);
c2 = RedisModule_StringPtrLen(s2, &l2);
if (l1 != l2) return 0;
return strncmp(c1, c2, l1) == 0;
}
int RMUtil_StringEqualsC(RedisModuleString *s1, const char *s2) {
const char *c1;
size_t l1, l2 = strlen(s2);
c1 = RedisModule_StringPtrLen(s1, &l1);
if (l1 != l2) return 0;
return strncmp(c1, s2, l1) == 0;
}
int RMUtil_StringEqualsCaseC(RedisModuleString *s1, const char *s2) {
const char *c1;
size_t l1, l2 = strlen(s2);
c1 = RedisModule_StringPtrLen(s1, &l1);
if (l1 != l2) return 0;
return strncasecmp(c1, s2, l1) == 0;
}
void RMUtil_StringToLower(RedisModuleString *s) {
size_t l;
char *c = (char *)RedisModule_StringPtrLen(s, &l);
size_t i;
for (i = 0; i < l; i++) {
*c = tolower(*c);
++c;
}
}
void RMUtil_StringToUpper(RedisModuleString *s) {
size_t l;
char *c = (char *)RedisModule_StringPtrLen(s, &l);
size_t i;
for (i = 0; i < l; i++) {
*c = toupper(*c);
++c;
}
}
void RMUtil_StringConvert(RedisModuleString **rs, const char **ss, size_t n, int options) {
for (size_t ii = 0; ii < n; ++ii) {
const char *p = RedisModule_StringPtrLen(rs[ii], NULL);
if (options & RMUTIL_STRINGCONVERT_COPY) {
p = strdup(p);
}
ss[ii] = p;
}
}

38
rmutil/strings.h Normal file
View File

@ -0,0 +1,38 @@
#ifndef __RMUTIL_STRINGS_H__
#define __RMUTIL_STRINGS_H__
#include <redismodule.h>
/*
* Create a new RedisModuleString object from a printf-style format and arguments.
* Note that RedisModuleString objects CANNOT be used as formatting arguments.
*/
// DEPRECATED since it was added to the RedisModule API. Replaced with a macro below
// RedisModuleString *RMUtil_CreateFormattedString(RedisModuleCtx *ctx, const char *fmt, ...);
#define RMUtil_CreateFormattedString RedisModule_CreateStringPrintf
/* Return 1 if the two strings are equal. Case *sensitive* */
int RMUtil_StringEquals(RedisModuleString *s1, RedisModuleString *s2);
/* Return 1 if the string is equal to a C NULL terminated string. Case *sensitive* */
int RMUtil_StringEqualsC(RedisModuleString *s1, const char *s2);
/* Return 1 if the string is equal to a C NULL terminated string. Case *insensitive* */
int RMUtil_StringEqualsCaseC(RedisModuleString *s1, const char *s2);
/* Converts a redis string to lowercase in place without reallocating anything */
void RMUtil_StringToLower(RedisModuleString *s);
/* Converts a redis string to uppercase in place without reallocating anything */
void RMUtil_StringToUpper(RedisModuleString *s);
// If set, copy the strings using strdup rather than simply storing pointers.
#define RMUTIL_STRINGCONVERT_COPY 1
/**
* Convert one or more RedisModuleString objects into `const char*`.
* Both rs and ss are arrays, and should be of <n> length.
* Options may be 0 or `RMUTIL_STRINGCONVERT_COPY`
*/
void RMUtil_StringConvert(RedisModuleString **rs, const char **ss, size_t n, int options);
#endif

69
rmutil/test.h Normal file
View File

@ -0,0 +1,69 @@
#ifndef __TESTUTIL_H__
#define __TESTUTIL_H__
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
static int numTests = 0;
static int numAsserts = 0;
#define TESTFUNC(f) \
printf(" Testing %s\t\t", __STRING(f)); \
numTests++; \
fflush(stdout); \
if (f()) { \
printf(" %s FAILED!\n", __STRING(f)); \
exit(1); \
} else \
printf("[PASS]\n");
#define ASSERTM(expr, ...) \
if (!(expr)) { \
fprintf(stderr, "%s:%d: Assertion '%s' Failed: " __VA_ARGS__ "\n", __FILE__, __LINE__, \
__STRING(expr)); \
return -1; \
} \
numAsserts++;
#define ASSERT(expr) \
if (!(expr)) { \
fprintf(stderr, "%s:%d Assertion '%s' Failed\n", __FILE__, __LINE__, __STRING(expr)); \
return -1; \
} \
numAsserts++;
#define ASSERT_STRING_EQ(s1, s2) ASSERT(!strcmp(s1, s2));
#define ASSERT_EQUAL(x, y, ...) \
if (x != y) { \
fprintf(stderr, "%s:%d: ", __FILE__, __LINE__); \
fprintf(stderr, "%g != %g: " __VA_ARGS__ "\n", (double)x, (double)y); \
return -1; \
} \
numAsserts++;
#define FAIL(fmt, ...) \
{ \
fprintf(stderr, "%s:%d: FAIL: " fmt "\n", __FILE__, __LINE__, ##__VA_ARGS__); \
return -1; \
}
#define RETURN_TEST_SUCCESS return 0;
#define TEST_CASE(x, block) \
int x { \
block; \
return 0 \
}
#define PRINT_TEST_SUMMARY printf("\nTotal: %d tests and %d assertions OK\n", numTests, numAsserts);
#define TEST_MAIN(body) \
int main(int argc, char **argv) { \
printf("Starting Test '%s'...\n", argv[0]); \
body; \
PRINT_TEST_SUMMARY; \
printf("\n--------------------\n\n"); \
return 0; \
}
#endif

38
rmutil/test_heap.c Normal file
View File

@ -0,0 +1,38 @@
#include <stdio.h>
#include "heap.h"
#include "assert.h"
int cmp(void *a, void *b) {
int *__a = (int *) a;
int *__b = (int *) b;
return *__a - *__b;
}
int main(int argc, char **argv) {
int myints[] = {10, 20, 30, 5, 15};
Vector *v = NewVector(int, 5);
for (int i = 0; i < 5; i++) {
Vector_Push(v, myints[i]);
}
Make_Heap(v, 0, v->top, cmp);
int n;
Vector_Get(v, 0, &n);
assert(30 == n);
Heap_Pop(v, 0, v->top, cmp);
v->top = 4;
Vector_Get(v, 0, &n);
assert(20 == n);
Vector_Push(v, 99);
Heap_Push(v, 0, v->top, cmp);
Vector_Get(v, 0, &n);
assert(99 == n);
Vector_Free(v);
printf("PASS!\n");
return 0;
}

26
rmutil/test_periodic.c Normal file
View File

@ -0,0 +1,26 @@
#include <stdio.h>
#include <redismodule.h>
#include <unistd.h>
#include "periodic.h"
#include "assert.h"
#include "test.h"
void timerCb(RedisModuleCtx *ctx, void *p) {
int *x = p;
(*x)++;
}
int testPeriodic() {
int x = 0;
struct RMUtilTimer *tm = RMUtil_NewPeriodicTimer(
timerCb, NULL, &x, (struct timespec){.tv_sec = 0, .tv_nsec = 10000000});
sleep(1);
ASSERT_EQUAL(0, RMUtilTimer_Terminate(tm));
ASSERT(x > 0);
ASSERT(x <= 100);
return 0;
}
TEST_MAIN({ TESTFUNC(testPeriodic); });

View File

@ -0,0 +1,37 @@
#include <stdio.h>
#include "assert.h"
#include "priority_queue.h"
int cmp(void* i1, void* i2) {
int *__i1 = (int*) i1;
int *__i2 = (int*) i2;
return *__i1 - *__i2;
}
int main(int argc, char **argv) {
PriorityQueue *pq = NewPriorityQueue(int, 10, cmp);
assert(0 == Priority_Queue_Size(pq));
for (int i = 0; i < 5; i++) {
Priority_Queue_Push(pq, i);
}
assert(5 == Priority_Queue_Size(pq));
Priority_Queue_Pop(pq);
assert(4 == Priority_Queue_Size(pq));
Priority_Queue_Push(pq, 10);
Priority_Queue_Push(pq, 20);
Priority_Queue_Push(pq, 15);
int n;
Priority_Queue_Top(pq, &n);
assert(20 == n);
Priority_Queue_Pop(pq);
Priority_Queue_Top(pq, &n);
assert(15 == n);
Priority_Queue_Free(pq);
printf("PASS!\n");
return 0;
}

70
rmutil/test_util.h Normal file
View File

@ -0,0 +1,70 @@
#ifndef __TEST_UTIL_H__
#define __TEST_UTIL_H__
#include "util.h"
#include <stdarg.h>
#include <stdlib.h>
#include <string.h>
#define RMUtil_Test(f) \
if (argc < 2 || RMUtil_ArgExists(__STRING(f), argv, argc, 1)) { \
int rc = f(ctx); \
if (rc != REDISMODULE_OK) { \
RedisModule_ReplyWithError(ctx, "Test " __STRING(f) " FAILED"); \
return REDISMODULE_ERR;\
}\
}
#define RMUtil_Assert(expr) if (!(expr)) { fprintf (stderr, "Assertion '%s' Failed\n", __STRING(expr)); return REDISMODULE_ERR; }
#define RMUtil_AssertNullReply(rep) RMUtil_Assert( \
RedisModule_CallReplyType(rep) == REDISMODULE_REPLY_NULL || RedisModule_CreateStringFromCallReply(rep) == NULL)
#define RMUtil_AssertReplyEquals(rep, cstr) RMUtil_Assert( \
RMUtil_StringEquals(RedisModule_CreateStringFromCallReply(rep), RedisModule_CreateString(ctx, cstr, strlen(cstr))) \
)
#
/**
* Create an arg list to pass to a redis command handler manually, based on the format in fmt.
* The accepted format specifiers are:
* c - for null terminated c strings
* s - for RedisModuleString* objects
* l - for longs
*
* Example: RMUtil_MakeArgs(ctx, &argc, "clc", "hello", 1337, "world");
*
* Returns an array of RedisModuleString pointers. The size of the array is store in argcp
*/
RedisModuleString **RMUtil_MakeArgs(RedisModuleCtx *ctx, int *argcp, const char *fmt, ...) {
va_list ap;
va_start(ap, fmt);
RedisModuleString **argv = calloc(strlen(fmt), sizeof(RedisModuleString*));
int argc = 0;
const char *p = fmt;
while(*p) {
if (*p == 'c') {
char *cstr = va_arg(ap,char*);
argv[argc++] = RedisModule_CreateString(ctx, cstr, strlen(cstr));
} else if (*p == 's') {
argv[argc++] = va_arg(ap,void*);;
} else if (*p == 'l') {
long ll = va_arg(ap,long long);
argv[argc++] = RedisModule_CreateStringFromLongLong(ctx, ll);
} else {
goto fmterr;
}
p++;
}
*argcp = argc;
return argv;
fmterr:
free(argv);
return NULL;
}
#endif

58
rmutil/test_vector.c Normal file
View File

@ -0,0 +1,58 @@
#include "vector.h"
#include <stdio.h>
#include "test.h"
int testVector() {
Vector *v = NewVector(int, 1);
ASSERT(v != NULL);
// Vector_Put(v, 0, 1);
// Vector_Put(v, 1, 3);
for (int i = 0; i < 10; i++) {
Vector_Push(v, i);
}
ASSERT_EQUAL(10, Vector_Size(v));
ASSERT_EQUAL(16, Vector_Cap(v));
for (int i = 0; i < Vector_Size(v); i++) {
int n;
int rc = Vector_Get(v, i, &n);
ASSERT_EQUAL(1, rc);
// printf("%d %d\n", rc, n);
ASSERT_EQUAL(n, i);
}
Vector_Free(v);
v = NewVector(char *, 0);
int N = 4;
char *strings[4] = {"hello", "world", "foo", "bar"};
for (int i = 0; i < N; i++) {
Vector_Push(v, strings[i]);
}
ASSERT_EQUAL(N, Vector_Size(v));
ASSERT(Vector_Cap(v) >= N);
for (int i = 0; i < Vector_Size(v); i++) {
char *x;
int rc = Vector_Get(v, i, &x);
ASSERT_EQUAL(1, rc);
ASSERT_STRING_EQ(x, strings[i]);
}
int rc = Vector_Get(v, 100, NULL);
ASSERT_EQUAL(0, rc);
Vector_Free(v);
return 0;
// Vector_Push(v, "hello");
// Vector_Push(v, "world");
// char *x = NULL;
// int rc = Vector_Getx(v, 0, &x);
// printf("rc: %d got %s\n", rc, x);
}
TEST_MAIN({ TESTFUNC(testVector); });

299
rmutil/util.c Normal file
View File

@ -0,0 +1,299 @@
#include <stdlib.h>
#include <errno.h>
#include <math.h>
#include <ctype.h>
#include <sys/time.h>
#include <stdarg.h>
#include <limits.h>
#include <string.h>
#define REDISMODULE_EXPERIMENTAL_API
#include <redismodule.h>
#include "util.h"
/**
Check if an argument exists in an argument list (argv,argc), starting at offset.
@return 0 if it doesn't exist, otherwise the offset it exists in
*/
int RMUtil_ArgExists(const char *arg, RedisModuleString **argv, int argc, int offset) {
size_t larg = strlen(arg);
for (; offset < argc; offset++) {
size_t l;
const char *carg = RedisModule_StringPtrLen(argv[offset], &l);
if (l != larg) continue;
if (carg != NULL && strncasecmp(carg, arg, larg) == 0) {
return offset;
}
}
return 0;
}
/**
Check if an argument exists in an argument list (argv,argc)
@return -1 if it doesn't exist, otherwise the offset it exists in
*/
int RMUtil_ArgIndex(const char *arg, RedisModuleString **argv, int argc) {
size_t larg = strlen(arg);
for (int offset = 0; offset < argc; offset++) {
size_t l;
const char *carg = RedisModule_StringPtrLen(argv[offset], &l);
if (l != larg) continue;
if (carg != NULL && strncasecmp(carg, arg, larg) == 0) {
return offset;
}
}
return -1;
}
RMUtilInfo *RMUtil_GetRedisInfo(RedisModuleCtx *ctx) {
RedisModuleCallReply *r = RedisModule_Call(ctx, "INFO", "c", "all");
if (r == NULL || RedisModule_CallReplyType(r) == REDISMODULE_REPLY_ERROR) {
return NULL;
}
int cap = 100; // rough estimate of info lines
RMUtilInfo *info = malloc(sizeof(RMUtilInfo));
info->entries = calloc(cap, sizeof(RMUtilInfoEntry));
int i = 0;
size_t sz;
char *text = (char *)RedisModule_CallReplyStringPtr(r, &sz);
char *line = text;
while (line && line < text + sz) {
char *line = strsep(&text, "\r\n");
if (line == NULL) break;
if (!(*line >= 'a' && *line <= 'z')) { // skip non entry lines
continue;
}
char *key = strsep(&line, ":");
info->entries[i].key = strdup(key);
info->entries[i].val = strdup(line);
i++;
if (i >= cap) {
cap *= 2;
info->entries = realloc(info->entries, cap * sizeof(RMUtilInfoEntry));
}
}
info->numEntries = i;
RedisModule_FreeCallReply(r);
return info;
}
void RMUtilRedisInfo_Free(RMUtilInfo *info) {
for (int i = 0; i < info->numEntries; i++) {
free(info->entries[i].key);
free(info->entries[i].val);
}
free(info->entries);
free(info);
}
int RMUtilInfo_GetInt(RMUtilInfo *info, const char *key, long long *val) {
const char *p = NULL;
if (!RMUtilInfo_GetString(info, key, &p)) {
return 0;
}
*val = strtoll(p, NULL, 10);
if ((errno == ERANGE && (*val == LONG_MAX || *val == LONG_MIN)) || (errno != 0 && *val == 0)) {
*val = -1;
return 0;
}
return 1;
}
int RMUtilInfo_GetString(RMUtilInfo *info, const char *key, const char **str) {
int i;
for (i = 0; i < info->numEntries; i++) {
if (!strcmp(key, info->entries[i].key)) {
*str = info->entries[i].val;
return 1;
}
}
return 0;
}
int RMUtilInfo_GetDouble(RMUtilInfo *info, const char *key, double *d) {
const char *p = NULL;
if (!RMUtilInfo_GetString(info, key, &p)) {
printf("not found %s\n", key);
return 0;
}
*d = strtod(p, NULL);
if ((errno == ERANGE && (*d == HUGE_VAL || *d == -HUGE_VAL)) || (errno != 0 && *d == 0)) {
return 0;
}
return 1;
}
/*
c -- pointer to a Null terminated C string pointer.
b -- pointer to a C buffer, followed by pointer to a size_t for its length
s -- pointer to a RedisModuleString
l -- pointer to Long long integer.
d -- pointer to a Double
* -- do not parse this argument at all
*/
int RMUtil_ParseArgs(RedisModuleString **argv, int argc, int offset, const char *fmt, ...) {
va_list ap;
va_start(ap, fmt);
int rc = rmutil_vparseArgs(argv, argc, offset, fmt, ap);
va_end(ap);
return rc;
}
// Internal function that parses arguments based on the format described above
int rmutil_vparseArgs(RedisModuleString **argv, int argc, int offset, const char *fmt, va_list ap) {
int i = offset;
char *c = (char *)fmt;
while (*c && i < argc) {
// read c string
if (*c == 'c') {
char **p = va_arg(ap, char **);
*p = (char *)RedisModule_StringPtrLen(argv[i], NULL);
} else if (*c == 'b') {
char **p = va_arg(ap, char **);
size_t *len = va_arg(ap, size_t *);
*p = (char *)RedisModule_StringPtrLen(argv[i], len);
} else if (*c == 's') { // read redis string
RedisModuleString **s = va_arg(ap, void *);
*s = argv[i];
} else if (*c == 'l') { // read long
long long *l = va_arg(ap, long long *);
if (RedisModule_StringToLongLong(argv[i], l) != REDISMODULE_OK) {
return REDISMODULE_ERR;
}
} else if (*c == 'd') { // read double
double *d = va_arg(ap, double *);
if (RedisModule_StringToDouble(argv[i], d) != REDISMODULE_OK) {
return REDISMODULE_ERR;
}
} else if (*c == '*') { // skip current arg
// do nothing
} else {
return REDISMODULE_ERR; // WAT?
}
c++;
i++;
}
// if the format is longer than argc, retun an error
if (*c != 0) {
return REDISMODULE_ERR;
}
return REDISMODULE_OK;
}
int RMUtil_ParseArgsAfter(const char *token, RedisModuleString **argv, int argc, const char *fmt,
...) {
int pos = RMUtil_ArgIndex(token, argv, argc);
if (pos < 0) {
return REDISMODULE_ERR;
}
va_list ap;
va_start(ap, fmt);
int rc = rmutil_vparseArgs(argv, argc, pos + 1, fmt, ap);
va_end(ap);
return rc;
}
RedisModuleCallReply *RedisModule_CallReplyArrayElementByPath(RedisModuleCallReply *rep,
const char *path) {
if (rep == NULL) return NULL;
RedisModuleCallReply *ele = rep;
const char *s = path;
char *e;
long idx;
do {
errno = 0;
idx = strtol(s, &e, 10);
if ((errno == ERANGE && (idx == LONG_MAX || idx == LONG_MIN)) || (errno != 0 && idx == 0) ||
(REDISMODULE_REPLY_ARRAY != RedisModule_CallReplyType(ele)) || (s == e)) {
ele = NULL;
break;
}
s = e;
ele = RedisModule_CallReplyArrayElement(ele, idx - 1);
} while ((ele != NULL) && (*e != '\0'));
return ele;
}
int RedisModule_TryGetValue(RedisModuleKey *key, const RedisModuleType *type, void **out) {
if (key == NULL) {
return RMUTIL_VALUE_MISSING;
}
int keytype = RedisModule_KeyType(key);
if (keytype == REDISMODULE_KEYTYPE_EMPTY) {
return RMUTIL_VALUE_EMPTY;
} else if (keytype == REDISMODULE_KEYTYPE_MODULE && RedisModule_ModuleTypeGetType(key) == type) {
*out = RedisModule_ModuleTypeGetValue(key);
return RMUTIL_VALUE_OK;
} else {
return RMUTIL_VALUE_MISMATCH;
}
}
RedisModuleString **RMUtil_ParseVarArgs(RedisModuleString **argv, int argc, int offset,
const char *keyword, size_t *nargs) {
if (offset > argc) {
return NULL;
}
argv += offset;
argc -= offset;
int ix = RMUtil_ArgIndex(keyword, argv, argc);
if (ix < 0) {
return NULL;
} else if (ix >= argc - 1) {
*nargs = RMUTIL_VARARGS_BADARG;
return argv;
}
argv += (ix + 1);
argc -= (ix + 1);
long long n = 0;
RMUtil_ParseArgs(argv, argc, 0, "l", &n);
if (n > argc - 1 || n < 0) {
*nargs = RMUTIL_VARARGS_BADARG;
return argv;
}
*nargs = n;
return argv + 1;
}
void RMUtil_DefaultAofRewrite(RedisModuleIO *aof, RedisModuleString *key, void *value) {
RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(NULL);
RedisModuleCallReply *rep = RedisModule_Call(ctx, "DUMP", "s", key);
if (rep != NULL && RedisModule_CallReplyType(rep) == REDISMODULE_REPLY_STRING) {
size_t n;
const char *s = RedisModule_CallReplyStringPtr(rep, &n);
RedisModule_EmitAOF(aof, "RESTORE", "slb", key, 0, s, n);
} else {
RedisModule_Log(RedisModule_GetContextFromIO(aof), "warning", "Failed to emit AOF");
}
if (rep != NULL) {
RedisModule_FreeCallReply(rep);
}
RedisModule_FreeThreadSafeContext(ctx);
}

151
rmutil/util.h Normal file
View File

@ -0,0 +1,151 @@
#ifndef __UTIL_H__
#define __UTIL_H__
#include <redismodule.h>
#include <stdarg.h>
/// make sure the response is not NULL or an error, and if it is sends the error to the client and
/// exit the current function
#define RMUTIL_ASSERT_NOERROR(ctx, r) \
if (r == NULL) { \
return RedisModule_ReplyWithError(ctx, "ERR reply is NULL"); \
} else if (RedisModule_CallReplyType(r) == REDISMODULE_REPLY_ERROR) { \
RedisModule_ReplyWithCallReply(ctx, r); \
return REDISMODULE_ERR; \
}
#define __rmutil_register_cmd(ctx, cmd, f, mode) \
if (RedisModule_CreateCommand(ctx, cmd, f, mode, 1, 1, 1) == REDISMODULE_ERR) \
return REDISMODULE_ERR;
#define RMUtil_RegisterReadCmd(ctx, cmd, f) __rmutil_register_cmd(ctx, cmd, f, "readonly")
#define RMUtil_RegisterWriteCmd(ctx, cmd, f) __rmutil_register_cmd(ctx, cmd, f, "write")
#define RMUtil_RegisterWriteDenyOOMCmd(ctx, cmd, f) __rmutil_register_cmd(ctx, cmd, f, "write deny-oom")
/* RedisModule utilities. */
/** DEPRECATED: Return the offset of an arg if it exists in the arg list, or 0 if it's not there */
int RMUtil_ArgExists(const char *arg, RedisModuleString **argv, int argc, int offset);
/* Same as argExists but returns -1 if not found. Use this, RMUtil_ArgExists is kept for backwards
compatibility. */
int RMUtil_ArgIndex(const char *arg, RedisModuleString **argv, int argc);
/**
Automatically conver the arg list to corresponding variable pointers according to a given format.
You pass it the command arg list and count, the starting offset, a parsing format, and pointers to
the variables.
The format is a string consisting of the following identifiers:
c -- pointer to a Null terminated C string pointer.
s -- pointer to a RedisModuleString
l -- pointer to Long long integer.
d -- pointer to a Double
* -- do not parse this argument at all
Example: If I want to parse args[1], args[2] as a long long and double, I do:
double d;
long long l;
RMUtil_ParseArgs(argv, argc, 1, "ld", &l, &d);
*/
int RMUtil_ParseArgs(RedisModuleString **argv, int argc, int offset, const char *fmt, ...);
/**
Same as RMUtil_ParseArgs, but only parses the arguments after `token`, if it was found.
This is useful for optional stuff like [LIMIT [offset] [limit]]
*/
int RMUtil_ParseArgsAfter(const char *token, RedisModuleString **argv, int argc, const char *fmt,
...);
int rmutil_vparseArgs(RedisModuleString **argv, int argc, int offset, const char *fmt, va_list ap);
#define RMUTIL_VARARGS_BADARG ((size_t)-1)
/**
* Parse arguments in the form of KEYWORD {len} {arg} .. {arg}_len.
* If keyword is present, returns the position within `argv` containing the arguments.
* Returns NULL if the keyword is not found.
* If a parse error has occurred, `nargs` is set to RMUTIL_VARARGS_BADARG, but
* the return value is not NULL.
*/
RedisModuleString **RMUtil_ParseVarArgs(RedisModuleString **argv, int argc, int offset,
const char *keyword, size_t *nargs);
/**
* Default implementation of an AoF rewrite function that simply calls DUMP/RESTORE
* internally. To use this function, pass it as the .aof_rewrite value in
* RedisModuleTypeMethods
*/
void RMUtil_DefaultAofRewrite(RedisModuleIO *aof, RedisModuleString *key, void *value);
// A single key/value entry in a redis info map
typedef struct {
char *key;
char *val;
} RMUtilInfoEntry;
// Representation of INFO command response, as a list of k/v pairs
typedef struct {
RMUtilInfoEntry *entries;
int numEntries;
} RMUtilInfo;
/**
* Get redis INFO result and parse it as RMUtilInfo.
* Returns NULL if something goes wrong.
* The resulting object needs to be freed with RMUtilRedisInfo_Free
*/
RMUtilInfo *RMUtil_GetRedisInfo(RedisModuleCtx *ctx);
/**
* Free an RMUtilInfo object and its entries
*/
void RMUtilRedisInfo_Free(RMUtilInfo *info);
/**
* Get an integer value from an info object. Returns 1 if the value was found and
* is an integer, 0 otherwise. the value is placed in 'val'
*/
int RMUtilInfo_GetInt(RMUtilInfo *info, const char *key, long long *val);
/**
* Get a string value from an info object. The value is placed in str.
* Returns 1 if the key was found, 0 if not
*/
int RMUtilInfo_GetString(RMUtilInfo *info, const char *key, const char **str);
/**
* Get a double value from an info object. Returns 1 if the value was found and is
* a correctly formatted double, 0 otherwise. the value is placed in 'd'
*/
int RMUtilInfo_GetDouble(RMUtilInfo *info, const char *key, double *d);
/*
* Returns a call reply array's element given by a space-delimited path. E.g.,
* the path "1 2 3" will return the 3rd element from the 2 element of the 1st
* element from an array (or NULL if not found)
*/
RedisModuleCallReply *RedisModule_CallReplyArrayElementByPath(RedisModuleCallReply *rep,
const char *path);
/**
* Extract the module type from an opened key.
*/
typedef enum {
RMUTIL_VALUE_OK = 0,
RMUTIL_VALUE_MISSING,
RMUTIL_VALUE_EMPTY,
RMUTIL_VALUE_MISMATCH
} RMUtil_TryGetValueStatus;
/**
* Tries to extract the module-specific type from the value.
* @param key an opened key (may be null)
* @param type the pointer to the type to match to
* @param[out] out if the value is present, will be set to it.
* @return a value in the @ref RMUtil_TryGetValueStatus enum.
*/
int RedisModule_TryGetValue(RedisModuleKey *key, const RedisModuleType *type, void **out);
#endif

88
rmutil/vector.c Normal file
View File

@ -0,0 +1,88 @@
#include "vector.h"
#include <stdio.h>
inline int __vector_PushPtr(Vector *v, void *elem) {
if (v->top == v->cap) {
Vector_Resize(v, v->cap ? v->cap * 2 : 1);
}
__vector_PutPtr(v, v->top, elem);
return v->top;
}
inline int Vector_Get(Vector *v, size_t pos, void *ptr) {
// return 0 if pos is out of bounds
if (pos >= v->top) {
return 0;
}
memcpy(ptr, v->data + (pos * v->elemSize), v->elemSize);
return 1;
}
/* Get the element at the end of the vector, decreasing the size by one */
inline int Vector_Pop(Vector *v, void *ptr) {
if (v->top > 0) {
if (ptr != NULL) {
Vector_Get(v, v->top - 1, ptr);
}
v->top--;
return 1;
}
return 0;
}
inline int __vector_PutPtr(Vector *v, size_t pos, void *elem) {
// resize if pos is out of bounds
if (pos >= v->cap) {
Vector_Resize(v, pos + 1);
}
if (elem) {
memcpy(v->data + pos * v->elemSize, elem, v->elemSize);
} else {
memset(v->data + pos * v->elemSize, 0, v->elemSize);
}
// move the end offset to pos if we grew
if (pos >= v->top) {
v->top = pos + 1;
}
return 1;
}
int Vector_Resize(Vector *v, size_t newcap) {
int oldcap = v->cap;
v->cap = newcap;
v->data = realloc(v->data, v->cap * v->elemSize);
// If we grew:
// put all zeros at the newly realloc'd part of the vector
if (newcap > oldcap) {
int offset = oldcap * v->elemSize;
memset(v->data + offset, 0, v->cap * v->elemSize - offset);
}
return v->cap;
}
Vector *__newVectorSize(size_t elemSize, size_t cap) {
Vector *vec = malloc(sizeof(Vector));
vec->data = calloc(cap, elemSize);
vec->top = 0;
vec->elemSize = elemSize;
vec->cap = cap;
return vec;
}
void Vector_Free(Vector *v) {
free(v->data);
free(v);
}
/* return the used size of the vector, regardless of capacity */
inline int Vector_Size(Vector *v) { return v->top; }
/* return the actual capacity */
inline int Vector_Cap(Vector *v) { return v->cap; }

73
rmutil/vector.h Normal file
View File

@ -0,0 +1,73 @@
#ifndef __VECTOR_H__
#define __VECTOR_H__
#include <stdlib.h>
#include <string.h>
#include <stdarg.h>
/*
* Generic resizable vector that can be used if you just want to store stuff
* temporarily.
* Works like C++ std::vector with an underlying resizable buffer
*/
typedef struct {
char *data;
size_t elemSize;
size_t cap;
size_t top;
} Vector;
/* Create a new vector with element size. This should generally be used
* internall by the NewVector macro */
Vector *__newVectorSize(size_t elemSize, size_t cap);
// Put a pointer in the vector. To be used internall by the library
int __vector_PutPtr(Vector *v, size_t pos, void *elem);
/*
* Create a new vector for a given type and a given capacity.
* e.g. NewVector(int, 0) - empty vector of ints
*/
#define NewVector(type, cap) __newVectorSize(sizeof(type), cap)
/*
* get the element at index pos. The value is copied in to ptr. If pos is outside
* the vector capacity, we return 0
* otherwise 1
*/
int Vector_Get(Vector *v, size_t pos, void *ptr);
/* Get the element at the end of the vector, decreasing the size by one */
int Vector_Pop(Vector *v, void *ptr);
//#define Vector_Getx(v, pos, ptr) pos < v->cap ? 1 : 0; *ptr =
//*(typeof(ptr))(v->data + v->elemSize*pos)
/*
* Put an element at pos.
* Note: If pos is outside the vector capacity, we resize it accordingly
*/
#define Vector_Put(v, pos, elem) __vector_PutPtr(v, pos, elem ? &(typeof(elem)){elem} : NULL)
/* Push an element at the end of v, resizing it if needed. This macro wraps
* __vector_PushPtr */
#define Vector_Push(v, elem) __vector_PushPtr(v, elem ? &(typeof(elem)){elem} : NULL)
int __vector_PushPtr(Vector *v, void *elem);
/* resize capacity of v */
int Vector_Resize(Vector *v, size_t newcap);
/* return the used size of the vector, regardless of capacity */
int Vector_Size(Vector *v);
/* return the actual capacity */
int Vector_Cap(Vector *v);
/* free the vector and the underlying data. Does not release its elements if
* they are pointers*/
void Vector_Free(Vector *v);
int __vecotr_PutPtr(Vector *v, size_t pos, void *elem);
#endif

38
src/Makefile Normal file
View File

@ -0,0 +1,38 @@
#set environment variable RM_INCLUDE_DIR to the location of redismodule.h
ifndef RM_INCLUDE_DIR
RM_INCLUDE_DIR=../
endif
ifndef RMUTIL_LIBDIR
RMUTIL_LIBDIR=../rmutil
endif
# find the OS
uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not')
# Compile flags for linux / osx
ifeq ($(uname_S),Linux)
SHOBJ_CFLAGS ?= -fno-common -g -ggdb -lc -lm
SHOBJ_LDFLAGS ?= -shared -Bsymbolic
else ifeq ($(uname_S),Darwin)
SHOBJ_CFLAGS ?= -dynamic -fno-common -g -ggdb
SHOBJ_LDFLAGS ?= -bundle -undefined dynamic_lookup -macosx_version_min 10.13
else
SHOBJ_CFLAGS ?= -dynamic -fno-common -g -ggdb -lc -lm
SHOBJ_LDFLAGS ?= -bundle -undefined dynamic_lookup
endif
CFLAGS = -I$(RM_INCLUDE_DIR) -Wall -g -fPIC -std=gnu99
CC=gcc
all: rmutil dbx.so
rmutil: FORCE
$(MAKE) -C $(RMUTIL_LIBDIR)
dbx.so: dbx.o
$(LD) -o $@ dbx.o $(SHOBJ_LDFLAGS) $(LIBS) -L$(RMUTIL_LIBDIR) -lrmutil -lc
clean:
rm -rf *.xo *.so *.o
FORCE:

430
src/dbx.c Normal file
View File

@ -0,0 +1,430 @@
#include <stdlib.h>
#include <regex.h>
#include <ctype.h>
#include "../redismodule.h"
#include "../rmutil/util.h"
#include "../rmutil/strings.h"
#include "../rmutil/vector.h"
#include "../rmutil/test_util.h"
/* Helper function: compiles a regex, or dies complaining. */
int regexCompile(RedisModuleCtx *ctx, regex_t *r, const char *t) {
int status = regcomp(r, t, REG_EXTENDED | REG_NOSUB | REG_NEWLINE);
if (status) {
char rerr[128];
char err[256];
regerror(status, r, rerr, 128);
sprintf(err, "ERR regex compilation failed: %s", rerr);
RedisModule_ReplyWithError(ctx, err);
return status;
}
return 0;
}
char* toLower(char* s) {
for(char *p = s; *p; p++)
*p = tolower(*p);
return s;
}
char* toUpper(char* s) {
for(char *p = s; *p; p++)
*p = toupper(*p);
return s;
}
int whereRecord(RedisModuleCtx *ctx, RedisModuleString *key, Vector *vWhere) {
char *field, *w;
int condition;
int match = 1;
// If where statement is defined, get the specified hash content and do comparison
if (Vector_Size(vWhere) == 3) {
Vector_Get(vWhere, 0, &field);
Vector_Get(vWhere, 1, &condition);
Vector_Get(vWhere, 2, &w);
if (condition == 7)
toLower(w);
if (strlen(w) == 0)
match = 0;
else {
RedisModuleCallReply *tags = RedisModule_Call(ctx, "HGET", "sc", key, field);
if (RedisModule_CallReplyLength(tags) > 0) {
RedisModuleString *rms = RedisModule_CreateStringFromCallReply(tags);
size_t l;
const char *s = RedisModule_StringPtrLen(rms, &l);
switch(condition) {
case 0:
match = strcmp(s, w) >= 0? 1: 0;
break;
case 1:
match = strcmp(s, w) <= 0? 1: 0;
break;
case 2:
case 3:
match = strcmp(s, w) != 0? 1: 0;
break;
case 4:
match = strcmp(s, w) > 0? 1: 0;
break;
case 5:
match = strcmp(s, w) < 0? 1: 0;
break;
case 6:
match = strcmp(s, w) == 0? 1: 0;
break;
case 7:
match = strstr(toLower((char*)s), w)? 1: 0;
break;
}
RedisModule_FreeString(ctx, rms);
RedisModule_FreeCallReply(tags);
}
else
match = 0;
}
}
return match;
}
void showRecord(RedisModuleCtx *ctx, RedisModuleString *key, Vector *vSelect) {
RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN);
char* field;
size_t nSelected = Vector_Size(vSelect);
size_t n = 0;
for(size_t i = 0; i < nSelected; i++) {
Vector_Get(vSelect, i, &field);
// If '*' is specified in selected hash list, display all hashs then
if (strcmp(field, "*") == 0) {
RedisModuleCallReply *tags = RedisModule_Call(ctx, "HGETALL", "s", key);
size_t tf = RedisModule_CallReplyLength(tags);
if (tf > 0) {
for(size_t j=0; j<tf; j++) {
RedisModuleString *rms = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(tags, j));
RedisModule_ReplyWithString(ctx, rms);
n++;
RedisModule_FreeString(ctx, rms);
}
}
RedisModule_FreeCallReply(tags);
}
else {
// Display the hash name and content
RedisModuleCallReply *tags = RedisModule_Call(ctx, "HGET", "sc", key, field);
if (RedisModule_CallReplyLength(tags) > 0) {
RedisModuleString *rms = RedisModule_CreateStringFromCallReply(tags);
RedisModule_ReplyWithSimpleString(ctx, field);
RedisModule_ReplyWithString(ctx, rms);
RedisModule_FreeString(ctx, rms);
}
else
RedisModule_ReplyWithNull(ctx); // If hash is undefined
n+=2;
RedisModule_FreeCallReply(tags);
}
}
RedisModule_ReplySetArrayLength(ctx, n);
}
/* Split the string by specified delimilator */
Vector* splitStringByChar(char *s, char* d) {
size_t cap;
char *p = s;
for (cap=1; p[cap]; p[cap]==d[0] ? cap++ : *p++);
Vector *v = NewVector(void *, cap);
if (strlen(s) > 0) {
char *token = strtok(s, d);
for(int i=0; i<cap; i++) {
if (token != NULL) Vector_Push(v, token);
token = strtok(NULL, d);
}
}
return v;
}
Vector* splitWhereString(char *s) {
Vector *v = NewVector(void *, 16);
static char chk[8][3] = {">=", "<=", "!=", "<>", ">", "<", "=", "~"};
char *p = s;
while(*p++) {
for(int i=0; i<8; i++) {
char *c = chk[i];
if (strncmp(c, p, strlen(c)) == 0) {
*p = 0;
p += strlen(c);
Vector_Push(v, s);
Vector_Push(v, i);
Vector_Push(v, p);
break;
}
}
}
return v;
}
int processRecords(RedisModuleCtx *ctx, RedisModuleCallReply *keys, regex_t *r, Vector *vSelect, Vector *vWhere) {
size_t nKeys = RedisModule_CallReplyLength(keys);
size_t affected = 0;
for (size_t i = 0; i < nKeys; i++) {
RedisModuleString *key = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(keys, i));
size_t l;
const char *s = RedisModule_StringPtrLen(key, &l);
if (!regexec(r, s, 1, NULL, 0)) {
if (vWhere == NULL || whereRecord(ctx, key, vWhere)) {
showRecord(ctx, key, vSelect);
affected++;
}
}
RedisModule_FreeString(ctx, key);
}
return affected;
}
/* Create temporary set for sorting */
int buildSetByPattern(RedisModuleCtx *ctx, regex_t *r, char *setName, Vector *vWhere) {
RedisModule_Call(ctx, "del", "c", setName);
RedisModuleString *scursor = RedisModule_CreateStringFromLongLong(ctx, 0);
long long lcursor;
size_t affected = 0;
do {
RedisModuleCallReply *rep = RedisModule_Call(ctx, "SCAN", "s", scursor);
/* Get the current cursor. */
scursor = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(rep, 0));
RedisModule_StringToLongLong(scursor, &lcursor);
/* Filter by pattern matching. */
RedisModuleCallReply *keys = RedisModule_CallReplyArrayElement(rep, 1);
size_t nKeys = RedisModule_CallReplyLength(keys);
for (size_t i = 0; i < nKeys; i++) {
RedisModuleString *key = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(keys, i));
size_t l;
const char *s = RedisModule_StringPtrLen(key, &l);
if (!regexec(r, s, 1, NULL, 0)) {
if (vWhere == NULL || whereRecord(ctx, key, vWhere)) {
RedisModule_Call(ctx, "SADD", "cs", setName, key);
affected++;
}
}
RedisModule_FreeString(ctx, key);
}
RedisModule_FreeCallReply(keys);
RedisModule_FreeCallReply(rep);
} while (lcursor);
return affected;
}
int SelectCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc < 2)
return RedisModule_WrongArity(ctx);
// Table
RedisModuleString *fromKeys;
// Process the arguments
size_t plen;
char s[1024] = "";
for (int i=1; i<argc; i++) {
if (strlen(s) > 0) strcat(s, " ");
const char *temp = RedisModule_StringPtrLen(argv[i], &plen);
if (strlen(s) + plen > 1024) {
RedisModule_ReplyWithError(ctx, "arguments are too long");
return REDISMODULE_ERR;
}
if (argc > 2) { // argc > 2 means the arguments are not in quoted. i.e. "..."
char *p = (char*)temp;
while (*p++) *p = *p == 32? 7: *p; // Convert all spaces in tabs, then convert back during parsing
}
if (strcmp("like", temp) == 0)
strcat(s, "~");
else
strcat(s, temp);
}
int step = 0;
char temp[1024] = "";
char stmSelect[1024] = "";
char stmWhere[1024] = "";
char stmOrder[1024] = "";
char *token = strtok(s, " ");
while (token != NULL) {
// If it is beginning in single quote, find the end quote in the following tokens
if (token[0] == 39) {
strcpy(temp, &token[1]);
strcat(temp, " ");
strcat(temp, strtok(NULL, "'"));
strcpy(token, temp);
}
switch(step) {
case 0:
if (strcmp("from", token) == 0)
step = -1;
else {
if (strlen(stmSelect) + strlen(token) > 512) {
RedisModule_ReplyWithError(ctx, "select arguments are too long");
return REDISMODULE_ERR;
}
strcat(stmSelect, token);
}
break;
case -1:
// parse from statement
fromKeys = RMUtil_CreateFormattedString(ctx, token);
step = 2;
break;
case 2:
if (strcmp("where", token) == 0)
step = -3;
else if (strcmp("order", token) == 0)
step = -5;
else {
RedisModule_ReplyWithError(ctx, "where or order statement is expected");
return REDISMODULE_ERR;
}
break;
case -3:
case 4:
// parse where statement
if (strcmp("order", token) == 0)
step = -5;
else {
if (strlen(stmWhere) + strlen(token) > 512) {
RedisModule_ReplyWithError(ctx, "where arguments are too long");
return REDISMODULE_ERR;
}
char *p = token;
while (*p++) *p = *p == 7? 32: *p;
strcat(stmWhere, token);
step = 4;
}
break;
case -5:
if (strcmp("by", token) == 0)
step = -6;
else {
RedisModule_ReplyWithError(ctx, "missing 'by' after order");
return REDISMODULE_ERR;
}
break;
case -6:
case 7:
// parse order statement
if (strlen(stmOrder) + strlen(token) > 512) {
RedisModule_ReplyWithError(ctx, "order arguments are too long");
return REDISMODULE_ERR;
}
if (strcmp("desc", token) == 0)
strcat(stmOrder, "-");
else
if (strcmp("asc", token) != 0)
strcat(stmOrder, token);
step = 7;
break;
}
token = strtok(NULL, " ");
}
if (step <= 0) {
RedisModule_ReplyWithError(ctx, "parse error");
return REDISMODULE_ERR;
}
Vector *vSelect = splitStringByChar(stmSelect, ",");
Vector *vWhere = splitWhereString(stmWhere);
Vector *vOrder = splitStringByChar(stmOrder, ",");
RedisModule_AutoMemory(ctx);
/* Convert key to regex */
const char *pat = RedisModule_StringPtrLen(fromKeys, &plen);
regex_t regex;
if (regexCompile(ctx, &regex, pat)) return REDISMODULE_ERR;
/* Print result in array format */
RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN);
if (Vector_Size(vOrder) > 0) {
// temporary set name
char setName[32];
sprintf(setName, "__db_tempset_%i", rand());
char stmt[128];
RedisModuleCallReply *rep;
if (buildSetByPattern(ctx, &regex, setName, vWhere) > 0) {
// Sort the fields under the key and send the resulting array to processRecords module
char *field;
Vector_Get(vOrder, 0, &field);
if (field[strlen(field)-1] == '-') {
field[strlen(field)-1] = 0;
sprintf(stmt, "*->%s", field);
rep = RedisModule_Call(ctx, "SORT", "ccccc", setName, "by", stmt, "desc", "alpha");
}
else {
sprintf(stmt, "*->%s", field);
rep = RedisModule_Call(ctx, "SORT", "cccc", setName, "by", stmt, "alpha");
}
size_t n = processRecords(ctx, rep, &regex, vSelect, NULL);
RedisModule_FreeCallReply(rep);
// set number of output
RedisModule_ReplySetArrayLength(ctx, n);
}
else
RedisModule_ReplySetArrayLength(ctx, 0);
// Remove the temporary set before leave
RedisModule_Call(ctx, "DEL", "c", setName);
}
else {
RedisModuleString *scursor = RedisModule_CreateStringFromLongLong(ctx, 0);
long long lcursor;
size_t n = 0;
do {
RedisModuleCallReply *rep = RedisModule_Call(ctx, "SCAN", "s", scursor);
/* Get the current cursor. */
scursor = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(rep, 0));
RedisModule_StringToLongLong(scursor, &lcursor);
/* Filter by pattern matching. */
RedisModuleCallReply *rkeys = RedisModule_CallReplyArrayElement(rep, 1);
n += processRecords(ctx, rkeys, &regex, vSelect, vWhere);
RedisModule_FreeCallReply(rep);
} while (lcursor);
RedisModule_ReplySetArrayLength(ctx, n);
RedisModule_FreeString(ctx, scursor);
}
RedisModule_FreeString(ctx, fromKeys);
Vector_Free(vSelect);
Vector_Free(vWhere);
Vector_Free(vOrder);
return REDISMODULE_OK;
}
int RedisModule_OnLoad(RedisModuleCtx *ctx) {
// Register the module
if (RedisModule_Init(ctx, "dbx", 1, REDISMODULE_APIVER_1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
// Register the command
if (RedisModule_CreateCommand(ctx, "dbx.select", SelectCommand, "readonly", 1, 1, 1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
return REDISMODULE_OK;
}

354
src/module.c Normal file
View File

@ -0,0 +1,354 @@
#include <stdlib.h>
#include <regex.h>
#include "../redismodule.h"
#include "../rmutil/util.h"
#include "../rmutil/strings.h"
#include "../rmutil/vector.h"
#include "../rmutil/test_util.h"
/* Helper function: compiles a regex, or dies complaining. */
int regexCompile(RedisModuleCtx *ctx, regex_t *r, const char *t) {
int status = regcomp(r, t, REG_EXTENDED | REG_NOSUB | REG_NEWLINE);
if (status) {
char rerr[128];
char err[256];
regerror(status, r, rerr, 128);
sprintf(err, "ERR regex compilation failed: %s", rerr);
RedisModule_ReplyWithError(ctx, err);
return status;
}
return 0;
}
int getRecord(RedisModuleCtx *ctx, RedisModuleString *key, Vector *vSelect, Vector *vWhere) {
char* field;
int match = 1;
size_t i, l;
// If where statement is defined, get the specified hash content and do comparison
if (Vector_Size(vWhere) == 2) {
Vector_Get(vWhere, 0, &field);
RedisModuleCallReply *tags = RedisModule_Call(ctx, "HGET", "sc", key, field);
if (RedisModule_CallReplyLength(tags) > 0) {
RedisModuleString *rms = RedisModule_CreateStringFromCallReply(tags);
Vector_Get(vWhere, 1, &field);
if (strlen(field) > 0) {
const char *s = RedisModule_StringPtrLen(rms, &l);
match = (strcmp(field, s) == 0)? 1: 0;
RedisModule_FreeString(ctx, rms);
}
else
match = 0;
}
else
match = 0;
RedisModule_FreeCallReply(tags);
}
// If match, print the specified hashs
if (match) {
RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN);
size_t nSelected = Vector_Size(vSelect);
size_t n = 0;
for(i = 0; i < nSelected; i++) {
Vector_Get(vSelect, i, &field);
// If '*' is specified in selected hash list, display all hashs then
if (strcmp(field, "*") == 0) {
RedisModuleCallReply *tags = RedisModule_Call(ctx, "HGETALL", "s", key);
size_t tf = RedisModule_CallReplyLength(tags);
if (tf > 0) {
for(size_t j=0; j<tf; j++) {
RedisModuleString *rms = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(tags, j));
RedisModule_ReplyWithString(ctx, rms);
n++;
RedisModule_FreeString(ctx, rms);
}
}
RedisModule_FreeCallReply(tags);
}
else {
// Display the hash name and content
RedisModuleCallReply *tags = RedisModule_Call(ctx, "HGET", "sc", key, field);
if (RedisModule_CallReplyLength(tags) > 0) {
RedisModuleString *rms = RedisModule_CreateStringFromCallReply(tags);
RedisModule_ReplyWithSimpleString(ctx, field);
RedisModule_ReplyWithString(ctx, rms);
RedisModule_FreeString(ctx, rms);
}
else
RedisModule_ReplyWithNull(ctx); // If hash is undefined
n+=2;
RedisModule_FreeCallReply(tags);
}
}
RedisModule_ReplySetArrayLength(ctx, n);
}
return match;
}
/* Split the string by specified delimilator */
Vector* splitStringByChar(char *s, char* d) {
size_t cap;
char *p = s;
for (cap=1; p[cap]; p[cap]==d[0] ? cap++ : *p++);
Vector *v = NewVector(void *, cap);
if (strlen(s) > 0) {
char *token = strtok(s, d);
for(int i=0; i<cap; i++) {
if (token != NULL) Vector_Push(v, token);
token = strtok(NULL, d);
}
}
return v;
}
int processRecords(RedisModuleCtx *ctx, RedisModuleCallReply *keys, regex_t *r, Vector *vSelect, Vector *vWhere) {
size_t nKeys = RedisModule_CallReplyLength(keys);
size_t affected = 0;
for (size_t i = 0; i < nKeys; i++) {
RedisModuleString *key = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(keys, i));
size_t l;
const char *s = RedisModule_StringPtrLen(key, &l);
if (!regexec(r, s, 1, NULL, 0))
affected += getRecord(ctx, key, vSelect, vWhere);
RedisModule_FreeString(ctx, key);
}
return affected;
}
/* Create temporary set for sorting */
int buildSetByPattern(RedisModuleCtx *ctx, regex_t *r, char *setName) {
RedisModule_Call(ctx, "del", "c", setName);
RedisModuleString *scursor = RedisModule_CreateStringFromLongLong(ctx, 0);
long long lcursor;
size_t affected = 0;
do {
RedisModuleCallReply *rep = RedisModule_Call(ctx, "SCAN", "s", scursor);
/* Get the current cursor. */
scursor = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(rep, 0));
RedisModule_StringToLongLong(scursor, &lcursor);
/* Filter by pattern matching. */
RedisModuleCallReply *keys = RedisModule_CallReplyArrayElement(rep, 1);
size_t nKeys = RedisModule_CallReplyLength(keys);
for (size_t i = 0; i < nKeys; i++) {
RedisModuleString *key = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(keys, i));
size_t l;
const char *s = RedisModule_StringPtrLen(key, &l);
if (!regexec(r, s, 1, NULL, 0)) {
RedisModule_Call(ctx, "SADD", "cs", setName, key);
affected++;
}
RedisModule_FreeString(ctx, key);
}
RedisModule_FreeCallReply(keys);
RedisModule_FreeCallReply(rep);
} while (lcursor);
return affected;
}
int SelectCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc < 2)
return RedisModule_WrongArity(ctx);
// Table
RedisModuleString *fromKeys;
// Process the arguments
size_t plen;
char s[1024] = "";
for (int i=1; i<argc; i++) {
if (strlen(s) > 0) strcat(s, " ");
const char *temp = RedisModule_StringPtrLen(argv[i], &plen);
if (strlen(s) + plen > 1024) {
RedisModule_ReplyWithError(ctx, "arguments are too long");
return REDISMODULE_ERR;
}
char *p = (char*)temp;
while (*p++) *p = *p == 32? 7: *p;
if (strcmp("like", temp) == 0)
strcat(s, "?");
else
strcat(s, temp);
}
int step = 0;
char stmSelect[512] = "";
char stmWhere[512] = "";
char stmOrder[512] = "";
char *token = strtok(s, " ");
while (token != NULL) {
switch(step) {
case 0:
if (strcmp("from", token) == 0)
step = -1;
else {
if (strlen(stmSelect) + strlen(token) > 512) {
RedisModule_ReplyWithError(ctx, "select arguments are too long");
return REDISMODULE_ERR;
}
strcat(stmSelect, token);
}
break;
case -1:
// parse from statement
fromKeys = RMUtil_CreateFormattedString(ctx, token);
step = 2;
break;
case 2:
if (strcmp("where", token) == 0)
step = -3;
else if (strcmp("order", token) == 0)
step = -5;
else {
RedisModule_ReplyWithError(ctx, "where or order statement is expected");
return REDISMODULE_ERR;
}
break;
case -3:
case 4:
// parse where statement
if (strcmp("order", token) == 0)
step = -5;
else {
if (strlen(stmWhere) + strlen(token) > 512) {
RedisModule_ReplyWithError(ctx, "where arguments are too long");
return REDISMODULE_ERR;
}
char *p = token;
while (*p++) *p = *p == 7? 32: *p;
strcat(stmWhere, token);
step = 4;
}
break;
case -5:
if (strcmp("by", token) == 0)
step = -6;
else {
RedisModule_ReplyWithError(ctx, "missing 'by' after order");
return REDISMODULE_ERR;
}
break;
case -6:
case 7:
// parse order statement
if (strlen(stmOrder) + strlen(token) > 512) {
RedisModule_ReplyWithError(ctx, "order arguments are too long");
return REDISMODULE_ERR;
}
if (strcmp("desc", token) == 0)
strcat(stmOrder, "-");
else
if (strcmp("asc", token) != 0)
strcat(stmOrder, token);
step = 7;
break;
}
token = strtok(NULL, " ");
}
if (step <= 0) {
RedisModule_ReplyWithError(ctx, "parse error");
return REDISMODULE_ERR;
}
Vector *vSelect = splitStringByChar(stmSelect, ",");
Vector *vWhere = splitStringByChar(stmWhere, "=");
Vector *vOrder = splitStringByChar(stmOrder, ",");
RedisModule_AutoMemory(ctx);
/* Convert key to regex */
const char *pat = RedisModule_StringPtrLen(fromKeys, &plen);
regex_t regex;
if (regexCompile(ctx, &regex, pat)) return REDISMODULE_ERR;
/* Print result in array format */
RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN);
if (Vector_Size(vOrder) > 0) {
// temporary set name
char setName[32];
sprintf(setName, "__db_tempset_%i", rand());
char stmt[128];
RedisModuleCallReply *rep;
if (buildSetByPattern(ctx, &regex, setName) > 0) {
// Sort the fields under the key and send the resulting array to processRecords module
char *field;
Vector_Get(vOrder, 0, &field);
if (field[strlen(field)-1] == '-') {
field[strlen(field)-1] = 0;
sprintf(stmt, "*->%s", field);
rep = RedisModule_Call(ctx, "SORT", "ccccc", setName, "by", stmt, "desc", "alpha");
}
else {
sprintf(stmt, "*->%s", field);
rep = RedisModule_Call(ctx, "SORT", "cccc", setName, "by", stmt, "alpha");
}
size_t n = processRecords(ctx, rep, &regex, vSelect, vWhere);
RedisModule_FreeCallReply(rep);
// set number of output
RedisModule_ReplySetArrayLength(ctx, n);
}
else
RedisModule_ReplySetArrayLength(ctx, 0);
// Remove the temporary set before leave
RedisModule_Call(ctx, "DEL", "c", setName);
}
else {
RedisModuleString *scursor = RedisModule_CreateStringFromLongLong(ctx, 0);
long long lcursor;
size_t n = 0;
do {
RedisModuleCallReply *rep = RedisModule_Call(ctx, "SCAN", "s", scursor);
/* Get the current cursor. */
scursor = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(rep, 0));
RedisModule_StringToLongLong(scursor, &lcursor);
/* Filter by pattern matching. */
RedisModuleCallReply *rkeys = RedisModule_CallReplyArrayElement(rep, 1);
n += processRecords(ctx, rkeys, &regex, vSelect, vWhere);
RedisModule_FreeCallReply(rep);
} while (lcursor);
RedisModule_ReplySetArrayLength(ctx, n);
RedisModule_FreeString(ctx, scursor);
}
RedisModule_FreeString(ctx, fromKeys);
Vector_Free(vSelect);
Vector_Free(vWhere);
Vector_Free(vOrder);
return REDISMODULE_OK;
}
int RedisModule_OnLoad(RedisModuleCtx *ctx) {
// Register the module
if (RedisModule_Init(ctx, "dbx", 1, REDISMODULE_APIVER_1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
// Register the command
if (RedisModule_CreateCommand(ctx, "dbx.select", SelectCommand, "readonly", 1, 1, 1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
return REDISMODULE_OK;
}