245 lines
5.9 KiB
Go
Raw Normal View History

2026-02-27 22:04:04 +03:00
// /futriis/internal/replication/aof.go
// Пакет replication реализует Append-Only File данных
// AOF обеспечивает постоянную запись команд в лог-файл для восстановления состояния после перезапуска
// Поддерживает запись, чтение, синхронизацию и перезапись (rewrite) лог-файла
package replication
import (
"bufio"
"encoding/json"
"fmt"
"os"
"sync"
"time"
)
// AOFCommand представляет команду для записи в AOF
type AOFCommand struct {
Name string `json:"name"`
Args []interface{} `json:"args"`
Time int64 `json:"time"`
}
// AOFManager управляет Append-Only File
type AOFManager struct {
file *os.File
writer *bufio.Writer
enabled bool
filePath string
mu sync.Mutex
}
// NewAOFManager создаёт новый менеджер AOF
func NewAOFManager(filePath string, enabled bool) (*AOFManager, error) {
if !enabled {
return &AOFManager{
enabled: false,
}, nil
}
// Открываем файл для записи и чтения
file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return nil, fmt.Errorf("не удалось открыть AOF файл: %v", err)
}
return &AOFManager{
file: file,
writer: bufio.NewWriter(file),
enabled: true,
filePath: filePath,
}, nil
}
// Append добавляет команду в AOF
func (aof *AOFManager) Append(name string, args []interface{}) error {
if !aof.enabled {
return nil
}
aof.mu.Lock()
defer aof.mu.Unlock()
cmd := AOFCommand{
Name: name,
Args: args,
Time: time.Now().Unix(),
}
data, err := json.Marshal(cmd)
if err != nil {
return fmt.Errorf("ошибка сериализации команды: %v", err)
}
// Записываем команду в файл
if _, err := aof.writer.Write(data); err != nil {
return fmt.Errorf("ошибка записи в AOF: %v", err)
}
if err := aof.writer.WriteByte('\n'); err != nil {
return fmt.Errorf("ошибка записи разделителя в AOF: %v", err)
}
// Сбрасываем буфер на диск
if err := aof.writer.Flush(); err != nil {
return fmt.Errorf("ошибка сброса AOF на диск: %v", err)
}
return nil
}
// ReadAll читает все команды из AOF файла
func (aof *AOFManager) ReadAll() ([]AOFCommand, error) {
if !aof.enabled {
return nil, fmt.Errorf("AOF отключён")
}
aof.mu.Lock()
defer aof.mu.Unlock()
// Сбрасываем буфер на диск перед чтением
if err := aof.writer.Flush(); err != nil {
return nil, fmt.Errorf("ошибка сброса AOF на диск: %v", err)
}
// Перемещаем указатель в начало файла
if _, err := aof.file.Seek(0, 0); err != nil {
return nil, fmt.Errorf("ошибка перемещения в начало AOF файла: %v", err)
}
var commands []AOFCommand
scanner := bufio.NewScanner(aof.file)
for scanner.Scan() {
line := scanner.Bytes()
if len(line) == 0 {
continue
}
var cmd AOFCommand
if err := json.Unmarshal(line, &cmd); err != nil {
// Пропускаем повреждённые записи
continue
}
commands = append(commands, cmd)
}
if err := scanner.Err(); err != nil {
return nil, fmt.Errorf("ошибка чтения AOF файла: %v", err)
}
// Возвращаем указатель в конец файла для продолжения записи
if _, err := aof.file.Seek(0, 2); err != nil {
return nil, fmt.Errorf("ошибка перемещения в конец AOF файла: %v", err)
}
return commands, nil
}
// GetFileInfo возвращает информацию о файле
func (aof *AOFManager) GetFileInfo() (string, error) {
if !aof.enabled || aof.file == nil {
return "AOF отключён", nil
}
stat, err := aof.file.Stat()
if err != nil {
return "", err
}
return fmt.Sprintf("%d байт", stat.Size()), nil
}
// Close закрывает AOF файл
func (aof *AOFManager) Close() error {
if !aof.enabled || aof.file == nil {
return nil
}
aof.mu.Lock()
defer aof.mu.Unlock()
if err := aof.writer.Flush(); err != nil {
return err
}
return aof.file.Close()
}
// Sync принудительно синхронизирует AOF с диском
func (aof *AOFManager) Sync() error {
if !aof.enabled {
return nil
}
aof.mu.Lock()
defer aof.mu.Unlock()
if err := aof.writer.Flush(); err != nil {
return err
}
return aof.file.Sync()
}
// Rewrite выполняет перезапись AOF файла (упрощённая версия)
func (aof *AOFManager) Rewrite(commands []AOFCommand) error {
if !aof.enabled {
return nil
}
aof.mu.Lock()
defer aof.mu.Unlock()
// Создаём временный файл
tmpFile := aof.filePath + ".tmp"
file, err := os.Create(tmpFile)
if err != nil {
return fmt.Errorf("не удалось создать временный AOF файл: %v", err)
}
defer file.Close()
writer := bufio.NewWriter(file)
// Записываем все команды
for _, cmd := range commands {
data, err := json.Marshal(cmd)
if err != nil {
continue
}
if _, err := writer.Write(data); err != nil {
return err
}
if err := writer.WriteByte('\n'); err != nil {
return err
}
}
if err := writer.Flush(); err != nil {
return err
}
// Закрываем текущий файл
aof.file.Close()
// Заменяем старый файл новым
if err := os.Rename(tmpFile, aof.filePath); err != nil {
return err
}
// Открываем новый файл
file, err = os.OpenFile(aof.filePath, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return err
}
aof.file = file
aof.writer = bufio.NewWriter(file)
return nil
}