diff --git a/core/job/resolver/internal_upstream_resolver.go b/core/job/resolver/internal_upstream_resolver.go index 5769822ac7..b10aa29825 100644 --- a/core/job/resolver/internal_upstream_resolver.go +++ b/core/job/resolver/internal_upstream_resolver.go @@ -71,7 +71,7 @@ func (i internalUpstreamResolver) resolveInferredUpstream(ctx context.Context, s var internalUpstream []*job.Upstream me := errors.NewMultiError("resolve internal inferred upstream errors") for _, source := range sources { - jobUpstreams, err := i.jobRepository.GetAllByResourceDestination(ctx, source) + jobUpstreams, err := i.jobRepository.GetAllEnabledByResourceDestination(ctx, source) me.Append(err) if len(jobUpstreams) == 0 { continue @@ -91,7 +91,7 @@ func (i internalUpstreamResolver) resolveStaticUpstream(ctx context.Context, pro me.Append(err) continue } - jobUpstream, err := i.jobRepository.GetByJobName(ctx, projectName, upstreamJobName) + jobUpstream, err := i.jobRepository.GetEnabledJobByName(ctx, projectName, upstreamJobName) if err != nil || jobUpstream == nil { me.Append(err) continue diff --git a/core/job/resolver/internal_upstream_resolver_test.go b/core/job/resolver/internal_upstream_resolver_test.go index cf1016f26d..60c3450b80 100644 --- a/core/job/resolver/internal_upstream_resolver_test.go +++ b/core/job/resolver/internal_upstream_resolver_test.go @@ -67,9 +67,9 @@ func TestInternalUpstreamResolver(t *testing.T) { logWriter := new(mockWriter) defer logWriter.AssertExpectations(t) - jobRepo.On("GetAllByResourceDestination", ctx, jobASources[0]).Return([]*job.Job{jobB}, nil) - jobRepo.On("GetAllByResourceDestination", ctx, jobASources[1]).Return([]*job.Job{}, nil) - jobRepo.On("GetByJobName", ctx, sampleTenant.ProjectName(), specC.Name()).Return(jobC, nil) + jobRepo.On("GetAllEnabledByResourceDestination", ctx, jobASources[0]).Return([]*job.Job{jobB}, nil) + jobRepo.On("GetAllEnabledByResourceDestination", ctx, jobASources[1]).Return([]*job.Job{}, nil) + jobRepo.On("GetEnabledJobByName", ctx, sampleTenant.ProjectName(), specC.Name()).Return(jobC, nil) jobWithUnresolvedUpstream := job.NewWithUpstream(jobA, []*job.Upstream{unresolvedUpstreamB, unresolvedUpstreamC, unresolvedUpstreamD}) expectedJobWithUpstream := job.NewWithUpstream(jobA, []*job.Upstream{internalUpstreamB, internalUpstreamC, unresolvedUpstreamD}) @@ -94,8 +94,8 @@ func TestInternalUpstreamResolver(t *testing.T) { unresolvedUpstreamCStatic := job.NewUpstreamUnresolvedStatic("job-C", sampleTenant.ProjectName()) internalUpstreamCStatic := job.NewUpstreamResolved("job-C", "", resourceURNC, sampleTenant, "static", taskName, false) - jobRepo.On("GetAllByResourceDestination", ctx, jobDSources[0]).Return([]*job.Job{jobC}, nil) - jobRepo.On("GetByJobName", ctx, sampleTenant.ProjectName(), specC.Name()).Return(jobC, nil) + jobRepo.On("GetAllEnabledByResourceDestination", ctx, jobDSources[0]).Return([]*job.Job{jobC}, nil) + jobRepo.On("GetEnabledJobByName", ctx, sampleTenant.ProjectName(), specC.Name()).Return(jobC, nil) jobWithUnresolvedUpstream := job.NewWithUpstream(jobD, []*job.Upstream{unresolvedUpstreamCStatic, unresolvedUpstreamCInferred}) expectedJobWithUpstream := job.NewWithUpstream(jobD, []*job.Upstream{internalUpstreamCStatic}) @@ -114,7 +114,7 @@ func TestInternalUpstreamResolver(t *testing.T) { jobXDestination := resourceURNX jobX := job.NewJob(sampleTenant, specX, jobXDestination, []resource.URN{resourceURNB}, false) - jobRepo.On("GetAllByResourceDestination", ctx, jobX.Sources()[0]).Return([]*job.Job{jobB}, nil) + jobRepo.On("GetAllEnabledByResourceDestination", ctx, jobX.Sources()[0]).Return([]*job.Job{jobB}, nil) jobWithUnresolvedUpstream := job.NewWithUpstream(jobX, []*job.Upstream{unresolvedUpstreamB}) expectedJobWithUpstream := job.NewWithUpstream(jobX, []*job.Upstream{internalUpstreamB}) @@ -133,7 +133,7 @@ func TestInternalUpstreamResolver(t *testing.T) { jobXDestination := resourceURNX jobX := job.NewJob(sampleTenant, specX, jobXDestination, nil, false) - jobRepo.On("GetByJobName", ctx, sampleTenant.ProjectName(), specC.Name()).Return(jobC, nil) + jobRepo.On("GetEnabledJobByName", ctx, sampleTenant.ProjectName(), specC.Name()).Return(jobC, nil) jobWithUnresolvedUpstream := job.NewWithUpstream(jobX, []*job.Upstream{unresolvedUpstreamC}) expectedJobWithUpstream := job.NewWithUpstream(jobX, []*job.Upstream{internalUpstreamC}) @@ -149,10 +149,10 @@ func TestInternalUpstreamResolver(t *testing.T) { logWriter := new(mockWriter) defer logWriter.AssertExpectations(t) - jobRepo.On("GetAllByResourceDestination", ctx, jobASources[0]).Return([]*job.Job{}, errors.New("internal error")) - jobRepo.On("GetAllByResourceDestination", ctx, jobASources[1]).Return([]*job.Job{}, nil) + jobRepo.On("GetAllEnabledByResourceDestination", ctx, jobASources[0]).Return([]*job.Job{}, errors.New("internal error")) + jobRepo.On("GetAllEnabledByResourceDestination", ctx, jobASources[1]).Return([]*job.Job{}, nil) - jobRepo.On("GetByJobName", ctx, sampleTenant.ProjectName(), specC.Name()).Return(jobC, nil) + jobRepo.On("GetEnabledJobByName", ctx, sampleTenant.ProjectName(), specC.Name()).Return(jobC, nil) jobWithUnresolvedUpstream := job.NewWithUpstream(jobA, []*job.Upstream{unresolvedUpstreamB, unresolvedUpstreamC, unresolvedUpstreamD}) @@ -175,8 +175,8 @@ func TestInternalUpstreamResolver(t *testing.T) { jobEDestination := resourceURNE jobE := job.NewJob(sampleTenant, specE, jobEDestination, nil, false) - jobRepo.On("GetByJobName", ctx, sampleTenant.ProjectName(), job.Name("job-unknown")).Return(nil, errors.New("not found")) - jobRepo.On("GetByJobName", ctx, sampleTenant.ProjectName(), specC.Name()).Return(jobC, nil) + jobRepo.On("GetEnabledJobByName", ctx, sampleTenant.ProjectName(), job.Name("job-unknown")).Return(nil, errors.New("not found")) + jobRepo.On("GetEnabledJobByName", ctx, sampleTenant.ProjectName(), specC.Name()).Return(jobC, nil) unresolvedUpstreamUnknown := job.NewUpstreamUnresolvedStatic("job-unknown", sampleTenant.ProjectName()) jobWithUnresolvedUpstream := job.NewWithUpstream(jobE, []*job.Upstream{unresolvedUpstreamUnknown, unresolvedUpstreamC}) @@ -203,7 +203,7 @@ func TestInternalUpstreamResolver(t *testing.T) { jobEDestination := resourceURNE jobE := job.NewJob(sampleTenant, specE, jobEDestination, nil, false) - jobRepo.On("GetByJobName", ctx, sampleTenant.ProjectName(), specC.Name()).Return(jobC, nil) + jobRepo.On("GetEnabledJobByName", ctx, sampleTenant.ProjectName(), specC.Name()).Return(jobC, nil) unresolvedUpstreamUnknown := job.NewUpstreamUnresolvedStatic("job-unknown", sampleTenant.ProjectName()) jobWithUnresolvedUpstream := job.NewWithUpstream(jobE, []*job.Upstream{unresolvedUpstreamUnknown, unresolvedUpstreamC}) diff --git a/core/job/resolver/upstream_resolver.go b/core/job/resolver/upstream_resolver.go index 472ed5cb79..8fd8b91163 100644 --- a/core/job/resolver/upstream_resolver.go +++ b/core/job/resolver/upstream_resolver.go @@ -39,8 +39,8 @@ type InternalUpstreamResolver interface { type JobRepository interface { ResolveUpstreams(ctx context.Context, projectName tenant.ProjectName, jobNames []job.Name) (map[job.Name][]*job.Upstream, error) - GetAllByResourceDestination(ctx context.Context, resourceDestination resource.URN) ([]*job.Job, error) - GetByJobName(ctx context.Context, projectName tenant.ProjectName, jobName job.Name) (*job.Job, error) + GetAllEnabledByResourceDestination(ctx context.Context, resourceDestination resource.URN) ([]*job.Job, error) + GetEnabledJobByName(ctx context.Context, projectName tenant.ProjectName, jobName job.Name) (*job.Job, error) } func (u UpstreamResolver) CheckStaticResolvable(ctx context.Context, tnnt tenant.Tenant, incomingJobs []*job.Job, logWriter writer.LogWriter) error { diff --git a/core/job/resolver/upstream_resolver_test.go b/core/job/resolver/upstream_resolver_test.go index 12d3047792..f20a65d40c 100644 --- a/core/job/resolver/upstream_resolver_test.go +++ b/core/job/resolver/upstream_resolver_test.go @@ -646,11 +646,19 @@ type JobRepository struct { mock.Mock } -// GetAllByResourceDestination provides a mock function with given fields: ctx, resourceDestination -func (_m *JobRepository) GetAllByResourceDestination(ctx context.Context, resourceDestination resource.URN) ([]*job.Job, error) { +// GetAllEnabledByResourceDestination provides a mock function with given fields: ctx, resourceDestination +func (_m *JobRepository) GetAllEnabledByResourceDestination(ctx context.Context, resourceDestination resource.URN) ([]*job.Job, error) { ret := _m.Called(ctx, resourceDestination) + if len(ret) == 0 { + panic("no return value specified for GetAllEnabledByResourceDestination") + } + var r0 []*job.Job + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, resource.URN) ([]*job.Job, error)); ok { + return rf(ctx, resourceDestination) + } if rf, ok := ret.Get(0).(func(context.Context, resource.URN) []*job.Job); ok { r0 = rf(ctx, resourceDestination) } else { @@ -659,7 +667,6 @@ func (_m *JobRepository) GetAllByResourceDestination(ctx context.Context, resour } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, resource.URN) error); ok { r1 = rf(ctx, resourceDestination) } else { @@ -669,11 +676,19 @@ func (_m *JobRepository) GetAllByResourceDestination(ctx context.Context, resour return r0, r1 } -// GetByJobName provides a mock function with given fields: ctx, projectName, jobName -func (_m *JobRepository) GetByJobName(ctx context.Context, projectName tenant.ProjectName, jobName job.Name) (*job.Job, error) { +// GetEnabledJobByName provides a mock function with given fields: ctx, projectName, jobName +func (_m *JobRepository) GetEnabledJobByName(ctx context.Context, projectName tenant.ProjectName, jobName job.Name) (*job.Job, error) { ret := _m.Called(ctx, projectName, jobName) + if len(ret) == 0 { + panic("no return value specified for GetEnabledJobByName") + } + var r0 *job.Job + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, tenant.ProjectName, job.Name) (*job.Job, error)); ok { + return rf(ctx, projectName, jobName) + } if rf, ok := ret.Get(0).(func(context.Context, tenant.ProjectName, job.Name) *job.Job); ok { r0 = rf(ctx, projectName, jobName) } else { @@ -682,7 +697,6 @@ func (_m *JobRepository) GetByJobName(ctx context.Context, projectName tenant.Pr } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, tenant.ProjectName, job.Name) error); ok { r1 = rf(ctx, projectName, jobName) } else { diff --git a/internal/store/postgres/job/job_repository.go b/internal/store/postgres/job/job_repository.go index e28859cdaa..a98cd033b8 100644 --- a/internal/store/postgres/job/job_repository.go +++ b/internal/store/postgres/job/job_repository.go @@ -26,6 +26,8 @@ const ( task_name, task_config, window_spec, assets, hooks, metadata, destination, sources, project_name, namespace_name, created_at, updated_at` jobColumns = `id, ` + jobColumnsToStore + `, deleted_at, is_dirty` + + enabledOnlyStatement = ` AND state != 'disabled'` ) var changelogMetrics = promauto.NewCounterVec(prometheus.CounterOpts{ @@ -55,7 +57,7 @@ func (j JobRepository) Add(ctx context.Context, jobs []*job.Job) ([]*job.Job, er } func (j JobRepository) insertJobSpec(ctx context.Context, jobEntity *job.Job) error { - existingJob, err := j.get(ctx, jobEntity.ProjectName(), jobEntity.Spec().Name(), false) + existingJob, err := j.get(ctx, jobEntity.ProjectName(), jobEntity.Spec().Name(), false, false) if err != nil { if errors.IsErrorType(err, errors.ErrNotFound) { return j.triggerInsert(ctx, jobEntity) @@ -235,7 +237,7 @@ WHERE } func (j JobRepository) preCheckUpdate(ctx context.Context, jobEntity *job.Job) error { - existingJob, err := j.get(ctx, jobEntity.ProjectName(), jobEntity.Spec().Name(), false) + existingJob, err := j.get(ctx, jobEntity.ProjectName(), jobEntity.Spec().Name(), false, false) if err != nil && errors.IsErrorType(err, errors.ErrNotFound) { return errors.NewError(errors.ErrNotFound, job.EntityJob, fmt.Sprintf("job %s not exists yet", jobEntity.Spec().Name())) } @@ -341,7 +343,7 @@ func (j JobRepository) computeAndPersistChangeLog(ctx context.Context, existingJ } func (j JobRepository) updateAndPersistChangelog(ctx context.Context, incomingJobEntity *job.Job) error { - existingJob, err := j.get(ctx, incomingJobEntity.ProjectName(), incomingJobEntity.Spec().Name(), false) + existingJob, err := j.get(ctx, incomingJobEntity.ProjectName(), incomingJobEntity.Spec().Name(), false, false) if err != nil { return err } @@ -394,7 +396,7 @@ WHERE return nil } -func (j JobRepository) get(ctx context.Context, projectName tenant.ProjectName, jobName job.Name, onlyActiveJob bool) (*Spec, error) { +func (j JobRepository) get(ctx context.Context, projectName tenant.ProjectName, jobName job.Name, onlyActiveJob bool, onlyEnabledJob bool) (*Spec, error) { getJobByNameAtProject := `SELECT ` + jobColumns + ` FROM job WHERE name = $1 AND project_name = $2` if onlyActiveJob { @@ -402,6 +404,10 @@ func (j JobRepository) get(ctx context.Context, projectName tenant.ProjectName, getJobByNameAtProject += jobDeletedFilter } + if onlyEnabledJob { + getJobByNameAtProject += enabledOnlyStatement + } + spec, err := FromRow(j.db.QueryRow(ctx, getJobByNameAtProject, jobName.String(), projectName.String())) if errors.IsErrorType(err, errors.ErrNotFound) { err = errors.NotFound(job.EntityJob, fmt.Sprintf("unable to get job %s", jobName)) @@ -445,6 +451,7 @@ WITH static_upstreams AS ( WHERE project_name = $1 AND name = any ($2) AND j.deleted_at IS NULL + AND state != 'disabled' ), inferred_upstreams AS ( @@ -454,6 +461,7 @@ inferred_upstreams AS ( WHERE project_name = $1 AND name = any ($2) AND j.deleted_at IS NULL + AND state != 'disabled' ) SELECT @@ -471,6 +479,7 @@ JOIN job j ON (su.static_upstream = j.name and su.project_name = j.project_name) OR (su.static_upstream = j.project_name || '/' ||j.name) WHERE j.deleted_at IS NULL +AND j.state != 'disabled' UNION ALL @@ -486,7 +495,8 @@ SELECT false AS upstream_external FROM inferred_upstreams id JOIN job j ON id.source = j.destination -WHERE j.deleted_at IS NULL;` +WHERE j.deleted_at IS NULL +AND j.state != 'disabled';` rows, err := j.db.Query(ctx, query, projectName, jobNames) if err != nil { @@ -619,7 +629,21 @@ func (JobRepository) toUpstreams(storeUpstreams []*JobWithUpstream) ([]*job.Upst } func (j JobRepository) GetByJobName(ctx context.Context, projectName tenant.ProjectName, jobName job.Name) (*job.Job, error) { - spec, err := j.get(ctx, projectName, jobName, true) + spec, err := j.get(ctx, projectName, jobName, true, false) + if err != nil { + return nil, err + } + + job, err := specToJob(spec) + if err != nil { + return nil, err + } + + return job, nil +} + +func (j JobRepository) GetEnabledJobByName(ctx context.Context, projectName tenant.ProjectName, jobName job.Name) (*job.Job, error) { + spec, err := j.get(ctx, projectName, jobName, true, true) if err != nil { return nil, err } @@ -664,9 +688,20 @@ func (j JobRepository) GetAllByProjectName(ctx context.Context, projectName tena } func (j JobRepository) GetAllByResourceDestination(ctx context.Context, resourceDestination resource.URN) ([]*job.Job, error) { + return j.getAllByResourceDestination(ctx, resourceDestination, false) +} + +func (j JobRepository) GetAllEnabledByResourceDestination(ctx context.Context, resourceDestination resource.URN) ([]*job.Job, error) { + return j.getAllByResourceDestination(ctx, resourceDestination, true) +} + +func (j JobRepository) getAllByResourceDestination(ctx context.Context, resourceDestination resource.URN, enabledOnly bool) ([]*job.Job, error) { me := errors.NewMultiError("get all job specs by resource destination") - getAllByDestination := `SELECT ` + jobColumns + ` FROM job WHERE destination = $1 AND deleted_at IS NULL;` + getAllByDestination := `SELECT ` + jobColumns + ` FROM job WHERE destination = $1 AND deleted_at IS NULL` + if enabledOnly { + getAllByDestination += enabledOnlyStatement + } rows, err := j.db.Query(ctx, getAllByDestination, resourceDestination.String()) if err != nil { diff --git a/internal/store/postgres/job/job_repository_test.go b/internal/store/postgres/job/job_repository_test.go index 110eccfede..4908f57664 100644 --- a/internal/store/postgres/job/job_repository_test.go +++ b/internal/store/postgres/job/job_repository_test.go @@ -664,6 +664,56 @@ func TestPostgresJobRepository(t *testing.T) { upstreamB, } + upstreams, err := jobRepo.ResolveUpstreams(ctx, proj.Name(), []job.Name{jobSpecA.Name()}) + assert.NoError(t, err) + assert.ElementsMatch(t, expectedUpstreams, upstreams[jobSpecA.Name()]) + }) + t.Run("returns job with only active static and inferred upstreams", func(t *testing.T) { + db := dbSetup() + + tenantDetails, err := tenant.NewTenantDetails(proj, namespace, nil) + assert.NoError(t, err) + + upstreamBName := job.SpecUpstreamNameFrom("test-proj/sample-job-B") + upstreamDName := job.SpecUpstreamNameFrom("test-proj/sample-job-D") + jobAUpstream, _ := job.NewSpecUpstreamBuilder().WithUpstreamNames([]job.SpecUpstreamName{upstreamBName, upstreamDName}).Build() + jobSpecA, _ := job.NewSpecBuilder(jobVersion, "sample-job-A", jobOwner, jobSchedule, customConfig, jobTask). + WithDescription(jobDescription). + WithSpecUpstream(jobAUpstream). + Build() + jobA := job.NewJob(sampleTenant, jobSpecA, resourceURNA, []resource.URN{resourceURNC, resourceURNE}, false) + + jobSpecB, err := job.NewSpecBuilder(jobVersion, "sample-job-B", jobOwner, jobSchedule, customConfig, jobTask).WithDescription(jobDescription).Build() + assert.NoError(t, err) + jobB := job.NewJob(sampleTenant, jobSpecB, resourceURNB, nil, false) + + jobSpecC, err := job.NewSpecBuilder(jobVersion, "sample-job-C", jobOwner, jobSchedule, customConfig, jobTask).WithDescription(jobDescription).Build() + assert.NoError(t, err) + jobC := job.NewJob(sampleTenant, jobSpecC, resourceURNC, nil, false) + + jobSpecD, err := job.NewSpecBuilder(jobVersion, "sample-job-D", jobOwner, jobSchedule, customConfig, jobTask).WithDescription(jobDescription).Build() + assert.NoError(t, err) + jobD := job.NewJob(sampleTenant, jobSpecD, resourceURND, nil, false) + + jobSpecE, err := job.NewSpecBuilder(jobVersion, "sample-job-E", jobOwner, jobSchedule, customConfig, jobTask).WithDescription(jobDescription).Build() + assert.NoError(t, err) + jobE := job.NewJob(sampleTenant, jobSpecE, resourceURNE, nil, false) + + jobRepo := postgres.NewJobRepository(db) + _, err = jobRepo.Add(ctx, []*job.Job{jobA, jobB, jobC, jobD, jobE}) + assert.NoError(t, err) + + err = jobRepo.SyncState(ctx, tenantDetails.ToTenant(), []job.Name{jobSpecD.Name(), jobSpecE.Name()}, nil) + assert.NoError(t, err) + + upstreamB := job.NewUpstreamResolved(jobSpecB.Name(), "", jobB.Destination(), tenantDetails.ToTenant(), "static", taskName, false) + upstreamC := job.NewUpstreamResolved(jobSpecC.Name(), "", jobC.Destination(), tenantDetails.ToTenant(), "inferred", taskName, false) + + expectedUpstreams := []*job.Upstream{ + upstreamB, + upstreamC, + } + upstreams, err := jobRepo.ResolveUpstreams(ctx, proj.Name(), []job.Name{jobSpecA.Name()}) assert.NoError(t, err) assert.ElementsMatch(t, expectedUpstreams, upstreams[jobSpecA.Name()]) @@ -974,6 +1024,32 @@ func TestPostgresJobRepository(t *testing.T) { }) }) + t.Run("GetEnabledJobByName", func(t *testing.T) { + t.Run("should not return job if it is disabled", func(t *testing.T) { + db := dbSetup() + + jobSpecA, err := job.NewSpecBuilder(jobVersion, "sample-job-A", jobOwner, jobSchedule, customConfig, jobTask).WithDescription(jobDescription).Build() + assert.NoError(t, err) + jobA := job.NewJob(sampleTenant, jobSpecA, resourceURNA, []resource.URN{resourceURNB, resourceURNC}, false) + + jobRepo := postgres.NewJobRepository(db) + _, err = jobRepo.Add(ctx, []*job.Job{jobA}) + assert.NoError(t, err) + + actual, err := jobRepo.GetEnabledJobByName(ctx, sampleTenant.ProjectName(), "sample-job-A") + assert.NoError(t, err) + assert.NotNil(t, actual) + assert.Equal(t, jobA, actual) + + err = jobRepo.SyncState(ctx, sampleTenant, []job.Name{jobSpecA.Name()}, nil) + assert.NoError(t, err) + + actual, err = jobRepo.GetEnabledJobByName(ctx, sampleTenant.ProjectName(), "sample-job-A") + assert.Error(t, err) + assert.Nil(t, actual) + }) + }) + t.Run("GetAllByProjectName", func(t *testing.T) { t.Run("returns no error when get all jobs success", func(t *testing.T) { db := dbSetup() @@ -1062,6 +1138,30 @@ func TestPostgresJobRepository(t *testing.T) { }) }) + t.Run("GetAllEnabledByResourceDestination", func(t *testing.T) { + t.Run("returns only enabled jobs excluding the disabled jobs", func(t *testing.T) { + db := dbSetup() + + jobSpecA, err := job.NewSpecBuilder(jobVersion, "sample-job-A", jobOwner, jobSchedule, customConfig, jobTask).WithDescription(jobDescription).Build() + assert.NoError(t, err) + jobA := job.NewJob(sampleTenant, jobSpecA, resourceURNY, []resource.URN{resourceURNB, resourceURNC}, false) + jobSpecB, err := job.NewSpecBuilder(jobVersion, "sample-job-B", jobOwner, jobSchedule, customConfig, jobTask).WithDescription(jobDescription).Build() + assert.NoError(t, err) + jobB := job.NewJob(sampleTenant, jobSpecB, resourceURNY, []resource.URN{resourceURNC}, false) + + jobRepo := postgres.NewJobRepository(db) + _, err = jobRepo.Add(ctx, []*job.Job{jobA, jobB}) + assert.NoError(t, err) + + err = jobRepo.SyncState(ctx, sampleTenant, []job.Name{jobSpecB.Name()}, nil) + assert.NoError(t, err) + + actual, err := jobRepo.GetAllEnabledByResourceDestination(ctx, resourceURNY) + assert.NoError(t, err) + assert.Equal(t, []*job.Job{jobA}, actual) + }) + }) + t.Run("GetUpstreams", func(t *testing.T) { t.Run("returns upstream given project and job name", func(t *testing.T) { // TODO: test is failing for nullable fields in upstream