Skip to content

Commit a6ff29b

Browse files
fix: handle deleted jobs in scheduler BC (#712)
* fix: handle delered jobs in scheduler BC * fix: handle delered jobs in scheduler BC * fix: add better error mesages in airflow.go
1 parent a72d635 commit a6ff29b

File tree

2 files changed

+8
-6
lines changed

2 files changed

+8
-6
lines changed

ext/scheduler/airflow/airflow.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,14 @@ func (s *Scheduler) DeployJobs(ctx context.Context, tenant tenant.Tenant, jobs [
8383

8484
bucket, err := s.bucketFac.New(spanCtx, tenant)
8585
if err != nil {
86-
return err
86+
return errors.AddErrContext(err, EntityAirflow, "error in creating storage client instance")
8787
}
8888
defer bucket.Close()
8989

90-
bucket.WriteAll(spanCtx, filepath.Join(jobsDir, baseLibFileName), SharedLib, nil)
91-
90+
err = bucket.WriteAll(spanCtx, filepath.Join(jobsDir, baseLibFileName), SharedLib, nil)
91+
if err != nil {
92+
return errors.AddErrContext(err, EntityAirflow, "error in writing __lib.py file")
93+
}
9294
multiError := errors.NewMultiError("ErrorsInDeployJobs")
9395
runner := parallel.NewRunner(parallel.WithTicket(concurrentTicketPerSec), parallel.WithLimit(concurrentLimit))
9496
for _, job := range jobs {

internal/store/postgres/scheduler/job_repository.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ func FromRow(row pgx.Row) (*Job, error) {
241241
}
242242

243243
func (j *JobRepository) GetJob(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName) (*scheduler.Job, error) {
244-
getJobByNameAtProject := `SELECT ` + jobColumns + ` FROM job WHERE name = $1 AND project_name = $2`
244+
getJobByNameAtProject := `SELECT ` + jobColumns + ` FROM job WHERE name = $1 AND project_name = $2 AND deleted_at IS NULL`
245245
spec, err := FromRow(j.db.QueryRow(ctx, getJobByNameAtProject, jobName, projectName))
246246
if err != nil {
247247
return nil, err
@@ -250,7 +250,7 @@ func (j *JobRepository) GetJob(ctx context.Context, projectName tenant.ProjectNa
250250
}
251251

252252
func (j *JobRepository) GetJobDetails(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName) (*scheduler.JobWithDetails, error) {
253-
getJobByNameAtProject := `SELECT ` + jobColumns + ` FROM job WHERE name = $1 AND project_name = $2`
253+
getJobByNameAtProject := `SELECT ` + jobColumns + ` FROM job WHERE name = $1 AND project_name = $2 AND deleted_at IS NULL`
254254
spec, err := FromRow(j.db.QueryRow(ctx, getJobByNameAtProject, jobName, projectName))
255255
if err != nil {
256256
return nil, err
@@ -305,7 +305,7 @@ func (j *JobRepository) getJobsUpstreams(ctx context.Context, projectName tenant
305305
}
306306

307307
func (j *JobRepository) GetAll(ctx context.Context, projectName tenant.ProjectName) ([]*scheduler.JobWithDetails, error) {
308-
getJobByNameAtProject := `SELECT ` + jobColumns + ` FROM job WHERE project_name = $1`
308+
getJobByNameAtProject := `SELECT ` + jobColumns + ` FROM job WHERE project_name = $1 AND deleted_at IS NULL`
309309
rows, err := j.db.Query(ctx, getJobByNameAtProject, projectName)
310310
if err != nil {
311311
return nil, errors.Wrap(job.EntityJob, "error while getting all jobs", err)

0 commit comments

Comments
 (0)