2026-02-27 22:04:04 +03:00

206 lines
6.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// /futriis/internal/transaction/tx.go
// Пакет transaction реализует механизм простых транзакций (не ACID) для операций с данными.
// Предоставляет структуры для хранения состояния транзакций и управления их жизненным циклом.
// Использует wait-free операции через атомарные указатели.
package transaction
import (
"errors"
"sync/atomic"
"time"
"unsafe"
"futriis/pkg/types"
)
// TxState состояние транзакции
type TxState int32
const (
TxStateActive TxState = iota
TxStateCommited
TxStateRolledBack
)
// String возвращает строковое представление состояния
func (s TxState) String() string {
switch s {
case TxStateActive:
return "active"
case TxStateCommited:
return "committed"
case TxStateRolledBack:
return "rolled_back"
default:
return "unknown"
}
}
// Operation представляет операцию в транзакции
type Operation struct {
Type string // create, update, delete
Tapple string
Slice string
Key string
Value interface{}
OldValue interface{}
Timestamp time.Time
}
// Transaction представляет транзакцию с wait-free состоянием
type Transaction struct {
ID string
state int32 // Атомарное состояние
Operations []Operation
CreatedAt time.Time
}
// NewTransaction создаёт новую транзакцию
func NewTransaction(id string) *Transaction {
return &Transaction{
ID: id,
state: int32(TxStateActive),
Operations: make([]Operation, 0),
CreatedAt: time.Now(),
}
}
// GetState атомарно получает состояние транзакции
func (tx *Transaction) GetState() TxState {
return TxState(atomic.LoadInt32(&tx.state))
}
// SetState атомарно устанавливает состояние транзакции
func (tx *Transaction) SetState(state TxState) {
atomic.StoreInt32(&tx.state, int32(state))
}
// AddOperation добавляет операцию в транзакцию
func (tx *Transaction) AddOperation(op Operation) {
tx.Operations = append(tx.Operations, op)
}
// TransactionManager управляет транзакциями с wait-free операциями
type TransactionManager struct {
transactions unsafe.Pointer // Атомарный указатель на map[string]*Transaction
currentTx unsafe.Pointer // Атомарный указатель на текущую транзакцию
}
// NewTransactionManager создаёт новый менеджер транзакций
func NewTransactionManager() *TransactionManager {
transactions := make(map[string]*Transaction)
return &TransactionManager{
transactions: unsafe.Pointer(&transactions),
currentTx: nil,
}
}
// Begin начинает новую транзакцию
func (tm *TransactionManager) Begin() (string, error) {
// Проверяем, есть ли активная транзакция
currentPtr := atomic.LoadPointer(&tm.currentTx)
if currentPtr != nil {
currentTx := (*Transaction)(currentPtr)
if currentTx.GetState() == TxStateActive {
return "", errors.New("транзакция уже активна")
}
}
id := generateTxID()
tx := NewTransaction(id)
// Атомарно устанавливаем текущую транзакцию
atomic.StorePointer(&tm.currentTx, unsafe.Pointer(tx))
// Добавляем в список транзакций
oldPtr := atomic.LoadPointer(&tm.transactions)
oldTxns := *(*map[string]*Transaction)(oldPtr)
newTxns := make(map[string]*Transaction)
for k, v := range oldTxns {
newTxns[k] = v
}
newTxns[id] = tx
atomic.StorePointer(&tm.transactions, unsafe.Pointer(&newTxns))
return id, nil
}
// Commit фиксирует текущую транзакцию
func (tm *TransactionManager) Commit() error {
currentPtr := atomic.LoadPointer(&tm.currentTx)
if currentPtr == nil {
return errors.New("нет активной транзакции")
}
tx := (*Transaction)(currentPtr)
if tx.GetState() != TxStateActive {
return errors.New("транзакция не активна")
}
tx.SetState(TxStateCommited)
atomic.StorePointer(&tm.currentTx, nil)
return nil
}
// Rollback откатывает текущую транзакцию
func (tm *TransactionManager) Rollback() error {
currentPtr := atomic.LoadPointer(&tm.currentTx)
if currentPtr == nil {
return errors.New("нет активной транзакции")
}
tx := (*Transaction)(currentPtr)
if tx.GetState() != TxStateActive {
return errors.New("транзакция не активна")
}
tx.SetState(TxStateRolledBack)
atomic.StorePointer(&tm.currentTx, nil)
return nil
}
// AddOperation добавляет операцию в текущую транзакцию
func (tm *TransactionManager) AddOperation(op Operation) error {
currentPtr := atomic.LoadPointer(&tm.currentTx)
if currentPtr == nil {
return errors.New("нет активной транзакции")
}
tx := (*Transaction)(currentPtr)
if tx.GetState() != TxStateActive {
return errors.New("транзакция не активна")
}
op.Timestamp = time.Now()
tx.AddOperation(op)
return nil
}
// GetCurrentTx возвращает текущую транзакцию (wait-free)
func (tm *TransactionManager) GetCurrentTx() *Transaction {
currentPtr := atomic.LoadPointer(&tm.currentTx)
if currentPtr == nil {
return nil
}
return (*Transaction)(currentPtr)
}
// GetTransaction возвращает транзакцию по ID (wait-free)
func (tm *TransactionManager) GetTransaction(id string) (*Transaction, bool) {
ptr := atomic.LoadPointer(&tm.transactions)
txns := *(*map[string]*Transaction)(ptr)
tx, exists := txns[id]
return tx, exists
}
// generateTxID генерирует ID транзакции
func generateTxID() string {
return "tx-" + types.GenerateID() + "-" + time.Now().Format("20060102150405")
}