This repository has been archived by the owner on Aug 12, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 15
fix: Executor fixes #265
Merged
Merged
fix: Executor fixes #265
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
1919e6c
fix: Executor fixes
disq 31d3bcd
Remove ExtraFields / CascadeDeleteFilters
disq fcd66ae
Revert "Remove ExtraFields / CascadeDeleteFilters"
disq 689a935
Don't call cleanupStaleData if we have errors
disq 8430c95
enable copyfrom
disq 862f043
Fix empty multiplexer test
disq File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -52,7 +52,6 @@ type TableExecutor struct { | |
|
||
// NewTableExecutor creates a new TableExecutor for given schema.Table | ||
func NewTableExecutor(resourceName string, db Storage, logger hclog.Logger, table *schema.Table, extraFields, metadata map[string]interface{}, classifier ErrorClassifier, goroutinesSem *semaphore.Weighted, timeout time.Duration) TableExecutor { | ||
|
||
var classifiers = []ErrorClassifier{defaultErrorClassifier} | ||
if classifier != nil { | ||
classifiers = append([]ErrorClassifier{classifier}, classifiers...) | ||
|
@@ -77,50 +76,34 @@ func NewTableExecutor(resourceName string, db Storage, logger hclog.Logger, tabl | |
|
||
// Resolve is the root function of table executor which starts an execution of a Table resolving it, and it's relations. | ||
func (e TableExecutor) Resolve(ctx context.Context, meta schema.ClientMeta) (uint64, diag.Diagnostics) { | ||
var clients []schema.ClientMeta | ||
|
||
if e.Table.Multiplex != nil { | ||
if clients := e.Table.Multiplex(meta); len(clients) > 0 { | ||
return e.doMultiplexResolve(ctx, clients) | ||
} | ||
clients = e.Table.Multiplex(meta) | ||
} | ||
|
||
if e.timeout > 0 { | ||
var cancel context.CancelFunc | ||
ctx, cancel = context.WithTimeout(ctx, e.timeout) | ||
defer cancel() | ||
if len(clients) == 0 { | ||
clients = append(clients, meta) | ||
} | ||
return e.withLogger(append(meta.Logger().ImpliedArgs(), "multiplex", false)...).callTableResolve(ctx, meta, nil) | ||
|
||
return e.doMultiplexResolve(ctx, clients) | ||
} | ||
|
||
// withTable allows to create a new TableExecutor for received *schema.Table | ||
func (e TableExecutor) withTable(t *schema.Table, kv ...interface{}) *TableExecutor { | ||
var c [2]schema.ColumnList | ||
c[0], c[1] = e.Db.Dialect().Columns(t).Sift() | ||
return &TableExecutor{ | ||
ResourceName: e.ResourceName, | ||
Table: t, | ||
Db: e.Db, | ||
Logger: e.Logger.With(kv...), | ||
classifiers: e.classifiers, | ||
extraFields: e.extraFields, | ||
executionStart: e.executionStart, | ||
columns: c, | ||
goroutinesSem: e.goroutinesSem, | ||
} | ||
cpy := e | ||
cpy.Table = t | ||
cpy.Logger = cpy.Logger.With(kv...) | ||
cpy.columns = c | ||
|
||
return &cpy | ||
} | ||
|
||
func (e TableExecutor) withLogger(kv ...interface{}) *TableExecutor { | ||
return &TableExecutor{ | ||
ResourceName: e.ResourceName, | ||
Table: e.Table, | ||
Db: e.Db, | ||
Logger: e.Logger.With(kv...), | ||
classifiers: e.classifiers, | ||
extraFields: e.extraFields, | ||
executionStart: e.executionStart, | ||
columns: e.columns, | ||
goroutinesSem: e.goroutinesSem, | ||
} | ||
|
||
cpy := e | ||
cpy.Logger = cpy.Logger.With(kv...) | ||
return &cpy | ||
} | ||
|
||
// doMultiplexResolve resolves table with multiplexed clients appending all diagnostics returned from each multiplex. | ||
|
@@ -286,9 +269,13 @@ func (e TableExecutor) callTableResolve(ctx context.Context, client schema.Clien | |
if parent == nil { | ||
e.Logger.Info("fetched successfully", "count", nc) | ||
} | ||
if err := e.cleanupStaleData(ctx, client, parent); err != nil { | ||
return nc, diags.Add(fromError(err, diag.WithType(diag.DATABASE), diag.WithSummary("failed to cleanup stale data on table %q", e.Table.Name))) | ||
|
||
if !diags.HasErrors() { | ||
if err := e.cleanupStaleData(ctx, client, parent); err != nil { | ||
return nc, diags.Add(fromError(err, diag.WithType(diag.DATABASE), diag.WithSummary("failed to cleanup stale data on table %q", e.Table.Name))) | ||
} | ||
} | ||
|
||
return nc, diags | ||
} | ||
|
||
|
@@ -299,8 +286,8 @@ func (e TableExecutor) resolveResources(ctx context.Context, meta schema.ClientM | |
diags diag.Diagnostics | ||
) | ||
|
||
for _, o := range objects { | ||
resource := schema.NewResourceData(e.Db.Dialect(), e.Table, parent, o, e.metadata, e.executionStart) | ||
for i := range objects { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This change seems unnecessary, no? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the |
||
resource := schema.NewResourceData(e.Db.Dialect(), e.Table, parent, objects[i], e.metadata, e.executionStart) | ||
// Before inserting resolve all table column resolvers | ||
resolveDiags := e.resolveResourceValues(ctx, meta, resource) | ||
diags = diags.Add(resolveDiags) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a strong gut feeling we should avoid this change for now:
Seems dangerous in any case. I'd really avoid doing this unless we have a specific reason to do it (e.g. customers complining, sentry errors, ....).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if the column resolver returns error diags, it doesn't continue to complete all other tasks so (if there are any) the new rows are incomplete anyway. no postresourceresolver called, no internal resolvers (cq_id!) called. https://github.com/cloudquery/cq-provider-sdk/blob/main/provider/execution/execution.go#L367-L370
not sure if it will lead to more rows or not on edge cases.