// /futriis/internal/transaction/tx.go // Пакет transaction реализует механизм простых транзакций (не ACID) для операций с данными. // Предоставляет структуры для хранения состояния транзакций и управления их жизненным циклом. // Использует wait-free операции через атомарные указатели. package transaction import ( "errors" "sync/atomic" "time" "unsafe" "futriis/pkg/types" ) // TxState состояние транзакции type TxState int32 const ( TxStateActive TxState = iota TxStateCommited TxStateRolledBack ) // String возвращает строковое представление состояния func (s TxState) String() string { switch s { case TxStateActive: return "active" case TxStateCommited: return "committed" case TxStateRolledBack: return "rolled_back" default: return "unknown" } } // Operation представляет операцию в транзакции type Operation struct { Type string // create, update, delete Tapple string Slice string Key string Value interface{} OldValue interface{} Timestamp time.Time } // Transaction представляет транзакцию с wait-free состоянием type Transaction struct { ID string state int32 // Атомарное состояние Operations []Operation CreatedAt time.Time } // NewTransaction создаёт новую транзакцию func NewTransaction(id string) *Transaction { return &Transaction{ ID: id, state: int32(TxStateActive), Operations: make([]Operation, 0), CreatedAt: time.Now(), } } // GetState атомарно получает состояние транзакции func (tx *Transaction) GetState() TxState { return TxState(atomic.LoadInt32(&tx.state)) } // SetState атомарно устанавливает состояние транзакции func (tx *Transaction) SetState(state TxState) { atomic.StoreInt32(&tx.state, int32(state)) } // AddOperation добавляет операцию в транзакцию func (tx *Transaction) AddOperation(op Operation) { tx.Operations = append(tx.Operations, op) } // TransactionManager управляет транзакциями с wait-free операциями type TransactionManager struct { transactions unsafe.Pointer // Атомарный указатель на map[string]*Transaction currentTx unsafe.Pointer // Атомарный указатель на текущую транзакцию } // NewTransactionManager создаёт новый менеджер транзакций func NewTransactionManager() *TransactionManager { transactions := make(map[string]*Transaction) return &TransactionManager{ transactions: unsafe.Pointer(&transactions), currentTx: nil, } } // Begin начинает новую транзакцию func (tm *TransactionManager) Begin() (string, error) { // Проверяем, есть ли активная транзакция currentPtr := atomic.LoadPointer(&tm.currentTx) if currentPtr != nil { currentTx := (*Transaction)(currentPtr) if currentTx.GetState() == TxStateActive { return "", errors.New("транзакция уже активна") } } id := generateTxID() tx := NewTransaction(id) // Атомарно устанавливаем текущую транзакцию atomic.StorePointer(&tm.currentTx, unsafe.Pointer(tx)) // Добавляем в список транзакций oldPtr := atomic.LoadPointer(&tm.transactions) oldTxns := *(*map[string]*Transaction)(oldPtr) newTxns := make(map[string]*Transaction) for k, v := range oldTxns { newTxns[k] = v } newTxns[id] = tx atomic.StorePointer(&tm.transactions, unsafe.Pointer(&newTxns)) return id, nil } // Commit фиксирует текущую транзакцию func (tm *TransactionManager) Commit() error { currentPtr := atomic.LoadPointer(&tm.currentTx) if currentPtr == nil { return errors.New("нет активной транзакции") } tx := (*Transaction)(currentPtr) if tx.GetState() != TxStateActive { return errors.New("транзакция не активна") } tx.SetState(TxStateCommited) atomic.StorePointer(&tm.currentTx, nil) return nil } // Rollback откатывает текущую транзакцию func (tm *TransactionManager) Rollback() error { currentPtr := atomic.LoadPointer(&tm.currentTx) if currentPtr == nil { return errors.New("нет активной транзакции") } tx := (*Transaction)(currentPtr) if tx.GetState() != TxStateActive { return errors.New("транзакция не активна") } tx.SetState(TxStateRolledBack) atomic.StorePointer(&tm.currentTx, nil) return nil } // AddOperation добавляет операцию в текущую транзакцию func (tm *TransactionManager) AddOperation(op Operation) error { currentPtr := atomic.LoadPointer(&tm.currentTx) if currentPtr == nil { return errors.New("нет активной транзакции") } tx := (*Transaction)(currentPtr) if tx.GetState() != TxStateActive { return errors.New("транзакция не активна") } op.Timestamp = time.Now() tx.AddOperation(op) return nil } // GetCurrentTx возвращает текущую транзакцию (wait-free) func (tm *TransactionManager) GetCurrentTx() *Transaction { currentPtr := atomic.LoadPointer(&tm.currentTx) if currentPtr == nil { return nil } return (*Transaction)(currentPtr) } // GetTransaction возвращает транзакцию по ID (wait-free) func (tm *TransactionManager) GetTransaction(id string) (*Transaction, bool) { ptr := atomic.LoadPointer(&tm.transactions) txns := *(*map[string]*Transaction)(ptr) tx, exists := txns[id] return tx, exists } // generateTxID генерирует ID транзакции func generateTxID() string { return "tx-" + types.GenerateID() + "-" + time.Now().Format("20060102150405") }