Skip to content

Commit

Permalink
Presto plugin executor (flyteorg#69)
Browse files Browse the repository at this point in the history
* Presto plugin executor

* mockery

* add unit tests

* more unit tests

* update to correct import

* fix lint

* linting

* last linting?

* minor changes

* expanded comment on state machine logic

* PR feedback changes

* PR feedback changes 2

* PR feedback changes 3

* PR feedback changes 4

* add user to execute args

* resource reg

* update status

* e2e chages

* update sync period

* add input interpolator

* changes

* changes 2

* prefix implicit inputs with __

* comments

* more feedback

* edit metrics
  • Loading branch information
lu4nm3 committed Mar 24, 2020
1 parent bdee697 commit 635416c
Show file tree
Hide file tree
Showing 16 changed files with 2,100 additions and 3 deletions.
2 changes: 1 addition & 1 deletion flyteplugins/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/golang/protobuf v1.3.3
github.com/googleapis/gnostic v0.4.1 // indirect
github.com/hashicorp/golang-lru v0.5.4
github.com/lyft/flyteidl v0.17.6
github.com/lyft/flyteidl v0.17.9
github.com/lyft/flytestdlib v0.3.3
github.com/magiconair/properties v1.8.1
github.com/mitchellh/mapstructure v1.1.2
Expand Down
4 changes: 2 additions & 2 deletions flyteplugins/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,8 @@ github.com/lyft/api v0.0.0-20191031200350-b49a72c274e0 h1:NGL46+1RYcCXb3sShp0nQq
github.com/lyft/api v0.0.0-20191031200350-b49a72c274e0/go.mod h1:/L5qH+AD540e7Cetbui1tuJeXdmNhO8jM6VkXeDdDhQ=
github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f h1:PGuAMDzAen0AulUfaEhNQMYmUpa41pAVo3zHI+GJsCM=
github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f/go.mod h1:llRdnznGEAqC3DcNm6yEj472xaFVfLM7hnYofMb12tQ=
github.com/lyft/flyteidl v0.17.6 h1:O0qpT6ya45e/92+E84uGOYa0ZsaFoE5ZfPoyJ6e1bEQ=
github.com/lyft/flyteidl v0.17.6/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.9 h1:JXT9PovHqS9V3YN74x9zWT0kvIEL48c2uNoujF1KMes=
github.com/lyft/flyteidl v0.17.9/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flytestdlib v0.3.0 h1:nIkX4MlyYdcLLzaF35RI2P5BhARt+qMgHoFto8eVNzU=
github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.2 h1:bY6Y+Fg6Jdc7zY4GAYuR7t2hjWwynIdmRvtLcRNaGnw=
Expand Down
126 changes: 126 additions & 0 deletions flyteplugins/go/tasks/plugins/presto/client/mocks/presto_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 43 additions & 0 deletions flyteplugins/go/tasks/plugins/presto/client/noop_presto_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package client

import (
"context"
"net/http"
"net/url"

"time"

"github.com/lyft/flyteplugins/go/tasks/plugins/presto/config"
)

const (
httpRequestTimeoutSecs = 30
)

type noopPrestoClient struct {
client *http.Client
environment *url.URL
}

func (p noopPrestoClient) ExecuteCommand(
ctx context.Context,
queryStr string,
executeArgs PrestoExecuteArgs) (PrestoExecuteResponse, error) {

return PrestoExecuteResponse{}, nil
}

func (p noopPrestoClient) KillCommand(ctx context.Context, commandID string) error {
return nil
}

func (p noopPrestoClient) GetCommandStatus(ctx context.Context, commandID string) (PrestoStatus, error) {
return NewPrestoStatus(ctx, "UNKNOWN"), nil
}

func NewNoopPrestoClient(cfg *config.Config) PrestoClient {
return &noopPrestoClient{
client: &http.Client{Timeout: httpRequestTimeoutSecs * time.Second},
environment: cfg.Environment.ResolveReference(&cfg.Environment.URL),
}
}
35 changes: 35 additions & 0 deletions flyteplugins/go/tasks/plugins/presto/client/presto_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package client

import "context"

type PrestoStatus string

// Contains information needed to execute a Presto query
type PrestoExecuteArgs struct {
RoutingGroup string `json:"routingGroup,omitempty"`
Catalog string `json:"catalog,omitempty"`
Schema string `json:"schema,omitempty"`
Source string `json:"source,omitempty"`
User string `json:"user,omitempty"`
}

// Representation of a response after submitting a query to Presto
type PrestoExecuteResponse struct {
ID string `json:"id,omitempty"`
Status PrestoStatus `json:"status,omitempty"`
NextURI string `json:"nextUri,omitempty"`
}

//go:generate mockery -all -case=snake

// Interface to interact with PrestoClient for Presto tasks
type PrestoClient interface {
// Submits a query to Presto
ExecuteCommand(ctx context.Context, commandStr string, executeArgs PrestoExecuteArgs) (PrestoExecuteResponse, error)

// Cancels a currently running Presto query
KillCommand(ctx context.Context, commandID string) error

// Gets the status of a Presto query
GetCommandStatus(ctx context.Context, commandID string) (PrestoStatus, error)
}
43 changes: 43 additions & 0 deletions flyteplugins/go/tasks/plugins/presto/client/presto_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package client

import (
"context"
"strings"

"github.com/lyft/flytestdlib/logger"
)

// This type is meant only to encapsulate the response coming from Presto as a type, it is
// not meant to be stored locally.
const (
PrestoStatusUnknown PrestoStatus = "UNKNOWN"
PrestoStatusWaiting PrestoStatus = "WAITING"
PrestoStatusRunning PrestoStatus = "RUNNING"
PrestoStatusFinished PrestoStatus = "FINISHED"
PrestoStatusFailed PrestoStatus = "FAILED"
PrestoStatusCancelled PrestoStatus = "CANCELLED"
)

var PrestoStatuses = map[PrestoStatus]struct{}{
PrestoStatusUnknown: {},
PrestoStatusWaiting: {},
PrestoStatusRunning: {},
PrestoStatusFinished: {},
PrestoStatusFailed: {},
PrestoStatusCancelled: {},
}

func NewPrestoStatus(ctx context.Context, state string) PrestoStatus {
upperCased := strings.ToUpper(state)

// Presto has different failure modes so this maps them all to a single Failure on the
// Flyte side
if strings.Contains(upperCased, "FAILED") {
return PrestoStatusFailed
} else if _, ok := PrestoStatuses[PrestoStatus(upperCased)]; ok {
return PrestoStatus(upperCased)
} else {
logger.Warnf(ctx, "Invalid Presto Status found: %v", state)
return PrestoStatusUnknown
}
}
76 changes: 76 additions & 0 deletions flyteplugins/go/tasks/plugins/presto/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package config

//go:generate pflags Config --default-var=defaultConfig

import (
"context"
"net/url"
"time"

"github.com/lyft/flytestdlib/config"
"github.com/lyft/flytestdlib/logger"

pluginsConfig "github.com/lyft/flyteplugins/go/tasks/config"
)

const prestoConfigSectionKey = "presto"

func URLMustParse(s string) config.URL {
r, err := url.Parse(s)
if err != nil {
logger.Panicf(context.TODO(), "Bad Presto URL Specified as default, error: %s", err)
}
if r == nil {
logger.Panicf(context.TODO(), "Nil Presto URL specified.", err)
}
return config.URL{URL: *r}
}

type RoutingGroupConfig struct {
Name string `json:"name" pflag:",The name of a given Presto routing group"`
Limit int `json:"limit" pflag:",Resource quota (in the number of outstanding requests) of the routing group"`
ProjectScopeQuotaProportionCap float64 `json:"projectScopeQuotaProportionCap" pflag:",A floating point number between 0 and 1, specifying the maximum proportion of quotas allowed to allocate to a project in the routing group"`
NamespaceScopeQuotaProportionCap float64 `json:"namespaceScopeQuotaProportionCap" pflag:",A floating point number between 0 and 1, specifying the maximum proportion of quotas allowed to allocate to a namespace in the routing group"`
}

type RefreshCacheConfig struct {
Name string `json:"name" pflag:",The name of the rate limiter"`
SyncPeriod config.Duration `json:"syncPeriod" pflag:",The duration to wait before the cache is refreshed again"`
Workers int `json:"workers" pflag:",Number of parallel workers to refresh the cache"`
LruCacheSize int `json:"lruCacheSize" pflag:",Size of the cache"`
}

var (
defaultConfig = Config{
Environment: URLMustParse(""),
DefaultRoutingGroup: "adhoc",
DefaultUser: "flyte-default-user",
RoutingGroupConfigs: []RoutingGroupConfig{{Name: "adhoc", Limit: 250}, {Name: "etl", Limit: 100}},
RefreshCacheConfig: RefreshCacheConfig{
Name: "presto",
SyncPeriod: config.Duration{Duration: 5 * time.Second},
Workers: 15,
LruCacheSize: 10000,
},
}

prestoConfigSection = pluginsConfig.MustRegisterSubSection(prestoConfigSectionKey, &defaultConfig)
)

// Presto plugin configs
type Config struct {
Environment config.URL `json:"environment" pflag:",Environment endpoint for Presto to use"`
DefaultRoutingGroup string `json:"defaultRoutingGroup" pflag:",Default Presto routing group"`
DefaultUser string `json:"defaultUser" pflag:",Default Presto user"`
RoutingGroupConfigs []RoutingGroupConfig `json:"routingGroupConfigs" pflag:"-,A list of cluster configs. Each of the configs corresponds to a service cluster"`
RefreshCacheConfig RefreshCacheConfig `json:"refreshCacheConfig" pflag:"Rate limiter config"`
}

// Retrieves the current config value or default.
func GetPrestoConfig() *Config {
return prestoConfigSection.GetConfig().(*Config)
}

func SetPrestoConfig(cfg *Config) error {
return prestoConfigSection.SetConfig(cfg)
}
Loading

0 comments on commit 635416c

Please sign in to comment.