Skip to content

Commit

Permalink
Add execution configuration as a matchable resource (flyteorg#204)
Browse files Browse the repository at this point in the history
Signed-off-by: Katrina Rogan <[email protected]>
  • Loading branch information
Katrina Rogan authored Jun 4, 2021
1 parent 86bac97 commit 16a9b13
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 20 deletions.
2 changes: 1 addition & 1 deletion flyteadmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/evanphx/json-patch v4.9.0+incompatible
github.com/flyteorg/flyteidl v0.18.50
github.com/flyteorg/flyteidl v0.18.54
github.com/flyteorg/flyteplugins v0.5.49
github.com/flyteorg/flytepropeller v0.10.16
github.com/flyteorg/flytestdlib v0.3.22
Expand Down
5 changes: 5 additions & 0 deletions flyteadmin/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
Expand Down Expand Up @@ -306,6 +307,8 @@ github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSw
github.com/flyteorg/flyteidl v0.18.48/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.18.50 h1:L1fMj6QEXoKin+cPQn9sfwJ1x14tlChdz1mG1WaaIW4=
github.com/flyteorg/flyteidl v0.18.50/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.18.54 h1:OWgA9LUqH7rTLd8DywFzfm+LkTtLzrPztP5JaO/4hpM=
github.com/flyteorg/flyteidl v0.18.54/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteplugins v0.5.49 h1:jqmNrsTQ2+m+vYKqDVNO3CYy9q3XYTms3XzOr3roAT0=
github.com/flyteorg/flyteplugins v0.5.49/go.mod h1:567WIA0Rr6QjmXsqvGsU+Cyb57Ia6qzddxIw//RPwYk=
github.com/flyteorg/flytepropeller v0.10.16 h1:WSVh0X0F9xSw1BxGKbHqu0oha37YaBmO3bOS7GjR3Qo=
Expand Down Expand Up @@ -1392,6 +1395,7 @@ go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
Expand Down Expand Up @@ -2030,6 +2034,7 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4 h1:UoveltGrhghAA7ePc+e+QYDHXrBps2PqFZiHkGR/xK8=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
howett.net/plist v0.0.0-20181124034731-591f970eefbb/go.mod h1:vMygbs4qMhSZSc4lCUl2OEE+rDiIIJAIdR4m7MiMcm0=
k8s.io/api v0.0.0-20210217171935-8e2decd92398/go.mod h1:60tmSUpHxGPFerNHbo/ayI2lKxvtrhbxFyXuEIWJd78=
Expand Down
54 changes: 41 additions & 13 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,24 @@ func (m *ExecutionManager) getInheritedExecMetadata(ctx context.Context, request
return parentNodeExecutionID, sourceExecutionID, nil
}

func (m *ExecutionManager) getExecutionConfig(ctx context.Context, request *admin.ExecutionCreateRequest) (*admin.WorkflowExecutionConfig, error) {
resource, err := m.resourceManager.GetResource(ctx, interfaces.ResourceRequest{
Project: request.Project,
Domain: request.Domain,
ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG,
})
if err != nil {
if flyteAdminError, ok := err.(errors.FlyteAdminError); !ok || flyteAdminError.Code() != codes.NotFound {
logger.Errorf(ctx, "Failed to get workflow execution config overrides with error: %v", err)
return nil, err
}
}
if resource != nil && resource.Attributes.GetWorkflowExecutionConfig() != nil {
return resource.Attributes.GetWorkflowExecutionConfig(), nil
}
return nil, nil
}

func (m *ExecutionManager) launchSingleTaskExecution(
ctx context.Context, request admin.ExecutionCreateRequest, requestedAt time.Time) (
context.Context, *models.Execution, error) {
Expand Down Expand Up @@ -494,14 +512,19 @@ func (m *ExecutionManager) launchSingleTaskExecution(
logger.Errorf(ctx, "Failed to get quality of service for [%+v] with error: %v", workflowExecutionID, err)
return nil, nil, err
}
executionConfig, err := m.getExecutionConfig(ctx, &request)
if err != nil {
return nil, nil, err
}
executeTaskInputs := workflowengineInterfaces.ExecuteTaskInput{
ExecutionID: &workflowExecutionID,
WfClosure: *workflow.Closure.CompiledWorkflow,
Inputs: request.Inputs,
ReferenceName: taskIdentifier.Name,
AcceptedAt: requestedAt,
Auth: requestSpec.AuthRole,
QueueingBudget: qualityOfService.QueuingBudget,
ExecutionID: &workflowExecutionID,
WfClosure: *workflow.Closure.CompiledWorkflow,
Inputs: request.Inputs,
ReferenceName: taskIdentifier.Name,
AcceptedAt: requestedAt,
Auth: requestSpec.AuthRole,
QueueingBudget: qualityOfService.QueuingBudget,
ExecutionConfig: executionConfig,
}
if requestSpec.Labels != nil {
executeTaskInputs.Labels = requestSpec.Labels.Values
Expand Down Expand Up @@ -661,15 +684,20 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
logger.Errorf(ctx, "Failed to get quality of service for [%+v] with error: %v", workflowExecutionID, err)
return nil, nil, err
}
executionConfig, err := m.getExecutionConfig(ctx, &request)
if err != nil {
return nil, nil, err
}

// TODO: Reduce CRD size and use offloaded input URI to blob store instead.
executeWorkflowInputs := workflowengineInterfaces.ExecuteWorkflowInput{
ExecutionID: &workflowExecutionID,
WfClosure: *workflow.Closure.CompiledWorkflow,
Inputs: executionInputs,
Reference: *launchPlan,
AcceptedAt: requestedAt,
QueueingBudget: qualityOfService.QueuingBudget,
ExecutionID: &workflowExecutionID,
WfClosure: *workflow.Closure.CompiledWorkflow,
Inputs: executionInputs,
Reference: *launchPlan,
AcceptedAt: requestedAt,
QueueingBudget: qualityOfService.QueuingBudget,
ExecutionConfig: executionConfig,
}
err = m.addLabelsAndAnnotations(request.Spec, &executeWorkflowInputs)
if err != nil {
Expand Down
34 changes: 34 additions & 0 deletions flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"errors"
"testing"

managerInterfaces "github.com/flyteorg/flyteadmin/pkg/manager/interfaces"
managerMocks "github.com/flyteorg/flyteadmin/pkg/manager/mocks"

"github.com/flyteorg/flyteidl/clients/go/coreutils"

"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -2902,3 +2905,34 @@ func TestCreateSingleTaskExecution(t *testing.T) {

assert.NoError(t, err)
}

func TestGetExecutionConfig(t *testing.T) {
resourceManager := managerMocks.MockResourceManager{}
resourceManager.GetResourceFunc = func(ctx context.Context,
request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) {
assert.EqualValues(t, request, managerInterfaces.ResourceRequest{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG,
})
return &managerInterfaces.ResourceResponse{
Attributes: &admin.MatchingAttributes{
Target: &admin.MatchingAttributes_WorkflowExecutionConfig{
WorkflowExecutionConfig: &admin.WorkflowExecutionConfig{
MaxParallelism: 100,
},
},
},
}, nil
}

executionManager := ExecutionManager{
resourceManager: &resourceManager,
}
execConfig, err := executionManager.getExecutionConfig(context.TODO(), &admin.ExecutionCreateRequest{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
})
assert.NoError(t, err)
assert.Equal(t, execConfig.MaxParallelism, int32(100))
}
13 changes: 7 additions & 6 deletions flyteadmin/pkg/workflowengine/impl/propeller_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,20 +93,21 @@ func (c *FlytePropeller) addPermissions(launchPlan admin.LaunchPlan, flyteWf *v1
}
}

func addExecutionOverrides(taskPluginOverrides []*admin.PluginOverride, flyteWf *v1alpha1.FlyteWorkflow) {
func addExecutionOverrides(taskPluginOverrides []*admin.PluginOverride,
workflowExecutionConfig *admin.WorkflowExecutionConfig, flyteWf *v1alpha1.FlyteWorkflow) {
executionConfig := v1alpha1.ExecutionConfig{
TaskPluginImpls: make(map[string]v1alpha1.TaskPluginOverride),
}
if len(taskPluginOverrides) == 0 {
return
}
for _, override := range taskPluginOverrides {
executionConfig.TaskPluginImpls[override.TaskType] = v1alpha1.TaskPluginOverride{
PluginIDs: override.PluginId,
MissingPluginBehavior: override.MissingPluginBehavior,
}

}
if workflowExecutionConfig != nil {
executionConfig.MaxParallelism = uint32(workflowExecutionConfig.MaxParallelism)
}
flyteWf.ExecutionConfig = executionConfig
}

Expand Down Expand Up @@ -144,7 +145,7 @@ func (c *FlytePropeller) ExecuteWorkflow(ctx context.Context, input interfaces.E
flyteWf.WorkflowMeta = &v1alpha1.WorkflowMeta{}
}
flyteWf.WorkflowMeta.EventVersion = c.eventVersion
addExecutionOverrides(input.TaskPluginOverrides, flyteWf)
addExecutionOverrides(input.TaskPluginOverrides, input.ExecutionConfig, flyteWf)

if input.Reference.Spec.RawOutputDataConfig != nil {
flyteWf.RawOutputDataConfig = v1alpha1.RawOutputDataConfig{
Expand Down Expand Up @@ -227,7 +228,7 @@ func (c *FlytePropeller) ExecuteTask(ctx context.Context, input interfaces.Execu
flyteWf.Labels = labels
annotations := addMapValues(input.Annotations, flyteWf.Annotations)
flyteWf.Annotations = annotations
addExecutionOverrides(input.TaskPluginOverrides, flyteWf)
addExecutionOverrides(input.TaskPluginOverrides, input.ExecutionConfig, flyteWf)

/*
TODO(katrogan): uncomment once propeller has updated the flyte workflow CRD.
Expand Down
28 changes: 28 additions & 0 deletions flyteadmin/pkg/workflowengine/impl/propeller_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,3 +468,31 @@ func TestAddPermissions(t *testing.T) {
roleNameKey: "rollie-pollie",
})
}

func TestAddExecutionOverrides(t *testing.T) {
t.Run("task plugin overrides", func(t *testing.T) {
overrides := []*admin.PluginOverride{
{
TaskType: "taskType1",
PluginId: []string{"Plugin1", "Plugin2"},
MissingPluginBehavior: admin.PluginOverride_USE_DEFAULT,
},
}
workflow := &v1alpha1.FlyteWorkflow{}
addExecutionOverrides(overrides, nil, workflow)
assert.EqualValues(t, workflow.ExecutionConfig.TaskPluginImpls, map[string]v1alpha1.TaskPluginOverride{
"taskType1": {
PluginIDs: []string{"Plugin1", "Plugin2"},
MissingPluginBehavior: admin.PluginOverride_USE_DEFAULT,
},
})
})
t.Run("max parallelism", func(t *testing.T) {
workflowExecutionConfig := &admin.WorkflowExecutionConfig{
MaxParallelism: 100,
}
workflow := &v1alpha1.FlyteWorkflow{}
addExecutionOverrides(nil, workflowExecutionConfig, workflow)
assert.EqualValues(t, workflow.ExecutionConfig.MaxParallelism, uint32(100))
})
}
2 changes: 2 additions & 0 deletions flyteadmin/pkg/workflowengine/interfaces/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type ExecuteWorkflowInput struct {
Annotations map[string]string
QueueingBudget time.Duration
TaskPluginOverrides []*admin.PluginOverride
ExecutionConfig *admin.WorkflowExecutionConfig
}

type ExecuteTaskInput struct {
Expand All @@ -32,6 +33,7 @@ type ExecuteTaskInput struct {
Annotations map[string]string
QueueingBudget time.Duration
TaskPluginOverrides []*admin.PluginOverride
ExecutionConfig *admin.WorkflowExecutionConfig
}

type TerminateWorkflowInput struct {
Expand Down

0 comments on commit 16a9b13

Please sign in to comment.