Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support pipeline timeout #320

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion gaia.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ const (

// RunReschedule status
RunReschedule PipelineRunStatus = "reschedule"


// RunTimeOut status
RunTimeOut PipelineRunStatus = "timeout"

// JobWaitingExec status
JobWaitingExec JobStatus = "waiting for execution"

Expand Down Expand Up @@ -230,6 +233,7 @@ type Pipeline struct {
Tags []string `json:"tags,omitempty"`
Docker bool `json:"docker"`
CronInst *cron.Cron `json:"-"`
TimeOut int `json:"timeout"`
}

// GitRepo represents a single git repository
Expand Down Expand Up @@ -262,6 +266,12 @@ type Argument struct {
Value string `json:"value,omitempty"`
}

// StartPipelineParam is the optional request body accepted when a pipeline
// is started. It carries the pipeline arguments together with a per-run
// timeout.
type StartPipelineParam struct {
// TimeOut is the pipeline timeout in minutes. Only values greater than
// zero are applied to the pipeline when it is started.
TimeOut int `json:"timeout,omitempty"`
// Arg holds the optional arguments which are handed over to the pipeline.
Arg []*Argument `json:"arg,omitempty"`
}

// CreatePipeline represents a pipeline which is not yet
// compiled.
type CreatePipeline struct {
Expand Down Expand Up @@ -296,6 +306,7 @@ type PipelineRun struct {
PipelineTags []string `json:"pipelinetags,omitempty"`
Docker bool `json:"docker,omitempty"`
DockerWorkerID string `json:"dockerworkerid,omitempty"`
TimeOut int `json:"timeout,omitempty"`
}

// Worker represents a single registered worker.
Expand Down
17 changes: 11 additions & 6 deletions providers/pipelines/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ func (pp *PipelineProvider) PipelineTriggerAuth(c echo.Context) error {
// @Produce json
// @Security ApiKeyAuth
// @Param pipelineid query string true "The ID of the pipeline."
// @Param args body gaia.Argument false "Optional arguments of the pipeline."
// @Param args body gaia.StartPipelineParam false "Optional arguments of the pipeline."
// @Success 200 {object} gaia.PipelineRun
// @Failure 400 {string} string "Various failures regarding starting the pipeline like: invalid id, invalid docker value and schedule errors"
// @Failure 404 {string} string "Pipeline not found"
Expand All @@ -559,8 +559,8 @@ func (pp *PipelineProvider) PipelineStart(c echo.Context) error {

// Look for arguments.
// We do not check for errors here because arguments are optional.
var args []*gaia.Argument
_ = c.Bind(&args)
var param gaia.StartPipelineParam
_ = c.Bind(&param)

// Convert string to int because id is int
pipelineID, err := strconv.Atoi(pipelineIDStr)
Expand All @@ -578,14 +578,19 @@ func (pp *PipelineProvider) PipelineStart(c echo.Context) error {
}

// Overwrite docker setting
for _, a := range args {
for _, a := range param.Arg {
if a.Key == "docker" {
foundPipeline.Docker = a.Value == "1"
}
}


// Override the pipeline's timeout when the request provides a positive one.
if param.TimeOut > 0 {
foundPipeline.TimeOut = param.TimeOut
}

if foundPipeline.Name != "" {
pipelineRun, err := pp.deps.Scheduler.SchedulePipeline(&foundPipeline, gaia.StartReasonManual, args)
pipelineRun, err := pp.deps.Scheduler.SchedulePipeline(&foundPipeline, gaia.StartReasonManual, param.Arg)
if err != nil {
return c.String(http.StatusBadRequest, err.Error())
} else if pipelineRun != nil {
Expand Down
47 changes: 45 additions & 2 deletions workers/scheduler/gaiascheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ func (s *Scheduler) SchedulePipeline(p *gaia.Pipeline, startedReason string, arg
PipelineTags: p.Tags,
Docker: p.Docker,
StartReason: startedReason,
TimeOut: p.TimeOut,
}

// Put run into store
Expand Down Expand Up @@ -579,11 +580,13 @@ func (s *Scheduler) executeScheduledJobs(r gaia.PipelineRun, pS plugin.Plugin) {
runFail = true
}
}

if runFail && r.Status != gaia.RunCancelled {
if runFail && r.Status != gaia.RunCancelled && r.Status != gaia.RunTimeOut {
s.finishPipelineRun(&r, gaia.RunFailed)
} else if r.Status == gaia.RunCancelled {
s.finishPipelineRun(&r, gaia.RunCancelled)
} else if r.Status == gaia.RunTimeOut {
s.finishPipelineRun(&r, gaia.RunTimeOut)
} else {
s.finishPipelineRun(&r, gaia.RunSuccess)
}
Expand Down Expand Up @@ -631,6 +634,30 @@ func (s *Scheduler) executeScheduler(r *gaia.PipelineRun, pS plugin.Plugin) {
}
}()

// timeOutChan signals that the pipeline run exceeded its configured
// timeout (r.TimeOut, in minutes) and has to be aborted.
timeOutChan := make(chan bool)
if r.TimeOut > 0 {
// Create a new timeOutTicker and a go routine which signals timeOutChan
// once the timeout has elapsed, or exits when pipelineFinished is closed.
timeOutTicker := time.NewTicker(time.Duration(r.TimeOut) * time.Minute)
go func() {
defer ticker.Stop()
for {
select {
case <-timeOutTicker.C:
timeOutChan <- true
close(timeOutChan)
return
case _, ok := <-pipelineFinished:
if !ok {
return
}
}
}
}()
}

// Separate channel to save updates about the status of job executions.
triggerSave := make(chan gaia.Job)

Expand All @@ -657,6 +684,22 @@ func (s *Scheduler) executeScheduler(r *gaia.PipelineRun, pS plugin.Plugin) {
return
}
}
case _, ok := <-timeOutChan:
if ok {
for _, job := range r.Jobs {
if job.Status == gaia.JobRunning || job.Status == gaia.JobWaitingExec {
job.Status = gaia.JobFailed
job.FailPipeline = true
}
}
r.Status = gaia.RunTimeOut
_ = s.storeService.PipelinePutRun(r)
close(done)
close(executeScheduler)
finished <- true
finalize = true
return
}
case <-finished:
close(pipelineFinished)
return
Expand Down