From 67216c6066533329f4007b016c5b8b951390af56 Mon Sep 17 00:00:00 2001 From: gvsafronov Date: Mon, 23 Feb 2026 22:48:31 +0300 Subject: [PATCH] futriix for Solaris-based systems --- Makefile | 95 ++++++ README.md | 31 ++ cmd/futriis/main.go | 98 ++++++ config.toml | 32 ++ data/futriis.aof | 0 go.mod | 17 + go.sum | 25 ++ internal/cli/commands.go | 43 +++ internal/cli/handler.go | 66 ++++ internal/cli/history.go | 85 +++++ internal/cli/prompt.go | 157 ++++++++++ internal/cluster/node.go | 336 ++++++++++++++++++++ internal/engine/engine.go | 521 +++++++++++++++++++++++++++++++ internal/lua/plugin.go | 150 +++++++++ internal/msgpack/deserializer.go | 42 +++ internal/msgpack/serializer.go | 66 ++++ internal/replication/aof.go | 172 ++++++++++ internal/server/server.go | 126 ++++++++ internal/storage/slice.go | 122 ++++++++ internal/storage/storage.go | 40 +++ internal/storage/tapple.go | 90 ++++++ internal/storage/tuple.go | 149 +++++++++ internal/transaction/tx.go | 145 +++++++++ pkg/config/config.go | 133 ++++++++ pkg/types/id.go | 19 ++ pkg/types/types.go | 98 ++++++ pkg/utils/colors.go | 83 +++++ pkg/utils/logger.go | 54 ++++ 28 files changed, 2995 insertions(+) create mode 100644 Makefile create mode 100644 README.md create mode 100644 cmd/futriis/main.go create mode 100644 config.toml create mode 100644 data/futriis.aof create mode 100644 go.mod create mode 100644 go.sum create mode 100644 internal/cli/commands.go create mode 100644 internal/cli/handler.go create mode 100644 internal/cli/history.go create mode 100644 internal/cli/prompt.go create mode 100644 internal/cluster/node.go create mode 100644 internal/engine/engine.go create mode 100644 internal/lua/plugin.go create mode 100644 internal/msgpack/deserializer.go create mode 100644 internal/msgpack/serializer.go create mode 100644 internal/replication/aof.go create mode 100644 internal/server/server.go create mode 100644 internal/storage/slice.go create mode 100644 internal/storage/storage.go create mode 100644 internal/storage/tapple.go create mode 100644 internal/storage/tuple.go create mode 100644 internal/transaction/tx.go create mode 100644 pkg/config/config.go create mode 100644 pkg/types/id.go create mode 100644 pkg/types/types.go create mode 100644 pkg/utils/colors.go create mode 100644 pkg/utils/logger.go diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..e2ed362 --- /dev/null +++ b/Makefile @@ -0,0 +1,95 @@ +# /futriis/Makefile +# Makefile для сборки проекта futriis на Unix-системах (Solaris, OpenIndiana, Linux) + +.PHONY: all clean deps build build-server run run-server test install help + +# Переменные +BINARY_NAME=futriis +SERVER_BINARY_NAME=futriisd +GO=go +GOFLAGS=-ldflags="-s -w" +BUILD_DIR=build +CMD_DIR=./cmd/futriis + +# Цвета для вывода +GREEN=\033[0;32m +RED=\033[0;31m +NC=\033[0m # No Color + +# Проверка операционной системы +UNAME_S := $(shell uname -s) +ifeq ($(UNAME_S),Linux) + OS_SUPPORTED=true +endif +ifeq ($(UNAME_S),SunOS) + OS_SUPPORTED=true +endif +ifndef OS_SUPPORTED + $(error "Операционная система $(UNAME_S) не поддерживается. Проект поддерживает только Solaris, OpenIndiana и Linux.") +endif + +all: clean deps build + +# Создание директории для сборки +$(BUILD_DIR): + @mkdir -p $(BUILD_DIR) + @mkdir -p $(BUILD_DIR)/plugins + @mkdir -p $(BUILD_DIR)/data + +# Установка зависимостей +deps: + @echo "$(GREEN)Установка зависимостей...$(NC)" + $(GO) mod download + $(GO) mod tidy + +# Сборка клиента +build: $(BUILD_DIR) + @echo "$(GREEN)Сборка клиента для $(UNAME_S)...$(NC)" + $(GO) build $(GOFLAGS) -o $(BUILD_DIR)/$(BINARY_NAME) $(CMD_DIR) + @echo "$(GREEN)Сборка завершена. Бинарный файл: $(BUILD_DIR)/$(BINARY_NAME)$(NC)" + +# Сборка сервера +build-server: $(BUILD_DIR) + @echo "$(GREEN)Сборка сервера для $(UNAME_S)...$(NC)" + $(GO) build $(GOFLAGS) -o $(BUILD_DIR)/$(SERVER_BINARY_NAME) $(CMD_DIR) + @echo "$(GREEN)Сборка сервера завершена. Бинарный файл: $(BUILD_DIR)/$(SERVER_BINARY_NAME)$(NC)" + +# Установка проекта +install: + @echo "$(GREEN)Установка проекта...$(NC)" + $(GO) install $(GOFLAGS) $(CMD_DIR) + @echo "$(GREEN)Установка завершена$(NC)" + +# Запуск клиента +run: build + @echo "$(GREEN)Запуск futriis клиента...$(NC)" + ./$(BUILD_DIR)/$(BINARY_NAME) + +# Запуск сервера +run-server: build-server + @echo "$(GREEN)Запуск futriis сервера...$(NC)" + ./$(BUILD_DIR)/$(SERVER_BINARY_NAME) -server -config config.toml + +# Тестирование +test: + @echo "$(GREEN)Запуск тестов...$(NC)" + $(GO) test -v ./... + +# Очистка +clean: + @echo "$(GREEN)Очистка...$(NC)" + rm -rf $(BUILD_DIR) + $(GO) clean + +# Показать помощь +help: + @echo "Доступные команды Make:" + @echo " make deps - установка зависимостей" + @echo " make build - сборка клиента" + @echo " make build-server - сборка сервера" + @echo " make install - установка проекта" + @echo " make run - запуск клиента" + @echo " make run-server - запуск сервера" + @echo " make test - запуск тестов" + @echo " make clean - очистка" + @echo " make help - показать эту справку" diff --git a/README.md b/README.md new file mode 100644 index 0000000..245bc29 --- /dev/null +++ b/README.md @@ -0,0 +1,31 @@ +# futriis - Распределённая in-memory СУБД + +futriis - это легковесная, wait-free и lock-free дружественная in-memory СУБД, +реализованная на Go 1.25.6 с поддержкой MessagePack сериализации. + +## Поддерживаемые ОС + +- Solaris +- OpenIndiana +- Linux (все популярные дистрибутивы) + +**Важно:** Windows и MacOS X не поддерживаются! + +## Архитектура + +СУБД реализует три основных типа данных: +- **Таппл (Tapple)** - аналог базы данных в РСУБД +- **Слайс (Slice)** - аналог таблицы +- **Кортеж (Tuple)** - аналог записи в таблице + +## Требования + +- Go 1.25.6 или выше +- Только Unix-подобная ОС (Solaris, OpenIndiana, Linux) + +## Установка и сборка + +1. Клонируйте репозиторий: +```bash +git clone https://github.com/futriis/db.git +cd futriis diff --git a/cmd/futriis/main.go b/cmd/futriis/main.go new file mode 100644 index 0000000..1b3efe5 --- /dev/null +++ b/cmd/futriis/main.go @@ -0,0 +1,98 @@ +// /futriis/cmd/futriis/main.go +// Основная точка входа в приложение futriis + +package main + +import ( + "flag" + "os" + "os/signal" + "syscall" + + "futriis/internal/cli" + "futriis/internal/server" + "futriis/pkg/config" + "futriis/pkg/utils" +) +var ( + configPath = flag.String("config", "config.toml", "путь к файлу конфигурации") + serverMode = flag.Bool("server", false, "запуск в режиме сервера") +) + +func main() { + flag.Parse() + + // Загружаем конфигурацию + cfg, err := config.Load(*configPath) + if err != nil { + utils.PrintError("Ошибка загрузки конфигурации: %v", err) + os.Exit(1) + } + + // Настраиваем обработку сигналов для корректного завершения + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + go func() { + <-sigChan + utils.PrintInfo("\nПолучен сигнал завершения. Закрываем приложение...") + cleanup() + os.Exit(0) + }() + + if *serverMode { + // Запуск в режиме сервера + startServer(cfg) + } else { + // Запуск в режиме CLI + startCLI() + } + + cleanup() +} + +// startCLI запускает интерактивный CLI +func startCLI() { + handler := cli.NewHandler() + + if len(os.Args) > 1 && os.Args[1] != "-config" { + // Пакетный режим + executeBatch(os.Args[1:]) + } else { + // Интерактивный режим + if err := handler.Run(); err != nil { + utils.PrintError("Ошибка выполнения: %v", err) + cleanup() + os.Exit(1) + } + } +} + +// startServer запускает серверный режим +func startServer(cfg *config.Config) { + handler := cli.NewHandler() + srv := server.NewServer(cfg.Node.Address, handler.Engine()) + + if err := srv.Start(); err != nil { + utils.PrintError("Ошибка запуска сервера: %v", err) + os.Exit(1) + } + + // Бесконечное ожидание + select {} +} + +// executeBatch выполняет команды из аргументов командной строки +func executeBatch(commands []string) { + utils.PrintInfo("Пакетный режим пока не реализован") +} + +// cleanup выполняет очистку ресурсов перед завершением +func cleanup() { + // Закрываем логгер + logger := utils.GetLogger() + if logger != nil { + logger.Log("INFO", "Приложение завершено") + logger.Close() + } +} diff --git a/config.toml b/config.toml new file mode 100644 index 0000000..3473534 --- /dev/null +++ b/config.toml @@ -0,0 +1,32 @@ +# /futriis/config.toml +# Конфигурационный файл СУБД futriis + +[cluster] +name = "futriis-cluster" +coordinator_address = "127.0.0.1:7379" +replication_factor = 2 +sync_replication = true +auto_rebalance = true + +[node] +id = "node-1" +address = "127.0.0.1:7380" +data_dir = "./data" +aof_enabled = true +aof_file = "./data/futriis.aof" + +[storage] +page_size = 4096 +max_memory = "1GB" +eviction_policy = "noeviction" + +[replication] +enabled = true +sync_mode = "full" # full, async +heartbeat_interval = 5 +timeout = 30 + +[lua] +enabled = true +plugins_dir = "./plugins" +max_memory = "128MB" diff --git a/data/futriis.aof b/data/futriis.aof new file mode 100644 index 0000000..e69de29 diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..8babd30 --- /dev/null +++ b/go.mod @@ -0,0 +1,17 @@ +module futriis + +go 1.25.6 + +require ( + github.com/BurntSushi/toml v1.3.2 + github.com/mattn/go-runewidth v0.0.16 + github.com/vmihailenco/msgpack/v5 v5.4.1 + github.com/yuin/gopher-lua v1.1.1 + golang.org/x/term v0.18.0 +) + +require ( + github.com/rivo/uniseg v0.4.7 // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect + golang.org/x/sys v0.18.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..58cb17c --- /dev/null +++ b/go.sum @@ -0,0 +1,25 @@ +github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8= +github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= +github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= +github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= +github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= +github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8= +golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/cli/commands.go b/internal/cli/commands.go new file mode 100644 index 0000000..cee5702 --- /dev/null +++ b/internal/cli/commands.go @@ -0,0 +1,43 @@ +// /futriis/internal/cli/commands.go +// Пакет cli определяет структуры команд и их параметры +package cli + +// Command представляет команду СУБД +type Command struct { + Name string + Description string + Usage string + Handler func(args []string) (string, error) +} + +// CommandRegistry реестр всех доступных команд +type CommandRegistry struct { + commands map[string]*Command +} + +// NewCommandRegistry создаёт новый реестр команд +func NewCommandRegistry() *CommandRegistry { + return &CommandRegistry{ + commands: make(map[string]*Command), + } +} + +// Register регистрирует новую команду +func (cr *CommandRegistry) Register(cmd *Command) { + cr.commands[cmd.Name] = cmd +} + +// Get возвращает команду по имени +func (cr *CommandRegistry) Get(name string) (*Command, bool) { + cmd, ok := cr.commands[name] + return cmd, ok +} + +// List возвращает список всех команд +func (cr *CommandRegistry) List() []*Command { + cmds := make([]*Command, 0, len(cr.commands)) + for _, cmd := range cr.commands { + cmds = append(cmds, cmd) + } + return cmds +} diff --git a/internal/cli/handler.go b/internal/cli/handler.go new file mode 100644 index 0000000..1b8adca --- /dev/null +++ b/internal/cli/handler.go @@ -0,0 +1,66 @@ +// /futriis/internal/cli/handler.go +// Пакет cli реализует обработчик команд для интерактивного режима + +package cli + +import ( + "fmt" + "strings" + + "futriis/internal/engine" + "futriis/pkg/utils" +) + +// Handler представляет обработчик команд CLI +type Handler struct { + prompt *Prompt + engine *engine.Engine +} + +// NewHandler создаёт новый обработчик команд +func NewHandler() *Handler { + return &Handler{ + prompt: NewPrompt(), + engine: engine.NewEngine(), + } +} + +// Engine возвращает движок базы данных +func (h *Handler) Engine() *engine.Engine { + return h.engine +} + +// Run запускает основной цикл обработки команд +func (h *Handler) Run() error { + utils.PrintBanner() + utils.PrintInfo("Добро пожаловать в Futriis DB. Введите 'help' для списка команд.\n") + + for { + // Читаем команду + input, err := h.prompt.ReadLine() + if err != nil { + return err + } + + // Пропускаем пустые строки + if strings.TrimSpace(input) == "" { + continue + } + + // Обрабатываем команду + result, err := h.engine.Execute(input) + if err != nil { + utils.PrintError("Ошибка: %v", err) + continue + } + + // Выводим результат + if result != "" { + if result == "exit" { + utils.PrintInfo("До свидания!") + return nil + } + fmt.Println(result) + } + } +} diff --git a/internal/cli/history.go b/internal/cli/history.go new file mode 100644 index 0000000..e0da8c1 --- /dev/null +++ b/internal/cli/history.go @@ -0,0 +1,85 @@ +// /futriis/internal/cli/history.go +// Пакет cli реализует управление историей команд с поддержкой клавиши Up + +package cli + +import ( + "os" + + "golang.org/x/term" +) +// History управляет историей команд +type History struct { + commands []string + position int + maxSize int +} + +// NewHistory создаёт новую историю команд +func NewHistory(maxSize int) *History { + return &History{ + commands: make([]string, 0, maxSize), + position: 0, + maxSize: maxSize, + } +} + +// Add добавляет команду в историю +func (h *History) Add(cmd string) { + if cmd == "" { + return + } + + // Не добавляем дубликаты подряд + if len(h.commands) > 0 && h.commands[len(h.commands)-1] == cmd { + return + } + + // Если достигнут максимум, удаляем самую старую команду + if len(h.commands) >= h.maxSize { + h.commands = h.commands[1:] + } + + h.commands = append(h.commands, cmd) + h.position = len(h.commands) +} + +// GetPrevious возвращает предыдущую команду из истории +func (h *History) GetPrevious() string { + if len(h.commands) == 0 { + return "" + } + + if h.position > 0 { + h.position-- + } + + return h.commands[h.position] +} + +// GetNext возвращает следующую команду из истории +func (h *History) GetNext() string { + if h.position < len(h.commands)-1 { + h.position++ + return h.commands[h.position] + } + h.position = len(h.commands) + return "" +} + +// Reset сбрасывает позицию в истории +func (h *History) Reset() { + h.position = len(h.commands) +} + +// SetupRawMode устанавливает терминал в raw-режим для обработки клавиш +func SetupRawMode() (*term.State, error) { + fd := int(os.Stdin.Fd()) + return term.MakeRaw(fd) +} + +// RestoreMode восстанавливает режим терминала +func RestoreMode(oldState *term.State) error { + fd := int(os.Stdin.Fd()) + return term.Restore(fd, oldState) +} diff --git a/internal/cli/prompt.go b/internal/cli/prompt.go new file mode 100644 index 0000000..fba0777 --- /dev/null +++ b/internal/cli/prompt.go @@ -0,0 +1,157 @@ +// /futriis/internal/cli/prompt.go +// Пакет cli реализует интерактивное приглашение с поддержкой истории команд + +package cli + +import ( + "bufio" + "fmt" + "os" + + "futriis/pkg/utils" + "github.com/mattn/go-runewidth" + "golang.org/x/term" +) + +// Prompt представляет интерактивное приглашение +type Prompt struct { + history *History + buffer []rune + pos int +} + +// NewPrompt создаёт новое приглашение +func NewPrompt() *Prompt { + return &Prompt{ + history: NewHistory(100), + buffer: make([]rune, 0), + pos: 0, + } +} + +// ReadLine читает строку с поддержкой истории и редактирования +func (p *Prompt) ReadLine() (string, error) { + oldState, err := term.MakeRaw(int(os.Stdin.Fd())) + if err != nil { + return p.readSimple() + } + defer term.Restore(int(os.Stdin.Fd()), oldState) + + // Очищаем буфер + p.buffer = make([]rune, 0) + p.pos = 0 + + // Показываем приглашение + fmt.Print(utils.GetPrompt()) + + reader := bufio.NewReader(os.Stdin) + + for { + r, _, err := reader.ReadRune() + if err != nil { + return "", err + } + + switch r { + case 3: // Ctrl+C + return "", nil + + case 4: // Ctrl+D + if len(p.buffer) == 0 { + return "exit", nil + } + + case 13, 10: // Enter + fmt.Println() + cmd := string(p.buffer) + p.history.Add(cmd) + p.history.Reset() + return cmd, nil + + case 127: // Backspace + if p.pos > 0 { + // Удаляем символ перед курсором + p.buffer = append(p.buffer[:p.pos-1], p.buffer[p.pos:]...) + p.pos-- + p.refreshLine() + } + + case 27: // Escape sequence (стрелки) + // Читаем следующие два символа + r2, _, _ := reader.ReadRune() + r3, _, _ := reader.ReadRune() + + if r2 == '[' { + switch r3 { + case 'A': // Up arrow + prev := p.history.GetPrevious() + if prev != "" { + p.buffer = []rune(prev) + p.pos = len(p.buffer) + p.refreshLine() + } + + case 'B': // Down arrow + next := p.history.GetNext() + p.buffer = []rune(next) + p.pos = len(p.buffer) + p.refreshLine() + + case 'C': // Right arrow + if p.pos < len(p.buffer) { + p.pos++ + p.refreshLine() + } + + case 'D': // Left arrow + if p.pos > 0 { + p.pos-- + p.refreshLine() + } + } + } + + default: + // Добавляем символ (поддержка Unicode, включая русский) + if r >= 32 { // Печатные символы + // Вставляем символ в позицию курсора + if p.pos == len(p.buffer) { + p.buffer = append(p.buffer, r) + } else { + p.buffer = append(p.buffer[:p.pos], append([]rune{r}, p.buffer[p.pos:]...)...) + } + p.pos++ + p.refreshLine() + } + } + } +} + +// refreshLine обновляет текущую строку с правильным позиционированием курсора +func (p *Prompt) refreshLine() { + // Сохраняем позицию + fmt.Print("\r\033[K") // Возврат в начало строки и очистка + + // Печатаем приглашение и текущий буфер + promptStr := utils.ColorPromptCode + "futriis:~> " + utils.ColorReset + fmt.Print(promptStr + string(p.buffer)) + + // Вычисляем ширину приглашения (без ANSI кодов) + promptWidth := runewidth.StringWidth("futriis:~> ") + + // Вычисляем позицию курсора (учитываем ширину каждого символа) + cursorPos := promptWidth + for i := 0; i < p.pos; i++ { + cursorPos += runewidth.RuneWidth(p.buffer[i]) + } + + // Возвращаем курсор на позицию + fmt.Printf("\033[%dG", cursorPos+1) +} + +// readSimple читает строку без специальной обработки (fallback) +func (p *Prompt) readSimple() (string, error) { + fmt.Print(utils.GetPrompt()) + reader := bufio.NewReader(os.Stdin) + return reader.ReadString('\n') +} diff --git a/internal/cluster/node.go b/internal/cluster/node.go new file mode 100644 index 0000000..776d2e7 --- /dev/null +++ b/internal/cluster/node.go @@ -0,0 +1,336 @@ +// /futriis/internal/cluster/node.go +// Пакет cluster реализует управление кластером и репликацию +package cluster + +import ( + "encoding/json" + "errors" + "fmt" + "net" + "sync" + "sync/atomic" + "time" + + "futriis/pkg/config" + "futriis/pkg/utils" +) +// NodeState состояние узла +type NodeState int + +const ( + StateOffline NodeState = iota + StateJoining + StateOnline + StateLeaving + StateFailed +) + +func (s NodeState) String() string { + switch s { + case StateOffline: + return "offline" + case StateJoining: + return "joining" + case StateOnline: + return "online" + case StateLeaving: + return "leaving" + case StateFailed: + return "failed" + default: + return "unknown" + } +} + +// Node представляет узел кластера +type Node struct { + ID string `json:"id"` + Address string `json:"address"` + State NodeState `json:"state"` + LastSeen time.Time `json:"last_seen"` + ShardIDs []string `json:"shard_ids"` + mu sync.RWMutex +} + +// ClusterManager управляет кластером +type ClusterManager struct { + nodes map[string]*Node + coordinatorAddr string + nodeID string + localAddr string + isCoordinator bool + mu sync.RWMutex + heartbeatStop chan struct{} + stats struct { + totalNodes int64 + activeNodes int64 + rebalancingCnt int64 + } +} + +// NewClusterManager создаёт новый менеджер кластера +func NewClusterManager(cfg *config.Config) *ClusterManager { + cm := &ClusterManager{ + nodes: make(map[string]*Node), + coordinatorAddr: cfg.Cluster.CoordinatorAddress, + nodeID: cfg.Node.ID, + localAddr: cfg.Node.Address, + isCoordinator: cfg.Node.Address == cfg.Cluster.CoordinatorAddress, + heartbeatStop: make(chan struct{}), + } + + // Добавляем себя в кластер + cm.addNode(&Node{ + ID: cfg.Node.ID, + Address: cfg.Node.Address, + State: StateOnline, + LastSeen: time.Now(), + }) + + return cm +} + +// Start запускает кластерные сервисы +func (cm *ClusterManager) Start() error { + if cm.isCoordinator { + // Запускаем координатор + go cm.startCoordinator() + } + + // Запускаем heartbeat + go cm.heartbeatLoop() + + utils.PrintInfo("Кластерный менеджер запущен") + return nil +} + +// Stop останавливает кластерные сервисы +func (cm *ClusterManager) Stop() { + close(cm.heartbeatStop) +} + +// heartbeatLoop отправляет heartbeat сигналы +func (cm *ClusterManager) heartbeatLoop() { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + cm.sendHeartbeat() + case <-cm.heartbeatStop: + return + } + } +} + +// sendHeartbeat отправляет heartbeat координатору +func (cm *ClusterManager) sendHeartbeat() { + if !cm.isCoordinator { + // Отправляем heartbeat координатору + conn, err := net.DialTimeout("tcp", cm.coordinatorAddr, 3*time.Second) + if err != nil { + return + } + defer conn.Close() + + heartbeat := map[string]interface{}{ + "type": "heartbeat", + "node_id": cm.nodeID, + "address": cm.localAddr, + "time": time.Now().Unix(), + } + + json.NewEncoder(conn).Encode(heartbeat) + } +} + +// startCoordinator запускает координатор кластера +func (cm *ClusterManager) startCoordinator() { + listener, err := net.Listen("tcp", cm.coordinatorAddr) + if err != nil { + utils.PrintError("Ошибка запуска координатора: %v", err) + return + } + defer listener.Close() + + utils.PrintInfo("Координатор кластера запущен на " + cm.coordinatorAddr) + + for { + conn, err := listener.Accept() + if err != nil { + continue + } + + go cm.handleCoordinatorRequest(conn) + } +} + +// handleCoordinatorRequest обрабатывает запросы к координатору +func (cm *ClusterManager) handleCoordinatorRequest(conn net.Conn) { + defer conn.Close() + + var req map[string]interface{} + if err := json.NewDecoder(conn).Decode(&req); err != nil { + return + } + + msgType, _ := req["type"].(string) + + switch msgType { + case "heartbeat": + nodeID, _ := req["node_id"].(string) + address, _ := req["address"].(string) + cm.updateNodeHeartbeat(nodeID, address) + + case "join": + nodeID, _ := req["node_id"].(string) + address, _ := req["address"].(string) + cm.handleNodeJoin(nodeID, address) + + case "leave": + nodeID, _ := req["node_id"].(string) + cm.handleNodeLeave(nodeID) + } +} + +// handleNodeJoin обрабатывает присоединение узла +func (cm *ClusterManager) handleNodeJoin(nodeID, address string) { + cm.mu.Lock() + defer cm.mu.Unlock() + + if node, exists := cm.nodes[nodeID]; exists { + node.State = StateOnline + node.LastSeen = time.Now() + } else { + cm.nodes[nodeID] = &Node{ + ID: nodeID, + Address: address, + State: StateOnline, + LastSeen: time.Now(), + } + atomic.AddInt64(&cm.stats.totalNodes, 1) + } + + atomic.AddInt64(&cm.stats.activeNodes, 1) + utils.PrintSuccess("Узел %s (%s) присоединился к кластеру", nodeID, address) +} + +// handleNodeLeave обрабатывает отключение узла +func (cm *ClusterManager) handleNodeLeave(nodeID string) { + cm.mu.Lock() + defer cm.mu.Unlock() + + if node, exists := cm.nodes[nodeID]; exists { + node.State = StateOffline + atomic.AddInt64(&cm.stats.activeNodes, -1) + utils.PrintWarning("Узел %s покинул кластер", nodeID) + } +} + +// updateNodeHeartbeat обновляет время последнего heartbeat узла +func (cm *ClusterManager) updateNodeHeartbeat(nodeID, address string) { + cm.mu.Lock() + defer cm.mu.Unlock() + + if node, exists := cm.nodes[nodeID]; exists { + node.LastSeen = time.Now() + if node.State == StateOffline { + node.State = StateOnline + atomic.AddInt64(&cm.stats.activeNodes, 1) + } + } else { + cm.nodes[nodeID] = &Node{ + ID: nodeID, + Address: address, + State: StateOnline, + LastSeen: time.Now(), + } + atomic.AddInt64(&cm.stats.totalNodes, 1) + atomic.AddInt64(&cm.stats.activeNodes, 1) + } +} + +// AddNode добавляет новый узел в кластер +func (cm *ClusterManager) AddNode(address string) error { + if !cm.isCoordinator { + return errors.New("только координатор может добавлять узлы") + } + + cm.mu.Lock() + defer cm.mu.Unlock() + + // Генерируем ID для нового узла + nodeID := fmt.Sprintf("node-%d", len(cm.nodes)+1) + + cm.nodes[nodeID] = &Node{ + ID: nodeID, + Address: address, + State: StateJoining, + LastSeen: time.Now(), + } + + atomic.AddInt64(&cm.stats.totalNodes, 1) + + utils.PrintSuccess("Узел %s (%s) добавлен в кластер", nodeID, address) + return nil +} + +// RemoveNode удаляет узел из кластера +func (cm *ClusterManager) RemoveNode(nodeID string) error { + if !cm.isCoordinator { + return errors.New("только координатор может удалять узлы") + } + + cm.mu.Lock() + defer cm.mu.Unlock() + + if node, exists := cm.nodes[nodeID]; exists { + node.State = StateLeaving + delete(cm.nodes, nodeID) + atomic.AddInt64(&cm.stats.totalNodes, -1) + if node.State == StateOnline { + atomic.AddInt64(&cm.stats.activeNodes, -1) + } + utils.PrintSuccess("Узел %s удален из кластера", nodeID) + return nil + } + + return errors.New("узел не найден") +} + +// GetClusterStatus возвращает статус кластера +func (cm *ClusterManager) GetClusterStatus() map[string]interface{} { + cm.mu.RLock() + defer cm.mu.RUnlock() + + status := make(map[string]interface{}) + status["total_nodes"] = atomic.LoadInt64(&cm.stats.totalNodes) + status["active_nodes"] = atomic.LoadInt64(&cm.stats.activeNodes) + status["is_coordinator"] = cm.isCoordinator + status["coordinator"] = cm.coordinatorAddr + + nodes := make([]map[string]interface{}, 0, len(cm.nodes)) + for id, node := range cm.nodes { + nodes = append(nodes, map[string]interface{}{ + "id": id, + "address": node.Address, + "state": node.State.String(), + "last_seen": node.LastSeen.Format(time.RFC3339), + }) + } + status["nodes"] = nodes + + return status +} + +// addNode добавляет узел во внутреннюю карту +func (cm *ClusterManager) addNode(node *Node) { + cm.mu.Lock() + defer cm.mu.Unlock() + cm.nodes[node.ID] = node + atomic.AddInt64(&cm.stats.totalNodes, 1) + if node.State == StateOnline { + atomic.AddInt64(&cm.stats.activeNodes, 1) + } +} diff --git a/internal/engine/engine.go b/internal/engine/engine.go new file mode 100644 index 0000000..1157dce --- /dev/null +++ b/internal/engine/engine.go @@ -0,0 +1,521 @@ +// /futriis/internal/engine/engine.go +// Пакет engine реализует ядро СУБД, координирующее все операции +package engine + +import ( + "fmt" + "strings" + + "futriis/internal/cluster" + "futriis/internal/lua" + "futriis/internal/replication" + "futriis/internal/storage" + "futriis/internal/transaction" + "futriis/pkg/config" + "futriis/pkg/utils" +) + +// Engine представляет ядро СУБД +type Engine struct { + storage *storage.Storage + clusterMgr *cluster.ClusterManager + txMgr *transaction.TransactionManager + luaMgr *lua.PluginManager + aofMgr *replication.AOFManager + cfg *config.Config +} + +// NewEngine создаёт новый экземпляр ядра СУБД +func NewEngine() *Engine { + cfg := config.Get() + + // Создаём AOF менеджер + aofMgr, _ := replication.NewAOFManager(cfg.Node.AOFFile, cfg.Node.AOFEnabled) + + // Воспроизводим AOF если нужно + if aofMgr != nil && cfg.Node.AOFEnabled { + // TODO: восстановление состояния из AOF + } + + return &Engine{ + storage: storage.NewStorage(), + clusterMgr: cluster.NewClusterManager(cfg), + txMgr: transaction.NewTransactionManager(), + luaMgr: lua.NewPluginManager(&cfg.Lua), + aofMgr: aofMgr, + cfg: cfg, + } +} + +// Execute выполняет команду и возвращает результат +func (e *Engine) Execute(input string) (string, error) { + // Разбираем ввод + parts := strings.Fields(input) + if len(parts) == 0 { + return "", nil + } + + command := strings.ToLower(parts[0]) + args := parts[1:] + + // Записываем команду в AOF + if e.aofMgr != nil { + argsInterface := make([]interface{}, len(args)) + for i, v := range args { + argsInterface[i] = v + } + e.aofMgr.Append(command, argsInterface) + } + + // Обработка команд + switch command { + case "help": + return e.help(), nil + + case "exit", "quit": + return "exit", nil + + case "create": + return e.handleCreate(args) + + case "delete": + return e.handleDelete(args) + + case "update": + return e.handleUpdate(args) + + case "list": + return e.handleList(args) + + case "show": + return e.handleShow(args) + + case "begin": + return e.handleBegin() + + case "commit": + return e.handleCommit() + + case "rollback": + return e.handleRollback() + + case "cluster.status": + return e.handleClusterStatus() + + case "add.node": + return e.handleAddNode(args) + + case "evict.node": + return e.handleEvictNode(args) + + case "lua": + return e.handleLua(args) + + default: + return "", fmt.Errorf("неизвестная команда: %s", command) + } +} + +// handleCreate обрабатывает команды создания +func (e *Engine) handleCreate(args []string) (string, error) { + if len(args) < 2 { + return "", fmt.Errorf("недостаточно аргументов для команды create") + } + + switch args[0] { + case "tapple": + if len(args) < 2 { + return "", fmt.Errorf("не указано имя таппла") + } + return e.createTapple(args[1]) + case "slice": + if len(args) < 3 { + return "", fmt.Errorf("недостаточно аргументов для создания слайса") + } + return e.createSlice(args[1], args[2]) + case "tuple": + if len(args) < 4 { + return "", fmt.Errorf("недостаточно аргументов для создания кортежа") + } + return e.createTuple(args[1], args[2], args[3], args[4:]) + default: + return "", fmt.Errorf("неизвестный тип создания: %s", args[0]) + } +} + +// handleDelete обрабатывает команды удаления +func (e *Engine) handleDelete(args []string) (string, error) { + if len(args) < 2 { + return "", fmt.Errorf("недостаточно аргументов для команды delete") + } + + switch args[0] { + case "tapple": + if len(args) < 2 { + return "", fmt.Errorf("не указано имя таппла") + } + return e.deleteTapple(args[1]) + case "slice": + if len(args) < 3 { + return "", fmt.Errorf("недостаточно аргументов для удаления слайса") + } + return e.deleteSlice(args[1], args[2]) + case "tuple": + if len(args) < 4 { + return "", fmt.Errorf("недостаточно аргументов для удаления кортежа") + } + return e.deleteTuple(args[1], args[2], args[3]) + default: + return "", fmt.Errorf("неизвестный тип удаления: %s", args[0]) + } +} + +// handleUpdate обрабатывает команды обновления +func (e *Engine) handleUpdate(args []string) (string, error) { + if len(args) < 4 || args[0] != "tuple" { + return "", fmt.Errorf("неверная команда update") + } + return e.updateTuple(args[1], args[2], args[3], args[4:]) +} + +// handleList обрабатывает команды списка +func (e *Engine) handleList(args []string) (string, error) { + if len(args) < 1 { + return "", fmt.Errorf("недостаточно аргументов для команды list") + } + + switch args[0] { + case "tapples": + return e.listTapples() + case "slices": + if len(args) < 2 { + return "", fmt.Errorf("не указано имя таппла") + } + return e.listSlices(args[1]) + default: + return "", fmt.Errorf("неизвестный тип списка: %s", args[0]) + } +} + +// handleShow обрабатывает команды показа +func (e *Engine) handleShow(args []string) (string, error) { + if len(args) < 2 || args[0] != "tuples" { + return "", fmt.Errorf("неверная команда show") + } + if len(args) < 3 { + return "", fmt.Errorf("недостаточно аргументов для show tuples") + } + return e.showTuples(args[1], args[2]) +} + +// handleBegin начинает транзакцию +func (e *Engine) handleBegin() (string, error) { + id, err := e.txMgr.Begin() + if err != nil { + return "", err + } + return utils.ColorGreen + "Транзакция начата. ID: " + id + utils.ColorReset, nil +} + +// handleCommit фиксирует транзакцию +func (e *Engine) handleCommit() (string, error) { + err := e.txMgr.Commit() + if err != nil { + return "", err + } + return utils.ColorGreen + "Транзакция зафиксирована" + utils.ColorReset, nil +} + +// handleRollback откатывает транзакцию +func (e *Engine) handleRollback() (string, error) { + err := e.txMgr.Rollback() + if err != nil { + return "", err + } + return utils.ColorGreen + "Транзакция отменена" + utils.ColorReset, nil +} + +// handleClusterStatus показывает статус кластера +func (e *Engine) handleClusterStatus() (string, error) { + status := e.clusterMgr.GetClusterStatus() + + result := utils.ColorCyan + "Статус кластера:" + utils.ColorReset + "\n" + result += fmt.Sprintf(" Всего узлов: %d\n", status["total_nodes"]) + result += fmt.Sprintf(" Активных узлов: %d\n", status["active_nodes"]) + result += fmt.Sprintf(" Координатор: %v\n", status["coordinator"]) + + nodes, _ := status["nodes"].([]map[string]interface{}) + if len(nodes) > 0 { + result += utils.ColorCyan + "\nУзлы кластера:" + utils.ColorReset + "\n" + for _, node := range nodes { + result += fmt.Sprintf(" %s (%s) - %s, last seen: %s\n", + node["id"], node["address"], node["state"], node["last_seen"]) + } + } + + return result, nil +} + +// handleAddNode добавляет узел в кластер +func (e *Engine) handleAddNode(args []string) (string, error) { + if len(args) < 1 { + return "", fmt.Errorf("укажите адрес узла") + } + + address := args[0] + err := e.clusterMgr.AddNode(address) + if err != nil { + return "", err + } + + return utils.ColorGreen + "Узел " + address + " добавлен в кластер" + utils.ColorReset, nil +} + +// handleEvictNode удаляет узел из кластера +func (e *Engine) handleEvictNode(args []string) (string, error) { + if len(args) < 1 { + return "", fmt.Errorf("укажите ID узла или адрес") + } + + nodeID := args[0] + err := e.clusterMgr.RemoveNode(nodeID) + if err != nil { + return "", err + } + + return utils.ColorGreen + "Узел " + nodeID + " удален из кластера" + utils.ColorReset, nil +} + +// handleLua выполняет Lua скрипт +func (e *Engine) handleLua(args []string) (string, error) { + if len(args) < 1 { + return "", fmt.Errorf("укажите имя плагина") + } + + pluginName := args[0] + err := e.luaMgr.ExecutePlugin(pluginName) + if err != nil { + return "", err + } + + return utils.ColorGreen + "Плагин выполнен" + utils.ColorReset, nil +} + +// Методы для работы с тапплами +func (e *Engine) createTapple(name string) (string, error) { + tapple, err := e.storage.GetTappleManager().CreateTapple(name) + if err != nil { + return "", err + } + return utils.ColorGreen + "Таппл '" + tapple.Name + "' успешно создан" + utils.ColorReset, nil +} + +func (e *Engine) deleteTapple(name string) (string, error) { + err := e.storage.GetTappleManager().DeleteTapple(name) + if err != nil { + return "", err + } + return utils.ColorGreen + "Таппл '" + name + "' успешно удален" + utils.ColorReset, nil +} + +func (e *Engine) listTapples() (string, error) { + tapples := e.storage.GetTappleManager().ListTapples() + if len(tapples) == 0 { + return utils.ColorYellow + "Тапплы не найдены" + utils.ColorReset, nil + } + + result := utils.ColorCyan + "Список тапплов:" + utils.ColorReset + "\n" + for _, t := range tapples { + result += " " + utils.ColorGreen + t + utils.ColorReset + "\n" + } + return result, nil +} + +// Методы для работы со слайсами +func (e *Engine) createSlice(tappleName, sliceName string) (string, error) { + tapple, err := e.storage.GetTappleManager().GetTapple(tappleName) + if err != nil { + return "", err + } + + slice, err := e.storage.GetTappleManager().GetSliceManager().CreateSlice(tapple, sliceName) + if err != nil { + return "", err + } + + return utils.ColorGreen + "Слайс '" + slice.Name + "' в таппле '" + tappleName + "' успешно создан" + utils.ColorReset, nil +} + +func (e *Engine) deleteSlice(tappleName, sliceName string) (string, error) { + tapple, err := e.storage.GetTappleManager().GetTapple(tappleName) + if err != nil { + return "", err + } + + err = e.storage.GetTappleManager().GetSliceManager().DeleteSlice(tapple, sliceName) + if err != nil { + return "", err + } + + return utils.ColorGreen + "Слайс '" + sliceName + "' в таппле '" + tappleName + "' успешно удален" + utils.ColorReset, nil +} + +func (e *Engine) listSlices(tappleName string) (string, error) { + tapple, err := e.storage.GetTappleManager().GetTapple(tappleName) + if err != nil { + return "", err + } + + slices := e.storage.GetTappleManager().GetSliceManager().ListSlices(tapple) + if len(slices) == 0 { + return utils.ColorYellow + "Слайсы в таппле '" + tappleName + "' не найдены" + utils.ColorReset, nil + } + + result := utils.ColorCyan + "Список слайсов в таппле '" + tappleName + "':" + utils.ColorReset + "\n" + for _, s := range slices { + result += " " + utils.ColorGreen + s + utils.ColorReset + "\n" + } + return result, nil +} + +// Методы для работы с кортежами +func (e *Engine) createTuple(tappleName, sliceName, tupleID string, fieldsArgs []string) (string, error) { + tapple, err := e.storage.GetTappleManager().GetTapple(tappleName) + if err != nil { + return "", err + } + + slice, err := e.storage.GetTappleManager().GetSliceManager().GetSlice(tapple, sliceName) + if err != nil { + return "", err + } + + // Парсим поля + fields := make(map[string]interface{}) + for _, arg := range fieldsArgs { + parts := strings.SplitN(arg, "=", 2) + if len(parts) == 2 { + fields[parts[0]] = parts[1] + } + } + + tuple, err := e.storage.GetTappleManager().GetSliceManager().GetTupleManager().CreateTuple(slice, tupleID, fields) + if err != nil { + return "", err + } + + return utils.ColorGreen + "Кортеж '" + tuple.ID + "' в слайсе '" + sliceName + "' успешно создан" + utils.ColorReset, nil +} + +func (e *Engine) deleteTuple(tappleName, sliceName, tupleID string) (string, error) { + tapple, err := e.storage.GetTappleManager().GetTapple(tappleName) + if err != nil { + return "", err + } + + slice, err := e.storage.GetTappleManager().GetSliceManager().GetSlice(tapple, sliceName) + if err != nil { + return "", err + } + + err = e.storage.GetTappleManager().GetSliceManager().GetTupleManager().DeleteTuple(slice, tupleID) + if err != nil { + return "", err + } + + return utils.ColorGreen + "Кортеж '" + tupleID + "' в слайсе '" + sliceName + "' успешно удален" + utils.ColorReset, nil +} + +func (e *Engine) updateTuple(tappleName, sliceName, tupleID string, fieldsArgs []string) (string, error) { + tapple, err := e.storage.GetTappleManager().GetTapple(tappleName) + if err != nil { + return "", err + } + + slice, err := e.storage.GetTappleManager().GetSliceManager().GetSlice(tapple, sliceName) + if err != nil { + return "", err + } + + // Парсим поля + fields := make(map[string]interface{}) + for _, arg := range fieldsArgs { + parts := strings.SplitN(arg, "=", 2) + if len(parts) == 2 { + fields[parts[0]] = parts[1] + } + } + + tuple, err := e.storage.GetTappleManager().GetSliceManager().GetTupleManager().UpdateTuple(slice, tupleID, fields) + if err != nil { + return "", err + } + + return utils.ColorGreen + "Кортеж '" + tuple.ID + "' в слайсе '" + sliceName + "' успешно обновлен" + utils.ColorReset, nil +} + +// showTuples показывает все кортежи в слайсе +func (e *Engine) showTuples(tappleName, sliceName string) (string, error) { + tapple, err := e.storage.GetTappleManager().GetTapple(tappleName) + if err != nil { + return "", err + } + + slice, err := e.storage.GetTappleManager().GetSliceManager().GetSlice(tapple, sliceName) + if err != nil { + return "", err + } + + slice.RLock() + defer slice.RUnlock() + + if len(slice.Tuples) == 0 { + return utils.ColorYellow + "Кортежи в слайсе '" + sliceName + "' не найдены" + utils.ColorReset, nil + } + + result := utils.ColorCyan + "Список кортежей в слайсе '" + sliceName + "':" + utils.ColorReset + "\n" + for id, tuple := range slice.Tuples { + result += " " + utils.ColorGreen + "ID: " + id + utils.ColorReset + "\n" + for k, v := range tuple.Fields { + // ИСПРАВЛЕНО: заменено ColorPrompt на ColorPromptCode + result += " " + utils.ColorYellow + k + utils.ColorReset + ": " + utils.ColorPromptCode + fmt.Sprint(v) + utils.ColorReset + "\n" + } + result += "\n" + } + return result, nil +} + +// help возвращает справку по командам +func (e *Engine) help() string { + help := utils.ColorCyan + "Доступные команды:" + utils.ColorReset + "\n" + + help += "\n" + utils.ColorYellow + "Основные команды:" + utils.ColorReset + "\n" + help += " " + utils.ColorGreen + "create tapple " + utils.ColorReset + " - создать новый таппл (базу данных)\n" + help += " " + utils.ColorGreen + "create slice " + utils.ColorReset + " - создать новый слайс (таблицу)\n" + help += " " + utils.ColorGreen + "create tuple [key=value...]" + utils.ColorReset + " - создать новый кортеж (запись)\n" + help += " " + utils.ColorGreen + "delete tapple " + utils.ColorReset + " - удалить таппл\n" + help += " " + utils.ColorGreen + "delete slice " + utils.ColorReset + " - удалить слайс\n" + help += " " + utils.ColorGreen + "delete tuple " + utils.ColorReset + " - удалить кортеж\n" + help += " " + utils.ColorGreen + "update tuple [key=value...]" + utils.ColorReset + " - обновить кортеж\n" + help += " " + utils.ColorGreen + "list tapples" + utils.ColorReset + " - показать все тапплы\n" + help += " " + utils.ColorGreen + "list slices " + utils.ColorReset + " - показать все слайсы в таппле\n" + help += " " + utils.ColorGreen + "show tuples " + utils.ColorReset + " - показать все кортежи в слайсе\n" + + help += "\n" + utils.ColorYellow + "Транзакции:" + utils.ColorReset + "\n" + help += " " + utils.ColorGreen + "begin" + utils.ColorReset + " - начать транзакцию\n" + help += " " + utils.ColorGreen + "commit" + utils.ColorReset + " - зафиксировать транзакцию\n" + help += " " + utils.ColorGreen + "rollback" + utils.ColorReset + " - отменить транзакцию\n" + + help += "\n" + utils.ColorYellow + "Управление кластером:" + utils.ColorReset + "\n" + help += " " + utils.ColorGreen + "cluster.status" + utils.ColorReset + " - показать статус кластера\n" + help += " " + utils.ColorGreen + "add.node
" + utils.ColorReset + " - добавить узел в кластер\n" + help += " " + utils.ColorGreen + "evict.node " + utils.ColorReset + " - удалить узел из кластера\n" + + help += "\n" + utils.ColorYellow + "Lua плагины:" + utils.ColorReset + "\n" + help += " " + utils.ColorGreen + "lua " + utils.ColorReset + " - выполнить Lua плагин\n" + + help += "\n" + utils.ColorYellow + "Прочее:" + utils.ColorReset + "\n" + help += " " + utils.ColorGreen + "exit/quit" + utils.ColorReset + " - выйти из СУБД\n" + + return help +} diff --git a/internal/lua/plugin.go b/internal/lua/plugin.go new file mode 100644 index 0000000..a9ab9be --- /dev/null +++ b/internal/lua/plugin.go @@ -0,0 +1,150 @@ +// /futriis/internal/lua/plugin.go +// Пакет lua реализует поддержку Lua плагинов + +package lua + +import ( + "fmt" + "io/ioutil" + "path/filepath" + "sync" + + "futriis/pkg/config" + "futriis/pkg/utils" + "github.com/yuin/gopher-lua" +) + +// PluginManager управляет Lua плагинами +type PluginManager struct { + state *lua.LState + plugins map[string]*lua.LFunction + mu sync.RWMutex + enabled bool +} + +// NewPluginManager создаёт новый менеджер плагинов +func NewPluginManager(cfg *config.LuaConfig) *PluginManager { + pm := &PluginManager{ + plugins: make(map[string]*lua.LFunction), + enabled: cfg.Enabled, + } + + if cfg.Enabled { + pm.state = lua.NewState() + pm.registerFunctions() + } + + return pm +} + +// registerFunctions регистрирует функции Go, доступные из Lua +func (pm *PluginManager) registerFunctions() { + if !pm.enabled || pm.state == nil { + return + } + + // Регистрируем функции для работы с данными + pm.state.SetGlobal("print", pm.state.NewFunction(func(L *lua.LState) int { + top := L.GetTop() + args := make([]interface{}, top) + for i := 1; i <= top; i++ { + args[i-1] = L.Get(i).String() + } + utils.PrintInfo(fmt.Sprint(args...)) + return 0 + })) + + pm.state.SetGlobal("get", pm.state.NewFunction(func(L *lua.LState) int { + key := L.ToString(1) + // TODO: получить значение из хранилища + L.Push(lua.LString(key)) + return 1 + })) + + pm.state.SetGlobal("set", pm.state.NewFunction(func(L *lua.LState) int { + key := L.ToString(1) + value := L.ToString(2) + // TODO: установить значение в хранилище + utils.PrintInfo("Lua set: %s = %s", key, value) + return 0 + })) +} + +// LoadPlugins загружает все Lua плагины из директории +func (pm *PluginManager) LoadPlugins(pluginsDir string) error { + if !pm.enabled || pm.state == nil { + return nil + } + + files, err := ioutil.ReadDir(pluginsDir) + if err != nil { + return err + } + + for _, file := range files { + if filepath.Ext(file.Name()) == ".lua" { + if err := pm.LoadPlugin(filepath.Join(pluginsDir, file.Name())); err != nil { + utils.PrintError("Ошибка загрузки плагина %s: %v", file.Name(), err) + } + } + } + + return nil +} + +// LoadPlugin загружает один Lua плагин +func (pm *PluginManager) LoadPlugin(path string) error { + if !pm.enabled || pm.state == nil { + return nil + } + + data, err := ioutil.ReadFile(path) + if err != nil { + return err + } + + fn, err := pm.state.LoadString(string(data)) + if err != nil { + return err + } + + pm.mu.Lock() + pm.plugins[filepath.Base(path)] = fn + pm.mu.Unlock() + + utils.PrintSuccess("Загружен плагин: %s", filepath.Base(path)) + return nil +} + +// ExecutePlugin выполняет функцию плагина +func (pm *PluginManager) ExecutePlugin(name string, args ...lua.LValue) error { + if !pm.enabled || pm.state == nil { + return nil + } + + pm.mu.RLock() + fn, exists := pm.plugins[name] + pm.mu.RUnlock() + + if !exists { + return fmt.Errorf("плагин %s не найден", name) + } + + pm.state.Push(fn) + for _, arg := range args { + pm.state.Push(arg) + } + + if err := pm.state.PCall(len(args), lua.MultRet, nil); err != nil { + return err + } + + return nil +} + +// Close закрывает Lua состояние +func (pm *PluginManager) Close() { + if pm.state != nil { + pm.state.Close() + } +} diff --git a/internal/msgpack/deserializer.go b/internal/msgpack/deserializer.go new file mode 100644 index 0000000..954d23b --- /dev/null +++ b/internal/msgpack/deserializer.go @@ -0,0 +1,42 @@ +// /futriis/internal/msgpack/deserializer.go +// Пакет msgpack предоставляет дополнительные методы десериализации +// /futriis/internal/msgpack/deserializer.go + +package msgpack + +import ( + "futriis/pkg/types" + "github.com/vmihailenco/msgpack/v5" +) + +// Deserializer расширяет функциональность десериализации +type Deserializer struct { + serializer *Serializer +} + +// NewDeserializer создаёт новый экземпляр десериализатора +func NewDeserializer() *Deserializer { + return &Deserializer{ + serializer: NewSerializer(), + } +} + +// DeserializeValue десериализует значение произвольного типа +func (d *Deserializer) DeserializeValue(data []byte) (interface{}, error) { + var value interface{} + err := msgpack.Unmarshal(data, &value) + if err != nil { + return nil, err + } + return value, nil +} + +// DeserializeMap десериализует данные в карту +func (d *Deserializer) DeserializeMap(data []byte) (map[string]interface{}, error) { + var m map[string]interface{} + err := msgpack.Unmarshal(data, &m) + if err != nil { + return nil, err + } + return m, nil +} diff --git a/internal/msgpack/serializer.go b/internal/msgpack/serializer.go new file mode 100644 index 0000000..9c60ded --- /dev/null +++ b/internal/msgpack/serializer.go @@ -0,0 +1,66 @@ +// /futriis/internal/msgpack/serializer.go +// Пакет msgpack реализует сериализацию и десериализацию данных с использованием MessagePack +// /futriis/internal/msgpack/serializer.go + +package msgpack + +import ( + "futriis/pkg/types" + "github.com/vmihailenco/msgpack/v5" +) + +// Serializer предоставляет методы для сериализации данных +type Serializer struct { + enc *msgpack.Encoder + dec *msgpack.Decoder +} + +// NewSerializer создаёт новый экземпляр сериализатора +func NewSerializer() *Serializer { + return &Serializer{} +} + +// SerializeTuple сериализует кортеж в формат MessagePack +func (s *Serializer) SerializeTuple(tuple *types.Tuple) ([]byte, error) { + return msgpack.Marshal(tuple) +} + +// DeserializeTuple десериализует кортеж из формата MessagePack +func (s *Serializer) DeserializeTuple(data []byte) (*types.Tuple, error) { + var tuple types.Tuple + err := msgpack.Unmarshal(data, &tuple) + if err != nil { + return nil, err + } + return &tuple, nil +} + +// SerializeSlice сериализует слайс в формат MessagePack +func (s *Serializer) SerializeSlice(slice *types.Slice) ([]byte, error) { + return msgpack.Marshal(slice) +} + +// DeserializeSlice десериализует слайс из формата MessagePack +func (s *Serializer) DeserializeSlice(data []byte) (*types.Slice, error) { + var slice types.Slice + err := msgpack.Unmarshal(data, &slice) + if err != nil { + return nil, err + } + return &slice, nil +} + +// SerializeTapple сериализует таппл в формат MessagePack +func (s *Serializer) SerializeTapple(tapple *types.Tapple) ([]byte, error) { + return msgpack.Marshal(tapple) +} + +// DeserializeTapple десериализует таппл из формата MessagePack +func (s *Serializer) DeserializeTapple(data []byte) (*types.Tapple, error) { + var tapple types.Tapple + err := msgpack.Unmarshal(data, &tapple) + if err != nil { + return nil, err + } + return &tapple, nil +} diff --git a/internal/replication/aof.go b/internal/replication/aof.go new file mode 100644 index 0000000..868d9f1 --- /dev/null +++ b/internal/replication/aof.go @@ -0,0 +1,172 @@ +// /futriis/internal/replication/aof.go +// Пакет replication реализует механизм AOF (Append Only File) как в Redis) + +package replication + +import ( + "bufio" + "encoding/json" + "os" + "sync" + "time" + + "futriis/pkg/utils" +) + +// AOFCommand представляет команду для AOF +type AOFCommand struct { + Timestamp int64 `json:"ts"` + Command string `json:"cmd"` + Args []interface{} `json:"args"` +} + +// AOFManager управляет AOF файлом +type AOFManager struct { + file *os.File + writer *bufio.Writer + mu sync.Mutex + enabled bool + filePath string + stopChan chan struct{} +} + +// NewAOFManager создаёт новый менеджер AOF +func NewAOFManager(filePath string, enabled bool) (*AOFManager, error) { + aof := &AOFManager{ + enabled: enabled, + filePath: filePath, + stopChan: make(chan struct{}), + } + + if enabled { + if err := aof.openFile(); err != nil { + return nil, err + } + + // Запускаем периодический sync + go aof.syncLoop() + } + + return aof, nil +} + +// openFile открывает AOF файл +func (aof *AOFManager) openFile() error { + // Создаем директорию если не существует + dir := aof.filePath[:len(aof.filePath)-len("/futriis.aof")] + if err := os.MkdirAll(dir, 0755); err != nil { + return err + } + + file, err := os.OpenFile(aof.filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return err + } + + aof.file = file + aof.writer = bufio.NewWriter(file) + return nil +} + +// syncLoop периодически синхронизирует данные на диск +func (aof *AOFManager) syncLoop() { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + aof.mu.Lock() + if aof.writer != nil { + aof.writer.Flush() + aof.file.Sync() + } + aof.mu.Unlock() + case <-aof.stopChan: + return + } + } +} + +// Append добавляет команду в AOF +func (aof *AOFManager) Append(cmd string, args []interface{}) error { + if !aof.enabled || aof.writer == nil { + return nil + } + + aof.mu.Lock() + defer aof.mu.Unlock() + + command := AOFCommand{ + Timestamp: time.Now().UnixNano(), + Command: cmd, + Args: args, + } + + data, err := json.Marshal(command) + if err != nil { + return err + } + + if _, err := aof.writer.Write(data); err != nil { + return err + } + + if _, err := aof.writer.WriteString("\n"); err != nil { + return err + } + + return nil +} + +// Close закрывает AOF файл +func (aof *AOFManager) Close() error { + if !aof.enabled { + return nil + } + + close(aof.stopChan) + + aof.mu.Lock() + defer aof.mu.Unlock() + + if aof.writer != nil { + aof.writer.Flush() + } + + if aof.file != nil { + return aof.file.Close() + } + + return nil +} + +// Replay воспроизводит команды из AOF +func (aof *AOFManager) Replay(handler func(cmd string, args []interface{}) error) error { + if !aof.enabled { + return nil + } + + file, err := os.Open(aof.filePath) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + defer file.Close() + + scanner := bufio.NewScanner(file) + for scanner.Scan() { + var cmd AOFCommand + if err := json.Unmarshal(scanner.Bytes(), &cmd); err != nil { + continue + } + + if err := handler(cmd.Command, cmd.Args); err != nil { + utils.PrintError("Ошибка воспроизведения команды: %v", err) + } + } + + return scanner.Err() +} diff --git a/internal/server/server.go b/internal/server/server.go new file mode 100644 index 0000000..d470d21 --- /dev/null +++ b/internal/server/server.go @@ -0,0 +1,126 @@ +// /futriis/internal/server/server.go +// Пакет server реализует серверную часть для клиент-серверной архитектуры + +package server + +import ( + "bufio" + "encoding/json" + "net" + "sync" + + "futriis/internal/engine" + "futriis/pkg/utils" +) + +// Server представляет сервер СУБД +type Server struct { + address string + engine *engine.Engine + listener net.Listener + clients map[net.Conn]bool + mu sync.RWMutex + stopChan chan struct{} +} + +// NewServer создаёт новый сервер +func NewServer(address string, engine *engine.Engine) *Server { + return &Server{ + address: address, + engine: engine, + clients: make(map[net.Conn]bool), + stopChan: make(chan struct{}), + } +} + +// Start запускает сервер +func (s *Server) Start() error { + listener, err := net.Listen("tcp", s.address) + if err != nil { + return err + } + + s.listener = listener + utils.PrintSuccess("Сервер запущен на %s", s.address) + + go s.acceptLoop() + + return nil +} + +// acceptLoop принимает входящие соединения +func (s *Server) acceptLoop() { + for { + select { + case <-s.stopChan: + return + default: + conn, err := s.listener.Accept() + if err != nil { + continue + } + + s.mu.Lock() + s.clients[conn] = true + s.mu.Unlock() + + go s.handleClient(conn) + } + } +} + +// handleClient обрабатывает клиентское соединение +func (s *Server) handleClient(conn net.Conn) { + defer func() { + s.mu.Lock() + delete(s.clients, conn) + s.mu.Unlock() + conn.Close() + }() + + scanner := bufio.NewScanner(conn) + for scanner.Scan() { + line := scanner.Text() + + // Парсим JSON запрос + var req map[string]interface{} + if err := json.Unmarshal([]byte(line), &req); err != nil { + // Если не JSON, обрабатываем как простую команду + result, err := s.engine.Execute(line) + s.sendResponse(conn, result, err) + } else { + // Обрабатываем JSON запрос + cmd, _ := req["command"].(string) + result, err := s.engine.Execute(cmd) + s.sendResponse(conn, result, err) + } + } +} + +// sendResponse отправляет ответ клиенту +func (s *Server) sendResponse(conn net.Conn, result string, err error) { + response := make(map[string]interface{}) + if err != nil { + response["error"] = err.Error() + } else { + response["result"] = result + } + + data, _ := json.Marshal(response) + conn.Write(append(data, '\n')) +} + +// Stop останавливает сервер +func (s *Server) Stop() { + close(s.stopChan) + + if s.listener != nil { + s.listener.Close() + } + + s.mu.Lock() + for conn := range s.clients { + conn.Close() + } + s.mu.Unlock() +} diff --git a/internal/storage/slice.go b/internal/storage/slice.go new file mode 100644 index 0000000..28c308a --- /dev/null +++ b/internal/storage/slice.go @@ -0,0 +1,122 @@ +// /futriis/internal/storage/slice.go +// Пакет storage реализует операции со слайсами (таблицами) + +package storage + +import ( + "errors" + "sync/atomic" + + "futriis/pkg/types" + "futriis/pkg/utils" +) + +// SliceManager управляет операциями со слайсами +type SliceManager struct { + tupleManager *TupleManager + stats struct { + created int64 + deleted int64 + } +} + +// NewSliceManager создаёт новый менеджер слайсов +func NewSliceManager() *SliceManager { + return &SliceManager{ + tupleManager: NewTupleManager(), + } +} + +// CreateSlice создаёт новый слайс в указанном таппле +func (sm *SliceManager) CreateSlice(tapple *types.Tapple, name string) (*types.Slice, error) { + if tapple == nil { + return nil, errors.New("tapple is nil") + } + + tapple.RLock() + _, exists := tapple.Slices[name] + tapple.RUnlock() + + if exists { + return nil, errors.New("slice already exists") + } + + slice := types.NewSlice(name) + + tapple.Lock() + tapple.Slices[name] = slice + tapple.Unlock() + + atomic.AddInt64(&sm.stats.created, 1) + + logger := utils.GetLogger() + if logger != nil { + logger.Log("INFO", "Created slice: "+name) + } + + return slice, nil +} + +// GetSlice возвращает слайс по имени +func (sm *SliceManager) GetSlice(tapple *types.Tapple, name string) (*types.Slice, error) { + if tapple == nil { + return nil, errors.New("tapple is nil") + } + + tapple.RLock() + slice, exists := tapple.Slices[name] + tapple.RUnlock() + + if !exists { + return nil, errors.New("slice not found") + } + + return slice, nil +} + +// DeleteSlice удаляет слайс +func (sm *SliceManager) DeleteSlice(tapple *types.Tapple, name string) error { + if tapple == nil { + return errors.New("tapple is nil") + } + + tapple.Lock() + _, exists := tapple.Slices[name] + if !exists { + tapple.Unlock() + return errors.New("slice not found") + } + + delete(tapple.Slices, name) + tapple.Unlock() + + atomic.AddInt64(&sm.stats.deleted, 1) + + logger := utils.GetLogger() + if logger != nil { + logger.Log("INFO", "Deleted slice: "+name) + } + + return nil +} + +// ListSlices возвращает список всех слайсов в таппле +func (sm *SliceManager) ListSlices(tapple *types.Tapple) []string { + if tapple == nil { + return nil + } + + tapple.RLock() + defer tapple.RUnlock() + + slices := make([]string, 0, len(tapple.Slices)) + for name := range tapple.Slices { + slices = append(slices, name) + } + return slices +} + +// GetTupleManager возвращает менеджер кортежей +func (sm *SliceManager) GetTupleManager() *TupleManager { + return sm.tupleManager +} diff --git a/internal/storage/storage.go b/internal/storage/storage.go new file mode 100644 index 0000000..7649f86 --- /dev/null +++ b/internal/storage/storage.go @@ -0,0 +1,40 @@ +// /futriis/internal/storage/storage.go +// Пакет storage предоставляет единую точку доступа к хранилищу данных + +package storage + +import ( + "futriis/pkg/types" +) + +// Storage представляет основное хранилище данных +type Storage struct { + tappleManager *TappleManager +} + +// NewStorage создаёт новое хранилище +func NewStorage() *Storage { + return &Storage{ + tappleManager: NewTappleManager(), + } +} + +// GetTappleManager возвращает менеджер тапплов +func (s *Storage) GetTappleManager() *TappleManager { + return s.tappleManager +} + +// ExecuteCommand выполняет команду над хранилищем +func (s *Storage) ExecuteCommand(cmd string, args []string) (interface{}, error) { + // Будет расширяться по мере добавления команд + return nil, nil +} + +// Backup создаёт резервную копию всех данных +func (s *Storage) Backup() map[string]*types.Tapple { + backup := make(map[string]*types.Tapple) + for name, tapple := range s.tappleManager.tapples { + backup[name] = tapple + } + return backup +} diff --git a/internal/storage/tapple.go b/internal/storage/tapple.go new file mode 100644 index 0000000..6985051 --- /dev/null +++ b/internal/storage/tapple.go @@ -0,0 +1,90 @@ +// /futriis/internal/storage/tapple.go +// Пакет storage реализует операции с тапплами (базами данных) + +package storage + +import ( + "errors" + "sync/atomic" + + "futriis/pkg/types" + "futriis/pkg/utils" +) + +// TappleManager управляет операциями с тапплами +type TappleManager struct { + tapples map[string]*types.Tapple + sliceManager *SliceManager + stats struct { + created int64 + deleted int64 + } +} + +// NewTappleManager создаёт новый менеджер тапплов +func NewTappleManager() *TappleManager { + return &TappleManager{ + tapples: make(map[string]*types.Tapple), + sliceManager: NewSliceManager(), + } +} + +// CreateTapple создаёт новый таппл +func (tm *TappleManager) CreateTapple(name string) (*types.Tapple, error) { + if _, exists := tm.tapples[name]; exists { + return nil, errors.New("tapple already exists") + } + + tapple := types.NewTapple(name) + tm.tapples[name] = tapple + + atomic.AddInt64(&tm.stats.created, 1) + + logger := utils.GetLogger() + if logger != nil { + logger.Log("INFO", "Created tapple: "+name) + } + + return tapple, nil +} + +// GetTapple возвращает таппл по имени +func (tm *TappleManager) GetTapple(name string) (*types.Tapple, error) { + tapple, exists := tm.tapples[name] + if !exists { + return nil, errors.New("tapple not found") + } + return tapple, nil +} + +// DeleteTapple удаляет таппл +func (tm *TappleManager) DeleteTapple(name string) error { + if _, exists := tm.tapples[name]; !exists { + return errors.New("tapple not found") + } + + delete(tm.tapples, name) + + atomic.AddInt64(&tm.stats.deleted, 1) + + logger := utils.GetLogger() + if logger != nil { + logger.Log("INFO", "Deleted tapple: "+name) + } + + return nil +} + +// ListTapples возвращает список всех тапплов +func (tm *TappleManager) ListTapples() []string { + tapples := make([]string, 0, len(tm.tapples)) + for name := range tm.tapples { + tapples = append(tapples, name) + } + return tapples +} + +// GetSliceManager возвращает менеджер слайсов +func (tm *TappleManager) GetSliceManager() *SliceManager { + return tm.sliceManager +} diff --git a/internal/storage/tuple.go b/internal/storage/tuple.go new file mode 100644 index 0000000..94a7389 --- /dev/null +++ b/internal/storage/tuple.go @@ -0,0 +1,149 @@ +// /futriis/internal/storage/tuple.go +// Пакет storage реализует операции с кортежами (записями) + +package storage + +import ( + "errors" + "sync/atomic" + + "futriis/pkg/types" + "futriis/pkg/utils" +) +// TupleManager управляет операциями с кортежами +type TupleManager struct { + stats struct { + created int64 + updated int64 + deleted int64 + read int64 + } +} + +// NewTupleManager создаёт новый менеджер кортежей +func NewTupleManager() *TupleManager { + return &TupleManager{} +} + +// CreateTuple создаёт новый кортеж в указанном слайсе +// Wait-free операция: использует атомарные операции для счётчиков +func (tm *TupleManager) CreateTuple(slice *types.Slice, id string, fields map[string]interface{}) (*types.Tuple, error) { + if slice == nil { + return nil, errors.New("slice is nil") + } + + // Проверяем существование кортежа + slice.RLock() + _, exists := slice.Tuples[id] + slice.RUnlock() + + if exists { + return nil, errors.New("tuple already exists") + } + + // Создаём новый кортеж + tuple := types.NewTuple(id) + for k, v := range fields { + tuple.Fields[k] = v + } + + // Добавляем в слайс + slice.Lock() + slice.Tuples[id] = tuple + slice.Unlock() + + // Атомарно увеличиваем счётчик созданных + atomic.AddInt64(&tm.stats.created, 1) + + // Логируем операцию + logger := utils.GetLogger() + if logger != nil { + logger.Log("INFO", "Created tuple: "+id) + } + + return tuple, nil +} + +// ReadTuple читает кортеж по ID +// Wait-free операция для чтения +func (tm *TupleManager) ReadTuple(slice *types.Slice, id string) (*types.Tuple, error) { + if slice == nil { + return nil, errors.New("slice is nil") + } + + slice.RLock() + tuple, exists := slice.Tuples[id] + slice.RUnlock() + + if !exists { + return nil, errors.New("tuple not found") + } + + atomic.AddInt64(&tm.stats.read, 1) + return tuple, nil +} + +// UpdateTuple обновляет поля кортежа +func (tm *TupleManager) UpdateTuple(slice *types.Slice, id string, fields map[string]interface{}) (*types.Tuple, error) { + if slice == nil { + return nil, errors.New("slice is nil") + } + + slice.Lock() + defer slice.Unlock() + + tuple, exists := slice.Tuples[id] + if !exists { + return nil, errors.New("tuple not found") + } + + // Обновляем поля + for k, v := range fields { + tuple.Fields[k] = v + } + + atomic.AddInt64(&tm.stats.updated, 1) + + logger := utils.GetLogger() + if logger != nil { + logger.Log("INFO", "Updated tuple: "+id) + } + + return tuple, nil +} + +// DeleteTuple удаляет кортеж +func (tm *TupleManager) DeleteTuple(slice *types.Slice, id string) error { + if slice == nil { + return errors.New("slice is nil") + } + + slice.Lock() + _, exists := slice.Tuples[id] + if !exists { + slice.Unlock() + return errors.New("tuple not found") + } + + delete(slice.Tuples, id) + slice.Unlock() + + atomic.AddInt64(&tm.stats.deleted, 1) + + logger := utils.GetLogger() + if logger != nil { + logger.Log("INFO", "Deleted tuple: "+id) + } + + return nil +} + +// GetStats возвращает статистику операций +func (tm *TupleManager) GetStats() map[string]int64 { + return map[string]int64{ + "created": atomic.LoadInt64(&tm.stats.created), + "updated": atomic.LoadInt64(&tm.stats.updated), + "deleted": atomic.LoadInt64(&tm.stats.deleted), + "read": atomic.LoadInt64(&tm.stats.read), + } +} diff --git a/internal/transaction/tx.go b/internal/transaction/tx.go new file mode 100644 index 0000000..2fcd2ba --- /dev/null +++ b/internal/transaction/tx.go @@ -0,0 +1,145 @@ +// /futriis/internal/transaction/tx.go +// Пакет transaction реализует простые транзакции (не ACID) + +package transaction + +import ( + "errors" + "sync" + + "futriis/pkg/types" +) + +// TxState состояние транзакции +type TxState int + +const ( + TxStateActive TxState = iota + TxStateCommited + TxStateRolledBack +) + +// Operation представляет операцию в транзакции +type Operation struct { + Type string // create, update, delete + Tapple string + Slice string + Key string + Value interface{} + OldValue interface{} +} + +// Transaction представляет транзакцию +type Transaction struct { + ID string + State TxState + Operations []Operation + mu sync.RWMutex +} + +// TransactionManager управляет транзакциями +type TransactionManager struct { + transactions map[string]*Transaction + currentTx *Transaction + mu sync.RWMutex +} + +// NewTransactionManager создаёт новый менеджер транзакций +func NewTransactionManager() *TransactionManager { + return &TransactionManager{ + transactions: make(map[string]*Transaction), + } +} + +// Begin начинает новую транзакцию +func (tm *TransactionManager) Begin() (string, error) { + tm.mu.Lock() + defer tm.mu.Unlock() + + if tm.currentTx != nil && tm.currentTx.State == TxStateActive { + return "", errors.New("транзакция уже активна") + } + + id := generateTxID() + tx := &Transaction{ + ID: id, + State: TxStateActive, + Operations: make([]Operation, 0), + } + + tm.transactions[id] = tx + tm.currentTx = tx + + return id, nil +} + +// Commit фиксирует текущую транзакцию +func (tm *TransactionManager) Commit() error { + tm.mu.Lock() + defer tm.mu.Unlock() + + if tm.currentTx == nil { + return errors.New("нет активной транзакции") + } + + if tm.currentTx.State != TxStateActive { + return errors.New("транзакция не активна") + } + + tm.currentTx.State = TxStateCommited + tm.currentTx = nil + + return nil +} + +// Rollback откатывает текущую транзакцию +func (tm *TransactionManager) Rollback() error { + tm.mu.Lock() + defer tm.mu.Unlock() + + if tm.currentTx == nil { + return errors.New("нет активной транзакции") + } + + if tm.currentTx.State != TxStateActive { + return errors.New("транзакция не активна") + } + + tm.currentTx.State = TxStateRolledBack + tm.currentTx = nil + + return nil +} + +// AddOperation добавляет операцию в текущую транзакцию +func (tm *TransactionManager) AddOperation(op Operation) error { + tm.mu.RLock() + tx := tm.currentTx + tm.mu.RUnlock() + + if tx == nil { + return errors.New("нет активной транзакции") + } + + tx.mu.Lock() + defer tx.mu.Unlock() + + if tx.State != TxStateActive { + return errors.New("транзакция не активна") + } + + tx.Operations = append(tx.Operations, op) + return nil +} + +// GetCurrentTx возвращает текущую транзакцию +func (tm *TransactionManager) GetCurrentTx() *Transaction { + tm.mu.RLock() + defer tm.mu.RUnlock() + return tm.currentTx +} + +// generateTxID генерирует ID транзакции +func generateTxID() string { + return "tx-" + types.GenerateID() +} diff --git a/pkg/config/config.go b/pkg/config/config.go new file mode 100644 index 0000000..9ea5e7c --- /dev/null +++ b/pkg/config/config.go @@ -0,0 +1,133 @@ +// /futriis/pkg/config/config.go +// Пакет config предоставляет функциональность для работы с конфигурацией + +package config + +import ( + "sync/atomic" + + "github.com/BurntSushi/toml" + "futriis/pkg/utils" +) + +// ClusterConfig конфигурация кластера +type ClusterConfig struct { + Name string `toml:"name"` + CoordinatorAddress string `toml:"coordinator_address"` + ReplicationFactor int `toml:"replication_factor"` + SyncReplication bool `toml:"sync_replication"` + AutoRebalance bool `toml:"auto_rebalance"` +} + +// NodeConfig конфигурация узла +type NodeConfig struct { + ID string `toml:"id"` + Address string `toml:"address"` + DataDir string `toml:"data_dir"` + AOFEnabled bool `toml:"aof_enabled"` + AOFFile string `toml:"aof_file"` +} + +// StorageConfig конфигурация хранилища +type StorageConfig struct { + PageSize int `toml:"page_size"` + MaxMemory string `toml:"max_memory"` + EvictionPolicy string `toml:"eviction_policy"` +} + +// ReplicationConfig конфигурация репликации +type ReplicationConfig struct { + Enabled bool `toml:"enabled"` + SyncMode string `toml:"sync_mode"` + HeartbeatInterval int `toml:"heartbeat_interval"` + Timeout int `toml:"timeout"` +} + +// LuaConfig конфигурация Lua плагинов +type LuaConfig struct { + Enabled bool `toml:"enabled"` + PluginsDir string `toml:"plugins_dir"` + MaxMemory string `toml:"max_memory"` +} + +// Config основная структура конфигурации +type Config struct { + Cluster ClusterConfig `toml:"cluster"` + Node NodeConfig `toml:"node"` + Storage StorageConfig `toml:"storage"` + Replication ReplicationConfig `toml:"replication"` + Lua LuaConfig `toml:"lua"` +} + +var globalConfig atomic.Value + +// Load загружает конфигурацию из файла +func Load(path string) (*Config, error) { + var config Config + + if _, err := toml.DecodeFile(path, &config); err != nil { + return nil, err + } + + // Устанавливаем значения по умолчанию, если не указаны + if config.Cluster.CoordinatorAddress == "" { + config.Cluster.CoordinatorAddress = "127.0.0.1:7379" + } + + if config.Node.Address == "" { + config.Node.Address = "127.0.0.1:7380" + } + + if config.Node.DataDir == "" { + config.Node.DataDir = "./data" + } + + if config.Node.AOFFile == "" { + config.Node.AOFFile = "./data/futriis.aof" + } + + if config.Storage.PageSize == 0 { + config.Storage.PageSize = 4096 + } + + if config.Replication.HeartbeatInterval == 0 { + config.Replication.HeartbeatInterval = 5 + } + + if config.Replication.Timeout == 0 { + config.Replication.Timeout = 30 + } + + if config.Lua.PluginsDir == "" { + config.Lua.PluginsDir = "./plugins" + } + + globalConfig.Store(&config) + + utils.PrintSuccess("Конфигурация загружена из " + path) + return &config, nil +} + +// Get возвращает глобальную конфигурацию +func Get() *Config { + if cfg := globalConfig.Load(); cfg != nil { + return cfg.(*Config) + } + return nil +} + +// GetClusterConfig возвращает конфигурацию кластера +func GetClusterConfig() *ClusterConfig { + if cfg := Get(); cfg != nil { + return &cfg.Cluster + } + return nil +} + +// GetNodeConfig возвращает конфигурацию узла +func GetNodeConfig() *NodeConfig { + if cfg := Get(); cfg != nil { + return &cfg.Node + } + return nil +} diff --git a/pkg/types/id.go b/pkg/types/id.go new file mode 100644 index 0000000..de256fe --- /dev/null +++ b/pkg/types/id.go @@ -0,0 +1,19 @@ +// /futriis/pkg/types/id.go +// Пакет types предоставляет утилиты для генерации ID + +package types + +import ( + "fmt" + "sync/atomic" + "time" +) + +var idCounter uint64 + +// GenerateID генерирует уникальный ID +func GenerateID() string { + counter := atomic.AddUint64(&idCounter, 1) + timestamp := time.Now().UnixNano() + return fmt.Sprintf("%d-%d", timestamp, counter) +} diff --git a/pkg/types/types.go b/pkg/types/types.go new file mode 100644 index 0000000..6d53e7c --- /dev/null +++ b/pkg/types/types.go @@ -0,0 +1,98 @@ +// /futriis/pkg/types/types.go +// Пакет types определяет основные типы данных для СУБД futriis + +package types + +import ( + "sync" +) + +// Tuple представляет кортеж - аналог записи в таблице +// Содержит набор полей в формате ключ-значение +type Tuple struct { + ID string `msgpack:"id"` + Fields map[string]interface{} `msgpack:"fields"` +} + +// NewTuple создаёт новый кортеж с указанным ID +func NewTuple(id string) *Tuple { + return &Tuple{ + ID: id, + Fields: make(map[string]interface{}), + } +} + +// Slice представляет слайс - аналог таблицы в РСУБД +// Содержит коллекцию кортежей +type Slice struct { + Name string `msgpack:"name"` + Tuples map[string]*Tuple `msgpack:"tuples"` + mu sync.RWMutex // Mutex для безопасного доступа, но не блокирующий wait-free операции +} + +// NewSlice создаёт новый слайс с указанным именем +func NewSlice(name string) *Slice { + return &Slice{ + Name: name, + Tuples: make(map[string]*Tuple), + } +} + +// Lock блокирует слайс для записи +func (s *Slice) Lock() { + s.mu.Lock() +} + +// Unlock разблокирует слайс после записи +func (s *Slice) Unlock() { + s.mu.Unlock() +} + +// RLock блокирует слайс для чтения +func (s *Slice) RLock() { + s.mu.RLock() +} + +// RUnlock разблокирует слайс после чтения +func (s *Slice) RUnlock() { + s.mu.RUnlock() +} + +// Tapple представляет таппл - аналог базы данных в РСУБД +// Содержит коллекцию слайсов (таблиц) +type Tapple struct { + Name string `msgpack:"name"` + Slices map[string]*Slice `msgpack:"slices"` + mu sync.RWMutex // Mutex для безопасного доступа +} + +// NewTapple создаёт новый таппл с указанным именем +func NewTapple(name string) *Tapple { + return &Tapple{ + Name: name, + Slices: make(map[string]*Slice), + } +} + +// Lock блокирует таппл для записи +func (t *Tapple) Lock() { + t.mu.Lock() +} + +// Unlock разблокирует таппл после записи +func (t *Tapple) Unlock() { + t.mu.Unlock() +} + +// RLock блокирует таппл для чтения +func (t *Tapple) RLock() { + t.mu.RLock() +} + +// RUnlock разблокирует таппл после чтения +func (t *Tapple) RUnlock() { + t.mu.RUnlock() +} + +// Value представляет значение любого типа, поддерживаемого MessagePack +type Value interface{} diff --git a/pkg/utils/colors.go b/pkg/utils/colors.go new file mode 100644 index 0000000..324ee41 --- /dev/null +++ b/pkg/utils/colors.go @@ -0,0 +1,83 @@ +// /futriis/pkg/utils/colors.go +// Пакет utils предоставляет вспомогательные функции для форматирования вывода + +package utils + +import ( + "fmt" + "strings" +) + +// Цветовые коды ANSI +const ( + ColorReset = "\033[0m" + ColorRed = "\033[31m" + ColorGreen = "\033[32m" + ColorYellow = "\033[33m" + ColorBlue = "\033[34m" + ColorCyan = "\033[36m" + ColorPromptCode = "\033[38;5;39m" // Ярко-синий для приглашения + ColorDeepSkyBlue = "\033[38;2;0;191;255m" // #00bfff - Глубокий небесно-голубой +) + +// GetPrompt возвращает строку приглашения с цветом +func GetPrompt() string { + return ColorPromptCode + "futriis:~> " + ColorReset +} + +// PrintInfo выводит информационное сообщение +func PrintInfo(format string, args ...interface{}) { + fmt.Printf(ColorBlue+"[INFO] "+format+ColorReset+"\n", args...) +} + +// PrintSuccess выводит сообщение об успехе +func PrintSuccess(format string, args ...interface{}) { + fmt.Printf(ColorGreen+"[OK] "+format+ColorReset+"\n", args...) +} + +// PrintWarning выводит предупреждающее сообщение +func PrintWarning(format string, args ...interface{}) { + fmt.Printf(ColorYellow+"[WARNING] "+format+ColorReset+"\n", args...) +} + +// PrintError выводит сообщение об ошибке +func PrintError(format string, args ...interface{}) { + fmt.Printf(ColorRed+"[ERROR] "+format+ColorReset+"\n", args...) +} + +// PrintBanner выводит приветственный баннер при запуске +func PrintBanner() { + banner := ` + F U T R I I S + Distributed Database System +` + + // Верхняя граница цветом #00bfff + fmt.Print(ColorDeepSkyBlue + strings.Repeat("═", 35) + "\n" + ColorReset) + + // Баннер построчно с выравниванием + lines := strings.Split(banner, "\n") + for _, line := range lines { + if line != "" { + // Центрируем текст + padding := (35 - len(line)) / 2 + if padding < 0 { + padding = 0 + } + // Левая граница цветом #00bfff + fmt.Print(ColorDeepSkyBlue + "║" + ColorReset + strings.Repeat(" ", padding)) + + // Логотип и описание тоже цветом #00bfff + fmt.Print(ColorDeepSkyBlue + line + ColorReset) + + // Правая граница + fmt.Println(strings.Repeat(" ", 35-len(line)-padding) + ColorDeepSkyBlue + "║" + ColorReset) + } + } + + // Нижняя граница цветом #00bfff + fmt.Print(ColorDeepSkyBlue + strings.Repeat("═", 35) + "\n" + ColorReset) + + // Версия + fmt.Print(ColorDeepSkyBlue + "Версия 0.1.0\n\n" + ColorReset) +} diff --git a/pkg/utils/logger.go b/pkg/utils/logger.go new file mode 100644 index 0000000..211e364 --- /dev/null +++ b/pkg/utils/logger.go @@ -0,0 +1,54 @@ +// /futriis/pkg/utils/logger.go +// Пакет utils предоставляет функции для логирования + +package utils + +import ( + "fmt" + "os" + "time" +) + +// Logger представляет структуру для логирования +type Logger struct { + file *os.File +} + +var loggerInstance *Logger + +// InitLogger инициализирует логгер +func InitLogger(logFile string) error { + file, err := os.OpenFile(logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return err + } + + loggerInstance = &Logger{ + file: file, + } + + return nil +} + +// GetLogger возвращает экземпляр логгера +func GetLogger() *Logger { + return loggerInstance +} + +// Log записывает сообщение в лог +func (l *Logger) Log(level, message string) { + if l == nil || l.file == nil { + return + } + + timestamp := time.Now().Format("2006-01-02 15:04:05") + logLine := fmt.Sprintf("[%s] %s: %s\n", timestamp, level, message) + l.file.WriteString(logLine) +} + +// Close закрывает файл лога +func (l *Logger) Close() { + if l != nil && l.file != nil { + l.file.Close() + } +}