Skip to content

Commit 216b53d

Browse files
author
liuchangle6
committed
feat: add skip option
1 parent 09ba4f7 commit 216b53d

File tree

3 files changed

+706
-37
lines changed

3 files changed

+706
-37
lines changed

workflow/engine.go

Lines changed: 72 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ type WorkflowEngine struct {
2424

2525
// Repository workflow storage Repository
2626
Repository WorkflowRepository
27+
28+
// Skip succeeded nodes flag for retry functionality
29+
skipSucceededNodes bool
2730
}
2831

2932
type Result struct {
@@ -36,7 +39,29 @@ type NodeExecutor interface {
3639
ExecuteWorkflowNode(ctx context.Context, data NodeData) Result
3740
}
3841

39-
func NewEngine(ctx context.Context, wf *Workflow, w *WorkflowController, repository WorkflowRepository) (*WorkflowEngine, error) {
42+
// EngineOption defines a function type for configuring WorkflowEngine
43+
type EngineOption func(*WorkflowEngine)
44+
45+
// WithSkipSucceededNodes sets the skipSucceededNodes option
46+
func WithSkipSucceededNodes(skip bool) EngineOption {
47+
return func(engine *WorkflowEngine) {
48+
engine.skipSucceededNodes = skip
49+
}
50+
}
51+
52+
// NewEngine creates a new WorkflowEngine with optional configuration
53+
func NewEngine(ctx context.Context, wf *Workflow, w *WorkflowController, repository WorkflowRepository, options ...EngineOption) (*WorkflowEngine, error) {
54+
// Validate required parameters
55+
if wf == nil {
56+
return nil, fmt.Errorf("workflow cannot be nil")
57+
}
58+
if w == nil {
59+
return nil, fmt.Errorf("workflow controller cannot be nil")
60+
}
61+
if repository == nil {
62+
return nil, fmt.Errorf("repository cannot be nil")
63+
}
64+
4065
engine := &WorkflowEngine{
4166
ctx: ctx,
4267
wf: wf,
@@ -46,6 +71,11 @@ func NewEngine(ctx context.Context, wf *Workflow, w *WorkflowController, reposit
4671
Repository: repository,
4772
}
4873

74+
// Apply all options
75+
for _, option := range options {
76+
option(engine)
77+
}
78+
4979
return engine, engine.initializeDAGFromWorkflow()
5080
}
5181

@@ -63,7 +93,7 @@ func (oc *WorkflowEngine) registerNodeTypeExecutor(nodeType NodeType, executor N
6393
}
6494

6595
oc.executorMap[nodeType] = executor
66-
oc.woc.logger.Info("[WorkflowEngine] Registered executor for node type", "nodeType", nodeType)
96+
oc.woc.logger.Info("registered executor for node type", "nodeType", nodeType)
6797
return nil
6898
}
6999

@@ -97,7 +127,7 @@ func (oc *WorkflowEngine) AddWorkflowNode(nodeID, nodeName string, nodeType Node
97127
}
98128
oc.wf.Status.Nodes.Set(nodeID, nodeStatus)
99129

100-
oc.woc.logger.Info("[WorkflowEngine] Added workflow node", "nodeID", nodeID, "nodeType", nodeType)
130+
oc.woc.logger.Info("added workflow node", "nodeID", nodeID, "nodeType", nodeType)
101131
return nil
102132
}
103133

@@ -138,7 +168,7 @@ func (oc *WorkflowEngine) AddWorkflowDependency(fromNodeID, toNodeID string) err
138168
fromNode.Children = append(fromNode.Children, toNodeID)
139169
oc.wf.Status.Nodes.Set(fromNodeID, *fromNode)
140170

141-
oc.woc.logger.Info("[WorkflowEngine] Added dependency", "from", fromNodeID, "to", toNodeID)
171+
oc.woc.logger.Info("added dependency", "from", fromNodeID, "to", toNodeID)
142172
return nil
143173
}
144174

@@ -182,9 +212,11 @@ func (oc *WorkflowEngine) RemoveDependency(fromNodeID, toNodeID string) error {
182212
return nil
183213
}
184214

