Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/job/resolver/internal_upstream_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (i internalUpstreamResolver) resolveInferredUpstream(ctx context.Context, s
var internalUpstream []*job.Upstream
me := errors.NewMultiError("resolve internal inferred upstream errors")
for _, source := range sources {
jobUpstreams, err := i.jobRepository.GetAllByResourceDestination(ctx, source)
jobUpstreams, err := i.jobRepository.GetAllEnabledByResourceDestination(ctx, source)
me.Append(err)
if len(jobUpstreams) == 0 {
continue
Expand All @@ -91,7 +91,7 @@ func (i internalUpstreamResolver) resolveStaticUpstream(ctx context.Context, pro
me.Append(err)
continue
}
jobUpstream, err := i.jobRepository.GetByJobName(ctx, projectName, upstreamJobName)
jobUpstream, err := i.jobRepository.GetEnabledJobByName(ctx, projectName, upstreamJobName)
if err != nil || jobUpstream == nil {
me.Append(err)
continue
Expand Down
26 changes: 13 additions & 13 deletions core/job/resolver/internal_upstream_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ func TestInternalUpstreamResolver(t *testing.T) {
logWriter := new(mockWriter)
defer logWriter.AssertExpectations(t)

jobRepo.On("GetAllByResourceDestination", ctx, jobASources[0]).Return([]*job.Job{jobB}, nil)
jobRepo.On("GetAllByResourceDestination", ctx, jobASources[1]).Return([]*job.Job{}, nil)
jobRepo.On("GetByJobName", ctx, sampleTenant.ProjectName(), specC.Name()).Return(jobC, nil)
jobRepo.On("GetAllEnabledByResourceDestination", ctx, jobASources[0]).Return([]*job.Job{jobB}, nil)
jobRepo.On("GetAllEnabledByResourceDestination", ctx, jobASources[1]).Return([]*job.Job{}, nil)
jobRepo.On("GetEnabledJobByName", ctx, sampleTenant.ProjectName(), specC.Name()).Return(jobC, nil)

jobWithUnresolvedUpstream := job.NewWithUpstream(jobA, []*job.Upstream{unresolvedUpstreamB, unresolvedUpstreamC, unresolvedUpstreamD})
expectedJobWithUpstream := job.NewWithUpstream(jobA, []*job.Upstream{internalUpstreamB, internalUpstreamC, unresolvedUpstreamD})
Expand All @@ -94,8 +94,8 @@ func TestInternalUpstreamResolver(t *testing.T) {
unresolvedUpstreamCStatic := job.NewUpstreamUnresolvedStatic("job-C", sampleTenant.ProjectName())
internalUpstreamCStatic := job.NewUpstreamResolved("job-C", "", resourceURNC, sampleTenant, "static", taskName, false)

jobRepo.On("GetAllByResourceDestination", ctx, jobDSources[0]).Return([]*job.Job{jobC}, nil)
jobRepo.On("GetByJobName", ctx, sampleTenant.ProjectName(), specC.Name()).Return(jobC, nil)
jobRepo.On("GetAllEnabledByResourceDestination", ctx, jobDSources[0]).Return([]*job.Job{jobC}, nil)
jobRepo.On("GetEnabledJobByName", ctx, sampleTenant.ProjectName(), specC.Name()).Return(jobC, nil)

jobWithUnresolvedUpstream := job.NewWithUpstream(jobD, []*job.Upstream{unresolvedUpstreamCStatic, unresolvedUpstreamCInferred})
expectedJobWithUpstream := job.NewWithUpstream(jobD, []*job.Upstream{internalUpstreamCStatic})
Expand All @@ -114,7 +114,7 @@ func TestInternalUpstreamResolver(t *testing.T) {
jobXDestination := resourceURNX
jobX := job.NewJob(sampleTenant, specX, jobXDestination, []resource.URN{resourceURNB}, false)

jobRepo.On("GetAllByResourceDestination", ctx, jobX.Sources()[0]).Return([]*job.Job{jobB}, nil)
jobRepo.On("GetAllEnabledByResourceDestination", ctx, jobX.Sources()[0]).Return([]*job.Job{jobB}, nil)

jobWithUnresolvedUpstream := job.NewWithUpstream(jobX, []*job.Upstream{unresolvedUpstreamB})
expectedJobWithUpstream := job.NewWithUpstream(jobX, []*job.Upstream{internalUpstreamB})
Expand All @@ -133,7 +133,7 @@ func TestInternalUpstreamResolver(t *testing.T) {
jobXDestination := resourceURNX
jobX := job.NewJob(sampleTenant, specX, jobXDestination, nil, false)

jobRepo.On("GetByJobName", ctx, sampleTenant.ProjectName(), specC.Name()).Return(jobC, nil)
jobRepo.On("GetEnabledJobByName", ctx, sampleTenant.ProjectName(), specC.Name()).Return(jobC, nil)

jobWithUnresolvedUpstream := job.NewWithUpstream(jobX, []*job.Upstream{unresolvedUpstreamC})
expectedJobWithUpstream := job.NewWithUpstream(jobX, []*job.Upstream{internalUpstreamC})
Expand All @@ -149,10 +149,10 @@ func TestInternalUpstreamResolver(t *testing.T) {
logWriter := new(mockWriter)
defer logWriter.AssertExpectations(t)

jobRepo.On("GetAllByResourceDestination", ctx, jobASources[0]).Return([]*job.Job{}, errors.New("internal error"))
jobRepo.On("GetAllByResourceDestination", ctx, jobASources[1]).Return([]*job.Job{}, nil)
jobRepo.On("GetAllEnabledByResourceDestination", ctx, jobASources[0]).Return([]*job.Job{}, errors.New("internal error"))
jobRepo.On("GetAllEnabledByResourceDestination", ctx, jobASources[1]).Return([]*job.Job{}, nil)

jobRepo.On("GetByJobName", ctx, sampleTenant.ProjectName(), specC.Name()).Return(jobC, nil)
jobRepo.On("GetEnabledJobByName", ctx, sampleTenant.ProjectName(), specC.Name()).Return(jobC, nil)

jobWithUnresolvedUpstream := job.NewWithUpstream(jobA, []*job.Upstream{unresolvedUpstreamB, unresolvedUpstreamC, unresolvedUpstreamD})

Expand All @@ -175,8 +175,8 @@ func TestInternalUpstreamResolver(t *testing.T) {
jobEDestination := resourceURNE
jobE := job.NewJob(sampleTenant, specE, jobEDestination, nil, false)

jobRepo.On("GetByJobName", ctx, sampleTenant.ProjectName(), job.Name("job-unknown")).Return(nil, errors.New("not found"))
jobRepo.On("GetByJobName", ctx, sampleTenant.ProjectName(), specC.Name()).Return(jobC, nil)
jobRepo.On("GetEnabledJobByName", ctx, sampleTenant.ProjectName(), job.Name("job-unknown")).Return(nil, errors.New("not found"))
jobRepo.On("GetEnabledJobByName", ctx, sampleTenant.ProjectName(), specC.Name()).Return(jobC, nil)

unresolvedUpstreamUnknown := job.NewUpstreamUnresolvedStatic("job-unknown", sampleTenant.ProjectName())
jobWithUnresolvedUpstream := job.NewWithUpstream(jobE, []*job.Upstream{unresolvedUpstreamUnknown, unresolvedUpstreamC})
Expand All @@ -203,7 +203,7 @@ func TestInternalUpstreamResolver(t *testing.T) {
jobEDestination := resourceURNE
jobE := job.NewJob(sampleTenant, specE, jobEDestination, nil, false)

jobRepo.On("GetByJobName", ctx, sampleTenant.ProjectName(), specC.Name()).Return(jobC, nil)
jobRepo.On("GetEnabledJobByName", ctx, sampleTenant.ProjectName(), specC.Name()).Return(jobC, nil)

unresolvedUpstreamUnknown := job.NewUpstreamUnresolvedStatic("job-unknown", sampleTenant.ProjectName())
jobWithUnresolvedUpstream := job.NewWithUpstream(jobE, []*job.Upstream{unresolvedUpstreamUnknown, unresolvedUpstreamC})
Expand Down
4 changes: 2 additions & 2 deletions core/job/resolver/upstream_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ type InternalUpstreamResolver interface {
type JobRepository interface {
ResolveUpstreams(ctx context.Context, projectName tenant.ProjectName, jobNames []job.Name) (map[job.Name][]*job.Upstream, error)

GetAllByResourceDestination(ctx context.Context, resourceDestination resource.URN) ([]*job.Job, error)
GetByJobName(ctx context.Context, projectName tenant.ProjectName, jobName job.Name) (*job.Job, error)
GetAllEnabledByResourceDestination(ctx context.Context, resourceDestination resource.URN) ([]*job.Job, error)
GetEnabledJobByName(ctx context.Context, projectName tenant.ProjectName, jobName job.Name) (*job.Job, error)
}

func (u UpstreamResolver) CheckStaticResolvable(ctx context.Context, tnnt tenant.Tenant, incomingJobs []*job.Job, logWriter writer.LogWriter) error {
Expand Down
26 changes: 20 additions & 6 deletions core/job/resolver/upstream_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,11 +646,19 @@ type JobRepository struct {
mock.Mock
}

// GetAllByResourceDestination provides a mock function with given fields: ctx, resourceDestination
func (_m *JobRepository) GetAllByResourceDestination(ctx context.Context, resourceDestination resource.URN) ([]*job.Job, error) {
// GetAllEnabledByResourceDestination provides a mock function with given fields: ctx, resourceDestination
func (_m *JobRepository) GetAllEnabledByResourceDestination(ctx context.Context, resourceDestination resource.URN) ([]*job.Job, error) {
ret := _m.Called(ctx, resourceDestination)

if len(ret) == 0 {
panic("no return value specified for GetAllEnabledByResourceDestination")
}

var r0 []*job.Job
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, resource.URN) ([]*job.Job, error)); ok {
return rf(ctx, resourceDestination)
}
if rf, ok := ret.Get(0).(func(context.Context, resource.URN) []*job.Job); ok {
r0 = rf(ctx, resourceDestination)
} else {
Expand All @@ -659,7 +667,6 @@ func (_m *JobRepository) GetAllByResourceDestination(ctx context.Context, resour
}
}

var r1 error
if rf, ok := ret.Get(1).(func(context.Context, resource.URN) error); ok {
r1 = rf(ctx, resourceDestination)
} else {
Expand All @@ -669,11 +676,19 @@ func (_m *JobRepository) GetAllByResourceDestination(ctx context.Context, resour
return r0, r1
}

// GetByJobName provides a mock function with given fields: ctx, projectName, jobName
func (_m *JobRepository) GetByJobName(ctx context.Context, projectName tenant.ProjectName, jobName job.Name) (*job.Job, error) {
// GetEnabledJobByName provides a mock function with given fields: ctx, projectName, jobName
func (_m *JobRepository) GetEnabledJobByName(ctx context.Context, projectName tenant.ProjectName, jobName job.Name) (*job.Job, error) {
ret := _m.Called(ctx, projectName, jobName)

if len(ret) == 0 {
panic("no return value specified for GetEnabledJobByName")
}

var r0 *job.Job
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, tenant.ProjectName, job.Name) (*job.Job, error)); ok {
return rf(ctx, projectName, jobName)
}
if rf, ok := ret.Get(0).(func(context.Context, tenant.ProjectName, job.Name) *job.Job); ok {
r0 = rf(ctx, projectName, jobName)
} else {
Expand All @@ -682,7 +697,6 @@ func (_m *JobRepository) GetByJobName(ctx context.Context, projectName tenant.Pr
}
}

var r1 error
if rf, ok := ret.Get(1).(func(context.Context, tenant.ProjectName, job.Name) error); ok {
r1 = rf(ctx, projectName, jobName)
} else {
Expand Down
49 changes: 42 additions & 7 deletions internal/store/postgres/job/job_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const (
task_name, task_config, window_spec, assets, hooks, metadata, destination, sources, project_name, namespace_name, created_at, updated_at`

jobColumns = `id, ` + jobColumnsToStore + `, deleted_at, is_dirty`

enabledOnlyStatement = ` AND state != 'disabled'`
)

var changelogMetrics = promauto.NewCounterVec(prometheus.CounterOpts{
Expand Down Expand Up @@ -55,7 +57,7 @@ func (j JobRepository) Add(ctx context.Context, jobs []*job.Job) ([]*job.Job, er
}

func (j JobRepository) insertJobSpec(ctx context.Context, jobEntity *job.Job) error {
existingJob, err := j.get(ctx, jobEntity.ProjectName(), jobEntity.Spec().Name(), false)
existingJob, err := j.get(ctx, jobEntity.ProjectName(), jobEntity.Spec().Name(), false, false)
if err != nil {
if errors.IsErrorType(err, errors.ErrNotFound) {
return j.triggerInsert(ctx, jobEntity)
Expand Down Expand Up @@ -235,7 +237,7 @@ WHERE
}

func (j JobRepository) preCheckUpdate(ctx context.Context, jobEntity *job.Job) error {
existingJob, err := j.get(ctx, jobEntity.ProjectName(), jobEntity.Spec().Name(), false)
existingJob, err := j.get(ctx, jobEntity.ProjectName(), jobEntity.Spec().Name(), false, false)
if err != nil && errors.IsErrorType(err, errors.ErrNotFound) {
return errors.NewError(errors.ErrNotFound, job.EntityJob, fmt.Sprintf("job %s not exists yet", jobEntity.Spec().Name()))
}
Expand Down Expand Up @@ -341,7 +343,7 @@ func (j JobRepository) computeAndPersistChangeLog(ctx context.Context, existingJ
}

func (j JobRepository) updateAndPersistChangelog(ctx context.Context, incomingJobEntity *job.Job) error {
existingJob, err := j.get(ctx, incomingJobEntity.ProjectName(), incomingJobEntity.Spec().Name(), false)
existingJob, err := j.get(ctx, incomingJobEntity.ProjectName(), incomingJobEntity.Spec().Name(), false, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -394,14 +396,18 @@ WHERE
return nil
}

func (j JobRepository) get(ctx context.Context, projectName tenant.ProjectName, jobName job.Name, onlyActiveJob bool) (*Spec, error) {
func (j JobRepository) get(ctx context.Context, projectName tenant.ProjectName, jobName job.Name, onlyActiveJob bool, onlyEnabledJob bool) (*Spec, error) {
getJobByNameAtProject := `SELECT ` + jobColumns + ` FROM job WHERE name = $1 AND project_name = $2`

if onlyActiveJob {
jobDeletedFilter := " AND deleted_at IS NULL"
getJobByNameAtProject += jobDeletedFilter
}

if onlyEnabledJob {
getJobByNameAtProject += enabledOnlyStatement
}

spec, err := FromRow(j.db.QueryRow(ctx, getJobByNameAtProject, jobName.String(), projectName.String()))
if errors.IsErrorType(err, errors.ErrNotFound) {
err = errors.NotFound(job.EntityJob, fmt.Sprintf("unable to get job %s", jobName))
Expand Down Expand Up @@ -445,6 +451,7 @@ WITH static_upstreams AS (
WHERE project_name = $1
AND name = any ($2)
AND j.deleted_at IS NULL
AND state != 'disabled'
),

inferred_upstreams AS (
Expand All @@ -454,6 +461,7 @@ inferred_upstreams AS (
WHERE project_name = $1
AND name = any ($2)
AND j.deleted_at IS NULL
AND state != 'disabled'
)

SELECT
Expand All @@ -471,6 +479,7 @@ JOIN job j ON
(su.static_upstream = j.name and su.project_name = j.project_name) OR
(su.static_upstream = j.project_name || '/' ||j.name)
WHERE j.deleted_at IS NULL
AND j.state != 'disabled'

UNION ALL

Expand All @@ -486,7 +495,8 @@ SELECT
false AS upstream_external
FROM inferred_upstreams id
JOIN job j ON id.source = j.destination
WHERE j.deleted_at IS NULL;`
WHERE j.deleted_at IS NULL
AND j.state != 'disabled';`

rows, err := j.db.Query(ctx, query, projectName, jobNames)
if err != nil {
Expand Down Expand Up @@ -619,7 +629,21 @@ func (JobRepository) toUpstreams(storeUpstreams []*JobWithUpstream) ([]*job.Upst
}

func (j JobRepository) GetByJobName(ctx context.Context, projectName tenant.ProjectName, jobName job.Name) (*job.Job, error) {
spec, err := j.get(ctx, projectName, jobName, true)
spec, err := j.get(ctx, projectName, jobName, true, false)
if err != nil {
return nil, err
}

job, err := specToJob(spec)
if err != nil {
return nil, err
}

return job, nil
}

func (j JobRepository) GetEnabledJobByName(ctx context.Context, projectName tenant.ProjectName, jobName job.Name) (*job.Job, error) {
spec, err := j.get(ctx, projectName, jobName, true, true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -664,9 +688,20 @@ func (j JobRepository) GetAllByProjectName(ctx context.Context, projectName tena
}

func (j JobRepository) GetAllByResourceDestination(ctx context.Context, resourceDestination resource.URN) ([]*job.Job, error) {
return j.getAllByResourceDestination(ctx, resourceDestination, false)
}

func (j JobRepository) GetAllEnabledByResourceDestination(ctx context.Context, resourceDestination resource.URN) ([]*job.Job, error) {
return j.getAllByResourceDestination(ctx, resourceDestination, true)
}

func (j JobRepository) getAllByResourceDestination(ctx context.Context, resourceDestination resource.URN, enabledOnly bool) ([]*job.Job, error) {
me := errors.NewMultiError("get all job specs by resource destination")

getAllByDestination := `SELECT ` + jobColumns + ` FROM job WHERE destination = $1 AND deleted_at IS NULL;`
getAllByDestination := `SELECT ` + jobColumns + ` FROM job WHERE destination = $1 AND deleted_at IS NULL`
if enabledOnly {
getAllByDestination += enabledOnlyStatement
}

rows, err := j.db.Query(ctx, getAllByDestination, resourceDestination.String())
if err != nil {
Expand Down
Loading
Loading