Skip to content

Commit

Permalink
Persist checkpoints in evaluation history db. Try 2 (#4083)
Browse files Browse the repository at this point in the history
This starts adding ingest checkpoint data into the history log.

The main benefit here is that we'd have commit/branch information for git-related ingests.

Artifacts require a little more work and will be handled in a separate PR.

Signed-off-by: Juan Antonio Osorio <[email protected]>
  • Loading branch information
JAORMX authored Aug 5, 2024
1 parent 3c7dd9d commit c60ecff
Show file tree
Hide file tree
Showing 17 changed files with 123 additions and 20 deletions.
6 changes: 4 additions & 2 deletions database/query/eval_history.sql
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ RETURNING id;
INSERT INTO evaluation_statuses(
rule_entity_id,
status,
details
details,
checkpoint
) VALUES (
$1,
$2,
$3
$3,
sqlc.arg(checkpoint)::jsonb
)
RETURNING id;

Expand Down
14 changes: 11 additions & 3 deletions internal/db/eval_history.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions internal/engine/eval_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package engine
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"

Expand Down Expand Up @@ -142,6 +143,12 @@ func (e *executor) createOrUpdateEvalStatus(
alertStatus := evalerrors.ErrorAsAlertStatus(params.GetActionsErr().AlertErr)
e.metrics.CountAlertStatus(ctx, alertStatus)

chckpoint := params.GetIngestResult().GetCheckpoint()
chkpjs, err := chckpoint.ToJSONorDefault(json.RawMessage(`{}`))
if err != nil {
logger.Err(err).Msg("error marshalling checkpoint")
}

// Log result in the evaluation history tables
err = e.querier.WithTransactionErr(func(qtx db.ExtendQuerier) error {
evalID, err := e.historyService.StoreEvaluationStatus(
Expand All @@ -152,6 +159,7 @@ func (e *executor) createOrUpdateEvalStatus(
params.EntityType,
entityID,
params.GetEvalErr(),
chkpjs,
)
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion internal/engine/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ func TestExecutor_handleEntityEvent(t *testing.T) {
evaluationID := uuid.New()
historyService := mockhistory.NewMockEvaluationHistoryService(ctrl)
historyService.EXPECT().
StoreEvaluationStatus(gomock.Any(), gomock.Any(), ruleInstanceID, profileID, db.EntitiesRepository, repositoryID, gomock.Any()).
StoreEvaluationStatus(
gomock.Any(), gomock.Any(), ruleInstanceID, profileID, db.EntitiesRepository, repositoryID, gomock.Any(), gomock.Any()).
Return(evaluationID, nil)

mockStore.EXPECT().
Expand Down
9 changes: 9 additions & 0 deletions internal/engine/ingester/artifact/artifact.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

evalerrors "github.com/stacklok/minder/internal/engine/errors"
engif "github.com/stacklok/minder/internal/engine/interfaces"
"github.com/stacklok/minder/internal/entities/checkpoints"
artif "github.com/stacklok/minder/internal/providers/artifact"
"github.com/stacklok/minder/internal/verifier"
"github.com/stacklok/minder/internal/verifier/sigstore/container"
Expand Down Expand Up @@ -115,6 +116,14 @@ func (i *Ingest) Ingest(

return &engif.Result{
Object: applicable,
// We would ideally return an artifact's digest here, but the
// artifact ingester currently evaluates multiple artifacts at the
// same time. This is not ideal; we should evaluate one impulse at
// a time. This has to be fixed, but for now we return the current
// time as the checkpoint.
// We need to track the "impulse" that triggered the evaluation
// so we can return the correct checkpoint.
Checkpoint: checkpoints.NewCheckpointV1Now(),
}, nil
}

Expand Down
4 changes: 3 additions & 1 deletion internal/engine/ingester/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

evalerrors "github.com/stacklok/minder/internal/engine/errors"
engif "github.com/stacklok/minder/internal/engine/interfaces"
"github.com/stacklok/minder/internal/entities/checkpoints"
"github.com/stacklok/minder/internal/util"
pb "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1"
"github.com/stacklok/minder/pkg/rule_methods"
Expand Down Expand Up @@ -110,7 +111,8 @@ func (idi *BuiltinRuleDataIngest) Ingest(ctx context.Context, ent protoreflect.P
}

return &engif.Result{
Object: resultObj,
Object: resultObj,
Checkpoint: checkpoints.NewCheckpointV1Now(),
}, nil
}

Expand Down
6 changes: 6 additions & 0 deletions internal/engine/ingester/diff/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ import (
"regexp"
"strconv"
"strings"
"time"

"github.com/rs/zerolog"
"google.golang.org/protobuf/reflect/protoreflect"

engif "github.com/stacklok/minder/internal/engine/interfaces"
"github.com/stacklok/minder/internal/entities/checkpoints"
pbinternal "github.com/stacklok/minder/internal/proto"
pb "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1"
provifv1 "github.com/stacklok/minder/pkg/providers/v1"
Expand Down Expand Up @@ -122,6 +124,8 @@ func (di *Diff) Ingest(
Pr: pr,
Deps: allDiffs,
},
// NOTE: At this point we're only retrieving the timestamp as the checkpoint.
Checkpoint: checkpoints.NewCheckpointV1(time.Now()),
}, nil

case pb.DiffTypeFull:
Expand Down Expand Up @@ -152,6 +156,8 @@ func (di *Diff) Ingest(
Pr: pr,
Files: allDiffs,
},
// NOTE: At this point we're only retrieving the timestamp as the checkpoint.
Checkpoint: checkpoints.NewCheckpointV1Now(),
}, nil

default:
Expand Down
19 changes: 16 additions & 3 deletions internal/engine/ingester/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

engerrors "github.com/stacklok/minder/internal/engine/errors"
engif "github.com/stacklok/minder/internal/engine/interfaces"
"github.com/stacklok/minder/internal/entities/checkpoints"
pb "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1"
provifv1 "github.com/stacklok/minder/pkg/providers/v1"
)
Expand Down Expand Up @@ -102,10 +103,22 @@ func (gi *Git) Ingest(ctx context.Context, ent protoreflect.ProtoMessage, params
return nil, fmt.Errorf("could not get worktree: %w", err)
}

head, err := r.Head()
if err != nil {
return nil, fmt.Errorf("could not get head: %w", err)
}

hsh := head.Hash()

chkpoint := checkpoints.NewCheckpointV1Now().
WithBranch(branch).
WithCommitHash(hsh.String())

return &engif.Result{
Object: nil,
Fs: wt.Filesystem,
Storer: r.Storer,
Object: nil,
Fs: wt.Filesystem,
Storer: r.Storer,
Checkpoint: chkpoint,
}, nil
}

Expand Down
6 changes: 6 additions & 0 deletions internal/engine/ingester/git/git_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func TestGitIngestWithCloneURLFromRepo(t *testing.T) {
require.NoError(t, err, "expected no error")
require.NotNil(t, got, "expected non-nil result")
require.NotNil(t, got.Fs, "expected non-nil fs")
require.NotNil(t, got.Checkpoint, "expected non-nil checkpoint")

fs := got.Fs
f, err := fs.Open("README")
Expand All @@ -65,6 +66,11 @@ func TestGitIngestWithCloneURLFromRepo(t *testing.T) {
require.NoError(t, err, "expected no error")

require.Contains(t, buf.String(), "Hello World", "expected README.md to contain Hello World")

require.NotNil(t, got.Checkpoint.Checkpoint.Branch, "expected non-nil branch")
require.Equal(t, "master", *got.Checkpoint.Checkpoint.Branch, "expected branch to be master")

require.NotNil(t, got.Checkpoint.Checkpoint.CommitHash, "expected non-nil commit")
}

func TestGitIngestWithCloneURLFromParams(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion internal/engine/ingester/rest/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"google.golang.org/protobuf/reflect/protoreflect"

engif "github.com/stacklok/minder/internal/engine/interfaces"
"github.com/stacklok/minder/internal/entities/checkpoints"
"github.com/stacklok/minder/internal/util"
pb "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1"
provifv1 "github.com/stacklok/minder/pkg/providers/v1"
Expand Down Expand Up @@ -154,7 +155,8 @@ func (rdi *Ingestor) Ingest(ctx context.Context, ent protoreflect.ProtoMessage,
}

return &engif.Result{
Object: data,
Object: data,
Checkpoint: checkpoints.NewCheckpointV1Now(),
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/engine/ingester/rest/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func TestRestIngest(t *testing.T) {
}

require.NoError(t, err, "unexpected error creating remediate engine")
require.Equal(t, tt.ingResultFn(), result, "unexpected result")
require.Equal(t, tt.ingResultFn().Object, result.Object, "unexpected result")
})
}
}
Expand Down
14 changes: 14 additions & 0 deletions internal/engine/interfaces/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/stacklok/minder/internal/db"
evalerrors "github.com/stacklok/minder/internal/engine/errors"
"github.com/stacklok/minder/internal/entities/checkpoints"
"github.com/stacklok/minder/internal/profiles/models"
)

