Files
futriis/internal/storage/transaction.go
2026-04-08 21:43:35 +03:00

383 lines
12 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Файл: internal/storage/transaction.go
// Назначение: Реализация транзакций с поддержкой MVCC (Multi-Version Concurrency Control)
// и WAL (Write-Ahead Logging) без блокировок. Использует атомарные операции и версионирование.
package storage
import (
"encoding/binary"
"fmt"
"os"
"sync"
"sync/atomic"
"time"
"futriis/internal/serializer"
)
// TransactionID представляет уникальный идентификатор транзакции
type TransactionID uint64
// TransactionState представляет состояние транзакции
type TransactionState int32
const (
TransactionActive TransactionState = iota
TransactionCommitted
TransactionAborted
)
// TransactionRecord представляет запись в WAL
type TransactionRecord struct {
ID TransactionID `msgpack:"id"`
State TransactionState `msgpack:"state"`
Timestamp int64 `msgpack:"timestamp"`
Operations []Operation `msgpack:"operations"`
}
// Operation представляет одну операцию в транзакции
type Operation struct {
Type string `msgpack:"type"` // "insert", "update", "delete"
Database string `msgpack:"database"`
Collection string `msgpack:"collection"`
DocumentID string `msgpack:"document_id"`
Data map[string]interface{} `msgpack:"data"`
Version uint64 `msgpack:"version"`
}
// DocumentVersion представляет версию документа для MVCC
type DocumentVersion struct {
Document *Document `msgpack:"document"`
Timestamp int64 `msgpack:"timestamp"`
TxID TransactionID `msgpack:"tx_id"`
}
// TransactionManager управляет транзакциями
type TransactionManager struct {
activeTransactions sync.Map // map[TransactionID]*Transaction
nextTxID atomic.Uint64
wal *WriteAheadLog
mu sync.RWMutex
}
// Transaction представляет одну транзакцию
type Transaction struct {
ID TransactionID
State atomic.Int32
Operations []Operation
StartTime int64
mu sync.RWMutex
}
// WriteAheadLog реализует журнал предзаписи
type WriteAheadLog struct {
file *os.File
writeChan chan []byte
done chan struct{}
mu sync.RWMutex
}
var (
globalTxManager *TransactionManager
txManagerOnce sync.Once
currentTx atomic.Value // *Transaction
)
// InitTransactionManager инициализирует глобальный менеджер транзакций
func InitTransactionManager(walPath string) error {
var err error
txManagerOnce.Do(func() {
globalTxManager = &TransactionManager{
nextTxID: atomic.Uint64{},
}
globalTxManager.nextTxID.Store(1)
err = globalTxManager.initWAL(walPath)
})
return err
}
// initWAL инициализирует Write-Ahead Log
func (tm *TransactionManager) initWAL(walPath string) error {
wal, err := NewWriteAheadLog(walPath)
if err != nil {
return err
}
tm.wal = wal
// Восстанавливаем состояние из WAL при запуске
go tm.recoverFromWAL()
return nil
}
// NewWriteAheadLog создаёт новый WAL
func NewWriteAheadLog(path string) (*WriteAheadLog, error) {
file, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
return nil, err
}
wal := &WriteAheadLog{
file: file,
writeChan: make(chan []byte, 10000),
done: make(chan struct{}),
}
go wal.writerLoop()
return wal, nil
}
// writerLoop асинхронно записывает данные в WAL
func (wal *WriteAheadLog) writerLoop() {
for data := range wal.writeChan {
wal.mu.Lock()
// Формат записи: [длина (4 байта)][данные]
lenBuf := make([]byte, 4)
binary.BigEndian.PutUint32(lenBuf, uint32(len(data)))
if _, err := wal.file.Write(lenBuf); err != nil {
continue
}
if _, err := wal.file.Write(data); err != nil {
continue
}
wal.file.Sync()
wal.mu.Unlock()
}
close(wal.done)
}
// Write записывает запись в WAL
func (wal *WriteAheadLog) Write(record *TransactionRecord) error {
data, err := serializer.Marshal(record)
if err != nil {
return err
}
select {
case wal.writeChan <- data:
return nil
default:
return fmt.Errorf("WAL buffer full")
}
}
// Close закрывает WAL
func (wal *WriteAheadLog) Close() error {
close(wal.writeChan)
<-wal.done
return wal.file.Close()
}
// BeginTransaction начинает новую транзакцию
func BeginTransaction() *Transaction {
if globalTxManager == nil {
InitTransactionManager("futriis.wal")
}
tx := &Transaction{
ID: TransactionID(globalTxManager.nextTxID.Add(1) - 1),
StartTime: time.Now().UnixMilli(),
Operations: make([]Operation, 0),
}
tx.State.Store(int32(TransactionActive))
globalTxManager.activeTransactions.Store(tx.ID, tx)
// Сохраняем как текущую транзакцию для горутины
currentTx.Store(tx)
// Записываем начало транзакции в WAL
record := &TransactionRecord{
ID: tx.ID,
State: TransactionActive,
Timestamp: tx.StartTime,
Operations: tx.Operations,
}
globalTxManager.wal.Write(record)
return tx
}
// AddToTransaction добавляет операцию в текущую транзакцию
func AddToTransaction(coll *Collection, opType string, doc *Document) error {
txVal := currentTx.Load()
if txVal == nil {
return fmt.Errorf("no active transaction")
}
tx := txVal.(*Transaction)
if TransactionState(tx.State.Load()) != TransactionActive {
return fmt.Errorf("transaction is not active")
}
op := Operation{
Type: opType,
Database: coll.Name(), // В реальной реализации нужно передавать имя БД
Collection: coll.Name(),
DocumentID: doc.ID,
Data: doc.GetFields(),
Version: doc.Version,
}
tx.mu.Lock()
tx.Operations = append(tx.Operations, op)
tx.mu.Unlock()
return nil
}
// CommitCurrentTransaction коммитит текущую транзакцию
func CommitCurrentTransaction() error {
txVal := currentTx.Load()
if txVal == nil {
return fmt.Errorf("no active transaction")
}
tx := txVal.(*Transaction)
if TransactionState(tx.State.Load()) != TransactionActive {
return fmt.Errorf("transaction is not active")
}
// Применяем все операции атомарно
for _, op := range tx.Operations {
if err := applyOperation(op); err != nil {
// Откатываем при ошибке
AbortCurrentTransaction()
return fmt.Errorf("transaction commit failed: %v", err)
}
}
tx.State.Store(int32(TransactionCommitted))
// Записываем коммит в WAL
record := &TransactionRecord{
ID: tx.ID,
State: TransactionCommitted,
Timestamp: time.Now().UnixMilli(),
Operations: tx.Operations,
}
globalTxManager.wal.Write(record)
// Очищаем текущую транзакцию
currentTx.Store(nil)
globalTxManager.activeTransactions.Delete(tx.ID)
return nil
}
// AbortCurrentTransaction откатывает текущую транзакцию
func AbortCurrentTransaction() error {
txVal := currentTx.Load()
if txVal == nil {
return fmt.Errorf("no active transaction")
}
tx := txVal.(*Transaction)
tx.State.Store(int32(TransactionAborted))
// Записываем откат в WAL
record := &TransactionRecord{
ID: tx.ID,
State: TransactionAborted,
Timestamp: time.Now().UnixMilli(),
Operations: tx.Operations,
}
globalTxManager.wal.Write(record)
// Очищаем текущую транзакцию
currentTx.Store(nil)
globalTxManager.activeTransactions.Delete(tx.ID)
return nil
}
// HasActiveTransaction проверяет наличие активной транзакции
func HasActiveTransaction() bool {
return currentTx.Load() != nil
}
// FindInTransaction ищет документ в контексте транзакции
func FindInTransaction(coll *Collection, id string) (*Document, error) {
txVal := currentTx.Load()
if txVal == nil {
return coll.Find(id)
}
tx := txVal.(*Transaction)
// Сначала ищем в операциях транзакции
for i := len(tx.Operations) - 1; i >= 0; i-- {
op := tx.Operations[i]
if op.DocumentID == id {
if op.Type == "delete" {
return nil, fmt.Errorf("key not found")
}
// Создаём документ из данных операции
doc := NewDocumentWithID(op.DocumentID)
for k, v := range op.Data {
doc.SetField(k, v)
}
doc.Version = op.Version
return doc, nil
}
}
// Ищем в основном хранилище
return coll.Find(id)
}
// applyOperation применяет операцию к хранилищу
func applyOperation(op Operation) error {
// В реальной реализации здесь будет применение операции к соответствующей коллекции
// С использованием MVCC для версионирования
switch op.Type {
case "insert":
// Проверяем версию документа (MVCC)
doc := NewDocumentWithID(op.DocumentID)
for k, v := range op.Data {
doc.SetField(k, v)
}
// Здесь должна быть вставка в коллекцию
return nil
case "update":
// Обновление с проверкой версии
return nil
case "delete":
// Удаление
return nil
}
return nil
}
// recoverFromWAL восстанавливает состояние из WAL после сбоя
func (tm *TransactionManager) recoverFromWAL() {
// В реальной реализации здесь будет чтение WAL и восстановление
// незавершённых транзакций
}
// GetTransaction возвращает транзакцию по ID
func GetTransaction(id TransactionID) (*Transaction, bool) {
if val, ok := globalTxManager.activeTransactions.Load(id); ok {
return val.(*Transaction), true
}
return nil, false
}
// MVCCSnapshot создаёт снапшот текущего состояния для MVCC
func MVCCSnapshot() uint64 {
return uint64(time.Now().UnixNano())
}
// CreateDocumentVersion создаёт новую версию документа для MVCC
func CreateDocumentVersion(doc *Document, txID TransactionID) *DocumentVersion {
return &DocumentVersion{
Document: doc.Clone(),
Timestamp: time.Now().UnixMilli(),
TxID: txID,
}
}