Skip to content

Commit

Permalink
distinguish user error from system error (flyteorg#74)
Browse files Browse the repository at this point in the history
* distinguish user error from system error

* system retries

* plugin version

* fixing tests

* fixing versions

* omitempty

* Update pkg/controller/nodes/task/k8s/plugin_manager.go

Co-Authored-By: Haytham AbuelFutuh <[email protected]>

* Update pkg/apis/flyteworkflow/v1alpha1/node_status.go

Co-Authored-By: Haytham AbuelFutuh <[email protected]>

* system error

* fix increment count

* fix

* .

* Update pkg/controller/nodes/executor.go

Co-Authored-By: Haytham AbuelFutuh <[email protected]>

* adding a unit test

* master

* silly linter

* .

Co-authored-by: Haytham AbuelFutuh <[email protected]>
  • Loading branch information
surindersinghp and EngHabu committed Mar 5, 2020
1 parent f7f5495 commit 3a8ddf0
Show file tree
Hide file tree
Showing 13 changed files with 245 additions and 64 deletions.
3 changes: 1 addition & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -361,8 +361,7 @@ github.com/lyft/flyteidl v0.17.6 h1:O0qpT6ya45e/92+E84uGOYa0ZsaFoE5ZfPoyJ6e1bEQ=
github.com/lyft/flyteidl v0.17.6/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.7 h1:tDzRjcUUepGCnR/8/VLdsD9toW28nJ969qzcrJe5O5c=
github.com/lyft/flyteidl v0.17.7/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteplugins v0.3.9 h1:IOxTQD2u4dPWKmSaLWZVlakyzMGPb6cP2G6/fmZMkkc=
github.com/lyft/flyteplugins v0.3.9/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA=
github.com/lyft/flyteplugins v0.3.10 h1:AwGv0TKl/RTOESY0+P5v9gyjO67LeHXHZqiMVaa+X4w=
github.com/lyft/flyteplugins v0.3.10/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA=
github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.2 h1:bY6Y+Fg6Jdc7zY4GAYuR7t2hjWwynIdmRvtLcRNaGnw=
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ type MutableNodeStatus interface {
SetParentTaskID(t *core.TaskExecutionIdentifier)
UpdatePhase(phase NodePhase, occurredAt metav1.Time, reason string)
IncrementAttempts() uint32
IncrementSystemFailures() uint32
SetCached()
ResetDirty()

Expand Down Expand Up @@ -254,6 +255,7 @@ type ExecutableNodeStatus interface {
GetOutputDir() DataReference
GetMessage() string
GetAttempts() uint32
GetSystemFailures() uint32
GetWorkflowNodeStatus() ExecutableWorkflowNodeStatus
GetTaskNodeStatus() ExecutableTaskNodeStatus

Expand Down
64 changes: 64 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableNodeStatus.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/MutableNodeStatus.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ type NodeStatus struct {
DataDir DataReference `json:"-"`
OutputDir DataReference `json:"-"`
Attempts uint32 `json:"attempts"`
SystemFailures uint32 `json:"systemFailures,omitempty"`
Cached bool `json:"cached"`

// This is useful only for branch nodes. If this is set, then it can be used to determine if execution can proceed
Expand Down Expand Up @@ -289,6 +290,10 @@ func (in *NodeStatus) GetAttempts() uint32 {
return in.Attempts
}

// GetSystemFailures reports the value currently stored in the
// SystemFailures field of this node status.
func (in *NodeStatus) GetSystemFailures() uint32 {
	count := in.SystemFailures
	return count
}

func (in *NodeStatus) SetCached() {
in.Cached = true
in.SetDirty()
Expand All @@ -304,6 +309,12 @@ func (in *NodeStatus) IncrementAttempts() uint32 {
return in.Attempts
}

// IncrementSystemFailures adds one to the SystemFailures counter, marks the
// status via SetDirty, and returns the counter's value after the update.
func (in *NodeStatus) IncrementSystemFailures() uint32 {
	in.SystemFailures++

	// Flag the change before reporting the new count back to the caller.
	in.SetDirty()

	return in.GetSystemFailures()
}

func (in *NodeStatus) GetOrCreateDynamicNodeStatus() MutableDynamicNodeStatus {
if in.DynamicNodeStatus == nil {
in.SetDirty()
Expand Down Expand Up @@ -559,6 +570,10 @@ func (in *NodeStatus) Equals(other *NodeStatus) bool {
return false
}

if in.SystemFailures != other.SystemFailures {
return false
}

if in.Phase != other.Phase {
return false
}
Expand Down
19 changes: 14 additions & 5 deletions pkg/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,13 @@ var (
RenewDeadline: config.Duration{Duration: time.Second * 10},
RetryPeriod: config.Duration{Duration: time.Second * 2},
},
DefaultDeadlines: DefaultDeadlines{
DefaultNodeExecutionDeadline: config.Duration{Duration: time.Hour * 48},
DefaultNodeActiveDeadline: config.Duration{Duration: time.Hour * 48},
DefaultWorkflowActiveDeadline: config.Duration{Duration: time.Hour * 72},
NodeConfig: NodeConfig{
DefaultDeadlines: DefaultDeadlines{
DefaultNodeExecutionDeadline: config.Duration{Duration: time.Hour * 48},
DefaultNodeActiveDeadline: config.Duration{Duration: time.Hour * 48},
DefaultWorkflowActiveDeadline: config.Duration{Duration: time.Hour * 72},
},
MaxNodeRetriesForSystemFailures: 3,
},
}
)
Expand All @@ -67,7 +70,7 @@ type Config struct {
PublishK8sEvents bool `json:"publish-k8s-events" pflag:",Enable events publishing to K8s events API."`
MaxDatasetSizeBytes int64 `json:"max-output-size-bytes" pflag:",Maximum size of outputs per task"`
KubeConfig KubeClientConfig `json:"kube-client-config" pflag:",Configuration to control the Kubernetes client"`
DefaultDeadlines DefaultDeadlines `json:"default-deadlines,omitempty" pflag:",Default value for timeouts"`
NodeConfig NodeConfig `json:"node-config,omitempty" pflag:",config for a workflow node"`
}

type KubeClientConfig struct {
Expand Down Expand Up @@ -115,6 +118,12 @@ type WorkqueueConfig struct {
Capacity int `json:"capacity" pflag:",Bucket capacity as number of items"`
}

// NodeConfig groups the settings that apply to a workflow node: the default
// deadlines and the cap on retries after system (infrastructure) failures.
type NodeConfig struct {
// DefaultDeadlines holds the default timeout values.
DefaultDeadlines DefaultDeadlines `json:"default-deadlines,omitempty" pflag:",Default value for timeouts"`
// MaxNodeRetriesForSystemFailures is the maximum number of retries per node
// when the node fails because of infra issues.
// NOTE(review): the pflag tag starts with "2" while the package-level default
// config sets this field to 3 — presumably the tag prefix is the flag default;
// confirm which value is intended.
MaxNodeRetriesForSystemFailures uint32 `json:"max-node-retries-system-failures" pflag:"2,Maximum number of retries per node for node failure due to infra issues"`
}

// Contains default values for timeouts
type DefaultDeadlines struct {
DefaultNodeExecutionDeadline config.Duration `json:"node-execution-deadline" pflag:",Default value of node execution timeout"`
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 9 additions & 9 deletions pkg/controller/config/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter
return nil, stdErrs.Wrapf(errors3.CausedByError, err, "failed to initialize workflow store")
}

nodeExecutor, err := nodes.NewExecutor(ctx, cfg.DefaultDeadlines, store, controller.enqueueWorkflowForNodeUpdates, eventSink, wfLauncher, cfg.MaxDatasetSizeBytes, kubeClient, catalogClient, scope)
nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink, wfLauncher, cfg.MaxDatasetSizeBytes, kubeClient, catalogClient, scope)
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Controller.")
}
Expand Down
Loading

0 comments on commit 3a8ddf0

Please sign in to comment.