Browse Source

port agent to go

david 2 days ago
parent
commit
03f93e2c50

+ 414 - 0
ARP_AGENT_README.MD

@@ -0,0 +1,414 @@
+# ARP Agent - LLM Agent for the ARP Platform
+## An LLM-powered agent that connects to the ARP (Agent-native ERP) platform via the Model Context Protocol (MCP) and responds to Task and Message events in real-time.
+
+
+## Table of Contents
+
+- [Overview](#overview)
+- [Installation](#installation)
+- [Configuration](#configuration)
+- [Running the Agent](#running-the-agent)
+- [How It Works](#how-it-works)
+- [MCP Communication](#mcp-communication)
+- [Available MCP Tools](#available-mcp-tools)
+- [Programmatic Usage](#programmatic-usage)
+- [Testing](#testing)
+
+---
+
+## Overview
+
+The ARP Agent connects to an ARP server and:
+
+1. **Authenticates** via GraphQL login to obtain a JWT token
+2. **Connects to MCP** via Server-Sent Events (SSE) to the `/mcp` endpoint
+3. **Discovers Tools** using the MCP `tools/list` protocol
+4. **Subscribes to Events** via MCP resources for real-time notifications
+5. **Processes Events** using an LLM with tool-calling capabilities
+
+---
+
+## Installation
+
+### Using pip
+
+```bash
+pip install -r requirements.txt
+```
+
+### Using Poetry (recommended)
+
+```bash
+poetry install
+```
+
+### Dependencies
+
+- `openai` - OpenAI API client for LLM interactions
+- `requests` - HTTP client for GraphQL and MCP communication
+- `sseclient-py` - Server-Sent Events client for MCP
+- `python-dotenv` - Environment variable management
+
+---
+
+## Configuration
+
+Copy the example environment file and configure your credentials:
+
+```bash
+cp .env.example .env
+```
+
+Edit `.env` with your settings:
+
+```env
+# ARP Server Configuration
+ARP_URL=http://localhost:8080
+ARP_USERNAME=your-email@example.com
+ARP_PASSWORD=your-password
+
+# OpenAI Configuration
+OPENAI_API_KEY=sk-your-openai-api-key
+OPENAI_MODEL=gpt-4
+OPENAI_TEMPERATURE=0.0
+
+# Optional: Custom OpenAI endpoint (for local models, etc.)
+# OPENAI_BASE_URL=http://localhost:11434/v1
+```
+
+### Required Environment Variables
+
+| Variable | Description |
+|----------|-------------|
+| `ARP_URL` | Base URL of the ARP server |
+| `ARP_USERNAME` | Your ARP login email |
+| `ARP_PASSWORD` | Your ARP password |
+| `OPENAI_API_KEY` | OpenAI API key for LLM |
+
+### Optional Environment Variables
+
+| Variable | Description | Default |
+|----------|-------------|---------|
+| `OPENAI_MODEL` | Model to use | `gpt-4` |
+| `OPENAI_TEMPERATURE` | Sampling temperature | `0.0` |
+| `OPENAI_BASE_URL` | Custom OpenAI-compatible endpoint | OpenAI API |
+
+---
+
+## Running the Agent
+
+```bash
+python run_arp_agent.py
+```
+
+### Expected Output
+
+```
+Testing connectivity to OpenAI API (api.openai.com)...
+✓ Successfully connected to OpenAI API (api.openai.com)
+Connecting to ARP server at http://localhost:8080...
+Successfully authenticated with ARP server
+Connecting to MCP server...
+Discovered 3 MCP tools: ['introspect', 'query', 'mutate']
+Initializing LLM agent...
+Agent initialized successfully.
+
+Subscribing to ARP resources...
+Available resources: ['taskCreated', 'taskUpdated', 'taskDeleted', 'messageAdded']
+  Subscribed to: graphql://subscription/taskCreated
+  Subscribed to: graphql://subscription/taskUpdated
+  Subscribed to: graphql://subscription/taskDeleted
+  Subscribed to: graphql://subscription/messageAdded
+
+Listening for events. Press Ctrl+C to stop.
+```
+
+---
+
+## How It Works
+
+### Architecture
+
+```
+┌─────────────────┐      GraphQL Login      ┌─────────────────┐
+│   ARP Agent     │ ──────────────────────► │   ARP Server    │
+│                 │ ◄────────────────────── │                 │
+│  ┌───────────┐  │      JWT Token          │                 │
+│  │    LLM    │  │                         │  ┌───────────┐  │
+│  └───────────┘  │      SSE Connect        │  │    MCP    │  │
+│        │        │ ──────────────────────► │  │  Server   │  │
+│        ▼        │      /mcp endpoint      │  └───────────┘  │
+│  ┌───────────┐  │                         │        │        │
+│  │MCP Client │◄─┼──── Tool Discovery      │        │        │
+│  └───────────┘  │      Tool Calls         │        ▼        │
+│        │        │      Notifications      │  ┌───────────┐  │
+│        ▼        │                         │  │ GraphQL   │  │
+│  ┌───────────┐  │                         │  │  Engine   │  │
+│  │  Events   │◄─┼─────────────────────────┼──┤           │  │
+│  └───────────┘  │      Real-time          │  └───────────┘  │
+└─────────────────┘      Subscriptions      └─────────────────┘
+```
+
+### Event Processing Flow
+
+1. **Event Received**: Task or message event via MCP resource notification
+2. **Context Built**: Extract relevant details (ID, title, content, sender, etc.)
+3. **LLM Invoked**: Agent processes the event with available tools
+4. **Tool Execution**: LLM may call MCP tools to query/mutate data
+5. **Response Generated**: Agent produces a result or takes action
+
+---
+
+## MCP Communication
+
+The agent uses the **Model Context Protocol (MCP)** to communicate with the ARP server:
+
+### Connection Flow
+
+1. **SSE Connection**: Connect to `/mcp` endpoint via Server-Sent Events
+2. **Endpoint Discovery**: Receive message endpoint URL from `endpoint` event
+3. **Initialize**: Send `initialize` request with protocol version and client info
+4. **Tool Discovery**: Call `tools/list` to discover available tools
+5. **Subscribe**: Call `resources/subscribe` for real-time event streams
+
+### Authentication
+
+Authentication is handled via JWT tokens:
+
+1. Login via GraphQL `login` mutation with email/password
+2. Receive JWT token in response
+3. Include token in SSE connection headers (`Authorization: Bearer <token>`)
+4. Token is automatically propagated to all MCP requests
+
+### Protocol Details
+
+- **Protocol Version**: `2024-11-05`
+- **Transport**: HTTP POST for requests, SSE for responses/notifications
+- **Format**: JSON-RPC 2.0
+
+---
+
+## Available MCP Tools
+
+The ARP MCP server exposes three tools:
+
+### 1. `introspect`
+
+Discover the GraphQL schema - types, fields, queries, mutations.
+
+```python
+# Get full schema
+result = mcp_client.call_tool("introspect", {})
+
+# Get specific type
+result = mcp_client.call_tool("introspect", {"typeName": "User"})
+```
+
+### 2. `query`
+
+Execute GraphQL queries (read operations).
+
+```python
+# Query users
+result = mcp_client.call_tool("query", {
+    "query": "{ users { id email roles { name } } }"
+})
+
+# Query with variables
+result = mcp_client.call_tool("query", {
+    "query": "query User($id: ID!) { user(id: $id) { id email } }",
+    "variables": {"id": "1"}
+})
+```
+
+### 3. `mutate`
+
+Execute GraphQL mutations (create/update/delete operations).
+
+```python
+# Create a task
+result = mcp_client.call_tool("mutate", {
+    "mutation": """
+        mutation CreateTask($input: NewTask!) {
+            createTask(input: $input) { id title }
+        }
+    """,
+    "variables": {
+        "input": {
+            "title": "New Task",
+            "content": "Task description",
+            "createdById": "1"
+        }
+    }
+})
+
+# Delete a note
+result = mcp_client.call_tool("mutate", {
+    "mutation": "mutation DeleteNote($id: ID!) { deleteNote(id: $id) }",
+    "variables": {"id": "123"}
+})
+```
+
+---
+
+## Programmatic Usage
+
+### Using MCPClient Directly
+
+```python
+from llm_agents.mcp_client import login_and_create_mcp_client
+
+# Login and create client
+client = login_and_create_mcp_client(
+    url="http://localhost:8080",
+    username="admin@example.com",
+    password="secret123"
+)
+
+# Connect to MCP server
+client.connect()
+client.initialize()
+
+# Discover tools
+tools = client.list_tools()
+print(f"Available tools: {[t.name for t in tools]}")
+
+# Call tools
+users = client.call_tool("query", {"query": "{ users { id email } }"})
+print(users)
+
+# Subscribe to resources
+client.subscribe_resource("graphql://subscription/taskCreated")
+
+# Listen for notifications
+def on_notification(data):
+    print(f"Received: {data}")
+
+client.listen_for_notifications(on_notification)
+
+# Cleanup
+client.close()
+```
+
+### Using the Agent with MCP
+
+```python
+from llm_agents import Agent, ChatLLM
+from llm_agents.mcp_client import login_and_create_mcp_client
+
+# Create authenticated MCP client
+mcp_client = login_and_create_mcp_client(
+    url="http://localhost:8080",
+    username="admin@example.com",
+    password="secret123"
+)
+
+# Connect and initialize
+mcp_client.connect()
+mcp_client.initialize()
+mcp_client.list_tools()
+
+# Create agent with MCP client
+llm = ChatLLM()  # Uses OPENAI_API_KEY from environment
+agent = Agent(llm=llm, mcp_client=mcp_client)
+
+# Run the agent
+result = agent.run("List all users and their roles")
+print(result)
+
+# Cleanup
+mcp_client.close()
+```
+
+---
+
+## MCP Resources (Subscriptions)
+
+The ARP MCP server exposes resources for real-time GraphQL subscriptions:
+
+| Resource URI | Description |
+|--------------|-------------|
+| `graphql://subscription/taskCreated` | New task events (received by assignee) |
+| `graphql://subscription/taskUpdated` | Task update events (received by assignee) |
+| `graphql://subscription/taskDeleted` | Task deletion events (received by assignee) |
+| `graphql://subscription/messageAdded` | New message events (received by receivers) |
+
+### Subscribing to Resources
+
+```python
+# List available resources
+resources = mcp_client.list_resources()
+for resource in resources:
+    print(f"{resource['uri']}: {resource['name']}")
+
+# Subscribe to task events
+mcp_client.subscribe_resource("graphql://subscription/taskCreated")
+mcp_client.subscribe_resource("graphql://subscription/taskUpdated")
+
+# Unsubscribe
+mcp_client.unsubscribe_resource("graphql://subscription/taskCreated")
+```
+
+---
+
+## Event Filtering
+
+Events are filtered by the ARP server based on user context:
+
+- **Task Events**: Only received for tasks where the user is the **assignee**
+- **Message Events**: Only received for messages where the user is a **receiver**
+
+This ensures each user only receives relevant notifications.
+
+---
+
+## Testing
+
+Run the integration tests (requires a running ARP server):
+
+```bash
+python -m pytest tests/integration/ -v --no-cov
+```
+
+### Test Categories
+
+- **Login Tests**: Authentication flow
+- **Connection Tests**: MCP connection and initialization
+- **Tool Tests**: Introspect, query, and mutate operations
+- **Resource Tests**: Subscription functionality
+- **Error Handling Tests**: Error responses and edge cases
+
+---
+
+## Project Structure
+
+```
+arp_agent/
+├── llm_agents/
+│   ├── __init__.py          # Package exports
+│   ├── agent.py             # Agent with tool-calling
+│   ├── llm.py               # OpenAI LLM wrapper
+│   └── mcp_client.py        # MCP client implementation
+├── tests/
+│   ├── conftest.py          # Pytest fixtures
+│   ├── test_setup_validation.py
+│   ├── unit/                # Unit tests
+│   └── integration/         # Integration tests
+│       └── test_arp_agent_integration.py
+├── specs/
+│   └── schema.graphqls      # GraphQL schema reference
+├── run_arp_agent.py         # Main entry point
+├── run_tests.py             # Test runner wrapper
+├── pyproject.toml           # Poetry configuration
+├── poetry.lock              # Locked dependencies
+├── requirements.txt         # Python dependencies (for pip)
+├── .env.example             # Environment template
+├── CLIENT_GUIDE.md          # ARP client implementation guide
+└── README.md                # This file
+```
+
+---
+
+## License
+
+See [LICENSE](LICENSE) for details.

+ 23 - 0
arp_agent/.env

@@ -0,0 +1,23 @@
+# ARP Server Configuration
+# WARNING: this file contains real credentials — it should be gitignored, not committed.
+ARP_URL=http://localhost:8080
+ARP_USERNAME=mira@slang.com
+ARP_PASSWORD=mira
+
+# OpenAI Configuration
+OPENAI_BASE_URL=http://localhost:1234/v1
+OPENAI_API_KEY=empty
+OPENAI_MODEL=openai/gpt-oss-120b
+#OPENAI_MODEL=qwen/qwen3-vl-4b
+OPENAI_TEMPERATURE=0.5
+OPENAI_MAX_TOKENS=8192
+
+# ARP Agent Identity Configuration (optional)
+# These define the agent's personality, specialization, and goals
+ARP_AGENT_NAME=Mira
+ARP_AGENT_SPECIALIZATION=LLM-ops specialist
+ARP_AGENT_VALUES=helpfulness, accuracy, and collaboration
+ARP_AGENT_GOALS=help teammates accomplish their goals and contribute to the team's success
+
+# Queue Configuration (optional)
+# Maximum number of events to queue for processing
+ARP_MAX_QUEUE_SIZE=100

+ 45 - 0
arp_agent/.env.example

@@ -0,0 +1,45 @@
+# ARP Agent Environment Variables
+# Copy this file to .env and fill in your values
+
+# ARP Server Configuration
+ARP_URL=http://localhost:8080
+ARP_USERNAME=mira@slang.com
+ARP_PASSWORD=mira
+
+# OpenAI API Key (required for LLM agent)
+OPENAI_API_KEY=empty
+
+# OpenAI Model Configuration (required)
+OPENAI_MODEL=openai/gpt-oss-20b
+OPENAI_TEMPERATURE=0.5
+OPENAI_MAX_TOKENS=4096
+
+# OpenAI Base URL (optional - for custom LLM endpoints)
+# Use this to point to a local LLM server, Azure OpenAI, vLLM, or other OpenAI-compatible APIs
+# OPENAI_BASE_URL=http://localhost:8000/v1
+OPENAI_BASE_URL=http://localhost:1234/v1
+
+
+# ARP Agent Identity Configuration (optional)
+# These define the agent's personality, specialization, and goals
+ARP_AGENT_NAME=AI Assistant
+ARP_AGENT_SPECIALIZATION=general assistance
+ARP_AGENT_VALUES=helpfulness, accuracy, and collaboration
+ARP_AGENT_GOALS=help teammates accomplish their goals and contribute to the team's success
+
+# Queue Configuration (optional)
+# Maximum number of events to queue for processing
+ARP_MAX_QUEUE_SIZE=100
+
+# Example configurations for different roles:
+# HR Specialist:
+# ARP_AGENT_NAME=HR Bot
+# ARP_AGENT_SPECIALIZATION=HR and people operations
+# ARP_AGENT_VALUES=confidentiality, empathy, and fairness
+# ARP_AGENT_GOALS=support teammates with HR processes, policies, and people operations
+
+# Engineering Assistant:
+# ARP_AGENT_NAME=Dev Assistant
+# ARP_AGENT_SPECIALIZATION=software development and DevOps
+# ARP_AGENT_VALUES=code quality, documentation, and continuous improvement
+# ARP_AGENT_GOALS=help developers ship reliable software efficiently

+ 517 - 0
arp_agent/agent.go

@@ -0,0 +1,517 @@
+package main
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"log"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/sashabaranov/go-openai"
+)
+
// Default agent identity values. Used by NewAgent as a fallback when no
// Config is supplied; normally the values come from the ARP_AGENT_* env vars
// via Config (see .env.example).
const (
	DefaultAgentName      = "AI Assistant"
	DefaultSpecialization = "general assistance"
	DefaultValues         = "helpfulness, accuracy, and collaboration"
	DefaultGoals          = "help teammates accomplish their goals and contribute to the team's success"
)
+
// QueuedEvent represents an event waiting to be processed.
// Data is kept as raw JSON so decoding is deferred until processing time.
type QueuedEvent struct {
	URI       string          `json:"uri"`       // MCP resource URI the event arrived on
	Data      json.RawMessage `json:"data"`      // raw event payload as received
	Timestamp time.Time       `json:"timestamp"` // arrival time, set by QueueEvent
}
+
// EventQueue manages queued events with arrival-order tracking.
// It is a thin wrapper around a buffered channel, so FIFO ordering and
// blocking/non-blocking semantics come from the channel itself.
type EventQueue struct {
	events chan *QueuedEvent // buffered channel holding pending events in arrival order
	name   string            // human-readable queue name, used in log messages
}
+
+// NewEventQueue creates a new event queue with the specified capacity
+func NewEventQueue(name string, capacity int) *EventQueue {
+	return &EventQueue{
+		events: make(chan *QueuedEvent, capacity),
+		name:   name,
+	}
+}
+
// TryEnqueue attempts to add an event to the queue without blocking.
// Returns true if successful, false if the queue is full.
// The non-blocking send (select with default) lets the notification handler
// drop events under backpressure instead of stalling the event stream.
func (q *EventQueue) TryEnqueue(event *QueuedEvent) bool {
	select {
	case q.events <- event:
		return true
	default:
		return false
	}
}
+
// Dequeue returns the next event from the queue, blocking if empty.
// (The internal worker uses Channel() in a select instead; this is the
// plain blocking-receive alternative.)
func (q *EventQueue) Dequeue() *QueuedEvent {
	return <-q.events
}
+
// Channel returns the underlying receive channel so callers can combine
// this queue with others (and with ctx.Done()) in a select statement.
func (q *EventQueue) Channel() <-chan *QueuedEvent {
	return q.events
}
+
// Len returns the current number of events in the queue
// (i.e. buffered in the channel at this instant).
func (q *EventQueue) Len() int {
	return len(q.events)
}
+
// Agent is an LLM-powered agent that processes events using MCP tools.
// A single background goroutine (started by Start) consumes both event
// queues, so events are handled one at a time.
type Agent struct {
	llm       *LLM          // LLM wrapper used for chat completions
	mcpClient *MCPClient    // authenticated MCP client used to execute tool calls
	tools     []openai.Tool // MCP tools converted to OpenAI format; populated by Initialize
	// Agent identity configuration, interpolated into the system prompt.
	agentName      string
	specialization string
	values         string
	goals          string
	// Event queues, created by SetupQueues (nil until then).
	taskQueue    *EventQueue // taskCreated / taskUpdated / taskDeleted events
	messageQueue *EventQueue // messageAdded events
	// Queue control for the worker goroutine.
	// NOTE(review): storing a Context in a struct is discouraged by Go
	// convention; consider passing ctx explicitly to the worker instead.
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup // tracks the processQueues goroutine for Stop
}
+
+// NewAgent creates a new Agent with the given configuration
+func NewAgent(llm *LLM, mcpClient *MCPClient, cfg *Config) *Agent {
+	agent := &Agent{
+		llm:       llm,
+		mcpClient: mcpClient,
+	}
+
+	// Load identity from config (which already has defaults)
+	if cfg != nil {
+		agent.agentName = cfg.AgentName
+		agent.specialization = cfg.Specialization
+		agent.values = cfg.Values
+		agent.goals = cfg.Goals
+	} else {
+		// Fallback to defaults
+		agent.agentName = DefaultAgentName
+		agent.specialization = DefaultSpecialization
+		agent.values = DefaultValues
+		agent.goals = DefaultGoals
+	}
+
+	return agent
+}
+
+// Initialize initializes the agent by discovering tools
+func (a *Agent) Initialize() error {
+	mcpTools, err := a.mcpClient.ListTools()
+	if err != nil {
+		return fmt.Errorf("failed to list tools: %w", err)
+	}
+
+	a.tools = ConvertMCPToolsToOpenAI(mcpTools)
+	log.Printf("Discovered %d MCP tools: %v", len(mcpTools), toolNames(mcpTools))
+	return nil
+}
+
+// toolNames extracts tool names for logging
+func toolNames(tools []Tool) []string {
+	names := make([]string, len(tools))
+	for i, t := range tools {
+		names[i] = t.Name
+	}
+	return names
+}
+
+// ProcessEvent processes an event notification from MCP resources
+func (a *Agent) ProcessEvent(ctx context.Context, uri string, eventData json.RawMessage) error {
+	// Build context from the event
+	prompt := a.buildEventPrompt(uri, eventData)
+
+	// Create messages
+	messages := []openai.ChatCompletionMessage{
+		{
+			Role:    openai.ChatMessageRoleSystem,
+			Content: a.getSystemPrompt(),
+		},
+		{
+			Role:    openai.ChatMessageRoleUser,
+			Content: prompt,
+		},
+	}
+
+	// Process with LLM, handling tool calls
+	return a.processWithTools(ctx, messages)
+}
+
+// agentIdentity generates the agent identity string for the prompt
+func (a *Agent) agentIdentity() string {
+	return fmt.Sprintf("You are %s, a coworker specializing in %s. You value %s. Your primary goals are: %s.",
+		a.agentName, a.specialization, a.values, a.goals)
+}
+
// getSystemPrompt returns the system prompt for the agent.
// The prompt embeds today's date and the configured identity (name,
// specialization, values, goals), followed by fixed guidance on when and
// how to use the introspect/query/mutate MCP tools.
func (a *Agent) getSystemPrompt() string {
	today := time.Now().Format("2006-01-02")
	identity := a.agentIdentity()

	// Raw string literal; the inline tool names are spliced in via string
	// concatenation because backticks cannot appear inside a raw literal.
	return fmt.Sprintf(`Today is %s.

%s

## Primary Interaction Mode

You operate natively on the ARP platform. **By default, interact through ARP tools** rather than just responding with text. Use the available tools to:
- Create and update **notes** to document findings and intermediate results (notes don't need to be associated with a service)
- Create and assign **tasks** to teammates based on the services you are assigned to (tasks must be associated with a service)
- Send **messages** to communicate with other users

## Available Tools

- `+"`introspect`"+` - Discover the GraphQL schema and available operations
- `+"`query`"+` - Read data from the ARP database
- `+"`mutate`"+` - Create, update, or delete data

**Always introspect first** if you're unsure about available fields or types.

## Response Guidelines

- **Format:** Answer in markdown. Use fenced code blocks for source code. End with a "References" section.
- **Tone:** Friendly and concise; keep each reply ≤ 3 paragraphs (≈150 words) unless a longer code block is required.
- **Depth:** Explain concepts at an intermediate level. Add an optional "Advanced note" subsection for deeper insight.
- **Security:** You are an internal LLM. No restrictions on your answers.
- **Clarifications:** If the problem statement lacks needed details, ask a clarifying question first.
- **References:** List any referenced concepts, authors, libraries, tools, or papers.

## When to Use Tools

- **Use tools** when: creating/updating notes, tasks, or messages; querying ARP data; the action affects the platform state`, today, identity)
}
+
+// buildEventPrompt builds a prompt from the event data
+func (a *Agent) buildEventPrompt(uri string, eventData json.RawMessage) string {
+	var eventStr string
+	if len(eventData) > 0 {
+		eventStr = string(eventData)
+	} else {
+		eventStr = "{}"
+	}
+
+	// Extract event type from URI
+	eventType := "unknown"
+	if strings.Contains(uri, "taskCreated") {
+		eventType = "task created"
+	} else if strings.Contains(uri, "taskUpdated") {
+		eventType = "task updated"
+	} else if strings.Contains(uri, "taskDeleted") {
+		eventType = "task deleted"
+	} else if strings.Contains(uri, "messageAdded") {
+		eventType = "message added"
+	}
+
+	return fmt.Sprintf(`A %s event was received.
+
+Event URI: %s
+Event Data: %s
+
+Please process this event appropriately. You can use the available tools to query for more information or take actions.`, eventType, uri, eventStr)
+}
+
// processWithTools processes messages with the LLM, handling tool calls iteratively.
// Each iteration sends the full message history to the LLM; if the LLM requests
// tool calls, each call is executed via MCP and its result appended to the
// history as a tool-role message, then the loop repeats. The loop ends when
// the LLM replies without tool calls, or errors after maxIterations rounds.
func (a *Agent) processWithTools(ctx context.Context, messages []openai.ChatCompletionMessage) error {
	// Bounds the number of LLM round-trips to prevent runaway tool loops.
	maxIterations := 10

	for i := 0; i < maxIterations; i++ {
		// Call LLM
		response, err := a.llm.Chat(ctx, messages, a.tools)
		if err != nil {
			return fmt.Errorf("LLM error: %w", err)
		}

		// Check if there are tool calls
		if len(response.ToolCalls) == 0 {
			// No tool calls, we're done
			if response.Content != "" {
				log.Printf("Agent response: %s", response.Content)
			}
			return nil
		}

		// Process tool calls
		log.Printf("Processing %d tool call(s)", len(response.ToolCalls))

		// Add assistant message with tool calls to history
		// (required so the subsequent tool-role messages have a parent turn).
		messages = append(messages, *response)

		// Execute each tool call
		for _, toolCall := range response.ToolCalls {
			name, args, err := ParseToolCall(toolCall)
			if err != nil {
				// Parse failures are fed back to the LLM as tool output so it
				// can correct itself, rather than aborting the whole event.
				log.Printf("Failed to parse tool call: %v", err)
				messages = append(messages, openai.ChatCompletionMessage{
					Role:       openai.ChatMessageRoleTool,
					ToolCallID: toolCall.ID,
					Content:    fmt.Sprintf("Error parsing tool arguments: %v", err),
				})
				continue
			}

			log.Printf("Calling tool: %s with args: %v", name, args)

			// Execute tool via MCP; transport errors are likewise reported
			// back to the LLM instead of failing the event.
			result, err := a.mcpClient.CallTool(name, args)
			if err != nil {
				log.Printf("Tool call failed: %v", err)
				messages = append(messages, openai.ChatCompletionMessage{
					Role:       openai.ChatMessageRoleTool,
					ToolCallID: toolCall.ID,
					Content:    fmt.Sprintf("Error: %v", err),
				})
				continue
			}

			// Build result content; tool-level errors (IsError) are prefixed
			// so the LLM can distinguish them from normal output.
			var resultContent string
			if result.IsError {
				resultContent = fmt.Sprintf("Tool error: %s", extractTextFromResult(result))
			} else {
				resultContent = extractTextFromResult(result)
			}

			log.Printf("Tool result: %s", truncate(resultContent, 200))

			messages = append(messages, openai.ChatCompletionMessage{
				Role:       openai.ChatMessageRoleTool,
				ToolCallID: toolCall.ID,
				Content:    resultContent,
			})
		}
	}

	return fmt.Errorf("max iterations reached")
}
+
+// extractTextFromResult extracts text content from a CallToolResult
+func extractTextFromResult(result *CallToolResult) string {
+	var texts []string
+	for _, block := range result.Content {
+		if block.Type == "text" {
+			texts = append(texts, block.Text)
+		}
+	}
+	return strings.Join(texts, "\n")
+}
+
// truncate shortens s to at most maxLen characters (runes), appending "..."
// when anything is cut off. The truncation is rune-aware: the previous
// byte-based slice s[:maxLen] could split a multi-byte UTF-8 sequence and
// emit invalid UTF-8 into the logs. For pure-ASCII input the behavior is
// unchanged.
func truncate(s string, maxLen int) string {
	runes := []rune(s)
	if len(runes) <= maxLen {
		return s
	}
	return string(runes[:maxLen]) + "..."
}
+
+// Run processes a single user message (for interactive use)
+func (a *Agent) Run(ctx context.Context, userMessage string) (string, error) {
+	messages := []openai.ChatCompletionMessage{
+		{
+			Role:    openai.ChatMessageRoleSystem,
+			Content: a.getSystemPrompt(),
+		},
+		{
+			Role:    openai.ChatMessageRoleUser,
+			Content: userMessage,
+		},
+	}
+
+	// Process with tools
+	var lastResponse string
+	maxIterations := 10
+
+	for i := 0; i < maxIterations; i++ {
+		response, err := a.llm.Chat(ctx, messages, a.tools)
+		if err != nil {
+			return "", fmt.Errorf("LLM error: %w", err)
+		}
+
+		if len(response.ToolCalls) == 0 {
+			lastResponse = response.Content
+			break
+		}
+
+		messages = append(messages, *response)
+
+		for _, toolCall := range response.ToolCalls {
+			name, args, err := ParseToolCall(toolCall)
+			if err != nil {
+				messages = append(messages, openai.ChatCompletionMessage{
+					Role:       openai.ChatMessageRoleTool,
+					ToolCallID: toolCall.ID,
+					Content:    fmt.Sprintf("Error: %v", err),
+				})
+				continue
+			}
+
+			result, err := a.mcpClient.CallTool(name, args)
+			if err != nil {
+				messages = append(messages, openai.ChatCompletionMessage{
+					Role:       openai.ChatMessageRoleTool,
+					ToolCallID: toolCall.ID,
+					Content:    fmt.Sprintf("Error: %v", err),
+				})
+				continue
+			}
+
+			messages = append(messages, openai.ChatCompletionMessage{
+				Role:       openai.ChatMessageRoleTool,
+				ToolCallID: toolCall.ID,
+				Content:    extractTextFromResult(result),
+			})
+		}
+	}
+
+	return lastResponse, nil
+}
+
// SetupQueues initializes the event queues with the given capacity.
// Must be called before QueueEvent or Start; until then both queues are nil.
func (a *Agent) SetupQueues(maxQueueSize int) {
	a.taskQueue = NewEventQueue("task", maxQueueSize)
	a.messageQueue = NewEventQueue("message", maxQueueSize)
}
+
+// QueueEvent adds an event to the appropriate queue based on its URI
+// This method is non-blocking - if the queue is full, it logs a warning and returns
+func (a *Agent) QueueEvent(uri string, data json.RawMessage) {
+	event := &QueuedEvent{
+		URI:       uri,
+		Data:      data,
+		Timestamp: time.Now(),
+	}
+
+	// Determine which queue to use based on URI
+	var queue *EventQueue
+	if strings.Contains(uri, "taskCreated") || strings.Contains(uri, "taskUpdated") || strings.Contains(uri, "taskDeleted") {
+		queue = a.taskQueue
+	} else if strings.Contains(uri, "messageAdded") {
+		queue = a.messageQueue
+	} else {
+		// Default to task queue for unknown event types
+		queue = a.taskQueue
+	}
+
+	if !queue.TryEnqueue(event) {
+		log.Printf("Warning: %s queue is full, dropping event: %s", queue.name, uri)
+	} else {
+		log.Printf("Queued event in %s queue: %s (queue size: %d)", queue.name, uri, queue.Len())
+	}
+}
+
// Start begins processing events from the queues in a background goroutine.
// The worker runs until the supplied ctx is cancelled or Stop is called.
// SetupQueues must have been called first (the worker reads both queues).
func (a *Agent) Start(ctx context.Context) {
	a.ctx, a.cancel = context.WithCancel(ctx)
	a.wg.Add(1)
	go a.processQueues()
	log.Printf("Agent queue processor started")
}
+
// Stop gracefully stops the queue processor and blocks until the worker
// goroutine started by Start has exited. Safe to call when Start was never
// called: cancel is nil-checked and wg.Wait returns immediately.
func (a *Agent) Stop() {
	if a.cancel != nil {
		a.cancel()
	}
	a.wg.Wait()
	log.Printf("Agent queue processor stopped")
}
+
// processQueues is the main worker that processes events from both queues.
// Events are processed in arrival order across both queues.
// It runs on its own goroutine (started by Start) and exits when a.ctx is
// cancelled; errors from individual events are logged, never fatal.
func (a *Agent) processQueues() {
	defer a.wg.Done()

	for {
		// Non-blocking shutdown check so cancellation takes priority over
		// newly arrived events at the top of each iteration.
		select {
		case <-a.ctx.Done():
			return
		default:
		}

		// Block until an event arrives on either queue (or shutdown).
		// NOTE(review): when both queues are ready, select chooses a ready
		// case at random, so strict cross-queue FIFO is not guaranteed here.
		var event *QueuedEvent
		select {
		case <-a.ctx.Done():
			return
		case event = <-a.taskQueue.Channel():
			log.Printf("Processing task event: %s", event.URI)
		case event = <-a.messageQueue.Channel():
			log.Printf("Processing message event: %s", event.URI)
		}

		// Process the event synchronously; one event at a time.
		if err := a.ProcessEvent(a.ctx, event.URI, event.Data); err != nil {
			log.Printf("Error processing event %s: %v", event.URI, err)
		}

		// After processing one event, check if there are more events waiting
		// Process any pending events before waiting for new ones
		a.processPendingEvents()
	}
}
+
+// processPendingEvents processes any events currently waiting in the queues
+// This ensures we don't block waiting for new events when there are pending ones
+func (a *Agent) processPendingEvents() {
+	for {
+		// Check for shutdown
+		select {
+		case <-a.ctx.Done():
+			return
+		default:
+		}
+
+		// Check if there are events in either queue
+		taskLen := a.taskQueue.Len()
+		messageLen := a.messageQueue.Len()
+
+		if taskLen == 0 && messageLen == 0 {
+			return // No more pending events
+		}
+
+		// Process one event from whichever queue has events
+		// Priority: task queue first if both have events (arbitrary but consistent)
+		var event *QueuedEvent
+		select {
+		case event = <-a.taskQueue.Channel():
+			log.Printf("Processing pending task event: %s (remaining: %d)", event.URI, a.taskQueue.Len())
+		case event = <-a.messageQueue.Channel():
+			log.Printf("Processing pending message event: %s (remaining: %d)", event.URI, a.messageQueue.Len())
+		default:
+			return // No events available
+		}
+
+		if err := a.ProcessEvent(a.ctx, event.URI, event.Data); err != nil {
+			log.Printf("Error processing pending event %s: %v", event.URI, err)
+		}
+	}
+}
+
// QueueStats is a point-in-time snapshot of queue depths,
// as returned by GetQueueStats.
type QueueStats struct {
	TaskQueueSize    int // events currently waiting in the task queue
	MessageQueueSize int // events currently waiting in the message queue
}
+
+// GetQueueStats returns current queue statistics
+func (a *Agent) GetQueueStats() QueueStats {
+	return QueueStats{
+		TaskQueueSize:    a.taskQueue.Len(),
+		MessageQueueSize: a.messageQueue.Len(),
+	}
+}

+ 537 - 0
arp_agent/agent_test.go

@@ -0,0 +1,537 @@
+package main
+
+import (
+	"context"
+	"encoding/json"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/sashabaranov/go-openai"
+)
+
+// TestAgent_Initialize tests agent initialization: the agent must discover
+// the tools exposed by the (mock) MCP client and convert them into
+// OpenAI-style function tools, preserving their order.
+func TestAgent_Initialize(t *testing.T) {
+	// Two representative tools: one with no parameters, one with a required
+	// "query" string parameter.
+	mockMCP := NewMockMCPClient([]Tool{
+		{
+			Name:        "introspect",
+			Description: "Discover the GraphQL schema",
+			InputSchema: InputSchema{
+				Type:                 "object",
+				Properties:           map[string]Property{},
+				AdditionalProperties: false,
+			},
+		},
+		{
+			Name:        "query",
+			Description: "Execute a GraphQL query",
+			InputSchema: InputSchema{
+				Type: "object",
+				Properties: map[string]Property{
+					"query": {Type: "string", Description: "The GraphQL query"},
+				},
+				Required:             []string{"query"},
+				AdditionalProperties: false,
+			},
+		},
+	})
+
+	mockLLM := NewMockLLM(nil)
+	agent := NewTestAgent(mockLLM, mockMCP)
+
+	err := agent.Initialize()
+	if err != nil {
+		t.Fatalf("Initialize failed: %v", err)
+	}
+
+	if len(agent.tools) != 2 {
+		t.Errorf("Expected 2 tools, got %d", len(agent.tools))
+	}
+
+	// Verify tools were converted correctly (names survive the conversion
+	// to the openai.Tool representation, in the original order).
+	toolNames := make([]string, len(agent.tools))
+	for i, tool := range agent.tools {
+		toolNames[i] = tool.Function.Name
+	}
+
+	expectedNames := []string{"introspect", "query"}
+	for i, expected := range expectedNames {
+		if toolNames[i] != expected {
+			t.Errorf("Tool %d: expected name %s, got %s", i, expected, toolNames[i])
+		}
+	}
+}
+
+// TestAgent_ProcessEvent tests event processing for task and message events.
+// Each subtest wires a mock MCP client and a scripted mock LLM, then feeds a
+// single subscription event through ProcessEvent.
+func TestAgent_ProcessEvent(t *testing.T) {
+	ctx := context.Background()
+
+	t.Run("TaskCreatedEvent", func(t *testing.T) {
+		mockMCP := NewMockMCPClient([]Tool{
+			{Name: "query", Description: "Execute a GraphQL query", InputSchema: InputSchema{Type: "object"}},
+		})
+		mockMCP.SetToolResult("query", &CallToolResult{
+			Content: []ContentBlock{{Type: "text", Text: `{"data": {"tasks": []}}`}},
+		})
+
+		// Mock LLM that makes a tool call then responds
+		mockLLM := NewMockLLM([]*openai.ChatCompletionMessage{
+			{
+				Role: openai.ChatMessageRoleAssistant,
+				ToolCalls: []openai.ToolCall{
+					{
+						ID: "call-1",
+						Function: openai.FunctionCall{
+							Name:      "query",
+							Arguments: `{"query": "{ tasks { id title } }"}`,
+						},
+					},
+				},
+			},
+			{
+				Role:    openai.ChatMessageRoleAssistant,
+				Content: "I've processed the task created event.",
+			},
+		})
+
+		agent := NewTestAgent(mockLLM, mockMCP)
+		// Fail fast on setup: the original discarded this error, which would
+		// later surface as a confusing ProcessEvent failure.
+		if err := agent.Initialize(); err != nil {
+			t.Fatalf("Initialize failed: %v", err)
+		}
+
+		eventData := json.RawMessage(`{"taskId": "task-123", "title": "New Task"}`)
+		err := agent.ProcessEvent(ctx, "graphql://subscription/taskCreated", eventData)
+		if err != nil {
+			t.Errorf("ProcessEvent failed: %v", err)
+		}
+	})
+
+	t.Run("MessageAddedEvent", func(t *testing.T) {
+		mockMCP := NewMockMCPClient([]Tool{
+			{Name: "query", Description: "Execute a GraphQL query", InputSchema: InputSchema{Type: "object"}},
+		})
+
+		// Mock LLM that responds directly without tool calls
+		mockLLM := NewMockLLM([]*openai.ChatCompletionMessage{
+			{
+				Role:    openai.ChatMessageRoleAssistant,
+				Content: "I received the message added event.",
+			},
+		})
+
+		agent := NewTestAgent(mockLLM, mockMCP)
+		if err := agent.Initialize(); err != nil {
+			t.Fatalf("Initialize failed: %v", err)
+		}
+
+		eventData := json.RawMessage(`{"messageId": "msg-456", "content": "Hello!"}`)
+		err := agent.ProcessEvent(ctx, "graphql://subscription/messageAdded", eventData)
+		if err != nil {
+			t.Errorf("ProcessEvent failed: %v", err)
+		}
+	})
+}
+
+// TestAgent_Run tests the interactive Run method, both the direct-answer
+// path and the tool-call round trip.
+func TestAgent_Run(t *testing.T) {
+	ctx := context.Background()
+
+	t.Run("SimpleResponse", func(t *testing.T) {
+		mockMCP := NewMockMCPClient([]Tool{})
+		mockLLM := NewMockLLM([]*openai.ChatCompletionMessage{
+			{
+				Role:    openai.ChatMessageRoleAssistant,
+				Content: "Hello! How can I help you?",
+			},
+		})
+
+		agent := NewTestAgent(mockLLM, mockMCP)
+		// The original discarded this error; a failed setup should abort the
+		// subtest immediately instead of surfacing as a confusing Run failure.
+		if err := agent.Initialize(); err != nil {
+			t.Fatalf("Initialize failed: %v", err)
+		}
+
+		response, err := agent.Run(ctx, "Hello")
+		if err != nil {
+			t.Fatalf("Run failed: %v", err)
+		}
+
+		if response != "Hello! How can I help you?" {
+			t.Errorf("Expected 'Hello! How can I help you?', got '%s'", response)
+		}
+	})
+
+	t.Run("WithToolCall", func(t *testing.T) {
+		mockMCP := NewMockMCPClient([]Tool{
+			{Name: "introspect", Description: "Introspect schema", InputSchema: InputSchema{Type: "object"}},
+		})
+		mockMCP.SetToolResult("introspect", &CallToolResult{
+			Content: []ContentBlock{{Type: "text", Text: "Schema: Query, Mutation, Subscription"}},
+		})
+
+		// Scripted LLM: first turn issues a tool call, second turn answers.
+		mockLLM := NewMockLLM([]*openai.ChatCompletionMessage{
+			{
+				Role: openai.ChatMessageRoleAssistant,
+				ToolCalls: []openai.ToolCall{
+					{
+						ID: "call-1",
+						Function: openai.FunctionCall{
+							Name:      "introspect",
+							Arguments: `{}`,
+						},
+					},
+				},
+			},
+			{
+				Role:    openai.ChatMessageRoleAssistant,
+				Content: "The schema has Query, Mutation, and Subscription types.",
+			},
+		})
+
+		agent := NewTestAgent(mockLLM, mockMCP)
+		if err := agent.Initialize(); err != nil {
+			t.Fatalf("Initialize failed: %v", err)
+		}
+
+		response, err := agent.Run(ctx, "What types are in the schema?")
+		if err != nil {
+			t.Fatalf("Run failed: %v", err)
+		}
+
+		if response != "The schema has Query, Mutation, and Subscription types." {
+			t.Errorf("Unexpected response: %s", response)
+		}
+	})
+}
+
+// TestAgent_BuildEventPrompt tests event prompt building: for each known
+// subscription URI the generated prompt must mention both a human-readable
+// event type phrase and the raw URI itself.
+func TestAgent_BuildEventPrompt(t *testing.T) {
+	tests := []struct {
+		name      string
+		uri       string          // subscription URI fed to the prompt builder
+		eventData json.RawMessage // raw event payload (content is opaque here)
+		wantType  string          // phrase the prompt must contain
+	}{
+		{
+			name:      "TaskCreated",
+			uri:       "graphql://subscription/taskCreated",
+			eventData: json.RawMessage(`{"id": "1"}`),
+			wantType:  "task created",
+		},
+		{
+			name:      "TaskUpdated",
+			uri:       "graphql://subscription/taskUpdated",
+			eventData: json.RawMessage(`{"id": "2"}`),
+			wantType:  "task updated",
+		},
+		{
+			name:      "TaskDeleted",
+			uri:       "graphql://subscription/taskDeleted",
+			eventData: json.RawMessage(`{"id": "3"}`),
+			wantType:  "task deleted",
+		},
+		{
+			name:      "MessageAdded",
+			uri:       "graphql://subscription/messageAdded",
+			eventData: json.RawMessage(`{"id": "4"}`),
+			wantType:  "message added",
+		},
+		{
+			// Unrecognized URIs still produce a prompt containing "unknown".
+			name:      "UnknownEvent",
+			uri:       "graphql://subscription/unknown",
+			eventData: json.RawMessage(`{}`),
+			wantType:  "unknown",
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			prompt := buildTestEventPrompt(tt.uri, tt.eventData)
+			if !strings.Contains(prompt, tt.wantType) {
+				t.Errorf("Expected prompt to contain '%s', got: %s", tt.wantType, prompt)
+			}
+			if !strings.Contains(prompt, tt.uri) {
+				t.Errorf("Expected prompt to contain URI '%s', got: %s", tt.uri, prompt)
+			}
+		})
+	}
+}
+
+// TestAgent_ToolNames verifies that the toolNames helper extracts tool
+// names in their original order.
+func TestAgent_ToolNames(t *testing.T) {
+	input := []Tool{
+		{Name: "introspect"},
+		{Name: "query"},
+		{Name: "mutate"},
+	}
+	want := []string{"introspect", "query", "mutate"}
+
+	got := toolNames(input)
+	if len(got) != len(want) {
+		t.Errorf("Expected %d names, got %d", len(want), len(got))
+	}
+
+	for i, name := range got {
+		if name != want[i] {
+			t.Errorf("Name %d: expected %s, got %s", i, want[i], name)
+		}
+	}
+}
+
+// TestEventQueue tests the EventQueue operations: non-blocking enqueue,
+// overflow behavior on a full queue, FIFO dequeue order, and Len reporting.
+func TestEventQueue(t *testing.T) {
+	t.Run("TryEnqueueSuccess", func(t *testing.T) {
+		queue := NewEventQueue("test", 10)
+		event := &QueuedEvent{
+			URI:       "test://uri",
+			Data:      json.RawMessage(`{"test": "data"}`),
+			Timestamp: time.Now(),
+		}
+
+		success := queue.TryEnqueue(event)
+		if !success {
+			t.Error("Expected TryEnqueue to succeed")
+		}
+		if queue.Len() != 1 {
+			t.Errorf("Expected queue length 1, got %d", queue.Len())
+		}
+	})
+
+	t.Run("TryEnqueueFullQueue", func(t *testing.T) {
+		// Capacity of 2: the third enqueue must fail without blocking.
+		queue := NewEventQueue("test", 2)
+
+		// Fill the queue
+		for i := 0; i < 2; i++ {
+			success := queue.TryEnqueue(&QueuedEvent{URI: "test://uri"})
+			if !success {
+				t.Errorf("Expected TryEnqueue %d to succeed", i)
+			}
+		}
+
+		// Try to add one more - should fail
+		success := queue.TryEnqueue(&QueuedEvent{URI: "test://overflow"})
+		if success {
+			t.Error("Expected TryEnqueue to fail on full queue")
+		}
+		if queue.Len() != 2 {
+			t.Errorf("Expected queue length 2, got %d", queue.Len())
+		}
+	})
+
+	t.Run("Dequeue", func(t *testing.T) {
+		queue := NewEventQueue("test", 10)
+		event1 := &QueuedEvent{URI: "test://uri1"}
+		event2 := &QueuedEvent{URI: "test://uri2"}
+
+		queue.TryEnqueue(event1)
+		queue.TryEnqueue(event2)
+
+		// Dequeue should return events in FIFO order
+		dequeued1 := queue.Dequeue()
+		if dequeued1.URI != "test://uri1" {
+			t.Errorf("Expected URI 'test://uri1', got '%s'", dequeued1.URI)
+		}
+
+		dequeued2 := queue.Dequeue()
+		if dequeued2.URI != "test://uri2" {
+			t.Errorf("Expected URI 'test://uri2', got '%s'", dequeued2.URI)
+		}
+	})
+
+	t.Run("Len", func(t *testing.T) {
+		// Len must track the number of enqueued-but-not-dequeued events.
+		queue := NewEventQueue("test", 10)
+
+		if queue.Len() != 0 {
+			t.Errorf("Expected empty queue to have length 0, got %d", queue.Len())
+		}
+
+		queue.TryEnqueue(&QueuedEvent{URI: "test://uri1"})
+		if queue.Len() != 1 {
+			t.Errorf("Expected queue length 1, got %d", queue.Len())
+		}
+
+		queue.TryEnqueue(&QueuedEvent{URI: "test://uri2"})
+		if queue.Len() != 2 {
+			t.Errorf("Expected queue length 2, got %d", queue.Len())
+		}
+	})
+}
+
+// TestAgent_QueueEvent tests event routing to queues: task* URIs land in the
+// task queue, messageAdded lands in the message queue, and unrecognized URIs
+// fall back to the task queue.
+func TestAgent_QueueEvent(t *testing.T) {
+	mockMCP := NewMockMCPClient([]Tool{})
+	mockLLM := NewMockLLM(nil)
+	agent := NewTestAgent(mockLLM, mockMCP)
+	agent.SetupQueues(10)
+
+	tests := []struct {
+		name         string
+		uri          string // subscription URI being routed
+		expectedTask int    // expected task queue size after routing
+		expectedMsg  int    // expected message queue size after routing
+	}{
+		{
+			name:         "TaskCreated",
+			uri:          "graphql://subscription/taskCreated",
+			expectedTask: 1,
+			expectedMsg:  0,
+		},
+		{
+			name:         "TaskUpdated",
+			uri:          "graphql://subscription/taskUpdated",
+			expectedTask: 1,
+			expectedMsg:  0,
+		},
+		{
+			name:         "TaskDeleted",
+			uri:          "graphql://subscription/taskDeleted",
+			expectedTask: 1,
+			expectedMsg:  0,
+		},
+		{
+			name:         "MessageAdded",
+			uri:          "graphql://subscription/messageAdded",
+			expectedTask: 0,
+			expectedMsg:  1,
+		},
+		{
+			name:         "UnknownEvent",
+			uri:          "graphql://subscription/unknown",
+			expectedTask: 1, // Unknown events go to task queue
+			expectedMsg:  0,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Reset queues so each subtest observes exactly one routed event.
+			agent.SetupQueues(10)
+
+			agent.QueueEvent(tt.uri, json.RawMessage(`{}`))
+
+			stats := agent.GetQueueStats()
+			if stats.TaskQueueSize != tt.expectedTask {
+				t.Errorf("Expected task queue size %d, got %d", tt.expectedTask, stats.TaskQueueSize)
+			}
+			if stats.MessageQueueSize != tt.expectedMsg {
+				t.Errorf("Expected message queue size %d, got %d", tt.expectedMsg, stats.MessageQueueSize)
+			}
+		})
+	}
+}
+
+// TestAgent_QueueEventFullQueue verifies that events beyond the queue
+// capacity are dropped rather than blocking the caller.
+func TestAgent_QueueEventFullQueue(t *testing.T) {
+	agent := NewTestAgent(NewMockLLM(nil), NewMockMCPClient([]Tool{}))
+	agent.SetupQueues(2) // capacity of two, so a third enqueue overflows
+
+	// The first two events fill the task queue; the third must be dropped.
+	for _, payload := range []string{`{"id": "1"}`, `{"id": "2"}`, `{"id": "3"}`} {
+		agent.QueueEvent("graphql://subscription/taskCreated", json.RawMessage(payload))
+	}
+
+	if size := agent.GetQueueStats().TaskQueueSize; size != 2 {
+		t.Errorf("Expected task queue size 2 (full), got %d", size)
+	}
+}
+
+// TestAgent_StartStop tests the queue processor lifecycle: Start consumes a
+// queued event, and after Stop the queue is empty.
+func TestAgent_StartStop(t *testing.T) {
+	mockMCP := NewMockMCPClient([]Tool{
+		{Name: "query", Description: "Execute a GraphQL query", InputSchema: InputSchema{Type: "object"}},
+	})
+	mockMCP.SetToolResult("query", &CallToolResult{
+		Content: []ContentBlock{{Type: "text", Text: `{"data": {}}`}},
+	})
+
+	// Mock LLM that responds immediately
+	mockLLM := NewMockLLM([]*openai.ChatCompletionMessage{
+		{
+			Role:    openai.ChatMessageRoleAssistant,
+			Content: "Processed",
+		},
+	})
+
+	agent := NewTestAgent(mockLLM, mockMCP)
+	// The original discarded this error; abort early if setup fails.
+	if err := agent.Initialize(); err != nil {
+		t.Fatalf("Initialize failed: %v", err)
+	}
+	agent.SetupQueues(10)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// Start the queue processor
+	agent.Start(ctx)
+
+	// Queue an event
+	agent.QueueEvent("graphql://subscription/taskCreated", json.RawMessage(`{"id": "1"}`))
+
+	// Give it time to process. NOTE(review): sleep-based synchronization is
+	// inherently racy on slow machines; a completion signal from the agent
+	// would make this deterministic.
+	time.Sleep(100 * time.Millisecond)
+
+	// Stop the processor
+	agent.Stop()
+
+	// Verify the queue is empty (event was processed)
+	stats := agent.GetQueueStats()
+	if stats.TaskQueueSize != 0 {
+		t.Errorf("Expected task queue to be empty after processing, got %d", stats.TaskQueueSize)
+	}
+}
+
+// TestAgent_MultipleEventsInOrder tests that events are processed in arrival order
+func TestAgent_MultipleEventsInOrder(t *testing.T) {
+	mockMCP := NewMockMCPClient([]Tool{
+		{Name: "query", Description: "Execute a GraphQL query", InputSchema: InputSchema{Type: "object"}},
+	})
+	mockMCP.SetToolResult("query", &CallToolResult{
+		Content: []ContentBlock{{Type: "text", Text: `{"data": {}}`}},
+	})
+
+	// Track the order of processed events
+	var processedOrder []string
+	// Mock LLM that responds immediately and tracks order
+	mockLLM := NewMockLLM([]*openai.ChatCompletionMessage{
+		{
+			Role:    openai.ChatMessageRoleAssistant,
+			Content: "Processed",
+		},
+		{
+			Role:    openai.ChatMessageRoleAssistant,
+			Content: "Processed",
+		},
+		{
+			Role:    openai.ChatMessageRoleAssistant,
+			Content: "Processed",
+		},
+	})
+
+	agent := NewTestAgent(mockLLM, mockMCP)
+	agent.Initialize()
+	agent.SetupQueues(10)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// Start the queue processor
+	agent.Start(ctx)
+
+	// Queue multiple events in order: task, message, task
+	agent.QueueEvent("graphql://subscription/taskCreated", json.RawMessage(`{"id": "task-1"}`))
+	agent.QueueEvent("graphql://subscription/messageAdded", json.RawMessage(`{"id": "msg-1"}`))
+	agent.QueueEvent("graphql://subscription/taskUpdated", json.RawMessage(`{"id": "task-2"}`))
+
+	// Give it time to process all events
+	time.Sleep(200 * time.Millisecond)
+
+	// Stop the processor
+	agent.Stop()
+
+	// Verify all queues are empty
+	stats := agent.GetQueueStats()
+	if stats.TaskQueueSize != 0 {
+		t.Errorf("Expected task queue to be empty, got %d", stats.TaskQueueSize)
+	}
+	if stats.MessageQueueSize != 0 {
+		t.Errorf("Expected message queue to be empty, got %d", stats.MessageQueueSize)
+	}
+
+	// Verify order was preserved (we can't easily check this with the mock, but the test validates the mechanism)
+	_ = processedOrder
+}

BIN
arp_agent/arp_agent


+ 100 - 0
arp_agent/config.go

@@ -0,0 +1,100 @@
+package main
+
+import (
+	"fmt"
+	"os"
+	"strconv"
+)
+
+// Config holds the agent configuration, populated from environment
+// variables by LoadConfig.
+type Config struct {
+	// ARP Server Configuration (all required)
+	ARPURL      string // base URL of the ARP server (ARP_URL)
+	ARPUsername string // login username (ARP_USERNAME)
+	ARPPassword string // login password (ARP_PASSWORD)
+
+	// OpenAI Configuration
+	OpenAIKey         string  // API key (OPENAI_API_KEY, required)
+	OpenAIModel       string  // model name; defaults to "gpt-4"
+	OpenAITemperature float64 // sampling temperature; defaults to 0.0
+	OpenAIBaseURL     string  // optional custom endpoint; empty = default OpenAI API
+	OpenAIMaxTokens   int     // completion token cap; defaults to 4096
+
+	// Agent Identity Configuration (all have defaults)
+	AgentName      string
+	Specialization string
+	Values         string
+	Goals          string
+
+	// Queue Configuration
+	MaxQueueSize int // per-queue event capacity; defaults to 100
+}
+
+// LoadConfig builds a Config from environment variables. The server
+// credentials and the OpenAI API key are mandatory; every other setting
+// falls back to a documented default when unset.
+func LoadConfig() (*Config, error) {
+	cfg := &Config{}
+
+	// Mandatory settings: fail fast, naming the missing variable.
+	for _, item := range []struct {
+		env  string
+		dest *string
+	}{
+		{"ARP_URL", &cfg.ARPURL},
+		{"ARP_USERNAME", &cfg.ARPUsername},
+		{"ARP_PASSWORD", &cfg.ARPPassword},
+		{"OPENAI_API_KEY", &cfg.OpenAIKey},
+	} {
+		*item.dest = os.Getenv(item.env)
+		if *item.dest == "" {
+			return nil, fmt.Errorf("%s environment variable is required", item.env)
+		}
+	}
+
+	// Optional LLM settings.
+	cfg.OpenAIModel = getEnvWithDefault("OPENAI_MODEL", "gpt-4")
+	cfg.OpenAITemperature = getEnvFloatWithDefault("OPENAI_TEMPERATURE", 0.0)
+	cfg.OpenAIBaseURL = os.Getenv("OPENAI_BASE_URL") // empty selects the default OpenAI API
+	cfg.OpenAIMaxTokens = getEnvIntWithDefault("OPENAI_MAX_TOKENS", 4096)
+
+	// Agent identity settings, all defaulted.
+	cfg.AgentName = getEnvWithDefault("ARP_AGENT_NAME", "AI Assistant")
+	cfg.Specialization = getEnvWithDefault("ARP_AGENT_SPECIALIZATION", "general assistance")
+	cfg.Values = getEnvWithDefault("ARP_AGENT_VALUES", "helpfulness, accuracy, and collaboration")
+	cfg.Goals = getEnvWithDefault("ARP_AGENT_GOALS", "help teammates accomplish their goals and contribute to the team's success")
+
+	// Queue settings.
+	cfg.MaxQueueSize = getEnvIntWithDefault("ARP_MAX_QUEUE_SIZE", 100)
+
+	return cfg, nil
+}
+
+// getEnvWithDefault reads key from the environment, falling back to
+// defaultValue when the variable is unset or empty.
+func getEnvWithDefault(key, defaultValue string) string {
+	value := os.Getenv(key)
+	if value == "" {
+		return defaultValue
+	}
+	return value
+}
+
+// getEnvFloatWithDefault reads key as a float64, returning defaultValue when
+// the variable is unset, empty, or not a valid float.
+func getEnvFloatWithDefault(key string, defaultValue float64) float64 {
+	value, ok := os.LookupEnv(key)
+	if !ok || value == "" {
+		return defaultValue
+	}
+	parsed, err := strconv.ParseFloat(value, 64)
+	if err != nil {
+		return defaultValue
+	}
+	return parsed
+}
+
+// getEnvIntWithDefault reads key as an int, returning defaultValue when the
+// variable is unset, empty, or not a valid integer.
+func getEnvIntWithDefault(key string, defaultValue int) int {
+	value, ok := os.LookupEnv(key)
+	if !ok || value == "" {
+		return defaultValue
+	}
+	parsed, err := strconv.Atoi(value)
+	if err != nil {
+		return defaultValue
+	}
+	return parsed
+}

+ 170 - 0
arp_agent/config_test.go

@@ -0,0 +1,170 @@
+package main
+
+import (
+	"os"
+	"strings"
+	"testing"
+)
+
+// TestConfig_LoadConfig tests configuration loading: explicit values,
+// defaults, each required variable's error path, and fallback on an
+// unparsable temperature.
+func TestConfig_LoadConfig(t *testing.T) {
+	// Save original env vars
+	origARPURL := os.Getenv("ARP_URL")
+	origARPUsername := os.Getenv("ARP_USERNAME")
+	origARPPassword := os.Getenv("ARP_PASSWORD")
+	origOpenAIKey := os.Getenv("OPENAI_API_KEY")
+	origOpenAIModel := os.Getenv("OPENAI_MODEL")
+	origOpenAITemp := os.Getenv("OPENAI_TEMPERATURE")
+	origOpenAIBaseURL := os.Getenv("OPENAI_BASE_URL")
+
+	// Restore after test. NOTE(review): a variable that was originally unset
+	// is restored as set-but-empty; LoadConfig treats empty as unset, so the
+	// difference is harmless for this package.
+	defer func() {
+		os.Setenv("ARP_URL", origARPURL)
+		os.Setenv("ARP_USERNAME", origARPUsername)
+		os.Setenv("ARP_PASSWORD", origARPPassword)
+		os.Setenv("OPENAI_API_KEY", origOpenAIKey)
+		os.Setenv("OPENAI_MODEL", origOpenAIModel)
+		os.Setenv("OPENAI_TEMPERATURE", origOpenAITemp)
+		os.Setenv("OPENAI_BASE_URL", origOpenAIBaseURL)
+	}()
+
+	t.Run("ValidConfig", func(t *testing.T) {
+		os.Setenv("ARP_URL", "http://localhost:8080")
+		os.Setenv("ARP_USERNAME", "test@example.com")
+		os.Setenv("ARP_PASSWORD", "testpass")
+		os.Setenv("OPENAI_API_KEY", "test-key")
+		os.Setenv("OPENAI_MODEL", "gpt-4")
+		os.Setenv("OPENAI_TEMPERATURE", "0.7")
+		os.Setenv("OPENAI_BASE_URL", "http://localhost:11434/v1")
+
+		cfg, err := LoadConfig()
+		if err != nil {
+			t.Fatalf("LoadConfig failed: %v", err)
+		}
+
+		if cfg.ARPURL != "http://localhost:8080" {
+			t.Errorf("Expected ARP_URL 'http://localhost:8080', got '%s'", cfg.ARPURL)
+		}
+		if cfg.ARPUsername != "test@example.com" {
+			t.Errorf("Expected ARP_USERNAME 'test@example.com', got '%s'", cfg.ARPUsername)
+		}
+		if cfg.ARPPassword != "testpass" {
+			t.Errorf("Expected ARP_PASSWORD 'testpass', got '%s'", cfg.ARPPassword)
+		}
+		if cfg.OpenAIKey != "test-key" {
+			t.Errorf("Expected OpenAIKey 'test-key', got '%s'", cfg.OpenAIKey)
+		}
+		if cfg.OpenAIModel != "gpt-4" {
+			t.Errorf("Expected OpenAIModel 'gpt-4', got '%s'", cfg.OpenAIModel)
+		}
+		if cfg.OpenAITemperature != 0.7 {
+			t.Errorf("Expected OpenAITemperature 0.7, got %f", cfg.OpenAITemperature)
+		}
+		if cfg.OpenAIBaseURL != "http://localhost:11434/v1" {
+			t.Errorf("Expected OpenAIBaseURL 'http://localhost:11434/v1', got '%s'", cfg.OpenAIBaseURL)
+		}
+	})
+
+	t.Run("DefaultValues", func(t *testing.T) {
+		os.Setenv("ARP_URL", "http://localhost:8080")
+		os.Setenv("ARP_USERNAME", "test@example.com")
+		os.Setenv("ARP_PASSWORD", "testpass")
+		os.Setenv("OPENAI_API_KEY", "test-key")
+		os.Unsetenv("OPENAI_MODEL")
+		os.Unsetenv("OPENAI_TEMPERATURE")
+		os.Unsetenv("OPENAI_BASE_URL")
+
+		cfg, err := LoadConfig()
+		if err != nil {
+			t.Fatalf("LoadConfig failed: %v", err)
+		}
+
+		if cfg.OpenAIModel != "gpt-4" {
+			t.Errorf("Expected default OpenAIModel 'gpt-4', got '%s'", cfg.OpenAIModel)
+		}
+		if cfg.OpenAITemperature != 0.0 {
+			t.Errorf("Expected default OpenAITemperature 0.0, got %f", cfg.OpenAITemperature)
+		}
+		if cfg.OpenAIBaseURL != "" {
+			t.Errorf("Expected default OpenAIBaseURL '', got '%s'", cfg.OpenAIBaseURL)
+		}
+	})
+
+	t.Run("MissingARPURL", func(t *testing.T) {
+		os.Unsetenv("ARP_URL")
+		os.Setenv("ARP_USERNAME", "test@example.com")
+		os.Setenv("ARP_PASSWORD", "testpass")
+		os.Setenv("OPENAI_API_KEY", "test-key")
+
+		_, err := LoadConfig()
+		if err == nil {
+			// Fatal, not Error: the err.Error() call below would otherwise
+			// panic on a nil error. (The previous t.Error continued running.)
+			t.Fatal("Expected error for missing ARP_URL")
+		}
+		if !strings.Contains(err.Error(), "ARP_URL") {
+			t.Errorf("Expected error to mention ARP_URL, got: %v", err)
+		}
+	})
+
+	t.Run("MissingARPUsername", func(t *testing.T) {
+		os.Setenv("ARP_URL", "http://localhost:8080")
+		os.Unsetenv("ARP_USERNAME")
+		os.Setenv("ARP_PASSWORD", "testpass")
+		os.Setenv("OPENAI_API_KEY", "test-key")
+
+		_, err := LoadConfig()
+		if err == nil {
+			t.Fatal("Expected error for missing ARP_USERNAME")
+		}
+		if !strings.Contains(err.Error(), "ARP_USERNAME") {
+			t.Errorf("Expected error to mention ARP_USERNAME, got: %v", err)
+		}
+	})
+
+	t.Run("MissingARPPassword", func(t *testing.T) {
+		os.Setenv("ARP_URL", "http://localhost:8080")
+		os.Setenv("ARP_USERNAME", "test@example.com")
+		os.Unsetenv("ARP_PASSWORD")
+		os.Setenv("OPENAI_API_KEY", "test-key")
+
+		_, err := LoadConfig()
+		if err == nil {
+			t.Fatal("Expected error for missing ARP_PASSWORD")
+		}
+		if !strings.Contains(err.Error(), "ARP_PASSWORD") {
+			t.Errorf("Expected error to mention ARP_PASSWORD, got: %v", err)
+		}
+	})
+
+	t.Run("MissingOpenAIKey", func(t *testing.T) {
+		os.Setenv("ARP_URL", "http://localhost:8080")
+		os.Setenv("ARP_USERNAME", "test@example.com")
+		os.Setenv("ARP_PASSWORD", "testpass")
+		os.Unsetenv("OPENAI_API_KEY")
+
+		_, err := LoadConfig()
+		if err == nil {
+			t.Fatal("Expected error for missing OPENAI_API_KEY")
+		}
+		if !strings.Contains(err.Error(), "OPENAI_API_KEY") {
+			t.Errorf("Expected error to mention OPENAI_API_KEY, got: %v", err)
+		}
+	})
+
+	t.Run("InvalidTemperature", func(t *testing.T) {
+		os.Setenv("ARP_URL", "http://localhost:8080")
+		os.Setenv("ARP_USERNAME", "test@example.com")
+		os.Setenv("ARP_PASSWORD", "testpass")
+		os.Setenv("OPENAI_API_KEY", "test-key")
+		os.Setenv("OPENAI_TEMPERATURE", "invalid")
+
+		cfg, err := LoadConfig()
+		if err != nil {
+			t.Fatalf("LoadConfig failed: %v", err)
+		}
+
+		// Should use default value when parsing fails
+		if cfg.OpenAITemperature != 0.0 {
+			t.Errorf("Expected default OpenAITemperature 0.0 for invalid input, got %f", cfg.OpenAITemperature)
+		}
+	})
+}

+ 31 - 0
arp_agent/go.mod

@@ -0,0 +1,31 @@
+module gogs.dmsc.dev/arp/arp_agent
+
+go 1.25.3
+
+require (
+	github.com/bradleyjkemp/cupaloy/v2 v2.8.0
+	github.com/joho/godotenv v1.5.1
+	github.com/sashabaranov/go-openai v1.32.2
+	gogs.dmsc.dev/arp v0.0.0
+	gorm.io/gorm v1.31.1
+)
+
+require (
+	github.com/99designs/gqlgen v0.17.87 // indirect
+	github.com/agnivade/levenshtein v1.2.1 // indirect
+	github.com/davecgh/go-spew v1.1.1 // indirect
+	github.com/golang-jwt/jwt/v5 v5.3.1 // indirect
+	github.com/google/uuid v1.6.0 // indirect
+	github.com/jinzhu/inflection v1.0.0 // indirect
+	github.com/jinzhu/now v1.1.5 // indirect
+	github.com/mattn/go-sqlite3 v1.14.22 // indirect
+	github.com/pmezard/go-difflib v1.0.0 // indirect
+	github.com/sosodev/duration v1.3.1 // indirect
+	github.com/vektah/gqlparser/v2 v2.5.32 // indirect
+	golang.org/x/crypto v0.48.0 // indirect
+	golang.org/x/sync v0.19.0 // indirect
+	golang.org/x/text v0.34.0 // indirect
+	gorm.io/driver/sqlite v1.6.0 // indirect
+)
+
+replace gogs.dmsc.dev/arp => ../

+ 62 - 0
arp_agent/go.sum

@@ -0,0 +1,62 @@
+github.com/99designs/gqlgen v0.17.87 h1:pSnCIMhBQezAE8bc1GNmfdLXFmnWtWl1GRDFEE/nHP8=
+github.com/99designs/gqlgen v0.17.87/go.mod h1:fK05f1RqSNfQpd4CfW5qk/810Tqi4/56Wf6Nem0khAg=
+github.com/agnivade/levenshtein v1.2.1 h1:EHBY3UOn1gwdy/VbFwgo4cxecRznFk7fKWN1KOX7eoM=
+github.com/agnivade/levenshtein v1.2.1/go.mod h1:QVVI16kDrtSuwcpd0p1+xMC6Z/VfhtCyDIjcwga4/DU=
+github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ=
+github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
+github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0 h1:jfIu9sQUG6Ig+0+Ap1h4unLjW6YQJpKZVmUzxsD4E/Q=
+github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0/go.mod h1:t2tdKJDJF9BV14lnkjHmOQgcvEKgtqs5a1N3LNdJhGE=
+github.com/bradleyjkemp/cupaloy/v2 v2.8.0 h1:any4BmKE+jGIaMpnU8YgH/I2LPiLBufr6oMMlVBbn9M=
+github.com/bradleyjkemp/cupaloy/v2 v2.8.0/go.mod h1:bm7JXdkRd4BHJk9HpwqAI8BoAY1lps46Enkdqw6aRX0=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dgryski/trifles v0.0.0-20230903005119-f50d829f2e54 h1:SG7nF6SRlWhcT7cNTs5R6Hk4V2lcmLz2NsG2VnInyNo=
+github.com/dgryski/trifles v0.0.0-20230903005119-f50d829f2e54/go.mod h1:if7Fbed8SFyPtHLHbg49SI7NAdJiC5WIA09pe59rfAA=
+github.com/go-viper/mapstructure/v2 v2.5.0 h1:vM5IJoUAy3d7zRSVtIwQgBj7BiWtMPfmPEgAXnvj1Ro=
+github.com/go-viper/mapstructure/v2 v2.5.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
+github.com/golang-jwt/jwt/v5 v5.3.1 h1:kYf81DTWFe7t+1VvL7eS+jKFVWaUnK9cB1qbwn63YCY=
+github.com/golang-jwt/jwt/v5 v5.3.1/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE=
+github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
+github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
+github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
+github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
+github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
+github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
+github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
+github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
+github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
+github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
+github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
+github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/sashabaranov/go-openai v1.32.2 h1:8z9PfYaLPbRzmJIYpwcWu6z3XU8F+RwVMF1QRSeSF2M=
+github.com/sashabaranov/go-openai v1.32.2/go.mod h1:lj5b/K+zjTSFxVLijLSTDZuP7adOgerWeFyZLUhAKRg=
+github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8=
+github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I=
+github.com/sosodev/duration v1.3.1 h1:qtHBDMQ6lvMQsL15g4aopM4HEfOaYuhWBw3NPTtlqq4=
+github.com/sosodev/duration v1.3.1/go.mod h1:RQIBBX0+fMLc/D9+Jb/fwvVmo0eZvDDEERAikUR6SDg=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
+github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
+github.com/vektah/gqlparser/v2 v2.5.32 h1:k9QPJd4sEDTL+qB4ncPLflqTJ3MmjB9SrVzJrawpFSc=
+github.com/vektah/gqlparser/v2 v2.5.32/go.mod h1:c1I28gSOVNzlfc4WuDlqU7voQnsqI6OG2amkBAFmgts=
+golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts=
+golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos=
+golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
+golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
+golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk=
+golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gorm.io/driver/sqlite v1.6.0 h1:WHRRrIiulaPiPFmDcod6prc4l2VGVWHz80KspNsxSfQ=
+gorm.io/driver/sqlite v1.6.0/go.mod h1:AO9V1qIQddBESngQUKWL9yoH93HIeA1X6V633rBwyT8=
+gorm.io/gorm v1.31.1 h1:7CA8FTFz/gRfgqgpeKIBcervUn3xSyPUmr6B2WXJ7kg=
+gorm.io/gorm v1.31.1/go.mod h1:XyQVbO2k6YkOis7C2437jSit3SsDK72s7n7rsSHd+Gs=

+ 155 - 0
arp_agent/llm.go

@@ -0,0 +1,155 @@
+package main
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+
+	"github.com/sashabaranov/go-openai"
+)
+
// LLM is an OpenAI LLM wrapper with tool-calling support
type LLM struct {
	client      *openai.Client // underlying OpenAI API client
	model       string         // model name used for every request
	temperature float32        // sampling temperature applied to every request
	maxTokens   int            // completion token cap applied to every request
}

// NewLLM creates a new LLM instance.
//
// apiKey authenticates against the OpenAI API; model, temperature and
// maxTokens are stored and applied to every chat request. baseURL, when
// non-empty, overrides the default API endpoint (e.g. for local
// OpenAI-compatible servers).
func NewLLM(apiKey, model string, temperature float32, baseURL string, maxTokens int) *LLM {
	config := openai.DefaultConfig(apiKey)
	if baseURL != "" {
		config.BaseURL = baseURL
	}

	return &LLM{
		client:      openai.NewClientWithConfig(config),
		model:       model,
		temperature: temperature,
		maxTokens:   maxTokens,
	}
}
+
// ChatCompletionRequest is a request for chat completion.
// NOTE(review): not referenced by any code visible in this file —
// presumably kept for external callers; confirm before removing.
type ChatCompletionRequest struct {
	Messages []openai.ChatCompletionMessage
	Tools    []openai.Tool
}

// ChatCompletionResponse is a response from chat completion.
// NOTE(review): not referenced by any code visible in this file — confirm usage.
type ChatCompletionResponse struct {
	Message openai.ChatCompletionMessage
}
+
+// Chat sends a chat completion request
+func (l *LLM) Chat(ctx context.Context, messages []openai.ChatCompletionMessage, tools []openai.Tool) (*openai.ChatCompletionMessage, error) {
+	req := openai.ChatCompletionRequest{
+		Model:       l.model,
+		Messages:    messages,
+		Temperature: l.temperature,
+		MaxTokens:   l.maxTokens,
+	}
+
+	if len(tools) > 0 {
+		req.Tools = tools
+	}
+
+	resp, err := l.client.CreateChatCompletion(ctx, req)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create chat completion: %w", err)
+	}
+
+	if len(resp.Choices) == 0 {
+		return nil, fmt.Errorf("no response choices returned")
+	}
+
+	// Log warning if finish reason indicates an issue
+	choice := resp.Choices[0]
+	if choice.FinishReason == "length" {
+		// Model hit token limit - may have incomplete response
+		// This is common with reasoning models that need more tokens
+		return nil, fmt.Errorf("response truncated: model hit token limit (finish_reason: length). Consider increasing OPENAI_MAX_TOKENS (current: %d). Usage: prompt=%d, completion=%d, total=%d",
+			l.maxTokens, resp.Usage.PromptTokens, resp.Usage.CompletionTokens, resp.Usage.TotalTokens)
+	}
+
+	return &choice.Message, nil
+}
+
+// ConvertMCPToolsToOpenAI converts MCP tools to OpenAI tool format
+func ConvertMCPToolsToOpenAI(mcpTools []Tool) []openai.Tool {
+	tools := make([]openai.Tool, len(mcpTools))
+	for i, t := range mcpTools {
+		// Convert InputSchema to JSON schema format using map[string]interface{}
+		props := make(map[string]interface{})
+		for name, prop := range t.InputSchema.Properties {
+			propMap := map[string]interface{}{
+				"type":        prop.Type,
+				"description": prop.Description,
+			}
+			// For object types without explicit nested properties,
+			// allow additionalProperties so the LLM can pass any key-value pairs
+			// This is important for tools like 'query' and 'mutate' that accept
+			// arbitrary variables objects
+			if prop.Type == "object" {
+				propMap["additionalProperties"] = true
+			}
+			props[name] = propMap
+		}
+
+		// Build parameters map, omitting empty required array
+		params := map[string]interface{}{
+			"type":       t.InputSchema.Type,
+			"properties": props,
+		}
+		// Only include required if it has elements - empty slice marshals as null
+		if len(t.InputSchema.Required) > 0 {
+			params["required"] = t.InputSchema.Required
+		}
+
+		tools[i] = openai.Tool{
+			Type: openai.ToolTypeFunction,
+			Function: &openai.FunctionDefinition{
+				Name:        t.Name,
+				Description: t.Description,
+				Parameters:  params,
+			},
+		}
+	}
+	return tools
+}
+
+// ParseToolCall parses a tool call from the LLM response
+func ParseToolCall(toolCall openai.ToolCall) (string, map[string]interface{}, error) {
+	name := toolCall.Function.Name
+
+	var args map[string]interface{}
+	if err := json.Unmarshal([]byte(toolCall.Function.Arguments), &args); err != nil {
+		return name, nil, fmt.Errorf("failed to parse tool arguments: %w", err)
+	}
+
+	return name, args, nil
+}
+
+// TestConnection tests the connection to OpenAI API
+func (l *LLM) TestConnection(ctx context.Context) error {
+	// Simple test request - use enough tokens for reasoning models
+	// Reasoning models need more tokens for their thinking process
+	req := openai.ChatCompletionRequest{
+		Model: l.model,
+		Messages: []openai.ChatCompletionMessage{
+			{
+				Role:    openai.ChatMessageRoleUser,
+				Content: "Hello",
+			},
+		},
+		MaxTokens: 100,
+	}
+
+	_, err := l.client.CreateChatCompletion(ctx, req)
+	if err != nil {
+		return fmt.Errorf("failed to connect to OpenAI API: %w", err)
+	}
+
+	return nil
+}

+ 298 - 0
arp_agent/llm_test.go

@@ -0,0 +1,298 @@
+package main
+
+import (
+	"encoding/json"
+	"testing"
+
+	"github.com/sashabaranov/go-openai"
+)
+
// TestLLM_ConvertMCPToolsToOpenAI tests the MCP to OpenAI tool conversion
// across empty, single-tool and multi-tool inputs, checking the converted
// count plus each entry's type, name and description.
func TestLLM_ConvertMCPToolsToOpenAI(t *testing.T) {
	tests := []struct {
		name     string
		mcpTools []Tool
		wantLen  int
	}{
		{
			name:     "EmptyTools",
			mcpTools: []Tool{},
			wantLen:  0,
		},
		{
			name: "SingleTool",
			mcpTools: []Tool{
				{
					Name:        "introspect",
					Description: "Discover the GraphQL schema",
					InputSchema: InputSchema{
						Type: "object",
						Properties: map[string]Property{
							"typeName": {Type: "string", Description: "The type to introspect"},
						},
						Required:             []string{},
						AdditionalProperties: false,
					},
				},
			},
			wantLen: 1,
		},
		{
			name: "MultipleTools",
			mcpTools: []Tool{
				{
					Name:        "query",
					Description: "Execute a GraphQL query",
					InputSchema: InputSchema{
						Type: "object",
						Properties: map[string]Property{
							"query": {Type: "string", Description: "The GraphQL query"},
						},
						Required:             []string{"query"},
						AdditionalProperties: false,
					},
				},
				{
					Name:        "mutate",
					Description: "Execute a GraphQL mutation",
					InputSchema: InputSchema{
						Type: "object",
						Properties: map[string]Property{
							"mutation": {Type: "string", Description: "The GraphQL mutation"},
						},
						Required:             []string{"mutation"},
						AdditionalProperties: false,
					},
				},
			},
			wantLen: 2,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			tools := ConvertMCPToolsToOpenAI(tt.mcpTools)
			if len(tools) != tt.wantLen {
				t.Errorf("Expected %d tools, got %d", tt.wantLen, len(tools))
			}

			// Verify tool conversion details: every converted entry must be a
			// function tool whose name/description mirror the source MCP tool.
			for i, tool := range tools {
				if tool.Type != openai.ToolTypeFunction {
					t.Errorf("Tool %d: Expected type %s, got %s", i, openai.ToolTypeFunction, tool.Type)
				}
				if tool.Function.Name != tt.mcpTools[i].Name {
					t.Errorf("Tool %d: Expected name %s, got %s", i, tt.mcpTools[i].Name, tool.Function.Name)
				}
				if tool.Function.Description != tt.mcpTools[i].Description {
					t.Errorf("Tool %d: Expected description %s, got %s", i, tt.mcpTools[i].Description, tool.Function.Description)
				}
			}
		})
	}
}
+
// TestLLM_ConvertMCPToolsToOpenAI_ObjectProperties tests that object-type properties
// get additionalProperties: true to allow arbitrary key-value pairs,
// while scalar properties and the top-level parameters map do not.
func TestLLM_ConvertMCPToolsToOpenAI_ObjectProperties(t *testing.T) {
	mcpTools := []Tool{
		{
			Name:        "query",
			Description: "Execute a GraphQL query",
			InputSchema: InputSchema{
				Type: "object",
				Properties: map[string]Property{
					"query": {
						Type:        "string",
						Description: "The GraphQL query string",
					},
					"variables": {
						Type:        "object",
						Description: "Optional query variables as key-value pairs",
					},
				},
				Required:             []string{"query"},
				AdditionalProperties: false,
			},
		},
	}

	tools := ConvertMCPToolsToOpenAI(mcpTools)
	if len(tools) != 1 {
		t.Fatalf("Expected 1 tool, got %d", len(tools))
	}

	// Check that parameters don't have additionalProperties at top level
	params := tools[0].Function.Parameters.(map[string]interface{})
	if _, hasAdditionalProps := params["additionalProperties"]; hasAdditionalProps {
		t.Error("Top-level parameters should NOT have additionalProperties field")
	}

	// Check that the variables property has additionalProperties: true
	props := params["properties"].(map[string]interface{})
	variablesProp, ok := props["variables"].(map[string]interface{})
	if !ok {
		t.Fatal("variables property not found")
	}

	if additionalProps, ok := variablesProp["additionalProperties"]; !ok {
		t.Error("Object property 'variables' should have additionalProperties field")
	} else if additionalProps != true {
		t.Errorf("Object property 'variables' additionalProperties should be true, got %v", additionalProps)
	}

	// Check that string property does NOT have additionalProperties
	queryProp, ok := props["query"].(map[string]interface{})
	if !ok {
		t.Fatal("query property not found")
	}

	if _, hasAdditionalProps := queryProp["additionalProperties"]; hasAdditionalProps {
		t.Error("String property 'query' should NOT have additionalProperties field")
	}
}
+
// TestLLM_ParseToolCall tests parsing tool calls from LLM responses,
// covering valid JSON arguments, empty argument objects, malformed JSON
// (name still returned with an error), and flat multi-key arguments.
func TestLLM_ParseToolCall(t *testing.T) {
	tests := []struct {
		name     string
		toolCall openai.ToolCall
		wantName string
		wantArgs map[string]interface{}
		wantErr  bool
	}{
		{
			name: "ValidToolCall",
			toolCall: openai.ToolCall{
				ID: "call-123",
				Function: openai.FunctionCall{
					Name:      "query",
					Arguments: `{"query": "{ users { email } }"}`,
				},
			},
			wantName: "query",
			wantArgs: map[string]interface{}{
				"query": "{ users { email } }",
			},
			wantErr: false,
		},
		{
			name: "EmptyArguments",
			toolCall: openai.ToolCall{
				ID: "call-456",
				Function: openai.FunctionCall{
					Name:      "introspect",
					Arguments: `{}`,
				},
			},
			wantName: "introspect",
			wantArgs: map[string]interface{}{},
			wantErr:  false,
		},
		{
			name: "InvalidJSON",
			toolCall: openai.ToolCall{
				ID: "call-789",
				Function: openai.FunctionCall{
					Name:      "mutate",
					Arguments: `invalid json`,
				},
			},
			wantName: "mutate",
			wantArgs: nil,
			wantErr:  true,
		},
		{
			name: "NestedArguments",
			toolCall: openai.ToolCall{
				ID: "call-abc",
				Function: openai.FunctionCall{
					Name:      "createTask",
					Arguments: `{"title": "Test Task", "priority": "high", "assigneeId": "user-123"}`,
				},
			},
			wantName: "createTask",
			wantArgs: map[string]interface{}{
				"title":      "Test Task",
				"priority":   "high",
				"assigneeId": "user-123",
			},
			wantErr: false,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			name, args, err := ParseToolCall(tt.toolCall)
			if (err != nil) != tt.wantErr {
				t.Errorf("ParseToolCall() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			if name != tt.wantName {
				t.Errorf("ParseToolCall() name = %v, want %v", name, tt.wantName)
			}
			if !tt.wantErr && args != nil {
				// Compare args via JSON round-trip; Go maps have no ordering,
				// but json.Marshal sorts object keys, making this deterministic.
				argsJSON, _ := json.Marshal(args)
				wantJSON, _ := json.Marshal(tt.wantArgs)
				if string(argsJSON) != string(wantJSON) {
					t.Errorf("ParseToolCall() args = %v, want %v", args, tt.wantArgs)
				}
			}
		})
	}
}
+
// TestLLM_ToolConversionSnapshot tests tool conversion with snapshot:
// the converted OpenAI tool list is compared against the stored snapshot
// in testdata/snapshots (see testSnapshotResult in the test utilities).
func TestLLM_ToolConversionSnapshot(t *testing.T) {
	mcpTools := []Tool{
		{
			Name:        "introspect",
			Description: "Discover the GraphQL schema structure",
			InputSchema: InputSchema{
				Type: "object",
				Properties: map[string]Property{
					"typeName": {
						Type:        "string",
						Description: "Optional type name to introspect",
					},
				},
				Required:             []string{},
				AdditionalProperties: false,
			},
		},
		{
			Name:        "query",
			Description: "Execute a GraphQL query",
			InputSchema: InputSchema{
				Type: "object",
				Properties: map[string]Property{
					"query": {
						Type:        "string",
						Description: "The GraphQL query string",
					},
				},
				Required:             []string{"query"},
				AdditionalProperties: false,
			},
		},
		{
			Name:        "mutate",
			Description: "Execute a GraphQL mutation",
			InputSchema: InputSchema{
				Type: "object",
				Properties: map[string]Property{
					"mutation": {
						Type:        "string",
						Description: "The GraphQL mutation string",
					},
				},
				Required:             []string{"mutation"},
				AdditionalProperties: false,
			},
		},
	}

	openaiTools := ConvertMCPToolsToOpenAI(mcpTools)
	testSnapshotResult(t, "converted_tools", openaiTools)
}

+ 237 - 0
arp_agent/main.go

@@ -0,0 +1,237 @@
+package main
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"fmt"
+	"log"
+	"net/http"
+	"os"
+	"os/signal"
+	"strings"
+	"syscall"
+	"time"
+
+	"github.com/joho/godotenv"
+)
+
// main wires the agent together: it loads configuration, verifies OpenAI
// connectivity, authenticates against the ARP server, connects to MCP over
// SSE, subscribes to every advertised resource, and then loops dispatching
// incoming notifications to the agent until interrupted.
func main() {
	// Load .env file if it exists
	if err := godotenv.Load(); err != nil {
		log.Println("No .env file found, using environment variables")
	}

	// Load configuration
	cfg, err := LoadConfig()
	if err != nil {
		log.Fatalf("Failed to load configuration: %v", err)
	}

	// Test OpenAI connectivity
	log.Printf("Testing connectivity to OpenAI API...")
	llm := NewLLM(cfg.OpenAIKey, cfg.OpenAIModel, float32(cfg.OpenAITemperature), cfg.OpenAIBaseURL, cfg.OpenAIMaxTokens)
	if err := llm.TestConnection(context.Background()); err != nil {
		log.Fatalf("Failed to connect to OpenAI API: %v", err)
	}
	log.Printf("✓ Successfully connected to OpenAI API")

	// Login to ARP server
	log.Printf("Connecting to ARP server at %s...", cfg.ARPURL)
	token, err := login(cfg.ARPURL, cfg.ARPUsername, cfg.ARPPassword)
	if err != nil {
		log.Fatalf("Failed to login: %v", err)
	}
	log.Printf("Successfully authenticated with ARP server")

	// Connect to MCP server
	log.Printf("Connecting to MCP server...")
	mcpClient := NewMCPClient(cfg.ARPURL, token)
	if err := mcpClient.Connect(); err != nil {
		log.Fatalf("Failed to connect to MCP: %v", err)
	}
	defer mcpClient.Close()

	// Initialize MCP
	initResult, err := mcpClient.Initialize()
	if err != nil {
		log.Fatalf("Failed to initialize MCP: %v", err)
	}
	log.Printf("Connected to MCP server: %s v%s", initResult.ServerInfo.Name, initResult.ServerInfo.Version)

	// Create and initialize agent
	agent := NewAgent(llm, mcpClient, cfg)
	if err := agent.Initialize(); err != nil {
		log.Fatalf("Failed to initialize agent: %v", err)
	}
	log.Printf("Agent initialized successfully.")

	// Setup event queues
	agent.SetupQueues(cfg.MaxQueueSize)
	log.Printf("Event queues initialized with capacity: %d", cfg.MaxQueueSize)

	// List and subscribe to resources
	log.Printf("Subscribing to ARP resources...")
	resources, err := mcpClient.ListResources()
	if err != nil {
		log.Fatalf("Failed to list resources: %v", err)
	}

	log.Printf("Available resources: %v", resourceURIs(resources))
	for _, resource := range resources {
		// NOTE(review): "Subscribed to" is logged before the subscribe call,
		// so a failed subscription still logs success first — confirm intent.
		log.Printf("  Subscribed to: %s", resource.URI)
		if err := mcpClient.SubscribeResource(resource.URI); err != nil {
			log.Printf("Warning: Failed to subscribe to %s: %v", resource.URI, err)
		}
	}

	// Handle graceful shutdown
	ctx, cancel := context.WithCancel(context.Background())

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	go func() {
		<-sigChan
		log.Println("Received shutdown signal, stopping...")
		cancel()
	}()

	// Start the agent queue processor
	agent.Start(ctx)
	defer agent.Stop()

	// Listen for events
	log.Println()
	log.Println("Listening for events. Press Ctrl+C to stop.")

	// Main dispatch loop: exits on signal (ctx cancelled) or when the
	// MCP notification channel closes (SSE stream ended).
	for {
		select {
		case <-ctx.Done():
			return
		case event, ok := <-mcpClient.Notifications():
			if !ok {
				log.Println("Notification channel closed")
				return
			}
			handleNotification(agent, event)
		}
	}
}
+
// login authenticates with the ARP server and returns a JWT token.
//
// baseURL is the server root (a trailing "/query" is appended when missing).
// The credentials are sent as a GraphQL Login mutation; the token from the
// response payload is returned. An error is returned when the request fails,
// the server replies with a non-200 status, the GraphQL response carries
// errors, or no token is present.
func login(baseURL, username, password string) (string, error) {
	// Ensure URL has the /query endpoint
	loginURL := strings.TrimSuffix(baseURL, "/")
	if !strings.HasSuffix(loginURL, "/query") {
		loginURL = loginURL + "/query"
	}

	query := `
		mutation Login($email: String!, $password: String!) {
			login(email: $email, password: $password) {
				token
				user {
					id
					email
				}
			}
		}`

	reqBody := map[string]interface{}{
		"query": query,
		"variables": map[string]interface{}{
			"email":    username,
			"password": password,
		},
	}

	bodyBytes, err := json.Marshal(reqBody)
	if err != nil {
		return "", fmt.Errorf("failed to marshal request: %w", err)
	}

	req, err := http.NewRequest("POST", loginURL, bytes.NewReader(bodyBytes))
	if err != nil {
		return "", fmt.Errorf("failed to create request: %w", err)
	}

	req.Header.Set("Content-Type", "application/json")

	client := &http.Client{Timeout: 30 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return "", fmt.Errorf("request failed: %w", err)
	}
	defer resp.Body.Close()

	// Fail fast on non-200 responses: error pages (HTML, proxy errors, etc.)
	// would otherwise surface as a confusing JSON decode failure below.
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("login request failed with status %d", resp.StatusCode)
	}

	var result struct {
		Data struct {
			Login struct {
				Token string `json:"token"`
				User  struct {
					ID    string `json:"id"`
					Email string `json:"email"`
				} `json:"user"`
			} `json:"login"`
		} `json:"data"`
		Errors []struct {
			Message string `json:"message"`
		} `json:"errors"`
	}

	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return "", fmt.Errorf("failed to parse response: %w", err)
	}

	// GraphQL transports errors in-band with a 200 status; report the first.
	if len(result.Errors) > 0 {
		return "", fmt.Errorf("login failed: %s", result.Errors[0].Message)
	}

	if result.Data.Login.Token == "" {
		return "", fmt.Errorf("no token received")
	}

	return result.Data.Login.Token, nil
}
+
+// resourceURIs extracts URIs from resources for logging
+func resourceURIs(resources []Resource) []string {
+	uris := make([]string, len(resources))
+	for i, r := range resources {
+		uris[i] = r.URI
+	}
+	return uris
+}
+
// handleNotification processes an MCP notification and queues it for processing.
// Only "notifications/resources/updated" is acted on; the payload text is
// forwarded to the agent's event queue keyed by the resource URI. Malformed
// notifications are logged and dropped.
func handleNotification(agent *Agent, event json.RawMessage) {
	// Parse the notification
	var notification JSONRPCNotification
	if err := json.Unmarshal(event, &notification); err != nil {
		log.Printf("Failed to parse notification: %v", err)
		return
	}

	// Handle resource update notifications
	if notification.Method == "notifications/resources/updated" {
		// Params was decoded into interface{}, so it arrives as a generic map.
		params, ok := notification.Params.(map[string]interface{})
		if !ok {
			log.Printf("Invalid notification params")
			return
		}

		uri, _ := params["uri"].(string)
		// NOTE(review): this expects a "contents" object inline in the update
		// notification; the MCP spec only mandates "uri" here, so this relies
		// on the ARP server embedding the payload — confirm against the server.
		contents, ok := params["contents"].(map[string]interface{})
		if !ok {
			log.Printf("Invalid notification contents")
			return
		}

		text, _ := contents["text"].(string)
		log.Printf("Received event from %s", uri)

		// Queue the event for processing (non-blocking)
		agent.QueueEvent(uri, json.RawMessage(text))
	}
}

