mcp_manager.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "log"
  6. "strings"
  7. "sync"
  8. )
  9. // MCPClientInterface defines the interface for MCP client operations
  10. type MCPClientInterface interface {
  11. ListTools() ([]Tool, error)
  12. CallTool(name string, arguments map[string]interface{}) (*CallToolResult, error)
  13. }
  14. // MCPServerInfo holds information about a connected MCP server
  15. type MCPServerInfo struct {
  16. Name string
  17. Client MCPClientInterface
  18. Tools []Tool
  19. }
  20. // MCPManager manages multiple MCP clients (ARP + external servers)
  21. // It aggregates tools from all servers and routes tool calls appropriately
  22. type MCPManager struct {
  23. // Primary ARP client
  24. arpClient *MCPClient
  25. // External MCP servers (stdio-based)
  26. externalClients map[string]*MCPStdioClient
  27. // Aggregated tools with server prefix
  28. tools []Tool
  29. // Map of prefixed tool name -> server name
  30. toolToServer map[string]string
  31. // Map of prefixed tool name -> original tool name
  32. toolToOriginal map[string]string
  33. // Mutex for thread-safe access
  34. mu sync.RWMutex
  35. }
  36. // NewMCPManager creates a new MCP manager
  37. func NewMCPManager(arpClient *MCPClient) *MCPManager {
  38. return &MCPManager{
  39. arpClient: arpClient,
  40. externalClients: make(map[string]*MCPStdioClient),
  41. tools: make([]Tool, 0),
  42. toolToServer: make(map[string]string),
  43. toolToOriginal: make(map[string]string),
  44. }
  45. }
  46. // AddExternalServer adds and initializes an external MCP server
  47. // Returns error if the server fails to start, but does not fail the whole manager
  48. func (m *MCPManager) AddExternalServer(name string, config MCPServerConfig) error {
  49. client := NewMCPStdioClient(name, config)
  50. // Start the process
  51. if err := client.Start(); err != nil {
  52. return fmt.Errorf("failed to start MCP server '%s': %w", name, err)
  53. }
  54. // Initialize the server
  55. if _, err := client.Initialize(); err != nil {
  56. client.Close()
  57. return fmt.Errorf("failed to initialize MCP server '%s': %w", name, err)
  58. }
  59. // List tools from the server
  60. tools, err := client.ListTools()
  61. if err != nil {
  62. client.Close()
  63. return fmt.Errorf("failed to list tools from MCP server '%s': %w", name, err)
  64. }
  65. m.mu.Lock()
  66. defer m.mu.Unlock()
  67. m.externalClients[name] = client
  68. // Prefix tools with server name
  69. for _, tool := range tools {
  70. prefixedName := fmt.Sprintf("%s.%s", name, tool.Name)
  71. m.tools = append(m.tools, Tool{
  72. Name: prefixedName,
  73. Description: tool.Description,
  74. InputSchema: tool.InputSchema,
  75. })
  76. m.toolToServer[prefixedName] = name
  77. m.toolToOriginal[prefixedName] = tool.Name
  78. }
  79. log.Printf("Added external MCP server '%s' with %d tools: %v", name, len(tools), toolNames(tools))
  80. return nil
  81. }
  82. // Initialize initializes the ARP client and discovers tools
  83. func (m *MCPManager) Initialize() error {
  84. // Get tools from ARP client
  85. arpTools, err := m.arpClient.ListTools()
  86. if err != nil {
  87. return fmt.Errorf("failed to list ARP tools: %w", err)
  88. }
  89. m.mu.Lock()
  90. defer m.mu.Unlock()
  91. // Add ARP tools without prefix (they are the primary tools)
  92. for _, tool := range arpTools {
  93. m.tools = append(m.tools, tool)
  94. m.toolToServer[tool.Name] = "arp"
  95. m.toolToOriginal[tool.Name] = tool.Name
  96. }
  97. log.Printf("Discovered %d ARP tools: %v", len(arpTools), toolNames(arpTools))
  98. return nil
  99. }
  100. // ListTools returns all aggregated tools from all servers
  101. func (m *MCPManager) ListTools() ([]Tool, error) {
  102. m.mu.RLock()
  103. defer m.mu.RUnlock()
  104. // Return a copy to avoid race conditions
  105. tools := make([]Tool, len(m.tools))
  106. copy(tools, m.tools)
  107. return tools, nil
  108. }
  109. // CallTool routes a tool call to the appropriate server
  110. func (m *MCPManager) CallTool(name string, arguments map[string]interface{}) (*CallToolResult, error) {
  111. m.mu.RLock()
  112. serverName, hasServer := m.toolToServer[name]
  113. originalName, hasOriginal := m.toolToOriginal[name]
  114. m.mu.RUnlock()
  115. if !hasServer || !hasOriginal {
  116. return nil, fmt.Errorf("unknown tool: %s", name)
  117. }
  118. // Route to the appropriate client
  119. switch serverName {
  120. case "arp":
  121. return m.arpClient.CallTool(originalName, arguments)
  122. default:
  123. // External server
  124. client, ok := m.externalClients[serverName]
  125. if !ok {
  126. return nil, fmt.Errorf("MCP server '%s' not found", serverName)
  127. }
  128. return client.CallTool(originalName, arguments)
  129. }
  130. }
  131. // Close closes all MCP clients
  132. func (m *MCPManager) Close() error {
  133. m.mu.Lock()
  134. defer m.mu.Unlock()
  135. var errors []string
  136. // Close external clients
  137. for name, client := range m.externalClients {
  138. if err := client.Close(); err != nil {
  139. errors = append(errors, fmt.Sprintf("%s: %v", name, err))
  140. }
  141. }
  142. // Close ARP client
  143. if m.arpClient != nil {
  144. if err := m.arpClient.Close(); err != nil {
  145. errors = append(errors, fmt.Sprintf("arp: %v", err))
  146. }
  147. }
  148. if len(errors) > 0 {
  149. return fmt.Errorf("errors closing MCP clients: %s", strings.Join(errors, ", "))
  150. }
  151. return nil
  152. }
  153. // GetServerNames returns the names of all connected servers
  154. func (m *MCPManager) GetServerNames() []string {
  155. m.mu.RLock()
  156. defer m.mu.RUnlock()
  157. names := []string{"arp"}
  158. for name := range m.externalClients {
  159. names = append(names, name)
  160. }
  161. return names
  162. }
  163. // GetToolCount returns the total number of tools across all servers
  164. func (m *MCPManager) GetToolCount() int {
  165. m.mu.RLock()
  166. defer m.mu.RUnlock()
  167. return len(m.tools)
  168. }
  169. // SubscribeResource subscribes to a resource on the ARP server
  170. func (m *MCPManager) SubscribeResource(uri string) error {
  171. return m.arpClient.SubscribeResource(uri)
  172. }
  173. // UnsubscribeResource unsubscribes from a resource on the ARP server
  174. func (m *MCPManager) UnsubscribeResource(uri string) error {
  175. return m.arpClient.UnsubscribeResource(uri)
  176. }
  177. // ListResources lists resources from the ARP server
  178. func (m *MCPManager) ListResources() ([]Resource, error) {
  179. return m.arpClient.ListResources()
  180. }
  181. // Notifications returns the notification channel from the ARP server
  182. func (m *MCPManager) Notifications() <-chan json.RawMessage {
  183. return m.arpClient.Notifications()
  184. }