Delete internal/storage/transaction.go
This commit is contained in:
@@ -1,461 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2026 Safronov Grigorii
|
|
||||||
*
|
|
||||||
* Licensed under the CDDL, Version 1.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
*
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
* https://opensource.org/licenses/CDDL-1.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
// Файл: internal/storage/transaction.go
|
|
||||||
// Назначение: Реализация транзакций с поддержкой MVCC (Multi-Version Concurrency Control)
|
|
||||||
// и WAL (Write-Ahead Logging) без блокировок. Использует атомарные операции и версионирование.
|
|
||||||
|
|
||||||
package storage
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/binary"
|
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"futriis/internal/serializer"
|
|
||||||
)
|
|
||||||
|
|
||||||
// TransactionID представляет уникальный идентификатор транзакции
|
|
||||||
type TransactionID uint64
|
|
||||||
|
|
||||||
// TransactionState представляет состояние транзакции
|
|
||||||
type TransactionState int32
|
|
||||||
|
|
||||||
const (
|
|
||||||
TransactionActive TransactionState = iota
|
|
||||||
TransactionCommitted
|
|
||||||
TransactionAborted
|
|
||||||
)
|
|
||||||
|
|
||||||
// TransactionRecord представляет запись в WAL
|
|
||||||
type TransactionRecord struct {
|
|
||||||
ID TransactionID `msgpack:"id"`
|
|
||||||
State TransactionState `msgpack:"state"`
|
|
||||||
Timestamp int64 `msgpack:"timestamp"`
|
|
||||||
Operations []Operation `msgpack:"operations"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// Operation представляет одну операцию в транзакции
|
|
||||||
type Operation struct {
|
|
||||||
Type string `msgpack:"type"` // "insert", "update", "delete"
|
|
||||||
Database string `msgpack:"database"`
|
|
||||||
Collection string `msgpack:"collection"`
|
|
||||||
DocumentID string `msgpack:"document_id"`
|
|
||||||
Data map[string]interface{} `msgpack:"data"`
|
|
||||||
Version uint64 `msgpack:"version"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// DocumentVersion представляет версию документа для MVCC
|
|
||||||
type DocumentVersion struct {
|
|
||||||
Document *Document `msgpack:"document"`
|
|
||||||
Timestamp int64 `msgpack:"timestamp"`
|
|
||||||
TxID TransactionID `msgpack:"tx_id"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// TransactionInfo представляет информацию о транзакции для API
|
|
||||||
type TransactionInfo struct {
|
|
||||||
ID string `json:"id"`
|
|
||||||
Status string `json:"status"`
|
|
||||||
StartTime int64 `json:"start_time"`
|
|
||||||
OperationCount int `json:"operation_count"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// TransactionManager управляет транзакциями
|
|
||||||
type TransactionManager struct {
|
|
||||||
activeTransactions sync.Map // map[TransactionID]*Transaction
|
|
||||||
nextTxID atomic.Uint64
|
|
||||||
wal *WriteAheadLog
|
|
||||||
mu sync.RWMutex
|
|
||||||
}
|
|
||||||
|
|
||||||
// Transaction представляет одну транзакцию
|
|
||||||
type Transaction struct {
|
|
||||||
ID TransactionID
|
|
||||||
State atomic.Int32
|
|
||||||
Operations []Operation
|
|
||||||
StartTime int64
|
|
||||||
mu sync.RWMutex
|
|
||||||
}
|
|
||||||
|
|
||||||
// WriteAheadLog реализует журнал предзаписи
|
|
||||||
type WriteAheadLog struct {
|
|
||||||
file *os.File
|
|
||||||
writeChan chan []byte
|
|
||||||
done chan struct{}
|
|
||||||
mu sync.RWMutex
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
globalTxManager *TransactionManager
|
|
||||||
txManagerOnce sync.Once
|
|
||||||
currentTx atomic.Value // *Transaction
|
|
||||||
)
|
|
||||||
|
|
||||||
// InitTransactionManager инициализирует глобальный менеджер транзакций
|
|
||||||
func InitTransactionManager(walPath string) error {
|
|
||||||
var err error
|
|
||||||
txManagerOnce.Do(func() {
|
|
||||||
globalTxManager = &TransactionManager{
|
|
||||||
nextTxID: atomic.Uint64{},
|
|
||||||
}
|
|
||||||
globalTxManager.nextTxID.Store(1)
|
|
||||||
err = globalTxManager.initWAL(walPath)
|
|
||||||
})
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// initWAL инициализирует Write-Ahead Log
|
|
||||||
func (tm *TransactionManager) initWAL(walPath string) error {
|
|
||||||
wal, err := NewWriteAheadLog(walPath)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
tm.wal = wal
|
|
||||||
|
|
||||||
// Восстанавливаем состояние из WAL при запуске
|
|
||||||
go tm.recoverFromWAL()
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewWriteAheadLog создаёт новый WAL
|
|
||||||
func NewWriteAheadLog(path string) (*WriteAheadLog, error) {
|
|
||||||
file, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
wal := &WriteAheadLog{
|
|
||||||
file: file,
|
|
||||||
writeChan: make(chan []byte, 10000),
|
|
||||||
done: make(chan struct{}),
|
|
||||||
}
|
|
||||||
|
|
||||||
go wal.writerLoop()
|
|
||||||
|
|
||||||
return wal, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// writerLoop асинхронно записывает данные в WAL
|
|
||||||
func (wal *WriteAheadLog) writerLoop() {
|
|
||||||
for data := range wal.writeChan {
|
|
||||||
wal.mu.Lock()
|
|
||||||
// Формат записи: [длина (4 байта)][данные]
|
|
||||||
lenBuf := make([]byte, 4)
|
|
||||||
binary.BigEndian.PutUint32(lenBuf, uint32(len(data)))
|
|
||||||
|
|
||||||
if _, err := wal.file.Write(lenBuf); err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if _, err := wal.file.Write(data); err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
wal.file.Sync()
|
|
||||||
wal.mu.Unlock()
|
|
||||||
}
|
|
||||||
close(wal.done)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write записывает запись в WAL
|
|
||||||
func (wal *WriteAheadLog) Write(record *TransactionRecord) error {
|
|
||||||
data, err := serializer.Marshal(record)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case wal.writeChan <- data:
|
|
||||||
return nil
|
|
||||||
default:
|
|
||||||
return fmt.Errorf("WAL buffer full")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close закрывает WAL
|
|
||||||
func (wal *WriteAheadLog) Close() error {
|
|
||||||
close(wal.writeChan)
|
|
||||||
<-wal.done
|
|
||||||
return wal.file.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
// BeginTransaction начинает новую транзакцию
|
|
||||||
func BeginTransaction() *Transaction {
|
|
||||||
if globalTxManager == nil {
|
|
||||||
InitTransactionManager("futriis.wal")
|
|
||||||
}
|
|
||||||
|
|
||||||
tx := &Transaction{
|
|
||||||
ID: TransactionID(globalTxManager.nextTxID.Add(1) - 1),
|
|
||||||
StartTime: time.Now().UnixMilli(),
|
|
||||||
Operations: make([]Operation, 0),
|
|
||||||
}
|
|
||||||
tx.State.Store(int32(TransactionActive))
|
|
||||||
|
|
||||||
globalTxManager.activeTransactions.Store(tx.ID, tx)
|
|
||||||
|
|
||||||
// Сохраняем как текущую транзакцию для горутины
|
|
||||||
currentTx.Store(tx)
|
|
||||||
|
|
||||||
// Записываем начало транзакции в WAL
|
|
||||||
record := &TransactionRecord{
|
|
||||||
ID: tx.ID,
|
|
||||||
State: TransactionActive,
|
|
||||||
Timestamp: tx.StartTime,
|
|
||||||
Operations: tx.Operations,
|
|
||||||
}
|
|
||||||
if globalTxManager.wal != nil {
|
|
||||||
globalTxManager.wal.Write(record)
|
|
||||||
}
|
|
||||||
|
|
||||||
return tx
|
|
||||||
}
|
|
||||||
|
|
||||||
// BeginTransactionOnCollection начинает транзакцию для конкретной коллекции
|
|
||||||
func BeginTransactionOnCollection(coll *Collection) error {
|
|
||||||
if globalTxManager == nil {
|
|
||||||
if err := InitTransactionManager("futriis.wal"); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tx := BeginTransaction()
|
|
||||||
if tx == nil {
|
|
||||||
return fmt.Errorf("failed to create transaction")
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddToTransaction добавляет операцию в текущую транзакцию
|
|
||||||
func AddToTransaction(coll *Collection, opType string, doc *Document) error {
|
|
||||||
txVal := currentTx.Load()
|
|
||||||
if txVal == nil {
|
|
||||||
return fmt.Errorf("no active transaction")
|
|
||||||
}
|
|
||||||
|
|
||||||
tx := txVal.(*Transaction)
|
|
||||||
if TransactionState(tx.State.Load()) != TransactionActive {
|
|
||||||
return fmt.Errorf("transaction is not active")
|
|
||||||
}
|
|
||||||
|
|
||||||
op := Operation{
|
|
||||||
Type: opType,
|
|
||||||
Database: coll.dbName,
|
|
||||||
Collection: coll.name,
|
|
||||||
DocumentID: doc.ID,
|
|
||||||
Data: doc.GetFields(),
|
|
||||||
Version: doc.Version,
|
|
||||||
}
|
|
||||||
|
|
||||||
tx.mu.Lock()
|
|
||||||
tx.Operations = append(tx.Operations, op)
|
|
||||||
tx.mu.Unlock()
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// CommitCurrentTransaction коммитит текущую транзакцию
|
|
||||||
func CommitCurrentTransaction() error {
|
|
||||||
txVal := currentTx.Load()
|
|
||||||
if txVal == nil {
|
|
||||||
return fmt.Errorf("no active transaction")
|
|
||||||
}
|
|
||||||
|
|
||||||
tx := txVal.(*Transaction)
|
|
||||||
if TransactionState(tx.State.Load()) != TransactionActive {
|
|
||||||
return fmt.Errorf("transaction is not active")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Применяем все операции атомарно
|
|
||||||
for _, op := range tx.Operations {
|
|
||||||
if err := applyOperation(op); err != nil {
|
|
||||||
// Откатываем при ошибке
|
|
||||||
AbortCurrentTransaction()
|
|
||||||
return fmt.Errorf("transaction commit failed: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tx.State.Store(int32(TransactionCommitted))
|
|
||||||
|
|
||||||
// Записываем коммит в WAL
|
|
||||||
record := &TransactionRecord{
|
|
||||||
ID: tx.ID,
|
|
||||||
State: TransactionCommitted,
|
|
||||||
Timestamp: time.Now().UnixMilli(),
|
|
||||||
Operations: tx.Operations,
|
|
||||||
}
|
|
||||||
if globalTxManager.wal != nil {
|
|
||||||
globalTxManager.wal.Write(record)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Очищаем текущую транзакцию
|
|
||||||
currentTx.Store(nil)
|
|
||||||
globalTxManager.activeTransactions.Delete(tx.ID)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// AbortCurrentTransaction откатывает текущую транзакцию
|
|
||||||
func AbortCurrentTransaction() error {
|
|
||||||
txVal := currentTx.Load()
|
|
||||||
if txVal == nil {
|
|
||||||
return fmt.Errorf("no active transaction")
|
|
||||||
}
|
|
||||||
|
|
||||||
tx := txVal.(*Transaction)
|
|
||||||
tx.State.Store(int32(TransactionAborted))
|
|
||||||
|
|
||||||
// Записываем откат в WAL
|
|
||||||
record := &TransactionRecord{
|
|
||||||
ID: tx.ID,
|
|
||||||
State: TransactionAborted,
|
|
||||||
Timestamp: time.Now().UnixMilli(),
|
|
||||||
Operations: tx.Operations,
|
|
||||||
}
|
|
||||||
if globalTxManager.wal != nil {
|
|
||||||
globalTxManager.wal.Write(record)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Очищаем текущую транзакцию
|
|
||||||
currentTx.Store(nil)
|
|
||||||
globalTxManager.activeTransactions.Delete(tx.ID)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// HasActiveTransaction проверяет наличие активной транзакции
|
|
||||||
func HasActiveTransaction() bool {
|
|
||||||
return currentTx.Load() != nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetActiveTransactions возвращает список активных транзакций для API
|
|
||||||
func GetActiveTransactions() []TransactionInfo {
|
|
||||||
if globalTxManager == nil {
|
|
||||||
return []TransactionInfo{}
|
|
||||||
}
|
|
||||||
|
|
||||||
transactions := make([]TransactionInfo, 0)
|
|
||||||
|
|
||||||
globalTxManager.activeTransactions.Range(func(key, value interface{}) bool {
|
|
||||||
tx := value.(*Transaction)
|
|
||||||
status := "active"
|
|
||||||
if TransactionState(tx.State.Load()) == TransactionCommitted {
|
|
||||||
status = "committed"
|
|
||||||
} else if TransactionState(tx.State.Load()) == TransactionAborted {
|
|
||||||
status = "aborted"
|
|
||||||
}
|
|
||||||
|
|
||||||
tx.mu.RLock()
|
|
||||||
opCount := len(tx.Operations)
|
|
||||||
tx.mu.RUnlock()
|
|
||||||
|
|
||||||
transactions = append(transactions, TransactionInfo{
|
|
||||||
ID: fmt.Sprintf("%d", tx.ID),
|
|
||||||
Status: status,
|
|
||||||
StartTime: tx.StartTime,
|
|
||||||
OperationCount: opCount,
|
|
||||||
})
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
|
|
||||||
return transactions
|
|
||||||
}
|
|
||||||
|
|
||||||
// FindInTransaction ищет документ в контексте транзакции
|
|
||||||
func FindInTransaction(coll *Collection, id string) (*Document, error) {
|
|
||||||
txVal := currentTx.Load()
|
|
||||||
if txVal == nil {
|
|
||||||
return coll.Find(id)
|
|
||||||
}
|
|
||||||
|
|
||||||
tx := txVal.(*Transaction)
|
|
||||||
|
|
||||||
// Сначала ищем в операциях транзакции
|
|
||||||
tx.mu.RLock()
|
|
||||||
defer tx.mu.RUnlock()
|
|
||||||
|
|
||||||
for i := len(tx.Operations) - 1; i >= 0; i-- {
|
|
||||||
op := tx.Operations[i]
|
|
||||||
if op.DocumentID == id {
|
|
||||||
if op.Type == "delete" {
|
|
||||||
return nil, fmt.Errorf("key not found")
|
|
||||||
}
|
|
||||||
// Создаём документ из данных операции
|
|
||||||
doc := NewDocumentWithID(op.DocumentID)
|
|
||||||
for k, v := range op.Data {
|
|
||||||
doc.SetField(k, v)
|
|
||||||
}
|
|
||||||
doc.Version = op.Version
|
|
||||||
return doc, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ищем в основном хранилище
|
|
||||||
return coll.Find(id)
|
|
||||||
}
|
|
||||||
|
|
||||||
// applyOperation применяет операцию к хранилищу
|
|
||||||
func applyOperation(op Operation) error {
|
|
||||||
// В реальной реализации здесь будет применение операции к соответствующей коллекции
|
|
||||||
// С использованием MVCC для версионирования
|
|
||||||
|
|
||||||
switch op.Type {
|
|
||||||
case "insert":
|
|
||||||
// Проверяем версию документа (MVCC)
|
|
||||||
doc := NewDocumentWithID(op.DocumentID)
|
|
||||||
for k, v := range op.Data {
|
|
||||||
doc.SetField(k, v)
|
|
||||||
}
|
|
||||||
// Здесь должна быть вставка в коллекцию
|
|
||||||
return nil
|
|
||||||
case "update":
|
|
||||||
// Обновление с проверкой версии
|
|
||||||
return nil
|
|
||||||
case "delete":
|
|
||||||
// Удаление
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// recoverFromWAL восстанавливает состояние из WAL после сбоя
|
|
||||||
func (tm *TransactionManager) recoverFromWAL() {
|
|
||||||
// В реальной реализации здесь будет чтение WAL и восстановление
|
|
||||||
// незавершённых транзакций
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetTransaction возвращает транзакцию по ID
|
|
||||||
func GetTransaction(id TransactionID) (*Transaction, bool) {
|
|
||||||
if globalTxManager == nil {
|
|
||||||
return nil, false
|
|
||||||
}
|
|
||||||
if val, ok := globalTxManager.activeTransactions.Load(id); ok {
|
|
||||||
return val.(*Transaction), true
|
|
||||||
}
|
|
||||||
return nil, false
|
|
||||||
}
|
|
||||||
|
|
||||||
// MVCCSnapshot создаёт снапшот текущего состояния для MVCC
|
|
||||||
func MVCCSnapshot() uint64 {
|
|
||||||
return uint64(time.Now().UnixNano())
|
|
||||||
}
|
|
||||||
|
|
||||||
// CreateDocumentVersion создаёт новую версию документа для MVCC
|
|
||||||
func CreateDocumentVersion(doc *Document, txID TransactionID) *DocumentVersion {
|
|
||||||
return &DocumentVersion{
|
|
||||||
Document: doc.Clone(),
|
|
||||||
Timestamp: time.Now().UnixMilli(),
|
|
||||||
TxID: txID,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user