Initial scan implementation (not tested)
Former-commit-id: 0f3911d56e6878d91b6e3cacc7d637934ffe6099
This commit is contained in:
parent
e81a4a159a
commit
29e4973d7b
58
src/db.cpp
58
src/db.cpp
@ -961,10 +961,10 @@ int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor) {
|
|||||||
*
|
*
|
||||||
* In the case of a Hash object the function returns both the field and value
|
* In the case of a Hash object the function returns both the field and value
|
||||||
* of every element on the Hash. */
|
* of every element on the Hash. */
|
||||||
|
void scanFilterAndReply(client *c, list *keys, sds pat, sds type, int use_pattern, robj_roptr o, unsigned long cursor);
|
||||||
void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) {
|
void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) {
|
||||||
int i, j;
|
int i, j;
|
||||||
list *keys = listCreate();
|
list *keys = listCreate();
|
||||||
listNode *node, *nextnode;
|
|
||||||
long count = 10;
|
long count = 10;
|
||||||
sds pat = NULL;
|
sds pat = NULL;
|
||||||
sds type = NULL;
|
sds type = NULL;
|
||||||
@ -1014,6 +1014,46 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (o == nullptr && count > 100)
|
||||||
|
{
|
||||||
|
// Do an async version
|
||||||
|
const redisDbPersistentDataSnapshot *snapshot = nullptr;
|
||||||
|
if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED)))
|
||||||
|
snapshot = c->db->createSnapshot(c->mvccCheckpoint, true /* fOptional */);
|
||||||
|
if (snapshot != nullptr)
|
||||||
|
{
|
||||||
|
aeEventLoop *el = serverTL->el;
|
||||||
|
blockClient(c, BLOCKED_ASYNC);
|
||||||
|
redisDb *db = c->db;
|
||||||
|
sds patCopy = pat ? sdsdup(pat) : nullptr;
|
||||||
|
sds typeCopy = type ? sdsdup(type) : nullptr;
|
||||||
|
g_pserver->asyncworkqueue->AddWorkFunction([c, snapshot, cursor, count, keys, el, db, patCopy, typeCopy, use_pattern]{
|
||||||
|
auto cursorResult = snapshot->scan_threadsafe(cursor, count, keys, nullptr);
|
||||||
|
|
||||||
|
aePostFunction(el, [c, snapshot, keys, db, cursorResult, patCopy, typeCopy, use_pattern]{
|
||||||
|
aeReleaseLock(); // we need to lock with coordination of the client
|
||||||
|
|
||||||
|
std::unique_lock<decltype(c->lock)> lock(c->lock);
|
||||||
|
AeLocker locker;
|
||||||
|
locker.arm(c);
|
||||||
|
|
||||||
|
unblockClient(c);
|
||||||
|
scanFilterAndReply(c, keys, patCopy, typeCopy, use_pattern, nullptr, cursorResult);
|
||||||
|
if (patCopy != nullptr)
|
||||||
|
sdsfree(patCopy);
|
||||||
|
if (typeCopy != nullptr)
|
||||||
|
sdsfree(typeCopy);
|
||||||
|
|
||||||
|
db->endSnapshot(snapshot);
|
||||||
|
listSetFreeMethod(keys,decrRefCountVoid);
|
||||||
|
listRelease(keys);
|
||||||
|
aeAcquireLock();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Step 2: Iterate the collection.
|
/* Step 2: Iterate the collection.
|
||||||
*
|
*
|
||||||
* Note that if the object is encoded with a ziplist, intset, or any other
|
* Note that if the object is encoded with a ziplist, intset, or any other
|
||||||
@ -1080,6 +1120,18 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) {
|
|||||||
serverPanic("Not handled encoding in SCAN.");
|
serverPanic("Not handled encoding in SCAN.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
scanFilterAndReply(c, keys, pat, type, use_pattern, o, cursor);
|
||||||
|
|
||||||
|
cleanup:
|
||||||
|
listSetFreeMethod(keys,decrRefCountVoid);
|
||||||
|
listRelease(keys);
|
||||||
|
}
|
||||||
|
|
||||||
|
void scanFilterAndReply(client *c, list *keys, sds pat, sds type, int use_pattern, robj_roptr o, unsigned long cursor)
|
||||||
|
{
|
||||||
|
listNode *node, *nextnode;
|
||||||
|
int patlen = (pat != nullptr) ? sdslen(pat) : 0;
|
||||||
|
|
||||||
/* Step 3: Filter elements. */
|
/* Step 3: Filter elements. */
|
||||||
node = listFirst(keys);
|
node = listFirst(keys);
|
||||||
while (node) {
|
while (node) {
|
||||||
@ -1144,10 +1196,6 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) {
|
|||||||
decrRefCount(kobj);
|
decrRefCount(kobj);
|
||||||
listDelNode(keys, node);
|
listDelNode(keys, node);
|
||||||
}
|
}
|
||||||
|
|
||||||
cleanup:
|
|
||||||
listSetFreeMethod(keys,decrRefCountVoid);
|
|
||||||
listRelease(keys);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* The SCAN command completely relies on scanGenericCommand. */
|
/* The SCAN command completely relies on scanGenericCommand. */
|
||||||
|
@ -1395,6 +1395,8 @@ public:
|
|||||||
bool FWillFreeChildDebug() const { return m_spdbSnapshotHOLDER != nullptr; }
|
bool FWillFreeChildDebug() const { return m_spdbSnapshotHOLDER != nullptr; }
|
||||||
|
|
||||||
bool iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn, bool fKeyOnly = false, bool fCacheOnly = false) const;
|
bool iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn, bool fKeyOnly = false, bool fCacheOnly = false) const;
|
||||||
|
unsigned long scan_threadsafe(unsigned long iterator, long count, list *keys, struct scan_callback_data *pdata = nullptr) const;
|
||||||
|
|
||||||
using redisDbPersistentData::createSnapshot;
|
using redisDbPersistentData::createSnapshot;
|
||||||
using redisDbPersistentData::endSnapshot;
|
using redisDbPersistentData::endSnapshot;
|
||||||
using redisDbPersistentData::endSnapshotAsync;
|
using redisDbPersistentData::endSnapshotAsync;
|
||||||
|
@ -388,6 +388,61 @@ dict_iter redisDbPersistentData::find_cached_threadsafe(const char *key) const
|
|||||||
return dict_iter(de);
|
return dict_iter(de);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct scan_callback_data
|
||||||
|
{
|
||||||
|
dict *dictTombstone;
|
||||||
|
list *keys;
|
||||||
|
};
|
||||||
|
void snapshot_scan_callback(void *privdata, const dictEntry *de)
|
||||||
|
{
|
||||||
|
scan_callback_data *data = (scan_callback_data*)privdata;
|
||||||
|
if (data->dictTombstone != nullptr && dictFind(data->dictTombstone, dictGetKey(de)) != nullptr)
|
||||||
|
return;
|
||||||
|
|
||||||
|
sds sdskey = (sds)dictGetKey(de);
|
||||||
|
listAddNodeHead(data->keys, createStringObject(sdskey, sdslen(sdskey)));
|
||||||
|
}
|
||||||
|
unsigned long redisDbPersistentDataSnapshot::scan_threadsafe(unsigned long iterator, long count, list *keys, scan_callback_data *pdata) const
|
||||||
|
{
|
||||||
|
unsigned long iteratorReturn = 0;
|
||||||
|
|
||||||
|
scan_callback_data dataT;
|
||||||
|
if (pdata == nullptr)
|
||||||
|
{
|
||||||
|
dataT.dictTombstone = m_pdictTombstone;
|
||||||
|
dataT.keys = keys;
|
||||||
|
pdata = &dataT;
|
||||||
|
}
|
||||||
|
|
||||||
|
const redisDbPersistentDataSnapshot *psnapshot;
|
||||||
|
__atomic_load(&m_pdbSnapshot, &psnapshot, __ATOMIC_ACQUIRE);
|
||||||
|
if (psnapshot != nullptr)
|
||||||
|
{
|
||||||
|
// Always process the snapshot first as we assume its bigger than we are
|
||||||
|
iteratorReturn = psnapshot->scan_threadsafe(iterator, count, keys, pdata);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (psnapshot == nullptr)
|
||||||
|
{
|
||||||
|
long maxiterations = count * 10; // allow more iterations than keys for sparse tables
|
||||||
|
iteratorReturn = iterator;
|
||||||
|
do {
|
||||||
|
iteratorReturn = dictScan(m_pdict, iteratorReturn, snapshot_scan_callback, NULL, pdata);
|
||||||
|
} while (iteratorReturn &&
|
||||||
|
maxiterations-- &&
|
||||||
|
listLength(keys) < (unsigned long)count);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Just catch up with our snapshot
|
||||||
|
do
|
||||||
|
{
|
||||||
|
iterator = dictScan(m_pdict, iterator, snapshot_scan_callback, nullptr, pdata);
|
||||||
|
} while (iterator != 0 && iterator < iteratorReturn);
|
||||||
|
}
|
||||||
|
return iteratorReturn;
|
||||||
|
}
|
||||||
|
|
||||||
bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn, bool fKeyOnly, bool fCacheOnly) const
|
bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn, bool fKeyOnly, bool fCacheOnly) const
|
||||||
{
|
{
|
||||||
// Take the size so we can ensure we visited every element exactly once
|
// Take the size so we can ensure we visited every element exactly once
|
||||||
|
Loading…
x
Reference in New Issue
Block a user