Skip to content

Commit

Permalink
Fix the Garbage collection problem (flyteorg#419)
Browse files Browse the repository at this point in the history
* change label selector op

Signed-off-by: Lisa <[email protected]>
Signed-off-by: aeioulisa <[email protected]>

* lint error

Signed-off-by: Lisa <[email protected]>
Signed-off-by: aeioulisa <[email protected]>

* test error

Signed-off-by: Lisa <[email protected]>
Signed-off-by: aeioulisa <[email protected]>

* revert para name and keep old function

Signed-off-by: Lisa <[email protected]>
Signed-off-by: aeioulisa <[email protected]>

* remove strconv.Itoa

Signed-off-by: aeioulisa <[email protected]>

* two labels

Signed-off-by: aeioulisa <[email protected]>

* two labels

Signed-off-by: aeioulisa <[email protected]>

* reset config and add split function

Signed-off-by: aeioulisa <[email protected]>
  • Loading branch information
aeioulisa authored Apr 15, 2022
1 parent 157cb56 commit d3bef3c
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 36 deletions.
37 changes: 35 additions & 2 deletions pkg/controller/completed_workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controller

import (
"strconv"
"strings"
"time"

"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
Expand All @@ -12,6 +13,7 @@ const controllerAgentName = "flyteworkflow-controller"
const workflowTerminationStatusKey = "termination-status"
const workflowTerminatedValue = "terminated"
const hourOfDayCompletedKey = "hour-of-day"
const completedTimeKey = "completed-time"

// This function creates a label selector that will ignore all objects (in this case workflows) that DO NOT have a
// label key=workflowTerminationStatusKey with a value=workflowTerminatedValue
Expand Down Expand Up @@ -41,7 +43,7 @@ func SetCompletedLabel(w *v1alpha1.FlyteWorkflow, currentTime time.Time) {
w.Labels = make(map[string]string)
}
w.Labels[workflowTerminationStatusKey] = workflowTerminatedValue
w.Labels[hourOfDayCompletedKey] = strconv.Itoa(currentTime.Hour())
w.Labels[completedTimeKey] = SplitTimeToMeetsLabelFormat(currentTime)
}

func HasCompletedLabel(w *v1alpha1.FlyteWorkflow) bool {
Expand Down Expand Up @@ -73,10 +75,36 @@ func CalculateHoursToDelete(retentionPeriodHours, currentHourOfDay int) []string
return hoursToDelete
}

// CalculateHoursToKeep lists the completed-time label values that are still inside the retention window.
// It starts at currentTime and walks backwards one hour at a time, formatting every instant with
// SplitTimeToMeetsLabelFormat. The result holds retentionPeriodHours+1 entries, ordered newest first.
func CalculateHoursToKeep(retentionPeriodHours int, currentTime time.Time) []string {
	keep := make([]string, 0, retentionPeriodHours+1)
	cursor := currentTime
	// Count down so that exactly retentionPeriodHours+1 labels are produced (the current hour included).
	for remaining := retentionPeriodHours; remaining >= 0; remaining-- {
		keep = append(keep, SplitTimeToMeetsLabelFormat(cursor))
		cursor = cursor.Add(-time.Hour)
	}
	return keep
}

// CompletedWorkflowsSelectorOutsideRetentionPeriod creates a selector for completed workflows whose
// completed-time label falls outside of the retention window.
//
// It starts from the selector returned by CompletedWorkflowsLabelSelector and appends two match expressions:
//   - completedTimeKey NotIn the values produced by CalculateHoursToKeep(retentionPeriodHours, currentTime)
//   - hourOfDayCompletedKey In a single empty-string value
func CompletedWorkflowsSelectorOutsideRetentionPeriod(retentionPeriodHours int, currentTime time.Time) *v1.LabelSelector {
	hoursToKeep := CalculateHoursToKeep(retentionPeriodHours, currentTime)
	s := CompletedWorkflowsLabelSelector()
	s.MatchExpressions = append(s.MatchExpressions, v1.LabelSelectorRequirement{
		Key:      completedTimeKey,
		Operator: v1.LabelSelectorOpNotIn,
		Values:   hoursToKeep,
	})
	// NOTE(review): presumably this keeps the selector away from workflows that still carry a populated
	// legacy hour-of-day label; confirm how the API server evaluates an `In` requirement whose only
	// value is the empty string.
	s.MatchExpressions = append(s.MatchExpressions, v1.LabelSelectorRequirement{
		Key:      hourOfDayCompletedKey,
		Operator: v1.LabelSelectorOpIn,
		Values:   []string{""},
	})
	return s
}

func CompletedWorkflowsSelectorOutsideRetentionPeriodAbandon(retentionPeriodHours int, currentTime time.Time) *v1.LabelSelector {
hoursToDelete := CalculateHoursToDelete(retentionPeriodHours-1, currentTime.Hour())
s := CompletedWorkflowsLabelSelector()
s.MatchExpressions = append(s.MatchExpressions, v1.LabelSelectorRequirement{
Key: hourOfDayCompletedKey,
Expand All @@ -85,3 +113,8 @@ func CompletedWorkflowsSelectorOutsideRetentionPeriod(retentionPeriodHours int,
})
return s
}

// SplitTimeToMeetsLabelFormat renders currentTime, rounded to the nearest hour, as a date-and-hour string
// such as "2009-11-10.23". It takes the default string form of the rounded time, keeps only the part before
// the first colon (date plus hour) and replaces the separating space with a dot.
func SplitTimeToMeetsLabelFormat(currentTime time.Time) string {
	rounded := currentTime.Round(time.Hour).String()

	// Everything up to the first ':' is "<date> <hour>"; minutes, seconds and zone are dropped.
	dateAndHour := rounded
	if colon := strings.IndexByte(rounded, ':'); colon >= 0 {
		dateAndHour = rounded[:colon]
	}

	return strings.Replace(dateAndHour, " ", ".", -1)
}
51 changes: 21 additions & 30 deletions pkg/controller/completed_workflows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,45 +102,36 @@ func TestSetCompletedLabel(t *testing.T) {

func TestCalculateHoursToDelete(t *testing.T) {
assert.Equal(t, []string{
"6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22",
}, CalculateHoursToDelete(6, 5))
"2009-11-10.06", "2009-11-10.05", "2009-11-10.04", "2009-11-10.03", "2009-11-10.02", "2009-11-10.01", "2009-11-10.00",
}, CalculateHoursToKeep(6, time.Date(2009, time.November, 10, 6, 0, 0, 0, time.UTC)))

assert.Equal(t, []string{
"7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23",
}, CalculateHoursToDelete(6, 6))
"2009-10-01.03", "2009-10-01.02", "2009-10-01.01", "2009-10-01.00", "2009-09-30.23", "2009-09-30.22", "2009-09-30.21",
}, CalculateHoursToKeep(6, time.Date(2009, time.October, 1, 3, 0, 0, 0, time.UTC)))

