173 lines
3.4 KiB
Go
173 lines
3.4 KiB
Go
|
|
// /futriis/internal/replication/aof.go
|
||
|
|
// Пакет replication реализует механизм AOF (Append Only File) как в Redis)
|
||
|
|
|
||
|
|
package replication
|
||
|
|
|
||
|
|
import (
|
||
|
|
"bufio"
|
||
|
|
"encoding/json"
|
||
|
|
"os"
|
||
|
|
"sync"
|
||
|
|
"time"
|
||
|
|
|
||
|
|
"futriis/pkg/utils"
|
||
|
|
)
|
||
|
|
|
||
|
|
// AOFCommand представляет команду для AOF
|
||
|
|
type AOFCommand struct {
|
||
|
|
Timestamp int64 `json:"ts"`
|
||
|
|
Command string `json:"cmd"`
|
||
|
|
Args []interface{} `json:"args"`
|
||
|
|
}
|
||
|
|
|
||
|
|
// AOFManager управляет AOF файлом
|
||
|
|
type AOFManager struct {
|
||
|
|
file *os.File
|
||
|
|
writer *bufio.Writer
|
||
|
|
mu sync.Mutex
|
||
|
|
enabled bool
|
||
|
|
filePath string
|
||
|
|
stopChan chan struct{}
|
||
|
|
}
|
||
|
|
|
||
|
|
// NewAOFManager создаёт новый менеджер AOF
|
||
|
|
func NewAOFManager(filePath string, enabled bool) (*AOFManager, error) {
|
||
|
|
aof := &AOFManager{
|
||
|
|
enabled: enabled,
|
||
|
|
filePath: filePath,
|
||
|
|
stopChan: make(chan struct{}),
|
||
|
|
}
|
||
|
|
|
||
|
|
if enabled {
|
||
|
|
if err := aof.openFile(); err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
|
||
|
|
// Запускаем периодический sync
|
||
|
|
go aof.syncLoop()
|
||
|
|
}
|
||
|
|
|
||
|
|
return aof, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
// openFile открывает AOF файл
|
||
|
|
func (aof *AOFManager) openFile() error {
|
||
|
|
// Создаем директорию если не существует
|
||
|
|
dir := aof.filePath[:len(aof.filePath)-len("/futriis.aof")]
|
||
|
|
if err := os.MkdirAll(dir, 0755); err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
|
||
|
|
file, err := os.OpenFile(aof.filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
||
|
|
if err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
|
||
|
|
aof.file = file
|
||
|
|
aof.writer = bufio.NewWriter(file)
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
// syncLoop периодически синхронизирует данные на диск
|
||
|
|
func (aof *AOFManager) syncLoop() {
|
||
|
|
ticker := time.NewTicker(1 * time.Second)
|
||
|
|
defer ticker.Stop()
|
||
|
|
|
||
|
|
for {
|
||
|
|
select {
|
||
|
|
case <-ticker.C:
|
||
|
|
aof.mu.Lock()
|
||
|
|
if aof.writer != nil {
|
||
|
|
aof.writer.Flush()
|
||
|
|
aof.file.Sync()
|
||
|
|
}
|
||
|
|
aof.mu.Unlock()
|
||
|
|
case <-aof.stopChan:
|
||
|
|
return
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// Append добавляет команду в AOF
|
||
|
|
func (aof *AOFManager) Append(cmd string, args []interface{}) error {
|
||
|
|
if !aof.enabled || aof.writer == nil {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
aof.mu.Lock()
|
||
|
|
defer aof.mu.Unlock()
|
||
|
|
|
||
|
|
command := AOFCommand{
|
||
|
|
Timestamp: time.Now().UnixNano(),
|
||
|
|
Command: cmd,
|
||
|
|
Args: args,
|
||
|
|
}
|
||
|
|
|
||
|
|
data, err := json.Marshal(command)
|
||
|
|
if err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
|
||
|
|
if _, err := aof.writer.Write(data); err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
|
||
|
|
if _, err := aof.writer.WriteString("\n"); err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
// Close закрывает AOF файл
|
||
|
|
func (aof *AOFManager) Close() error {
|
||
|
|
if !aof.enabled {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
close(aof.stopChan)
|
||
|
|
|
||
|
|
aof.mu.Lock()
|
||
|
|
defer aof.mu.Unlock()
|
||
|
|
|
||
|
|
if aof.writer != nil {
|
||
|
|
aof.writer.Flush()
|
||
|
|
}
|
||
|
|
|
||
|
|
if aof.file != nil {
|
||
|
|
return aof.file.Close()
|
||
|
|
}
|
||
|
|
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
// Replay воспроизводит команды из AOF
|
||
|
|
func (aof *AOFManager) Replay(handler func(cmd string, args []interface{}) error) error {
|
||
|
|
if !aof.enabled {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
file, err := os.Open(aof.filePath)
|
||
|
|
if err != nil {
|
||
|
|
if os.IsNotExist(err) {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
defer file.Close()
|
||
|
|
|
||
|
|
scanner := bufio.NewScanner(file)
|
||
|
|
for scanner.Scan() {
|
||
|
|
var cmd AOFCommand
|
||
|
|
if err := json.Unmarshal(scanner.Bytes(), &cmd); err != nil {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
|
||
|
|
if err := handler(cmd.Command, cmd.Args); err != nil {
|
||
|
|
utils.PrintError("Ошибка воспроизведения команды: %v", err)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
return scanner.Err()
|
||
|
|
}
|