Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Transition to Queue if the JobCondition is empty #387

Merged
merged 12 commits into from
Sep 1, 2023
24 changes: 5 additions & 19 deletions go/tasks/plugins/k8s/kfoperators/common/common_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,6 @@ type ReplicaEntry struct {
RestartPolicy commonOp.RestartPolicy
}

// ExtractMPICurrentCondition selects the current condition of an MPI job: the
// most recently transitioned condition whose Status is ConditionTrue.
//
// A non-nil jobConditions slice is reordered in place, newest
// LastTransitionTime first. An error is returned when the slice is nil, empty,
// or contains no condition with a true status.
func ExtractMPICurrentCondition(jobConditions []commonOp.JobCondition) (commonOp.JobCondition, error) {
	if jobConditions != nil {
		// Newest transition first, so the scan below meets the latest condition first.
		sort.Slice(jobConditions, func(a, b int) bool {
			left := jobConditions[a].LastTransitionTime.Time
			right := jobConditions[b].LastTransitionTime.Time
			return left.After(right)
		})
	}

	// Ranging over a nil slice is a no-op, so the nil case falls through to the error.
	for idx := range jobConditions {
		if jobConditions[idx].Status == v1.ConditionTrue {
			return jobConditions[idx], nil
		}
	}

	return commonOp.JobCondition{}, fmt.Errorf("found no current condition. Conditions: %+v", jobConditions)
}

// ExtractCurrentCondition will return the first job condition for tensorflow/pytorch/mpi
func ExtractCurrentCondition(jobConditions []commonOp.JobCondition) (commonOp.JobCondition, error) {
if jobConditions != nil {
Expand All @@ -60,14 +43,17 @@ func ExtractCurrentCondition(jobConditions []commonOp.JobCondition) (commonOp.Jo
return jc, nil
}
}
return commonOp.JobCondition{}, fmt.Errorf("found no current condition. Conditions: %+v", jobConditions)
}

return commonOp.JobCondition{}, fmt.Errorf("found no current condition. Conditions: %+v", jobConditions)
return commonOp.JobCondition{}, nil
}

