diff --git a/dataproxy/service.go b/dataproxy/service.go index a62d37b43..45ee49ec2 100644 --- a/dataproxy/service.go +++ b/dataproxy/service.go @@ -132,6 +132,18 @@ func (s Service) CreateDownloadLink(ctx context.Context, req *service.CreateDown return nil, errors.NewFlyteAdminErrorf(codes.Internal, "no deckUrl found for request [%+v]", req) } + // Check if the native url exists + metadata, err := s.dataStore.Head(ctx, storage.DataReference(nativeURL)) + if err != nil { + return nil, errors.NewFlyteAdminErrorf(codes.Internal, "Failed to check the existence of the URL [%s]. Error: %v", nativeURL, err) + } + if !metadata.Exists() { + return nil, errors.NewFlyteAdminErrorf( + codes.NotFound, + "URL [%s] does not exist yet. Please try again later. If you are using the real-time deck, this could be because the 'persist' function has not been called yet.", + nativeURL) + } + signedURLResp, err := s.dataStore.CreateSignedURL(ctx, storage.DataReference(nativeURL), storage.SignedURLProperties{ Scope: stow.ClientMethodGet, ExpiresIn: req.ExpiresIn.AsDuration(), diff --git a/dataproxy/service_test.go b/dataproxy/service_test.go index efaae1da1..7b4ba7d28 100644 --- a/dataproxy/service_test.go +++ b/dataproxy/service_test.go @@ -87,6 +87,9 @@ func TestCreateUploadLocation(t *testing.T) { func TestCreateDownloadLink(t *testing.T) { dataStore := commonMocks.GetMockStorageClient() + dataStore.ComposedProtobufStore.(*commonMocks.TestDataStore).HeadCb = func(ctx context.Context, reference storage.DataReference) (storage.Metadata, error) { + return existsMetadata{}, nil + } nodeExecutionManager := &mocks.MockNodeExecutionManager{} nodeExecutionManager.SetGetNodeExecutionFunc(func(ctx context.Context, request admin.NodeExecutionGetRequest) (*admin.NodeExecution, error) { return &admin.NodeExecution{ @@ -127,6 +130,19 @@ func TestCreateDownloadLink(t *testing.T) { }) assert.NoError(t, err) }) + + t.Run("nonexistent URI", func(t *testing.T) { + dataStore.ComposedProtobufStore.(*commonMocks.TestDataStore).HeadCb = func(ctx context.Context, reference storage.DataReference) (storage.Metadata, error) { + return nonexistentMetadata{}, nil + } + _, err = s.CreateDownloadLink(context.Background(), &service.CreateDownloadLinkRequest{ + ArtifactType: service.ArtifactType_ARTIFACT_TYPE_DECK, + Source: &service.CreateDownloadLinkRequest_NodeExecutionId{ + NodeExecutionId: &core.NodeExecutionIdentifier{}, + }, + }) + assert.Error(t, err) + }) } func TestCreateDownloadLocation(t *testing.T) { @@ -350,3 +366,23 @@ func TestService_Error(t *testing.T) { assert.Error(t, err, "no task executions") }) } + +type existsMetadata struct{} + +func (e existsMetadata) Exists() bool { + return true +} + +func (e existsMetadata) Size() int64 { + return int64(1) +} + +type nonexistentMetadata struct{} + +func (e nonexistentMetadata) Exists() bool { + return false +} + +func (e nonexistentMetadata) Size() int64 { + return int64(0) +} diff --git a/pkg/repositories/transformers/node_execution.go b/pkg/repositories/transformers/node_execution.go index 7eb57a70b..d3f615d6d 100644 --- a/pkg/repositories/transformers/node_execution.go +++ b/pkg/repositories/transformers/node_execution.go @@ -47,6 +47,7 @@ func addNodeRunningState(request *admin.NodeExecutionEventRequest, nodeExecution "failed to marshal occurredAt into a timestamp proto with error: %v", err) } closure.StartedAt = startedAtProto + closure.DeckUri = request.Event.DeckUri return nil } diff --git a/pkg/repositories/transformers/node_execution_test.go b/pkg/repositories/transformers/node_execution_test.go index 88ef8fc26..49fbb50ba 100644 --- a/pkg/repositories/transformers/node_execution_test.go +++ b/pkg/repositories/transformers/node_execution_test.go @@ -55,6 +55,7 @@ var childExecutionID = &core.WorkflowExecutionIdentifier{ const dynamicWorkflowClosureRef = "s3://bucket/admin/metadata/workflow" const testInputURI = "fake://bucket/inputs.pb" +const DeckURI = "fake://bucket/deck.html" var testInputs = &core.LiteralMap{ Literals: map[string]*core.Literal{ @@ -69,6 +70,7 @@ func TestAddRunningState(t *testing.T) { Event: &event.NodeExecutionEvent{ Phase: core.NodeExecution_RUNNING, OccurredAt: startedAtProto, + DeckUri: DeckURI, }, } nodeExecutionModel := models.NodeExecution{} @@ -77,6 +79,7 @@ func TestAddRunningState(t *testing.T) { assert.Nil(t, err) assert.Equal(t, startedAt, *nodeExecutionModel.StartedAt) assert.True(t, proto.Equal(startedAtProto, closure.StartedAt)) + assert.Equal(t, DeckURI, closure.DeckUri) } func TestAddTerminalState_OutputURI(t *testing.T) { @@ -88,6 +91,7 @@ func TestAddTerminalState_OutputURI(t *testing.T) { OutputUri: outputURI, }, OccurredAt: occurredAtProto, + DeckUri: DeckURI, }, } startedAt := occurredAt.Add(-time.Minute) @@ -103,6 +107,7 @@ func TestAddTerminalState_OutputURI(t *testing.T) { assert.Nil(t, err) assert.EqualValues(t, outputURI, closure.GetOutputUri()) assert.Equal(t, time.Minute, nodeExecutionModel.Duration) + assert.Equal(t, DeckURI, closure.DeckUri) } func TestAddTerminalState_OutputData(t *testing.T) {