Skip to content

Commit

Permalink
Use PluginCleanupPolicy from flyteorg/flyteplugins#203 (flyteorg#311)
Browse files Browse the repository at this point in the history
* use PluginCleanupPolicy if it exists

Signed-off-by: Claire McGinty <[email protected]>

* add unit test

Signed-off-by: Claire McGinty <[email protected]>

* cleanup test code

Signed-off-by: Claire McGinty <[email protected]>

* assert OnAbort is not attempted by default

Signed-off-by: Claire McGinty <[email protected]>

* update plugin override interface

Signed-off-by: Claire McGinty <[email protected]>

* lint

Signed-off-by: Claire McGinty <[email protected]>

* Apply PR suggestions

Signed-off-by: Claire McGinty <[email protected]>

* update flyteplugins lib

Signed-off-by: Claire McGinty <[email protected]>
  • Loading branch information
clairemcginty authored Sep 14, 2021
1 parent 8fc6638 commit a1d959b
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 5 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.10.0
github.com/flyteorg/flyteidl v0.21.0
github.com/flyteorg/flyteplugins v0.6.0
github.com/flyteorg/flyteplugins v0.6.1
github.com/flyteorg/flytestdlib v0.3.34
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis v6.15.7+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,8 @@ github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGE
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v0.21.0 h1:AwHNusfxJMfRRSDk2QWfb3aIlyLJrFWVGtpXCbCtJ5A=
github.com/flyteorg/flyteidl v0.21.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteplugins v0.6.0 h1:eoMmqJIw3K+J4JWokDcd4Y1YGLiicE6p5vEYhOUHZ4s=
github.com/flyteorg/flyteplugins v0.6.0/go.mod h1:rPzV/KS6h0BkgK0Z+CnO6JjY58tzUdYvDLMYS10IKG0=
github.com/flyteorg/flyteplugins v0.6.1 h1:Mq9uM/IN6fXHo03NlXSa+to2GHEom2NAcRWlr+bVH6g=
github.com/flyteorg/flyteplugins v0.6.1/go.mod h1:rPzV/KS6h0BkgK0Z+CnO6JjY58tzUdYvDLMYS10IKG0=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
github.com/flyteorg/flytestdlib v0.3.33/go.mod h1:7cDWkY3v7xsoesFcDdu6DSW5Q2U2W5KlHUbUHSwBG1Q=
github.com/flyteorg/flytestdlib v0.3.34 h1:OOuV03X8c1AWInzBU6IRsqpEF6y8WDJngbPcdL4VktY=
Expand Down
34 changes: 32 additions & 2 deletions pkg/controller/nodes/task/k8s/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,10 +340,40 @@ func (e PluginManager) Abort(ctx context.Context, tCtx pluginsCore.TaskExecution

e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig())

err = e.kubeClient.GetClient().Delete(ctx, o)
deleteResource := true
abortOverride, hasAbortOverride := e.plugin.(k8s.PluginAbortOverride)

resourceToFinalize := o
var behavior k8s.AbortBehavior

if hasAbortOverride {
behavior, err = abortOverride.OnAbort(ctx, tCtx, o)
deleteResource = err == nil && behavior.DeleteResource
if err == nil && behavior.Resource != nil {
resourceToFinalize = behavior.Resource
}
}

if err != nil {
} else if deleteResource {
err = e.kubeClient.GetClient().Delete(ctx, resourceToFinalize)
} else {
if behavior.Patch != nil && behavior.Update == nil {
err = e.kubeClient.GetClient().Patch(ctx, resourceToFinalize, behavior.Patch.Patch, behavior.Patch.Options...)
} else if behavior.Patch == nil && behavior.Update != nil {
err = e.kubeClient.GetClient().Update(ctx, resourceToFinalize, behavior.Update.Options...)
} else {
err = errors.Errorf(errors.RuntimeFailure, "AbortBehavior for resource %v must specify either a Patch and an Update operation if Delete is set to false. Only one can be supplied.", resourceToFinalize.GetName())
}
if behavior.DeleteOnErr && err != nil {
logger.Warningf(ctx, "Failed to apply AbortBehavior for resource %v with error %v. Will attempt to delete resource.", resourceToFinalize.GetName(), err)
err = e.kubeClient.GetClient().Delete(ctx, resourceToFinalize)
}
}

if err != nil && !IsK8sObjectNotExists(err) {
logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v/%v. Error: %v",
o.GetNamespace(), o.GetName(), err)
resourceToFinalize.GetNamespace(), resourceToFinalize.GetName(), err)
return err
}

Expand Down
136 changes: 136 additions & 0 deletions pkg/controller/nodes/task/k8s/plugin_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type extendedFakeClient struct {
CreateError error
GetError error
DeleteError error
PatchError error
UpdateError error
}

