Skip to content

Commit

Permalink
perf: improve task stats IMAGEPULL performance (#8067)
Browse files Browse the repository at this point in the history
  • Loading branch information
NicholasBlaskey authored Oct 9, 2023
1 parent e7751fe commit 469c2ae
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 18 deletions.
1 change: 1 addition & 0 deletions agent/internal/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ func (c *Container) shimDockerEvents() events.Publisher[docker.Event] {
TaskType: model.TaskType(c.spec.TaskType),
Stats: &model.TaskStats{
AllocationID: c.allocationID,
ContainerID: &c.containerID,
EventType: e.Stats.Kind,
StartTime: e.Stats.StartTime,
EndTime: e.Stats.EndTime,
Expand Down
1 change: 1 addition & 0 deletions master/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ linters-settings:
forbid:
- 'fmt\.Print.*'
- 'metaV1.NamespaceAll' # Will error if someone has namespace restricted permissions.
- 'bundebug.WithVerbose'
staticcheck:
go: "1.20"

Expand Down
39 changes: 23 additions & 16 deletions master/internal/db/postgres_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,22 +431,25 @@ func (db *PgDB) RecordTaskEndStats(stats *model.TaskStats) error {

// RecordTaskEndStatsBun records end stats for tasks with bun: it sets end_time
// on task_stats rows for the allocation ID and event type of stats that do not
// have an end_time yet.
//
// NOTE(review): this span comes from a rendered diff, so it appears to show the
// raw-SQL implementation (with its bare `return err`) interleaved with the
// query-builder implementation that replaces it — confirm against the real file
// before relying on either half.
func RecordTaskEndStatsBun(stats *model.TaskStats) error {
_, err := Bun().NewRaw(`UPDATE task_stats AS t
SET end_time = ?
FROM (
SELECT allocation_id, event_type, end_time
FROM task_stats
Where allocation_id = ? AND event_type = ? AND end_time IS NULL
ORDER BY start_time
FOR UPDATE
) AS t2
WHERE t.allocation_id = t2.allocation_id AND t.event_type = t2.event_type AND t.end_time IS NULL`,
stats.EndTime,
stats.AllocationID,
stats.EventType).
Exec(context.TODO())
// Query-builder form: update only the end_time column from stats, restricted to
// rows of this allocation and event type whose end_time is still NULL.
query := Bun().NewUpdate().Model(stats).Column("end_time").
Where("allocation_id = ?", stats.AllocationID).
Where("event_type = ?", stats.EventType).
Where("end_time IS NULL")
if stats.ContainerID == nil {
// A nil ContainerID needs an explicit IS NULL predicate. Passing nil to
// Where("container_id = ?", stats.ContainerID) generates
// WHERE container_id = NULL, and in SQL a comparison against NULL is never
// true, so rows with a NULL container_id would not be matched.
// We don't use this case anywhere currently, but it is an easy bug to write
// without this branch.
query = query.Where("container_id IS NULL")
} else {
query = query.Where("container_id = ?", stats.ContainerID)
}

return err
// Wrap the failure with the stats being recorded so the log shows which
// allocation/event/container the update was for.
if _, err := query.Exec(context.TODO()); err != nil {
return fmt.Errorf("recording task end stats %+v: %w", stats, err)
}

return nil
}

// EndAllTaskStats called at master starts, in case master previously crashed.
Expand All @@ -457,7 +460,11 @@ FROM cluster_id, allocations
WHERE allocations.allocation_id = task_stats.allocation_id
AND allocations.end_time IS NOT NULL
AND task_stats.end_time IS NULL`)
return err
if err != nil {
return fmt.Errorf("ending all task stats: %w", err)
}

return nil
}

// TaskLogsFields returns the unique fields that can be filtered on for the given task.
Expand Down
48 changes: 48 additions & 0 deletions master/internal/db/postgres_tasks_intg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package db

import (
"context"
"encoding/json"
"fmt"
"go/ast"
Expand All @@ -19,6 +20,7 @@ import (

"github.com/stretchr/testify/require"

"github.com/determined-ai/determined/master/pkg/cproto"
"github.com/determined-ai/determined/master/pkg/etc"
"github.com/determined-ai/determined/master/pkg/model"
"github.com/determined-ai/determined/master/pkg/ptrs"
Expand Down Expand Up @@ -121,6 +123,52 @@ func TestJobTaskAndAllocationAPI(t *testing.T) {
require.True(t, reflect.DeepEqual(aIn, aOut), pprintedExpect(aIn, aOut))
}

// TestRecordAndEndTaskStats records three IMAGEPULL task stats for a single
// allocation (the first one without a container ID), ends each of them, and
// checks that the rows read back for that allocation match what was written.
func TestRecordAndEndTaskStats(t *testing.T) {
	require.NoError(t, etc.SetRootPath(RootFromDB))
	pgDB := MustResolveTestPostgres(t)
	MustMigrateTestPostgres(t, pgDB, MigrationsFromDB)

	// The stats rows need an existing task and allocation to attach to.
	taskID := model.NewTaskID()
	task := &model.Task{
		TaskID:    taskID,
		TaskType:  model.TaskTypeTrial,
		StartTime: time.Now().UTC().Truncate(time.Millisecond),
	}
	require.NoError(t, pgDB.AddTask(task), "failed to add task")

	allocID := model.AllocationID(taskID + "allocationID")
	alloc := &model.Allocation{
		TaskID:       taskID,
		AllocationID: allocID,
	}
	require.NoError(t, pgDB.AddAllocation(alloc), "failed to add allocation")

	var want []*model.TaskStats
	for i := 0; i < 3; i++ {
		// Only the first entry is stored without a container ID, which
		// exercises the nil-container path of RecordTaskEndStatsBun.
		containerID := ptrs.Ptr(cproto.NewID())
		if i == 0 {
			containerID = nil
		}

		stats := &model.TaskStats{
			AllocationID: allocID,
			EventType:    "IMAGEPULL",
			ContainerID:  containerID,
			StartTime:    ptrs.Ptr(time.Now().Truncate(time.Millisecond)),
		}
		require.NoError(t, RecordTaskStatsBun(stats))

		stats.EndTime = ptrs.Ptr(time.Now().Truncate(time.Millisecond))
		require.NoError(t, RecordTaskEndStatsBun(stats))

		want = append(want, stats)
	}

	var got []*model.TaskStats
	selectQuery := Bun().NewSelect().
		Model(&got).
		Where("allocation_id = ?", allocID)
	require.NoError(t, selectQuery.Scan(context.TODO(), &got))

	require.ElementsMatch(t, want, got)
}

func TestAllocationState(t *testing.T) {
require.NoError(t, etc.SetRootPath(RootFromDB))
db := MustResolveTestPostgres(t)
Expand Down
10 changes: 8 additions & 2 deletions master/pkg/model/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/uptrace/bun"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/determined-ai/determined/master/pkg/cproto"
"github.com/determined-ai/determined/master/pkg/ptrs"
"github.com/determined-ai/determined/master/pkg/tasklog"
"github.com/determined-ai/determined/proto/pkg/apiv1"
Expand Down Expand Up @@ -137,8 +138,13 @@ type AllocationState string
type TaskStats struct {
AllocationID AllocationID
EventType string
StartTime *time.Time
EndTime *time.Time
// ContainerID is sent by the agent and is not always present in the database.
// The task_stats table is unusual in that it sometimes holds one row per
// allocation (as when recording queued stats) and sometimes many rows per
// allocation (as when recording image pull time).
ContainerID *cproto.ID
StartTime *time.Time
EndTime *time.Time
}

// ResourceAggregates is the model for resource_aggregates in the database.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Drop the container_id column from task_stats.
ALTER TABLE task_stats
DROP COLUMN container_id;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Add a nullable container_id column to task_stats.
ALTER TABLE task_stats
ADD COLUMN container_id TEXT;

-- Partial btree index on container_id: rows whose container_id is NULL are
-- left out of the index.
CREATE INDEX idx_task_stats_container_id_id ON task_stats USING btree (container_id)
WHERE container_id IS NOT NULL;

0 comments on commit 469c2ae

Please sign in to comment.