| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- package workflow
import (
	"encoding/json"
	"fmt"
	"sort"
)
// NodeType names the kind of a workflow node. It is a plain string type, so
// the values below are what appears in a node's "type" field.
type NodeType string

// The node kinds accepted by WorkflowDefinition.Validate. Any other value
// (including the empty string) is rejected there.
const (
	// NodeTypeTask is the "task" kind; WorkflowNode.Config carries the JSON
	// config for nodes of this kind.
	NodeTypeTask NodeType = "task"
	// NodeTypeCondition is the "condition" kind.
	NodeTypeCondition NodeType = "condition"
	// NodeTypeParallel is the "parallel" kind.
	NodeTypeParallel NodeType = "parallel"
	// NodeTypeJoin is the "join" kind.
	NodeTypeJoin NodeType = "join"
	// NodeTypeTrigger is the "trigger" kind.
	NodeTypeTrigger NodeType = "trigger"
)
// NodeStatus names the execution state of a workflow node. It is a plain
// string type whose values are the literals listed below.
//
// NOTE(review): nothing in the visible part of this file reads or assigns
// these values; the allowed transitions between them are defined elsewhere.
type NodeStatus string

// The node states known to this package.
const (
	// NodeStatusPending is the "pending" state.
	NodeStatusPending NodeStatus = "pending"
	// NodeStatusReady is the "ready" state.
	NodeStatusReady NodeStatus = "ready"
	// NodeStatusRunning is the "running" state.
	NodeStatusRunning NodeStatus = "running"
	// NodeStatusCompleted is the "completed" state.
	NodeStatusCompleted NodeStatus = "completed"
	// NodeStatusSkipped is the "skipped" state.
	NodeStatusSkipped NodeStatus = "skipped"
	// NodeStatusFailed is the "failed" state.
	NodeStatusFailed NodeStatus = "failed"
)
- // WorkflowNode represents a node in the workflow DAG
- type WorkflowNode struct {
- Key string `json:"key"`
- Type NodeType `json:"type"`
- Title string `json:"title"`
- Content string `json:"content"`
- Assignee *string `json:"assignee,omitempty"`
- DependsOn []string `json:"dependsOn,omitempty"`
- Config string `json:"config,omitempty"` // JSON config for task nodes
- }
- // WorkflowDefinition represents the complete workflow DAG definition
- type WorkflowDefinition struct {
- Nodes map[string]*WorkflowNode `json:"nodes"`
- }
- // ParseWorkflowDefinition parses a JSON string into a WorkflowDefinition
- func ParseWorkflowDefinition(jsonDef string) (*WorkflowDefinition, error) {
- var def WorkflowDefinition
- if err := json.Unmarshal([]byte(jsonDef), &def); err != nil {
- return nil, fmt.Errorf("failed to parse workflow definition: %w", err)
- }
- return &def, nil
- }
- // Validate validates the workflow definition
- func (d *WorkflowDefinition) Validate() error {
- if d.Nodes == nil || len(d.Nodes) == 0 {
- return fmt.Errorf("workflow definition must have at least one node")
- }
- // Validate each node
- for key, node := range d.Nodes {
- if node.Type == "" {
- return fmt.Errorf("node '%s' must have a type", key)
- }
- // Validate dependsOn references exist
- for _, depKey := range node.DependsOn {
- if _, exists := d.Nodes[depKey]; !exists {
- return fmt.Errorf("node '%s' depends on non-existent node '%s'", key, depKey)
- }
- }
- // Validate node type
- switch node.Type {
- case NodeTypeTask, NodeTypeCondition, NodeTypeParallel, NodeTypeJoin, NodeTypeTrigger:
- // Valid types
- default:
- return fmt.Errorf("node '%s' has invalid type: %s", key, node.Type)
- }
- }
- // Check for cycles in the DAG
- if err := d.validateNoCycles(); err != nil {
- return err
- }
- return nil
- }
- // validateNoCycles checks for cycles in the workflow DAG using DFS
- func (d *WorkflowDefinition) validateNoCycles() error {
- visited := make(map[string]bool)
- recStack := make(map[string]bool)
- var dfs func(string) error
- dfs = func(nodeKey string) error {
- visited[nodeKey] = true
- recStack[nodeKey] = true
- node := d.Nodes[nodeKey]
- for _, depKey := range node.DependsOn {
- if !visited[depKey] {
- if err := dfs(depKey); err != nil {
- return err
- }
- } else if recStack[depKey] {
- return fmt.Errorf("cycle detected: node '%s' creates a dependency loop", depKey)
- }
- }
- recStack[nodeKey] = false
- return nil
- }
- // Run DFS from each node
- for key := range d.Nodes {
- if !visited[key] {
- if err := dfs(key); err != nil {
- return err
- }
- }
- }
- return nil
- }
- // GetRootNodes returns nodes that have no dependencies (starting points)
- func (d *WorkflowDefinition) GetRootNodes() []string {
- var roots []string
- for key, node := range d.Nodes {
- if len(node.DependsOn) == 0 {
- roots = append(roots, key)
- }
- }
- return roots
- }
- // GetDependentNodes returns nodes that depend on the given node
- func (d *WorkflowDefinition) GetDependentNodes(nodeKey string) []string {
- var dependents []string
- for key, node := range d.Nodes {
- for _, dep := range node.DependsOn {
- if dep == nodeKey {
- dependents = append(dependents, key)
- break
- }
- }
- }
- return dependents
- }
- // GetReadyNodes returns nodes that are ready to execute (all dependencies satisfied)
- func (d *WorkflowDefinition) GetReadyNodes(executedNodes map[string]bool) []string {
- var ready []string
- for key, node := range d.Nodes {
- if executedNodes[key] {
- continue // Already executed
- }
- allDepsSatisfied := true
- for _, depKey := range node.DependsOn {
- if !executedNodes[depKey] {
- allDepsSatisfied = false
- break
- }
- }
- if allDepsSatisfied {
- ready = append(ready, key)
- }
- }
- return ready
- }
|