From 2d6a6ff0a0c762ccd7e6bda66446d96a6b5bfac8 Mon Sep 17 00:00:00 2001 From: Josh Baker Date: Thu, 31 Mar 2016 17:58:02 -0700 Subject: [PATCH] follow replication --- controller/aof.go | 30 +++++++++++++++--------------- controller/follow.go | 1 + 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/controller/aof.go b/controller/aof.go index 98b74041..e999d03d 100644 --- a/controller/aof.go +++ b/controller/aof.go @@ -90,14 +90,16 @@ func (c *Controller) writeAOF(value resp.Value, d *commandDetailsT) error { if !d.updated { return nil // just ignore writes if the command did not update } - // process hooks - if hm, ok := c.hookcols[d.key]; ok { - for _, hook := range hm { - if err := c.DoHook(hook, d); err != nil { - if d.revert != nil { - d.revert() + if c.config.FollowHost == "" { + // process hooks, for leader only + if hm, ok := c.hookcols[d.key]; ok { + for _, hook := range hm { + if err := c.DoHook(hook, d); err != nil { + if d.revert != nil { + d.revert() + } + return errAOFHook{err} } - return errAOFHook{err} } } } @@ -260,18 +262,16 @@ func (c *Controller) liveAOF(pos int64, conn net.Conn, rd *server.AnyReaderWrite if err != nil { return err } - rd := resp.NewReader(f) + + b := make([]byte, 4096) + // The reader needs to be OK with the eof not for { - v, _, err := rd.ReadValue() - if err != io.EOF { + n, err := f.Read(b) + if err != io.EOF && n > 0 { if err != nil { return err } - data, err := v.MarshalRESP() - if err != nil { - return err - } - if _, err := conn.Write(data); err != nil { + if _, err := conn.Write(b[:n]); err != nil { return err } continue diff --git a/controller/follow.go b/controller/follow.go index cce372c2..3f151574 100644 --- a/controller/follow.go +++ b/controller/follow.go @@ -231,6 +231,7 @@ func (c *Controller) followStep(host string, port int, followc uint64) error { if telnet || v.Type() != resp.Array { return errors.New("invalid multibulk") } + aofsz, err := c.followHandleCommand(vals, followc, nullw) if err != nil { return err