Skip to content
This repository has been archived by the owner on Aug 12, 2022. It is now read-only.

chore!: Remove unused code/features: Global tables, CascadeDeleteFilters, ExtraFields, AlwaysDelete #392

Merged
merged 6 commits into from
Jun 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 4 additions & 16 deletions cqproto/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,14 @@ func (g GRPCClient) GetProviderConfig(ctx context.Context, request *GetProviderC
}

func (g GRPCClient) ConfigureProvider(ctx context.Context, request *ConfigureProviderRequest) (*ConfigureProviderResponse, error) {
fieldsData, err := msgpack.Marshal(request.ExtraFields)
if err != nil {
return nil, err
}
res, err := g.client.ConfigureProvider(ctx, &internal.ConfigureProvider_Request{
CloudqueryVersion: request.CloudQueryVersion,
Connection: &internal.ConnectionDetails{
Type: internal.ConnectionType_POSTGRES,
Dsn: request.Connection.DSN,
},
Config: request.Config,
ExtraFields: fieldsData,
Format: configFormatFromProto(request.Format),
Config: request.Config,
Format: configFormatFromProto(request.Format),
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -164,21 +159,14 @@ func (g *GRPCServer) GetProviderConfig(ctx context.Context, request *internal.Ge
}

func (g *GRPCServer) ConfigureProvider(ctx context.Context, request *internal.ConfigureProvider_Request) (*internal.ConfigureProvider_Response, error) {
var eFields = make(map[string]interface{})
if request.GetExtraFields() != nil {
if err := msgpack.Unmarshal(request.GetExtraFields(), &eFields); err != nil {
return nil, err
}
}
resp, err := g.Impl.ConfigureProvider(ctx, &ConfigureProviderRequest{
CloudQueryVersion: request.GetCloudqueryVersion(),
Connection: ConnectionDetails{
Type: string(request.Connection.GetType()),
DSN: request.Connection.GetDsn(),
},
Config: request.Config,
ExtraFields: eFields,
Format: configFormatToProto(request.Format),
Config: request.Config,
Format: configFormatToProto(request.Format),
})
if err != nil {
return nil, err
Expand Down
547 changes: 268 additions & 279 deletions cqproto/internal/plugin.pb.go

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions cqproto/internal/plugin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ message ConfigureProvider {
ConnectionDetails connection = 2;
// Holds information such as credentials, regions, accounts, etc'
bytes config = 3;
// Msgpack encoded: Allows to inject & override fields into resource tables, use this carefully to override fields. Used in tests.
bytes extraFields = 5;
roneli marked this conversation as resolved.
Show resolved Hide resolved
// Holds the format of the config
ConfigFormat format = 6;
}
Expand Down
2 changes: 0 additions & 2 deletions cqproto/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ type ConfigureProviderRequest struct {
Connection ConnectionDetails
// Config is the configuration the user supplied for the provider
Config []byte
// Fields to inject to every resource on insert
ExtraFields map[string]interface{}
// ConfigFormat is the format of the passed config
Format ConfigFormat
}
Expand Down
25 changes: 6 additions & 19 deletions database/postgres/pgdatabase.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,14 @@ import (
"fmt"
"io"
"strconv"
"strings"
"time"

sq "github.com/Masterminds/squirrel"
"github.com/cloudquery/cq-provider-sdk/provider/diag"
"github.com/cloudquery/cq-provider-sdk/provider/execution"
"github.com/cloudquery/cq-provider-sdk/provider/schema"
"github.com/doug-martin/goqu/v9"
// Init postgres
_ "github.com/doug-martin/goqu/v9/dialect/postgres"
_ "github.com/doug-martin/goqu/v9/dialect/postgres" // Init postgres
"github.com/hashicorp/go-hclog"
"github.com/jackc/pgconn"
"github.com/jackc/pgerrcode"
Expand Down Expand Up @@ -48,7 +46,7 @@ func NewPgDatabase(ctx context.Context, logger hclog.Logger, dsn string, sd sche
}

// Insert inserts all resources to given table, table and resources are assumed from same table.
func (p PgDatabase) Insert(ctx context.Context, t *schema.Table, resources schema.Resources, shouldCascade bool, cascadeDeleteFilters map[string]interface{}) error {
func (p PgDatabase) Insert(ctx context.Context, t *schema.Table, resources schema.Resources, shouldCascade bool) error {
if len(resources) == 0 {
return nil
}
Expand All @@ -67,14 +65,6 @@ func (p PgDatabase) Insert(ctx context.Context, t *schema.Table, resources schem
}
sqlStmt = sqlStmt.Values(values...)
}
if t.Global {
updateColumns := make([]string, len(cols))
for i, c := range cols {
updateColumns[i] = fmt.Sprintf("%[1]s = excluded.%[1]s", c)
}
sqlStmt = sqlStmt.Suffix(fmt.Sprintf("ON CONFLICT (%s) DO UPDATE SET %s",
strings.Join(p.sd.PrimaryKeys(t), ","), strings.Join(updateColumns, ",")))
}

s, args, err := sqlStmt.ToSql()
if err != nil {
Expand All @@ -87,7 +77,7 @@ func (p PgDatabase) Insert(ctx context.Context, t *schema.Table, resources schem
DeferrableMode: pgx.Deferrable,
}, func(tx pgx.Tx) error {
if shouldCascade {
if err := deleteResourceByCQId(ctx, tx, resources, cascadeDeleteFilters); err != nil {
if err := deleteResourceByCQId(ctx, tx, resources); err != nil {
return err
}
}
Expand All @@ -113,7 +103,7 @@ func (p PgDatabase) Insert(ctx context.Context, t *schema.Table, resources schem
}

// CopyFrom copies all resources from []*Resource
func (p PgDatabase) CopyFrom(ctx context.Context, resources schema.Resources, shouldCascade bool, cascadeDeleteFilters map[string]interface{}) error {
func (p PgDatabase) CopyFrom(ctx context.Context, resources schema.Resources, shouldCascade bool) error {
if len(resources) == 0 {
return nil
}
Expand All @@ -123,7 +113,7 @@ func (p PgDatabase) CopyFrom(ctx context.Context, resources schema.Resources, sh
DeferrableMode: pgx.Deferrable,
}, func(tx pgx.Tx) error {
if shouldCascade {
if err := deleteResourceByCQId(ctx, tx, resources, cascadeDeleteFilters); err != nil {
if err := deleteResourceByCQId(ctx, tx, resources); err != nil {
return err
}
}
Expand Down Expand Up @@ -253,11 +243,8 @@ func quoteColumns(columns []string) []string {
return ret
}

func deleteResourceByCQId(ctx context.Context, tx pgx.Tx, resources schema.Resources, cascadeDeleteFilters map[string]interface{}) error {
func deleteResourceByCQId(ctx context.Context, tx pgx.Tx, resources schema.Resources) error {
q := goqu.Dialect("postgres").Delete(resources.TableName()).Where(goqu.Ex{"cq_id": resources.GetIds()})
for k, v := range cascadeDeleteFilters {
q = q.Where(goqu.Ex{k: goqu.Op{"eq": v}})
}
sql, args, err := q.Prepared(true).ToSQL()
if err != nil {
return err
Expand Down
33 changes: 5 additions & 28 deletions provider/execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ type TableExecutor struct {
Logger hclog.Logger
// classifiers
classifiers []ErrorClassifier
// extraFields to be passed to each created resource in the execution, used in tests.
extraFields map[string]interface{}
// metadata to be passed to each created resource in the execution, used by cq* resolvers.
metadata map[string]interface{}
// When the execution started
Expand All @@ -53,7 +51,7 @@ type TableExecutor struct {
}

// NewTableExecutor creates a new TableExecutor for given schema.Table
func NewTableExecutor(resourceName string, db Storage, logger hclog.Logger, table *schema.Table, extraFields, metadata map[string]interface{}, classifier ErrorClassifier, goroutinesSem *semaphore.Weighted, timeout time.Duration) TableExecutor {
func NewTableExecutor(resourceName string, db Storage, logger hclog.Logger, table *schema.Table, metadata map[string]interface{}, classifier ErrorClassifier, goroutinesSem *semaphore.Weighted, timeout time.Duration) TableExecutor {
var classifiers = []ErrorClassifier{defaultErrorClassifier}
if classifier != nil {
classifiers = append([]ErrorClassifier{classifier}, classifiers...)
Expand All @@ -66,7 +64,6 @@ func NewTableExecutor(resourceName string, db Storage, logger hclog.Logger, tabl
Table: table,
Db: db,
Logger: logger,
extraFields: extraFields,
metadata: metadata,
classifiers: classifiers,
executionStart: time.Now().Add(executionJitter),
Expand Down Expand Up @@ -172,20 +169,6 @@ func (e TableExecutor) doMultiplexResolve(ctx context.Context, clients []schema.
return totalResources, allDiags
}

// truncateTable cleans up a table from all data based on it's DeleteFilter
func (e TableExecutor) truncateTable(ctx context.Context, client schema.ClientMeta, parent *schema.Resource) error {
if e.Table.DeleteFilter == nil {
return nil
}
if e.Table.AlwaysDelete {
// Delete previous fetch
e.Logger.Debug("cleaning table previous fetch", "always_delete", e.Table.AlwaysDelete)
return e.Db.Delete(ctx, e.Table, e.Table.DeleteFilter(client, parent))
}
e.Logger.Debug("skipping table truncate")
return nil
}

// cleanupStaleData cleans resources in table that weren't update in the latest table resolve execution
func (e TableExecutor) cleanupStaleData(ctx context.Context, client schema.ClientMeta, parent *schema.Resource) error {
// Only clean top level tables
Expand All @@ -194,10 +177,7 @@ func (e TableExecutor) cleanupStaleData(ctx context.Context, client schema.Clien
}
e.Logger.Debug("cleaning table stale data", "last_update", e.executionStart)

filters := make([]interface{}, 0)
for k, v := range e.extraFields {
filters = append(filters, k, v)
}
var filters []interface{}
if e.Table.DeleteFilter != nil {
filters = append(filters, e.Table.DeleteFilter(client, parent)...)
}
Expand All @@ -220,9 +200,6 @@ func (e TableExecutor) callTableResolve(ctx context.Context, client schema.Clien
if e.Table.Resolver == nil {
return 0, diags.Add(diag.NewBaseError(nil, diag.SCHEMA, diag.WithSeverity(diag.ERROR), diag.WithResourceName(e.ResourceName), diag.WithSummary("table %q missing resolver, make sure table implements the resolver", e.Table.Name)))
}
if err := e.truncateTable(ctx, client, parent); err != nil {
return 0, diags.Add(ClassifyError(err, diag.WithResourceName(e.ResourceName)))
}

res := make(chan interface{})
var resolverErr error
Expand Down Expand Up @@ -327,14 +304,14 @@ func (e TableExecutor) saveToStorage(ctx context.Context, resources schema.Resou
if l := len(resources); l > 0 {
e.Logger.Debug("storing resources", "count", l)
}
err := e.Db.CopyFrom(ctx, resources, shouldCascade, e.extraFields)
err := e.Db.CopyFrom(ctx, resources, shouldCascade)
if err == nil {
return resources, nil
}
e.Logger.Warn("failed copy-from to db", "error", err)

// fallback insert, copy from sometimes does problems, so we fall back with bulk insert
err = e.Db.Insert(ctx, e.Table, resources, shouldCascade, e.extraFields)
err = e.Db.Insert(ctx, e.Table, resources, shouldCascade)
if err == nil {
return resources, nil
}
Expand All @@ -344,7 +321,7 @@ func (e TableExecutor) saveToStorage(ctx context.Context, resources schema.Resou
// Try to insert resource by resource if partial fetch is enabled and an error occurred
partialFetchResources := make(schema.Resources, 0)
for id := range resources {
if err := e.Db.Insert(ctx, e.Table, schema.Resources{resources[id]}, shouldCascade, e.extraFields); err != nil {
if err := e.Db.Insert(ctx, e.Table, schema.Resources{resources[id]}, shouldCascade); err != nil {
e.Logger.Error("failed to insert resource into db", "error", err, "resource_keys", resources[id].PrimaryKeyValues())
diags = diags.Add(ClassifyError(err, diag.WithType(diag.DATABASE)))
continue
Expand Down
69 changes: 10 additions & 59 deletions provider/execution/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ import (
)

type ExecutionTestCase struct {
Name string
Table *schema.Table
ExtraFields map[string]interface{}
Name string
Table *schema.Table

SetupStorage func(t *testing.T) Storage
ExpectedResourceCount uint64
Expand Down Expand Up @@ -344,54 +343,6 @@ func TestTableExecutor_Resolve(t *testing.T) {
},
},
},
{
Name: "always_delete",
SetupStorage: func(t *testing.T) Storage {
db := new(DatabaseMock)
db.On("Delete", mock.Anything, mock.Anything, mock.Anything).Return(nil)
db.On("RemoveStaleData", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
db.On("Dialect").Return(noopDialect{})
return db
},
Table: &schema.Table{
Name: "simple",
AlwaysDelete: true,
DeleteFilter: func(meta schema.ClientMeta, parent *schema.Resource) []interface{} {
return []interface{}{}
},
Resolver: doNothingResolver,
Columns: commonColumns,
},
},
{
Name: "always_delete_fail",
SetupStorage: func(t *testing.T) Storage {
db := new(DatabaseMock)
db.On("Delete", mock.Anything, mock.Anything, mock.Anything).
Return(fromError(errors.New("failed delete"), diag.WithResourceName("always_delete_fail"), diag.WithType(diag.DATABASE)))
db.On("Dialect").Return(noopDialect{})
return db
},
Table: &schema.Table{
Name: "simple",
AlwaysDelete: true,
DeleteFilter: func(meta schema.ClientMeta, parent *schema.Resource) []interface{} {
return []interface{}{}
},
Resolver: doNothingResolver,
Columns: commonColumns,
},
ErrorExpected: true,
ExpectedDiags: []diag.FlatDiag{
{
Err: "failed delete",
Resource: "always_delete_fail",
Severity: diag.ERROR,
Type: diag.DATABASE,
Summary: "failed delete",
},
},
},
{
Name: "cleanup_stale_data_fail",
SetupStorage: func(t *testing.T) Storage {
Expand Down Expand Up @@ -426,7 +377,7 @@ func TestTableExecutor_Resolve(t *testing.T) {
db := new(DatabaseMock)
db.On("RemoveStaleData", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
db.On("Dialect").Return(noopDialect{})
db.On("CopyFrom", mock.Anything, mock.Anything, true, map[string]interface{}(nil)).Return(nil)
db.On("CopyFrom", mock.Anything, mock.Anything, true).Return(nil)
return db
},
Table: &schema.Table{
Expand All @@ -443,7 +394,7 @@ func TestTableExecutor_Resolve(t *testing.T) {
db := new(DatabaseMock)
db.On("RemoveStaleData", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
db.On("Dialect").Return(noopDialect{})
db.On("CopyFrom", mock.Anything, mock.Anything, true, map[string]interface{}(nil)).Return(nil)
db.On("CopyFrom", mock.Anything, mock.Anything, true).Return(nil)
return db
},
Table: &schema.Table{
Expand Down Expand Up @@ -476,7 +427,7 @@ func TestTableExecutor_Resolve(t *testing.T) {
db := new(DatabaseMock)
db.On("RemoveStaleData", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
db.On("Dialect").Return(schema.PostgresDialect{})
db.On("CopyFrom", mock.Anything, mock.Anything, true, map[string]interface{}(nil)).Return(nil).Run(
db.On("CopyFrom", mock.Anything, mock.Anything, true).Return(nil).Run(
func(args mock.Arguments) {
resources := args.Get(1).(schema.Resources)
if !assert.Greater(t, len(resources), 0) {
Expand Down Expand Up @@ -511,7 +462,7 @@ func TestTableExecutor_Resolve(t *testing.T) {
db := new(DatabaseMock)
db.On("RemoveStaleData", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
db.On("Dialect").Return(noopDialect{})
db.On("CopyFrom", mock.Anything, mock.Anything, true, map[string]interface{}(nil)).Return(nil)
db.On("CopyFrom", mock.Anything, mock.Anything, true).Return(nil)
return db
},
Table: &schema.Table{
Expand Down Expand Up @@ -556,10 +507,10 @@ func TestTableExecutor_Resolve(t *testing.T) {
ErrorExpected: true,
ExpectedDiags: []diag.FlatDiag{
{
Err: `error at github.com/cloudquery/cq-provider-sdk/provider/execution.glob..func4[execution_test.go:75] some error`,
Err: `error at github.com/cloudquery/cq-provider-sdk/provider/execution.glob..func4[execution_test.go:74] some error`,
Resource: "return_wrap_error",
Severity: diag.ERROR,
Summary: `failed to resolve table "simple": error at github.com/cloudquery/cq-provider-sdk/provider/execution.glob..func4[execution_test.go:75] some error`,
Summary: `failed to resolve table "simple": error at github.com/cloudquery/cq-provider-sdk/provider/execution.glob..func4[execution_test.go:74] some error`,
Type: diag.RESOLVING,
},
},
Expand Down Expand Up @@ -653,7 +604,7 @@ func TestTableExecutor_Resolve(t *testing.T) {
fmt.Println("debug")
}
limiter := semaphore.NewWeighted(int64(limit.GetMaxGoRoutines()))
exec := NewTableExecutor(tc.Name, storage, testlog.New(t), tc.Table, tc.ExtraFields, nil, nil, limiter, 10*time.Second)
exec := NewTableExecutor(tc.Name, storage, testlog.New(t), tc.Table, nil, nil, limiter, 10*time.Second)
count, diags := exec.Resolve(context.Background(), executionClient)
assert.Equal(t, tc.ExpectedResourceCount, count)
if tc.ErrorExpected {
Expand Down Expand Up @@ -715,7 +666,7 @@ func TestTableExecutor_resolveResourceValues(t *testing.T) {
storage = tc.SetupStorage(t)
}
limiter := semaphore.NewWeighted(int64(limit.GetMaxGoRoutines()))
exec := NewTableExecutor(tc.Name, storage, testlog.New(t), tc.Table, nil, nil, nil, limiter, 0)
exec := NewTableExecutor(tc.Name, storage, testlog.New(t), tc.Table, nil, nil, limiter, 0)

r := schema.NewResourceData(storage.Dialect(), tc.Table, nil, tc.ResourceData, tc.MetaData, exec.executionStart)
// columns should be resolved from ColumnResolver functions or default functions
Expand Down
Loading