follow replication
This commit is contained in:
parent
4a15a20e27
commit
2d6a6ff0a0
@ -90,14 +90,16 @@ func (c *Controller) writeAOF(value resp.Value, d *commandDetailsT) error {
|
||||
if !d.updated {
|
||||
return nil // just ignore writes if the command did not update
|
||||
}
|
||||
// process hooks
|
||||
if hm, ok := c.hookcols[d.key]; ok {
|
||||
for _, hook := range hm {
|
||||
if err := c.DoHook(hook, d); err != nil {
|
||||
if d.revert != nil {
|
||||
d.revert()
|
||||
if c.config.FollowHost == "" {
|
||||
// process hooks, for leader only
|
||||
if hm, ok := c.hookcols[d.key]; ok {
|
||||
for _, hook := range hm {
|
||||
if err := c.DoHook(hook, d); err != nil {
|
||||
if d.revert != nil {
|
||||
d.revert()
|
||||
}
|
||||
return errAOFHook{err}
|
||||
}
|
||||
return errAOFHook{err}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -260,18 +262,16 @@ func (c *Controller) liveAOF(pos int64, conn net.Conn, rd *server.AnyReaderWrite
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rd := resp.NewReader(f)
|
||||
|
||||
b := make([]byte, 4096)
|
||||
// The reader needs to be OK with the eof not
|
||||
for {
|
||||
v, _, err := rd.ReadValue()
|
||||
if err != io.EOF {
|
||||
n, err := f.Read(b)
|
||||
if err != io.EOF && n > 0 {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
data, err := v.MarshalRESP()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := conn.Write(data); err != nil {
|
||||
if _, err := conn.Write(b[:n]); err != nil {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
|
@ -231,6 +231,7 @@ func (c *Controller) followStep(host string, port int, followc uint64) error {
|
||||
if telnet || v.Type() != resp.Array {
|
||||
return errors.New("invalid multibulk")
|
||||
}
|
||||
|
||||
aofsz, err := c.followHandleCommand(vals, followc, nullw)
|
||||
if err != nil {
|
||||
return err
|
||||
|
Loading…
x
Reference in New Issue
Block a user