-
Notifications
You must be signed in to change notification settings - Fork 9
/
preinit.go
99 lines (92 loc) · 2.18 KB
/
preinit.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package kubejob
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
)
// PreInit registers a container and a callback to be processed before the
// init containers specified for the Job are run.
// The main use case is calling `(*JobExecutor).CopyToFile` from the callback
// to place arbitrary files into the pod ahead of the init processing.
// Calling PreInit again replaces the previously registered container and callback.
func (j *Job) PreInit(c corev1.Container, cb func(ctx context.Context, exec *JobExecutor) error) {
	pre := new(preInit)
	pre.container = c
	pre.callback = cb
	j.preInit = pre
}
// setupPreInitContainer prepares the container and the executor for the
// pre-init step registered through PreInit. It does nothing and returns nil
// when no pre-init step has been registered.
//
// When an agent configuration is set and enabled for the container name, a
// port is allocated for the agent and the agent's public key environment
// variable is added to the container. The stored container is then replaced
// with the result of jobTemplateCommandContainer, while the executor is built
// from the container as it was before that call (including its Command and Args).
//
// The only error returned is the one produced by the port allocation.
func (j *Job) setupPreInitContainer() error {
	if j.preInit == nil {
		return nil
	}
	c := j.preInit.container
	var agentPort uint16
	if j.agentCfg != nil && j.agentCfg.Enabled(c.Name) {
		port, err := j.agentCfg.NewAllocatedPort()
		if err != nil {
			return err
		}
		agentPort = port
		// c is a struct copy, but c.Env still shares its backing array with
		// the container passed to PreInit. Capping the capacity at the
		// current length forces append to allocate a new array, so the
		// caller-owned slice is never written to.
		c.Env = append(c.Env[:len(c.Env):len(c.Env)], j.agentCfg.PublicKeyEnv())
	}
	j.preInit.container = jobTemplateCommandContainer(c, j.agentCfg, agentPort)
	j.preInit.exec = &JobExecutor{
		Container: c,
		command:   c.Command,
		args:      c.Args,
		job:       j,
		agentCfg:  j.agentCfg,
		agentPort: agentPort,
	}
	return nil
}
// preInit holds the state of the pre-init step registered through (*Job).PreInit.
type preInit struct {
// exec is the executor passed to callback; it is assigned by setupPreInitContainer.
exec *JobExecutor
// container is the container used for the pre-init step. Its Name is used to
// find the matching init container status and to filter container logs.
container corev1.Container
// callback is the user supplied function invoked by run; it may be nil.
callback func(context.Context, *JobExecutor) error
// done is set to true by run once callback has completed without error,
// after which needsToRun reports false and run returns immediately.
done bool
}
// needsToRun reports whether the pre-init callback should be executed for a
// pod in the given status. It is true only when all of the following hold:
// the receiver is non-nil, the step has not completed yet, the pod phase is
// Pending, and an init container status with the pre-init container's name
// is in the Running state.
func (i *preInit) needsToRun(status corev1.PodStatus) bool {
	if i == nil || i.done || status.Phase != corev1.PodPending {
		return false
	}
	for idx := range status.InitContainerStatuses {
		cs := &status.InitContainerStatuses[idx]
		if cs.Name == i.container.Name && cs.State.Running != nil {
			return true
		}
	}
	return false
}
// run executes the pre-init step against pod.
//
// It returns nil immediately if the step has already completed. Otherwise it
// binds pod to the executor, wraps the job's container logger so that logs
// from the pre-init container are dropped while all others are forwarded to
// the previous logger (if any), invokes the callback when one is set, marks
// the step as done and finally stops the executor.
//
// A failure to bind the pod is returned wrapped with context, a callback
// failure is returned through errPreInit (in which case the step is not
// marked done and the executor is not stopped), and an error from stopping
// the executor is returned as is.
func (i *preInit) run(ctx context.Context, pod *corev1.Pod) error {
	if i.done {
		return nil
	}
	executor := i.exec
	if err := executor.setPod(pod); err != nil {
		return fmt.Errorf("job: failed to set corev1.Pod to preinit executor instance: %w", err)
	}

	// Replace the job's logger with a filter that hides the pre-init container.
	job := executor.job
	prev := job.containerLogger
	job.containerLogger = func(log *ContainerLog) {
		if log.Container.Name != i.container.Name && prev != nil {
			prev(log)
		}
	}

	if cb := i.callback; cb != nil {
		if err := cb(ctx, executor); err != nil {
			return errPreInit(err)
		}
	}
	i.done = true
	if stopErr := executor.Stop(); stopErr != nil {
		return stopErr
	}
	return nil
}