| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352 |
- package mcp
- import (
- "context"
- "encoding/json"
- "gogs.dmsc.dev/arp/auth"
- "gogs.dmsc.dev/arp/mcp/tools"
- )
- // handleToolsList returns the list of available tools
- func (s *Server) handleToolsList(ctx context.Context, req *JSONRPCRequest) *JSONRPCResponse {
- toolList := []Tool{
- {
- Name: "introspect",
- Description: "Get GraphQL schema information - types, fields, queries, mutations. Use this to discover the API structure before making queries or mutations.",
- InputSchema: InputSchema{
- Type: "object",
- AdditionalProperties: false,
- Properties: map[string]Property{
- "typeName": {
- Type: "string",
- Description: "Optional - specific type to introspect (e.g., 'Query', 'Mutation', 'User', 'Task'). If omitted, returns full schema overview.",
- },
- },
- },
- },
- {
- Name: "query",
- Description: "Execute GraphQL queries (read operations). Use for fetching data from the API. The query must be a valid GraphQL query string.",
- InputSchema: InputSchema{
- Type: "object",
- AdditionalProperties: false,
- Properties: map[string]Property{
- "query": {
- Type: "string",
- Description: "GraphQL query string (e.g., 'query { users { id email } }')",
- },
- "variables": {
- Type: "object",
- Description: "Optional query variables as key-value pairs",
- },
- },
- Required: []string{"query"},
- },
- },
- {
- Name: "mutate",
- Description: "Execute GraphQL mutations (create/update/delete operations). Use for modifying data in the API. The mutation must be a valid GraphQL mutation string.",
- InputSchema: InputSchema{
- Type: "object",
- AdditionalProperties: false,
- Properties: map[string]Property{
- "mutation": {
- Type: "string",
- Description: "GraphQL mutation string (e.g., 'mutation { createUser(input: {email: \"test@example.com\", password: \"pass\", roles: []}) { id } }')",
- },
- "variables": {
- Type: "object",
- Description: "Optional mutation variables as key-value pairs",
- },
- },
- Required: []string{"mutation"},
- },
- },
- }
- return &JSONRPCResponse{
- JSONRPC: "2.0",
- ID: req.ID,
- Result: ListToolsResult{Tools: toolList},
- }
- }
- // handleToolsCall executes a tool call
- func (s *Server) handleToolsCall(ctx context.Context, req *JSONRPCRequest) *JSONRPCResponse {
- var params CallToolParams
- if req.Params != nil {
- if err := json.Unmarshal(req.Params, ¶ms); err != nil {
- return &JSONRPCResponse{
- JSONRPC: "2.0",
- ID: req.ID,
- Error: ErrInvalidParams,
- }
- }
- }
- var result tools.CallToolResult
- var err error
- switch params.Name {
- case "introspect":
- result, err = tools.Introspect(ctx, s.schema, params.Arguments)
- case "query":
- result, err = tools.Query(ctx, s.resolver, s.schema, params.Arguments)
- case "mutate":
- result, err = tools.Mutate(ctx, s.resolver, s.schema, params.Arguments)
- default:
- return &JSONRPCResponse{
- JSONRPC: "2.0",
- ID: req.ID,
- Error: ErrMethodNotFound,
- }
- }
- if err != nil {
- return &JSONRPCResponse{
- JSONRPC: "2.0",
- ID: req.ID,
- Result: tools.CallToolResult{
- Content: []tools.ContentBlock{
- {Type: "text", Text: err.Error()},
- },
- IsError: true,
- },
- }
- }
- return &JSONRPCResponse{
- JSONRPC: "2.0",
- ID: req.ID,
- Result: result,
- }
- }
- // handleResourcesList returns the list of available subscription resources
- func (s *Server) handleResourcesList(ctx context.Context, req *JSONRPCRequest) *JSONRPCResponse {
- resources := []Resource{
- {
- URI: "graphql://subscription/taskCreated",
- Name: "taskCreated",
- Description: "Subscribe to task creation events. Receives Task objects when new tasks are created and assigned to you.",
- MimeType: "application/json",
- },
- {
- URI: "graphql://subscription/taskUpdated",
- Name: "taskUpdated",
- Description: "Subscribe to task update events. Receives Task objects when tasks assigned to you are updated.",
- MimeType: "application/json",
- },
- {
- URI: "graphql://subscription/taskDeleted",
- Name: "taskDeleted",
- Description: "Subscribe to task deletion events. Receives Task objects when tasks assigned to you are deleted.",
- MimeType: "application/json",
- },
- {
- URI: "graphql://subscription/messageAdded",
- Name: "messageAdded",
- Description: "Subscribe to new message events. Receives Message objects when messages are sent to you.",
- MimeType: "application/json",
- },
- }
- return &JSONRPCResponse{
- JSONRPC: "2.0",
- ID: req.ID,
- Result: ListResourcesResult{Resources: resources},
- }
- }
- // handleResourcesRead returns current state of a resource (for subscriptions, this is a description)
- func (s *Server) handleResourcesRead(ctx context.Context, req *JSONRPCRequest) *JSONRPCResponse {
- var params ReadResourceParams
- if req.Params != nil {
- if err := json.Unmarshal(req.Params, ¶ms); err != nil {
- return &JSONRPCResponse{
- JSONRPC: "2.0",
- ID: req.ID,
- Error: ErrInvalidParams,
- }
- }
- }
- // For subscriptions, reading returns a description
- description := "This is a subscription resource. Use resources/subscribe to receive real-time updates."
- return &JSONRPCResponse{
- JSONRPC: "2.0",
- ID: req.ID,
- Result: ReadResourceResult{
- Contents: []ResourceContents{
- {
- URI: params.URI,
- MimeType: "text/plain",
- Text: description,
- },
- },
- },
- }
- }
- // handleResourcesSubscribe starts a subscription for real-time updates
- func (s *Server) handleResourcesSubscribe(ctx context.Context, req *JSONRPCRequest, session *Session) *JSONRPCResponse {
- var params SubscribeParams
- if req.Params != nil {
- if err := json.Unmarshal(req.Params, ¶ms); err != nil {
- return &JSONRPCResponse{
- JSONRPC: "2.0",
- ID: req.ID,
- Error: ErrInvalidParams,
- }
- }
- }
- // Check authentication
- user, err := auth.CurrentUser(ctx)
- if err != nil {
- return &JSONRPCResponse{
- JSONRPC: "2.0",
- ID: req.ID,
- Error: &RPCError{Code: -32603, Message: "Authentication required for subscriptions"},
- }
- }
- // Create cancellable context for this subscription
- subCtx, cancel := context.WithCancel(context.Background())
- // Store the cancel function
- session.SubsMu.Lock()
- session.Subscriptions[params.URI] = cancel
- session.SubsMu.Unlock()
- // Start the subscription based on URI
- go s.runSubscription(subCtx, params.URI, user.ID, session)
- return &JSONRPCResponse{
- JSONRPC: "2.0",
- ID: req.ID,
- Result: map[string]interface{}{"subscribed": true, "uri": params.URI},
- }
- }
- // handleResourcesUnsubscribe stops a subscription
- func (s *Server) handleResourcesUnsubscribe(ctx context.Context, req *JSONRPCRequest, session *Session) *JSONRPCResponse {
- var params UnsubscribeParams
- if req.Params != nil {
- if err := json.Unmarshal(req.Params, ¶ms); err != nil {
- return &JSONRPCResponse{
- JSONRPC: "2.0",
- ID: req.ID,
- Error: ErrInvalidParams,
- }
- }
- }
- session.SubsMu.Lock()
- if cancel, ok := session.Subscriptions[params.URI]; ok {
- cancel()
- delete(session.Subscriptions, params.URI)
- }
- session.SubsMu.Unlock()
- return &JSONRPCResponse{
- JSONRPC: "2.0",
- ID: req.ID,
- Result: map[string]interface{}{"unsubscribed": true, "uri": params.URI},
- }
- }
- // runSubscription handles the actual subscription event streaming
- func (s *Server) runSubscription(ctx context.Context, uri string, userID uint, session *Session) {
- switch uri {
- case "graphql://subscription/taskCreated":
- s.streamTaskEvents(ctx, userID, session, "created")
- case "graphql://subscription/taskUpdated":
- s.streamTaskEvents(ctx, userID, session, "updated")
- case "graphql://subscription/taskDeleted":
- s.streamTaskEvents(ctx, userID, session, "deleted")
- case "graphql://subscription/messageAdded":
- s.streamMessageEvents(ctx, userID, session)
- }
- }
- // streamTaskEvents streams task events to the session
- func (s *Server) streamTaskEvents(ctx context.Context, userID uint, session *Session, eventType string) {
- eventChan := s.resolver.SubscribeToTasks(userID)
- for {
- select {
- case <-ctx.Done():
- return
- case <-session.Done:
- return
- case event, ok := <-eventChan:
- if !ok {
- return
- }
- if event.EventType == eventType && event.Task != nil {
- notification := CreateResourceNotification(
- "graphql://subscription/task"+capitalize(eventType),
- event.Task,
- )
- s.sendNotification(session, notification)
- }
- }
- }
- }
- // streamMessageEvents streams message events to the session
- func (s *Server) streamMessageEvents(ctx context.Context, userID uint, session *Session) {
- eventChan := s.resolver.SubscribeToMessages(userID)
- for {
- select {
- case <-ctx.Done():
- return
- case <-session.Done:
- return
- case event, ok := <-eventChan:
- if !ok {
- return
- }
- // Check if user is a receiver
- isReceiver := false
- for _, receiverID := range event.ReceiverIDs {
- if receiverID == userID {
- isReceiver = true
- break
- }
- }
- if isReceiver && event.Message != nil {
- notification := CreateResourceNotification(
- "graphql://subscription/messageAdded",
- event.Message,
- )
- s.sendNotification(session, notification)
- }
- }
- }
- }
- // sendNotification sends a JSON-RPC notification to the session
- func (s *Server) sendNotification(session *Session, notification *JSONRPCNotification) {
- notifBytes, err := json.Marshal(notification)
- if err != nil {
- return
- }
- select {
- case session.Events <- notifBytes:
- default:
- // Channel full, skip
- }
- }
- // capitalize helper
- func capitalize(s string) string {
- if len(s) == 0 {
- return s
- }
- return string(s[0]-32) + s[1:]
- }
|