From 01ebefd308a08b0ce78a27c3135b5936193eaedc Mon Sep 17 00:00:00 2001 From: Andrew Dye Date: Wed, 27 Sep 2023 13:09:25 -0700 Subject: [PATCH] Plugin changes for plumbing k8s events into TaskExecutionEvent (#406) * Plugin changes for k8s events Signed-off-by: Andrew Dye * make generate Signed-off-by: Andrew Dye * Comment for SendObjectEvents Signed-off-by: Andrew Dye * make generate Signed-off-by: Andrew Dye --------- Signed-off-by: Andrew Dye --- go/tasks/pluginmachinery/core/phase.go | 10 +++++++++- go/tasks/pluginmachinery/flytek8s/config/config.go | 6 +++++- .../flytek8s/config/k8spluginconfig_flags.go | 1 + .../flytek8s/config/k8spluginconfig_flags_test.go | 14 ++++++++++++++ 4 files changed, 29 insertions(+), 2 deletions(-) diff --git a/go/tasks/pluginmachinery/core/phase.go b/go/tasks/pluginmachinery/core/phase.go index 93fd3067d..dbd1f699b 100644 --- a/go/tasks/pluginmachinery/core/phase.go +++ b/go/tasks/pluginmachinery/core/phase.go @@ -4,8 +4,9 @@ import ( "fmt" "time" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" structpb "github.com/golang/protobuf/ptypes/struct" + + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" ) const DefaultPhaseVersion = uint32(0) @@ -83,6 +84,11 @@ type ExternalResource struct { Phase Phase } +type ReasonInfo struct { + Reason string + OccurredAt *time.Time +} + type TaskInfo struct { // log information for the task execution Logs []*core.TaskLog @@ -96,6 +102,8 @@ type TaskInfo struct { CustomInfo *structpb.Struct // A collection of information about external resources launched by this task ExternalResources []*ExternalResource + // Additional reasons for this case. Note, these are not included in the phase state. + AdditionalReasons []ReasonInfo } func (t *TaskInfo) String() string { diff --git a/go/tasks/pluginmachinery/flytek8s/config/config.go b/go/tasks/pluginmachinery/flytek8s/config/config.go index daaae6e68..56a57ffde 100755 --- a/go/tasks/pluginmachinery/flytek8s/config/config.go +++ b/go/tasks/pluginmachinery/flytek8s/config/config.go @@ -10,9 +10,10 @@ import ( "k8s.io/apimachinery/pkg/api/resource" - config2 "github.com/flyteorg/flytestdlib/config" v1 "k8s.io/api/core/v1" + config2 "github.com/flyteorg/flytestdlib/config" + "github.com/flyteorg/flyteplugins/go/tasks/config" ) @@ -167,6 +168,9 @@ type K8sPluginConfig struct { // DefaultPodTemplateResync defines the frequency at which the k8s informer resyncs the default // pod template resources. DefaultPodTemplateResync config2.Duration `json:"default-pod-template-resync" pflag:",Frequency of resyncing default pod templates"` + + // SendObjectEvents indicates whether to send k8s object events in TaskExecutionEvent updates (similar to kubectl get events). + SendObjectEvents bool `json:"send-object-events" pflag:",If true, will send k8s object events in TaskExecutionEvent updates."` } // FlyteCoPilotConfig specifies configuration for the Flyte CoPilot system. FlyteCoPilot, allows running flytekit-less containers diff --git a/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags.go b/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags.go index 7e339dbbe..7a3f1c951 100755 --- a/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags.go +++ b/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags.go @@ -66,5 +66,6 @@ func (cfg K8sPluginConfig) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "delete-resource-on-finalize"), defaultK8sConfig.DeleteResourceOnFinalize, "Instructs the system to delete the resource upon successful execution of a k8s pod rather than have the k8s garbage collector clean it up. This ensures that no resources are kept around (potentially consuming cluster resources). This, however, will cause k8s log links to expire as soon as the resource is finalized.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "default-pod-template-name"), defaultK8sConfig.DefaultPodTemplateName, "Name of the PodTemplate to use as the base for all k8s pods created by FlytePropeller.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "default-pod-template-resync"), defaultK8sConfig.DefaultPodTemplateResync.String(), "Frequency of resyncing default pod templates") + cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "send-object-events"), defaultK8sConfig.SendObjectEvents, "If true, will send k8s object events in TaskExecutionEvent updates.") return cmdFlags } diff --git a/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags_test.go b/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags_test.go index c10e9c70f..4d5918a3b 100755 --- a/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags_test.go +++ b/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags_test.go @@ -323,4 +323,18 @@ func TestK8sPluginConfig_SetFlags(t *testing.T) { } }) }) + t.Run("Test_send-object-events", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("send-object-events", testValue) + if vBool, err := cmdFlags.GetBool("send-object-events"); err == nil { + testDecodeJson_K8sPluginConfig(t, fmt.Sprintf("%v", vBool), &actual.SendObjectEvents) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) }