1
0

agent.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/sashabaranov/go-openai"
  11. )
  12. // Default agent configuration
  13. const (
  14. DefaultAgentName = "AI Assistant"
  15. DefaultSpecialization = "general assistance"
  16. DefaultValues = "helpfulness, accuracy, and collaboration"
  17. DefaultGoals = "help teammates accomplish their goals and contribute to the team's success"
  18. )
  19. // QueuedEvent represents an event waiting to be processed
  20. type QueuedEvent struct {
  21. URI string `json:"uri"`
  22. Data json.RawMessage `json:"data"`
  23. Timestamp time.Time `json:"timestamp"`
  24. }
  25. // EventQueue manages queued events with arrival-order tracking
  26. type EventQueue struct {
  27. events chan *QueuedEvent
  28. name string
  29. }
  30. // NewEventQueue creates a new event queue with the specified capacity
  31. func NewEventQueue(name string, capacity int) *EventQueue {
  32. return &EventQueue{
  33. events: make(chan *QueuedEvent, capacity),
  34. name: name,
  35. }
  36. }
  37. // TryEnqueue attempts to add an event to the queue without blocking
  38. // Returns true if successful, false if the queue is full
  39. func (q *EventQueue) TryEnqueue(event *QueuedEvent) bool {
  40. select {
  41. case q.events <- event:
  42. return true
  43. default:
  44. return false
  45. }
  46. }
  47. // Dequeue returns the next event from the queue, blocking if empty
  48. func (q *EventQueue) Dequeue() *QueuedEvent {
  49. return <-q.events
  50. }
  51. // Channel returns the underlying channel for select statements
  52. func (q *EventQueue) Channel() <-chan *QueuedEvent {
  53. return q.events
  54. }
  55. // Len returns the current number of events in the queue
  56. func (q *EventQueue) Len() int {
  57. return len(q.events)
  58. }
  59. // Agent is an LLM-powered agent that processes events using MCP tools
  60. type Agent struct {
  61. llm *LLM
  62. mcpClient *MCPClient
  63. tools []openai.Tool
  64. // Agent identity configuration
  65. agentName string
  66. username string // ARP login email - the agent IS this user on the platform
  67. specialization string
  68. values string
  69. goals string
  70. // Iteration configuration
  71. maxIterations int
  72. // Event queues
  73. taskQueue *EventQueue
  74. messageQueue *EventQueue
  75. // Queue control
  76. ctx context.Context
  77. cancel context.CancelFunc
  78. wg sync.WaitGroup
  79. }
  80. // NewAgent creates a new Agent with the given configuration
  81. func NewAgent(llm *LLM, mcpClient *MCPClient, cfg *Config) *Agent {
  82. agent := &Agent{
  83. llm: llm,
  84. mcpClient: mcpClient,
  85. }
  86. // Load identity from config (which already has defaults)
  87. if cfg != nil {
  88. agent.agentName = cfg.AgentName
  89. agent.username = cfg.ARPUsername // Store the ARP login email
  90. agent.specialization = cfg.Specialization
  91. agent.values = cfg.Values
  92. agent.goals = cfg.Goals
  93. agent.maxIterations = cfg.MaxIterations
  94. } else {
  95. // Fallback to defaults
  96. agent.agentName = DefaultAgentName
  97. agent.specialization = DefaultSpecialization
  98. agent.values = DefaultValues
  99. agent.goals = DefaultGoals
  100. agent.maxIterations = 10
  101. }
  102. return agent
  103. }
  104. // Initialize initializes the agent by discovering tools
  105. func (a *Agent) Initialize() error {
  106. mcpTools, err := a.mcpClient.ListTools()
  107. if err != nil {
  108. return fmt.Errorf("failed to list tools: %w", err)
  109. }
  110. a.tools = ConvertMCPToolsToOpenAI(mcpTools)
  111. log.Printf("Discovered %d MCP tools: %v", len(mcpTools), toolNames(mcpTools))
  112. return nil
  113. }
  114. // toolNames extracts tool names for logging
  115. func toolNames(tools []Tool) []string {
  116. names := make([]string, len(tools))
  117. for i, t := range tools {
  118. names[i] = t.Name
  119. }
  120. return names
  121. }
  122. // ProcessEvent processes an event notification from MCP resources
  123. func (a *Agent) ProcessEvent(ctx context.Context, uri string, eventData json.RawMessage) error {
  124. // Build context from the event
  125. prompt := a.buildEventPrompt(uri, eventData)
  126. // Create messages
  127. messages := []openai.ChatCompletionMessage{
  128. {
  129. Role: openai.ChatMessageRoleSystem,
  130. Content: a.getSystemPrompt(),
  131. },
  132. {
  133. Role: openai.ChatMessageRoleUser,
  134. Content: prompt,
  135. },
  136. }
  137. // Process with LLM, handling tool calls
  138. return a.processWithTools(ctx, messages)
  139. }
  140. // agentIdentity generates the agent identity string for the prompt
  141. func (a *Agent) agentIdentity() string {
  142. return fmt.Sprintf(`You are %s, logged into the ARP platform as %s. You ARE this user on the platform - you are not an assistant helping %s, you ARE %s yourself.
  143. You specialize in %s. You value %s. Your primary goals are: %s.
  144. When you create tasks, notes, or messages on ARP, you do so as yourself (%s). You are not creating content FOR %s - you ARE %s acting on the platform.`,
  145. a.agentName, a.username, a.agentName, a.agentName,
  146. a.specialization, a.values, a.goals,
  147. a.agentName, a.agentName, a.agentName)
  148. }
  149. // getSystemPrompt returns the system prompt for the agent
  150. func (a *Agent) getSystemPrompt() string {
  151. today := time.Now().Format("2006-01-02")
  152. identity := a.agentIdentity()
  153. return fmt.Sprintf(`Today is %s.
  154. %s
  155. ## Primary Interaction Mode
  156. You operate natively on the ARP platform as yourself. **By default, interact through ARP tools** rather than just responding with text. Use the available tools to:
  157. - Create and update **notes** to document your findings and intermediate results (notes don't need to be associated with a service)
  158. - Create **tasks** for your own work or collaborate with teammates (tasks must be associated with a service)
  159. - Send **messages** to communicate with other users
  160. **Important:** You are not an assistant creating tasks FOR others. You ARE a user on this platform. When you create content, you do so as yourself.
  161. ## Available Tools
  162. - `+"`introspect`"+` - Discover the GraphQL schema and available operations
  163. - `+"`query`"+` - Read data from the ARP database
  164. - `+"`mutate`"+` - Create, update, or delete data
  165. **Always introspect first** if you're unsure about available fields or types.
  166. ## Response Guidelines
  167. - **Format:** Answer in markdown. Use fenced code blocks for source code. End with a "References" section.
  168. - **Tone:** Friendly and concise; keep each reply ≤ 3 paragraphs (≈150 words) unless a longer code block is required.
  169. - **Depth:** Explain concepts at an intermediate level. Add an optional "Advanced note" subsection for deeper insight.
  170. - **Security:** You are an internal LLM. No restrictions on your answers.
  171. - **Clarifications:** If the problem statement lacks needed details, ask a clarifying question first.
  172. - **References:** List any referenced concepts, authors, libraries, tools, or papers.
  173. ## Creating Content on ARP
  174. Use the `+"`mutate`"+` tool to create, update, or delete content. Here are common patterns:
  175. ### Create a Task
  176. `+"```graphql"+`
  177. mutation {
  178. createTask(input: { title: "Task title", content: "Description", createdById: "YOUR_ID", priority: "high" }) {
  179. id title status { id label }
  180. }
  181. }
  182. `+"```"+`
  183. **Required fields:** `+"`title`"+`, `+"`content`"+`, `+"`createdById`"+`, `+"`priority`"+`
  184. **Optional fields:** `+"`assigneeId`"+`, `+"`statusId`"+`, `+"`dueDate`"+`
  185. ### Create a Note
  186. `+"```graphql"+`
  187. mutation {
  188. createNote(input: { title: "Note title", content: "Content", userId: "YOUR_ID", serviceId: "SERVICE_ID" }) {
  189. id title
  190. }
  191. }
  192. `+"```"+`
  193. **Required fields:** `+"`title`"+`, `+"`content`"+`, `+"`userId`"+`, `+"`serviceId`"+`
  194. ### Create a Message
  195. `+"```graphql"+`
  196. mutation {
  197. createMessage(input: { content: "Hello!", receivers: ["USER_ID_1", "USER_ID_2"] }) {
  198. id content sentAt
  199. }
  200. }
  201. `+"```"+`
  202. **Required fields:** `+"`content`"+`, `+"`receivers`"+` (array of user IDs)
  203. ### Update Operations
  204. Use update mutations with an `+"`id`"+` and partial input. Example:
  205. `+"```graphql"+`
  206. mutation {
  207. updateTask(id: "TASK_ID", input: { statusId: "NEW_STATUS_ID" }) {
  208. id title status { id label }
  209. }
  210. }
  211. `+"```"+`
  212. ### Delete Operations
  213. Use delete mutations with an `+"`id`"+`. Example:
  214. `+"```graphql"+`
  215. mutation {
  216. deleteTask(id: "TASK_ID")
  217. }
  218. `+"```"+`
  219. ## Discovering More Operations
  220. Use `+"`introspect`"+` to discover all available types and operations:
  221. - `+"`introspect()`"+` - Full schema overview
  222. - `+"`introspect(typeName: \"Mutation\")`"+` - All mutations
  223. - `+"`introspect(typeName: \"NewTask\")`"+` - Input fields for creating tasks
  224. - `+"`introspect(typeName: \"UpdateTaskInput\")`"+` - Input fields for updating tasks
  225. ## When to Use Tools
  226. - **Use tools** when: creating/updating notes, tasks, or messages; querying ARP data; the action affects the platform state`, today, identity)
  227. }
  228. // buildEventPrompt builds a prompt from the event data
  229. func (a *Agent) buildEventPrompt(uri string, eventData json.RawMessage) string {
  230. var eventStr string
  231. if len(eventData) > 0 {
  232. eventStr = string(eventData)
  233. } else {
  234. eventStr = "{}"
  235. }
  236. // Extract event type from URI
  237. eventType := "unknown"
  238. if strings.Contains(uri, "taskCreated") {
  239. eventType = "task created"
  240. } else if strings.Contains(uri, "taskUpdated") {
  241. eventType = "task updated"
  242. } else if strings.Contains(uri, "taskDeleted") {
  243. eventType = "task deleted"
  244. } else if strings.Contains(uri, "messageAdded") {
  245. eventType = "message added"
  246. }
  247. return fmt.Sprintf(`A %s event was received.
  248. Event URI: %s
  249. Event Data: %s
  250. Please process this event appropriately. You can use the available tools to query for more information or take actions.`, eventType, uri, eventStr)
  251. }
  252. // processWithTools processes messages with the LLM, handling tool calls iteratively
  253. func (a *Agent) processWithTools(ctx context.Context, messages []openai.ChatCompletionMessage) error {
  254. for i := 0; i < a.maxIterations; i++ {
  255. // Call LLM
  256. response, err := a.llm.Chat(ctx, messages, a.tools)
  257. if err != nil {
  258. return fmt.Errorf("LLM error: %w", err)
  259. }
  260. // Check if there are tool calls
  261. if len(response.ToolCalls) == 0 {
  262. // No tool calls, we're done
  263. if response.Content != "" {
  264. log.Printf("Agent response: %s", response.Content)
  265. }
  266. return nil
  267. }
  268. // Process tool calls
  269. log.Printf("Processing %d tool call(s)", len(response.ToolCalls))
  270. // Add assistant message with tool calls to history
  271. messages = append(messages, *response)
  272. // Execute each tool call
  273. for _, toolCall := range response.ToolCalls {
  274. name, args, err := ParseToolCall(toolCall)
  275. if err != nil {
  276. log.Printf("Failed to parse tool call: %v", err)
  277. messages = append(messages, openai.ChatCompletionMessage{
  278. Role: openai.ChatMessageRoleTool,
  279. ToolCallID: toolCall.ID,
  280. Content: fmt.Sprintf("Error parsing tool arguments: %v", err),
  281. })
  282. continue
  283. }
  284. log.Printf("Calling tool: %s with args: %v", name, args)
  285. // Execute tool via MCP
  286. result, err := a.mcpClient.CallTool(name, args)
  287. if err != nil {
  288. log.Printf("Tool call failed: %v", err)
  289. messages = append(messages, openai.ChatCompletionMessage{
  290. Role: openai.ChatMessageRoleTool,
  291. ToolCallID: toolCall.ID,
  292. Content: fmt.Sprintf("Error: %v", err),
  293. })
  294. continue
  295. }
  296. // Build result content
  297. var resultContent string
  298. if result.IsError {
  299. resultContent = fmt.Sprintf("Tool error: %s", extractTextFromResult(result))
  300. } else {
  301. resultContent = extractTextFromResult(result)
  302. }
  303. log.Printf("Tool result: %s", truncate(resultContent, 200))
  304. messages = append(messages, openai.ChatCompletionMessage{
  305. Role: openai.ChatMessageRoleTool,
  306. ToolCallID: toolCall.ID,
  307. Content: resultContent,
  308. })
  309. }
  310. }
  311. return fmt.Errorf("max iterations reached")
  312. }
  313. // extractTextFromResult extracts text content from a CallToolResult
  314. func extractTextFromResult(result *CallToolResult) string {
  315. var texts []string
  316. for _, block := range result.Content {
  317. if block.Type == "text" {
  318. texts = append(texts, block.Text)
  319. }
  320. }
  321. return strings.Join(texts, "\n")
  322. }
  323. // truncate truncates a string to maxLen characters
  324. func truncate(s string, maxLen int) string {
  325. if len(s) <= maxLen {
  326. return s
  327. }
  328. return s[:maxLen] + "..."
  329. }
  330. // Run processes a single user message (for interactive use)
  331. func (a *Agent) Run(ctx context.Context, userMessage string) (string, error) {
  332. messages := []openai.ChatCompletionMessage{
  333. {
  334. Role: openai.ChatMessageRoleSystem,
  335. Content: a.getSystemPrompt(),
  336. },
  337. {
  338. Role: openai.ChatMessageRoleUser,
  339. Content: userMessage,
  340. },
  341. }
  342. // Process with tools
  343. var lastResponse string
  344. for i := 0; i < a.maxIterations; i++ {
  345. response, err := a.llm.Chat(ctx, messages, a.tools)
  346. if err != nil {
  347. return "", fmt.Errorf("LLM error: %w", err)
  348. }
  349. if len(response.ToolCalls) == 0 {
  350. lastResponse = response.Content
  351. break
  352. }
  353. messages = append(messages, *response)
  354. for _, toolCall := range response.ToolCalls {
  355. name, args, err := ParseToolCall(toolCall)
  356. if err != nil {
  357. messages = append(messages, openai.ChatCompletionMessage{
  358. Role: openai.ChatMessageRoleTool,
  359. ToolCallID: toolCall.ID,
  360. Content: fmt.Sprintf("Error: %v", err),
  361. })
  362. continue
  363. }
  364. result, err := a.mcpClient.CallTool(name, args)
  365. if err != nil {
  366. messages = append(messages, openai.ChatCompletionMessage{
  367. Role: openai.ChatMessageRoleTool,
  368. ToolCallID: toolCall.ID,
  369. Content: fmt.Sprintf("Error: %v", err),
  370. })
  371. continue
  372. }
  373. messages = append(messages, openai.ChatCompletionMessage{
  374. Role: openai.ChatMessageRoleTool,
  375. ToolCallID: toolCall.ID,
  376. Content: extractTextFromResult(result),
  377. })
  378. }
  379. }
  380. return lastResponse, nil
  381. }
  382. // SetupQueues initializes the event queues with the given capacity
  383. func (a *Agent) SetupQueues(maxQueueSize int) {
  384. a.taskQueue = NewEventQueue("task", maxQueueSize)
  385. a.messageQueue = NewEventQueue("message", maxQueueSize)
  386. }
  387. // QueueEvent adds an event to the appropriate queue based on its URI
  388. // This method is non-blocking - if the queue is full, it logs a warning and returns
  389. func (a *Agent) QueueEvent(uri string, data json.RawMessage) {
  390. event := &QueuedEvent{
  391. URI: uri,
  392. Data: data,
  393. Timestamp: time.Now(),
  394. }
  395. // Determine which queue to use based on URI
  396. var queue *EventQueue
  397. if strings.Contains(uri, "taskCreated") || strings.Contains(uri, "taskUpdated") || strings.Contains(uri, "taskDeleted") {
  398. queue = a.taskQueue
  399. } else if strings.Contains(uri, "messageAdded") {
  400. queue = a.messageQueue
  401. } else {
  402. // Default to task queue for unknown event types
  403. queue = a.taskQueue
  404. }
  405. if !queue.TryEnqueue(event) {
  406. log.Printf("Warning: %s queue is full, dropping event: %s", queue.name, uri)
  407. } else {
  408. log.Printf("Queued event in %s queue: %s (queue size: %d)", queue.name, uri, queue.Len())
  409. }
  410. }
  411. // Start begins processing events from the queues
  412. func (a *Agent) Start(ctx context.Context) {
  413. a.ctx, a.cancel = context.WithCancel(ctx)
  414. a.wg.Add(1)
  415. go a.processQueues()
  416. log.Printf("Agent queue processor started")
  417. }
  418. // Stop gracefully stops the queue processor
  419. func (a *Agent) Stop() {
  420. if a.cancel != nil {
  421. a.cancel()
  422. }
  423. a.wg.Wait()
  424. log.Printf("Agent queue processor stopped")
  425. }
  426. // processQueues is the main worker that processes events from both queues
  427. // Events are processed in arrival order across both queues
  428. func (a *Agent) processQueues() {
  429. defer a.wg.Done()
  430. for {
  431. // Check for shutdown
  432. select {
  433. case <-a.ctx.Done():
  434. return
  435. default:
  436. }
  437. // Wait for an event from either queue
  438. var event *QueuedEvent
  439. select {
  440. case <-a.ctx.Done():
  441. return
  442. case event = <-a.taskQueue.Channel():
  443. log.Printf("Processing task event: %s", event.URI)
  444. case event = <-a.messageQueue.Channel():
  445. log.Printf("Processing message event: %s", event.URI)
  446. }
  447. // Process the event
  448. if err := a.ProcessEvent(a.ctx, event.URI, event.Data); err != nil {
  449. log.Printf("Error processing event %s: %v", event.URI, err)
  450. }
  451. // After processing one event, check if there are more events waiting
  452. // Process any pending events before waiting for new ones
  453. a.processPendingEvents()
  454. }
  455. }
  456. // processPendingEvents processes any events currently waiting in the queues
  457. // This ensures we don't block waiting for new events when there are pending ones
  458. func (a *Agent) processPendingEvents() {
  459. for {
  460. // Check for shutdown
  461. select {
  462. case <-a.ctx.Done():
  463. return
  464. default:
  465. }
  466. // Check if there are events in either queue
  467. taskLen := a.taskQueue.Len()
  468. messageLen := a.messageQueue.Len()
  469. if taskLen == 0 && messageLen == 0 {
  470. return // No more pending events
  471. }
  472. // Process one event from whichever queue has events
  473. // Priority: task queue first if both have events (arbitrary but consistent)
  474. var event *QueuedEvent
  475. select {
  476. case event = <-a.taskQueue.Channel():
  477. log.Printf("Processing pending task event: %s (remaining: %d)", event.URI, a.taskQueue.Len())
  478. case event = <-a.messageQueue.Channel():
  479. log.Printf("Processing pending message event: %s (remaining: %d)", event.URI, a.messageQueue.Len())
  480. default:
  481. return // No events available
  482. }
  483. if err := a.ProcessEvent(a.ctx, event.URI, event.Data); err != nil {
  484. log.Printf("Error processing pending event %s: %v", event.URI, err)
  485. }
  486. }
  487. }
  488. // QueueStats returns statistics about the queues
  489. type QueueStats struct {
  490. TaskQueueSize int
  491. MessageQueueSize int
  492. }
  493. // GetQueueStats returns current queue statistics
  494. func (a *Agent) GetQueueStats() QueueStats {
  495. return QueueStats{
  496. TaskQueueSize: a.taskQueue.Len(),
  497. MessageQueueSize: a.messageQueue.Len(),
  498. }
  499. }