func (e extendedFakeClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error {
Expand All @@ -69,6 +71,22 @@ func (e extendedFakeClient) Delete(ctx context.Context, obj client.Object, opts
return e.Client.Delete(ctx, obj, opts...)
}

func (e extendedFakeClient) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error {
if e.PatchError != nil {
return e.PatchError
}

return e.Client.Patch(ctx, obj, patch, opts...)
}

func (e extendedFakeClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error {
if e.UpdateError != nil {
return e.UpdateError
}

return e.Client.Update(ctx, obj, opts...)
}

type k8sSampleHandler struct {
}

Expand All @@ -88,6 +106,32 @@ func (k8sSampleHandler) GetTaskPhase(ctx context.Context, pluginContext k8s.Plug
panic("implement me")
}

type pluginWithAbortOverride struct {
mock.Mock
}

func (p *pluginWithAbortOverride) GetProperties() k8s.PluginProperties {
return p.Called().Get(0).(k8s.PluginProperties)
}

func (p *pluginWithAbortOverride) BuildResource(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext) (client.Object, error) {
panic("implement me")
}

func (p *pluginWithAbortOverride) BuildIdentityResource(ctx context.Context, taskCtx pluginsCore.TaskExecutionMetadata) (client.Object, error) {
args := p.Called(ctx, taskCtx)
return args.Get(0).(client.Object), args.Error(1)
}

func (p *pluginWithAbortOverride) GetTaskPhase(ctx context.Context, pluginContext k8s.PluginContext, resource client.Object) (pluginsCore.PhaseInfo, error) {
panic("implement me")
}

func (p *pluginWithAbortOverride) OnAbort(ctx context.Context, tCtx pluginsCore.TaskExecutionContext, resource client.Object) (behavior k8s.AbortBehavior, err error) {
args := p.Called(ctx, tCtx, resource)
return args.Get(0).(k8s.AbortBehavior), args.Error(1)
}

func ExampleNewPluginManager() {
sCtx := &pluginsCoreMock.SetupContext{}
fakeKubeClient := mocks.NewFakeKubeClient()
Expand Down Expand Up @@ -203,6 +247,32 @@ func dummySetupContext(fakeClient client.Client) pluginsCore.SetupContext {
return setupContext
}

func buildPluginWithAbortOverride(ctx context.Context, tctx pluginsCore.TaskExecutionContext, abortBehavior k8s.AbortBehavior, client client.Client) (*PluginManager, error) {
pluginResource := &v1.Pod{}

mockResourceHandler := new(pluginWithAbortOverride)

mockResourceHandler.On(
"OnAbort", ctx, tctx, pluginResource,
).Return(abortBehavior, nil)

mockResourceHandler.On(
"BuildIdentityResource", ctx, tctx.TaskExecutionMetadata(),
).Return(pluginResource, nil)

mockResourceHandler.On("GetProperties").Return(k8s.PluginProperties{})

mockClient := extendedFakeClient{
Client: client,
}

return NewPluginManager(ctx, dummySetupContext(mockClient), k8s.PluginEntry{
ID: "x",
ResourceToWatch: pluginResource,
Plugin: mockResourceHandler,
}, NewResourceMonitorIndex())
}

func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) {
ctx := context.TODO()
/*var tmpl *core.TaskTemplate
Expand Down Expand Up @@ -419,6 +489,9 @@ func TestPluginManager_Abort(t *testing.T) {

err = pluginManager.Abort(ctx, tctx)
assert.NoError(t, err)

// no custom cleanup policy has been specified
mockResourceHandler.AssertNumberOfCalls(t, "OnAbort", 0)
})

t.Run("Abort Pod doesn't exist", func(t *testing.T) {
Expand All @@ -441,6 +514,69 @@ func TestPluginManager_Abort(t *testing.T) {
err = pluginManager.Abort(ctx, tctx)
assert.NoError(t, err)
})

t.Run("Abort Plugin has Patch PluginAbortOverride", func(t *testing.T) {
tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted)
expectedErr := errors.New("client-side patch error")
pluginManager, err := buildPluginWithAbortOverride(
ctx,
tctx,
k8s.AbortBehaviorPatchDefaultResource(k8s.PatchResourceOperation{
Patch: nil,
Options: nil,
}, false),
extendedFakeClient{
DeleteError: errors.New(
"kubeClient.Delete() should not be called if custom cleanup policy exists"),
PatchError: expectedErr,
})

assert.NotNil(t, res)
assert.NoError(t, err)

err = pluginManager.Abort(ctx, tctx)
assert.Equal(t, expectedErr, err)
})

t.Run("Abort Plugin has Update PluginAbortOverride", func(t *testing.T) {
tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted)
expectedErr := errors.New("client-side update error")
pluginManager, err := buildPluginWithAbortOverride(
ctx,
tctx,
k8s.AbortBehaviorUpdateDefaultResource(k8s.UpdateResourceOperation{
Options: nil,
}, false),
extendedFakeClient{
DeleteError: errors.New(
"kubeClient.Delete() should not be called if custom cleanup policy exists"),
UpdateError: expectedErr,
})

assert.NotNil(t, res)
assert.NoError(t, err)

err = pluginManager.Abort(ctx, tctx)
assert.Equal(t, expectedErr, err)
})

t.Run("Abort Plugin has Delete PluginAbortOverride", func(t *testing.T) {
tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted)
expectedErr := errors.New("client-side delete error")
pluginManager, err := buildPluginWithAbortOverride(
ctx,
tctx,
k8s.AbortBehaviorDeleteDefaultResource(),
extendedFakeClient{
DeleteError: expectedErr,
})

assert.NotNil(t, res)
assert.NoError(t, err)

err = pluginManager.Abort(ctx, tctx)
assert.Equal(t, expectedErr, err)
})
}

func TestPluginManager_Handle_CheckResourceStatus(t *testing.T) {
Expand Down

0 comments on commit a1d959b

Please sign in to comment.