package workflow

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"sort"
	"strconv"
	"time"

	"gogs.dmsc.dev/arp/models"
	"gorm.io/gorm"
)

// Engine represents the workflow execution engine.
//
// It holds the database handle used for all persistence, the engine
// configuration, and a base context.
type Engine struct {
	db     *gorm.DB
	config *Config
	// ctx is set to context.Background() by NewEngine; no method in this
	// file reads it.
	ctx context.Context
}

// NewEngine creates a new workflow engine that persists through db and uses
// the configuration returned by LoadConfig.
func NewEngine(db *gorm.DB) *Engine {
	return &Engine{
		db:     db,
		config: LoadConfig(),
		ctx:    context.Background(),
	}
}

// StartWorkflowInput represents the input for starting a workflow.
type StartWorkflowInput struct {
	// ServiceID optionally associates the instance with a service.
	ServiceID *uint
	// Context is copied verbatim onto the created instance.
	Context string
}

// CreateInstance builds (but does not persist) a workflow instance and its
// nodes from a template.
//
// The template definition is parsed and validated first. One node is produced
// per definition entry, in ascending key order so the result is deterministic.
// Nodes without dependencies start as ready, all others as pending. Each
// node's WorkflowInstanceID is left at 0; the caller must set it once the
// instance has been stored.
//
// On error the returned instance is the zero value and the node slice is nil.
func (e *Engine) CreateInstance(template models.WorkflowTemplate, input StartWorkflowInput) (models.WorkflowInstance, []models.WorkflowNode, error) {
	def, err := ParseWorkflowDefinition(template.Definition)
	if err != nil {
		return models.WorkflowInstance{}, nil, fmt.Errorf("failed to parse workflow definition: %w", err)
	}

	if err := def.Validate(); err != nil {
		return models.WorkflowInstance{}, nil, fmt.Errorf("workflow definition invalid: %w", err)
	}

	instance := models.WorkflowInstance{
		WorkflowTemplateID: template.ID,
		Status:             "running",
		Context:            input.Context,
		ServiceID:          input.ServiceID,
	}

	// Map iteration order is random; sort the keys so nodes are always
	// produced (and later inserted) in the same order.
	keys := make([]string, 0, len(def.Nodes))
	for key := range def.Nodes {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	var nodes []models.WorkflowNode
	for _, key := range keys {
		nodeDef := def.Nodes[key]

		// Nodes with no dependencies can run immediately.
		status := NodeStatusPending
		if len(nodeDef.DependsOn) == 0 {
			status = NodeStatusReady
		}

		// The node definition is snapshotted into InputData so later steps
		// (dependency evaluation, task creation) do not need the template.
		inputData, err := json.Marshal(map[string]interface{}{
			"key":       key,
			"type":      nodeDef.Type,
			"title":     nodeDef.Title,
			"content":   nodeDef.Content,
			"dependsOn": nodeDef.DependsOn,
		})
		if err != nil {
			return models.WorkflowInstance{}, nil, fmt.Errorf("failed to encode input data for node %q: %w", key, err)
		}

		node := models.WorkflowNode{
			WorkflowInstanceID: 0, // Will be set after instance is created
			NodeKey:            key,
			NodeType:           string(nodeDef.Type),
			Status:             string(status),
			InputData:          string(inputData),
			RetryCount:         0,
		}

		// The assignee is parked in OutputData; CreateTaskForNode reads it
		// back from there when it creates the task.
		if nodeDef.Assignee != nil {
			outputData, err := json.Marshal(map[string]interface{}{
				"assignee": nodeDef.Assignee,
			})
			if err != nil {
				return models.WorkflowInstance{}, nil, fmt.Errorf("failed to encode assignee for node %q: %w", key, err)
			}
			node.OutputData = string(outputData)
		}

		nodes = append(nodes, node)
	}

	return instance, nodes, nil
}

// EvaluateDependencies promotes pending nodes of the given instance to ready
// once every key listed in their dependsOn input data is present in
// executedNodes.
//
// Nodes that are not pending, or whose own key is already in executedNodes,
// are left untouched. A node whose InputData cannot be decoded is skipped
// rather than failing the whole evaluation, so it stays pending.
func (e *Engine) EvaluateDependencies(instanceID uint, executedNodes map[string]bool) error {
	var nodes []models.WorkflowNode
	if err := e.db.Where("workflow_instance_id = ?", instanceID).Find(&nodes).Error; err != nil {
		return fmt.Errorf("failed to fetch workflow nodes: %w", err)
	}

	for i := range nodes {
		node := &nodes[i]

		// Only pending nodes can be promoted; anything else (ready, running,
		// completed, failed, skipped) is not this function's concern.
		if node.Status != string(NodeStatusPending) || executedNodes[node.NodeKey] {
			continue
		}

		var nodeDef WorkflowNode
		if err := json.Unmarshal([]byte(node.InputData), &nodeDef); err != nil {
			continue
		}

		ready := true
		for _, depKey := range nodeDef.DependsOn {
			if !executedNodes[depKey] {
				ready = false
				break
			}
		}
		if !ready {
			continue
		}

		node.Status = string(NodeStatusReady)
		if err := e.db.Save(node).Error; err != nil {
			return fmt.Errorf("failed to update node status: %w", err)
		}
	}

	return nil
}

// CreateTaskForNode creates a task for a workflow node and links the node to
// it via node.TaskID.
//
// Title and content come from the node's InputData. The assignee, if any, is
// read from the "assignee" entry of the node's OutputData: a value that is a
// base-10 unsigned integer is used as a user ID, anything else is looked up as
// a user email. An unreadable OutputData or an unresolvable assignee results
// in an unassigned task rather than an error.
//
// The task insert and the node update run in a single transaction so a failed
// node update does not leave an orphaned task behind.
func (e *Engine) CreateTaskForNode(node *models.WorkflowNode, createdByID uint) (*models.Task, error) {
	var nodeDef WorkflowNode
	if err := json.Unmarshal([]byte(node.InputData), &nodeDef); err != nil {
		return nil, fmt.Errorf("failed to parse node input data: %w", err)
	}

	var outputData map[string]interface{}
	if node.OutputData != "" {
		if err := json.Unmarshal([]byte(node.OutputData), &outputData); err != nil {
			// Best effort: malformed output data means "no assignee".
			outputData = nil
		}
	}

	var assigneeID *uint
	if raw := outputData["assignee"]; raw != nil {
		assigneeStr := fmt.Sprintf("%v", raw)
		if id, err := parseUint(assigneeStr); err == nil {
			assigneeID = &id
		} else {
			// Not a numeric ID: treat it as an email. A failed lookup leaves
			// the task unassigned.
			var user models.User
			if err := e.db.Where("email = ?", assigneeStr).First(&user).Error; err == nil {
				uid := uint(user.ID)
				assigneeID = &uid
			}
		}
	}

	task := models.Task{
		Title:       nodeDef.Title,
		Content:     nodeDef.Content,
		CreatedByID: createdByID,
		AssigneeID:  assigneeID,
		Priority:    "medium",
	}

	prevTaskID := node.TaskID
	err := e.db.Transaction(func(tx *gorm.DB) error {
		if err := tx.Create(&task).Error; err != nil {
			return fmt.Errorf("failed to create task: %w", err)
		}

		node.TaskID = &task.ID
		if err := tx.Save(node).Error; err != nil {
			return fmt.Errorf("failed to update node with task ID: %w", err)
		}
		return nil
	})
	if err != nil {
		// The transaction was rolled back; do not leave the caller's node
		// pointing at a task that no longer exists.
		node.TaskID = prevTaskID
		return nil, err
	}

	return &task, nil
}

// MarkNodeCompleted marks a node as completed, promotes downstream nodes whose
// dependencies are now satisfied, and finally checks whether the whole
// workflow instance has finished.
func (e *Engine) MarkNodeCompleted(nodeID uint) error {
	var node models.WorkflowNode
	if err := e.db.Preload("WorkflowInstance").First(&node, nodeID).Error; err != nil {
		return fmt.Errorf("workflow node not found: %w", err)
	}

	now := time.Now()
	node.Status = string(NodeStatusCompleted)
	node.CompletedAt = &now
	if err := e.db.Save(&node).Error; err != nil {
		return fmt.Errorf("failed to mark node as completed: %w", err)
	}

	executedNodes := e.getExecutedNodes(node.WorkflowInstanceID)
	if err := e.EvaluateDependencies(node.WorkflowInstanceID, executedNodes); err != nil {
		return err
	}

	return e.CheckWorkflowCompletion(node.WorkflowInstanceID)
}

// MarkNodeFailed records a node failure and applies the retry policy.
//
// While the node's RetryCount is below the configured MaxRetries the node is
// reset to pending, its RetryCount is incremented and its OutputData is
// cleared. Once retries are exhausted the node is marked failed and the owning
// workflow instance is marked "failed"; both writes happen in one transaction.
//
// reason is accepted for API compatibility but is not persisted by this
// method.
func (e *Engine) MarkNodeFailed(nodeID uint, reason string) error {
	var node models.WorkflowNode
	if err := e.db.Preload("WorkflowInstance").First(&node, nodeID).Error; err != nil {
		return fmt.Errorf("workflow node not found: %w", err)
	}

	if node.RetryCount < e.config.MaxRetries {
		node.Status = string(NodeStatusPending)
		node.CompletedAt = nil
		node.RetryCount++
		// NOTE(review): CreateInstance stores the assignee in OutputData, so
		// clearing it here also drops the assignee for the retried node.
		// Confirm this is intended.
		node.OutputData = ""
		if err := e.db.Save(&node).Error; err != nil {
			return fmt.Errorf("failed to reset node for retry: %w", err)
		}
		return nil
	}

	// Retries exhausted: fail the node and the instance together.
	now := time.Now()
	node.Status = string(NodeStatusFailed)
	node.CompletedAt = &now

	return e.db.Transaction(func(tx *gorm.DB) error {
		if err := tx.Save(&node).Error; err != nil {
			return fmt.Errorf("failed to mark node as failed: %w", err)
		}

		var instance models.WorkflowInstance
		if err := tx.First(&instance, node.WorkflowInstanceID).Error; err != nil {
			return fmt.Errorf("workflow instance not found: %w", err)
		}

		finishedAt := time.Now()
		instance.Status = "failed"
		instance.CompletedAt = &finishedAt
		if err := tx.Save(&instance).Error; err != nil {
			return fmt.Errorf("failed to update workflow instance status: %w", err)
		}
		return nil
	})
}

// getExecutedNodes returns the set of node keys of the given instance whose
// status is completed, failed, or skipped.
//
// A query error is not reported; the result is simply an empty set in that
// case.
func (e *Engine) getExecutedNodes(instanceID uint) map[string]bool {
	terminal := []string{
		string(NodeStatusCompleted),
		string(NodeStatusFailed),
		string(NodeStatusSkipped),
	}

	var nodes []models.WorkflowNode
	if err := e.db.Where("workflow_instance_id = ? AND status IN ?", instanceID, terminal).Find(&nodes).Error; err != nil {
		return make(map[string]bool)
	}

	result := make(map[string]bool, len(nodes))
	for i := range nodes {
		result[nodes[i].NodeKey] = true
	}
	return result
}

// CheckWorkflowCompletion marks the workflow instance as "completed" when
// every one of its nodes is completed, failed, or skipped.
//
// An instance that is already "failed" is left untouched, so a failure
// recorded by MarkNodeFailed is not overwritten when the remaining nodes
// finish later.
func (e *Engine) CheckWorkflowCompletion(instanceID uint) error {
	var instance models.WorkflowInstance
	if err := e.db.First(&instance, instanceID).Error; err != nil {
		return fmt.Errorf("workflow instance not found: %w", err)
	}

	if instance.Status == "failed" {
		return nil
	}

	var allNodes []models.WorkflowNode
	if err := e.db.Where("workflow_instance_id = ?", instanceID).Find(&allNodes).Error; err != nil {
		return fmt.Errorf("failed to fetch workflow nodes: %w", err)
	}

	for i := range allNodes {
		switch allNodes[i].Status {
		case string(NodeStatusCompleted), string(NodeStatusFailed), string(NodeStatusSkipped):
			// Terminal; keep checking the rest.
		default:
			// At least one node is still outstanding.
			return nil
		}
	}

	now := time.Now()
	instance.Status = "completed"
	instance.CompletedAt = &now
	if err := e.db.Save(&instance).Error; err != nil {
		return fmt.Errorf("failed to update workflow instance status: %w", err)
	}
	return nil
}

// StartWorkflowInstance starts a workflow instance by creating a task for each
// root node (a node with no dependencies) and marking that node as running.
//
// A root node that cannot be found, or whose task cannot be created, is logged
// and skipped so the remaining root nodes still start. A failure to persist
// the running status aborts the start and is returned.
func (e *Engine) StartWorkflowInstance(instanceID uint, createdByID uint) error {
	var instance models.WorkflowInstance
	if err := e.db.Preload("WorkflowTemplate").First(&instance, instanceID).Error; err != nil {
		return fmt.Errorf("workflow instance not found: %w", err)
	}

	def, err := ParseWorkflowDefinition(instance.WorkflowTemplate.Definition)
	if err != nil {
		return fmt.Errorf("failed to parse workflow definition: %w", err)
	}

	for _, nodeKey := range def.GetRootNodes() {
		var node models.WorkflowNode
		if err := e.db.Where("workflow_instance_id = ? AND node_key = ?", instanceID, nodeKey).First(&node).Error; err != nil {
			log.Printf("workflow: instance %d: skipping root node %v: %v", instanceID, nodeKey, err)
			continue
		}

		// The task-created event is expected to be published by the caller
		// (resolver), so the returned task is not needed here.
		if _, err := e.CreateTaskForNode(&node, createdByID); err != nil {
			log.Printf("workflow: instance %d: skipping root node %v: %v", instanceID, nodeKey, err)
			continue
		}

		now := time.Now()
		node.Status = string(NodeStatusRunning)
		node.StartedAt = &now
		if err := e.db.Save(&node).Error; err != nil {
			return fmt.Errorf("failed to update node status: %w", err)
		}
	}

	return nil
}

// StartWorkflowInstanceWithService starts a workflow instance with a service
// association.
//
// It behaves exactly like StartWorkflowInstance. serviceID is accepted for API
// compatibility but is not used when starting the root nodes.
func (e *Engine) StartWorkflowInstanceWithService(instanceID uint, createdByID uint, serviceID uint) error {
	return e.StartWorkflowInstance(instanceID, createdByID)
}

// parseUint parses s as a base-10 unsigned integer that fits in a uint.
//
// The whole string must be numeric: inputs with trailing characters (for
// example an email address that starts with digits) are rejected with an
// error instead of being truncated to their numeric prefix.
func parseUint(s string) (uint, error) {
	v, err := strconv.ParseUint(s, 10, strconv.IntSize)
	if err != nil {
		return 0, err
	}
	return uint(v), nil
}