From 22be725b4576e79a53e08df87c7e1650a85718af Mon Sep 17 00:00:00 2001 From: Josh Baker Date: Thu, 19 Apr 2018 09:25:39 -0700 Subject: [PATCH] Refactor endpoint package --- pkg/controller/controller.go | 4 +- pkg/controller/hooks.go | 2 +- pkg/endpoint/amqp.go | 30 +++++++------- pkg/endpoint/disque.go | 15 ++++--- pkg/endpoint/endpoint.go | 78 ++++++++++++++++++++---------------- pkg/endpoint/grpc.go | 21 +++++----- pkg/endpoint/http.go | 13 +++--- pkg/endpoint/kafka.go | 15 ++++--- pkg/endpoint/mqtt.go | 15 ++++--- pkg/endpoint/redis.go | 15 ++++--- pkg/endpoint/sqs.go | 22 +++++----- 11 files changed, 128 insertions(+), 102 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 66aa6ae7..181133f5 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -67,7 +67,7 @@ type Controller struct { dir string started time.Time config *Config - epc *endpoint.EndpointManager + epc *endpoint.Manager // atomics followc aint // counter increases when follow property changes @@ -136,7 +136,7 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http expires: make(map[string]map[string]time.Time), started: time.Now(), conns: make(map[*server.Conn]*clientConn), - epc: endpoint.NewEndpointManager(), + epc: endpoint.NewManager(), http: http, } diff --git a/pkg/controller/hooks.go b/pkg/controller/hooks.go index b5e341b1..073baabe 100644 --- a/pkg/controller/hooks.go +++ b/pkg/controller/hooks.go @@ -345,7 +345,7 @@ type Hook struct { closed bool opened bool query string - epm *endpoint.EndpointManager + epm *endpoint.Manager } func (h *Hook) Equals(hook *Hook) bool { diff --git a/pkg/endpoint/amqp.go b/pkg/endpoint/amqp.go index db5dbdd2..b3f8339f 100644 --- a/pkg/endpoint/amqp.go +++ b/pkg/endpoint/amqp.go @@ -1,20 +1,20 @@ package endpoint import ( + "fmt" "net" "sync" "time" - "fmt" - "github.com/streadway/amqp" ) const ( - AMQPExpiresAfter = time.Second * 30 + amqpExpiresAfter = time.Second * 30 ) -type AMQPEndpointConn struct { +// AMQPConn is an endpoint connection +type AMQPConn struct { mu sync.Mutex ep Endpoint conn *amqp.Connection @@ -23,11 +23,12 @@ type AMQPEndpointConn struct { t time.Time } -func (conn *AMQPEndpointConn) Expired() bool { +// Expired returns true if the connection has expired +func (conn *AMQPConn) Expired() bool { conn.mu.Lock() defer conn.mu.Unlock() if !conn.ex { - if time.Now().Sub(conn.t) > kafkaExpiresAfter { + if time.Now().Sub(conn.t) > amqpExpiresAfter { conn.ex = true conn.close() } @@ -35,7 +36,7 @@ func (conn *AMQPEndpointConn) Expired() bool { return conn.ex } -func (conn *AMQPEndpointConn) close() { +func (conn *AMQPConn) close() { if conn.conn != nil { conn.conn.Close() conn.conn = nil @@ -43,7 +44,8 @@ func (conn *AMQPEndpointConn) close() { } } -func (conn *AMQPEndpointConn) Send(msg string) error { +// Send sends a message +func (conn *AMQPConn) Send(msg string) error { conn.mu.Lock() defer conn.mu.Unlock() @@ -113,7 +115,7 @@ func (conn *AMQPEndpointConn) Send(msg string) error { conn.channel = channel } - if err := conn.channel.Publish( + return conn.channel.Publish( conn.ep.AMQP.QueueName, conn.ep.AMQP.RouteKey, conn.ep.AMQP.Mandatory, @@ -126,15 +128,11 @@ func (conn *AMQPEndpointConn) Send(msg string) error { DeliveryMode: conn.ep.AMQP.DeliveryMode, Priority: 0, }, - ); err != nil { - return err - } - - return nil + ) } -func newAMQPEndpointConn(ep Endpoint) *AMQPEndpointConn { - return &AMQPEndpointConn{ +func newAMQPConn(ep Endpoint) *AMQPConn { + return &AMQPConn{ ep: ep, t: time.Now(), } diff --git a/pkg/endpoint/disque.go b/pkg/endpoint/disque.go index 404a6f6f..f7ce2ae8 100644 --- a/pkg/endpoint/disque.go +++ b/pkg/endpoint/disque.go @@ -15,7 +15,8 @@ const ( disqueExpiresAfter = time.Second * 30 ) -type DisqueEndpointConn struct { +// DisqueConn is an endpoint connection +type DisqueConn struct { mu sync.Mutex ep Endpoint ex bool @@ -24,14 +25,15 @@ type DisqueEndpointConn struct { rd *bufio.Reader } -func newDisqueEndpointConn(ep Endpoint) *DisqueEndpointConn { - return &DisqueEndpointConn{ +func newDisqueConn(ep Endpoint) *DisqueConn { + return &DisqueConn{ ep: ep, t: time.Now(), } } -func (conn *DisqueEndpointConn) Expired() bool { +// Expired returns true if the connection has expired +func (conn *DisqueConn) Expired() bool { conn.mu.Lock() defer conn.mu.Unlock() if !conn.ex { @@ -45,7 +47,7 @@ func (conn *DisqueEndpointConn) Expired() bool { return conn.ex } -func (conn *DisqueEndpointConn) close() { +func (conn *DisqueConn) close() { if conn.conn != nil { conn.conn.Close() conn.conn = nil @@ -53,7 +55,8 @@ func (conn *DisqueEndpointConn) close() { conn.rd = nil } -func (conn *DisqueEndpointConn) Send(msg string) error { +// Send sends a message +func (conn *DisqueConn) Send(msg string) error { conn.mu.Lock() defer conn.mu.Unlock() if conn.ex { diff --git a/pkg/endpoint/endpoint.go b/pkg/endpoint/endpoint.go index 5df104e5..244d3705 100644 --- a/pkg/endpoint/endpoint.go +++ b/pkg/endpoint/endpoint.go @@ -13,23 +13,31 @@ import ( var errExpired = errors.New("expired") -// EndpointProtocol is the type of protocol that the endpoint represents. -type EndpointProtocol string +// Protocol is the type of protocol that the endpoint represents. +type Protocol string const ( - HTTP = EndpointProtocol("http") // HTTP - Disque = EndpointProtocol("disque") // Disque - GRPC = EndpointProtocol("grpc") // GRPC - Redis = EndpointProtocol("redis") // Redis - Kafka = EndpointProtocol("kafka") // Kafka - MQTT = EndpointProtocol("mqtt") // MQTT - AMQP = EndpointProtocol("amqp") // AMQP - SQS = EndpointProtocol("sqs") // SQS + // HTTP protocol + HTTP = Protocol("http") + // Disque protocol + Disque = Protocol("disque") + // GRPC protocol + GRPC = Protocol("grpc") + // Redis protocol + Redis = Protocol("redis") + // Kafka protocol + Kafka = Protocol("kafka") + // MQTT protocol + MQTT = Protocol("mqtt") + // AMQP protocol + AMQP = Protocol("amqp") + // SQS protocol + SQS = Protocol("sqs") ) // Endpoint represents an endpoint. type Endpoint struct { - Protocol EndpointProtocol + Protocol Protocol Original string GRPC struct { Host string @@ -84,27 +92,29 @@ type Endpoint struct { } } -type EndpointConn interface { +// Conn is an endpoint connection +type Conn interface { Expired() bool Send(val string) error } -type EndpointManager struct { - mu sync.RWMutex // this is intentionally exposed - conns map[string]EndpointConn +// Manager manages all endpoints +type Manager struct { + mu sync.RWMutex + conns map[string]Conn } -func NewEndpointManager() *EndpointManager { - epc := &EndpointManager{ - conns: make(map[string]EndpointConn), +// NewManager returns a new manager +func NewManager() *Manager { + epc := &Manager{ + conns: make(map[string]Conn), } go epc.Run() return epc } -// Manage connection at enpoints -// If some connection expired we should delete it -func (epc *EndpointManager) Run() { +// Run starts the managing of endpoints +func (epc *Manager) Run() { for { time.Sleep(time.Second) func() { @@ -119,14 +129,14 @@ func (epc *EndpointManager) Run() { } } -// Get finds an endpoint based on its url. If the enpoint does not -// exist a new only is created. -func (epc *EndpointManager) Validate(url string) error { +// Validate an endpoint url +func (epc *Manager) Validate(url string) error { _, err := parseEndpoint(url) return err } -func (epc *EndpointManager) Send(endpoint, val string) error { +// Send send a message to an endpoint +func (epc *Manager) Send(endpoint, msg string) error { for { epc.mu.Lock() conn, ok := epc.conns[endpoint] @@ -140,26 +150,26 @@ func (epc *EndpointManager) Send(endpoint, val string) error { default: return errors.New("invalid protocol") case HTTP: - conn = newHTTPEndpointConn(ep) + conn = newHTTPConn(ep) case Disque: - conn = newDisqueEndpointConn(ep) + conn = newDisqueConn(ep) case GRPC: - conn = newGRPCEndpointConn(ep) + conn = newGRPCConn(ep) case Redis: - conn = newRedisEndpointConn(ep) + conn = newRedisConn(ep) case Kafka: - conn = newKafkaEndpointConn(ep) + conn = newKafkaConn(ep) case MQTT: - conn = newMQTTEndpointConn(ep) + conn = newMQTTConn(ep) case AMQP: - conn = newAMQPEndpointConn(ep) + conn = newAMQPConn(ep) case SQS: - conn = newSQSEndpointConn(ep) + conn = newSQSConn(ep) } epc.conns[endpoint] = conn } epc.mu.Unlock() - err := conn.Send(val) + err := conn.Send(msg) if err != nil { if err == errExpired { // it's possible that the connection has expired in-between diff --git a/pkg/endpoint/grpc.go b/pkg/endpoint/grpc.go index dd06414c..08497031 100644 --- a/pkg/endpoint/grpc.go +++ b/pkg/endpoint/grpc.go @@ -6,10 +6,8 @@ import ( "sync" "time" - "golang.org/x/net/context" - "github.com/tidwall/tile38/pkg/hservice" - + "golang.org/x/net/context" "google.golang.org/grpc" ) @@ -17,7 +15,8 @@ const ( grpcExpiresAfter = time.Second * 30 ) -type GRPCEndpointConn struct { +// GRPCConn is an endpoint connection +type GRPCConn struct { mu sync.Mutex ep Endpoint ex bool @@ -26,14 +25,15 @@ type GRPCEndpointConn struct { sconn hservice.HookServiceClient } -func newGRPCEndpointConn(ep Endpoint) *GRPCEndpointConn { - return &GRPCEndpointConn{ +func newGRPCConn(ep Endpoint) *GRPCConn { + return &GRPCConn{ ep: ep, t: time.Now(), } } -func (conn *GRPCEndpointConn) Expired() bool { +// Expired returns true if the connection has expired +func (conn *GRPCConn) Expired() bool { conn.mu.Lock() defer conn.mu.Unlock() if !conn.ex { @@ -46,14 +46,15 @@ func (conn *GRPCEndpointConn) Expired() bool { } return conn.ex } -func (conn *GRPCEndpointConn) close() { +func (conn *GRPCConn) close() { if conn.conn != nil { conn.conn.Close() conn.conn = nil } } -func (conn *GRPCEndpointConn) Send(msg string) error { +// Send sends a message +func (conn *GRPCConn) Send(msg string) error { conn.mu.Lock() defer conn.mu.Unlock() if conn.ex { @@ -70,7 +71,7 @@ func (conn *GRPCEndpointConn) Send(msg string) error { } conn.sconn = hservice.NewHookServiceClient(conn.conn) } - r, err := conn.sconn.Send(context.Background(), &hservice.MessageRequest{msg}) + r, err := conn.sconn.Send(context.Background(), &hservice.MessageRequest{Value: msg}) if err != nil { conn.close() return err diff --git a/pkg/endpoint/http.go b/pkg/endpoint/http.go index 7326aee1..c5a0c6d7 100644 --- a/pkg/endpoint/http.go +++ b/pkg/endpoint/http.go @@ -15,13 +15,14 @@ const ( httpMaxIdleConnections = 20 ) -type HTTPEndpointConn struct { +// HTTPConn is an endpoint connection +type HTTPConn struct { ep Endpoint client *http.Client } -func newHTTPEndpointConn(ep Endpoint) *HTTPEndpointConn { - return &HTTPEndpointConn{ +func newHTTPConn(ep Endpoint) *HTTPConn { + return &HTTPConn{ ep: ep, client: &http.Client{ Transport: &http.Transport{ @@ -33,11 +34,13 @@ func newHTTPEndpointConn(ep Endpoint) *HTTPEndpointConn { } } -func (conn *HTTPEndpointConn) Expired() bool { +// Expired returns true if the connection has expired +func (conn *HTTPConn) Expired() bool { return false } -func (conn *HTTPEndpointConn) Send(msg string) error { +// Send sends a message +func (conn *HTTPConn) Send(msg string) error { req, err := http.NewRequest("POST", conn.ep.Original, bytes.NewBufferString(msg)) if err != nil { return err diff --git a/pkg/endpoint/kafka.go b/pkg/endpoint/kafka.go index 0888da4b..7c1ce596 100644 --- a/pkg/endpoint/kafka.go +++ b/pkg/endpoint/kafka.go @@ -13,7 +13,8 @@ const ( kafkaExpiresAfter = time.Second * 30 ) -type KafkaEndpointConn struct { +// KafkaConn is an endpoint connection +type KafkaConn struct { mu sync.Mutex ep Endpoint conn sarama.SyncProducer @@ -21,7 +22,8 @@ type KafkaEndpointConn struct { t time.Time } -func (conn *KafkaEndpointConn) Expired() bool { +// Expired returns true if the connection has expired +func (conn *KafkaConn) Expired() bool { conn.mu.Lock() defer conn.mu.Unlock() if !conn.ex { @@ -35,14 +37,15 @@ func (conn *KafkaEndpointConn) Expired() bool { return conn.ex } -func (conn *KafkaEndpointConn) close() { +func (conn *KafkaConn) close() { if conn.conn != nil { conn.conn.Close() conn.conn = nil } } -func (conn *KafkaEndpointConn) Send(msg string) error { +// Send sends a message +func (conn *KafkaConn) Send(msg string) error { conn.mu.Lock() defer conn.mu.Unlock() @@ -84,8 +87,8 @@ func (conn *KafkaEndpointConn) Send(msg string) error { return nil } -func newKafkaEndpointConn(ep Endpoint) *KafkaEndpointConn { - return &KafkaEndpointConn{ +func newKafkaConn(ep Endpoint) *KafkaConn { + return &KafkaConn{ ep: ep, t: time.Now(), } diff --git a/pkg/endpoint/mqtt.go b/pkg/endpoint/mqtt.go index 69a6b120..1dab8beb 100644 --- a/pkg/endpoint/mqtt.go +++ b/pkg/endpoint/mqtt.go @@ -12,7 +12,8 @@ const ( mqttExpiresAfter = time.Second * 30 ) -type MQTTEndpointConn struct { +// MQTTConn is an endpoint connection +type MQTTConn struct { mu sync.Mutex ep Endpoint conn paho.Client @@ -20,7 +21,8 @@ type MQTTEndpointConn struct { t time.Time } -func (conn *MQTTEndpointConn) Expired() bool { +// Expired returns true if the connection has expired +func (conn *MQTTConn) Expired() bool { conn.mu.Lock() defer conn.mu.Unlock() if !conn.ex { @@ -32,7 +34,7 @@ func (conn *MQTTEndpointConn) Expired() bool { return conn.ex } -func (conn *MQTTEndpointConn) close() { +func (conn *MQTTConn) close() { if conn.conn != nil { if conn.conn.IsConnected() { conn.conn.Disconnect(250) @@ -42,7 +44,8 @@ func (conn *MQTTEndpointConn) close() { } } -func (conn *MQTTEndpointConn) Send(msg string) error { +// Send sends a message +func (conn *MQTTConn) Send(msg string) error { conn.mu.Lock() defer conn.mu.Unlock() @@ -74,8 +77,8 @@ func (conn *MQTTEndpointConn) Send(msg string) error { return nil } -func newMQTTEndpointConn(ep Endpoint) *MQTTEndpointConn { - return &MQTTEndpointConn{ +func newMQTTConn(ep Endpoint) *MQTTConn { + return &MQTTConn{ ep: ep, t: time.Now(), } diff --git a/pkg/endpoint/redis.go b/pkg/endpoint/redis.go index b1440f99..748c9b53 100644 --- a/pkg/endpoint/redis.go +++ b/pkg/endpoint/redis.go @@ -13,7 +13,8 @@ const ( redisExpiresAfter = time.Second * 30 ) -type RedisEndpointConn struct { +// RedisConn is an endpoint connection +type RedisConn struct { mu sync.Mutex ep Endpoint ex bool @@ -22,14 +23,15 @@ type RedisEndpointConn struct { rd *bufio.Reader } -func newRedisEndpointConn(ep Endpoint) *RedisEndpointConn { - return &RedisEndpointConn{ +func newRedisConn(ep Endpoint) *RedisConn { + return &RedisConn{ ep: ep, t: time.Now(), } } -func (conn *RedisEndpointConn) Expired() bool { +// Expired returns true if the connection has expired +func (conn *RedisConn) Expired() bool { conn.mu.Lock() defer conn.mu.Unlock() if !conn.ex { @@ -43,7 +45,7 @@ func (conn *RedisEndpointConn) Expired() bool { return conn.ex } -func (conn *RedisEndpointConn) close() { +func (conn *RedisConn) close() { if conn.conn != nil { conn.conn.Close() conn.conn = nil @@ -51,7 +53,8 @@ func (conn *RedisEndpointConn) close() { conn.rd = nil } -func (conn *RedisEndpointConn) Send(msg string) error { +// Send sends a message +func (conn *RedisConn) Send(msg string) error { conn.mu.Lock() defer conn.mu.Unlock() diff --git a/pkg/endpoint/sqs.go b/pkg/endpoint/sqs.go index 3cdee789..4405aec7 100644 --- a/pkg/endpoint/sqs.go +++ b/pkg/endpoint/sqs.go @@ -10,17 +10,17 @@ import ( "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sqs" - "github.com/streadway/amqp" ) var errCreateQueue = errors.New("Error while creating queue") const ( - SQSExpiresAfter = time.Second * 30 + sqsExpiresAfter = time.Second * 30 ) -type SQSEndpointConn struct { +// SQSConn is an endpoint connection +type SQSConn struct { mu sync.Mutex ep Endpoint session *session.Session @@ -30,15 +30,16 @@ type SQSEndpointConn struct { t time.Time } -func (conn *SQSEndpointConn) generateSQSURL() string { +func (conn *SQSConn) generateSQSURL() string { return "https://sqs." + conn.ep.SQS.Region + "amazonaws.com/" + conn.ep.SQS.QueueID + "/" + conn.ep.SQS.QueueName } -func (conn *SQSEndpointConn) Expired() bool { +// Expired returns true if the connection has expired +func (conn *SQSConn) Expired() bool { conn.mu.Lock() defer conn.mu.Unlock() if !conn.ex { - if time.Now().Sub(conn.t) > SQSExpiresAfter { + if time.Now().Sub(conn.t) > sqsExpiresAfter { conn.ex = true conn.close() } @@ -46,14 +47,15 @@ func (conn *SQSEndpointConn) Expired() bool { return conn.ex } -func (conn *SQSEndpointConn) close() { +func (conn *SQSConn) close() { if conn.svc != nil { conn.svc = nil conn.session = nil } } -func (conn *SQSEndpointConn) Send(msg string) error { +// Send sends a message +func (conn *SQSConn) Send(msg string) error { conn.mu.Lock() defer conn.mu.Unlock() @@ -113,8 +115,8 @@ func (conn *SQSEndpointConn) Send(msg string) error { return nil } -func newSQSEndpointConn(ep Endpoint) *SQSEndpointConn { - return &SQSEndpointConn{ +func newSQSConn(ep Endpoint) *SQSConn { + return &SQSConn{ ep: ep, t: time.Now(), }