From b1274aafd8767c146ce6de69a045405fd60e4b11 Mon Sep 17 00:00:00 2001 From: Humair Khan Date: Tue, 23 Apr 2024 14:43:22 -0400 Subject: [PATCH 1/8] feat: add store session info to artifact property Signed-off-by: Humair Khan --- backend/src/v2/component/importer_launcher.go | 20 +++++++ backend/src/v2/component/launcher_v2.go | 2 +- backend/src/v2/config/env.go | 3 +- backend/src/v2/metadata/client.go | 53 ++++++++++++------- backend/src/v2/metadata/client_fake.go | 3 +- backend/src/v2/objectstore/config.go | 10 ++++ 6 files changed, 68 insertions(+), 23 deletions(-) diff --git a/backend/src/v2/component/importer_launcher.go b/backend/src/v2/component/importer_launcher.go index e6dae29d639..1207ed89fea 100644 --- a/backend/src/v2/component/importer_launcher.go +++ b/backend/src/v2/component/importer_launcher.go @@ -2,7 +2,9 @@ package component import ( "context" + "encoding/json" "fmt" + "github.com/kubeflow/pipelines/backend/src/v2/objectstore" pb "github.com/kubeflow/pipelines/third_party/ml-metadata/go/ml_metadata" @@ -225,6 +227,10 @@ func (l *ImportLauncher) ImportSpecToMLMDArtifact(ctx context.Context) (artifact state := pb.Artifact_LIVE + provider, err := objectstore.ParseProviderFromPath(artifactUri) + if err != nil { + return nil, fmt.Errorf("No Provider scheme found in artifact Uri: %s", artifactUri) + } artifact = &pb.Artifact{ TypeId: &artifactTypeId, State: &state, @@ -241,6 +247,20 @@ func (l *ImportLauncher) ImportSpecToMLMDArtifact(ctx context.Context) (artifact artifact.CustomProperties[k] = value } } + + // Assume all imported artifacts will rely on execution environment for store provider session info + storeSessionInfo := objectstore.SessionInfo{ + Provider: provider, + Params: map[string]string{ + "fromEnv": "true", + }, + } + storeSessionInfoJSON, err := json.Marshal(storeSessionInfo) + if err != nil { + return nil, err + } + storeSessionInfoStr := string(storeSessionInfoJSON) + artifact.CustomProperties["store_session_info"] = metadata.StringValue(storeSessionInfoStr) return artifact, nil } diff --git a/backend/src/v2/component/launcher_v2.go b/backend/src/v2/component/launcher_v2.go index 411055daca0..2d9b569b898 100644 --- a/backend/src/v2/component/launcher_v2.go +++ b/backend/src/v2/component/launcher_v2.go @@ -459,7 +459,7 @@ func uploadOutputArtifacts(ctx context.Context, executorInput *pipelinespec.Exec if err != nil { return nil, fmt.Errorf("failed to determine schema for output %q: %w", name, err) } - mlmdArtifact, err := opts.metadataClient.RecordArtifact(ctx, name, schema, outputArtifact, pb.Artifact_LIVE) + mlmdArtifact, err := opts.metadataClient.RecordArtifact(ctx, name, schema, outputArtifact, pb.Artifact_LIVE, opts.bucketConfig) if err != nil { return nil, metadataErr(err) } diff --git a/backend/src/v2/config/env.go b/backend/src/v2/config/env.go index aa20b5a3916..3597091b79a 100644 --- a/backend/src/v2/config/env.go +++ b/backend/src/v2/config/env.go @@ -102,11 +102,10 @@ func InPodName() (string, error) { } func (c *Config) GetStoreSessionInfo(path string) (objectstore.SessionInfo, error) { - bucketConfig, err := objectstore.ParseBucketPathToConfig(path) + provider, err := objectstore.ParseProviderFromPath(path) if err != nil { return objectstore.SessionInfo{}, err } - provider := strings.TrimSuffix(bucketConfig.Scheme, "://") bucketProviders, err := c.getBucketProviders() if err != nil { return objectstore.SessionInfo{}, err diff --git a/backend/src/v2/metadata/client.go b/backend/src/v2/metadata/client.go index 4854809c88a..16d0c47dc58 100644 --- a/backend/src/v2/metadata/client.go +++ b/backend/src/v2/metadata/client.go @@ -18,8 +18,10 @@ package metadata import ( "context" + "encoding/json" "errors" "fmt" + "github.com/kubeflow/pipelines/backend/src/v2/objectstore" "path" "strconv" "strings" @@ -90,7 +92,7 @@ type ClientInterface interface { GetArtifactName(ctx context.Context, artifactId int64) (string, error) GetArtifacts(ctx context.Context, ids []int64) ([]*pb.Artifact, error) GetOutputArtifactsByExecutionId(ctx context.Context, executionId int64) (map[string]*OutputArtifact, error) - RecordArtifact(ctx context.Context, outputName, schema string, runtimeArtifact *pipelinespec.RuntimeArtifact, state pb.Artifact_State) (*OutputArtifact, error) + RecordArtifact(ctx context.Context, outputName, schema string, runtimeArtifact *pipelinespec.RuntimeArtifact, state pb.Artifact_State, bucketConfig *objectstore.Config) (*OutputArtifact, error) GetOrInsertArtifactType(ctx context.Context, schema string) (typeID int64, err error) FindMatchedArtifact(ctx context.Context, artifactToMatch *pb.Artifact, pipelineContextId int64) (matchedArtifact *pb.Artifact, err error) } @@ -301,11 +303,11 @@ func (c *Client) GetPipeline(ctx context.Context, pipelineName, runID, namespace } glog.Infof("Pipeline Context: %+v", pipelineContext) metadata := map[string]*pb.Value{ - keyNamespace: stringValue(namespace), - keyResourceName: stringValue(runResource), + keyNamespace: StringValue(namespace), + keyResourceName: StringValue(runResource), // pipeline root of this run - keyPipelineRoot: stringValue(GenerateOutputURI(pipelineRoot, []string{pipelineName, runID}, true)), - keyStoreSessionInfo: stringValue(storeSessionInfo), + keyPipelineRoot: StringValue(GenerateOutputURI(pipelineRoot, []string{pipelineName, runID}, true)), + keyStoreSessionInfo: StringValue(storeSessionInfo), } runContext, err := c.getOrInsertContext(ctx, runID, pipelineRunContextType, metadata) glog.Infof("Pipeline Run Context: %+v", runContext) @@ -401,7 +403,7 @@ func (c *Client) getExecutionTypeID(ctx context.Context, executionType *pb.Execu return eType.GetTypeId(), nil } -func stringValue(s string) *pb.Value { +func StringValue(s string) *pb.Value { return &pb.Value{Value: &pb.Value_StringValue{StringValue: s}} } @@ -531,8 +533,8 @@ func (c *Client) CreateExecution(ctx context.Context, pipeline *Pipeline, config TypeId: &typeID, CustomProperties: map[string]*pb.Value{ // We should support overriding display name in the future, for now it defaults to task name. - keyDisplayName: stringValue(config.TaskName), - keyTaskName: stringValue(config.TaskName), + keyDisplayName: StringValue(config.TaskName), + keyTaskName: StringValue(config.TaskName), }, } if config.Name != "" { @@ -555,15 +557,15 @@ func (c *Client) CreateExecution(ctx context.Context, pipeline *Pipeline, config e.CustomProperties[keyIterationCount] = intValue(int64(*config.IterationCount)) } if config.ExecutionType == ContainerExecutionTypeName { - e.CustomProperties[keyPodName] = stringValue(config.PodName) - e.CustomProperties[keyPodUID] = stringValue(config.PodUID) - e.CustomProperties[keyNamespace] = stringValue(config.Namespace) - e.CustomProperties[keyImage] = stringValue(config.Image) + e.CustomProperties[keyPodName] = StringValue(config.PodName) + e.CustomProperties[keyPodUID] = StringValue(config.PodUID) + e.CustomProperties[keyNamespace] = StringValue(config.Namespace) + e.CustomProperties[keyImage] = StringValue(config.Image) if config.CachedMLMDExecutionID != "" { - e.CustomProperties[keyCachedExecutionID] = stringValue(config.CachedMLMDExecutionID) + e.CustomProperties[keyCachedExecutionID] = StringValue(config.CachedMLMDExecutionID) } if config.FingerPrint != "" { - e.CustomProperties[keyCacheFingerPrint] = stringValue(config.FingerPrint) + e.CustomProperties[keyCacheFingerPrint] = StringValue(config.FingerPrint) } } if config.InputParameters != nil { @@ -623,9 +625,9 @@ func (c *Client) PrePublishExecution(ctx context.Context, execution *Execution, if e.CustomProperties == nil { e.CustomProperties = make(map[string]*pb.Value) } - e.CustomProperties[keyPodName] = stringValue(config.PodName) - e.CustomProperties[keyPodUID] = stringValue(config.PodUID) - e.CustomProperties[keyNamespace] = stringValue(config.Namespace) + e.CustomProperties[keyPodName] = StringValue(config.PodName) + e.CustomProperties[keyPodUID] = StringValue(config.PodUID) + e.CustomProperties[keyNamespace] = StringValue(config.Namespace) e.LastKnownState = pb.Execution_RUNNING.Enum() _, err := c.svc.PutExecution(ctx, &pb.PutExecutionRequest{ @@ -889,7 +891,7 @@ func SchemaToArtifactType(schema string) (*pb.ArtifactType, error) { } // RecordArtifact ... -func (c *Client) RecordArtifact(ctx context.Context, outputName, schema string, runtimeArtifact *pipelinespec.RuntimeArtifact, state pb.Artifact_State) (*OutputArtifact, error) { +func (c *Client) RecordArtifact(ctx context.Context, outputName, schema string, runtimeArtifact *pipelinespec.RuntimeArtifact, state pb.Artifact_State, bucketConfig *objectstore.Config) (*OutputArtifact, error) { artifact, err := toMLMDArtifact(runtimeArtifact) if err != nil { return nil, err @@ -911,7 +913,20 @@ func (c *Client) RecordArtifact(ctx context.Context, outputName, schema string, } if _, ok := artifact.CustomProperties["display_name"]; !ok { // display name default value - artifact.CustomProperties["display_name"] = stringValue(outputName) + artifact.CustomProperties["display_name"] = StringValue(outputName) + } + + // An artifact can belong to an external store specified via kfp-launcher + // or via executor environment (e.g. IRSA) + // This allows us to easily identify where to locate the artifact both + // in user executor environment as well as in kfp ui + if _, ok := artifact.CustomProperties["store_session_info"]; !ok { + storeSessionInfoJSON, err1 := json.Marshal(bucketConfig.SessionInfo) + if err1 != nil { + return nil, err1 + } + storeSessionInfoStr := string(storeSessionInfoJSON) + artifact.CustomProperties["store_session_info"] = StringValue(storeSessionInfoStr) } res, err := c.svc.PutArtifacts(ctx, &pb.PutArtifactsRequest{ diff --git a/backend/src/v2/metadata/client_fake.go b/backend/src/v2/metadata/client_fake.go index de8d007621e..8e9b7b84677 100644 --- a/backend/src/v2/metadata/client_fake.go +++ b/backend/src/v2/metadata/client_fake.go @@ -19,6 +19,7 @@ package metadata import ( "context" + "github.com/kubeflow/pipelines/backend/src/v2/objectstore" "github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec" pb "github.com/kubeflow/pipelines/third_party/ml-metadata/go/ml_metadata" @@ -82,7 +83,7 @@ func (c *FakeClient) GetOutputArtifactsByExecutionId(ctx context.Context, execut return nil, nil } -func (c *FakeClient) RecordArtifact(ctx context.Context, outputName, schema string, runtimeArtifact *pipelinespec.RuntimeArtifact, state pb.Artifact_State) (*OutputArtifact, error) { +func (c *FakeClient) RecordArtifact(ctx context.Context, outputName, schema string, runtimeArtifact *pipelinespec.RuntimeArtifact, state pb.Artifact_State, bucketConfig *objectstore.Config) (*OutputArtifact, error) { return nil, nil } diff --git a/backend/src/v2/objectstore/config.go b/backend/src/v2/objectstore/config.go index 06b26b8c436..28e82cd65de 100644 --- a/backend/src/v2/objectstore/config.go +++ b/backend/src/v2/objectstore/config.go @@ -151,6 +151,16 @@ func ParseBucketConfigForArtifactURI(uri string) (*Config, error) { }, nil } +// ParseProviderFromPath prases the uri and returns the scheme, which is +// used as the Provider string +func ParseProviderFromPath(uri string) (string, error) { + bucketConfig, err := ParseBucketPathToConfig(uri) + if err != nil { + return "", err + } + return strings.TrimSuffix(bucketConfig.Scheme, "://"), nil +} + func MinioDefaultEndpoint() string { // Discover minio-service in the same namespace by env var. // https://kubernetes.io/docs/concepts/services-networking/service/#environment-variables From 09d1bfbf6697e4b1b4e8d971e319579f0a8b117a Mon Sep 17 00:00:00 2001 From: Humair Khan Date: Wed, 1 May 2024 13:42:35 -0400 Subject: [PATCH 2/8] chore: fix tests for api, artifact load, & runlist Note that for runlist, the mock error response only returns one valid run list with error set, the other is undefined, so to support multiple runIds the mock error response will need to be adjusted. Signed-off-by: Humair Khan --- frontend/src/lib/Apis.test.ts | 8 +++----- frontend/src/lib/OutputArtifactLoader.test.ts | 2 +- frontend/src/pages/RunList.test.tsx | 6 +++--- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/frontend/src/lib/Apis.test.ts b/frontend/src/lib/Apis.test.ts index e080c3a91fa..ffb3fc57192 100644 --- a/frontend/src/lib/Apis.test.ts +++ b/frontend/src/lib/Apis.test.ts @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -import { Apis } from './Apis'; -import { StorageService } from './WorkflowParser'; +import {Apis} from './Apis'; +import {StorageService} from './WorkflowParser'; const fetchSpy = (response: string) => { const spy = jest.fn(() => @@ -128,9 +128,7 @@ describe('Apis', () => { const spy = fetchSpy('file contents'); expect( await Apis.readFile({ - bucket: 'testbucket', - key: 'testkey', - source: StorageService.GCS, + path: {source: StorageService.GCS, key:"testkey", bucket:'testbucket'}, }), ).toEqual('file contents'); expect(spy).toHaveBeenCalledWith('artifacts/get?source=gcs&bucket=testbucket&key=testkey', { diff --git a/frontend/src/lib/OutputArtifactLoader.test.ts b/frontend/src/lib/OutputArtifactLoader.test.ts index 69d6f7d23cf..100e9a940ef 100644 --- a/frontend/src/lib/OutputArtifactLoader.test.ts +++ b/frontend/src/lib/OutputArtifactLoader.test.ts @@ -88,7 +88,7 @@ describe('OutputArtifactLoader', () => { fileToRead = JSON.stringify({ outputs: [metadata] }); await OutputArtifactLoader.load(storagePath, 'ns1'); expect(readFileSpy).toHaveBeenCalledTimes(2); - expect(readFileSpy.mock.calls.map(([metadata, namespace]) => namespace)) + expect(readFileSpy.mock.calls.map(([{path, namespace}]) => namespace)) .toMatchInlineSnapshot(` Array [ "ns1", diff --git a/frontend/src/pages/RunList.test.tsx b/frontend/src/pages/RunList.test.tsx index f39609a9911..6a214608080 100644 --- a/frontend/src/pages/RunList.test.tsx +++ b/frontend/src/pages/RunList.test.tsx @@ -111,7 +111,7 @@ describe('RunList', () => { } beforeEach(() => { - formatDateStringSpy.mockImplementation((date?: Date) => { + formatDateStringSpy.mockImplementation((date?: Date | string) => { return date ? '1/2/2019, 12:34:56 PM' : '-'; }); onErrorSpy.mockClear(); @@ -324,7 +324,7 @@ describe('RunList', () => { 'bad stuff happened', ); const props = generateProps(); - props.runIdListMask = ['testrun1', 'testrun2']; + props.runIdListMask = ['testrun1']; render( @@ -333,7 +333,7 @@ describe('RunList', () => { await waitFor(() => { // won't call listRuns if specific run id is provided expect(listRunsSpy).toHaveBeenCalledTimes(0); - expect(getRunSpy).toHaveBeenCalledTimes(2); + expect(getRunSpy).toHaveBeenCalledTimes(1); }); screen.findByText('Failed to get associated experiment: bad stuff happened'); From 711683313593404971e313b12ee4c7cb86db7f01 Mon Sep 17 00:00:00 2001 From: Humair Khan Date: Wed, 1 May 2024 13:57:00 -0400 Subject: [PATCH 3/8] chore: support protocols in aws s3 endpoint config Signed-off-by: Humair Khan --- backend/src/v2/objectstore/object_store.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/backend/src/v2/objectstore/object_store.go b/backend/src/v2/objectstore/object_store.go index 66693290105..a7b85565565 100644 --- a/backend/src/v2/objectstore/object_store.go +++ b/backend/src/v2/objectstore/object_store.go @@ -33,6 +33,7 @@ import ( "k8s.io/client-go/kubernetes" "os" "path/filepath" + "regexp" "strings" ) @@ -261,12 +262,13 @@ func createS3BucketSession(ctx context.Context, namespace string, sessionInfo *S // AWS Specific: // Path-style S3 endpoints, which are commonly used, may fall into either of two subdomains: - // 1) s3.amazonaws.com + // 1) [https://]s3.amazonaws.com // 2) s3..amazonaws.com // for (1) the endpoint is not required, thus we skip it, otherwise the writer will fail to close due to region mismatch. // https://aws.amazon.com/blogs/infrastructure-and-automation/best-practices-for-using-amazon-s3-endpoints-in-aws-cloudformation-templates/ // https://docs.aws.amazon.com/sdk-for-go/api/aws/session/ - if strings.ToLower(params.Endpoint) != "s3.amazonaws.com" { + awsEndpoint, _ := regexp.MatchString(`^(https://)?s3.amazonaws.com`, strings.ToLower(params.Endpoint)) + if !awsEndpoint { config.Endpoint = aws.String(params.Endpoint) } From ecdd81d0e267f6e7e80040a4fc2792ab1a98ba2f Mon Sep 17 00:00:00 2001 From: Humair Khan Date: Wed, 1 May 2024 13:57:41 -0400 Subject: [PATCH 4/8] feat(ui): allow ui server to parse provider info Signed-off-by: Humair Khan --- frontend/server/aws-helper.test.ts | 16 +- frontend/server/aws-helper.ts | 2 +- frontend/server/handlers/artifacts.ts | 263 ++++++++++++++++---------- frontend/server/k8s-helper.ts | 13 +- frontend/server/minio-helper.test.ts | 7 +- frontend/server/minio-helper.ts | 172 +++++++++++++---- frontend/server/utils.ts | 9 + frontend/server/workflow-helper.ts | 6 +- 8 files changed, 339 insertions(+), 149 deletions(-) diff --git a/frontend/server/aws-helper.test.ts b/frontend/server/aws-helper.test.ts index 87a2a00ce01..232e3607d84 100644 --- a/frontend/server/aws-helper.test.ts +++ b/frontend/server/aws-helper.test.ts @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. import fetch from 'node-fetch'; -import { awsInstanceProfileCredentials, isS3Endpoint } from './aws-helper'; +import { awsInstanceProfileCredentials, isAWSS3Endpoint } from './aws-helper'; // mock node-fetch module jest.mock('node-fetch'); @@ -107,30 +107,30 @@ describe('awsInstanceProfileCredentials', () => { describe('isS3Endpoint', () => { it('checks a valid s3 endpoint', () => { - expect(isS3Endpoint('s3.amazonaws.com')).toBe(true); + expect(isAWSS3Endpoint('s3.amazonaws.com')).toBe(true); }); it('checks a valid s3 regional endpoint', () => { - expect(isS3Endpoint('s3.dualstack.us-east-1.amazonaws.com')).toBe(true); + expect(isAWSS3Endpoint('s3.dualstack.us-east-1.amazonaws.com')).toBe(true); }); it('checks a valid s3 cn endpoint', () => { - expect(isS3Endpoint('s3.cn-north-1.amazonaws.com.cn')).toBe(true); + expect(isAWSS3Endpoint('s3.cn-north-1.amazonaws.com.cn')).toBe(true); }); it('checks a valid s3 fips GovCloud endpoint', () => { - expect(isS3Endpoint('s3-fips.us-gov-west-1.amazonaws.com')).toBe(true); + expect(isAWSS3Endpoint('s3-fips.us-gov-west-1.amazonaws.com')).toBe(true); }); it('checks a valid s3 PrivateLink endpoint', () => { - expect(isS3Endpoint('vpce-1a2b3c4d-5e6f.s3.us-east-1.vpce.amazonaws.com')).toBe(true); + expect(isAWSS3Endpoint('vpce-1a2b3c4d-5e6f.s3.us-east-1.vpce.amazonaws.com')).toBe(true); }); it('checks an invalid s3 endpoint', () => { - expect(isS3Endpoint('amazonaws.com')).toBe(false); + expect(isAWSS3Endpoint('amazonaws.com')).toBe(false); }); it('checks non-s3 endpoint', () => { - expect(isS3Endpoint('minio.kubeflow')).toBe(false); + expect(isAWSS3Endpoint('minio.kubeflow')).toBe(false); }); }); diff --git a/frontend/server/aws-helper.ts b/frontend/server/aws-helper.ts index 35a023d2ec8..b0d061c063a 100644 --- a/frontend/server/aws-helper.ts +++ b/frontend/server/aws-helper.ts @@ -51,7 +51,7 @@ async function getIAMInstanceProfile(): Promise { * * @param endpoint minio endpoint to check. */ -export function isS3Endpoint(endpoint: string = ''): boolean { +export function isAWSS3Endpoint(endpoint: string = ''): boolean { return !!endpoint.match(/s3.{0,}\.amazonaws\.com\.?.{0,}/i); } diff --git a/frontend/server/handlers/artifacts.ts b/frontend/server/handlers/artifacts.ts index cbc8a5da84a..6dd6f0cdf34 100644 --- a/frontend/server/handlers/artifacts.ts +++ b/frontend/server/handlers/artifacts.ts @@ -13,8 +13,8 @@ // limitations under the License. import fetch from 'node-fetch'; import { AWSConfigs, HttpConfigs, MinioConfigs, ProcessEnv } from '../configs'; -import { Client as MinioClient } from 'minio'; -import { PreviewStream, findFileOnPodVolume } from '../utils'; +import {Client as MinioClient} from 'minio'; +import {PreviewStream, findFileOnPodVolume, parseJSONString} from '../utils'; import { createMinioClient, getObjectStream } from '../minio-helper'; import * as serverInfo from '../helpers/server-info'; import { Handler, Request, Response } from 'express'; @@ -24,6 +24,9 @@ import { HACK_FIX_HPM_PARTIAL_RESPONSE_HEADERS } from '../consts'; import * as fs from 'fs'; import { isAllowedDomain } from './domain-checker'; +import {getK8sSecret} from "../k8s-helper"; +import {StorageOptions} from "@google-cloud/storage/build/src/storage"; +import {CredentialBody} from "google-auth-library/build/src/auth/credentials"; /** * ArtifactsQueryStrings describes the expected query strings key value pairs @@ -38,6 +41,31 @@ interface ArtifactsQueryStrings { key: string; /** return only the first x characters or bytes. */ peek?: number; + /** optional provider info to use to query object store */ + providerInfo?: string; + namespace?: string; +} + +export interface S3ProviderInfo { + Provider: string; + Params: { + fromEnv: string; + secretName?: string; + accessKeyKey?: string; + secretKeyKey?: string; + region?: string; + endpoint?: string; + disableSSL?: string; + }; +} + +export interface GCSProviderInfo { + Provider: string; + Params: { + fromEnv: string; + secretName?: string; + tokenKey?: string; + }; } /** @@ -49,10 +77,10 @@ interface ArtifactsQueryStrings { * @param tryExtract whether the handler try to extract content from *.tar.gz files. */ export function getArtifactsHandler({ - artifactsConfigs, - useParameter, - tryExtract, -}: { + artifactsConfigs, + useParameter, + tryExtract, + }: { artifactsConfigs: { aws: AWSConfigs; http: HttpConfigs; @@ -67,7 +95,7 @@ export function getArtifactsHandler({ const source = useParameter ? req.params.source : req.query.source; const bucket = useParameter ? req.params.bucket : req.query.bucket; const key = useParameter ? req.params[0] : req.query.key; - const { peek = 0 } = req.query as Partial; + const { peek = 0, providerInfo = "", namespace = ""} = req.query as Partial; if (!source) { res.status(500).send('Storage source is missing from artifact request'); return; @@ -81,53 +109,63 @@ export function getArtifactsHandler({ return; } console.log(`Getting storage artifact at: ${source}: ${bucket}/${key}`); + + let client : MinioClient; switch (source) { case 'gcs': - getGCSArtifactHandler({ bucket, key }, peek)(req, res); + await getGCSArtifactHandler({bucket, key}, peek, providerInfo, namespace)(req, res); break; - case 'minio': - getMinioArtifactHandler( - { - bucket, - client: new MinioClient(minio), - key, - tryExtract, - }, - peek, + try { + client = await createMinioClient(minio, 'minio', providerInfo, namespace); + } catch (e) { + res.status(500).send(`Failed to initialize Minio Client for Minio Provider: ${e}`); + return; + } + await getMinioArtifactHandler( + { + bucket, + client, + key, + tryExtract, + }, + peek, )(req, res); break; case 's3': - getMinioArtifactHandler( - { - bucket, - client: await createMinioClient(aws), - key, - }, - peek, + try { + client = await createMinioClient(minio, 's3', providerInfo, namespace); + } catch (e) { + res.status(500).send(`Failed to initialize Minio Client for S3 Provider: ${e}`); + return; + } + await getMinioArtifactHandler( + { + bucket, + client, + key, + }, + peek, )(req, res); break; - case 'http': case 'https': - getHttpArtifactsHandler( - allowedDomain, - getHttpUrl(source, http.baseUrl || '', bucket, key), - http.auth, - peek, + await getHttpArtifactsHandler( + allowedDomain, + getHttpUrl(source, http.baseUrl || '', bucket, key), + http.auth, + peek, )(req, res); break; - case 'volume': await getVolumeArtifactsHandler( - { - bucket, - key, - }, - peek, + { + bucket, + key, + }, + peek, )(req, res); break; - default: res.status(500).send('Unknown storage source'); return; @@ -164,7 +202,7 @@ function getHttpArtifactsHandler( if (auth.key.length > 0) { // inject original request's value if exists, otherwise default to provided default value headers[auth.key] = - req.headers[auth.key] || req.headers[auth.key.toLowerCase()] || auth.defaultValue; + req.headers[auth.key] || req.headers[auth.key.toLowerCase()] || auth.defaultValue; } if (!isAllowedDomain(url, allowedDomain)) { res.status(500).send(`Domain not allowed.`); @@ -172,9 +210,9 @@ function getHttpArtifactsHandler( } const response = await fetch(url, { headers }); response.body - .on('error', err => res.status(500).send(`Unable to retrieve artifact: ${err}`)) - .pipe(new PreviewStream({ peek })) - .pipe(res); + .on('error', err => res.status(500).send(`Unable to retrieve artifact: ${err}`)) + .pipe(new PreviewStream({ peek })) + .pipe(res); }; } @@ -196,16 +234,43 @@ function getMinioArtifactHandler( }; } -function getGCSArtifactHandler(options: { key: string; bucket: string }, peek: number = 0) { +async function parseGCSProviderInfo(providerInfo: GCSProviderInfo, namespace: string): Promise { + if (!providerInfo.Params.tokenKey || !providerInfo.Params.secretName) { + throw new Error('Provider info with fromEnv:false supplied with incomplete secret credential info.'); + } + let configGCS: StorageOptions; + try { + const tokenString = await getK8sSecret(providerInfo.Params.secretName, providerInfo.Params.tokenKey, namespace); + const credentials = parseJSONString(tokenString); + configGCS = {credentials}; + configGCS.scopes = "https://www.googleapis.com/auth/devstorage.read_write"; + return configGCS; + } catch (err) { + throw new Error('Failed to parse GCS Provider config. Error: ' + err); + } +} + +function getGCSArtifactHandler(options: { key: string; bucket: string }, peek: number = 0, providerInfoString?: string, namespace?: string) { const { key, bucket } = options; return async (_: Request, res: Response) => { try { + let storageOptions : StorageOptions | undefined; + if(providerInfoString) { + const providerInfo = parseJSONString(providerInfoString); + if (providerInfo && providerInfo.Params.fromEnv === "false") { + if (!namespace){ + res.status(500).send('Failed to parse provider info. Reason: No namespace provided'); + } else { + storageOptions = await parseGCSProviderInfo(providerInfo, namespace); + } + } + } // Read all files that match the key pattern, which can include wildcards '*'. // The way this works is we list all paths whose prefix is the substring // of the pattern until the first wildcard, then we create a regular // expression out of the pattern, escaping all non-wildcard characters, // and we use it to match all enumerated paths. - const storage = new Storage(); + const storage = new Storage(storageOptions); const prefix = key.indexOf('*') > -1 ? key.substr(0, key.indexOf('*')) : key; const files = await storage.bucket(bucket).getFiles({ prefix }); const matchingFiles = files[0].filter(f => { @@ -214,11 +279,11 @@ function getGCSArtifactHandler(options: { key: string; bucket: string }, peek: n // Build a RegExp object that only recognizes asterisks ('*'), and // escapes everything else. const regex = new RegExp( - '^' + + '^' + key - .split(/\*+/) - .map(escapeRegexChars) - .join('.*') + + .split(/\*+/) + .map(escapeRegexChars) + .join('.*') + '$', ); return regex.test(f.name); @@ -230,16 +295,16 @@ function getGCSArtifactHandler(options: { key: string; bucket: string }, peek: n return; } console.log( - `Found ${matchingFiles.length} matching files: `, - matchingFiles.map(file => file.name).join(','), + `Found ${matchingFiles.length} matching files: `, + matchingFiles.map(file => file.name).join(','), ); let contents = ''; // TODO: support peek for concatenated matching files if (peek) { matchingFiles[0] - .createReadStream() - .pipe(new PreviewStream({ peek })) - .pipe(res); + .createReadStream() + .pipe(new PreviewStream({ peek })) + .pipe(res); return; } @@ -247,17 +312,17 @@ function getGCSArtifactHandler(options: { key: string; bucket: string }, peek: n matchingFiles.forEach((f, i) => { const buffer: Buffer[] = []; f.createReadStream() - .on('data', data => buffer.push(Buffer.from(data))) - .on('end', () => { - contents += - Buffer.concat(buffer) - .toString() - .trim() + '\n'; - if (i === matchingFiles.length - 1) { - res.send(contents); - } - }) - .on('error', () => res.status(500).send('Failed to read file: ' + f.name)); + .on('data', data => buffer.push(Buffer.from(data))) + .on('end', () => { + contents += + Buffer.concat(buffer) + .toString() + .trim() + '\n'; + if (i === matchingFiles.length - 1) { + res.send(contents); + } + }) + .on('error', () => res.status(500).send('Failed to read file: ' + f.name)); }); } catch (err) { res.status(500).send('Failed to download GCS file(s). Error: ' + err); @@ -297,14 +362,14 @@ function getVolumeArtifactsHandler(options: { bucket: string; key: string }, pee const stat = await fs.promises.stat(filePath); if (stat.isDirectory()) { res - .status(400) - .send(`Failed to open volume file ${filePath} is directory, does not support now`); + .status(400) + .send(`Failed to open volume file ${filePath} is directory, does not support now`); return; } fs.createReadStream(filePath) - .pipe(new PreviewStream({ peek })) - .pipe(res); + .pipe(new PreviewStream({ peek })) + .pipe(res); } catch (err) { console.log(`Failed to open volume: ${err}`); res.status(500).send(`Failed to open volume.`); @@ -340,10 +405,10 @@ const QUERIES = { }; export function getArtifactsProxyHandler({ - enabled, - allowedDomain, - namespacedServiceGetter, -}: { + enabled, + allowedDomain, + namespacedServiceGetter, + }: { enabled: boolean; allowedDomain: string; namespacedServiceGetter: NamespacedServiceGetter; @@ -352,36 +417,36 @@ export function getArtifactsProxyHandler({ return (req, res, next) => next(); } return proxy( - (_pathname, req) => { - // only proxy requests with namespace query parameter - return !!getNamespaceFromUrl(req.url || ''); - }, - { - changeOrigin: true, - onProxyReq: proxyReq => { - console.log('Proxied artifact request: ', proxyReq.path); + (_pathname, req) => { + // only proxy requests with namespace query parameter + return !!getNamespaceFromUrl(req.url || ''); }, - pathRewrite: (pathStr, req) => { - const url = new URL(pathStr || '', DUMMY_BASE_PATH); - url.searchParams.delete(QUERIES.NAMESPACE); - return url.pathname + url.search; - }, - router: req => { - const namespace = getNamespaceFromUrl(req.url || ''); - if (!namespace) { - console.log(`namespace query param expected in ${req.url}.`); - throw new Error(`namespace query param expected.`); - } - const urlStr = namespacedServiceGetter(namespace!); - if (!isAllowedDomain(urlStr, allowedDomain)) { - console.log(`Domain is not allowed.`); - throw new Error(`Domain is not allowed.`); - } - return namespacedServiceGetter(namespace!); + { + changeOrigin: true, + onProxyReq: proxyReq => { + console.log('Proxied artifact request: ', proxyReq.path); + }, + pathRewrite: (pathStr, req) => { + const url = new URL(pathStr || '', DUMMY_BASE_PATH); + url.searchParams.delete(QUERIES.NAMESPACE); + return url.pathname + url.search; + }, + router: req => { + const namespace = getNamespaceFromUrl(req.url || ''); + if (!namespace) { + console.log(`namespace query param expected in ${req.url}.`); + throw new Error(`namespace query param expected.`); + } + const urlStr = namespacedServiceGetter(namespace!); + if (!isAllowedDomain(urlStr, allowedDomain)) { + console.log(`Domain is not allowed.`); + throw new Error(`Domain is not allowed.`); + } + return namespacedServiceGetter(namespace!); + }, + target: '/artifacts', + headers: HACK_FIX_HPM_PARTIAL_RESPONSE_HEADERS, }, - target: '/artifacts', - headers: HACK_FIX_HPM_PARTIAL_RESPONSE_HEADERS, - }, ); } diff --git a/frontend/server/k8s-helper.ts b/frontend/server/k8s-helper.ts index 7db2561d00d..67d17d71d71 100644 --- a/frontend/server/k8s-helper.ts +++ b/frontend/server/k8s-helper.ts @@ -333,13 +333,20 @@ export async function getArgoWorkflow(workflowName: string): Promise { const MockedMinioClient: jest.Mock = MinioClient as any; + const MockedAuthorizeFn: jest.Mock = jest.fn(x => undefined); beforeEach(() => { jest.resetAllMocks(); @@ -34,7 +35,7 @@ describe('minio-helper', () => { accessKey: 'accesskey', endPoint: 'minio.kubeflow:80', secretKey: 'secretkey', - }); + }, 's3'); expect(client).toBeInstanceOf(MinioClient); expect(MockedMinioClient).toHaveBeenCalledWith({ @@ -47,7 +48,7 @@ describe('minio-helper', () => { it('fallbacks to the provided configs if EC2 metadata is not available.', async () => { const client = await createMinioClient({ endPoint: 'minio.kubeflow:80', - }); + }, 's3'); expect(client).toBeInstanceOf(MinioClient); expect(MockedMinioClient).toHaveBeenCalledWith({ @@ -71,7 +72,7 @@ describe('minio-helper', () => { Promise.resolve(true), ); - const client = await createMinioClient({ endPoint: 's3.awsamazon.com' }); + const client = await createMinioClient({ endPoint: 's3.awsamazon.com' }, 's3'); expect(client).toBeInstanceOf(MinioClient); expect(MockedMinioClient).toHaveBeenCalledWith({ diff --git a/frontend/server/minio-helper.ts b/frontend/server/minio-helper.ts index bb272440d27..37afd1c05cb 100644 --- a/frontend/server/minio-helper.ts +++ b/frontend/server/minio-helper.ts @@ -16,8 +16,17 @@ import { Transform, PassThrough } from 'stream'; import * as tar from 'tar-stream'; import peek from 'peek-stream'; import gunzip from 'gunzip-maybe'; +import { URL } from 'url'; import { Client as MinioClient, ClientOptions as MinioClientOptions } from 'minio'; -import { awsInstanceProfileCredentials, isS3Endpoint } from './aws-helper'; +import { awsInstanceProfileCredentials, isAWSS3Endpoint } from './aws-helper'; +import { S3ProviderInfo} from "./handlers/artifacts"; +import {getK8sSecret} from "./k8s-helper"; +import {ErrorDetails, parseError, parseJSONString} from "./utils"; +import {AuthorizeFn} from "./helpers/auth"; +import {AuthorizeRequestResources, AuthorizeRequestVerb} from "./src/generated/apis/auth"; +import * as k8sHelper from "./k8s-helper"; +import {ParamsDictionary} from "express-serve-static-core"; +import { Request } from 'express'; const { fromNodeProviderChain } = require('@aws-sdk/credential-providers'); /** MinioRequestConfig describes the info required to retrieve an artifact. */ @@ -34,48 +43,145 @@ export interface MinioClientOptionsWithOptionalSecrets extends Partial(providerInfoString); + // If fromEnv == false, we rely on the default credentials or env to provide credentials (e.g. IRSA) + if (providerInfo && providerInfo.Params.fromEnv === "false") { + if (!namespace){ + throw new Error("Artifact Store provider given, but no namespace provided."); + } else { + config = await parseS3ProviderInfo(config, providerInfo, namespace); } - } catch (err) { - console.error('Unable to get credentials from AWS credential provider chain: ', err); } } - - // This logic is S3 generic - if (!config.accessKey || !config.secretKey) { - try { - if (await awsInstanceProfileCredentials.ok()) { - const credentials = await awsInstanceProfileCredentials.getCredentials(); - if (credentials) { + // If using s3 and sourcing credentials from environment (currently only check aws env) + if (providerType === "s3" && (!config.accessKey || !config.secretKey)) { + // AWS S3 with credentials from provider chain + if (isAWSS3Endpoint(config.endPoint)) { + try { + const credentials = fromNodeProviderChain(); + const aws_credentials = await credentials(); + if (aws_credentials) { const { - AccessKeyId: accessKey, - SecretAccessKey: secretKey, - Token: sessionToken, - } = credentials; + accessKeyId: accessKey, + secretAccessKey: secretKey, + sessionToken: sessionToken, + } = aws_credentials; return new MinioClient({ ...config, accessKey, secretKey, sessionToken }); } - console.error('unable to get credentials from AWS metadata store.'); + } catch (err) { + console.error('Unable to get credentials from AWS credential provider chain: ', err); + } + } else { + // If no access/secret key or endpoint provided + // attempt to fetch AWS S3 instance profile credentials + if (!config.accessKey || !config.secretKey) { + try { + if (await awsInstanceProfileCredentials.ok()) { + const credentials = await awsInstanceProfileCredentials.getCredentials(); + if (credentials) { + const { + AccessKeyId: accessKey, + SecretAccessKey: secretKey, + Token: sessionToken, + } = credentials; + return new MinioClient({ ...config, accessKey, secretKey, sessionToken }); + } + console.error('Unable to get credentials from AWS metadata store.'); + } + } catch (err) { + console.error('Unable to get aws instance profile credentials: ', err); + } + } + } + } + + // If using any AWS or S3 compatible store (e.g. minio, aws s3 when using manual creds, ceph, etc.) + let mc : MinioClient; + try { + mc = await new MinioClient(config as MinioClientOptions); + } catch (err) { + throw new Error(`Failed to create MinioClient: ${err}`); + } + return mc; +} + +// Parse provider info for any s3 compatible store that's not AWS S3 +async function parseS3ProviderInfo(config: MinioClientOptionsWithOptionalSecrets, providerInfo: S3ProviderInfo, namespace: string) : Promise { + if (!providerInfo.Params.accessKeyKey || !providerInfo.Params.secretKeyKey || !providerInfo.Params.secretName) { + throw new Error('Provider info with fromEnv:false supplied with incomplete secret credential info.'); + } + + try { + config.accessKey = await getK8sSecret(providerInfo.Params.secretName, providerInfo.Params.accessKeyKey, namespace); + config.secretKey = await getK8sSecret(providerInfo.Params.secretName, providerInfo.Params.secretKeyKey, namespace); + } catch (e) { + throw new Error(`Encountered error when trying to fetch provider secret ${providerInfo.Params.secretName}.`); + } + + if (isAWSS3Endpoint(providerInfo.Params.endpoint)) { + if (providerInfo.Params.endpoint) { + if(providerInfo.Params.endpoint.startsWith("https")){ + const parseEndpoint = new URL(providerInfo.Params.endpoint); + config.endPoint = parseEndpoint.hostname; + } else { + config.endPoint = providerInfo.Params.endpoint; } - } catch (err) { - console.error('Unable to get aws instance profile credentials: ', err); + } else { + throw new Error('Provider info missing endpoint parameter.'); + } + + if (providerInfo.Params.region) { + config.region = providerInfo.Params.region; + } + + // It's possible the user specifies these via config + // since aws s3 and s3-compatible use the same config parameters + // safeguard the user by ensuring these remain unset (default) + config.port = undefined; + config.useSSL = undefined; + } else { + if (providerInfo.Params.endpoint) { + const parseEndpoint = new URL(providerInfo.Params.endpoint); + const host = parseEndpoint.hostname; + const port = parseEndpoint.port; + config.endPoint = host; + // user provided port in endpoint takes precedence + // e.g. if the user has provided ..svc.cluster.local: + config.port = port ? Number(port) : undefined; + } + + config.region = providerInfo.Params.region ? providerInfo.Params.region : undefined; + + if (providerInfo.Params.disableSSL) { + config.useSSL = !(providerInfo.Params.disableSSL.toLowerCase() === "true"); + } else { + config.useSSL = undefined; } } - return new MinioClient(config as MinioClientOptions); + return config; } /** diff --git a/frontend/server/utils.ts b/frontend/server/utils.ts index 6d317473788..2ba7b95c25c 100644 --- a/frontend/server/utils.ts +++ b/frontend/server/utils.ts @@ -66,6 +66,15 @@ export function loadJSON(filepath?: string, defaultValue?: T): T | undefined } } +export function parseJSONString(str: string) { + try { + const jsonValue: T = JSON.parse(str); + return jsonValue; + } catch { + return undefined; + } +} + /** * find final file path in pod: * 1. check volume and volume mount exist in pod diff --git a/frontend/server/workflow-helper.ts b/frontend/server/workflow-helper.ts index 678740d53ec..2acefbf181f 100644 --- a/frontend/server/workflow-helper.ts +++ b/frontend/server/workflow-helper.ts @@ -138,7 +138,7 @@ export function createPodLogsMinioRequestConfig( // different bucket/prefix for diff namespace? return async (podName: string, _namespace?: string): Promise => { // create a new client each time to ensure session token has not expired - const client = await createMinioClient(minioOptions); + const client = await createMinioClient(minioOptions, "s3"); const workflowName = workflowNameFromPodName(podName); return { bucket, @@ -189,13 +189,15 @@ export async function getPodLogsMinioRequestConfigfromWorkflow( const { host, port } = urlSplit(s3Artifact.endpoint, s3Artifact.insecure); const { accessKey, secretKey } = await getMinioClientSecrets(s3Artifact); + + const client = await createMinioClient({ accessKey, endPoint: host, port, secretKey, useSSL: !s3Artifact.insecure, - }); + }, "s3"); return { bucket: s3Artifact.bucket, client, From fa45198a943eeb41e1cb96946b9683b26ba219d6 Mon Sep 17 00:00:00 2001 From: Humair Khan Date: Wed, 1 May 2024 13:58:14 -0400 Subject: [PATCH 5/8] feat(ui): parse artifact provider info in ui Signed-off-by: Humair Khan --- frontend/server/handlers/artifacts.ts | 4 +- .../integration-tests/artifact-get.test.ts | 28 +++++-- frontend/server/minio-helper.ts | 76 ++++++------------- frontend/src/components/ArtifactPreview.tsx | 13 +++- .../src/components/MinioArtifactPreview.tsx | 2 +- .../src/components/tabs/InputOutputTab.tsx | 65 ++++++++++++---- .../components/tabs/RuntimeNodeDetailsV2.tsx | 17 ++++- .../viewers/MetricsVisualizations.tsx | 10 ++- frontend/src/lib/Apis.ts | 18 ++++- frontend/src/lib/OutputArtifactLoader.ts | 4 +- frontend/src/mlmd/MlmdUtils.ts | 4 + 11 files changed, 152 insertions(+), 89 deletions(-) diff --git a/frontend/server/handlers/artifacts.ts b/frontend/server/handlers/artifacts.ts index 6dd6f0cdf34..4fbcba99b3f 100644 --- a/frontend/server/handlers/artifacts.ts +++ b/frontend/server/handlers/artifacts.ts @@ -15,7 +15,7 @@ import fetch from 'node-fetch'; import { AWSConfigs, HttpConfigs, MinioConfigs, ProcessEnv } from '../configs'; import {Client as MinioClient} from 'minio'; import {PreviewStream, findFileOnPodVolume, parseJSONString} from '../utils'; -import { createMinioClient, getObjectStream } from '../minio-helper'; +import {createMinioClient, getObjectStream} from '../minio-helper'; import * as serverInfo from '../helpers/server-info'; import { Handler, Request, Response } from 'express'; import { Storage } from '@google-cloud/storage'; @@ -134,7 +134,7 @@ export function getArtifactsHandler({ break; case 's3': try { - client = await createMinioClient(minio, 's3', providerInfo, namespace); + client = await createMinioClient(aws, 's3', providerInfo, namespace); } catch (e) { res.status(500).send(`Failed to initialize Minio Client for S3 Provider: ${e}`); return; diff --git a/frontend/server/integration-tests/artifact-get.test.ts b/frontend/server/integration-tests/artifact-get.test.ts index 6a96303d5b2..5edde4ae83c 100644 --- a/frontend/server/integration-tests/artifact-get.test.ts +++ b/frontend/server/integration-tests/artifact-get.test.ts @@ -37,9 +37,10 @@ describe('/artifacts', () => { const { argv } = commonSetup(); let artifactContent: any = 'hello world'; + let mockedMinioClient: jest.Mock = MinioClient as any; beforeEach(() => { artifactContent = 'hello world'; // reset - const mockedMinioClient: jest.Mock = MinioClient as any; + mockedMinioClient = MinioClient as any; mockedMinioClient.mockImplementation(function() { return { getObject: async (bucket: string, key: string) => { @@ -90,14 +91,33 @@ describe('/artifacts', () => { }); }); + it('creates s3 minio client if source=s3', done => { + const configs = loadConfigs(argv, {}); + app = new UIServer(configs); + + process.env.AWS_ACCESS_KEY_ID = 'aws123'; + process.env.AWS_SECRET_ACCESS_KEY = 'awsSecret123'; + const request = requests(app.start()); + request + .get('/artifacts/get?source=s3&bucket=ml-pipeline&key=hello%2Fworld.txt') + .expect(200, artifactContent, err => { + expect(mockedMinioClient).toBeCalledWith({ + accessKey: 'aws123', + endPoint: 's3.amazonaws.com', + region: 'us-east-1', + secretKey: 'awsSecret123', + useSSL: true, + }); + done(err); + }); + }); + it('responds with a s3 artifact if source=s3', done => { - const mockedMinioClient: jest.Mock = jest.spyOn(minioHelper, 'createMinioClient') as any; const configs = loadConfigs(argv, { AWS_ACCESS_KEY_ID: 'aws123', AWS_SECRET_ACCESS_KEY: 'awsSecret123', }); app = new UIServer(configs); - const request = requests(app.start()); request .get('/artifacts/get?source=s3&bucket=ml-pipeline&key=hello%2Fworld.txt') @@ -114,7 +134,6 @@ describe('/artifacts', () => { }); it('responds with partial s3 artifact if peek=5 flag is set', done => { - const mockedMinioClient = jest.spyOn(minioHelper, 'createMinioClient'); const configs = loadConfigs(argv, { AWS_ACCESS_KEY_ID: 'aws123', AWS_SECRET_ACCESS_KEY: 'awsSecret123', @@ -137,7 +156,6 @@ describe('/artifacts', () => { }); it('responds with a s3 artifact from bucket in non-default region if source=s3', done => { - const mockedMinioClient: jest.Mock = jest.spyOn(minioHelper, 'createMinioClient') as any; const configs = loadConfigs(argv, { AWS_ACCESS_KEY_ID: 'aws123', AWS_SECRET_ACCESS_KEY: 'awsSecret123', diff --git a/frontend/server/minio-helper.ts b/frontend/server/minio-helper.ts index 37afd1c05cb..5c70b9756bc 100644 --- a/frontend/server/minio-helper.ts +++ b/frontend/server/minio-helper.ts @@ -21,14 +21,8 @@ import { Client as MinioClient, ClientOptions as MinioClientOptions } from 'mini import { awsInstanceProfileCredentials, isAWSS3Endpoint } from './aws-helper'; import { S3ProviderInfo} from "./handlers/artifacts"; import {getK8sSecret} from "./k8s-helper"; -import {ErrorDetails, parseError, parseJSONString} from "./utils"; -import {AuthorizeFn} from "./helpers/auth"; -import {AuthorizeRequestResources, AuthorizeRequestVerb} from "./src/generated/apis/auth"; -import * as k8sHelper from "./k8s-helper"; -import {ParamsDictionary} from "express-serve-static-core"; -import { Request } from 'express'; -const { fromNodeProviderChain } = require('@aws-sdk/credential-providers'); - +import {parseJSONString} from "./utils"; +const { fromNodeProviderChain, fromEnv } = require('@aws-sdk/credential-providers'); /** MinioRequestConfig describes the info required to retrieve an artifact. */ export interface MinioRequestConfig { bucket: string; @@ -55,16 +49,14 @@ export interface MinioClientOptionsWithOptionalSecrets extends Partial(providerInfoString); // If fromEnv == false, we rely on the default credentials or env to provide credentials (e.g. IRSA) @@ -76,45 +68,27 @@ export async function createMinioClient(defaultConfig: MinioClientOptionsWithOpt } } } - // If using s3 and sourcing credentials from environment (currently only check aws env) + + // If using s3 and sourcing credentials from environment (currently only aws is supported) if (providerType === "s3" && (!config.accessKey || !config.secretKey)) { // AWS S3 with credentials from provider chain if (isAWSS3Endpoint(config.endPoint)) { try { const credentials = fromNodeProviderChain(); - const aws_credentials = await credentials(); - if (aws_credentials) { + const awsCredentials = await credentials(); + if (awsCredentials) { const { accessKeyId: accessKey, secretAccessKey: secretKey, - sessionToken: sessionToken, - } = aws_credentials; + sessionToken, + } = awsCredentials; return new MinioClient({ ...config, accessKey, secretKey, sessionToken }); } - } catch (err) { - console.error('Unable to get credentials from AWS credential provider chain: ', err); + } catch (e) { + console.error('Unable to get aws instance profile credentials: ', e); } } else { - // If no access/secret key or endpoint provided - // attempt to fetch AWS S3 instance profile credentials - if (!config.accessKey || !config.secretKey) { - try { - if (await awsInstanceProfileCredentials.ok()) { - const credentials = await awsInstanceProfileCredentials.getCredentials(); - if (credentials) { - const { - AccessKeyId: accessKey, - SecretAccessKey: secretKey, - Token: sessionToken, - } = credentials; - return new MinioClient({ ...config, accessKey, secretKey, sessionToken }); - } - console.error('Unable to get credentials from AWS metadata store.'); - } - } catch (err) { - console.error('Unable to get aws instance profile credentials: ', err); - } - } + console.error('Encountered S3-compatible provider type with no provided credentials, and unsupported environment based credential support.'); } } @@ -201,8 +175,8 @@ export function isTarball(buf: Buffer) { const v0 = [0x75, 0x73, 0x74, 0x61, 0x72, 0x20, 0x20, 0x00]; return ( - v1.reduce((res, curr, i) => res && curr === buf[offset + i], true) || - v0.reduce((res, curr, i) => res && curr === buf[offset + i], true as boolean) + v1.reduce((res, curr, i) => res && curr === buf[offset + i], true) || + v0.reduce((res, curr, i) => res && curr === buf[offset + i], true as boolean) ); } @@ -212,11 +186,11 @@ export function isTarball(buf: Buffer) { */ export function maybeTarball(): Transform { return peek( - { newline: false, maxBuffer: 264 }, - (data: Buffer, swap: (error?: Error, parser?: Transform) => void) => { - if (isTarball(data)) swap(undefined, extractFirstTarRecordAsStream()); - else swap(undefined, new PassThrough()); - }, + { newline: false, maxBuffer: 264 }, + (data: Buffer, swap: (error?: Error, parser?: Transform) => void) => { + if (isTarball(data)) swap(undefined, extractFirstTarRecordAsStream()); + else swap(undefined, new PassThrough()); + }, ); } @@ -258,11 +232,11 @@ function extractFirstTarRecordAsStream() { * */ export async function getObjectStream({ - bucket, - key, - client, - tryExtract = true, -}: MinioRequestConfig): Promise { + bucket, + key, + client, + tryExtract = true, + }: MinioRequestConfig): Promise { const stream = await client.getObject(bucket, key); return tryExtract ? stream.pipe(gunzip()).pipe(maybeTarball()) : stream.pipe(new PassThrough()); } diff --git a/frontend/src/components/ArtifactPreview.tsx b/frontend/src/components/ArtifactPreview.tsx index 36b6da10466..43deb93338c 100644 --- a/frontend/src/components/ArtifactPreview.tsx +++ b/frontend/src/components/ArtifactPreview.tsx @@ -24,6 +24,7 @@ import { stylesheet } from 'typestyle'; import Banner from './Banner'; import { ValueComponentProps } from './DetailsTable'; import { logger } from 'src/lib/Utils'; +import { URIToSessionInfo } from "./tabs/InputOutputTab"; const css = stylesheet({ root: { @@ -50,6 +51,7 @@ const css = stylesheet({ export interface ArtifactPreviewProps extends ValueComponentProps { namespace?: string; + sessionMap?: URIToSessionInfo; maxbytes?: number; maxlines?: number; } @@ -60,12 +62,16 @@ export interface ArtifactPreviewProps extends ValueComponentProps { const ArtifactPreview: React.FC = ({ value, namespace, + sessionMap, maxbytes = 255, maxlines = 20, }) => { - let storage: StoragePath | undefined; + let storage: StoragePath | undefined + let providerInfo: string | undefined + if (value) { try { + providerInfo = sessionMap?.get(value) storage = WorkflowParser.parseStoragePath(value); } catch (error) { logger.error(error); @@ -74,7 +80,7 @@ const ArtifactPreview: React.FC = ({ const { isSuccess, isError, data, error } = useQuery( ['artifact_preview', { value, namespace, maxbytes, maxlines }], - () => getPreview(storage, namespace, maxbytes, maxlines), + () => getPreview(storage, providerInfo, namespace, maxbytes, maxlines), { staleTime: Infinity }, ); @@ -125,6 +131,7 @@ export default ArtifactPreview; async function getPreview( storagePath: StoragePath | undefined, + providerInfo: string | undefined, namespace: string | undefined, maxbytes: number, maxlines?: number, @@ -133,7 +140,7 @@ async function getPreview( return ``; } // TODO how to handle binary data (can probably use magic number to id common mime types) - let data = await Apis.readFile(storagePath, namespace, maxbytes + 1); + let data = await Apis.readFile({path: storagePath, providerInfo: providerInfo, namespace: namespace, peek: maxbytes +1}); // is preview === data and no maxlines if (data.length <= maxbytes && (!maxlines || data.split('\n').length < maxlines)) { return data; diff --git a/frontend/src/components/MinioArtifactPreview.tsx b/frontend/src/components/MinioArtifactPreview.tsx index ab4a9a70611..f27ccc9e9af 100644 --- a/frontend/src/components/MinioArtifactPreview.tsx +++ b/frontend/src/components/MinioArtifactPreview.tsx @@ -78,7 +78,7 @@ async function getPreview( maxlines?: number, ): Promise<{ data: string; hasMore: boolean }> { // TODO how to handle binary data (can probably use magic number to id common mime types) - let data = await Apis.readFile(storagePath, namespace, maxbytes + 1); + let data = await Apis.readFile({path: storagePath, namespace: namespace, peek: maxbytes +1}); // is preview === data and no maxlines if (data.length <= maxbytes && !maxlines) { return { data, hasMore: false }; diff --git a/frontend/src/components/tabs/InputOutputTab.tsx b/frontend/src/components/tabs/InputOutputTab.tsx index a510732e2e8..9957fd164f7 100644 --- a/frontend/src/components/tabs/InputOutputTab.tsx +++ b/frontend/src/components/tabs/InputOutputTab.tsx @@ -27,7 +27,7 @@ import { getArtifactName, getArtifactTypeName, getArtifactTypes, - getLinkedArtifactsByExecution, + getLinkedArtifactsByExecution, getStoreSessionInfoFromArtifact, LinkedArtifact, } from 'src/mlmd/MlmdUtils'; import { ArtifactType, Execution } from 'src/third_party/mlmd'; @@ -37,7 +37,17 @@ import DetailsTable from '../DetailsTable'; import { RoutePageFactory } from '../Router'; import { ExecutionTitle } from './ExecutionTitle'; -type ParamList = Array>; +export type ParamList = Array>; +export type URIToSessionInfo = Map; +export interface ArtifactParamsWithSessionInfo { + params: ParamList; + sessionMap: URIToSessionInfo +} + +export interface ArtifactLocation { + uri: string; + store_session_info: string | undefined; +} export interface IOTabProps { execution: Execution; @@ -69,19 +79,30 @@ export function InputOutputTab({ execution, namespace }: IOTabProps) { // Restructs artifacts and parameters for visualization. const inputParams = extractInputFromExecution(execution); const outputParams = extractOutputFromExecution(execution); - let inputArtifacts: ParamList = []; - let outputArtifacts: ParamList = []; + let inputArtifactsWithSessionInfo: ArtifactParamsWithSessionInfo | undefined; + let outputArtifactsWithSessionInfo: ArtifactParamsWithSessionInfo | undefined; if (isSuccess && linkedArtifacts) { - inputArtifacts = getArtifactParamList( + inputArtifactsWithSessionInfo = getArtifactParamList( filterEventWithInputArtifact(linkedArtifacts), artifactTypeNames, ); - outputArtifacts = getArtifactParamList( + outputArtifactsWithSessionInfo = getArtifactParamList( filterEventWithOutputArtifact(linkedArtifacts), artifactTypeNames, ); } + let inputArtifacts : ParamList = [] + let outputArtifacts : ParamList = [] + + if (inputArtifactsWithSessionInfo){ + inputArtifacts = inputArtifactsWithSessionInfo.params + } + + if (outputArtifactsWithSessionInfo) { + outputArtifacts = outputArtifactsWithSessionInfo.params + } + let isIoEmpty = false; if ( inputParams.length === 0 && @@ -129,6 +150,7 @@ export function InputOutputTab({ execution, namespace }: IOTabProps) { valueComponent={ArtifactPreview} valueComponentProps={{ namespace: namespace, + sessionMap: inputArtifactsWithSessionInfo?.sessionMap, }} /> @@ -153,6 +175,7 @@ export function InputOutputTab({ execution, namespace }: IOTabProps) { valueComponent={ArtifactPreview} valueComponentProps={{ namespace: namespace, + sessionMap: outputArtifactsWithSessionInfo?.sessionMap, }} /> @@ -191,24 +214,34 @@ function extractParamFromExecution(execution: Execution, name: string): KeyValue export function getArtifactParamList( inputArtifacts: LinkedArtifact[], artifactTypeNames: string[], -): ParamList { - return Object.values(inputArtifacts).map((linkedArtifact, index) => { +): (ArtifactParamsWithSessionInfo) { + + let sessMap : URIToSessionInfo = new Map() + + let params = Object.values(inputArtifacts).map((linkedArtifact, index) => { let key = getArtifactName(linkedArtifact); if ( - key && - (artifactTypeNames[index] === 'system.Metrics' || - artifactTypeNames[index] === 'system.ClassificationMetrics') + key && + (artifactTypeNames[index] === 'system.Metrics' || + artifactTypeNames[index] === 'system.ClassificationMetrics') ) { key += ' (This is an empty file by default)'; } const artifactId = linkedArtifact.artifact.getId(); const artifactElement = RoutePageFactory.artifactDetails(artifactId) ? ( - - {key} - + + {key} + ) : ( - key + key ); - return [artifactElement, linkedArtifact.artifact.getUri()]; + + const uri = linkedArtifact.artifact.getUri(); + const sessInfo = getStoreSessionInfoFromArtifact(linkedArtifact); + sessMap.set(uri, sessInfo) + + return [artifactElement, uri]; }); + + return {params: params, sessionMap: sessMap} } diff --git a/frontend/src/components/tabs/RuntimeNodeDetailsV2.tsx b/frontend/src/components/tabs/RuntimeNodeDetailsV2.tsx index 4b1aa618e82..c9ada66e430 100644 --- a/frontend/src/components/tabs/RuntimeNodeDetailsV2.tsx +++ b/frontend/src/components/tabs/RuntimeNodeDetailsV2.tsx @@ -47,7 +47,10 @@ import LogViewer from 'src/components/LogViewer'; import { getResourceStateText, ResourceType } from 'src/components/ResourceInfo'; import { MetricsVisualizations } from 'src/components/viewers/MetricsVisualizations'; import { ArtifactTitle } from 'src/components/tabs/ArtifactTitle'; -import InputOutputTab, { getArtifactParamList } from 'src/components/tabs/InputOutputTab'; +import InputOutputTab, { + getArtifactParamList, + ParamList +} from 'src/components/tabs/InputOutputTab'; import { convertYamlToPlatformSpec, convertYamlToV2PipelineSpec } from 'src/lib/v2/WorkflowUtils'; import { PlatformDeploymentConfig } from 'src/generated/pipeline_spec/pipeline_spec'; import { getComponentSpec } from 'src/lib/v2/NodeUtils'; @@ -177,7 +180,7 @@ function TaskNodeDetail({ {selectedTab === 0 && (() => { if (execution) { - return ; + return ; } return NODE_STATE_UNAVAILABLE; })()} @@ -418,6 +421,13 @@ function ArtifactInfo({ ['Created At', createdAt], ]; + let artifactParamsWithSessionInfo = getArtifactParamList([linkedArtifact], artifactTypeName); + let artifactParams : ParamList = [] + + if (artifactParamsWithSessionInfo){ + artifactParams = artifactParamsWithSessionInfo.params + } + return (
@@ -431,10 +441,11 @@ function ArtifactInfo({ key={`artifact-url`} title='Artifact URI' - fields={getArtifactParamList([linkedArtifact], artifactTypeName)} + fields={artifactParams} valueComponent={ArtifactPreview} valueComponentProps={{ namespace: namespace, + sessionMap: artifactParamsWithSessionInfo.sessionMap, }} />
diff --git a/frontend/src/components/viewers/MetricsVisualizations.tsx b/frontend/src/components/viewers/MetricsVisualizations.tsx index 757e03a98f7..fab79d0c980 100644 --- a/frontend/src/components/viewers/MetricsVisualizations.tsx +++ b/frontend/src/components/viewers/MetricsVisualizations.tsx @@ -27,7 +27,7 @@ import { getMetadataValue } from 'src/mlmd/library'; import { filterArtifactsByType, filterLinkedArtifactsByType, - getArtifactName, + getArtifactName, getStoreSessionInfoFromArtifact, LinkedArtifact, } from 'src/mlmd/MlmdUtils'; import { Artifact, ArtifactType, Execution } from 'src/third_party/mlmd'; @@ -887,8 +887,10 @@ export async function getHtmlViewerConfig( throw new Error('HTML Artifact storagePath unknown'); } + const providerInfo = getStoreSessionInfoFromArtifact(linkedArtifact); + // TODO(zijianjoy): Limit the size of HTML file fetching to prevent UI frozen. - let data = await Apis.readFile(storagePath, namespace); + let data = await Apis.readFile({path: storagePath, providerInfo, namespace: namespace}); return { htmlContent: data, type: PlotType.WEB_APP } as HTMLViewerConfig; }); return Promise.all(htmlViewerConfigs); @@ -913,8 +915,10 @@ export async function getMarkdownViewerConfig( throw new Error('Markdown Artifact storagePath unknown'); } + const providerInfo = getStoreSessionInfoFromArtifact(linkedArtifact); + // TODO(zijianjoy): Limit the size of Markdown file fetching to prevent UI frozen. - let data = await Apis.readFile(storagePath, namespace); + let data = await Apis.readFile({path: storagePath, providerInfo, namespace: namespace}); return { markdownContent: data, type: PlotType.MARKDOWN } as MarkdownViewerConfig; }); return Promise.all(markdownViewerConfigs); diff --git a/frontend/src/lib/Apis.ts b/frontend/src/lib/Apis.ts index 0c42bb60711..497e9b75104 100644 --- a/frontend/src/lib/Apis.ts +++ b/frontend/src/lib/Apis.ts @@ -265,23 +265,34 @@ export class Apis { /** * Reads file from storage using server. */ - public static readFile(path: StoragePath, namespace?: string, peek?: number): Promise { - return this._fetch(this.buildReadFileUrl({ path, namespace, peek, isDownload: false })); + public static readFile({path, providerInfo, namespace, peek} : { + path: StoragePath; + namespace?: string; + providerInfo?: string; + peek?: number; + }): Promise { + let query = this.buildReadFileUrl({ path, namespace, providerInfo, peek, isDownload: false }) + return this._fetch(query); } /** * Builds an url for the readFile API to retrieve a workflow artifact. * @param path object describing the artifact (e.g. source, bucket, and key) + * @param namespace the experiment namespace + * @param providerInfo non default provider info to build session for the object store where the artifact is stored + * @param peek the amount used to be read * @param isDownload whether we download the artifact as is (e.g. skip extracting from *.tar.gz) */ public static buildReadFileUrl({ path, namespace, + providerInfo, peek, isDownload, }: { path: StoragePath; namespace?: string; + providerInfo?: string; peek?: number; isDownload?: boolean; }) { @@ -289,10 +300,11 @@ export class Apis { if (isDownload) { return `artifacts/${source}/${bucket}/${key}${buildQuery({ namespace, + providerInfo, peek, })}`; } else { - return `artifacts/get${buildQuery({ source, namespace, peek, bucket, key })}`; + return `artifacts/get${buildQuery({ source, namespace, providerInfo, peek, bucket, key })}`; } } diff --git a/frontend/src/lib/OutputArtifactLoader.ts b/frontend/src/lib/OutputArtifactLoader.ts index c29a349fd7b..8a23d71841a 100644 --- a/frontend/src/lib/OutputArtifactLoader.ts +++ b/frontend/src/lib/OutputArtifactLoader.ts @@ -58,7 +58,7 @@ export class OutputArtifactLoader { public static async load(outputPath: StoragePath, namespace?: string): Promise { let plotMetadataList: PlotMetadata[] = []; try { - const metadataFile = await Apis.readFile(outputPath, namespace); + const metadataFile = await Apis.readFile({path: outputPath, namespace: namespace}); if (metadataFile) { try { plotMetadataList = OutputArtifactLoader.parseOutputMetadataInJson( @@ -516,7 +516,7 @@ async function readSourceContent( if (storage === 'inline') { return source; } - return await Apis.readFile(WorkflowParser.parseStoragePath(source), namespace); + return await Apis.readFile({path: WorkflowParser.parseStoragePath(source), namespace: namespace}); } export const TEST_ONLY = { diff --git a/frontend/src/mlmd/MlmdUtils.ts b/frontend/src/mlmd/MlmdUtils.ts index efdf1bacfc9..a6614108eb0 100644 --- a/frontend/src/mlmd/MlmdUtils.ts +++ b/frontend/src/mlmd/MlmdUtils.ts @@ -287,6 +287,10 @@ export interface LinkedArtifact { artifact: Artifact; } +export function getStoreSessionInfoFromArtifact(artifact : LinkedArtifact) : string | undefined { + return artifact.artifact.getCustomPropertiesMap().get("store_session_info")?.getStringValue(); +} + export async function getLinkedArtifactsByEvents(events: Event[]): Promise { const artifactIds = events .filter(event => event.getArtifactId()) From 4d5ebdbdce8609a521574b2fa84c034969cc336b Mon Sep 17 00:00:00 2001 From: Humair Khan Date: Thu, 2 May 2024 16:22:45 -0400 Subject: [PATCH 6/8] chore: add tests for provider info Signed-off-by: Humair Khan --- .../integration-tests/artifact-get.test.ts | 205 +++++++++++++++++- frontend/server/minio-helper.ts | 5 +- frontend/server/utils.ts | 2 +- 3 files changed, 205 insertions(+), 7 deletions(-) diff --git a/frontend/server/integration-tests/artifact-get.test.ts b/frontend/server/integration-tests/artifact-get.test.ts index 5edde4ae83c..87aa3bfe34f 100644 --- a/frontend/server/integration-tests/artifact-get.test.ts +++ b/frontend/server/integration-tests/artifact-get.test.ts @@ -22,13 +22,14 @@ import requests from 'supertest'; import { UIServer } from '../app'; import { loadConfigs } from '../configs'; import * as serverInfo from '../helpers/server-info'; -import * as minioHelper from '../minio-helper'; import { commonSetup, mkTempDir } from './test-helper'; +import {getK8sSecret} from "../k8s-helper"; const MinioClient = minio.Client; jest.mock('minio'); jest.mock('node-fetch'); jest.mock('@google-cloud/storage'); +jest.mock('../k8s-helper'); const mockedFetch: jest.Mock = fetch as any; @@ -37,10 +38,9 @@ describe('/artifacts', () => { const { argv } = commonSetup(); let artifactContent: any = 'hello world'; - let mockedMinioClient: jest.Mock = MinioClient as any; beforeEach(() => { artifactContent = 'hello world'; // reset - mockedMinioClient = MinioClient as any; + const mockedMinioClient = MinioClient as any; mockedMinioClient.mockImplementation(function() { return { getObject: async (bucket: string, key: string) => { @@ -91,7 +91,8 @@ describe('/artifacts', () => { }); }); - it('creates s3 minio client if source=s3', done => { + it('responds with artifact if source is AWS S3, and creds are sourced from Env', done => { + const mockedMinioClient: jest.Mock = minio.Client as any; const configs = loadConfigs(argv, {}); app = new UIServer(configs); @@ -112,7 +113,8 @@ describe('/artifacts', () => { }); }); - it('responds with a s3 artifact if source=s3', done => { + it('responds with artifact if source is AWS S3, and creds are sourced from Load Configs', done => { + const mockedMinioClient: jest.Mock = minio.Client as any; const configs = loadConfigs(argv, { AWS_ACCESS_KEY_ID: 'aws123', AWS_SECRET_ACCESS_KEY: 'awsSecret123', @@ -129,11 +131,201 @@ describe('/artifacts', () => { secretKey: 'awsSecret123', useSSL: true, }); + + expect(mockedMinioClient).toBeCalledTimes(1); done(err); }); }); + it('responds with artifact if source is AWS S3, and creds are sourced from Provider Configs', done => { + const mockedMinioClient: jest.Mock = minio.Client as any; + const mockedGetK8sSecret: jest.Mock = getK8sSecret as any; + mockedGetK8sSecret.mockResolvedValue('someSecret'); + const configs = loadConfigs(argv, {}); + app = new UIServer(configs); + const request = requests(app.start()); + const providerInfo = { + "Params": { + "accessKeyKey": "someSecret", + // this not set and default is used (tls=true) + // since aws connections are always tls secured + "disableSSL": "false", + "endpoint": "s3.amazonaws.com", + "fromEnv": "false", + // this not set and default is used + // since aws connections always have the same port + "port": "0001", + "region": "us-east-2", + "secretKeyKey": "someSecret", + "secretName": "aws-s3-creds" + }, + "Provider": "s3", + }; + const namespace = "test"; + request + .get(`/artifacts/get?source=s3&bucket=ml-pipeline&key=hello%2Fworld.txt&namespace=${namespace}&providerInfo=${JSON.stringify(providerInfo)}`) + .expect(200, artifactContent, err => { + expect(mockedMinioClient).toBeCalledWith({ + accessKey: 'someSecret', + endPoint: 's3.amazonaws.com', + port: undefined, + region: 'us-east-2', + secretKey: 'someSecret', + useSSL: undefined, + }); + expect(mockedMinioClient).toBeCalledTimes(1); + expect(mockedGetK8sSecret).toBeCalledWith("aws-s3-creds", "someSecret", `${namespace}`); + expect(mockedGetK8sSecret).toBeCalledTimes(2); + done(err); + }); + }); + + it('responds error when source is s3, and creds are sourced from Provider Configs, but no namespace is provided', done => { + const mockedGetK8sSecret: jest.Mock = getK8sSecret as any; + const configs = loadConfigs(argv, {}); + app = new UIServer(configs); + const request = requests(app.start()); + const providerInfo = { + "Params": { + "accessKeyKey": "AWS_ACCESS_KEY_ID", + "disableSSL": "false", + "endpoint": "s3.amazonaws.com", + "fromEnv": "false", + "region": "us-east-2", + "secretKeyKey": "AWS_SECRET_ACCESS_KEY", + "secretName": "aws-s3-creds" + }, + "Provider": "s3", + }; + request + .get(`/artifacts/get?source=s3&bucket=ml-pipeline&key=hello%2Fworld.txt$&providerInfo=${JSON.stringify(providerInfo)}`) + .expect(500, 'Failed to initialize Minio Client for S3 Provider: Error: Artifact Store provider given, but no namespace provided.', err => { + expect(mockedGetK8sSecret).toBeCalledTimes(0); + done(err); + }); + }); + + it('responds with artifact if source is s3-compatible, and creds are sourced from Provider Configs', done => { + const mockedMinioClient: jest.Mock = minio.Client as any; + const mockedGetK8sSecret: jest.Mock = getK8sSecret as any; + mockedGetK8sSecret.mockResolvedValue('someSecret'); + const configs = loadConfigs(argv, {}); + app = new UIServer(configs); + const request = requests(app.start()); + const providerInfo = { + "Params": { + "accessKeyKey": "someSecret", + "disableSSL": "false", + "endpoint": "https://mys3.com", + "fromEnv": "false", + "region": "auto", + "secretKeyKey": "someSecret", + "secretName": "my-secret" + }, + "Provider": "s3", + }; + const namespace = "test"; + request + .get(`/artifacts/get?source=s3&bucket=ml-pipeline&key=hello%2Fworld.txt&namespace=${namespace}&providerInfo=${JSON.stringify(providerInfo)}`) + .expect(200, artifactContent, err => { + expect(mockedMinioClient).toBeCalledWith({ + accessKey: 'someSecret', + endPoint: 'mys3.com', + port: undefined, + region: 'auto', + secretKey: 'someSecret', + useSSL: true, + }); + expect(mockedMinioClient).toBeCalledTimes(1); + expect(mockedGetK8sSecret).toBeCalledWith("my-secret", "someSecret", `${namespace}`); + expect(mockedGetK8sSecret).toBeCalledTimes(2); + done(err); + }); + }); + + it('responds with artifact if source is s3-compatible, and creds are sourced from Provider Configs, with endpoint port', done => { + const mockedMinioClient: jest.Mock = minio.Client as any; + const mockedGetK8sSecret: jest.Mock = getK8sSecret as any; + mockedGetK8sSecret.mockResolvedValue('someSecret'); + const configs = loadConfigs(argv, {}); + app = new UIServer(configs); + const request = requests(app.start()); + const providerInfo = { + "Params": { + "accessKeyKey": "someSecret", + "disableSSL": "false", + "endpoint": "https://mys3.ns.svc.cluster.local:1234", + "fromEnv": "false", + "region": "auto", + "secretKeyKey": "someSecret", + "secretName": "my-secret" + }, + "Provider": "s3", + }; + const namespace = "test"; + request + .get(`/artifacts/get?source=s3&bucket=ml-pipeline&key=hello%2Fworld.txt&namespace=${namespace}&providerInfo=${JSON.stringify(providerInfo)}`) + .expect(200, artifactContent, err => { + expect(mockedMinioClient).toBeCalledWith({ + accessKey: 'someSecret', + endPoint: 'mys3.ns.svc.cluster.local', + port: 1234, + region: 'auto', + secretKey: 'someSecret', + useSSL: true, + }); + expect(mockedMinioClient).toBeCalledTimes(1); + expect(mockedGetK8sSecret).toBeCalledWith("my-secret", "someSecret", `${namespace}`); + expect(mockedGetK8sSecret).toBeCalledTimes(2); + done(err); + }); + }); + + it('responds with artifact if source is gcs, and creds are sourced from Provider Configs', done => { + const artifactContent = 'hello world'; + const mockedGcsStorage: jest.Mock = GCSStorage as any; + const mockedGetK8sSecret: jest.Mock = getK8sSecret as any; + mockedGetK8sSecret.mockResolvedValue('{"private_key":"testkey","client_email":"testemail"}'); + const stream = new PassThrough(); + stream.write(artifactContent); + stream.end(); + mockedGcsStorage.mockImplementationOnce(() => ({ + bucket: () => ({ + getFiles: () => + Promise.resolve([[{ name: 'hello/world.txt', createReadStream: () => stream }]]), + }), + })); + const configs = loadConfigs(argv, {}); + app = new UIServer(configs); + const request = requests(app.start()); + const providerInfo = { + "Params": { + "fromEnv": "false", + "secretName": "someSecret", + "tokenKey": 'somekey' + }, + "Provider": "gs", + }; + const namespace = "test"; + request + .get(`/artifacts/get?source=gcs&bucket=ml-pipeline&key=hello%2Fworld.txt&namespace=${namespace}&providerInfo=${JSON.stringify(providerInfo)}`) + .expect(200, artifactContent + '\n', err => { + const expectedArg = { + "credentials": { + "client_email": "testemail", + "private_key": "testkey", + }, + "scopes": "https://www.googleapis.com/auth/devstorage.read_write" + }; + expect(mockedGcsStorage).toBeCalledWith(expectedArg); + expect(mockedGetK8sSecret).toBeCalledWith("someSecret", "somekey", `${namespace}`); + expect(mockedGetK8sSecret).toBeCalledTimes(1); + done(err); + }); + }); + it('responds with partial s3 artifact if peek=5 flag is set', done => { + const mockedMinioClient: jest.Mock = minio.Client as any; const configs = loadConfigs(argv, { AWS_ACCESS_KEY_ID: 'aws123', AWS_SECRET_ACCESS_KEY: 'awsSecret123', @@ -151,11 +343,14 @@ describe('/artifacts', () => { secretKey: 'awsSecret123', useSSL: true, }); + + expect(mockedMinioClient).toBeCalledTimes(1); done(err); }); }); it('responds with a s3 artifact from bucket in non-default region if source=s3', done => { + const mockedMinioClient: jest.Mock = minio.Client as any; const configs = loadConfigs(argv, { AWS_ACCESS_KEY_ID: 'aws123', AWS_SECRET_ACCESS_KEY: 'awsSecret123', diff --git a/frontend/server/minio-helper.ts b/frontend/server/minio-helper.ts index 5c70b9756bc..20ac1dc5ca3 100644 --- a/frontend/server/minio-helper.ts +++ b/frontend/server/minio-helper.ts @@ -59,8 +59,11 @@ export interface MinioClientOptionsWithOptionalSecrets extends Partial(providerInfoString); + if (!providerInfo) { + throw new Error("Failed to parse provider info."); + } // If fromEnv == false, we rely on the default credentials or env to provide credentials (e.g. IRSA) - if (providerInfo && providerInfo.Params.fromEnv === "false") { + if (providerInfo.Params.fromEnv === "false") { if (!namespace){ throw new Error("Artifact Store provider given, but no namespace provided."); } else { diff --git a/frontend/server/utils.ts b/frontend/server/utils.ts index 2ba7b95c25c..69068804e81 100644 --- a/frontend/server/utils.ts +++ b/frontend/server/utils.ts @@ -70,7 +70,7 @@ export function parseJSONString(str: string) { try { const jsonValue: T = JSON.parse(str); return jsonValue; - } catch { + } catch (e) { return undefined; } } From bf809f28afc1ed80538236df2392b15d9a1ddbd6 Mon Sep 17 00:00:00 2001 From: Humair Khan Date: Thu, 2 May 2024 17:30:00 -0400 Subject: [PATCH 7/8] chore: update ec2 tests ..and clean up imports. Signed-off-by: Humair Khan --- frontend/server/handlers/artifacts.ts | 10 +++--- frontend/server/minio-helper.test.ts | 32 +++++++------------ frontend/server/minio-helper.ts | 10 +++--- .../src/components/tabs/InputOutputTab.tsx | 3 +- .../viewers/MetricsVisualizations.tsx | 3 +- 5 files changed, 26 insertions(+), 32 deletions(-) diff --git a/frontend/server/handlers/artifacts.ts b/frontend/server/handlers/artifacts.ts index 4fbcba99b3f..5ccbbe6e36b 100644 --- a/frontend/server/handlers/artifacts.ts +++ b/frontend/server/handlers/artifacts.ts @@ -13,8 +13,8 @@ // limitations under the License. import fetch from 'node-fetch'; import { AWSConfigs, HttpConfigs, MinioConfigs, ProcessEnv } from '../configs'; -import {Client as MinioClient} from 'minio'; -import {PreviewStream, findFileOnPodVolume, parseJSONString} from '../utils'; +import { Client as MinioClient } from 'minio'; +import { PreviewStream, findFileOnPodVolume, parseJSONString } from '../utils'; import {createMinioClient, getObjectStream} from '../minio-helper'; import * as serverInfo from '../helpers/server-info'; import { Handler, Request, Response } from 'express'; @@ -24,9 +24,9 @@ import { HACK_FIX_HPM_PARTIAL_RESPONSE_HEADERS } from '../consts'; import * as fs from 'fs'; import { isAllowedDomain } from './domain-checker'; -import {getK8sSecret} from "../k8s-helper"; -import {StorageOptions} from "@google-cloud/storage/build/src/storage"; -import {CredentialBody} from "google-auth-library/build/src/auth/credentials"; +import { getK8sSecret } from "../k8s-helper"; +import { StorageOptions } from "@google-cloud/storage/build/src/storage"; +import { CredentialBody } from "google-auth-library/build/src/auth/credentials"; /** * ArtifactsQueryStrings describes the expected query strings key value pairs diff --git a/frontend/server/minio-helper.test.ts b/frontend/server/minio-helper.test.ts index 763adeb081e..3e139d71658 100644 --- a/frontend/server/minio-helper.test.ts +++ b/frontend/server/minio-helper.test.ts @@ -14,12 +14,11 @@ import * as zlib from 'zlib'; import { PassThrough } from 'stream'; import { Client as MinioClient } from 'minio'; -import { awsInstanceProfileCredentials } from './aws-helper'; import { createMinioClient, isTarball, maybeTarball, getObjectStream } from './minio-helper'; -import { V1beta1JobTemplateSpec } from '@kubernetes/client-node'; +const { fromNodeProviderChain } = require('@aws-sdk/credential-providers'); jest.mock('minio'); -jest.mock('./aws-helper'); +jest.mock('@aws-sdk/credential-providers'); describe('minio-helper', () => { const MockedMinioClient: jest.Mock = MinioClient as any; @@ -57,30 +56,23 @@ describe('minio-helper', () => { }); it('uses EC2 metadata credentials if access key are not provided.', async () => { - (awsInstanceProfileCredentials.getCredentials as jest.Mock).mockImplementation(() => - Promise.resolve({ - AccessKeyId: 'AccessKeyId', - Code: 'Success', - Expiration: new Date(Date.now() + 1000).toISOString(), // expires 1 sec later - LastUpdated: '2019-12-17T10:55:38Z', - SecretAccessKey: 'SecretAccessKey', - Token: 'SessionToken', - Type: 'AWS-HMAC', - }), + (fromNodeProviderChain as jest.Mock).mockImplementation(() => + () => + Promise.resolve({ + accessKeyId: 'AccessKeyId', + secretAccessKey: 'SecretAccessKey', + sessionToken: 'SessionToken', + }) ); - (awsInstanceProfileCredentials.ok as jest.Mock).mockImplementation(() => - Promise.resolve(true), - ); - - const client = await createMinioClient({ endPoint: 's3.awsamazon.com' }, 's3'); - + const client = await createMinioClient({ endPoint: 's3.amazonaws.com' }, 's3'); expect(client).toBeInstanceOf(MinioClient); expect(MockedMinioClient).toHaveBeenCalledWith({ accessKey: 'AccessKeyId', - endPoint: 's3.awsamazon.com', + endPoint: 's3.amazonaws.com', secretKey: 'SecretAccessKey', sessionToken: 'SessionToken', }); + expect(MockedMinioClient).toBeCalledTimes(1); }); }); diff --git a/frontend/server/minio-helper.ts b/frontend/server/minio-helper.ts index 20ac1dc5ca3..fae0da14c3a 100644 --- a/frontend/server/minio-helper.ts +++ b/frontend/server/minio-helper.ts @@ -18,11 +18,11 @@ import peek from 'peek-stream'; import gunzip from 'gunzip-maybe'; import { URL } from 'url'; import { Client as MinioClient, ClientOptions as MinioClientOptions } from 'minio'; -import { awsInstanceProfileCredentials, isAWSS3Endpoint } from './aws-helper'; -import { S3ProviderInfo} from "./handlers/artifacts"; -import {getK8sSecret} from "./k8s-helper"; -import {parseJSONString} from "./utils"; -const { fromNodeProviderChain, fromEnv } = require('@aws-sdk/credential-providers'); +import { isAWSS3Endpoint } from './aws-helper'; +import { S3ProviderInfo } from "./handlers/artifacts"; +import { getK8sSecret } from "./k8s-helper"; +import { parseJSONString } from "./utils"; +const { fromNodeProviderChain } = require('@aws-sdk/credential-providers'); /** MinioRequestConfig describes the info required to retrieve an artifact. */ export interface MinioRequestConfig { bucket: string; diff --git a/frontend/src/components/tabs/InputOutputTab.tsx b/frontend/src/components/tabs/InputOutputTab.tsx index 9957fd164f7..c4e1b213fa0 100644 --- a/frontend/src/components/tabs/InputOutputTab.tsx +++ b/frontend/src/components/tabs/InputOutputTab.tsx @@ -27,7 +27,8 @@ import { getArtifactName, getArtifactTypeName, getArtifactTypes, - getLinkedArtifactsByExecution, getStoreSessionInfoFromArtifact, + getLinkedArtifactsByExecution, + getStoreSessionInfoFromArtifact, LinkedArtifact, } from 'src/mlmd/MlmdUtils'; import { ArtifactType, Execution } from 'src/third_party/mlmd'; diff --git a/frontend/src/components/viewers/MetricsVisualizations.tsx b/frontend/src/components/viewers/MetricsVisualizations.tsx index fab79d0c980..22ec593af02 100644 --- a/frontend/src/components/viewers/MetricsVisualizations.tsx +++ b/frontend/src/components/viewers/MetricsVisualizations.tsx @@ -27,7 +27,8 @@ import { getMetadataValue } from 'src/mlmd/library'; import { filterArtifactsByType, filterLinkedArtifactsByType, - getArtifactName, getStoreSessionInfoFromArtifact, + getArtifactName, + getStoreSessionInfoFromArtifact, LinkedArtifact, } from 'src/mlmd/MlmdUtils'; import { Artifact, ArtifactType, Execution } from 'src/third_party/mlmd'; From e0ff01dc4f5cb3d25cd591a96842a9e794d5c208 Mon Sep 17 00:00:00 2001 From: Humair Khan Date: Thu, 9 May 2024 08:54:19 -0400 Subject: [PATCH 8/8] chore: prettier fixes Signed-off-by: Humair Khan --- frontend/server/handlers/artifacts.ts | 224 ++++++++------- .../integration-tests/artifact-get.test.ts | 258 ++++++++++-------- frontend/server/minio-helper.test.ts | 35 ++- frontend/server/minio-helper.ts | 91 +++--- frontend/server/workflow-helper.ts | 20 +- frontend/src/components/ArtifactPreview.tsx | 15 +- .../src/components/MinioArtifactPreview.tsx | 2 +- .../src/components/tabs/InputOutputTab.tsx | 35 ++- .../components/tabs/RuntimeNodeDetailsV2.tsx | 10 +- .../viewers/MetricsVisualizations.tsx | 4 +- frontend/src/lib/Apis.test.ts | 6 +- frontend/src/lib/Apis.ts | 9 +- frontend/src/lib/OutputArtifactLoader.test.ts | 2 +- frontend/src/lib/OutputArtifactLoader.ts | 7 +- frontend/src/mlmd/MlmdUtils.ts | 7 +- 15 files changed, 407 insertions(+), 318 deletions(-) diff --git a/frontend/server/handlers/artifacts.ts b/frontend/server/handlers/artifacts.ts index 5ccbbe6e36b..d5bea2cbf6a 100644 --- a/frontend/server/handlers/artifacts.ts +++ b/frontend/server/handlers/artifacts.ts @@ -15,7 +15,7 @@ import fetch from 'node-fetch'; import { AWSConfigs, HttpConfigs, MinioConfigs, ProcessEnv } from '../configs'; import { Client as MinioClient } from 'minio'; import { PreviewStream, findFileOnPodVolume, parseJSONString } from '../utils'; -import {createMinioClient, getObjectStream} from '../minio-helper'; +import { createMinioClient, getObjectStream } from '../minio-helper'; import * as serverInfo from '../helpers/server-info'; import { Handler, Request, Response } from 'express'; import { Storage } from '@google-cloud/storage'; @@ -24,9 +24,9 @@ import { HACK_FIX_HPM_PARTIAL_RESPONSE_HEADERS } from '../consts'; import * as fs from 'fs'; import { isAllowedDomain } from './domain-checker'; -import { getK8sSecret } from "../k8s-helper"; -import { StorageOptions } from "@google-cloud/storage/build/src/storage"; -import { CredentialBody } from "google-auth-library/build/src/auth/credentials"; +import { getK8sSecret } from '../k8s-helper'; +import { StorageOptions } from '@google-cloud/storage/build/src/storage'; +import { CredentialBody } from 'google-auth-library/build/src/auth/credentials'; /** * ArtifactsQueryStrings describes the expected query strings key value pairs @@ -77,10 +77,10 @@ export interface GCSProviderInfo { * @param tryExtract whether the handler try to extract content from *.tar.gz files. */ export function getArtifactsHandler({ - artifactsConfigs, - useParameter, - tryExtract, - }: { + artifactsConfigs, + useParameter, + tryExtract, +}: { artifactsConfigs: { aws: AWSConfigs; http: HttpConfigs; @@ -95,7 +95,9 @@ export function getArtifactsHandler({ const source = useParameter ? req.params.source : req.query.source; const bucket = useParameter ? req.params.bucket : req.query.bucket; const key = useParameter ? req.params[0] : req.query.key; - const { peek = 0, providerInfo = "", namespace = ""} = req.query as Partial; + const { peek = 0, providerInfo = '', namespace = '' } = req.query as Partial< + ArtifactsQueryStrings + >; if (!source) { res.status(500).send('Storage source is missing from artifact request'); return; @@ -110,10 +112,10 @@ export function getArtifactsHandler({ } console.log(`Getting storage artifact at: ${source}: ${bucket}/${key}`); - let client : MinioClient; + let client: MinioClient; switch (source) { case 'gcs': - await getGCSArtifactHandler({bucket, key}, peek, providerInfo, namespace)(req, res); + await getGCSArtifactHandler({ bucket, key }, peek, providerInfo, namespace)(req, res); break; case 'minio': try { @@ -123,13 +125,13 @@ export function getArtifactsHandler({ return; } await getMinioArtifactHandler( - { - bucket, - client, - key, - tryExtract, - }, - peek, + { + bucket, + client, + key, + tryExtract, + }, + peek, )(req, res); break; case 's3': @@ -140,30 +142,30 @@ export function getArtifactsHandler({ return; } await getMinioArtifactHandler( - { - bucket, - client, - key, - }, - peek, + { + bucket, + client, + key, + }, + peek, )(req, res); break; case 'http': case 'https': await getHttpArtifactsHandler( - allowedDomain, - getHttpUrl(source, http.baseUrl || '', bucket, key), - http.auth, - peek, + allowedDomain, + getHttpUrl(source, http.baseUrl || '', bucket, key), + http.auth, + peek, )(req, res); break; case 'volume': await getVolumeArtifactsHandler( - { - bucket, - key, - }, - peek, + { + bucket, + key, + }, + peek, )(req, res); break; default: @@ -202,7 +204,7 @@ function getHttpArtifactsHandler( if (auth.key.length > 0) { // inject original request's value if exists, otherwise default to provided default value headers[auth.key] = - req.headers[auth.key] || req.headers[auth.key.toLowerCase()] || auth.defaultValue; + req.headers[auth.key] || req.headers[auth.key.toLowerCase()] || auth.defaultValue; } if (!isAllowedDomain(url, allowedDomain)) { res.status(500).send(`Domain not allowed.`); @@ -210,9 +212,9 @@ function getHttpArtifactsHandler( } const response = await fetch(url, { headers }); response.body - .on('error', err => res.status(500).send(`Unable to retrieve artifact: ${err}`)) - .pipe(new PreviewStream({ peek })) - .pipe(res); + .on('error', err => res.status(500).send(`Unable to retrieve artifact: ${err}`)) + .pipe(new PreviewStream({ peek })) + .pipe(res); }; } @@ -234,31 +236,45 @@ function getMinioArtifactHandler( }; } -async function parseGCSProviderInfo(providerInfo: GCSProviderInfo, namespace: string): Promise { +async function parseGCSProviderInfo( + providerInfo: GCSProviderInfo, + namespace: string, +): Promise { if (!providerInfo.Params.tokenKey || !providerInfo.Params.secretName) { - throw new Error('Provider info with fromEnv:false supplied with incomplete secret credential info.'); + throw new Error( + 'Provider info with fromEnv:false supplied with incomplete secret credential info.', + ); } let configGCS: StorageOptions; try { - const tokenString = await getK8sSecret(providerInfo.Params.secretName, providerInfo.Params.tokenKey, namespace); + const tokenString = await getK8sSecret( + providerInfo.Params.secretName, + providerInfo.Params.tokenKey, + namespace, + ); const credentials = parseJSONString(tokenString); - configGCS = {credentials}; - configGCS.scopes = "https://www.googleapis.com/auth/devstorage.read_write"; + configGCS = { credentials }; + configGCS.scopes = 'https://www.googleapis.com/auth/devstorage.read_write'; return configGCS; } catch (err) { throw new Error('Failed to parse GCS Provider config. Error: ' + err); } } -function getGCSArtifactHandler(options: { key: string; bucket: string }, peek: number = 0, providerInfoString?: string, namespace?: string) { +function getGCSArtifactHandler( + options: { key: string; bucket: string }, + peek: number = 0, + providerInfoString?: string, + namespace?: string, +) { const { key, bucket } = options; return async (_: Request, res: Response) => { try { - let storageOptions : StorageOptions | undefined; - if(providerInfoString) { - const providerInfo = parseJSONString(providerInfoString); - if (providerInfo && providerInfo.Params.fromEnv === "false") { - if (!namespace){ + let storageOptions: StorageOptions | undefined; + if (providerInfoString) { + const providerInfo = parseJSONString(providerInfoString); + if (providerInfo && providerInfo.Params.fromEnv === 'false') { + if (!namespace) { res.status(500).send('Failed to parse provider info. Reason: No namespace provided'); } else { storageOptions = await parseGCSProviderInfo(providerInfo, namespace); @@ -279,11 +295,11 @@ function getGCSArtifactHandler(options: { key: string; bucket: string }, peek: n // Build a RegExp object that only recognizes asterisks ('*'), and // escapes everything else. const regex = new RegExp( - '^' + + '^' + key - .split(/\*+/) - .map(escapeRegexChars) - .join('.*') + + .split(/\*+/) + .map(escapeRegexChars) + .join('.*') + '$', ); return regex.test(f.name); @@ -295,16 +311,16 @@ function getGCSArtifactHandler(options: { key: string; bucket: string }, peek: n return; } console.log( - `Found ${matchingFiles.length} matching files: `, - matchingFiles.map(file => file.name).join(','), + `Found ${matchingFiles.length} matching files: `, + matchingFiles.map(file => file.name).join(','), ); let contents = ''; // TODO: support peek for concatenated matching files if (peek) { matchingFiles[0] - .createReadStream() - .pipe(new PreviewStream({ peek })) - .pipe(res); + .createReadStream() + .pipe(new PreviewStream({ peek })) + .pipe(res); return; } @@ -312,17 +328,17 @@ function getGCSArtifactHandler(options: { key: string; bucket: string }, peek: n matchingFiles.forEach((f, i) => { const buffer: Buffer[] = []; f.createReadStream() - .on('data', data => buffer.push(Buffer.from(data))) - .on('end', () => { - contents += - Buffer.concat(buffer) - .toString() - .trim() + '\n'; - if (i === matchingFiles.length - 1) { - res.send(contents); - } - }) - .on('error', () => res.status(500).send('Failed to read file: ' + f.name)); + .on('data', data => buffer.push(Buffer.from(data))) + .on('end', () => { + contents += + Buffer.concat(buffer) + .toString() + .trim() + '\n'; + if (i === matchingFiles.length - 1) { + res.send(contents); + } + }) + .on('error', () => res.status(500).send('Failed to read file: ' + f.name)); }); } catch (err) { res.status(500).send('Failed to download GCS file(s). Error: ' + err); @@ -362,14 +378,14 @@ function getVolumeArtifactsHandler(options: { bucket: string; key: string }, pee const stat = await fs.promises.stat(filePath); if (stat.isDirectory()) { res - .status(400) - .send(`Failed to open volume file ${filePath} is directory, does not support now`); + .status(400) + .send(`Failed to open volume file ${filePath} is directory, does not support now`); return; } fs.createReadStream(filePath) - .pipe(new PreviewStream({ peek })) - .pipe(res); + .pipe(new PreviewStream({ peek })) + .pipe(res); } catch (err) { console.log(`Failed to open volume: ${err}`); res.status(500).send(`Failed to open volume.`); @@ -405,10 +421,10 @@ const QUERIES = { }; export function getArtifactsProxyHandler({ - enabled, - allowedDomain, - namespacedServiceGetter, - }: { + enabled, + allowedDomain, + namespacedServiceGetter, +}: { enabled: boolean; allowedDomain: string; namespacedServiceGetter: NamespacedServiceGetter; @@ -417,36 +433,36 @@ export function getArtifactsProxyHandler({ return (req, res, next) => next(); } return proxy( - (_pathname, req) => { - // only proxy requests with namespace query parameter - return !!getNamespaceFromUrl(req.url || ''); + (_pathname, req) => { + // only proxy requests with namespace query parameter + return !!getNamespaceFromUrl(req.url || ''); + }, + { + changeOrigin: true, + onProxyReq: proxyReq => { + console.log('Proxied artifact request: ', proxyReq.path); }, - { - changeOrigin: true, - onProxyReq: proxyReq => { - console.log('Proxied artifact request: ', proxyReq.path); - }, - pathRewrite: (pathStr, req) => { - const url = new URL(pathStr || '', DUMMY_BASE_PATH); - url.searchParams.delete(QUERIES.NAMESPACE); - return url.pathname + url.search; - }, - router: req => { - const namespace = getNamespaceFromUrl(req.url || ''); - if (!namespace) { - console.log(`namespace query param expected in ${req.url}.`); - throw new Error(`namespace query param expected.`); - } - const urlStr = namespacedServiceGetter(namespace!); - if (!isAllowedDomain(urlStr, allowedDomain)) { - console.log(`Domain is not allowed.`); - throw new Error(`Domain is not allowed.`); - } - return namespacedServiceGetter(namespace!); - }, - target: '/artifacts', - headers: HACK_FIX_HPM_PARTIAL_RESPONSE_HEADERS, + pathRewrite: (pathStr, req) => { + const url = new URL(pathStr || '', DUMMY_BASE_PATH); + url.searchParams.delete(QUERIES.NAMESPACE); + return url.pathname + url.search; + }, + router: req => { + const namespace = getNamespaceFromUrl(req.url || ''); + if (!namespace) { + console.log(`namespace query param expected in ${req.url}.`); + throw new Error(`namespace query param expected.`); + } + const urlStr = namespacedServiceGetter(namespace!); + if (!isAllowedDomain(urlStr, allowedDomain)) { + console.log(`Domain is not allowed.`); + throw new Error(`Domain is not allowed.`); + } + return namespacedServiceGetter(namespace!); }, + target: '/artifacts', + headers: HACK_FIX_HPM_PARTIAL_RESPONSE_HEADERS, + }, ); } diff --git a/frontend/server/integration-tests/artifact-get.test.ts b/frontend/server/integration-tests/artifact-get.test.ts index 87aa3bfe34f..d104e1a8e63 100644 --- a/frontend/server/integration-tests/artifact-get.test.ts +++ b/frontend/server/integration-tests/artifact-get.test.ts @@ -23,7 +23,7 @@ import { UIServer } from '../app'; import { loadConfigs } from '../configs'; import * as serverInfo from '../helpers/server-info'; import { commonSetup, mkTempDir } from './test-helper'; -import {getK8sSecret} from "../k8s-helper"; +import { getK8sSecret } from '../k8s-helper'; const MinioClient = minio.Client; jest.mock('minio'); @@ -100,17 +100,17 @@ describe('/artifacts', () => { process.env.AWS_SECRET_ACCESS_KEY = 'awsSecret123'; const request = requests(app.start()); request - .get('/artifacts/get?source=s3&bucket=ml-pipeline&key=hello%2Fworld.txt') - .expect(200, artifactContent, err => { - expect(mockedMinioClient).toBeCalledWith({ - accessKey: 'aws123', - endPoint: 's3.amazonaws.com', - region: 'us-east-1', - secretKey: 'awsSecret123', - useSSL: true, - }); - done(err); + .get('/artifacts/get?source=s3&bucket=ml-pipeline&key=hello%2Fworld.txt') + .expect(200, artifactContent, err => { + expect(mockedMinioClient).toBeCalledWith({ + accessKey: 'aws123', + endPoint: 's3.amazonaws.com', + region: 'us-east-1', + secretKey: 'awsSecret123', + useSSL: true, }); + done(err); + }); }); it('responds with artifact if source is AWS S3, and creds are sourced from Load Configs', done => { @@ -145,39 +145,43 @@ describe('/artifacts', () => { app = new UIServer(configs); const request = requests(app.start()); const providerInfo = { - "Params": { - "accessKeyKey": "someSecret", + Params: { + accessKeyKey: 'someSecret', // this not set and default is used (tls=true) // since aws connections are always tls secured - "disableSSL": "false", - "endpoint": "s3.amazonaws.com", - "fromEnv": "false", + disableSSL: 'false', + endpoint: 's3.amazonaws.com', + fromEnv: 'false', // this not set and default is used // since aws connections always have the same port - "port": "0001", - "region": "us-east-2", - "secretKeyKey": "someSecret", - "secretName": "aws-s3-creds" + port: '0001', + region: 'us-east-2', + secretKeyKey: 'someSecret', + secretName: 'aws-s3-creds', }, - "Provider": "s3", + Provider: 's3', }; - const namespace = "test"; + const namespace = 'test'; request - .get(`/artifacts/get?source=s3&bucket=ml-pipeline&key=hello%2Fworld.txt&namespace=${namespace}&providerInfo=${JSON.stringify(providerInfo)}`) - .expect(200, artifactContent, err => { - expect(mockedMinioClient).toBeCalledWith({ - accessKey: 'someSecret', - endPoint: 's3.amazonaws.com', - port: undefined, - region: 'us-east-2', - secretKey: 'someSecret', - useSSL: undefined, - }); - expect(mockedMinioClient).toBeCalledTimes(1); - expect(mockedGetK8sSecret).toBeCalledWith("aws-s3-creds", "someSecret", `${namespace}`); - expect(mockedGetK8sSecret).toBeCalledTimes(2); - done(err); + .get( + `/artifacts/get?source=s3&bucket=ml-pipeline&key=hello%2Fworld.txt&namespace=${namespace}&providerInfo=${JSON.stringify( + providerInfo, + )}`, + ) + .expect(200, artifactContent, err => { + expect(mockedMinioClient).toBeCalledWith({ + accessKey: 'someSecret', + endPoint: 's3.amazonaws.com', + port: undefined, + region: 'us-east-2', + secretKey: 'someSecret', + useSSL: undefined, }); + expect(mockedMinioClient).toBeCalledTimes(1); + expect(mockedGetK8sSecret).toBeCalledWith('aws-s3-creds', 'someSecret', `${namespace}`); + expect(mockedGetK8sSecret).toBeCalledTimes(2); + done(err); + }); }); it('responds error when source is s3, and creds are sourced from Provider Configs, but no namespace is provided', done => { @@ -186,23 +190,31 @@ describe('/artifacts', () => { app = new UIServer(configs); const request = requests(app.start()); const providerInfo = { - "Params": { - "accessKeyKey": "AWS_ACCESS_KEY_ID", - "disableSSL": "false", - "endpoint": "s3.amazonaws.com", - "fromEnv": "false", - "region": "us-east-2", - "secretKeyKey": "AWS_SECRET_ACCESS_KEY", - "secretName": "aws-s3-creds" + Params: { + accessKeyKey: 'AWS_ACCESS_KEY_ID', + disableSSL: 'false', + endpoint: 's3.amazonaws.com', + fromEnv: 'false', + region: 'us-east-2', + secretKeyKey: 'AWS_SECRET_ACCESS_KEY', + secretName: 'aws-s3-creds', }, - "Provider": "s3", + Provider: 's3', }; request - .get(`/artifacts/get?source=s3&bucket=ml-pipeline&key=hello%2Fworld.txt$&providerInfo=${JSON.stringify(providerInfo)}`) - .expect(500, 'Failed to initialize Minio Client for S3 Provider: Error: Artifact Store provider given, but no namespace provided.', err => { + .get( + `/artifacts/get?source=s3&bucket=ml-pipeline&key=hello%2Fworld.txt$&providerInfo=${JSON.stringify( + providerInfo, + )}`, + ) + .expect( + 500, + 'Failed to initialize Minio Client for S3 Provider: Error: Artifact Store provider given, but no namespace provided.', + err => { expect(mockedGetK8sSecret).toBeCalledTimes(0); done(err); - }); + }, + ); }); it('responds with artifact if source is s3-compatible, and creds are sourced from Provider Configs', done => { @@ -213,34 +225,38 @@ describe('/artifacts', () => { app = new UIServer(configs); const request = requests(app.start()); const providerInfo = { - "Params": { - "accessKeyKey": "someSecret", - "disableSSL": "false", - "endpoint": "https://mys3.com", - "fromEnv": "false", - "region": "auto", - "secretKeyKey": "someSecret", - "secretName": "my-secret" + Params: { + accessKeyKey: 'someSecret', + disableSSL: 'false', + endpoint: 'https://mys3.com', + fromEnv: 'false', + region: 'auto', + secretKeyKey: 'someSecret', + secretName: 'my-secret', }, - "Provider": "s3", + Provider: 's3', }; - const namespace = "test"; + const namespace = 'test'; request - .get(`/artifacts/get?source=s3&bucket=ml-pipeline&key=hello%2Fworld.txt&namespace=${namespace}&providerInfo=${JSON.stringify(providerInfo)}`) - .expect(200, artifactContent, err => { - expect(mockedMinioClient).toBeCalledWith({ - accessKey: 'someSecret', - endPoint: 'mys3.com', - port: undefined, - region: 'auto', - secretKey: 'someSecret', - useSSL: true, - }); - expect(mockedMinioClient).toBeCalledTimes(1); - expect(mockedGetK8sSecret).toBeCalledWith("my-secret", "someSecret", `${namespace}`); - expect(mockedGetK8sSecret).toBeCalledTimes(2); - done(err); + .get( + `/artifacts/get?source=s3&bucket=ml-pipeline&key=hello%2Fworld.txt&namespace=${namespace}&providerInfo=${JSON.stringify( + providerInfo, + )}`, + ) + .expect(200, artifactContent, err => { + expect(mockedMinioClient).toBeCalledWith({ + accessKey: 'someSecret', + endPoint: 'mys3.com', + port: undefined, + region: 'auto', + secretKey: 'someSecret', + useSSL: true, }); + expect(mockedMinioClient).toBeCalledTimes(1); + expect(mockedGetK8sSecret).toBeCalledWith('my-secret', 'someSecret', `${namespace}`); + expect(mockedGetK8sSecret).toBeCalledTimes(2); + done(err); + }); }); it('responds with artifact if source is s3-compatible, and creds are sourced from Provider Configs, with endpoint port', done => { @@ -251,34 +267,38 @@ describe('/artifacts', () => { app = new UIServer(configs); const request = requests(app.start()); const providerInfo = { - "Params": { - "accessKeyKey": "someSecret", - "disableSSL": "false", - "endpoint": "https://mys3.ns.svc.cluster.local:1234", - "fromEnv": "false", - "region": "auto", - "secretKeyKey": "someSecret", - "secretName": "my-secret" + Params: { + accessKeyKey: 'someSecret', + disableSSL: 'false', + endpoint: 'https://mys3.ns.svc.cluster.local:1234', + fromEnv: 'false', + region: 'auto', + secretKeyKey: 'someSecret', + secretName: 'my-secret', }, - "Provider": "s3", + Provider: 's3', }; - const namespace = "test"; + const namespace = 'test'; request - .get(`/artifacts/get?source=s3&bucket=ml-pipeline&key=hello%2Fworld.txt&namespace=${namespace}&providerInfo=${JSON.stringify(providerInfo)}`) - .expect(200, artifactContent, err => { - expect(mockedMinioClient).toBeCalledWith({ - accessKey: 'someSecret', - endPoint: 'mys3.ns.svc.cluster.local', - port: 1234, - region: 'auto', - secretKey: 'someSecret', - useSSL: true, - }); - expect(mockedMinioClient).toBeCalledTimes(1); - expect(mockedGetK8sSecret).toBeCalledWith("my-secret", "someSecret", `${namespace}`); - expect(mockedGetK8sSecret).toBeCalledTimes(2); - done(err); + .get( + `/artifacts/get?source=s3&bucket=ml-pipeline&key=hello%2Fworld.txt&namespace=${namespace}&providerInfo=${JSON.stringify( + providerInfo, + )}`, + ) + .expect(200, artifactContent, err => { + expect(mockedMinioClient).toBeCalledWith({ + accessKey: 'someSecret', + endPoint: 'mys3.ns.svc.cluster.local', + port: 1234, + region: 'auto', + secretKey: 'someSecret', + useSSL: true, }); + expect(mockedMinioClient).toBeCalledTimes(1); + expect(mockedGetK8sSecret).toBeCalledWith('my-secret', 'someSecret', `${namespace}`); + expect(mockedGetK8sSecret).toBeCalledTimes(2); + done(err); + }); }); it('responds with artifact if source is gcs, and creds are sourced from Provider Configs', done => { @@ -292,36 +312,40 @@ describe('/artifacts', () => { mockedGcsStorage.mockImplementationOnce(() => ({ bucket: () => ({ getFiles: () => - Promise.resolve([[{ name: 'hello/world.txt', createReadStream: () => stream }]]), + Promise.resolve([[{ name: 'hello/world.txt', createReadStream: () => stream }]]), }), })); const configs = loadConfigs(argv, {}); app = new UIServer(configs); const request = requests(app.start()); const providerInfo = { - "Params": { - "fromEnv": "false", - "secretName": "someSecret", - "tokenKey": 'somekey' + Params: { + fromEnv: 'false', + secretName: 'someSecret', + tokenKey: 'somekey', }, - "Provider": "gs", + Provider: 'gs', }; - const namespace = "test"; + const namespace = 'test'; request - .get(`/artifacts/get?source=gcs&bucket=ml-pipeline&key=hello%2Fworld.txt&namespace=${namespace}&providerInfo=${JSON.stringify(providerInfo)}`) - .expect(200, artifactContent + '\n', err => { - const expectedArg = { - "credentials": { - "client_email": "testemail", - "private_key": "testkey", - }, - "scopes": "https://www.googleapis.com/auth/devstorage.read_write" - }; - expect(mockedGcsStorage).toBeCalledWith(expectedArg); - expect(mockedGetK8sSecret).toBeCalledWith("someSecret", "somekey", `${namespace}`); - expect(mockedGetK8sSecret).toBeCalledTimes(1); - done(err); - }); + .get( + `/artifacts/get?source=gcs&bucket=ml-pipeline&key=hello%2Fworld.txt&namespace=${namespace}&providerInfo=${JSON.stringify( + providerInfo, + )}`, + ) + .expect(200, artifactContent + '\n', err => { + const expectedArg = { + credentials: { + client_email: 'testemail', + private_key: 'testkey', + }, + scopes: 'https://www.googleapis.com/auth/devstorage.read_write', + }; + expect(mockedGcsStorage).toBeCalledWith(expectedArg); + expect(mockedGetK8sSecret).toBeCalledWith('someSecret', 'somekey', `${namespace}`); + expect(mockedGetK8sSecret).toBeCalledTimes(1); + done(err); + }); }); it('responds with partial s3 artifact if peek=5 flag is set', done => { diff --git a/frontend/server/minio-helper.test.ts b/frontend/server/minio-helper.test.ts index 3e139d71658..554c81020e5 100644 --- a/frontend/server/minio-helper.test.ts +++ b/frontend/server/minio-helper.test.ts @@ -30,11 +30,14 @@ describe('minio-helper', () => { describe('createMinioClient', () => { it('creates a minio client with the provided configs.', async () => { - const client = await createMinioClient({ - accessKey: 'accesskey', - endPoint: 'minio.kubeflow:80', - secretKey: 'secretkey', - }, 's3'); + const client = await createMinioClient( + { + accessKey: 'accesskey', + endPoint: 'minio.kubeflow:80', + secretKey: 'secretkey', + }, + 's3', + ); expect(client).toBeInstanceOf(MinioClient); expect(MockedMinioClient).toHaveBeenCalledWith({ @@ -45,9 +48,12 @@ describe('minio-helper', () => { }); it('fallbacks to the provided configs if EC2 metadata is not available.', async () => { - const client = await createMinioClient({ - endPoint: 'minio.kubeflow:80', - }, 's3'); + const client = await createMinioClient( + { + endPoint: 'minio.kubeflow:80', + }, + 's3', + ); expect(client).toBeInstanceOf(MinioClient); expect(MockedMinioClient).toHaveBeenCalledWith({ @@ -56,13 +62,12 @@ describe('minio-helper', () => { }); it('uses EC2 metadata credentials if access key are not provided.', async () => { - (fromNodeProviderChain as jest.Mock).mockImplementation(() => - () => - Promise.resolve({ - accessKeyId: 'AccessKeyId', - secretAccessKey: 'SecretAccessKey', - sessionToken: 'SessionToken', - }) + (fromNodeProviderChain as jest.Mock).mockImplementation(() => () => + Promise.resolve({ + accessKeyId: 'AccessKeyId', + secretAccessKey: 'SecretAccessKey', + sessionToken: 'SessionToken', + }), ); const client = await createMinioClient({ endPoint: 's3.amazonaws.com' }, 's3'); expect(client).toBeInstanceOf(MinioClient); diff --git a/frontend/server/minio-helper.ts b/frontend/server/minio-helper.ts index fae0da14c3a..5f244800f2a 100644 --- a/frontend/server/minio-helper.ts +++ b/frontend/server/minio-helper.ts @@ -19,9 +19,9 @@ import gunzip from 'gunzip-maybe'; import { URL } from 'url'; import { Client as MinioClient, ClientOptions as MinioClientOptions } from 'minio'; import { isAWSS3Endpoint } from './aws-helper'; -import { S3ProviderInfo } from "./handlers/artifacts"; -import { getK8sSecret } from "./k8s-helper"; -import { parseJSONString } from "./utils"; +import { S3ProviderInfo } from './handlers/artifacts'; +import { getK8sSecret } from './k8s-helper'; +import { parseJSONString } from './utils'; const { fromNodeProviderChain } = require('@aws-sdk/credential-providers'); /** MinioRequestConfig describes the info required to retrieve an artifact. */ export interface MinioRequestConfig { @@ -56,16 +56,21 @@ export interface MinioClientOptionsWithOptionalSecrets extends Partial(providerInfoString); + const providerInfo = parseJSONString(providerInfoString); if (!providerInfo) { - throw new Error("Failed to parse provider info."); + throw new Error('Failed to parse provider info.'); } // If fromEnv == false, we rely on the default credentials or env to provide credentials (e.g. IRSA) - if (providerInfo.Params.fromEnv === "false") { - if (!namespace){ - throw new Error("Artifact Store provider given, but no namespace provided."); + if (providerInfo.Params.fromEnv === 'false') { + if (!namespace) { + throw new Error('Artifact Store provider given, but no namespace provided.'); } else { config = await parseS3ProviderInfo(config, providerInfo, namespace); } @@ -73,7 +78,7 @@ export async function createMinioClient(config: MinioClientOptionsWithOptionalSe } // If using s3 and sourcing credentials from environment (currently only aws is supported) - if (providerType === "s3" && (!config.accessKey || !config.secretKey)) { + if (providerType === 's3' && (!config.accessKey || !config.secretKey)) { // AWS S3 with credentials from provider chain if (isAWSS3Endpoint(config.endPoint)) { try { @@ -91,12 +96,14 @@ export async function createMinioClient(config: MinioClientOptionsWithOptionalSe console.error('Unable to get aws instance profile credentials: ', e); } } else { - console.error('Encountered S3-compatible provider type with no provided credentials, and unsupported environment based credential support.'); + console.error( + 'Encountered S3-compatible provider type with no provided credentials, and unsupported environment based credential support.', + ); } } // If using any AWS or S3 compatible store (e.g. minio, aws s3 when using manual creds, ceph, etc.) - let mc : MinioClient; + let mc: MinioClient; try { mc = await new MinioClient(config as MinioClientOptions); } catch (err) { @@ -106,21 +113,41 @@ export async function createMinioClient(config: MinioClientOptionsWithOptionalSe } // Parse provider info for any s3 compatible store that's not AWS S3 -async function parseS3ProviderInfo(config: MinioClientOptionsWithOptionalSecrets, providerInfo: S3ProviderInfo, namespace: string) : Promise { - if (!providerInfo.Params.accessKeyKey || !providerInfo.Params.secretKeyKey || !providerInfo.Params.secretName) { - throw new Error('Provider info with fromEnv:false supplied with incomplete secret credential info.'); +async function parseS3ProviderInfo( + config: MinioClientOptionsWithOptionalSecrets, + providerInfo: S3ProviderInfo, + namespace: string, +): Promise { + if ( + !providerInfo.Params.accessKeyKey || + !providerInfo.Params.secretKeyKey || + !providerInfo.Params.secretName + ) { + throw new Error( + 'Provider info with fromEnv:false supplied with incomplete secret credential info.', + ); } try { - config.accessKey = await getK8sSecret(providerInfo.Params.secretName, providerInfo.Params.accessKeyKey, namespace); - config.secretKey = await getK8sSecret(providerInfo.Params.secretName, providerInfo.Params.secretKeyKey, namespace); + config.accessKey = await getK8sSecret( + providerInfo.Params.secretName, + providerInfo.Params.accessKeyKey, + namespace, + ); + config.secretKey = await getK8sSecret( + providerInfo.Params.secretName, + providerInfo.Params.secretKeyKey, + namespace, + ); } catch (e) { - throw new Error(`Encountered error when trying to fetch provider secret ${providerInfo.Params.secretName}.`); + throw new Error( + `Encountered error when trying to fetch provider secret ${providerInfo.Params.secretName}.`, + ); } if (isAWSS3Endpoint(providerInfo.Params.endpoint)) { if (providerInfo.Params.endpoint) { - if(providerInfo.Params.endpoint.startsWith("https")){ + if (providerInfo.Params.endpoint.startsWith('https')) { const parseEndpoint = new URL(providerInfo.Params.endpoint); config.endPoint = parseEndpoint.hostname; } else { @@ -153,7 +180,7 @@ async function parseS3ProviderInfo(config: MinioClientOptionsWithOptionalSecrets config.region = providerInfo.Params.region ? providerInfo.Params.region : undefined; if (providerInfo.Params.disableSSL) { - config.useSSL = !(providerInfo.Params.disableSSL.toLowerCase() === "true"); + config.useSSL = !(providerInfo.Params.disableSSL.toLowerCase() === 'true'); } else { config.useSSL = undefined; } @@ -178,8 +205,8 @@ export function isTarball(buf: Buffer) { const v0 = [0x75, 0x73, 0x74, 0x61, 0x72, 0x20, 0x20, 0x00]; return ( - v1.reduce((res, curr, i) => res && curr === buf[offset + i], true) || - v0.reduce((res, curr, i) => res && curr === buf[offset + i], true as boolean) + v1.reduce((res, curr, i) => res && curr === buf[offset + i], true) || + v0.reduce((res, curr, i) => res && curr === buf[offset + i], true as boolean) ); } @@ -189,11 +216,11 @@ export function isTarball(buf: Buffer) { */ export function maybeTarball(): Transform { return peek( - { newline: false, maxBuffer: 264 }, - (data: Buffer, swap: (error?: Error, parser?: Transform) => void) => { - if (isTarball(data)) swap(undefined, extractFirstTarRecordAsStream()); - else swap(undefined, new PassThrough()); - }, + { newline: false, maxBuffer: 264 }, + (data: Buffer, swap: (error?: Error, parser?: Transform) => void) => { + if (isTarball(data)) swap(undefined, extractFirstTarRecordAsStream()); + else swap(undefined, new PassThrough()); + }, ); } @@ -235,11 +262,11 @@ function extractFirstTarRecordAsStream() { * */ export async function getObjectStream({ - bucket, - key, - client, - tryExtract = true, - }: MinioRequestConfig): Promise { + bucket, + key, + client, + tryExtract = true, +}: MinioRequestConfig): Promise { const stream = await client.getObject(bucket, key); return tryExtract ? stream.pipe(gunzip()).pipe(maybeTarball()) : stream.pipe(new PassThrough()); } diff --git a/frontend/server/workflow-helper.ts b/frontend/server/workflow-helper.ts index 2acefbf181f..d6ad124684b 100644 --- a/frontend/server/workflow-helper.ts +++ b/frontend/server/workflow-helper.ts @@ -138,7 +138,7 @@ export function createPodLogsMinioRequestConfig( // different bucket/prefix for diff namespace? return async (podName: string, _namespace?: string): Promise => { // create a new client each time to ensure session token has not expired - const client = await createMinioClient(minioOptions, "s3"); + const client = await createMinioClient(minioOptions, 's3'); const workflowName = workflowNameFromPodName(podName); return { bucket, @@ -190,14 +190,16 @@ export async function getPodLogsMinioRequestConfigfromWorkflow( const { host, port } = urlSplit(s3Artifact.endpoint, s3Artifact.insecure); const { accessKey, secretKey } = await getMinioClientSecrets(s3Artifact); - - const client = await createMinioClient({ - accessKey, - endPoint: host, - port, - secretKey, - useSSL: !s3Artifact.insecure, - }, "s3"); + const client = await createMinioClient( + { + accessKey, + endPoint: host, + port, + secretKey, + useSSL: !s3Artifact.insecure, + }, + 's3', + ); return { bucket: s3Artifact.bucket, client, diff --git a/frontend/src/components/ArtifactPreview.tsx b/frontend/src/components/ArtifactPreview.tsx index 43deb93338c..1799c1b9f4f 100644 --- a/frontend/src/components/ArtifactPreview.tsx +++ b/frontend/src/components/ArtifactPreview.tsx @@ -24,7 +24,7 @@ import { stylesheet } from 'typestyle'; import Banner from './Banner'; import { ValueComponentProps } from './DetailsTable'; import { logger } from 'src/lib/Utils'; -import { URIToSessionInfo } from "./tabs/InputOutputTab"; +import { URIToSessionInfo } from './tabs/InputOutputTab'; const css = stylesheet({ root: { @@ -66,12 +66,12 @@ const ArtifactPreview: React.FC = ({ maxbytes = 255, maxlines = 20, }) => { - let storage: StoragePath | undefined - let providerInfo: string | undefined + let storage: StoragePath | undefined; + let providerInfo: string | undefined; if (value) { try { - providerInfo = sessionMap?.get(value) + providerInfo = sessionMap?.get(value); storage = WorkflowParser.parseStoragePath(value); } catch (error) { logger.error(error); @@ -140,7 +140,12 @@ async function getPreview( return ``; } // TODO how to handle binary data (can probably use magic number to id common mime types) - let data = await Apis.readFile({path: storagePath, providerInfo: providerInfo, namespace: namespace, peek: maxbytes +1}); + let data = await Apis.readFile({ + path: storagePath, + providerInfo: providerInfo, + namespace: namespace, + peek: maxbytes + 1, + }); // is preview === data and no maxlines if (data.length <= maxbytes && (!maxlines || data.split('\n').length < maxlines)) { return data; diff --git a/frontend/src/components/MinioArtifactPreview.tsx b/frontend/src/components/MinioArtifactPreview.tsx index f27ccc9e9af..ac4e15f8677 100644 --- a/frontend/src/components/MinioArtifactPreview.tsx +++ b/frontend/src/components/MinioArtifactPreview.tsx @@ -78,7 +78,7 @@ async function getPreview( maxlines?: number, ): Promise<{ data: string; hasMore: boolean }> { // TODO how to handle binary data (can probably use magic number to id common mime types) - let data = await Apis.readFile({path: storagePath, namespace: namespace, peek: maxbytes +1}); + let data = await Apis.readFile({ path: storagePath, namespace: namespace, peek: maxbytes + 1 }); // is preview === data and no maxlines if (data.length <= maxbytes && !maxlines) { return { data, hasMore: false }; diff --git a/frontend/src/components/tabs/InputOutputTab.tsx b/frontend/src/components/tabs/InputOutputTab.tsx index c4e1b213fa0..f000f1a1f6e 100644 --- a/frontend/src/components/tabs/InputOutputTab.tsx +++ b/frontend/src/components/tabs/InputOutputTab.tsx @@ -42,7 +42,7 @@ export type ParamList = Array>; export type URIToSessionInfo = Map; export interface ArtifactParamsWithSessionInfo { params: ParamList; - sessionMap: URIToSessionInfo + sessionMap: URIToSessionInfo; } export interface ArtifactLocation { @@ -93,15 +93,15 @@ export function InputOutputTab({ execution, namespace }: IOTabProps) { ); } - let inputArtifacts : ParamList = [] - let outputArtifacts : ParamList = [] + let inputArtifacts: ParamList = []; + let outputArtifacts: ParamList = []; - if (inputArtifactsWithSessionInfo){ - inputArtifacts = inputArtifactsWithSessionInfo.params + if (inputArtifactsWithSessionInfo) { + inputArtifacts = inputArtifactsWithSessionInfo.params; } if (outputArtifactsWithSessionInfo) { - outputArtifacts = outputArtifactsWithSessionInfo.params + outputArtifacts = outputArtifactsWithSessionInfo.params; } let isIoEmpty = false; @@ -215,34 +215,33 @@ function extractParamFromExecution(execution: Execution, name: string): KeyValue export function getArtifactParamList( inputArtifacts: LinkedArtifact[], artifactTypeNames: string[], -): (ArtifactParamsWithSessionInfo) { - - let sessMap : URIToSessionInfo = new Map() +): ArtifactParamsWithSessionInfo { + let sessMap: URIToSessionInfo = new Map(); let params = Object.values(inputArtifacts).map((linkedArtifact, index) => { let key = getArtifactName(linkedArtifact); if ( - key && - (artifactTypeNames[index] === 'system.Metrics' || - artifactTypeNames[index] === 'system.ClassificationMetrics') + key && + (artifactTypeNames[index] === 'system.Metrics' || + artifactTypeNames[index] === 'system.ClassificationMetrics') ) { key += ' (This is an empty file by default)'; } const artifactId = linkedArtifact.artifact.getId(); const artifactElement = RoutePageFactory.artifactDetails(artifactId) ? ( - - {key} - + + {key} + ) : ( - key + key ); const uri = linkedArtifact.artifact.getUri(); const sessInfo = getStoreSessionInfoFromArtifact(linkedArtifact); - sessMap.set(uri, sessInfo) + sessMap.set(uri, sessInfo); return [artifactElement, uri]; }); - return {params: params, sessionMap: sessMap} + return { params: params, sessionMap: sessMap }; } diff --git a/frontend/src/components/tabs/RuntimeNodeDetailsV2.tsx b/frontend/src/components/tabs/RuntimeNodeDetailsV2.tsx index c9ada66e430..8a1cac7faa4 100644 --- a/frontend/src/components/tabs/RuntimeNodeDetailsV2.tsx +++ b/frontend/src/components/tabs/RuntimeNodeDetailsV2.tsx @@ -49,7 +49,7 @@ import { MetricsVisualizations } from 'src/components/viewers/MetricsVisualizati import { ArtifactTitle } from 'src/components/tabs/ArtifactTitle'; import InputOutputTab, { getArtifactParamList, - ParamList + ParamList, } from 'src/components/tabs/InputOutputTab'; import { convertYamlToPlatformSpec, convertYamlToV2PipelineSpec } from 'src/lib/v2/WorkflowUtils'; import { PlatformDeploymentConfig } from 'src/generated/pipeline_spec/pipeline_spec'; @@ -180,7 +180,7 @@ function TaskNodeDetail({ {selectedTab === 0 && (() => { if (execution) { - return ; + return ; } return NODE_STATE_UNAVAILABLE; })()} @@ -422,10 +422,10 @@ function ArtifactInfo({ ]; let artifactParamsWithSessionInfo = getArtifactParamList([linkedArtifact], artifactTypeName); - let artifactParams : ParamList = [] + let artifactParams: ParamList = []; - if (artifactParamsWithSessionInfo){ - artifactParams = artifactParamsWithSessionInfo.params + if (artifactParamsWithSessionInfo) { + artifactParams = artifactParamsWithSessionInfo.params; } return ( diff --git a/frontend/src/components/viewers/MetricsVisualizations.tsx b/frontend/src/components/viewers/MetricsVisualizations.tsx index 22ec593af02..f43992241c6 100644 --- a/frontend/src/components/viewers/MetricsVisualizations.tsx +++ b/frontend/src/components/viewers/MetricsVisualizations.tsx @@ -891,7 +891,7 @@ export async function getHtmlViewerConfig( const providerInfo = getStoreSessionInfoFromArtifact(linkedArtifact); // TODO(zijianjoy): Limit the size of HTML file fetching to prevent UI frozen. - let data = await Apis.readFile({path: storagePath, providerInfo, namespace: namespace}); + let data = await Apis.readFile({ path: storagePath, providerInfo, namespace: namespace }); return { htmlContent: data, type: PlotType.WEB_APP } as HTMLViewerConfig; }); return Promise.all(htmlViewerConfigs); @@ -919,7 +919,7 @@ export async function getMarkdownViewerConfig( const providerInfo = getStoreSessionInfoFromArtifact(linkedArtifact); // TODO(zijianjoy): Limit the size of Markdown file fetching to prevent UI frozen. - let data = await Apis.readFile({path: storagePath, providerInfo, namespace: namespace}); + let data = await Apis.readFile({ path: storagePath, providerInfo, namespace: namespace }); return { markdownContent: data, type: PlotType.MARKDOWN } as MarkdownViewerConfig; }); return Promise.all(markdownViewerConfigs); diff --git a/frontend/src/lib/Apis.test.ts b/frontend/src/lib/Apis.test.ts index ffb3fc57192..8c4c2184c52 100644 --- a/frontend/src/lib/Apis.test.ts +++ b/frontend/src/lib/Apis.test.ts @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -import {Apis} from './Apis'; -import {StorageService} from './WorkflowParser'; +import { Apis } from './Apis'; +import { StorageService } from './WorkflowParser'; const fetchSpy = (response: string) => { const spy = jest.fn(() => @@ -128,7 +128,7 @@ describe('Apis', () => { const spy = fetchSpy('file contents'); expect( await Apis.readFile({ - path: {source: StorageService.GCS, key:"testkey", bucket:'testbucket'}, + path: { source: StorageService.GCS, key: 'testkey', bucket: 'testbucket' }, }), ).toEqual('file contents'); expect(spy).toHaveBeenCalledWith('artifacts/get?source=gcs&bucket=testbucket&key=testkey', { diff --git a/frontend/src/lib/Apis.ts b/frontend/src/lib/Apis.ts index 497e9b75104..c87fa6d5078 100644 --- a/frontend/src/lib/Apis.ts +++ b/frontend/src/lib/Apis.ts @@ -265,13 +265,18 @@ export class Apis { /** * Reads file from storage using server. */ - public static readFile({path, providerInfo, namespace, peek} : { + public static readFile({ + path, + providerInfo, + namespace, + peek, + }: { path: StoragePath; namespace?: string; providerInfo?: string; peek?: number; }): Promise { - let query = this.buildReadFileUrl({ path, namespace, providerInfo, peek, isDownload: false }) + let query = this.buildReadFileUrl({ path, namespace, providerInfo, peek, isDownload: false }); return this._fetch(query); } diff --git a/frontend/src/lib/OutputArtifactLoader.test.ts b/frontend/src/lib/OutputArtifactLoader.test.ts index 100e9a940ef..819c8ee1051 100644 --- a/frontend/src/lib/OutputArtifactLoader.test.ts +++ b/frontend/src/lib/OutputArtifactLoader.test.ts @@ -88,7 +88,7 @@ describe('OutputArtifactLoader', () => { fileToRead = JSON.stringify({ outputs: [metadata] }); await OutputArtifactLoader.load(storagePath, 'ns1'); expect(readFileSpy).toHaveBeenCalledTimes(2); - expect(readFileSpy.mock.calls.map(([{path, namespace}]) => namespace)) + expect(readFileSpy.mock.calls.map(([{ path, namespace }]) => namespace)) .toMatchInlineSnapshot(` Array [ "ns1", diff --git a/frontend/src/lib/OutputArtifactLoader.ts b/frontend/src/lib/OutputArtifactLoader.ts index 8a23d71841a..fa1182cb4f5 100644 --- a/frontend/src/lib/OutputArtifactLoader.ts +++ b/frontend/src/lib/OutputArtifactLoader.ts @@ -58,7 +58,7 @@ export class OutputArtifactLoader { public static async load(outputPath: StoragePath, namespace?: string): Promise { let plotMetadataList: PlotMetadata[] = []; try { - const metadataFile = await Apis.readFile({path: outputPath, namespace: namespace}); + const metadataFile = await Apis.readFile({ path: outputPath, namespace: namespace }); if (metadataFile) { try { plotMetadataList = OutputArtifactLoader.parseOutputMetadataInJson( @@ -516,7 +516,10 @@ async function readSourceContent( if (storage === 'inline') { return source; } - return await Apis.readFile({path: WorkflowParser.parseStoragePath(source), namespace: namespace}); + return await Apis.readFile({ + path: WorkflowParser.parseStoragePath(source), + namespace: namespace, + }); } export const TEST_ONLY = { diff --git a/frontend/src/mlmd/MlmdUtils.ts b/frontend/src/mlmd/MlmdUtils.ts index a6614108eb0..32ae10fd1d6 100644 --- a/frontend/src/mlmd/MlmdUtils.ts +++ b/frontend/src/mlmd/MlmdUtils.ts @@ -287,8 +287,11 @@ export interface LinkedArtifact { artifact: Artifact; } -export function getStoreSessionInfoFromArtifact(artifact : LinkedArtifact) : string | undefined { - return artifact.artifact.getCustomPropertiesMap().get("store_session_info")?.getStringValue(); +export function getStoreSessionInfoFromArtifact(artifact: LinkedArtifact): string | undefined { + return artifact.artifact + .getCustomPropertiesMap() + .get('store_session_info') + ?.getStringValue(); } export async function getLinkedArtifactsByEvents(events: Event[]): Promise {