diff --git a/go.mod b/go.mod index b3009551..328657fe 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/cloudevents/sdk-go/v2 v2.8.0 github.com/coreos/go-oidc v2.2.1+incompatible github.com/evanphx/json-patch v4.12.0+incompatible - github.com/flyteorg/flyteidl v1.5.14 + github.com/flyteorg/flyteidl v1.5.21 github.com/flyteorg/flyteplugins v1.0.67 github.com/flyteorg/flytepropeller v1.1.98 github.com/flyteorg/flytestdlib v1.0.22 diff --git a/go.sum b/go.sum index ed4b6ac8..25b14ed8 100644 --- a/go.sum +++ b/go.sum @@ -293,8 +293,8 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flyteorg/flyteidl v1.5.14 h1:+3ewipoOp82fPyIVgvvrMq1lorl5Kz3Lh6sh/a9+loI= -github.com/flyteorg/flyteidl v1.5.14/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= +github.com/flyteorg/flyteidl v1.5.21 h1:zP1byUlNFqstTe7Io1DiiNgNf+mZAVmGZM04oIUA5kU= +github.com/flyteorg/flyteidl v1.5.21/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= github.com/flyteorg/flyteplugins v1.0.67 h1:d2FXpwxQwX/k4YdmhuusykOemHb/cUTPEob4WBmdpjE= github.com/flyteorg/flyteplugins v1.0.67/go.mod h1:HHt4nKDKVwrZPKDsj99dNtDSIJL378xNotYMA3a/TFA= github.com/flyteorg/flytepropeller v1.1.98 h1:Zk2ENYB9VZRT5tFUIFjm+aCkr0TU2EuyJ5gh52fpLoA= diff --git a/pkg/manager/impl/task_execution_manager.go b/pkg/manager/impl/task_execution_manager.go index ab0107f7..6e20c837 100644 --- a/pkg/manager/impl/task_execution_manager.go +++ b/pkg/manager/impl/task_execution_manager.go @@ -5,6 +5,10 @@ import ( "fmt" "strconv" + "github.com/golang/protobuf/proto" + "github.com/prometheus/client_golang/prometheus" + "google.golang.org/grpc/codes" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flytestdlib/contextutils" @@ -12,9 +16,6 @@ import ( "github.com/flyteorg/flytestdlib/promutils" "github.com/flyteorg/flytestdlib/promutils/labeled" "github.com/flyteorg/flytestdlib/storage" - "github.com/golang/protobuf/proto" - "github.com/prometheus/client_golang/prometheus" - "google.golang.org/grpc/codes" cloudeventInterfaces "github.com/flyteorg/flyteadmin/pkg/async/cloudevent/interfaces" notificationInterfaces "github.com/flyteorg/flyteadmin/pkg/async/notifications/interfaces" @@ -189,7 +190,7 @@ func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, req return nil, err } - if request.Event.Phase == core.TaskExecution_RUNNING && request.Event.PhaseVersion == 0 { + if request.Event.Phase == core.TaskExecution_RUNNING && request.Event.PhaseVersion == 0 { // TODO: need to be careful about missing inc/decs m.metrics.ActiveTaskExecutions.Inc() } else if common.IsTaskExecutionTerminal(request.Event.Phase) && request.Event.PhaseVersion == 0 { m.metrics.ActiveTaskExecutions.Dec() diff --git a/pkg/repositories/transformers/task_execution.go b/pkg/repositories/transformers/task_execution.go index edfc32b1..b12a8669 100644 --- a/pkg/repositories/transformers/task_execution.go +++ b/pkg/repositories/transformers/task_execution.go @@ -153,19 +153,27 @@ func CreateTaskExecutionModel(ctx context.Context, input CreateTaskExecutionMode CreatedAt: input.Request.Event.OccurredAt, Logs: input.Request.Event.Logs, CustomInfo: input.Request.Event.CustomInfo, - Reason: input.Request.Event.Reason, TaskType: input.Request.Event.TaskType, Metadata: metadata, EventVersion: input.Request.Event.EventVersion, } - if len(input.Request.Event.Reason) > 0 { + if len(input.Request.Event.Reasons) > 0 { + for _, reason := range input.Request.Event.Reasons { + closure.Reasons = append(closure.Reasons, &admin.Reason{ + OccurredAt: reason.OccurredAt, + Message: reason.Reason, + }) + } + closure.Reason = input.Request.Event.Reasons[len(input.Request.Event.Reasons)-1].Reason + } else if len(input.Request.Event.Reason) > 0 { closure.Reasons = []*admin.Reason{ - &admin.Reason{ + { OccurredAt: input.Request.Event.OccurredAt, Message: input.Request.Event.Reason, }, } + closure.Reason = input.Request.Event.Reason } eventPhase := input.Request.Event.Phase @@ -386,7 +394,17 @@ func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionE } taskExecutionClosure.UpdatedAt = reportedAt taskExecutionClosure.Logs = mergeLogs(taskExecutionClosure.Logs, request.Event.Logs) - if len(request.Event.Reason) > 0 { + if len(request.Event.Reasons) > 0 { + for _, reason := range request.Event.Reasons { + taskExecutionClosure.Reasons = append( + taskExecutionClosure.Reasons, + &admin.Reason{ + OccurredAt: reason.OccurredAt, + Message: reason.Reason, + }) + } + taskExecutionClosure.Reason = request.Event.Reasons[len(request.Event.Reasons)-1].Reason + } else if len(request.Event.Reason) > 0 { if taskExecutionClosure.Reason != request.Event.Reason { // by tracking a time-series of reasons we increase the size of the TaskExecutionClosure in scenarios where // a task reports a large number of unique reasons. if this size increase becomes problematic we this logic diff --git a/pkg/repositories/transformers/task_execution_test.go b/pkg/repositories/transformers/task_execution_test.go index e3155b12..b196cf01 100644 --- a/pkg/repositories/transformers/task_execution_test.go +++ b/pkg/repositories/transformers/task_execution_test.go @@ -285,7 +285,7 @@ func TestCreateTaskExecutionModelQueued(t *testing.T) { UpdatedAt: taskEventOccurredAtProto, Reason: "Task was scheduled", Reasons: []*admin.Reason{ - &admin.Reason{ + { OccurredAt: taskEventOccurredAtProto, Message: "Task was scheduled", }, @@ -406,6 +406,93 @@ func TestCreateTaskExecutionModelRunning(t *testing.T) { }, taskExecutionModel) } +func TestCreateTaskExecutionModelSingleEvents(t *testing.T) { + taskExecutionModel, err := CreateTaskExecutionModel(context.TODO(), CreateTaskExecutionModelInput{ + Request: &admin.TaskExecutionEventRequest{ + Event: &event.TaskExecutionEvent{ + TaskId: sampleTaskID, + ParentNodeExecutionId: sampleNodeExecID, + Phase: core.TaskExecution_RUNNING, + PhaseVersion: uint32(2), + RetryAttempt: 1, + InputValue: &event.TaskExecutionEvent_InputUri{ + InputUri: testInputURI, + }, + OutputResult: &event.TaskExecutionEvent_OutputUri{ + OutputUri: "output uri", + }, + OccurredAt: taskEventOccurredAtProto, + Reason: "Task event 1", + }, + }, + }) + assert.Nil(t, err) + + expectedClosure := &admin.TaskExecutionClosure{ + Phase: core.TaskExecution_RUNNING, + StartedAt: taskEventOccurredAtProto, + CreatedAt: taskEventOccurredAtProto, + UpdatedAt: taskEventOccurredAtProto, + Reason: "Task event 1", + Reasons: []*admin.Reason{ + {OccurredAt: taskEventOccurredAtProto, Message: "Task event 1"}, + }, + } + expectedClosureBytes, err := proto.Marshal(expectedClosure) + assert.Nil(t, err) + assert.Equal(t, expectedClosureBytes, taskExecutionModel.Closure) +} + +func TestCreateTaskExecutionModelBatchedEvents(t *testing.T) { + secondTaskEventOccurredAt := taskEventOccurredAt.Add(time.Second) + secondTaskEventOccurredAtProto, _ := ptypes.TimestampProto(secondTaskEventOccurredAt) + taskExecutionModel, err := CreateTaskExecutionModel(context.TODO(), CreateTaskExecutionModelInput{ + Request: &admin.TaskExecutionEventRequest{ + Event: &event.TaskExecutionEvent{ + TaskId: sampleTaskID, + ParentNodeExecutionId: sampleNodeExecID, + Phase: core.TaskExecution_RUNNING, + PhaseVersion: uint32(2), + RetryAttempt: 1, + InputValue: &event.TaskExecutionEvent_InputUri{ + InputUri: testInputURI, + }, + OutputResult: &event.TaskExecutionEvent_OutputUri{ + OutputUri: "output uri", + }, + OccurredAt: taskEventOccurredAtProto, + Reason: "Task event 1", // Here for backwards compatibility + Reasons: []*event.EventReason{ + { + OccurredAt: taskEventOccurredAtProto, + Reason: "Task event 1", + }, + { + OccurredAt: secondTaskEventOccurredAtProto, + Reason: "Task event 2", + }, + }, + }, + }, + }) + assert.Nil(t, err) + + expectedClosure := &admin.TaskExecutionClosure{ + Phase: core.TaskExecution_RUNNING, + StartedAt: taskEventOccurredAtProto, + CreatedAt: taskEventOccurredAtProto, + UpdatedAt: taskEventOccurredAtProto, + Reason: "Task event 2", + Reasons: []*admin.Reason{ + {OccurredAt: taskEventOccurredAtProto, Message: "Task event 1"}, + {OccurredAt: secondTaskEventOccurredAtProto, Message: "Task event 2"}, + }, + } + expectedClosureBytes, err := proto.Marshal(expectedClosure) + assert.Nil(t, err) + assert.Equal(t, expectedClosureBytes, taskExecutionModel.Closure) +} + func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) { existingClosure := &admin.TaskExecutionClosure{ Phase: core.TaskExecution_RUNNING, @@ -425,7 +512,7 @@ func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) { }), Reason: "Task was scheduled", Reasons: []*admin.Reason{ - &admin.Reason{ + { OccurredAt: taskEventOccurredAtProto, Message: "Task was scheduled", }, @@ -526,11 +613,11 @@ func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) { }), Reason: "task failed", Reasons: []*admin.Reason{ - &admin.Reason{ + { OccurredAt: taskEventOccurredAtProto, Message: "Task was scheduled", }, - &admin.Reason{ + { OccurredAt: occuredAtProto, Message: "task failed", }, @@ -569,6 +656,207 @@ func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) { } +func TestUpdateTaskExecutionModelSingleEvents(t *testing.T) { + existingClosure := &admin.TaskExecutionClosure{ + Phase: core.TaskExecution_RUNNING, + StartedAt: taskEventOccurredAtProto, + CreatedAt: taskEventOccurredAtProto, + UpdatedAt: taskEventOccurredAtProto, + Reason: "Task was scheduled", + Reasons: []*admin.Reason{ + { + OccurredAt: taskEventOccurredAtProto, + Message: "Task was scheduled", + }, + }, + } + + closureBytes, err := proto.Marshal(existingClosure) + assert.Nil(t, err) + + existingTaskExecution := models.TaskExecution{ + TaskExecutionKey: models.TaskExecutionKey{ + TaskKey: models.TaskKey{ + Project: sampleTaskID.Project, + Domain: sampleTaskID.Domain, + Name: sampleTaskID.Name, + Version: sampleTaskID.Version, + }, + NodeExecutionKey: models.NodeExecutionKey{ + NodeID: sampleNodeExecID.NodeId, + ExecutionKey: models.ExecutionKey{ + Project: sampleNodeExecID.ExecutionId.Project, + Domain: sampleNodeExecID.ExecutionId.Domain, + Name: sampleNodeExecID.ExecutionId.Name, + }, + }, + RetryAttempt: &retryAttemptValue, + }, + Phase: "TaskExecutionPhase_TASK_PHASE_RUNNING", + InputURI: "input uri", + Closure: closureBytes, + StartedAt: &taskEventOccurredAt, + TaskExecutionCreatedAt: &taskEventOccurredAt, + TaskExecutionUpdatedAt: &taskEventOccurredAt, + } + + occuredAt := taskEventOccurredAt.Add(time.Minute) + occuredAtProto, err := ptypes.TimestampProto(occuredAt) + assert.Nil(t, err) + + taskEventRequest := &admin.TaskExecutionEventRequest{ + Event: &event.TaskExecutionEvent{ + TaskId: sampleTaskID, + ParentNodeExecutionId: sampleNodeExecID, + Phase: core.TaskExecution_RUNNING, + RetryAttempt: 1, + InputValue: &event.TaskExecutionEvent_InputUri{ + InputUri: testInputURI, + }, + OutputResult: &event.TaskExecutionEvent_OutputUri{ + OutputUri: "output uri", + }, + OccurredAt: occuredAtProto, + Reason: "update 1", + }, + } + + err = UpdateTaskExecutionModel(context.TODO(), taskEventRequest, &existingTaskExecution, + interfaces.InlineEventDataPolicyStoreInline, commonMocks.GetMockStorageClient()) + assert.Nil(t, err) + + expectedClosure := &admin.TaskExecutionClosure{ + Phase: core.TaskExecution_RUNNING, + StartedAt: taskEventOccurredAtProto, + UpdatedAt: occuredAtProto, + CreatedAt: taskEventOccurredAtProto, + Reason: "update 1", + Reasons: []*admin.Reason{ + { + OccurredAt: taskEventOccurredAtProto, + Message: "Task was scheduled", + }, + { + OccurredAt: occuredAtProto, + Message: "update 1", + }, + }, + } + + expectedClosureBytes, err := proto.Marshal(expectedClosure) + assert.Nil(t, err) + assert.Equal(t, expectedClosureBytes, existingTaskExecution.Closure) +} + +func TestUpdateTaskExecutionModelBatchedEvents(t *testing.T) { + existingClosure := &admin.TaskExecutionClosure{ + Phase: core.TaskExecution_RUNNING, + StartedAt: taskEventOccurredAtProto, + CreatedAt: taskEventOccurredAtProto, + UpdatedAt: taskEventOccurredAtProto, + Reason: "Task was scheduled", + Reasons: []*admin.Reason{ + { + OccurredAt: taskEventOccurredAtProto, + Message: "Task was scheduled", + }, + }, + } + + closureBytes, err := proto.Marshal(existingClosure) + assert.Nil(t, err) + + existingTaskExecution := models.TaskExecution{ + TaskExecutionKey: models.TaskExecutionKey{ + TaskKey: models.TaskKey{ + Project: sampleTaskID.Project, + Domain: sampleTaskID.Domain, + Name: sampleTaskID.Name, + Version: sampleTaskID.Version, + }, + NodeExecutionKey: models.NodeExecutionKey{ + NodeID: sampleNodeExecID.NodeId, + ExecutionKey: models.ExecutionKey{ + Project: sampleNodeExecID.ExecutionId.Project, + Domain: sampleNodeExecID.ExecutionId.Domain, + Name: sampleNodeExecID.ExecutionId.Name, + }, + }, + RetryAttempt: &retryAttemptValue, + }, + Phase: "TaskExecutionPhase_TASK_PHASE_RUNNING", + InputURI: "input uri", + Closure: closureBytes, + StartedAt: &taskEventOccurredAt, + TaskExecutionCreatedAt: &taskEventOccurredAt, + TaskExecutionUpdatedAt: &taskEventOccurredAt, + } + + occuredAt := taskEventOccurredAt.Add(time.Minute) + occuredAtProto, err := ptypes.TimestampProto(occuredAt) + assert.Nil(t, err) + secondOccuredAt := taskEventOccurredAt.Add(time.Minute * 2) + secondOccuredAtProto, err := ptypes.TimestampProto(secondOccuredAt) + assert.Nil(t, err) + + taskEventRequest := &admin.TaskExecutionEventRequest{ + Event: &event.TaskExecutionEvent{ + TaskId: sampleTaskID, + ParentNodeExecutionId: sampleNodeExecID, + Phase: core.TaskExecution_RUNNING, + RetryAttempt: 1, + InputValue: &event.TaskExecutionEvent_InputUri{ + InputUri: testInputURI, + }, + OutputResult: &event.TaskExecutionEvent_OutputUri{ + OutputUri: "output uri", + }, + OccurredAt: occuredAtProto, + Reason: "update 1", // Here for backwards compatibility + Reasons: []*event.EventReason{ + { + OccurredAt: occuredAtProto, + Reason: "update 1", + }, + { + OccurredAt: secondOccuredAtProto, + Reason: "update 2", + }, + }, + }, + } + + err = UpdateTaskExecutionModel(context.TODO(), taskEventRequest, &existingTaskExecution, + interfaces.InlineEventDataPolicyStoreInline, commonMocks.GetMockStorageClient()) + assert.Nil(t, err) + + expectedClosure := &admin.TaskExecutionClosure{ + Phase: core.TaskExecution_RUNNING, + StartedAt: taskEventOccurredAtProto, + UpdatedAt: occuredAtProto, + CreatedAt: taskEventOccurredAtProto, + Reason: "update 2", + Reasons: []*admin.Reason{ + { + OccurredAt: taskEventOccurredAtProto, + Message: "Task was scheduled", + }, + { + OccurredAt: occuredAtProto, + Message: "update 1", + }, + { + OccurredAt: secondOccuredAtProto, + Message: "update 2", + }, + }, + } + + expectedClosureBytes, err := proto.Marshal(expectedClosure) + assert.Nil(t, err) + assert.Equal(t, expectedClosureBytes, existingTaskExecution.Closure) +} + func TestFromTaskExecutionModel(t *testing.T) { taskClosure := &admin.TaskExecutionClosure{ Phase: core.TaskExecution_RUNNING,