package main import ( "encoding/json" "fmt" "log" "strings" "sync" ) // MCPClientInterface defines the interface for MCP client operations type MCPClientInterface interface { ListTools() ([]Tool, error) CallTool(name string, arguments map[string]interface{}) (*CallToolResult, error) } // MCPServerInfo holds information about a connected MCP server type MCPServerInfo struct { Name string Client MCPClientInterface Tools []Tool } // MCPManager manages multiple MCP clients (ARP + external servers) // It aggregates tools from all servers and routes tool calls appropriately type MCPManager struct { // Primary ARP client arpClient *MCPClient // External MCP servers (stdio-based) externalClients map[string]*MCPStdioClient // Aggregated tools with server prefix tools []Tool // Map of prefixed tool name -> server name toolToServer map[string]string // Map of prefixed tool name -> original tool name toolToOriginal map[string]string // Mutex for thread-safe access mu sync.RWMutex } // NewMCPManager creates a new MCP manager func NewMCPManager(arpClient *MCPClient) *MCPManager { return &MCPManager{ arpClient: arpClient, externalClients: make(map[string]*MCPStdioClient), tools: make([]Tool, 0), toolToServer: make(map[string]string), toolToOriginal: make(map[string]string), } } // AddExternalServer adds and initializes an external MCP server // Returns error if the server fails to start, but does not fail the whole manager func (m *MCPManager) AddExternalServer(name string, config MCPServerConfig) error { client := NewMCPStdioClient(name, config) // Start the process if err := client.Start(); err != nil { return fmt.Errorf("failed to start MCP server '%s': %w", name, err) } // Initialize the server if _, err := client.Initialize(); err != nil { client.Close() return fmt.Errorf("failed to initialize MCP server '%s': %w", name, err) } // List tools from the server tools, err := client.ListTools() if err != nil { client.Close() return fmt.Errorf("failed to list tools from MCP server '%s': %w", name, err) } m.mu.Lock() defer m.mu.Unlock() m.externalClients[name] = client // Prefix tools with server name for _, tool := range tools { prefixedName := fmt.Sprintf("%s.%s", name, tool.Name) m.tools = append(m.tools, Tool{ Name: prefixedName, Description: tool.Description, InputSchema: tool.InputSchema, }) m.toolToServer[prefixedName] = name m.toolToOriginal[prefixedName] = tool.Name } log.Printf("Added external MCP server '%s' with %d tools: %v", name, len(tools), toolNames(tools)) return nil } // Initialize initializes the ARP client and discovers tools func (m *MCPManager) Initialize() error { // Get tools from ARP client arpTools, err := m.arpClient.ListTools() if err != nil { return fmt.Errorf("failed to list ARP tools: %w", err) } m.mu.Lock() defer m.mu.Unlock() // Add ARP tools without prefix (they are the primary tools) for _, tool := range arpTools { m.tools = append(m.tools, tool) m.toolToServer[tool.Name] = "arp" m.toolToOriginal[tool.Name] = tool.Name } log.Printf("Discovered %d ARP tools: %v", len(arpTools), toolNames(arpTools)) return nil } // ListTools returns all aggregated tools from all servers func (m *MCPManager) ListTools() ([]Tool, error) { m.mu.RLock() defer m.mu.RUnlock() // Return a copy to avoid race conditions tools := make([]Tool, len(m.tools)) copy(tools, m.tools) return tools, nil } // CallTool routes a tool call to the appropriate server func (m *MCPManager) CallTool(name string, arguments map[string]interface{}) (*CallToolResult, error) { m.mu.RLock() serverName, hasServer := m.toolToServer[name] originalName, hasOriginal := m.toolToOriginal[name] m.mu.RUnlock() if !hasServer || !hasOriginal { return nil, fmt.Errorf("unknown tool: %s", name) } // Route to the appropriate client switch serverName { case "arp": return m.arpClient.CallTool(originalName, arguments) default: // External server client, ok := m.externalClients[serverName] if !ok { return nil, fmt.Errorf("MCP server '%s' not found", serverName) } return client.CallTool(originalName, arguments) } } // Close closes all MCP clients func (m *MCPManager) Close() error { m.mu.Lock() defer m.mu.Unlock() var errors []string // Close external clients for name, client := range m.externalClients { if err := client.Close(); err != nil { errors = append(errors, fmt.Sprintf("%s: %v", name, err)) } } // Close ARP client if m.arpClient != nil { if err := m.arpClient.Close(); err != nil { errors = append(errors, fmt.Sprintf("arp: %v", err)) } } if len(errors) > 0 { return fmt.Errorf("errors closing MCP clients: %s", strings.Join(errors, ", ")) } return nil } // GetServerNames returns the names of all connected servers func (m *MCPManager) GetServerNames() []string { m.mu.RLock() defer m.mu.RUnlock() names := []string{"arp"} for name := range m.externalClients { names = append(names, name) } return names } // GetToolCount returns the total number of tools across all servers func (m *MCPManager) GetToolCount() int { m.mu.RLock() defer m.mu.RUnlock() return len(m.tools) } // SubscribeResource subscribes to a resource on the ARP server func (m *MCPManager) SubscribeResource(uri string) error { return m.arpClient.SubscribeResource(uri) } // UnsubscribeResource unsubscribes from a resource on the ARP server func (m *MCPManager) UnsubscribeResource(uri string) error { return m.arpClient.UnsubscribeResource(uri) } // ListResources lists resources from the ARP server func (m *MCPManager) ListResources() ([]Resource, error) { return m.arpClient.ListResources() } // Notifications returns the notification channel from the ARP server func (m *MCPManager) Notifications() <-chan json.RawMessage { return m.arpClient.Notifications() }