assert.Equal(t, []string{
"0", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23",
}, CalculateHoursToDelete(6, 7))
"2009-01-01.00", "2008-12-31.23", "2008-12-31.22", "2008-12-31.21", "2008-12-31.20", "2008-12-31.19", "2008-12-31.18",
}, CalculateHoursToKeep(6, time.Date(2009, time.January, 1, 0, 0, 0, 0, time.UTC)))

assert.Equal(t, []string{
"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "23",
}, CalculateHoursToDelete(6, 22))
"2009-11-10.22", "2009-11-10.21", "2009-11-10.20", "2009-11-10.19", "2009-11-10.18", "2009-11-10.17", "2009-11-10.16",
}, CalculateHoursToKeep(6, time.Date(2009, time.November, 10, 22, 0, 0, 0, time.UTC)))

assert.Equal(t, []string{
"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16",
}, CalculateHoursToDelete(6, 23))
"2009-11-10.23", "2009-11-10.22", "2009-11-10.21", "2009-11-10.20", "2009-11-10.19", "2009-11-10.18", "2009-11-10.17",
}, CalculateHoursToKeep(6, time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)))

assert.Equal(t, []string{
"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22",
}, CalculateHoursToDelete(0, 23))
assert.Equal(t, []string{"2009-11-10.20", "2009-11-10.19"}, CalculateHoursToKeep(1, time.Date(2009, time.November, 10, 20, 0, 0, 0, time.UTC)))
assert.Equal(t, []string{"2009-11-10.23", "2009-11-10.22", "2009-11-10.21"}, CalculateHoursToKeep(2, time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)))

assert.Equal(t, []string{
"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "21", "22", "23",
}, CalculateHoursToDelete(0, 20))

