Skip to content
This repository has been archived by the owner on Aug 12, 2022. It is now read-only.

Commit

Permalink
feat: Send telemetry about failed COPY FROMs (#395)
Browse files Browse the repository at this point in the history
  • Loading branch information
irmatov authored Jun 30, 2022
1 parent c8fb1bf commit 8c5a329
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 5 deletions.
10 changes: 10 additions & 0 deletions provider/diag/diagnostic.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
SCHEMA
INTERNAL
USER
TELEMETRY
)

const (
Expand All @@ -42,6 +43,13 @@ const (
PANIC
)

const (
// diagnostics labels for telemetry events
CopyFromFailed = "copy_from_failed"
BulkInsertFailed = "bulk_insert_failed"
InsertFailed = "insert_failed"
)

func (s Severity) String() string {
switch s {
case IGNORE:
Expand Down Expand Up @@ -71,6 +79,8 @@ func (d Type) String() string {
return "User"
case INTERNAL:
return "Internal"
case TELEMETRY:
return "Telemetry"
case UNKNOWN:
fallthrough
default:
Expand Down
6 changes: 6 additions & 0 deletions provider/diag/telemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package diag

func TelemetryFromError(err error, eventType string, opts ...BaseErrorOption) Diagnostic {
opts = append([]BaseErrorOption{WithSeverity(IGNORE), WithDetails(eventType)}, opts...)
return NewBaseError(err, TELEMETRY, opts...)
}
24 changes: 21 additions & 3 deletions provider/execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,34 +301,52 @@ func (e TableExecutor) resolveResources(ctx context.Context, meta schema.ClientM
// saveToStorage copies resource data to source, it has ways of inserting, first it tries the most performant CopyFrom if that does work it bulk inserts,
// finally it inserts each resource separately, appending errors for each failed resource, only successfully inserted resources are returned
func (e TableExecutor) saveToStorage(ctx context.Context, resources schema.Resources, shouldCascade bool) (schema.Resources, diag.Diagnostics) {
var diags diag.Diagnostics
if l := len(resources); l > 0 {
e.Logger.Debug("storing resources", "count", l)
}
err := e.Db.CopyFrom(ctx, resources, shouldCascade)
if err == nil {
return resources, nil
return resources, diags
}
e.Logger.Warn("failed copy-from to db", "error", err)
diags = diags.Add(diag.TelemetryFromError(err, diag.CopyFromFailed))

// fallback insert, copy from sometimes does problems, so we fall back with bulk insert
err = e.Db.Insert(ctx, e.Table, resources, shouldCascade)
if err == nil {
return resources, nil
return resources, diags
}
e.Logger.Error("failed insert to db", "error", err)
diags = diags.Add(diag.TelemetryFromError(err, diag.BulkInsertFailed))
// Setup diags, adding first diagnostic that bulk insert failed
diags := diag.Diagnostics{}.Add(ClassifyError(err, diag.WithType(diag.DATABASE), diag.WithSummary("failed bulk insert on table %q", e.Table.Name)))
diags = diags.Add(ClassifyError(err, diag.WithType(diag.DATABASE), diag.WithSummary("failed bulk insert on table %q", e.Table.Name)))
// Try to insert resource by resource if partial fetch is enabled and an error occurred
partialFetchResources := make(schema.Resources, 0)
var failed error
failedCount := 0
for id := range resources {
if err := e.Db.Insert(ctx, e.Table, schema.Resources{resources[id]}, shouldCascade); err != nil {
failed = err
failedCount++
e.Logger.Error("failed to insert resource into db", "error", err, "resource_keys", resources[id].PrimaryKeyValues())
diags = diags.Add(ClassifyError(err, diag.WithType(diag.DATABASE)))
continue
}
// If there is no error we add the resource to the final result
partialFetchResources = append(partialFetchResources, resources[id])
}
if failed != nil {
msg := "all resources"
if failedCount < len(resources) {
msg = "some resources"
}
diags = diags.Add(diag.TelemetryFromError(
failed,
diag.InsertFailed,
diag.WithSummary("%s failed to insert into table %q", msg, e.Table.Name),
))
}
return partialFetchResources, diags
}

Expand Down
4 changes: 2 additions & 2 deletions provider/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,8 @@ func TestProvider_FetchResources(t *testing.T) {
return mockDB
},
Context: func() context.Context {
//nolint:govet
ctx, _ := context.WithTimeout(context.Background(), time.Second*2)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
t.Cleanup(cancel)
return ctx
},

Expand Down

0 comments on commit 8c5a329

Please sign in to comment.