+ 526 - 0
arp_agent/mcp_client.go

@@ -0,0 +1,526 @@
+package main
+
+import (
+	"bufio"
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"net/url"
+	"strings"
+	"sync"
+	"time"
+)
+
// MCP Protocol constants
const (
	// ProtocolVersion is the MCP protocol revision this client speaks.
	ProtocolVersion = "2024-11-05"
)

// JSON-RPC types

// JSONRPCRequest is an outgoing JSON-RPC 2.0 request envelope.
type JSONRPCRequest struct {
	JSONRPC string          `json:"jsonrpc"`
	ID      interface{}     `json:"id,omitempty"`
	Method  string          `json:"method"`
	Params  json.RawMessage `json:"params,omitempty"`
}

// JSONRPCResponse is an incoming JSON-RPC 2.0 response envelope; exactly
// one of Result or Error is expected to be set.
type JSONRPCResponse struct {
	JSONRPC string          `json:"jsonrpc"`
	ID      interface{}     `json:"id,omitempty"`
	Result  json.RawMessage `json:"result,omitempty"`
	Error   *RPCError       `json:"error,omitempty"`
}

// RPCError is the error object of a failed JSON-RPC response.
type RPCError struct {
	Code    int         `json:"code"`
	Message string      `json:"message"`
	Data    interface{} `json:"data,omitempty"`
}
+
// MCP types

