package transport import ( "bytes" "context" "encoding/json" "fmt" "log" "net/http" "strings" "sync" "time" "funstatmcp/internal/app" ) type Server struct { app *app.App config app.Config subscribersMu sync.Mutex subscribers map[int]chan []byte nextID int } func NewServer(appInstance *app.App, cfg app.Config) *Server { return &Server{ app: appInstance, config: cfg, subscribers: make(map[int]chan []byte), } } func (s *Server) Run(ctx context.Context) error { mux := http.NewServeMux() mux.HandleFunc("/sse", s.handleSSE) mux.HandleFunc("/messages", s.handleMessages) mux.HandleFunc("/health", s.handleHealth) server := &http.Server{ Addr: fmt.Sprintf("%s:%d", s.config.Host, s.config.Port), Handler: s.corsMiddleware(mux), } go func() { <-ctx.Done() shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := server.Shutdown(shutdownCtx); err != nil { log.Printf("HTTP server shutdown error: %v", err) } }() log.Printf("Funstat MCP Go server listening on http://%s:%d", s.config.Host, s.config.Port) if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { return err } return nil } func (s *Server) corsMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Headers", "Content-Type, X-MCP-Session-ID") w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS") if r.Method == http.MethodOptions { w.WriteHeader(http.StatusNoContent) return } next.ServeHTTP(w, r) }) } func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _ = json.NewEncoder(w).Encode(map[string]any{ "status": "ok", "server": "funstat-mcp-go", }) } func (s *Server) handleSSE(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { w.WriteHeader(http.StatusMethodNotAllowed) return } w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "streaming unsupported", http.StatusInternalServerError) return } subscriber := make(chan []byte, 16) id := s.addSubscriber(subscriber) defer s.removeSubscriber(id) heartbeatTicker := time.NewTicker(15 * time.Second) defer heartbeatTicker.Stop() ctx := r.Context() for { select { case <-ctx.Done(): return case <-heartbeatTicker.C: fmt.Fprint(w, ": ping\n\n") flusher.Flush() case data := <-subscriber: fmt.Fprintf(w, "event: message\n") fmt.Fprintf(w, "data: %s\n\n", data) flusher.Flush() } } } func (s *Server) addSubscriber(ch chan []byte) int { s.subscribersMu.Lock() defer s.subscribersMu.Unlock() id := s.nextID s.nextID++ s.subscribers[id] = ch return id } func (s *Server) removeSubscriber(id int) { s.subscribersMu.Lock() defer s.subscribersMu.Unlock() delete(s.subscribers, id) } func (s *Server) broadcast(payload []byte) { s.subscribersMu.Lock() defer s.subscribersMu.Unlock() for _, ch := range s.subscribers { select { case ch <- payload: default: } } } type jsonRPCRequest struct { JSONRPC string `json:"jsonrpc"` ID any `json:"id"` Method string `json:"method"` Params json.RawMessage `json:"params"` } type jsonRPCResponse struct { JSONRPC string `json:"jsonrpc"` ID any `json:"id"` Result any `json:"result,omitempty"` Error *jsonRPCError `json:"error,omitempty"` } type jsonRPCError struct { Code int `json:"code"` Message string `json:"message"` } func (s *Server) handleMessages(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { w.WriteHeader(http.StatusMethodNotAllowed) return } var request jsonRPCRequest if err := json.NewDecoder(r.Body).Decode(&request); err != nil { writeError(w, http.StatusBadRequest, fmt.Errorf("invalid JSON: %w", err)) return } response := s.handleRequest(r.Context(), request) var buffer bytes.Buffer if err := json.NewEncoder(&buffer).Encode(response); err != nil { writeError(w, http.StatusInternalServerError, err) return } payload := bytes.TrimSpace(buffer.Bytes()) s.broadcast(payload) w.Header().Set("Content-Type", "application/json") w.Write(payload) } func (s *Server) handleRequest(ctx context.Context, req jsonRPCRequest) jsonRPCResponse { response := jsonRPCResponse{ JSONRPC: "2.0", ID: req.ID, } if strings.TrimSpace(req.JSONRPC) != "2.0" { response.Error = &jsonRPCError{Code: -32600, Message: "invalid jsonrpc version"} return response } switch req.Method { case "initialize": response.Result = s.initializeResult() case "list_tools": response.Result = map[string]any{ "tools": app.ToolDefinitions(), } case "call_tool": var payload struct { Name string `json:"name"` Arguments map[string]any `json:"arguments"` } if err := json.Unmarshal(req.Params, &payload); err != nil { response.Error = &jsonRPCError{Code: -32602, Message: "invalid params"} return response } if payload.Arguments == nil { payload.Arguments = make(map[string]any) } result, err := s.app.CallTool(ctx, payload.Name, payload.Arguments) if err != nil { response.Result = map[string]any{ "content": []map[string]string{ { "type": "text", "text": fmt.Sprintf("❌ 错误: %s", err.Error()), }, }, } return response } response.Result = map[string]any{ "content": []map[string]string{ { "type": "text", "text": result, }, }, } default: response.Error = &jsonRPCError{Code: -32601, Message: "method not found"} } return response } func (s *Server) initializeResult() map[string]any { return map[string]any{ "protocolVersion": "2025-03-26", "capabilities": map[string]any{ "tools": map[string]any{}, }, "serverInfo": map[string]any{ "name": "funstat-mcp-go", "version": "1.0.0", }, } } func writeError(w http.ResponseWriter, status int, err error) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) _ = json.NewEncoder(w).Encode(map[string]any{ "error": err.Error(), }) }