245 lines
5.9 KiB
Go
245 lines
5.9 KiB
Go
|
|
// /futriis/internal/replication/aof.go
|
|||
|
|
// Пакет replication реализует Append-Only File данных
|
|||
|
|
// AOF обеспечивает постоянную запись команд в лог-файл для восстановления состояния после перезапуска
|
|||
|
|
// Поддерживает запись, чтение, синхронизацию и перезапись (rewrite) лог-файла
|
|||
|
|
|
|||
|
|
package replication
|
|||
|
|
|
|||
|
|
import (
|
|||
|
|
"bufio"
|
|||
|
|
"encoding/json"
|
|||
|
|
"fmt"
|
|||
|
|
"os"
|
|||
|
|
"sync"
|
|||
|
|
"time"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
// AOFCommand представляет команду для записи в AOF
|
|||
|
|
type AOFCommand struct {
|
|||
|
|
Name string `json:"name"`
|
|||
|
|
Args []interface{} `json:"args"`
|
|||
|
|
Time int64 `json:"time"`
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// AOFManager управляет Append-Only File
|
|||
|
|
type AOFManager struct {
|
|||
|
|
file *os.File
|
|||
|
|
writer *bufio.Writer
|
|||
|
|
enabled bool
|
|||
|
|
filePath string
|
|||
|
|
mu sync.Mutex
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// NewAOFManager создаёт новый менеджер AOF
|
|||
|
|
func NewAOFManager(filePath string, enabled bool) (*AOFManager, error) {
|
|||
|
|
if !enabled {
|
|||
|
|
return &AOFManager{
|
|||
|
|
enabled: false,
|
|||
|
|
}, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Открываем файл для записи и чтения
|
|||
|
|
file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644)
|
|||
|
|
if err != nil {
|
|||
|
|
return nil, fmt.Errorf("не удалось открыть AOF файл: %v", err)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return &AOFManager{
|
|||
|
|
file: file,
|
|||
|
|
writer: bufio.NewWriter(file),
|
|||
|
|
enabled: true,
|
|||
|
|
filePath: filePath,
|
|||
|
|
}, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Append добавляет команду в AOF
|
|||
|
|
func (aof *AOFManager) Append(name string, args []interface{}) error {
|
|||
|
|
if !aof.enabled {
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
aof.mu.Lock()
|
|||
|
|
defer aof.mu.Unlock()
|
|||
|
|
|
|||
|
|
cmd := AOFCommand{
|
|||
|
|
Name: name,
|
|||
|
|
Args: args,
|
|||
|
|
Time: time.Now().Unix(),
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
data, err := json.Marshal(cmd)
|
|||
|
|
if err != nil {
|
|||
|
|
return fmt.Errorf("ошибка сериализации команды: %v", err)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Записываем команду в файл
|
|||
|
|
if _, err := aof.writer.Write(data); err != nil {
|
|||
|
|
return fmt.Errorf("ошибка записи в AOF: %v", err)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if err := aof.writer.WriteByte('\n'); err != nil {
|
|||
|
|
return fmt.Errorf("ошибка записи разделителя в AOF: %v", err)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Сбрасываем буфер на диск
|
|||
|
|
if err := aof.writer.Flush(); err != nil {
|
|||
|
|
return fmt.Errorf("ошибка сброса AOF на диск: %v", err)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// ReadAll читает все команды из AOF файла
|
|||
|
|
func (aof *AOFManager) ReadAll() ([]AOFCommand, error) {
|
|||
|
|
if !aof.enabled {
|
|||
|
|
return nil, fmt.Errorf("AOF отключён")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
aof.mu.Lock()
|
|||
|
|
defer aof.mu.Unlock()
|
|||
|
|
|
|||
|
|
// Сбрасываем буфер на диск перед чтением
|
|||
|
|
if err := aof.writer.Flush(); err != nil {
|
|||
|
|
return nil, fmt.Errorf("ошибка сброса AOF на диск: %v", err)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Перемещаем указатель в начало файла
|
|||
|
|
if _, err := aof.file.Seek(0, 0); err != nil {
|
|||
|
|
return nil, fmt.Errorf("ошибка перемещения в начало AOF файла: %v", err)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
var commands []AOFCommand
|
|||
|
|
scanner := bufio.NewScanner(aof.file)
|
|||
|
|
|
|||
|
|
for scanner.Scan() {
|
|||
|
|
line := scanner.Bytes()
|
|||
|
|
if len(line) == 0 {
|
|||
|
|
continue
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
var cmd AOFCommand
|
|||
|
|
if err := json.Unmarshal(line, &cmd); err != nil {
|
|||
|
|
// Пропускаем повреждённые записи
|
|||
|
|
continue
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
commands = append(commands, cmd)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if err := scanner.Err(); err != nil {
|
|||
|
|
return nil, fmt.Errorf("ошибка чтения AOF файла: %v", err)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Возвращаем указатель в конец файла для продолжения записи
|
|||
|
|
if _, err := aof.file.Seek(0, 2); err != nil {
|
|||
|
|
return nil, fmt.Errorf("ошибка перемещения в конец AOF файла: %v", err)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return commands, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// GetFileInfo возвращает информацию о файле
|
|||
|
|
func (aof *AOFManager) GetFileInfo() (string, error) {
|
|||
|
|
if !aof.enabled || aof.file == nil {
|
|||
|
|
return "AOF отключён", nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
stat, err := aof.file.Stat()
|
|||
|
|
if err != nil {
|
|||
|
|
return "", err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return fmt.Sprintf("%d байт", stat.Size()), nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Close закрывает AOF файл
|
|||
|
|
func (aof *AOFManager) Close() error {
|
|||
|
|
if !aof.enabled || aof.file == nil {
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
aof.mu.Lock()
|
|||
|
|
defer aof.mu.Unlock()
|
|||
|
|
|
|||
|
|
if err := aof.writer.Flush(); err != nil {
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return aof.file.Close()
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Sync принудительно синхронизирует AOF с диском
|
|||
|
|
func (aof *AOFManager) Sync() error {
|
|||
|
|
if !aof.enabled {
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
aof.mu.Lock()
|
|||
|
|
defer aof.mu.Unlock()
|
|||
|
|
|
|||
|
|
if err := aof.writer.Flush(); err != nil {
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return aof.file.Sync()
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Rewrite выполняет перезапись AOF файла (упрощённая версия)
|
|||
|
|
func (aof *AOFManager) Rewrite(commands []AOFCommand) error {
|
|||
|
|
if !aof.enabled {
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
aof.mu.Lock()
|
|||
|
|
defer aof.mu.Unlock()
|
|||
|
|
|
|||
|
|
// Создаём временный файл
|
|||
|
|
tmpFile := aof.filePath + ".tmp"
|
|||
|
|
file, err := os.Create(tmpFile)
|
|||
|
|
if err != nil {
|
|||
|
|
return fmt.Errorf("не удалось создать временный AOF файл: %v", err)
|
|||
|
|
}
|
|||
|
|
defer file.Close()
|
|||
|
|
|
|||
|
|
writer := bufio.NewWriter(file)
|
|||
|
|
|
|||
|
|
// Записываем все команды
|
|||
|
|
for _, cmd := range commands {
|
|||
|
|
data, err := json.Marshal(cmd)
|
|||
|
|
if err != nil {
|
|||
|
|
continue
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if _, err := writer.Write(data); err != nil {
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if err := writer.WriteByte('\n'); err != nil {
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if err := writer.Flush(); err != nil {
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Закрываем текущий файл
|
|||
|
|
aof.file.Close()
|
|||
|
|
|
|||
|
|
// Заменяем старый файл новым
|
|||
|
|
if err := os.Rename(tmpFile, aof.filePath); err != nil {
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Открываем новый файл
|
|||
|
|
file, err = os.OpenFile(aof.filePath, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644)
|
|||
|
|
if err != nil {
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
aof.file = file
|
|||
|
|
aof.writer = bufio.NewWriter(file)
|
|||
|
|
|
|||
|
|
return nil
|
|||
|
|
}
|