Fix issue where SCAN misses elements while snapshot is in flight
Former-commit-id: ce005d748ebf0e116d674a96f74d698d17394010
commit 3692771457
parent 1fef6c42b7
src/db.cpp (43 changed lines)
@@ -1032,7 +1032,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) {
         }
     }

-    if (o == nullptr && count > 100)
+    if (o == nullptr && count >= 100)
     {
         // Do an async version
         const redisDbPersistentDataSnapshot *snapshot = nullptr;
@@ -1046,7 +1046,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) {
         sds patCopy = pat ? sdsdup(pat) : nullptr;
         sds typeCopy = type ? sdsdup(type) : nullptr;
         g_pserver->asyncworkqueue->AddWorkFunction([c, snapshot, cursor, count, keys, el, db, patCopy, typeCopy, use_pattern]{
-            auto cursorResult = snapshot->scan_threadsafe(cursor, count, keys, nullptr);
+            auto cursorResult = snapshot->scan_threadsafe(cursor, count, keys);
             if (use_pattern) {
                 listNode *ln = listFirst(keys);
                 int patlen = sdslen(patCopy);
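The two hunks above are the dispatch side of the fix: a whole-keyspace SCAN with a large COUNT (now >= 100 rather than > 100) takes a snapshot and runs on the async work queue instead of blocking the main thread, and the worker calls the simplified scan_threadsafe(cursor, count, keys) signature. Below is a minimal, self-contained sketch of that offload pattern, assuming nothing from KeyDB: WorkQueue stands in for g_pserver->asyncworkqueue, and an immutable vector of keys stands in for the snapshot.

// Minimal sketch of the offload pattern; all names here are illustrative.
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

class WorkQueue {
    std::queue<std::function<void()>> m_tasks;
    std::mutex m_lock;
    std::condition_variable m_cv;
    bool m_fDone = false;
    std::thread m_worker;   // declared last so the flag above is initialized first
public:
    WorkQueue() : m_worker([this]{ run(); }) {}
    ~WorkQueue() {          // drain remaining work, then join
        { std::lock_guard<std::mutex> l(m_lock); m_fDone = true; }
        m_cv.notify_all();
        m_worker.join();
    }
    void AddWorkFunction(std::function<void()> fn) {   // mirrors the call in the hunk
        { std::lock_guard<std::mutex> l(m_lock); m_tasks.push(std::move(fn)); }
        m_cv.notify_one();
    }
private:
    void run() {
        std::unique_lock<std::mutex> l(m_lock);
        for (;;) {
            m_cv.wait(l, [this]{ return m_fDone || !m_tasks.empty(); });
            if (m_tasks.empty()) return;
            auto fn = std::move(m_tasks.front());
            m_tasks.pop();
            l.unlock();
            fn();           // run the task outside the lock
            l.lock();
        }
    }
};

int main() {
    // A frozen copy of the keys: the worker can scan it without holding any
    // lock on the live database, which is the point of scanning a snapshot.
    const std::vector<std::string> snapshot = {"key:1", "key:2", "key:3"};
    WorkQueue queue;
    queue.AddWorkFunction([snapshot]{
        for (const auto &key : snapshot)    // analogous to snapshot->scan_threadsafe(...)
            std::cout << key << '\n';
    });
}   // ~WorkQueue runs queued work before joining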
@@ -1109,23 +1109,30 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) {
         }

         if (ht) {
-            void *privdata[2];
-            /* We set the max number of iterations to ten times the specified
-             * COUNT, so if the hash table is in a pathological state (very
-             * sparsely populated) we avoid to block too much time at the cost
-             * of returning no or very few elements. */
-            long maxiterations = count*10;
+            if (ht == c->db->dictUnsafeKeyOnly())
+            {
+                cursor = c->db->scan_threadsafe(cursor, count, keys);
+            }
+            else
+            {
+            void *privdata[2];
+            /* We set the max number of iterations to ten times the specified
+             * COUNT, so if the hash table is in a pathological state (very
+             * sparsely populated) we avoid to block too much time at the cost
+             * of returning no or very few elements. */
+            long maxiterations = count*10;

             /* We pass two pointers to the callback: the list to which it will
              * add new elements, and the object containing the dictionary so that
              * it is possible to fetch more data in a type-dependent way. */
             privdata[0] = keys;
             privdata[1] = o.unsafe_robjcast();
             do {
                 cursor = dictScan(ht, cursor, scanCallback, NULL, privdata);
             } while (cursor &&
                   maxiterations-- &&
                   listLength(keys) < (unsigned long)count);
+            }
         } else if (o->type == OBJ_SET) {
             int pos = 0;
             int64_t ll;
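This hunk is the core of the fix. Previously the synchronous path ran dictScan directly over the live key dict even while a snapshot was in flight, so a SCAN could miss keys that were still only visible through the frozen snapshot view. Routing the main key dict (c->db->dictUnsafeKeyOnly()) through c->db->scan_threadsafe walks the snapshot chain first. The toy program below illustrates the failure mode; SnapshotView and LiveDb are illustrative stand-ins, not KeyDB types.

// Toy illustration of the failure mode, with illustrative stand-in types.
#include <iostream>
#include <string>
#include <unordered_map>
#include <unordered_set>

struct SnapshotView {
    // Keys frozen at snapshot time; writes made afterwards land in the live delta.
    std::unordered_map<std::string, std::string> frozen;
};

struct LiveDb {
    const SnapshotView *snapshot = nullptr;              // non-null while a snapshot is in flight
    std::unordered_map<std::string, std::string> delta;  // keys written since the snapshot
    std::unordered_set<std::string> tombstones;          // keys deleted since the snapshot

    // Consult the snapshot chain first, as scan_threadsafe does in the diff.
    template <class Fn> void scan_threadsafe(Fn emit) const {
        if (snapshot)
            for (const auto &kv : snapshot->frozen)
                if (!tombstones.count(kv.first) && !delta.count(kv.first))
                    emit(kv.first);                      // still only visible via the snapshot
        for (const auto &kv : delta)
            emit(kv.first);
    }
};

int main() {
    SnapshotView snap;
    snap.frozen = {{"a", "1"}, {"b", "2"}};
    LiveDb db;
    db.snapshot = &snap;
    db.delta = {{"c", "3"}};   // written after the snapshot was taken
    db.tombstones = {"b"};     // deleted after the snapshot was taken

    // Scanning db.delta alone would report only "c" and silently miss "a":
    // the class of bug named in the commit title. The snapshot-aware scan
    // emits "a" and "c" (in unspecified order) and correctly drops "b".
    db.scan_threadsafe([](const std::string &k){ std::cout << k << '\n'; });
}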
@@ -1395,7 +1395,7 @@ public:
     bool FWillFreeChildDebug() const { return m_spdbSnapshotHOLDER != nullptr; }

     bool iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn, bool fKeyOnly = false, bool fCacheOnly = false) const;
-    unsigned long scan_threadsafe(unsigned long iterator, long count, list *keys, struct scan_callback_data *pdata = nullptr) const;
+    unsigned long scan_threadsafe(unsigned long iterator, long count, list *keys) const;

     using redisDbPersistentData::createSnapshot;
     using redisDbPersistentData::endSnapshot;
@@ -20,6 +20,7 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6

     if (m_spdbSnapshotHOLDER != nullptr)
     {
+        serverLog(LL_DEBUG, "Attempting reuse of snapshot, client tstamp: %llu snapshot tstamp: %llu", mvccCheckpoint, m_spdbSnapshotHOLDER->m_mvccCheckpoint);
         // If possible reuse an existing snapshot (we want to minimize nesting)
         if (mvccCheckpoint <= m_spdbSnapshotHOLDER->m_mvccCheckpoint)
         {
@@ -59,8 +60,11 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
     }

     // See if we have too many levels and can bail out of this to reduce load
-    if (fOptional && (levels >= 4))
+    if (fOptional && (levels >= 6))
+    {
+        serverLog(LL_DEBUG, "Snapshot nesting too deep, abandoning");
         return nullptr;
+    }

     auto spdb = std::unique_ptr<redisDbPersistentDataSnapshot>(new (MALLOC_LOCAL) redisDbPersistentDataSnapshot());
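The two createSnapshot hunks add debug logging and relax the bail-out threshold for optional snapshots from 4 to 6 nesting levels, while keeping the reuse fast path: if the held snapshot's MVCC checkpoint is at least as new as the caller's, hand it back instead of nesting another level. A hedged sketch of that decision logic only; Snapshot and createSnapshot here are illustrative, not KeyDB's signatures:

#include <cstdint>
#include <iostream>
#include <memory>

struct Snapshot {
    uint64_t mvccCheckpoint = 0;
    int levels = 1;            // how deep this snapshot chain already is
};

std::shared_ptr<Snapshot> createSnapshot(std::shared_ptr<Snapshot> existing,
                                         uint64_t mvccCheckpoint, bool fOptional) {
    // Reuse the held snapshot when possible: we want to minimize nesting.
    if (existing && mvccCheckpoint <= existing->mvccCheckpoint)
        return existing;
    // Too many levels: an optional snapshot can bail out to reduce load.
    int levels = existing ? existing->levels : 0;
    if (fOptional && levels >= 6)
        return nullptr;
    auto spdb = std::make_shared<Snapshot>();
    spdb->mvccCheckpoint = mvccCheckpoint;
    spdb->levels = levels + 1;
    return spdb;
}

int main() {
    auto s1 = createSnapshot(nullptr, 100, false);
    auto s2 = createSnapshot(s1, 90, true);     // older checkpoint: s1 is reused
    std::cout << (s1 == s2) << '\n';            // prints 1
}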
@@ -402,44 +406,38 @@ void snapshot_scan_callback(void *privdata, const dictEntry *de)
     sds sdskey = (sds)dictGetKey(de);
     listAddNodeHead(data->keys, createStringObject(sdskey, sdslen(sdskey)));
 }
-unsigned long redisDbPersistentDataSnapshot::scan_threadsafe(unsigned long iterator, long count, list *keys, scan_callback_data *pdata) const
+unsigned long redisDbPersistentDataSnapshot::scan_threadsafe(unsigned long iterator, long count, list *keys) const
 {
     unsigned long iteratorReturn = 0;

-    scan_callback_data dataT;
-    if (pdata == nullptr)
-    {
-        dataT.dictTombstone = m_pdictTombstone;
-        dataT.keys = keys;
-        pdata = &dataT;
-    }
+    scan_callback_data data;
+    data.dictTombstone = m_pdictTombstone;
+    data.keys = keys;

     const redisDbPersistentDataSnapshot *psnapshot;
     __atomic_load(&m_pdbSnapshot, &psnapshot, __ATOMIC_ACQUIRE);
     if (psnapshot != nullptr)
     {
         // Always process the snapshot first as we assume it's bigger than we are
-        iteratorReturn = psnapshot->scan_threadsafe(iterator, count, keys, pdata);
-    }
+        iteratorReturn = psnapshot->scan_threadsafe(iterator, count, keys);

-    if (psnapshot == nullptr)
+        // Just catch up with our snapshot
+        do
+        {
+            iterator = dictScan(m_pdict, iterator, snapshot_scan_callback, nullptr, &data);
+        } while (iterator != 0 && (iterator < iteratorReturn || iteratorReturn == 0));
+    }
+    else
     {
         long maxiterations = count * 10; // allow more iterations than keys for sparse tables
         iteratorReturn = iterator;
         do {
-            iteratorReturn = dictScan(m_pdict, iteratorReturn, snapshot_scan_callback, NULL, pdata);
+            iteratorReturn = dictScan(m_pdict, iteratorReturn, snapshot_scan_callback, NULL, &data);
         } while (iteratorReturn &&
               maxiterations-- &&
               listLength(keys) < (unsigned long)count);
     }
-    else
-    {
-        // Just catch up with our snapshot
-        do
-        {
-            iterator = dictScan(m_pdict, iterator, snapshot_scan_callback, nullptr, pdata);
-        } while (iterator != 0 && (iterator < iteratorReturn || iteratorReturn == 0));
-    }
     return iteratorReturn;
 }
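The reworked scan_threadsafe drops the pdata threading (each nesting level now fills its own scan_callback_data) and restructures the recursion: after delegating to the snapshot, which is assumed to be the bigger dict, the current level drains its own delta dict until its cursor catches up with the cursor the snapshot returned, so the single cursor handed back to the client stays valid for every level of the chain. The toy model below reproduces that control flow under stated simplifications: it uses a plain sequential bucket cursor rather than dictScan's reverse-binary cursor, and every name in it is illustrative.

#include <functional>
#include <iostream>
#include <string>
#include <vector>

using Dict = std::vector<std::vector<std::string>>;   // bucket index -> keys

// One scan step: visit bucket `cursor`, return the next cursor (0 = finished).
unsigned long toyDictScan(const Dict &d, unsigned long cursor,
                          const std::function<void(const std::string&)> &emit) {
    for (const auto &k : d[cursor]) emit(k);
    unsigned long next = cursor + 1;
    return next >= d.size() ? 0 : next;
}

// Mirrors the rewritten scan_threadsafe: scan the (bigger) snapshot until the
// COUNT budget is met, then let the smaller delta dict catch up to that cursor.
unsigned long scanWithSnapshot(const Dict &snapshot, const Dict &delta,
                               unsigned long iterator, long count,
                               std::vector<std::string> &keys) {
    auto emit = [&](const std::string &k){ keys.push_back(k); };

    // "Always process the snapshot first as we assume it's bigger than we are."
    unsigned long iteratorReturn = iterator;
    long maxiterations = count * 10;               // bound work on sparse tables
    do {
        iteratorReturn = toyDictScan(snapshot, iteratorReturn, emit);
    } while (iteratorReturn && maxiterations-- && (long)keys.size() < count);

    // "Just catch up with our snapshot": drain the delta up to the same point,
    // so nothing written since the snapshot is skipped by this SCAN call.
    do {
        iterator = toyDictScan(delta, iterator, emit);
    } while (iterator != 0 && (iterator < iteratorReturn || iteratorReturn == 0));

    return iteratorReturn;   // the snapshot's cursor is what the client resumes from
}

int main() {
    Dict snapshot = {{"a"}, {"b"}, {"c"}, {"d"}};  // frozen view
    Dict delta    = {{"x"}, {"y"}, {"z"}, {}};     // written since the snapshot
    std::vector<std::string> keys;
    unsigned long cur = scanWithSnapshot(snapshot, delta, 0, 2, keys);
    for (const auto &k : keys) std::cout << k << ' ';  // a b x y
    std::cout << "\ncursor=" << cur << '\n';           // cursor=2
}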