diff --git a/cmd/tile38-server/main.go b/cmd/tile38-server/main.go index 84b4dc02..498a1403 100644 --- a/cmd/tile38-server/main.go +++ b/cmd/tile38-server/main.go @@ -5,15 +5,21 @@ import ( "fmt" "io" "io/ioutil" + "net" "net/http" "os" "runtime" "strconv" "strings" + "golang.org/x/net/context" + + "google.golang.org/grpc" + "github.com/tidwall/tile38/controller" "github.com/tidwall/tile38/controller/log" "github.com/tidwall/tile38/core" + "github.com/tidwall/tile38/hservice" ) var ( @@ -26,15 +32,21 @@ var ( quiet bool ) -// Fire up a webhook test server by using the --webhook-consumer-port +// Fire up a webhook test server by using the --webhook-consumer-http-port // for example -// $ ./tile38-server --webhook-consumer-port 9999 +// $ ./tile38-server --webhook-http-consumer-port 9999 // // The create hooks like such... // SETHOOK myhook http://localhost:9999/myhook NEARBY mykey FENCE POINT 33.5 -115.5 1000 +type hserver struct{} + +func (s *hserver) Send(ctx context.Context, in *hservice.MessageRequest) (*hservice.MessageReply, error) { + return &hservice.MessageReply{true}, nil +} + func main() { - if len(os.Args) == 3 && os.Args[1] == "--webhook-consumer-port" { + if len(os.Args) == 3 && os.Args[1] == "--webhook-http-consumer-port" { log.Default = log.New(os.Stderr, &log.Config{}) port, err := strconv.ParseUint(os.Args[2], 10, 16) if err != nil { @@ -43,8 +55,7 @@ func main() { http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { data, err := ioutil.ReadAll(r.Body) if err != nil { - log.Error(err) - return + log.Fatal(err) } log.HTTPf("http: %s : %s", r.URL.Path, string(data)) }) @@ -55,6 +66,26 @@ func main() { return } + if len(os.Args) == 3 && os.Args[1] == "--webhook-grpc-consumer-port" { + log.Default = log.New(os.Stderr, &log.Config{}) + port, err := strconv.ParseUint(os.Args[2], 10, 16) + if err != nil { + log.Fatal(err) + } + + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + if err != nil { + log.Fatal(err) + } + s := grpc.NewServer() + hservice.RegisterHookServiceServer(s, &hserver{}) + log.Infof("webhook server grpc://localhost:%d/", port) + if err := s.Serve(lis); err != nil { + log.Fatal(err) + } + return + } + // parse non standard args. nargs := []string{os.Args[0]} for i := 1; i < len(os.Args); i++ { diff --git a/controller/endpoint/disque.go b/controller/endpoint/disque.go index 98daa92d..cd315fc2 100644 --- a/controller/endpoint/disque.go +++ b/controller/endpoint/disque.go @@ -13,7 +13,6 @@ import ( const ( disqueExpiresAfter = time.Second * 30 - disqueDialTimeout = time.Second * 10 ) type DisqueEndpointConn struct { @@ -64,7 +63,7 @@ func (conn *DisqueEndpointConn) Send(msg string) error { if conn.conn == nil { addr := fmt.Sprintf("%s:%d", conn.ep.Disque.Host, conn.ep.Disque.Port) var err error - conn.conn, err = net.DialTimeout("tcp", addr, disqueDialTimeout) + conn.conn, err = net.Dial("tcp", addr) if err != nil { return err } diff --git a/controller/endpoint/endpoint.go b/controller/endpoint/endpoint.go index 877f85e2..6b3d2128 100644 --- a/controller/endpoint/endpoint.go +++ b/controller/endpoint/endpoint.go @@ -15,13 +15,18 @@ type EndpointProtocol string const ( HTTP = EndpointProtocol("http") // HTTP Disque = EndpointProtocol("disque") // Disque + GRPC = EndpointProtocol("grpc") // GRPC ) // Endpoint represents an endpoint. type Endpoint struct { Protocol EndpointProtocol Original string - Disque struct { + GRPC struct { + Host string + Port int + } + Disque struct { Host string Port int QueueName string @@ -86,6 +91,8 @@ func (epc *EndpointManager) Send(endpoint, val string) error { conn = newHTTPEndpointConn(ep) case Disque: conn = newDisqueEndpointConn(ep) + case GRPC: + conn = newGRPCEndpointConn(ep) } epc.conns[endpoint] = conn } @@ -93,41 +100,6 @@ func (epc *EndpointManager) Send(endpoint, val string) error { return conn.Send(val) } -/* -func (conn *endpointConn) Expired() bool { - conn.mu.Lock() - defer conn.mu.Unlock() - println("is expired?", conn.ex) - return conn.ex -} - -func (conn *endpointConn) Send(val string) error { - conn.mu.Lock() - defer conn.mu.Unlock() - - return nil -} -*/ -/* -func (ep *Endpoint) Open() { - ep.mu.Lock() - defer ep.mu.Unlock() - println("open " + ep.Original) - // Even though open is called we should wait until the a messages - // is sent before establishing a network connection. -} - -func (ep *Endpoint) Close() { - ep.mu.Lock() - defer ep.mu.Unlock() - println("close " + ep.Original) - // Make sure to forece close the network connection here. -} - -func (ep *Endpoint) Send() error { - return nil -} -*/ func parseEndpoint(s string) (Endpoint, error) { var endpoint Endpoint endpoint.Original = s @@ -140,6 +112,8 @@ func parseEndpoint(s string) (Endpoint, error) { endpoint.Protocol = HTTP case strings.HasPrefix(s, "disque:"): endpoint.Protocol = Disque + case strings.HasPrefix(s, "grpc:"): + endpoint.Protocol = GRPC } s = s[strings.Index(s, ":")+1:] if !strings.HasPrefix(s, "//") { @@ -151,6 +125,23 @@ func parseEndpoint(s string) (Endpoint, error) { if s == "" { return endpoint, errors.New("missing host") } + if endpoint.Protocol == GRPC { + dp := strings.Split(s, ":") + switch len(dp) { + default: + return endpoint, errors.New("invalid grpc url") + case 1: + endpoint.GRPC.Host = dp[0] + endpoint.GRPC.Port = 80 + case 2: + endpoint.GRPC.Host = dp[0] + n, err := strconv.ParseUint(dp[1], 10, 16) + if err != nil { + return endpoint, errors.New("invalid grpc url") + } + endpoint.GRPC.Port = int(n) + } + } if endpoint.Protocol == Disque { dp := strings.Split(s, ":") switch len(dp) { diff --git a/controller/endpoint/grpc.go b/controller/endpoint/grpc.go new file mode 100644 index 00000000..a2d19d53 --- /dev/null +++ b/controller/endpoint/grpc.go @@ -0,0 +1,83 @@ +package endpoint + +import ( + "errors" + "fmt" + "sync" + "time" + + "golang.org/x/net/context" + + "github.com/tidwall/tile38/hservice" + + "google.golang.org/grpc" +) + +const ( + grpcExpiresAfter = time.Second * 30 +) + +type GRPCEndpointConn struct { + mu sync.Mutex + ep Endpoint + ex bool + t time.Time + conn *grpc.ClientConn + sconn hservice.HookServiceClient +} + +func newGRPCEndpointConn(ep Endpoint) *GRPCEndpointConn { + return &GRPCEndpointConn{ + ep: ep, + t: time.Now(), + } +} + +func (conn *GRPCEndpointConn) Expired() bool { + conn.mu.Lock() + defer conn.mu.Unlock() + if !conn.ex { + if time.Now().Sub(conn.t) > grpcExpiresAfter { + if conn.conn != nil { + conn.close() + } + conn.ex = true + } + } + return conn.ex +} +func (conn *GRPCEndpointConn) close() { + if conn.conn != nil { + conn.conn.Close() + conn.conn = nil + } +} + +func (conn *GRPCEndpointConn) Send(msg string) error { + conn.mu.Lock() + defer conn.mu.Unlock() + if conn.ex { + return errors.New("expired") + } + conn.t = time.Now() + if conn.conn == nil { + addr := fmt.Sprintf("%s:%d", conn.ep.GRPC.Host, conn.ep.GRPC.Port) + var err error + conn.conn, err = grpc.Dial(addr, grpc.WithInsecure()) + if err != nil { + conn.close() + return err + } + conn.sconn = hservice.NewHookServiceClient(conn.conn) + } + r, err := conn.sconn.Send(context.Background(), &hservice.MessageRequest{msg}) + if err != nil { + conn.close() + return err + } + if !r.Ok { + conn.close() + return errors.New("invalid grpc reply") + } + return nil +} diff --git a/hservice/gen.sh b/hservice/gen.sh new file mode 100755 index 00000000..7a1c930d --- /dev/null +++ b/hservice/gen.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +cd $(dirname "${BASH_SOURCE[0]}") +protoc --go_out=plugins=grpc,import_path=hservice:. *.proto diff --git a/hservice/hservice.pb.go b/hservice/hservice.pb.go new file mode 100644 index 00000000..2f8a8a9b --- /dev/null +++ b/hservice/hservice.pb.go @@ -0,0 +1,151 @@ +// Code generated by protoc-gen-go. +// source: hservice.proto +// DO NOT EDIT! + +/* +Package hservice is a generated protocol buffer package. + +It is generated from these files: + hservice.proto + +It has these top-level messages: + MessageRequest + MessageReply +*/ +package hservice + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +// The request message containing the message value +type MessageRequest struct { + Value string `protobuf:"bytes,1,opt,name=value" json:"value,omitempty"` +} + +func (m *MessageRequest) Reset() { *m = MessageRequest{} } +func (m *MessageRequest) String() string { return proto.CompactTextString(m) } +func (*MessageRequest) ProtoMessage() {} +func (*MessageRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +// The response message containing an ok (true or false) +type MessageReply struct { + Ok bool `protobuf:"varint,1,opt,name=ok" json:"ok,omitempty"` +} + +func (m *MessageReply) Reset() { *m = MessageReply{} } +func (m *MessageReply) String() string { return proto.CompactTextString(m) } +func (*MessageReply) ProtoMessage() {} +func (*MessageReply) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +func init() { + proto.RegisterType((*MessageRequest)(nil), "hservice.MessageRequest") + proto.RegisterType((*MessageReply)(nil), "hservice.MessageReply") +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion3 + +// Client API for HookService service + +type HookServiceClient interface { + // Sends a greeting + Send(ctx context.Context, in *MessageRequest, opts ...grpc.CallOption) (*MessageReply, error) +} + +type hookServiceClient struct { + cc *grpc.ClientConn +} + +func NewHookServiceClient(cc *grpc.ClientConn) HookServiceClient { + return &hookServiceClient{cc} +} + +func (c *hookServiceClient) Send(ctx context.Context, in *MessageRequest, opts ...grpc.CallOption) (*MessageReply, error) { + out := new(MessageReply) + err := grpc.Invoke(ctx, "/hservice.HookService/Send", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for HookService service + +type HookServiceServer interface { + // Sends a greeting + Send(context.Context, *MessageRequest) (*MessageReply, error) +} + +func RegisterHookServiceServer(s *grpc.Server, srv HookServiceServer) { + s.RegisterService(&_HookService_serviceDesc, srv) +} + +func _HookService_Send_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(MessageRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(HookServiceServer).Send(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/hservice.HookService/Send", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(HookServiceServer).Send(ctx, req.(*MessageRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _HookService_serviceDesc = grpc.ServiceDesc{ + ServiceName: "hservice.HookService", + HandlerType: (*HookServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Send", + Handler: _HookService_Send_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: fileDescriptor0, +} + +func init() { proto.RegisterFile("hservice.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 168 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0xcb, 0x28, 0x4e, 0x2d, + 0x2a, 0xcb, 0x4c, 0x4e, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x80, 0xf1, 0x95, 0xd4, + 0xb8, 0xf8, 0x7c, 0x53, 0x8b, 0x8b, 0x13, 0xd3, 0x53, 0x83, 0x52, 0x0b, 0x4b, 0x53, 0x8b, 0x4b, + 0x84, 0x44, 0xb8, 0x58, 0xcb, 0x12, 0x73, 0x4a, 0x53, 0x25, 0x18, 0x15, 0x18, 0x35, 0x38, 0x83, + 0x20, 0x1c, 0x25, 0x39, 0x2e, 0x1e, 0xb8, 0xba, 0x82, 0x9c, 0x4a, 0x21, 0x3e, 0x2e, 0xa6, 0xfc, + 0x6c, 0xb0, 0x12, 0x8e, 0x20, 0xa6, 0xfc, 0x6c, 0x23, 0x4f, 0x2e, 0x6e, 0x8f, 0xfc, 0xfc, 0xec, + 0x60, 0x88, 0xb1, 0x42, 0x56, 0x5c, 0x2c, 0xc1, 0xa9, 0x79, 0x29, 0x42, 0x12, 0x7a, 0x70, 0x9b, + 0x51, 0xad, 0x91, 0x12, 0xc3, 0x22, 0x53, 0x90, 0x53, 0xa9, 0xc4, 0xe0, 0xa4, 0xc9, 0x25, 0x9c, + 0x9c, 0x9f, 0xab, 0x57, 0x92, 0x99, 0x93, 0x6a, 0x6c, 0x01, 0x57, 0xe5, 0x24, 0x80, 0x64, 0x7e, + 0x00, 0xc8, 0x17, 0x01, 0x8c, 0x49, 0x6c, 0x60, 0xef, 0x18, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, + 0x6d, 0xd0, 0x2b, 0x13, 0xe0, 0x00, 0x00, 0x00, +} diff --git a/hservice/hservice.proto b/hservice/hservice.proto new file mode 100644 index 00000000..5e545d4f --- /dev/null +++ b/hservice/hservice.proto @@ -0,0 +1,23 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "com.tile38.hservice"; +option java_outer_classname = "HookServiceProto"; + +package hservice; + +// The greeting service definition. +service HookService { + // Sends a greeting + rpc Send (MessageRequest) returns (MessageReply) {} +} + +// The request message containing the message value +message MessageRequest { + string value = 1; +} + +// The response message containing an ok (true or false) +message MessageReply { + bool ok = 1; +}