The ARP Agent connects to an ARP server and:
- Connects to the MCP server at the `/mcp` endpoint
- Discovers the available tools via the `tools/list` protocol

pip install -r requirements.txt
poetry install
- `openai` - OpenAI API client for LLM interactions
- `requests` - HTTP client for GraphQL and MCP communication
- `sseclient-py` - Server-Sent Events client for MCP
- `python-dotenv` - Environment variable management

Copy the example environment file and configure your credentials:
cp .env.example .env
Edit .env with your settings:
# ARP Server Configuration
ARP_URL=http://localhost:8080
ARP_USERNAME=your-email@example.com
ARP_PASSWORD=your-password
# OpenAI Configuration
OPENAI_API_KEY=sk-your-openai-api-key
OPENAI_MODEL=gpt-4
OPENAI_TEMPERATURE=0.0
# Optional: Custom OpenAI endpoint (for local models, etc.)
# OPENAI_BASE_URL=http://localhost:11434/v1
| Variable | Description |
|---|---|
| `ARP_URL` | Base URL of the ARP server |
| `ARP_USERNAME` | Your ARP login email |
| `ARP_PASSWORD` | Your ARP password |
| `OPENAI_API_KEY` | OpenAI API key for LLM |

| Variable | Description | Default |
|---|---|---|
| `OPENAI_MODEL` | Model to use | `gpt-4` |
| `OPENAI_TEMPERATURE` | Sampling temperature | `0.0` |
| `OPENAI_BASE_URL` | Custom OpenAI-compatible endpoint | OpenAI API |
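
As a rough illustration of how these settings could be read and validated at startup, here is a minimal sketch. The variable names and defaults come from the tables above; `AgentConfig` and `load_config` are hypothetical names for this example and are not necessarily what the agent itself uses.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

# Required variable names as listed in the first table above.
REQUIRED = ("ARP_URL", "ARP_USERNAME", "ARP_PASSWORD", "OPENAI_API_KEY")


@dataclass(frozen=True)
class AgentConfig:
    """Hypothetical container for the settings described above."""
    arp_url: str
    arp_username: str
    arp_password: str
    openai_api_key: str
    openai_model: str = "gpt-4"
    openai_temperature: float = 0.0
    openai_base_url: Optional[str] = None  # None means the default OpenAI API


def load_config(env: Mapping[str, str] = os.environ) -> AgentConfig:
    """Build a config from an environment mapping, failing fast on missing values."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise ValueError(f"Missing required settings: {', '.join(missing)}")
    return AgentConfig(
        arp_url=env["ARP_URL"].rstrip("/"),
        arp_username=env["ARP_USERNAME"],
        arp_password=env["ARP_PASSWORD"],
        openai_api_key=env["OPENAI_API_KEY"],
        openai_model=env.get("OPENAI_MODEL", "gpt-4"),
        openai_temperature=float(env.get("OPENAI_TEMPERATURE", "0.0")),
        openai_base_url=env.get("OPENAI_BASE_URL") or None,
    )
```

Passing a plain dictionary instead of `os.environ` makes the function easy to exercise in unit tests without touching the real environment.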
python run_arp_agent.py
Testing connectivity to OpenAI API (api.openai.com)...
✓ Successfully connected to OpenAI API (api.openai.com)
Connecting to ARP server at http://localhost:8080...
Successfully authenticated with ARP server
Connecting to MCP server...
Discovered 3 MCP tools: ['introspect', 'query', 'mutate']
Initializing LLM agent...
Agent initialized successfully.
Subscribing to ARP resources...
Available resources: ['taskCreated', 'taskUpdated', 'taskDeleted', 'messageAdded']
Subscribed to: graphql://subscription/taskCreated
Subscribed to: graphql://subscription/taskUpdated
Subscribed to: graphql://subscription/taskDeleted
Subscribed to: graphql://subscription/messageAdded
Listening for events. Press Ctrl+C to stop.
┌─────────────────┐ GraphQL Login ┌─────────────────┐
│ ARP Agent │ ──────────────────────► │ ARP Server │
│ │ ◄────────────────────── │ │
│ ┌───────────┐ │ JWT Token │ │
│ │ LLM │ │ │ ┌───────────┐ │
│ └───────────┘ │ SSE Connect │ │ MCP │ │
│ │ │ ──────────────────────► │ │ Server │ │
│ ▼ │ /mcp endpoint │ └───────────┘ │
│ ┌───────────┐ │ │ │ │
│ │MCP Client │◄─┼──── Tool Discovery │ │ │
│ └───────────┘ │ Tool Calls │ ▼ │
│ │ │ Notifications │ ┌───────────┐ │
│ ▼ │ │ │ GraphQL │ │
│ ┌───────────┐ │ │ │ Engine │ │
│ │ Events │◄─┼─────────────────────────┼──┤ │ │
│ └───────────┘ │ Real-time │ └───────────┘ │
└─────────────────┘ Subscriptions └─────────────────┘
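
The message flow in the diagram can be sketched as plain payload builders. This is only an illustration: the JSON-RPC 2.0 envelope and the `initialize` parameters follow the public MCP specification, while `auth_headers`, `rpc_request`, and `handshake_messages` are hypothetical helper names, and the client name and version are placeholders. It covers only the requests named in this README.

```python
from itertools import count
from typing import Any, Dict, List, Optional

PROTOCOL_VERSION = "2024-11-05"  # protocol version mentioned in this README
_ids = count(1)  # JSON-RPC request ids must be unique within a session


def auth_headers(token: str) -> Dict[str, str]:
    """Headers carrying the JWT returned by the login step."""
    return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}


def rpc_request(method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 request envelope."""
    message: Dict[str, Any] = {"jsonrpc": "2.0", "id": next(_ids), "method": method}
    if params is not None:
        message["params"] = params
    return message


def handshake_messages(client_name: str, client_version: str) -> List[Dict[str, Any]]:
    """Requests sent, in order, once the SSE connection is open."""
    return [
        rpc_request("initialize", {
            "protocolVersion": PROTOCOL_VERSION,
            "capabilities": {},
            "clientInfo": {"name": client_name, "version": client_version},
        }),
        rpc_request("tools/list"),
        rpc_request("resources/subscribe", {"uri": "graphql://subscription/taskCreated"}),
    ]
```

Keeping the payload construction separate from the HTTP/SSE transport makes the handshake testable without a running server.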
The agent uses the Model Context Protocol (MCP) to communicate with the ARP server:
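1. Connect to the `/mcp` endpoint via Server-Sent Events
2. Receive the `endpoint` event
3. Send an `initialize` request with protocol version and client info
4. Call `tools/list` to discover available tools
5. Call `resources/subscribe` for real-time event streams

Authentication is handled via JWT tokens:

- The agent calls the `login` mutation with email/password
- The returned token is sent with subsequent requests (`Authorization: Bearer <token>`)

The MCP protocol version is `2024-11-05`.

The ARP MCP server exposes three tools: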
`introspect`: Discover the GraphQL schema (types, fields, queries, mutations).
# Get full schema
result = mcp_client.call_tool("introspect", {})
# Get specific type
result = mcp_client.call_tool("introspect", {"typeName": "User"})
`query`: Execute GraphQL queries (read operations).
# Query users
result = mcp_client.call_tool("query", {
"query": "{ users { id email roles { name } } }"
})
# Query with variables
result = mcp_client.call_tool("query", {
"query": "query User($id: ID!) { user(id: $id) { id email } }",
"variables": {"id": "1"}
})
`mutate`: Execute GraphQL mutations (create/update/delete operations).
# Create a task
result = mcp_client.call_tool("mutate", {
"mutation": """
mutation CreateTask($input: NewTask!) {
createTask(input: $input) { id title }
}
""",
"variables": {
"input": {
"title": "New Task",
"content": "Task description",
"createdById": "1"
}
}
})
# Delete a note
result = mcp_client.call_tool("mutate", {
"mutation": "mutation DeleteNote($id: ID!) { deleteNote(id: $id) }",
"variables": {"id": "123"}
})
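
For orientation, the sketch below shows what a `call_tool` invocation might look like on the wire. The `tools/call` method with `name` and `arguments` parameters comes from the MCP specification; `build_tool_call` is a hypothetical helper, and whether the real client validates tool names locally is an assumption.

```python
from typing import Any, Dict, Optional

KNOWN_TOOLS = {"introspect", "query", "mutate"}  # the three tools described above


def build_tool_call(request_id: int, name: str,
                    arguments: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build the JSON-RPC body for an MCP tools/call request (hypothetical helper)."""
    if name not in KNOWN_TOOLS:
        raise ValueError(f"Unknown tool: {name!r}")
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": name, "arguments": arguments or {}},
    }


# Same query-with-variables example as above, expressed as a raw request body.
payload = build_tool_call(7, "query", {
    "query": "query User($id: ID!) { user(id: $id) { id email } }",
    "variables": {"id": "1"},
})
```

Rejecting unknown tool names before sending saves a round trip, although the server remains the source of truth for which tools exist.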
The ARP platform supports workflows - configurable, DAG-based process automation that coordinates tasks across agents and users. Workflows enable you to define multi-step processes with dependencies, parallel execution, and automatic task creation.
| Concept | Description |
|---|---|
| WorkflowTemplate | Admin-defined workflow definition (JSON DAG structure) |
| WorkflowInstance | A running instance of a workflow template |
| WorkflowNode | A single step in a workflow instance (maps to a Task) |
| WorkflowEdge | Dependency relationship between nodes |
| Type | Description |
|---|---|
| `task` | Creates and assigns a task to a user |
| `condition` | Conditional branching based on workflow context |
| `parallel` | Fork into multiple concurrent branches |
| `join` | Wait for multiple branches to complete |
| `trigger` | External event trigger (webhook, schedule, etc.) |
pending → ready → running → completed
└── failed → (retry or abort)
└── skipped
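
The lifecycle above can be read as a small state machine. The sketch below is one interpretation, not the server's implementation: it assumes `failed` branches off `running`, that `skipped` can be reached from `pending` or `ready`, and that a retry moves a failed node back to `ready`. The names `TRANSITIONS`, `can_transition`, and `advance` are hypothetical.

```python
from typing import Dict, Set

# Allowed transitions, inferred from the lifecycle above (see assumptions in the text).
TRANSITIONS: Dict[str, Set[str]] = {
    "pending": {"ready", "skipped"},
    "ready": {"running", "skipped"},
    "running": {"completed", "failed"},
    "failed": {"ready"},   # retry; an abort simply leaves the node in "failed"
    "completed": set(),    # terminal
    "skipped": set(),      # terminal
}


def can_transition(current: str, target: str) -> bool:
    """Return True if a node may move from `current` to `target`."""
    return target in TRANSITIONS.get(current, set())


def advance(current: str, target: str) -> str:
    """Return the new status, or raise if the transition is not allowed."""
    if not can_transition(current, target):
        raise ValueError(f"Illegal transition: {current} -> {target}")
    return target
```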
Workflows are defined as JSON DAGs (Directed Acyclic Graphs):
{
"nodes": {
"start": {
"type": "task",
"title": "Initial Review",
"content": "Review the submitted request",
"assignee": "reviewer@example.com",
"dependsOn": []
},
"parallel_analysis": {
"type": "parallel",
"title": "Parallel Analysis",
"content": "Run multiple analyses in parallel",
"dependsOn": ["start"]
},
"technical_review": {
"type": "task",
"title": "Technical Review",
"content": "Review technical aspects",
"assignee": "tech@example.com",
"dependsOn": ["parallel_analysis"]
},
"business_review": {
"type": "task",
"title": "Business Review",
"content": "Review business impact",
"assignee": "business@example.com",
"dependsOn": ["parallel_analysis"]
},
"join_reviews": {
"type": "join",
"title": "Join Reviews",
"content": "Wait for all reviews to complete",
"dependsOn": ["technical_review", "business_review"]
},
"final_approval": {
"type": "task",
"title": "Final Approval",
"content": "Make final decision",
"assignee": "approver@example.com",
"dependsOn": ["join_reviews"]
}
}
}
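
To make the `dependsOn` semantics concrete, the following sketch derives an execution order from a definition shaped like the one above and works out which nodes are ready to run. It uses the standard-library `graphlib` module (Python 3.9+). The function names are hypothetical, and this is not necessarily how the ARP server schedules nodes.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List, Set


def dependency_graph(definition: dict) -> Dict[str, Set[str]]:
    """Map each node id to the ids it depends on, rejecting unknown references."""
    nodes = definition["nodes"]
    graph: Dict[str, Set[str]] = {}
    for node_id, node in nodes.items():
        deps = set(node.get("dependsOn", []))
        unknown = deps - set(nodes)
        if unknown:
            raise ValueError(f"{node_id} depends on unknown nodes: {sorted(unknown)}")
        graph[node_id] = deps
    return graph


def execution_order(definition: dict) -> List[str]:
    """Return one valid order; raises CycleError if the definition is not a DAG."""
    return list(TopologicalSorter(dependency_graph(definition)).static_order())


def is_acyclic(definition: dict) -> bool:
    """Return True if the definition contains no dependency cycle."""
    try:
        execution_order(definition)
        return True
    except CycleError:
        return False


def ready_nodes(definition: dict, completed: Set[str]) -> Set[str]:
    """Nodes not yet completed whose dependencies have all completed."""
    graph = dependency_graph(definition)
    return {n for n, deps in graph.items() if n not in completed and deps <= completed}
```

With the example definition above, once `start` and `parallel_analysis` have completed, `ready_nodes` would return `technical_review` and `business_review`, which is the fork that the `parallel` node describes.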
Use the mutate tool to create workflow templates and start instances:
# Create a workflow template
result = mcp_client.call_tool("mutate", {
"mutation": """
mutation CreateWorkflowTemplate($input: NewWorkflowTemplate!) {
createWorkflowTemplate(input: $input) {
id
name
isActive
}
}
""",
"variables": {
"input": {
"name": "Approval Process",
"description": "Multi-step approval workflow",
"definition": '{"nodes": {...}}',
"isActive": True
}
}
})
# Start a workflow instance
result = mcp_client.call_tool("mutate", {
"mutation": """
mutation StartWorkflow($templateId: ID!, $input: StartWorkflowInput!) {
startWorkflow(templateId: $templateId, input: $input) {
id
status
createdAt
}
}
""",
"variables": {
"templateId": "1",
"input": {
"serviceId": "5",
"context": '{"requestId": "REQ-123"}'
}
}
})
# List all workflow templates
result = mcp_client.call_tool("query", {
"query": """
{
workflowTemplates {
id
name
description
isActive
}
}
"""
})
# Get workflow instance status
result = mcp_client.call_tool("query", {
"query": """
query WorkflowInstance($id: ID!) {
workflowInstance(id: $id) {
id
status
context
service { id name }
template { name }
}
}
""",
"variables": {"id": "1"}
})
Agents can interact with workflows through MCP tools:
Example agent workflow handling:
# Agent receives task completion event
# Check if task is part of a workflow
result = mcp_client.call_tool("query", {
"query": """
query TaskWorkflow($taskId: ID!) {
task(id: $taskId) {
id
title
workflowNodes {
id
workflowInstance {
id
status
template { name }
}
}
}
}
""",
"variables": {"taskId": task_id}
})
# If task is part of workflow, check downstream nodes
# Agent can proactively notify next assignees or take actions
from llm_agents.mcp_client import login_and_create_mcp_client
# Login and create client
client = login_and_create_mcp_client(
url="http://localhost:8080",
username="admin@example.com",
password="secret123"
)
# Connect to MCP server
client.connect()
client.initialize()
# Discover tools
tools = client.list_tools()
print(f"Available tools: {[t.name for t in tools]}")
# Call tools
users = client.call_tool("query", {"query": "{ users { id email } }"})
print(users)
# Subscribe to resources
client.subscribe_resource("graphql://subscription/taskCreated")
# Listen for notifications
def on_notification(data):
    print(f"Received: {data}")

client.listen_for_notifications(on_notification)
# Cleanup
client.close()
from llm_agents import Agent, ChatLLM
from llm_agents.mcp_client import login_and_create_mcp_client
# Create authenticated MCP client
mcp_client = login_and_create_mcp_client(
url="http://localhost:8080",
username="admin@example.com",
password="secret123"
)
# Connect and initialize
mcp_client.connect()
mcp_client.initialize()
mcp_client.list_tools()
# Create agent with MCP client
llm = ChatLLM() # Uses OPENAI_API_KEY from environment
agent = Agent(llm=llm, mcp_client=mcp_client)
# Run the agent
result = agent.run("List all users and their roles")
print(result)
# Cleanup
mcp_client.close()
The ARP MCP server exposes resources for real-time GraphQL subscriptions:
| Resource URI | Description |
|---|---|
| `graphql://subscription/taskCreated` | New task events (received by assignee) |
| `graphql://subscription/taskUpdated` | Task update events (received by assignee) |
| `graphql://subscription/taskDeleted` | Task deletion events (received by assignee) |
| `graphql://subscription/messageAdded` | New message events (received by receivers) |
# List available resources
resources = mcp_client.list_resources()
for resource in resources:
    print(f"{resource['uri']}: {resource['name']}")
# Subscribe to task events
mcp_client.subscribe_resource("graphql://subscription/taskCreated")
mcp_client.subscribe_resource("graphql://subscription/taskUpdated")
# Unsubscribe
mcp_client.unsubscribe_resource("graphql://subscription/taskCreated")
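Events are filtered by the ARP server based on user context:

- Task events (`taskCreated`, `taskUpdated`, `taskDeleted`) are delivered to the task's assignee
- Message events (`messageAdded`) are delivered to the message's receivers

This ensures each user only receives relevant notifications.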
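
On the client side, incoming notifications can be routed to per-resource handlers. The sketch below is a hypothetical helper, not part of `llm_agents`: it assumes each notification is a dictionary whose `params.uri` field names the resource, as in MCP `notifications/resources/updated` messages. The actual payload passed to the callback by this client may differ.

```python
from typing import Any, Callable, Dict, List

Handler = Callable[[Dict[str, Any]], None]


class NotificationRouter:
    """Route notifications to handlers keyed by resource URI (hypothetical helper)."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = {}

    def on(self, uri: str, handler: Handler) -> None:
        """Register a handler for one resource URI."""
        self._handlers.setdefault(uri, []).append(handler)

    def dispatch(self, notification: Dict[str, Any]) -> int:
        """Call every handler registered for the notification's URI; return how many ran."""
        # Assumed payload shape: {"method": ..., "params": {"uri": ...}}
        uri = notification.get("params", {}).get("uri")
        handlers = self._handlers.get(uri, [])
        for handler in handlers:
            handler(notification)
        return len(handlers)
```

Under that assumption, `router.dispatch` could be passed as the callback to `listen_for_notifications`, so that task and message events are handled by separate functions.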
Run the integration tests (requires a running ARP server):
python -m pytest tests/integration/ -v --no-cov
arp_agent/
├── llm_agents/
│ ├── __init__.py # Package exports
│ ├── agent.py # Agent with tool-calling
│ ├── llm.py # OpenAI LLM wrapper
│ └── mcp_client.py # MCP client implementation
├── tests/
│ ├── conftest.py # Pytest fixtures
│ ├── test_setup_validation.py
│ ├── unit/ # Unit tests
│ └── integration/ # Integration tests
│ └── test_arp_agent_integration.py
├── specs/
│ └── schema.graphqls # GraphQL schema reference
├── run_arp_agent.py # Main entry point
├── run_tests.py # Test runner wrapper
├── pyproject.toml # Poetry configuration
├── poetry.lock # Locked dependencies
├── requirements.txt # Python dependencies (for pip)
├── .env.example # Environment template
├── CLIENT_GUIDE.md # ARP client implementation guide
└── README.md # This file
See LICENSE for details.