Delete internal/cluster/node.go
This commit is contained in:
@@ -1,389 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2026 Safronov Grigorii
|
|
||||||
*
|
|
||||||
* Licensed under the CDDL, Version 1.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
*
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
* https://opensource.org/licenses/CDDL-1.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
// Файл: internal/cluster/node.go
|
|
||||||
// Назначение: Реализация узла кластера (node) для распределённой СУБД.
|
|
||||||
// Реализация узла кластера (node) для распределённой СУБД.
|
|
||||||
// Полностью lock-free с использованием атомарных операций.
|
|
||||||
|
|
||||||
package cluster
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"net"
|
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"futriis/internal/log"
|
|
||||||
"futriis/internal/storage"
|
|
||||||
"github.com/google/uuid"
|
|
||||||
)
|
|
||||||
|
|
||||||
// NodeStatus представляет состояние узла кластера
|
|
||||||
type NodeStatus int32
|
|
||||||
|
|
||||||
const (
|
|
||||||
StatusOffline NodeStatus = iota
|
|
||||||
StatusActive
|
|
||||||
StatusSyncing
|
|
||||||
StatusFailed
|
|
||||||
)
|
|
||||||
|
|
||||||
// Node представляет отдельный узел в распределённой системе
|
|
||||||
type Node struct {
|
|
||||||
ID string // Уникальный идентификатор узла
|
|
||||||
IP string // IP-адрес узла
|
|
||||||
Port int // Порт для коммуникации
|
|
||||||
Status atomic.Int32 // Атомарный статус узла (NodeStatus)
|
|
||||||
Storage *storage.Storage
|
|
||||||
logger *log.Logger
|
|
||||||
coordinator *RaftCoordinator // Ссылка на координатора (теперь RaftCoordinator)
|
|
||||||
lastSeen atomic.Int64 // Время последнего heartbeat (Unix nano)
|
|
||||||
incomingConn chan net.Conn // Канал для входящих соединений (wait-free)
|
|
||||||
stopChan chan struct{}
|
|
||||||
}
|
|
||||||
|
|
||||||
// NodeConfig содержит конфигурацию для создания узла
|
|
||||||
type NodeConfig struct {
|
|
||||||
IP string
|
|
||||||
Port int
|
|
||||||
Storage *storage.Storage
|
|
||||||
Logger *log.Logger
|
|
||||||
Coordinator *RaftCoordinator
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewNode создаёт новый экземпляр узла кластера
|
|
||||||
func NewNode(ip string, port int, store *storage.Storage, logger *log.Logger) *Node {
|
|
||||||
node := &Node{
|
|
||||||
ID: uuid.New().String(),
|
|
||||||
IP: ip,
|
|
||||||
Port: port,
|
|
||||||
Storage: store,
|
|
||||||
logger: logger,
|
|
||||||
incomingConn: make(chan net.Conn, 1000), // Буферизованный канал для wait-free приёма
|
|
||||||
stopChan: make(chan struct{}),
|
|
||||||
}
|
|
||||||
node.Status.Store(int32(StatusActive))
|
|
||||||
node.lastSeen.Store(time.Now().UnixNano())
|
|
||||||
|
|
||||||
// Запуск сервера для приёма межузловых соединений
|
|
||||||
go node.startTCPServer()
|
|
||||||
|
|
||||||
// Запуск обработчика входящих соединений
|
|
||||||
go node.handleIncomingConnections()
|
|
||||||
|
|
||||||
// Запуск heartbeat-отправки (если координатор известен)
|
|
||||||
go node.heartbeatLoop()
|
|
||||||
|
|
||||||
return node
|
|
||||||
}
|
|
||||||
|
|
||||||
// startTCPServer запускает TCP-сервер для приёма запросов от других узлов
|
|
||||||
func (n *Node) startTCPServer() {
|
|
||||||
addr := fmt.Sprintf("%s:%d", n.IP, n.Port)
|
|
||||||
listener, err := net.Listen("tcp", addr)
|
|
||||||
if err != nil {
|
|
||||||
if n.logger != nil {
|
|
||||||
n.logger.Error(fmt.Sprintf("Node %s failed to start TCP server: %v", n.ID, err))
|
|
||||||
}
|
|
||||||
n.Status.Store(int32(StatusFailed))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer listener.Close()
|
|
||||||
|
|
||||||
if n.logger != nil {
|
|
||||||
n.logger.Info(fmt.Sprintf("Node %s listening on %s", n.ID, addr))
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-n.stopChan:
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
conn, err := listener.Accept()
|
|
||||||
if err != nil {
|
|
||||||
if n.logger != nil {
|
|
||||||
n.logger.Error(fmt.Sprintf("Node %s accept error: %v", n.ID, err))
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// Неблокирующая отправка в канал
|
|
||||||
select {
|
|
||||||
case n.incomingConn <- conn:
|
|
||||||
default:
|
|
||||||
if n.logger != nil {
|
|
||||||
n.logger.Warn(fmt.Sprintf("Node %s incoming connection queue full, dropping connection", n.ID))
|
|
||||||
}
|
|
||||||
conn.Close()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// handleIncomingConnections обрабатывает входящие соединения wait-free способом
|
|
||||||
func (n *Node) handleIncomingConnections() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-n.stopChan:
|
|
||||||
return
|
|
||||||
case conn := <-n.incomingConn:
|
|
||||||
go n.handleNodeRequest(conn)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// handleNodeRequest обрабатывает конкретный запрос от другого узла
|
|
||||||
func (n *Node) handleNodeRequest(conn net.Conn) {
|
|
||||||
defer conn.Close()
|
|
||||||
|
|
||||||
decoder := json.NewDecoder(conn)
|
|
||||||
var req NodeRequest
|
|
||||||
if err := decoder.Decode(&req); err != nil {
|
|
||||||
if n.logger != nil {
|
|
||||||
n.logger.Error(fmt.Sprintf("Node %s failed to decode request: %v", n.ID, err))
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Обновляем время последнего контакта
|
|
||||||
n.lastSeen.Store(time.Now().UnixNano())
|
|
||||||
|
|
||||||
// Маршрутизация запроса в зависимости от типа
|
|
||||||
switch req.Type {
|
|
||||||
case "replicate":
|
|
||||||
n.handleReplicateRequest(req.Data)
|
|
||||||
case "query":
|
|
||||||
n.handleQueryRequest(req.Data, conn)
|
|
||||||
case "sync":
|
|
||||||
n.handleSyncRequest(req.Data, conn)
|
|
||||||
default:
|
|
||||||
if n.logger != nil {
|
|
||||||
n.logger.Warn(fmt.Sprintf("Node %s unknown request type: %s", n.ID, req.Type))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// handleReplicateRequest обрабатывает запрос на репликацию документа
|
|
||||||
func (n *Node) handleReplicateRequest(data []byte) {
|
|
||||||
var repData struct {
|
|
||||||
Database string `json:"database"`
|
|
||||||
Collection string `json:"collection"`
|
|
||||||
Document map[string]interface{} `json:"document"`
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := json.Unmarshal(data, &repData); err != nil {
|
|
||||||
if n.logger != nil {
|
|
||||||
n.logger.Error(fmt.Sprintf("Node %s failed to unmarshal replicate data: %v", n.ID, err))
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Получаем базу данных
|
|
||||||
db, err := n.Storage.GetDatabase(repData.Database)
|
|
||||||
if err != nil {
|
|
||||||
if n.logger != nil {
|
|
||||||
n.logger.Error(fmt.Sprintf("Node %s database not found for replication: %s", n.ID, repData.Database))
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Получаем коллекцию
|
|
||||||
coll, err := db.GetCollection(repData.Collection)
|
|
||||||
if err != nil {
|
|
||||||
if n.logger != nil {
|
|
||||||
n.logger.Error(fmt.Sprintf("Node %s collection not found for replication: %s", n.ID, repData.Collection))
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Создаём документ и вставляем
|
|
||||||
doc := &storage.Document{
|
|
||||||
ID: repData.Document["_id"].(string),
|
|
||||||
Fields: repData.Document,
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := coll.Insert(doc); err != nil {
|
|
||||||
if n.logger != nil {
|
|
||||||
n.logger.Error(fmt.Sprintf("Node %s failed to replicate document: %v", n.ID, err))
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if n.logger != nil {
|
|
||||||
n.logger.Debug(fmt.Sprintf("Node %s replicated document %s", n.ID, doc.ID))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// handleQueryRequest обрабатывает запрос на чтение данных с узла
|
|
||||||
func (n *Node) handleQueryRequest(data []byte, conn net.Conn) {
|
|
||||||
var queryData struct {
|
|
||||||
Database string `json:"database"`
|
|
||||||
Collection string `json:"collection"`
|
|
||||||
DocumentID string `json:"document_id"`
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := json.Unmarshal(data, &queryData); err != nil {
|
|
||||||
n.sendErrorResponse(conn, err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Получаем базу данных
|
|
||||||
db, err := n.Storage.GetDatabase(queryData.Database)
|
|
||||||
if err != nil {
|
|
||||||
n.sendErrorResponse(conn, err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Получаем коллекцию
|
|
||||||
coll, err := db.GetCollection(queryData.Collection)
|
|
||||||
if err != nil {
|
|
||||||
n.sendErrorResponse(conn, err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Находим документ
|
|
||||||
doc, err := coll.Find(queryData.DocumentID)
|
|
||||||
if err != nil {
|
|
||||||
n.sendErrorResponse(conn, err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Отправляем успешный ответ
|
|
||||||
response := map[string]interface{}{
|
|
||||||
"status": "success",
|
|
||||||
"data": doc,
|
|
||||||
}
|
|
||||||
encoder := json.NewEncoder(conn)
|
|
||||||
encoder.Encode(response)
|
|
||||||
}
|
|
||||||
|
|
||||||
// handleSyncRequest обрабатывает запрос на синхронизацию всей коллекции
|
|
||||||
func (n *Node) handleSyncRequest(data []byte, conn net.Conn) {
|
|
||||||
var syncData struct {
|
|
||||||
Database string `json:"database"`
|
|
||||||
Collection string `json:"collection"`
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := json.Unmarshal(data, &syncData); err != nil {
|
|
||||||
n.sendErrorResponse(conn, err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Получаем базу данных
|
|
||||||
db, err := n.Storage.GetDatabase(syncData.Database)
|
|
||||||
if err != nil {
|
|
||||||
n.sendErrorResponse(conn, err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Получаем коллекцию
|
|
||||||
coll, err := db.GetCollection(syncData.Collection)
|
|
||||||
if err != nil {
|
|
||||||
n.sendErrorResponse(conn, err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Получаем все документы
|
|
||||||
docs := coll.GetAllDocuments()
|
|
||||||
|
|
||||||
response := map[string]interface{}{
|
|
||||||
"status": "success",
|
|
||||||
"docs": docs,
|
|
||||||
"count": len(docs),
|
|
||||||
}
|
|
||||||
encoder := json.NewEncoder(conn)
|
|
||||||
encoder.Encode(response)
|
|
||||||
}
|
|
||||||
|
|
||||||
// sendErrorResponse отправляет ошибку в ответ на запрос
|
|
||||||
func (n *Node) sendErrorResponse(conn net.Conn, errMsg string) {
|
|
||||||
response := map[string]interface{}{
|
|
||||||
"status": "error",
|
|
||||||
"error": errMsg,
|
|
||||||
}
|
|
||||||
encoder := json.NewEncoder(conn)
|
|
||||||
encoder.Encode(response)
|
|
||||||
}
|
|
||||||
|
|
||||||
// heartbeatLoop отправляет периодические сигналы жизни координатору
|
|
||||||
func (n *Node) heartbeatLoop() {
|
|
||||||
ticker := time.NewTicker(5 * time.Second)
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-n.stopChan:
|
|
||||||
return
|
|
||||||
case <-ticker.C:
|
|
||||||
if n.coordinator != nil {
|
|
||||||
n.coordinator.SendHeartbeat(n.ID)
|
|
||||||
n.lastSeen.Store(time.Now().UnixNano())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetNodeStatus возвращает текущий статус узла (атомарно)
|
|
||||||
func (n *Node) GetNodeStatus() NodeStatus {
|
|
||||||
return NodeStatus(n.Status.Load())
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsActive проверяет, активен ли узел
|
|
||||||
func (n *Node) IsActive() bool {
|
|
||||||
return NodeStatus(n.Status.Load()) == StatusActive
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetCoordinator устанавливает координатора для узла
|
|
||||||
func (n *Node) SetCoordinator(coord *RaftCoordinator) {
|
|
||||||
n.coordinator = coord
|
|
||||||
if n.logger != nil {
|
|
||||||
n.logger.Info(fmt.Sprintf("Node %s connected to coordinator", n.ID))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stop останавливает работу узла
|
|
||||||
func (n *Node) Stop() {
|
|
||||||
n.Status.Store(int32(StatusOffline))
|
|
||||||
close(n.stopChan)
|
|
||||||
if n.logger != nil {
|
|
||||||
n.logger.Info(fmt.Sprintf("Node %s stopped", n.ID))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetAddress возвращает адрес узла в формате "ip:port"
|
|
||||||
func (n *Node) GetAddress() string {
|
|
||||||
return fmt.Sprintf("%s:%d", n.IP, n.Port)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReplicateDocument отправляет документ на репликацию всем активным узлам
|
|
||||||
func (n *Node) ReplicateDocument(database, collection string, doc *storage.Document) error {
|
|
||||||
if n.coordinator == nil {
|
|
||||||
if n.logger != nil {
|
|
||||||
n.logger.Warn("No coordinator set, skipping replication")
|
|
||||||
}
|
|
||||||
return fmt.Errorf("no coordinator set")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Получаем список всех узлов от координатора
|
|
||||||
nodes := n.coordinator.GetActiveNodes()
|
|
||||||
|
|
||||||
for _, nodeInfo := range nodes {
|
|
||||||
if nodeInfo.ID == n.ID {
|
|
||||||
continue // Пропускаем себя
|
|
||||||
}
|
|
||||||
|
|
||||||
// В реальной реализации здесь была бы отправка на узел
|
|
||||||
if n.logger != nil {
|
|
||||||
n.logger.Debug(fmt.Sprintf("Would replicate to node %s", nodeInfo.ID))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user