/* * Copyright 2026 Safronov Grigorii * * Licensed under the CDDL, Version 1.0 (the "License"); * you may not use this file except in compliance with the License. * * You may obtain a copy of the License at * https://opensource.org/licenses/CDDL-1.0 */ // Файл: internal/api/http.go // Назначение: HTTP RESTful API для взаимодействия с СУБД через curl. // Поддерживает CRUD операции, управление индексами, ACL и ограничениями. // Реализован с минимальными блокировками, использует wait-free структуры. package api import ( "encoding/json" "fmt" "net/http" "strconv" "strings" "futriis/internal/acl" "futriis/internal/cluster" "futriis/internal/log" "futriis/internal/storage" ) type HTTPServer struct { store *storage.Storage coordinator *cluster.RaftCoordinator aclManager *acl.ACLManager logger *log.Logger server *http.Server port int } type APIResponse struct { Success bool `json:"success"` Data interface{} `json:"data,omitempty"` Error string `json:"error,omitempty"` } // NewHTTPServer создаёт новый HTTP сервер (добавлен CORS middleware) func NewHTTPServer(port int, store *storage.Storage, coord *cluster.RaftCoordinator, aclMgr *acl.ACLManager, logger *log.Logger) *HTTPServer { s := &HTTPServer{ store: store, coordinator: coord, aclManager: aclMgr, logger: logger, port: port, } mux := http.NewServeMux() // CORS middleware wrapper corsHandler := func(handler http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") w.Header().Set("Access-Control-Allow-Headers", "Content-Type, X-Session-ID") if r.Method == http.MethodOptions { w.WriteHeader(http.StatusOK) return } handler(w, r) } } // Middleware для аутентификации mux.HandleFunc("/api/auth/login", corsHandler(s.handleLogin)) mux.HandleFunc("/api/auth/logout", corsHandler(s.handleLogout)) // CRUD операции mux.HandleFunc("/api/db/", corsHandler(s.handleDatabaseRequest)) // Индексы mux.HandleFunc("/api/index/", corsHandler(s.handleIndexRequest)) // ACL mux.HandleFunc("/api/acl/", corsHandler(s.handleACLRequest)) // Constraints mux.HandleFunc("/api/constraint/", corsHandler(s.handleConstraintRequest)) // Cluster mux.HandleFunc("/api/cluster/", corsHandler(s.handleClusterRequest)) s.server = &http.Server{ Addr: fmt.Sprintf(":%d", port), Handler: mux, } return s } // Start запускает HTTP сервер func (s *HTTPServer) Start() error { s.logger.Info("Starting HTTP API server on port " + strconv.Itoa(s.port)) return s.server.ListenAndServe() } // Stop останавливает HTTP сервер func (s *HTTPServer) Stop() error { return s.server.Close() } // handleLogin обрабатывает аутентификацию func (s *HTTPServer) handleLogin(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { s.sendError(w, "Method not allowed", http.StatusMethodNotAllowed) return } var creds struct { Username string `json:"username"` Password string `json:"password"` } if err := json.NewDecoder(r.Body).Decode(&creds); err != nil { s.sendError(w, "Invalid request body", http.StatusBadRequest) return } sessionID, err := s.aclManager.Authenticate(creds.Username, creds.Password) if err != nil { s.sendError(w, err.Error(), http.StatusUnauthorized) return } s.sendSuccess(w, map[string]string{"session_id": sessionID}) } // handleLogout обрабатывает выход func (s *HTTPServer) handleLogout(w http.ResponseWriter, r *http.Request) { sessionID := r.Header.Get("X-Session-ID") if sessionID != "" { s.aclManager.Logout(sessionID) } s.sendSuccess(w, map[string]string{"status": "logged out"}) } // handleDatabaseRequest обрабатывает запросы к БД func (s *HTTPServer) handleDatabaseRequest(w http.ResponseWriter, r *http.Request) { // URL: /api/db/{database}/{collection}/{document_id} path := strings.TrimPrefix(r.URL.Path, "/api/db/") parts := strings.Split(path, "/") if len(parts) < 2 { s.sendError(w, "Invalid path. Use /api/db/{database}/{collection}[/{id}]", http.StatusBadRequest) return } database := parts[0] collection := parts[1] docID := "" if len(parts) > 2 { docID = parts[2] } // Проверка аутентификации sessionID := r.Header.Get("X-Session-ID") if sessionID == "" { s.sendError(w, "Authentication required", http.StatusUnauthorized) return } switch r.Method { case http.MethodGet: s.handleGetDocument(w, r, sessionID, database, collection, docID) case http.MethodPost: s.handleInsertDocument(w, r, sessionID, database, collection) case http.MethodPut: s.handleUpdateDocument(w, r, sessionID, database, collection, docID) case http.MethodDelete: s.handleDeleteDocument(w, r, sessionID, database, collection, docID) default: s.sendError(w, "Method not allowed", http.StatusMethodNotAllowed) } } // handleGetDocument обрабатывает GET запросы func (s *HTTPServer) handleGetDocument(w http.ResponseWriter, r *http.Request, sessionID, database, collection, docID string) { // Проверка прав if !s.aclManager.CheckPermission(sessionID, database, collection, "read") { s.sendError(w, "Access denied", http.StatusForbidden) return } db, err := s.store.GetDatabase(database) if err != nil { s.sendError(w, err.Error(), http.StatusNotFound) return } coll, err := db.GetCollection(collection) if err != nil { s.sendError(w, err.Error(), http.StatusNotFound) return } // Поиск по индексу или ID query := r.URL.Query() if indexName := query.Get("index"); indexName != "" { indexValue := query.Get("value") docs, err := coll.FindByIndex(indexName, indexValue) if err != nil { s.sendError(w, err.Error(), http.StatusNotFound) return } s.sendSuccess(w, docs) return } if docID == "" { // Возвращаем все документы (с пагинацией) limit := 100 if limitStr := query.Get("limit"); limitStr != "" { if l, err := strconv.Atoi(limitStr); err == nil && l > 0 && l <= 1000 { limit = l } } offset := 0 if offsetStr := query.Get("offset"); offsetStr != "" { if o, err := strconv.Atoi(offsetStr); err == nil && o >= 0 { offset = o } } allDocs := coll.GetAllDocuments() start := offset end := offset + limit if start > len(allDocs) { start = len(allDocs) } if end > len(allDocs) { end = len(allDocs) } result := allDocs[start:end] s.sendSuccess(w, map[string]interface{}{ "documents": result, "total": len(allDocs), "limit": limit, "offset": offset, }) return } doc, err := coll.Find(docID) if err != nil { s.sendError(w, err.Error(), http.StatusNotFound) return } s.sendSuccess(w, doc) } // handleInsertDocument обрабатывает POST запросы func (s *HTTPServer) handleInsertDocument(w http.ResponseWriter, r *http.Request, sessionID, database, collection string) { if !s.aclManager.CheckPermission(sessionID, database, collection, "write") { s.sendError(w, "Access denied", http.StatusForbidden) return } db, err := s.store.GetDatabase(database) if err != nil { // Создаём БД если не существует if err := s.store.CreateDatabase(database); err != nil { s.sendError(w, err.Error(), http.StatusInternalServerError) return } db, _ = s.store.GetDatabase(database) } coll, err := db.GetCollection(collection) if err != nil { if err := db.CreateCollection(collection); err != nil { s.sendError(w, err.Error(), http.StatusInternalServerError) return } coll, _ = db.GetCollection(collection) } var doc map[string]interface{} if err := json.NewDecoder(r.Body).Decode(&doc); err != nil { s.sendError(w, "Invalid JSON", http.StatusBadRequest) return } if err := coll.InsertFromMap(doc); err != nil { s.sendError(w, err.Error(), http.StatusBadRequest) return } s.sendSuccess(w, map[string]string{"status": "inserted"}) } // handleUpdateDocument обрабатывает PUT запросы func (s *HTTPServer) handleUpdateDocument(w http.ResponseWriter, r *http.Request, sessionID, database, collection, docID string) { if docID == "" { s.sendError(w, "Document ID required", http.StatusBadRequest) return } if !s.aclManager.CheckPermission(sessionID, database, collection, "write") { s.sendError(w, "Access denied", http.StatusForbidden) return } db, err := s.store.GetDatabase(database) if err != nil { s.sendError(w, err.Error(), http.StatusNotFound) return } coll, err := db.GetCollection(collection) if err != nil { s.sendError(w, err.Error(), http.StatusNotFound) return } var updates map[string]interface{} if err := json.NewDecoder(r.Body).Decode(&updates); err != nil { s.sendError(w, "Invalid JSON", http.StatusBadRequest) return } if err := coll.Update(docID, updates); err != nil { s.sendError(w, err.Error(), http.StatusBadRequest) return } s.sendSuccess(w, map[string]string{"status": "updated"}) } // handleDeleteDocument обрабатывает DELETE запросы func (s *HTTPServer) handleDeleteDocument(w http.ResponseWriter, r *http.Request, sessionID, database, collection, docID string) { if docID == "" { s.sendError(w, "Document ID required", http.StatusBadRequest) return } if !s.aclManager.CheckPermission(sessionID, database, collection, "delete") { s.sendError(w, "Access denied", http.StatusForbidden) return } db, err := s.store.GetDatabase(database) if err != nil { s.sendError(w, err.Error(), http.StatusNotFound) return } coll, err := db.GetCollection(collection) if err != nil { s.sendError(w, err.Error(), http.StatusNotFound) return } if err := coll.Delete(docID); err != nil { s.sendError(w, err.Error(), http.StatusNotFound) return } s.sendSuccess(w, map[string]string{"status": "deleted"}) } // handleIndexRequest обрабатывает запросы к индексам func (s *HTTPServer) handleIndexRequest(w http.ResponseWriter, r *http.Request) { // URL: /api/index/{database}/{collection}/{action} path := strings.TrimPrefix(r.URL.Path, "/api/index/") parts := strings.Split(path, "/") if len(parts) < 3 { s.sendError(w, "Invalid path. Use /api/index/{database}/{collection}/{action}", http.StatusBadRequest) return } database := parts[0] collection := parts[1] action := parts[2] sessionID := r.Header.Get("X-Session-ID") if !s.aclManager.CheckPermission(sessionID, database, collection, "admin") { s.sendError(w, "Admin access required", http.StatusForbidden) return } db, err := s.store.GetDatabase(database) if err != nil { s.sendError(w, err.Error(), http.StatusNotFound) return } coll, err := db.GetCollection(collection) if err != nil { s.sendError(w, err.Error(), http.StatusNotFound) return } switch action { case "list": indexes := coll.GetIndexes() s.sendSuccess(w, indexes) case "create": var req struct { Name string `json:"name"` Fields []string `json:"fields"` Unique bool `json:"unique"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { s.sendError(w, "Invalid request body", http.StatusBadRequest) return } if err := coll.CreateIndex(req.Name, req.Fields, req.Unique); err != nil { s.sendError(w, err.Error(), http.StatusBadRequest) return } s.sendSuccess(w, map[string]string{"status": "index created"}) case "drop": if len(parts) < 4 { s.sendError(w, "Index name required", http.StatusBadRequest) return } indexName := parts[3] if err := coll.DropIndex(indexName); err != nil { s.sendError(w, err.Error(), http.StatusBadRequest) return } s.sendSuccess(w, map[string]string{"status": "index dropped"}) default: s.sendError(w, "Unknown action", http.StatusBadRequest) } } // handleACLRequest обрабатывает запросы ACL func (s *HTTPServer) handleACLRequest(w http.ResponseWriter, r *http.Request) { path := strings.TrimPrefix(r.URL.Path, "/api/acl/") parts := strings.Split(path, "/") if len(parts) < 1 { s.sendError(w, "Invalid path", http.StatusBadRequest) return } sessionID := r.Header.Get("X-Session-ID") if !s.aclManager.CheckPermission(sessionID, "*", "*", "admin") { s.sendError(w, "Admin access required", http.StatusForbidden) return } action := parts[0] switch action { case "users": users := s.aclManager.ListUsers() s.sendSuccess(w, users) case "user": if len(parts) < 2 { s.sendError(w, "Username required", http.StatusBadRequest) return } username := parts[1] switch r.Method { case http.MethodPost: var req struct { Password string `json:"password"` Roles []string `json:"roles"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { s.sendError(w, "Invalid request body", http.StatusBadRequest) return } if err := s.aclManager.CreateUser(username, req.Password, req.Roles); err != nil { s.sendError(w, err.Error(), http.StatusBadRequest) return } s.sendSuccess(w, map[string]string{"status": "user created"}) default: s.sendError(w, "Method not allowed", http.StatusMethodNotAllowed) } case "roles": roles := s.aclManager.ListRoles() s.sendSuccess(w, roles) case "grant": if len(parts) < 3 { s.sendError(w, "Role and permission required", http.StatusBadRequest) return } roleName := parts[1] permission := parts[2] if err := s.aclManager.GrantPermission(roleName, permission); err != nil { s.sendError(w, err.Error(), http.StatusBadRequest) return } s.sendSuccess(w, map[string]string{"status": "permission granted"}) default: s.sendError(w, "Unknown action", http.StatusBadRequest) } } // handleConstraintRequest обрабатывает запросы к ограничениям func (s *HTTPServer) handleConstraintRequest(w http.ResponseWriter, r *http.Request) { path := strings.TrimPrefix(r.URL.Path, "/api/constraint/") parts := strings.Split(path, "/") if len(parts) < 3 { s.sendError(w, "Invalid path. Use /api/constraint/{database}/{collection}/{action}", http.StatusBadRequest) return } database := parts[0] collection := parts[1] action := parts[2] sessionID := r.Header.Get("X-Session-ID") if !s.aclManager.CheckPermission(sessionID, database, collection, "admin") { s.sendError(w, "Admin access required", http.StatusForbidden) return } db, err := s.store.GetDatabase(database) if err != nil { s.sendError(w, err.Error(), http.StatusNotFound) return } coll, err := db.GetCollection(collection) if err != nil { s.sendError(w, err.Error(), http.StatusNotFound) return } switch action { case "required": if len(parts) < 4 { s.sendError(w, "Field name required", http.StatusBadRequest) return } field := parts[3] coll.AddRequiredField(field) s.sendSuccess(w, map[string]string{"status": "required field added"}) case "unique": if len(parts) < 4 { s.sendError(w, "Field name required", http.StatusBadRequest) return } field := parts[3] coll.AddUniqueConstraint(field) s.sendSuccess(w, map[string]string{"status": "unique constraint added"}) case "min": if len(parts) < 5 { s.sendError(w, "Field name and value required", http.StatusBadRequest) return } field := parts[3] minVal, _ := strconv.ParseFloat(parts[4], 64) coll.AddMinConstraint(field, minVal) s.sendSuccess(w, map[string]string{"status": "min constraint added"}) case "max": if len(parts) < 5 { s.sendError(w, "Field name and value required", http.StatusBadRequest) return } field := parts[3] maxVal, _ := strconv.ParseFloat(parts[4], 64) coll.AddMaxConstraint(field, maxVal) s.sendSuccess(w, map[string]string{"status": "max constraint added"}) default: s.sendError(w, "Unknown action", http.StatusBadRequest) } } // handleClusterRequest обрабатывает запросы к кластеру (исправлено: поддержка разных методов) func (s *HTTPServer) handleClusterRequest(w http.ResponseWriter, r *http.Request) { sessionID := r.Header.Get("X-Session-ID") if !s.aclManager.CheckPermission(sessionID, "*", "*", "admin") { s.sendError(w, "Admin access required", http.StatusForbidden) return } if s.coordinator == nil { s.sendError(w, "Cluster not available", http.StatusServiceUnavailable) return } path := strings.TrimPrefix(r.URL.Path, "/api/cluster/") parts := strings.Split(path, "/") switch r.Method { case http.MethodGet: if len(parts) == 0 || parts[0] == "" || parts[0] == "status" { status := s.coordinator.GetClusterStatus() s.sendSuccess(w, status) } else if parts[0] == "health" { health := s.coordinator.GetClusterHealth() s.sendSuccess(w, health) } else if parts[0] == "nodes" { nodes := s.coordinator.GetAllNodes() s.sendSuccess(w, nodes) } else { s.sendError(w, "Unknown cluster endpoint", http.StatusNotFound) } case http.MethodPost: if len(parts) >= 2 && parts[0] == "replication" && parts[1] == "factor" { var req struct { Factor int `json:"factor"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { s.sendError(w, "Invalid request body", http.StatusBadRequest) return } if req.Factor < 1 || req.Factor > 5 { s.sendError(w, "Replication factor must be between 1 and 5", http.StatusBadRequest) return } if err := s.coordinator.SetReplicationFactor(req.Factor); err != nil { s.sendError(w, err.Error(), http.StatusBadRequest) return } s.sendSuccess(w, map[string]interface{}{ "status": "updated", "factor": req.Factor, }) } else { s.sendError(w, "Unknown cluster endpoint", http.StatusNotFound) } default: s.sendError(w, "Method not allowed", http.StatusMethodNotAllowed) } } // sendSuccess отправляет успешный ответ func (s *HTTPServer) sendSuccess(w http.ResponseWriter, data interface{}) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(APIResponse{ Success: true, Data: data, }) } // sendError отправляет ответ с ошибкой func (s *HTTPServer) sendError(w http.ResponseWriter, errMsg string, statusCode int) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(statusCode) json.NewEncoder(w).Encode(APIResponse{ Success: false, Error: errMsg, }) }