From 56e0bac24c909a9452b227d1cd626f016e32e0f7 Mon Sep 17 00:00:00 2001 From: Josh Baker Date: Thu, 9 Feb 2017 10:01:59 -0700 Subject: [PATCH 1/2] TCP Keepalives Enabled TCP keepalive packets to determine if the connection is still valid, and terminate if needed. It also helps with maintaining idle connections. Default to 300 seconds and can be changed by: CONFIG SET keepalive 300 addresses #145: clients not being cleaned up properly --- controller/config.go | 23 ++++++++++++++++++++++- controller/controller.go | 9 +++++++++ controller/server/server.go | 10 ++++++++++ 3 files changed, 41 insertions(+), 1 deletion(-) diff --git a/controller/config.go b/controller/config.go index 3ca94d7c..1d7b5b9c 100644 --- a/controller/config.go +++ b/controller/config.go @@ -20,9 +20,10 @@ const ( ProtectedMode = "protected-mode" MaxMemory = "maxmemory" AutoGC = "autogc" + KeepAlive = "keepalive" ) -var validProperties = []string{RequirePass, LeaderAuth, ProtectedMode, MaxMemory, AutoGC} +var validProperties = []string{RequirePass, LeaderAuth, ProtectedMode, MaxMemory, AutoGC, KeepAlive} // Config is a tile38 config type Config struct { @@ -44,6 +45,8 @@ type Config struct { MaxMemory int `json:"-"` AutoGCP string `json:"autogc,omitempty"` AutoGC uint64 `json:"-"` + KeepAliveP string `json:"keepalive,omitempty"` + KeepAlive int `json:"-"` } func (c *Controller) loadConfig() error { @@ -74,6 +77,9 @@ func (c *Controller) loadConfig() error { if err := c.setConfigProperty(AutoGC, c.config.AutoGCP, true); err != nil { return err } + if err := c.setConfigProperty(KeepAlive, c.config.KeepAliveP, true); err != nil { + return err + } return nil } @@ -161,7 +167,19 @@ func (c *Controller) setConfigProperty(name, value string, fromLoad bool) error default: invalid = true } + case KeepAlive: + if value == "" { + c.config.KeepAlive = 300 + } else { + keepalive, err := strconv.ParseUint(value, 10, 64) + if err != nil { + invalid = true + } else { + c.config.KeepAlive = int(keepalive) + } + } } + if invalid { return fmt.Errorf("Invalid argument '%s' for CONFIG SET '%s'", value, name) } @@ -192,6 +210,8 @@ func (c *Controller) getConfigProperty(name string) string { return c.config.ProtectedMode case MaxMemory: return formatMemSize(c.config.MaxMemory) + case KeepAlive: + return strconv.FormatUint(uint64(c.config.KeepAlive), 10) } } @@ -216,6 +236,7 @@ func (c *Controller) writeConfig(writeProperties bool) error { c.config.ProtectedModeP = c.config.ProtectedMode c.config.MaxMemoryP = formatMemSize(c.config.MaxMemory) c.config.AutoGCP = strconv.FormatUint(c.config.AutoGC, 10) + c.config.KeepAliveP = strconv.FormatUint(uint64(c.config.KeepAlive), 10) } var data []byte data, err = json.MarshalIndent(c.config, "", "\t") diff --git a/controller/controller.go b/controller/controller.go index 50074d90..eb1ce533 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -227,8 +227,17 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http return is } var clientId uint64 + opened := func(conn *server.Conn) { c.mu.Lock() + if c.config.KeepAlive > 0 { + err := conn.SetKeepAlive( + time.Duration(c.config.KeepAlive) * time.Second) + if err != nil { + log.Warnf("could not set keepalive for connection: %v", + conn.RemoteAddr().String()) + } + } clientId++ c.conns[conn] = &clientConn{ id: clientId, diff --git a/controller/server/server.go b/controller/server/server.go index e031304f..fff1bf6b 100644 --- a/controller/server/server.go +++ b/controller/server/server.go @@ -45,6 +45,16 @@ type Conn struct { Authenticated bool } +func (conn Conn) SetKeepAlive(period time.Duration) error { + if tcp, ok := conn.Conn.(*net.TCPConn); ok { + if err := tcp.SetKeepAlive(true); err != nil { + return err + } + return tcp.SetKeepAlivePeriod(period) + } + return nil +} + var errCloseHTTP = errors.New("close http") // ListenAndServe starts a tile38 server at the specified address. From 22f1b1bd81fc043e31020839870e928fbb9c4430 Mon Sep 17 00:00:00 2001 From: Josh Baker Date: Fri, 10 Feb 2017 05:27:02 -0700 Subject: [PATCH 2/2] resend on expired endpoint --- controller/endpoint/disque.go | 2 +- controller/endpoint/endpoint.go | 60 ++++++++++++++++++++------------- controller/endpoint/grpc.go | 2 +- controller/endpoint/http.go | 3 +- controller/endpoint/redis.go | 2 +- 5 files changed, 41 insertions(+), 28 deletions(-) diff --git a/controller/endpoint/disque.go b/controller/endpoint/disque.go index 169177a9..404a6f6f 100644 --- a/controller/endpoint/disque.go +++ b/controller/endpoint/disque.go @@ -57,7 +57,7 @@ func (conn *DisqueEndpointConn) Send(msg string) error { conn.mu.Lock() defer conn.mu.Unlock() if conn.ex { - return errors.New("expired") + return errExpired } conn.t = time.Now() if conn.conn == nil { diff --git a/controller/endpoint/endpoint.go b/controller/endpoint/endpoint.go index 3b2ed27d..b0075bc1 100644 --- a/controller/endpoint/endpoint.go +++ b/controller/endpoint/endpoint.go @@ -9,6 +9,8 @@ import ( "time" ) +var errExpired = errors.New("expired") + // EndpointProtocol is the type of protocol that the endpoint represents. type EndpointProtocol string @@ -36,9 +38,9 @@ type Endpoint struct { } } Redis struct { - Host string - Port int - Channel string + Host string + Port int + Channel string } } @@ -82,30 +84,42 @@ func (epc *EndpointManager) Validate(url string) error { } func (epc *EndpointManager) Send(endpoint, val string) error { - epc.mu.Lock() - conn, ok := epc.conns[endpoint] - if !ok || conn.Expired() { - ep, err := parseEndpoint(endpoint) + for { + epc.mu.Lock() + conn, ok := epc.conns[endpoint] + if !ok || conn.Expired() { + ep, err := parseEndpoint(endpoint) + if err != nil { + epc.mu.Unlock() + return err + } + switch ep.Protocol { + default: + return errors.New("invalid protocol") + case HTTP: + conn = newHTTPEndpointConn(ep) + case Disque: + conn = newDisqueEndpointConn(ep) + case GRPC: + conn = newGRPCEndpointConn(ep) + case Redis: + conn = newRedisEndpointConn(ep) + } + epc.conns[endpoint] = conn + } + epc.mu.Unlock() + err := conn.Send(val) if err != nil { - epc.mu.Unlock() + if err == errExpired { + // it's possible that the connection has expired in-between + // the last conn.Expired() check and now. If so, we should + // just try the send again. + continue + } return err } - switch ep.Protocol { - default: - return errors.New("invalid protocol") - case HTTP: - conn = newHTTPEndpointConn(ep) - case Disque: - conn = newDisqueEndpointConn(ep) - case GRPC: - conn = newGRPCEndpointConn(ep) - case Redis: - conn = newRedisEndpointConn(ep) - } - epc.conns[endpoint] = conn + return nil } - epc.mu.Unlock() - return conn.Send(val) } func parseEndpoint(s string) (Endpoint, error) { diff --git a/controller/endpoint/grpc.go b/controller/endpoint/grpc.go index a2d19d53..821d586c 100644 --- a/controller/endpoint/grpc.go +++ b/controller/endpoint/grpc.go @@ -57,7 +57,7 @@ func (conn *GRPCEndpointConn) Send(msg string) error { conn.mu.Lock() defer conn.mu.Unlock() if conn.ex { - return errors.New("expired") + return errExpired } conn.t = time.Now() if conn.conn == nil { diff --git a/controller/endpoint/http.go b/controller/endpoint/http.go index 34c3cd53..07cdc677 100644 --- a/controller/endpoint/http.go +++ b/controller/endpoint/http.go @@ -2,7 +2,6 @@ package endpoint import ( "bytes" - "errors" "fmt" "io" "io/ioutil" @@ -48,7 +47,7 @@ func (conn *HTTPEndpointConn) Send(msg string) error { conn.mu.Lock() defer conn.mu.Unlock() if conn.ex { - return errors.New("expired") + return errExpired } conn.t = time.Now() if conn.client == nil { diff --git a/controller/endpoint/redis.go b/controller/endpoint/redis.go index 1343ca1d..b1440f99 100644 --- a/controller/endpoint/redis.go +++ b/controller/endpoint/redis.go @@ -56,7 +56,7 @@ func (conn *RedisEndpointConn) Send(msg string) error { defer conn.mu.Unlock() if conn.ex { - return errors.New("expired") + return errExpired } conn.t = time.Now()