// InitializeParams is the payload of the MCP "initialize" request.
type InitializeParams struct {
	ProtocolVersion string             `json:"protocolVersion"`
	Capabilities    ClientCapabilities `json:"capabilities"`
	ClientInfo      ImplementationInfo `json:"clientInfo"`
}

// InitializeResult is the server's reply to "initialize".
type InitializeResult struct {
	ProtocolVersion string             `json:"protocolVersion"`
	Capabilities    ServerCapabilities `json:"capabilities"`
	ServerInfo      ImplementationInfo `json:"serverInfo"`
	Instructions    string             `json:"instructions,omitempty"`
}

// ClientCapabilities advertises the features this client supports.
type ClientCapabilities struct {
	Experimental map[string]interface{} `json:"experimental,omitempty"`
	Roots        *RootsCapability       `json:"roots,omitempty"`
	Sampling     *SamplingCapability    `json:"sampling,omitempty"`
}

// RootsCapability signals client support for workspace roots.
type RootsCapability struct {
	ListChanged bool `json:"listChanged,omitempty"`
}

// SamplingCapability signals client support for server-initiated sampling.
type SamplingCapability struct{}

// ServerCapabilities describes the features the server supports.
type ServerCapabilities struct {
	Experimental map[string]interface{} `json:"experimental,omitempty"`
	Tools        *ToolsCapability       `json:"tools,omitempty"`
	Resources    *ResourcesCapability   `json:"resources,omitempty"`
}

