Skip to content

Commit

Permalink
Use low-level controller and handlers in SetupWithManager (#1315)
Browse files Browse the repository at this point in the history
* Use low-level controller and handlers in SetupWithManager

Signed-off-by: Jiaxin Shan <[email protected]>

* Add job validation in reconcile loop

Signed-off-by: Jiaxin Shan <[email protected]>

* Set defaults in onOwnerCreateFunc

Signed-off-by: Jiaxin Shan <[email protected]>

* Correctly update job status in apiserver

Signed-off-by: Jiaxin Shan <[email protected]>

* Remove Flag for Kubeconfig to fix flag redefined

CI reports `flag redefined: kubeconfig` issue and this is due to duplicate flag registration. See #1316 for more details.

Signed-off-by: Jiaxin Shan <[email protected]>

* Fix tensorflow job missing default port issue

This is to fix original controller issue. It leverages init to register default to scheme. In our unversal operator project, we did clean up for register.go and break the case.

For more details, please check #1317 (comment)

Signed-off-by: Jiaxin Shan <[email protected]>
  • Loading branch information
Jeffwan committed Aug 5, 2021
1 parent f152cca commit f0f793b
Show file tree
Hide file tree
Showing 9 changed files with 481 additions and 128 deletions.
2 changes: 1 addition & 1 deletion cmd/tf-operator.v1/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewServerOption() *ServerOption {

// AddFlags adds flags for a specific CMServer to the specified FlagSet.
func (s *ServerOption) AddFlags(fs *flag.FlagSet) {
fs.StringVar(&s.Kubeconfig, "kubeconfig", "", "The path of kubeconfig file")
//fs.StringVar(&s.Kubeconfig, "kubeconfig", "", "The path of kubeconfig file")

fs.StringVar(&s.MasterURL, "master", "",
`The url of the Kubernetes API server,
Expand Down
81 changes: 81 additions & 0 deletions pkg/apis/xgboost/validation/validation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2021 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package validation

import (
"fmt"

xgboostv1 "github.com/kubeflow/tf-operator/pkg/apis/xgboost/v1"

commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"

torchv1 "github.com/kubeflow/tf-operator/pkg/apis/pytorch/v1"
)

func ValidateV1XGBoostJobSpec(c *xgboostv1.XGBoostJobSpec) error {
if c.XGBReplicaSpecs == nil {
return fmt.Errorf("XGBoostJobSpec is not valid")
}
masterExists := false
for rType, value := range c.XGBReplicaSpecs {
if value == nil || len(value.Template.Spec.Containers) == 0 {
return fmt.Errorf("XGBoostJobSpec is not valid: containers definition expected in %v", rType)
}
// Make sure the replica type is valid.
validReplicaTypes := []commonv1.ReplicaType{xgboostv1.XGBoostReplicaTypeMaster, xgboostv1.XGBoostReplicaTypeWorker}

isValidReplicaType := false
for _, t := range validReplicaTypes {
if t == rType {
isValidReplicaType = true
break
}
}

if !isValidReplicaType {
return fmt.Errorf("XGBoostReplicaType is %v but must be one of %v", rType, validReplicaTypes)
}

//Make sure the image is defined in the container
defaultContainerPresent := false
for _, container := range value.Template.Spec.Containers {
if container.Image == "" {
msg := fmt.Sprintf("XGBoostReplicaType is not valid: Image is undefined in the container of %v", rType)
return fmt.Errorf(msg)
}
if container.Name == xgboostv1.DefaultContainerName {
defaultContainerPresent = true
}
}
//Make sure there has at least one container named "pytorch"
if !defaultContainerPresent {
msg := fmt.Sprintf("XGBoostReplicaType is not valid: There is no container named %s in %v", torchv1.DefaultContainerName, rType)
return fmt.Errorf(msg)
}
if rType == xgboostv1.XGBoostReplicaTypeMaster {
masterExists = true
if value.Replicas != nil && int(*value.Replicas) != 1 {
return fmt.Errorf("XGBoostReplicaType is not valid: There must be only 1 master replica")
}
}

}

if !masterExists {
return fmt.Errorf("XGBoostReplicaType is not valid: Master ReplicaSpec must be present")
}
return nil

}
114 changes: 114 additions & 0 deletions pkg/apis/xgboost/validation/validation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 2021 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package validation

import (
"testing"

commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
xgboostv1 "github.com/kubeflow/tf-operator/pkg/apis/xgboost/v1"

v1 "k8s.io/api/core/v1"
)

func TestValidateV1XGBoostJobSpec(t *testing.T) {
testCases := []xgboostv1.XGBoostJobSpec{
{
XGBReplicaSpecs: nil,
},
{
XGBReplicaSpecs: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{
xgboostv1.XGBoostReplicaTypeWorker: &commonv1.ReplicaSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{},
},
},
},
},
},
{
XGBReplicaSpecs: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{
xgboostv1.XGBoostReplicaTypeWorker: &commonv1.ReplicaSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
v1.Container{
Image: "",
},
},
},
},
},
},
},
{
XGBReplicaSpecs: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{
xgboostv1.XGBoostReplicaTypeWorker: &commonv1.ReplicaSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
v1.Container{
Name: "",
Image: "gcr.io/kubeflow-ci/xgboost-dist-mnist_test:1.0",
},
},
},
},
},
},
},
{
XGBReplicaSpecs: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{
xgboostv1.XGBoostReplicaTypeMaster: &commonv1.ReplicaSpec{
Replicas: xgboostv1.Int32(2),
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
v1.Container{
Name: "xgboost",
Image: "gcr.io/kubeflow-ci/xgboost-dist-mnist_test:1.0",
},
},
},
},
},
},
},
{
XGBReplicaSpecs: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{
xgboostv1.XGBoostReplicaTypeWorker: &commonv1.ReplicaSpec{
Replicas: xgboostv1.Int32(1),
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
v1.Container{
Name: "xgboost",
Image: "gcr.io/kubeflow-ci/xgboost-dist-mnist_test:1.0",
},
},
},
},
},
},
},
}
for _, c := range testCases {
err := ValidateV1XGBoostJobSpec(&c)
if err == nil {
t.Error("Failed validate the v1.XGBoostJobSpec")
}
}
}
73 changes: 73 additions & 0 deletions pkg/common/util/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@ package util

import (
"fmt"
"reflect"

"github.com/sirupsen/logrus"

commonutil "github.com/kubeflow/common/pkg/util"

"github.com/kubeflow/common/pkg/controller.v1/common"

commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
"github.com/kubeflow/common/pkg/controller.v1/expectation"
Expand Down Expand Up @@ -54,6 +61,72 @@ func OnDependentCreateFunc(exp expectation.ControllerExpectationsInterface) func
}
}

// OnDependentUpdateFunc modify expectations when dependent (pod/service) update observed.
func OnDependentUpdateFunc(jc *common.JobController) func(updateEvent event.UpdateEvent) bool {
return func(e event.UpdateEvent) bool {
newObj := e.ObjectNew
oldObj := e.ObjectOld
if newObj.GetResourceVersion() == oldObj.GetResourceVersion() {
// Periodic resync will send update events for all known pods.
// Two different versions of the same pod will always have different RVs.
return false
}

var logger *logrus.Entry
if _, ok := newObj.(*corev1.Pod); ok {
logger = commonutil.LoggerForPod(newObj.(*corev1.Pod), jc.Controller.GetAPIGroupVersionKind().Kind)
}

if _, ok := newObj.(*corev1.Service); ok {
logger = commonutil.LoggerForService(newObj.(*corev1.Service), jc.Controller.GetAPIGroupVersionKind().Kind)
}

newControllerRef := metav1.GetControllerOf(newObj)
oldControllerRef := metav1.GetControllerOf(oldObj)
controllerRefChanged := !reflect.DeepEqual(newControllerRef, oldControllerRef)

if controllerRefChanged && oldControllerRef != nil {
// The ControllerRef was changed. Sync the old controller, if any.
if job := resolveControllerRef(jc, oldObj.GetName(), oldControllerRef); job != nil {
logger.Infof("pod/service controller ref updated: %v, %v", newObj, oldObj)
return true
}
}

// If it has a controller ref, that's all that matters.
if newControllerRef != nil {
job := resolveControllerRef(jc, newObj.GetNamespace(), newControllerRef)
if job == nil {
return false
}
logger.Debugf("pod/service has a controller ref: %v, %v", newObj, oldObj)
return true
}
return false
}
}

// resolveControllerRef returns the job referenced by a ControllerRef,
// or nil if the ControllerRef could not be resolved to a matching job
// of the correct Kind.
func resolveControllerRef(jc *common.JobController, namespace string, controllerRef *metav1.OwnerReference) metav1.Object {
// We can't look up by UID, so look up by Name and then verify UID.
// Don't even try to look up by Name if it's the wrong Kind.
if controllerRef.Kind != jc.Controller.GetAPIGroupVersionKind().Kind {
return nil
}
job, err := jc.Controller.GetJobFromInformerCache(namespace, controllerRef.Name)
if err != nil {
return nil
}
if job.GetUID() != controllerRef.UID {
// The controller we found with this Name is not the same one that the
// ControllerRef points to.
return nil
}
return job
}

// OnDependentDeleteFunc modify expectations when dependent (pod/service) deletion observed.
func OnDependentDeleteFunc(exp expectation.ControllerExpectationsInterface) func(event.DeleteEvent) bool {
return func(e event.DeleteEvent) bool {
Expand Down
Loading

0 comments on commit f0f793b

Please sign in to comment.