Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Transition to Queue if the JobCondition is empty (#387)
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw committed Sep 1, 2023
1 parent 22e5d13 commit fdb4c55
Show file tree
Hide file tree
Showing 11 changed files with 240 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,6 @@ type ReplicaEntry struct {
RestartPolicy commonOp.RestartPolicy
}

// ExtractMPICurrentCondition will return the first job condition for MPI
func ExtractMPICurrentCondition(jobConditions []commonOp.JobCondition) (commonOp.JobCondition, error) {
if jobConditions != nil {
sort.Slice(jobConditions, func(i, j int) bool {
return jobConditions[i].LastTransitionTime.Time.After(jobConditions[j].LastTransitionTime.Time)
})

for _, jc := range jobConditions {
if jc.Status == v1.ConditionTrue {
return jc, nil
}
}
}

return commonOp.JobCondition{}, fmt.Errorf("found no current condition. Conditions: %+v", jobConditions)
}

// ExtractCurrentCondition will return the first job condition for tensorflow/pytorch
func ExtractCurrentCondition(jobConditions []commonOp.JobCondition) (commonOp.JobCondition, error) {
if jobConditions != nil {
Expand All @@ -60,14 +43,17 @@ func ExtractCurrentCondition(jobConditions []commonOp.JobCondition) (commonOp.Jo
return jc, nil
}
}
return commonOp.JobCondition{}, fmt.Errorf("found no current condition. Conditions: %+v", jobConditions)
}

return commonOp.JobCondition{}, fmt.Errorf("found no current condition. Conditions: %+v", jobConditions)
return commonOp.JobCondition{}, nil
}

