From 9e552c362975f66c0cff398a8b703e2dabb6a599 Mon Sep 17 00:00:00 2001 From: tidwall Date: Fri, 1 Oct 2021 17:18:07 -0700 Subject: [PATCH] Allow some basic client commands before AOF data loads This commit accepts incoming connections even before the AOF dataset has been loaded into memory. Though only a very limited command set is allowed. Allowed commands: PING, ECHO, OUTPUT, QUIT All other commands will return: LOADING Tile38 is loading the dataset in memory This is useful for establishing connections for the purpose of checking process and network state. --- internal/server/server.go | 38 +++++++++++++++++++++++++++++++++----- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/internal/server/server.go b/internal/server/server.go index 0dee910a..f5446530 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -94,6 +94,7 @@ type Server struct { lastShrinkDuration aint stopServer abool outOfMemory abool + loadedAndReady abool // server is loaded and ready for commands connsmu sync.RWMutex conns map[int]*Client @@ -245,6 +246,12 @@ func Serve(opts Options) error { } log.Debugf("Multi indexing: RTree (%d points)", server.geomParseOpts.IndexChildren) + nerr := make(chan error) + go func() { + // Start the server in the background + nerr <- server.netServe() + }() + // Load the queue before the aof qdb, err := buntdb.Open(core.QueueFileName) if err != nil { @@ -288,7 +295,6 @@ func Serve(opts Options) error { server.aof.Sync() }() } - // server.fillExpiresList() // Start background routines if server.config.followHost() != "" { @@ -322,8 +328,9 @@ func Serve(opts Options) error { server.lcond.L.Lock() }() - // Start the network server - return server.netServe() + // Server is now loaded and ready. Wait for network error messages. + server.loadedAndReady.set(true) + return <-nerr } func (server *Server) isProtected() bool { @@ -426,7 +433,6 @@ func (server *Server) netServe() error { rdbuf := bytes.NewBuffer(packet) pr.rd = rdbuf pr.wr = client - msgs, err := pr.ReadMessages() for _, msg := range msgs { // Just closing connection if we have deprecated HTTP or WS connection, @@ -827,12 +833,34 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error { if errMsg == errInvalidNumberOfArguments.Error() { return writeOutput("-ERR wrong number of arguments for '" + cmd + "' command\r\n") } - v, _ := resp.ErrorValue(errors.New("ERR " + errMsg)).MarshalRESP() + var ucprefix bool + word := strings.Split(errMsg, " ")[0] + if len(word) > 0 { + ucprefix = true + for i := 0; i < len(word); i++ { + if word[i] < 'A' || word[i] > 'Z' { + ucprefix = false + break + } + } + } + if !ucprefix { + errMsg = "ERR " + errMsg + } + v, _ := resp.ErrorValue(errors.New(errMsg)).MarshalRESP() return writeOutput(string(v)) } return nil } + if !server.loadedAndReady.on() { + switch msg.Command() { + case "output", "ping", "echo": + default: + return writeErr("LOADING Tile38 is loading the dataset in memory") + } + } + if cmd == "timeout" { if err := rewriteTimeoutMsg(msg); err != nil { return writeErr(err.Error())