package mcp

import (
	"context"
	"encoding/json"

	"gogs.dmsc.dev/arp/auth"
	"gogs.dmsc.dev/arp/mcp/tools"
)

// handleToolsList returns the static list of tools exposed by the server:
// "introspect", "query" and "mutate". The request is only used for its ID.
func (s *Server) handleToolsList(ctx context.Context, req *JSONRPCRequest) *JSONRPCResponse {
	toolList := []Tool{
		{
			Name:        "introspect",
			Description: "Get GraphQL schema information - types, fields, queries, mutations. Use this to discover the API structure before making queries or mutations.",
			InputSchema: InputSchema{
				Type:                 "object",
				AdditionalProperties: false,
				Properties: map[string]Property{
					"typeName": {
						Type:        "string",
						Description: "Optional - specific type to introspect (e.g., 'Query', 'Mutation', 'User', 'Task'). If omitted, returns full schema overview.",
					},
				},
			},
		},
		{
			Name:        "query",
			Description: "Execute GraphQL queries (read operations). Use for fetching data from the API. The query must be a valid GraphQL query string.",
			InputSchema: InputSchema{
				Type:                 "object",
				AdditionalProperties: false,
				Properties: map[string]Property{
					"query": {
						Type:        "string",
						Description: "GraphQL query string (e.g., 'query { users { id email } }')",
					},
					"variables": {
						Type:        "object",
						Description: "Optional query variables as key-value pairs",
					},
				},
				Required: []string{"query"},
			},
		},
		{
			Name:        "mutate",
			Description: "Execute GraphQL mutations (create/update/delete operations). Use for modifying data in the API. The mutation must be a valid GraphQL mutation string.",
			InputSchema: InputSchema{
				Type:                 "object",
				AdditionalProperties: false,
				Properties: map[string]Property{
					"mutation": {
						Type:        "string",
						Description: "GraphQL mutation string (e.g., 'mutation { createUser(input: {email: \"test@example.com\", password: \"pass\", roles: []}) { id } }')",
					},
					"variables": {
						Type:        "object",
						Description: "Optional mutation variables as key-value pairs",
					},
				},
				Required: []string{"mutation"},
			},
		},
	}

	return &JSONRPCResponse{
		JSONRPC: "2.0",
		ID:      req.ID,
		Result:  ListToolsResult{Tools: toolList},
	}
}

// handleToolsCall decodes the call parameters and dispatches to the tool
// named in them.
//
// Malformed parameters yield ErrInvalidParams and an unknown tool name yields
// ErrMethodNotFound. An error returned by the tool itself is not reported as a
// JSON-RPC error: it is wrapped in a CallToolResult with IsError set and the
// error text as its only content block.
func (s *Server) handleToolsCall(ctx context.Context, req *JSONRPCRequest) *JSONRPCResponse {
	var params CallToolParams
	if req.Params != nil {
		if err := json.Unmarshal(req.Params, &params); err != nil {
			return &JSONRPCResponse{
				JSONRPC: "2.0",
				ID:      req.ID,
				Error:   ErrInvalidParams,
			}
		}
	}

	var (
		result tools.CallToolResult
		err    error
	)
	switch params.Name {
	case "introspect":
		result, err = tools.Introspect(ctx, s.schema, params.Arguments)
	case "query":
		result, err = tools.Query(ctx, s.resolver, s.schema, params.Arguments)
	case "mutate":
		result, err = tools.Mutate(ctx, s.resolver, s.schema, params.Arguments)
	default:
		return &JSONRPCResponse{
			JSONRPC: "2.0",
			ID:      req.ID,
			Error:   ErrMethodNotFound,
		}
	}

	if err != nil {
		return &JSONRPCResponse{
			JSONRPC: "2.0",
			ID:      req.ID,
			Result: tools.CallToolResult{
				Content: []tools.ContentBlock{
					{Type: "text", Text: err.Error()},
				},
				IsError: true,
			},
		}
	}

	return &JSONRPCResponse{
		JSONRPC: "2.0",
		ID:      req.ID,
		Result:  result,
	}
}