// GetPhaseInfo will return the phase of kubeflow job
func GetPhaseInfo(currentCondition commonOp.JobCondition, occurredAt time.Time,
taskPhaseInfo pluginsCore.TaskInfo) (pluginsCore.PhaseInfo, error) {
if len(currentCondition.Type) == 0 {
return pluginsCore.PhaseInfoQueued(occurredAt, pluginsCore.DefaultPhaseVersion, "JobCreated"), nil
}
switch currentCondition.Type {
case commonOp.JobCreated:
return pluginsCore.PhaseInfoQueued(occurredAt, pluginsCore.DefaultPhaseVersion, "JobCreated"), nil
Expand Down
39 changes: 16 additions & 23 deletions go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestExtractMPICurrentCondition(t *testing.T) {
func TestExtractCurrentCondition(t *testing.T) {
jobCreated := commonOp.JobCondition{
Type: commonOp.JobCreated,
Status: corev1.ConditionTrue,
Expand All @@ -31,46 +31,39 @@ func TestExtractMPICurrentCondition(t *testing.T) {
jobCreated,
jobRunningActive,
}
currentCondition, err := ExtractMPICurrentCondition(jobConditions)
currentCondition, err := ExtractCurrentCondition(jobConditions)
assert.NoError(t, err)
assert.Equal(t, currentCondition, jobCreated)

jobConditions = nil
currentCondition, err = ExtractMPICurrentCondition(jobConditions)
assert.Error(t, err)
currentCondition, err = ExtractCurrentCondition(jobConditions)
assert.NoError(t, err)
assert.Equal(t, currentCondition, commonOp.JobCondition{})
assert.Equal(t, err, fmt.Errorf("found no current condition. Conditions: %+v", jobConditions))
}

func TestExtractCurrentCondition(t *testing.T) {
jobCreated := commonOp.JobCondition{
Type: commonOp.JobCreated,
Status: corev1.ConditionTrue,
}
jobRunningActive := commonOp.JobCondition{
Type: commonOp.JobRunning,
Status: corev1.ConditionFalse,
}
jobConditions := []commonOp.JobCondition{
jobCreated,
jobRunningActive,
}
currentCondition, err := ExtractCurrentCondition(jobConditions)
currentCondition, err = ExtractCurrentCondition(nil)
assert.NoError(t, err)
assert.Equal(t, currentCondition, jobCreated)
assert.Equal(t, currentCondition, commonOp.JobCondition{})

jobConditions = nil
jobUnknown := commonOp.JobCondition{Type: "unknown"}
jobConditions = []commonOp.JobCondition{jobUnknown}
currentCondition, err = ExtractCurrentCondition(jobConditions)
assert.Error(t, err)
assert.Equal(t, currentCondition, commonOp.JobCondition{})
assert.Equal(t, err, fmt.Errorf("found no current condition. Conditions: %+v", jobConditions))
}

func TestGetPhaseInfo(t *testing.T) {
jobCreating := commonOp.JobCondition{}
taskPhase, err := GetPhaseInfo(jobCreating, time.Now(), pluginsCore.TaskInfo{})
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseQueued, taskPhase.Phase())
assert.NotNil(t, taskPhase.Info())
assert.Nil(t, err)

jobCreated := commonOp.JobCondition{
Type: commonOp.JobCreated,
}
taskPhase, err := GetPhaseInfo(jobCreated, time.Now(), pluginsCore.TaskInfo{})
taskPhase, err = GetPhaseInfo(jobCreated, time.Now(), pluginsCore.TaskInfo{})
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseQueued, taskPhase.Phase())
assert.NotNil(t, taskPhase.Info())
Expand Down
3 changes: 1 addition & 2 deletions go/tasks/plugins/k8s/kfoperators/mpi/mpi.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (mpiOperatorResourceHandler) GetTaskPhase(_ context.Context, pluginContext
if err != nil {
return pluginsCore.PhaseInfoUndefined, err
}
currentCondition, err := common.ExtractMPICurrentCondition(app.Status.Conditions)
currentCondition, err := common.ExtractCurrentCondition(app.Status.Conditions)
if err != nil {
return pluginsCore.PhaseInfoUndefined, err
}
Expand All @@ -223,7 +223,6 @@ func (mpiOperatorResourceHandler) GetTaskPhase(_ context.Context, pluginContext
}

return common.GetMPIPhaseInfo(currentCondition, occurredAt, taskPhaseInfo)

}

func init() {
Expand Down
31 changes: 31 additions & 0 deletions go/tasks/plugins/k8s/kfoperators/pytorch/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package pytorch

import (
pluginsConfig "github.com/flyteorg/flyteplugins/go/tasks/config"
"github.com/flyteorg/flytestdlib/config"
"time"
)

//go:generate pflags Config --default-var=defaultConfig

var (
defaultConfig = Config{
Timeout: config.Duration{Duration: 1 * time.Minute},
}
hamersaw marked this conversation as resolved.
Show resolved Hide resolved

configSection = pluginsConfig.MustRegisterSubSection("pytorch", &defaultConfig)
)

// Config is the configuration for the 'pytorch' plugin.
hamersaw marked this conversation as resolved.
Show resolved Hide resolved
type Config struct {
// Timeout bounds how long the kubeflow operator may take to update the status of the task.
// If no status update has happened once this duration has elapsed, the task will be considered failed.
Timeout config.Duration `json:"timeout,omitempty"`
}

// GetConfig returns the configuration currently held by the "pytorch" config
// section. The type assertion panics if the section holds anything other than
// a *Config.
func GetConfig() *Config {
	current := configSection.GetConfig()
	return current.(*Config)
}

// SetConfig hands cfg to the "pytorch" config section and returns whatever
// error the section reports.
func SetConfig(cfg *Config) error {
	err := configSection.SetConfig(cfg)
	return err
}
55 changes: 55 additions & 0 deletions go/tasks/plugins/k8s/kfoperators/pytorch/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

116 changes: 116 additions & 0 deletions go/tasks/plugins/k8s/kfoperators/pytorch/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ func (pytorchOperatorResourceHandler) GetTaskPhase(_ context.Context, pluginCont
return pluginsCore.PhaseInfoUndefined, err
}

if app.Status.StartTime == nil && app.CreationTimestamp.Add(GetConfig().Timeout.Duration).Before(time.Now()) {
return pluginsCore.PhaseInfoUndefined, fmt.Errorf("kubeflow operator hasn't updated the pytorch CR since creation time %v", app.CreationTimestamp)
}
currentCondition, err := common.ExtractCurrentCondition(app.Status.Conditions)
if err != nil {
return pluginsCore.PhaseInfoUndefined, err
Expand Down
Loading