-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathstep_instance.go
144 lines (119 loc) · 3.46 KB
/
step_instance.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package asyncjob
import (
"context"
"fmt"
"time"
"github.com/Azure/go-asyncjob/graph"
"github.com/Azure/go-asynctask"
)
// StepState is the string-typed state reported by a step instance.
type StepState string

// States a step instance can be in. newStepInstance starts every instance in
// StepStatePending.
const (
	StepStatePending   StepState = "pending"
	StepStateRunning   StepState = "running"
	StepStateFailed    StepState = "failed"
	StepStateCompleted StepState = "completed"
)
// StepInstanceMeta is the interface for a step instance.
// It carries no type parameter, so step instances of any StepInstance[T]
// can be handled through it.
type StepInstanceMeta interface {
// GetName returns the name of the step.
GetName() string
// ExecutionData returns the step's execution data.
ExecutionData() *StepExecutionData
// GetState returns the step's current StepState.
GetState() StepState
// GetJobInstance returns the job instance the step belongs to.
GetJobInstance() JobInstanceMeta
// GetStepDefinition returns the definition of the step.
GetStepDefinition() StepDefinitionMeta
// Waitable returns the step's task as an asynctask.Waitable.
Waitable() asynctask.Waitable
// DotSpec returns the graph node spec describing the step.
DotSpec() *graph.DotNodeSpec
}
// StepInstance is the instance of a step, within a job instance.
// T is the type parameter shared by the step's definition and its task.
type StepInstance[T any] struct {
// Definition is the step definition this instance was created from.
Definition *StepDefinition[T]
// JobInstance is the job instance this step instance belongs to.
JobInstance JobInstanceMeta
// task is what Waitable returns; newStepInstance leaves it unset.
task *asynctask.Task[T]
// state is the current StepState; newStepInstance initializes it to StepStatePending.
state StepState
// executionData supplies the StartTime and Duration shown in DotSpec's tooltip.
executionData *StepExecutionData
}
// newStepInstance creates the StepInstance of stepDefinition for jobInstance.
// The returned instance is in StepStatePending and carries a fresh, empty
// StepExecutionData; its task is not set here.
func newStepInstance[T any](stepDefinition *StepDefinition[T], jobInstance JobInstanceMeta) *StepInstance[T] {
	instance := new(StepInstance[T])
	instance.Definition = stepDefinition
	instance.JobInstance = jobInstance
	instance.state = StepStatePending
	instance.executionData = &StepExecutionData{}
	return instance
}
// GetJobInstance returns the job instance this step instance belongs to.
func (si *StepInstance[T]) GetJobInstance() JobInstanceMeta {
return si.JobInstance
}
// GetStepDefinition returns the step's Definition as a StepDefinitionMeta.
func (si *StepInstance[T]) GetStepDefinition() StepDefinitionMeta {
return si.Definition
}
// Waitable returns the step's task as an asynctask.Waitable.
//
// NOTE(review): newStepInstance does not set task. If it is still nil when this
// is called, the returned interface wraps a nil *asynctask.Task[T] and therefore
// does not compare equal to nil — confirm callers only use this after the task
// has been assigned.
func (si *StepInstance[T]) Waitable() asynctask.Waitable {
return si.task
}
// GetName returns the name reported by the step's definition.
func (si *StepInstance[T]) GetName() string {
return si.Definition.GetName()
}
// GetState returns the step instance's current StepState.
func (si *StepInstance[T]) GetState() StepState {
return si.state
}
// EnrichContext runs ctx through the ContextPolicy configured in the step
// definition's execution options and returns the context the policy produces.
// When no policy is configured, ctx is returned unchanged. If the policy
// panics, the panic is recovered and printed to stdout, and ctx is returned.
func (si *StepInstance[T]) EnrichContext(ctx context.Context) (result context.Context) {
	policy := si.Definition.executionOptions.ContextPolicy
	if policy == nil {
		return ctx
	}

	// result is a named return so the recover path below still hands the
	// caller the original ctx when the policy panics.
	result = ctx
	// TODO: bubble up the error somehow
	defer func() {
		if recovered := recover(); recovered != nil {
			fmt.Println("Recovered in EnrichContext", recovered)
		}
	}()
	return policy(ctx, si)
}
// ExecutionData returns the step instance's execution data. The pointer held by
// the instance is returned directly, not a copy.
func (si *StepInstance[T]) ExecutionData() *StepExecutionData {
return si.executionData
}
// DotSpec builds the graph node spec for this step instance.
//
// The node is a filled hexagon, or a triangle for a root step. The fill color
// follows the state: yellow while running, green when completed, red when
// failed, and gray otherwise. Once the step is no longer pending and has
// execution data, the tooltip reports its state, start time and duration.
func (si *StepInstance[T]) DotSpec() *graph.DotNodeSpec {
	nodeShape := "hexagon"
	if si.Definition.stepType == stepTypeRoot {
		nodeShape = "triangle"
	}

	var fillColor string
	switch si.state {
	case StepStateRunning:
		fillColor = "yellow"
	case StepStateCompleted:
		fillColor = "green"
	case StepStateFailed:
		fillColor = "red"
	default:
		// Pending, and any state not listed above.
		fillColor = "gray"
	}

	var hover string
	if started := si.state != StepStatePending; started && si.executionData != nil {
		data := si.executionData
		// "\\n" emits a literal backslash-n into the tooltip text.
		hover = fmt.Sprintf("State: %s\\nStartAt: %s\\nDuration: %s", si.state, data.StartTime.Format(time.RFC3339Nano), data.Duration)
	}

	stepName := si.GetName()
	return &graph.DotNodeSpec{
		Name:        stepName,
		DisplayName: stepName,
		Shape:       nodeShape,
		Style:       "filled",
		FillColor:   fillColor,
		Tooltip:     hover,
	}
}
// connectStepInstance builds the graph edge spec for the edge from stepFrom to
// stepTo.
//
// The edge is bold and black by default. Its color follows the state of
// stepFrom: green when completed, red when failed. When stepTo is no longer
// pending and has execution data, the tooltip shows stepTo's start time.
func connectStepInstance(stepFrom, stepTo StepInstanceMeta) *graph.DotEdgeSpec {
	edgeSpec := &graph.DotEdgeSpec{
		FromNodeName: stepFrom.GetName(),
		ToNodeName:   stepTo.GetName(),
		Color:        "black",
		Style:        "bold",
	}

	// Add the start-time tooltip once stepTo has started. ExecutionData is
	// nil-checked, as DotSpec does, so an implementation that returns nil
	// cannot cause a nil-pointer panic here.
	if stepTo.GetState() != StepStatePending {
		if executionData := stepTo.ExecutionData(); executionData != nil {
			edgeSpec.Tooltip = fmt.Sprintf("Time: %s", executionData.StartTime.Format(time.RFC3339Nano))
		}
	}

	// The edge color reflects the outcome of the source step only.
	switch stepFrom.GetState() {
	case StepStateCompleted:
		edgeSpec.Color = "green"
	case StepStateFailed:
		edgeSpec.Color = "red"
	}
	return edgeSpec
}