Files
futriix/internal/cluster/node.go

613 lines
20 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/*
* Copyright 2026 Safronov Grigorii
*
* Licensed under the CDDL, Version 1.0 (the "License");
* you may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
* https://opensource.org/licenses/CDDL-1.0
*/
// Файл: internal/cluster/node.go
// Назначение: Реализация узла кластера (node) для распределённой СУБД с поддержкой временных меток.
// Полностью lock-free с использованием атомарных операций.
package cluster
import (
"encoding/json"
"fmt"
"net"
"sync/atomic"
"time"
"futriis/internal/log"
"futriis/internal/storage"
"github.com/google/uuid"
)
// NodeStatus представляет состояние узла кластера
type NodeStatus int32
const (
StatusOffline NodeStatus = iota
StatusActive
StatusSyncing
StatusFailed
)
// Node представляет отдельный узел в распределённой системе
type Node struct {
ID string // Уникальный идентификатор узла
IP string // IP-адрес узла
Port int // Порт для коммуникации
Status atomic.Int32 // Атомарный статус узла (NodeStatus)
Storage *storage.Storage
logger *log.Logger
coordinator *RaftCoordinator // Ссылка на координатора (теперь RaftCoordinator)
lastSeen atomic.Int64 // Время последнего heartbeat (Unix миллисекунды)
joinedAt atomic.Int64 // Время присоединения к кластеру
createdAt int64 // Время создания узла
startedAt int64 // Время последнего старта узла
stoppedAt int64 // Время остановки узла
incomingConn chan net.Conn // Канал для входящих соединений (wait-free)
stopChan chan struct{}
requestCount atomic.Uint64 // Счётчик обработанных запросов
bytesRx atomic.Uint64 // Получено байт
bytesTx atomic.Uint64 // Отправлено байт
}
// NodeConfig содержит конфигурацию для создания узла
type NodeConfig struct {
IP string
Port int
Storage *storage.Storage
Logger *log.Logger
Coordinator *RaftCoordinator
}
// NewNode создаёт новый экземпляр узла кластера
func NewNode(ip string, port int, store *storage.Storage, logger *log.Logger) *Node {
now := time.Now().UnixMilli()
node := &Node{
ID: uuid.New().String(),
IP: ip,
Port: port,
Storage: store,
logger: logger,
incomingConn: make(chan net.Conn, 1000), // Буферизованный канал для wait-free приёма
stopChan: make(chan struct{}),
createdAt: now,
startedAt: now,
}
node.Status.Store(int32(StatusActive))
node.lastSeen.Store(now)
node.joinedAt.Store(0) // Пока не присоединён к кластеру
// Запуск сервера для приёма межузловых соединений
go node.startTCPServer()
// Запуск обработчика входящих соединений
go node.handleIncomingConnections()
// Запуск heartbeat-отправки (если координатор известен)
go node.heartbeatLoop()
if logger != nil {
logger.Info(fmt.Sprintf("Node %s created at %s", node.ID, node.GetCreatedAtStr()))
}
return node
}
// startTCPServer запускает TCP-сервер для приёма запросов от других узлов
func (n *Node) startTCPServer() {
addr := fmt.Sprintf("%s:%d", n.IP, n.Port)
listener, err := net.Listen("tcp", addr)
if err != nil {
if n.logger != nil {
n.logger.Error(fmt.Sprintf("Node %s failed to start TCP server: %v", n.ID, err))
}
n.Status.Store(int32(StatusFailed))
return
}
defer listener.Close()
if n.logger != nil {
n.logger.Info(fmt.Sprintf("Node %s listening on %s (started at %s)", n.ID, addr, n.GetStartedAtStr()))
}
for {
select {
case <-n.stopChan:
return
default:
conn, err := listener.Accept()
if err != nil {
if n.logger != nil {
n.logger.Error(fmt.Sprintf("Node %s accept error: %v", n.ID, err))
}
continue
}
// Неблокирующая отправка в канал
select {
case n.incomingConn <- conn:
n.bytesRx.Add(1)
default:
if n.logger != nil {
n.logger.Warn(fmt.Sprintf("Node %s incoming connection queue full, dropping connection", n.ID))
}
conn.Close()
}
}
}
}
// handleIncomingConnections обрабатывает входящие соединения wait-free способом
func (n *Node) handleIncomingConnections() {
for {
select {
case <-n.stopChan:
return
case conn := <-n.incomingConn:
n.requestCount.Add(1)
go n.handleNodeRequest(conn)
}
}
}
// handleNodeRequest обрабатывает конкретный запрос от другого узла
func (n *Node) handleNodeRequest(conn net.Conn) {
defer conn.Close()
decoder := json.NewDecoder(conn)
var req NodeRequest
if err := decoder.Decode(&req); err != nil {
if n.logger != nil {
n.logger.Error(fmt.Sprintf("Node %s failed to decode request: %v", n.ID, err))
}
return
}
// Обновляем время последнего контакта
n.lastSeen.Store(time.Now().UnixMilli())
if n.logger != nil {
n.logger.Debug(fmt.Sprintf("Node %s received request type %s from %s at %s",
n.ID, req.Type, req.FromNode, time.UnixMilli(req.Timestamp).Format("15:04:05.000")))
}
// Маршрутизация запроса в зависимости от типа
switch req.Type {
case "replicate":
n.handleReplicateRequest(req.Data)
case "query":
n.handleQueryRequest(req.Data, conn)
case "sync":
n.handleSyncRequest(req.Data, conn)
case "heartbeat":
n.handleHeartbeatRequest(req, conn)
default:
if n.logger != nil {
n.logger.Warn(fmt.Sprintf("Node %s unknown request type: %s", n.ID, req.Type))
}
}
}
// handleReplicateRequest обрабатывает запрос на репликацию документа
func (n *Node) handleReplicateRequest(data []byte) {
startTime := time.Now().UnixMilli()
var repData struct {
Database string `json:"database"`
Collection string `json:"collection"`
Document map[string]interface{} `json:"document"`
SourceNode string `json:"source_node"`
ReplicaID string `json:"replica_id"`
}
if err := json.Unmarshal(data, &repData); err != nil {
if n.logger != nil {
n.logger.Error(fmt.Sprintf("Node %s failed to unmarshal replicate data: %v", n.ID, err))
}
return
}
// Получаем базу данных
db, err := n.Storage.GetDatabase(repData.Database)
if err != nil {
if n.logger != nil {
n.logger.Error(fmt.Sprintf("Node %s database not found for replication: %s", n.ID, repData.Database))
}
return
}
// Получаем коллекцию
coll, err := db.GetCollection(repData.Collection)
if err != nil {
if n.logger != nil {
n.logger.Error(fmt.Sprintf("Node %s collection not found for replication: %s", n.ID, repData.Collection))
}
return
}
// Создаём документ и вставляем
doc := &storage.Document{
ID: repData.Document["_id"].(string),
Fields: repData.Document,
}
if err := coll.Insert(doc); err != nil {
if n.logger != nil {
n.logger.Error(fmt.Sprintf("Node %s failed to replicate document: %v", n.ID, err))
}
} else {
duration := time.Now().UnixMilli() - startTime
if n.logger != nil {
n.logger.Debug(fmt.Sprintf("Node %s replicated document %s from %s (took %d ms)",
n.ID, doc.ID, repData.SourceNode, duration))
}
}
}
// handleQueryRequest обрабатывает запрос на чтение данных с узла
func (n *Node) handleQueryRequest(data []byte, conn net.Conn) {
startTime := time.Now().UnixMilli()
var queryData struct {
Database string `json:"database"`
Collection string `json:"collection"`
DocumentID string `json:"document_id"`
RequestID string `json:"request_id"`
}
if err := json.Unmarshal(data, &queryData); err != nil {
n.sendErrorResponse(conn, err.Error())
return
}
// Получаем базу данных
db, err := n.Storage.GetDatabase(queryData.Database)
if err != nil {
n.sendErrorResponse(conn, err.Error())
return
}
// Получаем коллекцию
coll, err := db.GetCollection(queryData.Collection)
if err != nil {
n.sendErrorResponse(conn, err.Error())
return
}
// Находим документ
doc, err := coll.Find(queryData.DocumentID)
if err != nil {
n.sendErrorResponse(conn, err.Error())
return
}
duration := time.Now().UnixMilli() - startTime
// Отправляем успешный ответ
response := map[string]interface{}{
"status": "success",
"data": doc,
"node_id": n.ID,
"request_id": queryData.RequestID,
"duration_ms": duration,
"timestamp": time.Now().UnixMilli(),
}
encoder := json.NewEncoder(conn)
encoder.Encode(response)
n.bytesTx.Add(uint64(len(response)))
if n.logger != nil {
n.logger.Debug(fmt.Sprintf("Node %s handled query for %s.%s:%s (took %d ms)",
n.ID, queryData.Database, queryData.Collection, queryData.DocumentID, duration))
}
}
// handleSyncRequest обрабатывает запрос на синхронизацию всей коллекции
func (n *Node) handleSyncRequest(data []byte, conn net.Conn) {
startTime := time.Now().UnixMilli()
var syncData struct {
Database string `json:"database"`
Collection string `json:"collection"`
RequestID string `json:"request_id"`
Since int64 `json:"since"` // Синхронизация с указанного времени
}
if err := json.Unmarshal(data, &syncData); err != nil {
n.sendErrorResponse(conn, err.Error())
return
}
// Получаем базу данных
db, err := n.Storage.GetDatabase(syncData.Database)
if err != nil {
n.sendErrorResponse(conn, err.Error())
return
}
// Получаем коллекцию
coll, err := db.GetCollection(syncData.Collection)
if err != nil {
n.sendErrorResponse(conn, err.Error())
return
}
// Получаем все документы
docs := coll.GetAllDocuments()
// Фильтруем по времени, если указано
if syncData.Since > 0 {
filtered := make([]*storage.Document, 0)
for _, doc := range docs {
if doc.UpdatedAt > syncData.Since {
filtered = append(filtered, doc)
}
}
docs = filtered
}
duration := time.Now().UnixMilli() - startTime
response := map[string]interface{}{
"status": "success",
"docs": docs,
"count": len(docs),
"node_id": n.ID,
"request_id": syncData.RequestID,
"duration_ms": duration,
"timestamp": time.Now().UnixMilli(),
"sync_duration_ms": duration,
}
encoder := json.NewEncoder(conn)
encoder.Encode(response)
if n.logger != nil {
n.logger.Info(fmt.Sprintf("Node %s synced %d documents from %s.%s (took %d ms)",
n.ID, len(docs), syncData.Database, syncData.Collection, duration))
}
}
// handleHeartbeatRequest обрабатывает heartbeat запрос
func (n *Node) handleHeartbeatRequest(req NodeRequest, conn net.Conn) {
n.lastSeen.Store(time.Now().UnixMilli())
response := map[string]interface{}{
"status": "alive",
"node_id": n.ID,
"timestamp": time.Now().UnixMilli(),
"uptime_ms": time.Now().UnixMilli() - n.startedAt,
}
encoder := json.NewEncoder(conn)
encoder.Encode(response)
}
// sendErrorResponse отправляет ошибку в ответ на запрос
func (n *Node) sendErrorResponse(conn net.Conn, errMsg string) {
response := map[string]interface{}{
"status": "error",
"error": errMsg,
"node_id": n.ID,
"timestamp": time.Now().UnixMilli(),
}
encoder := json.NewEncoder(conn)
encoder.Encode(response)
}
// heartbeatLoop отправляет периодические сигналы жизни координатору
func (n *Node) heartbeatLoop() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-n.stopChan:
return
case <-ticker.C:
if n.coordinator != nil {
n.coordinator.SendHeartbeat(n.ID)
n.lastSeen.Store(time.Now().UnixMilli())
if n.logger != nil {
n.logger.Debug(fmt.Sprintf("Node %s sent heartbeat at %s", n.ID, n.GetLastSeenStr()))
}
}
}
}
}
// GetNodeStatus возвращает текущий статус узла (атомарно)
func (n *Node) GetNodeStatus() NodeStatus {
return NodeStatus(n.Status.Load())
}
// IsActive проверяет, активен ли узел
func (n *Node) IsActive() bool {
return NodeStatus(n.Status.Load()) == StatusActive
}
// SetCoordinator устанавливает координатора для узла
func (n *Node) SetCoordinator(coord *RaftCoordinator) {
n.coordinator = coord
now := time.Now().UnixMilli()
n.joinedAt.Store(now)
if n.logger != nil {
n.logger.Info(fmt.Sprintf("Node %s joined cluster at %s", n.ID, n.GetJoinedAtStr()))
}
}
// JoinCluster присоединяет узел к кластеру
func (n *Node) JoinCluster(coord *RaftCoordinator) error {
if n.coordinator != nil {
return fmt.Errorf("node already joined to cluster")
}
n.SetCoordinator(coord)
// Регистрируем узел в координаторе
if err := coord.RegisterNode(n); err != nil {
return fmt.Errorf("failed to register node: %v", err)
}
n.Status.Store(int32(StatusActive))
if n.logger != nil {
n.logger.Info(fmt.Sprintf("Node %s successfully joined cluster at %s", n.ID, n.GetJoinedAtStr()))
}
return nil
}
// LeaveCluster покидает кластер
func (n *Node) LeaveCluster() error {
if n.coordinator == nil {
return fmt.Errorf("node not in cluster")
}
n.Status.Store(int32(StatusOffline))
if err := n.coordinator.RemoveNode(n.ID); err != nil {
n.logger.Warn(fmt.Sprintf("Failed to remove node from coordinator: %v", err))
}
n.coordinator = nil
n.joinedAt.Store(0)
if n.logger != nil {
n.logger.Info(fmt.Sprintf("Node %s left cluster at %s", n.ID, time.Now().Format("2006-01-02 15:04:05.000")))
}
return nil
}
// GetLastSeen возвращает время последнего контакта
func (n *Node) GetLastSeen() int64 {
return n.lastSeen.Load()
}
// GetLastSeenStr возвращает человекочитаемое время последнего контакта
func (n *Node) GetLastSeenStr() string {
lastSeen := n.lastSeen.Load()
if lastSeen == 0 {
return "never"
}
return time.UnixMilli(lastSeen).Format("2006-01-02 15:04:05.000")
}
// GetJoinedAt возвращает время присоединения к кластеру
func (n *Node) GetJoinedAt() int64 {
return n.joinedAt.Load()
}
// GetJoinedAtStr возвращает человекочитаемое время присоединения
func (n *Node) GetJoinedAtStr() string {
joinedAt := n.joinedAt.Load()
if joinedAt == 0 {
return "not joined"
}
return time.UnixMilli(joinedAt).Format("2006-01-02 15:04:05.000")
}
// GetStartedAt возвращает время старта узла
func (n *Node) GetStartedAt() int64 {
return n.startedAt
}
// GetStartedAtStr возвращает человекочитаемое время старта
func (n *Node) GetStartedAtStr() string {
return time.UnixMilli(n.startedAt).Format("2006-01-02 15:04:05.000")
}
// GetCreatedAt возвращает время создания узла
func (n *Node) GetCreatedAt() int64 {
return n.createdAt
}
// GetCreatedAtStr возвращает человекочитаемое время создания
func (n *Node) GetCreatedAtStr() string {
return time.UnixMilli(n.createdAt).Format("2006-01-02 15:04:05.000")
}
// GetUptime возвращает время работы узла
func (n *Node) GetUptime() time.Duration {
if n.startedAt == 0 {
return 0
}
return time.Duration(time.Now().UnixMilli()-n.startedAt) * time.Millisecond
}
// GetStats возвращает статистику узла
func (n *Node) GetStats() map[string]interface{} {
return map[string]interface{}{
"id": n.ID,
"ip": n.IP,
"port": n.Port,
"status": n.GetNodeStatus(),
"created_at": n.GetCreatedAtStr(),
"started_at": n.GetStartedAtStr(),
"joined_at": n.GetJoinedAtStr(),
"last_seen": n.GetLastSeenStr(),
"uptime": n.GetUptime().String(),
"request_count": n.requestCount.Load(),
"bytes_rx": n.bytesRx.Load(),
"bytes_tx": n.bytesTx.Load(),
}
}
// Stop останавливает работу узла
func (n *Node) Stop() {
n.Status.Store(int32(StatusOffline))
n.stoppedAt = time.Now().UnixMilli()
close(n.stopChan)
if n.logger != nil {
n.logger.Info(fmt.Sprintf("Node %s stopped at %s", n.ID, time.UnixMilli(n.stoppedAt).Format("2006-01-02 15:04:05.000")))
}
}
// GetAddress возвращает адрес узла в формате "ip:port"
func (n *Node) GetAddress() string {
return fmt.Sprintf("%s:%d", n.IP, n.Port)
}
// ReplicateDocument отправляет документ на репликацию всем активным узлам
func (n *Node) ReplicateDocument(database, collection string, doc *storage.Document) error {
if n.coordinator == nil {
if n.logger != nil {
n.logger.Warn("No coordinator set, skipping replication")
}
return fmt.Errorf("no coordinator set")
}
// Получаем список всех узлов от координатора
nodes := n.coordinator.GetActiveNodes()
startTime := time.Now().UnixMilli()
for _, nodeInfo := range nodes {
if nodeInfo.ID == n.ID {
continue // Пропускаем себя
}
// В реальной реализации здесь была бы отправка на узел
if n.logger != nil {
n.logger.Debug(fmt.Sprintf("Would replicate to node %s (last seen: %s)",
nodeInfo.ID, time.UnixMilli(nodeInfo.LastSeen).Format("15:04:05.000")))
}
}
duration := time.Now().UnixMilli() - startTime
if n.logger != nil {
n.logger.Info(fmt.Sprintf("Replicated document %s to %d nodes (took %d ms)",
doc.ID, len(nodes)-1, duration))
}
return nil
}