// handleResourcesList returns the static list of subscription resources. The
// URIs listed here are the ones runSubscription knows how to stream.
func (s *Server) handleResourcesList(ctx context.Context, req *JSONRPCRequest) *JSONRPCResponse {
	resources := []Resource{
		{
			URI:         "graphql://subscription/taskCreated",
			Name:        "taskCreated",
			Description: "Subscribe to task creation events. Receives Task objects when new tasks are created and assigned to you.",
			MimeType:    "application/json",
		},
		{
			URI:         "graphql://subscription/taskUpdated",
			Name:        "taskUpdated",
			Description: "Subscribe to task update events. Receives Task objects when tasks assigned to you are updated.",
			MimeType:    "application/json",
		},
		{
			URI:         "graphql://subscription/taskDeleted",
			Name:        "taskDeleted",
			Description: "Subscribe to task deletion events. Receives Task objects when tasks assigned to you are deleted.",
			MimeType:    "application/json",
		},
		{
			URI:         "graphql://subscription/messageAdded",
			Name:        "messageAdded",
			Description: "Subscribe to new message events. Receives Message objects when messages are sent to you.",
			MimeType:    "application/json",
		},
	}

	return &JSONRPCResponse{
		JSONRPC: "2.0",
		ID:      req.ID,
		Result:  ListResourcesResult{Resources: resources},
	}
}

// handleResourcesRead answers a read of a resource. Subscription resources
// have no readable state, so the same fixed plain-text description is returned
// for whatever URI was requested; the URI is echoed back unvalidated.
func (s *Server) handleResourcesRead(ctx context.Context, req *JSONRPCRequest) *JSONRPCResponse {
	var params ReadResourceParams
	if req.Params != nil {
		if err := json.Unmarshal(req.Params, &params); err != nil {
			return &JSONRPCResponse{
				JSONRPC: "2.0",
				ID:      req.ID,
				Error:   ErrInvalidParams,
			}
		}
	}

	// For subscriptions, reading returns a description.
	description := "This is a subscription resource. Use resources/subscribe to receive real-time updates."

	return &JSONRPCResponse{
		JSONRPC: "2.0",
		ID:      req.ID,
		Result: ReadResourceResult{
			Contents: []ResourceContents{
				{
					URI:      params.URI,
					MimeType: "text/plain",
					Text:     description,
				},
			},
		},
	}
}

// handleResourcesSubscribe starts streaming events for the requested resource
// URI to the session.
//
// The caller must be authenticated (auth.CurrentUser must succeed on ctx);
// otherwise an RPC error is returned and nothing is started. The cancel
// function for the new subscription is stored in session.Subscriptions under
// the URI so handleResourcesUnsubscribe can stop it. If the session already
// has a subscription for that URI, it is cancelled first so its goroutine is
// not orphaned.
func (s *Server) handleResourcesSubscribe(ctx context.Context, req *JSONRPCRequest, session *Session) *JSONRPCResponse {
	var params SubscribeParams
	if req.Params != nil {
		if err := json.Unmarshal(req.Params, &params); err != nil {
			return &JSONRPCResponse{
				JSONRPC: "2.0",
				ID:      req.ID,
				Error:   ErrInvalidParams,
			}
		}
	}

	// Check authentication.
	user, err := auth.CurrentUser(ctx)
	if err != nil {
		return &JSONRPCResponse{
			JSONRPC: "2.0",
			ID:      req.ID,
			Error:   &RPCError{Code: -32603, Message: "Authentication required for subscriptions"},
		}
	}

	// The subscription context is rooted at Background rather than ctx,
	// presumably so the stream outlives the request that created it.
	subCtx, cancel := context.WithCancel(context.Background())

	session.SubsMu.Lock()
	// Stop any earlier subscription for the same URI before replacing its
	// cancel function; overwriting it would leave that goroutine running with
	// no way to cancel it through unsubscribe.
	if prev, ok := session.Subscriptions[params.URI]; ok && prev != nil {
		prev()
	}
	session.Subscriptions[params.URI] = cancel
	session.SubsMu.Unlock()

	// Start the subscription based on URI.
	go s.runSubscription(subCtx, params.URI, user.ID, session)

	return &JSONRPCResponse{
		JSONRPC: "2.0",
		ID:      req.ID,
		Result:  map[string]interface{}{"subscribed": true, "uri": params.URI},
	}
}

