Files
futriix/internal/storage/transaction.go

723 lines
22 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/*
* Copyright 2026 Safronov Grigorii
*
* Licensed under the CDDL, Version 1.0 (the "License");
* you may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
* https://opensource.org/licenses/CDDL-1.0
*/
// Файл: internal/storage/transaction.go
// Назначение: Реализация транзакций с поддержкой MVCC (Multi-Version Concurrency Control)
// и WAL (Write-Ahead Logging) без блокировок. Использует атомарные операции и версионирование.
package storage
import (
"encoding/binary"
"fmt"
"os"
"sync"
"sync/atomic"
"time"
"futriis/internal/serializer"
)
// TransactionID представляет уникальный идентификатор транзакции
type TransactionID uint64
// TransactionState представляет состояние транзакции
type TransactionState int32
const (
TransactionActive TransactionState = iota
TransactionCommitted
TransactionAborted
)
// TransactionRecord представляет запись в WAL
type TransactionRecord struct {
ID TransactionID `msgpack:"id"`
State TransactionState `msgpack:"state"`
Timestamp int64 `msgpack:"timestamp"`
Operations []Operation `msgpack:"operations"`
}
// Operation представляет одну операцию в транзакции
type Operation struct {
Type string `msgpack:"type"` // "insert", "update", "delete"
Database string `msgpack:"database"`
Collection string `msgpack:"collection"`
DocumentID string `msgpack:"document_id"`
Data map[string]interface{} `msgpack:"data"`
Version uint64 `msgpack:"version"`
OldData map[string]interface{} `msgpack:"old_data"` // для отката
}
// DocumentVersion представляет версию документа для MVCC
type DocumentVersion struct {
Document *Document `msgpack:"document"`
Timestamp int64 `msgpack:"timestamp"`
TxID TransactionID `msgpack:"tx_id"`
}
// TransactionInfo представляет информацию о транзакции для API
type TransactionInfo struct {
ID string `json:"id"`
Status string `json:"status"`
StartTime int64 `json:"start_time"`
OperationCount int `json:"operation_count"`
Operations []OperationInfo `json:"operations,omitempty"`
}
// OperationInfo представляет информацию об операции для API
type OperationInfo struct {
Type string `json:"type"`
Database string `json:"database"`
Collection string `json:"collection"`
DocumentID string `json:"document_id"`
}
// LoggerInterface определяет интерфейс для логирования
type LoggerInterface interface {
Debug(msg string)
Info(msg string)
Error(msg string)
Warn(msg string)
}
// TransactionManager управляет транзакциями
type TransactionManager struct {
activeTransactions sync.Map // map[TransactionID]*Transaction
nextTxID atomic.Uint64
wal *WriteAheadLog
logger LoggerInterface
mu sync.RWMutex
walPath string
snapshotInterval int64 // интервал создания снапшотов в секундах
lastSnapshot int64
}
// Transaction представляет одну транзакцию
type Transaction struct {
ID TransactionID
State atomic.Int32
Operations []Operation
StartTime int64
mu sync.RWMutex
}
// WriteAheadLog реализует журнал предзаписи
type WriteAheadLog struct {
file *os.File
writeChan chan []byte
done chan struct{}
mu sync.RWMutex
syncInterval time.Duration
lastSync time.Time
}
var (
globalTxManager *TransactionManager
txManagerOnce sync.Once
currentTx atomic.Value // *Transaction
)
// InitTransactionManager инициализирует глобальный менеджер транзакций
func InitTransactionManager(walPath string) error {
var err error
txManagerOnce.Do(func() {
globalTxManager = &TransactionManager{
nextTxID: atomic.Uint64{},
walPath: walPath,
snapshotInterval: 300, // 5 минут
lastSnapshot: time.Now().Unix(),
}
globalTxManager.nextTxID.Store(1)
err = globalTxManager.initWAL(walPath)
if err == nil {
// Восстанавливаем состояние из WAL
go globalTxManager.recoverFromWAL()
// Запускаем периодическое создание снапшотов
go globalTxManager.snapshotLoop()
}
})
return err
}
// SetTransactionLogger устанавливает логгер для транзакций
func SetTransactionLogger(logger LoggerInterface) {
if globalTxManager != nil {
globalTxManager.logger = logger
}
}
// initWAL инициализирует Write-Ahead Log
func (tm *TransactionManager) initWAL(walPath string) error {
wal, err := NewWriteAheadLog(walPath)
if err != nil {
return err
}
tm.wal = wal
return nil
}
// NewWriteAheadLog создаёт новый WAL
func NewWriteAheadLog(path string) (*WriteAheadLog, error) {
file, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0644)
if err != nil {
return nil, err
}
wal := &WriteAheadLog{
file: file,
writeChan: make(chan []byte, 10000),
done: make(chan struct{}),
syncInterval: 5 * time.Second,
lastSync: time.Now(),
}
go wal.writerLoop()
return wal, nil
}
// writerLoop асинхронно записывает данные в WAL
func (wal *WriteAheadLog) writerLoop() {
batch := make([][]byte, 0, 100)
ticker := time.NewTicker(wal.syncInterval)
defer ticker.Stop()
for {
select {
case data, ok := <-wal.writeChan:
if !ok {
if len(batch) > 0 {
wal.flushBatch(batch)
}
close(wal.done)
return
}
batch = append(batch, data)
if len(batch) >= 100 {
wal.flushBatch(batch)
batch = batch[:0]
}
case <-ticker.C:
if len(batch) > 0 {
wal.flushBatch(batch)
batch = batch[:0]
}
wal.sync()
}
}
}
// flushBatch записывает пакет данных в файл
func (wal *WriteAheadLog) flushBatch(batch [][]byte) {
wal.mu.Lock()
defer wal.mu.Unlock()
for _, data := range batch {
lenBuf := make([]byte, 4)
binary.BigEndian.PutUint32(lenBuf, uint32(len(data)))
if _, err := wal.file.Write(lenBuf); err != nil {
continue
}
if _, err := wal.file.Write(data); err != nil {
continue
}
}
}
// sync синхронизирует файл с диском
func (wal *WriteAheadLog) sync() {
wal.mu.Lock()
defer wal.mu.Unlock()
if time.Since(wal.lastSync) >= wal.syncInterval {
wal.file.Sync()
wal.lastSync = time.Now()
}
}
// Write записывает запись в WAL
func (wal *WriteAheadLog) Write(record *TransactionRecord) error {
data, err := serializer.Marshal(record)
if err != nil {
return err
}
select {
case wal.writeChan <- data:
return nil
case <-time.After(5 * time.Second):
return fmt.Errorf("WAL write timeout")
}
}
// Close закрывает WAL
func (wal *WriteAheadLog) Close() error {
close(wal.writeChan)
<-wal.done
return wal.file.Close()
}
// ReadAll читает все записи из WAL
func (wal *WriteAheadLog) ReadAll() ([]*TransactionRecord, error) {
wal.mu.RLock()
defer wal.mu.RUnlock()
if _, err := wal.file.Seek(0, 0); err != nil {
return nil, err
}
records := make([]*TransactionRecord, 0)
buf := make([]byte, 4)
for {
_, err := wal.file.Read(buf)
if err != nil {
break
}
recordLen := binary.BigEndian.Uint32(buf)
recordData := make([]byte, recordLen)
_, err = wal.file.Read(recordData)
if err != nil {
break
}
var record TransactionRecord
if err := serializer.Unmarshal(recordData, &record); err != nil {
continue
}
records = append(records, &record)
}
return records, nil
}
// BeginTransaction начинает новую транзакцию
func BeginTransaction() *Transaction {
if globalTxManager == nil {
InitTransactionManager("futriis.wal")
}
tx := &Transaction{
ID: TransactionID(globalTxManager.nextTxID.Add(1) - 1),
StartTime: time.Now().UnixMilli(),
Operations: make([]Operation, 0),
}
tx.State.Store(int32(TransactionActive))
globalTxManager.activeTransactions.Store(tx.ID, tx)
currentTx.Store(tx)
// Записываем начало транзакции в WAL
record := &TransactionRecord{
ID: tx.ID,
State: TransactionActive,
Timestamp: tx.StartTime,
Operations: tx.Operations,
}
if globalTxManager.wal != nil {
globalTxManager.wal.Write(record)
}
if globalTxManager.logger != nil {
globalTxManager.logger.Debug(fmt.Sprintf("Transaction %d started", tx.ID))
}
AuditLog("START", "TRANSACTION", fmt.Sprintf("%d", tx.ID), map[string]interface{}{
"start_time": tx.StartTime,
})
return tx
}
// BeginTransactionOnCollection начинает транзакцию для конкретной коллекции
func BeginTransactionOnCollection(coll *Collection) error {
if globalTxManager == nil {
if err := InitTransactionManager("futriis.wal"); err != nil {
return err
}
}
tx := BeginTransaction()
if tx == nil {
return fmt.Errorf("failed to create transaction")
}
if globalTxManager.logger != nil {
globalTxManager.logger.Debug(fmt.Sprintf("Transaction %d started on collection %s.%s", tx.ID, coll.dbName, coll.name))
}
return nil
}
// AddToTransaction добавляет операцию в текущую транзакцию
func AddToTransaction(coll *Collection, opType string, docID string, newData map[string]interface{}, oldData map[string]interface{}, version uint64) error {
txVal := currentTx.Load()
if txVal == nil {
return fmt.Errorf("no active transaction")
}
tx := txVal.(*Transaction)
if TransactionState(tx.State.Load()) != TransactionActive {
return fmt.Errorf("transaction is not active")
}
op := Operation{
Type: opType,
Database: coll.dbName,
Collection: coll.name,
DocumentID: docID,
Data: newData,
OldData: oldData,
Version: version,
}
tx.mu.Lock()
tx.Operations = append(tx.Operations, op)
tx.mu.Unlock()
if globalTxManager.logger != nil {
globalTxManager.logger.Debug(fmt.Sprintf("Transaction %d: added %s operation on %s", tx.ID, opType, docID))
}
return nil
}
// CommitCurrentTransaction коммитит текущую транзакцию
func CommitCurrentTransaction() error {
txVal := currentTx.Load()
if txVal == nil {
return fmt.Errorf("no active transaction")
}
tx := txVal.(*Transaction)
if TransactionState(tx.State.Load()) != TransactionActive {
return fmt.Errorf("transaction is not active")
}
// Применяем все операции атомарно
for _, op := range tx.Operations {
if err := applyOperation(op); err != nil {
// Откатываем при ошибке
AbortCurrentTransaction()
return fmt.Errorf("transaction commit failed at operation %s: %v", op.Type, err)
}
}
tx.State.Store(int32(TransactionCommitted))
// Записываем коммит в WAL
record := &TransactionRecord{
ID: tx.ID,
State: TransactionCommitted,
Timestamp: time.Now().UnixMilli(),
Operations: tx.Operations,
}
if globalTxManager.wal != nil {
globalTxManager.wal.Write(record)
}
if globalTxManager.logger != nil {
globalTxManager.logger.Info(fmt.Sprintf("Transaction %d committed with %d operations", tx.ID, len(tx.Operations)))
}
AuditLog("COMMIT", "TRANSACTION", fmt.Sprintf("%d", tx.ID), map[string]interface{}{
"operations": len(tx.Operations),
})
// Очищаем текущую транзакцию
currentTx.Store(nil)
globalTxManager.activeTransactions.Delete(tx.ID)
return nil
}
// AbortCurrentTransaction откатывает текущую транзакцию
func AbortCurrentTransaction() error {
txVal := currentTx.Load()
if txVal == nil {
return fmt.Errorf("no active transaction")
}
tx := txVal.(*Transaction)
tx.State.Store(int32(TransactionAborted))
// Записываем откат в WAL
record := &TransactionRecord{
ID: tx.ID,
State: TransactionAborted,
Timestamp: time.Now().UnixMilli(),
Operations: tx.Operations,
}
if globalTxManager.wal != nil {
globalTxManager.wal.Write(record)
}
if globalTxManager.logger != nil {
globalTxManager.logger.Info(fmt.Sprintf("Transaction %d aborted with %d operations", tx.ID, len(tx.Operations)))
}
AuditLog("ABORT", "TRANSACTION", fmt.Sprintf("%d", tx.ID), map[string]interface{}{
"operations": len(tx.Operations),
})
// Очищаем текущую транзакцию
currentTx.Store(nil)
globalTxManager.activeTransactions.Delete(tx.ID)
return nil
}
// HasActiveTransaction проверяет наличие активной транзакции
func HasActiveTransaction() bool {
return currentTx.Load() != nil
}
// GetCurrentTransactionID возвращает ID текущей транзакции
func GetCurrentTransactionID() string {
txVal := currentTx.Load()
if txVal == nil {
return ""
}
tx := txVal.(*Transaction)
return fmt.Sprintf("%d", tx.ID)
}
// GetActiveTransactions возвращает список активных транзакций для API
func GetActiveTransactions() []TransactionInfo {
if globalTxManager == nil {
return []TransactionInfo{}
}
transactions := make([]TransactionInfo, 0)
globalTxManager.activeTransactions.Range(func(key, value interface{}) bool {
tx := value.(*Transaction)
status := "active"
if TransactionState(tx.State.Load()) == TransactionCommitted {
status = "committed"
} else if TransactionState(tx.State.Load()) == TransactionAborted {
status = "aborted"
}
tx.mu.RLock()
opCount := len(tx.Operations)
operations := make([]OperationInfo, 0, opCount)
for _, op := range tx.Operations {
operations = append(operations, OperationInfo{
Type: op.Type,
Database: op.Database,
Collection: op.Collection,
DocumentID: op.DocumentID,
})
}
tx.mu.RUnlock()
transactions = append(transactions, TransactionInfo{
ID: fmt.Sprintf("%d", tx.ID),
Status: status,
StartTime: tx.StartTime,
OperationCount: opCount,
Operations: operations,
})
return true
})
return transactions
}
// GetTransactionByID возвращает транзакцию по ID
func GetTransactionByID(id string) (*Transaction, error) {
if globalTxManager == nil {
return nil, fmt.Errorf("transaction manager not initialized")
}
var txID TransactionID
fmt.Sscanf(id, "%d", &txID)
if val, ok := globalTxManager.activeTransactions.Load(txID); ok {
return val.(*Transaction), nil
}
return nil, fmt.Errorf("transaction not found")
}
// FindInTransaction ищет документ в контексте транзакции
func FindInTransaction(coll *Collection, id string) (*Document, error) {
txVal := currentTx.Load()
if txVal == nil {
return coll.Find(id)
}
tx := txVal.(*Transaction)
tx.mu.RLock()
defer tx.mu.RUnlock()
// Ищем в операциях транзакции в обратном порядке
for i := len(tx.Operations) - 1; i >= 0; i-- {
op := tx.Operations[i]
if op.DocumentID == id {
if op.Type == "delete" {
return nil, fmt.Errorf("document deleted in transaction")
}
if op.Type == "insert" || op.Type == "update" {
doc := NewDocumentWithID(op.DocumentID)
for k, v := range op.Data {
doc.SetField(k, v)
}
doc.Version = op.Version
return doc, nil
}
}
}
return coll.Find(id)
}
// applyOperation применяет операцию к хранилищу
func applyOperation(op Operation) error {
db, err := GetGlobalStorage().GetDatabase(op.Database)
if err != nil {
return fmt.Errorf("database not found: %s", op.Database)
}
coll, err := db.GetCollection(op.Collection)
if err != nil {
return fmt.Errorf("collection not found: %s", op.Collection)
}
switch op.Type {
case "insert":
doc := NewDocumentWithID(op.DocumentID)
for k, v := range op.Data {
doc.SetField(k, v)
}
doc.Version = op.Version
return coll.Insert(doc)
case "update":
return coll.Update(op.DocumentID, op.Data)
case "delete":
return coll.Delete(op.DocumentID)
}
return nil
}
// recoverFromWAL восстанавливает состояние из WAL после сбоя
func (tm *TransactionManager) recoverFromWAL() {
if tm.wal == nil {
return
}
records, err := tm.wal.ReadAll()
if err != nil {
if tm.logger != nil {
tm.logger.Error(fmt.Sprintf("Failed to read WAL: %v", err))
}
return
}
for _, record := range records {
if record.State == TransactionCommitted {
// Повторно применяем закоммиченные операции
for _, op := range record.Operations {
if err := applyOperation(op); err != nil {
if tm.logger != nil {
tm.logger.Error(fmt.Sprintf("Failed to replay operation: %v", err))
}
}
}
}
// Транзакции в состоянии Active или Aborted игнорируем
}
if tm.logger != nil {
tm.logger.Info(fmt.Sprintf("Recovered %d transactions from WAL", len(records)))
}
}
// snapshotLoop периодически создаёт снапшоты
func (tm *TransactionManager) snapshotLoop() {
ticker := time.NewTicker(time.Duration(tm.snapshotInterval) * time.Second)
defer ticker.Stop()
for range ticker.C {
tm.createSnapshot()
}
}
// createSnapshot создаёт снапшот текущего состояния
func (tm *TransactionManager) createSnapshot() {
now := time.Now().Unix()
if now-tm.lastSnapshot < tm.snapshotInterval {
return
}
snapshotPath := fmt.Sprintf("%s.snapshot.%d", tm.walPath, now)
if err := GetGlobalStorage().Backup(snapshotPath); err != nil {
if tm.logger != nil {
tm.logger.Error(fmt.Sprintf("Failed to create snapshot: %v", err))
}
return
}
tm.lastSnapshot = now
if tm.logger != nil {
tm.logger.Info(fmt.Sprintf("Snapshot created: %s", snapshotPath))
}
// Удаляем старые снапшоты
go tm.cleanOldSnapshots()
}
// cleanOldSnapshots удаляет старые снапшоты
func (tm *TransactionManager) cleanOldSnapshots() {
// Реализация удаления старых снапшотов
}
// GetGlobalStorage возвращает глобальное хранилище (должно быть установлено при инициализации)
var globalStorage *Storage
// SetGlobalStorage устанавливает глобальное хранилище
func SetGlobalStorage(s *Storage) {
globalStorage = s
}
// GetGlobalStorage возвращает глобальное хранилище
func GetGlobalStorage() *Storage {
return globalStorage
}
// AuditLog записывает событие в аудит
func AuditLog(operation, dataType, name string, details map[string]interface{}) {
LogAudit(operation, dataType, name, details)
}
// MVCCSnapshot создаёт снапшот текущего состояния для MVCC
func MVCCSnapshot() uint64 {
return uint64(time.Now().UnixNano())
}
// CreateDocumentVersion создаёт новую версию документа для MVCC
func CreateDocumentVersion(doc *Document, txID TransactionID) *DocumentVersion {
return &DocumentVersion{
Document: doc.Clone(),
Timestamp: time.Now().UnixMilli(),
TxID: txID,
}
}