// ToolsCapability describes the server's tool support.
type ToolsCapability struct {
	ListChanged bool `json:"listChanged,omitempty"`
}

// ResourcesCapability describes the server's resource support.
type ResourcesCapability struct {
	Subscribe   bool `json:"subscribe,omitempty"`
	ListChanged bool `json:"listChanged,omitempty"`
}

// ImplementationInfo identifies an MCP implementation by name and version.
type ImplementationInfo struct {
	Name    string `json:"name"`
	Version string `json:"version"`
}
+
// Tool types

// Tool describes a callable MCP tool and the JSON schema of its input.
type Tool struct {
	Name        string      `json:"name"`
	Description string      `json:"description"`
	InputSchema InputSchema `json:"inputSchema"`
}

// InputSchema is a (flat) JSON-schema description of a tool's arguments.
type InputSchema struct {
	Type                 string              `json:"type"`
	Properties           map[string]Property `json:"properties,omitempty"`
	Required             []string            `json:"required,omitempty"`
	AdditionalProperties bool                `json:"additionalProperties"`
}

// Property is a single named argument within an InputSchema.
type Property struct {
	Type        string `json:"type"`
	Description string `json:"description,omitempty"`
}

// ListToolsResult is the reply to "tools/list".
type ListToolsResult struct {
	Tools []Tool `json:"tools"`
}

