package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"github.com/joho/godotenv"
)

// main wires the agent together and runs it until shutdown.
//
// Startup order: load configuration, test the OpenAI connection, log in to
// the ARP server, connect and initialize the MCP client, initialize the agent
// and its event queues, then subscribe to every resource the MCP server
// lists. Any failure in those steps is fatal, except a failed subscription to
// an individual resource, which is only logged. After startup, notifications
// are forwarded to the agent until SIGINT/SIGTERM arrives or the notification
// channel is closed.
func main() {
	// A failed .env load is tolerated; configuration then comes from the
	// process environment.
	if err := godotenv.Load(); err != nil {
		log.Println("No .env file found, using environment variables")
	}

	// Load configuration
	cfg, err := LoadConfig()
	if err != nil {
		log.Fatalf("Failed to load configuration: %v", err)
	}

	// Test OpenAI connectivity
	log.Printf("Testing connectivity to OpenAI API...")
	llm := NewLLM(cfg.OpenAIKey, cfg.OpenAIModel, float32(cfg.OpenAITemperature), cfg.OpenAIBaseURL, cfg.OpenAIMaxTokens)
	if err := llm.TestConnection(context.Background()); err != nil {
		log.Fatalf("Failed to connect to OpenAI API: %v", err)
	}
	log.Printf("✓ Successfully connected to OpenAI API")

	// Login to ARP server
	log.Printf("Connecting to ARP server at %s...", cfg.ARPURL)
	token, err := login(cfg.ARPURL, cfg.ARPUsername, cfg.ARPPassword)
	if err != nil {
		log.Fatalf("Failed to login: %v", err)
	}
	log.Printf("Successfully authenticated with ARP server")

	// Connect to MCP server
	log.Printf("Connecting to MCP server...")
	mcpClient := NewMCPClient(cfg.ARPURL, token)
	if err := mcpClient.Connect(); err != nil {
		log.Fatalf("Failed to connect to MCP: %v", err)
	}
	defer mcpClient.Close()

	// log.Fatalf exits through os.Exit, which skips deferred calls. Fatal
	// errors from here on therefore close the MCP connection explicitly
	// before terminating the process.
	fatalf := func(format string, args ...interface{}) {
		mcpClient.Close()
		log.Fatalf(format, args...)
	}

	// Initialize MCP
	initResult, err := mcpClient.Initialize()
	if err != nil {
		fatalf("Failed to initialize MCP: %v", err)
	}
	log.Printf("Connected to MCP server: %s v%s", initResult.ServerInfo.Name, initResult.ServerInfo.Version)

	// Create and initialize agent
	agent := NewAgent(llm, mcpClient, cfg)
	if err := agent.Initialize(); err != nil {
		fatalf("Failed to initialize agent: %v", err)
	}
	log.Printf("Agent initialized successfully.")

	// Setup event queues
	agent.SetupQueues(cfg.MaxQueueSize)
	log.Printf("Event queues initialized with capacity: %d", cfg.MaxQueueSize)

	// List and subscribe to resources
	log.Printf("Subscribing to ARP resources...")
	resources, err := mcpClient.ListResources()
	if err != nil {
		fatalf("Failed to list resources: %v", err)
	}
	log.Printf("Available resources: %v", resourceURIs(resources))
	for _, resource := range resources {
		// A failed subscription is not fatal; the remaining resources are
		// still attempted. Success is only reported once it has happened.
		if err := mcpClient.SubscribeResource(resource.URI); err != nil {
			log.Printf("Warning: Failed to subscribe to %s: %v", resource.URI, err)
			continue
		}
		log.Printf(" Subscribed to: %s", resource.URI)
	}

	// Handle graceful shutdown
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigChan)

	go func() {
		select {
		case <-sigChan:
			log.Println("Received shutdown signal, stopping...")
			cancel()
		case <-ctx.Done():
			// main is returning for another reason; do not leave this
			// goroutine parked on sigChan.
		}
	}()

	// Start the agent queue processor
	agent.Start(ctx)
	defer agent.Stop()

	// Listen for events
	log.Println()
	log.Println("Listening for events. Press Ctrl+C to stop.")

	for {
		select {
		case <-ctx.Done():
			return
		case event, ok := <-mcpClient.Notifications():
			if !ok {
				log.Println("Notification channel closed")
				return
			}
			handleNotification(agent, event)
		}
	}
}

