| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296 |
- package main
- import (
- "encoding/json"
- "fmt"
- "log"
- "strings"
- "sync"
- )
- // MCPClientInterface defines the interface for MCP client operations
- type MCPClientInterface interface {
- ListTools() ([]Tool, error)
- CallTool(name string, arguments map[string]interface{}) (*CallToolResult, error)
- }
- // MCPServerInfo holds information about a connected MCP server
- type MCPServerInfo struct {
- Name string
- Client MCPClientInterface
- Tools []Tool
- }
- // MCPManager manages multiple MCP clients (ARP + external servers)
- // It aggregates tools from all servers and routes tool calls appropriately
- type MCPManager struct {
- // Primary ARP client
- arpClient *MCPClient
- // External MCP servers (stdio-based)
- externalClients map[string]*MCPStdioClient
- // Aggregated tools with server prefix
- tools []Tool
- // Map of prefixed tool name -> server name
- toolToServer map[string]string
- // Map of prefixed tool name -> original tool name
- toolToOriginal map[string]string
- // Mutex for thread-safe access
- mu sync.RWMutex
- // Close once for idempotent close
- closeOnce sync.Once
- // Subscribed resource URIs (for reconnection)
- subscribedURIs []string
- }
- // NewMCPManager creates a new MCP manager
- func NewMCPManager(arpClient *MCPClient) *MCPManager {
- return &MCPManager{
- arpClient: arpClient,
- externalClients: make(map[string]*MCPStdioClient),
- tools: make([]Tool, 0),
- toolToServer: make(map[string]string),
- toolToOriginal: make(map[string]string),
- }
- }
- // AddExternalServer adds and initializes an external MCP server
- // Returns error if the server fails to start, but does not fail the whole manager
- func (m *MCPManager) AddExternalServer(name string, config MCPServerConfig) error {
- client := NewMCPStdioClient(name, config)
- // Start the process
- if err := client.Start(); err != nil {
- return fmt.Errorf("failed to start MCP server '%s': %w", name, err)
- }
- // Initialize the server
- if _, err := client.Initialize(); err != nil {
- client.Close()
- return fmt.Errorf("failed to initialize MCP server '%s': %w", name, err)
- }
- // List tools from the server
- tools, err := client.ListTools()
- if err != nil {
- client.Close()
- return fmt.Errorf("failed to list tools from MCP server '%s': %w", name, err)
- }
- m.mu.Lock()
- defer m.mu.Unlock()
- m.externalClients[name] = client
- // Prefix tools with server name
- for _, tool := range tools {
- prefixedName := fmt.Sprintf("%s.%s", name, tool.Name)
- m.tools = append(m.tools, Tool{
- Name: prefixedName,
- Description: tool.Description,
- InputSchema: tool.InputSchema,
- })
- m.toolToServer[prefixedName] = name
- m.toolToOriginal[prefixedName] = tool.Name
- }
- log.Printf("Added external MCP server '%s' with %d tools: %v", name, len(tools), toolNames(tools))
- return nil
- }
- // Initialize initializes the ARP client and discovers tools
- func (m *MCPManager) Initialize() error {
- // Get tools from ARP client
- arpTools, err := m.arpClient.ListTools()
- if err != nil {
- return fmt.Errorf("failed to list ARP tools: %w", err)
- }
- m.mu.Lock()
- defer m.mu.Unlock()
- // Add ARP tools without prefix (they are the primary tools)
- for _, tool := range arpTools {
- m.tools = append(m.tools, tool)
- m.toolToServer[tool.Name] = "arp"
- m.toolToOriginal[tool.Name] = tool.Name
- }
- log.Printf("Discovered %d ARP tools: %v", len(arpTools), toolNames(arpTools))
- return nil
- }
- // ListTools returns all aggregated tools from all servers
- func (m *MCPManager) ListTools() ([]Tool, error) {
- m.mu.RLock()
- defer m.mu.RUnlock()
- // Return a copy to avoid race conditions
- tools := make([]Tool, len(m.tools))
- copy(tools, m.tools)
- return tools, nil
- }
- // CallTool routes a tool call to the appropriate server
- func (m *MCPManager) CallTool(name string, arguments map[string]interface{}) (*CallToolResult, error) {
- m.mu.RLock()
- serverName, hasServer := m.toolToServer[name]
- originalName, hasOriginal := m.toolToOriginal[name]
- m.mu.RUnlock()
- if !hasServer || !hasOriginal {
- return nil, fmt.Errorf("unknown tool: %s", name)
- }
- // Route to the appropriate client
- switch serverName {
- case "arp":
- return m.arpClient.CallTool(originalName, arguments)
- default:
- // External server
- client, ok := m.externalClients[serverName]
- if !ok {
- return nil, fmt.Errorf("MCP server '%s' not found", serverName)
- }
- return client.CallTool(originalName, arguments)
- }
- }
- // Close closes all MCP clients (idempotent)
- func (m *MCPManager) Close() error {
- m.closeOnce.Do(func() {
- m.mu.Lock()
- defer m.mu.Unlock()
- var errors []string
- // Close external clients
- for name, client := range m.externalClients {
- if err := client.Close(); err != nil {
- errors = append(errors, fmt.Sprintf("%s: %v", name, err))
- }
- }
- // Close ARP client
- if m.arpClient != nil {
- if err := m.arpClient.Close(); err != nil {
- errors = append(errors, fmt.Sprintf("arp: %v", err))
- }
- }
- if len(errors) > 0 {
- log.Printf("errors closing MCP clients: %s", strings.Join(errors, ", "))
- }
- })
- return nil
- }
- // GetServerNames returns the names of all connected servers
- func (m *MCPManager) GetServerNames() []string {
- m.mu.RLock()
- defer m.mu.RUnlock()
- names := []string{"arp"}
- for name := range m.externalClients {
- names = append(names, name)
- }
- return names
- }
- // GetToolCount returns the total number of tools across all servers
- func (m *MCPManager) GetToolCount() int {
- m.mu.RLock()
- defer m.mu.RUnlock()
- return len(m.tools)
- }
- // SubscribeResource subscribes to a resource on the ARP server
- func (m *MCPManager) SubscribeResource(uri string) error {
- if err := m.arpClient.SubscribeResource(uri); err != nil {
- return err
- }
- // Track the subscription for reconnection
- m.mu.Lock()
- defer m.mu.Unlock()
- // Check if already subscribed
- for _, existing := range m.subscribedURIs {
- if existing == uri {
- return nil
- }
- }
- m.subscribedURIs = append(m.subscribedURIs, uri)
- return nil
- }
- // UnsubscribeResource unsubscribes from a resource on the ARP server
- func (m *MCPManager) UnsubscribeResource(uri string) error {
- return m.arpClient.UnsubscribeResource(uri)
- }
- // ListResources lists resources from the ARP server
- func (m *MCPManager) ListResources() ([]Resource, error) {
- return m.arpClient.ListResources()
- }
- // Notifications returns the notification channel from the ARP server
- func (m *MCPManager) Notifications() <-chan json.RawMessage {
- return m.arpClient.Notifications()
- }
- // SetARPClient replaces the internal ARP client with a new one
- // This is used for reconnection scenarios
- func (m *MCPManager) SetARPClient(client *MCPClient) {
- m.mu.Lock()
- defer m.mu.Unlock()
- // Close the old client if it exists
- if m.arpClient != nil {
- m.arpClient.Close()
- }
- m.arpClient = client
- }
- // Reconnect handles reconnection with a new MCP client
- // It replaces the internal client and re-subscribes to all previously subscribed resources
- func (m *MCPManager) Reconnect(client *MCPClient) error {
- // Get the list of subscribed URIs before replacing the client
- m.mu.RLock()
- subscribedURIs := make([]string, len(m.subscribedURIs))
- copy(subscribedURIs, m.subscribedURIs)
- m.mu.RUnlock()
- // Replace the client
- m.SetARPClient(client)
- // Re-subscribe to all previously subscribed resources
- for _, uri := range subscribedURIs {
- if err := client.SubscribeResource(uri); err != nil {
- log.Printf("Warning: Failed to re-subscribe to %s: %v", uri, err)
- } else {
- log.Printf("Re-subscribed to: %s", uri)
- }
- }
- return nil
- }
- // GetSubscribedURIs returns the list of currently subscribed resource URIs
- func (m *MCPManager) GetSubscribedURIs() []string {
- m.mu.RLock()
- defer m.mu.RUnlock()
- uris := make([]string, len(m.subscribedURIs))
- copy(uris, m.subscribedURIs)
- return uris
- }
|