mcp_manager.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "log"
  6. "strings"
  7. "sync"
  8. )
// MCPClientInterface defines the interface for MCP client operations.
// It covers tool discovery and tool invocation; MCPManager in this file
// provides both methods with these signatures.
type MCPClientInterface interface {
	// ListTools returns the tools the client exposes, or an error.
	ListTools() ([]Tool, error)
	// CallTool invokes the tool called name with the given arguments and
	// returns its result, or an error.
	CallTool(name string, arguments map[string]interface{}) (*CallToolResult, error)
}
// MCPServerInfo holds information about a connected MCP server.
// NOTE(review): nothing in this file constructs or reads this type — confirm
// it is still used elsewhere in the package.
type MCPServerInfo struct {
	// Name of the server.
	Name string
	// Client used to talk to the server.
	Client MCPClientInterface
	// Tools associated with the server (presumably the result of
	// Client.ListTools — verify against the code that fills it in).
	Tools []Tool
}
// MCPManager manages multiple MCP clients (ARP + external servers).
// It aggregates tools from all servers and routes tool calls appropriately.
type MCPManager struct {
	// Primary ARP client. Its tools are registered under their own names,
	// without a server prefix (see Initialize).
	arpClient *MCPClient
	// External MCP servers (stdio-based), keyed by the name passed to
	// AddExternalServer.
	externalClients map[string]*MCPStdioClient
	// Aggregated tools: ARP tools as-is, external tools renamed to
	// "<server>.<tool>".
	tools []Tool
	// Map of exposed tool name -> owning server name ("arp" for tools that
	// belong to the primary client).
	toolToServer map[string]string
	// Map of exposed tool name -> the name the tool has on its own server.
	toolToOriginal map[string]string
	// Mutex for thread-safe access to the fields of this struct.
	mu sync.RWMutex
	// Close once for idempotent close.
	closeOnce sync.Once
	// Subscribed resource URIs, recorded by SubscribeResource and replayed
	// by Reconnect after the ARP client has been replaced.
	subscribedURIs []string
}
  40. // NewMCPManager creates a new MCP manager
  41. func NewMCPManager(arpClient *MCPClient) *MCPManager {
  42. return &MCPManager{
  43. arpClient: arpClient,
  44. externalClients: make(map[string]*MCPStdioClient),
  45. tools: make([]Tool, 0),
  46. toolToServer: make(map[string]string),
  47. toolToOriginal: make(map[string]string),
  48. }
  49. }
  50. // AddExternalServer adds and initializes an external MCP server
  51. // Returns error if the server fails to start, but does not fail the whole manager
  52. func (m *MCPManager) AddExternalServer(name string, config MCPServerConfig) error {
  53. client := NewMCPStdioClient(name, config)
  54. // Start the process
  55. if err := client.Start(); err != nil {
  56. return fmt.Errorf("failed to start MCP server '%s': %w", name, err)
  57. }
  58. // Initialize the server
  59. if _, err := client.Initialize(); err != nil {
  60. client.Close()
  61. return fmt.Errorf("failed to initialize MCP server '%s': %w", name, err)
  62. }
  63. // List tools from the server
  64. tools, err := client.ListTools()
  65. if err != nil {
  66. client.Close()
  67. return fmt.Errorf("failed to list tools from MCP server '%s': %w", name, err)
  68. }
  69. m.mu.Lock()
  70. defer m.mu.Unlock()
  71. m.externalClients[name] = client
  72. // Prefix tools with server name
  73. for _, tool := range tools {
  74. prefixedName := fmt.Sprintf("%s.%s", name, tool.Name)
  75. m.tools = append(m.tools, Tool{
  76. Name: prefixedName,
  77. Description: tool.Description,
  78. InputSchema: tool.InputSchema,
  79. })
  80. m.toolToServer[prefixedName] = name
  81. m.toolToOriginal[prefixedName] = tool.Name
  82. }
  83. log.Printf("Added external MCP server '%s' with %d tools: %v", name, len(tools), toolNames(tools))
  84. return nil
  85. }
  86. // Initialize initializes the ARP client and discovers tools
  87. func (m *MCPManager) Initialize() error {
  88. // Get tools from ARP client
  89. arpTools, err := m.arpClient.ListTools()
  90. if err != nil {
  91. return fmt.Errorf("failed to list ARP tools: %w", err)
  92. }
  93. m.mu.Lock()
  94. defer m.mu.Unlock()
  95. // Add ARP tools without prefix (they are the primary tools)
  96. for _, tool := range arpTools {
  97. m.tools = append(m.tools, tool)
  98. m.toolToServer[tool.Name] = "arp"
  99. m.toolToOriginal[tool.Name] = tool.Name
  100. }
  101. log.Printf("Discovered %d ARP tools: %v", len(arpTools), toolNames(arpTools))
  102. return nil
  103. }
  104. // ListTools returns all aggregated tools from all servers
  105. func (m *MCPManager) ListTools() ([]Tool, error) {
  106. m.mu.RLock()
  107. defer m.mu.RUnlock()
  108. // Return a copy to avoid race conditions
  109. tools := make([]Tool, len(m.tools))
  110. copy(tools, m.tools)
  111. return tools, nil
  112. }
  113. // CallTool routes a tool call to the appropriate server
  114. func (m *MCPManager) CallTool(name string, arguments map[string]interface{}) (*CallToolResult, error) {
  115. m.mu.RLock()
  116. serverName, hasServer := m.toolToServer[name]
  117. originalName, hasOriginal := m.toolToOriginal[name]
  118. m.mu.RUnlock()
  119. if !hasServer || !hasOriginal {
  120. return nil, fmt.Errorf("unknown tool: %s", name)
  121. }
  122. // Route to the appropriate client
  123. switch serverName {
  124. case "arp":
  125. return m.arpClient.CallTool(originalName, arguments)
  126. default:
  127. // External server
  128. client, ok := m.externalClients[serverName]
  129. if !ok {
  130. return nil, fmt.Errorf("MCP server '%s' not found", serverName)
  131. }
  132. return client.CallTool(originalName, arguments)
  133. }
  134. }
  135. // Close closes all MCP clients (idempotent)
  136. func (m *MCPManager) Close() error {
  137. m.closeOnce.Do(func() {
  138. m.mu.Lock()
  139. defer m.mu.Unlock()
  140. var errors []string
  141. // Close external clients
  142. for name, client := range m.externalClients {
  143. if err := client.Close(); err != nil {
  144. errors = append(errors, fmt.Sprintf("%s: %v", name, err))
  145. }
  146. }
  147. // Close ARP client
  148. if m.arpClient != nil {
  149. if err := m.arpClient.Close(); err != nil {
  150. errors = append(errors, fmt.Sprintf("arp: %v", err))
  151. }
  152. }
  153. if len(errors) > 0 {
  154. log.Printf("errors closing MCP clients: %s", strings.Join(errors, ", "))
  155. }
  156. })
  157. return nil
  158. }
  159. // GetServerNames returns the names of all connected servers
  160. func (m *MCPManager) GetServerNames() []string {
  161. m.mu.RLock()
  162. defer m.mu.RUnlock()
  163. names := []string{"arp"}
  164. for name := range m.externalClients {
  165. names = append(names, name)
  166. }
  167. return names
  168. }
  169. // GetToolCount returns the total number of tools across all servers
  170. func (m *MCPManager) GetToolCount() int {
  171. m.mu.RLock()
  172. defer m.mu.RUnlock()
  173. return len(m.tools)
  174. }
  175. // SubscribeResource subscribes to a resource on the ARP server
  176. func (m *MCPManager) SubscribeResource(uri string) error {
  177. if err := m.arpClient.SubscribeResource(uri); err != nil {
  178. return err
  179. }
  180. // Track the subscription for reconnection
  181. m.mu.Lock()
  182. defer m.mu.Unlock()
  183. // Check if already subscribed
  184. for _, existing := range m.subscribedURIs {
  185. if existing == uri {
  186. return nil
  187. }
  188. }
  189. m.subscribedURIs = append(m.subscribedURIs, uri)
  190. return nil
  191. }
  192. // UnsubscribeResource unsubscribes from a resource on the ARP server
  193. func (m *MCPManager) UnsubscribeResource(uri string) error {
  194. return m.arpClient.UnsubscribeResource(uri)
  195. }
  196. // ListResources lists resources from the ARP server
  197. func (m *MCPManager) ListResources() ([]Resource, error) {
  198. return m.arpClient.ListResources()
  199. }
  200. // Notifications returns the notification channel from the ARP server
  201. func (m *MCPManager) Notifications() <-chan json.RawMessage {
  202. return m.arpClient.Notifications()
  203. }
  204. // SetARPClient replaces the internal ARP client with a new one
  205. // This is used for reconnection scenarios
  206. func (m *MCPManager) SetARPClient(client *MCPClient) {
  207. m.mu.Lock()
  208. defer m.mu.Unlock()
  209. // Close the old client if it exists
  210. if m.arpClient != nil {
  211. m.arpClient.Close()
  212. }
  213. m.arpClient = client
  214. }
  215. // Reconnect handles reconnection with a new MCP client
  216. // It replaces the internal client and re-subscribes to all previously subscribed resources
  217. func (m *MCPManager) Reconnect(client *MCPClient) error {
  218. // Get the list of subscribed URIs before replacing the client
  219. m.mu.RLock()
  220. subscribedURIs := make([]string, len(m.subscribedURIs))
  221. copy(subscribedURIs, m.subscribedURIs)
  222. m.mu.RUnlock()
  223. // Replace the client
  224. m.SetARPClient(client)
  225. // Re-subscribe to all previously subscribed resources
  226. for _, uri := range subscribedURIs {
  227. if err := client.SubscribeResource(uri); err != nil {
  228. log.Printf("Warning: Failed to re-subscribe to %s: %v", uri, err)
  229. } else {
  230. log.Printf("Re-subscribed to: %s", uri)
  231. }
  232. }
  233. return nil
  234. }
  235. // GetSubscribedURIs returns the list of currently subscribed resource URIs
  236. func (m *MCPManager) GetSubscribedURIs() []string {
  237. m.mu.RLock()
  238. defer m.mu.RUnlock()
  239. uris := make([]string, len(m.subscribedURIs))
  240. copy(uris, m.subscribedURIs)
  241. return uris
  242. }