Skip to content

Commit

Permalink
Moved filtering logic back into the SQL statement.
Browse files Browse the repository at this point in the history
Table `latest_evaluation_statuses` tracks the latest evaluation id for
any given entity/rule pair. Adding it via a left join allows us to
determine which records are not the latest ones among those older than
30 days by relying entirely on the database rather than doing the
processing in application code. This also slightly reduces the
resources needed to process deletions.
  • Loading branch information
blkt committed Jul 25, 2024
1 parent b703d6d commit 6ca77ea
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 201 deletions.
67 changes: 11 additions & 56 deletions cmd/server/app/history_purge.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ func historyPurgeCommand(cmd *cobra.Command, _ []string) error {
}
defer closer()

threshold := time.Now().UTC().AddDate(-30, 0, 0)
// We maintain up to 30 days of history, plus any record
// that's the latest for any entity/rule pair.
threshold := time.Now().UTC().AddDate(0, 0, -30)
fmt.Printf("Calculated threshold is %s", threshold)

if err := purgeLoop(ctx, store, threshold, batchSize, dryRun, cmd.Printf); err != nil {
Expand All @@ -68,7 +70,7 @@ func historyPurgeCommand(cmd *cobra.Command, _ []string) error {
}

// purgeLoop routine cleans up the evaluation history log by deleting
// all records older than a fixed threshold.
// all stale records older than a given threshold.
//
// As of the time of this writing, the size of the row structs is 80
// bytes, specifically
Expand Down Expand Up @@ -98,9 +100,12 @@ func purgeLoop(
total := 0
deleted := 0

records, err := store.ListEvaluationHistoryOlderThan(
// Note: this command relies on the following statement
// filtering out records that, despite being older than 30
// days, are the latest ones for any given entity/rule pair.
records, err := store.ListEvaluationHistoryStaleRecords(
ctx,
db.ListEvaluationHistoryOlderThanParams{
db.ListEvaluationHistoryStaleRecordsParams{
Threshold: threshold,
Size: int32(4000000),
},
Expand All @@ -114,20 +119,12 @@ func purgeLoop(
return nil
}

total = len(records)
deletes := filterRecords(records)

if len(deletes) == 0 {
printf("No records to delete after filtering\n")
return nil
}

// Skip deletion if --dry-run was passed.
if !dryRun {
deleted, err = deleteEvaluationHistory(
ctx,
store,
deletes,
records,
batchSize,
)
if err != nil {
Expand All @@ -143,56 +140,14 @@ func purgeLoop(
return nil
}

// filterRecords groups the given records by entity type, entity id
// and rule id, and returns every record that is NOT the most recent
// one of its group, i.e. the ones that can be safely deleted.
//
// Exactly one record per group is retained (and therefore omitted
// from the result). When two records of the same group carry the
// same evaluation time, the one encountered first is retained. The
// returned slice is never nil, even when nothing has to be deleted.
func filterRecords(
	records []db.ListEvaluationHistoryOlderThanRow,
) []db.ListEvaluationHistoryOlderThanRow {
	stale := make([]db.ListEvaluationHistoryOlderThanRow, 0)
	newest := make(map[string]db.ListEvaluationHistoryOlderThanRow)

	for _, rec := range records {
		// A group is identified by entity type, entity id and
		// rule id taken together.
		groupKey := fmt.Sprintf("%d/%s/%s",
			rec.EntityType,
			rec.EntityID,
			rec.RuleID,
		)

		current, seen := newest[groupKey]
		switch {
		case !seen:
			// First record of this group: it is the newest one
			// seen so far.
			newest[groupKey] = rec
		case rec.EvaluationTime.After(current.EvaluationTime):
			// A strictly more recent record showed up: the one
			// retained until now becomes stale.
			stale = append(stale, current)
			newest[groupKey] = rec
		default:
			// Not more recent than the retained record, so it
			// can be deleted.
			stale = append(stale, rec)
		}
	}

	return stale
}

func deleteEvaluationHistory(
ctx context.Context,
store db.Store,
records []db.ListEvaluationHistoryOlderThanRow,
records []db.ListEvaluationHistoryStaleRecordsRow,
batchSize uint,
) (int, error) {
deleted := 0
for {
// Skip deletion if --dry-run was passed.
if len(records) == 0 {
break
}
Expand Down
139 changes: 10 additions & 129 deletions cmd/server/app/history_purge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
func TestRecordSize(t *testing.T) {
t.Parallel()
size := unsafe.Sizeof(
db.ListEvaluationHistoryOlderThanRow{
db.ListEvaluationHistoryStaleRecordsRow{
ID: uuid.Nil,
EvaluationTime: time.Now(),
EntityType: int32(1),
Expand All @@ -49,132 +49,13 @@ func TestRecordSize(t *testing.T) {
require.Equal(t, 80, int(size))
}

// TestFilterRecords checks that filterRecords returns only the
// records that are superseded by another record of the same entity
// type / entity id / rule id combination.
func TestFilterRecords(t *testing.T) {
	t.Parallel()

	cases := []struct {
		name     string
		records  []db.ListEvaluationHistoryOlderThanRow
		expected []db.ListEvaluationHistoryOlderThanRow
	}{
		{
			// Two records for the same entity/rule: the uuid2
			// record is expected to be flagged for deletion.
			name: "older removed",
			records: []db.ListEvaluationHistoryOlderThanRow{
				makeHistoryRow(uuid1, evaluatedAt1, entityType, entityID1, ruleID1),
				makeHistoryRow(uuid2, evaluatedAt2, entityType, entityID1, ruleID1),
			},
			expected: []db.ListEvaluationHistoryOlderThanRow{
				makeHistoryRow(uuid2, evaluatedAt2, entityType, entityID1, ruleID1),
			},
		},
		{
			// Same records as above in reverse order: the
			// outcome must not depend on input ordering.
			name: "older removed bis",
			records: []db.ListEvaluationHistoryOlderThanRow{
				makeHistoryRow(uuid2, evaluatedAt2, entityType, entityID1, ruleID1),
				makeHistoryRow(uuid1, evaluatedAt1, entityType, entityID1, ruleID1),
			},
			expected: []db.ListEvaluationHistoryOlderThanRow{
				makeHistoryRow(uuid2, evaluatedAt2, entityType, entityID1, ruleID1),
			},
		},
		{
			// Records belonging to different entity/rule pairs:
			// nothing is expected to be deleted.
			name: "all new",
			records: []db.ListEvaluationHistoryOlderThanRow{
				makeHistoryRow(uuid1, evaluatedAt1, entityType, entityID1, ruleID1),
				makeHistoryRow(uuid2, evaluatedAt2, entityType, entityID2, ruleID2),
			},
			expected: []db.ListEvaluationHistoryOlderThanRow{},
		},
		{
			// The two rows differ only in the entity type
			// argument (entityType vs. int32(2)); nothing is
			// expected to be deleted.
			name: "entity type",
			records: []db.ListEvaluationHistoryOlderThanRow{
				makeHistoryRow(uuid1, evaluatedAt1, entityType, entityID1, ruleID1),
				makeHistoryRow(uuid1, evaluatedAt1, int32(2), entityID1, ruleID1),
			},
			expected: []db.ListEvaluationHistoryOlderThanRow{},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()

			got := filterRecords(tc.records)

			// Lengths must match before comparing element by
			// element, otherwise indexing got could go out of
			// range.
			require.Len(t, got, len(tc.expected))
			for i, want := range tc.expected {
				require.Equal(t, want, got[i])
			}
		})
	}
}

func TestDeleteEvaluationHistory(t *testing.T) {
t.Parallel()

tests := []struct {
name string
dbSetup dbf.DBMockBuilder
records []db.ListEvaluationHistoryOlderThanRow
records []db.ListEvaluationHistoryStaleRecordsRow
size uint
err bool
}{
Expand All @@ -198,7 +79,7 @@ func TestDeleteEvaluationHistory(t *testing.T) {
},
),
),
records: []db.ListEvaluationHistoryOlderThanRow{
records: []db.ListEvaluationHistoryStaleRecordsRow{
makeHistoryRow(
uuid1,
evaluatedAt1,
Expand All @@ -208,10 +89,10 @@ func TestDeleteEvaluationHistory(t *testing.T) {
),
makeHistoryRow(
uuid2,
evaluatedAt1,
evaluatedAt2,
entityType,
entityID1,
ruleID1,
entityID2,
ruleID2,
),
makeHistoryRow(
uuid.Nil,
Expand Down Expand Up @@ -270,7 +151,7 @@ func TestDeleteEvaluationHistory(t *testing.T) {
},
),
),
records: []db.ListEvaluationHistoryOlderThanRow{
records: []db.ListEvaluationHistoryStaleRecordsRow{
makeHistoryRow(
uuid1,
evaluatedAt1,
Expand Down Expand Up @@ -300,7 +181,7 @@ func TestDeleteEvaluationHistory(t *testing.T) {
},
),
),
records: []db.ListEvaluationHistoryOlderThanRow{
records: []db.ListEvaluationHistoryStaleRecordsRow{
makeHistoryRow(
uuid1,
evaluatedAt1,
Expand Down Expand Up @@ -363,8 +244,8 @@ func makeHistoryRow(
entityType int32,
entityID uuid.UUID,
ruleID uuid.UUID,
) db.ListEvaluationHistoryOlderThanRow {
return db.ListEvaluationHistoryOlderThanRow{
) db.ListEvaluationHistoryStaleRecordsRow {
return db.ListEvaluationHistoryStaleRecordsRow{
ID: id,
EvaluationTime: evaluatedAt,
EntityType: entityType,
Expand Down
14 changes: 7 additions & 7 deletions database/mock/store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion database/query/eval_history.sql
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ SELECT s.id::uuid AS evaluation_id,
CASE WHEN sqlc.narg(prev)::timestamp without time zone IS NULL THEN s.evaluation_time END DESC
LIMIT sqlc.arg(size)::integer;

-- name: ListEvaluationHistoryOlderThan :many
-- name: ListEvaluationHistoryStaleRecords :many
SELECT s.evaluation_time,
s.id,
ere.rule_id,
Expand All @@ -192,7 +192,12 @@ SELECT s.evaluation_time,
) AS entity_id
FROM evaluation_statuses s
JOIN evaluation_rule_entities ere ON s.rule_entity_id = ere.id
LEFT JOIN latest_evaluation_statuses l
ON l.rule_entity_id = s.rule_entity_id
AND l.evaluation_history_id = s.id
WHERE s.evaluation_time < sqlc.arg(threshold)
-- the following predicate ensures we get only "stale" records
AND l.evaluation_history_id IS NULL
-- listing from oldest to newest
ORDER BY s.evaluation_time ASC, rule_id ASC, entity_id ASC
LIMIT sqlc.arg(size)::integer;
Expand Down
Loading

0 comments on commit 6ca77ea

Please sign in to comment.