// /futriis/internal/replication/aof.go // Пакет replication реализует механизм AOF (Append Only File) как в Redis) package replication import ( "bufio" "encoding/json" "os" "sync" "time" "futriis/pkg/utils" ) // AOFCommand представляет команду для AOF type AOFCommand struct { Timestamp int64 `json:"ts"` Command string `json:"cmd"` Args []interface{} `json:"args"` } // AOFManager управляет AOF файлом type AOFManager struct { file *os.File writer *bufio.Writer mu sync.Mutex enabled bool filePath string stopChan chan struct{} } // NewAOFManager создаёт новый менеджер AOF func NewAOFManager(filePath string, enabled bool) (*AOFManager, error) { aof := &AOFManager{ enabled: enabled, filePath: filePath, stopChan: make(chan struct{}), } if enabled { if err := aof.openFile(); err != nil { return nil, err } // Запускаем периодический sync go aof.syncLoop() } return aof, nil } // openFile открывает AOF файл func (aof *AOFManager) openFile() error { // Создаем директорию если не существует dir := aof.filePath[:len(aof.filePath)-len("/futriis.aof")] if err := os.MkdirAll(dir, 0755); err != nil { return err } file, err := os.OpenFile(aof.filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return err } aof.file = file aof.writer = bufio.NewWriter(file) return nil } // syncLoop периодически синхронизирует данные на диск func (aof *AOFManager) syncLoop() { ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: aof.mu.Lock() if aof.writer != nil { aof.writer.Flush() aof.file.Sync() } aof.mu.Unlock() case <-aof.stopChan: return } } } // Append добавляет команду в AOF func (aof *AOFManager) Append(cmd string, args []interface{}) error { if !aof.enabled || aof.writer == nil { return nil } aof.mu.Lock() defer aof.mu.Unlock() command := AOFCommand{ Timestamp: time.Now().UnixNano(), Command: cmd, Args: args, } data, err := json.Marshal(command) if err != nil { return err } if _, err := aof.writer.Write(data); err != nil { return err } if _, err := aof.writer.WriteString("\n"); err != nil { return err } return nil } // Close закрывает AOF файл func (aof *AOFManager) Close() error { if !aof.enabled { return nil } close(aof.stopChan) aof.mu.Lock() defer aof.mu.Unlock() if aof.writer != nil { aof.writer.Flush() } if aof.file != nil { return aof.file.Close() } return nil } // Replay воспроизводит команды из AOF func (aof *AOFManager) Replay(handler func(cmd string, args []interface{}) error) error { if !aof.enabled { return nil } file, err := os.Open(aof.filePath) if err != nil { if os.IsNotExist(err) { return nil } return err } defer file.Close() scanner := bufio.NewScanner(file) for scanner.Scan() { var cmd AOFCommand if err := json.Unmarshal(scanner.Bytes(), &cmd); err != nil { continue } if err := handler(cmd.Command, cmd.Args); err != nil { utils.PrintError("Ошибка воспроизведения команды: %v", err) } } return scanner.Err() }