Skip to content

Commit

Permalink
Add interpolation to Hive queries (flyteorg#142)
Browse files Browse the repository at this point in the history
  • Loading branch information
wild-endeavor authored Dec 2, 2020
1 parent 25ac5bd commit 17d3155
Show file tree
Hide file tree
Showing 18 changed files with 377 additions and 115 deletions.
2 changes: 1 addition & 1 deletion flyteplugins/copilot/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.4.2
github.com/imdario/mergo v0.3.9 // indirect
github.com/lyft/flyteidl v0.18.0
github.com/lyft/flyteidl v0.18.9
github.com/lyft/flyteplugins v0.4.4
github.com/lyft/flytestdlib v0.3.9
github.com/mitchellh/go-ps v1.0.0
Expand Down
1 change: 1 addition & 0 deletions flyteplugins/copilot/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ github.com/lyft/flyteidl v0.17.32 h1:Iio3gYjTyPhAiOMWJ/H/4YtfWIZm5KZSlWMULT1Ef6U
github.com/lyft/flyteidl v0.17.32/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.18.0 h1:f4yv1MafE26wpMC6QlthM02EeTEDXpy/waL54dRDiSs=
github.com/lyft/flyteidl v0.18.0/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.18.9/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flytepropeller v0.3.6/go.mod h1:1Iw3ngmJBP+52coloHL1rOxcX7EDDUUvTYFQQy2WYzk=
github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.9 h1:NaKp9xkeWWwhVvqTOcR/FqlASy1N2gu/kN7PVe4S7YI=
Expand Down
1 change: 1 addition & 0 deletions flyteplugins/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ github.com/lyft/flytestdlib v0.3.3 h1:MkWXPkwQinh6MR3Yf5siZhmRSt9r4YmsF+5kvVVVed
github.com/lyft/flytestdlib v0.3.3/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.9 h1:NaKp9xkeWWwhVvqTOcR/FqlASy1N2gu/kN7PVe4S7YI=
github.com/lyft/flytestdlib v0.3.9/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/spark-on-k8s-operator v0.1.4-0.20201027003055-c76b67e3b6d0 h1:1vSmc+Bo70X0JVYywQ9Hy/aet6p613ejacy9x5td0m4=
github.com/lyft/spark-on-k8s-operator v0.1.4-0.20201027003055-c76b67e3b6d0/go.mod h1:hkRqdqAsdNnxT/Zst6MNMRbTAoiCZ0JRw7svRgAYb0A=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=
Expand Down
87 changes: 60 additions & 27 deletions ...o/tasks/pluginmachinery/utils/template.go → ...pluginmachinery/core/template/template.go
100755 → 100644
Original file line number Diff line number Diff line change
@@ -1,25 +1,37 @@
package utils
package template

import (
"context"
"fmt"
"reflect"
"regexp"
"strings"

"github.com/golang/protobuf/ptypes"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flytestdlib/logger"
"github.com/pkg/errors"

"reflect"

"github.com/golang/protobuf/ptypes"
idlCore "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io"
"github.com/pkg/errors"
)

var inputFileRegex = regexp.MustCompile(`(?i){{\s*[\.$]Input\s*}}`)
var inputPrefixRegex = regexp.MustCompile(`(?i){{\s*[\.$]InputPrefix\s*}}`)
var outputRegex = regexp.MustCompile(`(?i){{\s*[\.$]OutputPrefix\s*}}`)
var inputVarRegex = regexp.MustCompile(`(?i){{\s*[\.$]Inputs\.(?P<input_name>[^}\s]+)\s*}}`)
var rawOutputDataPrefixRegex = regexp.MustCompile(`(?i){{\s*[\.$]RawOutputDataPrefix\s*}}`)
var alphaNumericOnly = regexp.MustCompile("[^a-zA-Z0-9_]+")
var startsWithAlpha = regexp.MustCompile("^[^a-zA-Z_]+")

type ErrorCollection struct {
Errors []error
}

func (e ErrorCollection) Error() string {
sb := strings.Builder{}
for idx, err := range e.Errors {
sb.WriteString(fmt.Sprintf("%v: %v\r\n", idx, err))
}

return sb.String()
}

// Evaluates templates in each command with the equivalent value from passed args. Templates are case-insensitive
// Supported templates are:
Expand All @@ -32,7 +44,16 @@ var rawOutputDataPrefixRegex = regexp.MustCompile(`(?i){{\s*[\.$]RawOutputDataPr
// NOTE: I wanted to do in-place replacement, until I realized that in-place replacement will alter the definition of the
// graph. This is not desirable, as we may have to retry and in that case the replacement will not work and we want
// to create a new location for outputs
func ReplaceTemplateCommandArgs(ctx context.Context, command []string, in io.InputReader, out io.OutputFilePaths) ([]string, error) {
func ReplaceTemplateCommandArgs(ctx context.Context, tExecMeta core.TaskExecutionMetadata, command []string, in io.InputReader,
out io.OutputFilePaths) ([]string, error) {

// TODO: Change GetGeneratedName to follow these conventions
var perRetryUniqueKey = tExecMeta.GetTaskExecutionID().GetGeneratedName()
perRetryUniqueKey = startsWithAlpha.ReplaceAllString(perRetryUniqueKey, "a")
perRetryUniqueKey = alphaNumericOnly.ReplaceAllString(perRetryUniqueKey, "_")

logger.Debugf(ctx, "Using [%s] from [%s]", perRetryUniqueKey, tExecMeta.GetTaskExecutionID().GetGeneratedName())

if len(command) == 0 {
return []string{}, nil
}
Expand All @@ -41,7 +62,7 @@ func ReplaceTemplateCommandArgs(ctx context.Context, command []string, in io.Inp
}
res := make([]string, 0, len(command))
for _, commandTemplate := range command {
updated, err := replaceTemplateCommandArgs(ctx, commandTemplate, in, out)
updated, err := replaceTemplateCommandArgs(ctx, perRetryUniqueKey, commandTemplate, in, out)
if err != nil {
return res, err
}
Expand All @@ -52,7 +73,14 @@ func ReplaceTemplateCommandArgs(ctx context.Context, command []string, in io.Inp
return res, nil
}

func transformVarNameToStringVal(ctx context.Context, varName string, inputs *core.LiteralMap) (string, error) {
var inputFileRegex = regexp.MustCompile(`(?i){{\s*[\.$]Input\s*}}`)
var inputPrefixRegex = regexp.MustCompile(`(?i){{\s*[\.$]InputPrefix\s*}}`)
var outputRegex = regexp.MustCompile(`(?i){{\s*[\.$]OutputPrefix\s*}}`)
var inputVarRegex = regexp.MustCompile(`(?i){{\s*[\.$]Inputs\.(?P<input_name>[^}\s]+)\s*}}`)
var rawOutputDataPrefixRegex = regexp.MustCompile(`(?i){{\s*[\.$]RawOutputDataPrefix\s*}}`)
var perRetryUniqueKey = regexp.MustCompile(`(?i){{\s*[\.$]PerRetryUniqueKey\s*}}`)

func transformVarNameToStringVal(ctx context.Context, varName string, inputs *idlCore.LiteralMap) (string, error) {
inputVal, exists := inputs.Literals[varName]
if !exists {
return "", fmt.Errorf("requested input is not found [%s]", varName)
Expand All @@ -65,11 +93,14 @@ func transformVarNameToStringVal(ctx context.Context, varName string, inputs *co
return v, nil
}

func replaceTemplateCommandArgs(ctx context.Context, commandTemplate string, in io.InputReader, out io.OutputFilePaths) (string, error) {
func replaceTemplateCommandArgs(ctx context.Context, perRetryKey string, commandTemplate string,
in io.InputReader, out io.OutputFilePaths) (string, error) {

val := inputFileRegex.ReplaceAllString(commandTemplate, in.GetInputPath().String())
val = outputRegex.ReplaceAllString(val, out.GetOutputPrefixPath().String())
val = inputPrefixRegex.ReplaceAllString(val, in.GetInputPrefixPath().String())
val = rawOutputDataPrefixRegex.ReplaceAllString(val, out.GetRawOutputPrefix().String())
val = perRetryUniqueKey.ReplaceAllString(val, perRetryKey)

inputs, err := in.Get(ctx)
if err != nil {
Expand Down Expand Up @@ -98,39 +129,41 @@ func replaceTemplateCommandArgs(ctx context.Context, commandTemplate string, in
return val, nil
}

func serializePrimitive(p *core.Primitive) (string, error) {
func serializePrimitive(p *idlCore.Primitive) (string, error) {
switch o := p.Value.(type) {
case *core.Primitive_Integer:
case *idlCore.Primitive_Integer:
return fmt.Sprintf("%v", o.Integer), nil
case *core.Primitive_Boolean:
case *idlCore.Primitive_Boolean:
return fmt.Sprintf("%v", o.Boolean), nil
case *core.Primitive_Datetime:
case *idlCore.Primitive_Datetime:
return ptypes.TimestampString(o.Datetime), nil
case *core.Primitive_Duration:
case *idlCore.Primitive_Duration:
return o.Duration.String(), nil
case *core.Primitive_FloatValue:
case *idlCore.Primitive_FloatValue:
return fmt.Sprintf("%v", o.FloatValue), nil
case *core.Primitive_StringValue:
case *idlCore.Primitive_StringValue:
return o.StringValue, nil
default:
return "", fmt.Errorf("received an unexpected primitive type [%v]", reflect.TypeOf(p.Value))
}
}

func serializeLiteralScalar(l *core.Scalar) (string, error) {
func serializeLiteralScalar(l *idlCore.Scalar) (string, error) {
switch o := l.Value.(type) {
case *core.Scalar_Primitive:
case *idlCore.Scalar_Primitive:
return serializePrimitive(o.Primitive)
case *core.Scalar_Blob:
case *idlCore.Scalar_Blob:
return o.Blob.Uri, nil
case *idlCore.Scalar_Schema:
return o.Schema.Uri, nil
default:
return "", fmt.Errorf("received an unexpected scalar type [%v]", reflect.TypeOf(l.Value))
}
}

func serializeLiteral(ctx context.Context, l *core.Literal) (string, error) {
func serializeLiteral(ctx context.Context, l *idlCore.Literal) (string, error) {
switch o := l.Value.(type) {
case *core.Literal_Collection:
case *idlCore.Literal_Collection:
res := make([]string, 0, len(o.Collection.Literals))
for _, sub := range o.Collection.Literals {
s, err := serializeLiteral(ctx, sub)
Expand All @@ -142,7 +175,7 @@ func serializeLiteral(ctx context.Context, l *core.Literal) (string, error) {
}

return fmt.Sprintf("[%v]", strings.Join(res, ",")), nil
case *core.Literal_Scalar:
case *idlCore.Literal_Scalar:
return serializeLiteralScalar(o.Scalar)
default:
logger.Debugf(ctx, "received unexpected primitive type")
Expand Down
Loading

0 comments on commit 17d3155

Please sign in to comment.