206 lines
6.0 KiB
Go
206 lines
6.0 KiB
Go
// /futriis/internal/transaction/tx.go
|
||
// Пакет transaction реализует механизм простых транзакций (не ACID) для операций с данными.
|
||
// Предоставляет структуры для хранения состояния транзакций и управления их жизненным циклом.
|
||
// Использует wait-free операции через атомарные указатели.
|
||
|
||
package transaction
|
||
|
||
import (
|
||
"errors"
|
||
"sync/atomic"
|
||
"time"
|
||
"unsafe"
|
||
|
||
"futriis/pkg/types"
|
||
)
|
||
|
||
// TxState состояние транзакции
|
||
type TxState int32
|
||
|
||
const (
|
||
TxStateActive TxState = iota
|
||
TxStateCommited
|
||
TxStateRolledBack
|
||
)
|
||
|
||
// String возвращает строковое представление состояния
|
||
func (s TxState) String() string {
|
||
switch s {
|
||
case TxStateActive:
|
||
return "active"
|
||
case TxStateCommited:
|
||
return "committed"
|
||
case TxStateRolledBack:
|
||
return "rolled_back"
|
||
default:
|
||
return "unknown"
|
||
}
|
||
}
|
||
|
||
// Operation представляет операцию в транзакции
|
||
type Operation struct {
|
||
Type string // create, update, delete
|
||
Tapple string
|
||
Slice string
|
||
Key string
|
||
Value interface{}
|
||
OldValue interface{}
|
||
Timestamp time.Time
|
||
}
|
||
|
||
// Transaction представляет транзакцию с wait-free состоянием
|
||
type Transaction struct {
|
||
ID string
|
||
state int32 // Атомарное состояние
|
||
Operations []Operation
|
||
CreatedAt time.Time
|
||
}
|
||
|
||
// NewTransaction создаёт новую транзакцию
|
||
func NewTransaction(id string) *Transaction {
|
||
return &Transaction{
|
||
ID: id,
|
||
state: int32(TxStateActive),
|
||
Operations: make([]Operation, 0),
|
||
CreatedAt: time.Now(),
|
||
}
|
||
}
|
||
|
||
// GetState атомарно получает состояние транзакции
|
||
func (tx *Transaction) GetState() TxState {
|
||
return TxState(atomic.LoadInt32(&tx.state))
|
||
}
|
||
|
||
// SetState атомарно устанавливает состояние транзакции
|
||
func (tx *Transaction) SetState(state TxState) {
|
||
atomic.StoreInt32(&tx.state, int32(state))
|
||
}
|
||
|
||
// AddOperation добавляет операцию в транзакцию
|
||
func (tx *Transaction) AddOperation(op Operation) {
|
||
tx.Operations = append(tx.Operations, op)
|
||
}
|
||
|
||
// TransactionManager управляет транзакциями с wait-free операциями
|
||
type TransactionManager struct {
|
||
transactions unsafe.Pointer // Атомарный указатель на map[string]*Transaction
|
||
currentTx unsafe.Pointer // Атомарный указатель на текущую транзакцию
|
||
}
|
||
|
||
// NewTransactionManager создаёт новый менеджер транзакций
|
||
func NewTransactionManager() *TransactionManager {
|
||
transactions := make(map[string]*Transaction)
|
||
return &TransactionManager{
|
||
transactions: unsafe.Pointer(&transactions),
|
||
currentTx: nil,
|
||
}
|
||
}
|
||
|
||
// Begin начинает новую транзакцию
|
||
func (tm *TransactionManager) Begin() (string, error) {
|
||
// Проверяем, есть ли активная транзакция
|
||
currentPtr := atomic.LoadPointer(&tm.currentTx)
|
||
if currentPtr != nil {
|
||
currentTx := (*Transaction)(currentPtr)
|
||
if currentTx.GetState() == TxStateActive {
|
||
return "", errors.New("транзакция уже активна")
|
||
}
|
||
}
|
||
|
||
id := generateTxID()
|
||
tx := NewTransaction(id)
|
||
|
||
// Атомарно устанавливаем текущую транзакцию
|
||
atomic.StorePointer(&tm.currentTx, unsafe.Pointer(tx))
|
||
|
||
// Добавляем в список транзакций
|
||
oldPtr := atomic.LoadPointer(&tm.transactions)
|
||
oldTxns := *(*map[string]*Transaction)(oldPtr)
|
||
|
||
newTxns := make(map[string]*Transaction)
|
||
for k, v := range oldTxns {
|
||
newTxns[k] = v
|
||
}
|
||
newTxns[id] = tx
|
||
|
||
atomic.StorePointer(&tm.transactions, unsafe.Pointer(&newTxns))
|
||
|
||
return id, nil
|
||
}
|
||
|
||
// Commit фиксирует текущую транзакцию
|
||
func (tm *TransactionManager) Commit() error {
|
||
currentPtr := atomic.LoadPointer(&tm.currentTx)
|
||
if currentPtr == nil {
|
||
return errors.New("нет активной транзакции")
|
||
}
|
||
|
||
tx := (*Transaction)(currentPtr)
|
||
if tx.GetState() != TxStateActive {
|
||
return errors.New("транзакция не активна")
|
||
}
|
||
|
||
tx.SetState(TxStateCommited)
|
||
atomic.StorePointer(&tm.currentTx, nil)
|
||
|
||
return nil
|
||
}
|
||
|
||
// Rollback откатывает текущую транзакцию
|
||
func (tm *TransactionManager) Rollback() error {
|
||
currentPtr := atomic.LoadPointer(&tm.currentTx)
|
||
if currentPtr == nil {
|
||
return errors.New("нет активной транзакции")
|
||
}
|
||
|
||
tx := (*Transaction)(currentPtr)
|
||
if tx.GetState() != TxStateActive {
|
||
return errors.New("транзакция не активна")
|
||
}
|
||
|
||
tx.SetState(TxStateRolledBack)
|
||
atomic.StorePointer(&tm.currentTx, nil)
|
||
|
||
return nil
|
||
}
|
||
|
||
// AddOperation добавляет операцию в текущую транзакцию
|
||
func (tm *TransactionManager) AddOperation(op Operation) error {
|
||
currentPtr := atomic.LoadPointer(&tm.currentTx)
|
||
if currentPtr == nil {
|
||
return errors.New("нет активной транзакции")
|
||
}
|
||
|
||
tx := (*Transaction)(currentPtr)
|
||
if tx.GetState() != TxStateActive {
|
||
return errors.New("транзакция не активна")
|
||
}
|
||
|
||
op.Timestamp = time.Now()
|
||
tx.AddOperation(op)
|
||
|
||
return nil
|
||
}
|
||
|
||
// GetCurrentTx возвращает текущую транзакцию (wait-free)
|
||
func (tm *TransactionManager) GetCurrentTx() *Transaction {
|
||
currentPtr := atomic.LoadPointer(&tm.currentTx)
|
||
if currentPtr == nil {
|
||
return nil
|
||
}
|
||
return (*Transaction)(currentPtr)
|
||
}
|
||
|
||
// GetTransaction возвращает транзакцию по ID (wait-free)
|
||
func (tm *TransactionManager) GetTransaction(id string) (*Transaction, bool) {
|
||
ptr := atomic.LoadPointer(&tm.transactions)
|
||
txns := *(*map[string]*Transaction)(ptr)
|
||
tx, exists := txns[id]
|
||
return tx, exists
|
||
}
|
||
|
||
// generateTxID генерирует ID транзакции
|
||
func generateTxID() string {
|
||
return "tx-" + types.GenerateID() + "-" + time.Now().Format("20060102150405")
|
||
}
|