2026-02-27 22:04:04 +03:00

245 lines
5.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// /futriis/internal/replication/aof.go
// Пакет replication реализует Append-Only File данных
// AOF обеспечивает постоянную запись команд в лог-файл для восстановления состояния после перезапуска
// Поддерживает запись, чтение, синхронизацию и перезапись (rewrite) лог-файла
package replication
import (
"bufio"
"encoding/json"
"fmt"
"os"
"sync"
"time"
)
// AOFCommand представляет команду для записи в AOF
type AOFCommand struct {
Name string `json:"name"`
Args []interface{} `json:"args"`
Time int64 `json:"time"`
}
// AOFManager управляет Append-Only File
type AOFManager struct {
file *os.File
writer *bufio.Writer
enabled bool
filePath string
mu sync.Mutex
}
// NewAOFManager создаёт новый менеджер AOF
func NewAOFManager(filePath string, enabled bool) (*AOFManager, error) {
if !enabled {
return &AOFManager{
enabled: false,
}, nil
}
// Открываем файл для записи и чтения
file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return nil, fmt.Errorf("не удалось открыть AOF файл: %v", err)
}
return &AOFManager{
file: file,
writer: bufio.NewWriter(file),
enabled: true,
filePath: filePath,
}, nil
}
// Append добавляет команду в AOF
func (aof *AOFManager) Append(name string, args []interface{}) error {
if !aof.enabled {
return nil
}
aof.mu.Lock()
defer aof.mu.Unlock()
cmd := AOFCommand{
Name: name,
Args: args,
Time: time.Now().Unix(),
}
data, err := json.Marshal(cmd)
if err != nil {
return fmt.Errorf("ошибка сериализации команды: %v", err)
}
// Записываем команду в файл
if _, err := aof.writer.Write(data); err != nil {
return fmt.Errorf("ошибка записи в AOF: %v", err)
}
if err := aof.writer.WriteByte('\n'); err != nil {
return fmt.Errorf("ошибка записи разделителя в AOF: %v", err)
}
// Сбрасываем буфер на диск
if err := aof.writer.Flush(); err != nil {
return fmt.Errorf("ошибка сброса AOF на диск: %v", err)
}
return nil
}
// ReadAll читает все команды из AOF файла
func (aof *AOFManager) ReadAll() ([]AOFCommand, error) {
if !aof.enabled {
return nil, fmt.Errorf("AOF отключён")
}
aof.mu.Lock()
defer aof.mu.Unlock()
// Сбрасываем буфер на диск перед чтением
if err := aof.writer.Flush(); err != nil {
return nil, fmt.Errorf("ошибка сброса AOF на диск: %v", err)
}
// Перемещаем указатель в начало файла
if _, err := aof.file.Seek(0, 0); err != nil {
return nil, fmt.Errorf("ошибка перемещения в начало AOF файла: %v", err)
}
var commands []AOFCommand
scanner := bufio.NewScanner(aof.file)
for scanner.Scan() {
line := scanner.Bytes()
if len(line) == 0 {
continue
}
var cmd AOFCommand
if err := json.Unmarshal(line, &cmd); err != nil {
// Пропускаем повреждённые записи
continue
}
commands = append(commands, cmd)
}
if err := scanner.Err(); err != nil {
return nil, fmt.Errorf("ошибка чтения AOF файла: %v", err)
}
// Возвращаем указатель в конец файла для продолжения записи
if _, err := aof.file.Seek(0, 2); err != nil {
return nil, fmt.Errorf("ошибка перемещения в конец AOF файла: %v", err)
}
return commands, nil
}
// GetFileInfo возвращает информацию о файле
func (aof *AOFManager) GetFileInfo() (string, error) {
if !aof.enabled || aof.file == nil {
return "AOF отключён", nil
}
stat, err := aof.file.Stat()
if err != nil {
return "", err
}
return fmt.Sprintf("%d байт", stat.Size()), nil
}
// Close закрывает AOF файл
func (aof *AOFManager) Close() error {
if !aof.enabled || aof.file == nil {
return nil
}
aof.mu.Lock()
defer aof.mu.Unlock()
if err := aof.writer.Flush(); err != nil {
return err
}
return aof.file.Close()
}
// Sync принудительно синхронизирует AOF с диском
func (aof *AOFManager) Sync() error {
if !aof.enabled {
return nil
}
aof.mu.Lock()
defer aof.mu.Unlock()
if err := aof.writer.Flush(); err != nil {
return err
}
return aof.file.Sync()
}
// Rewrite выполняет перезапись AOF файла (упрощённая версия)
func (aof *AOFManager) Rewrite(commands []AOFCommand) error {
if !aof.enabled {
return nil
}
aof.mu.Lock()
defer aof.mu.Unlock()
// Создаём временный файл
tmpFile := aof.filePath + ".tmp"
file, err := os.Create(tmpFile)
if err != nil {
return fmt.Errorf("не удалось создать временный AOF файл: %v", err)
}
defer file.Close()
writer := bufio.NewWriter(file)
// Записываем все команды
for _, cmd := range commands {
data, err := json.Marshal(cmd)
if err != nil {
continue
}
if _, err := writer.Write(data); err != nil {
return err
}
if err := writer.WriteByte('\n'); err != nil {
return err
}
}
if err := writer.Flush(); err != nil {
return err
}
// Закрываем текущий файл
aof.file.Close()
// Заменяем старый файл новым
if err := os.Rename(tmpFile, aof.filePath); err != nil {
return err
}
// Открываем новый файл
file, err = os.OpenFile(aof.filePath, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return err
}
aof.file = file
aof.writer = bufio.NewWriter(file)
return nil
}