Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(backend): Caching should not be wrongly shown as successful if the artifacts have been deleted from S3 #7938

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions backend/src/apiserver/client_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func initMinioClient(initConnectionTimeout time.Duration) storage.ObjectStoreInt
"ObjectStoreConfig.Host", os.Getenv(minioServiceHost))
minioServicePort := common.GetStringConfigWithDefault(
"ObjectStoreConfig.Port", os.Getenv(minioServicePort))
minioServiceRegion := common.GetStringConfigWithDefault(
region := common.GetStringConfigWithDefault(
"ObjectStoreConfig.Region", os.Getenv(minioServiceRegion))
minioServiceSecure := common.GetBoolConfigWithDefault(
"ObjectStoreConfig.Secure", common.GetBoolFromStringWithDefault(os.Getenv(minioServiceSecure), false))
Expand All @@ -399,10 +399,11 @@ func initMinioClient(initConnectionTimeout time.Duration) storage.ObjectStoreInt
disableMultipart := common.GetBoolConfigWithDefault("ObjectStoreConfig.Multipart.Disable", true)

minioClient := client.CreateMinioClientOrFatal(minioServiceHost, minioServicePort, accessKey,
secretKey, minioServiceSecure, minioServiceRegion, initConnectionTimeout)
createMinioBucket(minioClient, bucketName, minioServiceRegion)
secretKey, minioServiceSecure, region, initConnectionTimeout)
createMinioBucket(minioClient, bucketName, region)

return storage.NewMinioObjectStore(&storage.MinioClient{Client: minioClient}, bucketName, pipelinePath, disableMultipart)
return storage.NewMinioObjectStore(&storage.MinioClient{Client: minioClient}, region,
bucketName, pipelinePath, disableMultipart)
}

func createMinioBucket(minioClient *minio.Client, bucketName, region string) {
Expand Down
7 changes: 7 additions & 0 deletions backend/src/apiserver/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ func GetPodNamespace() string {
return GetStringConfig(PodNamespace)
}

func GetFromStringWithDefault(value string, defaultValue string) string {
if len(value) == 0 {
return defaultValue
}
return value
}

func GetBoolFromStringWithDefault(value string, defaultValue bool) bool {
boolVal, err := strconv.ParseBool(value)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions backend/src/apiserver/resource/client_manager_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/kubeflow/pipelines/backend/src/apiserver/auth"
"github.com/kubeflow/pipelines/backend/src/apiserver/client"
"github.com/kubeflow/pipelines/backend/src/apiserver/storage"
apiserver_storage "github.com/kubeflow/pipelines/backend/src/apiserver/storage"
"github.com/kubeflow/pipelines/backend/src/common/util"
)

Expand All @@ -43,6 +44,7 @@ type FakeClientManager struct {
ArgoClientFake *client.FakeArgoClient
swfClientFake *client.FakeSwfClient
k8sCoreClientFake *client.FakeKuberneteCoreClient
apiClient apiserver_storage.ObjectStoreInterface
SubjectAccessReviewClientFake client.SubjectAccessReviewInterface
tokenReviewClientFake client.TokenReviewInterface
logArchive archive.LogArchiveInterface
Expand Down Expand Up @@ -179,6 +181,10 @@ func (f *FakeClientManager) KubernetesCoreClient() client.KubernetesCoreInterfac
return f.k8sCoreClientFake
}

func (f *FakeClientManager) MinioClient() apiserver_storage.ObjectStoreInterface {
return f.apiClient
}

func (f *FakeClientManager) SubjectAccessReviewClient() client.SubjectAccessReviewInterface {
return f.SubjectAccessReviewClientFake
}
Expand Down
11 changes: 8 additions & 3 deletions backend/src/apiserver/storage/object_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

"github.com/ghodss/yaml"
"github.com/kubeflow/pipelines/backend/src/common/util"
minio "github.com/minio/minio-go"
"github.com/minio/minio-go"
)

