Skip to content

Commit

Permalink
Introduce WebAPI Plugin Interface, an example & AWS athena plugin (fl…
Browse files Browse the repository at this point in the history
…yteorg#146)

* Proposal: Common plugin interface for all service plugins
* Add remote plugin interface and implemention
* make properties config-friendly
* Rename remote.PluginContext
* Simplify ResourceMeta
* New task log interface and template plugin
* Update spark and pytorch plugins
* Proposal: Common plugin interface for all service plugins


Co-authored-by: Ketan Umare <[email protected]>
  • Loading branch information
EngHabu and Ketan Umare committed Jan 30, 2021
1 parent 5f27429 commit 8a49ee2
Show file tree
Hide file tree
Showing 59 changed files with 5,714 additions and 550 deletions.
2 changes: 1 addition & 1 deletion copilot/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.4.2
github.com/imdario/mergo v0.3.9 // indirect
github.com/lyft/flyteidl v0.18.9
github.com/lyft/flyteidl v0.18.11
github.com/lyft/flyteplugins v0.4.4
github.com/lyft/flytestdlib v0.3.9
github.com/mitchellh/go-ps v1.0.0
Expand Down
213 changes: 26 additions & 187 deletions copilot/go.sum

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ require (
github.com/Masterminds/semver v1.5.0
github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20200410212604-780c48ecb21a
github.com/aws/aws-sdk-go v1.29.23
github.com/aws/aws-sdk-go-v2 v1.0.0
github.com/aws/aws-sdk-go-v2/config v1.0.0
github.com/aws/aws-sdk-go-v2/service/athena v1.0.0
github.com/aws/aws-sdk-go-v2/service/sagemaker v1.0.0 // indirect
github.com/coocood/freecache v1.1.0
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/go-test/deep v1.0.5
Expand All @@ -32,7 +36,7 @@ require (
k8s.io/api v0.17.3
k8s.io/apimachinery v0.17.3
k8s.io/client-go v11.0.1-0.20190918222721-c0e3722d5cf0+incompatible
k8s.io/utils v0.0.0-20200124190032-861946025e34 // indirect
k8s.io/utils v0.0.0-20200124190032-861946025e34
sigs.k8s.io/controller-runtime v0.5.1
)

Expand Down
745 changes: 664 additions & 81 deletions go.sum

Large diffs are not rendered by default.

36 changes: 26 additions & 10 deletions go/tasks/aws/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
package aws

import (
"time"
"context"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/retry"
awsConfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/lyft/flytestdlib/config"

pluginsConfig "github.com/lyft/flyteplugins/go/tasks/config"
Expand All @@ -18,22 +21,19 @@ const ConfigSectionKey = "aws"

var (
defaultConfig = &Config{
Region: "us-east-1",
MaxErrorStringLength: 150,
Retries: 3,
CatalogCacheTimeout: config.Duration{Duration: time.Second * 5},
Region: "us-east-2",
Retries: 3,
}

configSection = pluginsConfig.MustRegisterSubSection(ConfigSectionKey, defaultConfig)
)

// Config section for AWS Package
type Config struct {
Region string `json:"region" pflag:",AWS Region to connect to."`
AccountID string `json:"accountId" pflag:",AWS Account Identifier."`
Retries int `json:"retries" pflag:",Number of retries."`
MaxErrorStringLength int `json:"maxErrorLength" pflag:",Maximum size of error messages."`
CatalogCacheTimeout config.Duration `json:"catalog-timeout" pflag:"\"5s\",Timeout duration for checking catalog for all batch tasks"`
Region string `json:"region" pflag:",AWS Region to connect to."`
AccountID string `json:"accountId" pflag:",AWS Account Identifier."`
Retries int `json:"retries" pflag:",Number of retries."`
LogLevel aws.ClientLogMode `json:"logLevel" pflag:"-,Defines the Sdk Log Level."`
}

type RateLimiterConfig struct {
Expand All @@ -46,6 +46,22 @@ func GetConfig() *Config {
return configSection.GetConfig().(*Config)
}

func (cfg Config) GetSdkConfig() (aws.Config, error) {
sdkConfig, err := awsConfig.LoadDefaultConfig(context.TODO(),
awsConfig.WithRegion(cfg.Region),
awsConfig.WithRetryer(func() aws.Retryer {
return retry.NewStandard(func(options *retry.StandardOptions) {
options.MaxAttempts = cfg.Retries
})
}),
awsConfig.WithClientLogMode(cfg.LogLevel))
if err != nil {
return aws.Config{}, err
}

return sdkConfig, nil
}

func MustRegisterSubSection(key config.SectionKey, cfg config.Config) config.Section {
return configSection.MustRegisterSection(key, cfg)
}
2 changes: 0 additions & 2 deletions go/tasks/aws/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 0 additions & 44 deletions go/tasks/aws/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

52 changes: 52 additions & 0 deletions go/tasks/pluginmachinery/core/allocationstatus_enumer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 7 additions & 5 deletions go/tasks/pluginmachinery/core/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,23 @@ import (
"context"
)

type AllocationStatus string
//go:generate enumer -type=AllocationStatus -trimprefix=AllocationStatus

type AllocationStatus int

const (
// This is the enum returned when there's an error
AllocationUndefined AllocationStatus = "ResourceGranted"
AllocationUndefined AllocationStatus = iota

// Go for it
AllocationStatusGranted AllocationStatus = "ResourceGranted"
AllocationStatusGranted

// This means that no resources are available globally. This is the only rejection message we use right now.
AllocationStatusExhausted AllocationStatus = "ResourceExhausted"
AllocationStatusExhausted

// We're not currently using this - but this would indicate that things globally are okay, but that your
// own namespace is too busy
AllocationStatusNamespaceQuotaExceeded AllocationStatus = "NamespaceQuotaExceeded"
AllocationStatusNamespaceQuotaExceeded
)

const namespaceSeparator = ":"
Expand Down
93 changes: 93 additions & 0 deletions go/tasks/pluginmachinery/internal/webapi/allocation_token.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package webapi

import (
"context"
"fmt"

"k8s.io/utils/clock"

"github.com/lyft/flytestdlib/logger"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/webapi"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
)

type tokenAllocator struct {
clock clock.Clock
}

func newTokenAllocator(c clock.Clock) tokenAllocator {
return tokenAllocator{
clock: c,
}
}

func (a tokenAllocator) allocateToken(ctx context.Context, p webapi.AsyncPlugin, tCtx core.TaskExecutionContext, state *State, metrics Metrics) (
newState *State, phaseInfo core.PhaseInfo, err error) {
if len(p.GetConfig().ResourceQuotas) == 0 {
// No quota, return success
return &State{
AllocationTokenRequestStartTime: a.clock.Now(),
Phase: PhaseAllocationTokenAcquired,
}, core.PhaseInfoQueued(a.clock.Now(), 0, "No allocation token required"), nil
}

ns, constraints, err := p.ResourceRequirements(ctx, tCtx)
if err != nil {
logger.Errorf(ctx, "Failed to calculate resource requirements for task. Error: %v", err)
return nil, core.PhaseInfo{}, err
}

token := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()
allocationStatus, err := tCtx.ResourceManager().AllocateResource(ctx, ns, token, constraints)
if err != nil {
logger.Errorf(ctx, "Failed to allocate resources for task. Error: %v", err)
return nil, core.PhaseInfo{}, err
}

switch allocationStatus {
case core.AllocationStatusGranted:
metrics.AllocationGranted.Inc(ctx)
metrics.ResourceWaitTime.Observe(float64(a.clock.Since(state.AllocationTokenRequestStartTime).Milliseconds()))
return &State{
AllocationTokenRequestStartTime: a.clock.Now(),
Phase: PhaseAllocationTokenAcquired,
}, core.PhaseInfoQueued(a.clock.Now(), 0, "Allocation token required"), nil
case core.AllocationStatusNamespaceQuotaExceeded:
case core.AllocationStatusExhausted:
metrics.AllocationNotGranted.Inc(ctx)
logger.Infof(ctx, "Couldn't allocate token because allocation status is [%v].", allocationStatus.String())
startTime := state.AllocationTokenRequestStartTime
if startTime.IsZero() {
startTime = a.clock.Now()
}

return &State{
AllocationTokenRequestStartTime: startTime,
Phase: PhaseNotStarted,
}, core.PhaseInfoQueued(a.clock.Now(), 0,
fmt.Sprintf("Quota for task has exceeded. The request is enqueued.")), nil
}

return nil, core.PhaseInfo{}, fmt.Errorf("allocation status undefined [%v]", allocationStatus)
}

func (a tokenAllocator) releaseToken(ctx context.Context, p webapi.AsyncPlugin, tCtx core.TaskExecutionContext, metrics Metrics) error {
ns, _, err := p.ResourceRequirements(ctx, tCtx)
if err != nil {
logger.Errorf(ctx, "Failed to calculate resource requirements for task. Error: %v", err)
return err
}

token := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()
err = tCtx.ResourceManager().ReleaseResource(ctx, ns, token)
if err != nil {
metrics.ResourceReleaseFailed.Inc(ctx)
logger.Errorf(ctx, "Failed to release resources for task. Error: %v", err)
return err
}

metrics.ResourceReleased.Inc(ctx)
return nil
}
Loading

0 comments on commit 8a49ee2

Please sign in to comment.