Files
futriix/internal/storage/transaction.go

723 lines
22 KiB
Go
Raw Normal View History

2026-05-01 17:07:26 +00:00
/*
* Copyright 2026 Safronov Grigorii
*
* Licensed under the CDDL, Version 1.0 (the "License");
* you may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
* https://opensource.org/licenses/CDDL-1.0
*/
// Файл: internal/storage/transaction.go
// Назначение: Реализация транзакций с поддержкой MVCC (Multi-Version Concurrency Control)
// и WAL (Write-Ahead Logging) без блокировок. Использует атомарные операции и версионирование.
package storage
import (
"encoding/binary"
"fmt"
"os"
"sync"
"sync/atomic"
"time"
"futriis/internal/serializer"
)
// TransactionID представляет уникальный идентификатор транзакции
type TransactionID uint64
// TransactionState представляет состояние транзакции
type TransactionState int32
const (
TransactionActive TransactionState = iota
TransactionCommitted
TransactionAborted
)
// TransactionRecord представляет запись в WAL
type TransactionRecord struct {
ID TransactionID `msgpack:"id"`
State TransactionState `msgpack:"state"`
Timestamp int64 `msgpack:"timestamp"`
Operations []Operation `msgpack:"operations"`
}
// Operation представляет одну операцию в транзакции
type Operation struct {
Type string `msgpack:"type"` // "insert", "update", "delete"
Database string `msgpack:"database"`
Collection string `msgpack:"collection"`
DocumentID string `msgpack:"document_id"`
Data map[string]interface{} `msgpack:"data"`
Version uint64 `msgpack:"version"`
OldData map[string]interface{} `msgpack:"old_data"` // для отката
}
// DocumentVersion представляет версию документа для MVCC
type DocumentVersion struct {
Document *Document `msgpack:"document"`
Timestamp int64 `msgpack:"timestamp"`
TxID TransactionID `msgpack:"tx_id"`
}
// TransactionInfo представляет информацию о транзакции для API
type TransactionInfo struct {
ID string `json:"id"`
Status string `json:"status"`
StartTime int64 `json:"start_time"`
OperationCount int `json:"operation_count"`
Operations []OperationInfo `json:"operations,omitempty"`
}
// OperationInfo представляет информацию об операции для API
type OperationInfo struct {
Type string `json:"type"`
Database string `json:"database"`
Collection string `json:"collection"`
DocumentID string `json:"document_id"`
}
// LoggerInterface определяет интерфейс для логирования
type LoggerInterface interface {
Debug(msg string)
Info(msg string)
Error(msg string)
Warn(msg string)
}
// TransactionManager управляет транзакциями
type TransactionManager struct {
activeTransactions sync.Map // map[TransactionID]*Transaction
nextTxID atomic.Uint64
wal *WriteAheadLog
logger LoggerInterface
mu sync.RWMutex
walPath string
snapshotInterval int64 // интервал создания снапшотов в секундах
lastSnapshot int64
}
// Transaction представляет одну транзакцию
type Transaction struct {
ID TransactionID
State atomic.Int32
Operations []Operation
StartTime int64
mu sync.RWMutex
}
// WriteAheadLog реализует журнал предзаписи
type WriteAheadLog struct {
file *os.File
writeChan chan []byte
done chan struct{}
mu sync.RWMutex
syncInterval time.Duration
lastSync time.Time
}
var (
globalTxManager *TransactionManager
txManagerOnce sync.Once
currentTx atomic.Value // *Transaction
)
// InitTransactionManager инициализирует глобальный менеджер транзакций
func InitTransactionManager(walPath string) error {
var err error
txManagerOnce.Do(func() {
globalTxManager = &TransactionManager{
nextTxID: atomic.Uint64{},
walPath: walPath,
snapshotInterval: 300, // 5 минут
lastSnapshot: time.Now().Unix(),
}
globalTxManager.nextTxID.Store(1)
err = globalTxManager.initWAL(walPath)
if err == nil {
// Восстанавливаем состояние из WAL
go globalTxManager.recoverFromWAL()
// Запускаем периодическое создание снапшотов
go globalTxManager.snapshotLoop()
}
})
return err
}
// SetTransactionLogger устанавливает логгер для транзакций
func SetTransactionLogger(logger LoggerInterface) {
if globalTxManager != nil {
globalTxManager.logger = logger
}
}
// initWAL инициализирует Write-Ahead Log
func (tm *TransactionManager) initWAL(walPath string) error {
wal, err := NewWriteAheadLog(walPath)
if err != nil {
return err
}
tm.wal = wal
return nil
}
// NewWriteAheadLog создаёт новый WAL
func NewWriteAheadLog(path string) (*WriteAheadLog, error) {
file, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0644)
if err != nil {
return nil, err
}
wal := &WriteAheadLog{
file: file,
writeChan: make(chan []byte, 10000),
done: make(chan struct{}),
syncInterval: 5 * time.Second,
lastSync: time.Now(),
}
go wal.writerLoop()
return wal, nil
}
// writerLoop асинхронно записывает данные в WAL
func (wal *WriteAheadLog) writerLoop() {
batch := make([][]byte, 0, 100)
ticker := time.NewTicker(wal.syncInterval)
defer ticker.Stop()
for {
select {
case data, ok := <-wal.writeChan:
if !ok {
if len(batch) > 0 {
wal.flushBatch(batch)
}
close(wal.done)
return
}
batch = append(batch, data)
if len(batch) >= 100 {
wal.flushBatch(batch)
batch = batch[:0]
}
case <-ticker.C:
if len(batch) > 0 {
wal.flushBatch(batch)
batch = batch[:0]
}
wal.sync()
}
}
}
// flushBatch записывает пакет данных в файл
func (wal *WriteAheadLog) flushBatch(batch [][]byte) {
wal.mu.Lock()
defer wal.mu.Unlock()
for _, data := range batch {
lenBuf := make([]byte, 4)
binary.BigEndian.PutUint32(lenBuf, uint32(len(data)))
if _, err := wal.file.Write(lenBuf); err != nil {
continue
}
if _, err := wal.file.Write(data); err != nil {
continue
}
}
}
// sync синхронизирует файл с диском
func (wal *WriteAheadLog) sync() {
wal.mu.Lock()
defer wal.mu.Unlock()
if time.Since(wal.lastSync) >= wal.syncInterval {
wal.file.Sync()
wal.lastSync = time.Now()
}
}
// Write записывает запись в WAL
func (wal *WriteAheadLog) Write(record *TransactionRecord) error {
data, err := serializer.Marshal(record)
if err != nil {
return err
}
select {
case wal.writeChan <- data:
return nil
case <-time.After(5 * time.Second):
return fmt.Errorf("WAL write timeout")
}
}
// Close закрывает WAL
func (wal *WriteAheadLog) Close() error {
close(wal.writeChan)
<-wal.done
return wal.file.Close()
}
// ReadAll читает все записи из WAL
func (wal *WriteAheadLog) ReadAll() ([]*TransactionRecord, error) {
wal.mu.RLock()
defer wal.mu.RUnlock()
if _, err := wal.file.Seek(0, 0); err != nil {
return nil, err
}
records := make([]*TransactionRecord, 0)
buf := make([]byte, 4)
for {
_, err := wal.file.Read(buf)
if err != nil {
break
}
recordLen := binary.BigEndian.Uint32(buf)
recordData := make([]byte, recordLen)
_, err = wal.file.Read(recordData)
if err != nil {
break
}
var record TransactionRecord
if err := serializer.Unmarshal(recordData, &record); err != nil {
continue
}
records = append(records, &record)
}
return records, nil
}
// BeginTransaction начинает новую транзакцию
func BeginTransaction() *Transaction {
if globalTxManager == nil {
InitTransactionManager("futriis.wal")
}
tx := &Transaction{
ID: TransactionID(globalTxManager.nextTxID.Add(1) - 1),
StartTime: time.Now().UnixMilli(),
Operations: make([]Operation, 0),
}
tx.State.Store(int32(TransactionActive))
globalTxManager.activeTransactions.Store(tx.ID, tx)
currentTx.Store(tx)
// Записываем начало транзакции в WAL
record := &TransactionRecord{
ID: tx.ID,
State: TransactionActive,
Timestamp: tx.StartTime,
Operations: tx.Operations,
}
if globalTxManager.wal != nil {
globalTxManager.wal.Write(record)
}
if globalTxManager.logger != nil {
globalTxManager.logger.Debug(fmt.Sprintf("Transaction %d started", tx.ID))
}
AuditLog("START", "TRANSACTION", fmt.Sprintf("%d", tx.ID), map[string]interface{}{
"start_time": tx.StartTime,
})
return tx
}
// BeginTransactionOnCollection начинает транзакцию для конкретной коллекции
func BeginTransactionOnCollection(coll *Collection) error {
if globalTxManager == nil {
if err := InitTransactionManager("futriis.wal"); err != nil {
return err
}
}
tx := BeginTransaction()
if tx == nil {
return fmt.Errorf("failed to create transaction")
}
if globalTxManager.logger != nil {
globalTxManager.logger.Debug(fmt.Sprintf("Transaction %d started on collection %s.%s", tx.ID, coll.dbName, coll.name))
}
return nil
}
// AddToTransaction добавляет операцию в текущую транзакцию
func AddToTransaction(coll *Collection, opType string, docID string, newData map[string]interface{}, oldData map[string]interface{}, version uint64) error {
txVal := currentTx.Load()
if txVal == nil {
return fmt.Errorf("no active transaction")
}
tx := txVal.(*Transaction)
if TransactionState(tx.State.Load()) != TransactionActive {
return fmt.Errorf("transaction is not active")
}
op := Operation{
Type: opType,
Database: coll.dbName,
Collection: coll.name,
DocumentID: docID,
Data: newData,
OldData: oldData,
Version: version,
}
tx.mu.Lock()
tx.Operations = append(tx.Operations, op)
tx.mu.Unlock()
if globalTxManager.logger != nil {
globalTxManager.logger.Debug(fmt.Sprintf("Transaction %d: added %s operation on %s", tx.ID, opType, docID))
}
return nil
}
// CommitCurrentTransaction коммитит текущую транзакцию
func CommitCurrentTransaction() error {
txVal := currentTx.Load()
if txVal == nil {
return fmt.Errorf("no active transaction")
}
tx := txVal.(*Transaction)
if TransactionState(tx.State.Load()) != TransactionActive {
return fmt.Errorf("transaction is not active")
}
// Применяем все операции атомарно
for _, op := range tx.Operations {
if err := applyOperation(op); err != nil {
// Откатываем при ошибке
AbortCurrentTransaction()
return fmt.Errorf("transaction commit failed at operation %s: %v", op.Type, err)
}
}
tx.State.Store(int32(TransactionCommitted))
// Записываем коммит в WAL
record := &TransactionRecord{
ID: tx.ID,
State: TransactionCommitted,
Timestamp: time.Now().UnixMilli(),
Operations: tx.Operations,
}
if globalTxManager.wal != nil {
globalTxManager.wal.Write(record)
}
if globalTxManager.logger != nil {
globalTxManager.logger.Info(fmt.Sprintf("Transaction %d committed with %d operations", tx.ID, len(tx.Operations)))
}
AuditLog("COMMIT", "TRANSACTION", fmt.Sprintf("%d", tx.ID), map[string]interface{}{
"operations": len(tx.Operations),
})
// Очищаем текущую транзакцию
currentTx.Store(nil)
globalTxManager.activeTransactions.Delete(tx.ID)
return nil
}
// AbortCurrentTransaction откатывает текущую транзакцию
func AbortCurrentTransaction() error {
txVal := currentTx.Load()
if txVal == nil {
return fmt.Errorf("no active transaction")
}
tx := txVal.(*Transaction)
tx.State.Store(int32(TransactionAborted))
// Записываем откат в WAL
record := &TransactionRecord{
ID: tx.ID,
State: TransactionAborted,
Timestamp: time.Now().UnixMilli(),
Operations: tx.Operations,
}
if globalTxManager.wal != nil {
globalTxManager.wal.Write(record)
}
if globalTxManager.logger != nil {
globalTxManager.logger.Info(fmt.Sprintf("Transaction %d aborted with %d operations", tx.ID, len(tx.Operations)))
}
AuditLog("ABORT", "TRANSACTION", fmt.Sprintf("%d", tx.ID), map[string]interface{}{
"operations": len(tx.Operations),
})
// Очищаем текущую транзакцию
currentTx.Store(nil)
globalTxManager.activeTransactions.Delete(tx.ID)
return nil
}
// HasActiveTransaction проверяет наличие активной транзакции
func HasActiveTransaction() bool {
return currentTx.Load() != nil
}
// GetCurrentTransactionID возвращает ID текущей транзакции
func GetCurrentTransactionID() string {
txVal := currentTx.Load()
if txVal == nil {
return ""
}
tx := txVal.(*Transaction)
return fmt.Sprintf("%d", tx.ID)
}
// GetActiveTransactions возвращает список активных транзакций для API
func GetActiveTransactions() []TransactionInfo {
if globalTxManager == nil {
return []TransactionInfo{}
}
transactions := make([]TransactionInfo, 0)
globalTxManager.activeTransactions.Range(func(key, value interface{}) bool {
tx := value.(*Transaction)
status := "active"
if TransactionState(tx.State.Load()) == TransactionCommitted {
status = "committed"
} else if TransactionState(tx.State.Load()) == TransactionAborted {
status = "aborted"
}
tx.mu.RLock()
opCount := len(tx.Operations)
operations := make([]OperationInfo, 0, opCount)
for _, op := range tx.Operations {
operations = append(operations, OperationInfo{
Type: op.Type,
Database: op.Database,
Collection: op.Collection,
DocumentID: op.DocumentID,
})
}
tx.mu.RUnlock()
transactions = append(transactions, TransactionInfo{
ID: fmt.Sprintf("%d", tx.ID),
Status: status,
StartTime: tx.StartTime,
OperationCount: opCount,
Operations: operations,
})
return true
})
return transactions
}
// GetTransactionByID возвращает транзакцию по ID
func GetTransactionByID(id string) (*Transaction, error) {
if globalTxManager == nil {
return nil, fmt.Errorf("transaction manager not initialized")
}
var txID TransactionID
fmt.Sscanf(id, "%d", &txID)
if val, ok := globalTxManager.activeTransactions.Load(txID); ok {
return val.(*Transaction), nil
}
return nil, fmt.Errorf("transaction not found")
}
// FindInTransaction ищет документ в контексте транзакции
func FindInTransaction(coll *Collection, id string) (*Document, error) {
txVal := currentTx.Load()
if txVal == nil {
return coll.Find(id)
}
tx := txVal.(*Transaction)
tx.mu.RLock()
defer tx.mu.RUnlock()
// Ищем в операциях транзакции в обратном порядке
for i := len(tx.Operations) - 1; i >= 0; i-- {
op := tx.Operations[i]
if op.DocumentID == id {
if op.Type == "delete" {
return nil, fmt.Errorf("document deleted in transaction")
}
if op.Type == "insert" || op.Type == "update" {
doc := NewDocumentWithID(op.DocumentID)
for k, v := range op.Data {
doc.SetField(k, v)
}
doc.Version = op.Version
return doc, nil
}
}
}
return coll.Find(id)
}
// applyOperation применяет операцию к хранилищу
func applyOperation(op Operation) error {
db, err := GetGlobalStorage().GetDatabase(op.Database)
if err != nil {
return fmt.Errorf("database not found: %s", op.Database)
}
coll, err := db.GetCollection(op.Collection)
if err != nil {
return fmt.Errorf("collection not found: %s", op.Collection)
}
switch op.Type {
case "insert":
doc := NewDocumentWithID(op.DocumentID)
for k, v := range op.Data {
doc.SetField(k, v)
}
doc.Version = op.Version
return coll.Insert(doc)
case "update":
return coll.Update(op.DocumentID, op.Data)
case "delete":
return coll.Delete(op.DocumentID)
}
return nil
}
// recoverFromWAL восстанавливает состояние из WAL после сбоя
func (tm *TransactionManager) recoverFromWAL() {
if tm.wal == nil {
return
}
records, err := tm.wal.ReadAll()
if err != nil {
if tm.logger != nil {
tm.logger.Error(fmt.Sprintf("Failed to read WAL: %v", err))
}
return
}
for _, record := range records {
if record.State == TransactionCommitted {
// Повторно применяем закоммиченные операции
for _, op := range record.Operations {
if err := applyOperation(op); err != nil {
if tm.logger != nil {
tm.logger.Error(fmt.Sprintf("Failed to replay operation: %v", err))
}
}
}
}
// Транзакции в состоянии Active или Aborted игнорируем
}
if tm.logger != nil {
tm.logger.Info(fmt.Sprintf("Recovered %d transactions from WAL", len(records)))
}
}
// snapshotLoop периодически создаёт снапшоты
func (tm *TransactionManager) snapshotLoop() {
ticker := time.NewTicker(time.Duration(tm.snapshotInterval) * time.Second)
defer ticker.Stop()
for range ticker.C {
tm.createSnapshot()
}
}
// createSnapshot создаёт снапшот текущего состояния
func (tm *TransactionManager) createSnapshot() {
now := time.Now().Unix()
if now-tm.lastSnapshot < tm.snapshotInterval {
return
}
snapshotPath := fmt.Sprintf("%s.snapshot.%d", tm.walPath, now)
if err := GetGlobalStorage().Backup(snapshotPath); err != nil {
if tm.logger != nil {
tm.logger.Error(fmt.Sprintf("Failed to create snapshot: %v", err))
}
return
}
tm.lastSnapshot = now
if tm.logger != nil {
tm.logger.Info(fmt.Sprintf("Snapshot created: %s", snapshotPath))
}
// Удаляем старые снапшоты
go tm.cleanOldSnapshots()
}
// cleanOldSnapshots удаляет старые снапшоты
func (tm *TransactionManager) cleanOldSnapshots() {
// Реализация удаления старых снапшотов
}
// GetGlobalStorage возвращает глобальное хранилище (должно быть установлено при инициализации)
var globalStorage *Storage
// SetGlobalStorage устанавливает глобальное хранилище
func SetGlobalStorage(s *Storage) {
globalStorage = s
}
// GetGlobalStorage возвращает глобальное хранилище
func GetGlobalStorage() *Storage {
return globalStorage
}
// AuditLog записывает событие в аудит
func AuditLog(operation, dataType, name string, details map[string]interface{}) {
LogAudit(operation, dataType, name, details)
}
// MVCCSnapshot создаёт снапшот текущего состояния для MVCC
func MVCCSnapshot() uint64 {
return uint64(time.Now().UnixNano())
}
// CreateDocumentVersion создаёт новую версию документа для MVCC
func CreateDocumentVersion(doc *Document, txID TransactionID) *DocumentVersion {
return &DocumentVersion{
Document: doc.Clone(),
Timestamp: time.Now().UnixMilli(),
TxID: txID,
}
}