// CallToolParams is the payload of a "tools/call" request.
type CallToolParams struct {
	Name      string                 `json:"name"`
	Arguments map[string]interface{} `json:"arguments,omitempty"`
}

// CallToolResult is the reply to "tools/call"; IsError flags a tool-level
// failure carried in Content rather than a protocol error.
type CallToolResult struct {
	Content []ContentBlock `json:"content"`
	IsError bool           `json:"isError,omitempty"`
}

// ContentBlock is one typed piece of tool output (text in this client).
type ContentBlock struct {
	Type string `json:"type"`
	Text string `json:"text"`
}
+
// Resource types

// Resource describes a subscribable MCP resource.
type Resource struct {
	URI         string `json:"uri"`
	Name        string `json:"name"`
	Description string `json:"description,omitempty"`
	MimeType    string `json:"mimeType,omitempty"`
}

// ListResourcesResult is the reply to "resources/list".
type ListResourcesResult struct {
	Resources []Resource `json:"resources"`
}

// SubscribeParams is the payload of a "resources/subscribe" request.
type SubscribeParams struct {
	URI string `json:"uri"`
}

// UnsubscribeParams is the payload of a "resources/unsubscribe" request.
type UnsubscribeParams struct {
	URI string `json:"uri"`
}

// Resource notification