Expand Down Expand Up @@ -60,6 +61,19 @@ type Result struct {
// FIXME: It might be cleaner to either wrap both Fs and Storer in a struct
// or pass out the git.Repository structure instead of the storer.
Storer storage.Storer

// Checkpoint is the checkpoint at which the ingestion was done. This is
// used to persist the state of the entity at ingestion time.
Checkpoint *checkpoints.CheckpointEnvelopeV1
}

// GetCheckpoint returns the checkpoint recorded on the ingestion result.
// It is safe to call on a nil *Result, in which case nil is returned.
func (r *Result) GetCheckpoint() *checkpoints.CheckpointEnvelopeV1 {
	if r != nil {
		return r.Checkpoint
	}

	return nil
}

// ActionType represents the type of action, i.e., remediate, alert, etc.
Expand Down
29 changes: 28 additions & 1 deletion internal/entities/checkpoints/checkpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
// Package checkpoints contains logic relating to checkpoint management for entities
package checkpoints

import "time"
import (
"encoding/json"
"time"
)

// V1 is the version string for the v1 format.
const V1 = "v1"
Expand Down Expand Up @@ -47,6 +50,11 @@ type CheckpointV1 struct {
Digest *string `json:"digest,omitempty" yaml:"digest,omitempty"`
}

