ソースを参照

add workflows

david 3 時間 前
コミット
e7c1121f4d

+ 2 - 1
.gitignore

@@ -1,5 +1,6 @@
+spec
 arp.db
 arp
 arp_spec.md
 arp_cli/arp_cli
-arp_agent/arp_agent
+arp_agent/arp_agent

+ 297 - 0
CLIENT_GUIDE.md

@@ -1649,3 +1649,300 @@ eventSource.addEventListener('message', (event) => {
 4. **Session Management**: Maintain the SSE connection for the duration of your session
 5. **Reconnect on Disconnect**: Implement reconnection logic for long-running sessions
 6. **Use Resources for Real-time**: Subscribe to resources for real-time updates instead of polling
+
+---
+
+## Workflow Engine
+
+The workflow engine enables you to define complex multi-step processes as JSON-based DAGs (Directed Acyclic Graphs). Each node in the DAG represents a task or decision point, and dependencies between nodes are automatically resolved.
+
+### Workflow Types
+
+| Type | Description |
+|------|-------------|
+| `task` | A task node that creates a task for a user |
+| `condition` | A decision node that evaluates conditions |
+| `parallel` | A node that spawns parallel branches |
+| `join` | A node that waits for multiple branches to complete |
+| `trigger` | A node that starts the workflow |
+
+### Workflow Permissions
+
+| Operation | Required Permission |
+|-----------|---------------------|
+| `createWorkflowTemplate` | `workflow:create` |
+| `updateWorkflowTemplate` | `workflow:manage` |
+| `deleteWorkflowTemplate` | `workflow:manage` |
+| `startWorkflow` | `workflow:start` |
+| `cancelWorkflow` | `workflow:manage` |
+| `retryWorkflowNode` | `workflow:intervene` |
+| `workflowTemplates` query | `workflow:view` |
+| `workflowInstance` query | `workflow:view` |
+
+### Workflow Definition Format
+
+Workflows are defined as JSON with a `nodes` object where each key is a unique node identifier:
+
+```json
+{
+  "nodes": {
+    "start": {
+      "type": "task",
+      "title": "Initial Review",
+      "content": "Review the initial request",
+      "assignee": "1",
+      "dependsOn": []
+    },
+    "analysis": {
+      "type": "task",
+      "title": "Data Analysis",
+      "content": "Analyze the data",
+      "assignee": "2",
+      "dependsOn": ["start"]
+    },
+    "approval": {
+      "type": "task",
+      "title": "Manager Approval",
+      "content": "Get manager approval",
+      "assignee": "3",
+      "dependsOn": ["analysis"]
+    },
+    "end": {
+      "type": "task",
+      "title": "Complete",
+      "content": "Mark workflow as complete",
+      "assignee": "1",
+      "dependsOn": ["approval"]
+    }
+  }
+}
+```
+
+### Workflow API Examples
+
+#### Create a Workflow Template
+
+```graphql
+mutation CreateWorkflowTemplate($input: NewWorkflowTemplate!) {
+  createWorkflowTemplate(input: $input) {
+    id
+    name
+    description
+    definition
+    isActive
+    createdAt
+  }
+}
+```
+
+**Variables:**
+```json
+{
+  "input": {
+    "name": "Onboarding",
+    "description": "New employee onboarding workflow",
+    "definition": "{\"nodes\":{\"start\":{\"type\":\"task\",\"title\":\"Welcome\",\"content\":\"Send welcome email\",\"assignee\":\"1\",\"dependsOn\":[]},\"setup\":{\"type\":\"task\",\"title\":\"Setup Account\",\"content\":\"Create user account\",\"assignee\":\"2\",\"dependsOn\":[\"start\"]}}}",
+    "isActive": true
+  }
+}
+```
+
+#### Start a Workflow Instance
+
+```graphql
+mutation StartWorkflow($templateId: ID!, $input: StartWorkflowInput!) {
+  startWorkflow(templateId: $templateId, input: $input) {
+    id
+    status
+    context
+    template {
+      id
+      name
+    }
+    service {
+      id
+      name
+    }
+    createdAt
+  }
+}
+```
+
+**Variables:**
+```json
+{
+  "templateId": "1",
+  "input": {
+    "serviceId": "1",
+    "context": "New hire onboarding"
+  }
+}
+```
+
+#### Get All Workflow Instances
+
+```graphql
+query WorkflowInstances {
+  workflowInstances {
+    id
+    status
+    context
+    template {
+      id
+      name
+    }
+    service {
+      id
+      name
+    }
+    createdAt
+    completedAt
+  }
+}
+```
+
+#### Get Single Workflow Instance
+
+```graphql
+query WorkflowInstance($id: ID!) {
+  workflowInstance(id: $id) {
+    id
+    status
+    context
+    template {
+      id
+      name
+      definition
+    }
+    service {
+      id
+      name
+    }
+    createdAt
+    completedAt
+  }
+}
+```
+
+#### Cancel a Workflow
+
+```graphql
+mutation CancelWorkflow($id: ID!) {
+  cancelWorkflow(id: $id) {
+    id
+    status
+    completedAt
+  }
+}
+```
+
+#### Retry a Failed Node
+
+```graphql
+mutation RetryWorkflowNode($nodeId: ID!) {
+  retryWorkflowNode(nodeId: $nodeId) {
+    id
+    nodeKey
+    nodeType
+    status
+    task {
+      id
+      title
+    }
+    retryCount
+  }
+}
+```
+
+### MCP Tools for Workflows
+
+The MCP server also supports workflow operations through the `query` and `mutate` tools.
+
+#### Query Workflow Templates
+
+```json
+{
+  "jsonrpc": "2.0",
+  "id": 10,
+  "method": "tools/call",
+  "params": {
+    "name": "query",
+    "arguments": {
+      "query": "query { workflowTemplates { id name isActive } }"
+    }
+  }
+}
+```
+
+#### Start Workflow via MCP
+
+```json
+{
+  "jsonrpc": "2.0",
+  "id": 11,
+  "method": "tools/call",
+  "params": {
+    "name": "mutate",
+    "arguments": {
+      "mutation": "mutation StartWorkflow($templateId: ID!, $input: StartWorkflowInput!) { startWorkflow(templateId: $templateId, input: $input) { id status } }",
+      "variables": {
+        "templateId": "1",
+        "input": {
+          "serviceId": "1",
+          "context": "Workflow started via MCP"
+        }
+      }
+    }
+  }
+}
+```
+
+### Workflow Node States
+
+| State | Description |
+|-------|-------------|
+| `pending` | Node is waiting for dependencies to complete |
+| `ready` | Node is ready to execute (all dependencies satisfied) |
+| `running` | Node is currently executing |
+| `completed` | Node has completed successfully |
+| `failed` | Node has failed (may be retried) |
+| `skipped` | Node was skipped |
+
+### Automatic Task Completion Integration
+
+When a task associated with a workflow node is marked as "done", the workflow engine automatically:
+
+1. **Marks the node as completed** - The workflow node status changes to `completed`
+2. **Evaluates downstream dependencies** - Checks which nodes are now ready to execute
+3. **Creates tasks for newly ready nodes** - Automatically creates tasks for nodes whose dependencies are satisfied
+
+This enables workflows to progress automatically as users complete their assigned tasks without manual intervention.
+
+### Retry Logic
+
+Workflows support automatic retry with configurable max retries:
+
+- **Default Max Retries**: 3
+- **Retry Delay**: 60 seconds (configurable via `WORKFLOW_RETRY_DELAY`)
+
+To manually retry a failed node:
+
+```graphql
+mutation RetryWorkflowNode($nodeId: ID!) {
+  retryWorkflowNode(nodeId: $nodeId) {
+    id
+    nodeKey
+    status
+    retryCount
+  }
+}
+```
+
+### Best Practices
+
+1. **Design DAGs Carefully**: Ensure workflows have clear start and end points
+2. **Use Meaningful Node Keys**: Node keys should be descriptive and unique
+3. **Handle Failures**: Design workflows to handle node failures gracefully
+4. **Monitor Workflow Progress**: Use `workflowInstances` query to track progress
+5. **Set Proper Assignees**: Ensure each task node has an appropriate assignee
+6. **Test Workflows**: Test workflows with sample data before production use

+ 108 - 1
README.md

@@ -1,6 +1,6 @@
 # ARP - Agent Resource Platform
 
-A GraphQL-based coordination system for users and agents to collaborate on services, tasks, and communications.
+A GraphQL-based coordination system for users and agents to collaborate on services, tasks, and workflows.
 
 ## Overview
 
@@ -11,6 +11,7 @@ ARP (Agent Resource Platform) is a coordination backend that enables human users
 - **Task Management** - Assignable work items with status tracking
 - **Notes** - Attach notes to services for context
 - **Messaging** - Real-time chat channels between participants
+- **Workflow Engine** - DAG-based workflow definitions with automatic task creation and dependency management
 
 ## Architecture
 
@@ -225,6 +226,112 @@ type Mutation {
 }
 ```
 
+## Workflow Engine
+
+The workflow engine enables you to define complex multi-step processes as JSON-based DAGs (Directed Acyclic Graphs). Each node in the DAG represents a task or decision point, and dependencies between nodes are automatically resolved.
+
+### Workflow Definition Format
+
+Workflows are defined as JSON with a `nodes` object where each key is a unique node identifier:
+
+```json
+{
+  "nodes": {
+    "start": {
+      "type": "task",
+      "title": "Initial Review",
+      "content": "Review the initial request",
+      "assignee": "1",
+      "dependsOn": []
+    },
+    "analysis": {
+      "type": "task",
+      "title": "Data Analysis",
+      "content": "Analyze the data",
+      "assignee": "2",
+      "dependsOn": ["start"]
+    },
+    "approval": {
+      "type": "task",
+      "title": "Manager Approval",
+      "content": "Get manager approval",
+      "assignee": "3",
+      "dependsOn": ["analysis"]
+    },
+    "end": {
+      "type": "task",
+      "title": "Complete",
+      "content": "Mark workflow as complete",
+      "assignee": "1",
+      "dependsOn": ["approval"]
+    }
+  }
+}
+```
+
+### Workflow Permissions
+
+| Operation | Required Permission |
+|-----------|---------------------|
+| Create Workflow Template | `workflow:create` |
+| Update Workflow Template | `workflow:manage` |
+| Delete Workflow Template | `workflow:manage` |
+| Start Workflow | `workflow:start` |
+| Cancel Workflow | `workflow:manage` |
+| Retry Node | `workflow:intervene` |
+| View Workflows | `workflow:view` |
+
+### Workflow Node Types
+
+| Type | Description |
+|------|-------------|
+| `task` | Creates a task for a user to complete |
+| `condition` | Evaluates conditions to determine next steps |
+| `parallel` | Spawns multiple concurrent branches |
+| `join` | Waits for multiple branches to complete |
+| `trigger` | Starts the workflow automatically |
+
+### Automatic Task Completion Integration
+
+When a task associated with a workflow node is marked as "done", the workflow engine automatically:
+1. Marks the corresponding node as completed
+2. Evaluates downstream dependencies
+3. Creates tasks for newly ready nodes
+
+This enables workflows to progress automatically as users complete their assigned tasks.
+
+### Workflow API Examples
+
+```bash
+# Create a workflow template
+curl -s -X POST http://localhost:8080/query \
+  -H "Content-Type: application/json" \
+  -H "Authorization: Bearer $TOKEN" \
+  -d '{
+    "query": "mutation { createWorkflowTemplate(input: {name: \"Onboarding\", description: \"New employee onboarding\", definition: \"{\\\"nodes\\\":{\\\"start\\\":{\\\"type\\\":\\\"task\\\",\\\"title\\\":\\\"Welcome\\\",\\\"content\\\":\\\"Send welcome email\\\",\\\"assignee\\\":\\\"1\\\",\\\"dependsOn\\\":[]}}}\"}) { id name }"
+  }' | jq
+
+# Start a workflow instance
+curl -s -X POST http://localhost:8080/query \
+  -H "Content-Type: application/json" \
+  -H "Authorization: Bearer $TOKEN" \
+  -d '{
+    "query": "mutation { startWorkflow(templateId: \"1\", input: {serviceId: \"1\", context: \"New hire onboarding\"}) { id status template { name } } }"
+  }' | jq
+
+# Get all workflow instances
+curl -s -X POST http://localhost:8080/query \
+  -H "Content-Type: application/json" \
+  -H "Authorization: Bearer $TOKEN" \
+  -d '{"query":"{ workflowInstances { id status template { name } service { name } } }"}' | jq
+
+# Retry a failed workflow node
+curl -s -X POST http://localhost:8080/query \
+  -H "Content-Type: application/json" \
+  -H "Authorization: Bearer $TOKEN" \
+  -d '{"query":"mutation { retryWorkflowNode(nodeId: \"1\") { id nodeKey status retryCount } }"}' | jq
+```
+
 
 ## Environment Variables
 

+ 0 - 3
TODO.md

@@ -1,3 +0,0 @@
-* fix message notification -> sender still seems to be notified about their own messages.
-* maybe switch to direct messaging using a recipient id list.
-* add user profile description in model for users to figure out whom to include in messages

+ 0 - 0
ARP_AGENT_README.MD → arp_agent/README.MD


BIN
arp_server


+ 117 - 0
graph/converters.go

@@ -265,3 +265,120 @@ func convertMessages(messages []models.Message) []*model.Message {
 	}
 	return result
 }
+
+// convertWorkflowTemplate converts models.WorkflowTemplate to model.WorkflowTemplate
+func convertWorkflowTemplate(w models.WorkflowTemplate) *model.WorkflowTemplate {
+	var createdBy *model.User
+	if w.CreatedBy.ID > 0 {
+		createdBy = convertUser(w.CreatedBy)
+	}
+	return &model.WorkflowTemplate{
+		ID:   strconv.FormatUint(uint64(w.ID), 10),
+		Name: w.Name,
+		Description: func() *string {
+			if w.Description != "" {
+				return &w.Description
+			}
+			return nil
+		}(),
+		Definition: w.Definition,
+		IsActive:   w.IsActive,
+		CreatedBy:  createdBy,
+		CreatedAt:  w.CreatedAt.String(),
+		UpdatedAt:  w.UpdatedAt.String(),
+	}
+}
+
+// convertWorkflowTemplates converts []models.WorkflowTemplate to []*model.WorkflowTemplate
+func convertWorkflowTemplates(templates []models.WorkflowTemplate) []*model.WorkflowTemplate {
+	result := make([]*model.WorkflowTemplate, len(templates))
+	for i, w := range templates {
+		result[i] = convertWorkflowTemplate(w)
+	}
+	return result
+}
+
+// convertWorkflowInstance converts models.WorkflowInstance to model.WorkflowInstance
+func convertWorkflowInstance(w models.WorkflowInstance) *model.WorkflowInstance {
+	template := convertWorkflowTemplate(w.WorkflowTemplate)
+	var context *string
+	if w.Context != "" {
+		context = &w.Context
+	}
+	var service *model.Service
+	if w.Service != nil && w.Service.ID > 0 {
+		service = convertService(*w.Service)
+	}
+	var completedAt *string
+	if w.CompletedAt != nil {
+		completedAtVal := w.CompletedAt.String()
+		completedAt = &completedAtVal
+	}
+	return &model.WorkflowInstance{
+		ID:          strconv.FormatUint(uint64(w.ID), 10),
+		Template:    template,
+		Status:      w.Status,
+		Context:     context,
+		Service:     service,
+		CreatedAt:   w.CreatedAt.String(),
+		UpdatedAt:   w.UpdatedAt.String(),
+		CompletedAt: completedAt,
+	}
+}
+
+// convertWorkflowInstances converts []models.WorkflowInstance to []*model.WorkflowInstance
+func convertWorkflowInstances(instances []models.WorkflowInstance) []*model.WorkflowInstance {
+	result := make([]*model.WorkflowInstance, len(instances))
+	for i, w := range instances {
+		result[i] = convertWorkflowInstance(w)
+	}
+	return result
+}
+
+// convertWorkflowNode converts models.WorkflowNode to model.WorkflowNode
+func convertWorkflowNode(n models.WorkflowNode) *model.WorkflowNode {
+	var task *model.Task
+	if n.Task != nil {
+		task = convertTask(*n.Task)
+	}
+	var inputData *string
+	if n.InputData != "" {
+		inputData = &n.InputData
+	}
+	var outputData *string
+	if n.OutputData != "" {
+		outputData = &n.OutputData
+	}
+	var startedAt *string
+	if n.StartedAt != nil {
+		startedAtVal := n.StartedAt.String()
+		startedAt = &startedAtVal
+	}
+	var completedAt *string
+	if n.CompletedAt != nil {
+		completedAtVal := n.CompletedAt.String()
+		completedAt = &completedAtVal
+	}
+	return &model.WorkflowNode{
+		ID:          strconv.FormatUint(uint64(n.ID), 10),
+		NodeKey:     n.NodeKey,
+		NodeType:    n.NodeType,
+		Status:      n.Status,
+		Task:        task,
+		InputData:   inputData,
+		OutputData:  outputData,
+		CreatedAt:   n.CreatedAt.String(),
+		UpdatedAt:   n.UpdatedAt.String(),
+		StartedAt:   startedAt,
+		CompletedAt: completedAt,
+	}
+}
+
+// convertWorkflowNodes converts []models.WorkflowNode to []*model.WorkflowNode
+func convertWorkflowNodes(nodes []models.WorkflowNode) []*model.WorkflowNode {
+	result := make([]*model.WorkflowNode, len(nodes))
+	for i, n := range nodes {
+		result[i] = convertWorkflowNode(n)
+	}
+	return result
+}

ファイルの差分が大きいため隠しています
+ 810 - 46
graph/generated.go


+ 55 - 0
graph/model/models_gen.go

@@ -73,6 +73,13 @@ type NewUser struct {
 	Roles    []string `json:"roles"`
 }
 
+type NewWorkflowTemplate struct {
+	Name        string  `json:"name"`
+	Description *string `json:"description,omitempty"`
+	Definition  string  `json:"definition"`
+	IsActive    *bool   `json:"isActive,omitempty"`
+}
+
 type Note struct {
 	ID        string   `json:"id"`
 	Title     string   `json:"title"`
@@ -113,6 +120,11 @@ type Service struct {
 	UpdatedAt    string  `json:"updatedAt"`
 }
 
+type StartWorkflowInput struct {
+	ServiceID *string `json:"serviceId,omitempty"`
+	Context   *string `json:"context,omitempty"`
+}
+
 type Subscription struct {
 }
 
@@ -194,6 +206,13 @@ type UpdateUserInput struct {
 	Roles    []string `json:"roles"`
 }
 
+type UpdateWorkflowTemplateInput struct {
+	Name        *string `json:"name,omitempty"`
+	Description *string `json:"description,omitempty"`
+	Definition  *string `json:"definition,omitempty"`
+	IsActive    *bool   `json:"isActive,omitempty"`
+}
+
 type User struct {
 	ID        string  `json:"id"`
 	Email     string  `json:"email"`
@@ -201,3 +220,39 @@ type User struct {
 	CreatedAt string  `json:"createdAt"`
 	UpdatedAt string  `json:"updatedAt"`
 }
+
+type WorkflowInstance struct {
+	ID          string            `json:"id"`
+	Template    *WorkflowTemplate `json:"template"`
+	Status      string            `json:"status"`
+	Context     *string           `json:"context,omitempty"`
+	Service     *Service          `json:"service,omitempty"`
+	CreatedAt   string            `json:"createdAt"`
+	UpdatedAt   string            `json:"updatedAt"`
+	CompletedAt *string           `json:"completedAt,omitempty"`
+}
+
+type WorkflowNode struct {
+	ID          string  `json:"id"`
+	NodeKey     string  `json:"nodeKey"`
+	NodeType    string  `json:"nodeType"`
+	Status      string  `json:"status"`
+	Task        *Task   `json:"task,omitempty"`
+	InputData   *string `json:"inputData,omitempty"`
+	OutputData  *string `json:"outputData,omitempty"`
+	CreatedAt   string  `json:"createdAt"`
+	UpdatedAt   string  `json:"updatedAt"`
+	StartedAt   *string `json:"startedAt,omitempty"`
+	CompletedAt *string `json:"completedAt,omitempty"`
+}
+
+type WorkflowTemplate struct {
+	ID          string  `json:"id"`
+	Name        string  `json:"name"`
+	Description *string `json:"description,omitempty"`
+	Definition  string  `json:"definition"`
+	IsActive    bool    `json:"isActive"`
+	CreatedBy   *User   `json:"createdBy"`
+	CreatedAt   string  `json:"createdAt"`
+	UpdatedAt   string  `json:"updatedAt"`
+}

+ 71 - 0
graph/schema.graphqls

@@ -99,6 +99,43 @@ type Message {
   updatedAt: String!
 }
 
+# Workflow types
+type WorkflowTemplate {
+  id: ID!
+  name: String!
+  description: String
+  definition: String!
+  isActive: Boolean!
+  createdBy: User!
+  createdAt: String!
+  updatedAt: String!
+}
+
+type WorkflowInstance {
+  id: ID!
+  template: WorkflowTemplate!
+  status: String!
+  context: String
+  service: Service
+  createdAt: String!
+  updatedAt: String!
+  completedAt: String
+}
+
+type WorkflowNode {
+  id: ID!
+  nodeKey: String!
+  nodeType: String!
+  status: String!
+  task: Task
+  inputData: String
+  outputData: String
+  createdAt: String!
+  updatedAt: String!
+  startedAt: String
+  completedAt: String
+}
+
 # Root Query type
 type Query {
   # Users
@@ -132,6 +169,32 @@ type Query {
   # Messages
   messages: [Message!]!
   message(id: ID!): Message
+
+  # Workflows
+  workflowTemplates: [WorkflowTemplate!]!
+  workflowTemplate(id: ID!): WorkflowTemplate
+  workflowInstances: [WorkflowInstance!]!
+  workflowInstance(id: ID!): WorkflowInstance
+}
+
+# Input types for workflow mutations
+input NewWorkflowTemplate {
+  name: String!
+  description: String
+  definition: String!
+  isActive: Boolean
+}
+
+input UpdateWorkflowTemplateInput {
+  name: String
+  description: String
+  definition: String
+  isActive: Boolean
+}
+
+input StartWorkflowInput {
+  serviceId: ID
+  context: String
 }
 
 # Root Mutation type
@@ -178,6 +241,14 @@ type Mutation {
   createMessage(input: NewMessage!): Message!
   updateMessage(id: ID!, input: UpdateMessageInput!): Message!
   deleteMessage(id: ID!): Boolean!
+
+  # Workflows
+  createWorkflowTemplate(input: NewWorkflowTemplate!): WorkflowTemplate!
+  updateWorkflowTemplate(id: ID!, input: UpdateWorkflowTemplateInput!): WorkflowTemplate!
+  deleteWorkflowTemplate(id: ID!): Boolean!
+  startWorkflow(templateId: ID!, input: StartWorkflowInput!): WorkflowInstance!
+  cancelWorkflow(id: ID!): WorkflowInstance!
+  retryWorkflowNode(nodeId: ID!): WorkflowNode!
 }
 
 # Input types for mutations

+ 367 - 1
graph/schema.resolvers.go

@@ -15,6 +15,7 @@ import (
 	"gogs.dmsc.dev/arp/graph/model"
 	"gogs.dmsc.dev/arp/logging"
 	"gogs.dmsc.dev/arp/models"
+	"gogs.dmsc.dev/arp/workflow"
 )
 
 // Login is the resolver for the login field.
@@ -649,7 +650,7 @@ func (r *mutationResolver) UpdateTask(ctx context.Context, id string, input mode
 	}
 
 	var existing models.Task
-	if err := r.DB.Preload("CreatedBy").Preload("Assignee").First(&existing, taskID).Error; err != nil {
+	if err := r.DB.Preload("CreatedBy").Preload("Assignee").Preload("Status").First(&existing, taskID).Error; err != nil {
 		return nil, fmt.Errorf("task not found: %w", err)
 	}
 
@@ -710,6 +711,35 @@ func (r *mutationResolver) UpdateTask(ctx context.Context, id string, input mode
 	graphqlTask := convertTask(existing)
 	r.PublishTaskEvent(graphqlTask, existing.AssigneeID, "updated")
 
+	// Workflow integration: Check if task is associated with a workflow node
+	// Look up the workflow node by task ID
+	var workflowNode models.WorkflowNode
+	if err := r.DB.Where("task_id = ?", existing.ID).First(&workflowNode).Error; err == nil && workflowNode.ID > 0 {
+		// Get the workflow engine
+		workflowEngine := workflow.NewEngine(r.DB)
+
+		// Check if status changed to "done" (status code "done")
+		if input.StatusID != nil && *input.StatusID != "" {
+			var newStatus models.TaskStatus
+			if err := r.DB.First(&newStatus, existing.StatusID).Error; err == nil && newStatus.Code == "done" {
+				// Mark node as completed
+				if err := workflowEngine.MarkNodeCompleted(workflowNode.ID); err != nil {
+					fmt.Printf("ERROR: workflow_node_complete node_id=%d error=%v\n", workflowNode.ID, err)
+				}
+			} else if input.StatusID != nil && *input.StatusID != "" {
+				// Check for cancelled/failed status
+				if err := r.DB.First(&newStatus, existing.StatusID).Error; err == nil {
+					if newStatus.Code == "cancelled" || newStatus.Code == "failed" {
+						// Mark node as failed
+						if err := workflowEngine.MarkNodeFailed(workflowNode.ID, fmt.Sprintf("task status changed to %s", newStatus.Code)); err != nil {
+							fmt.Printf("ERROR: workflow_node_fail node_id=%d error=%v\n", workflowNode.ID, err)
+						}
+					}
+				}
+			}
+		}
+	}
+
 	logging.LogMutation(ctx, "UPDATE", "TASK", existing.Title)
 	return graphqlTask, nil
 }
@@ -951,6 +981,258 @@ func (r *mutationResolver) DeleteMessage(ctx context.Context, id string) (bool,
 	return result.RowsAffected > 0, nil
 }
 
+// CreateWorkflowTemplate is the resolver for the createWorkflowTemplate field.
+func (r *mutationResolver) CreateWorkflowTemplate(ctx context.Context, input model.NewWorkflowTemplate) (*model.WorkflowTemplate, error) {
+	// Auth check
+	if !auth.IsAuthenticated(ctx) {
+		return nil, errors.New("unauthorized: authentication required")
+	}
+	if !auth.HasPermission(ctx, "workflow:create") {
+		return nil, errors.New("unauthorized: missing workflow:create permission")
+	}
+
+	currentUser, err := auth.CurrentUser(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get current user: %w", err)
+	}
+
+	var isActive bool
+	if input.IsActive != nil {
+		isActive = *input.IsActive
+	} else {
+		isActive = true
+	}
+
+	workflowTemplate := models.WorkflowTemplate{
+		Name: input.Name,
+		Description: func() string {
+			if input.Description != nil {
+				return *input.Description
+			}
+			return ""
+		}(),
+		Definition:  input.Definition,
+		IsActive:    isActive,
+		CreatedByID: currentUser.ID,
+	}
+
+	if err := r.DB.Create(&workflowTemplate).Error; err != nil {
+		return nil, fmt.Errorf("failed to create workflow template: %w", err)
+	}
+
+	// Reload with associations
+	r.DB.Preload("CreatedBy").First(&workflowTemplate, workflowTemplate.ID)
+
+	logging.LogMutation(ctx, "CREATE", "WORKFLOW_TEMPLATE", workflowTemplate.Name)
+	return convertWorkflowTemplate(workflowTemplate), nil
+}
+
+// UpdateWorkflowTemplate is the resolver for the updateWorkflowTemplate field.
+func (r *mutationResolver) UpdateWorkflowTemplate(ctx context.Context, id string, input model.UpdateWorkflowTemplateInput) (*model.WorkflowTemplate, error) {
+	// Auth check
+	if !auth.IsAuthenticated(ctx) {
+		return nil, errors.New("unauthorized: authentication required")
+	}
+	if !auth.HasPermission(ctx, "workflow:manage") {
+		return nil, errors.New("unauthorized: missing workflow:manage permission")
+	}
+
+	templateID, err := toID(id)
+	if err != nil {
+		return nil, fmt.Errorf("invalid workflow template ID: %w", err)
+	}
+
+	var existing models.WorkflowTemplate
+	if err := r.DB.First(&existing, templateID).Error; err != nil {
+		return nil, fmt.Errorf("workflow template not found: %w", err)
+	}
+
+	if input.Name != nil {
+		existing.Name = *input.Name
+	}
+	if input.Description != nil {
+		existing.Description = *input.Description
+	}
+	if input.Definition != nil {
+		existing.Definition = *input.Definition
+	}
+	if input.IsActive != nil {
+		existing.IsActive = *input.IsActive
+	}
+
+	if err := r.DB.Save(&existing).Error; err != nil {
+		return nil, fmt.Errorf("failed to update workflow template: %w", err)
+	}
+
+	// Reload with associations for response
+	r.DB.Preload("CreatedBy").First(&existing, existing.ID)
+
+	logging.LogMutation(ctx, "UPDATE", "WORKFLOW_TEMPLATE", existing.Name)
+	return convertWorkflowTemplate(existing), nil
+}
+
+// DeleteWorkflowTemplate is the resolver for the deleteWorkflowTemplate field.
+func (r *mutationResolver) DeleteWorkflowTemplate(ctx context.Context, id string) (bool, error) {
+	// Auth check
+	if !auth.IsAuthenticated(ctx) {
+		return false, errors.New("unauthorized: authentication required")
+	}
+	if !auth.HasPermission(ctx, "workflow:manage") {
+		return false, errors.New("unauthorized: missing workflow:manage permission")
+	}
+
+	templateID, err := toID(id)
+	if err != nil {
+		return false, fmt.Errorf("invalid workflow template ID: %w", err)
+	}
+
+	result := r.DB.Delete(&models.WorkflowTemplate{}, templateID)
+	if result.Error != nil {
+		return false, fmt.Errorf("failed to delete workflow template: %w", result.Error)
+	}
+
+	logging.LogMutation(ctx, "DELETE", "WORKFLOW_TEMPLATE", id)
+	return result.RowsAffected > 0, nil
+}
+
+// StartWorkflow is the resolver for the startWorkflow field.
+func (r *mutationResolver) StartWorkflow(ctx context.Context, templateID string, input model.StartWorkflowInput) (*model.WorkflowInstance, error) {
+	// Auth check
+	if !auth.IsAuthenticated(ctx) {
+		return nil, errors.New("unauthorized: authentication required")
+	}
+	if !auth.HasPermission(ctx, "workflow:start") {
+		return nil, errors.New("unauthorized: missing workflow:start permission")
+	}
+
+	templateIDUint, err := toID(templateID)
+	if err != nil {
+		return nil, fmt.Errorf("invalid workflow template ID: %w", err)
+	}
+
+	var template models.WorkflowTemplate
+	if err := r.DB.First(&template, templateIDUint).Error; err != nil {
+		return nil, fmt.Errorf("workflow template not found: %w", err)
+	}
+
+	// Parse the workflow definition (JSON DAG)
+	workflowEngine := workflow.NewEngine(r.DB)
+	instance, nodes, err := workflowEngine.CreateInstance(template, workflow.StartWorkflowInput{
+		ServiceID: func() *uint {
+			if input.ServiceID != nil {
+				id, _ := toID(*input.ServiceID)
+				return &id
+			}
+			return nil
+		}(),
+		Context: func() string {
+			if input.Context != nil {
+				return *input.Context
+			}
+			return ""
+		}(),
+	})
+	if err != nil {
+		return nil, fmt.Errorf("failed to create workflow instance: %w", err)
+	}
+
+	// Save the workflow instance
+	if err := r.DB.Create(&instance).Error; err != nil {
+		return nil, fmt.Errorf("failed to save workflow instance: %w", err)
+	}
+
+	// Save the workflow nodes
+	for i := range nodes {
+		nodes[i].WorkflowInstanceID = instance.ID
+		if err := r.DB.Create(&nodes[i]).Error; err != nil {
+			return nil, fmt.Errorf("failed to save workflow node: %w", err)
+		}
+	}
+
+	// Reload with associations
+	r.DB.Preload("WorkflowTemplate").Preload("Service").First(&instance, instance.ID)
+
+	logging.LogMutation(ctx, "START_WORKFLOW", "WORKFLOW_INSTANCE", fmt.Sprintf("template=%s", template.Name))
+	return convertWorkflowInstance(instance), nil
+}
+
+// CancelWorkflow is the resolver for the cancelWorkflow field.
+func (r *mutationResolver) CancelWorkflow(ctx context.Context, id string) (*model.WorkflowInstance, error) {
+	// Auth check
+	if !auth.IsAuthenticated(ctx) {
+		return nil, errors.New("unauthorized: authentication required")
+	}
+	if !auth.HasPermission(ctx, "workflow:manage") {
+		return nil, errors.New("unauthorized: missing workflow:manage permission")
+	}
+
+	instanceID, err := toID(id)
+	if err != nil {
+		return nil, fmt.Errorf("invalid workflow instance ID: %w", err)
+	}
+
+	var instance models.WorkflowInstance
+	if err := r.DB.First(&instance, instanceID).Error; err != nil {
+		return nil, fmt.Errorf("workflow instance not found: %w", err)
+	}
+
+	instance.Status = "failed"
+	now := time.Now()
+	instance.CompletedAt = &now
+
+	if err := r.DB.Save(&instance).Error; err != nil {
+		return nil, fmt.Errorf("failed to cancel workflow: %w", err)
+	}
+
+	// Update all running nodes to failed
+	r.DB.Model(&models.WorkflowNode{}).
+		Where("workflow_instance_id = ? AND status = ?", instanceID, "running").
+		Update("status", "failed")
+
+	// Reload with associations for response
+	r.DB.Preload("WorkflowTemplate").Preload("Service").First(&instance, instance.ID)
+
+	logging.LogMutation(ctx, "CANCEL_WORKFLOW", "WORKFLOW_INSTANCE", id)
+	return convertWorkflowInstance(instance), nil
+}
+
+// RetryWorkflowNode is the resolver for the retryWorkflowNode field.
+func (r *mutationResolver) RetryWorkflowNode(ctx context.Context, nodeID string) (*model.WorkflowNode, error) {
+	// Auth check
+	if !auth.IsAuthenticated(ctx) {
+		return nil, errors.New("unauthorized: authentication required")
+	}
+	if !auth.HasPermission(ctx, "workflow:intervene") {
+		return nil, errors.New("unauthorized: missing workflow:intervene permission")
+	}
+
+	nodeIDUint, err := toID(nodeID)
+	if err != nil {
+		return nil, fmt.Errorf("invalid workflow node ID: %w", err)
+	}
+
+	var node models.WorkflowNode
+	if err := r.DB.Preload("WorkflowInstance").First(&node, nodeIDUint).Error; err != nil {
+		return nil, fmt.Errorf("workflow node not found: %w", err)
+	}
+
+	// Reset node status to pending and clear task association
+	node.Status = "pending"
+	node.TaskID = nil
+	node.RetryCount++
+	node.OutputData = ""
+
+	if err := r.DB.Save(&node).Error; err != nil {
+		return nil, fmt.Errorf("failed to retry workflow node: %w", err)
+	}
+
+	// Reload with task for response
+	r.DB.Preload("Task").First(&node, node.ID)
+
+	logging.LogMutation(ctx, "RETRY_NODE", "WORKFLOW_NODE", nodeID)
+	return convertWorkflowNode(node), nil
+}
+
 // Users is the resolver for the users field.
 func (r *queryResolver) Users(ctx context.Context) ([]*model.User, error) {
 	// Auth check
@@ -1239,6 +1521,90 @@ func (r *queryResolver) Message(ctx context.Context, id string) (*model.Message,
 	return convertMessage(message), nil
 }
 
+// WorkflowTemplates is the resolver for the workflowTemplates field.
+func (r *queryResolver) WorkflowTemplates(ctx context.Context) ([]*model.WorkflowTemplate, error) {
+	// Auth check
+	if !auth.IsAuthenticated(ctx) {
+		return nil, errors.New("unauthorized: authentication required")
+	}
+	if !auth.HasPermission(ctx, "workflow:view") {
+		return nil, errors.New("unauthorized: missing workflow:view permission")
+	}
+
+	var templates []models.WorkflowTemplate
+	if err := r.DB.Preload("CreatedBy").Find(&templates).Error; err != nil {
+		return nil, fmt.Errorf("failed to fetch workflow templates: %w", err)
+	}
+	logging.LogQuery(ctx, "WORKFLOW_TEMPLATES", "all")
+	return convertWorkflowTemplates(templates), nil
+}
+
+// WorkflowTemplate is the resolver for the workflowTemplate field.
+func (r *queryResolver) WorkflowTemplate(ctx context.Context, id string) (*model.WorkflowTemplate, error) {
+	// Auth check
+	if !auth.IsAuthenticated(ctx) {
+		return nil, errors.New("unauthorized: authentication required")
+	}
+	if !auth.HasPermission(ctx, "workflow:view") {
+		return nil, errors.New("unauthorized: missing workflow:view permission")
+	}
+
+	templateID, err := toID(id)
+	if err != nil {
+		return nil, fmt.Errorf("invalid workflow template ID: %w", err)
+	}
+
+	var template models.WorkflowTemplate
+	if err := r.DB.Preload("CreatedBy").First(&template, templateID).Error; err != nil {
+		return nil, fmt.Errorf("workflow template not found: %w", err)
+	}
+
+	logging.LogQuery(ctx, "WORKFLOW_TEMPLATE", id)
+	return convertWorkflowTemplate(template), nil
+}
+
+// WorkflowInstances is the resolver for the workflowInstances field.
+func (r *queryResolver) WorkflowInstances(ctx context.Context) ([]*model.WorkflowInstance, error) {
+	// Auth check
+	if !auth.IsAuthenticated(ctx) {
+		return nil, errors.New("unauthorized: authentication required")
+	}
+	if !auth.HasPermission(ctx, "workflow:view") {
+		return nil, errors.New("unauthorized: missing workflow:view permission")
+	}
+
+	var instances []models.WorkflowInstance
+	if err := r.DB.Preload("WorkflowTemplate").Preload("Service").Find(&instances).Error; err != nil {
+		return nil, fmt.Errorf("failed to fetch workflow instances: %w", err)
+	}
+	logging.LogQuery(ctx, "WORKFLOW_INSTANCES", "all")
+	return convertWorkflowInstances(instances), nil
+}
+
+// WorkflowInstance is the resolver for the workflowInstance field.
+func (r *queryResolver) WorkflowInstance(ctx context.Context, id string) (*model.WorkflowInstance, error) {
+	// Auth check
+	if !auth.IsAuthenticated(ctx) {
+		return nil, errors.New("unauthorized: authentication required")
+	}
+	if !auth.HasPermission(ctx, "workflow:view") {
+		return nil, errors.New("unauthorized: missing workflow:view permission")
+	}
+
+	instanceID, err := toID(id)
+	if err != nil {
+		return nil, fmt.Errorf("invalid workflow instance ID: %w", err)
+	}
+
+	var instance models.WorkflowInstance
+	if err := r.DB.Preload("WorkflowTemplate").Preload("Service").First(&instance, instanceID).Error; err != nil {
+		return nil, fmt.Errorf("workflow instance not found: %w", err)
+	}
+
+	logging.LogQuery(ctx, "WORKFLOW_INSTANCE", id)
+	return convertWorkflowInstance(instance), nil
+}
+
 // TaskCreated is the resolver for the taskCreated field.
 // Users only receive events for tasks where they are the assignee.
 func (r *subscriptionResolver) TaskCreated(ctx context.Context) (<-chan *model.Task, error) {

+ 64 - 1
init_prod.sql

@@ -121,6 +121,56 @@ CREATE TABLE IF NOT EXISTS messages (
     updated_at DATETIME
 );
 
+-- Workflow Templates table
+CREATE TABLE IF NOT EXISTS workflow_templates (
+    id INTEGER PRIMARY KEY AUTOINCREMENT,
+    name TEXT NOT NULL UNIQUE,
+    description TEXT,
+    definition TEXT NOT NULL,
+    is_active INTEGER DEFAULT 1,
+    created_by_id INTEGER,
+    created_at DATETIME,
+    updated_at DATETIME
+);
+
+-- Workflow Instances table
+CREATE TABLE IF NOT EXISTS workflow_instances (
+    id INTEGER PRIMARY KEY AUTOINCREMENT,
+    workflow_template_id INTEGER NOT NULL,
+    status TEXT NOT NULL,
+    context TEXT,
+    service_id INTEGER,
+    created_at DATETIME,
+    updated_at DATETIME,
+    completed_at DATETIME
+);
+
+-- Workflow Nodes table
+CREATE TABLE IF NOT EXISTS workflow_nodes (
+    id INTEGER PRIMARY KEY AUTOINCREMENT,
+    workflow_instance_id INTEGER NOT NULL,
+    node_key TEXT NOT NULL,
+    node_type TEXT NOT NULL,
+    status TEXT NOT NULL,
+    task_id INTEGER,
+    retry_count INTEGER DEFAULT 0,
+    input_data TEXT,
+    output_data TEXT,
+    created_at DATETIME,
+    updated_at DATETIME,
+    started_at DATETIME,
+    completed_at DATETIME
+);
+
+-- Workflow Edges table
+CREATE TABLE IF NOT EXISTS workflow_edges (
+    id INTEGER PRIMARY KEY AUTOINCREMENT,
+    workflow_instance_id INTEGER NOT NULL,
+    from_node_id INTEGER NOT NULL,
+    to_node_id INTEGER NOT NULL,
+    condition TEXT
+);
+
 -- ============================================
 -- INSERT STATEMENTS
 -- ============================================
@@ -162,7 +212,12 @@ INSERT INTO permissions (id, code, description) VALUES
   (33, 'taskstatus:create', 'Create task statuses'),
   (34, 'taskstatus:read', 'Read task statuses'),
   (35, 'taskstatus:update', 'Update task statuses'),
-  (36, 'taskstatus:delete', 'Delete task statuses');
+  (36, 'taskstatus:delete', 'Delete task statuses'),
+  (37, 'workflow:create', 'Create workflow templates'),
+  (38, 'workflow:start', 'Start workflow instances'),
+  (39, 'workflow:manage', 'Manage workflow templates'),
+  (40, 'workflow:intervene', 'Manual intervention for failed workflow nodes'),
+  (41, 'workflow:view', 'View workflows and instances');
 
 -- Roles
 INSERT INTO roles (id, name, description) VALUES 
@@ -183,11 +238,19 @@ INSERT INTO role_permissions (role_id, permission_id) VALUES
   (2, 29), (2, 30), (2, 31), (2, 32), -- message:*
   (2, 33), (2, 34), (2, 35), (2, 36); -- taskstatus:*
 
+-- Manager role workflow permissions (idempotent for existing databases)
+INSERT OR IGNORE INTO role_permissions (role_id, permission_id) VALUES
+  (2, 37), (2, 38), (2, 39), (2, 40), (2, 41); -- workflow:*
+
 -- User role permissions (read-only + create notes/messages)
 INSERT INTO role_permissions (role_id, permission_id) VALUES
   (3, 2), (3, 6), (3, 10), (3, 14), (3, 18), (3, 22), (3, 26), (3, 30), (3, 34), -- read permissions
   (3, 21), (3, 29); -- create notes and messages
 
+-- User role workflow permissions (idempotent for existing databases)
+INSERT OR IGNORE INTO role_permissions (role_id, permission_id) VALUES
+  (3, 38), (3, 41); -- workflow:start, workflow:view
+
 -- Admin user (password: secret123)
 -- bcrypt hash generated with cost 10
 INSERT INTO users (id, email, password, created_at, updated_at) VALUES 

+ 1 - 1
mcp/testdata/snapshots/TestMCP_Introspect-QueryType

@@ -4,7 +4,7 @@ introspect_query
   "result": {
     "content": [
       {
-        "text": "Type: Query (Object)\n\nFields:\n  users: [User!]\n  user(id: ID!): User\n  notes: [Note!]\n  note(id: ID!): Note\n  roles: [Role!]\n  role(id: ID!): Role\n  permissions: [Permission!]\n  permission(id: ID!): Permission\n  services: [Service!]\n  service(id: ID!): Service\n  tasks: [Task!]\n  task(id: ID!): Task\n  taskStatuses: [TaskStatus!]\n  taskStatus(id: ID!): TaskStatus\n  messages: [Message!]\n  message(id: ID!): Message\n  __schema: __Schema!\n  __type(name: String!): __Type\n",
+        "text": "Type: Query (Object)\n\nFields:\n  users: [User!]\n  user(id: ID!): User\n  notes: [Note!]\n  note(id: ID!): Note\n  roles: [Role!]\n  role(id: ID!): Role\n  permissions: [Permission!]\n  permission(id: ID!): Permission\n  services: [Service!]\n  service(id: ID!): Service\n  tasks: [Task!]\n  task(id: ID!): Task\n  taskStatuses: [TaskStatus!]\n  taskStatus(id: ID!): TaskStatus\n  messages: [Message!]\n  message(id: ID!): Message\n  workflowTemplates: [WorkflowTemplate!]\n  workflowTemplate(id: ID!): WorkflowTemplate\n  workflowInstances: [WorkflowInstance!]\n  workflowInstance(id: ID!): WorkflowInstance\n  __schema: __Schema!\n  __type(name: String!): __Type\n",
         "type": "text"
       }
     ]

+ 1 - 1
mcp/testdata/snapshots/TestMCP_Query-Users

@@ -4,7 +4,7 @@ query_users
   "result": {
     "content": [
       {
-        "text": "{\"users\":[{\"email\":\"admin@example.com\",\"roles\":[]}]}",
+        "text": "{\"users\":[{\"email\":\"admin@example.com\",\"roles\":[{\"description\":\"Administrator with full access\",\"name\":\"admin\",\"permissions\":[]}]}]}",
         "type": "text"
       }
     ]

+ 62 - 0
models/workflow.go

@@ -0,0 +1,62 @@
+package models
+
+import (
+	"time"
+)
+
+// WorkflowTemplate - JSON-configurable workflow definition (admin-managed)
+type WorkflowTemplate struct {
+	ID          uint   `gorm:"primaryKey"`
+	Name        string `gorm:"size:200;not null;uniqueIndex"`
+	Description string `gorm:"type:text"`
+	Definition  string `gorm:"type:text;not null"` // JSON DAG definition
+	IsActive    bool   `gorm:"default:true"`
+	CreatedByID uint
+	CreatedBy   User `gorm:"foreignKey:CreatedByID"`
+	CreatedAt   time.Time
+	UpdatedAt   time.Time
+}
+
+// WorkflowInstance - a running instance of a workflow (many-to-one with Service)
+type WorkflowInstance struct {
+	ID                 uint `gorm:"primaryKey"`
+	WorkflowTemplateID uint
+	WorkflowTemplate   WorkflowTemplate `gorm:"foreignKey:WorkflowTemplateID"`
+	Status             string           `gorm:"size:50;not null"` // "running", "completed", "failed", "paused"
+	Context            string           `gorm:"type:text"`        // JSON workflow variables
+	ServiceID          *uint            // Many-to-one: multiple workflows can belong to one service
+	Service            *Service         `gorm:"foreignKey:ServiceID"`
+	CreatedAt          time.Time
+	UpdatedAt          time.Time
+	CompletedAt        *time.Time
+}
+
+// WorkflowNode - represents a task node in a workflow instance
+type WorkflowNode struct {
+	ID                 uint `gorm:"primaryKey"`
+	WorkflowInstanceID uint
+	WorkflowInstance   WorkflowInstance `gorm:"foreignKey:WorkflowInstanceID"`
+	NodeKey            string           `gorm:"size:100;not null"`
+	NodeType           string           `gorm:"size:50;not null"` // "task", "condition", "parallel", "join"
+	Status             string           `gorm:"size:50;not null"` // "pending", "ready", "running", "completed", "skipped", "failed"
+	TaskID             *uint            // Link to created task (assignment is on Task)
+	Task               *Task            `gorm:"foreignKey:TaskID"`
+	RetryCount         int              `gorm:"default:0"`
+	InputData          string           `gorm:"type:text"`
+	OutputData         string           `gorm:"type:text"`
+	CreatedAt          time.Time
+	UpdatedAt          time.Time
+	StartedAt          *time.Time
+	CompletedAt        *time.Time
+}
+
+// WorkflowEdge - dependency between nodes
+type WorkflowEdge struct {
+	ID                 uint `gorm:"primaryKey"`
+	WorkflowInstanceID uint
+	FromNodeID         uint
+	FromNode           WorkflowNode `gorm:"foreignKey:FromNodeID"`
+	ToNodeID           uint
+	ToNode             WorkflowNode `gorm:"foreignKey:ToNodeID"`
+	Condition          string       `gorm:"type:text"`
+}

+ 1 - 1
server.go

@@ -40,7 +40,7 @@ func main() {
 	}
 
 	// Run auto-migration for all models
-	err = db.AutoMigrate(&models.User{}, &models.Role{}, &models.Permission{}, &models.Service{}, &models.Task{}, &models.TaskStatus{}, &models.Message{}, &models.Note{})
+	err = db.AutoMigrate(&models.User{}, &models.Role{}, &models.Permission{}, &models.Service{}, &models.Task{}, &models.TaskStatus{}, &models.Message{}, &models.Note{}, &models.WorkflowTemplate{}, &models.WorkflowInstance{}, &models.WorkflowNode{}, &models.WorkflowEdge{})
 	if err != nil {
 		log.Fatal("failed to migrate database:", err)
 	}

+ 48 - 0
workflow/config.go

@@ -0,0 +1,48 @@
+package workflow
+
+import (
+	"fmt"
+	"os"
+)
+
+// Config holds workflow engine configuration
+type Config struct {
+	// MaxRetries is the maximum number of retries for a failed node
+	MaxRetries int
+	// RetryDelay is the delay between retries in seconds
+	RetryDelay int
+}
+
+// DefaultConfig returns the default workflow configuration
+func DefaultConfig() *Config {
+	return &Config{
+		MaxRetries: 3,
+		RetryDelay: 60,
+	}
+}
+
+// LoadConfig loads the workflow configuration from environment variables
+func LoadConfig() *Config {
+	config := DefaultConfig()
+
+	// Override with environment variables if set
+	if maxRetries := os.Getenv("WORKFLOW_MAX_RETRIES"); maxRetries != "" {
+		config.MaxRetries = parseInt(maxRetries, config.MaxRetries)
+	}
+
+	if retryDelay := os.Getenv("WORKFLOW_RETRY_DELAY"); retryDelay != "" {
+		config.RetryDelay = parseInt(retryDelay, config.RetryDelay)
+	}
+
+	return config
+}
+
+// parseInt parses a string to int, returning default if parsing fails
+func parseInt(s string, defaultValue int) int {
+	var result int
+	_, err := fmt.Sscanf(s, "%d", &result)
+	if err != nil {
+		return defaultValue
+	}
+	return result
+}

+ 426 - 0
workflow/engine.go

@@ -0,0 +1,426 @@
+package workflow
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"time"
+
+	"gogs.dmsc.dev/arp/models"
+	"gorm.io/gorm"
+)
+
+// Engine represents the workflow execution engine
+type Engine struct {
+	db     *gorm.DB
+	config *Config
+	ctx    context.Context
+}
+
+// NewEngine creates a new workflow engine
+func NewEngine(db *gorm.DB) *Engine {
+	return &Engine{
+		db:     db,
+		config: LoadConfig(),
+		ctx:    context.Background(),
+	}
+}
+
+// StartWorkflowInput represents the input for starting a workflow
+type StartWorkflowInput struct {
+	ServiceID *uint
+	Context   string
+}
+
+// CreateInstance creates a new workflow instance from a template
+func (e *Engine) CreateInstance(template models.WorkflowTemplate, input StartWorkflowInput) (models.WorkflowInstance, []models.WorkflowNode, error) {
+	// Parse the workflow definition
+	def, err := ParseWorkflowDefinition(template.Definition)
+	if err != nil {
+		return models.WorkflowInstance{}, nil, fmt.Errorf("failed to parse workflow definition: %w", err)
+	}
+
+	// Validate the workflow definition
+	if err := def.Validate(); err != nil {
+		return models.WorkflowInstance{}, nil, fmt.Errorf("workflow definition invalid: %w", err)
+	}
+
+	// Create workflow instance
+	instance := models.WorkflowInstance{
+		WorkflowTemplateID: template.ID,
+		Status:             "running",
+		Context:            input.Context,
+		ServiceID:          input.ServiceID,
+	}
+
+	// Create nodes for each node in the definition
+	var nodes []models.WorkflowNode
+	for key, nodeDef := range def.Nodes {
+		// Determine initial status based on dependencies
+		status := NodeStatusPending
+		if len(nodeDef.DependsOn) == 0 {
+			status = NodeStatusReady
+		}
+
+		// Create input data for the node
+		inputData, _ := json.Marshal(map[string]interface{}{
+			"key":       key,
+			"type":      nodeDef.Type,
+			"title":     nodeDef.Title,
+			"content":   nodeDef.Content,
+			"dependsOn": nodeDef.DependsOn,
+		})
+
+		node := models.WorkflowNode{
+			WorkflowInstanceID: 0, // Will be set after instance is created
+			NodeKey:            key,
+			NodeType:           string(nodeDef.Type),
+			Status:             string(status),
+			InputData:          string(inputData),
+			RetryCount:         0,
+		}
+
+		// Set assignee if specified
+		if nodeDef.Assignee != nil {
+			// Store assignee info in output data for later task creation
+			outputData, _ := json.Marshal(map[string]interface{}{
+				"assignee": nodeDef.Assignee,
+			})
+			node.OutputData = string(outputData)
+		}
+
+		nodes = append(nodes, node)
+	}
+
+	return instance, nodes, nil
+}
+
+// EvaluateDependencies evaluates which nodes should be marked as ready
+func (e *Engine) EvaluateDependencies(instanceID uint, executedNodes map[string]bool) error {
+	// Get all nodes for this instance
+	var nodes []models.WorkflowNode
+	if err := e.db.Where("workflow_instance_id = ?", instanceID).Find(&nodes).Error; err != nil {
+		return fmt.Errorf("failed to fetch workflow nodes: %w", err)
+	}
+
+	// Update node statuses based on executed nodes
+	for i := range nodes {
+		node := &nodes[i]
+
+		// Skip if already completed, failed, or skipped
+		if node.Status == string(NodeStatusCompleted) ||
+			node.Status == string(NodeStatusFailed) ||
+			node.Status == string(NodeStatusSkipped) {
+			continue
+		}
+
+		// Check if all dependencies are satisfied
+		if executedNodes[node.NodeKey] {
+			continue // Already executed
+		}
+
+		// Get dependencies for this node
+		var nodeDef WorkflowNode
+		if err := json.Unmarshal([]byte(node.InputData), &nodeDef); err != nil {
+			continue
+		}
+
+		allDepsSatisfied := true
+		for _, depKey := range nodeDef.DependsOn {
+			if !executedNodes[depKey] {
+				allDepsSatisfied = false
+				break
+			}
+		}
+
+		if allDepsSatisfied && node.Status == string(NodeStatusPending) {
+			node.Status = string(NodeStatusReady)
+			if err := e.db.Save(node).Error; err != nil {
+				return fmt.Errorf("failed to update node status: %w", err)
+			}
+		}
+	}
+
+	return nil
+}
+
+// CreateTaskForNode creates a task for a workflow node
+func (e *Engine) CreateTaskForNode(node *models.WorkflowNode, createdByID uint) (*models.Task, error) {
+	// Parse node input data to get task details
+	var nodeDef WorkflowNode
+	if err := json.Unmarshal([]byte(node.InputData), &nodeDef); err != nil {
+		return nil, fmt.Errorf("failed to parse node input data: %w", err)
+	}
+
+	// Parse output data to get assignee
+	var outputData map[string]interface{}
+	if node.OutputData != "" {
+		json.Unmarshal([]byte(node.OutputData), &outputData)
+	}
+
+	// Determine assignee
+	var assigneeID *uint
+	if outputData != nil && outputData["assignee"] != nil {
+		// Assignee is stored as a string (user ID or email)
+		assigneeStr := fmt.Sprintf("%v", outputData["assignee"])
+		// Try to parse as uint first
+		if id, err := parseUint(assigneeStr); err == nil {
+			assigneeID = &id
+		} else {
+			// Look up user by email
+			var user models.User
+			if err := e.db.Where("email = ?", assigneeStr).First(&user).Error; err == nil {
+				uid := uint(user.ID)
+				assigneeID = &uid
+			}
+		}
+	}
+
+	// Create the task
+	task := models.Task{
+		Title:       nodeDef.Title,
+		Content:     nodeDef.Content,
+		CreatedByID: createdByID,
+		AssigneeID:  assigneeID,
+		Priority:    "medium",
+	}
+
+	if err := e.db.Create(&task).Error; err != nil {
+		return nil, fmt.Errorf("failed to create task: %w", err)
+	}
+
+	// Update node with task ID
+	node.TaskID = &task.ID
+	if err := e.db.Save(node).Error; err != nil {
+		return nil, fmt.Errorf("failed to update node with task ID: %w", err)
+	}
+
+	return &task, nil
+}
+
+// MarkNodeCompleted marks a node as completed and evaluates downstream nodes
+func (e *Engine) MarkNodeCompleted(nodeID uint) error {
+	var node models.WorkflowNode
+	if err := e.db.Preload("WorkflowInstance").First(&node, nodeID).Error; err != nil {
+		return fmt.Errorf("workflow node not found: %w", err)
+	}
+
+	node.Status = string(NodeStatusCompleted)
+	now := time.Now()
+	node.CompletedAt = &now
+
+	if err := e.db.Save(&node).Error; err != nil {
+		return fmt.Errorf("failed to mark node as completed: %w", err)
+	}
+
+	// Evaluate downstream nodes
+	executedNodes := e.getExecutedNodes(node.WorkflowInstanceID)
+	if err := e.EvaluateDependencies(node.WorkflowInstanceID, executedNodes); err != nil {
+		return err
+	}
+
+	// Check if workflow is completed
+	return e.CheckWorkflowCompletion(node.WorkflowInstanceID)
+}
+
+// MarkNodeFailed marks a node as failed and handles retry logic
+func (e *Engine) MarkNodeFailed(nodeID uint, reason string) error {
+	var node models.WorkflowNode
+	if err := e.db.Preload("WorkflowInstance").First(&node, nodeID).Error; err != nil {
+		return fmt.Errorf("workflow node not found: %w", err)
+	}
+
+	node.Status = string(NodeStatusFailed)
+	now := time.Now()
+	node.CompletedAt = &now
+
+	// Check if we should retry
+	if node.RetryCount < e.config.MaxRetries {
+		// Reset to pending for retry
+		node.Status = string(NodeStatusPending)
+		node.CompletedAt = nil
+		node.RetryCount++
+		// Clear output data for retry
+		node.OutputData = ""
+
+		if err := e.db.Save(&node).Error; err != nil {
+			return fmt.Errorf("failed to reset node for retry: %w", err)
+		}
+		return nil
+	}
+
+	// Max retries exceeded, mark as failed
+	if err := e.db.Save(&node).Error; err != nil {
+		return fmt.Errorf("failed to mark node as failed: %w", err)
+	}
+
+	// Update workflow instance status
+	var instance models.WorkflowInstance
+	if err := e.db.First(&instance, node.WorkflowInstanceID).Error; err != nil {
+		return fmt.Errorf("workflow instance not found: %w", err)
+	}
+
+	instance.Status = "failed"
+	now = time.Now()
+	instance.CompletedAt = &now
+
+	if err := e.db.Save(&instance).Error; err != nil {
+		return fmt.Errorf("failed to update workflow instance status: %w", err)
+	}
+
+	return nil
+}
+
+// getExecutedNodes returns a map of node keys that have been executed
+func (e *Engine) getExecutedNodes(instanceID uint) map[string]bool {
+	result := make(map[string]bool)
+
+	var nodes []models.WorkflowNode
+	if err := e.db.Where("workflow_instance_id = ? AND status IN ?", instanceID, []string{
+		string(NodeStatusCompleted), string(NodeStatusFailed), string(NodeStatusSkipped),
+	}).Find(&nodes).Error; err != nil {
+		return result
+	}
+
+	for _, node := range nodes {
+		result[node.NodeKey] = true
+	}
+
+	return result
+}
+
+// CheckWorkflowCompletion checks if the workflow instance is completed
+func (e *Engine) CheckWorkflowCompletion(instanceID uint) error {
+	var instance models.WorkflowInstance
+	if err := e.db.First(&instance, instanceID).Error; err != nil {
+		return fmt.Errorf("workflow instance not found: %w", err)
+	}
+
+	// Get all nodes for this instance
+	var allNodes []models.WorkflowNode
+	if err := e.db.Where("workflow_instance_id = ?", instanceID).Find(&allNodes).Error; err != nil {
+		return fmt.Errorf("failed to fetch workflow nodes: %w", err)
+	}
+
+	// Check if all nodes are completed, failed, or skipped
+	allCompleted := true
+	for _, node := range allNodes {
+		if node.Status != string(NodeStatusCompleted) &&
+			node.Status != string(NodeStatusFailed) &&
+			node.Status != string(NodeStatusSkipped) {
+			allCompleted = false
+			break
+		}
+	}
+
+	if allCompleted {
+		instance.Status = "completed"
+		now := time.Now()
+		instance.CompletedAt = &now
+		if err := e.db.Save(&instance).Error; err != nil {
+			return fmt.Errorf("failed to update workflow instance status: %w", err)
+		}
+	}
+
+	return nil
+}
+
+// StartWorkflowInstance starts a workflow instance by creating tasks for root nodes
+func (e *Engine) StartWorkflowInstance(instanceID uint, createdByID uint) error {
+	// Get the workflow instance
+	var instance models.WorkflowInstance
+	if err := e.db.Preload("WorkflowTemplate").First(&instance, instanceID).Error; err != nil {
+		return fmt.Errorf("workflow instance not found: %w", err)
+	}
+
+	// Parse the workflow definition
+	def, err := ParseWorkflowDefinition(instance.WorkflowTemplate.Definition)
+	if err != nil {
+		return fmt.Errorf("failed to parse workflow definition: %w", err)
+	}
+
+	// Get root nodes (nodes with no dependencies)
+	rootNodes := def.GetRootNodes()
+
+	// Create tasks for root nodes
+	for _, nodeKey := range rootNodes {
+		// Find the node in the database
+		var node models.WorkflowNode
+		if err := e.db.Where("workflow_instance_id = ? AND node_key = ?", instanceID, nodeKey).First(&node).Error; err != nil {
+			continue
+		}
+
+		// Create task for this node
+		task, err := e.CreateTaskForNode(&node, createdByID)
+		if err != nil {
+			// Log error but continue with other nodes
+			continue
+		}
+
+		// Update node status to running
+		node.Status = string(NodeStatusRunning)
+		now := time.Now()
+		node.StartedAt = &now
+		if err := e.db.Save(&node).Error; err != nil {
+			return fmt.Errorf("failed to update node status: %w", err)
+		}
+
+		// Publish task event
+		// (This would be handled by the task creation event in the resolver)
+		_ = task
+	}
+
+	return nil
+}
+
+// StartWorkflowInstanceWithService starts a workflow instance with a service association
+func (e *Engine) StartWorkflowInstanceWithService(instanceID uint, createdByID uint, serviceID uint) error {
+	// Get the workflow instance
+	var instance models.WorkflowInstance
+	if err := e.db.Preload("WorkflowTemplate").First(&instance, instanceID).Error; err != nil {
+		return fmt.Errorf("workflow instance not found: %w", err)
+	}
+
+	// Parse the workflow definition
+	def, err := ParseWorkflowDefinition(instance.WorkflowTemplate.Definition)
+	if err != nil {
+		return fmt.Errorf("failed to parse workflow definition: %w", err)
+	}
+
+	// Get root nodes (nodes with no dependencies)
+	rootNodes := def.GetRootNodes()
+
+	// Create tasks for root nodes
+	for _, nodeKey := range rootNodes {
+		// Find the node in the database
+		var node models.WorkflowNode
+		if err := e.db.Where("workflow_instance_id = ? AND node_key = ?", instanceID, nodeKey).First(&node).Error; err != nil {
+			continue
+		}
+
+		// Create task for this node
+		task, err := e.CreateTaskForNode(&node, createdByID)
+		if err != nil {
+			continue
+		}
+
+		// Update node status to running
+		node.Status = string(NodeStatusRunning)
+		now := time.Now()
+		node.StartedAt = &now
+		if err := e.db.Save(&node).Error; err != nil {
+			return fmt.Errorf("failed to update node status: %w", err)
+		}
+
+		_ = task
+	}
+
+	return nil
+}
+
+// parseUint parses a string to uint
+func parseUint(s string) (uint, error) {
+	var result uint
+	_, err := fmt.Sscanf(s, "%d", &result)
+	return result, err
+}

+ 175 - 0
workflow/parser.go

@@ -0,0 +1,175 @@
+package workflow
+
+import (
+	"encoding/json"
+	"fmt"
+)
+
+// NodeType represents the type of a workflow node
+type NodeType string
+
+const (
+	NodeTypeTask      NodeType = "task"
+	NodeTypeCondition NodeType = "condition"
+	NodeTypeParallel  NodeType = "parallel"
+	NodeTypeJoin      NodeType = "join"
+	NodeTypeTrigger   NodeType = "trigger"
+)
+
+// NodeStatus represents the status of a workflow node
+type NodeStatus string
+
+const (
+	NodeStatusPending   NodeStatus = "pending"
+	NodeStatusReady     NodeStatus = "ready"
+	NodeStatusRunning   NodeStatus = "running"
+	NodeStatusCompleted NodeStatus = "completed"
+	NodeStatusSkipped   NodeStatus = "skipped"
+	NodeStatusFailed    NodeStatus = "failed"
+)
+
+// WorkflowNode represents a node in the workflow DAG
+type WorkflowNode struct {
+	Key       string   `json:"key"`
+	Type      NodeType `json:"type"`
+	Title     string   `json:"title"`
+	Content   string   `json:"content"`
+	Assignee  *string  `json:"assignee,omitempty"`
+	DependsOn []string `json:"dependsOn,omitempty"`
+	Config    string   `json:"config,omitempty"` // JSON config for task nodes
+}
+
+// WorkflowDefinition represents the complete workflow DAG definition
+type WorkflowDefinition struct {
+	Nodes map[string]*WorkflowNode `json:"nodes"`
+}
+
+// ParseWorkflowDefinition parses a JSON string into a WorkflowDefinition
+func ParseWorkflowDefinition(jsonDef string) (*WorkflowDefinition, error) {
+	var def WorkflowDefinition
+	if err := json.Unmarshal([]byte(jsonDef), &def); err != nil {
+		return nil, fmt.Errorf("failed to parse workflow definition: %w", err)
+	}
+	return &def, nil
+}
+
+// Validate validates the workflow definition
+func (d *WorkflowDefinition) Validate() error {
+	if d.Nodes == nil || len(d.Nodes) == 0 {
+		return fmt.Errorf("workflow definition must have at least one node")
+	}
+
+	// Validate each node
+	for key, node := range d.Nodes {
+		if node.Type == "" {
+			return fmt.Errorf("node '%s' must have a type", key)
+		}
+
+		// Validate dependsOn references exist
+		for _, depKey := range node.DependsOn {
+			if _, exists := d.Nodes[depKey]; !exists {
+				return fmt.Errorf("node '%s' depends on non-existent node '%s'", key, depKey)
+			}
+		}
+
+		// Validate node type
+		switch node.Type {
+		case NodeTypeTask, NodeTypeCondition, NodeTypeParallel, NodeTypeJoin, NodeTypeTrigger:
+			// Valid types
+		default:
+			return fmt.Errorf("node '%s' has invalid type: %s", key, node.Type)
+		}
+	}
+
+	// Check for cycles in the DAG
+	if err := d.validateNoCycles(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// validateNoCycles checks for cycles in the workflow DAG using DFS
+func (d *WorkflowDefinition) validateNoCycles() error {
+	visited := make(map[string]bool)
+	recStack := make(map[string]bool)
+
+	var dfs func(string) error
+	dfs = func(nodeKey string) error {
+		visited[nodeKey] = true
+		recStack[nodeKey] = true
+
+		node := d.Nodes[nodeKey]
+		for _, depKey := range node.DependsOn {
+			if !visited[depKey] {
+				if err := dfs(depKey); err != nil {
+					return err
+				}
+			} else if recStack[depKey] {
+				return fmt.Errorf("cycle detected: node '%s' creates a dependency loop", depKey)
+			}
+		}
+
+		recStack[nodeKey] = false
+		return nil
+	}
+
+	// Run DFS from each node
+	for key := range d.Nodes {
+		if !visited[key] {
+			if err := dfs(key); err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
+
+// GetRootNodes returns nodes that have no dependencies (starting points)
+func (d *WorkflowDefinition) GetRootNodes() []string {
+	var roots []string
+	for key, node := range d.Nodes {
+		if len(node.DependsOn) == 0 {
+			roots = append(roots, key)
+		}
+	}
+	return roots
+}
+
+// GetDependentNodes returns nodes that depend on the given node
+func (d *WorkflowDefinition) GetDependentNodes(nodeKey string) []string {
+	var dependents []string
+	for key, node := range d.Nodes {
+		for _, dep := range node.DependsOn {
+			if dep == nodeKey {
+				dependents = append(dependents, key)
+				break
+			}
+		}
+	}
+	return dependents
+}
+
+// GetReadyNodes returns nodes that are ready to execute (all dependencies satisfied)
+func (d *WorkflowDefinition) GetReadyNodes(executedNodes map[string]bool) []string {
+	var ready []string
+	for key, node := range d.Nodes {
+		if executedNodes[key] {
+			continue // Already executed
+		}
+
+		allDepsSatisfied := true
+		for _, depKey := range node.DependsOn {
+			if !executedNodes[depKey] {
+				allDepsSatisfied = false
+				break
+			}
+		}
+
+		if allDepsSatisfied {
+			ready = append(ready, key)
+		}
+	}
+	return ready
+}

+ 369 - 0
workflow/parser_test.go

@@ -0,0 +1,369 @@
+package workflow
+
+import (
+	"testing"
+)
+
+func TestParseWorkflowDefinition(t *testing.T) {
+	tests := []struct {
+		name    string
+		jsonDef string
+		wantErr bool
+	}{
+		{
+			name: "valid simple workflow",
+			jsonDef: `{
+				"nodes": {
+					"start": {
+						"type": "task",
+						"title": "Start",
+						"content": "Start task",
+						"dependsOn": []
+					},
+					"end": {
+						"type": "task",
+						"title": "End",
+						"content": "End task",
+						"dependsOn": ["start"]
+					}
+				}
+			}`,
+			wantErr: false,
+		},
+		{
+			name: "valid parallel workflow",
+			jsonDef: `{
+				"nodes": {
+					"start": {
+						"type": "task",
+						"title": "Start",
+						"content": "Start task",
+						"dependsOn": []
+					},
+					"parallel1": {
+						"type": "task",
+						"title": "Parallel 1",
+						"content": "Parallel task 1",
+						"dependsOn": ["start"]
+					},
+					"parallel2": {
+						"type": "task",
+						"title": "Parallel 2",
+						"content": "Parallel task 2",
+						"dependsOn": ["start"]
+					},
+					"join": {
+						"type": "join",
+						"title": "Join",
+						"content": "Join parallel branches",
+						"dependsOn": ["parallel1", "parallel2"]
+					}
+				}
+			}`,
+			wantErr: false,
+		},
+		{
+			name:    "invalid JSON",
+			jsonDef: `{invalid}`,
+			wantErr: true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			def, err := ParseWorkflowDefinition(tt.jsonDef)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("ParseWorkflowDefinition() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if !tt.wantErr && def == nil {
+				t.Error("ParseWorkflowDefinition() returned nil definition")
+			}
+		})
+	}
+}
+
+func TestParseAndValidate(t *testing.T) {
+	tests := []struct {
+		name    string
+		jsonDef string
+		wantErr bool
+	}{
+		{
+			name:    "empty definition",
+			jsonDef: `{}`,
+			wantErr: true,
+		},
+		{
+			name: "node missing type",
+			jsonDef: `{
+				"nodes": {
+					"start": {
+						"title": "Start"
+					}
+				}
+			}`,
+			wantErr: true,
+		},
+		{
+			name: "invalid node type",
+			jsonDef: `{
+				"nodes": {
+					"start": {
+						"type": "invalid",
+						"title": "Start"
+					}
+				}
+			}`,
+			wantErr: true,
+		},
+		{
+			name: "dependency on non-existent node",
+			jsonDef: `{
+				"nodes": {
+					"start": {
+						"type": "task",
+						"title": "Start",
+						"dependsOn": ["nonexistent"]
+					}
+				}
+			}`,
+			wantErr: true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			def, err := ParseWorkflowDefinition(tt.jsonDef)
+			if err != nil {
+				if !tt.wantErr {
+					t.Errorf("ParseWorkflowDefinition() unexpected error = %v", err)
+				}
+				return
+			}
+			// Parse succeeded, now validate
+			err = def.Validate()
+			if (err != nil) != tt.wantErr {
+				t.Errorf("Validate() error = %v, wantErr %v", err, tt.wantErr)
+			}
+		})
+	}
+}
+
+func TestValidate(t *testing.T) {
+	tests := []struct {
+		name    string
+		jsonDef string
+		wantErr bool
+	}{
+		{
+			name: "valid workflow",
+			jsonDef: `{
+				"nodes": {
+					"start": {"type": "task", "title": "Start", "dependsOn": []},
+					"end": {"type": "task", "title": "End", "dependsOn": ["start"]}
+				}
+			}`,
+			wantErr: false,
+		},
+		{
+			name: "cycle detection",
+			jsonDef: `{
+				"nodes": {
+					"a": {"type": "task", "title": "A", "dependsOn": ["b"]},
+					"b": {"type": "task", "title": "B", "dependsOn": ["a"]}
+				}
+			}`,
+			wantErr: true,
+		},
+		{
+			name: "self-referential cycle",
+			jsonDef: `{
+				"nodes": {
+					"a": {"type": "task", "title": "A", "dependsOn": ["a"]}
+				}
+			}`,
+			wantErr: true,
+		},
+		{
+			name: "complex cycle",
+			jsonDef: `{
+				"nodes": {
+					"a": {"type": "task", "title": "A", "dependsOn": []},
+					"b": {"type": "task", "title": "B", "dependsOn": ["a"]},
+					"c": {"type": "task", "title": "C", "dependsOn": ["b"]},
+					"d": {"type": "task", "title": "D", "dependsOn": ["c"]},
+					"e": {"type": "task", "title": "E", "dependsOn": ["d", "b"]}
+				}
+			}`,
+			wantErr: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			def, err := ParseWorkflowDefinition(tt.jsonDef)
+			if err != nil {
+				t.Fatalf("ParseWorkflowDefinition() error = %v", err)
+			}
+
+			err = def.Validate()
+			if (err != nil) != tt.wantErr {
+				t.Errorf("Validate() error = %v, wantErr %v", err, tt.wantErr)
+			}
+		})
+	}
+}
+
+func TestGetRootNodes(t *testing.T) {
+	jsonDef := `{
+		"nodes": {
+			"start": {"type": "task", "title": "Start", "dependsOn": []},
+			"middle": {"type": "task", "title": "Middle", "dependsOn": ["start"]},
+			"end": {"type": "task", "title": "End", "dependsOn": ["middle"]}
+		}
+	}`
+
+	def, err := ParseWorkflowDefinition(jsonDef)
+	if err != nil {
+		t.Fatalf("ParseWorkflowDefinition() error = %v", err)
+	}
+
+	roots := def.GetRootNodes()
+	if len(roots) != 1 {
+		t.Errorf("GetRootNodes() returned %d roots, want 1", len(roots))
+	}
+	if len(roots) > 0 && roots[0] != "start" {
+		t.Errorf("GetRootNodes() returned %v, want [start]", roots)
+	}
+}
+
+func TestGetDependentNodes(t *testing.T) {
+	jsonDef := `{
+		"nodes": {
+			"start": {"type": "task", "title": "Start", "dependsOn": []},
+			"branch1": {"type": "task", "title": "Branch 1", "dependsOn": ["start"]},
+			"branch2": {"type": "task", "title": "Branch 2", "dependsOn": ["start"]},
+			"end": {"type": "task", "title": "End", "dependsOn": ["branch1", "branch2"]}
+		}
+	}`
+
+	def, err := ParseWorkflowDefinition(jsonDef)
+	if err != nil {
+		t.Fatalf("ParseWorkflowDefinition() error = %v", err)
+	}
+
+	dependents := def.GetDependentNodes("start")
+	if len(dependents) != 2 {
+		t.Errorf("GetDependentNodes(start) returned %d dependents, want 2", len(dependents))
+	}
+
+	dependents = def.GetDependentNodes("branch1")
+	if len(dependents) != 1 {
+		t.Errorf("GetDependentNodes(branch1) returned %d dependents, want 1", len(dependents))
+	}
+}
+
+func TestGetReadyNodes(t *testing.T) {
+	jsonDef := `{
+		"nodes": {
+			"start": {"type": "task", "title": "Start", "dependsOn": []},
+			"middle": {"type": "task", "title": "Middle", "dependsOn": ["start"]},
+			"end": {"type": "task", "title": "End", "dependsOn": ["middle"]}
+		}
+	}`
+
+	def, err := ParseWorkflowDefinition(jsonDef)
+	if err != nil {
+		t.Fatalf("ParseWorkflowDefinition() error = %v", err)
+	}
+
+	// Initially, only start should be ready
+	ready := def.GetReadyNodes(map[string]bool{})
+	if len(ready) != 1 || ready[0] != "start" {
+		t.Errorf("GetReadyNodes({}) = %v, want [start]", ready)
+	}
+
+	// After start is executed, middle should be ready
+	ready = def.GetReadyNodes(map[string]bool{"start": true})
+	if len(ready) != 1 || ready[0] != "middle" {
+		t.Errorf("GetReadyNodes({start:true}) = %v, want [middle]", ready)
+	}
+
+	// After middle is executed, end should be ready
+	ready = def.GetReadyNodes(map[string]bool{"start": true, "middle": true})
+	if len(ready) != 1 || ready[0] != "end" {
+		t.Errorf("GetReadyNodes({start:true, middle:true}) = %v, want [end]", ready)
+	}
+}
+
+func TestParallelNodeTypes(t *testing.T) {
+	jsonDef := `{
+		"nodes": {
+			"start": {"type": "task", "title": "Start", "dependsOn": []},
+			"parallel": {"type": "parallel", "title": "Parallel", "dependsOn": ["start"]},
+			"join": {"type": "join", "title": "Join", "dependsOn": ["parallel"]}
+		}
+	}`
+
+	def, err := ParseWorkflowDefinition(jsonDef)
+	if err != nil {
+		t.Fatalf("ParseWorkflowDefinition() error = %v", err)
+	}
+
+	if err := def.Validate(); err != nil {
+		t.Errorf("Validate() error = %v, want nil", err)
+	}
+
+	// Verify node types
+	if def.Nodes["parallel"].Type != NodeTypeParallel {
+		t.Errorf("parallel node type = %v, want %v", def.Nodes["parallel"].Type, NodeTypeParallel)
+	}
+	if def.Nodes["join"].Type != NodeTypeJoin {
+		t.Errorf("join node type = %v, want %v", def.Nodes["join"].Type, NodeTypeJoin)
+	}
+}
+
+func TestConditionNodeType(t *testing.T) {
+	jsonDef := `{
+		"nodes": {
+			"start": {"type": "task", "title": "Start", "dependsOn": []},
+			"condition": {"type": "condition", "title": "Condition", "dependsOn": ["start"]}
+		}
+	}`
+
+	def, err := ParseWorkflowDefinition(jsonDef)
+	if err != nil {
+		t.Fatalf("ParseWorkflowDefinition() error = %v", err)
+	}
+
+	if err := def.Validate(); err != nil {
+		t.Errorf("Validate() error = %v, want nil", err)
+	}
+
+	if def.Nodes["condition"].Type != NodeTypeCondition {
+		t.Errorf("condition node type = %v, want %v", def.Nodes["condition"].Type, NodeTypeCondition)
+	}
+}
+
+func TestTriggerNodeType(t *testing.T) {
+	jsonDef := `{
+		"nodes": {
+			"trigger": {"type": "trigger", "title": "Trigger", "dependsOn": []},
+			"task": {"type": "task", "title": "Task", "dependsOn": ["trigger"]}
+		}
+	}`
+
+	def, err := ParseWorkflowDefinition(jsonDef)
+	if err != nil {
+		t.Fatalf("ParseWorkflowDefinition() error = %v", err)
+	}
+
+	if err := def.Validate(); err != nil {
+		t.Errorf("Validate() error = %v, want nil", err)
+	}
+
+	if def.Nodes["trigger"].Type != NodeTypeTrigger {
+		t.Errorf("trigger node type = %v, want %v", def.Nodes["trigger"].Type, NodeTypeTrigger)
+	}
+}

この差分においてかなりの量のファイルが変更されているため、一部のファイルを表示していません