"2009-11-10.12", "2009-11-10.11", "2009-11-10.10", "2009-11-10.09", "2009-11-10.08", "2009-11-10.07", "2009-11-10.06", "2009-11-10.05", "2009-11-10.04", "2009-11-10.03", "2009-11-10.02", "2009-11-10.01", "2009-11-10.00", "2009-11-09.23", "2009-11-09.22", "2009-11-09.21", "2009-11-09.20", "2009-11-09.19", "2009-11-09.18", "2009-11-09.17", "2009-11-09.16", "2009-11-09.15", "2009-11-09.14",
}, CalculateHoursToKeep(22, time.Date(2009, time.November, 10, 12, 0, 0, 0, time.UTC)))
assert.Equal(t, []string{
"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23",
}, CalculateHoursToDelete(0, 0))
"2009-11-10.00", "2009-11-09.23", "2009-11-09.22", "2009-11-09.21", "2009-11-09.20", "2009-11-09.19", "2009-11-09.18", "2009-11-09.17", "2009-11-09.16", "2009-11-09.15", "2009-11-09.14", "2009-11-09.13", "2009-11-09.12", "2009-11-09.11", "2009-11-09.10", "2009-11-09.09", "2009-11-09.08", "2009-11-09.07", "2009-11-09.06", "2009-11-09.05", "2009-11-09.04", "2009-11-09.03", "2009-11-09.02",
}, CalculateHoursToKeep(22, time.Date(2009, time.November, 10, 0, 0, 0, 0, time.UTC)))

assert.Equal(t, []string{
"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23",
}, CalculateHoursToDelete(0, 12))

assert.Equal(t, []string{"13"}, CalculateHoursToDelete(22, 12))
assert.Equal(t, []string{"1"}, CalculateHoursToDelete(22, 0))
assert.Equal(t, []string{"0"}, CalculateHoursToDelete(22, 23))
assert.Equal(t, []string{"23"}, CalculateHoursToDelete(22, 22))
assert.Equal(t, []string{"2022-03-30.12", "2022-03-30.11"}, CalculateHoursToKeep(1, time.Date(2022, time.March, 30, 12, 10, 0, 0, time.UTC)))
}