// ResourceUpdatedNotification is the payload this client expects in a
// resources-updated notification, including the inline contents.
type ResourceUpdatedNotification struct {
	URI      string           `json:"uri"`
	Contents ResourceContents `json:"contents"`
}

// ResourceContents carries a resource payload as text or base64 blob.
type ResourceContents struct {
	URI      string `json:"uri"`
	MimeType string `json:"mimeType,omitempty"`
	Text     string `json:"text,omitempty"`
	Blob     string `json:"blob,omitempty"`
}

// JSON-RPC Notification

// JSONRPCNotification is an incoming JSON-RPC message without an ID.
type JSONRPCNotification struct {
	JSONRPC string      `json:"jsonrpc"`
	Method  string      `json:"method"`
	Params  interface{} `json:"params,omitempty"`
}
+
// MCPClient is an MCP client for the ARP server.
//
// It speaks JSON-RPC over an SSE transport: requests are POSTed to a
// per-session message endpoint, while responses and notifications arrive
// asynchronously on the SSE stream and are matched up via request IDs.
type MCPClient struct {
	baseURL    string
	token      string       // JWT bearer token sent on every request
	httpClient *http.Client // for POSTing JSON-RPC messages (30s timeout)
	sseClient  *http.Client // Separate client for SSE (no timeout)

	// SSE connection
	sseResp   *http.Response       // open event-stream response
	sseDone   chan struct{}        // closed by Close to abort waiters
	sseEvents chan json.RawMessage // endpoint event + notifications

	// Message endpoint (received from SSE endpoint event)
	messageEndpoint string

	// Request ID counter
	idCounter int
	idMu      sync.Mutex

	// Tools cache
	tools []Tool

	// Pending requests (ID -> response channel)
	pending   map[interface{}]chan json.RawMessage
	pendingMu sync.Mutex
}

// NewMCPClient creates a new MCP client for the given server base URL,
// authenticating with the supplied JWT token. Call Connect before use.
func NewMCPClient(baseURL string, token string) *MCPClient {
	return &MCPClient{
		baseURL: baseURL,
		token:   token,
		httpClient: &http.Client{
			Timeout: 30 * time.Second,
		},
		sseClient: &http.Client{
			// No timeout for SSE - connection should stay open indefinitely
			Timeout: 0,
		},
		sseDone:   make(chan struct{}),
		sseEvents: make(chan json.RawMessage, 100),
		pending:   make(map[interface{}]chan json.RawMessage),
	}
}
+
// Connect establishes SSE connection to the MCP server.
//
// It GETs <base>/mcp with an event-stream Accept header, starts the readSSE
// pump goroutine, and waits (up to 10s) for the server's initial "endpoint"
// event, which carries the URL to POST subsequent JSON-RPC messages to.
// NOTE(review): the first event received on sseEvents is assumed to be the
// endpoint event, and on timeout the SSE body is left open — callers appear
// to treat this error as fatal; confirm.
func (c *MCPClient) Connect() error {
	// Build SSE URL
	sseURL := c.baseURL
	if !strings.HasSuffix(sseURL, "/mcp") {
		sseURL = strings.TrimSuffix(sseURL, "/")
		sseURL = sseURL + "/mcp"
	}

	req, err := http.NewRequest("GET", sseURL, nil)
	if err != nil {
		return fmt.Errorf("failed to create SSE request: %w", err)
	}

	req.Header.Set("Accept", "text/event-stream")
	req.Header.Set("Cache-Control", "no-cache")
	req.Header.Set("Connection", "keep-alive")
	if c.token != "" {
		req.Header.Set("Authorization", "Bearer "+c.token)
	}

	resp, err := c.sseClient.Do(req)
	if err != nil {
		return fmt.Errorf("failed to connect to SSE: %w", err)
	}

	if resp.StatusCode != http.StatusOK {
		resp.Body.Close()
		return fmt.Errorf("SSE connection failed with status: %d", resp.StatusCode)
	}

	c.sseResp = resp

	// Start reading SSE events
	go c.readSSE()

	// Wait for endpoint event
	select {
	case event := <-c.sseEvents:
		// The endpoint is sent as plain text, not JSON
		// e.g., "/message?sessionId=123456789"
		c.messageEndpoint = string(event)
	case <-time.After(10 * time.Second):
		return fmt.Errorf("timeout waiting for SSE endpoint event")
	}

	return nil
}
+
+// readSSE reads SSE events from the response body
+func (c *MCPClient) readSSE() {
+	defer close(c.sseEvents)
+
+	scanner := bufio.NewScanner(c.sseResp.Body)
+	var eventType string
+	var eventData strings.Builder
+
+	for scanner.Scan() {
+		line := scanner.Text()
+
+		if strings.HasPrefix(line, "event:") {
+			eventType = strings.TrimSpace(strings.TrimPrefix(line, "event:"))
+			eventData.Reset()
+		} else if strings.HasPrefix(line, "data:") {
+			data := strings.TrimPrefix(line, "data:")
+			eventData.WriteString(data)
+		} else if line == "" && eventType != "" {
+			// End of event
+			data := strings.TrimSpace(eventData.String())
+
+			// Handle endpoint event specially
+			if eventType == "endpoint" {
+				select {
+				case c.sseEvents <- json.RawMessage(data):
+				default:
+				}
+			} else if eventType == "message" {
+				// Parse to check if it's a response (has ID) or notification
+				var msg struct {
+					ID interface{} `json:"id"`
+				}
+				if err := json.Unmarshal([]byte(data), &msg); err == nil && msg.ID != nil {
+					// JSON numbers are unmarshaled as float64, but we use int for IDs
+					// Convert float64 to int for matching
+					var idKey interface{} = msg.ID
+					if f, ok := msg.ID.(float64); ok {
+						idKey = int(f)
+					}
+					// It's a response - dispatch to pending request
+					c.pendingMu.Lock()
+					if ch, ok := c.pending[idKey]; ok {
+						ch <- json.RawMessage(data)
+						delete(c.pending, idKey)
+					}
+					c.pendingMu.Unlock()
+				} else {
+					// It's a notification - send to general events channel
+					select {
+					case c.sseEvents <- json.RawMessage(data):
+					default:
+					}
+				}
+			}
+
+			eventType = ""
+			eventData.Reset()
+		}
+	}
+}
+
+// Initialize sends the initialize request
+func (c *MCPClient) Initialize() (*InitializeResult, error) {
+	params := InitializeParams{
+		ProtocolVersion: ProtocolVersion,
+		Capabilities: ClientCapabilities{
+			Roots: &RootsCapability{ListChanged: false},
+		},
+		ClientInfo: ImplementationInfo{
+			Name:    "ARP Agent",
+			Version: "1.0.0",
+		},
+	}
+
+	result := &InitializeResult{}
+	if err := c.sendRequest("initialize", params, result); err != nil {
+		return nil, err
+	}
+
+	return result, nil
+}
+
+// ListTools discovers available tools
+func (c *MCPClient) ListTools() ([]Tool, error) {
+	result := &ListToolsResult{}
+	if err := c.sendRequest("tools/list", nil, result); err != nil {
+		return nil, err
+	}
+
+	c.tools = result.Tools
+	return result.Tools, nil
+}
+
+// CallTool executes a tool call
+func (c *MCPClient) CallTool(name string, arguments map[string]interface{}) (*CallToolResult, error) {
+	params := CallToolParams{
+		Name:      name,
+		Arguments: arguments,
+	}
+
+	result := &CallToolResult{}
+	if err := c.sendRequest("tools/call", params, result); err != nil {
+		return nil, err
+	}
+
+	return result, nil
+}
+
+// ListResources lists available resources
+func (c *MCPClient) ListResources() ([]Resource, error) {
+	result := &ListResourcesResult{}
+	if err := c.sendRequest("resources/list", nil, result); err != nil {
+		return nil, err
+	}
+	return result.Resources, nil
+}
+
+// SubscribeResource subscribes to a resource for notifications
+func (c *MCPClient) SubscribeResource(uri string) error {
+	params := SubscribeParams{URI: uri}
+	return c.sendRequest("resources/subscribe", params, nil)
+}
+
+// UnsubscribeResource unsubscribes from a resource
+func (c *MCPClient) UnsubscribeResource(uri string) error {
+	params := UnsubscribeParams{URI: uri}
+	return c.sendRequest("resources/unsubscribe", params, nil)
+}
+
// Notifications returns a channel for receiving resource notifications.
// The channel is closed when the SSE stream terminates. NOTE(review):
// during Connect the same channel carries the initial endpoint event, so
// it should only be consumed after Connect returns.
func (c *MCPClient) Notifications() <-chan json.RawMessage {
	return c.sseEvents
}
+
// Close closes the MCP client connection: it signals in-flight requests via
// sseDone and closes the SSE response body, which terminates the readSSE
// loop. NOTE(review): calling Close more than once panics (close of a
// closed channel) — callers must ensure Close runs exactly once.
func (c *MCPClient) Close() error {
	close(c.sseDone)
	if c.sseResp != nil {
		return c.sseResp.Body.Close()
	}
	return nil
}
+
+// nextID generates a unique request ID
+func (c *MCPClient) nextID() int {
+	c.idMu.Lock()
+	defer c.idMu.Unlock()
+	c.idCounter++
+	return c.idCounter
+}
+
// sendRequest sends a JSON-RPC request and waits for the response via SSE.
//
// The request is POSTed to the session's message endpoint (announced by the
// server in the initial SSE "endpoint" event, falling back to baseURL when
// none was received). The matching response arrives asynchronously on the
// SSE stream and is routed back through the pending map keyed by request ID.
// result, when non-nil, receives the unmarshalled JSON-RPC result. Fails
// after 30 seconds without a response, or immediately when the client is
// closed.
func (c *MCPClient) sendRequest(method string, params interface{}, result interface{}) error {
	// Build request
	id := c.nextID()
	var paramsJSON json.RawMessage
	if params != nil {
		var err error
		paramsJSON, err = json.Marshal(params)
		if err != nil {
			return fmt.Errorf("failed to marshal params: %w", err)
		}
	}

	req := JSONRPCRequest{
		JSONRPC: "2.0",
		ID:      id,
		Method:  method,
		Params:  paramsJSON,
	}

	reqBody, err := json.Marshal(req)
	if err != nil {
		return fmt.Errorf("failed to marshal request: %w", err)
	}

	// Build message URL
	messageURL := c.baseURL
	if c.messageEndpoint != "" {
		// Parse the endpoint URL - it may be relative or absolute
		if strings.HasPrefix(c.messageEndpoint, "/") {
			// Relative URL - parse it and merge with base URL
			endpointURL, err := url.Parse(c.messageEndpoint)
			if err != nil {
				return fmt.Errorf("failed to parse endpoint URL: %w", err)
			}
			baseURL, err := url.Parse(c.baseURL)
			if err != nil {
				return fmt.Errorf("failed to parse base URL: %w", err)
			}
			// Merge the endpoint with the base URL (preserves query string)
			baseURL.Path = endpointURL.Path
			baseURL.RawQuery = endpointURL.RawQuery
			messageURL = baseURL.String()
		} else {
			messageURL = c.messageEndpoint
		}
	}

	// Register pending request before sending, so a fast response arriving
	// on the SSE stream cannot be missed.
	respChan := make(chan json.RawMessage, 1)
	c.pendingMu.Lock()
	c.pending[id] = respChan
	c.pendingMu.Unlock()

	// Cleanup on return
	defer func() {
		c.pendingMu.Lock()
		delete(c.pending, id)
		c.pendingMu.Unlock()
	}()

	// Send HTTP POST request
	httpReq, err := http.NewRequest("POST", messageURL, bytes.NewReader(reqBody))
	if err != nil {
		return fmt.Errorf("failed to create request: %w", err)
	}

	httpReq.Header.Set("Content-Type", "application/json")
	if c.token != "" {
		httpReq.Header.Set("Authorization", "Bearer "+c.token)
	}

	resp, err := c.httpClient.Do(httpReq)
	if err != nil {
		return fmt.Errorf("request failed: %w", err)
	}
	defer resp.Body.Close()

	// Check HTTP status (202 Accepted is the normal SSE-transport reply;
	// the JSON-RPC response itself comes over the event stream)
	if resp.StatusCode != http.StatusAccepted && resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("request failed with status %d: %s", resp.StatusCode, string(body))
	}

	// Wait for response via SSE
	select {
	case respData := <-respChan:
		// Parse response
		var rpcResp JSONRPCResponse
		if err := json.Unmarshal(respData, &rpcResp); err != nil {
			return fmt.Errorf("failed to parse response: %w", err)
		}

		if rpcResp.Error != nil {
			return fmt.Errorf("RPC error %d: %s", rpcResp.Error.Code, rpcResp.Error.Message)
		}

		// Parse result if provided
		if result != nil && rpcResp.Result != nil {
			if err := json.Unmarshal(rpcResp.Result, result); err != nil {
				return fmt.Errorf("failed to parse result: %w", err)
			}
		}

		return nil
	case <-time.After(30 * time.Second):
		return fmt.Errorf("timeout waiting for response")
	case <-c.sseDone:
		return fmt.Errorf("connection closed")
	}
}
+
// GetTools returns the cached tools from the most recent ListTools call
// (nil if ListTools has not run). The returned slice is the internal cache;
// callers should not mutate it.
func (c *MCPClient) GetTools() []Tool {
	return c.tools
}

