
The core package uses global variables that keep from having more than one Tile38 instance runnning in the same process. Move the core variables in the server.Options type which are uniquely stated per Server instance. The build variables are still present in the core package.
243 lines
5.1 KiB
Go
243 lines
5.1 KiB
Go
package server
|
|
|
|
import (
|
|
"crypto/md5"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"time"
|
|
|
|
"github.com/tidwall/resp"
|
|
"github.com/tidwall/tile38/internal/log"
|
|
)
|
|
|
|
// checksum performs a simple md5 checksum on the aof file
|
|
func (s *Server) checksum(pos, size int64) (sum string, err error) {
|
|
if pos+size > int64(s.aofsz) {
|
|
return "", io.EOF
|
|
}
|
|
var f *os.File
|
|
f, err = os.Open(s.aof.Name())
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer f.Close()
|
|
sumr := md5.New()
|
|
err = func() error {
|
|
if size == 0 {
|
|
n, err := f.Seek(int64(s.aofsz), 0)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if pos >= n {
|
|
return io.EOF
|
|
}
|
|
return nil
|
|
}
|
|
_, err = f.Seek(pos, 0)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_, err = io.CopyN(sumr, f, size)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}()
|
|
if err != nil {
|
|
if err == io.ErrUnexpectedEOF {
|
|
err = io.EOF
|
|
}
|
|
return "", err
|
|
}
|
|
return fmt.Sprintf("%x", sumr.Sum(nil)), nil
|
|
}
|
|
|
|
func connAOFMD5(conn *RESPConn, pos, size int64) (sum string, err error) {
|
|
v, err := conn.Do("aofmd5", pos, size)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if v.Error() != nil {
|
|
errmsg := v.Error().Error()
|
|
if errmsg == "ERR EOF" || errmsg == "EOF" {
|
|
return "", io.EOF
|
|
}
|
|
return "", v.Error()
|
|
}
|
|
sum = v.String()
|
|
if len(sum) != 32 {
|
|
return "", errors.New("checksum not ok")
|
|
}
|
|
return sum, nil
|
|
}
|
|
|
|
func (s *Server) matchChecksums(conn *RESPConn, pos, size int64) (match bool, err error) {
|
|
sum, err := s.checksum(pos, size)
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
return false, nil
|
|
}
|
|
return false, err
|
|
}
|
|
csum, err := connAOFMD5(conn, pos, size)
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
return false, nil
|
|
}
|
|
return false, err
|
|
}
|
|
return csum == sum, nil
|
|
}
|
|
|
|
// getEndOfLastValuePositionInFile is a very slow operation because it reads the file
|
|
// backwards on byte at a time. Eek. It seek+read, seek+read, etc.
|
|
func getEndOfLastValuePositionInFile(fname string, startPos int64) (int64, error) {
|
|
pos := startPos
|
|
f, err := os.Open(fname)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
defer f.Close()
|
|
readByte := func() (byte, error) {
|
|
if pos <= 0 {
|
|
return 0, io.EOF
|
|
}
|
|
pos--
|
|
if _, err := f.Seek(pos, 0); err != nil {
|
|
return 0, err
|
|
}
|
|
b := make([]byte, 1)
|
|
if n, err := f.Read(b); err != nil {
|
|
return 0, err
|
|
} else if n != 1 {
|
|
return 0, errors.New("invalid read")
|
|
}
|
|
return b[0], nil
|
|
}
|
|
for {
|
|
c, err := readByte()
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
if c == '*' {
|
|
if _, err := f.Seek(pos, 0); err != nil {
|
|
return 0, err
|
|
}
|
|
rd := resp.NewReader(f)
|
|
_, telnet, n, err := rd.ReadMultiBulk()
|
|
if err != nil || telnet {
|
|
continue // keep reading backwards
|
|
}
|
|
return pos + int64(n), nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// followCheckSome is not a full checksum. It just "checks some" data.
|
|
// We will do some various checksums on the leader until we find the correct position to start at.
|
|
func (s *Server) followCheckSome(addr string, followc int, auth string,
|
|
) (pos int64, err error) {
|
|
if s.opts.ShowDebugMessages {
|
|
log.Debug("follow:", addr, ":check some")
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if s.followc.get() != followc {
|
|
return 0, errNoLongerFollowing
|
|
}
|
|
if s.aofsz < checksumsz {
|
|
return 0, nil
|
|
}
|
|
|
|
conn, err := DialTimeout(addr, time.Second*2)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
defer conn.Close()
|
|
|
|
if auth != "" {
|
|
if err := s.followDoLeaderAuth(conn, auth); err != nil {
|
|
return 0, err
|
|
}
|
|
}
|
|
|
|
min := int64(0)
|
|
max := int64(s.aofsz) - checksumsz
|
|
limit := int64(s.aofsz)
|
|
match, err := s.matchChecksums(conn, min, checksumsz)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
if match {
|
|
min += checksumsz // bump up the min
|
|
for {
|
|
if max < min || max+checksumsz > limit {
|
|
pos = min
|
|
break
|
|
} else {
|
|
match, err = s.matchChecksums(conn, max, checksumsz)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
if match {
|
|
min = max + checksumsz
|
|
} else {
|
|
limit = max
|
|
}
|
|
max = (limit-min)/2 - checksumsz/2 + min // multiply
|
|
}
|
|
}
|
|
}
|
|
fullpos := pos
|
|
fname := s.aof.Name()
|
|
if pos == 0 {
|
|
s.aof.Close()
|
|
s.aof, err = os.Create(fname)
|
|
if err != nil {
|
|
log.Fatalf("could not recreate aof, possible data loss. %s", err.Error())
|
|
return 0, err
|
|
}
|
|
return 0, nil
|
|
}
|
|
|
|
// we want to truncate at a command location
|
|
// search for nearest command
|
|
pos, err = getEndOfLastValuePositionInFile(s.aof.Name(), fullpos)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
if pos == fullpos {
|
|
if s.opts.ShowDebugMessages {
|
|
log.Debug("follow: aof fully intact")
|
|
}
|
|
return pos, nil
|
|
}
|
|
log.Warnf("truncating aof to %d", pos)
|
|
// any errror below are fatal.
|
|
s.aof.Close()
|
|
if err := os.Truncate(fname, pos); err != nil {
|
|
log.Fatalf("could not truncate aof, possible data loss. %s", err.Error())
|
|
return 0, err
|
|
}
|
|
s.aof, err = os.OpenFile(fname, os.O_CREATE|os.O_RDWR, 0600)
|
|
if err != nil {
|
|
log.Fatalf("could not create aof, possible data loss. %s", err.Error())
|
|
return 0, err
|
|
}
|
|
// reset the entire system.
|
|
log.Infof("reloading aof commands")
|
|
s.reset()
|
|
if err := s.loadAOF(); err != nil {
|
|
log.Fatalf("could not reload aof, possible data loss. %s", err.Error())
|
|
return 0, err
|
|
}
|
|
if int64(s.aofsz) != pos {
|
|
log.Fatalf("aof size mismatch during reload, possible data loss.")
|
|
return 0, errors.New("?")
|
|
}
|
|
return pos, nil
|
|
}
|