Skip to content

Commit

Permalink
Add Artifacts related changes to private fork (#55)
Browse files Browse the repository at this point in the history
* test changes to time partition (#4737)

Signed-off-by: Yee Hing Tong <[email protected]>

* trigger from core -> artifact, artifact->artifacts

* artifacts.proto package to cloud

* data proxy package to cloud

* deactivateAllTriggers endpoint

* proto package paths, gen protos

---------

Signed-off-by: Yee Hing Tong <[email protected]>
Co-authored-by: Yee Hing Tong <[email protected]>
  • Loading branch information
squiishyy and wild-endeavor authored Feb 9, 2024
1 parent dd8ac0b commit 47d71ac
Show file tree
Hide file tree
Showing 51 changed files with 18,836 additions and 10,666 deletions.
20 changes: 10 additions & 10 deletions flyteadmin/pkg/artifacts/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (

admin2 "github.com/flyteorg/flyte/flyteidl/clients/go/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifacts"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/logger"
)

// ArtifactRegistry contains a client to talk to an Artifact service and has helper methods
type ArtifactRegistry struct {
client artifact.ArtifactRegistryClient
client artifacts.ArtifactRegistryClient
}

func (a *ArtifactRegistry) RegisterArtifactProducer(ctx context.Context, id *core.Identifier, ti core.TypedInterface) {
Expand All @@ -24,12 +24,12 @@ func (a *ArtifactRegistry) RegisterArtifactProducer(ctx context.Context, id *cor
return
}

ap := &artifact.ArtifactProducer{
ap := &artifacts.ArtifactProducer{
EntityId: id,
Outputs: ti.Outputs,
}
_, err := a.client.RegisterProducer(ctx, &artifact.RegisterProducerRequest{
Producers: []*artifact.ArtifactProducer{ap},
_, err := a.client.RegisterProducer(ctx, &artifacts.RegisterProducerRequest{
Producers: []*artifacts.ArtifactProducer{ap},
})
if err != nil {
logger.Errorf(ctx, "Failed to register artifact producer for task [%+v] with err: %v", id, err)
Expand All @@ -42,12 +42,12 @@ func (a *ArtifactRegistry) RegisterArtifactConsumer(ctx context.Context, id *cor
logger.Debugf(ctx, "Artifact client not configured, skipping registration for consumer [%+v]", id)
return
}
ac := &artifact.ArtifactConsumer{
ac := &artifacts.ArtifactConsumer{
EntityId: id,
Inputs: &pm,
}
_, err := a.client.RegisterConsumer(ctx, &artifact.RegisterConsumerRequest{
Consumers: []*artifact.ArtifactConsumer{ac},
_, err := a.client.RegisterConsumer(ctx, &artifacts.RegisterConsumerRequest{
Consumers: []*artifacts.ArtifactConsumer{ac},
})
if err != nil {
logger.Errorf(ctx, "Failed to register artifact consumer for entity [%+v] with err: %v", id, err)
Expand All @@ -60,7 +60,7 @@ func (a *ArtifactRegistry) RegisterTrigger(ctx context.Context, plan *admin.Laun
logger.Debugf(ctx, "Artifact client not configured, skipping trigger [%+v]", plan)
return fmt.Errorf("artifact client not configured")
}
_, err := a.client.CreateTrigger(ctx, &artifact.CreateTriggerRequest{
_, err := a.client.CreateTrigger(ctx, &artifacts.CreateTriggerRequest{
TriggerLaunchPlan: plan,
})
if err != nil {
Expand All @@ -71,7 +71,7 @@ func (a *ArtifactRegistry) RegisterTrigger(ctx context.Context, plan *admin.Laun
return nil
}

func (a *ArtifactRegistry) GetClient() artifact.ArtifactRegistryClient {
func (a *ArtifactRegistry) GetClient() artifacts.ArtifactRegistryClient {
if a == nil {
return nil
}
Expand Down
86 changes: 38 additions & 48 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
workflowengineInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/interfaces"
"github.com/flyteorg/flyte/flyteadmin/plugins"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact"
artifactsIdl "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifacts"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/flytek8s"
Expand Down Expand Up @@ -745,6 +745,7 @@ func (m *ExecutionManager) getStringFromInput(ctx context.Context, inputBinding
strVal = p.GetStringValue()
case *core.Primitive_Datetime:
t := time.Unix(p.GetDatetime().Seconds, int64(p.GetDatetime().Nanos))
t = t.In(time.UTC)
strVal = t.Format("2006-01-02")
case *core.Primitive_StringValue:
strVal = p.GetStringValue()
Expand Down Expand Up @@ -779,46 +780,6 @@ func (m *ExecutionManager) fillInTemplateArgs(ctx context.Context, query core.Ar
if query.GetUri() != "" {
// If a query string, then just pass it through, nothing to fill in.
return query, nil
} else if query.GetArtifactTag() != nil {
t := query.GetArtifactTag()
ak := t.GetArtifactKey()
if ak == nil {
return query, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "tag doesn't have key")
}
var project, domain string
if ak.GetProject() == "" {
project = contextutils.Value(ctx, contextutils.ProjectKey)
} else {
project = ak.GetProject()
}
if ak.GetDomain() == "" {
domain = contextutils.Value(ctx, contextutils.DomainKey)
} else {
domain = ak.GetDomain()
}
strValue, err := m.getLabelValue(ctx, t.GetValue(), inputs)
if err != nil {
logger.Errorf(ctx, "Failed to template input string [%s] [%v]", t.GetValue(), err)
return query, err
}

return core.ArtifactQuery{
Identifier: &core.ArtifactQuery_ArtifactTag{
ArtifactTag: &core.ArtifactTag{
ArtifactKey: &core.ArtifactKey{
Project: project,
Domain: domain,
Name: ak.GetName(),
},
Value: &core.LabelValue{
Value: &core.LabelValue_StaticValue{
StaticValue: strValue,
},
},
},
},
}, nil

} else if query.GetArtifactId() != nil {
artifactID := query.GetArtifactId()
ak := artifactID.GetArtifactKey()
Expand All @@ -839,7 +800,7 @@ func (m *ExecutionManager) fillInTemplateArgs(ctx context.Context, query core.Ar

var partitions map[string]*core.LabelValue

if artifactID.GetPartitions() != nil && artifactID.GetPartitions().GetValue() != nil {
if artifactID.GetPartitions().GetValue() != nil {
partitions = make(map[string]*core.LabelValue, len(artifactID.GetPartitions().Value))
for k, v := range artifactID.GetPartitions().GetValue() {
newValue, err := m.getLabelValue(ctx, v, inputs)
Expand All @@ -850,6 +811,36 @@ func (m *ExecutionManager) fillInTemplateArgs(ctx context.Context, query core.Ar
partitions[k] = &core.LabelValue{Value: &core.LabelValue_StaticValue{StaticValue: newValue}}
}
}

var timePartition *core.TimePartition
if artifactID.GetTimePartition().GetValue() != nil {
if artifactID.GetTimePartition().Value.GetTimeValue() != nil {
// If the time value is set, then just pass it through, nothing to fill in.
timePartition = artifactID.GetTimePartition()
} else if artifactID.GetTimePartition().Value.GetInputBinding() != nil {
// Evaluate the time partition input binding
lit, ok := inputs[artifactID.GetTimePartition().Value.GetInputBinding().GetVar()]
if !ok {
return query, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "time partition input binding var [%s] not found in inputs %v", artifactID.GetTimePartition().Value.GetInputBinding().GetVar(), inputs)
}

if lit.GetScalar().GetPrimitive().GetDatetime() == nil {
return query, errors.NewFlyteAdminErrorf(codes.InvalidArgument,
"time partition binding to input var [%s] failing because %v is not a datetime",
artifactID.GetTimePartition().Value.GetInputBinding().GetVar(), lit)
}
timePartition = &core.TimePartition{
Value: &core.LabelValue{
Value: &core.LabelValue_TimeValue{
TimeValue: lit.GetScalar().GetPrimitive().GetDatetime(),
},
},
}
} else {
return query, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "time partition value cannot be empty when evaluating query: %v", query)
}
}

return core.ArtifactQuery{
Identifier: &core.ArtifactQuery_ArtifactId{
ArtifactId: &core.ArtifactID{
Expand All @@ -858,11 +849,10 @@ func (m *ExecutionManager) fillInTemplateArgs(ctx context.Context, query core.Ar
Domain: domain,
Name: ak.GetName(),
},
Dimensions: &core.ArtifactID_Partitions{
Partitions: &core.Partitions{
Value: partitions,
},
Partitions: &core.Partitions{
Value: partitions,
},
TimePartition: timePartition,
},
},
}, nil
Expand Down Expand Up @@ -901,7 +891,7 @@ func (m *ExecutionManager) ResolveParameterMapArtifacts(ctx context.Context, inp
logger.Errorf(ctx, "Failed to fill in template args for [%s] [%v]", k, err)
return nil, nil, err
}
req := &artifact.GetArtifactRequest{
req := &artifactsIdl.GetArtifactRequest{
Query: &filledInQuery,
Details: false,
}
Expand All @@ -918,7 +908,7 @@ func (m *ExecutionManager) ResolveParameterMapArtifacts(ctx context.Context, inp
}
} else if v.GetArtifactId() != nil {
// This case is for when someone hard-codes a known ArtifactID as a default value.
req := &artifact.GetArtifactRequest{
req := &artifactsIdl.GetArtifactRequest{
Query: &core.ArtifactQuery{
Identifier: &core.ArtifactQuery_ArtifactId{
ArtifactId: v.GetArtifactId(),
Expand Down
91 changes: 91 additions & 0 deletions flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5720,5 +5720,96 @@ func TestAddStateFilter(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, "state <> ?", expression.Query)
})
}

func TestQueryTemplate(t *testing.T) {
ctx := context.Background()

aTime := time.Date(
2063, 4, 5, 00, 00, 00, 0, time.UTC)

rawInputs := map[string]interface{}{
"aStr": "hello world",
"anInt": 1,
"aFloat": 1.3,
"aTime": aTime,
}

otherInputs, err := coreutils.MakeLiteralMap(rawInputs)
assert.NoError(t, err)

m := ExecutionManager{}

ak := &core.ArtifactKey{
Project: "project",
Domain: "domain",
Name: "testname",
}

t.Run("test all present, nothing to fill in", func(t *testing.T) {
pMap := map[string]*core.LabelValue{
"partition1": {Value: &core.LabelValue_StaticValue{StaticValue: "my value"}},
"partition2": {Value: &core.LabelValue_StaticValue{StaticValue: "my value 2"}},
}
p := &core.Partitions{Value: pMap}

q := core.ArtifactQuery{
Identifier: &core.ArtifactQuery_ArtifactId{
ArtifactId: &core.ArtifactID{
ArtifactKey: ak,
Partitions: p,
TimePartition: nil,
},
},
}

filledQuery, err := m.fillInTemplateArgs(ctx, q, otherInputs.Literals)
assert.NoError(t, err)
assert.True(t, proto.Equal(&q, &filledQuery))
})

t.Run("template date-times, both in explicit tp and not", func(t *testing.T) {
pMap := map[string]*core.LabelValue{
"partition1": {Value: &core.LabelValue_InputBinding{InputBinding: &core.InputBindingData{Var: "aTime"}}},
"partition2": {Value: &core.LabelValue_StaticValue{StaticValue: "my value 2"}},
}
p := &core.Partitions{Value: pMap}

q := core.ArtifactQuery{
Identifier: &core.ArtifactQuery_ArtifactId{
ArtifactId: &core.ArtifactID{
ArtifactKey: ak,
Partitions: p,
TimePartition: &core.TimePartition{Value: &core.LabelValue{Value: &core.LabelValue_InputBinding{InputBinding: &core.InputBindingData{Var: "aTime"}}}},
},
},
}

filledQuery, err := m.fillInTemplateArgs(ctx, q, otherInputs.Literals)
assert.NoError(t, err)
staticTime := filledQuery.GetArtifactId().Partitions.Value["partition1"].GetStaticValue()
assert.Equal(t, "2063-04-05", staticTime)
assert.Equal(t, int64(2942956800), filledQuery.GetArtifactId().TimePartition.Value.GetTimeValue().Seconds)
})

t.Run("something missing", func(t *testing.T) {
pMap := map[string]*core.LabelValue{
"partition1": {Value: &core.LabelValue_StaticValue{StaticValue: "my value"}},
"partition2": {Value: &core.LabelValue_StaticValue{StaticValue: "my value 2"}},
}
p := &core.Partitions{Value: pMap}

q := core.ArtifactQuery{
Identifier: &core.ArtifactQuery_ArtifactId{
ArtifactId: &core.ArtifactID{
ArtifactKey: ak,
Partitions: p,
TimePartition: &core.TimePartition{Value: &core.LabelValue{Value: &core.LabelValue_InputBinding{InputBinding: &core.InputBindingData{Var: "wrong var"}}}},
},
},
}

_, err := m.fillInTemplateArgs(ctx, q, otherInputs.Literals)
assert.Error(t, err)
})
}
9 changes: 4 additions & 5 deletions flyteidl/clients/go/admin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"errors"
"fmt"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact"

grpcRetry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"google.golang.org/grpc"
Expand All @@ -17,6 +15,7 @@ import (

"github.com/flyteorg/flyte/flyteidl/clients/go/admin/cache"
"github.com/flyteorg/flyte/flyteidl/clients/go/admin/mocks"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifacts"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/service"
"github.com/flyteorg/flyte/flytestdlib/logger"
)
Expand All @@ -32,7 +31,7 @@ type Clientset struct {
identityServiceClient service.IdentityServiceClient
dataProxyServiceClient service.DataProxyServiceClient
signalServiceClient service.SignalServiceClient
artifactServiceClient artifact.ArtifactRegistryClient
artifactServiceClient artifacts.ArtifactRegistryClient
}

// AdminClient retrieves the AdminServiceClient
Expand Down Expand Up @@ -62,7 +61,7 @@ func (c Clientset) SignalServiceClient() service.SignalServiceClient {
return c.signalServiceClient
}

func (c Clientset) ArtifactServiceClient() artifact.ArtifactRegistryClient {
func (c Clientset) ArtifactServiceClient() artifacts.ArtifactRegistryClient {
return c.artifactServiceClient
}

Expand Down Expand Up @@ -206,7 +205,7 @@ func initializeClients(ctx context.Context, cfg *Config, tokenCache cache.TokenC
cs.healthServiceClient = grpc_health_v1.NewHealthClient(adminConnection)
cs.dataProxyServiceClient = service.NewDataProxyServiceClient(adminConnection)
cs.signalServiceClient = service.NewSignalServiceClient(adminConnection)
cs.artifactServiceClient = artifact.NewArtifactRegistryClient(adminConnection)
cs.artifactServiceClient = artifacts.NewArtifactRegistryClient(adminConnection)

return &cs, nil
}
Expand Down
2 changes: 1 addition & 1 deletion flyteidl/gen/pb-cpp/flyteidl/admin/execution.pb.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 47d71ac

Please sign in to comment.