173 lines
3.4 KiB
Go
Raw Permalink Normal View History

2026-02-23 22:48:31 +03:00
// /futriis/internal/replication/aof.go
// Пакет replication реализует механизм AOF (Append Only File) как в Redis)
package replication
import (
"bufio"
"encoding/json"
"os"
"sync"
"time"
"futriis/pkg/utils"
)
// AOFCommand представляет команду для AOF
type AOFCommand struct {
Timestamp int64 `json:"ts"`
Command string `json:"cmd"`
Args []interface{} `json:"args"`
}
// AOFManager управляет AOF файлом
type AOFManager struct {
file *os.File
writer *bufio.Writer
mu sync.Mutex
enabled bool
filePath string
stopChan chan struct{}
}
// NewAOFManager создаёт новый менеджер AOF
func NewAOFManager(filePath string, enabled bool) (*AOFManager, error) {
aof := &AOFManager{
enabled: enabled,
filePath: filePath,
stopChan: make(chan struct{}),
}
if enabled {
if err := aof.openFile(); err != nil {
return nil, err
}
// Запускаем периодический sync
go aof.syncLoop()
}
return aof, nil
}
// openFile открывает AOF файл
func (aof *AOFManager) openFile() error {
// Создаем директорию если не существует
dir := aof.filePath[:len(aof.filePath)-len("/futriis.aof")]
if err := os.MkdirAll(dir, 0755); err != nil {
return err
}
file, err := os.OpenFile(aof.filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
aof.file = file
aof.writer = bufio.NewWriter(file)
return nil
}
// syncLoop периодически синхронизирует данные на диск
func (aof *AOFManager) syncLoop() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
aof.mu.Lock()
if aof.writer != nil {
aof.writer.Flush()
aof.file.Sync()
}
aof.mu.Unlock()
case <-aof.stopChan:
return
}
}
}
// Append добавляет команду в AOF
func (aof *AOFManager) Append(cmd string, args []interface{}) error {
if !aof.enabled || aof.writer == nil {
return nil
}
aof.mu.Lock()
defer aof.mu.Unlock()
command := AOFCommand{
Timestamp: time.Now().UnixNano(),
Command: cmd,
Args: args,
}
data, err := json.Marshal(command)
if err != nil {
return err
}
if _, err := aof.writer.Write(data); err != nil {
return err
}
if _, err := aof.writer.WriteString("\n"); err != nil {
return err
}
return nil
}
// Close закрывает AOF файл
func (aof *AOFManager) Close() error {
if !aof.enabled {
return nil
}
close(aof.stopChan)
aof.mu.Lock()
defer aof.mu.Unlock()
if aof.writer != nil {
aof.writer.Flush()
}
if aof.file != nil {
return aof.file.Close()
}
return nil
}
// Replay воспроизводит команды из AOF
func (aof *AOFManager) Replay(handler func(cmd string, args []interface{}) error) error {
if !aof.enabled {
return nil
}
file, err := os.Open(aof.filePath)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
defer file.Close()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
var cmd AOFCommand
if err := json.Unmarshal(scanner.Bytes(), &cmd); err != nil {
continue
}
if err := handler(cmd.Command, cmd.Args); err != nil {
utils.PrintError("Ошибка воспроизведения команды: %v", err)
}
}
return scanner.Err()
}