engine.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426
  1. package workflow
  import (
  	"context"
  	"encoding/json"
  	"fmt"
  	"log"
  	"sort"
  	"strconv"
  	"strings"
  	"time"

  	"gogs.dmsc.dev/arp/models"
  	"gorm.io/gorm"
  )
  10. // Engine represents the workflow execution engine
  11. type Engine struct {
  12. db *gorm.DB
  13. config *Config
  14. ctx context.Context
  15. }
  16. // NewEngine creates a new workflow engine
  17. func NewEngine(db *gorm.DB) *Engine {
  18. return &Engine{
  19. db: db,
  20. config: LoadConfig(),
  21. ctx: context.Background(),
  22. }
  23. }
  // StartWorkflowInput carries the caller-supplied values that CreateInstance
  // copies onto a newly built workflow instance.
  type StartWorkflowInput struct {
  	// ServiceID optionally identifies the service the instance belongs to;
  	// nil means no service is associated.
  	ServiceID *uint

  	// Context is stored verbatim in the instance's Context field.
  	Context string
  }
  29. // CreateInstance creates a new workflow instance from a template
  30. func (e *Engine) CreateInstance(template models.WorkflowTemplate, input StartWorkflowInput) (models.WorkflowInstance, []models.WorkflowNode, error) {
  31. // Parse the workflow definition
  32. def, err := ParseWorkflowDefinition(template.Definition)
  33. if err != nil {
  34. return models.WorkflowInstance{}, nil, fmt.Errorf("failed to parse workflow definition: %w", err)
  35. }
  36. // Validate the workflow definition
  37. if err := def.Validate(); err != nil {
  38. return models.WorkflowInstance{}, nil, fmt.Errorf("workflow definition invalid: %w", err)
  39. }
  40. // Create workflow instance
  41. instance := models.WorkflowInstance{
  42. WorkflowTemplateID: template.ID,
  43. Status: "running",
  44. Context: input.Context,
  45. ServiceID: input.ServiceID,
  46. }
  47. // Create nodes for each node in the definition
  48. var nodes []models.WorkflowNode
  49. for key, nodeDef := range def.Nodes {
  50. // Determine initial status based on dependencies
  51. status := NodeStatusPending
  52. if len(nodeDef.DependsOn) == 0 {
  53. status = NodeStatusReady
  54. }
  55. // Create input data for the node
  56. inputData, _ := json.Marshal(map[string]interface{}{
  57. "key": key,
  58. "type": nodeDef.Type,
  59. "title": nodeDef.Title,
  60. "content": nodeDef.Content,
  61. "dependsOn": nodeDef.DependsOn,
  62. })
  63. node := models.WorkflowNode{
  64. WorkflowInstanceID: 0, // Will be set after instance is created
  65. NodeKey: key,
  66. NodeType: string(nodeDef.Type),
  67. Status: string(status),
  68. InputData: string(inputData),
  69. RetryCount: 0,
  70. }
  71. // Set assignee if specified
  72. if nodeDef.Assignee != nil {
  73. // Store assignee info in output data for later task creation
  74. outputData, _ := json.Marshal(map[string]interface{}{
  75. "assignee": nodeDef.Assignee,
  76. })
  77. node.OutputData = string(outputData)
  78. }
  79. nodes = append(nodes, node)
  80. }
  81. return instance, nodes, nil
  82. }
  83. // EvaluateDependencies evaluates which nodes should be marked as ready
  84. func (e *Engine) EvaluateDependencies(instanceID uint, executedNodes map[string]bool) error {
  85. // Get all nodes for this instance
  86. var nodes []models.WorkflowNode
  87. if err := e.db.Where("workflow_instance_id = ?", instanceID).Find(&nodes).Error; err != nil {
  88. return fmt.Errorf("failed to fetch workflow nodes: %w", err)
  89. }
  90. // Update node statuses based on executed nodes
  91. for i := range nodes {
  92. node := &nodes[i]
  93. // Skip if already completed, failed, or skipped
  94. if node.Status == string(NodeStatusCompleted) ||
  95. node.Status == string(NodeStatusFailed) ||
  96. node.Status == string(NodeStatusSkipped) {
  97. continue
  98. }
  99. // Check if all dependencies are satisfied
  100. if executedNodes[node.NodeKey] {
  101. continue // Already executed
  102. }
  103. // Get dependencies for this node
  104. var nodeDef WorkflowNode
  105. if err := json.Unmarshal([]byte(node.InputData), &nodeDef); err != nil {
  106. continue
  107. }
  108. allDepsSatisfied := true
  109. for _, depKey := range nodeDef.DependsOn {
  110. if !executedNodes[depKey] {
  111. allDepsSatisfied = false
  112. break
  113. }
  114. }
  115. if allDepsSatisfied && node.Status == string(NodeStatusPending) {
  116. node.Status = string(NodeStatusReady)
  117. if err := e.db.Save(node).Error; err != nil {
  118. return fmt.Errorf("failed to update node status: %w", err)
  119. }
  120. }
  121. }
  122. return nil
  123. }
  124. // CreateTaskForNode creates a task for a workflow node
  125. func (e *Engine) CreateTaskForNode(node *models.WorkflowNode, createdByID uint) (*models.Task, error) {
  126. // Parse node input data to get task details
  127. var nodeDef WorkflowNode
  128. if err := json.Unmarshal([]byte(node.InputData), &nodeDef); err != nil {
  129. return nil, fmt.Errorf("failed to parse node input data: %w", err)
  130. }
  131. // Parse output data to get assignee
  132. var outputData map[string]interface{}
  133. if node.OutputData != "" {
  134. json.Unmarshal([]byte(node.OutputData), &outputData)
  135. }
  136. // Determine assignee
  137. var assigneeID *uint
  138. if outputData != nil && outputData["assignee"] != nil {
  139. // Assignee is stored as a string (user ID or email)
  140. assigneeStr := fmt.Sprintf("%v", outputData["assignee"])
  141. // Try to parse as uint first
  142. if id, err := parseUint(assigneeStr); err == nil {
  143. assigneeID = &id
  144. } else {
  145. // Look up user by email
  146. var user models.User
  147. if err := e.db.Where("email = ?", assigneeStr).First(&user).Error; err == nil {
  148. uid := uint(user.ID)
  149. assigneeID = &uid
  150. }
  151. }
  152. }
  153. // Create the task
  154. task := models.Task{
  155. Title: nodeDef.Title,
  156. Content: nodeDef.Content,
  157. CreatedByID: createdByID,
  158. AssigneeID: assigneeID,
  159. Priority: "medium",
  160. }
  161. if err := e.db.Create(&task).Error; err != nil {
  162. return nil, fmt.Errorf("failed to create task: %w", err)
  163. }
  164. // Update node with task ID
  165. node.TaskID = &task.ID
  166. if err := e.db.Save(node).Error; err != nil {
  167. return nil, fmt.Errorf("failed to update node with task ID: %w", err)
  168. }
  169. return &task, nil
  170. }
  171. // MarkNodeCompleted marks a node as completed and evaluates downstream nodes
  172. func (e *Engine) MarkNodeCompleted(nodeID uint) error {
  173. var node models.WorkflowNode
  174. if err := e.db.Preload("WorkflowInstance").First(&node, nodeID).Error; err != nil {
  175. return fmt.Errorf("workflow node not found: %w", err)
  176. }
  177. node.Status = string(NodeStatusCompleted)
  178. now := time.Now()
  179. node.CompletedAt = &now
  180. if err := e.db.Save(&node).Error; err != nil {
  181. return fmt.Errorf("failed to mark node as completed: %w", err)
  182. }
  183. // Evaluate downstream nodes
  184. executedNodes := e.getExecutedNodes(node.WorkflowInstanceID)
  185. if err := e.EvaluateDependencies(node.WorkflowInstanceID, executedNodes); err != nil {
  186. return err
  187. }
  188. // Check if workflow is completed
  189. return e.CheckWorkflowCompletion(node.WorkflowInstanceID)
  190. }
  191. // MarkNodeFailed marks a node as failed and handles retry logic
  192. func (e *Engine) MarkNodeFailed(nodeID uint, reason string) error {
  193. var node models.WorkflowNode
  194. if err := e.db.Preload("WorkflowInstance").First(&node, nodeID).Error; err != nil {
  195. return fmt.Errorf("workflow node not found: %w", err)
  196. }
  197. node.Status = string(NodeStatusFailed)
  198. now := time.Now()
  199. node.CompletedAt = &now
  200. // Check if we should retry
  201. if node.RetryCount < e.config.MaxRetries {
  202. // Reset to pending for retry
  203. node.Status = string(NodeStatusPending)
  204. node.CompletedAt = nil
  205. node.RetryCount++
  206. // Clear output data for retry
  207. node.OutputData = ""
  208. if err := e.db.Save(&node).Error; err != nil {
  209. return fmt.Errorf("failed to reset node for retry: %w", err)
  210. }
  211. return nil
  212. }
  213. // Max retries exceeded, mark as failed
  214. if err := e.db.Save(&node).Error; err != nil {
  215. return fmt.Errorf("failed to mark node as failed: %w", err)
  216. }
  217. // Update workflow instance status
  218. var instance models.WorkflowInstance
  219. if err := e.db.First(&instance, node.WorkflowInstanceID).Error; err != nil {
  220. return fmt.Errorf("workflow instance not found: %w", err)
  221. }
  222. instance.Status = "failed"
  223. now = time.Now()
  224. instance.CompletedAt = &now
  225. if err := e.db.Save(&instance).Error; err != nil {
  226. return fmt.Errorf("failed to update workflow instance status: %w", err)
  227. }
  228. return nil
  229. }
  230. // getExecutedNodes returns a map of node keys that have been executed
  231. func (e *Engine) getExecutedNodes(instanceID uint) map[string]bool {
  232. result := make(map[string]bool)
  233. var nodes []models.WorkflowNode
  234. if err := e.db.Where("workflow_instance_id = ? AND status IN ?", instanceID, []string{
  235. string(NodeStatusCompleted), string(NodeStatusFailed), string(NodeStatusSkipped),
  236. }).Find(&nodes).Error; err != nil {
  237. return result
  238. }
  239. for _, node := range nodes {
  240. result[node.NodeKey] = true
  241. }
  242. return result
  243. }
  244. // CheckWorkflowCompletion checks if the workflow instance is completed
  245. func (e *Engine) CheckWorkflowCompletion(instanceID uint) error {
  246. var instance models.WorkflowInstance
  247. if err := e.db.First(&instance, instanceID).Error; err != nil {
  248. return fmt.Errorf("workflow instance not found: %w", err)
  249. }
  250. // Get all nodes for this instance
  251. var allNodes []models.WorkflowNode
  252. if err := e.db.Where("workflow_instance_id = ?", instanceID).Find(&allNodes).Error; err != nil {
  253. return fmt.Errorf("failed to fetch workflow nodes: %w", err)
  254. }
  255. // Check if all nodes are completed, failed, or skipped
  256. allCompleted := true
  257. for _, node := range allNodes {
  258. if node.Status != string(NodeStatusCompleted) &&
  259. node.Status != string(NodeStatusFailed) &&
  260. node.Status != string(NodeStatusSkipped) {
  261. allCompleted = false
  262. break
  263. }
  264. }
  265. if allCompleted {
  266. instance.Status = "completed"
  267. now := time.Now()
  268. instance.CompletedAt = &now
  269. if err := e.db.Save(&instance).Error; err != nil {
  270. return fmt.Errorf("failed to update workflow instance status: %w", err)
  271. }
  272. }
  273. return nil
  274. }
  275. // StartWorkflowInstance starts a workflow instance by creating tasks for root nodes
  276. func (e *Engine) StartWorkflowInstance(instanceID uint, createdByID uint) error {
  277. // Get the workflow instance
  278. var instance models.WorkflowInstance
  279. if err := e.db.Preload("WorkflowTemplate").First(&instance, instanceID).Error; err != nil {
  280. return fmt.Errorf("workflow instance not found: %w", err)
  281. }
  282. // Parse the workflow definition
  283. def, err := ParseWorkflowDefinition(instance.WorkflowTemplate.Definition)
  284. if err != nil {
  285. return fmt.Errorf("failed to parse workflow definition: %w", err)
  286. }
  287. // Get root nodes (nodes with no dependencies)
  288. rootNodes := def.GetRootNodes()
  289. // Create tasks for root nodes
  290. for _, nodeKey := range rootNodes {
  291. // Find the node in the database
  292. var node models.WorkflowNode
  293. if err := e.db.Where("workflow_instance_id = ? AND node_key = ?", instanceID, nodeKey).First(&node).Error; err != nil {
  294. continue
  295. }
  296. // Create task for this node
  297. task, err := e.CreateTaskForNode(&node, createdByID)
  298. if err != nil {
  299. // Log error but continue with other nodes
  300. continue
  301. }
  302. // Update node status to running
  303. node.Status = string(NodeStatusRunning)
  304. now := time.Now()
  305. node.StartedAt = &now
  306. if err := e.db.Save(&node).Error; err != nil {
  307. return fmt.Errorf("failed to update node status: %w", err)
  308. }
  309. // Publish task event
  310. // (This would be handled by the task creation event in the resolver)
  311. _ = task
  312. }
  313. return nil
  314. }
  315. // StartWorkflowInstanceWithService starts a workflow instance with a service association
  316. func (e *Engine) StartWorkflowInstanceWithService(instanceID uint, createdByID uint, serviceID uint) error {
  317. // Get the workflow instance
  318. var instance models.WorkflowInstance
  319. if err := e.db.Preload("WorkflowTemplate").First(&instance, instanceID).Error; err != nil {
  320. return fmt.Errorf("workflow instance not found: %w", err)
  321. }
  322. // Parse the workflow definition
  323. def, err := ParseWorkflowDefinition(instance.WorkflowTemplate.Definition)
  324. if err != nil {
  325. return fmt.Errorf("failed to parse workflow definition: %w", err)
  326. }
  327. // Get root nodes (nodes with no dependencies)
  328. rootNodes := def.GetRootNodes()
  329. // Create tasks for root nodes
  330. for _, nodeKey := range rootNodes {
  331. // Find the node in the database
  332. var node models.WorkflowNode
  333. if err := e.db.Where("workflow_instance_id = ? AND node_key = ?", instanceID, nodeKey).First(&node).Error; err != nil {
  334. continue
  335. }
  336. // Create task for this node
  337. task, err := e.CreateTaskForNode(&node, createdByID)
  338. if err != nil {
  339. continue
  340. }
  341. // Update node status to running
  342. node.Status = string(NodeStatusRunning)
  343. now := time.Now()
  344. node.StartedAt = &now
  345. if err := e.db.Save(&node).Error; err != nil {
  346. return fmt.Errorf("failed to update node status: %w", err)
  347. }
  348. _ = task
  349. }
  350. return nil
  351. }
  // parseUint parses s as a base-10 unsigned integer that fits in a uint.
  // Surrounding whitespace is ignored. The whole remaining string must be
  // digits: inputs such as "123user@example.com", "1e+06" or "1.5" are rejected
  // rather than being truncated to their leading digits. On failure it returns 0
  // and the error from strconv.ParseUint.
  func parseUint(s string) (uint, error) {
  	// Bit size 0 makes ParseUint enforce the range of the platform's uint.
  	value, err := strconv.ParseUint(strings.TrimSpace(s), 10, 0)
  	if err != nil {
  		return 0, err
  	}
  	return uint(value), nil
  }
  357. }