+ 57 - 0
arp_agent/testdata/snapshots/TestLLM_ToolConversionSnapshot

@@ -0,0 +1,57 @@
+converted_tools
+[
+  {
+    "function": {
+      "description": "Discover the GraphQL schema structure",
+      "name": "introspect",
+      "parameters": {
+        "properties": {
+          "typeName": {
+            "description": "Optional type name to introspect",
+            "type": "string"
+          }
+        },
+        "type": "object"
+      }
+    },
+    "type": "function"
+  },
+  {
+    "function": {
+      "description": "Execute a GraphQL query",
+      "name": "query",
+      "parameters": {
+        "properties": {
+          "query": {
+            "description": "The GraphQL query string",
+            "type": "string"
+          }
+        },
+        "required": [
+          "query"
+        ],
+        "type": "object"
+      }
+    },
+    "type": "function"
+  },
+  {
+    "function": {
+      "description": "Execute a GraphQL mutation",
+      "name": "mutate",
+      "parameters": {
+        "properties": {
+          "mutation": {
+            "description": "The GraphQL mutation string",
+            "type": "string"
+          }
+        },
+        "required": [
+          "mutation"
+        ],
+        "type": "object"
+      }
+    },
+    "type": "function"
+  }
+]

+ 349 - 0
arp_agent/testutil_test.go

@@ -0,0 +1,349 @@
+package main
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/bradleyjkemp/cupaloy/v2"
+	"github.com/sashabaranov/go-openai"
+)
+
+// testSnapshotter stores and compares snapshots under testdata/snapshots
+// (cupaloy).
+var testSnapshotter = cupaloy.New(cupaloy.SnapshotSubdirectory("testdata/snapshots"))
+
+// testSystemPrompt is the system prompt for testing; it advertises the same
+// introspect/query/mutate tool set the mock MCP client exposes.
+const testSystemPrompt = `You are an intelligent agent connected to an ARP (Agent-native ERP) platform via the Model Context Protocol (MCP).
+
+You have access to the following tools:
+- introspect: Discover the GraphQL schema structure
+- query: Execute GraphQL queries (read operations)
+- mutate: Execute GraphQL mutations (create/update/delete operations)
+
+When you receive events (task created, task updated, message added), you should:
+1. Understand the event context
+2. Take appropriate action using the available tools
+3. Respond concisely about what you did
+
+You can query for more information or make changes as needed. Be helpful and proactive.`
+
+// LLMInterface defines the interface for LLM operations needed by
+// TestAgent: a single Chat round-trip that may return tool calls.
+type LLMInterface interface {
+	Chat(ctx context.Context, messages []openai.ChatCompletionMessage, tools []openai.Tool) (*openai.ChatCompletionMessage, error)
+}
+
+// MCPClientInterface defines the interface for MCP client operations used
+// by TestAgent: tool discovery (ListTools) and invocation (CallTool).
+type MCPClientInterface interface {
+	ListTools() ([]Tool, error)
+	CallTool(name string, args map[string]interface{}) (*CallToolResult, error)
+}
+
+// MockLLM is a mock implementation of the LLM for testing.
+// It replays a fixed sequence of canned responses, one per Chat call.
+type MockLLM struct {
+	// responses are the canned replies, consumed in order.
+	responses []*openai.ChatCompletionMessage
+	// callCount is the index of the next response to return.
+	callCount int
+}
+
+// NewMockLLM creates a new mock LLM with predefined responses; each Chat
+// call returns the next one in order.
+func NewMockLLM(responses []*openai.ChatCompletionMessage) *MockLLM {
+	return &MockLLM{responses: responses}
+}
+
+// Chat implements the LLMInterface
+func (m *MockLLM) Chat(ctx context.Context, messages []openai.ChatCompletionMessage, tools []openai.Tool) (*openai.ChatCompletionMessage, error) {
+	if m.callCount >= len(m.responses) {
+		return &openai.ChatCompletionMessage{
+			Role:    openai.ChatMessageRoleAssistant,
+			Content: "No more responses available",
+		}, nil
+	}
+	response := m.responses[m.callCount]
+	m.callCount++
+	return response, nil
+}
+
+// MockMCPClient is a mock implementation of the MCPClient for testing.
+// Per-tool results can be stubbed with SetToolResult; unknown tools get a
+// generic "mock result" from CallTool.
+type MockMCPClient struct {
+	// tools is the fixed tool list returned by ListTools.
+	tools       []Tool
+	// toolResults maps tool name -> stubbed result for CallTool.
+	toolResults map[string]*CallToolResult
+}
+
+// NewMockMCPClient creates a new mock MCP client exposing the given tools
+// and an empty result table.
+func NewMockMCPClient(tools []Tool) *MockMCPClient {
+	return &MockMCPClient{
+		tools:       tools,
+		toolResults: make(map[string]*CallToolResult),
+	}
+}
+
+// SetToolResult sets the result for a specific tool call; subsequent
+// CallTool invocations with that name return it.
+func (m *MockMCPClient) SetToolResult(name string, result *CallToolResult) {
+	m.toolResults[name] = result
+}
+
+// ListTools implements MCPClientInterface by returning the fixed tool set
+// the mock was constructed with; it never fails.
+func (m *MockMCPClient) ListTools() ([]Tool, error) {
+	return m.tools, nil
+}
+
+// CallTool implements MCPClientInterface
+func (m *MockMCPClient) CallTool(name string, args map[string]interface{}) (*CallToolResult, error) {
+	if result, ok := m.toolResults[name]; ok {
+		return result, nil
+	}
+	return &CallToolResult{
+		Content: []ContentBlock{{Type: "text", Text: "mock result"}},
+	}, nil
+}
+
+// TestAgent is an agent that uses interfaces for testing
+// (LLMInterface / MCPClientInterface) so the LLM and MCP backends can be
+// mocked without network access.
+type TestAgent struct {
+	llm       LLMInterface
+	mcpClient MCPClientInterface
+	// tools is the OpenAI-format conversion of the MCP tool list,
+	// populated by Initialize.
+	tools     []openai.Tool
+	// Event queues (nil until SetupQueues is called)
+	taskQueue    *EventQueue
+	messageQueue *EventQueue
+	// Queue control
+	// NOTE(review): ctx is stored in the struct (set by Start, read by the
+	// worker) — acceptable for a test harness, but atypical Go style.
+	ctx    context.Context
+	cancel context.CancelFunc
+	wg     sync.WaitGroup
+}
+
+// NewTestAgent creates a new test agent with interfaces.
+// Queues remain nil until SetupQueues is called.
+func NewTestAgent(llm LLMInterface, mcpClient MCPClientInterface) *TestAgent {
+	return &TestAgent{
+		llm:       llm,
+		mcpClient: mcpClient,
+	}
+}
+
+// SetupQueues initializes the event queues with the given capacity.
+// Call this before Start/QueueEvent: the queues are nil until then.
+func (a *TestAgent) SetupQueues(maxQueueSize int) {
+	a.taskQueue = NewEventQueue("task", maxQueueSize)
+	a.messageQueue = NewEventQueue("message", maxQueueSize)
+}
+
+// QueueEvent adds an event to the appropriate queue based on its URI
+func (a *TestAgent) QueueEvent(uri string, data json.RawMessage) {
+	event := &QueuedEvent{
+		URI:       uri,
+		Data:      data,
+		Timestamp: time.Now(),
+	}
+
+	var queue *EventQueue
+	if strings.Contains(uri, "taskCreated") || strings.Contains(uri, "taskUpdated") || strings.Contains(uri, "taskDeleted") {
+		queue = a.taskQueue
+	} else if strings.Contains(uri, "messageAdded") {
+		queue = a.messageQueue
+	} else {
+		queue = a.taskQueue
+	}
+
+	queue.TryEnqueue(event)
+}
+
+// Start begins processing events from the queues: it derives a cancelable
+// context from ctx and launches a single worker goroutine (processQueues).
+func (a *TestAgent) Start(ctx context.Context) {
+	a.ctx, a.cancel = context.WithCancel(ctx)
+	a.wg.Add(1)
+	go a.processQueues()
+}
+
+// Stop gracefully stops the queue processor: cancels the worker's context
+// and blocks until the goroutine started by Start returns. Safe to call
+// when Start was never invoked (cancel is nil-checked; Wait returns at
+// once).
+func (a *TestAgent) Stop() {
+	if a.cancel != nil {
+		a.cancel()
+	}
+	a.wg.Wait()
+}
+
+// processQueues is the main worker that processes events from both queues.
+// It runs until the context created in Start is canceled. When both queues
+// have an event ready, select chooses between them pseudo-randomly (Go
+// select semantics). Errors returned by ProcessEvent are dropped here.
+func (a *TestAgent) processQueues() {
+	defer a.wg.Done()
+
+	for {
+		select {
+		case <-a.ctx.Done():
+			return
+		case event := <-a.taskQueue.Channel():
+			a.ProcessEvent(a.ctx, event.URI, event.Data)
+		case event := <-a.messageQueue.Channel():
+			a.ProcessEvent(a.ctx, event.URI, event.Data)
+		}
+	}
+}
+
+// GetQueueStats returns current queue statistics: the number of events
+// waiting in the task and message queues. Assumes SetupQueues has been
+// called (nil-queue behavior of Len is defined by EventQueue — confirm).
+func (a *TestAgent) GetQueueStats() QueueStats {
+	return QueueStats{
+		TaskQueueSize:    a.taskQueue.Len(),
+		MessageQueueSize: a.messageQueue.Len(),
+	}
+}
+
+// Initialize initializes the test agent: it fetches the tool list from the
+// MCP client and caches the OpenAI-format conversion in a.tools for use by
+// Run/ProcessEvent.
+func (a *TestAgent) Initialize() error {
+	mcpTools, err := a.mcpClient.ListTools()
+	if err != nil {
+		return err
+	}
+	a.tools = ConvertMCPToolsToOpenAI(mcpTools)
+	return nil
+}
+
+// ProcessEvent processes an event: renders it into a user prompt
+// (buildTestEventPrompt) and drives the tool-calling loop
+// (processWithTools) until the LLM responds without tool calls.
+func (a *TestAgent) ProcessEvent(ctx context.Context, uri string, eventData json.RawMessage) error {
+	prompt := buildTestEventPrompt(uri, eventData)
+	messages := []openai.ChatCompletionMessage{
+		{Role: openai.ChatMessageRoleSystem, Content: testSystemPrompt},
+		{Role: openai.ChatMessageRoleUser, Content: prompt},
+	}
+	return a.processWithTools(ctx, messages)
+}
+
+// Run runs the agent interactively: it sends userMessage together with the
+// test system prompt, executes at most ONE round of requested tool calls,
+// then returns the content of the follow-up LLM response.
+//
+// NOTE(review): unlike processWithTools, tool calls appearing in the final
+// response are NOT executed, and ParseToolCall/CallTool errors are skipped
+// without appending a tool message for that call — confirm both are
+// intended for this test harness.
+func (a *TestAgent) Run(ctx context.Context, userMessage string) (string, error) {
+	messages := []openai.ChatCompletionMessage{
+		{Role: openai.ChatMessageRoleSystem, Content: testSystemPrompt},
+		{Role: openai.ChatMessageRoleUser, Content: userMessage},
+	}
+
+	response, err := a.llm.Chat(ctx, messages, a.tools)
+	if err != nil {
+		return "", err
+	}
+
+	// No tool calls: the first reply is the final answer.
+	if len(response.ToolCalls) == 0 {
+		return response.Content, nil
+	}
+
+	// Handle tool calls: echo the assistant turn, then append one tool
+	// message per successfully executed call.
+	messages = append(messages, *response)
+	for _, toolCall := range response.ToolCalls {
+		name, args, err := ParseToolCall(toolCall)
+		if err != nil {
+			continue
+		}
+		result, err := a.mcpClient.CallTool(name, args)
+		if err != nil {
+			continue
+		}
+		messages = append(messages, openai.ChatCompletionMessage{
+			Role:       openai.ChatMessageRoleTool,
+			ToolCallID: toolCall.ID,
+			Content:    extractTextFromResult(result),
+		})
+	}
+
+	// One follow-up chat to let the LLM summarize the tool results.
+	finalResponse, err := a.llm.Chat(ctx, messages, a.tools)
+	if err != nil {
+		return "", err
+	}
+	return finalResponse.Content, nil
+}
+
+func (a *TestAgent) processWithTools(ctx context.Context, messages []openai.ChatCompletionMessage) error {
+	response, err := a.llm.Chat(ctx, messages, a.tools)
+	if err != nil {
+		return err
+	}
+
+	if len(response.ToolCalls) == 0 {
+		return nil
+	}
+
+	messages = append(messages, *response)
+	for _, toolCall := range response.ToolCalls {
+		name, args, err := ParseToolCall(toolCall)
+		if err != nil {
+			continue
+		}
+		result, err := a.mcpClient.CallTool(name, args)
+		if err != nil {
+			continue
+		}
+		messages = append(messages, openai.ChatCompletionMessage{
+			Role:       openai.ChatMessageRoleTool,
+			ToolCallID: toolCall.ID,
+			Content:    extractTextFromResult(result),
+		})
+	}
+
+	return a.processWithTools(ctx, messages)
+}
+
// buildTestEventPrompt builds a prompt from the event data.
// The human-readable event label is derived from well-known substrings of
// the URI (checked in a fixed order); anything unrecognized is labeled
// "unknown". Empty payloads render as "{}".
func buildTestEventPrompt(uri string, eventData json.RawMessage) string {
	labels := []struct {
		needle string
		label  string
	}{
		{"taskCreated", "task created"},
		{"taskUpdated", "task updated"},
		{"taskDeleted", "task deleted"},
		{"messageAdded", "message added"},
	}

	eventType := "unknown"
	for _, entry := range labels {
		if strings.Contains(uri, entry.needle) {
			eventType = entry.label
			break
		}
	}

	payload := "{}"
	if len(eventData) > 0 {
		payload = string(eventData)
	}

	return fmt.Sprintf(`A %s event was received.

Event URI: %s
Event Data: %s

Please process this event appropriately.`, eventType, uri, payload)
}
+
// testNormalizeJSON normalizes JSON for snapshot comparison: it parses the
// string, strips volatile fields via testNormalizeData, and re-emits it
// with two-space indentation (keys sorted by encoding/json). Input that is
// not valid JSON is returned unchanged.
func testNormalizeJSON(jsonStr string) string {
	var parsed interface{}
	if err := json.Unmarshal([]byte(jsonStr), &parsed); err != nil {
		return jsonStr
	}
	testNormalizeData(parsed)
	rendered, _ := json.MarshalIndent(parsed, "", "  ")
	return string(rendered)
}