// handleResourcesUnsubscribe cancels and forgets the session's subscription
// for the requested URI, if there is one. The response reports success
// whether or not a subscription existed.
func (s *Server) handleResourcesUnsubscribe(ctx context.Context, req *JSONRPCRequest, session *Session) *JSONRPCResponse {
	var params UnsubscribeParams
	if req.Params != nil {
		if err := json.Unmarshal(req.Params, &params); err != nil {
			return &JSONRPCResponse{
				JSONRPC: "2.0",
				ID:      req.ID,
				Error:   ErrInvalidParams,
			}
		}
	}

	session.SubsMu.Lock()
	defer session.SubsMu.Unlock()

	if cancel, ok := session.Subscriptions[params.URI]; ok {
		cancel()
		delete(session.Subscriptions, params.URI)
	}

	return &JSONRPCResponse{
		JSONRPC: "2.0",
		ID:      req.ID,
		Result:  map[string]interface{}{"unsubscribed": true, "uri": params.URI},
	}
}

// runSubscription blocks while streaming the events selected by uri to the
// session and returns when the chosen stream function returns. A URI that
// matches none of the known subscription resources returns immediately
// without streaming anything.
func (s *Server) runSubscription(ctx context.Context, uri string, userID uint, session *Session) {
	switch uri {
	case "graphql://subscription/taskCreated":
		s.streamTaskEvents(ctx, userID, session, "created")
	case "graphql://subscription/taskUpdated":
		s.streamTaskEvents(ctx, userID, session, "updated")
	case "graphql://subscription/taskDeleted":
		s.streamTaskEvents(ctx, userID, session, "deleted")
	case "graphql://subscription/messageAdded":
		s.streamMessageEvents(ctx, userID, session)
	}
}

// streamTaskEvents forwards task events of the given eventType ("created",
// "updated" or "deleted" as used by runSubscription) to the session as
// resource notifications. It returns when ctx is cancelled, the session is
// done, or the resolver's event channel is closed.
func (s *Server) streamTaskEvents(ctx context.Context, userID uint, session *Session, eventType string) {
	eventChan := s.resolver.SubscribeToTasks(userID)

	// The notification URI depends only on eventType, so build it once.
	uri := "graphql://subscription/task" + capitalize(eventType)

	for {
		select {
		case <-ctx.Done():
			return
		case <-session.Done:
			return
		case event, ok := <-eventChan:
			if !ok {
				return
			}
			// The channel carries every task event type; forward only the
			// one this subscription asked for.
			if event.EventType != eventType || event.Task == nil {
				continue
			}
			s.sendNotification(session, CreateResourceNotification(uri, event.Task))
		}
	}
}

// streamMessageEvents forwards message events to the session as
// "messageAdded" resource notifications, but only for events that list userID
// among their receivers. It returns when ctx is cancelled, the session is
// done, or the resolver's event channel is closed.
func (s *Server) streamMessageEvents(ctx context.Context, userID uint, session *Session) {
	eventChan := s.resolver.SubscribeToMessages(userID)

	for {
		select {
		case <-ctx.Done():
			return
		case <-session.Done:
			return
		case event, ok := <-eventChan:
			if !ok {
				return
			}

			// Check if user is a receiver.
			isReceiver := false
			for _, receiverID := range event.ReceiverIDs {
				if receiverID == userID {
					isReceiver = true
					break
				}
			}

			if isReceiver && event.Message != nil {
				notification := CreateResourceNotification(
					"graphql://subscription/messageAdded",
					event.Message,
				)
				s.sendNotification(session, notification)
			}
		}
	}
}

// sendNotification serialises a JSON-RPC notification and queues it on the
// session's Events channel. Delivery is best-effort: the notification is
// dropped if it cannot be marshalled or if the channel cannot accept it
// without blocking.
func (s *Server) sendNotification(session *Session, notification *JSONRPCNotification) {
	notifBytes, err := json.Marshal(notification)
	if err != nil {
		// Nothing can be delivered for an unserialisable notification.
		return
	}

	select {
	case session.Events <- notifBytes:
	default:
		// Channel full, skip rather than stall the event stream.
	}
}

// capitalize returns s with its first byte upper-cased when that byte is an
// ASCII lowercase letter. Any other input, including the empty string, is
// returned unchanged.
func capitalize(s string) string {
	if len(s) == 0 {
		return s
	}
	if c := s[0]; c >= 'a' && c <= 'z' {
		return string(c-'a'+'A') + s[1:]
	}
	return s
}