// login authenticates with the ARP server and returns a JWT token.
//
// baseURL is the server address; a trailing "/" is dropped and "/query" is
// appended unless the URL already ends with it. username is sent as the
// GraphQL "email" variable. The request uses a 30 second client timeout.
//
// An error is returned when the request cannot be built or sent, when the
// response body is not valid JSON, when the response carries GraphQL errors
// (the first message is reported), or when no token is present.
func login(baseURL, username, password string) (string, error) {
	// Ensure URL has the /query endpoint
	loginURL := strings.TrimSuffix(baseURL, "/")
	if !strings.HasSuffix(loginURL, "/query") {
		loginURL += "/query"
	}

	query := `
		mutation Login($email: String!, $password: String!) {
			login(email: $email, password: $password) {
				token
				user {
					id
					email
				}
			}
		}`

	reqBody := map[string]interface{}{
		"query": query,
		"variables": map[string]interface{}{
			"email":    username,
			"password": password,
		},
	}

	bodyBytes, err := json.Marshal(reqBody)
	if err != nil {
		return "", fmt.Errorf("failed to marshal request: %w", err)
	}

	req, err := http.NewRequest(http.MethodPost, loginURL, bytes.NewReader(bodyBytes))
	if err != nil {
		return "", fmt.Errorf("failed to create request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")

	client := &http.Client{Timeout: 30 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return "", fmt.Errorf("request failed: %w", err)
	}
	defer resp.Body.Close()

	var result struct {
		Data struct {
			Login struct {
				Token string `json:"token"`
				User  struct {
					ID    string `json:"id"`
					Email string `json:"email"`
				} `json:"user"`
			} `json:"login"`
		} `json:"data"`
		Errors []struct {
			Message string `json:"message"`
		} `json:"errors"`
	}

	// The HTTP status is included in the error because a non-JSON body
	// (for example an HTML error page) would otherwise surface only as a
	// JSON syntax error with no hint about the cause.
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return "", fmt.Errorf("failed to parse response (HTTP status %s): %w", resp.Status, err)
	}

	// GraphQL errors are reported in the body and take precedence over the
	// token check below.
	if len(result.Errors) > 0 {
		return "", fmt.Errorf("login failed: %s", result.Errors[0].Message)
	}

	if result.Data.Login.Token == "" {
		return "", fmt.Errorf("no token received (HTTP status %s)", resp.Status)
	}

	return result.Data.Login.Token, nil
}

// resourceURIs returns the URI of every resource, in input order, for
// logging.
func resourceURIs(resources []Resource) []string {
	uris := make([]string, len(resources))
	for i, r := range resources {
		uris[i] = r.URI
	}
	return uris
}

// handleNotification parses an MCP notification and, for resource update
// notifications, queues the update's text content on the agent.
//
// Notifications with any other method are ignored. Malformed notifications
// (unparseable JSON, params that are not an object, or a missing "contents"
// object) are logged and dropped.
func handleNotification(agent *Agent, event json.RawMessage) {
	const resourceUpdated = "notifications/resources/updated"

	// Parse the notification
	var notification JSONRPCNotification
	if err := json.Unmarshal(event, &notification); err != nil {
		log.Printf("Failed to parse notification: %v", err)
		return
	}

	// Only resource update notifications are handled.
	if notification.Method != resourceUpdated {
		return
	}

	params, ok := notification.Params.(map[string]interface{})
	if !ok {
		log.Printf("Invalid notification params")
		return
	}

	// A missing or non-string "uri" is tolerated and becomes "".
	uri, _ := params["uri"].(string)

	contents, ok := params["contents"].(map[string]interface{})
	if !ok {
		log.Printf("Invalid notification contents")
		return
	}

	// A missing or non-string "text" becomes "". The text is handed on as
	// raw JSON without being validated here.
	// NOTE(review): confirm QueueEvent copes with an empty payload.
	text, _ := contents["text"].(string)

	log.Printf("Received event from %s", uri)

	// Queue the event for processing.
	agent.QueueEvent(uri, json.RawMessage(text))
}