// /futriis/internal/replication/aof.go // Пакет replication реализует Append-Only File данных // AOF обеспечивает постоянную запись команд в лог-файл для восстановления состояния после перезапуска // Поддерживает запись, чтение, синхронизацию и перезапись (rewrite) лог-файла package replication import ( "bufio" "encoding/json" "fmt" "os" "sync" "time" ) // AOFCommand представляет команду для записи в AOF type AOFCommand struct { Name string `json:"name"` Args []interface{} `json:"args"` Time int64 `json:"time"` } // AOFManager управляет Append-Only File type AOFManager struct { file *os.File writer *bufio.Writer enabled bool filePath string mu sync.Mutex } // NewAOFManager создаёт новый менеджер AOF func NewAOFManager(filePath string, enabled bool) (*AOFManager, error) { if !enabled { return &AOFManager{ enabled: false, }, nil } // Открываем файл для записи и чтения file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644) if err != nil { return nil, fmt.Errorf("не удалось открыть AOF файл: %v", err) } return &AOFManager{ file: file, writer: bufio.NewWriter(file), enabled: true, filePath: filePath, }, nil } // Append добавляет команду в AOF func (aof *AOFManager) Append(name string, args []interface{}) error { if !aof.enabled { return nil } aof.mu.Lock() defer aof.mu.Unlock() cmd := AOFCommand{ Name: name, Args: args, Time: time.Now().Unix(), } data, err := json.Marshal(cmd) if err != nil { return fmt.Errorf("ошибка сериализации команды: %v", err) } // Записываем команду в файл if _, err := aof.writer.Write(data); err != nil { return fmt.Errorf("ошибка записи в AOF: %v", err) } if err := aof.writer.WriteByte('\n'); err != nil { return fmt.Errorf("ошибка записи разделителя в AOF: %v", err) } // Сбрасываем буфер на диск if err := aof.writer.Flush(); err != nil { return fmt.Errorf("ошибка сброса AOF на диск: %v", err) } return nil } // ReadAll читает все команды из AOF файла func (aof *AOFManager) ReadAll() ([]AOFCommand, error) { if !aof.enabled { return nil, fmt.Errorf("AOF отключён") } aof.mu.Lock() defer aof.mu.Unlock() // Сбрасываем буфер на диск перед чтением if err := aof.writer.Flush(); err != nil { return nil, fmt.Errorf("ошибка сброса AOF на диск: %v", err) } // Перемещаем указатель в начало файла if _, err := aof.file.Seek(0, 0); err != nil { return nil, fmt.Errorf("ошибка перемещения в начало AOF файла: %v", err) } var commands []AOFCommand scanner := bufio.NewScanner(aof.file) for scanner.Scan() { line := scanner.Bytes() if len(line) == 0 { continue } var cmd AOFCommand if err := json.Unmarshal(line, &cmd); err != nil { // Пропускаем повреждённые записи continue } commands = append(commands, cmd) } if err := scanner.Err(); err != nil { return nil, fmt.Errorf("ошибка чтения AOF файла: %v", err) } // Возвращаем указатель в конец файла для продолжения записи if _, err := aof.file.Seek(0, 2); err != nil { return nil, fmt.Errorf("ошибка перемещения в конец AOF файла: %v", err) } return commands, nil } // GetFileInfo возвращает информацию о файле func (aof *AOFManager) GetFileInfo() (string, error) { if !aof.enabled || aof.file == nil { return "AOF отключён", nil } stat, err := aof.file.Stat() if err != nil { return "", err } return fmt.Sprintf("%d байт", stat.Size()), nil } // Close закрывает AOF файл func (aof *AOFManager) Close() error { if !aof.enabled || aof.file == nil { return nil } aof.mu.Lock() defer aof.mu.Unlock() if err := aof.writer.Flush(); err != nil { return err } return aof.file.Close() } // Sync принудительно синхронизирует AOF с диском func (aof *AOFManager) Sync() error { if !aof.enabled { return nil } aof.mu.Lock() defer aof.mu.Unlock() if err := aof.writer.Flush(); err != nil { return err } return aof.file.Sync() } // Rewrite выполняет перезапись AOF файла (упрощённая версия) func (aof *AOFManager) Rewrite(commands []AOFCommand) error { if !aof.enabled { return nil } aof.mu.Lock() defer aof.mu.Unlock() // Создаём временный файл tmpFile := aof.filePath + ".tmp" file, err := os.Create(tmpFile) if err != nil { return fmt.Errorf("не удалось создать временный AOF файл: %v", err) } defer file.Close() writer := bufio.NewWriter(file) // Записываем все команды for _, cmd := range commands { data, err := json.Marshal(cmd) if err != nil { continue } if _, err := writer.Write(data); err != nil { return err } if err := writer.WriteByte('\n'); err != nil { return err } } if err := writer.Flush(); err != nil { return err } // Закрываем текущий файл aof.file.Close() // Заменяем старый файл новым if err := os.Rename(tmpFile, aof.filePath); err != nil { return err } // Открываем новый файл file, err = os.OpenFile(aof.filePath, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644) if err != nil { return err } aof.file = file aof.writer = bufio.NewWriter(file) return nil }