Malavan Sotheeswaran 38169682f0
Merge latest internal to OSS (#720)
* add docker build

* fix the working dir in Dockerfile

* add release publish docker image

* address indentation and use default release

* migrate keydb_modstatsd to keydb-internal

* rm

* add submodule cpp-statsd-client

* include trigger keydb_modstatsd Makefile in modules Makefile

* update

* have clean also trigger keydb_modstatsd clean

* move cpp-statsd-client to deps

* checkout to a06a5b9359f31d946fe163b9038586982971ae49

* update relative path in compilation

* remove submodule instead use the source

* include building keydb statsd module

* fix check in Dockerfile docker-entrypoint.sh

* fix

* fix the comment caused stuck docker build

* use commit hash as tag template

* fix

* test tag

* Revert "test tag"

This reverts commit 9cbc57137d57aab4fdd5a9283bae07391b3c7f8b.

* make docker build independent

* add new build to ci

* emit system free metrics with '/proc/meminfo'

* have emit system free memory within metrics_time_taken_us and also add metric time taken for it

* Remove Expireset (#217)

Major refactor to place expiry information directly in the object struct.

* update MemFree to MemAvailable in keydb statsd

* add metric emit for non-empty primary with less than 2 connected replicas

* address comments

* Multiply CPU percent metric by 100

* Fix memory leaks

* Fix slow to free when low lock contention

* fix nodename metricsname

* fix unnecessary replace

* Make propagating before freeing module context optional (#225)

* don't propagate on module context free for rdb load

* default in wrong place

* Flash expiration (#197)

Design Doc: https://docs.google.com/document/d/1NmnYGnHLdZp-KOUCUatX5iXpF-L3YK4VUc9Lm3Tqxpo/edit?usp=sharing

* Emit more aggregate metrics in modstatsd (#223)

* Permit keys of differing slots as long as they are served by this cluster and we are not migrating

* Fix over pessimistic checks that prevent replicas from serving mget

* Fix logic bug

* async rehash is preventing rehashing during RDB load after a db flush. Ensure it can't interfere after a flush

* make async rehash configurable

* only use internal locks when multithreaded (#205)

* Fix crossslot error migrating batches of keys

* Fix bug where we erroneously answer queries belonging to another shard

* fix mac compile

* enable level_compaction_dynamic_level_bytes after flush, and flush expires for FLASH (#229)

* enable level_compaction_dynamic_level_bytes after flush, and flush expires

* update debug reload for flash

* update debug reload for flash complete

* missing forward declare

* commit existing changes then track changes for debug reload

* missing args

* commitChanges is conditional

Co-authored-by: John Sully <jsully@snapchat.com>

---------

Co-authored-by: zliang <zliang@snapchat.com>
Co-authored-by: John Sully <jsully@snapchat.com>
Co-authored-by: Alex Cope <acope@snapchat.com>
Co-authored-by: John Sully <john@csquare.ca>
2023-09-28 18:13:27 -04:00

346 lines
9.3 KiB
C++

#ifndef UDP_SENDER_HPP
#define UDP_SENDER_HPP
#ifdef _WIN32
#define NOMINMAX
#include <io.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#else
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/socket.h>
#include <unistd.h>
#endif
#include <atomic>
#include <cmath>
#include <cstdint>
#include <cstring>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
namespace Statsd {
// Platform abstraction for the socket handle: on Windows the Winsock SOCKET type,
// INVALID_SOCKET sentinel, WSAGetLastError() and closesocket() are used; everywhere
// else a plain int descriptor, -1, errno and close() are used.
#ifdef _WIN32
//! Native socket handle type
using SOCKET_TYPE = SOCKET;
//! Sentinel value of a socket handle that is not open
constexpr SOCKET_TYPE k_invalidSocket{INVALID_SOCKET};
// Expands to the last socket error code
#define SOCKET_ERRNO WSAGetLastError()
// Expands to the function used to close a socket handle
#define SOCKET_CLOSE closesocket
#else
//! Native socket handle type
using SOCKET_TYPE = int;
//! Sentinel value of a socket handle that is not open
constexpr SOCKET_TYPE k_invalidSocket{-1};
// Expands to the last socket error code
#define SOCKET_ERRNO errno
// Expands to the function used to close a socket handle
#define SOCKET_CLOSE close
#endif
/*!
 *
 * UDP sender
 *
 * A simple UDP sender handling batching.
 *
 * When the batch size is non-zero, messages given to send() are appended to
 * newline-separated batches held in an internal queue instead of being sent
 * immediately. If the send interval is also non-zero, a background thread
 * drains that queue every send interval; otherwise the queue is only drained
 * by an explicit call to flush().
 *
 */
class UDPSender final {
public:
    //!@name Constructor and destructor, non-copyable, non-movable
    //!@{

    //! Constructor: opens the socket towards host:port and, when both batchsize and
    //! sendInterval are non-zero, starts the background sending thread.
    //! Failures are reported through initialized() and errorMessage().
    UDPSender(const std::string& host,
              const uint16_t port,
              const uint64_t batchsize,
              const uint64_t sendInterval) noexcept;

    //! Destructor: stops the background thread (if any) and closes the socket
    ~UDPSender();

    // The background thread captures `this`, so the object must never be copied or relocated
    UDPSender(const UDPSender&) = delete;
    UDPSender& operator=(const UDPSender&) = delete;
    UDPSender(UDPSender&&) = delete;
    UDPSender& operator=(UDPSender&&) = delete;

    //!@}

    //!@name Methods
    //!@{

    //! Send or enqueue a message (enqueued when the batch size is non-zero)
    void send(const std::string& message) noexcept;

    //! Returns the error message as a string (empty when no error was recorded)
    const std::string& errorMessage() const noexcept;

    //! Returns true if the sender is initialized, i.e. it holds a valid socket
    bool initialized() const noexcept;

    //! Flushes any queued messages
    void flush() noexcept;

    //!@}

private:
    // @name Private methods
    // @{

    //! Initialize the sender and returns true when it is initialized
    bool initialize() noexcept;

    //! Queue a message to be sent to the daemon later
    inline void queueMessage(const std::string& message) noexcept;

    //! Send a message to the daemon
    void sendToDaemon(const std::string& message) noexcept;

    //!@}

private:
    // @name State variables
    // @{

    //! Shall we exit? Set by the destructor and polled by the batching thread
    std::atomic<bool> m_mustExit{false};

    //!@}

    // @name Network info
    // @{

    //! The hostname
    std::string m_host;

    //! The port
    uint16_t m_port;

    //! The structure holding the server; zeroed so it is never indeterminate,
    //! even when initialize() fails before filling it in
    struct sockaddr_in m_server{};

    //! The socket to be used
    SOCKET_TYPE m_socket = k_invalidSocket;

    //!@}

    // @name Batching info
    // @{

    //! The batching size (0 disables batching)
    uint64_t m_batchsize;

    //! The sending frequency in milliseconds
    uint64_t m_sendInterval;

    //! The queue batching the messages
    std::deque<std::string> m_batchingMessageQueue;

    //! The mutex used for batching; guards m_batchingMessageQueue
    std::mutex m_batchingMutex;

    //! The thread dedicated to the batching
    std::thread m_batchingThread;

    //!@}

    //! Error message (optional string)
    // NOTE(review): written by send()/sendToDaemon() on the caller's thread and by
    // sendToDaemon() on the batching thread without synchronization - confirm this is acceptable
    std::string m_errorMessage;
};
namespace detail {
//! Returns true when the given handle is anything other than the invalid-socket sentinel
inline bool isValidSocket(const SOCKET_TYPE socket) {
    const bool isSentinel = (socket == k_invalidSocket);
    return !isSentinel;
}
#ifdef _WIN32
//! Process-wide Winsock initializer: WSAStartup is called once when the instance is
//! first requested and WSACleanup is called when the instance is destroyed.
struct WinSockSingleton {
    //! Returns the unique instance, creating it (and starting Winsock) on first use
    inline static const WinSockSingleton& getInstance() {
        static const WinSockSingleton instance;
        return instance;
    }

    //! Returns true when WSAStartup succeeded
    inline bool ok() const {
        return m_ok;
    }

    ~WinSockSingleton() {
        // WSACleanup must only balance a WSAStartup call that actually succeeded
        if (m_ok) {
            WSACleanup();
        }
    }

    // A copy would run WSACleanup a second time from its own destructor
    WinSockSingleton(const WinSockSingleton&) = delete;
    WinSockSingleton& operator=(const WinSockSingleton&) = delete;

private:
    WinSockSingleton() {
        WSADATA wsa;
        // Request Winsock version 2.2
        m_ok = WSAStartup(MAKEWORD(2, 2), &wsa) == 0;
    }

    //! Whether WSAStartup succeeded
    bool m_ok = false;
};
#endif
} // namespace detail
//! Builds the sender: stores the configuration, opens the socket and, when batching
//! with a non-zero send interval is requested, starts the thread draining the queue.
inline UDPSender::UDPSender(const std::string& host,
                            const uint16_t port,
                            const uint64_t batchsize,
                            const uint64_t sendInterval) noexcept
    : m_host(host), m_port(port), m_batchsize(batchsize), m_sendInterval(sendInterval) {
    // Without a usable socket there is nothing to send on, so stop here
    const bool socketReady = initialize();
    if (!socketReady) {
        return;
    }

    // The background sender is only needed when batching is on with a non-zero interval
    const bool wantsBackgroundSender = (m_batchsize != 0) && (m_sendInterval > 0);
    if (!wantsBackgroundSender) {
        return;
    }

    m_batchingThread = std::thread([this] {
        // TODO: this will drop unsent stats, should we send all the unsent stats before we exit?
        while (!m_mustExit.load(std::memory_order_acquire)) {
            {
                // Take ownership of everything queued so far while holding the lock
                // only for the duration of the swap
                std::deque<std::string> pending;
                {
                    std::lock_guard<std::mutex> queueGuard(m_batchingMutex);
                    pending.swap(m_batchingMessageQueue);
                }

                // Send the staged batches in order, outside of the lock
                for (const std::string& batch : pending) {
                    sendToDaemon(batch);
                }
            }

            // Wait before sending the next batch
            std::this_thread::sleep_for(std::chrono::milliseconds(m_sendInterval));
        }
    });
}
//! Stops the batching thread when one is running, then closes the socket.
//! Nothing is done when the sender never got a valid socket.
inline UDPSender::~UDPSender() {
    if (initialized()) {
        // Ask the background thread (if any) to stop and wait for it to finish
        if (m_batchingThread.joinable()) {
            m_mustExit.store(true, std::memory_order_release);
            m_batchingThread.join();
        }

        // Release the socket
        SOCKET_CLOSE(m_socket);
    }
}
//! Clears the previous error, then either queues the message (batching on)
//! or sends it to the daemon straight away (batching off).
inline void UDPSender::send(const std::string& message) noexcept {
    m_errorMessage.clear();

    const bool batchingEnabled = m_batchsize > 0;
    if (batchingEnabled) {
        // Accumulate the message in the queue
        queueMessage(message);
    } else {
        // No batching: send it right now
        sendToDaemon(message);
    }
}
//! Appends the message to the batch currently open at the back of the queue,
//! opening a new batch when there is none or when the open one exceeded the batch size.
inline void UDPSender::queueMessage(const std::string& message) noexcept {
    // We acquire a lock but only if we actually need to (i.e. there is a thread also accessing the queue)
    std::unique_lock<std::mutex> queueGuard;
    if (m_batchingThread.joinable()) {
        queueGuard = std::unique_lock<std::mutex>(m_batchingMutex);
    }

    // Either there is no batch to append to or the last one grew past the batch size
    const bool needsNewBatch =
        m_batchingMessageQueue.empty() || m_batchingMessageQueue.back().length() > m_batchsize;
    if (needsNewBatch) {
        m_batchingMessageQueue.emplace_back();
        m_batchingMessageQueue.back().reserve(m_batchsize + 256);
    }

    std::string& openBatch = m_batchingMessageQueue.back();

    // A batch that already holds messages needs a separator before the next one
    if (!needsNewBatch && !openBatch.empty()) {
        openBatch.push_back('\n');
    }

    // Add the new message to the batch
    openBatch.append(message);
}
inline const std::string& UDPSender::errorMessage() const noexcept {
return m_errorMessage;
}
//! Creates the UDP socket and fills m_server with the destination address.
//!
//! The host is first parsed as a dotted-decimal IPv4 address; when that fails it is
//! resolved through getaddrinfo() and the first IPv4 result is used.
//!
//! @return true when the socket is open and the destination is resolved. On failure
//!         false is returned, m_errorMessage describes the cause and m_socket is left
//!         (or reset to) k_invalidSocket.
inline bool UDPSender::initialize() noexcept {
#ifdef _WIN32
    // Winsock must be up before any socket call. Bail out right away so that this
    // diagnostic is not overwritten by the socket() failure that would follow.
    if (!detail::WinSockSingleton::getInstance().ok()) {
        m_errorMessage = "WSAStartup failed: errno=" + std::to_string(SOCKET_ERRNO);
        return false;
    }
#endif

    // Create the socket
    m_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
    if (!detail::isValidSocket(m_socket)) {
        m_errorMessage = "socket creation failed: errno=" + std::to_string(SOCKET_ERRNO);
        return false;
    }

    std::memset(&m_server, 0, sizeof(m_server));
    m_server.sin_family = AF_INET;
    m_server.sin_port = htons(m_port);

    // inet_pton returns 1 only when the host was a valid numeric address; anything
    // else (0: not an address, -1: error) means we have to resolve the name
    if (inet_pton(AF_INET, m_host.c_str(), &m_server.sin_addr) != 1) {
        // Specify the criteria for selecting the socket address structure
        struct addrinfo hints;
        std::memset(&hints, 0, sizeof(hints));
        hints.ai_family = AF_INET;
        hints.ai_socktype = SOCK_DGRAM;

        // Get the address info using the hints
        struct addrinfo* results = nullptr;
        const int ret{getaddrinfo(m_host.c_str(), nullptr, &hints, &results)};
        if (ret != 0) {
            // An error code has been returned by getaddrinfo: release the socket so
            // that initialized() reports false
            SOCKET_CLOSE(m_socket);
            m_socket = k_invalidSocket;
            m_errorMessage = "getaddrinfo failed: err=" + std::to_string(ret) + ", msg=" + gai_strerror(ret);
            return false;
        }

        // Copy the first result in m_server (the hints restrict results to AF_INET)
        const auto* resolved = reinterpret_cast<const struct sockaddr_in*>(results->ai_addr);
        std::memcpy(&m_server.sin_addr, &resolved->sin_addr, sizeof(struct in_addr));

        // Free the memory allocated
        freeaddrinfo(results);
    }

    return true;
}
//! Sends one datagram holding the message to m_server and records an error
//! message when sendto() reports a failure.
inline void UDPSender::sendToDaemon(const std::string& message) noexcept {
    // The length parameter of sendto is an int with Winsock and a size_t elsewhere
#ifdef _WIN32
    const int payloadSize = static_cast<int>(message.size());
#else
    const std::size_t payloadSize = message.size();
#endif
    const auto* destination = reinterpret_cast<const struct sockaddr*>(&m_server);

    // Try sending the message
    const auto sent = sendto(m_socket, message.data(), payloadSize, 0, destination, sizeof(m_server));
    if (sent != -1) {
        return;
    }

    m_errorMessage = "sendto server failed: host=" + m_host + ":" + std::to_string(m_port) +
                     ", err=" + std::to_string(SOCKET_ERRNO);
}
inline bool UDPSender::initialized() const noexcept {
return m_socket != k_invalidSocket;
}
//! Sends every batch waiting in the queue, oldest first, removing each one once sent.
inline void UDPSender::flush() noexcept {
    // We acquire a lock but only if we actually need to (i.e. there is a thread also accessing the queue)
    std::unique_lock<std::mutex> queueGuard;
    if (m_batchingThread.joinable()) {
        queueGuard = std::unique_lock<std::mutex>(m_batchingMutex);
    }

    // Drain the queue from the front
    for (; !m_batchingMessageQueue.empty(); m_batchingMessageQueue.pop_front()) {
        sendToDaemon(m_batchingMessageQueue.front());
    }
}
} // namespace Statsd
#endif