diff --git a/pkg/controller/nodes/dynamic/handler.go b/pkg/controller/nodes/dynamic/handler.go index 4da147c3bf..88ba212e76 100644 --- a/pkg/controller/nodes/dynamic/handler.go +++ b/pkg/controller/nodes/dynamic/handler.go @@ -191,26 +191,35 @@ func (d dynamicNodeTaskNodeHandler) Abort(ctx context.Context, nCtx handler.Node // This is a weird method. We should always finalize before we set the dynamic parent node phase as complete? func (d dynamicNodeTaskNodeHandler) Finalize(ctx context.Context, nCtx handler.NodeExecutionContext) error { - // We should always finalize the parent node success of failure. - // If we use the state to decide the finalize then we will never invoke the finalizer for the parent. - logger.Infof(ctx, "Finalizing Parent node") - if err := d.TaskNodeHandler.Finalize(ctx, nCtx); err != nil { - logger.Errorf(ctx, "Failed to finalize Dynamic Nodes Parent.") - return err - } + errs := make([]error, 0, 2) ds := nCtx.NodeStateReader().GetDynamicNodeState() if ds.Phase == v1alpha1.DynamicNodePhaseFailing || ds.Phase == v1alpha1.DynamicNodePhaseExecuting { logger.Infof(ctx, "Finalizing dynamic workflow") dynamicWF, isDynamic, err := d.buildContextualDynamicWorkflow(ctx, nCtx) if err != nil { - return err + errs = append(errs, err) + } else { + if isDynamic { + if err := d.nodeExecutor.FinalizeHandler(ctx, dynamicWF, dynamicWF.StartNode()); err != nil { + logger.Errorf(ctx, "failed to finalize dynamic workflow, err: %s", err) + errs = append(errs, err) + } + } } + } - if !isDynamic { - return nil - } - return d.nodeExecutor.FinalizeHandler(ctx, dynamicWF, dynamicWF.StartNode()) + // We should always finalize the parent node success or failure. + // If we use the phase to decide when to finalize in the case where Dynamic node is in phase Executing + // (i.e. child nodes are now being executed) and Finalize is invoked, we will never invoke the finalizer for the parent. 
+ logger.Infof(ctx, "Finalizing Parent node") + if err := d.TaskNodeHandler.Finalize(ctx, nCtx); err != nil { + logger.Errorf(ctx, "Failed to finalize Dynamic Nodes Parent.") + errs = append(errs, err) + } + + if len(errs) > 0 { + return errors.ErrorCollection{Errors: errs} } return nil diff --git a/pkg/controller/nodes/dynamic/handler_test.go b/pkg/controller/nodes/dynamic/handler_test.go index 934ca3cb5a..65251616f7 100644 --- a/pkg/controller/nodes/dynamic/handler_test.go +++ b/pkg/controller/nodes/dynamic/handler_test.go @@ -595,6 +595,46 @@ func TestDynamicNodeTaskNodeHandler_Finalize(t *testing.T) { assert.NotZero(t, len(n.ExpectedCalls)) assert.Equal(t, "FinalizeHandler", n.ExpectedCalls[0].Method) }) + + t.Run("dynamicnodephase-executing-parenterror", func(t *testing.T) { + + nCtx := createNodeContext("test", "x") + f, err := nCtx.DataStore().ConstructReference(context.TODO(), nCtx.NodeStatus().GetOutputDir(), "futures.pb") + assert.NoError(t, err) + dj := createDynamicJobSpec() + assert.NoError(t, nCtx.DataStore().WriteProtobuf(context.TODO(), f, storage.Options{}, dj)) + + h := &mocks.TaskNodeHandler{} + h.OnFinalize(ctx, nCtx).Return(fmt.Errorf("err")) + n := &executorMocks.Node{} + n.OnFinalizeHandlerMatch(ctx, mock.Anything, mock.Anything).Return(nil) + d := New(h, n, promutils.NewTestScope()) + assert.Error(t, d.Finalize(ctx, nCtx)) + assert.NotZero(t, len(h.ExpectedCalls)) + assert.Equal(t, "Finalize", h.ExpectedCalls[0].Method) + assert.NotZero(t, len(n.ExpectedCalls)) + assert.Equal(t, "FinalizeHandler", n.ExpectedCalls[0].Method) + }) + + t.Run("dynamicnodephase-executing-childerror", func(t *testing.T) { + + nCtx := createNodeContext("test", "x") + f, err := nCtx.DataStore().ConstructReference(context.TODO(), nCtx.NodeStatus().GetOutputDir(), "futures.pb") + assert.NoError(t, err) + dj := createDynamicJobSpec() + assert.NoError(t, nCtx.DataStore().WriteProtobuf(context.TODO(), f, storage.Options{}, dj)) + + h := &mocks.TaskNodeHandler{} + 
h.OnFinalize(ctx, nCtx).Return(nil) + n := &executorMocks.Node{} + n.OnFinalizeHandlerMatch(ctx, mock.Anything, mock.Anything).Return(fmt.Errorf("err")) + d := New(h, n, promutils.NewTestScope()) + assert.Error(t, d.Finalize(ctx, nCtx)) + assert.NotZero(t, len(h.ExpectedCalls)) + assert.Equal(t, "Finalize", h.ExpectedCalls[0].Method) + assert.NotZero(t, len(n.ExpectedCalls)) + assert.Equal(t, "FinalizeHandler", n.ExpectedCalls[0].Method) + }) } func init() { diff --git a/pkg/controller/nodes/executor.go b/pkg/controller/nodes/executor.go index 5714147832..721ac2e4ee 100644 --- a/pkg/controller/nodes/executor.go +++ b/pkg/controller/nodes/executor.go @@ -13,7 +13,6 @@ import ( "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/event" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" - "github.com/lyft/flytepropeller/pkg/controller/config" "github.com/lyft/flytestdlib/contextutils" "github.com/lyft/flytestdlib/logger" "github.com/lyft/flytestdlib/promutils" @@ -21,12 +20,15 @@ import ( "github.com/lyft/flytestdlib/storage" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/lyft/flytepropeller/pkg/controller/config" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" "github.com/lyft/flytepropeller/pkg/controller/executors" "github.com/lyft/flytepropeller/pkg/controller/nodes/errors" "github.com/lyft/flytepropeller/pkg/controller/nodes/handler" "github.com/lyft/flytepropeller/pkg/controller/nodes/subworkflow/launchplan" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) type nodeMetrics struct { @@ -179,9 +181,9 @@ func (c *nodeExecutor) execute(ctx context.Context, h handler.Node, nCtx *execCo return handler.PhaseInfoUndefined, err } - phase := t.Info().GetPhase() + phase := t.Info() // check for timeout for non-terminal phases - if !phase.IsTerminal() { + if !phase.GetPhase().IsTerminal() { activeDeadline := c.defaultActiveDeadline if 
nCtx.Node().GetActiveDeadline() != nil { activeDeadline = *nCtx.Node().GetActiveDeadline() @@ -198,11 +200,11 @@ func (c *nodeExecutor) execute(ctx context.Context, h handler.Node, nCtx *execCo } if c.isTimeoutExpired(nodeStatus.GetLastAttemptStartedAt(), executionDeadline) { logger.Errorf(ctx, "Current execution for the node timed out; timeout configured: %v", executionDeadline) - return handler.PhaseInfoRetryableFailure("TimeOut", "node execution timed out", nil), nil + phase = handler.PhaseInfoRetryableFailure("TimeoutExpired", fmt.Sprintf("task execution timeout [%s] expired", executionDeadline.String()), nil) } } - if t.Info().GetPhase() == handler.EPhaseRetryableFailure { + if phase.GetPhase() == handler.EPhaseRetryableFailure { maxAttempts := uint32(0) if nCtx.Node().GetRetryStrategy() != nil && nCtx.Node().GetRetryStrategy().MinAttempts != nil { maxAttempts = uint32(*nCtx.Node().GetRetryStrategy().MinAttempts) @@ -211,9 +213,9 @@ func (c *nodeExecutor) execute(ctx context.Context, h handler.Node, nCtx *execCo attempts := nodeStatus.GetAttempts() + 1 if attempts >= maxAttempts { return handler.PhaseInfoFailure( - fmt.Sprintf("RetriesExhausted|%s", t.Info().GetErr().Code), - fmt.Sprintf("[%d/%d] attempts done. Last Error: %s", attempts, maxAttempts, t.Info().GetErr().Message), - t.Info().GetInfo(), + fmt.Sprintf("RetriesExhausted|%s", phase.GetErr().Code), + fmt.Sprintf("[%d/%d] attempts done. 
Last Error: %s", attempts, maxAttempts, phase.GetErr().Message), + phase.GetInfo(), ), nil } @@ -221,7 +223,7 @@ func (c *nodeExecutor) execute(ctx context.Context, h handler.Node, nCtx *execCo nCtx.nsm.clearNodeStatus() } - return t.Info(), nil + return phase, nil } func (c *nodeExecutor) abort(ctx context.Context, h handler.Node, nCtx handler.NodeExecutionContext, reason string) error { @@ -355,10 +357,11 @@ func (c *nodeExecutor) handleNode(ctx context.Context, w v1alpha1.ExecutableWork } if currentPhase == v1alpha1.NodePhaseRetryableFailure { - logger.Debugf(ctx, "node failed with retryable failure, finalizing") - if err := c.finalize(ctx, h, nCtx); err != nil { + logger.Debugf(ctx, "node failed with retryable failure, aborting and finalizing, message: %s", nodeStatus.GetMessage()) + if err := c.abort(ctx, h, nCtx, nodeStatus.GetMessage()); err != nil { return executors.NodeStatusUndefined, err } + nodeStatus.IncrementAttempts() nodeStatus.UpdatePhase(v1alpha1.NodePhaseRunning, v1.Now(), "retrying") // We are going to retry in the next round, so we should clear all current state diff --git a/pkg/controller/nodes/executor_test.go b/pkg/controller/nodes/executor_test.go index 41812d86e6..eb118274eb 100644 --- a/pkg/controller/nodes/executor_test.go +++ b/pkg/controller/nodes/executor_test.go @@ -2,6 +2,7 @@ package nodes import ( "context" + "errors" "fmt" "reflect" "testing" @@ -26,8 +27,6 @@ import ( "github.com/lyft/flytestdlib/promutils" "github.com/stretchr/testify/assert" - "errors" - "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" "github.com/lyft/flytepropeller/pkg/controller/config" "github.com/lyft/flytepropeller/pkg/controller/executors" @@ -790,8 +789,10 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { ).Return(test.handlerReturn()) h.On("FinalizeRequired").Return(true) if test.currentNodePhase == v1alpha1.NodePhaseRetryableFailure { + h.OnAbortMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil) 
h.On("Finalize", mock.Anything, mock.Anything).Return(nil) } else { + h.OnAbortMatch(mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("error")) h.On("Finalize", mock.Anything, mock.Anything).Return(fmt.Errorf("error")) } hf.On("GetHandler", v1alpha1.NodeKindTask).Return(h, nil) @@ -1148,6 +1149,7 @@ func Test_nodeExecutor_timeout(t *testing.T) { expectedPhase handler.EPhase activeDeadline time.Duration executionDeadline time.Duration + retries int err error }{ { @@ -1164,6 +1166,16 @@ func Test_nodeExecutor_timeout(t *testing.T) { expectedPhase: handler.EPhaseRetryableFailure, activeDeadline: time.Second * 15, executionDeadline: time.Second * 5, + retries: 2, + err: nil, + }, + { + name: "retries-exhausted", + phaseInfo: handler.PhaseInfoRunning(nil), + expectedPhase: handler.EPhaseFailed, + activeDeadline: time.Second * 15, + executionDeadline: time.Second * 5, + retries: 1, err: nil, }, { @@ -1197,6 +1209,9 @@ func Test_nodeExecutor_timeout(t *testing.T) { queuedAtTime := &v1.Time{Time: queuedAt} ns.On("GetQueuedAt").Return(queuedAtTime) ns.On("GetLastAttemptStartedAt").Return(queuedAtTime) + ns.OnGetAttempts().Return(0) + ns.On("ClearLastAttemptStartedAt").Return() + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { c := &nodeExecutor{} @@ -1224,8 +1239,9 @@ func Test_nodeExecutor_timeout(t *testing.T) { mockNode.On("GetInputBindings").Return([]*v1alpha1.Binding{}) mockNode.On("GetActiveDeadline").Return(&tt.activeDeadline) mockNode.On("GetExecutionDeadline").Return(&tt.executionDeadline) + mockNode.OnGetRetryStrategy().Return(&v1alpha1.RetryStrategy{MinAttempts: &tt.retries}) - nCtx := &execContext{node: mockNode, nsm: &nodeStateManager{}} + nCtx := &execContext{node: mockNode, nsm: &nodeStateManager{nodeStatus: ns}} phaseInfo, err := c.execute(context.TODO(), h, nCtx, ns) if tt.err != nil { @@ -1233,7 +1249,7 @@ func Test_nodeExecutor_timeout(t *testing.T) { } else { assert.NoError(t, err) } - assert.Equal(t, tt.expectedPhase, 
phaseInfo.GetPhase()) + assert.Equal(t, tt.expectedPhase.String(), phaseInfo.GetPhase().String()) }) } } diff --git a/pkg/controller/nodes/transformers.go b/pkg/controller/nodes/transformers.go index cb81b6de54..f6878a2455 100644 --- a/pkg/controller/nodes/transformers.go +++ b/pkg/controller/nodes/transformers.go @@ -130,7 +130,7 @@ func ToError(executionError *core.ExecutionError, reason string) string { if reason != "" { return reason } - return "unknown error" + return "unknown" } func UpdateNodeStatus(np v1alpha1.NodePhase, p handler.PhaseInfo, n *nodeStateManager, s v1alpha1.ExecutableNodeStatus) {