Files
futriix/internal/storage/transaction.go

932 lines
30 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/*
* Copyright 2026 Safronov Grigorii
*
* Licensed under the CDDL, Version 1.0 (the "License");
* you may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
* https://opensource.org/licenses/CDDL-1.0
*/
// Файл: internal/storage/transaction.go
// Назначение: Реализация транзакций с поддержкой MVCC (Multi-Version Concurrency Control)
// и WAL (Write-Ahead Logging) без блокировок. Использует атомарные операции и версионирование.
// Полная WAL реализация на чистом Go без сторонних зависимостей.
package storage
import (
"bufio"
"encoding/binary"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
)
// TransactionID представляет уникальный идентификатор транзакции
type TransactionID uint64
// TransactionState представляет состояние транзакции
type TransactionState int32
const (
TransactionActive TransactionState = iota
TransactionCommitted
TransactionAborted
)
// TransactionRecord представляет запись в WAL
type TransactionRecord struct {
ID TransactionID `json:"id"`
State TransactionState `json:"state"`
Timestamp int64 `json:"timestamp"`
Operations []Operation `json:"operations"`
}
// Operation представляет одну операцию в транзакции
type Operation struct {
Type string `json:"type"` // "insert", "update", "delete"
Database string `json:"database"`
Collection string `json:"collection"`
DocumentID string `json:"document_id"`
Data map[string]interface{} `json:"data"`
Version uint64 `json:"version"`
OldData map[string]interface{} `json:"old_data"` // для отката
}
// DocumentVersion представляет версию документа для MVCC
type DocumentVersion struct {
Document *Document `json:"document"`
Timestamp int64 `json:"timestamp"`
TxID TransactionID `json:"tx_id"`
}
// TransactionInfo представляет информацию о транзакции для API
type TransactionInfo struct {
ID string `json:"id"`
Status string `json:"status"`
StartTime int64 `json:"start_time"`
OperationCount int `json:"operation_count"`
Operations []OperationInfo `json:"operations,omitempty"`
}
// OperationInfo представляет информацию об операции для API
type OperationInfo struct {
Type string `json:"type"`
Database string `json:"database"`
Collection string `json:"collection"`
DocumentID string `json:"document_id"`
}
// WALRecord представляет запись в WAL файле
type WALRecord struct {
CRC uint32 `json:"crc"`
Length uint32 `json:"length"`
Type byte `json:"type"` // 1=Transaction, 2=Checkpoint
Data []byte `json:"data"`
Timestamp int64 `json:"timestamp"`
LSN uint64 `json:"lsn"` // Log Sequence Number
}
// WALManager управляет Write-Ahead Log
type WALManager struct {
mu sync.RWMutex
file *os.File
writer *bufio.Writer
path string
currentLSN uint64
lastSync time.Time
syncInterval time.Duration
bufferSize int
closed bool
writeChan chan *WALRecord
stopChan chan struct{}
wg sync.WaitGroup
}
// TransactionManager управляет транзакциями
type TransactionManager struct {
activeTransactions sync.Map // map[TransactionID]*Transaction
nextTxID atomic.Uint64
wal *WALManager
logger LoggerInterface
mu sync.RWMutex
walPath string
checkpointInterval int64 // интервал создания чекпоинтов в секундах
lastCheckpoint int64
checkpointFile *os.File
}
// Transaction представляет одну транзакцию
type Transaction struct {
ID TransactionID
State atomic.Int32
Operations []Operation
StartTime int64
mu sync.RWMutex
}
// LoggerInterface определяет интерфейс для логирования
type LoggerInterface interface {
Debug(msg string)
Info(msg string)
Error(msg string)
Warn(msg string)
}
var (
globalTxManager *TransactionManager
txManagerOnce sync.Once
currentTx atomic.Value // *Transaction
)
// CRC32 таблица для быстрых вычислений
var crc32Table = [256]uint32{
0x00000000, 0x77073096, 0xee0e612c, 0x990951ba, 0x076dc419, 0x706af48f,
0xe963a535, 0x9e6495a3, 0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988,
0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91, 0x1db71064, 0x6ab020f2,
0xf3b97148, 0x84be41de, 0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7,
0x136c9856, 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec, 0x14015c4f, 0x63066cd9,
0xfa0f3d63, 0x8d080df5, 0x3b6e20c8, 0x4c69105e, 0xd56041e4, 0xa2677172,
0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b, 0x35b5a8fa, 0x42b2986c,
0xdbbbc9d6, 0xacbcf940, 0x32d86ce3, 0x45df5c75, 0xdcd60dcf, 0xabd13d59,
0x26d930ac, 0x51de003a, 0xc8d75180, 0xbfd06116, 0x21b4f4b5, 0x56b3c423,
0xcfba9599, 0xb8bda50f, 0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924,
0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d, 0x76dc4190, 0x01db7106,
0x98d220bc, 0xefd5102a, 0x71b18589, 0x06b6b51f, 0x9fbfe4a5, 0xe8b8d433,
0x7807c9a2, 0x0f00f934, 0x9609a88e, 0xe10e9818, 0x7f6a0dbb, 0x086d3d2d,
0x91646c97, 0xe6635c01, 0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e,
0x6c0695ed, 0x1b01a57b, 0x8208f4c1, 0xf50fc457, 0x65b0d9c6, 0x12b7e950,
0x8bbeb8ea, 0xfcb9887c, 0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, 0xfbd44c65,
0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2, 0x4adfa541, 0x3dd895d7,
0xa4d1c46d, 0xd3d6f4fb, 0x4369e96a, 0x346ed9fc, 0xad678846, 0xda60b8d0,
0x44042d73, 0x33031de5, 0xaa0a4c5f, 0xdd0d7cc9, 0x5005713c, 0x270241aa,
0xbe0b1010, 0xc90c2086, 0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f,
0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4, 0x59b33d17, 0x2eb40d81,
0xb7bd5c3b, 0xc0ba6cad, 0xedb88320, 0x9abfb3b6, 0x03b6e20c, 0x74b1d29a,
0xead54739, 0x9dd277af, 0x04db2615, 0x73dc1683, 0xe3630b12, 0x94643b84,
0x0d6d6a3e, 0x7a6a5aa8, 0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1,
0xf00f9344, 0x8708a3d2, 0x1e01f268, 0x6906c2fe, 0xf762575d, 0x806567cb,
0x196c3671, 0x6e6b06e7, 0xfed41b76, 0x89d32be0, 0x10da7a5a, 0x67dd4acc,
0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5, 0xd6d6a3e8, 0xa1d1937e,
0x38d8c2c4, 0x4fdff252, 0xd1bb67f1, 0xa6bc5767, 0x3fb506dd, 0x48b2364b,
0xd80d2bda, 0xaf0a1b4c, 0x36034af6, 0x41047a60, 0xdf60efc3, 0xa867df55,
0x316e8eef, 0x4669be79, 0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236,
0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f, 0xc5ba3bbe, 0xb2bd0b28,
0x2bb45a92, 0x5cb36a04, 0xc2d7ffa7, 0xb5d0cf31, 0x2cd99e8b, 0x5bdeae1d,
0x9b64c2b0, 0xec63f226, 0x756aa39c, 0x026d930a, 0x9c0906a9, 0xeb0e363f,
0x72076785, 0x05005713, 0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38,
0x92d28e9b, 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21, 0x86d3d2d4, 0xf1d4e242,
0x68ddb3f8, 0x1fda836e, 0x81be16cd, 0xf6b9265b, 0x6fb077e1, 0x18b74777,
0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c, 0x8f659eff, 0xf862ae69,
0x616bffd3, 0x166ccf45, 0xa00ae278, 0xd70dd2ee, 0x4e048354, 0x3903b3c2,
0xa7672661, 0xd06016f7, 0x4969474d, 0x3e6e77db, 0xaed16a4a, 0xd9d65adc,
0x40df0b66, 0x37d83bf0, 0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9,
0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6, 0xbad03605, 0xcdd70693,
0x54de5729, 0x23d967bf, 0xb3667a2e, 0xc4614ab8, 0x5d681b02, 0x2a6f2b94,
0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d,
}
// crc32 вычисляет CRC32 хеш
func crc32(data []byte) uint32 {
crc := uint32(0xFFFFFFFF)
for _, b := range data {
crc = (crc >> 8) ^ crc32Table[(crc^uint32(b))&0xFF]
}
return crc ^ 0xFFFFFFFF
}
// NewWALManager создаёт новый WAL менеджер
func NewWALManager(path string) (*WALManager, error) {
dir := filepath.Dir(path)
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, fmt.Errorf("failed to create WAL directory: %v", err)
}
file, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0644)
if err != nil {
return nil, fmt.Errorf("failed to open WAL file: %v", err)
}
// Определяем текущий LSN из файла
var currentLSN uint64 = 1
stat, err := file.Stat()
if err == nil && stat.Size() > 0 {
// Читаем последнюю запись для определения LSN
currentLSN = uint64(stat.Size()) / 100 // приблизительное значение
if currentLSN < 1 {
currentLSN = 1
}
}
wm := &WALManager{
file: file,
writer: bufio.NewWriterSize(file, 64*1024), // 64KB буфер
path: path,
currentLSN: currentLSN,
lastSync: time.Now(),
syncInterval: 5 * time.Second,
bufferSize: 64 * 1024,
writeChan: make(chan *WALRecord, 10000),
stopChan: make(chan struct{}),
}
// Запускаем фоновую запись
wm.wg.Add(1)
go wm.writerLoop()
return wm, nil
}
// writerLoop асинхронно записывает записи в WAL
func (wm *WALManager) writerLoop() {
defer wm.wg.Done()
batch := make([]*WALRecord, 0, 100)
ticker := time.NewTicker(wm.syncInterval)
defer ticker.Stop()
for {
select {
case record, ok := <-wm.writeChan:
if !ok {
// Канал закрыт, сбрасываем оставшиеся записи
if len(batch) > 0 {
wm.flushBatch(batch)
}
return
}
batch = append(batch, record)
if len(batch) >= 100 {
wm.flushBatch(batch)
batch = batch[:0]
}
case <-ticker.C:
if len(batch) > 0 {
wm.flushBatch(batch)
batch = batch[:0]
}
wm.sync()
case <-wm.stopChan:
if len(batch) > 0 {
wm.flushBatch(batch)
}
wm.sync()
return
}
}
}
// flushBatch записывает пакет записей
func (wm *WALManager) flushBatch(batch []*WALRecord) {
wm.mu.Lock()
defer wm.mu.Unlock()
for _, record := range batch {
// Сериализуем запись
data, err := json.Marshal(record)
if err != nil {
continue
}
// Записываем размер (4 байта)
lenBuf := make([]byte, 4)
binary.BigEndian.PutUint32(lenBuf, uint32(len(data)))
if _, err := wm.writer.Write(lenBuf); err != nil {
continue
}
// Записываем данные
if _, err := wm.writer.Write(data); err != nil {
continue
}
// Обновляем LSN
record.LSN = wm.currentLSN
wm.currentLSN++
}
}
// sync синхронизирует буфер с диском
func (wm *WALManager) sync() {
wm.mu.Lock()
defer wm.mu.Unlock()
if time.Since(wm.lastSync) >= wm.syncInterval {
wm.writer.Flush()
wm.file.Sync()
wm.lastSync = time.Now()
}
}
// Write записывает запись в WAL
func (wm *WALManager) Write(record *WALRecord) error {
if wm.closed {
return fmt.Errorf("WAL is closed")
}
record.Timestamp = time.Now().UnixMilli()
// Вычисляем CRC
data, err := json.Marshal(record.Data)
if err != nil {
return err
}
record.CRC = crc32(data)
select {
case wm.writeChan <- record:
return nil
case <-time.After(5 * time.Second):
return fmt.Errorf("WAL write timeout")
}
}
// ReadAll читает все записи из WAL
func (wm *WALManager) ReadAll() ([]*WALRecord, error) {
wm.mu.RLock()
defer wm.mu.RUnlock()
// Сбрасываем буферы
wm.writer.Flush()
// Открываем файл для чтения
file, err := os.Open(wm.path)
if err != nil {
return nil, err
}
defer file.Close()
records := make([]*WALRecord, 0)
reader := bufio.NewReader(file)
lenBuf := make([]byte, 4)
for {
_, err := reader.Read(lenBuf)
if err != nil {
break
}
recordLen := binary.BigEndian.Uint32(lenBuf)
recordData := make([]byte, recordLen)
_, err = reader.Read(recordData)
if err != nil {
break
}
var record WALRecord
if err := json.Unmarshal(recordData, &record); err != nil {
continue
}
// Проверяем CRC
data, _ := json.Marshal(record.Data)
if crc32(data) != record.CRC {
continue // Пропускаем повреждённые записи
}
records = append(records, &record)
}
return records, nil
}
// Close закрывает WAL
func (wm *WALManager) Close() error {
wm.mu.Lock()
wm.closed = true
wm.mu.Unlock()
close(wm.stopChan)
close(wm.writeChan)
wm.wg.Wait()
wm.mu.Lock()
defer wm.mu.Unlock()
if err := wm.writer.Flush(); err != nil {
return err
}
return wm.file.Close()
}
// InitTransactionManager инициализирует глобальный менеджер транзакций
func InitTransactionManager(walPath string) error {
var err error
txManagerOnce.Do(func() {
globalTxManager = &TransactionManager{
nextTxID: atomic.Uint64{},
walPath: walPath,
checkpointInterval: 300, // 5 минут
lastCheckpoint: time.Now().Unix(),
}
globalTxManager.nextTxID.Store(1)
err = globalTxManager.initWAL(walPath)
if err == nil {
// Восстанавливаем состояние из WAL
go globalTxManager.recoverFromWAL()
// Запускаем периодическое создание чекпоинтов
go globalTxManager.checkpointLoop()
}
})
return err
}
// initWAL инициализирует Write-Ahead Log
func (tm *TransactionManager) initWAL(walPath string) error {
wal, err := NewWALManager(walPath)
if err != nil {
return err
}
tm.wal = wal
return nil
}
// checkpointLoop периодически создаёт чекпоинты
func (tm *TransactionManager) checkpointLoop() {
ticker := time.NewTicker(time.Duration(tm.checkpointInterval) * time.Second)
defer ticker.Stop()
for range ticker.C {
tm.createCheckpoint()
}
}
// createCheckpoint создаёт чекпоинт текущего состояния
func (tm *TransactionManager) createCheckpoint() {
now := time.Now().Unix()
if now-tm.lastCheckpoint < tm.checkpointInterval {
return
}
// Создаём чекпоинт-файл
checkpointPath := fmt.Sprintf("%s.checkpoint.%d", tm.walPath, now)
// Собираем состояние всех активных транзакций
checkpoint := make(map[string]interface{})
checkpoint["timestamp"] = now
checkpoint["last_lsn"] = tm.wal.currentLSN
transactions := make([]*Transaction, 0)
tm.activeTransactions.Range(func(key, value interface{}) bool {
tx := value.(*Transaction)
transactions = append(transactions, tx)
return true
})
checkpoint["transactions"] = transactions
data, err := json.Marshal(checkpoint)
if err != nil {
if tm.logger != nil {
tm.logger.Error(fmt.Sprintf("Failed to marshal checkpoint: %v", err))
}
return
}
// Записываем чекпоинт
if err := os.WriteFile(checkpointPath, data, 0644); err != nil {
if tm.logger != nil {
tm.logger.Error(fmt.Sprintf("Failed to write checkpoint: %v", err))
}
return
}
tm.lastCheckpoint = now
if tm.logger != nil {
tm.logger.Info(fmt.Sprintf("Checkpoint created: %s", checkpointPath))
}
// Удаляем старые чекпоинты
go tm.cleanOldCheckpoints()
}
// cleanOldCheckpoints удаляет старые чекпоинты
func (tm *TransactionManager) cleanOldCheckpoints() {
files, err := filepath.Glob(fmt.Sprintf("%s.checkpoint.*", tm.walPath))
if err != nil {
return
}
// Оставляем только 5 последних чекпоинтов
if len(files) > 5 {
for i := 0; i < len(files)-5; i++ {
os.Remove(files[i])
}
}
}
// recoverFromWAL восстанавливает состояние из WAL после сбоя
func (tm *TransactionManager) recoverFromWAL() {
if tm.wal == nil {
return
}
records, err := tm.wal.ReadAll()
if err != nil {
if tm.logger != nil {
tm.logger.Error(fmt.Sprintf("Failed to read WAL: %v", err))
}
return
}
for _, record := range records {
if record.Type == 1 { // Transaction record
var txRecord TransactionRecord
if err := json.Unmarshal(record.Data, &txRecord); err != nil {
continue
}
if txRecord.State == TransactionCommitted {
// Повторно применяем закоммиченные операции
for _, op := range txRecord.Operations {
if err := applyOperation(op); err != nil {
if tm.logger != nil {
tm.logger.Error(fmt.Sprintf("Failed to replay operation: %v", err))
}
}
}
}
// Транзакции в состоянии Active или Aborted игнорируем
}
}
if tm.logger != nil {
tm.logger.Info(fmt.Sprintf("Recovered %d transactions from WAL", len(records)))
}
}
// SetTransactionLogger устанавливает логгер для транзакций
func SetTransactionLogger(logger LoggerInterface) {
if globalTxManager != nil {
globalTxManager.logger = logger
}
}
// BeginTransaction начинает новую транзакцию
func BeginTransaction() *Transaction {
if globalTxManager == nil {
InitTransactionManager("futriis.wal")
}
tx := &Transaction{
ID: TransactionID(globalTxManager.nextTxID.Add(1) - 1),
StartTime: time.Now().UnixMilli(),
Operations: make([]Operation, 0),
}
tx.State.Store(int32(TransactionActive))
globalTxManager.activeTransactions.Store(tx.ID, tx)
currentTx.Store(tx)
// Записываем начало транзакции в WAL
txData, _ := json.Marshal(TransactionRecord{
ID: tx.ID,
State: TransactionActive,
Timestamp: tx.StartTime,
Operations: tx.Operations,
})
if globalTxManager.wal != nil {
globalTxManager.wal.Write(&WALRecord{
Type: 1,
Data: txData,
LSN: globalTxManager.wal.currentLSN,
})
}
if globalTxManager.logger != nil {
globalTxManager.logger.Debug(fmt.Sprintf("Transaction %d started", tx.ID))
}
AuditLog("START", "TRANSACTION", fmt.Sprintf("%d", tx.ID), map[string]interface{}{
"start_time": tx.StartTime,
})
return tx
}
// BeginTransactionOnCollection начинает транзакцию для конкретной коллекции
func BeginTransactionOnCollection(coll *Collection) error {
if globalTxManager == nil {
if err := InitTransactionManager("futriis.wal"); err != nil {
return err
}
}
tx := BeginTransaction()
if tx == nil {
return fmt.Errorf("failed to create transaction")
}
if globalTxManager.logger != nil {
globalTxManager.logger.Debug(fmt.Sprintf("Transaction %d started on collection %s.%s", tx.ID, coll.dbName, coll.name))
}
return nil
}
// AddToTransaction добавляет операцию в текущую транзакцию
func AddToTransaction(coll *Collection, opType string, docID string, newData map[string]interface{}, oldData map[string]interface{}, version uint64) error {
txVal := currentTx.Load()
if txVal == nil {
return fmt.Errorf("no active transaction")
}
tx := txVal.(*Transaction)
if TransactionState(tx.State.Load()) != TransactionActive {
return fmt.Errorf("transaction is not active")
}
op := Operation{
Type: opType,
Database: coll.dbName,
Collection: coll.name,
DocumentID: docID,
Data: newData,
OldData: oldData,
Version: version,
}
tx.mu.Lock()
tx.Operations = append(tx.Operations, op)
tx.mu.Unlock()
if globalTxManager.logger != nil {
globalTxManager.logger.Debug(fmt.Sprintf("Transaction %d: added %s operation on %s", tx.ID, opType, docID))
}
return nil
}
// CommitCurrentTransaction коммитит текущую транзакцию
func CommitCurrentTransaction() error {
txVal := currentTx.Load()
if txVal == nil {
return fmt.Errorf("no active transaction")
}
tx := txVal.(*Transaction)
if TransactionState(tx.State.Load()) != TransactionActive {
return fmt.Errorf("transaction is not active")
}
// Применяем все операции атомарно
for _, op := range tx.Operations {
if err := applyOperation(op); err != nil {
// Откатываем при ошибке
AbortCurrentTransaction()
return fmt.Errorf("transaction commit failed at operation %s: %v", op.Type, err)
}
}
tx.State.Store(int32(TransactionCommitted))
// Записываем коммит в WAL
txData, _ := json.Marshal(TransactionRecord{
ID: tx.ID,
State: TransactionCommitted,
Timestamp: time.Now().UnixMilli(),
Operations: tx.Operations,
})
if globalTxManager.wal != nil {
globalTxManager.wal.Write(&WALRecord{
Type: 1,
Data: txData,
LSN: globalTxManager.wal.currentLSN,
})
}
if globalTxManager.logger != nil {
globalTxManager.logger.Info(fmt.Sprintf("Transaction %d committed with %d operations", tx.ID, len(tx.Operations)))
}
AuditLog("COMMIT", "TRANSACTION", fmt.Sprintf("%d", tx.ID), map[string]interface{}{
"operations": len(tx.Operations),
})
// Очищаем текущую транзакцию
currentTx.Store(nil)
globalTxManager.activeTransactions.Delete(tx.ID)
return nil
}
// AbortCurrentTransaction откатывает текущую транзакцию
func AbortCurrentTransaction() error {
txVal := currentTx.Load()
if txVal == nil {
return fmt.Errorf("no active transaction")
}
tx := txVal.(*Transaction)
tx.State.Store(int32(TransactionAborted))
// Записываем откат в WAL
txData, _ := json.Marshal(TransactionRecord{
ID: tx.ID,
State: TransactionAborted,
Timestamp: time.Now().UnixMilli(),
Operations: tx.Operations,
})
if globalTxManager.wal != nil {
globalTxManager.wal.Write(&WALRecord{
Type: 1,
Data: txData,
LSN: globalTxManager.wal.currentLSN,
})
}
if globalTxManager.logger != nil {
globalTxManager.logger.Info(fmt.Sprintf("Transaction %d aborted with %d operations", tx.ID, len(tx.Operations)))
}
AuditLog("ABORT", "TRANSACTION", fmt.Sprintf("%d", tx.ID), map[string]interface{}{
"operations": len(tx.Operations),
})
// Очищаем текущую транзакцию
currentTx.Store(nil)
globalTxManager.activeTransactions.Delete(tx.ID)
return nil
}
// HasActiveTransaction проверяет наличие активной транзакции
func HasActiveTransaction() bool {
return currentTx.Load() != nil
}
// GetCurrentTransactionID возвращает ID текущей транзакции
func GetCurrentTransactionID() string {
txVal := currentTx.Load()
if txVal == nil {
return ""
}
tx := txVal.(*Transaction)
return fmt.Sprintf("%d", tx.ID)
}
// GetActiveTransactions возвращает список активных транзакций для API
func GetActiveTransactions() []TransactionInfo {
if globalTxManager == nil {
return []TransactionInfo{}
}
transactions := make([]TransactionInfo, 0)
globalTxManager.activeTransactions.Range(func(key, value interface{}) bool {
tx := value.(*Transaction)
status := "active"
if TransactionState(tx.State.Load()) == TransactionCommitted {
status = "committed"
} else if TransactionState(tx.State.Load()) == TransactionAborted {
status = "aborted"
}
tx.mu.RLock()
opCount := len(tx.Operations)
operations := make([]OperationInfo, 0, opCount)
for _, op := range tx.Operations {
operations = append(operations, OperationInfo{
Type: op.Type,
Database: op.Database,
Collection: op.Collection,
DocumentID: op.DocumentID,
})
}
tx.mu.RUnlock()
transactions = append(transactions, TransactionInfo{
ID: fmt.Sprintf("%d", tx.ID),
Status: status,
StartTime: tx.StartTime,
OperationCount: opCount,
Operations: operations,
})
return true
})
return transactions
}
// GetTransactionByID возвращает транзакцию по ID
func GetTransactionByID(id string) (*Transaction, error) {
if globalTxManager == nil {
return nil, fmt.Errorf("transaction manager not initialized")
}
var txID TransactionID
fmt.Sscanf(id, "%d", &txID)
if val, ok := globalTxManager.activeTransactions.Load(txID); ok {
return val.(*Transaction), nil
}
return nil, fmt.Errorf("transaction not found")
}
// FindInTransaction ищет документ в контексте транзакции
func FindInTransaction(coll *Collection, id string) (*Document, error) {
txVal := currentTx.Load()
if txVal == nil {
return coll.Find(id)
}
tx := txVal.(*Transaction)
tx.mu.RLock()
defer tx.mu.RUnlock()
// Ищем в операциях транзакции в обратном порядке
for i := len(tx.Operations) - 1; i >= 0; i-- {
op := tx.Operations[i]
if op.DocumentID == id {
if op.Type == "delete" {
return nil, fmt.Errorf("document deleted in transaction")
}
if op.Type == "insert" || op.Type == "update" {
doc := NewDocumentWithID(op.DocumentID)
for k, v := range op.Data {
doc.SetField(k, v)
}
doc.Version = op.Version
return doc, nil
}
}
}
return coll.Find(id)
}
// applyOperation применяет операцию к хранилищу
func applyOperation(op Operation) error {
db, err := GetGlobalStorage().GetDatabase(op.Database)
if err != nil {
return fmt.Errorf("database not found: %s", op.Database)
}
coll, err := db.GetCollection(op.Collection)
if err != nil {
return fmt.Errorf("collection not found: %s", op.Collection)
}
switch op.Type {
case "insert":
doc := NewDocumentWithID(op.DocumentID)
for k, v := range op.Data {
doc.SetField(k, v)
}
doc.Version = op.Version
return coll.Insert(doc)
case "update":
return coll.Update(op.DocumentID, op.Data)
case "delete":
return coll.Delete(op.DocumentID)
}
return nil
}
// GetGlobalStorage возвращает глобальное хранилище
var globalStorage *Storage
// SetGlobalStorage устанавливает глобальное хранилище
func SetGlobalStorage(s *Storage) {
globalStorage = s
}
// GetGlobalStorage возвращает глобальное хранилище
func GetGlobalStorage() *Storage {
return globalStorage
}
// AuditLog записывает событие в аудит
func AuditLog(operation, dataType, name string, details map[string]interface{}) {
LogAudit(operation, dataType, name, details)
}
// MVCCSnapshot создаёт снапшот текущего состояния для MVCC
func MVCCSnapshot() uint64 {
return uint64(time.Now().UnixNano())
}
// CreateDocumentVersion создаёт новую версию документа для MVCC
func CreateDocumentVersion(doc *Document, txID TransactionID) *DocumentVersion {
return &DocumentVersion{
Document: doc.Clone(),
Timestamp: time.Now().UnixMilli(),
TxID: txID,
}
}