185-
// ExecuteWorkflow executes the entire workflow
215+
// ExecuteWorkflow executes the entire workflow using the engine's configured options
186216
func (oc *WorkflowEngine) ExecuteWorkflow(ctx context.Context) error {
187-
oc.woc.logger.Info("[WorkflowEngine] Starting workflow execution", "workflowName", oc.wf.Metadata.Name)
217+
oc.woc.logger.Info("starting workflow execution",
218+
"workflowName", oc.wf.Metadata.Name,
219+
"skipSucceeded", oc.skipSucceededNodes)
188220

189221
// Validate DAG
190222
if err := oc.dagExecutor.ValidateDAG(); err != nil {
@@ -194,20 +226,19 @@ func (oc *WorkflowEngine) ExecuteWorkflow(ctx context.Context) error {
194226
// Update workflow status to running
195227
oc.wf.Status.Phase = Running
196228

197-
// ExecuteWorkflowNode DAG using the new DAG execution method
198-
{
199-
err := oc.executeDAG(ctx)
200-
if err != nil {
201-
oc.woc.logger.Error(err, "[WorkflowEngine] Failed to execute DAG")
202-
}
229+
// Execute DAG
230+
err := oc.executeDAG(ctx)
231+
if err != nil {
232+
oc.woc.logger.Error(err, "failed to execute DAG")
203233
}
234+
204235
// Check if all nodes completed successfully
205236
if oc.isWorkflowCompleted() {
206237
oc.wf.Status.Phase = Succeeded
207-
oc.wf.Status.Message = "[WorkflowEngine] Workflow execute succeeded"
238+
oc.wf.Status.Message = "workflow execute succeeded"
208239
} else {
209240
oc.wf.Status.Phase = Failed
210-
oc.wf.Status.Message = "[WorkflowEngine] Workflow execute failed"
241+
oc.wf.Status.Message = "workflow execute failed"
211242
}
212243

213244
// Update record
@@ -216,16 +247,16 @@ func (oc *WorkflowEngine) ExecuteWorkflow(ctx context.Context) error {
216247

217248
// executeDAG executes the DAG with DFS-based parallel execution
218249
func (oc *WorkflowEngine) executeDAG(ctx context.Context) error {
219-
oc.woc.logger.Info("[WorkflowEngine] Starting DFS-based DAG execution")
250+
oc.woc.logger.Info("starting DFS-based DAG execution")
220251

221252
// Get root nodes (nodes with no dependencies)
222253
rootNodes := oc.dagExecutor.GetRootNodes()
223254
if len(rootNodes) == 0 {
224-
oc.woc.logger.Info("[WorkflowEngine] No root nodes found")
255+
oc.woc.logger.Info("no root nodes found")
225256
return nil
226257
}
227258

228-
oc.woc.logger.Info("[WorkflowEngine] Found root nodes", "count", len(rootNodes))
259+
oc.woc.logger.Info("found root nodes", "count", len(rootNodes))
229260

230261
// ExecuteWorkflowNode from all root nodes in parallel
231262
return oc.executeRootNodes(ctx, rootNodes)
@@ -274,18 +305,18 @@ func (oc *WorkflowEngine) executeDFSFromNode(ctx context.Context, node *Workflow
274305
// Get dependent nodes (children)
275306
dependents := oc.dagExecutor.GetDependents(node.ID)
276307
if len(dependents) == 0 {
277-
oc.woc.logger.Info("[WorkflowEngine] Node completed (leaf node)", "nodeID", node.ID)
308+
oc.woc.logger.Info("node completed (leaf node)", "nodeID", node.ID)
278309
return nil
279310
}
280311

281312
// Find ready dependents (all dependencies completed)
282313
readyDependents := oc.getReadyDependents(dependents)
283314
if len(readyDependents) == 0 {
284-
oc.woc.logger.Info("[WorkflowEngine] Node completed (no ready dependents)", "nodeID", node.ID)
315+
oc.woc.logger.Info("nNode completed (no ready dependents)", "nodeID", node.ID)
285316
return nil
286317
}
287318

288-
oc.woc.logger.Info("[WorkflowEngine] Executing dependents", "nodeID", node.ID, "dependentCount", len(readyDependents))
319+
oc.woc.logger.Info("executing dependents", "nodeID", node.ID, "dependentCount", len(readyDependents))
289320

290321
// ExecuteWorkflowNode ready dependents
291322
return oc.executeReadyDependents(ctx, readyDependents)
@@ -352,7 +383,7 @@ func (oc *WorkflowEngine) areAllDependenciesCompleted(node *WorkflowNode) bool {
352383
return true
353384
}
354385

355-
// executeNode executes a single node
386+
// executeNode executes a single node with optional skip logic
356387
func (oc *WorkflowEngine) executeNode(ctx context.Context, nodeID string) error {
357388
oc.mu.Lock()
358389
node, err := oc.wf.Status.Nodes.Get(nodeID)
@@ -361,6 +392,13 @@ func (oc *WorkflowEngine) executeNode(ctx context.Context, nodeID string) error
361392
return fmt.Errorf("node %s not found: %v", nodeID, err)
362393
}
363394

395+
// skip nodes
396+
if (oc.skipSucceededNodes && node.Phase == NodeSucceeded) || node.Phase == NodeSkipped {
397+
oc.mu.Unlock()
398+
oc.woc.logger.Info("skipping already completed node", "nodeID", nodeID, "phase", node.Phase)
399+
return nil
400+
}
401+
364402
nodeType := node.Type
365403
executor, exists := oc.executorMap[nodeType]
366404
if !exists {
@@ -369,12 +407,13 @@ func (oc *WorkflowEngine) executeNode(ctx context.Context, nodeID string) error
369407
}
370408
oc.mu.Unlock()
371409

372-
oc.woc.logger.Info("[WorkflowEngine] Executing node", "nodeID", nodeID, "nodeType", nodeType)
410+
oc.woc.logger.Info("executing node", "nodeID", nodeID, "nodeType", nodeType)
373411

374412
now := metav1.Now()
375413
// Update node status to running
376414
if err = oc.updateNodeStatus(nodeID, Params{
377-
Phase: NodeRunning,
415+
Phase: NodeRunning,
416+
StartedAt: &now,
378417
}); err != nil {
379418
return err
380419
}
@@ -395,7 +434,7 @@ func (oc *WorkflowEngine) executeNode(ctx context.Context, nodeID string) error
395434
}); err != nil {
396435
return err
397436
}
398-
oc.woc.logger.Info("[WorkflowEngine] Node executed successfully", "nodeID", nodeID)
437+
oc.woc.logger.Info("node executed", "nodeID", nodeID, "phase", phase)
399438

400439
return nil
401440
}
@@ -453,7 +492,13 @@ func (oc *WorkflowEngine) isWorkflowCompleted() bool {
453492

454493
// initializeDAGFromWorkflow initializes DAG from existing workflow
455494
func (oc *WorkflowEngine) initializeDAGFromWorkflow() error {
495+
if oc.wf == nil {
496+
return nil
497+
}
498+
499+
// Initialize Nodes map if it's nil
456500
if oc.wf.Status.Nodes == nil {
501+
oc.wf.Status.Nodes = make(map[string]NodeStatus)
457502
return nil
458503
}
459504
var err error
@@ -498,10 +543,9 @@ func (oc *WorkflowEngine) RegisterFunc(key NodeType, f NodeExecutor) error {
498543
return oc.registerNodeTypeExecutor(key, f)
499544
}
500545

546+
// operator executes the workflow (used by WorkflowController)
501547
func (oc *WorkflowEngine) operator(ctx context.Context) {
502-
err := oc.ExecuteWorkflow(ctx)
503-
if err != nil {
504-
oc.woc.logger.Error(err, "[WorkflowEngine] execute workflow failed")
505-
return
548+
if err := oc.ExecuteWorkflow(ctx); err != nil {
549+
oc.woc.logger.Error(err, "workflow execution failed")
506550
}
507551
}

0 commit comments

Comments
 (0)