BugFix: Timeout should cause the existing execution to abort and these should be finite (flyteorg#71)

Bug 1: An execution timeout currently does not abort the existing execution, so it keeps running in the background, potentially consuming resources.

Bug 2: On a task timeout, the UI shows the existing task as still running while a new execution begins.

Bug 3: Timeouts could cause infinite retries because they bypassed the retry logic.

Solution
Every timeout is treated as a retryable failure, and every retryable failure forces an abort to be invoked. Abort was already expected to be idempotent, so invoking it on an already-terminated execution should not cause side effects.
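
As a rough illustration of that flow (a minimal, self-contained sketch, not the propeller code itself; applyExecutionTimeout and the Phase constants are invented for this example), an expired execution deadline is simply reported as a retryable failure, which then takes the normal abort-and-retry path:

```go
package main

import (
	"fmt"
	"time"
)

// Phase is a simplified stand-in for the propeller's node/task phase.
type Phase int

const (
	PhaseRunning Phase = iota
	PhaseRetryableFailure
)

// applyExecutionTimeout maps an expired execution deadline onto a retryable
// failure instead of a bespoke terminal state, so the expiry flows through the
// same abort and retry-budget handling as any other failure.
func applyExecutionTimeout(current Phase, lastAttemptStartedAt time.Time, deadline time.Duration) Phase {
	if current == PhaseRunning && time.Since(lastAttemptStartedAt) > deadline {
		return PhaseRetryableFailure
	}
	return current
}

func main() {
	startedAt := time.Now().Add(-10 * time.Second) // the current attempt has been running for 10s
	phase := applyExecutionTimeout(PhaseRunning, startedAt, 5*time.Second)
	fmt.Println("timed out -> retryable failure:", phase == PhaseRetryableFailure) // true
}
```

Treating the timeout as just another retryable failure is what lets Bugs 1 and 3 collapse into the existing abort and retry-budget machinery.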

Improvements
Always finalize the child workflow, even if finalizing the parent fails.

Unit tests:
Updated existing unit tests and added new ones.
Ketan Umare authored Feb 20, 2020
1 parent b755063 commit 6e5cd97
Showing 5 changed files with 97 additions and 29 deletions.
33 changes: 21 additions & 12 deletions pkg/controller/nodes/dynamic/handler.go
@@ -191,26 +191,35 @@ func (d dynamicNodeTaskNodeHandler) Abort(ctx context.Context, nCtx handler.Node

// This is a weird method. We should always finalize before we set the dynamic parent node phase as complete?
func (d dynamicNodeTaskNodeHandler) Finalize(ctx context.Context, nCtx handler.NodeExecutionContext) error {
// We should always finalize the parent node success of failure.
// If we use the state to decide the finalize then we will never invoke the finalizer for the parent.
logger.Infof(ctx, "Finalizing Parent node")
if err := d.TaskNodeHandler.Finalize(ctx, nCtx); err != nil {
logger.Errorf(ctx, "Failed to finalize Dynamic Nodes Parent.")
return err
}
errs := make([]error, 0, 2)

ds := nCtx.NodeStateReader().GetDynamicNodeState()
if ds.Phase == v1alpha1.DynamicNodePhaseFailing || ds.Phase == v1alpha1.DynamicNodePhaseExecuting {
logger.Infof(ctx, "Finalizing dynamic workflow")
dynamicWF, isDynamic, err := d.buildContextualDynamicWorkflow(ctx, nCtx)
if err != nil {
return err
errs = append(errs, err)
} else {
if isDynamic {
if err := d.nodeExecutor.FinalizeHandler(ctx, dynamicWF, dynamicWF.StartNode()); err != nil {
logger.Errorf(ctx, "failed to finalize dynamic workflow, err: %s", err)
errs = append(errs, err)
}
}
}
}

if !isDynamic {
return nil
}
return d.nodeExecutor.FinalizeHandler(ctx, dynamicWF, dynamicWF.StartNode())
// We should always finalize the parent node success or failure.
// If we use the phase to decide when to finalize in the case where Dynamic node is in phase Executiing
// (i.e. child nodes are now being executed) and Finalize is invoked, we will never invoke the finalizer for the parent.
logger.Infof(ctx, "Finalizing Parent node")
if err := d.TaskNodeHandler.Finalize(ctx, nCtx); err != nil {
logger.Errorf(ctx, "Failed to finalize Dynamic Nodes Parent.")
errs = append(errs, err)
}

if len(errs) > 0 {
return errors.ErrorCollection{Errors: errs}
}

return nil
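
The reworked Finalize above handles the child workflow first and still finalizes the parent even when the child step fails, returning all collected errors at the end. A standalone sketch of that pattern (the actual change returns the propeller's errors.ErrorCollection; this example uses the standard library's errors.Join, available since Go 1.20, as a stand-in):

```go
package main

import (
	"errors"
	"fmt"
)

// finalizeAll runs every finalizer even if an earlier one fails and reports
// all failures together, so a failing child can no longer skip the parent.
func finalizeAll(finalizers ...func() error) error {
	var errs []error
	for _, f := range finalizers {
		if err := f(); err != nil {
			errs = append(errs, err)
		}
	}
	return errors.Join(errs...) // nil when no finalizer failed
}

func main() {
	finalizeChild := func() error { return fmt.Errorf("child finalize failed") }
	finalizeParent := func() error { fmt.Println("parent finalized"); return nil }

	// The parent finalizer still runs even though the child finalizer errored.
	fmt.Println("combined error:", finalizeAll(finalizeChild, finalizeParent))
}
```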
40 changes: 40 additions & 0 deletions pkg/controller/nodes/dynamic/handler_test.go
@@ -595,6 +595,46 @@ func TestDynamicNodeTaskNodeHandler_Finalize(t *testing.T) {
assert.NotZero(t, len(n.ExpectedCalls))
assert.Equal(t, "FinalizeHandler", n.ExpectedCalls[0].Method)
})

t.Run("dynamicnodephase-executing-parenterror", func(t *testing.T) {

nCtx := createNodeContext("test", "x")
f, err := nCtx.DataStore().ConstructReference(context.TODO(), nCtx.NodeStatus().GetOutputDir(), "futures.pb")
assert.NoError(t, err)
dj := createDynamicJobSpec()
assert.NoError(t, nCtx.DataStore().WriteProtobuf(context.TODO(), f, storage.Options{}, dj))

h := &mocks.TaskNodeHandler{}
h.OnFinalize(ctx, nCtx).Return(fmt.Errorf("err"))
n := &executorMocks.Node{}
n.OnFinalizeHandlerMatch(ctx, mock.Anything, mock.Anything).Return(nil)
d := New(h, n, promutils.NewTestScope())
assert.Error(t, d.Finalize(ctx, nCtx))
assert.NotZero(t, len(h.ExpectedCalls))
assert.Equal(t, "Finalize", h.ExpectedCalls[0].Method)
assert.NotZero(t, len(n.ExpectedCalls))
assert.Equal(t, "FinalizeHandler", n.ExpectedCalls[0].Method)
})

t.Run("dynamicnodephase-executing-childerror", func(t *testing.T) {

nCtx := createNodeContext("test", "x")
f, err := nCtx.DataStore().ConstructReference(context.TODO(), nCtx.NodeStatus().GetOutputDir(), "futures.pb")
assert.NoError(t, err)
dj := createDynamicJobSpec()
assert.NoError(t, nCtx.DataStore().WriteProtobuf(context.TODO(), f, storage.Options{}, dj))

h := &mocks.TaskNodeHandler{}
h.OnFinalize(ctx, nCtx).Return(nil)
n := &executorMocks.Node{}
n.OnFinalizeHandlerMatch(ctx, mock.Anything, mock.Anything).Return(fmt.Errorf("err"))
d := New(h, n, promutils.NewTestScope())
assert.Error(t, d.Finalize(ctx, nCtx))
assert.NotZero(t, len(h.ExpectedCalls))
assert.Equal(t, "Finalize", h.ExpectedCalls[0].Method)
assert.NotZero(t, len(n.ExpectedCalls))
assert.Equal(t, "FinalizeHandler", n.ExpectedCalls[0].Method)
})
}

func init() {
27 changes: 15 additions & 12 deletions pkg/controller/nodes/executor.go
@@ -13,20 +13,22 @@ import (
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/event"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog"
"github.com/lyft/flytepropeller/pkg/controller/config"
"github.com/lyft/flytestdlib/contextutils"
"github.com/lyft/flytestdlib/logger"
"github.com/lyft/flytestdlib/promutils"
"github.com/lyft/flytestdlib/promutils/labeled"
"github.com/lyft/flytestdlib/storage"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/lyft/flytepropeller/pkg/controller/config"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/lyft/flytepropeller/pkg/controller/executors"
"github.com/lyft/flytepropeller/pkg/controller/nodes/errors"
"github.com/lyft/flytepropeller/pkg/controller/nodes/handler"
"github.com/lyft/flytepropeller/pkg/controller/nodes/subworkflow/launchplan"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type nodeMetrics struct {
@@ -179,9 +181,9 @@ func (c *nodeExecutor) execute(ctx context.Context, h handler.Node, nCtx *execCo
return handler.PhaseInfoUndefined, err
}

phase := t.Info().GetPhase()
phase := t.Info()
// check for timeout for non-terminal phases
if !phase.IsTerminal() {
if !phase.GetPhase().IsTerminal() {
activeDeadline := c.defaultActiveDeadline
if nCtx.Node().GetActiveDeadline() != nil {
activeDeadline = *nCtx.Node().GetActiveDeadline()
@@ -198,11 +200,11 @@ }
}
if c.isTimeoutExpired(nodeStatus.GetLastAttemptStartedAt(), executionDeadline) {
logger.Errorf(ctx, "Current execution for the node timed out; timeout configured: %v", executionDeadline)
return handler.PhaseInfoRetryableFailure("TimeOut", "node execution timed out", nil), nil
phase = handler.PhaseInfoRetryableFailure("TimeoutExpired", fmt.Sprintf("task execution timeout [%s] expired", executionDeadline.String()), nil)
}
}

if t.Info().GetPhase() == handler.EPhaseRetryableFailure {
if phase.GetPhase() == handler.EPhaseRetryableFailure {
maxAttempts := uint32(0)
if nCtx.Node().GetRetryStrategy() != nil && nCtx.Node().GetRetryStrategy().MinAttempts != nil {
maxAttempts = uint32(*nCtx.Node().GetRetryStrategy().MinAttempts)
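
For reference, a sketch of the deadline arithmetic the hunks above rely on (illustrative only; the executor's actual isTimeoutExpired may differ in detail): the active deadline is measured from when the node was queued, the execution deadline from when the current attempt started, and an expired execution deadline now becomes a retryable failure rather than a hard return.

```go
package main

import (
	"fmt"
	"time"
)

// isTimeoutExpired reports whether a deadline measured from a reference
// timestamp has passed. A zero reference means "not started", so no expiry.
func isTimeoutExpired(reference time.Time, deadline time.Duration) bool {
	return !reference.IsZero() && time.Now().After(reference.Add(deadline))
}

func main() {
	queuedAt := time.Now().Add(-30 * time.Second)             // node queued 30s ago
	lastAttemptStartedAt := time.Now().Add(-10 * time.Second) // current attempt started 10s ago

	activeDeadline := 15 * time.Second   // bounds the total time since the node was queued
	executionDeadline := 5 * time.Second // bounds a single attempt's running time

	fmt.Println("active deadline expired:", isTimeoutExpired(queuedAt, activeDeadline))                   // true
	fmt.Println("execution deadline expired:", isTimeoutExpired(lastAttemptStartedAt, executionDeadline)) // true -> retryable failure
}
```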
@@ -211,17 +213,17 @@ func (c *nodeExecutor) execute(ctx context.Context, h handler.Node, nCtx *execCo
attempts := nodeStatus.GetAttempts() + 1
if attempts >= maxAttempts {
return handler.PhaseInfoFailure(
fmt.Sprintf("RetriesExhausted|%s", t.Info().GetErr().Code),
fmt.Sprintf("[%d/%d] attempts done. Last Error: %s", attempts, maxAttempts, t.Info().GetErr().Message),
t.Info().GetInfo(),
fmt.Sprintf("RetriesExhausted|%s", phase.GetErr().Code),
fmt.Sprintf("[%d/%d] attempts done. Last Error: %s", attempts, maxAttempts, phase.GetErr().Message),
phase.GetInfo(),
), nil
}

// Retrying to clearing all status
nCtx.nsm.clearNodeStatus()
}

return t.Info(), nil
return phase, nil
}

func (c *nodeExecutor) abort(ctx context.Context, h handler.Node, nCtx handler.NodeExecutionContext, reason string) error {
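
Because the timeout is now surfaced as a retryable failure, it is counted against the node's MinAttempts budget like any other failure, which is what makes retries finite. A worked example of the budget check above (names are illustrative, not the propeller API):

```go
package main

import "fmt"

// retriesExhausted mirrors the check above: the attempt counter starts at 0,
// so with MinAttempts = 3 the first two failures may still retry and the
// third one exhausts the budget.
func retriesExhausted(attemptsSoFar, minAttempts uint32) bool {
	return attemptsSoFar+1 >= minAttempts
}

func main() {
	const minAttempts = uint32(3)
	for attempts := uint32(0); attempts < minAttempts; attempts++ {
		fmt.Printf("attempt counter %d -> retries exhausted: %v\n", attempts, retriesExhausted(attempts, minAttempts))
	}
	// attempt counter 0 -> retries exhausted: false
	// attempt counter 1 -> retries exhausted: false
	// attempt counter 2 -> retries exhausted: true
}
```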
@@ -355,10 +357,11 @@ func (c *nodeExecutor) handleNode(ctx context.Context, w v1alpha1.ExecutableWork
}

if currentPhase == v1alpha1.NodePhaseRetryableFailure {
logger.Debugf(ctx, "node failed with retryable failure, finalizing")
if err := c.finalize(ctx, h, nCtx); err != nil {
logger.Debugf(ctx, "node failed with retryable failure, aborting and finalizing, message: %s", nodeStatus.GetMessage())
if err := c.abort(ctx, h, nCtx, nodeStatus.GetMessage()); err != nil {
return executors.NodeStatusUndefined, err
}

nodeStatus.IncrementAttempts()
nodeStatus.UpdatePhase(v1alpha1.NodePhaseRunning, v1.Now(), "retrying")
// We are going to retry in the next round, so we should clear all current state
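
The handleNode change above is the core fix for Bug 1: when a node lands in NodePhaseRetryableFailure, the still-running attempt is aborted (with the failure message as the reason) before the attempt counter is bumped and the node is put back into Running. A toy sketch of that ordering (types and names are invented for the example; abort is assumed idempotent, as noted in the commit message):

```go
package main

import "fmt"

// toyNodeStatus is a stand-in for the real node status object.
type toyNodeStatus struct {
	attempts uint32
	phase    string
	message  string
}

// retryAfterFailure aborts the running attempt first, and only on success
// consumes an attempt and resets the node so the next round can retry it.
func retryAfterFailure(ns *toyNodeStatus, abort func(reason string) error) error {
	if err := abort(ns.message); err != nil {
		return err // surface the abort error; the retry happens on a later round
	}
	ns.attempts++
	ns.phase = "Running"
	ns.message = ""
	return nil
}

func main() {
	ns := &toyNodeStatus{phase: "RetryableFailure", message: "task execution timeout [5s] expired"}
	abort := func(reason string) error {
		fmt.Println("aborting running attempt, reason:", reason)
		return nil
	}
	if err := retryAfterFailure(ns, abort); err != nil {
		fmt.Println("abort failed:", err)
		return
	}
	fmt.Printf("attempts=%d phase=%s\n", ns.attempts, ns.phase) // attempts=1 phase=Running
}
```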
24 changes: 20 additions & 4 deletions pkg/controller/nodes/executor_test.go
@@ -2,6 +2,7 @@ package nodes

import (
"context"
"errors"
"fmt"
"reflect"
"testing"
@@ -26,8 +27,6 @@ import (
"github.com/lyft/flytestdlib/promutils"
"github.com/stretchr/testify/assert"

"errors"

"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/lyft/flytepropeller/pkg/controller/config"
"github.com/lyft/flytepropeller/pkg/controller/executors"
@@ -790,8 +789,10 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) {
).Return(test.handlerReturn())
h.On("FinalizeRequired").Return(true)
if test.currentNodePhase == v1alpha1.NodePhaseRetryableFailure {
h.OnAbortMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil)
h.On("Finalize", mock.Anything, mock.Anything).Return(nil)
} else {
h.OnAbortMatch(mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("error"))
h.On("Finalize", mock.Anything, mock.Anything).Return(fmt.Errorf("error"))
}
hf.On("GetHandler", v1alpha1.NodeKindTask).Return(h, nil)
@@ -1148,6 +1149,7 @@ func Test_nodeExecutor_timeout(t *testing.T) {
expectedPhase handler.EPhase
activeDeadline time.Duration
executionDeadline time.Duration
retries int
err error
}{
{
@@ -1164,6 +1166,16 @@
expectedPhase: handler.EPhaseRetryableFailure,
activeDeadline: time.Second * 15,
executionDeadline: time.Second * 5,
retries: 2,
err: nil,
},
{
name: "retries-exhausted",
phaseInfo: handler.PhaseInfoRunning(nil),
expectedPhase: handler.EPhaseFailed,
activeDeadline: time.Second * 15,
executionDeadline: time.Second * 5,
retries: 1,
err: nil,
},
{
Expand Down Expand Up @@ -1197,6 +1209,9 @@ func Test_nodeExecutor_timeout(t *testing.T) {
queuedAtTime := &v1.Time{Time: queuedAt}
ns.On("GetQueuedAt").Return(queuedAtTime)
ns.On("GetLastAttemptStartedAt").Return(queuedAtTime)
ns.OnGetAttempts().Return(0)
ns.On("ClearLastAttemptStartedAt").Return()

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &nodeExecutor{}
@@ -1224,16 +1239,17 @@ func Test_nodeExecutor_timeout(t *testing.T) {
mockNode.On("GetInputBindings").Return([]*v1alpha1.Binding{})
mockNode.On("GetActiveDeadline").Return(&tt.activeDeadline)
mockNode.On("GetExecutionDeadline").Return(&tt.executionDeadline)
mockNode.OnGetRetryStrategy().Return(&v1alpha1.RetryStrategy{MinAttempts: &tt.retries})

nCtx := &execContext{node: mockNode, nsm: &nodeStateManager{}}
nCtx := &execContext{node: mockNode, nsm: &nodeStateManager{nodeStatus: ns}}
phaseInfo, err := c.execute(context.TODO(), h, nCtx, ns)

if tt.err != nil {
assert.EqualError(t, err, tt.err.Error())
} else {
assert.NoError(t, err)
}
assert.Equal(t, tt.expectedPhase, phaseInfo.GetPhase())
assert.Equal(t, tt.expectedPhase.String(), phaseInfo.GetPhase().String())
})
}
}
2 changes: 1 addition & 1 deletion pkg/controller/nodes/transformers.go
@@ -130,7 +130,7 @@ func ToError(executionError *core.ExecutionError, reason string) string {
if reason != "" {
return reason
}
return "unknown error"
return "unknown"
}

func UpdateNodeStatus(np v1alpha1.NodePhase, p handler.PhaseInfo, n *nodeStateManager, s v1alpha1.ExecutableNodeStatus) {