// NewCheckpointV1Now creates a new v1 checkpoint envelope using the current
// time as its timestamp. It is a convenience wrapper around NewCheckpointV1.
func NewCheckpointV1Now() *CheckpointEnvelopeV1 {
	now := time.Now()
	return NewCheckpointV1(now)
}

// NewCheckpointV1 creates a new CheckpointV1 with the given timestamp.
func NewCheckpointV1(timestamp time.Time) *CheckpointEnvelopeV1 {
return &CheckpointEnvelopeV1{
Expand Down Expand Up @@ -80,3 +88,22 @@ func (c *CheckpointEnvelopeV1) WithDigest(digest string) *CheckpointEnvelopeV1 {
c.Checkpoint.Digest = &digest
return c
}

// ToJSON encodes the checkpoint envelope with encoding/json and returns the
// resulting raw message along with any marshalling error.
func (c *CheckpointEnvelopeV1) ToJSON() (json.RawMessage, error) {
	encoded, err := json.Marshal(c)
	return encoded, err
}

// ToJSONorDefault encodes the checkpoint envelope as JSON, falling back to
// def when there is nothing usable to return.
//
// The fallback applies in two situations: the receiver is nil (def is
// returned with a nil error), or marshalling fails (def is returned together
// with the marshalling error, so callers may log it and still proceed).
func (c *CheckpointEnvelopeV1) ToJSONorDefault(def json.RawMessage) (json.RawMessage, error) {
	result := def

	var err error
	if c != nil {
		var encoded json.RawMessage
		// Only replace the default when encoding succeeded.
		if encoded, err = c.ToJSON(); err == nil {
			result = encoded
		}
	}

	return result, err
}
2 changes: 1 addition & 1 deletion internal/entities/checkpoints/checkpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestCheckpointEnvelopeV1_MarshalJSON(t *testing.T) {
t.Parallel()

// Marshal the input to JSON
output, err := json.Marshal(tt.input)
output, err := tt.input.ToJSON()
require.NoError(t, err)

assert.Equal(t, string(output), tt.expected)
Expand Down
8 changes: 4 additions & 4 deletions internal/history/mock/service.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion internal/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type EvaluationHistoryService interface {
entityType db.Entities,
entityID uuid.UUID,
evalError error,
marshaledCheckpoint []byte,
) (uuid.UUID, error)
// ListEvaluationHistory returns a list of evaluations stored
// in the history table.
Expand Down Expand Up @@ -69,6 +70,7 @@ func (e *evaluationHistoryService) StoreEvaluationStatus(
entityType db.Entities,
entityID uuid.UUID,
evalError error,
marshaledCheckpoint []byte,
) (uuid.UUID, error) {
var ruleEntityID uuid.UUID
status := evalerrors.ErrorAsEvalStatus(evalError)
Expand Down Expand Up @@ -110,7 +112,7 @@ func (e *evaluationHistoryService) StoreEvaluationStatus(
ruleEntityID = latestRecord.RuleEntityID
}

evaluationID, err := e.createNewStatus(ctx, qtx, ruleEntityID, profileID, status, details)
evaluationID, err := e.createNewStatus(ctx, qtx, ruleEntityID, profileID, status, details, marshaledCheckpoint)
if err != nil {
return uuid.Nil, fmt.Errorf("error while creating new evaluation status for rule/entity %s: %w", ruleEntityID, err)
}
Expand All @@ -125,12 +127,14 @@ func (_ *evaluationHistoryService) createNewStatus(
profileID uuid.UUID,
status db.EvalStatusTypes,
details string,
marshaledCheckpoint []byte,
) (uuid.UUID, error) {
newEvaluationID, err := qtx.InsertEvaluationStatus(ctx,
db.InsertEvaluationStatusParams{
RuleEntityID: ruleEntityID,
Status: status,
Details: details,
Checkpoint: marshaledCheckpoint,
},
)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion internal/history/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ func TestStoreEvaluationStatus(t *testing.T) {
}

service := NewEvaluationHistoryService()
id, err := service.StoreEvaluationStatus(ctx, store, ruleID, profileID, scenario.EntityType, entityID, errTest)
id, err := service.StoreEvaluationStatus(
ctx, store, ruleID, profileID, scenario.EntityType, entityID, errTest, []byte("{}"))
if scenario.ExpectedError == "" {
require.Equal(t, evaluationID, id)
require.NoError(t, err)
Expand Down

0 comments on commit c60ecff

Please sign in to comment.