Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

real time deck support #579

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions dataproxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,18 @@
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "no deckUrl found for request [%+v]", req)
}

// Check if the native url exists

Check warning on line 135 in dataproxy/service.go

View check run for this annotation

Codecov / codecov/patch

dataproxy/service.go#L135

Added line #L135 was not covered by tests
metadata, err := s.dataStore.Head(ctx, storage.DataReference(nativeURL))
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "Failed to check the existence of the URL [%s]. Error: %v", nativeURL, err)
}

Check warning on line 139 in dataproxy/service.go

View check run for this annotation

Codecov / codecov/patch

dataproxy/service.go#L138-L139

Added lines #L138 - L139 were not covered by tests
if !metadata.Exists() {
return nil, errors.NewFlyteAdminErrorf(
codes.NotFound,
"URL [%s] does not exist yet. Please try again later. If you are using the real-time deck, this could be because the 'persist' function has not been called yet.",
nativeURL)
}

Check warning on line 145 in dataproxy/service.go

View check run for this annotation

Codecov / codecov/patch

dataproxy/service.go#L144-L145

Added lines #L144 - L145 were not covered by tests

signedURLResp, err := s.dataStore.CreateSignedURL(ctx, storage.DataReference(nativeURL), storage.SignedURLProperties{
Scope: stow.ClientMethodGet,
ExpiresIn: req.ExpiresIn.AsDuration(),
Expand Down
36 changes: 36 additions & 0 deletions dataproxy/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ func TestCreateUploadLocation(t *testing.T) {

func TestCreateDownloadLink(t *testing.T) {
dataStore := commonMocks.GetMockStorageClient()
dataStore.ComposedProtobufStore.(*commonMocks.TestDataStore).HeadCb = func(ctx context.Context, reference storage.DataReference) (storage.Metadata, error) {
return existsMetadata{}, nil
}
nodeExecutionManager := &mocks.MockNodeExecutionManager{}
nodeExecutionManager.SetGetNodeExecutionFunc(func(ctx context.Context, request admin.NodeExecutionGetRequest) (*admin.NodeExecution, error) {
return &admin.NodeExecution{
Expand Down Expand Up @@ -127,6 +130,19 @@ func TestCreateDownloadLink(t *testing.T) {
})
assert.NoError(t, err)
})

t.Run("nonexistent URI", func(t *testing.T) {
dataStore.ComposedProtobufStore.(*commonMocks.TestDataStore).HeadCb = func(ctx context.Context, reference storage.DataReference) (storage.Metadata, error) {
return nonexistentMetadata{}, nil
}
_, err = s.CreateDownloadLink(context.Background(), &service.CreateDownloadLinkRequest{
ArtifactType: service.ArtifactType_ARTIFACT_TYPE_DECK,
Source: &service.CreateDownloadLinkRequest_NodeExecutionId{
NodeExecutionId: &core.NodeExecutionIdentifier{},
},
})
assert.Error(t, err)
})
}

func TestCreateDownloadLocation(t *testing.T) {
Expand Down Expand Up @@ -350,3 +366,23 @@ func TestService_Error(t *testing.T) {
assert.Error(t, err, "no task executions")
})
}

type existsMetadata struct{}

func (e existsMetadata) Exists() bool {
return true
}

func (e existsMetadata) Size() int64 {
return int64(1)
}

type nonexistentMetadata struct{}

func (e nonexistentMetadata) Exists() bool {
return false
}

func (e nonexistentMetadata) Size() int64 {
return int64(0)
}
1 change: 1 addition & 0 deletions pkg/repositories/transformers/node_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func addNodeRunningState(request *admin.NodeExecutionEventRequest, nodeExecution
"failed to marshal occurredAt into a timestamp proto with error: %v", err)
}
closure.StartedAt = startedAtProto
closure.DeckUri = request.Event.DeckUri
return nil
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/repositories/transformers/node_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var childExecutionID = &core.WorkflowExecutionIdentifier{
const dynamicWorkflowClosureRef = "s3://bucket/admin/metadata/workflow"

const testInputURI = "fake://bucket/inputs.pb"
const DeckURI = "fake://bucket/deck.html"

var testInputs = &core.LiteralMap{
Literals: map[string]*core.Literal{
Expand All @@ -69,6 +70,7 @@ func TestAddRunningState(t *testing.T) {
Event: &event.NodeExecutionEvent{
Phase: core.NodeExecution_RUNNING,
OccurredAt: startedAtProto,
DeckUri: DeckURI,
},
}
nodeExecutionModel := models.NodeExecution{}
Expand All @@ -77,6 +79,7 @@ func TestAddRunningState(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, startedAt, *nodeExecutionModel.StartedAt)
assert.True(t, proto.Equal(startedAtProto, closure.StartedAt))
assert.Equal(t, DeckURI, closure.DeckUri)
}

func TestAddTerminalState_OutputURI(t *testing.T) {
Expand All @@ -88,6 +91,7 @@ func TestAddTerminalState_OutputURI(t *testing.T) {
OutputUri: outputURI,
},
OccurredAt: occurredAtProto,
DeckUri: DeckURI,
},
}
startedAt := occurredAt.Add(-time.Minute)
Expand All @@ -103,6 +107,7 @@ func TestAddTerminalState_OutputURI(t *testing.T) {
assert.Nil(t, err)
assert.EqualValues(t, outputURI, closure.GetOutputUri())
assert.Equal(t, time.Minute, nodeExecutionModel.Duration)
assert.Equal(t, DeckURI, closure.DeckUri)
}

func TestAddTerminalState_OutputData(t *testing.T) {
Expand Down
Loading