From 6eb21b890c9a93ae5e13f0324a8907c69492fb1b Mon Sep 17 00:00:00 2001 From: Josh Baker Date: Sat, 2 Apr 2016 14:13:20 -0700 Subject: [PATCH] fixed shared string issue --- controller/aof.go | 4 ++- controller/disque.go | 59 +++++++++++++++++++++++++++++++++++++++++++ controller/fence.go | 36 +++++++++++++++++--------- controller/hooks.go | 50 +++--------------------------------- controller/http.go | 43 +++++++++++++++++++++++++++++++ controller/live.go | 3 +-- controller/scan.go | 6 ++--- controller/scanner.go | 8 +++--- controller/search.go | 6 ++--- 9 files changed, 145 insertions(+), 70 deletions(-) create mode 100644 controller/disque.go create mode 100644 controller/http.go diff --git a/controller/aof.go b/controller/aof.go index a878b7b8..d57e9c9e 100644 --- a/controller/aof.go +++ b/controller/aof.go @@ -94,7 +94,9 @@ func (c *Controller) writeAOF(value resp.Value, d *commandDetailsT) error { } if c.config.FollowHost == "" { // process hooks, for leader only - return c.processHooks(d) + if err := c.processHooks(d); err != nil { + return err + } } } data, err := value.MarshalRESP() diff --git a/controller/disque.go b/controller/disque.go new file mode 100644 index 00000000..3964407e --- /dev/null +++ b/controller/disque.go @@ -0,0 +1,59 @@ +package controller + +import ( + "errors" + "fmt" + "strings" + "sync" + "time" +) + +// TODO: add one connection pool per endpoint. Use Redigo. +// The current implementation is too slow. +var endpointDisqueMu sync.Mutex + +type endpointDisqueConn struct { + mu sync.Mutex +} + +var endpointDisqueM = make(map[string]*endpointDisqueConn) + +func sendDisqueMessage(endpoint Endpoint, msg []byte) error { + endpointDisqueMu.Lock() + conn, ok := endpointDisqueM[endpoint.Original] + if !ok { + conn = &endpointDisqueConn{ + //client: &http.Client{Transport: &http.Transport{}}, + } + endpointDisqueM[endpoint.Original] = conn + } + endpointDisqueMu.Unlock() + conn.mu.Lock() + defer conn.mu.Unlock() + + addr := fmt.Sprintf("%s:%d", endpoint.Disque.Host, endpoint.Disque.Port) + dconn, err := DialTimeout(addr, time.Second/4) + if err != nil { + return err + } + defer dconn.Close() + options := []interface{}{endpoint.Disque.QueueName, msg, 0} + replicate := endpoint.Disque.Options.Replicate + if replicate > 0 { + options = append(options, "REPLICATE") + options = append(options, endpoint.Disque.Options.Replicate) + } + v, err := dconn.Do("ADDJOB", options...) + if err != nil { + return err + } + if v.Error() != nil { + return v.Error() + } + id := v.String() + p := strings.Split(id, "-") + if len(p) != 4 { + return errors.New("invalid disque reply") + } + return nil +} diff --git a/controller/fence.go b/controller/fence.go index 6b5caac9..6cbc55ed 100644 --- a/controller/fence.go +++ b/controller/fence.go @@ -10,9 +10,11 @@ import ( var tmfmt = "2006-01-02T15:04:05.999999999Z07:00" func FenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, details *commandDetailsT) []string { + jshookName := jsonString(hookName) + jstime := jsonString(details.timestamp.Format(tmfmt)) glob := fence.glob if details.command == "drop" { - return []string{`{"cmd":"drop","time":` + details.timestamp.Format(tmfmt) + `}`} + return []string{`{"cmd":"drop","hook":` + jshookName + `,"time":` + jstime + `}`} } match := true if glob != "" && glob != "*" { @@ -22,7 +24,11 @@ func FenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, detai return nil } - if details.obj == nil || (details.command == "fset" && sw.nofields) { + sw.mu.Lock() + nofields := sw.nofields + sw.mu.Unlock() + + if details.obj == nil || (details.command == "fset" && nofields) { return nil } match = false @@ -72,45 +78,51 @@ func FenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, detai } } if details.command == "del" { - return []string{`{"command":"del","id":` + jsonString(details.id) + `,"time":` + details.timestamp.Format(tmfmt) + `}`} + return []string{`{"command":"del","hook":` + jshookName + `,"id":` + jsonString(details.id) + `,"time":` + jstime + `}`} } if details.fmap == nil { return nil } + sw.mu.Lock() sw.fmap = details.fmap sw.fullFields = true sw.msg.OutputType = server.JSON - sw.writeObject(details.id, details.obj, details.fields) + sw.writeObject(details.id, details.obj, details.fields, true) if sw.wr.Len() == 0 { + sw.mu.Unlock() return nil } res := sw.wr.String() + resb := make([]byte, len(res)) + copy(resb, res) + res = string(resb) sw.wr.Reset() - if strings.HasPrefix(res, ",") { - res = res[1:] - } if sw.output == outputIDs { res = `{"id":` + res + `}` } - jskey := jsonString(details.key) + sw.mu.Unlock() - jshookName := jsonString(hookName) + if strings.HasPrefix(res, ",") { + res = res[1:] + } + + jskey := jsonString(details.key) ores := res msgs := make([]string, 0, 2) if fence.detect == nil || fence.detect[detect] { if strings.HasPrefix(ores, "{") { - res = `{"command":"` + details.command + `","detect":"` + detect + `","hook":` + jshookName + `,"time":"` + details.timestamp.Format(tmfmt) + `","key":` + jskey + `,` + ores[1:] + res = `{"command":"` + details.command + `","detect":"` + detect + `","hook":` + jshookName + `,"key":` + jskey + `,"time":` + jstime + `,` + ores[1:] } msgs = append(msgs, res) } switch detect { case "enter": if fence.detect == nil || fence.detect["inside"] { - msgs = append(msgs, `{"command":"`+details.command+`","detect":"inside","hook":`+jshookName+`,"time":"`+details.timestamp.Format(tmfmt)+`","key":`+jskey+`,`+ores[1:]) + msgs = append(msgs, `{"command":"`+details.command+`","detect":"inside","hook":`+jshookName+`,"key":`+jskey+`,"time":`+jstime+`,`+ores[1:]) } case "exit", "cross": if fence.detect == nil || fence.detect["outside"] { - msgs = append(msgs, `{"command":"`+details.command+`","detect":"outside","hook":`+jshookName+`,"time":"`+details.timestamp.Format(tmfmt)+`","key":`+jskey+`,`+ores[1:]) + msgs = append(msgs, `{"command":"`+details.command+`","detect":"outside","hook":`+jshookName+`,"key":`+jskey+`,"time":`+jstime+`,`+ores[1:]) } } return msgs diff --git a/controller/hooks.go b/controller/hooks.go index 41c29484..227787a3 100644 --- a/controller/hooks.go +++ b/controller/hooks.go @@ -3,8 +3,6 @@ package controller import ( "bytes" "errors" - "fmt" - "net/http" "net/url" "sort" "strconv" @@ -69,16 +67,16 @@ nextMessage: } } if len(lerrs) == 0 { + // log.Notice("YAY") return nil } var errmsgs []string for _, err := range lerrs { errmsgs = append(errmsgs, err.Error()) } - if len(errmsgs) > 0 { - return errors.New("not sent: " + strings.Join(errmsgs, ",")) - } - return errors.New("not sent") + err := errors.New("not sent: " + strings.Join(errmsgs, ",")) + log.Error(err) + return err } type hooksByName []*Hook @@ -387,43 +385,3 @@ func (c *Controller) cmdHooks(msg *server.Message) (res string, err error) { } return "", nil } - -func sendHTTPMessage(endpoint Endpoint, msg []byte) error { - resp, err := http.Post(endpoint.Original, "application/json", bytes.NewBuffer(msg)) - if err != nil { - return err - } - defer resp.Body.Close() - if resp.StatusCode != 200 { - return fmt.Errorf("enpoint returned status code %d", resp.StatusCode) - } - return nil -} - -func sendDisqueMessage(endpoint Endpoint, msg []byte) error { - addr := fmt.Sprintf("%s:%d", endpoint.Disque.Host, endpoint.Disque.Port) - conn, err := DialTimeout(addr, time.Second/4) - if err != nil { - return err - } - defer conn.Close() - options := []interface{}{endpoint.Disque.QueueName, msg, 0} - replicate := endpoint.Disque.Options.Replicate - if replicate > 0 { - options = append(options, "REPLICATE") - options = append(options, endpoint.Disque.Options.Replicate) - } - v, err := conn.Do("ADDJOB", options...) - if err != nil { - return err - } - if v.Error() != nil { - return v.Error() - } - id := v.String() - p := strings.Split(id, "-") - if len(p) != 4 { - return errors.New("invalid disque reply") - } - return nil -} diff --git a/controller/http.go b/controller/http.go new file mode 100644 index 00000000..49f0c84a --- /dev/null +++ b/controller/http.go @@ -0,0 +1,43 @@ +package controller + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "net/http" + "sync" +) + +var endpointHTTPMu sync.Mutex + +type endpointHTTPConn struct { + mu sync.Mutex + client *http.Client +} + +var endpointHTTPM = make(map[string]*endpointHTTPConn) + +func sendHTTPMessage(endpoint Endpoint, msg []byte) error { + endpointHTTPMu.Lock() + conn, ok := endpointHTTPM[endpoint.Original] + if !ok { + conn = &endpointHTTPConn{ + client: &http.Client{Transport: &http.Transport{}}, + } + endpointHTTPM[endpoint.Original] = conn + } + endpointHTTPMu.Unlock() + conn.mu.Lock() + defer conn.mu.Unlock() + res, err := conn.client.Post(endpoint.Original, "application/json", bytes.NewBuffer(msg)) + if err != nil { + return err + } + io.Copy(ioutil.Discard, res.Body) + res.Body.Close() + if res.StatusCode != 200 { + return fmt.Errorf("endpoint returned status code %d", res.StatusCode) + } + return nil +} diff --git a/controller/live.go b/controller/live.go index b47d90eb..40514072 100644 --- a/controller/live.go +++ b/controller/live.go @@ -8,7 +8,6 @@ import ( "net" "sync" - "github.com/tidwall/tile38/client" "github.com/tidwall/tile38/controller/log" "github.com/tidwall/tile38/controller/server" ) @@ -139,7 +138,7 @@ func (c *Controller) goLive(inerr error, conn net.Conn, rd *server.AnyReaderWrit var livemsg []byte switch outputType { case server.JSON: - livemsg = []byte(client.LiveJSON) + livemsg = []byte(`{"ok":true,"live":true}`) case server.RESP: livemsg = []byte("+OK\r\n") } diff --git a/controller/scan.go b/controller/scan.go index 21f9692e..d820bfde 100644 --- a/controller/scan.go +++ b/controller/scan.go @@ -59,16 +59,16 @@ func (c *Controller) cmdScan(msg *server.Message) (res string, err error) { greaterGlob := sw.glob[:len(sw.glob)-1] if globIsGlob(greaterGlob) { s.cursor = sw.col.Scan(s.cursor, func(id string, o geojson.Object, fields []float64) bool { - return sw.writeObject(id, o, fields) + return sw.writeObject(id, o, fields, false) }) } else { s.cursor = sw.col.ScanGreaterOrEqual(sw.glob, s.cursor, func(id string, o geojson.Object, fields []float64) bool { - return sw.writeObject(id, o, fields) + return sw.writeObject(id, o, fields, false) }) } } else { s.cursor = sw.col.Scan(s.cursor, func(id string, o geojson.Object, fields []float64) bool { - return sw.writeObject(id, o, fields) + return sw.writeObject(id, o, fields, false) }) } } diff --git a/controller/scanner.go b/controller/scanner.go index f6095fe7..db57c119 100644 --- a/controller/scanner.go +++ b/controller/scanner.go @@ -220,9 +220,11 @@ func (sw *scanWriter) fieldMatch(fields []float64, o geojson.Object) ([]float64, return sw.fvals, true } -func (sw *scanWriter) writeObject(id string, o geojson.Object, fields []float64) bool { - sw.mu.Lock() - defer sw.mu.Unlock() +func (sw *scanWriter) writeObject(id string, o geojson.Object, fields []float64, noLock bool) bool { + if !noLock { + sw.mu.Lock() + defer sw.mu.Unlock() + } keepGoing := true if !sw.globEverything { if sw.globSingle { diff --git a/controller/search.go b/controller/search.go index abbb70b1..ef6e2a41 100644 --- a/controller/search.go +++ b/controller/search.go @@ -239,7 +239,7 @@ func (c *Controller) cmdNearby(msg *server.Message) (res string, err error) { sw.writeHead() if sw.col != nil { s.cursor = sw.col.Nearby(s.cursor, s.sparse, s.lat, s.lon, s.meters, func(id string, o geojson.Object, fields []float64) bool { - return sw.writeObject(id, o, fields) + return sw.writeObject(id, o, fields, false) }) } sw.writeFoot(s.cursor) @@ -279,13 +279,13 @@ func (c *Controller) cmdWithinOrIntersects(cmd string, msg *server.Message) (res if cmd == "within" { s.cursor = sw.col.Within(s.cursor, s.sparse, s.o, s.minLat, s.minLon, s.maxLat, s.maxLon, func(id string, o geojson.Object, fields []float64) bool { - return sw.writeObject(id, o, fields) + return sw.writeObject(id, o, fields, false) }, ) } else if cmd == "intersects" { s.cursor = sw.col.Intersects(s.cursor, s.sparse, s.o, s.minLat, s.minLon, s.maxLat, s.maxLon, func(id string, o geojson.Object, fields []float64) bool { - return sw.writeObject(id, o, fields) + return sw.writeObject(id, o, fields, false) }, ) }