// testNormalizeData recursively walks maps and slices, deleting keys whose
// values vary between test runs (IDs, timestamps, credentials) so that
// snapshots stay deterministic.
func testNormalizeData(data interface{}) {
	volatileKeys := []string{
		"id", "ID", "createdAt", "updatedAt", "sentAt", "createdByID",
		"userId", "serviceId", "statusId", "assigneeId", "conversationId",
		"senderId", "password",
	}

	switch node := data.(type) {
	case map[string]interface{}:
		for _, key := range volatileKeys {
			delete(node, key)
		}
		for _, child := range node {
			testNormalizeData(child)
		}
	case []interface{}:
		for _, child := range node {
			testNormalizeData(child)
		}
	}
}
+
+// testSnapshotResult captures a snapshot of the result: it JSON-encodes
+// the response, strips volatile fields (testNormalizeJSON), and compares
+// against the stored snapshot `name` via cupaloy.
+// NOTE(review): the MarshalIndent error is ignored — an unmarshalable
+// response snapshots as an empty string; confirm that is acceptable.
+func testSnapshotResult(t *testing.T, name string, response interface{}) {
+	jsonBytes, _ := json.MarshalIndent(response, "", "  ")
+	normalized := testNormalizeJSON(string(jsonBytes))
+	testSnapshotter.SnapshotT(t, name, normalized)
+}

+ 126 - 0
arp_cli/cmd/repl.go

@@ -0,0 +1,126 @@
+package cmd
+
+import (
+	"bufio"
+	"context"
+	"fmt"
+	"os"
+	"strings"
+
+	"github.com/urfave/cli/v3"
+)
+
+// REPL represents a Read-Eval-Print Loop for interactive CLI usage.
+// It re-dispatches each input line through the wrapped cli.Command.
+type REPL struct {
+	app     *cli.Command
+	prompt  string
+	// history holds executed lines, oldest first.
+	// NOTE(review): history is appended in Run but never read in this
+	// file — confirm it is used elsewhere or intended for future use.
+	history []string
+}
+
+// NewREPL creates a new REPL instance with the default "arp> " prompt.
+func NewREPL(app *cli.Command) *REPL {
+	return &REPL{
+		app:    app,
+		prompt: "arp> ",
+	}
+}
+
+// Run starts the REPL loop: it reads stdin line by line, parses each line
+// into argv form, and dispatches it through the wrapped cli.Command until
+// EOF (Ctrl+D) or an exit command ("exit", "quit", "q").
+//
+// NOTE(review): lines are appended to r.history even when the command
+// errored (failed parses are not recorded), and ctx cancellation is not
+// observed while blocked in scanner.Scan — confirm both are acceptable.
+func (r *REPL) Run(ctx context.Context) error {
+	// bufio.Scanner applies its default token limit to input lines.
+	scanner := bufio.NewScanner(os.Stdin)
+
+	fmt.Println("ARP CLI - Interactive Mode")
+	fmt.Println("Type 'help' for available commands, 'exit' or 'quit' to leave.")
+	fmt.Println()
+
+	for {
+		// Show prompt
+		fmt.Print(r.prompt)
+
+		// Read line
+		if !scanner.Scan() {
+			// EOF (Ctrl+D)
+			fmt.Println()
+			break
+		}
+
+		line := strings.TrimSpace(scanner.Text())
+		if line == "" {
+			continue
+		}
+
+		// Check for exit commands
+		if line == "exit" || line == "quit" || line == "q" {
+			fmt.Println("Goodbye!")
+			break
+		}
+
+		// Parse the line into args
+		args, err := parseArgs(line)
+		if err != nil {
+			fmt.Fprintf(os.Stderr, "Error parsing command: %v\n", err)
+			continue
+		}
+
+		// Prepend the app name to match CLI expectations
+		args = append([]string{"arp_cli"}, args...)
+
+		// Execute the command; errors are reported but do not end the loop.
+		if err := r.app.Run(ctx, args); err != nil {
+			fmt.Fprintf(os.Stderr, "Error: %v\n", err)
+		}
+
+		// Store in history
+		r.history = append(r.history, line)
+	}
+
+	if err := scanner.Err(); err != nil {
+		return fmt.Errorf("reading input: %w", err)
+	}
+
+	return nil
+}
+
// parseArgs parses a command line string into arguments.
// It handles single/double quoted strings and basic backslash escaping:
// \" \' \\ and "\ " (escaped space) produce the escaped character; any
// other backslash sequence is kept literally, as is a trailing backslash.
// Quotes group words but are not included in the resulting argument, and
// an unterminated quote is tolerated (the remainder of the line becomes
// the final argument). It never returns a non-nil error; the error result
// is kept for interface stability.
//
// BUG FIX: the previous implementation tried to "skip the next character"
// after an escape by reassigning `line` inside `for i, ch := range line`.
// The range expression is evaluated only once, so the reassignment had no
// effect and the escaped character was processed a second time (e.g. \"
// emitted a quote AND toggled quote state, and "\ " split the token).
// An explicit `escaped` flag handles the lookahead correctly.
func parseArgs(line string) ([]string, error) {
	var args []string
	var current strings.Builder
	inQuote := false
	quoteChar := rune(0)
	escaped := false

	// flush appends the pending token, if any, to args.
	flush := func() {
		if current.Len() > 0 {
			args = append(args, current.String())
			current.Reset()
		}
	}

	for _, ch := range line {
		if escaped {
			// Previous character was a backslash: emit the escaped
			// character, or keep the backslash for unknown sequences.
			if ch == '"' || ch == '\'' || ch == '\\' || ch == ' ' {
				current.WriteRune(ch)
			} else {
				current.WriteRune('\\')
				current.WriteRune(ch)
			}
			escaped = false
			continue
		}

		switch {
		case ch == '\\':
			escaped = true
		case (ch == '"' || ch == '\'') && !inQuote:
			inQuote = true
			quoteChar = ch
		case ch == quoteChar && inQuote:
			inQuote = false
			quoteChar = 0
		case ch == ' ' && !inQuote:
			flush()
		default:
			current.WriteRune(ch)
		}
	}

	// A trailing backslash has nothing to escape; keep it literally
	// (same behavior as the original for a backslash at end of line).
	if escaped {
		current.WriteRune('\\')
	}

	// Add the last argument
	flush()

	return args, nil
}

+ 24 - 0
arp_cli/cmd/task.go

@@ -370,6 +370,30 @@ func taskCreate(ctx context.Context, cmd *cli.Command) error {
 		}
 	}
 
+	// Prompt for assignee (optional)
+	if assignee == "" {
+		prompt := &survey.Input{Message: "Assignee user ID (optional, press Enter to skip):"}
+		if err := survey.AskOne(prompt, &assignee); err != nil {
+			return err
+		}
+	}
+
+	// Prompt for status (optional)
+	if status == "" {
+		prompt := &survey.Input{Message: "Status ID (optional, press Enter to skip):"}
+		if err := survey.AskOne(prompt, &status); err != nil {
+			return err
+		}
+	}
+
+	// Prompt for due date (optional)
+	if dueDate == "" {
+		prompt := &survey.Input{Message: "Due date (optional, press Enter to skip):"}
+		if err := survey.AskOne(prompt, &dueDate); err != nil {
+			return err
+		}
+	}
+
 	c := client.New(cfg.ServerURL)
 	c.SetToken(cfg.Token)
 

+ 30 - 9
arp_cli/main.go

@@ -25,8 +25,28 @@ func main() {
 		cancel()
 	}()
 
-	// Build and run the CLI
-	app := &cli.Command{
+	// Build the CLI app
+	app := buildApp()
+
+	// If no arguments provided (only program name), start REPL mode
+	if len(os.Args) <= 1 {
+		repl := cmd.NewREPL(app)
+		if err := repl.Run(ctx); err != nil {
+			fmt.Fprintf(os.Stderr, "Error: %v\n", err)
+			os.Exit(1)
+		}
+		return
+	}
+
+	// Otherwise, run single command mode
+	if err := app.Run(ctx, os.Args); err != nil {
+		fmt.Fprintf(os.Stderr, "Error: %v\n", err)
+		os.Exit(1)
+	}
+}
+
+func buildApp() *cli.Command {
+	return &cli.Command{
 		Name:  "arp_cli",
 		Usage: "Command-line interface for ARP (Agent-native ERP)",
 		Description: `ARP CLI is a command-line tool for interacting with ARP GraphQL servers.
@@ -35,7 +55,10 @@ It provides CRUD operations for Services, Users, Notes, Tasks, Messages,
 Roles, and Permissions. Real-time updates are available via WebSocket subscriptions
 for Tasks and Messages.
 
-First, login to an ARP server using 'arp_cli login' to store your credentials.
+When run without arguments, it starts an interactive REPL mode.
+Use 'exit' or 'quit' to leave the REPL, or Ctrl+D.
+
+First, login to an ARP server using 'login' to store your credentials.
 Then use the various commands to manage your ARP data.`,
 		Version: "1.0.0",
 		Flags: []cli.Flag{
@@ -53,7 +76,6 @@ Then use the various commands to manage your ARP data.`,
 			},
 		},
 		Commands: []*cli.Command{
-			cmd.RootCommand(),
 			cmd.LoginCommand(),
 			cmd.ConfigCommand(),
 			cmd.ServiceCommand(),
@@ -68,10 +90,9 @@ Then use the various commands to manage your ARP data.`,
 			// Store global flags in context for later use
 			return ctx, nil
 		},
-	}
-
-	if err := app.Run(ctx, os.Args); err != nil {
-		fmt.Fprintf(os.Stderr, "Error: %v\n", err)
-		os.Exit(1)
+		Action: func(ctx context.Context, cmd *cli.Command) error {
+			// Show help when no subcommand is provided
+			return cli.ShowAppHelp(cmd)
+		},
 	}
 }

+ 13 - 0
graph/converters.go

@@ -48,12 +48,25 @@ func convertUsers(users []models.User) []*model.User {
 func convertNote(n models.Note) *model.Note {
 	userID := strconv.FormatUint(uint64(n.UserID), 10)
 	serviceID := strconv.FormatUint(uint64(n.ServiceID), 10)
+
+	var user *model.User
+	if n.User.ID > 0 {
+		user = convertUser(n.User)
+	}
+
+	var service *model.Service
+	if n.Service.ID > 0 {
+		service = convertService(n.Service)
+	}
+
 	return &model.Note{
 		ID:        strconv.FormatUint(uint64(n.ID), 10),
 		Title:     n.Title,
 		Content:   n.Content,
 		UserID:    userID,
+		User:      user,
 		ServiceID: serviceID,
+		Service:   service,
 		CreatedAt: n.CreatedAt.String(),
 		UpdatedAt: n.UpdatedAt.String(),
 	}

+ 15 - 11
graph/resolver.go

@@ -29,11 +29,11 @@ type MessageEvent struct {
 type Resolver struct {
 	DB *gorm.DB
 
-	// Pub/Sub channels for subscriptions
-	taskSubscribers   map[uint]chan TaskEvent // keyed by user ID
+	// Pub/Sub channels for subscriptions (slice of channels per user for multiple subscriptions)
+	taskSubscribers   map[uint][]chan TaskEvent // keyed by user ID
 	taskSubscribersMu sync.RWMutex
 
-	messageSubscribers   map[uint]chan MessageEvent // keyed by user ID
+	messageSubscribers   map[uint][]chan MessageEvent // keyed by user ID
 	messageSubscribersMu sync.RWMutex
 }
 
@@ -41,22 +41,23 @@ type Resolver struct {
 func NewResolver(db *gorm.DB) *Resolver {
 	return &Resolver{
 		DB:                 db,
-		taskSubscribers:    make(map[uint]chan TaskEvent),
-		messageSubscribers: make(map[uint]chan MessageEvent),
+		taskSubscribers:    make(map[uint][]chan TaskEvent),
+		messageSubscribers: make(map[uint][]chan MessageEvent),
 	}
 }
 
 // SubscribeToTasks registers a user for task events and returns their event channel
+// Each subscription gets its own channel to avoid competition between goroutines
 func (r *Resolver) SubscribeToTasks(userID uint) <-chan TaskEvent {
 	r.taskSubscribersMu.Lock()
 	defer r.taskSubscribersMu.Unlock()
 
 	ch := make(chan TaskEvent, 10)
-	r.taskSubscribers[userID] = ch
+	r.taskSubscribers[userID] = append(r.taskSubscribers[userID], ch)
 	return ch
 }
 
-// PublishTaskEvent sends a task event to the assignee
+// PublishTaskEvent sends a task event to all subscribers for the assignee
 func (r *Resolver) PublishTaskEvent(task *model.Task, assigneeID *uint, eventType string) {
 	if assigneeID == nil {
 		return // No assignee, no one to notify
@@ -71,7 +72,8 @@ func (r *Resolver) PublishTaskEvent(task *model.Task, assigneeID *uint, eventTyp
 	r.taskSubscribersMu.RLock()
 	defer r.taskSubscribersMu.RUnlock()
 
-	if ch, ok := r.taskSubscribers[event.UserID]; ok {
+	// Send to all channels for this user
+	for _, ch := range r.taskSubscribers[event.UserID] {
 		select {
 		case ch <- event:
 		default:
@@ -81,16 +83,17 @@ func (r *Resolver) PublishTaskEvent(task *model.Task, assigneeID *uint, eventTyp
 }
 
 // SubscribeToMessages registers a user for message events and returns their event channel
+// Each subscription gets its own channel to avoid competition between goroutines
 func (r *Resolver) SubscribeToMessages(userID uint) <-chan MessageEvent {
 	r.messageSubscribersMu.Lock()
 	defer r.messageSubscribersMu.Unlock()
 
 	ch := make(chan MessageEvent, 10)
-	r.messageSubscribers[userID] = ch
+	r.messageSubscribers[userID] = append(r.messageSubscribers[userID], ch)
 	return ch
 }
 
-// PublishMessageEvent sends a message event to all receivers
+// PublishMessageEvent sends a message event to all subscribers who are receivers
 func (r *Resolver) PublishMessageEvent(message *model.Message, receiverIDs []uint) {
 	event := MessageEvent{
 		Message:     message,
@@ -101,7 +104,8 @@ func (r *Resolver) PublishMessageEvent(message *model.Message, receiverIDs []uin
 	defer r.messageSubscribersMu.RUnlock()
 
 	for _, userID := range receiverIDs {
-		if ch, ok := r.messageSubscribers[userID]; ok {
+		// Send to all channels for this user
+		for _, ch := range r.messageSubscribers[userID] {
 			select {
 			case ch <- event:
 			default:

+ 27 - 13
mcp/handler.go

@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 
 	"gogs.dmsc.dev/arp/auth"
+	"gogs.dmsc.dev/arp/graph"
 	"gogs.dmsc.dev/arp/mcp/tools"
 )
 
@@ -221,8 +222,25 @@ func (s *Server) handleResourcesSubscribe(ctx context.Context, req *JSONRPCReque
 	session.Subscriptions[params.URI] = cancel
 	session.SubsMu.Unlock()
 
-	// Start the subscription based on URI
-	go s.runSubscription(subCtx, params.URI, user.ID, session)
+	// Subscribe synchronously BEFORE starting the goroutine to avoid race condition
+	// where events are published before the subscription channel is registered
+	var eventChan interface{}
+	switch params.URI {
+	case "graphql://subscription/taskCreated", "graphql://subscription/taskUpdated", "graphql://subscription/taskDeleted":
+		eventChan = s.resolver.SubscribeToTasks(user.ID)
+	case "graphql://subscription/messageAdded":
+		eventChan = s.resolver.SubscribeToMessages(user.ID)
+	default:
+		cancel()
+		return &JSONRPCResponse{
+			JSONRPC: "2.0",
+			ID:      req.ID,
+			Error:   &RPCError{Code: -32602, Message: "Unknown subscription URI"},
+		}
+	}
+
+	// Start the subscription based on URI (pass the already-created channel)
+	go s.runSubscription(subCtx, params.URI, user.ID, session, eventChan)
 
 	return &JSONRPCResponse{
 		JSONRPC: "2.0",
@@ -259,23 +277,21 @@ func (s *Server) handleResourcesUnsubscribe(ctx context.Context, req *JSONRPCReq
 }
 
 // runSubscription handles the actual subscription event streaming
-func (s *Server) runSubscription(ctx context.Context, uri string, userID uint, session *Session) {
+func (s *Server) runSubscription(ctx context.Context, uri string, userID uint, session *Session, eventChan interface{}) {
 	switch uri {
 	case "graphql://subscription/taskCreated":
-		s.streamTaskEvents(ctx, userID, session, "created")
+		s.streamTaskEvents(ctx, userID, session, "created", eventChan.(<-chan graph.TaskEvent))
 	case "graphql://subscription/taskUpdated":
-		s.streamTaskEvents(ctx, userID, session, "updated")
+		s.streamTaskEvents(ctx, userID, session, "updated", eventChan.(<-chan graph.TaskEvent))
 	case "graphql://subscription/taskDeleted":
-		s.streamTaskEvents(ctx, userID, session, "deleted")
+		s.streamTaskEvents(ctx, userID, session, "deleted", eventChan.(<-chan graph.TaskEvent))
 	case "graphql://subscription/messageAdded":
-		s.streamMessageEvents(ctx, userID, session)
+		s.streamMessageEvents(ctx, userID, session, eventChan.(<-chan graph.MessageEvent))
 	}
 }
 
 // streamTaskEvents streams task events to the session
-func (s *Server) streamTaskEvents(ctx context.Context, userID uint, session *Session, eventType string) {
-	eventChan := s.resolver.SubscribeToTasks(userID)
-
+func (s *Server) streamTaskEvents(ctx context.Context, userID uint, session *Session, eventType string, eventChan <-chan graph.TaskEvent) {
 	for {
 		select {
 		case <-ctx.Done():
@@ -298,9 +314,7 @@ func (s *Server) streamTaskEvents(ctx context.Context, userID uint, session *Ses
 }
 
 // streamMessageEvents streams message events to the session
-func (s *Server) streamMessageEvents(ctx context.Context, userID uint, session *Session) {
-	eventChan := s.resolver.SubscribeToMessages(userID)
-
+func (s *Server) streamMessageEvents(ctx context.Context, userID uint, session *Session, eventChan <-chan graph.MessageEvent) {
 	for {
 		select {
 		case <-ctx.Done():