Files
futriix/internal/storage/engine.go

432 lines
15 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/*
* Copyright 2026 Safronov Grigorii
*
* Licensed under the CDDL, Version 1.0 (the "License");
* you may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
* https://opensource.org/licenses/CDDL-1.0
*/
// Файл: internal/storage/engine.go
// Назначение: In-memory движок хранения документов с поддержкой коллекций,
// слайсов (аналог БД), тапплов (аналог таблиц), полей и кортежей.
// Полностью wait-free с использованием sync.Map и атомарных операций.
package storage
import (
"fmt"
"os"
"sync"
"sync/atomic"
"futriis/internal/log"
"futriis/internal/serializer"
)
// Storage представляет основное хранилище баз данных
type Storage struct {
databases sync.Map // map[string]*Database
pageSize int64
logger *log.Logger
totalDocs atomic.Int64
}
// Database представляет базу данных (аналог слайса в реляционных СУБД)
type Database struct {
name string
collections sync.Map // map[string]*Collection
}
// NewStorage создаёт новый экземпляр хранилища
func NewStorage(pageSizeMB int, logger *log.Logger) *Storage {
return &Storage{
pageSize: int64(pageSizeMB) * 1024 * 1024,
logger: logger,
}
}
// CreateDatabase создаёт новую базу данных
func (s *Storage) CreateDatabase(name string) error {
if _, exists := s.databases.LoadOrStore(name, &Database{name: name}); exists {
return fmt.Errorf("database already exists")
}
AuditDatabaseOperation("CREATE", name)
if s.logger != nil {
s.logger.Info("Database created: " + name)
}
return nil
}
// GetDatabase возвращает базу данных по имени
func (s *Storage) GetDatabase(name string) (*Database, error) {
if val, ok := s.databases.Load(name); ok {
return val.(*Database), nil
}
return nil, fmt.Errorf("database not found")
}
// DropDatabase удаляет базу данных
func (s *Storage) DropDatabase(name string) error {
if _, ok := s.databases.LoadAndDelete(name); !ok {
return fmt.Errorf("database not found")
}
AuditDatabaseOperation("DROP", name)
if s.logger != nil {
s.logger.Info("Database dropped: " + name)
}
return nil
}
// ListDatabases возвращает список всех баз данных
func (s *Storage) ListDatabases() []string {
databases := make([]string, 0)
s.databases.Range(func(key, value interface{}) bool {
databases = append(databases, key.(string))
return true
})
return databases
}
// Name возвращает имя базы данных
func (db *Database) Name() string {
return db.name
}
// CreateCollection создаёт новую коллекцию в базе данных
func (db *Database) CreateCollection(name string) error {
if _, exists := db.collections.LoadOrStore(name, NewCollection(db.name, name, nil)); exists {
return fmt.Errorf("collection already exists")
}
AuditCollectionOperation("CREATE", db.name, name, nil)
return nil
}
// CreateCollectionWithSettings создаёт коллекцию с настройками
func (db *Database) CreateCollectionWithSettings(name string, settings *CollectionSettings) error {
if _, exists := db.collections.LoadOrStore(name, NewCollection(db.name, name, settings)); exists {
return fmt.Errorf("collection already exists")
}
AuditCollectionOperation("CREATE", db.name, name, settings)
return nil
}
// GetCollection возвращает коллекцию по имени
func (db *Database) GetCollection(name string) (*Collection, error) {
if val, ok := db.collections.Load(name); ok {
return val.(*Collection), nil
}
return nil, fmt.Errorf("collection not found")
}
// DropCollection удаляет коллекцию
func (db *Database) DropCollection(name string) error {
if _, ok := db.collections.LoadAndDelete(name); !ok {
return fmt.Errorf("collection not found")
}
AuditCollectionOperation("DROP", db.name, name, nil)
return nil
}
// ListCollections возвращает список всех коллекций в базе данных
func (db *Database) ListCollections() []string {
collections := make([]string, 0)
db.collections.Range(func(key, value interface{}) bool {
collections = append(collections, key.(string))
return true
})
return collections
}
// GetTotalDocuments возвращает общее количество документов во всех коллекциях
func (s *Storage) GetTotalDocuments() int64 {
return s.totalDocs.Load()
}
// GetPageSize возвращает размер страницы памяти
func (s *Storage) GetPageSize() int64 {
return s.pageSize
}
// SerializeDatabase сериализует всю базу данных в MessagePack
func (db *Database) SerializeDatabase() ([]byte, error) {
dbData := make(map[string]interface{})
db.collections.Range(func(key, value interface{}) bool {
coll := value.(*Collection)
collData := make(map[string]interface{})
docs := coll.GetAllDocuments()
collDocs := make([]*Document, 0, len(docs))
for _, doc := range docs {
collDocs = append(collDocs, doc)
}
collData["documents"] = collDocs
collData["metadata"] = coll.GetMetadata()
dbData[key.(string)] = collData
return true
})
return serializer.Marshal(dbData)
}
// DeserializeDatabase десериализует базу данных из MessagePack
func (db *Database) DeserializeDatabase(data []byte) error {
var dbData map[string]interface{}
if err := serializer.Unmarshal(data, &dbData); err != nil {
return err
}
for collName, collDataRaw := range dbData {
collData, ok := collDataRaw.(map[string]interface{})
if !ok {
continue
}
settings := &CollectionSettings{
MaxDocuments: 0,
ValidateSchema: false,
AutoIndexID: true,
TTLSeconds: 0,
}
if metaRaw, ok := collData["metadata"]; ok {
if meta, ok := metaRaw.(map[string]interface{}); ok {
if settingsRaw, ok := meta["settings"]; ok {
if settingsMap, ok := settingsRaw.(map[string]interface{}); ok {
if maxDocs, ok := settingsMap["max_documents"]; ok {
if v, ok := maxDocs.(int); ok {
settings.MaxDocuments = v
} else if v, ok := maxDocs.(int64); ok {
settings.MaxDocuments = int(v)
} else if v, ok := maxDocs.(float64); ok {
settings.MaxDocuments = int(v)
}
}
if validateSchema, ok := settingsMap["validate_schema"]; ok {
if v, ok := validateSchema.(bool); ok {
settings.ValidateSchema = v
}
}
if autoIndexID, ok := settingsMap["auto_index_id"]; ok {
if v, ok := autoIndexID.(bool); ok {
settings.AutoIndexID = v
}
}
if ttlSeconds, ok := settingsMap["ttl_seconds"]; ok {
if v, ok := ttlSeconds.(int); ok {
settings.TTLSeconds = v
} else if v, ok := ttlSeconds.(int64); ok {
settings.TTLSeconds = int(v)
} else if v, ok := ttlSeconds.(float64); ok {
settings.TTLSeconds = int(v)
}
}
}
}
} else if meta, ok := metaRaw.(*CollectionMetadata); ok {
if meta.Settings != nil {
settings = meta.Settings
}
}
}
coll := NewCollection(db.name, collName, settings)
if docsRaw, ok := collData["documents"]; ok {
if docs, ok := docsRaw.([]interface{}); ok {
for _, docRaw := range docs {
if doc, ok := docRaw.(*Document); ok {
coll.Insert(doc)
} else if docMap, ok := docRaw.(map[string]interface{}); ok {
doc := NewDocument()
if id, ok := docMap["ID"].(string); ok {
doc.ID = id
} else if id, ok := docMap["id"].(string); ok {
doc.ID = id
}
if fields, ok := docMap["fields"]; ok {
if fieldsMap, ok := fields.(map[string]interface{}); ok {
for k, v := range fieldsMap {
doc.SetField(k, v)
}
}
}
if createdAt, ok := docMap["created_at"]; ok {
if v, ok := createdAt.(int64); ok {
doc.CreatedAt = v
} else if v, ok := createdAt.(float64); ok {
doc.CreatedAt = int64(v)
}
}
if updatedAt, ok := docMap["updated_at"]; ok {
if v, ok := updatedAt.(int64); ok {
doc.UpdatedAt = v
} else if v, ok := updatedAt.(float64); ok {
doc.UpdatedAt = int64(v)
}
}
if version, ok := docMap["version"]; ok {
if v, ok := version.(uint64); ok {
doc.Version = v
} else if v, ok := version.(int64); ok {
doc.Version = uint64(v)
} else if v, ok := version.(float64); ok {
doc.Version = uint64(v)
}
}
coll.Insert(doc)
}
}
} else if docs, ok := docsRaw.([]*Document); ok {
for _, doc := range docs {
coll.Insert(doc)
}
}
}
db.collections.Store(collName, coll)
AuditCollectionOperation("RESTORE", db.name, collName, settings)
}
return nil
}
// GetDatabaseNames возвращает имена всех баз данных
func (s *Storage) GetDatabaseNames() []string {
return s.ListDatabases()
}
// ExistsDatabase проверяет существование базы данных
func (s *Storage) ExistsDatabase(name string) bool {
_, ok := s.databases.Load(name)
return ok
}
// GetDatabaseCount возвращает количество баз данных
func (s *Storage) GetDatabaseCount() int {
count := 0
s.databases.Range(func(key, value interface{}) bool {
count++
return true
})
return count
}
// ========== Дополнительные методы для управления хранилищем ==========
// Backup создаёт резервную копию всех данных
func (s *Storage) Backup(backupPath string) error {
if s.logger != nil {
s.logger.Info(fmt.Sprintf("Starting backup to %s", backupPath))
}
backup := make(map[string]interface{})
s.databases.Range(func(key, value interface{}) bool {
dbName := key.(string)
db := value.(*Database)
dbData, err := db.SerializeDatabase()
if err != nil {
if s.logger != nil {
s.logger.Error(fmt.Sprintf("Failed to serialize database %s: %v", dbName, err))
}
return false
}
backup[dbName] = dbData
return true
})
data, err := serializer.Marshal(backup)
if err != nil {
return fmt.Errorf("failed to marshal backup: %v", err)
}
if err := os.WriteFile(backupPath, data, 0644); err != nil {
return fmt.Errorf("failed to write backup: %v", err)
}
if s.logger != nil {
s.logger.Info(fmt.Sprintf("Backup completed: %s", backupPath))
}
return nil
}
// Restore восстанавливает данные из резервной копии
func (s *Storage) Restore(backupPath string) error {
if s.logger != nil {
s.logger.Info(fmt.Sprintf("Starting restore from %s", backupPath))
}
data, err := os.ReadFile(backupPath)
if err != nil {
return fmt.Errorf("failed to read backup file: %v", err)
}
var backup map[string][]byte
if err := serializer.Unmarshal(data, &backup); err != nil {
return fmt.Errorf("failed to unmarshal backup: %v", err)
}
for dbName, dbData := range backup {
if !s.ExistsDatabase(dbName) {
if err := s.CreateDatabase(dbName); err != nil {
return fmt.Errorf("failed to create database %s: %v", dbName, err)
}
}
db, err := s.GetDatabase(dbName)
if err != nil {
return fmt.Errorf("failed to get database %s: %v", dbName, err)
}
if err := db.DeserializeDatabase(dbData); err != nil {
return fmt.Errorf("failed to restore database %s: %v", dbName, err)
}
}
if s.logger != nil {
s.logger.Info("Restore completed")
}
return nil
}
// GetStats возвращает статистику хранилища
func (s *Storage) GetStats() map[string]interface{} {
stats := map[string]interface{}{
"total_databases": s.GetDatabaseCount(),
"total_documents": s.GetTotalDocuments(),
"page_size_bytes": s.pageSize,
}
databases := make([]map[string]interface{}, 0)
s.databases.Range(func(key, value interface{}) bool {
db := value.(*Database)
dbStats := map[string]interface{}{
"name": db.name,
"collections": len(db.ListCollections()),
}
totalDocs := int64(0)
totalSize := int64(0)
db.collections.Range(func(k, v interface{}) bool {
coll := v.(*Collection)
totalDocs += coll.Count()
totalSize += coll.Size()
return true
})
dbStats["documents"] = totalDocs
dbStats["size_bytes"] = totalSize
databases = append(databases, dbStats)
return true
})
stats["databases"] = databases
return stats
}