Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ NAME = "github.com/goto/optimus"
LAST_COMMIT := $(shell git rev-parse --short HEAD)
LAST_TAG := "$(shell git rev-list --tags --max-count=1)"
OPMS_VERSION := "$(shell git describe --tags ${LAST_TAG})-next"
PROTON_COMMIT := "5e7cf7119e5ef2007c8323712c9d08c11863813d"
PROTON_COMMIT := "f00fdefd2767778ded4905dfbed83c77fef637b7"


.PHONY: build test test-ci generate-proto unit-test-ci integration-test vet coverage clean install lint
Expand Down
41 changes: 41 additions & 0 deletions client/cmd/job/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import (
"github.com/goto/optimus/client/cmd/internal/connection"
"github.com/goto/optimus/client/cmd/internal/logger"
"github.com/goto/optimus/client/cmd/internal/plan"
lerrors "github.com/goto/optimus/client/local/errors"
"github.com/goto/optimus/client/local/model"
"github.com/goto/optimus/client/local/specio"
"github.com/goto/optimus/config"
"github.com/goto/optimus/core/job/dto"
pb "github.com/goto/optimus/protos/gotocompany/optimus/core/v1beta1"
)

Expand Down Expand Up @@ -213,13 +215,52 @@ func (v *validateCommand) executeServerValidation(namespace *config.Namespace, j
success = false
}

if err := v.validateJobSourceIsDeprecated(fromServer, response); err != nil {
v.logger.Error("error returned when validating deprecated sources: %v", err)
return err
}

if !success {
return errors.New("encountered one or more errors")
}

return nil
}

// validateJobSourceIsDeprecated this only support for job spec is from local, due to needs check whether the source is existing or new
func (*validateCommand) validateJobSourceIsDeprecated(fromServer bool, response *pb.ValidateResponse) error {
if fromServer {
return nil
}

var errList []string
var warnList []string

for _, results := range response.GetResultsByJobName() {
for _, result := range results.GetResults() {
skip := result == nil || result.GetSuccess() || !dto.StageSourceDeprecationValidation.Equal(result.GetName())
if skip {
continue
}

switch result.GetLevel() {
case pb.Level_LEVEL_ERROR:
errList = append(errList, result.GetMessages()...)
case pb.Level_LEVEL_WARNING:
warnList = append(warnList, result.GetMessages()...)
}
}
}

if len(errList) > 0 {
return lerrors.NewValidationErrorf("new job sources is deprecated: [%s]", strings.Join(errList, ", "))
}
if len(warnList) > 0 {
return lerrors.NewWarnErrorf("existing job sources is deprecated: [%s]", strings.Join(warnList, ", "))
}
return nil
}

func (v *validateCommand) processResponse(response *pb.ValidateResponse) error {
if response == nil {
return nil
Expand Down
31 changes: 31 additions & 0 deletions client/local/errors/command_error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package errors

import "fmt"

const (
ExitCodeWarn = 10
ExitCodeValidationError = 30
)

// CmdError is a custom error type for command errors, it will contain the error and the exit code
type CmdError struct {
Cause error
Code int
}

func (e *CmdError) Error() string { return e.Cause.Error() }

func NewCmdError(cause error, code int) *CmdError {
return &CmdError{
Cause: cause,
Code: code,
}
}

func NewWarnErrorf(format string, args ...any) *CmdError {
return NewCmdError(fmt.Errorf(format, args...), ExitCodeWarn)
}

func NewValidationErrorf(format string, args ...any) *CmdError {
return NewCmdError(fmt.Errorf(format, args...), ExitCodeValidationError)
}
21 changes: 12 additions & 9 deletions core/job/dto/stage.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
package dto

const (
StageCyclicValidation ValidateStage = "cyclic validation"
StageDeletionValidation ValidateStage = "validation for deletion"
StageDestinationValidation ValidateStage = "destination validation"
StagePreparation ValidateStage = "validation preparation"
StageRunCompileValidation ValidateStage = "compile validation for run"
StageSourceValidation ValidateStage = "source validation"
StageTenantValidation ValidateStage = "tenant validation"
StageUpstreamValidation ValidateStage = "upstream validation"
StageWindowValidation ValidateStage = "window validation"
StageCyclicValidation ValidateStage = "cyclic validation"
StageDeletionValidation ValidateStage = "validation for deletion"
StageDestinationValidation ValidateStage = "destination validation"
StagePreparation ValidateStage = "validation preparation"
StageRunCompileValidation ValidateStage = "compile validation for run"
StageSourceValidation ValidateStage = "source validation"
StageTenantValidation ValidateStage = "tenant validation"
StageUpstreamValidation ValidateStage = "upstream validation"
StageWindowValidation ValidateStage = "window validation"
StageSourceDeprecationValidation ValidateStage = "source validation for deprecation"
)

type ValidateStage string

func (v ValidateStage) String() string {
return string(v)
}

func (v ValidateStage) Equal(compare string) bool { return string(v) == compare }
12 changes: 12 additions & 0 deletions core/job/dto/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,16 @@ type ValidateResult struct {
Stage ValidateStage
Messages []string
Success bool
Level *ValidateLevel
}

type ValidateLevel string

func (level ValidateLevel) String() string { return string(level) }

func (level ValidateLevel) Ptr() *ValidateLevel { return &level }

const (
ValidateLevelError ValidateLevel = "error"
ValidateLevelWarning ValidateLevel = "warning"
)
18 changes: 18 additions & 0 deletions core/job/handler/v1beta1/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,23 @@ func raiseJobEventMetric(jobTenant tenant.Tenant, state string, metricValue int)
).Add(float64(metricValue))
}

