diff --git a/backend/src/agent/persistence/client/fake_namespace.go b/backend/src/agent/persistence/client/fake_namespace.go deleted file mode 100644 index bbc8c8e0224..00000000000 --- a/backend/src/agent/persistence/client/fake_namespace.go +++ /dev/null @@ -1,85 +0,0 @@ -package client - -import ( - "context" - "errors" - "github.com/golang/glog" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - types "k8s.io/apimachinery/pkg/types" - watch "k8s.io/apimachinery/pkg/watch" - corev1 "k8s.io/client-go/applyconfigurations/core/v1" -) - -type FakeNamespaceClient struct { - namespace string - user string -} - -func (f *FakeNamespaceClient) SetReturnValues(namespace string, user string) { - f.namespace = namespace - f.user = user -} - -func (f FakeNamespaceClient) Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Namespace, error) { - if f.namespace == name && len(f.user) != 0 { - ns := v1.Namespace{ObjectMeta: metav1.ObjectMeta{ - Namespace: f.namespace, - Annotations: map[string]string{ - "owner": f.user, - }, - }} - return &ns, nil - } - return nil, errors.New("failed to get namespace") -} - -func (f FakeNamespaceClient) Create(ctx context.Context, namespace *v1.Namespace, opts metav1.CreateOptions) (*v1.Namespace, error) { - glog.Error("This fake method is not yet implemented.") - return nil, nil -} - -func (f FakeNamespaceClient) Update(ctx context.Context, namespace *v1.Namespace, opts metav1.UpdateOptions) (*v1.Namespace, error) { - glog.Error("This fake method is not yet implemented.") - return nil, nil -} - -func (f FakeNamespaceClient) UpdateStatus(ctx context.Context, namespace *v1.Namespace, opts metav1.UpdateOptions) (*v1.Namespace, error) { - glog.Error("This fake method is not yet implemented.") - return nil, nil -} - -func (f FakeNamespaceClient) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error { - glog.Error("This fake method is not yet implemented.") - return nil -} - -func (f FakeNamespaceClient) List(ctx context.Context, opts metav1.ListOptions) (*v1.NamespaceList, error) { - glog.Error("This fake method is not yet implemented.") - return nil, nil -} - -func (f FakeNamespaceClient) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { - glog.Error("This fake method is not yet implemented.") - return nil, nil -} - -func (f FakeNamespaceClient) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.Namespace, err error) { - glog.Error("This fake method is not yet implemented.") - return nil, nil -} - -func (f FakeNamespaceClient) Apply(ctx context.Context, namespace *corev1.NamespaceApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Namespace, err error) { - glog.Error("This fake method is not yet implemented.") - return nil, nil -} - -func (f FakeNamespaceClient) ApplyStatus(ctx context.Context, namespace *corev1.NamespaceApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Namespace, err error) { - glog.Error("This fake method is not yet implemented.") - return nil, nil -} - -func (f FakeNamespaceClient) Finalize(ctx context.Context, item *v1.Namespace, opts metav1.UpdateOptions) (*v1.Namespace, error) { - glog.Error("This fake method is not yet implemented.") - return nil, nil -} diff --git a/backend/src/agent/persistence/client/kubernetes_core.go b/backend/src/agent/persistence/client/kubernetes_core.go deleted file mode 100644 index 25605ba88a4..00000000000 --- a/backend/src/agent/persistence/client/kubernetes_core.go +++ /dev/null @@ -1,87 +0,0 @@ -package client - -import ( - "context" - "fmt" - "os" - "time" - - "github.com/cenkalti/backoff" - "github.com/golang/glog" - "github.com/pkg/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - v1 "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/rest" - - "github.com/kubeflow/pipelines/backend/src/common/util" -) - -type KubernetesCoreInterface interface { - NamespaceClient() v1.NamespaceInterface - GetNamespaceOwner(namespace string) (string, error) -} - -type KubernetesCore struct { - coreV1Client v1.CoreV1Interface -} - -func (c *KubernetesCore) NamespaceClient() v1.NamespaceInterface { - return c.coreV1Client.Namespaces() -} - -func (c *KubernetesCore) GetNamespaceOwner(namespace string) (string, error) { - if os.Getenv("MULTIUSER") == "" || os.Getenv("MULTIUSER") == "false" { - return "", nil - } - ns, err := c.NamespaceClient().Get(context.Background(), namespace, metav1.GetOptions{}) - if err != nil { - return "", errors.Wrapf(err, "failed to get namespace '%v'", namespace) - } - owner, ok := ns.Annotations["owner"] - if !ok { - return "", errors.New(fmt.Sprintf("namespace '%v' has no owner in the annotations", namespace)) - } - return owner, nil -} - -func createKubernetesCore(clientParams util.ClientParameters) (KubernetesCoreInterface, error) { - clientSet, err := getKubernetesClientset(clientParams) - if err != nil { - return nil, err - } - return &KubernetesCore{clientSet.CoreV1()}, nil -} - -// CreateKubernetesCoreOrFatal creates a new client for the Kubernetes pod. -func CreateKubernetesCoreOrFatal(initConnectionTimeout time.Duration, clientParams util.ClientParameters) KubernetesCoreInterface { - var client KubernetesCoreInterface - var err error - var operation = func() error { - client, err = createKubernetesCore(clientParams) - return err - } - b := backoff.NewExponentialBackOff() - b.MaxElapsedTime = initConnectionTimeout - err = backoff.Retry(operation, b) - - if err != nil { - glog.Fatalf("Failed to create namespace client. Error: %v", err) - } - return client -} - -func getKubernetesClientset(clientParams util.ClientParameters) (*kubernetes.Clientset, error) { - restConfig, err := rest.InClusterConfig() - if err != nil { - return nil, errors.Wrap(err, "Failed to initialize kubernetes client.") - } - restConfig.QPS = float32(clientParams.QPS) - restConfig.Burst = clientParams.Burst - - clientSet, err := kubernetes.NewForConfig(restConfig) - if err != nil { - return nil, errors.Wrap(err, "Failed to initialize kubernetes client set.") - } - return clientSet, nil -} diff --git a/backend/src/agent/persistence/client/kubernetes_core_fake.go b/backend/src/agent/persistence/client/kubernetes_core_fake.go deleted file mode 100644 index 73fa0e34fef..00000000000 --- a/backend/src/agent/persistence/client/kubernetes_core_fake.go +++ /dev/null @@ -1,37 +0,0 @@ -package client - -import ( - "context" - "errors" - "fmt" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - v1 "k8s.io/client-go/kubernetes/typed/core/v1" -) - -type KubernetesCoreFake struct { - coreV1ClientFake *FakeNamespaceClient -} - -func (c *KubernetesCoreFake) NamespaceClient() v1.NamespaceInterface { - return c.coreV1ClientFake -} - -func (c *KubernetesCoreFake) GetNamespaceOwner(namespace string) (string, error) { - ns, err := c.NamespaceClient().Get(context.Background(), namespace, metav1.GetOptions{}) - if err != nil { - return "", err - } - owner, ok := ns.Annotations["owner"] - if !ok { - return "", errors.New(fmt.Sprintf("namespace '%v' has no owner in the annotations", namespace)) - } - return owner, nil -} - -func NewKubernetesCoreFake() *KubernetesCoreFake { - return &KubernetesCoreFake{&FakeNamespaceClient{}} -} -func (c *KubernetesCoreFake) Set(namespaceToReturn string, userToReturn string) { - c.coreV1ClientFake.SetReturnValues(namespaceToReturn, userToReturn) -} diff --git a/backend/src/agent/persistence/client/pipeline_client.go b/backend/src/agent/persistence/client/pipeline_client.go index e1725cc20ca..25359933615 100644 --- a/backend/src/agent/persistence/client/pipeline_client.go +++ b/backend/src/agent/persistence/client/pipeline_client.go @@ -17,11 +17,9 @@ package client import ( "context" "fmt" - "os" "strings" "time" - "github.com/kubeflow/pipelines/backend/src/apiserver/common" "google.golang.org/grpc/metadata" api "github.com/kubeflow/pipelines/backend/api/v1beta1/go_client" @@ -38,8 +36,8 @@ const ( type PipelineClientInterface interface { ReportWorkflow(workflow util.ExecutionSpec) error ReportScheduledWorkflow(swf *util.ScheduledWorkflow) error - ReadArtifact(request *api.ReadArtifactRequest, user string) (*api.ReadArtifactResponse, error) - ReportRunMetrics(request *api.ReportRunMetricsRequest, user string) (*api.ReportRunMetricsResponse, error) + ReadArtifact(request *api.ReadArtifactRequest) (*api.ReadArtifactResponse, error) + ReportRunMetrics(request *api.ReportRunMetricsRequest) (*api.ReportRunMetricsResponse, error) } type PipelineClient struct { @@ -173,17 +171,26 @@ func (p *PipelineClient) ReportScheduledWorkflow(swf *util.ScheduledWorkflow) er // ReadArtifact reads artifact content from run service. If the artifact is not present, returns // nil response. -func (p *PipelineClient) ReadArtifact(request *api.ReadArtifactRequest, user string) (*api.ReadArtifactResponse, error) { +func (p *PipelineClient) ReadArtifact(request *api.ReadArtifactRequest) (*api.ReadArtifactResponse, error) { pctx := context.Background() - if user != "" { - pctx = metadata.AppendToOutgoingContext(pctx, getKubeflowUserIDHeader(), - getKubeflowUserIDPrefix()+user) - } + pctx = metadata.AppendToOutgoingContext(pctx, "Authorization", + "Bearer "+p.tokenRefresher.GetToken()) + ctx, cancel := context.WithTimeout(pctx, time.Minute) defer cancel() response, err := p.runServiceClient.ReadArtifactV1(ctx, request) if err != nil { + statusCode, _ := status.FromError(err) + if statusCode.Code() == codes.Unauthenticated && strings.Contains(err.Error(), "service account token has expired") { + // If unauthenticated because SA token is expired, re-read/refresh the token and try again + p.tokenRefresher.RefreshToken() + return nil, util.NewCustomError(err, util.CUSTOM_CODE_TRANSIENT, + "Error while reporting workflow resource (code: %v, message: %v): %v", + statusCode.Code(), + statusCode.Message(), + err.Error()) + } // TODO(hongyes): check NotFound error code before skip the error. return nil, nil } @@ -192,17 +199,26 @@ func (p *PipelineClient) ReadArtifact(request *api.ReadArtifactRequest, user str } // ReportRunMetrics reports run metrics to run service. -func (p *PipelineClient) ReportRunMetrics(request *api.ReportRunMetricsRequest, user string) (*api.ReportRunMetricsResponse, error) { +func (p *PipelineClient) ReportRunMetrics(request *api.ReportRunMetricsRequest) (*api.ReportRunMetricsResponse, error) { pctx := context.Background() - if user != "" { - pctx = metadata.AppendToOutgoingContext(pctx, getKubeflowUserIDHeader(), - getKubeflowUserIDPrefix()+user) - } + pctx = metadata.AppendToOutgoingContext(pctx, "Authorization", + "Bearer "+p.tokenRefresher.GetToken()) + ctx, cancel := context.WithTimeout(pctx, time.Minute) defer cancel() response, err := p.runServiceClient.ReportRunMetricsV1(ctx, request) if err != nil { + statusCode, _ := status.FromError(err) + if statusCode.Code() == codes.Unauthenticated && strings.Contains(err.Error(), "service account token has expired") { + // If unauthenticated because SA token is expired, re-read/refresh the token and try again + p.tokenRefresher.RefreshToken() + return nil, util.NewCustomError(err, util.CUSTOM_CODE_TRANSIENT, + "Error while reporting workflow resource (code: %v, message: %v): %v", + statusCode.Code(), + statusCode.Message(), + err.Error()) + } // This call should always succeed unless the run doesn't exist or server is broken. In // either cases, the job should retry at a later time. return nil, util.NewCustomError(err, util.CUSTOM_CODE_TRANSIENT, @@ -210,19 +226,3 @@ func (p *PipelineClient) ReportRunMetrics(request *api.ReportRunMetricsRequest, } return response, nil } - -// TODO use config file & viper and "github.com/kubeflow/pipelines/backend/src/apiserver/common.GetKubeflowUserIDHeader()" -func getKubeflowUserIDHeader() string { - if value, ok := os.LookupEnv(common.KubeflowUserIDHeader); ok { - return value - } - return common.GoogleIAPUserIdentityHeader -} - -// TODO use of viper & viper and "github.com/kubeflow/pipelines/backend/src/apiserver/common.GetKubeflowUserIDPrefix()" -func getKubeflowUserIDPrefix() string { - if value, ok := os.LookupEnv(common.KubeflowUserIDPrefix); ok { - return value - } - return common.GoogleIAPUserIdentityPrefix -} diff --git a/backend/src/agent/persistence/client/pipeline_client_fake.go b/backend/src/agent/persistence/client/pipeline_client_fake.go index 6b1ff3a03e0..42e9bce25ba 100644 --- a/backend/src/agent/persistence/client/pipeline_client_fake.go +++ b/backend/src/agent/persistence/client/pipeline_client_fake.go @@ -57,7 +57,7 @@ func (p *PipelineClientFake) ReportScheduledWorkflow(swf *util.ScheduledWorkflow return nil } -func (p *PipelineClientFake) ReadArtifact(request *api.ReadArtifactRequest, user string) (*api.ReadArtifactResponse, error) { +func (p *PipelineClientFake) ReadArtifact(request *api.ReadArtifactRequest) (*api.ReadArtifactResponse, error) { if p.err != nil { return nil, p.err } @@ -65,7 +65,7 @@ func (p *PipelineClientFake) ReadArtifact(request *api.ReadArtifactRequest, user return p.artifacts[request.String()], nil } -func (p *PipelineClientFake) ReportRunMetrics(request *api.ReportRunMetricsRequest, user string) (*api.ReportRunMetricsResponse, error) { +func (p *PipelineClientFake) ReportRunMetrics(request *api.ReportRunMetricsRequest) (*api.ReportRunMetricsResponse, error) { p.reportedMetricsRequest = request return p.reportMetricsResponseStub, p.reportMetricsErrorStub } diff --git a/backend/src/agent/persistence/main.go b/backend/src/agent/persistence/main.go index f8c26da385e..4da32a7095e 100644 --- a/backend/src/agent/persistence/main.go +++ b/backend/src/agent/persistence/main.go @@ -95,10 +95,6 @@ func main() { } else { swfInformerFactory = swfinformers.NewFilteredSharedInformerFactory(swfClient, time.Second*30, namespace, nil) } - k8sCoreClient := client.CreateKubernetesCoreOrFatal(DefaultConnectionTimeout, util.ClientParameters{ - QPS: clientQPS, - Burst: clientBurst, - }) tokenRefresher := client.NewTokenRefresher(time.Duration(saTokenRefreshIntervalInSecs)*time.Second, nil) err = tokenRefresher.StartTokenRefreshTicker() @@ -122,7 +118,6 @@ func main() { swfInformerFactory, execInformer, pipelineClient, - k8sCoreClient, util.NewRealTime()) go swfInformerFactory.Start(stopCh) diff --git a/backend/src/agent/persistence/persistence_agent.go b/backend/src/agent/persistence/persistence_agent.go index d234df09bf1..d280b74dbd6 100644 --- a/backend/src/agent/persistence/persistence_agent.go +++ b/backend/src/agent/persistence/persistence_agent.go @@ -46,7 +46,6 @@ func NewPersistenceAgent( swfInformerFactory swfinformers.SharedInformerFactory, execInformer util.ExecutionInformer, pipelineClient *client.PipelineClient, - k8sCoreClient client.KubernetesCoreInterface, time util.TimeInterface) *PersistenceAgent { // obtain references to shared informers swfInformer := swfInformerFactory.Scheduledworkflow().V1beta1().ScheduledWorkflows() @@ -63,7 +62,7 @@ func NewPersistenceAgent( workflowWorker := worker.NewPersistenceWorker(time, workflowregister.WorkflowKind, execInformer, true, - worker.NewWorkflowSaver(workflowClient, pipelineClient, k8sCoreClient, ttlSecondsAfterWorkflowFinish)) + worker.NewWorkflowSaver(workflowClient, pipelineClient, ttlSecondsAfterWorkflowFinish)) agent := &PersistenceAgent{ swfClient: swfClient, diff --git a/backend/src/agent/persistence/worker/metrics_reporter.go b/backend/src/agent/persistence/worker/metrics_reporter.go index 021ff970f5c..c7a708cbf09 100644 --- a/backend/src/agent/persistence/worker/metrics_reporter.go +++ b/backend/src/agent/persistence/worker/metrics_reporter.go @@ -42,7 +42,7 @@ func NewMetricsReporter(pipelineClient client.PipelineClientInterface) *MetricsR } // ReportMetrics reports workflow metrics to pipeline server. -func (r MetricsReporter) ReportMetrics(workflow util.ExecutionSpec, user string) error { +func (r MetricsReporter) ReportMetrics(workflow util.ExecutionSpec) error { if !workflow.ExecutionStatus().HasMetrics() { return nil } @@ -52,14 +52,14 @@ func (r MetricsReporter) ReportMetrics(workflow util.ExecutionSpec, user string) // Skip reporting if the workflow doesn't have the run id label return nil } - runMetrics, partialFailures := workflow.ExecutionStatus().CollectionMetrics(r.pipelineClient.ReadArtifact, user) + runMetrics, partialFailures := workflow.ExecutionStatus().CollectionMetrics(r.pipelineClient.ReadArtifact) if len(runMetrics) == 0 { return aggregateErrors(partialFailures) } reportMetricsResponse, err := r.pipelineClient.ReportRunMetrics(&api.ReportRunMetricsRequest{ RunId: runID, Metrics: runMetrics, - }, user) + }) if err != nil { return err } diff --git a/backend/src/agent/persistence/worker/metrics_reporter_test.go b/backend/src/agent/persistence/worker/metrics_reporter_test.go index 7fa3ba000dd..c2b43faf2c9 100644 --- a/backend/src/agent/persistence/worker/metrics_reporter_test.go +++ b/backend/src/agent/persistence/worker/metrics_reporter_test.go @@ -32,11 +32,6 @@ import ( "k8s.io/apimachinery/pkg/types" ) -const ( - NamespaceName = "kf-namespace" - USER = "test-user@example.com" -) - func TestReportMetrics_NoCompletedNode_NoOP(t *testing.T) { pipelineFake := client.NewPipelineClientFake() @@ -57,7 +52,7 @@ func TestReportMetrics_NoCompletedNode_NoOP(t *testing.T) { }, }, }) - err := reporter.ReportMetrics(workflow, USER) + err := reporter.ReportMetrics(workflow) assert.Nil(t, err) assert.Nil(t, pipelineFake.GetReportedMetricsRequest()) } @@ -82,7 +77,7 @@ func TestReportMetrics_NoRunID_NoOP(t *testing.T) { }, }, }) - err := reporter.ReportMetrics(workflow, USER) + err := reporter.ReportMetrics(workflow) assert.Nil(t, err) assert.Nil(t, pipelineFake.GetReadArtifactRequest()) assert.Nil(t, pipelineFake.GetReportedMetricsRequest()) @@ -109,7 +104,7 @@ func TestReportMetrics_NoArtifact_NoOP(t *testing.T) { }, }, }) - err := reporter.ReportMetrics(workflow, USER) + err := reporter.ReportMetrics(workflow) assert.Nil(t, err) assert.Nil(t, pipelineFake.GetReadArtifactRequest()) assert.Nil(t, pipelineFake.GetReportedMetricsRequest()) @@ -139,7 +134,7 @@ func TestReportMetrics_NoMetricsArtifact_NoOP(t *testing.T) { }, }, }) - err := reporter.ReportMetrics(workflow, USER) + err := reporter.ReportMetrics(workflow) assert.Nil(t, err) assert.Nil(t, pipelineFake.GetReadArtifactRequest()) assert.Nil(t, pipelineFake.GetReportedMetricsRequest()) @@ -182,7 +177,7 @@ func TestReportMetrics_Succeed(t *testing.T) { Results: []*api.ReportRunMetricsResponse_ReportRunMetricResult{}, }, nil) - err1 := reporter.ReportMetrics(workflow, USER) + err1 := reporter.ReportMetrics(workflow) assert.Nil(t, err1) expectedMetricsRequest := &api.ReportRunMetricsRequest{ @@ -241,7 +236,7 @@ func TestReportMetrics_EmptyArchive_Fail(t *testing.T) { Data: []byte(artifactData), }) - err := reporter.ReportMetrics(workflow, USER) + err := reporter.ReportMetrics(workflow) assert.NotNil(t, err) assert.True(t, util.HasCustomCode(err, util.CUSTOM_CODE_PERMANENT)) @@ -284,7 +279,7 @@ func TestReportMetrics_MultipleFilesInArchive_Fail(t *testing.T) { Data: []byte(artifactData), }) - err := reporter.ReportMetrics(workflow, USER) + err := reporter.ReportMetrics(workflow) assert.NotNil(t, err) assert.True(t, util.HasCustomCode(err, util.CUSTOM_CODE_PERMANENT)) @@ -326,7 +321,7 @@ func TestReportMetrics_InvalidMetricsJSON_Fail(t *testing.T) { Data: []byte(artifactData), }) - err := reporter.ReportMetrics(workflow, USER) + err := reporter.ReportMetrics(workflow) assert.NotNil(t, err) assert.True(t, util.HasCustomCode(err, util.CUSTOM_CODE_PERMANENT)) @@ -387,7 +382,7 @@ func TestReportMetrics_InvalidMetricsJSON_PartialFail(t *testing.T) { Data: []byte(validArtifactData), }) - err := reporter.ReportMetrics(workflow, USER) + err := reporter.ReportMetrics(workflow) // Partial failure is reported while valid metrics are reported. assert.NotNil(t, err) @@ -447,7 +442,7 @@ func TestReportMetrics_CorruptedArchiveFile_Fail(t *testing.T) { Data: []byte("invalid tgz content"), }) - err := reporter.ReportMetrics(workflow, USER) + err := reporter.ReportMetrics(workflow) assert.NotNil(t, err) assert.True(t, util.HasCustomCode(err, util.CUSTOM_CODE_PERMANENT)) @@ -511,7 +506,7 @@ func TestReportMetrics_MultiplMetricErrors_TransientErrowWin(t *testing.T) { }, }, nil) - err := reporter.ReportMetrics(workflow, USER) + err := reporter.ReportMetrics(workflow) assert.NotNil(t, err) assert.True(t, util.HasCustomCode(err, util.CUSTOM_CODE_TRANSIENT)) @@ -520,8 +515,6 @@ func TestReportMetrics_MultiplMetricErrors_TransientErrowWin(t *testing.T) { func TestReportMetrics_Unauthorized(t *testing.T) { pipelineFake := client.NewPipelineClientFake() reporter := NewMetricsReporter(pipelineFake) - k8sFake := client.NewKubernetesCoreFake() - k8sFake.Set(NamespaceName, USER) workflow := util.NewWorkflow(&workflowapi.Workflow{ ObjectMeta: metav1.ObjectMeta{ @@ -557,7 +550,7 @@ func TestReportMetrics_Unauthorized(t *testing.T) { Results: []*api.ReportRunMetricsResponse_ReportRunMetricResult{}, }, errors.New("failed to read artifacts")) - err1 := reporter.ReportMetrics(workflow, USER) + err1 := reporter.ReportMetrics(workflow) assert.NotNil(t, err1) assert.Contains(t, err1.Error(), "failed to read artifacts") diff --git a/backend/src/agent/persistence/worker/persistence_worker_test.go b/backend/src/agent/persistence/worker/persistence_worker_test.go index e29226d1407..bde3ef7e4e6 100644 --- a/backend/src/agent/persistence/worker/persistence_worker_test.go +++ b/backend/src/agent/persistence/worker/persistence_worker_test.go @@ -53,11 +53,9 @@ func TestPersistenceWorker_Success(t *testing.T) { // Set up pipeline client pipelineClient := client.NewPipelineClientFake() - k8sClient := client.NewKubernetesCoreFake() - k8sClient.Set("MY_NAMESPACE", USER) // Set up peristence worker - saver := NewWorkflowSaver(workflowClient, pipelineClient, k8sClient, 100) + saver := NewWorkflowSaver(workflowClient, pipelineClient, 100) eventHandler := NewFakeEventHandler() worker := NewPersistenceWorker( util.NewFakeTimeForEpoch(), @@ -83,12 +81,11 @@ func TestPersistenceWorker_NotFoundError(t *testing.T) { }) workflowClient := client.NewWorkflowClientFake() - // Set up pipeline client and kubernetes client + // Set up pipeline client pipelineClient := client.NewPipelineClientFake() - k8sClient := client.NewKubernetesCoreFake() // Set up peristence worker - saver := NewWorkflowSaver(workflowClient, pipelineClient, k8sClient, 100) + saver := NewWorkflowSaver(workflowClient, pipelineClient, 100) eventHandler := NewFakeEventHandler() worker := NewPersistenceWorker( util.NewFakeTimeForEpoch(), @@ -115,12 +112,11 @@ func TestPersistenceWorker_GetWorklowError(t *testing.T) { workflowClient := client.NewWorkflowClientFake() workflowClient.Put("MY_NAMESPACE", "MY_NAME", nil) - // Set up pipeline client and kubernetes client + // Set up pipeline client pipelineClient := client.NewPipelineClientFake() - k8sClient := client.NewKubernetesCoreFake() // Set up peristence worker - saver := NewWorkflowSaver(workflowClient, pipelineClient, k8sClient, 100) + saver := NewWorkflowSaver(workflowClient, pipelineClient, 100) eventHandler := NewFakeEventHandler() worker := NewPersistenceWorker( util.NewFakeTimeForEpoch(), @@ -152,12 +148,9 @@ func TestPersistenceWorker_ReportWorkflowRetryableError(t *testing.T) { pipelineClient := client.NewPipelineClientFake() pipelineClient.SetError(util.NewCustomError(fmt.Errorf("Error"), util.CUSTOM_CODE_TRANSIENT, "My Retriable Error")) - //Set up kubernetes client - k8sClient := client.NewKubernetesCoreFake() - k8sClient.Set("MY_NAMESPACE", USER) // Set up peristence worker - saver := NewWorkflowSaver(workflowClient, pipelineClient, k8sClient, 100) + saver := NewWorkflowSaver(workflowClient, pipelineClient, 100) eventHandler := NewFakeEventHandler() worker := NewPersistenceWorker( util.NewFakeTimeForEpoch(), @@ -188,11 +181,9 @@ func TestPersistenceWorker_ReportWorkflowNonRetryableError(t *testing.T) { pipelineClient := client.NewPipelineClientFake() pipelineClient.SetError(util.NewCustomError(fmt.Errorf("Error"), util.CUSTOM_CODE_PERMANENT, "My Permanent Error")) - // Set up kubernetes client - k8sClient := client.NewKubernetesCoreFake() // Set up peristence worker - saver := NewWorkflowSaver(workflowClient, pipelineClient, k8sClient, 100) + saver := NewWorkflowSaver(workflowClient, pipelineClient, 100) eventHandler := NewFakeEventHandler() worker := NewPersistenceWorker( util.NewFakeTimeForEpoch(), diff --git a/backend/src/agent/persistence/worker/workflow_saver.go b/backend/src/agent/persistence/worker/workflow_saver.go index 3b874273f11..5e93a60bb10 100644 --- a/backend/src/agent/persistence/worker/workflow_saver.go +++ b/backend/src/agent/persistence/worker/workflow_saver.go @@ -27,17 +27,15 @@ import ( type WorkflowSaver struct { client client.WorkflowClientInterface pipelineClient client.PipelineClientInterface - k8sClient client.KubernetesCoreInterface metricsReporter *MetricsReporter ttlSecondsAfterWorkflowFinish int64 } func NewWorkflowSaver(client client.WorkflowClientInterface, - pipelineClient client.PipelineClientInterface, k8sClient client.KubernetesCoreInterface, ttlSecondsAfterWorkflowFinish int64) *WorkflowSaver { + pipelineClient client.PipelineClientInterface, ttlSecondsAfterWorkflowFinish int64) *WorkflowSaver { return &WorkflowSaver{ client: client, pipelineClient: pipelineClient, - k8sClient: k8sClient, metricsReporter: NewMetricsReporter(pipelineClient), ttlSecondsAfterWorkflowFinish: ttlSecondsAfterWorkflowFinish, } @@ -70,11 +68,6 @@ func (s *WorkflowSaver) Save(key string, namespace string, name string, nowEpoch return nil } - user, err1 := s.k8sClient.GetNamespaceOwner(namespace) - if err1 != nil { - return util.Wrapf(err1, "Failed get '%v' namespace", namespace) - } - // Save this Workflow to the database. err = s.pipelineClient.ReportWorkflow(wf) retry := util.HasCustomCode(err, util.CUSTOM_CODE_TRANSIENT) @@ -94,5 +87,5 @@ func (s *WorkflowSaver) Save(key string, namespace string, name string, nowEpoch log.WithFields(log.Fields{ "Workflow": name, }).Infof("Syncing Workflow (%v): success, processing complete.", name) - return s.metricsReporter.ReportMetrics(wf, user) + return s.metricsReporter.ReportMetrics(wf) } diff --git a/backend/src/agent/persistence/worker/workflow_saver_test.go b/backend/src/agent/persistence/worker/workflow_saver_test.go index 10a16b7ccda..358f36600c5 100644 --- a/backend/src/agent/persistence/worker/workflow_saver_test.go +++ b/backend/src/agent/persistence/worker/workflow_saver_test.go @@ -30,8 +30,6 @@ import ( func TestWorkflow_Save_Success(t *testing.T) { workflowFake := client.NewWorkflowClientFake() pipelineFake := client.NewPipelineClientFake() - k8sClient := client.NewKubernetesCoreFake() - k8sClient.Set("MY_NAMESPACE", USER) workflow := util.NewWorkflow(&workflowapi.Workflow{ ObjectMeta: metav1.ObjectMeta{ @@ -43,7 +41,7 @@ func TestWorkflow_Save_Success(t *testing.T) { workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow) - saver := NewWorkflowSaver(workflowFake, pipelineFake, k8sClient, 100) + saver := NewWorkflowSaver(workflowFake, pipelineFake, 100) err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20) @@ -54,10 +52,8 @@ func TestWorkflow_Save_Success(t *testing.T) { func TestWorkflow_Save_NotFoundDuringGet(t *testing.T) { workflowFake := client.NewWorkflowClientFake() pipelineFake := client.NewPipelineClientFake() - k8sClient := client.NewKubernetesCoreFake() - k8sClient.Set("MY_NAMESPACE", USER) - saver := NewWorkflowSaver(workflowFake, pipelineFake, k8sClient, 100) + saver := NewWorkflowSaver(workflowFake, pipelineFake, 100) err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20) @@ -69,12 +65,10 @@ func TestWorkflow_Save_NotFoundDuringGet(t *testing.T) { func TestWorkflow_Save_ErrorDuringGet(t *testing.T) { workflowFake := client.NewWorkflowClientFake() pipelineFake := client.NewPipelineClientFake() - k8sClient := client.NewKubernetesCoreFake() - k8sClient.Set("MY_NAMESPACE", USER) workflowFake.Put("MY_NAMESPACE", "MY_NAME", nil) - saver := NewWorkflowSaver(workflowFake, pipelineFake, k8sClient, 100) + saver := NewWorkflowSaver(workflowFake, pipelineFake, 100) err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20) @@ -86,8 +80,6 @@ func TestWorkflow_Save_ErrorDuringGet(t *testing.T) { func TestWorkflow_Save_PermanentFailureWhileReporting(t *testing.T) { workflowFake := client.NewWorkflowClientFake() pipelineFake := client.NewPipelineClientFake() - k8sClient := client.NewKubernetesCoreFake() - k8sClient.Set("MY_NAMESPACE", USER) pipelineFake.SetError(util.NewCustomError(fmt.Errorf("Error"), util.CUSTOM_CODE_PERMANENT, "My Permanent Error")) @@ -102,7 +94,7 @@ func TestWorkflow_Save_PermanentFailureWhileReporting(t *testing.T) { workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow) - saver := NewWorkflowSaver(workflowFake, pipelineFake, k8sClient, 100) + saver := NewWorkflowSaver(workflowFake, pipelineFake, 100) err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20) @@ -114,8 +106,6 @@ func TestWorkflow_Save_PermanentFailureWhileReporting(t *testing.T) { func TestWorkflow_Save_TransientFailureWhileReporting(t *testing.T) { workflowFake := client.NewWorkflowClientFake() pipelineFake := client.NewPipelineClientFake() - k8sClient := client.NewKubernetesCoreFake() - k8sClient.Set("MY_NAMESPACE", USER) pipelineFake.SetError(util.NewCustomError(fmt.Errorf("Error"), util.CUSTOM_CODE_TRANSIENT, "My Transient Error")) @@ -130,7 +120,7 @@ func TestWorkflow_Save_TransientFailureWhileReporting(t *testing.T) { workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow) - saver := NewWorkflowSaver(workflowFake, pipelineFake, k8sClient, 100) + saver := NewWorkflowSaver(workflowFake, pipelineFake, 100) err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20) @@ -142,7 +132,6 @@ func TestWorkflow_Save_TransientFailureWhileReporting(t *testing.T) { func TestWorkflow_Save_SkippedDueToFinalStatue(t *testing.T) { workflowFake := client.NewWorkflowClientFake() pipelineFake := client.NewPipelineClientFake() - k8sClient := client.NewKubernetesCoreFake() // Add this will result in failure unless reporting is skipped pipelineFake.SetError(util.NewCustomError(fmt.Errorf("Error"), util.CUSTOM_CODE_PERMANENT, @@ -161,7 +150,7 @@ func TestWorkflow_Save_SkippedDueToFinalStatue(t *testing.T) { workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow) - saver := NewWorkflowSaver(workflowFake, pipelineFake, k8sClient, 100) + saver := NewWorkflowSaver(workflowFake, pipelineFake, 100) err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20) @@ -172,8 +161,6 @@ func TestWorkflow_Save_SkippedDueToFinalStatue(t *testing.T) { func TestWorkflow_Save_FinalStatueNotSkippedDueToExceedTTL(t *testing.T) { workflowFake := client.NewWorkflowClientFake() pipelineFake := client.NewPipelineClientFake() - k8sClient := client.NewKubernetesCoreFake() - k8sClient.Set("MY_NAMESPACE", USER) // Add this will result in failure unless reporting is skipped pipelineFake.SetError(util.NewCustomError(fmt.Errorf("Error"), util.CUSTOM_CODE_PERMANENT, @@ -195,7 +182,7 @@ func TestWorkflow_Save_FinalStatueNotSkippedDueToExceedTTL(t *testing.T) { workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow) - saver := NewWorkflowSaver(workflowFake, pipelineFake, k8sClient, 1) + saver := NewWorkflowSaver(workflowFake, pipelineFake, 1) // Sleep 2 seconds to make sure workflow passed TTL time.Sleep(2 * time.Second) @@ -210,7 +197,6 @@ func TestWorkflow_Save_FinalStatueNotSkippedDueToExceedTTL(t *testing.T) { func TestWorkflow_Save_SkippedDDueToMissingRunID(t *testing.T) { workflowFake := client.NewWorkflowClientFake() pipelineFake := client.NewPipelineClientFake() - k8sClient := client.NewKubernetesCoreFake() // Add this will result in failure unless reporting is skipped pipelineFake.SetError(util.NewCustomError(fmt.Errorf("Error"), util.CUSTOM_CODE_PERMANENT, @@ -225,33 +211,10 @@ func TestWorkflow_Save_SkippedDDueToMissingRunID(t *testing.T) { workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow) - saver := NewWorkflowSaver(workflowFake, pipelineFake, k8sClient, 100) + saver := NewWorkflowSaver(workflowFake, pipelineFake, 100) err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20) assert.Equal(t, false, util.HasCustomCode(err, util.CUSTOM_CODE_TRANSIENT)) assert.Equal(t, nil, err) } - -func TestWorkflow_Save_FailedToGetUser(t *testing.T) { - workflowFake := client.NewWorkflowClientFake() - pipelineFake := client.NewPipelineClientFake() - k8sClient := client.NewKubernetesCoreFake() - k8sClient.Set("ORIGINAL_NAMESPACE", USER) - - workflow := util.NewWorkflow(&workflowapi.Workflow{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "MY_NAMESPACE", - Name: "MY_NAME", - Labels: map[string]string{util.LabelKeyWorkflowRunId: "MY_UUID"}, - }, - }) - - workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow) - - saver := NewWorkflowSaver(workflowFake, pipelineFake, k8sClient, 100) - - err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20) - assert.NotNil(t, err) - assert.Contains(t, err.Error(), fmt.Sprintf("Failed get '%v' namespace", "MY_NAMESPACE")) -} diff --git a/backend/src/common/util/execution_status.go b/backend/src/common/util/execution_status.go index 7eff8a20645..6831e141f49 100644 --- a/backend/src/common/util/execution_status.go +++ b/backend/src/common/util/execution_status.go @@ -31,7 +31,7 @@ type NodeStatus struct { Children []string } -type RetrieveArtifact func(request *api.ReadArtifactRequest, user string) (*api.ReadArtifactResponse, error) +type RetrieveArtifact func(request *api.ReadArtifactRequest) (*api.ReadArtifactResponse, error) // Abstract interface to encapsulate the resources of the execution runtime specifically // for status information. This interface is mainly to access the status related information @@ -61,7 +61,7 @@ type ExecutionStatus interface { // This function was in metrics_reporter.go. Moved to here because it // accesses the orchestration engine specific data struct. encapsulate the // specific data struct and provide a abstract function here. - CollectionMetrics(retrieveArtifact RetrieveArtifact, user string) ([]*api.RunMetric, []error) + CollectionMetrics(retrieveArtifact RetrieveArtifact) ([]*api.RunMetric, []error) // does ExecutionStatus contain any finished node or not HasMetrics() bool diff --git a/backend/src/common/util/workflow.go b/backend/src/common/util/workflow.go index 821f69df5de..64d38dcb456 100644 --- a/backend/src/common/util/workflow.go +++ b/backend/src/common/util/workflow.go @@ -436,12 +436,12 @@ const ( maxMetricsCountLimit = 50 ) -func (w *Workflow) CollectionMetrics(retrieveArtifact RetrieveArtifact, user string) ([]*api.RunMetric, []error) { +func (w *Workflow) CollectionMetrics(retrieveArtifact RetrieveArtifact) ([]*api.RunMetric, []error) { runID := w.Labels[LabelKeyWorkflowRunId] runMetrics := make([]*api.RunMetric, 0, len(w.Status.Nodes)) partialFailures := make([]error, 0, len(w.Status.Nodes)) for _, nodeStatus := range w.Status.Nodes { - nodeMetrics, err := collectNodeMetricsOrNil(runID, &nodeStatus, retrieveArtifact, user) + nodeMetrics, err := collectNodeMetricsOrNil(runID, &nodeStatus, retrieveArtifact) if err != nil { partialFailures = append(partialFailures, err) continue @@ -460,13 +460,13 @@ func (w *Workflow) CollectionMetrics(retrieveArtifact RetrieveArtifact, user str return runMetrics, partialFailures } -func collectNodeMetricsOrNil(runID string, nodeStatus *workflowapi.NodeStatus, retrieveArtifact RetrieveArtifact, user string) ( +func collectNodeMetricsOrNil(runID string, nodeStatus *workflowapi.NodeStatus, retrieveArtifact RetrieveArtifact) ( []*api.RunMetric, error, ) { if !nodeStatus.Completed() { return nil, nil } - metricsJSON, err := readNodeMetricsJSONOrEmpty(runID, nodeStatus, retrieveArtifact, user) + metricsJSON, err := readNodeMetricsJSONOrEmpty(runID, nodeStatus, retrieveArtifact) if err != nil || metricsJSON == "" { return nil, err } @@ -499,7 +499,7 @@ func collectNodeMetricsOrNil(runID string, nodeStatus *workflowapi.NodeStatus, r } func readNodeMetricsJSONOrEmpty(runID string, nodeStatus *workflowapi.NodeStatus, - retrieveArtifact RetrieveArtifact, user string, + retrieveArtifact RetrieveArtifact, ) (string, error) { if nodeStatus.Outputs == nil || nodeStatus.Outputs.Artifacts == nil { return "", nil // No output artifacts, skip the reporting @@ -520,7 +520,7 @@ func readNodeMetricsJSONOrEmpty(runID string, nodeStatus *workflowapi.NodeStatus NodeId: nodeStatus.ID, ArtifactName: metricsArtifactName, } - artifactResponse, err := retrieveArtifact(artifactRequest, user) + artifactResponse, err := retrieveArtifact(artifactRequest) if err != nil { return "", err } diff --git a/backend/third_party_licenses/persistence_agent.csv b/backend/third_party_licenses/persistence_agent.csv index 102c483cbd3..31defe0c67b 100644 --- a/backend/third_party_licenses/persistence_agent.csv +++ b/backend/third_party_licenses/persistence_agent.csv @@ -15,7 +15,6 @@ github.com/colinmarc/hdfs,https://github.com/colinmarc/hdfs/blob/9746310a4d31/LI github.com/davecgh/go-spew/spew,https://github.com/davecgh/go-spew/blob/v1.1.1/LICENSE,ISC github.com/doublerebel/bellows,https://github.com/doublerebel/bellows/blob/f177d92a03d3/LICENSE,MIT github.com/emicklei/go-restful/v3,https://github.com/emicklei/go-restful/blob/v3.8.0/LICENSE,MIT -github.com/fsnotify/fsnotify,https://github.com/fsnotify/fsnotify/blob/v1.5.1/LICENSE,BSD-3-Clause github.com/go-logr/logr,https://github.com/go-logr/logr/blob/v1.2.2/LICENSE,Apache-2.0 github.com/go-openapi/errors,https://github.com/go-openapi/errors/blob/v0.20.2/LICENSE,Apache-2.0 github.com/go-openapi/jsonpointer,https://github.com/go-openapi/jsonpointer/blob/v0.19.5/LICENSE,Apache-2.0 @@ -34,7 +33,6 @@ github.com/google/uuid,https://github.com/google/uuid/blob/v1.3.0/LICENSE,BSD-3- github.com/gorilla/websocket,https://github.com/gorilla/websocket/blob/v1.5.0/LICENSE,BSD-2-Clause github.com/grpc-ecosystem/grpc-gateway,https://github.com/grpc-ecosystem/grpc-gateway/blob/v1.16.0/LICENSE.txt,BSD-3-Clause github.com/hashicorp/go-uuid,https://github.com/hashicorp/go-uuid/blob/v1.0.2/LICENSE,MPL-2.0 -github.com/hashicorp/hcl,https://github.com/hashicorp/hcl/blob/v1.0.0/LICENSE,MPL-2.0 github.com/huandu/xstrings,https://github.com/huandu/xstrings/blob/v1.3.2/LICENSE,MIT github.com/imdario/mergo,https://github.com/imdario/mergo/blob/v0.3.12/LICENSE,BSD-3-Clause github.com/jcmturner/gofork,https://github.com/jcmturner/gofork/blob/v1.0.0/LICENSE,BSD-3-Clause @@ -44,7 +42,6 @@ github.com/klauspost/compress/flate,https://github.com/klauspost/compress/blob/v github.com/klauspost/pgzip,https://github.com/klauspost/pgzip/blob/v1.2.5/LICENSE,MIT github.com/kubeflow/pipelines/backend,https://github.com/kubeflow/pipelines/blob/HEAD/LICENSE,Apache-2.0 github.com/lestrrat-go/strftime,https://github.com/lestrrat-go/strftime/blob/v1.0.4/LICENSE,MIT -github.com/magiconair/properties,https://github.com/magiconair/properties/blob/v1.8.5/LICENSE.md,BSD-2-Clause github.com/mailru/easyjson,https://github.com/mailru/easyjson/blob/v0.7.7/LICENSE,MIT github.com/matttproud/golang_protobuf_extensions/pbutil,https://github.com/matttproud/golang_protobuf_extensions/blob/c182affec369/LICENSE,Apache-2.0 github.com/mitchellh/copystructure,https://github.com/mitchellh/copystructure/blob/v1.2.0/LICENSE,MIT @@ -56,7 +53,6 @@ github.com/modern-go/reflect2,https://github.com/modern-go/reflect2/blob/v1.0.2/ github.com/munnerz/goautoneg,https://github.com/munnerz/goautoneg/blob/a7dc8b61c822/LICENSE,BSD-3-Clause github.com/oklog/ulid,https://github.com/oklog/ulid/blob/v1.3.1/LICENSE,Apache-2.0 github.com/oliveagle/jsonpath,https://github.com/oliveagle/jsonpath/blob/2e52cf6e6852/LICENSE,MIT -github.com/pelletier/go-toml,https://github.com/pelletier/go-toml/blob/v1.9.4/LICENSE,Apache-2.0 github.com/pkg/errors,https://github.com/pkg/errors/blob/v0.9.1/LICENSE,BSD-2-Clause github.com/prometheus/client_golang/prometheus,https://github.com/prometheus/client_golang/blob/v1.12.1/LICENSE,Apache-2.0 github.com/prometheus/client_model/go,https://github.com/prometheus/client_model/blob/v0.2.0/LICENSE,Apache-2.0 @@ -66,12 +62,8 @@ github.com/prometheus/procfs,https://github.com/prometheus/procfs/blob/v0.7.3/LI github.com/robfig/cron/v3,https://github.com/robfig/cron/blob/v3.0.1/LICENSE,MIT github.com/shopspring/decimal,https://github.com/shopspring/decimal/blob/v1.2.0/LICENSE,MIT github.com/sirupsen/logrus,https://github.com/sirupsen/logrus/blob/v1.8.1/LICENSE,MIT -github.com/spf13/afero,https://github.com/spf13/afero/blob/v1.8.0/LICENSE.txt,Apache-2.0 github.com/spf13/cast,https://github.com/spf13/cast/blob/v1.4.1/LICENSE,MIT -github.com/spf13/jwalterweatherman,https://github.com/spf13/jwalterweatherman/blob/v1.1.0/LICENSE,MIT github.com/spf13/pflag,https://github.com/spf13/pflag/blob/v1.0.5/LICENSE,BSD-3-Clause -github.com/spf13/viper,https://github.com/spf13/viper/blob/v1.10.1/LICENSE,MIT -github.com/subosito/gotenv,https://github.com/subosito/gotenv/blob/v1.2.0/LICENSE,MIT github.com/valyala/bytebufferpool,https://github.com/valyala/bytebufferpool/blob/v1.0.0/LICENSE,MIT github.com/valyala/fasttemplate,https://github.com/valyala/fasttemplate/blob/v1.2.1/LICENSE,MIT go.mongodb.org/mongo-driver,https://github.com/mongodb/mongo-go-driver/blob/v1.8.2/LICENSE,Apache-2.0 @@ -86,7 +78,6 @@ google.golang.org/genproto,https://github.com/googleapis/go-genproto/blob/197313 google.golang.org/grpc,https://github.com/grpc/grpc-go/blob/v1.44.0/LICENSE,Apache-2.0 google.golang.org/protobuf,https://github.com/protocolbuffers/protobuf-go/blob/v1.27.1/LICENSE,BSD-3-Clause gopkg.in/inf.v0,https://github.com/go-inf/inf/blob/v0.9.1/LICENSE,BSD-3-Clause -gopkg.in/ini.v1,https://github.com/go-ini/ini/blob/v1.66.3/LICENSE,Apache-2.0 gopkg.in/jcmturner/aescts.v1,https://github.com/jcmturner/aescts/blob/v1.0.1/LICENSE,Apache-2.0 gopkg.in/jcmturner/dnsutils.v1,https://github.com/jcmturner/dnsutils/blob/v1.0.1/LICENSE,Apache-2.0 gopkg.in/jcmturner/gokrb5.v5,https://github.com/jcmturner/gokrb5/blob/v5.3.0/LICENSE,Apache-2.0 diff --git a/manifests/kustomize/base/installs/multi-user/persistence-agent/cluster-role.yaml b/manifests/kustomize/base/installs/multi-user/persistence-agent/cluster-role.yaml index bd1a0f53dfe..84371af2084 100644 --- a/manifests/kustomize/base/installs/multi-user/persistence-agent/cluster-role.yaml +++ b/manifests/kustomize/base/installs/multi-user/persistence-agent/cluster-role.yaml @@ -27,8 +27,9 @@ rules: verbs: - report - apiGroups: - - '' + - pipelines.kubeflow.org resources: - - namespaces + - runs verbs: - - get \ No newline at end of file + - reportMetrics + - readArtifact diff --git a/manifests/kustomize/base/installs/multi-user/persistence-agent/deployment-patch.yaml b/manifests/kustomize/base/installs/multi-user/persistence-agent/deployment-patch.yaml index a5e7a9fc26c..1e165def422 100644 --- a/manifests/kustomize/base/installs/multi-user/persistence-agent/deployment-patch.yaml +++ b/manifests/kustomize/base/installs/multi-user/persistence-agent/deployment-patch.yaml @@ -7,14 +7,7 @@ spec: spec: containers: - name: ml-pipeline-persistenceagent - envFrom: - - configMapRef: - name: persistenceagent-config env: - name: NAMESPACE value: '' valueFrom: null - - name: KUBEFLOW_USERID_HEADER - value: kubeflow-userid - - name: KUBEFLOW_USERID_PREFIX - value: "" \ No newline at end of file diff --git a/manifests/kustomize/base/installs/multi-user/persistence-agent/kustomization.yaml b/manifests/kustomize/base/installs/multi-user/persistence-agent/kustomization.yaml index 560e0fc893c..b1f65469e1d 100644 --- a/manifests/kustomize/base/installs/multi-user/persistence-agent/kustomization.yaml +++ b/manifests/kustomize/base/installs/multi-user/persistence-agent/kustomization.yaml @@ -3,7 +3,3 @@ kind: Kustomization resources: - cluster-role.yaml - cluster-role-binding.yaml -configMapGenerator: -- name: persistenceagent-config - envs: - - params.env \ No newline at end of file diff --git a/manifests/kustomize/base/installs/multi-user/persistence-agent/params.env b/manifests/kustomize/base/installs/multi-user/persistence-agent/params.env deleted file mode 100644 index 4c3bab70f9d..00000000000 --- a/manifests/kustomize/base/installs/multi-user/persistence-agent/params.env +++ /dev/null @@ -1 +0,0 @@ -MULTIUSER=true diff --git a/manifests/kustomize/base/pipeline/ml-pipeline-persistenceagent-deployment.yaml b/manifests/kustomize/base/pipeline/ml-pipeline-persistenceagent-deployment.yaml index 30bea2326a4..0d8b504278f 100644 --- a/manifests/kustomize/base/pipeline/ml-pipeline-persistenceagent-deployment.yaml +++ b/manifests/kustomize/base/pipeline/ml-pipeline-persistenceagent-deployment.yaml @@ -25,10 +25,6 @@ spec: value: "86400" - name: NUM_WORKERS value: "2" - - name: KUBEFLOW_USERID_HEADER - value: kubeflow-userid - - name: KUBEFLOW_USERID_PREFIX - value: "" image: gcr.io/ml-pipeline/persistenceagent:dummy imagePullPolicy: IfNotPresent name: ml-pipeline-persistenceagent diff --git a/manifests/kustomize/base/pipeline/ml-pipeline-persistenceagent-role.yaml b/manifests/kustomize/base/pipeline/ml-pipeline-persistenceagent-role.yaml index 077d556e101..63bdd03d6a5 100644 --- a/manifests/kustomize/base/pipeline/ml-pipeline-persistenceagent-role.yaml +++ b/manifests/kustomize/base/pipeline/ml-pipeline-persistenceagent-role.yaml @@ -27,8 +27,9 @@ rules: verbs: - report - apiGroups: - - '' + - pipelines.kubeflow.org resources: - - namespaces + - runs verbs: - - get \ No newline at end of file + - reportMetrics + - readArtifact