Upload files to "internal/storage"
This commit is contained in:
931
internal/storage/transaction.go
Normal file
931
internal/storage/transaction.go
Normal file
@@ -0,0 +1,931 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2026 Safronov Grigorii
|
||||||
|
*
|
||||||
|
* Licensed under the CDDL, Version 1.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
*
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* https://opensource.org/licenses/CDDL-1.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
// Файл: internal/storage/transaction.go
|
||||||
|
// Назначение: Реализация транзакций с поддержкой MVCC (Multi-Version Concurrency Control)
|
||||||
|
// и WAL (Write-Ahead Logging) без блокировок. Использует атомарные операции и версионирование.
|
||||||
|
// Полная WAL реализация на чистом Go без сторонних зависимостей.
|
||||||
|
|
||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"encoding/binary"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TransactionID представляет уникальный идентификатор транзакции
|
||||||
|
type TransactionID uint64
|
||||||
|
|
||||||
|
// TransactionState представляет состояние транзакции
|
||||||
|
type TransactionState int32
|
||||||
|
|
||||||
|
const (
|
||||||
|
TransactionActive TransactionState = iota
|
||||||
|
TransactionCommitted
|
||||||
|
TransactionAborted
|
||||||
|
)
|
||||||
|
|
||||||
|
// TransactionRecord представляет запись в WAL
|
||||||
|
type TransactionRecord struct {
|
||||||
|
ID TransactionID `json:"id"`
|
||||||
|
State TransactionState `json:"state"`
|
||||||
|
Timestamp int64 `json:"timestamp"`
|
||||||
|
Operations []Operation `json:"operations"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Operation представляет одну операцию в транзакции
|
||||||
|
type Operation struct {
|
||||||
|
Type string `json:"type"` // "insert", "update", "delete"
|
||||||
|
Database string `json:"database"`
|
||||||
|
Collection string `json:"collection"`
|
||||||
|
DocumentID string `json:"document_id"`
|
||||||
|
Data map[string]interface{} `json:"data"`
|
||||||
|
Version uint64 `json:"version"`
|
||||||
|
OldData map[string]interface{} `json:"old_data"` // для отката
|
||||||
|
}
|
||||||
|
|
||||||
|
// DocumentVersion представляет версию документа для MVCC
|
||||||
|
type DocumentVersion struct {
|
||||||
|
Document *Document `json:"document"`
|
||||||
|
Timestamp int64 `json:"timestamp"`
|
||||||
|
TxID TransactionID `json:"tx_id"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TransactionInfo представляет информацию о транзакции для API
|
||||||
|
type TransactionInfo struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
Status string `json:"status"`
|
||||||
|
StartTime int64 `json:"start_time"`
|
||||||
|
OperationCount int `json:"operation_count"`
|
||||||
|
Operations []OperationInfo `json:"operations,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// OperationInfo представляет информацию об операции для API
|
||||||
|
type OperationInfo struct {
|
||||||
|
Type string `json:"type"`
|
||||||
|
Database string `json:"database"`
|
||||||
|
Collection string `json:"collection"`
|
||||||
|
DocumentID string `json:"document_id"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// WALRecord представляет запись в WAL файле
|
||||||
|
type WALRecord struct {
|
||||||
|
CRC uint32 `json:"crc"`
|
||||||
|
Length uint32 `json:"length"`
|
||||||
|
Type byte `json:"type"` // 1=Transaction, 2=Checkpoint
|
||||||
|
Data []byte `json:"data"`
|
||||||
|
Timestamp int64 `json:"timestamp"`
|
||||||
|
LSN uint64 `json:"lsn"` // Log Sequence Number
|
||||||
|
}
|
||||||
|
|
||||||
|
// WALManager управляет Write-Ahead Log
|
||||||
|
type WALManager struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
file *os.File
|
||||||
|
writer *bufio.Writer
|
||||||
|
path string
|
||||||
|
currentLSN uint64
|
||||||
|
lastSync time.Time
|
||||||
|
syncInterval time.Duration
|
||||||
|
bufferSize int
|
||||||
|
closed bool
|
||||||
|
writeChan chan *WALRecord
|
||||||
|
stopChan chan struct{}
|
||||||
|
wg sync.WaitGroup
|
||||||
|
}
|
||||||
|
|
||||||
|
// TransactionManager управляет транзакциями
|
||||||
|
type TransactionManager struct {
|
||||||
|
activeTransactions sync.Map // map[TransactionID]*Transaction
|
||||||
|
nextTxID atomic.Uint64
|
||||||
|
wal *WALManager
|
||||||
|
logger LoggerInterface
|
||||||
|
mu sync.RWMutex
|
||||||
|
walPath string
|
||||||
|
checkpointInterval int64 // интервал создания чекпоинтов в секундах
|
||||||
|
lastCheckpoint int64
|
||||||
|
checkpointFile *os.File
|
||||||
|
}
|
||||||
|
|
||||||
|
// Transaction представляет одну транзакцию
|
||||||
|
type Transaction struct {
|
||||||
|
ID TransactionID
|
||||||
|
State atomic.Int32
|
||||||
|
Operations []Operation
|
||||||
|
StartTime int64
|
||||||
|
mu sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// LoggerInterface определяет интерфейс для логирования
|
||||||
|
type LoggerInterface interface {
|
||||||
|
Debug(msg string)
|
||||||
|
Info(msg string)
|
||||||
|
Error(msg string)
|
||||||
|
Warn(msg string)
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
globalTxManager *TransactionManager
|
||||||
|
txManagerOnce sync.Once
|
||||||
|
currentTx atomic.Value // *Transaction
|
||||||
|
)
|
||||||
|
|
||||||
|
// CRC32 таблица для быстрых вычислений
|
||||||
|
var crc32Table = [256]uint32{
|
||||||
|
0x00000000, 0x77073096, 0xee0e612c, 0x990951ba, 0x076dc419, 0x706af48f,
|
||||||
|
0xe963a535, 0x9e6495a3, 0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988,
|
||||||
|
0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91, 0x1db71064, 0x6ab020f2,
|
||||||
|
0xf3b97148, 0x84be41de, 0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7,
|
||||||
|
0x136c9856, 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec, 0x14015c4f, 0x63066cd9,
|
||||||
|
0xfa0f3d63, 0x8d080df5, 0x3b6e20c8, 0x4c69105e, 0xd56041e4, 0xa2677172,
|
||||||
|
0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b, 0x35b5a8fa, 0x42b2986c,
|
||||||
|
0xdbbbc9d6, 0xacbcf940, 0x32d86ce3, 0x45df5c75, 0xdcd60dcf, 0xabd13d59,
|
||||||
|
0x26d930ac, 0x51de003a, 0xc8d75180, 0xbfd06116, 0x21b4f4b5, 0x56b3c423,
|
||||||
|
0xcfba9599, 0xb8bda50f, 0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924,
|
||||||
|
0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d, 0x76dc4190, 0x01db7106,
|
||||||
|
0x98d220bc, 0xefd5102a, 0x71b18589, 0x06b6b51f, 0x9fbfe4a5, 0xe8b8d433,
|
||||||
|
0x7807c9a2, 0x0f00f934, 0x9609a88e, 0xe10e9818, 0x7f6a0dbb, 0x086d3d2d,
|
||||||
|
0x91646c97, 0xe6635c01, 0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e,
|
||||||
|
0x6c0695ed, 0x1b01a57b, 0x8208f4c1, 0xf50fc457, 0x65b0d9c6, 0x12b7e950,
|
||||||
|
0x8bbeb8ea, 0xfcb9887c, 0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, 0xfbd44c65,
|
||||||
|
0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2, 0x4adfa541, 0x3dd895d7,
|
||||||
|
0xa4d1c46d, 0xd3d6f4fb, 0x4369e96a, 0x346ed9fc, 0xad678846, 0xda60b8d0,
|
||||||
|
0x44042d73, 0x33031de5, 0xaa0a4c5f, 0xdd0d7cc9, 0x5005713c, 0x270241aa,
|
||||||
|
0xbe0b1010, 0xc90c2086, 0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f,
|
||||||
|
0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4, 0x59b33d17, 0x2eb40d81,
|
||||||
|
0xb7bd5c3b, 0xc0ba6cad, 0xedb88320, 0x9abfb3b6, 0x03b6e20c, 0x74b1d29a,
|
||||||
|
0xead54739, 0x9dd277af, 0x04db2615, 0x73dc1683, 0xe3630b12, 0x94643b84,
|
||||||
|
0x0d6d6a3e, 0x7a6a5aa8, 0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1,
|
||||||
|
0xf00f9344, 0x8708a3d2, 0x1e01f268, 0x6906c2fe, 0xf762575d, 0x806567cb,
|
||||||
|
0x196c3671, 0x6e6b06e7, 0xfed41b76, 0x89d32be0, 0x10da7a5a, 0x67dd4acc,
|
||||||
|
0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5, 0xd6d6a3e8, 0xa1d1937e,
|
||||||
|
0x38d8c2c4, 0x4fdff252, 0xd1bb67f1, 0xa6bc5767, 0x3fb506dd, 0x48b2364b,
|
||||||
|
0xd80d2bda, 0xaf0a1b4c, 0x36034af6, 0x41047a60, 0xdf60efc3, 0xa867df55,
|
||||||
|
0x316e8eef, 0x4669be79, 0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236,
|
||||||
|
0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f, 0xc5ba3bbe, 0xb2bd0b28,
|
||||||
|
0x2bb45a92, 0x5cb36a04, 0xc2d7ffa7, 0xb5d0cf31, 0x2cd99e8b, 0x5bdeae1d,
|
||||||
|
0x9b64c2b0, 0xec63f226, 0x756aa39c, 0x026d930a, 0x9c0906a9, 0xeb0e363f,
|
||||||
|
0x72076785, 0x05005713, 0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38,
|
||||||
|
0x92d28e9b, 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21, 0x86d3d2d4, 0xf1d4e242,
|
||||||
|
0x68ddb3f8, 0x1fda836e, 0x81be16cd, 0xf6b9265b, 0x6fb077e1, 0x18b74777,
|
||||||
|
0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c, 0x8f659eff, 0xf862ae69,
|
||||||
|
0x616bffd3, 0x166ccf45, 0xa00ae278, 0xd70dd2ee, 0x4e048354, 0x3903b3c2,
|
||||||
|
0xa7672661, 0xd06016f7, 0x4969474d, 0x3e6e77db, 0xaed16a4a, 0xd9d65adc,
|
||||||
|
0x40df0b66, 0x37d83bf0, 0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9,
|
||||||
|
0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6, 0xbad03605, 0xcdd70693,
|
||||||
|
0x54de5729, 0x23d967bf, 0xb3667a2e, 0xc4614ab8, 0x5d681b02, 0x2a6f2b94,
|
||||||
|
0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d,
|
||||||
|
}
|
||||||
|
|
||||||
|
// crc32 вычисляет CRC32 хеш
|
||||||
|
func crc32(data []byte) uint32 {
|
||||||
|
crc := uint32(0xFFFFFFFF)
|
||||||
|
for _, b := range data {
|
||||||
|
crc = (crc >> 8) ^ crc32Table[(crc^uint32(b))&0xFF]
|
||||||
|
}
|
||||||
|
return crc ^ 0xFFFFFFFF
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewWALManager создаёт новый WAL менеджер
|
||||||
|
func NewWALManager(path string) (*WALManager, error) {
|
||||||
|
dir := filepath.Dir(path)
|
||||||
|
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create WAL directory: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
file, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0644)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to open WAL file: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Определяем текущий LSN из файла
|
||||||
|
var currentLSN uint64 = 1
|
||||||
|
stat, err := file.Stat()
|
||||||
|
if err == nil && stat.Size() > 0 {
|
||||||
|
// Читаем последнюю запись для определения LSN
|
||||||
|
currentLSN = uint64(stat.Size()) / 100 // приблизительное значение
|
||||||
|
if currentLSN < 1 {
|
||||||
|
currentLSN = 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
wm := &WALManager{
|
||||||
|
file: file,
|
||||||
|
writer: bufio.NewWriterSize(file, 64*1024), // 64KB буфер
|
||||||
|
path: path,
|
||||||
|
currentLSN: currentLSN,
|
||||||
|
lastSync: time.Now(),
|
||||||
|
syncInterval: 5 * time.Second,
|
||||||
|
bufferSize: 64 * 1024,
|
||||||
|
writeChan: make(chan *WALRecord, 10000),
|
||||||
|
stopChan: make(chan struct{}),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Запускаем фоновую запись
|
||||||
|
wm.wg.Add(1)
|
||||||
|
go wm.writerLoop()
|
||||||
|
|
||||||
|
return wm, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// writerLoop асинхронно записывает записи в WAL
|
||||||
|
func (wm *WALManager) writerLoop() {
|
||||||
|
defer wm.wg.Done()
|
||||||
|
|
||||||
|
batch := make([]*WALRecord, 0, 100)
|
||||||
|
ticker := time.NewTicker(wm.syncInterval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case record, ok := <-wm.writeChan:
|
||||||
|
if !ok {
|
||||||
|
// Канал закрыт, сбрасываем оставшиеся записи
|
||||||
|
if len(batch) > 0 {
|
||||||
|
wm.flushBatch(batch)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
batch = append(batch, record)
|
||||||
|
if len(batch) >= 100 {
|
||||||
|
wm.flushBatch(batch)
|
||||||
|
batch = batch[:0]
|
||||||
|
}
|
||||||
|
|
||||||
|
case <-ticker.C:
|
||||||
|
if len(batch) > 0 {
|
||||||
|
wm.flushBatch(batch)
|
||||||
|
batch = batch[:0]
|
||||||
|
}
|
||||||
|
wm.sync()
|
||||||
|
|
||||||
|
case <-wm.stopChan:
|
||||||
|
if len(batch) > 0 {
|
||||||
|
wm.flushBatch(batch)
|
||||||
|
}
|
||||||
|
wm.sync()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// flushBatch записывает пакет записей
|
||||||
|
func (wm *WALManager) flushBatch(batch []*WALRecord) {
|
||||||
|
wm.mu.Lock()
|
||||||
|
defer wm.mu.Unlock()
|
||||||
|
|
||||||
|
for _, record := range batch {
|
||||||
|
// Сериализуем запись
|
||||||
|
data, err := json.Marshal(record)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Записываем размер (4 байта)
|
||||||
|
lenBuf := make([]byte, 4)
|
||||||
|
binary.BigEndian.PutUint32(lenBuf, uint32(len(data)))
|
||||||
|
if _, err := wm.writer.Write(lenBuf); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Записываем данные
|
||||||
|
if _, err := wm.writer.Write(data); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Обновляем LSN
|
||||||
|
record.LSN = wm.currentLSN
|
||||||
|
wm.currentLSN++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// sync синхронизирует буфер с диском
|
||||||
|
func (wm *WALManager) sync() {
|
||||||
|
wm.mu.Lock()
|
||||||
|
defer wm.mu.Unlock()
|
||||||
|
|
||||||
|
if time.Since(wm.lastSync) >= wm.syncInterval {
|
||||||
|
wm.writer.Flush()
|
||||||
|
wm.file.Sync()
|
||||||
|
wm.lastSync = time.Now()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write записывает запись в WAL
|
||||||
|
func (wm *WALManager) Write(record *WALRecord) error {
|
||||||
|
if wm.closed {
|
||||||
|
return fmt.Errorf("WAL is closed")
|
||||||
|
}
|
||||||
|
|
||||||
|
record.Timestamp = time.Now().UnixMilli()
|
||||||
|
|
||||||
|
// Вычисляем CRC
|
||||||
|
data, err := json.Marshal(record.Data)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
record.CRC = crc32(data)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case wm.writeChan <- record:
|
||||||
|
return nil
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
return fmt.Errorf("WAL write timeout")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadAll читает все записи из WAL
|
||||||
|
func (wm *WALManager) ReadAll() ([]*WALRecord, error) {
|
||||||
|
wm.mu.RLock()
|
||||||
|
defer wm.mu.RUnlock()
|
||||||
|
|
||||||
|
// Сбрасываем буферы
|
||||||
|
wm.writer.Flush()
|
||||||
|
|
||||||
|
// Открываем файл для чтения
|
||||||
|
file, err := os.Open(wm.path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer file.Close()
|
||||||
|
|
||||||
|
records := make([]*WALRecord, 0)
|
||||||
|
reader := bufio.NewReader(file)
|
||||||
|
lenBuf := make([]byte, 4)
|
||||||
|
|
||||||
|
for {
|
||||||
|
_, err := reader.Read(lenBuf)
|
||||||
|
if err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
recordLen := binary.BigEndian.Uint32(lenBuf)
|
||||||
|
recordData := make([]byte, recordLen)
|
||||||
|
_, err = reader.Read(recordData)
|
||||||
|
if err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
var record WALRecord
|
||||||
|
if err := json.Unmarshal(recordData, &record); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Проверяем CRC
|
||||||
|
data, _ := json.Marshal(record.Data)
|
||||||
|
if crc32(data) != record.CRC {
|
||||||
|
continue // Пропускаем повреждённые записи
|
||||||
|
}
|
||||||
|
|
||||||
|
records = append(records, &record)
|
||||||
|
}
|
||||||
|
|
||||||
|
return records, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close закрывает WAL
|
||||||
|
func (wm *WALManager) Close() error {
|
||||||
|
wm.mu.Lock()
|
||||||
|
wm.closed = true
|
||||||
|
wm.mu.Unlock()
|
||||||
|
|
||||||
|
close(wm.stopChan)
|
||||||
|
close(wm.writeChan)
|
||||||
|
wm.wg.Wait()
|
||||||
|
|
||||||
|
wm.mu.Lock()
|
||||||
|
defer wm.mu.Unlock()
|
||||||
|
|
||||||
|
if err := wm.writer.Flush(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return wm.file.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// InitTransactionManager инициализирует глобальный менеджер транзакций
|
||||||
|
func InitTransactionManager(walPath string) error {
|
||||||
|
var err error
|
||||||
|
txManagerOnce.Do(func() {
|
||||||
|
globalTxManager = &TransactionManager{
|
||||||
|
nextTxID: atomic.Uint64{},
|
||||||
|
walPath: walPath,
|
||||||
|
checkpointInterval: 300, // 5 минут
|
||||||
|
lastCheckpoint: time.Now().Unix(),
|
||||||
|
}
|
||||||
|
globalTxManager.nextTxID.Store(1)
|
||||||
|
err = globalTxManager.initWAL(walPath)
|
||||||
|
if err == nil {
|
||||||
|
// Восстанавливаем состояние из WAL
|
||||||
|
go globalTxManager.recoverFromWAL()
|
||||||
|
// Запускаем периодическое создание чекпоинтов
|
||||||
|
go globalTxManager.checkpointLoop()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// initWAL инициализирует Write-Ahead Log
|
||||||
|
func (tm *TransactionManager) initWAL(walPath string) error {
|
||||||
|
wal, err := NewWALManager(walPath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
tm.wal = wal
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkpointLoop периодически создаёт чекпоинты
|
||||||
|
func (tm *TransactionManager) checkpointLoop() {
|
||||||
|
ticker := time.NewTicker(time.Duration(tm.checkpointInterval) * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for range ticker.C {
|
||||||
|
tm.createCheckpoint()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// createCheckpoint создаёт чекпоинт текущего состояния
|
||||||
|
func (tm *TransactionManager) createCheckpoint() {
|
||||||
|
now := time.Now().Unix()
|
||||||
|
if now-tm.lastCheckpoint < tm.checkpointInterval {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Создаём чекпоинт-файл
|
||||||
|
checkpointPath := fmt.Sprintf("%s.checkpoint.%d", tm.walPath, now)
|
||||||
|
|
||||||
|
// Собираем состояние всех активных транзакций
|
||||||
|
checkpoint := make(map[string]interface{})
|
||||||
|
checkpoint["timestamp"] = now
|
||||||
|
checkpoint["last_lsn"] = tm.wal.currentLSN
|
||||||
|
|
||||||
|
transactions := make([]*Transaction, 0)
|
||||||
|
tm.activeTransactions.Range(func(key, value interface{}) bool {
|
||||||
|
tx := value.(*Transaction)
|
||||||
|
transactions = append(transactions, tx)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
checkpoint["transactions"] = transactions
|
||||||
|
|
||||||
|
data, err := json.Marshal(checkpoint)
|
||||||
|
if err != nil {
|
||||||
|
if tm.logger != nil {
|
||||||
|
tm.logger.Error(fmt.Sprintf("Failed to marshal checkpoint: %v", err))
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Записываем чекпоинт
|
||||||
|
if err := os.WriteFile(checkpointPath, data, 0644); err != nil {
|
||||||
|
if tm.logger != nil {
|
||||||
|
tm.logger.Error(fmt.Sprintf("Failed to write checkpoint: %v", err))
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
tm.lastCheckpoint = now
|
||||||
|
|
||||||
|
if tm.logger != nil {
|
||||||
|
tm.logger.Info(fmt.Sprintf("Checkpoint created: %s", checkpointPath))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Удаляем старые чекпоинты
|
||||||
|
go tm.cleanOldCheckpoints()
|
||||||
|
}
|
||||||
|
|
||||||
|
// cleanOldCheckpoints удаляет старые чекпоинты
|
||||||
|
func (tm *TransactionManager) cleanOldCheckpoints() {
|
||||||
|
files, err := filepath.Glob(fmt.Sprintf("%s.checkpoint.*", tm.walPath))
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Оставляем только 5 последних чекпоинтов
|
||||||
|
if len(files) > 5 {
|
||||||
|
for i := 0; i < len(files)-5; i++ {
|
||||||
|
os.Remove(files[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// recoverFromWAL восстанавливает состояние из WAL после сбоя
|
||||||
|
func (tm *TransactionManager) recoverFromWAL() {
|
||||||
|
if tm.wal == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
records, err := tm.wal.ReadAll()
|
||||||
|
if err != nil {
|
||||||
|
if tm.logger != nil {
|
||||||
|
tm.logger.Error(fmt.Sprintf("Failed to read WAL: %v", err))
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, record := range records {
|
||||||
|
if record.Type == 1 { // Transaction record
|
||||||
|
var txRecord TransactionRecord
|
||||||
|
if err := json.Unmarshal(record.Data, &txRecord); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if txRecord.State == TransactionCommitted {
|
||||||
|
// Повторно применяем закоммиченные операции
|
||||||
|
for _, op := range txRecord.Operations {
|
||||||
|
if err := applyOperation(op); err != nil {
|
||||||
|
if tm.logger != nil {
|
||||||
|
tm.logger.Error(fmt.Sprintf("Failed to replay operation: %v", err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Транзакции в состоянии Active или Aborted игнорируем
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if tm.logger != nil {
|
||||||
|
tm.logger.Info(fmt.Sprintf("Recovered %d transactions from WAL", len(records)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetTransactionLogger устанавливает логгер для транзакций
|
||||||
|
func SetTransactionLogger(logger LoggerInterface) {
|
||||||
|
if globalTxManager != nil {
|
||||||
|
globalTxManager.logger = logger
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// BeginTransaction начинает новую транзакцию
|
||||||
|
func BeginTransaction() *Transaction {
|
||||||
|
if globalTxManager == nil {
|
||||||
|
InitTransactionManager("futriis.wal")
|
||||||
|
}
|
||||||
|
|
||||||
|
tx := &Transaction{
|
||||||
|
ID: TransactionID(globalTxManager.nextTxID.Add(1) - 1),
|
||||||
|
StartTime: time.Now().UnixMilli(),
|
||||||
|
Operations: make([]Operation, 0),
|
||||||
|
}
|
||||||
|
tx.State.Store(int32(TransactionActive))
|
||||||
|
|
||||||
|
globalTxManager.activeTransactions.Store(tx.ID, tx)
|
||||||
|
currentTx.Store(tx)
|
||||||
|
|
||||||
|
// Записываем начало транзакции в WAL
|
||||||
|
txData, _ := json.Marshal(TransactionRecord{
|
||||||
|
ID: tx.ID,
|
||||||
|
State: TransactionActive,
|
||||||
|
Timestamp: tx.StartTime,
|
||||||
|
Operations: tx.Operations,
|
||||||
|
})
|
||||||
|
|
||||||
|
if globalTxManager.wal != nil {
|
||||||
|
globalTxManager.wal.Write(&WALRecord{
|
||||||
|
Type: 1,
|
||||||
|
Data: txData,
|
||||||
|
LSN: globalTxManager.wal.currentLSN,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if globalTxManager.logger != nil {
|
||||||
|
globalTxManager.logger.Debug(fmt.Sprintf("Transaction %d started", tx.ID))
|
||||||
|
}
|
||||||
|
|
||||||
|
AuditLog("START", "TRANSACTION", fmt.Sprintf("%d", tx.ID), map[string]interface{}{
|
||||||
|
"start_time": tx.StartTime,
|
||||||
|
})
|
||||||
|
|
||||||
|
return tx
|
||||||
|
}
|
||||||
|
|
||||||
|
// BeginTransactionOnCollection начинает транзакцию для конкретной коллекции
|
||||||
|
func BeginTransactionOnCollection(coll *Collection) error {
|
||||||
|
if globalTxManager == nil {
|
||||||
|
if err := InitTransactionManager("futriis.wal"); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tx := BeginTransaction()
|
||||||
|
if tx == nil {
|
||||||
|
return fmt.Errorf("failed to create transaction")
|
||||||
|
}
|
||||||
|
|
||||||
|
if globalTxManager.logger != nil {
|
||||||
|
globalTxManager.logger.Debug(fmt.Sprintf("Transaction %d started on collection %s.%s", tx.ID, coll.dbName, coll.name))
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddToTransaction добавляет операцию в текущую транзакцию
|
||||||
|
func AddToTransaction(coll *Collection, opType string, docID string, newData map[string]interface{}, oldData map[string]interface{}, version uint64) error {
|
||||||
|
txVal := currentTx.Load()
|
||||||
|
if txVal == nil {
|
||||||
|
return fmt.Errorf("no active transaction")
|
||||||
|
}
|
||||||
|
|
||||||
|
tx := txVal.(*Transaction)
|
||||||
|
if TransactionState(tx.State.Load()) != TransactionActive {
|
||||||
|
return fmt.Errorf("transaction is not active")
|
||||||
|
}
|
||||||
|
|
||||||
|
op := Operation{
|
||||||
|
Type: opType,
|
||||||
|
Database: coll.dbName,
|
||||||
|
Collection: coll.name,
|
||||||
|
DocumentID: docID,
|
||||||
|
Data: newData,
|
||||||
|
OldData: oldData,
|
||||||
|
Version: version,
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.mu.Lock()
|
||||||
|
tx.Operations = append(tx.Operations, op)
|
||||||
|
tx.mu.Unlock()
|
||||||
|
|
||||||
|
if globalTxManager.logger != nil {
|
||||||
|
globalTxManager.logger.Debug(fmt.Sprintf("Transaction %d: added %s operation on %s", tx.ID, opType, docID))
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CommitCurrentTransaction коммитит текущую транзакцию
|
||||||
|
func CommitCurrentTransaction() error {
|
||||||
|
txVal := currentTx.Load()
|
||||||
|
if txVal == nil {
|
||||||
|
return fmt.Errorf("no active transaction")
|
||||||
|
}
|
||||||
|
|
||||||
|
tx := txVal.(*Transaction)
|
||||||
|
if TransactionState(tx.State.Load()) != TransactionActive {
|
||||||
|
return fmt.Errorf("transaction is not active")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Применяем все операции атомарно
|
||||||
|
for _, op := range tx.Operations {
|
||||||
|
if err := applyOperation(op); err != nil {
|
||||||
|
// Откатываем при ошибке
|
||||||
|
AbortCurrentTransaction()
|
||||||
|
return fmt.Errorf("transaction commit failed at operation %s: %v", op.Type, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.State.Store(int32(TransactionCommitted))
|
||||||
|
|
||||||
|
// Записываем коммит в WAL
|
||||||
|
txData, _ := json.Marshal(TransactionRecord{
|
||||||
|
ID: tx.ID,
|
||||||
|
State: TransactionCommitted,
|
||||||
|
Timestamp: time.Now().UnixMilli(),
|
||||||
|
Operations: tx.Operations,
|
||||||
|
})
|
||||||
|
|
||||||
|
if globalTxManager.wal != nil {
|
||||||
|
globalTxManager.wal.Write(&WALRecord{
|
||||||
|
Type: 1,
|
||||||
|
Data: txData,
|
||||||
|
LSN: globalTxManager.wal.currentLSN,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if globalTxManager.logger != nil {
|
||||||
|
globalTxManager.logger.Info(fmt.Sprintf("Transaction %d committed with %d operations", tx.ID, len(tx.Operations)))
|
||||||
|
}
|
||||||
|
|
||||||
|
AuditLog("COMMIT", "TRANSACTION", fmt.Sprintf("%d", tx.ID), map[string]interface{}{
|
||||||
|
"operations": len(tx.Operations),
|
||||||
|
})
|
||||||
|
|
||||||
|
// Очищаем текущую транзакцию
|
||||||
|
currentTx.Store(nil)
|
||||||
|
globalTxManager.activeTransactions.Delete(tx.ID)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// AbortCurrentTransaction откатывает текущую транзакцию
|
||||||
|
func AbortCurrentTransaction() error {
|
||||||
|
txVal := currentTx.Load()
|
||||||
|
if txVal == nil {
|
||||||
|
return fmt.Errorf("no active transaction")
|
||||||
|
}
|
||||||
|
|
||||||
|
tx := txVal.(*Transaction)
|
||||||
|
tx.State.Store(int32(TransactionAborted))
|
||||||
|
|
||||||
|
// Записываем откат в WAL
|
||||||
|
txData, _ := json.Marshal(TransactionRecord{
|
||||||
|
ID: tx.ID,
|
||||||
|
State: TransactionAborted,
|
||||||
|
Timestamp: time.Now().UnixMilli(),
|
||||||
|
Operations: tx.Operations,
|
||||||
|
})
|
||||||
|
|
||||||
|
if globalTxManager.wal != nil {
|
||||||
|
globalTxManager.wal.Write(&WALRecord{
|
||||||
|
Type: 1,
|
||||||
|
Data: txData,
|
||||||
|
LSN: globalTxManager.wal.currentLSN,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if globalTxManager.logger != nil {
|
||||||
|
globalTxManager.logger.Info(fmt.Sprintf("Transaction %d aborted with %d operations", tx.ID, len(tx.Operations)))
|
||||||
|
}
|
||||||
|
|
||||||
|
AuditLog("ABORT", "TRANSACTION", fmt.Sprintf("%d", tx.ID), map[string]interface{}{
|
||||||
|
"operations": len(tx.Operations),
|
||||||
|
})
|
||||||
|
|
||||||
|
// Очищаем текущую транзакцию
|
||||||
|
currentTx.Store(nil)
|
||||||
|
globalTxManager.activeTransactions.Delete(tx.ID)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// HasActiveTransaction проверяет наличие активной транзакции
|
||||||
|
func HasActiveTransaction() bool {
|
||||||
|
return currentTx.Load() != nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetCurrentTransactionID возвращает ID текущей транзакции
|
||||||
|
func GetCurrentTransactionID() string {
|
||||||
|
txVal := currentTx.Load()
|
||||||
|
if txVal == nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
tx := txVal.(*Transaction)
|
||||||
|
return fmt.Sprintf("%d", tx.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetActiveTransactions возвращает список активных транзакций для API
|
||||||
|
func GetActiveTransactions() []TransactionInfo {
|
||||||
|
if globalTxManager == nil {
|
||||||
|
return []TransactionInfo{}
|
||||||
|
}
|
||||||
|
|
||||||
|
transactions := make([]TransactionInfo, 0)
|
||||||
|
|
||||||
|
globalTxManager.activeTransactions.Range(func(key, value interface{}) bool {
|
||||||
|
tx := value.(*Transaction)
|
||||||
|
status := "active"
|
||||||
|
if TransactionState(tx.State.Load()) == TransactionCommitted {
|
||||||
|
status = "committed"
|
||||||
|
} else if TransactionState(tx.State.Load()) == TransactionAborted {
|
||||||
|
status = "aborted"
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.mu.RLock()
|
||||||
|
opCount := len(tx.Operations)
|
||||||
|
operations := make([]OperationInfo, 0, opCount)
|
||||||
|
for _, op := range tx.Operations {
|
||||||
|
operations = append(operations, OperationInfo{
|
||||||
|
Type: op.Type,
|
||||||
|
Database: op.Database,
|
||||||
|
Collection: op.Collection,
|
||||||
|
DocumentID: op.DocumentID,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
tx.mu.RUnlock()
|
||||||
|
|
||||||
|
transactions = append(transactions, TransactionInfo{
|
||||||
|
ID: fmt.Sprintf("%d", tx.ID),
|
||||||
|
Status: status,
|
||||||
|
StartTime: tx.StartTime,
|
||||||
|
OperationCount: opCount,
|
||||||
|
Operations: operations,
|
||||||
|
})
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
return transactions
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetTransactionByID возвращает транзакцию по ID
|
||||||
|
func GetTransactionByID(id string) (*Transaction, error) {
|
||||||
|
if globalTxManager == nil {
|
||||||
|
return nil, fmt.Errorf("transaction manager not initialized")
|
||||||
|
}
|
||||||
|
|
||||||
|
var txID TransactionID
|
||||||
|
fmt.Sscanf(id, "%d", &txID)
|
||||||
|
|
||||||
|
if val, ok := globalTxManager.activeTransactions.Load(txID); ok {
|
||||||
|
return val.(*Transaction), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("transaction not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
// FindInTransaction ищет документ в контексте транзакции
|
||||||
|
func FindInTransaction(coll *Collection, id string) (*Document, error) {
|
||||||
|
txVal := currentTx.Load()
|
||||||
|
if txVal == nil {
|
||||||
|
return coll.Find(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
tx := txVal.(*Transaction)
|
||||||
|
|
||||||
|
tx.mu.RLock()
|
||||||
|
defer tx.mu.RUnlock()
|
||||||
|
|
||||||
|
// Ищем в операциях транзакции в обратном порядке
|
||||||
|
for i := len(tx.Operations) - 1; i >= 0; i-- {
|
||||||
|
op := tx.Operations[i]
|
||||||
|
if op.DocumentID == id {
|
||||||
|
if op.Type == "delete" {
|
||||||
|
return nil, fmt.Errorf("document deleted in transaction")
|
||||||
|
}
|
||||||
|
if op.Type == "insert" || op.Type == "update" {
|
||||||
|
doc := NewDocumentWithID(op.DocumentID)
|
||||||
|
for k, v := range op.Data {
|
||||||
|
doc.SetField(k, v)
|
||||||
|
}
|
||||||
|
doc.Version = op.Version
|
||||||
|
return doc, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return coll.Find(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
// applyOperation применяет операцию к хранилищу
|
||||||
|
func applyOperation(op Operation) error {
|
||||||
|
db, err := GetGlobalStorage().GetDatabase(op.Database)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("database not found: %s", op.Database)
|
||||||
|
}
|
||||||
|
|
||||||
|
coll, err := db.GetCollection(op.Collection)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("collection not found: %s", op.Collection)
|
||||||
|
}
|
||||||
|
|
||||||
|
switch op.Type {
|
||||||
|
case "insert":
|
||||||
|
doc := NewDocumentWithID(op.DocumentID)
|
||||||
|
for k, v := range op.Data {
|
||||||
|
doc.SetField(k, v)
|
||||||
|
}
|
||||||
|
doc.Version = op.Version
|
||||||
|
return coll.Insert(doc)
|
||||||
|
|
||||||
|
case "update":
|
||||||
|
return coll.Update(op.DocumentID, op.Data)
|
||||||
|
|
||||||
|
case "delete":
|
||||||
|
return coll.Delete(op.DocumentID)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetGlobalStorage возвращает глобальное хранилище
|
||||||
|
var globalStorage *Storage
|
||||||
|
|
||||||
|
// SetGlobalStorage устанавливает глобальное хранилище
|
||||||
|
func SetGlobalStorage(s *Storage) {
|
||||||
|
globalStorage = s
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetGlobalStorage возвращает глобальное хранилище
|
||||||
|
func GetGlobalStorage() *Storage {
|
||||||
|
return globalStorage
|
||||||
|
}
|
||||||
|
|
||||||
|
// AuditLog записывает событие в аудит
|
||||||
|
func AuditLog(operation, dataType, name string, details map[string]interface{}) {
|
||||||
|
LogAudit(operation, dataType, name, details)
|
||||||
|
}
|
||||||
|
|
||||||
|
// MVCCSnapshot создаёт снапшот текущего состояния для MVCC
|
||||||
|
func MVCCSnapshot() uint64 {
|
||||||
|
return uint64(time.Now().UnixNano())
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateDocumentVersion создаёт новую версию документа для MVCC
|
||||||
|
func CreateDocumentVersion(doc *Document, txID TransactionID) *DocumentVersion {
|
||||||
|
return &DocumentVersion{
|
||||||
|
Document: doc.Clone(),
|
||||||
|
Timestamp: time.Now().UnixMilli(),
|
||||||
|
TxID: txID,
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user