1
0

parser.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package workflow
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. )
  6. // NodeType represents the type of a workflow node
  7. type NodeType string
  8. const (
  9. NodeTypeTask NodeType = "task"
  10. NodeTypeCondition NodeType = "condition"
  11. NodeTypeParallel NodeType = "parallel"
  12. NodeTypeJoin NodeType = "join"
  13. NodeTypeTrigger NodeType = "trigger"
  14. )
  15. // NodeStatus represents the status of a workflow node
  16. type NodeStatus string
  17. const (
  18. NodeStatusPending NodeStatus = "pending"
  19. NodeStatusReady NodeStatus = "ready"
  20. NodeStatusRunning NodeStatus = "running"
  21. NodeStatusCompleted NodeStatus = "completed"
  22. NodeStatusSkipped NodeStatus = "skipped"
  23. NodeStatusFailed NodeStatus = "failed"
  24. )
  25. // WorkflowNode represents a node in the workflow DAG
  26. type WorkflowNode struct {
  27. Key string `json:"key"`
  28. Type NodeType `json:"type"`
  29. Title string `json:"title"`
  30. Content string `json:"content"`
  31. Assignee *string `json:"assignee,omitempty"`
  32. DependsOn []string `json:"dependsOn,omitempty"`
  33. Config string `json:"config,omitempty"` // JSON config for task nodes
  34. }
  35. // WorkflowDefinition represents the complete workflow DAG definition
  36. type WorkflowDefinition struct {
  37. Nodes map[string]*WorkflowNode `json:"nodes"`
  38. }
  39. // ParseWorkflowDefinition parses a JSON string into a WorkflowDefinition
  40. func ParseWorkflowDefinition(jsonDef string) (*WorkflowDefinition, error) {
  41. var def WorkflowDefinition
  42. if err := json.Unmarshal([]byte(jsonDef), &def); err != nil {
  43. return nil, fmt.Errorf("failed to parse workflow definition: %w", err)
  44. }
  45. return &def, nil
  46. }
  47. // Validate validates the workflow definition
  48. func (d *WorkflowDefinition) Validate() error {
  49. if d.Nodes == nil || len(d.Nodes) == 0 {
  50. return fmt.Errorf("workflow definition must have at least one node")
  51. }
  52. // Validate each node
  53. for key, node := range d.Nodes {
  54. if node.Type == "" {
  55. return fmt.Errorf("node '%s' must have a type", key)
  56. }
  57. // Validate dependsOn references exist
  58. for _, depKey := range node.DependsOn {
  59. if _, exists := d.Nodes[depKey]; !exists {
  60. return fmt.Errorf("node '%s' depends on non-existent node '%s'", key, depKey)
  61. }
  62. }
  63. // Validate node type
  64. switch node.Type {
  65. case NodeTypeTask, NodeTypeCondition, NodeTypeParallel, NodeTypeJoin, NodeTypeTrigger:
  66. // Valid types
  67. default:
  68. return fmt.Errorf("node '%s' has invalid type: %s", key, node.Type)
  69. }
  70. }
  71. // Check for cycles in the DAG
  72. if err := d.validateNoCycles(); err != nil {
  73. return err
  74. }
  75. return nil
  76. }
  77. // validateNoCycles checks for cycles in the workflow DAG using DFS
  78. func (d *WorkflowDefinition) validateNoCycles() error {
  79. visited := make(map[string]bool)
  80. recStack := make(map[string]bool)
  81. var dfs func(string) error
  82. dfs = func(nodeKey string) error {
  83. visited[nodeKey] = true
  84. recStack[nodeKey] = true
  85. node := d.Nodes[nodeKey]
  86. for _, depKey := range node.DependsOn {
  87. if !visited[depKey] {
  88. if err := dfs(depKey); err != nil {
  89. return err
  90. }
  91. } else if recStack[depKey] {
  92. return fmt.Errorf("cycle detected: node '%s' creates a dependency loop", depKey)
  93. }
  94. }
  95. recStack[nodeKey] = false
  96. return nil
  97. }
  98. // Run DFS from each node
  99. for key := range d.Nodes {
  100. if !visited[key] {
  101. if err := dfs(key); err != nil {
  102. return err
  103. }
  104. }
  105. }
  106. return nil
  107. }
  108. // GetRootNodes returns nodes that have no dependencies (starting points)
  109. func (d *WorkflowDefinition) GetRootNodes() []string {
  110. var roots []string
  111. for key, node := range d.Nodes {
  112. if len(node.DependsOn) == 0 {
  113. roots = append(roots, key)
  114. }
  115. }
  116. return roots
  117. }
  118. // GetDependentNodes returns nodes that depend on the given node
  119. func (d *WorkflowDefinition) GetDependentNodes(nodeKey string) []string {
  120. var dependents []string
  121. for key, node := range d.Nodes {
  122. for _, dep := range node.DependsOn {
  123. if dep == nodeKey {
  124. dependents = append(dependents, key)
  125. break
  126. }
  127. }
  128. }
  129. return dependents
  130. }
  131. // GetReadyNodes returns nodes that are ready to execute (all dependencies satisfied)
  132. func (d *WorkflowDefinition) GetReadyNodes(executedNodes map[string]bool) []string {
  133. var ready []string
  134. for key, node := range d.Nodes {
  135. if executedNodes[key] {
  136. continue // Already executed
  137. }
  138. allDepsSatisfied := true
  139. for _, depKey := range node.DependsOn {
  140. if !executedNodes[depKey] {
  141. allDepsSatisfied = false
  142. break
  143. }
  144. }
  145. if allDepsSatisfied {
  146. ready = append(ready, key)
  147. }
  148. }
  149. return ready
  150. }