parent
10ccdf88e2
commit
70cad1901a
@ -8,6 +8,7 @@ set(EXAMPLES
|
||||
condense
|
||||
jsonx
|
||||
messagereader
|
||||
parsebyparts
|
||||
pretty
|
||||
prettyauto
|
||||
schemavalidator
|
||||
|
164
example/parsebyparts/parsebyparts.cpp
Normal file
164
example/parsebyparts/parsebyparts.cpp
Normal file
@ -0,0 +1,164 @@
|
||||
// Example of parsing JSON to document by parts.
|
||||
|
||||
// Using C++11 threads
|
||||
#if __cplusplus >= 201103L || (defined(_MSC_VER) && _MSC_VER >= 1600)
|
||||
|
||||
#include "rapidjson/document.h"
|
||||
#include "rapidjson/error/en.h"
|
||||
#include "rapidjson/writer.h"
|
||||
#include "rapidjson/ostreamwrapper.h"
|
||||
#include <condition_variable>
|
||||
#include <iostream>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
|
||||
using namespace rapidjson;
|
||||
|
||||
template<unsigned parseFlags = kParseDefaultFlags>
|
||||
class AsyncDocumentParser {
|
||||
public:
|
||||
AsyncDocumentParser(Document& d) : ass_(*this), d_(d), parseThread_(&AsyncDocumentParser::Parse, this), completed_() {}
|
||||
|
||||
~AsyncDocumentParser() {
|
||||
if (!parseThread_.joinable())
|
||||
return;
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
|
||||
// Wait until the buffer is read up (or parsing is completed)
|
||||
while (!ass_.Empty() && !completed_)
|
||||
finish_.wait(lock);
|
||||
|
||||
// Automatically append '\0' as the terminator in the stream.
|
||||
static const char terminator[] = "";
|
||||
ass_.src_ = terminator;
|
||||
ass_.end_ = terminator + 1;
|
||||
notEmpty_.notify_one(); // unblock the AsyncStringStream
|
||||
}
|
||||
|
||||
parseThread_.join();
|
||||
}
|
||||
|
||||
void ParsePart(const char* buffer, size_t length) {
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
|
||||
// Wait until the buffer is read up (or parsing is completed)
|
||||
while (!ass_.Empty() && !completed_)
|
||||
finish_.wait(lock);
|
||||
|
||||
// Stop further parsing if the parsing process is completed.
|
||||
if (completed_)
|
||||
return;
|
||||
|
||||
// Set the buffer to stream and unblock the AsyncStringStream
|
||||
ass_.src_ = buffer;
|
||||
ass_.end_ = buffer + length;
|
||||
notEmpty_.notify_one();
|
||||
}
|
||||
|
||||
private:
|
||||
void Parse() {
|
||||
d_.ParseStream<parseFlags>(ass_);
|
||||
|
||||
// The stream may not be fully read, notify finish anyway to unblock ParsePart()
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
completed_ = true; // Parsing process is completed
|
||||
finish_.notify_one(); // Unblock ParsePart() or destructor if they are waiting.
|
||||
}
|
||||
|
||||
struct AsyncStringStream {
|
||||
typedef char Ch;
|
||||
|
||||
AsyncStringStream(AsyncDocumentParser& parser) : parser_(parser), src_(), end_(), count_() {}
|
||||
|
||||
char Peek() const {
|
||||
std::unique_lock<std::mutex> lock(parser_.mutex_);
|
||||
|
||||
// If nothing in stream, block to wait.
|
||||
while (Empty())
|
||||
parser_.notEmpty_.wait(lock);
|
||||
|
||||
return *src_;
|
||||
}
|
||||
|
||||
char Take() {
|
||||
std::unique_lock<std::mutex> lock(parser_.mutex_);
|
||||
|
||||
// If nothing in stream, block to wait.
|
||||
while (Empty())
|
||||
parser_.notEmpty_.wait(lock);
|
||||
|
||||
count_++;
|
||||
char c = *src_++;
|
||||
|
||||
// If all stream is read up, notify that the stream is finish.
|
||||
if (Empty())
|
||||
parser_.finish_.notify_one();
|
||||
|
||||
return c;
|
||||
}
|
||||
|
||||
size_t Tell() const { return count_; }
|
||||
|
||||
// Not implemented
|
||||
char* PutBegin() { return 0; }
|
||||
void Put(char) {}
|
||||
void Flush() {}
|
||||
size_t PutEnd(char*) { return 0; }
|
||||
|
||||
bool Empty() const { return src_ == end_; }
|
||||
|
||||
AsyncDocumentParser& parser_;
|
||||
const char* src_; //!< Current read position.
|
||||
const char* end_; //!< End of buffer
|
||||
size_t count_; //!< Number of characters taken so far.
|
||||
};
|
||||
|
||||
AsyncStringStream ass_;
|
||||
Document& d_;
|
||||
std::thread parseThread_;
|
||||
std::mutex mutex_;
|
||||
std::condition_variable notEmpty_;
|
||||
std::condition_variable finish_;
|
||||
bool completed_;
|
||||
};
|
||||
|
||||
int main() {
|
||||
Document d;
|
||||
|
||||
{
|
||||
AsyncDocumentParser<> parser(d);
|
||||
|
||||
const char json1[] = " { \"hello\" : \"world\", \"t\" : tr";
|
||||
//const char json1[] = " { \"hello\" : \"world\", \"t\" : trX"; // Fot test parsing error
|
||||
const char json2[] = "ue, \"f\" : false, \"n\": null, \"i\":123, \"pi\": 3.14";
|
||||
const char json3[] = "16, \"a\":[1, 2, 3, 4] } ";
|
||||
|
||||
parser.ParsePart(json1, sizeof(json1) - 1);
|
||||
parser.ParsePart(json2, sizeof(json2) - 1);
|
||||
parser.ParsePart(json3, sizeof(json3) - 1);
|
||||
}
|
||||
|
||||
if (d.HasParseError()) {
|
||||
std::cout << "Error at offset " << d.GetErrorOffset() << ": " << GetParseError_En(d.GetParseError()) << std::endl;
|
||||
return EXIT_FAILURE;
|
||||
}
|
||||
|
||||
// Stringify the JSON to cout
|
||||
OStreamWrapper os(std::cout);
|
||||
Writer<OStreamWrapper> writer(os);
|
||||
d.Accept(writer);
|
||||
std::cout << std::endl;
|
||||
|
||||
return EXIT_SUCCESS;
|
||||
}
|
||||
|
||||
#else // Not supporting C++11
|
||||
|
||||
#include <iostream>
|
||||
int main() {
|
||||
std::cout << "This example requires C++11 compiler" << std::endl;
|
||||
}
|
||||
|
||||
#endif
|
Loading…
x
Reference in New Issue
Block a user