diff --git a/src/defrag.c b/src/defrag.c index 3813ec3ac..40bba2d33 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -39,10 +39,15 @@ #ifdef HAVE_DEFRAG typedef struct defragCtx { - redisDb *db; + void *privdata; int slot; } defragCtx; +typedef struct defragPubSubCtx { + kvstore *pubsub_channels; + dict *(*clientPubSubChannels)(client*); +} defragPubSubCtx; + /* this method was added to jemalloc in order to help us understand which * pointers are worthwhile moving and which aren't */ int je_get_defrag_hint(void* ptr); @@ -86,14 +91,16 @@ sds activeDefragSds(sds sdsptr) { return NULL; } -/* Defrag helper for robj and/or string objects +/* Defrag helper for robj and/or string objects with expected refcount. * - * returns NULL in case the allocation wasn't moved. - * when it returns a non-null value, the old pointer was already released - * and should NOT be accessed. */ -robj *activeDefragStringOb(robj* ob) { + * Like activeDefragStringOb, but it requires the caller to pass in the expected + * reference count. In some cases, the caller needs to update a robj whose + * reference count is not 1, in these cases, the caller must explicitly pass + * in the reference count, otherwise defragmentation will not be performed. + * Note that the caller is responsible for updating any other references to the robj. */ +robj *activeDefragStringObEx(robj* ob, int expected_refcount) { robj *ret = NULL; - if (ob->refcount!=1) + if (ob->refcount!=expected_refcount) return NULL; /* try to defrag robj (only if not an EMBSTR type (handled below). */ @@ -124,6 +131,15 @@ robj *activeDefragStringOb(robj* ob) { return ret; } +/* Defrag helper for robj and/or string objects + * + * returns NULL in case the allocation wasn't moved. + * when it returns a non-null value, the old pointer was already released + * and should NOT be accessed. */ +robj *activeDefragStringOb(robj* ob) { + return activeDefragStringObEx(ob, 1); +} + /* Defrag helper for lua scripts * * returns NULL in case the allocation wasn't moved. @@ -145,12 +161,20 @@ luaScript *activeDefragLuaScript(luaScript *script) { } /* Defrag helper for dict main allocations (dict struct, and hash tables). - * receives a pointer to the dict* and implicitly updates it when the dict - * struct itself was moved. */ -void dictDefragTables(dict* d) { + * Receives a pointer to the dict* and return a new dict* when the dict + * struct itself was moved. + * + * Returns NULL in case the allocation wasn't moved. + * When it returns a non-null value, the old pointer was already released + * and should NOT be accessed. */ +dict *dictDefragTables(dict *d) { + dict *ret = NULL; dictEntry **newtable; + /* handle the dict struct */ + if ((ret = activeDefragAlloc(d))) + d = ret; /* handle the first hash table */ - if (!d->ht_table[0]) return; /* created but unused */ + if (!d->ht_table[0]) return ret; /* created but unused */ newtable = activeDefragAlloc(d->ht_table[0]); if (newtable) d->ht_table[0] = newtable; @@ -160,6 +184,7 @@ void dictDefragTables(dict* d) { if (newtable) d->ht_table[1] = newtable; } + return ret; } /* Internal function used by zslDefrag */ @@ -460,11 +485,9 @@ void defragZsetSkiplist(redisDb *db, dictEntry *kde) { } dictReleaseIterator(di); } - /* handle the dict struct */ - if ((newdict = activeDefragAlloc(zs->dict))) + /* defrag the dict struct and tables */ + if ((newdict = dictDefragTables(zs->dict))) zs->dict = newdict; - /* defrag the dict tables */ - dictDefragTables(zs->dict); } void defragHash(redisDb *db, dictEntry *kde) { @@ -476,11 +499,9 @@ void defragHash(redisDb *db, dictEntry *kde) { defragLater(db, kde); else activeDefragSdsDict(d, DEFRAG_SDS_DICT_VAL_IS_SDS); - /* handle the dict struct */ - if ((newd = activeDefragAlloc(ob->ptr))) + /* defrag the dict struct and tables */ + if ((newd = dictDefragTables(ob->ptr))) ob->ptr = newd; - /* defrag the dict tables */ - dictDefragTables(ob->ptr); } void defragSet(redisDb *db, dictEntry *kde) { @@ -492,11 +513,9 @@ void defragSet(redisDb *db, dictEntry *kde) { defragLater(db, kde); else activeDefragSdsDict(d, DEFRAG_SDS_DICT_NO_VAL); - /* handle the dict struct */ - if ((newd = activeDefragAlloc(ob->ptr))) + /* defrag the dict struct and tables */ + if ((newd = dictDefragTables(ob->ptr))) ob->ptr = newd; - /* defrag the dict tables */ - dictDefragTables(ob->ptr); } /* Defrag callback for radix tree iterator, called for each node, @@ -677,7 +696,7 @@ void defragKey(defragCtx *ctx, dictEntry *de) { robj *newob, *ob; unsigned char *newzl; sds newsds; - redisDb *db = ctx->db; + redisDb *db = ctx->privdata; int slot = ctx->slot; /* Try to defrag the key name. */ newsds = activeDefragSds(keysds); @@ -761,18 +780,6 @@ void defragScanCallback(void *privdata, const dictEntry *de) { server.stat_active_defrag_scanned++; } -static void defragKvstoreDefragScanCallBack(dict **d) { - dict *newd; - /* handle the dict struct */ - if ((newd = activeDefragAlloc(*d))) - *d = newd; - dictDefragTables(*d); -} - -void activeDefragKvstore(kvstore *kvs) { - kvstoreDictLUTDefrag(kvs, defragKvstoreDefragScanCallBack); -} - /* Utility function to get the fragmentation ratio from jemalloc. * It is critical to do that by comparing only heap maps that belong to * jemalloc, and skip ones the jemalloc keeps as spare. Since we use this @@ -798,6 +805,41 @@ float getAllocatorFragmentation(size_t *out_frag_bytes) { return frag_pct; } +/* Defrag scan callback for the pubsub dictionary. */ +void defragPubsubScanCallback(void *privdata, const dictEntry *de) { + defragCtx *ctx = privdata; + defragPubSubCtx *pubsub_ctx = ctx->privdata; + kvstore *pubsub_channels = pubsub_ctx->pubsub_channels; + robj *newchannel, *channel = dictGetKey(de); + dict *newclients, *clients = dictGetVal(de); + + /* Try to defrag the channel name. */ + serverAssert(channel->refcount == (int)dictSize(clients) + 1); + newchannel = activeDefragStringObEx(channel, dictSize(clients) + 1); + if (newchannel) { + kvstoreDictSetKey(pubsub_channels, ctx->slot, (dictEntry*)de, newchannel); + + /* The channel name is shared by the client's pubsub(shard) and server's + * pubsub(shard), after defraging the channel name, we need to update + * the reference in the clients' dictionary. */ + dictIterator *di = dictGetIterator(clients); + dictEntry *clientde; + while((clientde = dictNext(di)) != NULL) { + client *c = dictGetKey(clientde); + dictEntry *pubsub_channel = dictFind(pubsub_ctx->clientPubSubChannels(c), newchannel); + serverAssert(pubsub_channel); + dictSetKey(pubsub_ctx->clientPubSubChannels(c), pubsub_channel, newchannel); + } + dictReleaseIterator(di); + } + + /* Try to defrag the dictionary of clients that is stored as the value part. */ + if ((newclients = dictDefragTables(clients))) + kvstoreDictSetVal(pubsub_channels, ctx->slot, (dictEntry*)de, newclients); + + server.stat_active_defrag_scanned++; +} + /* We may need to defrag other globals, one small allocation can hold a full allocator run. * so although small, it is still important to defrag these */ void defragOtherGlobals(void) { @@ -807,6 +849,8 @@ void defragOtherGlobals(void) { * that remain static for a long time */ activeDefragSdsDict(evalScriptsDict(), DEFRAG_SDS_DICT_VAL_LUA_SCRIPT); moduleDefragGlobals(); + kvstoreDictLUTDefrag(server.pubsub_channels, dictDefragTables); + kvstoreDictLUTDefrag(server.pubsubshard_channels, dictDefragTables); } /* returns 0 more work may or may not be needed (see non-zero cursor), @@ -944,12 +988,11 @@ void computeDefragCycles(void) { * This works in a similar way to activeExpireCycle, in the sense that * we do incremental work across calls. */ void activeDefragCycle(void) { - static defragCtx ctx; static int slot = -1; static int current_db = -1; static int defrag_later_item_in_progress = 0; - static unsigned long cursor = 0; - static unsigned long expires_cursor = 0; + static int defrag_stage = 0; + static unsigned long defrag_cursor = 0; static redisDb *db = NULL; static long long start_scan, start_stat; unsigned int iterations = 0; @@ -957,6 +1000,7 @@ void activeDefragCycle(void) { unsigned long long prev_scanned = server.stat_active_defrag_scanned; long long start, timelimit, endtime; mstime_t latency; + int all_stages_finished = 0; int quit = 0; if (!server.active_defrag_enabled) { @@ -969,8 +1013,8 @@ void activeDefragCycle(void) { defrag_later_current_key = NULL; defrag_later_cursor = 0; current_db = -1; - cursor = 0; - expires_cursor = 0; + defrag_stage = 0; + defrag_cursor = 0; slot = -1; defrag_later_item_in_progress = 0; db = NULL; @@ -1008,7 +1052,7 @@ void activeDefragCycle(void) { dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc}; do { /* if we're not continuing a scan from the last call or loop, start a new one */ - if (!cursor && !expires_cursor && (slot < 0)) { + if (!defrag_stage && !defrag_cursor && (slot < 0)) { /* finish any leftovers from previous db before moving to the next one */ if (db && defragLaterStep(db, slot, endtime)) { quit = 1; /* time is up, we didn't finish all the work */ @@ -1029,12 +1073,11 @@ void activeDefragCycle(void) { start_scan = now; current_db = -1; - cursor = 0; - expires_cursor = 0; + defrag_stage = 0; + defrag_cursor = 0; slot = -1; defrag_later_item_in_progress = 0; db = NULL; - memset(&ctx, -1, sizeof(ctx)); server.active_defrag_running = 0; computeDefragCycles(); /* if another scan is needed, start it right away */ @@ -1049,16 +1092,33 @@ void activeDefragCycle(void) { } db = &server.db[current_db]; - activeDefragKvstore(db->keys); - activeDefragKvstore(db->expires); - cursor = 0; - expires_cursor = 0; - slot = kvstoreFindDictIndexByKeyIndex(db->keys, 1); + kvstoreDictLUTDefrag(db->keys, dictDefragTables); + kvstoreDictLUTDefrag(db->expires, dictDefragTables); + defrag_stage = 0; + defrag_cursor = 0; + slot = -1; defrag_later_item_in_progress = 0; - ctx.db = db; - ctx.slot = slot; } + + /* This array of structures holds the parameters for all defragmentation stages. */ + typedef struct defragStage { + kvstore *kvs; + dictScanFunction *scanfn; + void *privdata; + } defragStage; + defragStage defrag_stages[] = { + {db->keys, defragScanCallback, db}, + {db->expires, scanCallbackCountScanned, NULL}, + {server.pubsub_channels, defragPubsubScanCallback, + &(defragPubSubCtx){server.pubsub_channels, getClientPubSubChannels}}, + {server.pubsubshard_channels, defragPubsubScanCallback, + &(defragPubSubCtx){server.pubsubshard_channels, getClientPubSubShardChannels}}, + }; do { + int num_stages = sizeof(defrag_stages) / sizeof(defrag_stages[0]); + serverAssert(defrag_stage < num_stages); + defragStage *current_stage = &defrag_stages[defrag_stage]; + /* before scanning the next bucket, see if we have big keys left from the previous bucket to scan */ if (defragLaterStep(db, slot, endtime)) { quit = 1; /* time is up, we didn't finish all the work */ @@ -1066,26 +1126,31 @@ void activeDefragCycle(void) { } if (!defrag_later_item_in_progress) { - /* Scan the keyspace dict unless we're scanning the expire dict. */ - if (!expires_cursor) - cursor = kvstoreDictScanDefrag(db->keys, slot, cursor, - defragScanCallback, - &defragfns, &ctx); - /* When done scanning the keyspace dict, we scan the expire dict. */ - if (!cursor) - expires_cursor = kvstoreDictScanDefrag(db->expires, slot, expires_cursor, - scanCallbackCountScanned, - &defragfns, NULL); + /* Continue defragmentation from the previous stage. + * If slot is -1, it means this stage starts from the first non-empty slot. */ + if (slot == -1) slot = kvstoreGetFirstNonEmptyDictIndex(current_stage->kvs); + defrag_cursor = kvstoreDictScanDefrag(current_stage->kvs, slot, defrag_cursor, + current_stage->scanfn, &defragfns, &(defragCtx){current_stage->privdata, slot}); } - if (!(cursor || expires_cursor)) { + + if (!defrag_cursor) { /* Move to the next slot only if regular and large item scanning has been completed. */ if (listLength(db->defrag_later) > 0) { defrag_later_item_in_progress = 1; continue; } - slot = kvstoreGetNextNonEmptyDictIndex(db->keys, slot); + + /* Move to the next slot in the current stage. If we've reached the end, move to the next stage. */ + if ((slot = kvstoreGetNextNonEmptyDictIndex(current_stage->kvs, slot)) == -1) + defrag_stage++; defrag_later_item_in_progress = 0; - ctx.slot = slot; + } + + /* Check if all defragmentation stages have been processed. + * If so, mark as finished and reset the stage counter to move on to next database. */ + if (defrag_stage == num_stages) { + all_stages_finished = 1; + defrag_stage = 0; } /* Once in 16 scan iterations, 512 pointer reallocations. or 64 keys @@ -1093,12 +1158,13 @@ void activeDefragCycle(void) { * check if we reached the time limit. * But regardless, don't start a new db in this loop, this is because after * the last db we call defragOtherGlobals, which must be done in one cycle */ - if ((!(cursor || expires_cursor) && slot == -1) || + if (all_stages_finished || ++iterations > 16 || server.stat_active_defrag_hits - prev_defragged > 512 || server.stat_active_defrag_scanned - prev_scanned > 64) { - if (!cursor || ustime() > endtime) { + /* Quit if all stages were finished or timeout. */ + if (all_stages_finished || ustime() > endtime) { quit = 1; break; } @@ -1106,7 +1172,7 @@ void activeDefragCycle(void) { prev_defragged = server.stat_active_defrag_hits; prev_scanned = server.stat_active_defrag_scanned; } - } while(((cursor || expires_cursor) || slot > 0) && !quit); + } while(!all_stages_finished && !quit); } while(!quit); latencyEndMonitor(latency); diff --git a/src/kvstore.c b/src/kvstore.c index 624af75be..5517594c6 100644 --- a/src/kvstore.c +++ b/src/kvstore.c @@ -525,8 +525,17 @@ int kvstoreFindDictIndexByKeyIndex(kvstore *kvs, unsigned long target) { return result; } +/* Wrapper for kvstoreFindDictIndexByKeyIndex to get the first non-empty dict index in the kvstore. */ +int kvstoreGetFirstNonEmptyDictIndex(kvstore *kvs) { + return kvstoreFindDictIndexByKeyIndex(kvs, 1); +} + /* Returns next non-empty dict index strictly after given one, or -1 if provided didx is the last one. */ int kvstoreGetNextNonEmptyDictIndex(kvstore *kvs, int didx) { + if (kvs->num_dicts == 1) { + assert(didx == 0); + return -1; + } unsigned long long next_key = cumulativeKeyCountRead(kvs, didx) + 1; return next_key <= kvstoreSize(kvs) ? kvstoreFindDictIndexByKeyIndex(kvs, next_key) : -1; } @@ -550,7 +559,7 @@ kvstoreIterator *kvstoreIteratorInit(kvstore *kvs) { kvstoreIterator *kvs_it = zmalloc(sizeof(*kvs_it)); kvs_it->kvs = kvs; kvs_it->didx = -1; - kvs_it->next_didx = kvstoreFindDictIndexByKeyIndex(kvs_it->kvs, 1); /* Finds first non-empty dict index. */ + kvs_it->next_didx = kvstoreGetFirstNonEmptyDictIndex(kvs_it->kvs); /* Finds first non-empty dict index. */ dictInitSafeIterator(&kvs_it->di, NULL); return kvs_it; } @@ -752,10 +761,11 @@ unsigned long kvstoreDictScanDefrag(kvstore *kvs, int didx, unsigned long v, dic * that callback can reallocate. */ void kvstoreDictLUTDefrag(kvstore *kvs, kvstoreDictLUTDefragFunction *defragfn) { for (int didx = 0; didx < kvs->num_dicts; didx++) { - dict **d = kvstoreGetDictRef(kvs, didx); + dict **d = kvstoreGetDictRef(kvs, didx), *newd; if (!*d) continue; - defragfn(d); + if ((newd = defragfn(*d))) + *d = newd; } } diff --git a/src/kvstore.h b/src/kvstore.h index fdd06ba92..56a486199 100644 --- a/src/kvstore.h +++ b/src/kvstore.h @@ -28,6 +28,7 @@ int kvstoreGetFairRandomDictIndex(kvstore *kvs); void kvstoreGetStats(kvstore *kvs, char *buf, size_t bufsize, int full); int kvstoreFindDictIndexByKeyIndex(kvstore *kvs, unsigned long target); +int kvstoreGetFirstNonEmptyDictIndex(kvstore *kvs); int kvstoreGetNextNonEmptyDictIndex(kvstore *kvs, int didx); int kvstoreNumNonEmptyDicts(kvstore *kvs); int kvstoreNumAllocatedDicts(kvstore *kvs); @@ -60,7 +61,7 @@ dictEntry *kvstoreDictFindEntryByPtrAndHash(kvstore *kvs, int didx, const void * unsigned int kvstoreDictGetSomeKeys(kvstore *kvs, int didx, dictEntry **des, unsigned int count); int kvstoreDictExpand(kvstore *kvs, int didx, unsigned long size); unsigned long kvstoreDictScanDefrag(kvstore *kvs, int didx, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata); -typedef void (kvstoreDictLUTDefragFunction)(dict **d); +typedef dict *(kvstoreDictLUTDefragFunction)(dict *d); void kvstoreDictLUTDefrag(kvstore *kvs, kvstoreDictLUTDefragFunction *defragfn); void *kvstoreDictFetchValue(kvstore *kvs, int didx, const void *key); dictEntry *kvstoreDictFind(kvstore *kvs, int didx, void *key); diff --git a/src/pubsub.c b/src/pubsub.c index 8d8a1555f..b6db719b6 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -263,9 +263,9 @@ int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) { unsigned int slot = 0; /* Add the channel to the client -> channels hash table */ - if (dictAdd(type.clientPubSubChannels(c),channel,NULL) == DICT_OK) { + void *position = dictFindPositionForInsert(type.clientPubSubChannels(c),channel,NULL); + if (position) { /* Not yet subscribed to this channel */ retval = 1; - incrRefCount(channel); /* Add the client to the channel -> list of clients hash table */ if (server.cluster_enabled && type.shard) { slot = getKeySlot(channel->ptr); @@ -275,6 +275,7 @@ int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) { if (existing) { clients = dictGetVal(existing); + channel = dictGetKey(existing); } else { clients = dictCreate(&clientDictType); kvstoreDictSetVal(*type.serverPubSubChannels, slot, de, clients); @@ -282,6 +283,8 @@ int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) { } serverAssert(dictAdd(clients, c, NULL) != DICT_ERR); + serverAssert(dictInsertAtPosition(type.clientPubSubChannels(c), channel, position)); + incrRefCount(channel); } /* Notify the client */ addReplyPubsubSubscribed(c,channel,type); diff --git a/src/server.h b/src/server.h index de490f702..24c159dae 100644 --- a/src/server.h +++ b/src/server.h @@ -3183,6 +3183,8 @@ int serverPubsubShardSubscriptionCount(void); size_t pubsubMemOverhead(client *c); void unmarkClientAsPubSub(client *c); int pubsubTotalSubscriptions(void); +dict *getClientPubSubChannels(client *c); +dict *getClientPubSubShardChannels(client *c); /* Keyspace events notification */ void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid); diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index ef953fe26..e6ae34e7d 100644 --- a/tests/unit/memefficiency.tcl +++ b/tests/unit/memefficiency.tcl @@ -404,6 +404,105 @@ run_solo {defrag} { r save ;# saving an rdb iterates over all the data / pointers } {OK} + test "Active defrag pubsub: $type" { + r flushdb + r config resetstat + r config set hz 100 + r config set activedefrag no + r config set active-defrag-threshold-lower 5 + r config set active-defrag-cycle-min 65 + r config set active-defrag-cycle-max 75 + r config set active-defrag-ignore-bytes 1500kb + r config set maxmemory 0 + + # Populate memory with interleaving pubsub-key pattern of same size + set n 50000 + set dummy_channel "[string repeat x 400]" + set rd [redis_deferring_client] + set rd_pubsub [redis_deferring_client] + for {set j 0} {$j < $n} {incr j} { + set channel_name "$dummy_channel[format "%06d" $j]" + $rd_pubsub subscribe $channel_name + $rd_pubsub read ; # Discard subscribe replies + $rd_pubsub ssubscribe $channel_name + $rd_pubsub read ; # Discard ssubscribe replies + $rd set k$j $channel_name + $rd read ; # Discard set replies + } + + after 120 ;# serverCron only updates the info once in 100ms + if {$::verbose} { + puts "used [s allocator_allocated]" + puts "rss [s allocator_active]" + puts "frag [s allocator_frag_ratio]" + puts "frag_bytes [s allocator_frag_bytes]" + } + assert_lessthan [s allocator_frag_ratio] 1.05 + + # Delete all the keys to create fragmentation + for {set j 0} {$j < $n} {incr j} { $rd del k$j } + for {set j 0} {$j < $n} {incr j} { $rd read } ; # Discard del replies + $rd close + after 120 ;# serverCron only updates the info once in 100ms + if {$::verbose} { + puts "used [s allocator_allocated]" + puts "rss [s allocator_active]" + puts "frag [s allocator_frag_ratio]" + puts "frag_bytes [s allocator_frag_bytes]" + } + assert_morethan [s allocator_frag_ratio] 1.35 + + catch {r config set activedefrag yes} e + if {[r config get activedefrag] eq "activedefrag yes"} { + + # wait for the active defrag to start working (decision once a second) + wait_for_condition 50 100 { + [s total_active_defrag_time] ne 0 + } else { + after 120 ;# serverCron only updates the info once in 100ms + puts [r info memory] + puts [r info stats] + puts [r memory malloc-stats] + fail "defrag not started." + } + + # wait for the active defrag to stop working + wait_for_condition 500 100 { + [s active_defrag_running] eq 0 + } else { + after 120 ;# serverCron only updates the info once in 100ms + puts [r info memory] + puts [r memory malloc-stats] + fail "defrag didn't stop." + } + + # test the fragmentation is lower + after 120 ;# serverCron only updates the info once in 100ms + if {$::verbose} { + puts "used [s allocator_allocated]" + puts "rss [s allocator_active]" + puts "frag [s allocator_frag_ratio]" + puts "frag_bytes [s allocator_frag_bytes]" + } + assert_lessthan_equal [s allocator_frag_ratio] 1.05 + } + + # Publishes some message to all the pubsub clients to make sure that + # we didn't break the data structure. + for {set j 0} {$j < $n} {incr j} { + set channel "$dummy_channel[format "%06d" $j]" + r publish $channel "hello" + assert_equal "message $channel hello" [$rd_pubsub read] + $rd_pubsub unsubscribe $channel + $rd_pubsub read + r spublish $channel "hello" + assert_equal "smessage $channel hello" [$rd_pubsub read] + $rd_pubsub sunsubscribe $channel + $rd_pubsub read + } + $rd_pubsub close + } + if {$type eq "standalone"} { ;# skip in cluster mode test "Active defrag big list: $type" { r flushdb