| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426 |
- package workflow
import (
	"context"
	"encoding/json"
	"fmt"
	"strconv"
	"time"

	"gogs.dmsc.dev/arp/models"
	"gorm.io/gorm"
)
- // Engine represents the workflow execution engine
- type Engine struct {
- db *gorm.DB
- config *Config
- ctx context.Context
- }
- // NewEngine creates a new workflow engine
- func NewEngine(db *gorm.DB) *Engine {
- return &Engine{
- db: db,
- config: LoadConfig(),
- ctx: context.Background(),
- }
- }
// StartWorkflowInput carries the caller-supplied values used when a workflow
// instance is created from a template.
type StartWorkflowInput struct {
	// ServiceID optionally links the instance to a service; nil means none.
	ServiceID *uint
	// Context is copied verbatim into the created instance's Context field.
	Context string
}
- // CreateInstance creates a new workflow instance from a template
- func (e *Engine) CreateInstance(template models.WorkflowTemplate, input StartWorkflowInput) (models.WorkflowInstance, []models.WorkflowNode, error) {
- // Parse the workflow definition
- def, err := ParseWorkflowDefinition(template.Definition)
- if err != nil {
- return models.WorkflowInstance{}, nil, fmt.Errorf("failed to parse workflow definition: %w", err)
- }
- // Validate the workflow definition
- if err := def.Validate(); err != nil {
- return models.WorkflowInstance{}, nil, fmt.Errorf("workflow definition invalid: %w", err)
- }
- // Create workflow instance
- instance := models.WorkflowInstance{
- WorkflowTemplateID: template.ID,
- Status: "running",
- Context: input.Context,
- ServiceID: input.ServiceID,
- }
- // Create nodes for each node in the definition
- var nodes []models.WorkflowNode
- for key, nodeDef := range def.Nodes {
- // Determine initial status based on dependencies
- status := NodeStatusPending
- if len(nodeDef.DependsOn) == 0 {
- status = NodeStatusReady
- }
- // Create input data for the node
- inputData, _ := json.Marshal(map[string]interface{}{
- "key": key,
- "type": nodeDef.Type,
- "title": nodeDef.Title,
- "content": nodeDef.Content,
- "dependsOn": nodeDef.DependsOn,
- })
- node := models.WorkflowNode{
- WorkflowInstanceID: 0, // Will be set after instance is created
- NodeKey: key,
- NodeType: string(nodeDef.Type),
- Status: string(status),
- InputData: string(inputData),
- RetryCount: 0,
- }
- // Set assignee if specified
- if nodeDef.Assignee != nil {
- // Store assignee info in output data for later task creation
- outputData, _ := json.Marshal(map[string]interface{}{
- "assignee": nodeDef.Assignee,
- })
- node.OutputData = string(outputData)
- }
- nodes = append(nodes, node)
- }
- return instance, nodes, nil
- }
- // EvaluateDependencies evaluates which nodes should be marked as ready
- func (e *Engine) EvaluateDependencies(instanceID uint, executedNodes map[string]bool) error {
- // Get all nodes for this instance
- var nodes []models.WorkflowNode
- if err := e.db.Where("workflow_instance_id = ?", instanceID).Find(&nodes).Error; err != nil {
- return fmt.Errorf("failed to fetch workflow nodes: %w", err)
- }
- // Update node statuses based on executed nodes
- for i := range nodes {
- node := &nodes[i]
- // Skip if already completed, failed, or skipped
- if node.Status == string(NodeStatusCompleted) ||
- node.Status == string(NodeStatusFailed) ||
- node.Status == string(NodeStatusSkipped) {
- continue
- }
- // Check if all dependencies are satisfied
- if executedNodes[node.NodeKey] {
- continue // Already executed
- }
- // Get dependencies for this node
- var nodeDef WorkflowNode
- if err := json.Unmarshal([]byte(node.InputData), &nodeDef); err != nil {
- continue
- }
- allDepsSatisfied := true
- for _, depKey := range nodeDef.DependsOn {
- if !executedNodes[depKey] {
- allDepsSatisfied = false
- break
- }
- }
- if allDepsSatisfied && node.Status == string(NodeStatusPending) {
- node.Status = string(NodeStatusReady)
- if err := e.db.Save(node).Error; err != nil {
- return fmt.Errorf("failed to update node status: %w", err)
- }
- }
- }
- return nil
- }
- // CreateTaskForNode creates a task for a workflow node
- func (e *Engine) CreateTaskForNode(node *models.WorkflowNode, createdByID uint) (*models.Task, error) {
- // Parse node input data to get task details
- var nodeDef WorkflowNode
- if err := json.Unmarshal([]byte(node.InputData), &nodeDef); err != nil {
- return nil, fmt.Errorf("failed to parse node input data: %w", err)
- }
- // Parse output data to get assignee
- var outputData map[string]interface{}
- if node.OutputData != "" {
- json.Unmarshal([]byte(node.OutputData), &outputData)
- }
- // Determine assignee
- var assigneeID *uint
- if outputData != nil && outputData["assignee"] != nil {
- // Assignee is stored as a string (user ID or email)
- assigneeStr := fmt.Sprintf("%v", outputData["assignee"])
- // Try to parse as uint first
- if id, err := parseUint(assigneeStr); err == nil {
- assigneeID = &id
- } else {
- // Look up user by email
- var user models.User
- if err := e.db.Where("email = ?", assigneeStr).First(&user).Error; err == nil {
- uid := uint(user.ID)
- assigneeID = &uid
- }
- }
- }
- // Create the task
- task := models.Task{
- Title: nodeDef.Title,
- Content: nodeDef.Content,
- CreatedByID: createdByID,
- AssigneeID: assigneeID,
- Priority: "medium",
- }
- if err := e.db.Create(&task).Error; err != nil {
- return nil, fmt.Errorf("failed to create task: %w", err)
- }
- // Update node with task ID
- node.TaskID = &task.ID
- if err := e.db.Save(node).Error; err != nil {
- return nil, fmt.Errorf("failed to update node with task ID: %w", err)
- }
- return &task, nil
- }
- // MarkNodeCompleted marks a node as completed and evaluates downstream nodes
- func (e *Engine) MarkNodeCompleted(nodeID uint) error {
- var node models.WorkflowNode
- if err := e.db.Preload("WorkflowInstance").First(&node, nodeID).Error; err != nil {
- return fmt.Errorf("workflow node not found: %w", err)
- }
- node.Status = string(NodeStatusCompleted)
- now := time.Now()
- node.CompletedAt = &now
- if err := e.db.Save(&node).Error; err != nil {
- return fmt.Errorf("failed to mark node as completed: %w", err)
- }
- // Evaluate downstream nodes
- executedNodes := e.getExecutedNodes(node.WorkflowInstanceID)
- if err := e.EvaluateDependencies(node.WorkflowInstanceID, executedNodes); err != nil {
- return err
- }
- // Check if workflow is completed
- return e.CheckWorkflowCompletion(node.WorkflowInstanceID)
- }
- // MarkNodeFailed marks a node as failed and handles retry logic
- func (e *Engine) MarkNodeFailed(nodeID uint, reason string) error {
- var node models.WorkflowNode
- if err := e.db.Preload("WorkflowInstance").First(&node, nodeID).Error; err != nil {
- return fmt.Errorf("workflow node not found: %w", err)
- }
- node.Status = string(NodeStatusFailed)
- now := time.Now()
- node.CompletedAt = &now
- // Check if we should retry
- if node.RetryCount < e.config.MaxRetries {
- // Reset to pending for retry
- node.Status = string(NodeStatusPending)
- node.CompletedAt = nil
- node.RetryCount++
- // Clear output data for retry
- node.OutputData = ""
- if err := e.db.Save(&node).Error; err != nil {
- return fmt.Errorf("failed to reset node for retry: %w", err)
- }
- return nil
- }
- // Max retries exceeded, mark as failed
- if err := e.db.Save(&node).Error; err != nil {
- return fmt.Errorf("failed to mark node as failed: %w", err)
- }
- // Update workflow instance status
- var instance models.WorkflowInstance
- if err := e.db.First(&instance, node.WorkflowInstanceID).Error; err != nil {
- return fmt.Errorf("workflow instance not found: %w", err)
- }
- instance.Status = "failed"
- now = time.Now()
- instance.CompletedAt = &now
- if err := e.db.Save(&instance).Error; err != nil {
- return fmt.Errorf("failed to update workflow instance status: %w", err)
- }
- return nil
- }
- // getExecutedNodes returns a map of node keys that have been executed
- func (e *Engine) getExecutedNodes(instanceID uint) map[string]bool {
- result := make(map[string]bool)
- var nodes []models.WorkflowNode
- if err := e.db.Where("workflow_instance_id = ? AND status IN ?", instanceID, []string{
- string(NodeStatusCompleted), string(NodeStatusFailed), string(NodeStatusSkipped),
- }).Find(&nodes).Error; err != nil {
- return result
- }
- for _, node := range nodes {
- result[node.NodeKey] = true
- }
- return result
- }
- // CheckWorkflowCompletion checks if the workflow instance is completed
- func (e *Engine) CheckWorkflowCompletion(instanceID uint) error {
- var instance models.WorkflowInstance
- if err := e.db.First(&instance, instanceID).Error; err != nil {
- return fmt.Errorf("workflow instance not found: %w", err)
- }
- // Get all nodes for this instance
- var allNodes []models.WorkflowNode
- if err := e.db.Where("workflow_instance_id = ?", instanceID).Find(&allNodes).Error; err != nil {
- return fmt.Errorf("failed to fetch workflow nodes: %w", err)
- }
- // Check if all nodes are completed, failed, or skipped
- allCompleted := true
- for _, node := range allNodes {
- if node.Status != string(NodeStatusCompleted) &&
- node.Status != string(NodeStatusFailed) &&
- node.Status != string(NodeStatusSkipped) {
- allCompleted = false
- break
- }
- }
- if allCompleted {
- instance.Status = "completed"
- now := time.Now()
- instance.CompletedAt = &now
- if err := e.db.Save(&instance).Error; err != nil {
- return fmt.Errorf("failed to update workflow instance status: %w", err)
- }
- }
- return nil
- }
- // StartWorkflowInstance starts a workflow instance by creating tasks for root nodes
- func (e *Engine) StartWorkflowInstance(instanceID uint, createdByID uint) error {
- // Get the workflow instance
- var instance models.WorkflowInstance
- if err := e.db.Preload("WorkflowTemplate").First(&instance, instanceID).Error; err != nil {
- return fmt.Errorf("workflow instance not found: %w", err)
- }
- // Parse the workflow definition
- def, err := ParseWorkflowDefinition(instance.WorkflowTemplate.Definition)
- if err != nil {
- return fmt.Errorf("failed to parse workflow definition: %w", err)
- }
- // Get root nodes (nodes with no dependencies)
- rootNodes := def.GetRootNodes()
- // Create tasks for root nodes
- for _, nodeKey := range rootNodes {
- // Find the node in the database
- var node models.WorkflowNode
- if err := e.db.Where("workflow_instance_id = ? AND node_key = ?", instanceID, nodeKey).First(&node).Error; err != nil {
- continue
- }
- // Create task for this node
- task, err := e.CreateTaskForNode(&node, createdByID)
- if err != nil {
- // Log error but continue with other nodes
- continue
- }
- // Update node status to running
- node.Status = string(NodeStatusRunning)
- now := time.Now()
- node.StartedAt = &now
- if err := e.db.Save(&node).Error; err != nil {
- return fmt.Errorf("failed to update node status: %w", err)
- }
- // Publish task event
- // (This would be handled by the task creation event in the resolver)
- _ = task
- }
- return nil
- }
- // StartWorkflowInstanceWithService starts a workflow instance with a service association
- func (e *Engine) StartWorkflowInstanceWithService(instanceID uint, createdByID uint, serviceID uint) error {
- // Get the workflow instance
- var instance models.WorkflowInstance
- if err := e.db.Preload("WorkflowTemplate").First(&instance, instanceID).Error; err != nil {
- return fmt.Errorf("workflow instance not found: %w", err)
- }
- // Parse the workflow definition
- def, err := ParseWorkflowDefinition(instance.WorkflowTemplate.Definition)
- if err != nil {
- return fmt.Errorf("failed to parse workflow definition: %w", err)
- }
- // Get root nodes (nodes with no dependencies)
- rootNodes := def.GetRootNodes()
- // Create tasks for root nodes
- for _, nodeKey := range rootNodes {
- // Find the node in the database
- var node models.WorkflowNode
- if err := e.db.Where("workflow_instance_id = ? AND node_key = ?", instanceID, nodeKey).First(&node).Error; err != nil {
- continue
- }
- // Create task for this node
- task, err := e.CreateTaskForNode(&node, createdByID)
- if err != nil {
- continue
- }
- // Update node status to running
- node.Status = string(NodeStatusRunning)
- now := time.Now()
- node.StartedAt = &now
- if err := e.db.Save(&node).Error; err != nil {
- return fmt.Errorf("failed to update node status: %w", err)
- }
- _ = task
- }
- return nil
- }
// parseUint parses s as a base-10 unsigned integer.
//
// The whole string must be numeric: input with trailing characters such as
// "42abc" or "7user@example.com" is rejected instead of being truncated to its
// leading digits. On error the returned value is 0.
func parseUint(s string) (uint, error) {
	v, err := strconv.ParseUint(s, 10, strconv.IntSize)
	if err != nil {
		return 0, err
	}
	return uint(v), nil
}
|