futriix for Solaris-based systems

This commit is contained in:
Григорий Сафронов 2026-02-23 22:48:31 +03:00
commit 67216c6066
28 changed files with 2995 additions and 0 deletions

95
Makefile Normal file
View File

@ -0,0 +1,95 @@
# /futriis/Makefile
# Makefile для сборки проекта futriis на Unix-системах (Solaris, OpenIndiana, Linux)
.PHONY: all clean deps build build-server run run-server test install help
# Переменные
BINARY_NAME=futriis
SERVER_BINARY_NAME=futriisd
GO=go
GOFLAGS=-ldflags="-s -w"
BUILD_DIR=build
CMD_DIR=./cmd/futriis
# Цвета для вывода
GREEN=\033[0;32m
RED=\033[0;31m
NC=\033[0m # No Color
# Проверка операционной системы
UNAME_S := $(shell uname -s)
ifeq ($(UNAME_S),Linux)
OS_SUPPORTED=true
endif
ifeq ($(UNAME_S),SunOS)
OS_SUPPORTED=true
endif
ifndef OS_SUPPORTED
$(error "Операционная система $(UNAME_S) не поддерживается. Проект поддерживает только Solaris, OpenIndiana и Linux.")
endif
all: clean deps build
# Создание директории для сборки
$(BUILD_DIR):
@mkdir -p $(BUILD_DIR)
@mkdir -p $(BUILD_DIR)/plugins
@mkdir -p $(BUILD_DIR)/data
# Установка зависимостей
deps:
@echo "$(GREEN)Установка зависимостей...$(NC)"
$(GO) mod download
$(GO) mod tidy
# Сборка клиента
build: $(BUILD_DIR)
@echo "$(GREEN)Сборка клиента для $(UNAME_S)...$(NC)"
$(GO) build $(GOFLAGS) -o $(BUILD_DIR)/$(BINARY_NAME) $(CMD_DIR)
@echo "$(GREEN)Сборка завершена. Бинарный файл: $(BUILD_DIR)/$(BINARY_NAME)$(NC)"
# Сборка сервера
build-server: $(BUILD_DIR)
@echo "$(GREEN)Сборка сервера для $(UNAME_S)...$(NC)"
$(GO) build $(GOFLAGS) -o $(BUILD_DIR)/$(SERVER_BINARY_NAME) $(CMD_DIR)
@echo "$(GREEN)Сборка сервера завершена. Бинарный файл: $(BUILD_DIR)/$(SERVER_BINARY_NAME)$(NC)"
# Установка проекта
install:
@echo "$(GREEN)Установка проекта...$(NC)"
$(GO) install $(GOFLAGS) $(CMD_DIR)
@echo "$(GREEN)Установка завершена$(NC)"
# Запуск клиента
run: build
@echo "$(GREEN)Запуск futriis клиента...$(NC)"
./$(BUILD_DIR)/$(BINARY_NAME)
# Запуск сервера
run-server: build-server
@echo "$(GREEN)Запуск futriis сервера...$(NC)"
./$(BUILD_DIR)/$(SERVER_BINARY_NAME) -server -config config.toml
# Тестирование
test:
@echo "$(GREEN)Запуск тестов...$(NC)"
$(GO) test -v ./...
# Очистка
clean:
@echo "$(GREEN)Очистка...$(NC)"
rm -rf $(BUILD_DIR)
$(GO) clean
# Показать помощь
help:
@echo "Доступные команды Make:"
@echo " make deps - установка зависимостей"
@echo " make build - сборка клиента"
@echo " make build-server - сборка сервера"
@echo " make install - установка проекта"
@echo " make run - запуск клиента"
@echo " make run-server - запуск сервера"
@echo " make test - запуск тестов"
@echo " make clean - очистка"
@echo " make help - показать эту справку"

31
README.md Normal file
View File

@ -0,0 +1,31 @@
# futriis - Распределённая in-memory СУБД
futriis - это легковесная, wait-free и lock-free дружественная in-memory СУБД,
реализованная на Go 1.25.6 с поддержкой MessagePack сериализации.
## Поддерживаемые ОС
- Solaris
- OpenIndiana
- Linux (все популярные дистрибутивы)
**Важно:** Windows и MacOS X не поддерживаются!
## Архитектура
СУБД реализует три основных типа данных:
- **Таппл (Tapple)** - аналог базы данных в РСУБД
- **Слайс (Slice)** - аналог таблицы
- **Кортеж (Tuple)** - аналог записи в таблице
## Требования
- Go 1.25.6 или выше
- Только Unix-подобная ОС (Solaris, OpenIndiana, Linux)
## Установка и сборка
1. Клонируйте репозиторий:
```bash
git clone https://github.com/futriis/db.git
cd futriis

98
cmd/futriis/main.go Normal file
View File

@ -0,0 +1,98 @@
// /futriis/cmd/futriis/main.go
// Основная точка входа в приложение futriis
package main
import (
"flag"
"os"
"os/signal"
"syscall"
"futriis/internal/cli"
"futriis/internal/server"
"futriis/pkg/config"
"futriis/pkg/utils"
)
var (
configPath = flag.String("config", "config.toml", "путь к файлу конфигурации")
serverMode = flag.Bool("server", false, "запуск в режиме сервера")
)
func main() {
flag.Parse()
// Загружаем конфигурацию
cfg, err := config.Load(*configPath)
if err != nil {
utils.PrintError("Ошибка загрузки конфигурации: %v", err)
os.Exit(1)
}
// Настраиваем обработку сигналов для корректного завершения
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigChan
utils.PrintInfo("\nПолучен сигнал завершения. Закрываем приложение...")
cleanup()
os.Exit(0)
}()
if *serverMode {
// Запуск в режиме сервера
startServer(cfg)
} else {
// Запуск в режиме CLI
startCLI()
}
cleanup()
}
// startCLI запускает интерактивный CLI
func startCLI() {
handler := cli.NewHandler()
if len(os.Args) > 1 && os.Args[1] != "-config" {
// Пакетный режим
executeBatch(os.Args[1:])
} else {
// Интерактивный режим
if err := handler.Run(); err != nil {
utils.PrintError("Ошибка выполнения: %v", err)
cleanup()
os.Exit(1)
}
}
}
// startServer запускает серверный режим
func startServer(cfg *config.Config) {
handler := cli.NewHandler()
srv := server.NewServer(cfg.Node.Address, handler.Engine())
if err := srv.Start(); err != nil {
utils.PrintError("Ошибка запуска сервера: %v", err)
os.Exit(1)
}
// Бесконечное ожидание
select {}
}
// executeBatch выполняет команды из аргументов командной строки
func executeBatch(commands []string) {
utils.PrintInfo("Пакетный режим пока не реализован")
}
// cleanup выполняет очистку ресурсов перед завершением
func cleanup() {
// Закрываем логгер
logger := utils.GetLogger()
if logger != nil {
logger.Log("INFO", "Приложение завершено")
logger.Close()
}
}

32
config.toml Normal file
View File

@ -0,0 +1,32 @@
# /futriis/config.toml
# Конфигурационный файл СУБД futriis
[cluster]
name = "futriis-cluster"
coordinator_address = "127.0.0.1:7379"
replication_factor = 2
sync_replication = true
auto_rebalance = true
[node]
id = "node-1"
address = "127.0.0.1:7380"
data_dir = "./data"
aof_enabled = true
aof_file = "./data/futriis.aof"
[storage]
page_size = 4096
max_memory = "1GB"
eviction_policy = "noeviction"
[replication]
enabled = true
sync_mode = "full" # full, async
heartbeat_interval = 5
timeout = 30
[lua]
enabled = true
plugins_dir = "./plugins"
max_memory = "128MB"

0
data/futriis.aof Normal file
View File

17
go.mod Normal file
View File

@ -0,0 +1,17 @@
module futriis
go 1.25.6
require (
github.com/BurntSushi/toml v1.3.2
github.com/mattn/go-runewidth v0.0.16
github.com/vmihailenco/msgpack/v5 v5.4.1
github.com/yuin/gopher-lua v1.1.1
golang.org/x/term v0.18.0
)
require (
github.com/rivo/uniseg v0.4.7 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
golang.org/x/sys v0.18.0 // indirect
)

25
go.sum Normal file
View File

@ -0,0 +1,25 @@
github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8=
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc=
github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8=
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

43
internal/cli/commands.go Normal file
View File

@ -0,0 +1,43 @@
// /futriis/internal/cli/commands.go
// Пакет cli определяет структуры команд и их параметры
package cli
// Command представляет команду СУБД
type Command struct {
Name string
Description string
Usage string
Handler func(args []string) (string, error)
}
// CommandRegistry реестр всех доступных команд
type CommandRegistry struct {
commands map[string]*Command
}
// NewCommandRegistry создаёт новый реестр команд
func NewCommandRegistry() *CommandRegistry {
return &CommandRegistry{
commands: make(map[string]*Command),
}
}
// Register регистрирует новую команду
func (cr *CommandRegistry) Register(cmd *Command) {
cr.commands[cmd.Name] = cmd
}
// Get возвращает команду по имени
func (cr *CommandRegistry) Get(name string) (*Command, bool) {
cmd, ok := cr.commands[name]
return cmd, ok
}
// List возвращает список всех команд
func (cr *CommandRegistry) List() []*Command {
cmds := make([]*Command, 0, len(cr.commands))
for _, cmd := range cr.commands {
cmds = append(cmds, cmd)
}
return cmds
}

66
internal/cli/handler.go Normal file
View File

@ -0,0 +1,66 @@
// /futriis/internal/cli/handler.go
// Пакет cli реализует обработчик команд для интерактивного режима
package cli
import (
"fmt"
"strings"
"futriis/internal/engine"
"futriis/pkg/utils"
)
// Handler представляет обработчик команд CLI
type Handler struct {
prompt *Prompt
engine *engine.Engine
}
// NewHandler создаёт новый обработчик команд
func NewHandler() *Handler {
return &Handler{
prompt: NewPrompt(),
engine: engine.NewEngine(),
}
}
// Engine возвращает движок базы данных
func (h *Handler) Engine() *engine.Engine {
return h.engine
}
// Run запускает основной цикл обработки команд
func (h *Handler) Run() error {
utils.PrintBanner()
utils.PrintInfo("Добро пожаловать в Futriis DB. Введите 'help' для списка команд.\n")
for {
// Читаем команду
input, err := h.prompt.ReadLine()
if err != nil {
return err
}
// Пропускаем пустые строки
if strings.TrimSpace(input) == "" {
continue
}
// Обрабатываем команду
result, err := h.engine.Execute(input)
if err != nil {
utils.PrintError("Ошибка: %v", err)
continue
}
// Выводим результат
if result != "" {
if result == "exit" {
utils.PrintInfo("До свидания!")
return nil
}
fmt.Println(result)
}
}
}

85
internal/cli/history.go Normal file
View File

@ -0,0 +1,85 @@
// /futriis/internal/cli/history.go
// Пакет cli реализует управление историей команд с поддержкой клавиши Up
package cli
import (
"os"
"golang.org/x/term"
)
// History управляет историей команд
type History struct {
commands []string
position int
maxSize int
}
// NewHistory создаёт новую историю команд
func NewHistory(maxSize int) *History {
return &History{
commands: make([]string, 0, maxSize),
position: 0,
maxSize: maxSize,
}
}
// Add добавляет команду в историю
func (h *History) Add(cmd string) {
if cmd == "" {
return
}
// Не добавляем дубликаты подряд
if len(h.commands) > 0 && h.commands[len(h.commands)-1] == cmd {
return
}
// Если достигнут максимум, удаляем самую старую команду
if len(h.commands) >= h.maxSize {
h.commands = h.commands[1:]
}
h.commands = append(h.commands, cmd)
h.position = len(h.commands)
}
// GetPrevious возвращает предыдущую команду из истории
func (h *History) GetPrevious() string {
if len(h.commands) == 0 {
return ""
}
if h.position > 0 {
h.position--
}
return h.commands[h.position]
}
// GetNext возвращает следующую команду из истории
func (h *History) GetNext() string {
if h.position < len(h.commands)-1 {
h.position++
return h.commands[h.position]
}
h.position = len(h.commands)
return ""
}
// Reset сбрасывает позицию в истории
func (h *History) Reset() {
h.position = len(h.commands)
}
// SetupRawMode устанавливает терминал в raw-режим для обработки клавиш
func SetupRawMode() (*term.State, error) {
fd := int(os.Stdin.Fd())
return term.MakeRaw(fd)
}
// RestoreMode восстанавливает режим терминала
func RestoreMode(oldState *term.State) error {
fd := int(os.Stdin.Fd())
return term.Restore(fd, oldState)
}

157
internal/cli/prompt.go Normal file
View File

@ -0,0 +1,157 @@
// /futriis/internal/cli/prompt.go
// Пакет cli реализует интерактивное приглашение с поддержкой истории команд
package cli
import (
"bufio"
"fmt"
"os"
"futriis/pkg/utils"
"github.com/mattn/go-runewidth"
"golang.org/x/term"
)
// Prompt представляет интерактивное приглашение
type Prompt struct {
history *History
buffer []rune
pos int
}
// NewPrompt создаёт новое приглашение
func NewPrompt() *Prompt {
return &Prompt{
history: NewHistory(100),
buffer: make([]rune, 0),
pos: 0,
}
}
// ReadLine читает строку с поддержкой истории и редактирования
func (p *Prompt) ReadLine() (string, error) {
oldState, err := term.MakeRaw(int(os.Stdin.Fd()))
if err != nil {
return p.readSimple()
}
defer term.Restore(int(os.Stdin.Fd()), oldState)
// Очищаем буфер
p.buffer = make([]rune, 0)
p.pos = 0
// Показываем приглашение
fmt.Print(utils.GetPrompt())
reader := bufio.NewReader(os.Stdin)
for {
r, _, err := reader.ReadRune()
if err != nil {
return "", err
}
switch r {
case 3: // Ctrl+C
return "", nil
case 4: // Ctrl+D
if len(p.buffer) == 0 {
return "exit", nil
}
case 13, 10: // Enter
fmt.Println()
cmd := string(p.buffer)
p.history.Add(cmd)
p.history.Reset()
return cmd, nil
case 127: // Backspace
if p.pos > 0 {
// Удаляем символ перед курсором
p.buffer = append(p.buffer[:p.pos-1], p.buffer[p.pos:]...)
p.pos--
p.refreshLine()
}
case 27: // Escape sequence (стрелки)
// Читаем следующие два символа
r2, _, _ := reader.ReadRune()
r3, _, _ := reader.ReadRune()
if r2 == '[' {
switch r3 {
case 'A': // Up arrow
prev := p.history.GetPrevious()
if prev != "" {
p.buffer = []rune(prev)
p.pos = len(p.buffer)
p.refreshLine()
}
case 'B': // Down arrow
next := p.history.GetNext()
p.buffer = []rune(next)
p.pos = len(p.buffer)
p.refreshLine()
case 'C': // Right arrow
if p.pos < len(p.buffer) {
p.pos++
p.refreshLine()
}
case 'D': // Left arrow
if p.pos > 0 {
p.pos--
p.refreshLine()
}
}
}
default:
// Добавляем символ (поддержка Unicode, включая русский)
if r >= 32 { // Печатные символы
// Вставляем символ в позицию курсора
if p.pos == len(p.buffer) {
p.buffer = append(p.buffer, r)
} else {
p.buffer = append(p.buffer[:p.pos], append([]rune{r}, p.buffer[p.pos:]...)...)
}
p.pos++
p.refreshLine()
}
}
}
}
// refreshLine обновляет текущую строку с правильным позиционированием курсора
func (p *Prompt) refreshLine() {
// Сохраняем позицию
fmt.Print("\r\033[K") // Возврат в начало строки и очистка
// Печатаем приглашение и текущий буфер
promptStr := utils.ColorPromptCode + "futriis:~> " + utils.ColorReset
fmt.Print(promptStr + string(p.buffer))
// Вычисляем ширину приглашения (без ANSI кодов)
promptWidth := runewidth.StringWidth("futriis:~> ")
// Вычисляем позицию курсора (учитываем ширину каждого символа)
cursorPos := promptWidth
for i := 0; i < p.pos; i++ {
cursorPos += runewidth.RuneWidth(p.buffer[i])
}
// Возвращаем курсор на позицию
fmt.Printf("\033[%dG", cursorPos+1)
}
// readSimple читает строку без специальной обработки (fallback)
func (p *Prompt) readSimple() (string, error) {
fmt.Print(utils.GetPrompt())
reader := bufio.NewReader(os.Stdin)
return reader.ReadString('\n')
}

336
internal/cluster/node.go Normal file
View File

@ -0,0 +1,336 @@
// /futriis/internal/cluster/node.go
// Пакет cluster реализует управление кластером и репликацию
package cluster
import (
"encoding/json"
"errors"
"fmt"
"net"
"sync"
"sync/atomic"
"time"
"futriis/pkg/config"
"futriis/pkg/utils"
)
// NodeState состояние узла
type NodeState int
const (
StateOffline NodeState = iota
StateJoining
StateOnline
StateLeaving
StateFailed
)
func (s NodeState) String() string {
switch s {
case StateOffline:
return "offline"
case StateJoining:
return "joining"
case StateOnline:
return "online"
case StateLeaving:
return "leaving"
case StateFailed:
return "failed"
default:
return "unknown"
}
}
// Node представляет узел кластера
type Node struct {
ID string `json:"id"`
Address string `json:"address"`
State NodeState `json:"state"`
LastSeen time.Time `json:"last_seen"`
ShardIDs []string `json:"shard_ids"`
mu sync.RWMutex
}
// ClusterManager управляет кластером
type ClusterManager struct {
nodes map[string]*Node
coordinatorAddr string
nodeID string
localAddr string
isCoordinator bool
mu sync.RWMutex
heartbeatStop chan struct{}
stats struct {
totalNodes int64
activeNodes int64
rebalancingCnt int64
}
}
// NewClusterManager создаёт новый менеджер кластера
func NewClusterManager(cfg *config.Config) *ClusterManager {
cm := &ClusterManager{
nodes: make(map[string]*Node),
coordinatorAddr: cfg.Cluster.CoordinatorAddress,
nodeID: cfg.Node.ID,
localAddr: cfg.Node.Address,
isCoordinator: cfg.Node.Address == cfg.Cluster.CoordinatorAddress,
heartbeatStop: make(chan struct{}),
}
// Добавляем себя в кластер
cm.addNode(&Node{
ID: cfg.Node.ID,
Address: cfg.Node.Address,
State: StateOnline,
LastSeen: time.Now(),
})
return cm
}
// Start запускает кластерные сервисы
func (cm *ClusterManager) Start() error {
if cm.isCoordinator {
// Запускаем координатор
go cm.startCoordinator()
}
// Запускаем heartbeat
go cm.heartbeatLoop()
utils.PrintInfo("Кластерный менеджер запущен")
return nil
}
// Stop останавливает кластерные сервисы
func (cm *ClusterManager) Stop() {
close(cm.heartbeatStop)
}
// heartbeatLoop отправляет heartbeat сигналы
func (cm *ClusterManager) heartbeatLoop() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
cm.sendHeartbeat()
case <-cm.heartbeatStop:
return
}
}
}
// sendHeartbeat отправляет heartbeat координатору
func (cm *ClusterManager) sendHeartbeat() {
if !cm.isCoordinator {
// Отправляем heartbeat координатору
conn, err := net.DialTimeout("tcp", cm.coordinatorAddr, 3*time.Second)
if err != nil {
return
}
defer conn.Close()
heartbeat := map[string]interface{}{
"type": "heartbeat",
"node_id": cm.nodeID,
"address": cm.localAddr,
"time": time.Now().Unix(),
}
json.NewEncoder(conn).Encode(heartbeat)
}
}
// startCoordinator запускает координатор кластера
func (cm *ClusterManager) startCoordinator() {
listener, err := net.Listen("tcp", cm.coordinatorAddr)
if err != nil {
utils.PrintError("Ошибка запуска координатора: %v", err)
return
}
defer listener.Close()
utils.PrintInfo("Координатор кластера запущен на " + cm.coordinatorAddr)
for {
conn, err := listener.Accept()
if err != nil {
continue
}
go cm.handleCoordinatorRequest(conn)
}
}
// handleCoordinatorRequest обрабатывает запросы к координатору
func (cm *ClusterManager) handleCoordinatorRequest(conn net.Conn) {
defer conn.Close()
var req map[string]interface{}
if err := json.NewDecoder(conn).Decode(&req); err != nil {
return
}
msgType, _ := req["type"].(string)
switch msgType {
case "heartbeat":
nodeID, _ := req["node_id"].(string)
address, _ := req["address"].(string)
cm.updateNodeHeartbeat(nodeID, address)
case "join":
nodeID, _ := req["node_id"].(string)
address, _ := req["address"].(string)
cm.handleNodeJoin(nodeID, address)
case "leave":
nodeID, _ := req["node_id"].(string)
cm.handleNodeLeave(nodeID)
}
}
// handleNodeJoin обрабатывает присоединение узла
func (cm *ClusterManager) handleNodeJoin(nodeID, address string) {
cm.mu.Lock()
defer cm.mu.Unlock()
if node, exists := cm.nodes[nodeID]; exists {
node.State = StateOnline
node.LastSeen = time.Now()
} else {
cm.nodes[nodeID] = &Node{
ID: nodeID,
Address: address,
State: StateOnline,
LastSeen: time.Now(),
}
atomic.AddInt64(&cm.stats.totalNodes, 1)
}
atomic.AddInt64(&cm.stats.activeNodes, 1)
utils.PrintSuccess("Узел %s (%s) присоединился к кластеру", nodeID, address)
}
// handleNodeLeave обрабатывает отключение узла
func (cm *ClusterManager) handleNodeLeave(nodeID string) {
cm.mu.Lock()
defer cm.mu.Unlock()
if node, exists := cm.nodes[nodeID]; exists {
node.State = StateOffline
atomic.AddInt64(&cm.stats.activeNodes, -1)
utils.PrintWarning("Узел %s покинул кластер", nodeID)
}
}
// updateNodeHeartbeat обновляет время последнего heartbeat узла
func (cm *ClusterManager) updateNodeHeartbeat(nodeID, address string) {
cm.mu.Lock()
defer cm.mu.Unlock()
if node, exists := cm.nodes[nodeID]; exists {
node.LastSeen = time.Now()
if node.State == StateOffline {
node.State = StateOnline
atomic.AddInt64(&cm.stats.activeNodes, 1)
}
} else {
cm.nodes[nodeID] = &Node{
ID: nodeID,
Address: address,
State: StateOnline,
LastSeen: time.Now(),
}
atomic.AddInt64(&cm.stats.totalNodes, 1)
atomic.AddInt64(&cm.stats.activeNodes, 1)
}
}
// AddNode добавляет новый узел в кластер
func (cm *ClusterManager) AddNode(address string) error {
if !cm.isCoordinator {
return errors.New("только координатор может добавлять узлы")
}
cm.mu.Lock()
defer cm.mu.Unlock()
// Генерируем ID для нового узла
nodeID := fmt.Sprintf("node-%d", len(cm.nodes)+1)
cm.nodes[nodeID] = &Node{
ID: nodeID,
Address: address,
State: StateJoining,
LastSeen: time.Now(),
}
atomic.AddInt64(&cm.stats.totalNodes, 1)
utils.PrintSuccess("Узел %s (%s) добавлен в кластер", nodeID, address)
return nil
}
// RemoveNode удаляет узел из кластера
func (cm *ClusterManager) RemoveNode(nodeID string) error {
if !cm.isCoordinator {
return errors.New("только координатор может удалять узлы")
}
cm.mu.Lock()
defer cm.mu.Unlock()
if node, exists := cm.nodes[nodeID]; exists {
node.State = StateLeaving
delete(cm.nodes, nodeID)
atomic.AddInt64(&cm.stats.totalNodes, -1)
if node.State == StateOnline {
atomic.AddInt64(&cm.stats.activeNodes, -1)
}
utils.PrintSuccess("Узел %s удален из кластера", nodeID)
return nil
}
return errors.New("узел не найден")
}
// GetClusterStatus возвращает статус кластера
func (cm *ClusterManager) GetClusterStatus() map[string]interface{} {
cm.mu.RLock()
defer cm.mu.RUnlock()
status := make(map[string]interface{})
status["total_nodes"] = atomic.LoadInt64(&cm.stats.totalNodes)
status["active_nodes"] = atomic.LoadInt64(&cm.stats.activeNodes)
status["is_coordinator"] = cm.isCoordinator
status["coordinator"] = cm.coordinatorAddr
nodes := make([]map[string]interface{}, 0, len(cm.nodes))
for id, node := range cm.nodes {
nodes = append(nodes, map[string]interface{}{
"id": id,
"address": node.Address,
"state": node.State.String(),
"last_seen": node.LastSeen.Format(time.RFC3339),
})
}
status["nodes"] = nodes
return status
}
// addNode добавляет узел во внутреннюю карту
func (cm *ClusterManager) addNode(node *Node) {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.nodes[node.ID] = node
atomic.AddInt64(&cm.stats.totalNodes, 1)
if node.State == StateOnline {
atomic.AddInt64(&cm.stats.activeNodes, 1)
}
}

521
internal/engine/engine.go Normal file
View File

@ -0,0 +1,521 @@
// /futriis/internal/engine/engine.go
// Пакет engine реализует ядро СУБД, координирующее все операции
package engine
import (
"fmt"
"strings"
"futriis/internal/cluster"
"futriis/internal/lua"
"futriis/internal/replication"
"futriis/internal/storage"
"futriis/internal/transaction"
"futriis/pkg/config"
"futriis/pkg/utils"
)
// Engine представляет ядро СУБД
type Engine struct {
storage *storage.Storage
clusterMgr *cluster.ClusterManager
txMgr *transaction.TransactionManager
luaMgr *lua.PluginManager
aofMgr *replication.AOFManager
cfg *config.Config
}
// NewEngine создаёт новый экземпляр ядра СУБД
func NewEngine() *Engine {
cfg := config.Get()
// Создаём AOF менеджер
aofMgr, _ := replication.NewAOFManager(cfg.Node.AOFFile, cfg.Node.AOFEnabled)
// Воспроизводим AOF если нужно
if aofMgr != nil && cfg.Node.AOFEnabled {
// TODO: восстановление состояния из AOF
}
return &Engine{
storage: storage.NewStorage(),
clusterMgr: cluster.NewClusterManager(cfg),
txMgr: transaction.NewTransactionManager(),
luaMgr: lua.NewPluginManager(&cfg.Lua),
aofMgr: aofMgr,
cfg: cfg,
}
}
// Execute выполняет команду и возвращает результат
func (e *Engine) Execute(input string) (string, error) {
// Разбираем ввод
parts := strings.Fields(input)
if len(parts) == 0 {
return "", nil
}
command := strings.ToLower(parts[0])
args := parts[1:]
// Записываем команду в AOF
if e.aofMgr != nil {
argsInterface := make([]interface{}, len(args))
for i, v := range args {
argsInterface[i] = v
}
e.aofMgr.Append(command, argsInterface)
}
// Обработка команд
switch command {
case "help":
return e.help(), nil
case "exit", "quit":
return "exit", nil
case "create":
return e.handleCreate(args)
case "delete":
return e.handleDelete(args)
case "update":
return e.handleUpdate(args)
case "list":
return e.handleList(args)
case "show":
return e.handleShow(args)
case "begin":
return e.handleBegin()
case "commit":
return e.handleCommit()
case "rollback":
return e.handleRollback()
case "cluster.status":
return e.handleClusterStatus()
case "add.node":
return e.handleAddNode(args)
case "evict.node":
return e.handleEvictNode(args)
case "lua":
return e.handleLua(args)
default:
return "", fmt.Errorf("неизвестная команда: %s", command)
}
}
// handleCreate обрабатывает команды создания
func (e *Engine) handleCreate(args []string) (string, error) {
if len(args) < 2 {
return "", fmt.Errorf("недостаточно аргументов для команды create")
}
switch args[0] {
case "tapple":
if len(args) < 2 {
return "", fmt.Errorf("не указано имя таппла")
}
return e.createTapple(args[1])
case "slice":
if len(args) < 3 {
return "", fmt.Errorf("недостаточно аргументов для создания слайса")
}
return e.createSlice(args[1], args[2])
case "tuple":
if len(args) < 4 {
return "", fmt.Errorf("недостаточно аргументов для создания кортежа")
}
return e.createTuple(args[1], args[2], args[3], args[4:])
default:
return "", fmt.Errorf("неизвестный тип создания: %s", args[0])
}
}
// handleDelete обрабатывает команды удаления
func (e *Engine) handleDelete(args []string) (string, error) {
if len(args) < 2 {
return "", fmt.Errorf("недостаточно аргументов для команды delete")
}
switch args[0] {
case "tapple":
if len(args) < 2 {
return "", fmt.Errorf("не указано имя таппла")
}
return e.deleteTapple(args[1])
case "slice":
if len(args) < 3 {
return "", fmt.Errorf("недостаточно аргументов для удаления слайса")
}
return e.deleteSlice(args[1], args[2])
case "tuple":
if len(args) < 4 {
return "", fmt.Errorf("недостаточно аргументов для удаления кортежа")
}
return e.deleteTuple(args[1], args[2], args[3])
default:
return "", fmt.Errorf("неизвестный тип удаления: %s", args[0])
}
}
// handleUpdate обрабатывает команды обновления
func (e *Engine) handleUpdate(args []string) (string, error) {
if len(args) < 4 || args[0] != "tuple" {
return "", fmt.Errorf("неверная команда update")
}
return e.updateTuple(args[1], args[2], args[3], args[4:])
}
// handleList обрабатывает команды списка
func (e *Engine) handleList(args []string) (string, error) {
if len(args) < 1 {
return "", fmt.Errorf("недостаточно аргументов для команды list")
}
switch args[0] {
case "tapples":
return e.listTapples()
case "slices":
if len(args) < 2 {
return "", fmt.Errorf("не указано имя таппла")
}
return e.listSlices(args[1])
default:
return "", fmt.Errorf("неизвестный тип списка: %s", args[0])
}
}
// handleShow обрабатывает команды показа
func (e *Engine) handleShow(args []string) (string, error) {
if len(args) < 2 || args[0] != "tuples" {
return "", fmt.Errorf("неверная команда show")
}
if len(args) < 3 {
return "", fmt.Errorf("недостаточно аргументов для show tuples")
}
return e.showTuples(args[1], args[2])
}
// handleBegin начинает транзакцию
func (e *Engine) handleBegin() (string, error) {
id, err := e.txMgr.Begin()
if err != nil {
return "", err
}
return utils.ColorGreen + "Транзакция начата. ID: " + id + utils.ColorReset, nil
}
// handleCommit фиксирует транзакцию
func (e *Engine) handleCommit() (string, error) {
err := e.txMgr.Commit()
if err != nil {
return "", err
}
return utils.ColorGreen + "Транзакция зафиксирована" + utils.ColorReset, nil
}
// handleRollback откатывает транзакцию
func (e *Engine) handleRollback() (string, error) {
err := e.txMgr.Rollback()
if err != nil {
return "", err
}
return utils.ColorGreen + "Транзакция отменена" + utils.ColorReset, nil
}
// handleClusterStatus показывает статус кластера
func (e *Engine) handleClusterStatus() (string, error) {
status := e.clusterMgr.GetClusterStatus()
result := utils.ColorCyan + "Статус кластера:" + utils.ColorReset + "\n"
result += fmt.Sprintf(" Всего узлов: %d\n", status["total_nodes"])
result += fmt.Sprintf(" Активных узлов: %d\n", status["active_nodes"])
result += fmt.Sprintf(" Координатор: %v\n", status["coordinator"])
nodes, _ := status["nodes"].([]map[string]interface{})
if len(nodes) > 0 {
result += utils.ColorCyan + "\nУзлы кластера:" + utils.ColorReset + "\n"
for _, node := range nodes {
result += fmt.Sprintf(" %s (%s) - %s, last seen: %s\n",
node["id"], node["address"], node["state"], node["last_seen"])
}
}
return result, nil
}
// handleAddNode добавляет узел в кластер
func (e *Engine) handleAddNode(args []string) (string, error) {
if len(args) < 1 {
return "", fmt.Errorf("укажите адрес узла")
}
address := args[0]
err := e.clusterMgr.AddNode(address)
if err != nil {
return "", err
}
return utils.ColorGreen + "Узел " + address + " добавлен в кластер" + utils.ColorReset, nil
}
// handleEvictNode удаляет узел из кластера
func (e *Engine) handleEvictNode(args []string) (string, error) {
if len(args) < 1 {
return "", fmt.Errorf("укажите ID узла или адрес")
}
nodeID := args[0]
err := e.clusterMgr.RemoveNode(nodeID)
if err != nil {
return "", err
}
return utils.ColorGreen + "Узел " + nodeID + " удален из кластера" + utils.ColorReset, nil
}
// handleLua выполняет Lua скрипт
func (e *Engine) handleLua(args []string) (string, error) {
if len(args) < 1 {
return "", fmt.Errorf("укажите имя плагина")
}
pluginName := args[0]
err := e.luaMgr.ExecutePlugin(pluginName)
if err != nil {
return "", err
}
return utils.ColorGreen + "Плагин выполнен" + utils.ColorReset, nil
}
// Методы для работы с тапплами
func (e *Engine) createTapple(name string) (string, error) {
tapple, err := e.storage.GetTappleManager().CreateTapple(name)
if err != nil {
return "", err
}
return utils.ColorGreen + "Таппл '" + tapple.Name + "' успешно создан" + utils.ColorReset, nil
}
func (e *Engine) deleteTapple(name string) (string, error) {
err := e.storage.GetTappleManager().DeleteTapple(name)
if err != nil {
return "", err
}
return utils.ColorGreen + "Таппл '" + name + "' успешно удален" + utils.ColorReset, nil
}
func (e *Engine) listTapples() (string, error) {
tapples := e.storage.GetTappleManager().ListTapples()
if len(tapples) == 0 {
return utils.ColorYellow + "Тапплы не найдены" + utils.ColorReset, nil
}
result := utils.ColorCyan + "Список тапплов:" + utils.ColorReset + "\n"
for _, t := range tapples {
result += " " + utils.ColorGreen + t + utils.ColorReset + "\n"
}
return result, nil
}
// Методы для работы со слайсами
func (e *Engine) createSlice(tappleName, sliceName string) (string, error) {
tapple, err := e.storage.GetTappleManager().GetTapple(tappleName)
if err != nil {
return "", err
}
slice, err := e.storage.GetTappleManager().GetSliceManager().CreateSlice(tapple, sliceName)
if err != nil {
return "", err
}
return utils.ColorGreen + "Слайс '" + slice.Name + "' в таппле '" + tappleName + "' успешно создан" + utils.ColorReset, nil
}
func (e *Engine) deleteSlice(tappleName, sliceName string) (string, error) {
tapple, err := e.storage.GetTappleManager().GetTapple(tappleName)
if err != nil {
return "", err
}
err = e.storage.GetTappleManager().GetSliceManager().DeleteSlice(tapple, sliceName)
if err != nil {
return "", err
}
return utils.ColorGreen + "Слайс '" + sliceName + "' в таппле '" + tappleName + "' успешно удален" + utils.ColorReset, nil
}
func (e *Engine) listSlices(tappleName string) (string, error) {
tapple, err := e.storage.GetTappleManager().GetTapple(tappleName)
if err != nil {
return "", err
}
slices := e.storage.GetTappleManager().GetSliceManager().ListSlices(tapple)
if len(slices) == 0 {
return utils.ColorYellow + "Слайсы в таппле '" + tappleName + "' не найдены" + utils.ColorReset, nil
}
result := utils.ColorCyan + "Список слайсов в таппле '" + tappleName + "':" + utils.ColorReset + "\n"
for _, s := range slices {
result += " " + utils.ColorGreen + s + utils.ColorReset + "\n"
}
return result, nil
}
// Методы для работы с кортежами
func (e *Engine) createTuple(tappleName, sliceName, tupleID string, fieldsArgs []string) (string, error) {
tapple, err := e.storage.GetTappleManager().GetTapple(tappleName)
if err != nil {
return "", err
}
slice, err := e.storage.GetTappleManager().GetSliceManager().GetSlice(tapple, sliceName)
if err != nil {
return "", err
}
// Парсим поля
fields := make(map[string]interface{})
for _, arg := range fieldsArgs {
parts := strings.SplitN(arg, "=", 2)
if len(parts) == 2 {
fields[parts[0]] = parts[1]
}
}
tuple, err := e.storage.GetTappleManager().GetSliceManager().GetTupleManager().CreateTuple(slice, tupleID, fields)
if err != nil {
return "", err
}
return utils.ColorGreen + "Кортеж '" + tuple.ID + "' в слайсе '" + sliceName + "' успешно создан" + utils.ColorReset, nil
}
func (e *Engine) deleteTuple(tappleName, sliceName, tupleID string) (string, error) {
tapple, err := e.storage.GetTappleManager().GetTapple(tappleName)
if err != nil {
return "", err
}
slice, err := e.storage.GetTappleManager().GetSliceManager().GetSlice(tapple, sliceName)
if err != nil {
return "", err
}
err = e.storage.GetTappleManager().GetSliceManager().GetTupleManager().DeleteTuple(slice, tupleID)
if err != nil {
return "", err
}
return utils.ColorGreen + "Кортеж '" + tupleID + "' в слайсе '" + sliceName + "' успешно удален" + utils.ColorReset, nil
}
func (e *Engine) updateTuple(tappleName, sliceName, tupleID string, fieldsArgs []string) (string, error) {
tapple, err := e.storage.GetTappleManager().GetTapple(tappleName)
if err != nil {
return "", err
}
slice, err := e.storage.GetTappleManager().GetSliceManager().GetSlice(tapple, sliceName)
if err != nil {
return "", err
}
// Парсим поля
fields := make(map[string]interface{})
for _, arg := range fieldsArgs {
parts := strings.SplitN(arg, "=", 2)
if len(parts) == 2 {
fields[parts[0]] = parts[1]
}
}
tuple, err := e.storage.GetTappleManager().GetSliceManager().GetTupleManager().UpdateTuple(slice, tupleID, fields)
if err != nil {
return "", err
}
return utils.ColorGreen + "Кортеж '" + tuple.ID + "' в слайсе '" + sliceName + "' успешно обновлен" + utils.ColorReset, nil
}
// showTuples показывает все кортежи в слайсе
func (e *Engine) showTuples(tappleName, sliceName string) (string, error) {
tapple, err := e.storage.GetTappleManager().GetTapple(tappleName)
if err != nil {
return "", err
}
slice, err := e.storage.GetTappleManager().GetSliceManager().GetSlice(tapple, sliceName)
if err != nil {
return "", err
}
slice.RLock()
defer slice.RUnlock()
if len(slice.Tuples) == 0 {
return utils.ColorYellow + "Кортежи в слайсе '" + sliceName + "' не найдены" + utils.ColorReset, nil
}
result := utils.ColorCyan + "Список кортежей в слайсе '" + sliceName + "':" + utils.ColorReset + "\n"
for id, tuple := range slice.Tuples {
result += " " + utils.ColorGreen + "ID: " + id + utils.ColorReset + "\n"
for k, v := range tuple.Fields {
// ИСПРАВЛЕНО: заменено ColorPrompt на ColorPromptCode
result += " " + utils.ColorYellow + k + utils.ColorReset + ": " + utils.ColorPromptCode + fmt.Sprint(v) + utils.ColorReset + "\n"
}
result += "\n"
}
return result, nil
}
// help возвращает справку по командам
func (e *Engine) help() string {
help := utils.ColorCyan + "Доступные команды:" + utils.ColorReset + "\n"
help += "\n" + utils.ColorYellow + "Основные команды:" + utils.ColorReset + "\n"
help += " " + utils.ColorGreen + "create tapple <name>" + utils.ColorReset + " - создать новый таппл (базу данных)\n"
help += " " + utils.ColorGreen + "create slice <tapple> <name>" + utils.ColorReset + " - создать новый слайс (таблицу)\n"
help += " " + utils.ColorGreen + "create tuple <tapple> <slice> <id> [key=value...]" + utils.ColorReset + " - создать новый кортеж (запись)\n"
help += " " + utils.ColorGreen + "delete tapple <name>" + utils.ColorReset + " - удалить таппл\n"
help += " " + utils.ColorGreen + "delete slice <tapple> <name>" + utils.ColorReset + " - удалить слайс\n"
help += " " + utils.ColorGreen + "delete tuple <tapple> <slice> <id>" + utils.ColorReset + " - удалить кортеж\n"
help += " " + utils.ColorGreen + "update tuple <tapple> <slice> <id> [key=value...]" + utils.ColorReset + " - обновить кортеж\n"
help += " " + utils.ColorGreen + "list tapples" + utils.ColorReset + " - показать все тапплы\n"
help += " " + utils.ColorGreen + "list slices <tapple>" + utils.ColorReset + " - показать все слайсы в таппле\n"
help += " " + utils.ColorGreen + "show tuples <tapple> <slice>" + utils.ColorReset + " - показать все кортежи в слайсе\n"
help += "\n" + utils.ColorYellow + "Транзакции:" + utils.ColorReset + "\n"
help += " " + utils.ColorGreen + "begin" + utils.ColorReset + " - начать транзакцию\n"
help += " " + utils.ColorGreen + "commit" + utils.ColorReset + " - зафиксировать транзакцию\n"
help += " " + utils.ColorGreen + "rollback" + utils.ColorReset + " - отменить транзакцию\n"
help += "\n" + utils.ColorYellow + "Управление кластером:" + utils.ColorReset + "\n"
help += " " + utils.ColorGreen + "cluster.status" + utils.ColorReset + " - показать статус кластера\n"
help += " " + utils.ColorGreen + "add.node <address>" + utils.ColorReset + " - добавить узел в кластер\n"
help += " " + utils.ColorGreen + "evict.node <node_id>" + utils.ColorReset + " - удалить узел из кластера\n"
help += "\n" + utils.ColorYellow + "Lua плагины:" + utils.ColorReset + "\n"
help += " " + utils.ColorGreen + "lua <plugin_name>" + utils.ColorReset + " - выполнить Lua плагин\n"
help += "\n" + utils.ColorYellow + "Прочее:" + utils.ColorReset + "\n"
help += " " + utils.ColorGreen + "exit/quit" + utils.ColorReset + " - выйти из СУБД\n"
return help
}

150
internal/lua/plugin.go Normal file
View File

@ -0,0 +1,150 @@
// /futriis/internal/lua/plugin.go
// Пакет lua реализует поддержку Lua плагинов
package lua
import (
"fmt"
"io/ioutil"
"path/filepath"
"sync"
"futriis/pkg/config"
"futriis/pkg/utils"
"github.com/yuin/gopher-lua"
)
// PluginManager управляет Lua плагинами
type PluginManager struct {
state *lua.LState
plugins map[string]*lua.LFunction
mu sync.RWMutex
enabled bool
}
// NewPluginManager создаёт новый менеджер плагинов
func NewPluginManager(cfg *config.LuaConfig) *PluginManager {
pm := &PluginManager{
plugins: make(map[string]*lua.LFunction),
enabled: cfg.Enabled,
}
if cfg.Enabled {
pm.state = lua.NewState()
pm.registerFunctions()
}
return pm
}
// registerFunctions регистрирует функции Go, доступные из Lua
func (pm *PluginManager) registerFunctions() {
if !pm.enabled || pm.state == nil {
return
}
// Регистрируем функции для работы с данными
pm.state.SetGlobal("print", pm.state.NewFunction(func(L *lua.LState) int {
top := L.GetTop()
args := make([]interface{}, top)
for i := 1; i <= top; i++ {
args[i-1] = L.Get(i).String()
}
utils.PrintInfo(fmt.Sprint(args...))
return 0
}))
pm.state.SetGlobal("get", pm.state.NewFunction(func(L *lua.LState) int {
key := L.ToString(1)
// TODO: получить значение из хранилища
L.Push(lua.LString(key))
return 1
}))
pm.state.SetGlobal("set", pm.state.NewFunction(func(L *lua.LState) int {
key := L.ToString(1)
value := L.ToString(2)
// TODO: установить значение в хранилище
utils.PrintInfo("Lua set: %s = %s", key, value)
return 0
}))
}
// LoadPlugins загружает все Lua плагины из директории
func (pm *PluginManager) LoadPlugins(pluginsDir string) error {
if !pm.enabled || pm.state == nil {
return nil
}
files, err := ioutil.ReadDir(pluginsDir)
if err != nil {
return err
}
for _, file := range files {
if filepath.Ext(file.Name()) == ".lua" {
if err := pm.LoadPlugin(filepath.Join(pluginsDir, file.Name())); err != nil {
utils.PrintError("Ошибка загрузки плагина %s: %v", file.Name(), err)
}
}
}
return nil
}
// LoadPlugin загружает один Lua плагин
func (pm *PluginManager) LoadPlugin(path string) error {
if !pm.enabled || pm.state == nil {
return nil
}
data, err := ioutil.ReadFile(path)
if err != nil {
return err
}
fn, err := pm.state.LoadString(string(data))
if err != nil {
return err
}
pm.mu.Lock()
pm.plugins[filepath.Base(path)] = fn
pm.mu.Unlock()
utils.PrintSuccess("Загружен плагин: %s", filepath.Base(path))
return nil
}
// ExecutePlugin выполняет функцию плагина
func (pm *PluginManager) ExecutePlugin(name string, args ...lua.LValue) error {
if !pm.enabled || pm.state == nil {
return nil
}
pm.mu.RLock()
fn, exists := pm.plugins[name]
pm.mu.RUnlock()
if !exists {
return fmt.Errorf("плагин %s не найден", name)
}
pm.state.Push(fn)
for _, arg := range args {
pm.state.Push(arg)
}
if err := pm.state.PCall(len(args), lua.MultRet, nil); err != nil {
return err
}
return nil
}
// Close закрывает Lua состояние
func (pm *PluginManager) Close() {
if pm.state != nil {
pm.state.Close()
}
}

View File

@ -0,0 +1,42 @@
// /futriis/internal/msgpack/deserializer.go
// Пакет msgpack предоставляет дополнительные методы десериализации
// /futriis/internal/msgpack/deserializer.go
package msgpack
import (
"futriis/pkg/types"
"github.com/vmihailenco/msgpack/v5"
)
// Deserializer расширяет функциональность десериализации
type Deserializer struct {
serializer *Serializer
}
// NewDeserializer создаёт новый экземпляр десериализатора
func NewDeserializer() *Deserializer {
return &Deserializer{
serializer: NewSerializer(),
}
}
// DeserializeValue десериализует значение произвольного типа
func (d *Deserializer) DeserializeValue(data []byte) (interface{}, error) {
var value interface{}
err := msgpack.Unmarshal(data, &value)
if err != nil {
return nil, err
}
return value, nil
}
// DeserializeMap десериализует данные в карту
func (d *Deserializer) DeserializeMap(data []byte) (map[string]interface{}, error) {
var m map[string]interface{}
err := msgpack.Unmarshal(data, &m)
if err != nil {
return nil, err
}
return m, nil
}

View File

@ -0,0 +1,66 @@
// /futriis/internal/msgpack/serializer.go
// Пакет msgpack реализует сериализацию и десериализацию данных с использованием MessagePack
// /futriis/internal/msgpack/serializer.go
package msgpack
import (
"futriis/pkg/types"
"github.com/vmihailenco/msgpack/v5"
)
// Serializer предоставляет методы для сериализации данных
type Serializer struct {
enc *msgpack.Encoder
dec *msgpack.Decoder
}
// NewSerializer создаёт новый экземпляр сериализатора
func NewSerializer() *Serializer {
return &Serializer{}
}
// SerializeTuple сериализует кортеж в формат MessagePack
func (s *Serializer) SerializeTuple(tuple *types.Tuple) ([]byte, error) {
return msgpack.Marshal(tuple)
}
// DeserializeTuple десериализует кортеж из формата MessagePack
func (s *Serializer) DeserializeTuple(data []byte) (*types.Tuple, error) {
var tuple types.Tuple
err := msgpack.Unmarshal(data, &tuple)
if err != nil {
return nil, err
}
return &tuple, nil
}
// SerializeSlice сериализует слайс в формат MessagePack
func (s *Serializer) SerializeSlice(slice *types.Slice) ([]byte, error) {
return msgpack.Marshal(slice)
}
// DeserializeSlice десериализует слайс из формата MessagePack
func (s *Serializer) DeserializeSlice(data []byte) (*types.Slice, error) {
var slice types.Slice
err := msgpack.Unmarshal(data, &slice)
if err != nil {
return nil, err
}
return &slice, nil
}
// SerializeTapple сериализует таппл в формат MessagePack
func (s *Serializer) SerializeTapple(tapple *types.Tapple) ([]byte, error) {
return msgpack.Marshal(tapple)
}
// DeserializeTapple десериализует таппл из формата MessagePack
func (s *Serializer) DeserializeTapple(data []byte) (*types.Tapple, error) {
var tapple types.Tapple
err := msgpack.Unmarshal(data, &tapple)
if err != nil {
return nil, err
}
return &tapple, nil
}

172
internal/replication/aof.go Normal file
View File

@ -0,0 +1,172 @@
// /futriis/internal/replication/aof.go
// Пакет replication реализует механизм AOF (Append Only File) как в Redis)
package replication
import (
"bufio"
"encoding/json"
"os"
"sync"
"time"
"futriis/pkg/utils"
)
// AOFCommand представляет команду для AOF
type AOFCommand struct {
Timestamp int64 `json:"ts"`
Command string `json:"cmd"`
Args []interface{} `json:"args"`
}
// AOFManager управляет AOF файлом
type AOFManager struct {
file *os.File
writer *bufio.Writer
mu sync.Mutex
enabled bool
filePath string
stopChan chan struct{}
}
// NewAOFManager создаёт новый менеджер AOF
func NewAOFManager(filePath string, enabled bool) (*AOFManager, error) {
aof := &AOFManager{
enabled: enabled,
filePath: filePath,
stopChan: make(chan struct{}),
}
if enabled {
if err := aof.openFile(); err != nil {
return nil, err
}
// Запускаем периодический sync
go aof.syncLoop()
}
return aof, nil
}
// openFile открывает AOF файл
func (aof *AOFManager) openFile() error {
// Создаем директорию если не существует
dir := aof.filePath[:len(aof.filePath)-len("/futriis.aof")]
if err := os.MkdirAll(dir, 0755); err != nil {
return err
}
file, err := os.OpenFile(aof.filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
aof.file = file
aof.writer = bufio.NewWriter(file)
return nil
}
// syncLoop периодически синхронизирует данные на диск
func (aof *AOFManager) syncLoop() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
aof.mu.Lock()
if aof.writer != nil {
aof.writer.Flush()
aof.file.Sync()
}
aof.mu.Unlock()
case <-aof.stopChan:
return
}
}
}
// Append добавляет команду в AOF
func (aof *AOFManager) Append(cmd string, args []interface{}) error {
if !aof.enabled || aof.writer == nil {
return nil
}
aof.mu.Lock()
defer aof.mu.Unlock()
command := AOFCommand{
Timestamp: time.Now().UnixNano(),
Command: cmd,
Args: args,
}
data, err := json.Marshal(command)
if err != nil {
return err
}
if _, err := aof.writer.Write(data); err != nil {
return err
}
if _, err := aof.writer.WriteString("\n"); err != nil {
return err
}
return nil
}
// Close закрывает AOF файл
func (aof *AOFManager) Close() error {
if !aof.enabled {
return nil
}
close(aof.stopChan)
aof.mu.Lock()
defer aof.mu.Unlock()
if aof.writer != nil {
aof.writer.Flush()
}
if aof.file != nil {
return aof.file.Close()
}
return nil
}
// Replay воспроизводит команды из AOF
func (aof *AOFManager) Replay(handler func(cmd string, args []interface{}) error) error {
if !aof.enabled {
return nil
}
file, err := os.Open(aof.filePath)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
defer file.Close()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
var cmd AOFCommand
if err := json.Unmarshal(scanner.Bytes(), &cmd); err != nil {
continue
}
if err := handler(cmd.Command, cmd.Args); err != nil {
utils.PrintError("Ошибка воспроизведения команды: %v", err)
}
}
return scanner.Err()
}

126
internal/server/server.go Normal file
View File

@ -0,0 +1,126 @@
// /futriis/internal/server/server.go
// Пакет server реализует серверную часть для клиент-серверной архитектуры
package server
import (
"bufio"
"encoding/json"
"net"
"sync"
"futriis/internal/engine"
"futriis/pkg/utils"
)
// Server представляет сервер СУБД
type Server struct {
address string
engine *engine.Engine
listener net.Listener
clients map[net.Conn]bool
mu sync.RWMutex
stopChan chan struct{}
}
// NewServer создаёт новый сервер
func NewServer(address string, engine *engine.Engine) *Server {
return &Server{
address: address,
engine: engine,
clients: make(map[net.Conn]bool),
stopChan: make(chan struct{}),
}
}
// Start запускает сервер
func (s *Server) Start() error {
listener, err := net.Listen("tcp", s.address)
if err != nil {
return err
}
s.listener = listener
utils.PrintSuccess("Сервер запущен на %s", s.address)
go s.acceptLoop()
return nil
}
// acceptLoop принимает входящие соединения
func (s *Server) acceptLoop() {
for {
select {
case <-s.stopChan:
return
default:
conn, err := s.listener.Accept()
if err != nil {
continue
}
s.mu.Lock()
s.clients[conn] = true
s.mu.Unlock()
go s.handleClient(conn)
}
}
}
// handleClient обрабатывает клиентское соединение
func (s *Server) handleClient(conn net.Conn) {
defer func() {
s.mu.Lock()
delete(s.clients, conn)
s.mu.Unlock()
conn.Close()
}()
scanner := bufio.NewScanner(conn)
for scanner.Scan() {
line := scanner.Text()
// Парсим JSON запрос
var req map[string]interface{}
if err := json.Unmarshal([]byte(line), &req); err != nil {
// Если не JSON, обрабатываем как простую команду
result, err := s.engine.Execute(line)
s.sendResponse(conn, result, err)
} else {
// Обрабатываем JSON запрос
cmd, _ := req["command"].(string)
result, err := s.engine.Execute(cmd)
s.sendResponse(conn, result, err)
}
}
}
// sendResponse отправляет ответ клиенту
func (s *Server) sendResponse(conn net.Conn, result string, err error) {
response := make(map[string]interface{})
if err != nil {
response["error"] = err.Error()
} else {
response["result"] = result
}
data, _ := json.Marshal(response)
conn.Write(append(data, '\n'))
}
// Stop останавливает сервер
func (s *Server) Stop() {
close(s.stopChan)
if s.listener != nil {
s.listener.Close()
}
s.mu.Lock()
for conn := range s.clients {
conn.Close()
}
s.mu.Unlock()
}

122
internal/storage/slice.go Normal file
View File

@ -0,0 +1,122 @@
// /futriis/internal/storage/slice.go
// Пакет storage реализует операции со слайсами (таблицами)
package storage
import (
"errors"
"sync/atomic"
"futriis/pkg/types"
"futriis/pkg/utils"
)
// SliceManager управляет операциями со слайсами
type SliceManager struct {
tupleManager *TupleManager
stats struct {
created int64
deleted int64
}
}
// NewSliceManager создаёт новый менеджер слайсов
func NewSliceManager() *SliceManager {
return &SliceManager{
tupleManager: NewTupleManager(),
}
}
// CreateSlice создаёт новый слайс в указанном таппле
func (sm *SliceManager) CreateSlice(tapple *types.Tapple, name string) (*types.Slice, error) {
if tapple == nil {
return nil, errors.New("tapple is nil")
}
tapple.RLock()
_, exists := tapple.Slices[name]
tapple.RUnlock()
if exists {
return nil, errors.New("slice already exists")
}
slice := types.NewSlice(name)
tapple.Lock()
tapple.Slices[name] = slice
tapple.Unlock()
atomic.AddInt64(&sm.stats.created, 1)
logger := utils.GetLogger()
if logger != nil {
logger.Log("INFO", "Created slice: "+name)
}
return slice, nil
}
// GetSlice возвращает слайс по имени
func (sm *SliceManager) GetSlice(tapple *types.Tapple, name string) (*types.Slice, error) {
if tapple == nil {
return nil, errors.New("tapple is nil")
}
tapple.RLock()
slice, exists := tapple.Slices[name]
tapple.RUnlock()
if !exists {
return nil, errors.New("slice not found")
}
return slice, nil
}
// DeleteSlice удаляет слайс
func (sm *SliceManager) DeleteSlice(tapple *types.Tapple, name string) error {
if tapple == nil {
return errors.New("tapple is nil")
}
tapple.Lock()
_, exists := tapple.Slices[name]
if !exists {
tapple.Unlock()
return errors.New("slice not found")
}
delete(tapple.Slices, name)
tapple.Unlock()
atomic.AddInt64(&sm.stats.deleted, 1)
logger := utils.GetLogger()
if logger != nil {
logger.Log("INFO", "Deleted slice: "+name)
}
return nil
}
// ListSlices возвращает список всех слайсов в таппле
func (sm *SliceManager) ListSlices(tapple *types.Tapple) []string {
if tapple == nil {
return nil
}
tapple.RLock()
defer tapple.RUnlock()
slices := make([]string, 0, len(tapple.Slices))
for name := range tapple.Slices {
slices = append(slices, name)
}
return slices
}
// GetTupleManager возвращает менеджер кортежей
func (sm *SliceManager) GetTupleManager() *TupleManager {
return sm.tupleManager
}

View File

@ -0,0 +1,40 @@
// /futriis/internal/storage/storage.go
// Пакет storage предоставляет единую точку доступа к хранилищу данных
package storage
import (
"futriis/pkg/types"
)
// Storage представляет основное хранилище данных
type Storage struct {
tappleManager *TappleManager
}
// NewStorage создаёт новое хранилище
func NewStorage() *Storage {
return &Storage{
tappleManager: NewTappleManager(),
}
}
// GetTappleManager возвращает менеджер тапплов
func (s *Storage) GetTappleManager() *TappleManager {
return s.tappleManager
}
// ExecuteCommand выполняет команду над хранилищем
func (s *Storage) ExecuteCommand(cmd string, args []string) (interface{}, error) {
// Будет расширяться по мере добавления команд
return nil, nil
}
// Backup создаёт резервную копию всех данных
func (s *Storage) Backup() map[string]*types.Tapple {
backup := make(map[string]*types.Tapple)
for name, tapple := range s.tappleManager.tapples {
backup[name] = tapple
}
return backup
}

View File

@ -0,0 +1,90 @@
// /futriis/internal/storage/tapple.go
// Пакет storage реализует операции с тапплами (базами данных)
package storage
import (
"errors"
"sync/atomic"
"futriis/pkg/types"
"futriis/pkg/utils"
)
// TappleManager управляет операциями с тапплами
type TappleManager struct {
tapples map[string]*types.Tapple
sliceManager *SliceManager
stats struct {
created int64
deleted int64
}
}
// NewTappleManager создаёт новый менеджер тапплов
func NewTappleManager() *TappleManager {
return &TappleManager{
tapples: make(map[string]*types.Tapple),
sliceManager: NewSliceManager(),
}
}
// CreateTapple создаёт новый таппл
func (tm *TappleManager) CreateTapple(name string) (*types.Tapple, error) {
if _, exists := tm.tapples[name]; exists {
return nil, errors.New("tapple already exists")
}
tapple := types.NewTapple(name)
tm.tapples[name] = tapple
atomic.AddInt64(&tm.stats.created, 1)
logger := utils.GetLogger()
if logger != nil {
logger.Log("INFO", "Created tapple: "+name)
}
return tapple, nil
}
// GetTapple возвращает таппл по имени
func (tm *TappleManager) GetTapple(name string) (*types.Tapple, error) {
tapple, exists := tm.tapples[name]
if !exists {
return nil, errors.New("tapple not found")
}
return tapple, nil
}
// DeleteTapple удаляет таппл
func (tm *TappleManager) DeleteTapple(name string) error {
if _, exists := tm.tapples[name]; !exists {
return errors.New("tapple not found")
}
delete(tm.tapples, name)
atomic.AddInt64(&tm.stats.deleted, 1)
logger := utils.GetLogger()
if logger != nil {
logger.Log("INFO", "Deleted tapple: "+name)
}
return nil
}
// ListTapples возвращает список всех тапплов
func (tm *TappleManager) ListTapples() []string {
tapples := make([]string, 0, len(tm.tapples))
for name := range tm.tapples {
tapples = append(tapples, name)
}
return tapples
}
// GetSliceManager возвращает менеджер слайсов
func (tm *TappleManager) GetSliceManager() *SliceManager {
return tm.sliceManager
}

149
internal/storage/tuple.go Normal file
View File

@ -0,0 +1,149 @@
// /futriis/internal/storage/tuple.go
// Пакет storage реализует операции с кортежами (записями)
package storage
import (
"errors"
"sync/atomic"
"futriis/pkg/types"
"futriis/pkg/utils"
)
// TupleManager управляет операциями с кортежами
type TupleManager struct {
stats struct {
created int64
updated int64
deleted int64
read int64
}
}
// NewTupleManager создаёт новый менеджер кортежей
func NewTupleManager() *TupleManager {
return &TupleManager{}
}
// CreateTuple создаёт новый кортеж в указанном слайсе
// Wait-free операция: использует атомарные операции для счётчиков
func (tm *TupleManager) CreateTuple(slice *types.Slice, id string, fields map[string]interface{}) (*types.Tuple, error) {
if slice == nil {
return nil, errors.New("slice is nil")
}
// Проверяем существование кортежа
slice.RLock()
_, exists := slice.Tuples[id]
slice.RUnlock()
if exists {
return nil, errors.New("tuple already exists")
}
// Создаём новый кортеж
tuple := types.NewTuple(id)
for k, v := range fields {
tuple.Fields[k] = v
}
// Добавляем в слайс
slice.Lock()
slice.Tuples[id] = tuple
slice.Unlock()
// Атомарно увеличиваем счётчик созданных
atomic.AddInt64(&tm.stats.created, 1)
// Логируем операцию
logger := utils.GetLogger()
if logger != nil {
logger.Log("INFO", "Created tuple: "+id)
}
return tuple, nil
}
// ReadTuple читает кортеж по ID
// Wait-free операция для чтения
func (tm *TupleManager) ReadTuple(slice *types.Slice, id string) (*types.Tuple, error) {
if slice == nil {
return nil, errors.New("slice is nil")
}
slice.RLock()
tuple, exists := slice.Tuples[id]
slice.RUnlock()
if !exists {
return nil, errors.New("tuple not found")
}
atomic.AddInt64(&tm.stats.read, 1)
return tuple, nil
}
// UpdateTuple обновляет поля кортежа
func (tm *TupleManager) UpdateTuple(slice *types.Slice, id string, fields map[string]interface{}) (*types.Tuple, error) {
if slice == nil {
return nil, errors.New("slice is nil")
}
slice.Lock()
defer slice.Unlock()
tuple, exists := slice.Tuples[id]
if !exists {
return nil, errors.New("tuple not found")
}
// Обновляем поля
for k, v := range fields {
tuple.Fields[k] = v
}
atomic.AddInt64(&tm.stats.updated, 1)
logger := utils.GetLogger()
if logger != nil {
logger.Log("INFO", "Updated tuple: "+id)
}
return tuple, nil
}
// DeleteTuple удаляет кортеж
func (tm *TupleManager) DeleteTuple(slice *types.Slice, id string) error {
if slice == nil {
return errors.New("slice is nil")
}
slice.Lock()
_, exists := slice.Tuples[id]
if !exists {
slice.Unlock()
return errors.New("tuple not found")
}
delete(slice.Tuples, id)
slice.Unlock()
atomic.AddInt64(&tm.stats.deleted, 1)
logger := utils.GetLogger()
if logger != nil {
logger.Log("INFO", "Deleted tuple: "+id)
}
return nil
}
// GetStats возвращает статистику операций
func (tm *TupleManager) GetStats() map[string]int64 {
return map[string]int64{
"created": atomic.LoadInt64(&tm.stats.created),
"updated": atomic.LoadInt64(&tm.stats.updated),
"deleted": atomic.LoadInt64(&tm.stats.deleted),
"read": atomic.LoadInt64(&tm.stats.read),
}
}

145
internal/transaction/tx.go Normal file
View File

@ -0,0 +1,145 @@
// /futriis/internal/transaction/tx.go
// Пакет transaction реализует простые транзакции (не ACID)
package transaction
import (
"errors"
"sync"
"futriis/pkg/types"
)
// TxState состояние транзакции
type TxState int
const (
TxStateActive TxState = iota
TxStateCommited
TxStateRolledBack
)
// Operation представляет операцию в транзакции
type Operation struct {
Type string // create, update, delete
Tapple string
Slice string
Key string
Value interface{}
OldValue interface{}
}
// Transaction представляет транзакцию
type Transaction struct {
ID string
State TxState
Operations []Operation
mu sync.RWMutex
}
// TransactionManager управляет транзакциями
type TransactionManager struct {
transactions map[string]*Transaction
currentTx *Transaction
mu sync.RWMutex
}
// NewTransactionManager создаёт новый менеджер транзакций
func NewTransactionManager() *TransactionManager {
return &TransactionManager{
transactions: make(map[string]*Transaction),
}
}
// Begin начинает новую транзакцию
func (tm *TransactionManager) Begin() (string, error) {
tm.mu.Lock()
defer tm.mu.Unlock()
if tm.currentTx != nil && tm.currentTx.State == TxStateActive {
return "", errors.New("транзакция уже активна")
}
id := generateTxID()
tx := &Transaction{
ID: id,
State: TxStateActive,
Operations: make([]Operation, 0),
}
tm.transactions[id] = tx
tm.currentTx = tx
return id, nil
}
// Commit фиксирует текущую транзакцию
func (tm *TransactionManager) Commit() error {
tm.mu.Lock()
defer tm.mu.Unlock()
if tm.currentTx == nil {
return errors.New("нет активной транзакции")
}
if tm.currentTx.State != TxStateActive {
return errors.New("транзакция не активна")
}
tm.currentTx.State = TxStateCommited
tm.currentTx = nil
return nil
}
// Rollback откатывает текущую транзакцию
func (tm *TransactionManager) Rollback() error {
tm.mu.Lock()
defer tm.mu.Unlock()
if tm.currentTx == nil {
return errors.New("нет активной транзакции")
}
if tm.currentTx.State != TxStateActive {
return errors.New("транзакция не активна")
}
tm.currentTx.State = TxStateRolledBack
tm.currentTx = nil
return nil
}
// AddOperation добавляет операцию в текущую транзакцию
func (tm *TransactionManager) AddOperation(op Operation) error {
tm.mu.RLock()
tx := tm.currentTx
tm.mu.RUnlock()
if tx == nil {
return errors.New("нет активной транзакции")
}
tx.mu.Lock()
defer tx.mu.Unlock()
if tx.State != TxStateActive {
return errors.New("транзакция не активна")
}
tx.Operations = append(tx.Operations, op)
return nil
}
// GetCurrentTx возвращает текущую транзакцию
func (tm *TransactionManager) GetCurrentTx() *Transaction {
tm.mu.RLock()
defer tm.mu.RUnlock()
return tm.currentTx
}
// generateTxID генерирует ID транзакции
func generateTxID() string {
return "tx-" + types.GenerateID()
}

133
pkg/config/config.go Normal file
View File

@ -0,0 +1,133 @@
// /futriis/pkg/config/config.go
// Пакет config предоставляет функциональность для работы с конфигурацией
package config
import (
"sync/atomic"
"github.com/BurntSushi/toml"
"futriis/pkg/utils"
)
// ClusterConfig конфигурация кластера
type ClusterConfig struct {
Name string `toml:"name"`
CoordinatorAddress string `toml:"coordinator_address"`
ReplicationFactor int `toml:"replication_factor"`
SyncReplication bool `toml:"sync_replication"`
AutoRebalance bool `toml:"auto_rebalance"`
}
// NodeConfig конфигурация узла
type NodeConfig struct {
ID string `toml:"id"`
Address string `toml:"address"`
DataDir string `toml:"data_dir"`
AOFEnabled bool `toml:"aof_enabled"`
AOFFile string `toml:"aof_file"`
}
// StorageConfig конфигурация хранилища
type StorageConfig struct {
PageSize int `toml:"page_size"`
MaxMemory string `toml:"max_memory"`
EvictionPolicy string `toml:"eviction_policy"`
}
// ReplicationConfig конфигурация репликации
type ReplicationConfig struct {
Enabled bool `toml:"enabled"`
SyncMode string `toml:"sync_mode"`
HeartbeatInterval int `toml:"heartbeat_interval"`
Timeout int `toml:"timeout"`
}
// LuaConfig конфигурация Lua плагинов
type LuaConfig struct {
Enabled bool `toml:"enabled"`
PluginsDir string `toml:"plugins_dir"`
MaxMemory string `toml:"max_memory"`
}
// Config основная структура конфигурации
type Config struct {
Cluster ClusterConfig `toml:"cluster"`
Node NodeConfig `toml:"node"`
Storage StorageConfig `toml:"storage"`
Replication ReplicationConfig `toml:"replication"`
Lua LuaConfig `toml:"lua"`
}
var globalConfig atomic.Value
// Load загружает конфигурацию из файла
func Load(path string) (*Config, error) {
var config Config
if _, err := toml.DecodeFile(path, &config); err != nil {
return nil, err
}
// Устанавливаем значения по умолчанию, если не указаны
if config.Cluster.CoordinatorAddress == "" {
config.Cluster.CoordinatorAddress = "127.0.0.1:7379"
}
if config.Node.Address == "" {
config.Node.Address = "127.0.0.1:7380"
}
if config.Node.DataDir == "" {
config.Node.DataDir = "./data"
}
if config.Node.AOFFile == "" {
config.Node.AOFFile = "./data/futriis.aof"
}
if config.Storage.PageSize == 0 {
config.Storage.PageSize = 4096
}
if config.Replication.HeartbeatInterval == 0 {
config.Replication.HeartbeatInterval = 5
}
if config.Replication.Timeout == 0 {
config.Replication.Timeout = 30
}
if config.Lua.PluginsDir == "" {
config.Lua.PluginsDir = "./plugins"
}
globalConfig.Store(&config)
utils.PrintSuccess("Конфигурация загружена из " + path)
return &config, nil
}
// Get возвращает глобальную конфигурацию
func Get() *Config {
if cfg := globalConfig.Load(); cfg != nil {
return cfg.(*Config)
}
return nil
}
// GetClusterConfig возвращает конфигурацию кластера
func GetClusterConfig() *ClusterConfig {
if cfg := Get(); cfg != nil {
return &cfg.Cluster
}
return nil
}
// GetNodeConfig возвращает конфигурацию узла
func GetNodeConfig() *NodeConfig {
if cfg := Get(); cfg != nil {
return &cfg.Node
}
return nil
}

19
pkg/types/id.go Normal file
View File

@ -0,0 +1,19 @@
// /futriis/pkg/types/id.go
// Пакет types предоставляет утилиты для генерации ID
package types
import (
"fmt"
"sync/atomic"
"time"
)
var idCounter uint64
// GenerateID генерирует уникальный ID
func GenerateID() string {
counter := atomic.AddUint64(&idCounter, 1)
timestamp := time.Now().UnixNano()
return fmt.Sprintf("%d-%d", timestamp, counter)
}

98
pkg/types/types.go Normal file
View File

@ -0,0 +1,98 @@
// /futriis/pkg/types/types.go
// Пакет types определяет основные типы данных для СУБД futriis
package types
import (
"sync"
)
// Tuple представляет кортеж - аналог записи в таблице
// Содержит набор полей в формате ключ-значение
type Tuple struct {
ID string `msgpack:"id"`
Fields map[string]interface{} `msgpack:"fields"`
}
// NewTuple создаёт новый кортеж с указанным ID
func NewTuple(id string) *Tuple {
return &Tuple{
ID: id,
Fields: make(map[string]interface{}),
}
}
// Slice представляет слайс - аналог таблицы в РСУБД
// Содержит коллекцию кортежей
type Slice struct {
Name string `msgpack:"name"`
Tuples map[string]*Tuple `msgpack:"tuples"`
mu sync.RWMutex // Mutex для безопасного доступа, но не блокирующий wait-free операции
}
// NewSlice создаёт новый слайс с указанным именем
func NewSlice(name string) *Slice {
return &Slice{
Name: name,
Tuples: make(map[string]*Tuple),
}
}
// Lock блокирует слайс для записи
func (s *Slice) Lock() {
s.mu.Lock()
}
// Unlock разблокирует слайс после записи
func (s *Slice) Unlock() {
s.mu.Unlock()
}
// RLock блокирует слайс для чтения
func (s *Slice) RLock() {
s.mu.RLock()
}
// RUnlock разблокирует слайс после чтения
func (s *Slice) RUnlock() {
s.mu.RUnlock()
}
// Tapple представляет таппл - аналог базы данных в РСУБД
// Содержит коллекцию слайсов (таблиц)
type Tapple struct {
Name string `msgpack:"name"`
Slices map[string]*Slice `msgpack:"slices"`
mu sync.RWMutex // Mutex для безопасного доступа
}
// NewTapple создаёт новый таппл с указанным именем
func NewTapple(name string) *Tapple {
return &Tapple{
Name: name,
Slices: make(map[string]*Slice),
}
}
// Lock блокирует таппл для записи
func (t *Tapple) Lock() {
t.mu.Lock()
}
// Unlock разблокирует таппл после записи
func (t *Tapple) Unlock() {
t.mu.Unlock()
}
// RLock блокирует таппл для чтения
func (t *Tapple) RLock() {
t.mu.RLock()
}
// RUnlock разблокирует таппл после чтения
func (t *Tapple) RUnlock() {
t.mu.RUnlock()
}
// Value представляет значение любого типа, поддерживаемого MessagePack
type Value interface{}

83
pkg/utils/colors.go Normal file
View File

@ -0,0 +1,83 @@
// /futriis/pkg/utils/colors.go
// Пакет utils предоставляет вспомогательные функции для форматирования вывода
package utils
import (
"fmt"
"strings"
)
// Цветовые коды ANSI
const (
ColorReset = "\033[0m"
ColorRed = "\033[31m"
ColorGreen = "\033[32m"
ColorYellow = "\033[33m"
ColorBlue = "\033[34m"
ColorCyan = "\033[36m"
ColorPromptCode = "\033[38;5;39m" // Ярко-синий для приглашения
ColorDeepSkyBlue = "\033[38;2;0;191;255m" // #00bfff - Глубокий небесно-голубой
)
// GetPrompt возвращает строку приглашения с цветом
func GetPrompt() string {
return ColorPromptCode + "futriis:~> " + ColorReset
}
// PrintInfo выводит информационное сообщение
func PrintInfo(format string, args ...interface{}) {
fmt.Printf(ColorBlue+"[INFO] "+format+ColorReset+"\n", args...)
}
// PrintSuccess выводит сообщение об успехе
func PrintSuccess(format string, args ...interface{}) {
fmt.Printf(ColorGreen+"[OK] "+format+ColorReset+"\n", args...)
}
// PrintWarning выводит предупреждающее сообщение
func PrintWarning(format string, args ...interface{}) {
fmt.Printf(ColorYellow+"[WARNING] "+format+ColorReset+"\n", args...)
}
// PrintError выводит сообщение об ошибке
func PrintError(format string, args ...interface{}) {
fmt.Printf(ColorRed+"[ERROR] "+format+ColorReset+"\n", args...)
}
// PrintBanner выводит приветственный баннер при запуске
func PrintBanner() {
banner := `
F U T R I I S
Distributed Database System
`
// Верхняя граница цветом #00bfff
fmt.Print(ColorDeepSkyBlue + strings.Repeat("═", 35) + "\n" + ColorReset)
// Баннер построчно с выравниванием
lines := strings.Split(banner, "\n")
for _, line := range lines {
if line != "" {
// Центрируем текст
padding := (35 - len(line)) / 2
if padding < 0 {
padding = 0
}
// Левая граница цветом #00bfff
fmt.Print(ColorDeepSkyBlue + "║" + ColorReset + strings.Repeat(" ", padding))
// Логотип и описание тоже цветом #00bfff
fmt.Print(ColorDeepSkyBlue + line + ColorReset)
// Правая граница
fmt.Println(strings.Repeat(" ", 35-len(line)-padding) + ColorDeepSkyBlue + "║" + ColorReset)
}
}
// Нижняя граница цветом #00bfff
fmt.Print(ColorDeepSkyBlue + strings.Repeat("═", 35) + "\n" + ColorReset)
// Версия
fmt.Print(ColorDeepSkyBlue + "Версия 0.1.0\n\n" + ColorReset)
}

54
pkg/utils/logger.go Normal file
View File

@ -0,0 +1,54 @@
// /futriis/pkg/utils/logger.go
// Пакет utils предоставляет функции для логирования
package utils
import (
"fmt"
"os"
"time"
)
// Logger представляет структуру для логирования
type Logger struct {
file *os.File
}
var loggerInstance *Logger
// InitLogger инициализирует логгер
func InitLogger(logFile string) error {
file, err := os.OpenFile(logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
loggerInstance = &Logger{
file: file,
}
return nil
}
// GetLogger возвращает экземпляр логгера
func GetLogger() *Logger {
return loggerInstance
}
// Log записывает сообщение в лог
func (l *Logger) Log(level, message string) {
if l == nil || l.file == nil {
return
}
timestamp := time.Now().Format("2006-01-02 15:04:05")
logLine := fmt.Sprintf("[%s] %s: %s\n", timestamp, level, message)
l.file.WriteString(logLine)
}
// Close закрывает файл лога
func (l *Logger) Close() {
if l != nil && l.file != nil {
l.file.Close()
}
}