func toValidateLevelProto(level *dto.ValidateLevel) *pb.Level {
if level == nil {
return nil
}

var levelPb pb.Level
switch *level {
case dto.ValidateLevelError:
levelPb = pb.Level_LEVEL_ERROR
case dto.ValidateLevelWarning:
levelPb = pb.Level_LEVEL_WARNING
default:
levelPb = pb.Level_LEVEL_UNSPECIFIED
}
return &levelPb
}

func toValidateResultProto(result map[job.Name][]dto.ValidateResult) map[string]*pb.ValidateResponse_ResultList {
output := make(map[string]*pb.ValidateResponse_ResultList)
for jobName, validateResults := range result {
Expand All @@ -803,6 +820,7 @@ func toValidateResultProto(result map[job.Name][]dto.ValidateResult) map[string]
Name: rst.Stage.String(),
Messages: rst.Messages,
Success: rst.Success,
Level: toValidateLevelProto(rst.Level),
}
}

Expand Down
91 changes: 91 additions & 0 deletions core/job/service/job_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ type JobRunInputCompiler interface {
type ResourceExistenceChecker interface {
GetByURN(ctx context.Context, tnnt tenant.Tenant, urn resource.URN) (*resource.Resource, error)
ExistInStore(ctx context.Context, tnnt tenant.Tenant, urn resource.URN) (bool, error)
GetDeprecated(ctx context.Context, tnnt tenant.Tenant, urns ...resource.URN) ([]*resource.Resource, error)
}

func (j *JobService) Add(ctx context.Context, jobTenant tenant.Tenant, specs []*job.Spec) ([]job.Name, error) {
Expand Down Expand Up @@ -1612,6 +1613,8 @@ func (j *JobService) validateOneJob(ctx context.Context, tenantDetails *tenant.W
result = j.validateSource(ctx, tenantDetails, subjectJob.Spec())
output = append(output, result)

output = append(output, j.validateSourcesIsDeprecated(ctx, tenantDetails.ToTenant(), subjectJob))

result = j.validateWindow(tenantDetails, subjectJob.Spec().WindowConfig())
output = append(output, result)

Expand Down Expand Up @@ -1961,6 +1964,94 @@ func (*JobService) validateWindow(tenantDetails *tenant.WithDetails, windowConfi
}
}

func (j *JobService) validateSourcesIsDeprecated(ctx context.Context, tnnt tenant.Tenant, subjectJob *job.Job) dto.ValidateResult {
const stage = dto.StageSourceDeprecationValidation

deprecatedResources, err := j.resourceChecker.GetDeprecated(ctx, tnnt, subjectJob.Sources()...)
if err != nil {
j.logger.Error("error getting deprecated resources %s: %s", resource.URNs(subjectJob.Sources()).String(), err)
return dto.ValidateResult{
Stage: stage,
Messages: []string{fmt.Sprintf("error getting deprecated resources %s for job %s: %s", resource.URNs(subjectJob.Sources()).String(), subjectJob.FullName(), err)},
Success: false,
}
}

if len(deprecatedResources) > 0 {
return dto.ValidateResult{
Stage: stage,
Messages: j.buildDeprecatedSourceMessages(subjectJob, deprecatedResources),
Success: false,
Level: j.getJobSourceDeprecatedValidateLevel(ctx, tnnt, subjectJob, resource.Resources(deprecatedResources).GetURNs()),
}
}

return dto.ValidateResult{
Stage: stage,
Messages: []string{"no issue"},
Success: true,
}
}

func (*JobService) buildDeprecatedSourceMessages(subjectJob *job.Job, deprecatedResources resource.Resources) []string {
builder := new(strings.Builder)
messages := make([]string, 0, len(deprecatedResources))

for _, deprecated := range deprecatedResources {
if deprecated.GetDeprecationInfo() == nil {
continue
}

builder.WriteString("job ")
builder.WriteString(subjectJob.FullName())
builder.WriteString(" with source ")
builder.WriteString(deprecated.URN().String())
if deprecated.GetDeprecationInfo().Date.After(time.Now()) {
builder.WriteString(" will be deprecated at ")
} else {
builder.WriteString(" has deprecated at ")
}
builder.WriteString(deprecated.GetDeprecationInfo().Date.Format(time.DateOnly))
builder.WriteString(" with reason: ")
builder.WriteString(deprecated.GetDeprecationInfo().Reason)
if deprecated.GetDeprecationInfo().ReplacementTable != "" {
builder.WriteString(" and will be replaced by ")
builder.WriteString(deprecated.GetDeprecationInfo().ReplacementTable)
}

messages = append(messages, builder.String())
builder.Reset()
}

return messages
}

func (j *JobService) getJobSourceDeprecatedValidateLevel(ctx context.Context, tnnt tenant.Tenant, subjectJob *job.Job, deprecatedURNs []resource.URN) *dto.ValidateLevel {
if len(deprecatedURNs) == 0 {
return nil
}

currentLevel := dto.ValidateLevelError
existingJob, err := j.jobRepo.GetByJobName(ctx, tnnt.ProjectName(), subjectJob.Spec().Name())
if err != nil {
j.logger.Error("error getting existing job %s: %s", subjectJob.Spec().Name(), err)
return &currentLevel
}

existingSourceByURN := make(map[resource.URN]struct{})
for _, source := range existingJob.Sources() {
existingSourceByURN[source] = struct{}{}
}

for _, deprecatedURN := range deprecatedURNs {
_, existingSourceExists := existingSourceByURN[deprecatedURN]
if existingSourceExists {
currentLevel = dto.ValidateLevelWarning
}
}
return &currentLevel
}

func registerJobValidationMetric(tnnt tenant.Tenant, stage dto.ValidateStage, success bool) {
job.ValidationMetric.WithLabelValues(
tnnt.ProjectName().String(),
Expand Down
Loading
Loading