handler.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. package mcp
  2. import (
  3. "context"
  4. "encoding/json"
  5. "gogs.dmsc.dev/arp/auth"
  6. "gogs.dmsc.dev/arp/mcp/tools"
  7. )
  8. // handleToolsList returns the list of available tools
  9. func (s *Server) handleToolsList(ctx context.Context, req *JSONRPCRequest) *JSONRPCResponse {
  10. toolList := []Tool{
  11. {
  12. Name: "introspect",
  13. Description: "Get GraphQL schema information - types, fields, queries, mutations. Use this to discover the API structure before making queries or mutations.",
  14. InputSchema: InputSchema{
  15. Type: "object",
  16. AdditionalProperties: false,
  17. Properties: map[string]Property{
  18. "typeName": {
  19. Type: "string",
  20. Description: "Optional - specific type to introspect (e.g., 'Query', 'Mutation', 'User', 'Task'). If omitted, returns full schema overview.",
  21. },
  22. },
  23. },
  24. },
  25. {
  26. Name: "query",
  27. Description: "Execute GraphQL queries (read operations). Use for fetching data from the API. The query must be a valid GraphQL query string.",
  28. InputSchema: InputSchema{
  29. Type: "object",
  30. AdditionalProperties: false,
  31. Properties: map[string]Property{
  32. "query": {
  33. Type: "string",
  34. Description: "GraphQL query string (e.g., 'query { users { id email } }')",
  35. },
  36. "variables": {
  37. Type: "object",
  38. Description: "Optional query variables as key-value pairs",
  39. },
  40. },
  41. Required: []string{"query"},
  42. },
  43. },
  44. {
  45. Name: "mutate",
  46. Description: "Execute GraphQL mutations (create/update/delete operations). Use for modifying data in the API. The mutation must be a valid GraphQL mutation string.",
  47. InputSchema: InputSchema{
  48. Type: "object",
  49. AdditionalProperties: false,
  50. Properties: map[string]Property{
  51. "mutation": {
  52. Type: "string",
  53. Description: "GraphQL mutation string (e.g., 'mutation { createUser(input: {email: \"test@example.com\", password: \"pass\", roles: []}) { id } }')",
  54. },
  55. "variables": {
  56. Type: "object",
  57. Description: "Optional mutation variables as key-value pairs",
  58. },
  59. },
  60. Required: []string{"mutation"},
  61. },
  62. },
  63. }
  64. return &JSONRPCResponse{
  65. JSONRPC: "2.0",
  66. ID: req.ID,
  67. Result: ListToolsResult{Tools: toolList},
  68. }
  69. }
  70. // handleToolsCall executes a tool call
  71. func (s *Server) handleToolsCall(ctx context.Context, req *JSONRPCRequest) *JSONRPCResponse {
  72. var params CallToolParams
  73. if req.Params != nil {
  74. if err := json.Unmarshal(req.Params, &params); err != nil {
  75. return &JSONRPCResponse{
  76. JSONRPC: "2.0",
  77. ID: req.ID,
  78. Error: ErrInvalidParams,
  79. }
  80. }
  81. }
  82. var result tools.CallToolResult
  83. var err error
  84. switch params.Name {
  85. case "introspect":
  86. result, err = tools.Introspect(ctx, s.schema, params.Arguments)
  87. case "query":
  88. result, err = tools.Query(ctx, s.resolver, s.schema, params.Arguments)
  89. case "mutate":
  90. result, err = tools.Mutate(ctx, s.resolver, s.schema, params.Arguments)
  91. default:
  92. return &JSONRPCResponse{
  93. JSONRPC: "2.0",
  94. ID: req.ID,
  95. Error: ErrMethodNotFound,
  96. }
  97. }
  98. if err != nil {
  99. return &JSONRPCResponse{
  100. JSONRPC: "2.0",
  101. ID: req.ID,
  102. Result: tools.CallToolResult{
  103. Content: []tools.ContentBlock{
  104. {Type: "text", Text: err.Error()},
  105. },
  106. IsError: true,
  107. },
  108. }
  109. }
  110. return &JSONRPCResponse{
  111. JSONRPC: "2.0",
  112. ID: req.ID,
  113. Result: result,
  114. }
  115. }
  116. // handleResourcesList returns the list of available subscription resources
  117. func (s *Server) handleResourcesList(ctx context.Context, req *JSONRPCRequest) *JSONRPCResponse {
  118. resources := []Resource{
  119. {
  120. URI: "graphql://subscription/taskCreated",
  121. Name: "taskCreated",
  122. Description: "Subscribe to task creation events. Receives Task objects when new tasks are created and assigned to you.",
  123. MimeType: "application/json",
  124. },
  125. {
  126. URI: "graphql://subscription/taskUpdated",
  127. Name: "taskUpdated",
  128. Description: "Subscribe to task update events. Receives Task objects when tasks assigned to you are updated.",
  129. MimeType: "application/json",
  130. },
  131. {
  132. URI: "graphql://subscription/taskDeleted",
  133. Name: "taskDeleted",
  134. Description: "Subscribe to task deletion events. Receives Task objects when tasks assigned to you are deleted.",
  135. MimeType: "application/json",
  136. },
  137. {
  138. URI: "graphql://subscription/messageAdded",
  139. Name: "messageAdded",
  140. Description: "Subscribe to new message events. Receives Message objects when messages are sent to you.",
  141. MimeType: "application/json",
  142. },
  143. }
  144. return &JSONRPCResponse{
  145. JSONRPC: "2.0",
  146. ID: req.ID,
  147. Result: ListResourcesResult{Resources: resources},
  148. }
  149. }
  150. // handleResourcesRead returns current state of a resource (for subscriptions, this is a description)
  151. func (s *Server) handleResourcesRead(ctx context.Context, req *JSONRPCRequest) *JSONRPCResponse {
  152. var params ReadResourceParams
  153. if req.Params != nil {
  154. if err := json.Unmarshal(req.Params, &params); err != nil {
  155. return &JSONRPCResponse{
  156. JSONRPC: "2.0",
  157. ID: req.ID,
  158. Error: ErrInvalidParams,
  159. }
  160. }
  161. }
  162. // For subscriptions, reading returns a description
  163. description := "This is a subscription resource. Use resources/subscribe to receive real-time updates."
  164. return &JSONRPCResponse{
  165. JSONRPC: "2.0",
  166. ID: req.ID,
  167. Result: ReadResourceResult{
  168. Contents: []ResourceContents{
  169. {
  170. URI: params.URI,
  171. MimeType: "text/plain",
  172. Text: description,
  173. },
  174. },
  175. },
  176. }
  177. }
  178. // handleResourcesSubscribe starts a subscription for real-time updates
  179. func (s *Server) handleResourcesSubscribe(ctx context.Context, req *JSONRPCRequest, session *Session) *JSONRPCResponse {
  180. var params SubscribeParams
  181. if req.Params != nil {
  182. if err := json.Unmarshal(req.Params, &params); err != nil {
  183. return &JSONRPCResponse{
  184. JSONRPC: "2.0",
  185. ID: req.ID,
  186. Error: ErrInvalidParams,
  187. }
  188. }
  189. }
  190. // Check authentication
  191. user, err := auth.CurrentUser(ctx)
  192. if err != nil {
  193. return &JSONRPCResponse{
  194. JSONRPC: "2.0",
  195. ID: req.ID,
  196. Error: &RPCError{Code: -32603, Message: "Authentication required for subscriptions"},
  197. }
  198. }
  199. // Create cancellable context for this subscription
  200. subCtx, cancel := context.WithCancel(context.Background())
  201. // Store the cancel function
  202. session.SubsMu.Lock()
  203. session.Subscriptions[params.URI] = cancel
  204. session.SubsMu.Unlock()
  205. // Start the subscription based on URI
  206. go s.runSubscription(subCtx, params.URI, user.ID, session)
  207. return &JSONRPCResponse{
  208. JSONRPC: "2.0",
  209. ID: req.ID,
  210. Result: map[string]interface{}{"subscribed": true, "uri": params.URI},
  211. }
  212. }
  213. // handleResourcesUnsubscribe stops a subscription
  214. func (s *Server) handleResourcesUnsubscribe(ctx context.Context, req *JSONRPCRequest, session *Session) *JSONRPCResponse {
  215. var params UnsubscribeParams
  216. if req.Params != nil {
  217. if err := json.Unmarshal(req.Params, &params); err != nil {
  218. return &JSONRPCResponse{
  219. JSONRPC: "2.0",
  220. ID: req.ID,
  221. Error: ErrInvalidParams,
  222. }
  223. }
  224. }
  225. session.SubsMu.Lock()
  226. if cancel, ok := session.Subscriptions[params.URI]; ok {
  227. cancel()
  228. delete(session.Subscriptions, params.URI)
  229. }
  230. session.SubsMu.Unlock()
  231. return &JSONRPCResponse{
  232. JSONRPC: "2.0",
  233. ID: req.ID,
  234. Result: map[string]interface{}{"unsubscribed": true, "uri": params.URI},
  235. }
  236. }
  237. // runSubscription handles the actual subscription event streaming
  238. func (s *Server) runSubscription(ctx context.Context, uri string, userID uint, session *Session) {
  239. switch uri {
  240. case "graphql://subscription/taskCreated":
  241. s.streamTaskEvents(ctx, userID, session, "created")
  242. case "graphql://subscription/taskUpdated":
  243. s.streamTaskEvents(ctx, userID, session, "updated")
  244. case "graphql://subscription/taskDeleted":
  245. s.streamTaskEvents(ctx, userID, session, "deleted")
  246. case "graphql://subscription/messageAdded":
  247. s.streamMessageEvents(ctx, userID, session)
  248. }
  249. }
  250. // streamTaskEvents streams task events to the session
  251. func (s *Server) streamTaskEvents(ctx context.Context, userID uint, session *Session, eventType string) {
  252. eventChan := s.resolver.SubscribeToTasks(userID)
  253. for {
  254. select {
  255. case <-ctx.Done():
  256. return
  257. case <-session.Done:
  258. return
  259. case event, ok := <-eventChan:
  260. if !ok {
  261. return
  262. }
  263. if event.EventType == eventType && event.Task != nil {
  264. notification := CreateResourceNotification(
  265. "graphql://subscription/task"+capitalize(eventType),
  266. event.Task,
  267. )
  268. s.sendNotification(session, notification)
  269. }
  270. }
  271. }
  272. }
  273. // streamMessageEvents streams message events to the session
  274. func (s *Server) streamMessageEvents(ctx context.Context, userID uint, session *Session) {
  275. eventChan := s.resolver.SubscribeToMessages(userID)
  276. for {
  277. select {
  278. case <-ctx.Done():
  279. return
  280. case <-session.Done:
  281. return
  282. case event, ok := <-eventChan:
  283. if !ok {
  284. return
  285. }
  286. // Check if user is a receiver
  287. isReceiver := false
  288. for _, receiverID := range event.ReceiverIDs {
  289. if receiverID == userID {
  290. isReceiver = true
  291. break
  292. }
  293. }
  294. if isReceiver && event.Message != nil {
  295. notification := CreateResourceNotification(
  296. "graphql://subscription/messageAdded",
  297. event.Message,
  298. )
  299. s.sendNotification(session, notification)
  300. }
  301. }
  302. }
  303. }
  304. // sendNotification sends a JSON-RPC notification to the session
  305. func (s *Server) sendNotification(session *Session, notification *JSONRPCNotification) {
  306. notifBytes, err := json.Marshal(notification)
  307. if err != nil {
  308. return
  309. }
  310. select {
  311. case session.Events <- notifBytes:
  312. default:
  313. // Channel full, skip
  314. }
  315. }
  316. // capitalize helper
  317. func capitalize(s string) string {
  318. if len(s) == 0 {
  319. return s
  320. }
  321. return string(s[0]-32) + s[1:]
  322. }