func TestCompletedWorkflowsSelectorOutsideRetentionPeriod(t *testing.T) {
Expand All @@ -151,10 +142,10 @@ func TestCompletedWorkflowsSelectorOutsideRetentionPeriod(t *testing.T) {
assert.Equal(t, workflowTerminatedValue, v)
assert.NotEmpty(t, s.MatchExpressions)
r := s.MatchExpressions[0]
assert.Equal(t, hourOfDayCompletedKey, r.Key)
assert.Equal(t, v1.LabelSelectorOpIn, r.Operator)
assert.Equal(t, 21, len(r.Values))
assert.Equal(t, completedTimeKey, r.Key)
assert.Equal(t, v1.LabelSelectorOpNotIn, r.Operator)
assert.Equal(t, 3, len(r.Values))
assert.Equal(t, []string{
"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20",
"2009-11-10.23", "2009-11-10.22", "2009-11-10.21",
}, r.Values)
}
39 changes: 38 additions & 1 deletion pkg/controller/garbage_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,41 @@ type GarbageCollector struct {
// deleteWorkflows issues a background deletion command with a label selector for all completed workflows
// outside of the retention period.
//
// The selector is built once from g.ttlHours and the collector's clock and is then applied per namespace.
// When g.namespace is empty, "all" or "all-namespaces" (case-insensitive), every namespace returned by the
// namespace client is processed; otherwise only g.namespace is.
//
// An error is returned only when listing namespaces fails. A failure while deleting in a single namespace is
// logged and counted in gcRoundFailure, and the remaining namespaces are still processed.
func (g *GarbageCollector) deleteWorkflows(ctx context.Context) error {

	s := CompletedWorkflowsSelectorOutsideRetentionPeriod(g.ttlHours, g.clk.Now())

	// Delete doesn't support 'all' namespaces. Let's fetch namespaces and loop over each.
	if g.namespace == "" || strings.ToLower(g.namespace) == "all" || strings.ToLower(g.namespace) == "all-namespaces" {
		namespaceList, err := g.namespaceClient.List(ctx, v1.ListOptions{})
		if err != nil {
			return err
		}
		for _, n := range namespaceList.Items {
			// The namespace-scoped context is used for logging and metrics of this iteration.
			namespaceCtx := contextutils.WithNamespace(ctx, n.GetName())
			logger.Infof(namespaceCtx, "Triggering Workflow delete for namespace: [%s]", n.GetName())

			if err := g.deleteWorkflowsForNamespace(ctx, n.GetName(), s); err != nil {
				// Best effort: record the failure and continue with the next namespace.
				g.metrics.gcRoundFailure.Inc(namespaceCtx)
				logger.Errorf(namespaceCtx, "Garbage collection failed for for namespace: [%s]. Error : [%v]", n.GetName(), err)
			} else {
				g.metrics.gcRoundSuccess.Inc(namespaceCtx)
			}
		}
	} else {
		namespaceCtx := contextutils.WithNamespace(ctx, g.namespace)
		logger.Infof(namespaceCtx, "Triggering Workflow delete for namespace: [%s]", g.namespace)
		if err := g.deleteWorkflowsForNamespace(ctx, g.namespace, s); err != nil {
			g.metrics.gcRoundFailure.Inc(namespaceCtx)
			logger.Errorf(namespaceCtx, "Garbage collection failed for for namespace: [%s]. Error : [%v]", g.namespace, err)
		} else {
			g.metrics.gcRoundSuccess.Inc(namespaceCtx)
		}
	}
	return nil
}

func (g *GarbageCollector) deleteWorkflowsAbandon(ctx context.Context) error {

s := CompletedWorkflowsSelectorOutsideRetentionPeriodAbandon(g.ttlHours, g.clk.Now())

// Delete doesn't support 'all' namespaces. Let's fetch namespaces and loop over each.
if g.namespace == "" || strings.ToLower(g.namespace) == "all" || strings.ToLower(g.namespace) == "all-namespaces" {
Expand Down Expand Up @@ -100,6 +134,9 @@ func (g *GarbageCollector) runGC(ctx context.Context, ticker clock.Ticker) {
case <-ticker.C():
logger.Infof(ctx, "Garbage collector running...")
t := g.metrics.gcTime.Start(ctx)
if err := g.deleteWorkflowsAbandon(ctx); err != nil {
logger.Errorf(ctx, "Garbage collection failed in this round.Error : [%v]", err)
}
if err := g.deleteWorkflows(ctx); err != nil {
logger.Errorf(ctx, "Garbage collection failed in this round.Error : [%v]", err)
}
Expand Down
11 changes: 8 additions & 3 deletions pkg/controller/garbage_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controller

import (
"context"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -101,7 +102,11 @@ func TestGarbageCollector_StartGC(t *testing.T) {
DeleteCollectionCb: func(options *v1.DeleteOptions, listOptions v1.ListOptions) error {
assert.NotNil(t, options)
assert.NotNil(t, listOptions)
assert.Equal(t, "hour-of-day in (0,1,10,11,12,13,14,15,16,17,18,19,2,20,21,3,4,5,6,7,8,9),termination-status=terminated", listOptions.LabelSelector)
if strings.HasPrefix(listOptions.LabelSelector, "completed-time") {
assert.Equal(t, "completed-time notin (2009-11-10.22,2009-11-10.23,2009-11-11.00),hour-of-day in (),termination-status=terminated", listOptions.LabelSelector)
} else {
assert.Equal(t, "hour-of-day in (0,1,10,11,12,13,14,15,16,17,18,19,2,20,21,3,4,5,6,7,8,9),termination-status=terminated", listOptions.LabelSelector)
}
wg.Done()
return nil
},
Expand Down Expand Up @@ -145,7 +150,7 @@ func TestGarbageCollector_StartGC(t *testing.T) {
mockNamespaceInvoked = false
gc, err := NewGarbageCollector(cfg, promutils.NewTestScope(), fakeClock, mockNamespaceClient, mockClient)
assert.NoError(t, err)
wg.Add(1)
wg.Add(2)
ctx := context.TODO()
ctx, cancel := context.WithCancel(ctx)
assert.NoError(t, gc.StartGC(ctx))
Expand All @@ -166,7 +171,7 @@ func TestGarbageCollector_StartGC(t *testing.T) {
mockNamespaceInvoked = false
gc, err := NewGarbageCollector(cfg, promutils.NewTestScope(), fakeClock, mockNamespaceClient, mockClient)
assert.NoError(t, err)
wg.Add(2)
wg.Add(4)
ctx := context.TODO()
ctx, cancel := context.WithCancel(ctx)
assert.NoError(t, gc.StartGC(ctx))
Expand Down

0 comments on commit d3bef3c

Please sign in to comment.