Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use low-level controller and handlers in SetupWithManager #1315

Merged
merged 6 commits into from
Aug 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/tf-operator.v1/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewServerOption() *ServerOption {

// AddFlags adds flags for a specific CMServer to the specified FlagSet.
func (s *ServerOption) AddFlags(fs *flag.FlagSet) {
fs.StringVar(&s.Kubeconfig, "kubeconfig", "", "The path of kubeconfig file")
//fs.StringVar(&s.Kubeconfig, "kubeconfig", "", "The path of kubeconfig file")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it temporary?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I cannot find a way to work around it. Users can still use KUBECONFIG to specify the path of the config files. This is the only user-facing change for tf controller.v1 users.

The next step is to build a universal operator to replace the tf controller and run it against all test cases. Once the conformance tests pass, we will remove tf controller.v1.


fs.StringVar(&s.MasterURL, "master", "",
`The url of the Kubernetes API server,
Expand Down
81 changes: 81 additions & 0 deletions pkg/apis/xgboost/validation/validation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2021 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package validation

import (
"fmt"

xgboostv1 "github.com/kubeflow/tf-operator/pkg/apis/xgboost/v1"

commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"

torchv1 "github.com/kubeflow/tf-operator/pkg/apis/pytorch/v1"
)

// ValidateV1XGBoostJobSpec checks that an XGBoostJobSpec is well formed.
//
// It returns an error when:
//   - XGBReplicaSpecs is nil;
//   - any replica spec is nil or defines no containers;
//   - a replica type is neither Master nor Worker;
//   - any container has an empty image;
//   - a replica has no container named xgboostv1.DefaultContainerName;
//   - the Master replica count is set and is not 1;
//   - no Master replica spec is present.
//
// It returns nil when none of the above applies. Because map iteration
// order is not fixed, a spec with several problems may report any one of them.
func ValidateV1XGBoostJobSpec(c *xgboostv1.XGBoostJobSpec) error {
	if c.XGBReplicaSpecs == nil {
		return fmt.Errorf("XGBoostJobSpec is not valid")
	}

	// The set of accepted replica types does not depend on the spec being
	// validated, so build it once instead of once per replica.
	validReplicaTypes := []commonv1.ReplicaType{
		xgboostv1.XGBoostReplicaTypeMaster,
		xgboostv1.XGBoostReplicaTypeWorker,
	}

	masterExists := false
	for rType, value := range c.XGBReplicaSpecs {
		if value == nil || len(value.Template.Spec.Containers) == 0 {
			return fmt.Errorf("XGBoostJobSpec is not valid: containers definition expected in %v", rType)
		}

		// Make sure the replica type is valid.
		isValidReplicaType := false
		for _, t := range validReplicaTypes {
			if t == rType {
				isValidReplicaType = true
				break
			}
		}
		if !isValidReplicaType {
			return fmt.Errorf("XGBoostReplicaType is %v but must be one of %v", rType, validReplicaTypes)
		}

		// Make sure the image is defined in every container, and remember
		// whether the default container was seen along the way.
		defaultContainerPresent := false
		for i := range value.Template.Spec.Containers {
			container := &value.Template.Spec.Containers[i]
			if container.Image == "" {
				return fmt.Errorf("XGBoostReplicaType is not valid: Image is undefined in the container of %v", rType)
			}
			if container.Name == xgboostv1.DefaultContainerName {
				defaultContainerPresent = true
			}
		}

		// Make sure there is at least one container with the default XGBoost
		// container name. The message reports the same name that was checked.
		if !defaultContainerPresent {
			return fmt.Errorf("XGBoostReplicaType is not valid: There is no container named %s in %v", xgboostv1.DefaultContainerName, rType)
		}

		if rType == xgboostv1.XGBoostReplicaTypeMaster {
			masterExists = true
			// An unset replica count is accepted here; only an explicit
			// value other than 1 is rejected.
			if value.Replicas != nil && *value.Replicas != 1 {
				return fmt.Errorf("XGBoostReplicaType is not valid: There must be only 1 master replica")
			}
		}
	}

	if !masterExists {
		return fmt.Errorf("XGBoostReplicaType is not valid: Master ReplicaSpec must be present")
	}
	return nil
}
114 changes: 114 additions & 0 deletions pkg/apis/xgboost/validation/validation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 2021 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package validation

import (
"testing"

commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
xgboostv1 "github.com/kubeflow/tf-operator/pkg/apis/xgboost/v1"

v1 "k8s.io/api/core/v1"
)

// TestValidateV1XGBoostJobSpec feeds ValidateV1XGBoostJobSpec a series of
// invalid specs and requires every one of them to be rejected.
func TestValidateV1XGBoostJobSpec(t *testing.T) {
	testCases := []xgboostv1.XGBoostJobSpec{
		// No replica specs at all.
		{
			XGBReplicaSpecs: nil,
		},
		// Replica spec with an empty container list.
		{
			XGBReplicaSpecs: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{
				xgboostv1.XGBoostReplicaTypeWorker: {
					Template: v1.PodTemplateSpec{
						Spec: v1.PodSpec{
							Containers: []v1.Container{},
						},
					},
				},
			},
		},
		// Container without an image.
		{
			XGBReplicaSpecs: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{
				xgboostv1.XGBoostReplicaTypeWorker: {
					Template: v1.PodTemplateSpec{
						Spec: v1.PodSpec{
							Containers: []v1.Container{
								{
									Image: "",
								},
							},
						},
					},
				},
			},
		},
		// Container with an image but an empty name.
		{
			XGBReplicaSpecs: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{
				xgboostv1.XGBoostReplicaTypeWorker: {
					Template: v1.PodTemplateSpec{
						Spec: v1.PodSpec{
							Containers: []v1.Container{
								{
									Name:  "",
									Image: "gcr.io/kubeflow-ci/xgboost-dist-mnist_test:1.0",
								},
							},
						},
					},
				},
			},
		},
		// Master replica spec asking for two replicas.
		{
			XGBReplicaSpecs: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{
				xgboostv1.XGBoostReplicaTypeMaster: {
					Replicas: xgboostv1.Int32(2),
					Template: v1.PodTemplateSpec{
						Spec: v1.PodSpec{
							Containers: []v1.Container{
								{
									Name:  "xgboost",
									Image: "gcr.io/kubeflow-ci/xgboost-dist-mnist_test:1.0",
								},
							},
						},
					},
				},
			},
		},
		// Worker only: no Master replica spec is present.
		{
			XGBReplicaSpecs: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{
				xgboostv1.XGBoostReplicaTypeWorker: {
					Replicas: xgboostv1.Int32(1),
					Template: v1.PodTemplateSpec{
						Spec: v1.PodSpec{
							Containers: []v1.Container{
								{
									Name:  "xgboost",
									Image: "gcr.io/kubeflow-ci/xgboost-dist-mnist_test:1.0",
								},
							},
						},
					},
				},
			},
		},
	}

	// Index into the slice rather than taking the address of the range
	// variable, and report the case index so a failure can be located.
	for i := range testCases {
		if err := ValidateV1XGBoostJobSpec(&testCases[i]); err == nil {
			t.Errorf("case %d: expected a validation error for an invalid v1.XGBoostJobSpec, got nil", i)
		}
	}
}
73 changes: 73 additions & 0 deletions pkg/common/util/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@ package util

import (
"fmt"
"reflect"

"github.com/sirupsen/logrus"

commonutil "github.com/kubeflow/common/pkg/util"

"github.com/kubeflow/common/pkg/controller.v1/common"

commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
"github.com/kubeflow/common/pkg/controller.v1/expectation"
Expand Down Expand Up @@ -54,6 +61,72 @@ func OnDependentCreateFunc(exp expectation.ControllerExpectationsInterface) func
}
}

// OnDependentUpdateFunc returns a filter for update events on dependent
// objects (pods/services). The returned function reports true when the
// updated object is, or previously was, controlled by a job that jc can
// resolve from its informer cache, and false otherwise.
//
// It returns false when:
//   - old and new objects share a resource version (periodic resync);
//   - neither the old nor the new controller ref resolves to a job of the
//     kind managed by jc.
func OnDependentUpdateFunc(jc *common.JobController) func(updateEvent event.UpdateEvent) bool {
	return func(e event.UpdateEvent) bool {
		newObj := e.ObjectNew
		oldObj := e.ObjectOld
		if newObj.GetResourceVersion() == oldObj.GetResourceVersion() {
			// Periodic resync will send update events for all known pods.
			// Two different versions of the same pod will always have different RVs.
			return false
		}

		// Pick a logger matching the concrete object type. Fall back to a
		// plain entry so that an unexpected object type cannot cause a nil
		// pointer dereference when logging below.
		var logger *logrus.Entry
		switch obj := newObj.(type) {
		case *corev1.Pod:
			logger = commonutil.LoggerForPod(obj, jc.Controller.GetAPIGroupVersionKind().Kind)
		case *corev1.Service:
			logger = commonutil.LoggerForService(obj, jc.Controller.GetAPIGroupVersionKind().Kind)
		default:
			logger = logrus.NewEntry(logrus.StandardLogger())
		}

		newControllerRef := metav1.GetControllerOf(newObj)
		oldControllerRef := metav1.GetControllerOf(oldObj)
		controllerRefChanged := !reflect.DeepEqual(newControllerRef, oldControllerRef)

		if controllerRefChanged && oldControllerRef != nil {
			// The ControllerRef was changed. Sync the old controller, if any.
			// resolveControllerRef looks the job up by namespace, so pass the
			// old object's namespace (not its name).
			if job := resolveControllerRef(jc, oldObj.GetNamespace(), oldControllerRef); job != nil {
				logger.Infof("pod/service controller ref updated: %v, %v", newObj, oldObj)
				return true
			}
		}

		// If it has a controller ref, that's all that matters.
		if newControllerRef != nil {
			job := resolveControllerRef(jc, newObj.GetNamespace(), newControllerRef)
			if job == nil {
				return false
			}
			logger.Debugf("pod/service has a controller ref: %v, %v", newObj, oldObj)
			return true
		}
		return false
	}
}

// resolveControllerRef looks up the job that controllerRef points to.
// It yields nil when the reference is for a different Kind, when the
// lookup in the informer cache fails, or when the job found under that
// name carries a different UID than the reference.
func resolveControllerRef(jc *common.JobController, namespace string, controllerRef *metav1.OwnerReference) metav1.Object {
	// A lookup by UID is not available, so the job is fetched by name and
	// its UID verified afterwards. References of another Kind are rejected
	// before any lookup is attempted.
	managedKind := jc.Controller.GetAPIGroupVersionKind().Kind
	if controllerRef.Kind != managedKind {
		return nil
	}

	candidate, lookupErr := jc.Controller.GetJobFromInformerCache(namespace, controllerRef.Name)
	switch {
	case lookupErr != nil:
		return nil
	case candidate.GetUID() != controllerRef.UID:
		// A job with this name exists, but it is not the one the
		// ControllerRef refers to.
		return nil
	default:
		return candidate
	}
}

// OnDependentDeleteFunc modify expectations when dependent (pod/service) deletion observed.
func OnDependentDeleteFunc(exp expectation.ControllerExpectationsInterface) func(event.DeleteEvent) bool {
return func(e event.DeleteEvent) bool {
Expand Down
Loading