package main import ( "bytes" "context" "encoding/json" "fmt" "log" "net/http" "os" "os/signal" "strings" "syscall" "time" "github.com/joho/godotenv" ) func main() { // Load .env file if it exists if err := godotenv.Load(); err != nil { log.Println("No .env file found, using environment variables") } // Load configuration cfg, err := LoadConfig() if err != nil { log.Fatalf("Failed to load configuration: %v", err) } // Test OpenAI connectivity log.Printf("Testing connectivity to OpenAI API...") retryDelay := time.Duration(cfg.LLMRetryDelay) * time.Second llm := NewLLMWithRetry(cfg.OpenAIKey, cfg.OpenAIModel, float32(cfg.OpenAITemperature), cfg.OpenAIBaseURL, cfg.OpenAIMaxTokens, cfg.LLMMaxRetries, retryDelay) if err := llm.TestConnection(context.Background()); err != nil { log.Fatalf("Failed to connect to OpenAI API: %v", err) } log.Printf("✓ Successfully connected to OpenAI API (max retries: %d, retry delay: %v)", cfg.LLMMaxRetries, retryDelay) // Login to ARP server log.Printf("Connecting to ARP server at %s...", cfg.ARPURL) token, err := login(cfg.ARPURL, cfg.ARPUsername, cfg.ARPPassword) if err != nil { log.Fatalf("Failed to login: %v", err) } log.Printf("Successfully authenticated with ARP server") // Connect to MCP server log.Printf("Connecting to MCP server...") mcpClient := NewMCPClient(cfg.ARPURL, token) if err := mcpClient.Connect(); err != nil { log.Fatalf("Failed to connect to MCP: %v", err) } // Initialize MCP initResult, err := mcpClient.Initialize() if err != nil { mcpClient.Close() log.Fatalf("Failed to initialize MCP: %v", err) } log.Printf("Connected to MCP server: %s v%s", initResult.ServerInfo.Name, initResult.ServerInfo.Version) // Create MCP manager to aggregate ARP + external servers mcpManager := NewMCPManager(mcpClient) // Load external MCP servers from config file if specified if cfg.MCPConfigPath != "" { log.Printf("Loading MCP server configuration from %s", cfg.MCPConfigPath) mcpConfig, err := LoadMCPConfig(cfg.MCPConfigPath) if err != nil { mcpManager.Close() log.Fatalf("Failed to load MCP config: %v", err) } for name, serverConfig := range mcpConfig.MCPServers { if err := mcpManager.AddExternalServer(name, serverConfig); err != nil { log.Printf("Warning: Failed to add external MCP server '%s': %v", name, err) } } } // Initialize the manager (discovers ARP tools) if err := mcpManager.Initialize(); err != nil { mcpManager.Close() log.Fatalf("Failed to initialize MCP manager: %v", err) } log.Printf("MCP manager initialized with %d tools from %d servers", mcpManager.GetToolCount(), len(mcpManager.GetServerNames())) // Create and initialize agent agent := NewAgent(llm, mcpManager, cfg) if err := agent.Initialize(); err != nil { mcpManager.Close() log.Fatalf("Failed to initialize agent: %v", err) } log.Printf("Agent initialized successfully.") // Setup event queues agent.SetupQueues(cfg.MaxQueueSize) log.Printf("Event queues initialized with capacity: %d", cfg.MaxQueueSize) // List and subscribe to resources log.Printf("Subscribing to ARP resources...") resources, err := mcpManager.ListResources() if err != nil { mcpManager.Close() log.Fatalf("Failed to list resources: %v", err) } log.Printf("Available resources: %v", resourceURIs(resources)) for _, resource := range resources { log.Printf(" Subscribed to: %s", resource.URI) if err := mcpManager.SubscribeResource(resource.URI); err != nil { log.Printf("Warning: Failed to subscribe to %s: %v", resource.URI, err) } } // Handle graceful shutdown ctx, cancel := context.WithCancel(context.Background()) sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) go func() { <-sigChan log.Println("Received shutdown signal, stopping...") cancel() }() // Start the agent queue processor agent.Start(ctx) defer agent.Stop() // Listen for events with reconnection support log.Println() log.Println("Listening for events. Press Ctrl+C to stop.") reconnectDelay := time.Duration(cfg.MCPReconnectDelay) * time.Second for { select { case <-ctx.Done(): mcpManager.Close() return case event, ok := <-mcpManager.Notifications(): if !ok { // Notification channel closed - attempt to reconnect log.Printf("Notification channel closed, attempting to reconnect in %v...", reconnectDelay) select { case <-ctx.Done(): mcpManager.Close() return case <-time.After(reconnectDelay): // Try to reconnect if err := reconnectMCP(cfg, mcpManager, &token); err != nil { log.Printf("Reconnection failed: %v. Retrying in %v...", err, reconnectDelay) continue } log.Println("Successfully reconnected to MCP server") } continue } handleNotification(agent, event) } } } // reconnectMCP attempts to reconnect to the MCP server func reconnectMCP(cfg *Config, mcpManager *MCPManager, token *string) error { // Close existing connection mcpManager.Close() // Re-authenticate newToken, err := login(cfg.ARPURL, cfg.ARPUsername, cfg.ARPPassword) if err != nil { return fmt.Errorf("failed to re-authenticate: %w", err) } *token = newToken // Create new MCP client mcpClient := NewMCPClient(cfg.ARPURL, newToken) if err := mcpClient.Connect(); err != nil { return fmt.Errorf("failed to reconnect to MCP: %w", err) } // Initialize MCP if _, err := mcpClient.Initialize(); err != nil { mcpClient.Close() return fmt.Errorf("failed to initialize MCP on reconnect: %w", err) } // Note: We can't easily replace the mcpManager's internal client // This is a limitation - in a production system we'd need to refactor // the MCPManager to support reconnection log.Println("MCP reconnection requires agent restart - please restart the agent") return fmt.Errorf("MCP reconnection requires agent restart") } // login authenticates with the ARP server and returns a JWT token func login(baseURL, username, password string) (string, error) { // Ensure URL has the /query endpoint loginURL := strings.TrimSuffix(baseURL, "/") if !strings.HasSuffix(loginURL, "/query") { loginURL = loginURL + "/query" } query := ` mutation Login($email: String!, $password: String!) { login(email: $email, password: $password) { token user { id email } } }` reqBody := map[string]interface{}{ "query": query, "variables": map[string]interface{}{ "email": username, "password": password, }, } bodyBytes, err := json.Marshal(reqBody) if err != nil { return "", fmt.Errorf("failed to marshal request: %w", err) } req, err := http.NewRequest("POST", loginURL, bytes.NewReader(bodyBytes)) if err != nil { return "", fmt.Errorf("failed to create request: %w", err) } req.Header.Set("Content-Type", "application/json") client := &http.Client{Timeout: 30 * time.Second} resp, err := client.Do(req) if err != nil { return "", fmt.Errorf("request failed: %w", err) } defer resp.Body.Close() var result struct { Data struct { Login struct { Token string `json:"token"` User struct { ID string `json:"id"` Email string `json:"email"` } `json:"user"` } `json:"login"` } `json:"data"` Errors []struct { Message string `json:"message"` } `json:"errors"` } if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { return "", fmt.Errorf("failed to parse response: %w", err) } if len(result.Errors) > 0 { return "", fmt.Errorf("login failed: %s", result.Errors[0].Message) } if result.Data.Login.Token == "" { return "", fmt.Errorf("no token received") } return result.Data.Login.Token, nil } // resourceURIs extracts URIs from resources for logging func resourceURIs(resources []Resource) []string { uris := make([]string, len(resources)) for i, r := range resources { uris[i] = r.URI } return uris } // handleNotification processes an MCP notification and queues it for processing func handleNotification(agent *Agent, event json.RawMessage) { // Parse the notification var notification JSONRPCNotification if err := json.Unmarshal(event, ¬ification); err != nil { log.Printf("Failed to parse notification: %v", err) return } // Handle resource update notifications if notification.Method == "notifications/resources/updated" { params, ok := notification.Params.(map[string]interface{}) if !ok { log.Printf("Invalid notification params") return } uri, _ := params["uri"].(string) contents, ok := params["contents"].(map[string]interface{}) if !ok { log.Printf("Invalid notification contents") return } text, _ := contents["text"].(string) log.Printf("Received event from %s", uri) // Queue the event for processing (non-blocking) agent.QueueEvent(uri, json.RawMessage(text)) } }