// GetPhaseInfo will return the phase of kubeflow job
func GetPhaseInfo(currentCondition commonOp.JobCondition, occurredAt time.Time,
taskPhaseInfo pluginsCore.TaskInfo) (pluginsCore.PhaseInfo, error) {
if len(currentCondition.Type) == 0 {
return pluginsCore.PhaseInfoQueued(occurredAt, pluginsCore.DefaultPhaseVersion, "JobCreated"), nil
}
switch currentCondition.Type {
case commonOp.JobCreated:
return pluginsCore.PhaseInfoQueued(occurredAt, pluginsCore.DefaultPhaseVersion, "JobCreated"), nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestExtractMPICurrentCondition(t *testing.T) {
func TestExtractCurrentCondition(t *testing.T) {
jobCreated := commonOp.JobCondition{
Type: commonOp.JobCreated,
Status: corev1.ConditionTrue,
Expand All @@ -31,46 +31,39 @@ func TestExtractMPICurrentCondition(t *testing.T) {
jobCreated,
jobRunningActive,
}
currentCondition, err := ExtractMPICurrentCondition(jobConditions)
currentCondition, err := ExtractCurrentCondition(jobConditions)
assert.NoError(t, err)
assert.Equal(t, currentCondition, jobCreated)

jobConditions = nil
currentCondition, err = ExtractMPICurrentCondition(jobConditions)
assert.Error(t, err)
currentCondition, err = ExtractCurrentCondition(jobConditions)
assert.NoError(t, err)
assert.Equal(t, currentCondition, commonOp.JobCondition{})
assert.Equal(t, err, fmt.Errorf("found no current condition. Conditions: %+v", jobConditions))
}

func TestExtractCurrentCondition(t *testing.T) {
jobCreated := commonOp.JobCondition{
Type: commonOp.JobCreated,
Status: corev1.ConditionTrue,
}
jobRunningActive := commonOp.JobCondition{
Type: commonOp.JobRunning,
Status: corev1.ConditionFalse,
}
jobConditions := []commonOp.JobCondition{
jobCreated,
jobRunningActive,
}
currentCondition, err := ExtractCurrentCondition(jobConditions)
currentCondition, err = ExtractCurrentCondition(nil)
assert.NoError(t, err)
assert.Equal(t, currentCondition, jobCreated)
assert.Equal(t, currentCondition, commonOp.JobCondition{})

jobConditions = nil
jobUnknown := commonOp.JobCondition{Type: "unknown"}
jobConditions = []commonOp.JobCondition{jobUnknown}
currentCondition, err = ExtractCurrentCondition(jobConditions)
assert.Error(t, err)
assert.Equal(t, currentCondition, commonOp.JobCondition{})
assert.Equal(t, err, fmt.Errorf("found no current condition. Conditions: %+v", jobConditions))
}

func TestGetPhaseInfo(t *testing.T) {
jobCreating := commonOp.JobCondition{}
taskPhase, err := GetPhaseInfo(jobCreating, time.Now(), pluginsCore.TaskInfo{})
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseQueued, taskPhase.Phase())
assert.NotNil(t, taskPhase.Info())
assert.Nil(t, err)

jobCreated := commonOp.JobCondition{
Type: commonOp.JobCreated,
}
taskPhase, err := GetPhaseInfo(jobCreated, time.Now(), pluginsCore.TaskInfo{})
taskPhase, err = GetPhaseInfo(jobCreated, time.Now(), pluginsCore.TaskInfo{})
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseQueued, taskPhase.Phase())
assert.NotNil(t, taskPhase.Info())
Expand Down
32 changes: 32 additions & 0 deletions flyteplugins/go/tasks/plugins/k8s/kfoperators/common/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package common

import (
"time"

pluginsConfig "github.com/flyteorg/flyteplugins/go/tasks/config"
"github.com/flyteorg/flytestdlib/config"
)

//go:generate pflags Config --default-var=defaultConfig

var (
defaultConfig = Config{
Timeout: config.Duration{Duration: 1 * time.Minute},
}

configSection = pluginsConfig.MustRegisterSubSection("kf-operator", &defaultConfig)
)

// Config is config for 'pytorch' plugin
type Config struct {
// If kubeflow operator doesn't update the status of the task after this timeout, the task will be considered failed.
Timeout config.Duration `json:"timeout,omitempty"`
}

func GetConfig() *Config {
return configSection.GetConfig().(*Config)
}

func SetConfig(cfg *Config) error {
return configSection.SetConfig(cfg)
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,10 @@ func (mpiOperatorResourceHandler) GetTaskPhase(_ context.Context, pluginContext
if err != nil {
return pluginsCore.PhaseInfoUndefined, err
}
currentCondition, err := common.ExtractMPICurrentCondition(app.Status.Conditions)
if app.Status.StartTime == nil && app.CreationTimestamp.Add(common.GetConfig().Timeout.Duration).Before(time.Now()) {
return pluginsCore.PhaseInfoUndefined, fmt.Errorf("kubeflow operator hasn't updated the mpi custom resource since creation time %v", app.CreationTimestamp)
}
currentCondition, err := common.ExtractCurrentCondition(app.Status.Conditions)
if err != nil {
return pluginsCore.PhaseInfoUndefined, err
}
Expand All @@ -223,7 +226,6 @@ func (mpiOperatorResourceHandler) GetTaskPhase(_ context.Context, pluginContext
}

return common.GetMPIPhaseInfo(currentCondition, occurredAt, taskPhaseInfo)

}

func init() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func dummyMPIJobResource(mpiResourceHandler mpiOperatorResourceHandler,
Status: mpiOp.JobStatus{
Conditions: jobConditions,
ReplicaStatuses: nil,
StartTime: nil,
StartTime: &v1.Time{Time: time.Now()},
CompletionTime: nil,
LastReconcileTime: nil,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ func (pytorchOperatorResourceHandler) GetTaskPhase(_ context.Context, pluginCont
return pluginsCore.PhaseInfoUndefined, err
}

if app.Status.StartTime == nil && app.CreationTimestamp.Add(common.GetConfig().Timeout.Duration).Before(time.Now()) {
return pluginsCore.PhaseInfoUndefined, fmt.Errorf("kubeflow operator hasn't updated the pytorch custom resource since creation time %v", app.CreationTimestamp)
}
currentCondition, err := common.ExtractCurrentCondition(app.Status.Conditions)
if err != nil {
return pluginsCore.PhaseInfoUndefined, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,9 @@ func dummyPytorchJobResource(pytorchResourceHandler pytorchOperatorResourceHandl

return &kubeflowv1.PyTorchJob{
ObjectMeta: v1.ObjectMeta{
Name: jobName,
Namespace: jobNamespace,
CreationTimestamp: v1.Time{Time: time.Now()},
Name: jobName,
Namespace: jobNamespace,
},
Spec: resource.(*kubeflowv1.PyTorchJob).Spec,
Status: commonOp.JobStatus{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,10 @@ func (tensorflowOperatorResourceHandler) GetTaskPhase(_ context.Context, pluginC
return pluginsCore.PhaseInfoUndefined, err
}

if app.Status.StartTime == nil && app.CreationTimestamp.Add(common.GetConfig().Timeout.Duration).Before(time.Now()) {
return pluginsCore.PhaseInfoUndefined, fmt.Errorf("kubeflow operator hasn't updated the tensorflow custom resource since creation time %v", app.CreationTimestamp)
}

currentCondition, err := common.ExtractCurrentCondition(app.Status.Conditions)
if err != nil {
return pluginsCore.PhaseInfoUndefined, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func dummyTensorFlowJobResource(tensorflowResourceHandler tensorflowOperatorReso
Status: commonOp.JobStatus{
Conditions: jobConditions,
ReplicaStatuses: nil,
StartTime: nil,
StartTime: &v1.Time{Time: time.Now()},
CompletionTime: nil,
LastReconcileTime: nil,
},
Expand Down

0 comments on commit fdb4c55

Please sign in to comment.