Skip to content

Commit

Permalink
distinguish user error from system error (flyteorg#74)
Browse files Browse the repository at this point in the history
* distinguish user error from system error

* system retries

* plugin version

* fixing tests

* fixing versions

* omitempty

* Update pkg/controller/nodes/task/k8s/plugin_manager.go

Co-Authored-By: Haytham AbuelFutuh <[email protected]>

* Update pkg/apis/flyteworkflow/v1alpha1/node_status.go

Co-Authored-By: Haytham AbuelFutuh <[email protected]>

* system error

* fix increment count

* fix

* .

* Update pkg/controller/nodes/executor.go

Co-Authored-By: Haytham AbuelFutuh <[email protected]>

* adding a unit test

* master

* silly linter

* .

Co-authored-by: Haytham AbuelFutuh <[email protected]>
  • Loading branch information
surindersinghp and EngHabu authored Mar 5, 2020
1 parent 7143d60 commit 1824156
Show file tree
Hide file tree
Showing 13 changed files with 245 additions and 64 deletions.
3 changes: 1 addition & 2 deletions flytepropeller/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -361,8 +361,7 @@ github.com/lyft/flyteidl v0.17.6 h1:O0qpT6ya45e/92+E84uGOYa0ZsaFoE5ZfPoyJ6e1bEQ=
github.com/lyft/flyteidl v0.17.6/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.7 h1:tDzRjcUUepGCnR/8/VLdsD9toW28nJ969qzcrJe5O5c=
github.com/lyft/flyteidl v0.17.7/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteplugins v0.3.9 h1:IOxTQD2u4dPWKmSaLWZVlakyzMGPb6cP2G6/fmZMkkc=
github.com/lyft/flyteplugins v0.3.9/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA=
github.com/lyft/flyteplugins v0.3.10 h1:AwGv0TKl/RTOESY0+P5v9gyjO67LeHXHZqiMVaa+X4w=
github.com/lyft/flyteplugins v0.3.10/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA=
github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.2 h1:bY6Y+Fg6Jdc7zY4GAYuR7t2hjWwynIdmRvtLcRNaGnw=
Expand Down
2 changes: 2 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ type MutableNodeStatus interface {
SetParentTaskID(t *core.TaskExecutionIdentifier)
UpdatePhase(phase NodePhase, occurredAt metav1.Time, reason string)
IncrementAttempts() uint32
IncrementSystemFailures() uint32
SetCached()
ResetDirty()

Expand Down Expand Up @@ -254,6 +255,7 @@ type ExecutableNodeStatus interface {
GetOutputDir() DataReference
GetMessage() string
GetAttempts() uint32
GetSystemFailures() uint32
GetWorkflowNodeStatus() ExecutableWorkflowNodeStatus
GetTaskNodeStatus() ExecutableTaskNodeStatus

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ type NodeStatus struct {
DataDir DataReference `json:"-"`
OutputDir DataReference `json:"-"`
Attempts uint32 `json:"attempts"`
SystemFailures uint32 `json:"systemFailures,omitempty"`
Cached bool `json:"cached"`

// This is useful only for branch nodes. If this is set, then it can be used to determine if execution can proceed
Expand Down Expand Up @@ -289,6 +290,10 @@ func (in *NodeStatus) GetAttempts() uint32 {
return in.Attempts
}

// GetSystemFailures returns the current value of the SystemFailures counter
// stored on this NodeStatus.
func (in *NodeStatus) GetSystemFailures() uint32 {
return in.SystemFailures
}

func (in *NodeStatus) SetCached() {
in.Cached = true
in.SetDirty()
Expand All @@ -304,6 +309,12 @@ func (in *NodeStatus) IncrementAttempts() uint32 {
return in.Attempts
}

// IncrementSystemFailures adds one to the SystemFailures counter, marks the
// status as dirty via SetDirty, and returns the updated counter value.
func (in *NodeStatus) IncrementSystemFailures() uint32 {
in.SystemFailures++
// Flag the status as modified after changing the counter.
in.SetDirty()
return in.SystemFailures
}

func (in *NodeStatus) GetOrCreateDynamicNodeStatus() MutableDynamicNodeStatus {
if in.DynamicNodeStatus == nil {
in.SetDirty()
Expand Down Expand Up @@ -559,6 +570,10 @@ func (in *NodeStatus) Equals(other *NodeStatus) bool {
return false
}

if in.SystemFailures != other.SystemFailures {
return false
}

if in.Phase != other.Phase {
return false
}
Expand Down
19 changes: 14 additions & 5 deletions flytepropeller/pkg/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,13 @@ var (
RenewDeadline: config.Duration{Duration: time.Second * 10},
RetryPeriod: config.Duration{Duration: time.Second * 2},
},
DefaultDeadlines: DefaultDeadlines{
DefaultNodeExecutionDeadline: config.Duration{Duration: time.Hour * 48},
DefaultNodeActiveDeadline: config.Duration{Duration: time.Hour * 48},
DefaultWorkflowActiveDeadline: config.Duration{Duration: time.Hour * 72},
NodeConfig: NodeConfig{
DefaultDeadlines: DefaultDeadlines{
DefaultNodeExecutionDeadline: config.Duration{Duration: time.Hour * 48},
DefaultNodeActiveDeadline: config.Duration{Duration: time.Hour * 48},
DefaultWorkflowActiveDeadline: config.Duration{Duration: time.Hour * 72},
},
MaxNodeRetriesForSystemFailures: 3,
},
}
)
Expand All @@ -67,7 +70,7 @@ type Config struct {
PublishK8sEvents bool `json:"publish-k8s-events" pflag:",Enable events publishing to K8s events API."`
MaxDatasetSizeBytes int64 `json:"max-output-size-bytes" pflag:",Maximum size of outputs per task"`
KubeConfig KubeClientConfig `json:"kube-client-config" pflag:",Configuration to control the Kubernetes client"`
DefaultDeadlines DefaultDeadlines `json:"default-deadlines,omitempty" pflag:",Default value for timeouts"`
NodeConfig NodeConfig `json:"node-config,omitempty" pflag:",config for a workflow node"`
}

type KubeClientConfig struct {
Expand Down Expand Up @@ -115,6 +118,12 @@ type WorkqueueConfig struct {
Capacity int `json:"capacity" pflag:",Bucket capacity as number of items"`
}

// NodeConfig groups the configuration that applies to a workflow node: the
// default deadlines and the maximum number of retries allowed for failures
// caused by infrastructure (system) issues.
type NodeConfig struct {
// DefaultDeadlines holds the default timeout values.
DefaultDeadlines DefaultDeadlines `json:"default-deadlines,omitempty" pflag:",Default value for timeouts"`
// MaxNodeRetriesForSystemFailures is the maximum number of retries per node
// for node failures due to infra issues.
// NOTE(review): the pflag tag below leads with "2", while the package-level
// default config sets this field to 3 — presumably the tag's first element is
// the flag default; confirm which value is intended and whether the generated
// flags need to be regenerated.
MaxNodeRetriesForSystemFailures uint32 `json:"max-node-retries-system-failures" pflag:"2,Maximum number of retries per node for node failure due to infra issues"`
}

// Contains default values for timeouts
type DefaultDeadlines struct {
DefaultNodeExecutionDeadline config.Duration `json:"node-execution-deadline" pflag:",Default value of node execution timeout"`
Expand Down
6 changes: 3 additions & 3 deletions flytepropeller/pkg/controller/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 9 additions & 9 deletions flytepropeller/pkg/controller/config/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter
return nil, stdErrs.Wrapf(errors3.CausedByError, err, "failed to initialize workflow store")
}

nodeExecutor, err := nodes.NewExecutor(ctx, cfg.DefaultDeadlines, store, controller.enqueueWorkflowForNodeUpdates, eventSink, wfLauncher, cfg.MaxDatasetSizeBytes, kubeClient, catalogClient, scope)
nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink, wfLauncher, cfg.MaxDatasetSizeBytes, kubeClient, catalogClient, scope)
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Controller.")
}
Expand Down
Loading

0 comments on commit 1824156

Please sign in to comment.