const (
Expand All @@ -44,6 +44,7 @@ type MinioObjectStore struct {
bucketName string
baseFolder string
disableMultipart bool
region string
}

// GetPipelineKey adds the configured base folder to pipeline id.
Expand Down Expand Up @@ -126,6 +127,10 @@ func buildPath(folder, file string) string {
return folder + "/" + file
}

func NewMinioObjectStore(minioClient MinioClientInterface, bucketName string, baseFolder string, disableMultipart bool) *MinioObjectStore {
return &MinioObjectStore{minioClient: minioClient, bucketName: bucketName, baseFolder: baseFolder, disableMultipart: disableMultipart}
func NewMinioObjectStore(minioClient MinioClientInterface, region string, bucketName string,
baseFolder string, disableMultipart bool) *MinioObjectStore {
return &MinioObjectStore{minioClient: minioClient, region: region,
juliusvonkohout marked this conversation as resolved.
Show resolved Hide resolved
bucketName: bucketName, baseFolder: baseFolder,
disableMultipart: disableMultipart,
}
}
2 changes: 1 addition & 1 deletion backend/src/apiserver/storage/object_store_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ package storage

// Return the object store with faked minio client.
func NewFakeObjectStore() ObjectStoreInterface {
return NewMinioObjectStore(NewFakeMinioClient(), "", "pipelines", false)
return NewMinioObjectStore(NewFakeMinioClient(), "", "", "pipelines", false)
}
24 changes: 20 additions & 4 deletions backend/src/cache/client_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ package main

import (
"database/sql"
"encoding/json"
"fmt"
"log"
"time"

"encoding/json"
"github.com/cenkalti/backoff"
"github.com/golang/glog"
"github.com/jinzhu/gorm"
apiserver_client "github.com/kubeflow/pipelines/backend/src/apiserver/client"
apiserver_storage "github.com/kubeflow/pipelines/backend/src/apiserver/storage"
"github.com/kubeflow/pipelines/backend/src/cache/client"
"github.com/kubeflow/pipelines/backend/src/cache/model"
"github.com/kubeflow/pipelines/backend/src/cache/storage"
Expand All @@ -38,6 +40,7 @@ type ClientManager struct {
db *storage.DB
cacheStore storage.ExecutionCacheStoreInterface
k8sCoreClient client.KubernetesCoreInterface
minioClient apiserver_storage.ObjectStoreInterface
time util.TimeInterface
}

Expand All @@ -49,18 +52,31 @@ func (c *ClientManager) KubernetesCoreClient() client.KubernetesCoreInterface {
return c.k8sCoreClient
}

func (c *ClientManager) MinioClient() apiserver_storage.ObjectStoreInterface {
return c.minioClient
}

func (c *ClientManager) Close() {
c.db.Close()
}

func (c *ClientManager) init(params WhSvrDBParameters, clientParams util.ClientParameters) {
func (c *ClientManager) init(params WhSvrDBParameters, clientParams util.ClientParameters, s3params S3Params) {
timeoutDuration, _ := time.ParseDuration(DefaultConnectionTimeout)
db := initDBClient(params, timeoutDuration)

c.time = util.NewRealTime()
c.db = db
c.cacheStore = storage.NewExecutionCacheStore(db, c.time)
c.k8sCoreClient = client.CreateKubernetesCoreOrFatal(timeoutDuration, clientParams)
c.minioClient = NewMinioClient(timeoutDuration, s3params)
}

func NewMinioClient(initConnectionTimeout time.Duration, params S3Params) apiserver_storage.ObjectStoreInterface {
glog.Infof("NewMinioClient %v", params)
minioClient := apiserver_client.CreateMinioClientOrFatal(params.serviceHost, params.servicePort,
params.accessKey, params.secretKey, params.isServiceSecure, params.region, initConnectionTimeout)
return apiserver_storage.NewMinioObjectStore(&apiserver_storage.MinioClient{Client: minioClient},
params.region, params.bucketName, params.pipelinePath, params.disableMultipart)
}

func initDBClient(params WhSvrDBParameters, initConnectionTimeout time.Duration) *storage.DB {
Expand Down Expand Up @@ -169,9 +185,9 @@ func initMysql(params WhSvrDBParameters, initConnectionTimeout time.Duration) st
return mysqlConfig.FormatDSN()
}

func NewClientManager(params WhSvrDBParameters, clientParams util.ClientParameters) ClientManager {
func NewClientManager(params WhSvrDBParameters, clientParams util.ClientParameters, s3params S3Params) ClientManager {
clientManager := ClientManager{}
clientManager.init(params, clientParams)
clientManager.init(params, clientParams, s3params)

return clientManager
}
36 changes: 35 additions & 1 deletion backend/src/cache/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
"flag"
"log"
"net/http"
"os"
"path/filepath"

"github.com/kubeflow/pipelines/backend/src/apiserver/common"
"github.com/kubeflow/pipelines/backend/src/cache/server"
"github.com/kubeflow/pipelines/backend/src/common/util"
)
Expand Down Expand Up @@ -57,9 +59,22 @@ type WhSvrDBParameters struct {
namespaceToWatch string
}

type S3Params struct {
serviceHost string
servicePort string
region string
accessKey string
secretKey string
isServiceSecure bool
bucketName string
pipelinePath string
disableMultipart bool
}

func main() {
var params WhSvrDBParameters
var clientParams util.ClientParameters
var s3params S3Params
var certFile string
var keyFile string

Expand All @@ -81,10 +96,12 @@ func main() {
flag.StringVar(&certFile, "tls_cert_filename", TLSCertFileDefault, "The TLS certificate filename.")
flag.StringVar(&keyFile, "tls_key_filename", TLSKeyFileDefault, "The TLS key filename.")

s3params = parseS3Flags()
//s3 endpoint params
flag.Parse()

log.Println("Initing client manager....")
clientManager := NewClientManager(params, clientParams)
clientManager := NewClientManager(params, clientParams, s3params)
ctx := context.Background()
go server.WatchPods(ctx, params.namespaceToWatch, &clientManager)

Expand All @@ -101,3 +118,20 @@ func main() {
}
log.Fatal(server.ListenAndServeTLS(certPath, keyPath))
}

func parseS3Flags() S3Params {
var params S3Params
flag.StringVar(&params.serviceHost, "s3_service_host", common.GetFromStringWithDefault(os.Getenv("SERVICE_HOST"), "minio-service.kubeflow.svc"), "hostname of S3.")
flag.StringVar(&params.servicePort, "s3_service_port", common.GetFromStringWithDefault(os.Getenv("SERVICE_PORT"), "9000"), "port of S3.")
flag.StringVar(&params.region, "s3_region", common.GetFromStringWithDefault(os.Getenv("SERVICE_REGION"), ""), "Region of S3 service.")
flag.StringVar(&params.accessKey, "s3_access_key", os.Getenv("OBJECTSTORECONFIG_ACCESSKEY"), "accessKey of S3.")
flag.StringVar(&params.secretKey, "s3_secret_key", os.Getenv("OBJECTSTORECONFIG_SECRETACCESSKEY"), "secretKey of S3.")
flag.BoolVar(&params.isServiceSecure, "s3_is_service_secure",
common.GetBoolFromStringWithDefault(os.Getenv("SERVICE_SECURE"), false), "is ssl enabled on S3.")
flag.StringVar(&params.bucketName, "s3_bucketname", os.Getenv("PIPELINE_BUCKET_NAME"), "default bucketname on S3.")
flag.StringVar(&params.pipelinePath, "s3_pipelinepath", "pipelines", "pipelinepath of S3.")
flag.BoolVar(&params.disableMultipart, "s3_disable_multipart",
common.GetBoolConfigWithDefault(os.Getenv("PIPELINE_BUCKET_DISABLE_MULTIPART"), true), "if multipart request are disabled on S3.")
log.Printf("S3Params %v\n", params)
return params
}
6 changes: 6 additions & 0 deletions backend/src/cache/server/client_manager_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package server

import (
"github.com/golang/glog"
apiserver_storage "github.com/kubeflow/pipelines/backend/src/apiserver/storage"
"github.com/kubeflow/pipelines/backend/src/cache/client"
"github.com/kubeflow/pipelines/backend/src/cache/storage"
"github.com/kubeflow/pipelines/backend/src/common/util"
Expand All @@ -25,6 +26,7 @@ type FakeClientManager struct {
db *storage.DB
cacheStore storage.ExecutionCacheStoreInterface
k8sCoreClientFake *client.FakeKuberneteCoreClient
apiClient apiserver_storage.ObjectStoreInterface
time util.TimeInterface
}

Expand Down Expand Up @@ -73,3 +75,7 @@ func (f *FakeClientManager) Close() error {
func (f *FakeClientManager) KubernetesCoreClient() client.KubernetesCoreInterface {
return f.k8sCoreClientFake
}

func (f *FakeClientManager) MinioClient() apiserver_storage.ObjectStoreInterface {
return f.apiClient
}
82 changes: 82 additions & 0 deletions backend/src/cache/server/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ import (
"strconv"
"strings"

apiserver_storage "github.com/kubeflow/pipelines/backend/src/apiserver/storage"
"github.com/kubeflow/pipelines/backend/src/cache/client"
"github.com/kubeflow/pipelines/backend/src/cache/model"
"github.com/kubeflow/pipelines/backend/src/cache/storage"
"github.com/kubeflow/pipelines/backend/src/common/util"
"k8s.io/api/admission/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -61,6 +63,7 @@ var (
type ClientManagerInterface interface {
CacheStore() storage.ExecutionCacheStoreInterface
KubernetesCoreClient() client.KubernetesCoreInterface
MinioClient() apiserver_storage.ObjectStoreInterface
}

// MutatePodIfCached will check whether the execution has already been run before from MLMD and apply the output into pod.metadata.output
Expand Down Expand Up @@ -127,9 +130,21 @@ func MutatePodIfCached(req *v1beta1.AdmissionRequest, clientMgr ClientManagerInt

var cachedExecution *model.ExecutionCache
cachedExecution, err = clientMgr.CacheStore().GetExecutionCache(executionHashKey, maxCacheStalenessInSeconds)
cacheKeyWithFileName := getCacheItemKey(cachedExecution)
log.Println("cache key extracted from output: " + cacheKeyWithFileName)

if err != nil {
log.Println(err.Error())
}

cachedExecution, err = retrieveCacheItemIfReallyExists(clientMgr.MinioClient(), cachedExecution, cacheKeyWithFileName)

if err != nil {
log.Printf("deleteItemFromCacheStore %s ", executionHashKey)
err = clientMgr.CacheStore().DeleteExecutionCache(executionHashKey)
log.Println(err)
}

// Found cached execution, add cached output and cache_id and replace container images.
if cachedExecution != nil {
log.Println("Cached output: " + cachedExecution.ExecutionOutput)
Expand Down Expand Up @@ -210,6 +225,73 @@ func MutatePodIfCached(req *v1beta1.AdmissionRequest, clientMgr ClientManagerInt
return patches, nil
}

//retrieveCacheItemIfReallyExists checks the objects
func retrieveCacheItemIfReallyExists(minioClient apiserver_storage.ObjectStoreInterface, execution *model.ExecutionCache, fileName string) (*model.ExecutionCache, error) {
if execution != nil && minioClient != nil {
log.Printf("retrieveCacheItemIfReallyExists %s ", execution.ExecutionTemplate)
bucket, key := getCacheItemS3Data(execution)
log.Printf("bucket %s , key: %s, filename: %s", bucket, key, fileName)
if minioClient != nil {
result, err := minioClient.GetFile(fileName)
log.Printf("cache retrieve result length %d %v", len(result), err)
if len(result) == 0 || err != nil {
return nil, util.NewInternalServerError(err, "Failed to get %v", key)
}
}
}
return execution, nil
}

func getCacheItemS3Data(execution *model.ExecutionCache) (string, string) {
key := ""
bucket := ""
var executionTemplate map[string]interface{}
b := []byte(execution.ExecutionTemplate)
err := json.Unmarshal(b, &executionTemplate)
if err != nil {
panic(err)
return bucket, key
}
if archiveLocation, ok := executionTemplate["archiveLocation"]; ok {
if s3, ok := archiveLocation.(map[string]interface{})["s3"]; ok {
if s3key, ok := s3.(map[string]interface{})["key"]; ok {
key = s3key.(string)
}
if s3bucket, ok := s3.(map[string]interface{})["bucket"]; ok {
bucket = s3bucket.(string)
}
}
}
return bucket, key
}

func getCacheItemKey(execution *model.ExecutionCache) string {
var output map[string]interface{}
if execution != nil {
b := []byte(execution.ExecutionOutput)
err := json.Unmarshal(b, &output)
if err != nil {
panic(err)
}
if output, ok := output["workflows.argoproj.io/outputs"]; ok {
var artifacts map[string]interface{}
if err := json.Unmarshal([]byte(output.(string)), &artifacts); err != nil {
panic(err)
}
if artifacts, ok := artifacts["artifacts"]; ok {
for _, artifact := range artifacts.([]interface{}) {
if s3, ok := artifact.(map[string]interface{})["s3"]; ok {
if s3key, ok := s3.(map[string]interface{})["key"]; ok {
return s3key.(string)
}
}
}
}
}
}
return ""
}

// intersectStructureWithSkeleton recursively intersects two maps
// nil values in the skeleton map mean that the whole value (which can also be a map) should be kept.
func intersectStructureWithSkeleton(src map[string]interface{}, skeleton map[string]interface{}) map[string]interface{} {
Expand Down
Loading