Upload files to "internal/storage"
This commit is contained in:
431
internal/storage/engine.go
Normal file
431
internal/storage/engine.go
Normal file
@@ -0,0 +1,431 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2026 Safronov Grigorii
|
||||||
|
*
|
||||||
|
* Licensed under the CDDL, Version 1.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
*
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* https://opensource.org/licenses/CDDL-1.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
// Файл: internal/storage/engine.go
|
||||||
|
// Назначение: In-memory движок хранения документов с поддержкой коллекций,
|
||||||
|
// слайсов (аналог БД), тапплов (аналог таблиц), полей и кортежей.
|
||||||
|
// Полностью wait-free с использованием sync.Map и атомарных операций.
|
||||||
|
|
||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
|
"futriis/internal/log"
|
||||||
|
"futriis/internal/serializer"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Storage представляет основное хранилище баз данных
|
||||||
|
type Storage struct {
|
||||||
|
databases sync.Map // map[string]*Database
|
||||||
|
pageSize int64
|
||||||
|
logger *log.Logger
|
||||||
|
totalDocs atomic.Int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// Database представляет базу данных (аналог слайса в реляционных СУБД)
|
||||||
|
type Database struct {
|
||||||
|
name string
|
||||||
|
collections sync.Map // map[string]*Collection
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewStorage создаёт новый экземпляр хранилища
|
||||||
|
func NewStorage(pageSizeMB int, logger *log.Logger) *Storage {
|
||||||
|
return &Storage{
|
||||||
|
pageSize: int64(pageSizeMB) * 1024 * 1024,
|
||||||
|
logger: logger,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateDatabase создаёт новую базу данных
|
||||||
|
func (s *Storage) CreateDatabase(name string) error {
|
||||||
|
if _, exists := s.databases.LoadOrStore(name, &Database{name: name}); exists {
|
||||||
|
return fmt.Errorf("database already exists")
|
||||||
|
}
|
||||||
|
AuditDatabaseOperation("CREATE", name)
|
||||||
|
if s.logger != nil {
|
||||||
|
s.logger.Info("Database created: " + name)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetDatabase возвращает базу данных по имени
|
||||||
|
func (s *Storage) GetDatabase(name string) (*Database, error) {
|
||||||
|
if val, ok := s.databases.Load(name); ok {
|
||||||
|
return val.(*Database), nil
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("database not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
// DropDatabase удаляет базу данных
|
||||||
|
func (s *Storage) DropDatabase(name string) error {
|
||||||
|
if _, ok := s.databases.LoadAndDelete(name); !ok {
|
||||||
|
return fmt.Errorf("database not found")
|
||||||
|
}
|
||||||
|
AuditDatabaseOperation("DROP", name)
|
||||||
|
if s.logger != nil {
|
||||||
|
s.logger.Info("Database dropped: " + name)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListDatabases возвращает список всех баз данных
|
||||||
|
func (s *Storage) ListDatabases() []string {
|
||||||
|
databases := make([]string, 0)
|
||||||
|
s.databases.Range(func(key, value interface{}) bool {
|
||||||
|
databases = append(databases, key.(string))
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
return databases
|
||||||
|
}
|
||||||
|
|
||||||
|
// Name возвращает имя базы данных
|
||||||
|
func (db *Database) Name() string {
|
||||||
|
return db.name
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateCollection создаёт новую коллекцию в базе данных
|
||||||
|
func (db *Database) CreateCollection(name string) error {
|
||||||
|
if _, exists := db.collections.LoadOrStore(name, NewCollection(db.name, name, nil)); exists {
|
||||||
|
return fmt.Errorf("collection already exists")
|
||||||
|
}
|
||||||
|
AuditCollectionOperation("CREATE", db.name, name, nil)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateCollectionWithSettings создаёт коллекцию с настройками
|
||||||
|
func (db *Database) CreateCollectionWithSettings(name string, settings *CollectionSettings) error {
|
||||||
|
if _, exists := db.collections.LoadOrStore(name, NewCollection(db.name, name, settings)); exists {
|
||||||
|
return fmt.Errorf("collection already exists")
|
||||||
|
}
|
||||||
|
AuditCollectionOperation("CREATE", db.name, name, settings)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetCollection возвращает коллекцию по имени
|
||||||
|
func (db *Database) GetCollection(name string) (*Collection, error) {
|
||||||
|
if val, ok := db.collections.Load(name); ok {
|
||||||
|
return val.(*Collection), nil
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("collection not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
// DropCollection удаляет коллекцию
|
||||||
|
func (db *Database) DropCollection(name string) error {
|
||||||
|
if _, ok := db.collections.LoadAndDelete(name); !ok {
|
||||||
|
return fmt.Errorf("collection not found")
|
||||||
|
}
|
||||||
|
AuditCollectionOperation("DROP", db.name, name, nil)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListCollections возвращает список всех коллекций в базе данных
|
||||||
|
func (db *Database) ListCollections() []string {
|
||||||
|
collections := make([]string, 0)
|
||||||
|
db.collections.Range(func(key, value interface{}) bool {
|
||||||
|
collections = append(collections, key.(string))
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
return collections
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetTotalDocuments возвращает общее количество документов во всех коллекциях
|
||||||
|
func (s *Storage) GetTotalDocuments() int64 {
|
||||||
|
return s.totalDocs.Load()
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetPageSize возвращает размер страницы памяти
|
||||||
|
func (s *Storage) GetPageSize() int64 {
|
||||||
|
return s.pageSize
|
||||||
|
}
|
||||||
|
|
||||||
|
// SerializeDatabase сериализует всю базу данных в MessagePack
|
||||||
|
func (db *Database) SerializeDatabase() ([]byte, error) {
|
||||||
|
dbData := make(map[string]interface{})
|
||||||
|
|
||||||
|
db.collections.Range(func(key, value interface{}) bool {
|
||||||
|
coll := value.(*Collection)
|
||||||
|
collData := make(map[string]interface{})
|
||||||
|
|
||||||
|
docs := coll.GetAllDocuments()
|
||||||
|
collDocs := make([]*Document, 0, len(docs))
|
||||||
|
for _, doc := range docs {
|
||||||
|
collDocs = append(collDocs, doc)
|
||||||
|
}
|
||||||
|
collData["documents"] = collDocs
|
||||||
|
collData["metadata"] = coll.GetMetadata()
|
||||||
|
|
||||||
|
dbData[key.(string)] = collData
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
return serializer.Marshal(dbData)
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeserializeDatabase десериализует базу данных из MessagePack
|
||||||
|
func (db *Database) DeserializeDatabase(data []byte) error {
|
||||||
|
var dbData map[string]interface{}
|
||||||
|
if err := serializer.Unmarshal(data, &dbData); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for collName, collDataRaw := range dbData {
|
||||||
|
collData, ok := collDataRaw.(map[string]interface{})
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
settings := &CollectionSettings{
|
||||||
|
MaxDocuments: 0,
|
||||||
|
ValidateSchema: false,
|
||||||
|
AutoIndexID: true,
|
||||||
|
TTLSeconds: 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
if metaRaw, ok := collData["metadata"]; ok {
|
||||||
|
if meta, ok := metaRaw.(map[string]interface{}); ok {
|
||||||
|
if settingsRaw, ok := meta["settings"]; ok {
|
||||||
|
if settingsMap, ok := settingsRaw.(map[string]interface{}); ok {
|
||||||
|
if maxDocs, ok := settingsMap["max_documents"]; ok {
|
||||||
|
if v, ok := maxDocs.(int); ok {
|
||||||
|
settings.MaxDocuments = v
|
||||||
|
} else if v, ok := maxDocs.(int64); ok {
|
||||||
|
settings.MaxDocuments = int(v)
|
||||||
|
} else if v, ok := maxDocs.(float64); ok {
|
||||||
|
settings.MaxDocuments = int(v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if validateSchema, ok := settingsMap["validate_schema"]; ok {
|
||||||
|
if v, ok := validateSchema.(bool); ok {
|
||||||
|
settings.ValidateSchema = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if autoIndexID, ok := settingsMap["auto_index_id"]; ok {
|
||||||
|
if v, ok := autoIndexID.(bool); ok {
|
||||||
|
settings.AutoIndexID = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if ttlSeconds, ok := settingsMap["ttl_seconds"]; ok {
|
||||||
|
if v, ok := ttlSeconds.(int); ok {
|
||||||
|
settings.TTLSeconds = v
|
||||||
|
} else if v, ok := ttlSeconds.(int64); ok {
|
||||||
|
settings.TTLSeconds = int(v)
|
||||||
|
} else if v, ok := ttlSeconds.(float64); ok {
|
||||||
|
settings.TTLSeconds = int(v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if meta, ok := metaRaw.(*CollectionMetadata); ok {
|
||||||
|
if meta.Settings != nil {
|
||||||
|
settings = meta.Settings
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
coll := NewCollection(db.name, collName, settings)
|
||||||
|
|
||||||
|
if docsRaw, ok := collData["documents"]; ok {
|
||||||
|
if docs, ok := docsRaw.([]interface{}); ok {
|
||||||
|
for _, docRaw := range docs {
|
||||||
|
if doc, ok := docRaw.(*Document); ok {
|
||||||
|
coll.Insert(doc)
|
||||||
|
} else if docMap, ok := docRaw.(map[string]interface{}); ok {
|
||||||
|
doc := NewDocument()
|
||||||
|
if id, ok := docMap["ID"].(string); ok {
|
||||||
|
doc.ID = id
|
||||||
|
} else if id, ok := docMap["id"].(string); ok {
|
||||||
|
doc.ID = id
|
||||||
|
}
|
||||||
|
if fields, ok := docMap["fields"]; ok {
|
||||||
|
if fieldsMap, ok := fields.(map[string]interface{}); ok {
|
||||||
|
for k, v := range fieldsMap {
|
||||||
|
doc.SetField(k, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if createdAt, ok := docMap["created_at"]; ok {
|
||||||
|
if v, ok := createdAt.(int64); ok {
|
||||||
|
doc.CreatedAt = v
|
||||||
|
} else if v, ok := createdAt.(float64); ok {
|
||||||
|
doc.CreatedAt = int64(v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if updatedAt, ok := docMap["updated_at"]; ok {
|
||||||
|
if v, ok := updatedAt.(int64); ok {
|
||||||
|
doc.UpdatedAt = v
|
||||||
|
} else if v, ok := updatedAt.(float64); ok {
|
||||||
|
doc.UpdatedAt = int64(v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if version, ok := docMap["version"]; ok {
|
||||||
|
if v, ok := version.(uint64); ok {
|
||||||
|
doc.Version = v
|
||||||
|
} else if v, ok := version.(int64); ok {
|
||||||
|
doc.Version = uint64(v)
|
||||||
|
} else if v, ok := version.(float64); ok {
|
||||||
|
doc.Version = uint64(v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
coll.Insert(doc)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if docs, ok := docsRaw.([]*Document); ok {
|
||||||
|
for _, doc := range docs {
|
||||||
|
coll.Insert(doc)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
db.collections.Store(collName, coll)
|
||||||
|
AuditCollectionOperation("RESTORE", db.name, collName, settings)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetDatabaseNames возвращает имена всех баз данных
|
||||||
|
func (s *Storage) GetDatabaseNames() []string {
|
||||||
|
return s.ListDatabases()
|
||||||
|
}
|
||||||
|
|
||||||
|
// ExistsDatabase проверяет существование базы данных
|
||||||
|
func (s *Storage) ExistsDatabase(name string) bool {
|
||||||
|
_, ok := s.databases.Load(name)
|
||||||
|
return ok
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetDatabaseCount возвращает количество баз данных
|
||||||
|
func (s *Storage) GetDatabaseCount() int {
|
||||||
|
count := 0
|
||||||
|
s.databases.Range(func(key, value interface{}) bool {
|
||||||
|
count++
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
return count
|
||||||
|
}
|
||||||
|
|
||||||
|
// ========== Дополнительные методы для управления хранилищем ==========
|
||||||
|
|
||||||
|
// Backup создаёт резервную копию всех данных
|
||||||
|
func (s *Storage) Backup(backupPath string) error {
|
||||||
|
if s.logger != nil {
|
||||||
|
s.logger.Info(fmt.Sprintf("Starting backup to %s", backupPath))
|
||||||
|
}
|
||||||
|
|
||||||
|
backup := make(map[string]interface{})
|
||||||
|
|
||||||
|
s.databases.Range(func(key, value interface{}) bool {
|
||||||
|
dbName := key.(string)
|
||||||
|
db := value.(*Database)
|
||||||
|
|
||||||
|
dbData, err := db.SerializeDatabase()
|
||||||
|
if err != nil {
|
||||||
|
if s.logger != nil {
|
||||||
|
s.logger.Error(fmt.Sprintf("Failed to serialize database %s: %v", dbName, err))
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
backup[dbName] = dbData
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
data, err := serializer.Marshal(backup)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to marshal backup: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := os.WriteFile(backupPath, data, 0644); err != nil {
|
||||||
|
return fmt.Errorf("failed to write backup: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.logger != nil {
|
||||||
|
s.logger.Info(fmt.Sprintf("Backup completed: %s", backupPath))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Restore восстанавливает данные из резервной копии
|
||||||
|
func (s *Storage) Restore(backupPath string) error {
|
||||||
|
if s.logger != nil {
|
||||||
|
s.logger.Info(fmt.Sprintf("Starting restore from %s", backupPath))
|
||||||
|
}
|
||||||
|
|
||||||
|
data, err := os.ReadFile(backupPath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to read backup file: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var backup map[string][]byte
|
||||||
|
if err := serializer.Unmarshal(data, &backup); err != nil {
|
||||||
|
return fmt.Errorf("failed to unmarshal backup: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for dbName, dbData := range backup {
|
||||||
|
if !s.ExistsDatabase(dbName) {
|
||||||
|
if err := s.CreateDatabase(dbName); err != nil {
|
||||||
|
return fmt.Errorf("failed to create database %s: %v", dbName, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
db, err := s.GetDatabase(dbName)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get database %s: %v", dbName, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := db.DeserializeDatabase(dbData); err != nil {
|
||||||
|
return fmt.Errorf("failed to restore database %s: %v", dbName, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.logger != nil {
|
||||||
|
s.logger.Info("Restore completed")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetStats возвращает статистику хранилища
|
||||||
|
func (s *Storage) GetStats() map[string]interface{} {
|
||||||
|
stats := map[string]interface{}{
|
||||||
|
"total_databases": s.GetDatabaseCount(),
|
||||||
|
"total_documents": s.GetTotalDocuments(),
|
||||||
|
"page_size_bytes": s.pageSize,
|
||||||
|
}
|
||||||
|
|
||||||
|
databases := make([]map[string]interface{}, 0)
|
||||||
|
s.databases.Range(func(key, value interface{}) bool {
|
||||||
|
db := value.(*Database)
|
||||||
|
dbStats := map[string]interface{}{
|
||||||
|
"name": db.name,
|
||||||
|
"collections": len(db.ListCollections()),
|
||||||
|
}
|
||||||
|
|
||||||
|
totalDocs := int64(0)
|
||||||
|
totalSize := int64(0)
|
||||||
|
db.collections.Range(func(k, v interface{}) bool {
|
||||||
|
coll := v.(*Collection)
|
||||||
|
totalDocs += coll.Count()
|
||||||
|
totalSize += coll.Size()
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
dbStats["documents"] = totalDocs
|
||||||
|
dbStats["size_bytes"] = totalSize
|
||||||
|
databases = append(databases, dbStats)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
stats["databases"] = databases
|
||||||
|
return stats
|
||||||
|
}
|
||||||
722
internal/storage/transaction.go
Normal file
722
internal/storage/transaction.go
Normal file
@@ -0,0 +1,722 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2026 Safronov Grigorii
|
||||||
|
*
|
||||||
|
* Licensed under the CDDL, Version 1.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
*
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* https://opensource.org/licenses/CDDL-1.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
// Файл: internal/storage/transaction.go
|
||||||
|
// Назначение: Реализация транзакций с поддержкой MVCC (Multi-Version Concurrency Control)
|
||||||
|
// и WAL (Write-Ahead Logging) без блокировок. Использует атомарные операции и версионирование.
|
||||||
|
|
||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"futriis/internal/serializer"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TransactionID представляет уникальный идентификатор транзакции
|
||||||
|
type TransactionID uint64
|
||||||
|
|
||||||
|
// TransactionState представляет состояние транзакции
|
||||||
|
type TransactionState int32
|
||||||
|
|
||||||
|
const (
|
||||||
|
TransactionActive TransactionState = iota
|
||||||
|
TransactionCommitted
|
||||||
|
TransactionAborted
|
||||||
|
)
|
||||||
|
|
||||||
|
// TransactionRecord представляет запись в WAL
|
||||||
|
type TransactionRecord struct {
|
||||||
|
ID TransactionID `msgpack:"id"`
|
||||||
|
State TransactionState `msgpack:"state"`
|
||||||
|
Timestamp int64 `msgpack:"timestamp"`
|
||||||
|
Operations []Operation `msgpack:"operations"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Operation представляет одну операцию в транзакции
|
||||||
|
type Operation struct {
|
||||||
|
Type string `msgpack:"type"` // "insert", "update", "delete"
|
||||||
|
Database string `msgpack:"database"`
|
||||||
|
Collection string `msgpack:"collection"`
|
||||||
|
DocumentID string `msgpack:"document_id"`
|
||||||
|
Data map[string]interface{} `msgpack:"data"`
|
||||||
|
Version uint64 `msgpack:"version"`
|
||||||
|
OldData map[string]interface{} `msgpack:"old_data"` // для отката
|
||||||
|
}
|
||||||
|
|
||||||
|
// DocumentVersion представляет версию документа для MVCC
|
||||||
|
type DocumentVersion struct {
|
||||||
|
Document *Document `msgpack:"document"`
|
||||||
|
Timestamp int64 `msgpack:"timestamp"`
|
||||||
|
TxID TransactionID `msgpack:"tx_id"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TransactionInfo представляет информацию о транзакции для API
|
||||||
|
type TransactionInfo struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
Status string `json:"status"`
|
||||||
|
StartTime int64 `json:"start_time"`
|
||||||
|
OperationCount int `json:"operation_count"`
|
||||||
|
Operations []OperationInfo `json:"operations,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// OperationInfo представляет информацию об операции для API
|
||||||
|
type OperationInfo struct {
|
||||||
|
Type string `json:"type"`
|
||||||
|
Database string `json:"database"`
|
||||||
|
Collection string `json:"collection"`
|
||||||
|
DocumentID string `json:"document_id"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// LoggerInterface определяет интерфейс для логирования
|
||||||
|
type LoggerInterface interface {
|
||||||
|
Debug(msg string)
|
||||||
|
Info(msg string)
|
||||||
|
Error(msg string)
|
||||||
|
Warn(msg string)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TransactionManager управляет транзакциями
|
||||||
|
type TransactionManager struct {
|
||||||
|
activeTransactions sync.Map // map[TransactionID]*Transaction
|
||||||
|
nextTxID atomic.Uint64
|
||||||
|
wal *WriteAheadLog
|
||||||
|
logger LoggerInterface
|
||||||
|
mu sync.RWMutex
|
||||||
|
walPath string
|
||||||
|
snapshotInterval int64 // интервал создания снапшотов в секундах
|
||||||
|
lastSnapshot int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// Transaction представляет одну транзакцию
|
||||||
|
type Transaction struct {
|
||||||
|
ID TransactionID
|
||||||
|
State atomic.Int32
|
||||||
|
Operations []Operation
|
||||||
|
StartTime int64
|
||||||
|
mu sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// WriteAheadLog реализует журнал предзаписи
|
||||||
|
type WriteAheadLog struct {
|
||||||
|
file *os.File
|
||||||
|
writeChan chan []byte
|
||||||
|
done chan struct{}
|
||||||
|
mu sync.RWMutex
|
||||||
|
syncInterval time.Duration
|
||||||
|
lastSync time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
globalTxManager *TransactionManager
|
||||||
|
txManagerOnce sync.Once
|
||||||
|
currentTx atomic.Value // *Transaction
|
||||||
|
)
|
||||||
|
|
||||||
|
// InitTransactionManager инициализирует глобальный менеджер транзакций
|
||||||
|
func InitTransactionManager(walPath string) error {
|
||||||
|
var err error
|
||||||
|
txManagerOnce.Do(func() {
|
||||||
|
globalTxManager = &TransactionManager{
|
||||||
|
nextTxID: atomic.Uint64{},
|
||||||
|
walPath: walPath,
|
||||||
|
snapshotInterval: 300, // 5 минут
|
||||||
|
lastSnapshot: time.Now().Unix(),
|
||||||
|
}
|
||||||
|
globalTxManager.nextTxID.Store(1)
|
||||||
|
err = globalTxManager.initWAL(walPath)
|
||||||
|
if err == nil {
|
||||||
|
// Восстанавливаем состояние из WAL
|
||||||
|
go globalTxManager.recoverFromWAL()
|
||||||
|
// Запускаем периодическое создание снапшотов
|
||||||
|
go globalTxManager.snapshotLoop()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetTransactionLogger устанавливает логгер для транзакций
|
||||||
|
func SetTransactionLogger(logger LoggerInterface) {
|
||||||
|
if globalTxManager != nil {
|
||||||
|
globalTxManager.logger = logger
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// initWAL инициализирует Write-Ahead Log
|
||||||
|
func (tm *TransactionManager) initWAL(walPath string) error {
|
||||||
|
wal, err := NewWriteAheadLog(walPath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
tm.wal = wal
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewWriteAheadLog создаёт новый WAL
|
||||||
|
func NewWriteAheadLog(path string) (*WriteAheadLog, error) {
|
||||||
|
file, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0644)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
wal := &WriteAheadLog{
|
||||||
|
file: file,
|
||||||
|
writeChan: make(chan []byte, 10000),
|
||||||
|
done: make(chan struct{}),
|
||||||
|
syncInterval: 5 * time.Second,
|
||||||
|
lastSync: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
go wal.writerLoop()
|
||||||
|
|
||||||
|
return wal, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// writerLoop асинхронно записывает данные в WAL
|
||||||
|
func (wal *WriteAheadLog) writerLoop() {
|
||||||
|
batch := make([][]byte, 0, 100)
|
||||||
|
ticker := time.NewTicker(wal.syncInterval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case data, ok := <-wal.writeChan:
|
||||||
|
if !ok {
|
||||||
|
if len(batch) > 0 {
|
||||||
|
wal.flushBatch(batch)
|
||||||
|
}
|
||||||
|
close(wal.done)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
batch = append(batch, data)
|
||||||
|
if len(batch) >= 100 {
|
||||||
|
wal.flushBatch(batch)
|
||||||
|
batch = batch[:0]
|
||||||
|
}
|
||||||
|
|
||||||
|
case <-ticker.C:
|
||||||
|
if len(batch) > 0 {
|
||||||
|
wal.flushBatch(batch)
|
||||||
|
batch = batch[:0]
|
||||||
|
}
|
||||||
|
wal.sync()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// flushBatch записывает пакет данных в файл
|
||||||
|
func (wal *WriteAheadLog) flushBatch(batch [][]byte) {
|
||||||
|
wal.mu.Lock()
|
||||||
|
defer wal.mu.Unlock()
|
||||||
|
|
||||||
|
for _, data := range batch {
|
||||||
|
lenBuf := make([]byte, 4)
|
||||||
|
binary.BigEndian.PutUint32(lenBuf, uint32(len(data)))
|
||||||
|
|
||||||
|
if _, err := wal.file.Write(lenBuf); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if _, err := wal.file.Write(data); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// sync синхронизирует файл с диском
|
||||||
|
func (wal *WriteAheadLog) sync() {
|
||||||
|
wal.mu.Lock()
|
||||||
|
defer wal.mu.Unlock()
|
||||||
|
|
||||||
|
if time.Since(wal.lastSync) >= wal.syncInterval {
|
||||||
|
wal.file.Sync()
|
||||||
|
wal.lastSync = time.Now()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write записывает запись в WAL
|
||||||
|
func (wal *WriteAheadLog) Write(record *TransactionRecord) error {
|
||||||
|
data, err := serializer.Marshal(record)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case wal.writeChan <- data:
|
||||||
|
return nil
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
return fmt.Errorf("WAL write timeout")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close закрывает WAL
|
||||||
|
func (wal *WriteAheadLog) Close() error {
|
||||||
|
close(wal.writeChan)
|
||||||
|
<-wal.done
|
||||||
|
return wal.file.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadAll читает все записи из WAL
|
||||||
|
func (wal *WriteAheadLog) ReadAll() ([]*TransactionRecord, error) {
|
||||||
|
wal.mu.RLock()
|
||||||
|
defer wal.mu.RUnlock()
|
||||||
|
|
||||||
|
if _, err := wal.file.Seek(0, 0); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
records := make([]*TransactionRecord, 0)
|
||||||
|
buf := make([]byte, 4)
|
||||||
|
|
||||||
|
for {
|
||||||
|
_, err := wal.file.Read(buf)
|
||||||
|
if err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
recordLen := binary.BigEndian.Uint32(buf)
|
||||||
|
recordData := make([]byte, recordLen)
|
||||||
|
_, err = wal.file.Read(recordData)
|
||||||
|
if err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
var record TransactionRecord
|
||||||
|
if err := serializer.Unmarshal(recordData, &record); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
records = append(records, &record)
|
||||||
|
}
|
||||||
|
|
||||||
|
return records, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// BeginTransaction начинает новую транзакцию
|
||||||
|
func BeginTransaction() *Transaction {
|
||||||
|
if globalTxManager == nil {
|
||||||
|
InitTransactionManager("futriis.wal")
|
||||||
|
}
|
||||||
|
|
||||||
|
tx := &Transaction{
|
||||||
|
ID: TransactionID(globalTxManager.nextTxID.Add(1) - 1),
|
||||||
|
StartTime: time.Now().UnixMilli(),
|
||||||
|
Operations: make([]Operation, 0),
|
||||||
|
}
|
||||||
|
tx.State.Store(int32(TransactionActive))
|
||||||
|
|
||||||
|
globalTxManager.activeTransactions.Store(tx.ID, tx)
|
||||||
|
currentTx.Store(tx)
|
||||||
|
|
||||||
|
// Записываем начало транзакции в WAL
|
||||||
|
record := &TransactionRecord{
|
||||||
|
ID: tx.ID,
|
||||||
|
State: TransactionActive,
|
||||||
|
Timestamp: tx.StartTime,
|
||||||
|
Operations: tx.Operations,
|
||||||
|
}
|
||||||
|
if globalTxManager.wal != nil {
|
||||||
|
globalTxManager.wal.Write(record)
|
||||||
|
}
|
||||||
|
|
||||||
|
if globalTxManager.logger != nil {
|
||||||
|
globalTxManager.logger.Debug(fmt.Sprintf("Transaction %d started", tx.ID))
|
||||||
|
}
|
||||||
|
|
||||||
|
AuditLog("START", "TRANSACTION", fmt.Sprintf("%d", tx.ID), map[string]interface{}{
|
||||||
|
"start_time": tx.StartTime,
|
||||||
|
})
|
||||||
|
|
||||||
|
return tx
|
||||||
|
}
|
||||||
|
|
||||||
|
// BeginTransactionOnCollection начинает транзакцию для конкретной коллекции
|
||||||
|
func BeginTransactionOnCollection(coll *Collection) error {
|
||||||
|
if globalTxManager == nil {
|
||||||
|
if err := InitTransactionManager("futriis.wal"); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tx := BeginTransaction()
|
||||||
|
if tx == nil {
|
||||||
|
return fmt.Errorf("failed to create transaction")
|
||||||
|
}
|
||||||
|
|
||||||
|
if globalTxManager.logger != nil {
|
||||||
|
globalTxManager.logger.Debug(fmt.Sprintf("Transaction %d started on collection %s.%s", tx.ID, coll.dbName, coll.name))
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddToTransaction добавляет операцию в текущую транзакцию
|
||||||
|
func AddToTransaction(coll *Collection, opType string, docID string, newData map[string]interface{}, oldData map[string]interface{}, version uint64) error {
|
||||||
|
txVal := currentTx.Load()
|
||||||
|
if txVal == nil {
|
||||||
|
return fmt.Errorf("no active transaction")
|
||||||
|
}
|
||||||
|
|
||||||
|
tx := txVal.(*Transaction)
|
||||||
|
if TransactionState(tx.State.Load()) != TransactionActive {
|
||||||
|
return fmt.Errorf("transaction is not active")
|
||||||
|
}
|
||||||
|
|
||||||
|
op := Operation{
|
||||||
|
Type: opType,
|
||||||
|
Database: coll.dbName,
|
||||||
|
Collection: coll.name,
|
||||||
|
DocumentID: docID,
|
||||||
|
Data: newData,
|
||||||
|
OldData: oldData,
|
||||||
|
Version: version,
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.mu.Lock()
|
||||||
|
tx.Operations = append(tx.Operations, op)
|
||||||
|
tx.mu.Unlock()
|
||||||
|
|
||||||
|
if globalTxManager.logger != nil {
|
||||||
|
globalTxManager.logger.Debug(fmt.Sprintf("Transaction %d: added %s operation on %s", tx.ID, opType, docID))
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CommitCurrentTransaction коммитит текущую транзакцию
|
||||||
|
func CommitCurrentTransaction() error {
|
||||||
|
txVal := currentTx.Load()
|
||||||
|
if txVal == nil {
|
||||||
|
return fmt.Errorf("no active transaction")
|
||||||
|
}
|
||||||
|
|
||||||
|
tx := txVal.(*Transaction)
|
||||||
|
if TransactionState(tx.State.Load()) != TransactionActive {
|
||||||
|
return fmt.Errorf("transaction is not active")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Применяем все операции атомарно
|
||||||
|
for _, op := range tx.Operations {
|
||||||
|
if err := applyOperation(op); err != nil {
|
||||||
|
// Откатываем при ошибке
|
||||||
|
AbortCurrentTransaction()
|
||||||
|
return fmt.Errorf("transaction commit failed at operation %s: %v", op.Type, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.State.Store(int32(TransactionCommitted))
|
||||||
|
|
||||||
|
// Записываем коммит в WAL
|
||||||
|
record := &TransactionRecord{
|
||||||
|
ID: tx.ID,
|
||||||
|
State: TransactionCommitted,
|
||||||
|
Timestamp: time.Now().UnixMilli(),
|
||||||
|
Operations: tx.Operations,
|
||||||
|
}
|
||||||
|
if globalTxManager.wal != nil {
|
||||||
|
globalTxManager.wal.Write(record)
|
||||||
|
}
|
||||||
|
|
||||||
|
if globalTxManager.logger != nil {
|
||||||
|
globalTxManager.logger.Info(fmt.Sprintf("Transaction %d committed with %d operations", tx.ID, len(tx.Operations)))
|
||||||
|
}
|
||||||
|
|
||||||
|
AuditLog("COMMIT", "TRANSACTION", fmt.Sprintf("%d", tx.ID), map[string]interface{}{
|
||||||
|
"operations": len(tx.Operations),
|
||||||
|
})
|
||||||
|
|
||||||
|
// Очищаем текущую транзакцию
|
||||||
|
currentTx.Store(nil)
|
||||||
|
globalTxManager.activeTransactions.Delete(tx.ID)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// AbortCurrentTransaction откатывает текущую транзакцию
|
||||||
|
func AbortCurrentTransaction() error {
|
||||||
|
txVal := currentTx.Load()
|
||||||
|
if txVal == nil {
|
||||||
|
return fmt.Errorf("no active transaction")
|
||||||
|
}
|
||||||
|
|
||||||
|
tx := txVal.(*Transaction)
|
||||||
|
tx.State.Store(int32(TransactionAborted))
|
||||||
|
|
||||||
|
// Записываем откат в WAL
|
||||||
|
record := &TransactionRecord{
|
||||||
|
ID: tx.ID,
|
||||||
|
State: TransactionAborted,
|
||||||
|
Timestamp: time.Now().UnixMilli(),
|
||||||
|
Operations: tx.Operations,
|
||||||
|
}
|
||||||
|
if globalTxManager.wal != nil {
|
||||||
|
globalTxManager.wal.Write(record)
|
||||||
|
}
|
||||||
|
|
||||||
|
if globalTxManager.logger != nil {
|
||||||
|
globalTxManager.logger.Info(fmt.Sprintf("Transaction %d aborted with %d operations", tx.ID, len(tx.Operations)))
|
||||||
|
}
|
||||||
|
|
||||||
|
AuditLog("ABORT", "TRANSACTION", fmt.Sprintf("%d", tx.ID), map[string]interface{}{
|
||||||
|
"operations": len(tx.Operations),
|
||||||
|
})
|
||||||
|
|
||||||
|
// Очищаем текущую транзакцию
|
||||||
|
currentTx.Store(nil)
|
||||||
|
globalTxManager.activeTransactions.Delete(tx.ID)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// HasActiveTransaction проверяет наличие активной транзакции
|
||||||
|
func HasActiveTransaction() bool {
|
||||||
|
return currentTx.Load() != nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetCurrentTransactionID возвращает ID текущей транзакции
|
||||||
|
func GetCurrentTransactionID() string {
|
||||||
|
txVal := currentTx.Load()
|
||||||
|
if txVal == nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
tx := txVal.(*Transaction)
|
||||||
|
return fmt.Sprintf("%d", tx.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetActiveTransactions возвращает список активных транзакций для API
|
||||||
|
func GetActiveTransactions() []TransactionInfo {
|
||||||
|
if globalTxManager == nil {
|
||||||
|
return []TransactionInfo{}
|
||||||
|
}
|
||||||
|
|
||||||
|
transactions := make([]TransactionInfo, 0)
|
||||||
|
|
||||||
|
globalTxManager.activeTransactions.Range(func(key, value interface{}) bool {
|
||||||
|
tx := value.(*Transaction)
|
||||||
|
status := "active"
|
||||||
|
if TransactionState(tx.State.Load()) == TransactionCommitted {
|
||||||
|
status = "committed"
|
||||||
|
} else if TransactionState(tx.State.Load()) == TransactionAborted {
|
||||||
|
status = "aborted"
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.mu.RLock()
|
||||||
|
opCount := len(tx.Operations)
|
||||||
|
operations := make([]OperationInfo, 0, opCount)
|
||||||
|
for _, op := range tx.Operations {
|
||||||
|
operations = append(operations, OperationInfo{
|
||||||
|
Type: op.Type,
|
||||||
|
Database: op.Database,
|
||||||
|
Collection: op.Collection,
|
||||||
|
DocumentID: op.DocumentID,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
tx.mu.RUnlock()
|
||||||
|
|
||||||
|
transactions = append(transactions, TransactionInfo{
|
||||||
|
ID: fmt.Sprintf("%d", tx.ID),
|
||||||
|
Status: status,
|
||||||
|
StartTime: tx.StartTime,
|
||||||
|
OperationCount: opCount,
|
||||||
|
Operations: operations,
|
||||||
|
})
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
return transactions
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetTransactionByID возвращает транзакцию по ID
|
||||||
|
func GetTransactionByID(id string) (*Transaction, error) {
|
||||||
|
if globalTxManager == nil {
|
||||||
|
return nil, fmt.Errorf("transaction manager not initialized")
|
||||||
|
}
|
||||||
|
|
||||||
|
var txID TransactionID
|
||||||
|
fmt.Sscanf(id, "%d", &txID)
|
||||||
|
|
||||||
|
if val, ok := globalTxManager.activeTransactions.Load(txID); ok {
|
||||||
|
return val.(*Transaction), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("transaction not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
// FindInTransaction ищет документ в контексте транзакции
|
||||||
|
func FindInTransaction(coll *Collection, id string) (*Document, error) {
|
||||||
|
txVal := currentTx.Load()
|
||||||
|
if txVal == nil {
|
||||||
|
return coll.Find(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
tx := txVal.(*Transaction)
|
||||||
|
|
||||||
|
tx.mu.RLock()
|
||||||
|
defer tx.mu.RUnlock()
|
||||||
|
|
||||||
|
// Ищем в операциях транзакции в обратном порядке
|
||||||
|
for i := len(tx.Operations) - 1; i >= 0; i-- {
|
||||||
|
op := tx.Operations[i]
|
||||||
|
if op.DocumentID == id {
|
||||||
|
if op.Type == "delete" {
|
||||||
|
return nil, fmt.Errorf("document deleted in transaction")
|
||||||
|
}
|
||||||
|
if op.Type == "insert" || op.Type == "update" {
|
||||||
|
doc := NewDocumentWithID(op.DocumentID)
|
||||||
|
for k, v := range op.Data {
|
||||||
|
doc.SetField(k, v)
|
||||||
|
}
|
||||||
|
doc.Version = op.Version
|
||||||
|
return doc, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return coll.Find(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
// applyOperation применяет операцию к хранилищу
|
||||||
|
func applyOperation(op Operation) error {
|
||||||
|
db, err := GetGlobalStorage().GetDatabase(op.Database)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("database not found: %s", op.Database)
|
||||||
|
}
|
||||||
|
|
||||||
|
coll, err := db.GetCollection(op.Collection)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("collection not found: %s", op.Collection)
|
||||||
|
}
|
||||||
|
|
||||||
|
switch op.Type {
|
||||||
|
case "insert":
|
||||||
|
doc := NewDocumentWithID(op.DocumentID)
|
||||||
|
for k, v := range op.Data {
|
||||||
|
doc.SetField(k, v)
|
||||||
|
}
|
||||||
|
doc.Version = op.Version
|
||||||
|
return coll.Insert(doc)
|
||||||
|
|
||||||
|
case "update":
|
||||||
|
return coll.Update(op.DocumentID, op.Data)
|
||||||
|
|
||||||
|
case "delete":
|
||||||
|
return coll.Delete(op.DocumentID)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// recoverFromWAL восстанавливает состояние из WAL после сбоя
|
||||||
|
func (tm *TransactionManager) recoverFromWAL() {
|
||||||
|
if tm.wal == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
records, err := tm.wal.ReadAll()
|
||||||
|
if err != nil {
|
||||||
|
if tm.logger != nil {
|
||||||
|
tm.logger.Error(fmt.Sprintf("Failed to read WAL: %v", err))
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, record := range records {
|
||||||
|
if record.State == TransactionCommitted {
|
||||||
|
// Повторно применяем закоммиченные операции
|
||||||
|
for _, op := range record.Operations {
|
||||||
|
if err := applyOperation(op); err != nil {
|
||||||
|
if tm.logger != nil {
|
||||||
|
tm.logger.Error(fmt.Sprintf("Failed to replay operation: %v", err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Транзакции в состоянии Active или Aborted игнорируем
|
||||||
|
}
|
||||||
|
|
||||||
|
if tm.logger != nil {
|
||||||
|
tm.logger.Info(fmt.Sprintf("Recovered %d transactions from WAL", len(records)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// snapshotLoop периодически создаёт снапшоты
|
||||||
|
func (tm *TransactionManager) snapshotLoop() {
|
||||||
|
ticker := time.NewTicker(time.Duration(tm.snapshotInterval) * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for range ticker.C {
|
||||||
|
tm.createSnapshot()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// createSnapshot создаёт снапшот текущего состояния
|
||||||
|
func (tm *TransactionManager) createSnapshot() {
|
||||||
|
now := time.Now().Unix()
|
||||||
|
if now-tm.lastSnapshot < tm.snapshotInterval {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
snapshotPath := fmt.Sprintf("%s.snapshot.%d", tm.walPath, now)
|
||||||
|
if err := GetGlobalStorage().Backup(snapshotPath); err != nil {
|
||||||
|
if tm.logger != nil {
|
||||||
|
tm.logger.Error(fmt.Sprintf("Failed to create snapshot: %v", err))
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
tm.lastSnapshot = now
|
||||||
|
|
||||||
|
if tm.logger != nil {
|
||||||
|
tm.logger.Info(fmt.Sprintf("Snapshot created: %s", snapshotPath))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Удаляем старые снапшоты
|
||||||
|
go tm.cleanOldSnapshots()
|
||||||
|
}
|
||||||
|
|
||||||
|
// cleanOldSnapshots удаляет старые снапшоты
|
||||||
|
func (tm *TransactionManager) cleanOldSnapshots() {
|
||||||
|
// Реализация удаления старых снапшотов
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetGlobalStorage возвращает глобальное хранилище (должно быть установлено при инициализации)
|
||||||
|
var globalStorage *Storage
|
||||||
|
|
||||||
|
// SetGlobalStorage устанавливает глобальное хранилище
|
||||||
|
func SetGlobalStorage(s *Storage) {
|
||||||
|
globalStorage = s
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetGlobalStorage возвращает глобальное хранилище
|
||||||
|
func GetGlobalStorage() *Storage {
|
||||||
|
return globalStorage
|
||||||
|
}
|
||||||
|
|
||||||
|
// AuditLog записывает событие в аудит
|
||||||
|
func AuditLog(operation, dataType, name string, details map[string]interface{}) {
|
||||||
|
LogAudit(operation, dataType, name, details)
|
||||||
|
}
|
||||||
|
|
||||||
|
// MVCCSnapshot создаёт снапшот текущего состояния для MVCC
|
||||||
|
func MVCCSnapshot() uint64 {
|
||||||
|
return uint64(time.Now().UnixNano())
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateDocumentVersion создаёт новую версию документа для MVCC
|
||||||
|
func CreateDocumentVersion(doc *Document, txID TransactionID) *DocumentVersion {
|
||||||
|
return &DocumentVersion{
|
||||||
|
Document: doc.Clone(),
|
||||||
|
Timestamp: time.Now().UnixMilli(),
|
||||||
|
TxID: txID,
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user