From a4178721323f2d977b37833a72c7ca4b75bb7917 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 15 Sep 2023 15:31:19 -0700 Subject: [PATCH] Bump flytestdlib Signed-off-by: Kevin Su --- go/tasks/plugins/array/awsbatch/jobs_store.go | 4 ++++ go/tasks/plugins/hive/execution_state.go | 4 ++++ go/tasks/plugins/hive/executions_cache.go | 4 ++++ 3 files changed, 12 insertions(+) diff --git a/go/tasks/plugins/array/awsbatch/jobs_store.go b/go/tasks/plugins/array/awsbatch/jobs_store.go index ca061c578..2a84d1619 100644 --- a/go/tasks/plugins/array/awsbatch/jobs_store.go +++ b/go/tasks/plugins/array/awsbatch/jobs_store.go @@ -49,6 +49,10 @@ type Job struct { SubJobs []*Job `json:"array,omitempty"` } +func (j Job) IsTerminal() bool { + return j.Status.Phase.IsTerminal() +} + type Attempt struct { LogStream string `json:"logStream,omitempty"` StartedAt time.Time `json:"startedAt,omitempty"` diff --git a/go/tasks/plugins/hive/execution_state.go b/go/tasks/plugins/hive/execution_state.go index d0f86f73d..2bb4826ad 100644 --- a/go/tasks/plugins/hive/execution_state.go +++ b/go/tasks/plugins/hive/execution_state.go @@ -36,6 +36,10 @@ const ( PhaseQueryFailed ) +func (p ExecutionPhase) IsTerminal() bool { + return p == PhaseQuerySucceeded || p == PhaseQueryFailed +} + func (p ExecutionPhase) String() string { switch p { case PhaseNotStarted: diff --git a/go/tasks/plugins/hive/executions_cache.go b/go/tasks/plugins/hive/executions_cache.go index e80e723a8..83c6f2be3 100644 --- a/go/tasks/plugins/hive/executions_cache.go +++ b/go/tasks/plugins/hive/executions_cache.go @@ -64,6 +64,10 @@ func (e ExecutionStateCacheItem) ID() string { return e.Identifier } +func (e ExecutionStateCacheItem) IsTerminal() bool { + return e.ExecutionState.Phase.IsTerminal() +} + // This basically grab an updated status from the Qubole API and store it in the cache // All other handling should be in the synchronous loop. func (q *QuboleHiveExecutionsCache) SyncQuboleQuery(ctx context.Context, batch cache.Batch) (