diff --git a/provider/execution/execution.go b/provider/execution/execution.go index 55387832..697fdcd4 100644 --- a/provider/execution/execution.go +++ b/provider/execution/execution.go @@ -52,7 +52,6 @@ type TableExecutor struct { // NewTableExecutor creates a new TableExecutor for given schema.Table func NewTableExecutor(resourceName string, db Storage, logger hclog.Logger, table *schema.Table, extraFields, metadata map[string]interface{}, classifier ErrorClassifier, goroutinesSem *semaphore.Weighted, timeout time.Duration) TableExecutor { - var classifiers = []ErrorClassifier{defaultErrorClassifier} if classifier != nil { classifiers = append([]ErrorClassifier{classifier}, classifiers...) @@ -77,50 +76,34 @@ func NewTableExecutor(resourceName string, db Storage, logger hclog.Logger, tabl // Resolve is the root function of table executor which starts an execution of a Table resolving it, and it's relations. func (e TableExecutor) Resolve(ctx context.Context, meta schema.ClientMeta) (uint64, diag.Diagnostics) { + var clients []schema.ClientMeta + if e.Table.Multiplex != nil { - if clients := e.Table.Multiplex(meta); len(clients) > 0 { - return e.doMultiplexResolve(ctx, clients) - } + clients = e.Table.Multiplex(meta) } - - if e.timeout > 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, e.timeout) - defer cancel() + if len(clients) == 0 { + clients = append(clients, meta) } - return e.withLogger(append(meta.Logger().ImpliedArgs(), "multiplex", false)...).callTableResolve(ctx, meta, nil) + + return e.doMultiplexResolve(ctx, clients) } // withTable allows to create a new TableExecutor for received *schema.Table func (e TableExecutor) withTable(t *schema.Table, kv ...interface{}) *TableExecutor { var c [2]schema.ColumnList c[0], c[1] = e.Db.Dialect().Columns(t).Sift() - return &TableExecutor{ - ResourceName: e.ResourceName, - Table: t, - Db: e.Db, - Logger: e.Logger.With(kv...), - classifiers: e.classifiers, - extraFields: e.extraFields, - executionStart: e.executionStart, - columns: c, - goroutinesSem: e.goroutinesSem, - } + cpy := e + cpy.Table = t + cpy.Logger = cpy.Logger.With(kv...) + cpy.columns = c + + return &cpy } func (e TableExecutor) withLogger(kv ...interface{}) *TableExecutor { - return &TableExecutor{ - ResourceName: e.ResourceName, - Table: e.Table, - Db: e.Db, - Logger: e.Logger.With(kv...), - classifiers: e.classifiers, - extraFields: e.extraFields, - executionStart: e.executionStart, - columns: e.columns, - goroutinesSem: e.goroutinesSem, - } - + cpy := e + cpy.Logger = cpy.Logger.With(kv...) + return &cpy } // doMultiplexResolve resolves table with multiplexed clients appending all diagnostics returned from each multiplex. @@ -286,9 +269,13 @@ func (e TableExecutor) callTableResolve(ctx context.Context, client schema.Clien if parent == nil { e.Logger.Info("fetched successfully", "count", nc) } - if err := e.cleanupStaleData(ctx, client, parent); err != nil { - return nc, diags.Add(fromError(err, diag.WithType(diag.DATABASE), diag.WithSummary("failed to cleanup stale data on table %q", e.Table.Name))) + + if !diags.HasErrors() { + if err := e.cleanupStaleData(ctx, client, parent); err != nil { + return nc, diags.Add(fromError(err, diag.WithType(diag.DATABASE), diag.WithSummary("failed to cleanup stale data on table %q", e.Table.Name))) + } } + return nc, diags } @@ -299,8 +286,8 @@ func (e TableExecutor) resolveResources(ctx context.Context, meta schema.ClientM diags diag.Diagnostics ) - for _, o := range objects { - resource := schema.NewResourceData(e.Db.Dialect(), e.Table, parent, o, e.metadata, e.executionStart) + for i := range objects { + resource := schema.NewResourceData(e.Db.Dialect(), e.Table, parent, objects[i], e.metadata, e.executionStart) // Before inserting resolve all table column resolvers resolveDiags := e.resolveResourceValues(ctx, meta, resource) diags = diags.Add(resolveDiags) diff --git a/provider/schema/meta.go b/provider/schema/meta.go index 03047550..da976fc6 100644 --- a/provider/schema/meta.go +++ b/provider/schema/meta.go @@ -22,6 +22,8 @@ type Meta struct { FetchId string `json:"fetch_id,omitempty"` } +const FetchIdMetaKey = "cq_fetch_id" + var ( cqMeta = Column{ Name: "cq_meta", @@ -31,7 +33,7 @@ var ( mi := Meta{ LastUpdate: time.Now().UTC(), } - if val, ok := resource.GetMeta("cq_fetch_id"); ok { + if val, ok := resource.GetMeta(FetchIdMetaKey); ok { if s, ok := val.(string); ok { mi.FetchId = s }