Skip to content
This repository has been archived by the owner on Aug 12, 2022. It is now read-only.

Commit

Permalink
Remove ExtraFields / CascadeDeleteFilters
Browse files Browse the repository at this point in the history
This will break history support, so I'll revert this commit.
  • Loading branch information
disq committed May 20, 2022
1 parent 1919e6c commit 31d3bcd
Show file tree
Hide file tree
Showing 14 changed files with 313 additions and 359 deletions.
16 changes: 2 additions & 14 deletions cqproto/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,13 @@ func (g GRPCClient) GetProviderConfig(ctx context.Context, _ *GetProviderConfigR
}

func (g GRPCClient) ConfigureProvider(ctx context.Context, request *ConfigureProviderRequest) (*ConfigureProviderResponse, error) {
fieldsData, err := msgpack.Marshal(request.ExtraFields)
if err != nil {
return nil, err
}
res, err := g.client.ConfigureProvider(ctx, &internal.ConfigureProvider_Request{
CloudqueryVersion: request.CloudQueryVersion,
Connection: &internal.ConnectionDetails{
Type: internal.ConnectionType_POSTGRES,
Dsn: request.Connection.DSN,
},
Config: request.Config,
ExtraFields: fieldsData,
Config: request.Config,
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -154,20 +149,13 @@ func (g *GRPCServer) GetProviderConfig(ctx context.Context, _ *internal.GetProvi
}

func (g *GRPCServer) ConfigureProvider(ctx context.Context, request *internal.ConfigureProvider_Request) (*internal.ConfigureProvider_Response, error) {
var eFields = make(map[string]interface{})
if request.GetExtraFields() != nil {
if err := msgpack.Unmarshal(request.GetExtraFields(), &eFields); err != nil {
return nil, err
}
}
resp, err := g.Impl.ConfigureProvider(ctx, &ConfigureProviderRequest{
CloudQueryVersion: request.GetCloudqueryVersion(),
Connection: ConnectionDetails{
Type: string(request.Connection.GetType()),
DSN: request.Connection.GetDsn(),
},
Config: request.Config,
ExtraFields: eFields,
Config: request.Config,
})
if err != nil {
return nil, err
Expand Down
568 changes: 278 additions & 290 deletions cqproto/internal/plugin.pb.go

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions cqproto/internal/plugin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ message ConfigureProvider {
ConnectionDetails connection = 2;
// Holds information such as credentials, regions, accounts, etc'
bytes config = 3;
// Msgpack encoded: Allows to inject & override fields into resource tables, use this carefully to override fields. Used in tests.
bytes extraFields = 5;
// next available id: 6
}
message Response {
string error = 1;
Expand Down
1 change: 0 additions & 1 deletion cqproto/internal/plugin_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions cqproto/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ type ConfigureProviderRequest struct {
Connection ConnectionDetails
// Config is the configuration the user supplied for the provider
Config []byte
// Fields to inject to every resource on insert
ExtraFields map[string]interface{}
}

type ConfigureProviderResponse struct {
Expand Down
5 changes: 1 addition & 4 deletions database/postgres/pgdatabase.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (p PgDatabase) Insert(ctx context.Context, t *schema.Table, resources schem
}

// CopyFrom copies all resources from []*Resource
func (p PgDatabase) CopyFrom(ctx context.Context, resources schema.Resources, shouldCascade bool, cascadeDeleteFilters map[string]interface{}) error {
func (p PgDatabase) CopyFrom(ctx context.Context, resources schema.Resources, shouldCascade bool) error {
if len(resources) == 0 {
return nil
}
Expand All @@ -105,9 +105,6 @@ func (p PgDatabase) CopyFrom(ctx context.Context, resources schema.Resources, sh
}, func(tx pgx.Tx) error {
if shouldCascade {
q := goqu.Dialect("postgres").Delete(resources.TableName()).Where(goqu.Ex{"cq_id": resources.GetIds()})
for k, v := range cascadeDeleteFilters {
q = q.Where(goqu.Ex{k: goqu.Op{"eq": v}})
}
sql, args, err := q.Prepared(true).ToSQL()
if err != nil {
return err
Expand Down
29 changes: 11 additions & 18 deletions provider/execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ type TableExecutor struct {
Logger hclog.Logger
// classifiers
classifiers []ErrorClassifier
// extraFields to be passed to each created resource in the execution, used in tests.
extraFields map[string]interface{}
// metadata to be passed to each created resource in the execution, used by cq* resolvers.
metadata map[string]interface{}
// When the execution started
Expand All @@ -51,7 +49,7 @@ type TableExecutor struct {
}

// NewTableExecutor creates a new TableExecutor for given schema.Table
func NewTableExecutor(resourceName string, db Storage, logger hclog.Logger, table *schema.Table, extraFields, metadata map[string]interface{}, classifier ErrorClassifier, goroutinesSem *semaphore.Weighted, timeout time.Duration) TableExecutor {
func NewTableExecutor(resourceName string, db Storage, logger hclog.Logger, table *schema.Table, metadata map[string]interface{}, classifier ErrorClassifier, goroutinesSem *semaphore.Weighted, timeout time.Duration) TableExecutor {
var classifiers = []ErrorClassifier{defaultErrorClassifier}
if classifier != nil {
classifiers = append([]ErrorClassifier{classifier}, classifiers...)
Expand All @@ -64,7 +62,6 @@ func NewTableExecutor(resourceName string, db Storage, logger hclog.Logger, tabl
Table: table,
Db: db,
Logger: logger,
extraFields: extraFields,
metadata: metadata,
classifiers: classifiers,
executionStart: time.Now().Add(executionJitter),
Expand Down Expand Up @@ -189,15 +186,12 @@ func (e TableExecutor) cleanupStaleData(ctx context.Context, client schema.Clien
if parent != nil {
return nil
}
e.Logger.Debug("cleaning table stale data", "last_update", e.executionStart)

filters := make([]interface{}, 0)
for k, v := range e.extraFields {
filters = append(filters, k, v)
}
if e.Table.DeleteFilter != nil {
filters = append(filters, e.Table.DeleteFilter(client, parent)...)
if e.Table.DeleteFilter == nil {
return nil
}

e.Logger.Debug("cleaning table stale data", "last_update", e.executionStart)
filters := e.Table.DeleteFilter(client, parent)
if err := e.Db.RemoveStaleData(ctx, e.Table, e.executionStart, filters); err != nil {
e.Logger.Warn("failed to clean table stale data", "last_update", e.executionStart, "err", err)
return err
Expand Down Expand Up @@ -320,12 +314,11 @@ func (e TableExecutor) saveToStorage(ctx context.Context, resources schema.Resou
if l := len(resources); l > 0 {
e.Logger.Debug("storing resources", "count", l)
}
//err := e.Db.CopyFrom(ctx, resources, shouldCascade, e.extraFields)
//if err == nil {
// return resources, nil
//}
//e.Logger.Warn("failed copy-from to db", "error", err)
var err error
err := e.Db.CopyFrom(ctx, resources, shouldCascade)
if err == nil {
return resources, nil
}
e.Logger.Warn("failed copy-from to db", "error", err)

// fallback insert, copy from sometimes does problems, so we fall back with bulk insert
err = e.Db.Insert(ctx, e.Table, resources)
Expand Down
9 changes: 4 additions & 5 deletions provider/execution/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ import (
)

type ExecutionTestCase struct {
Name string
Table *schema.Table
ExtraFields map[string]interface{}
Name string
Table *schema.Table

SetupStorage func(t *testing.T) Storage
ExpectedResourceCount uint64
Expand Down Expand Up @@ -530,7 +529,7 @@ func TestTableExecutor_Resolve(t *testing.T) {
storage = tc.SetupStorage(t)
}
limiter := semaphore.NewWeighted(int64(limit.GetMaxGoRoutines()))
exec := NewTableExecutor(tc.Name, storage, testlog.New(t), tc.Table, tc.ExtraFields, nil, nil, limiter, 10*time.Second)
exec := NewTableExecutor(tc.Name, storage, testlog.New(t), tc.Table, nil, nil, limiter, 10*time.Second)
count, diags := exec.Resolve(context.Background(), executionClient)
assert.Equal(t, tc.ExpectedResourceCount, count)
if tc.ErrorExpected {
Expand Down Expand Up @@ -649,7 +648,7 @@ func TestTableExecutor_resolveResourceValues(t *testing.T) {
storage = tc.SetupStorage(t)
}
limiter := semaphore.NewWeighted(int64(limit.GetMaxGoRoutines()))
exec := NewTableExecutor(tc.Name, storage, testlog.New(t), tc.Table, nil, nil, nil, limiter, 0)
exec := NewTableExecutor(tc.Name, storage, testlog.New(t), tc.Table, nil, nil, limiter, 0)

r := schema.NewResourceData(storage.Dialect(), tc.Table, nil, tc.ResourceData, tc.MetaData, exec.executionStart)
// columns should be resolved from ColumnResolver functions or default functions
Expand Down
10 changes: 5 additions & 5 deletions provider/execution/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ func (_m *DatabaseMock) Close() {
_m.Called()
}

// CopyFrom provides a mock function with given fields: ctx, resources, shouldCascade, CascadeDeleteFilters
func (_m *DatabaseMock) CopyFrom(ctx context.Context, resources schema.Resources, shouldCascade bool, CascadeDeleteFilters map[string]interface{}) error {
ret := _m.Called(ctx, resources, shouldCascade, CascadeDeleteFilters)
// CopyFrom provides a mock function with given fields: ctx, resources, shouldCascade
func (_m *DatabaseMock) CopyFrom(ctx context.Context, resources schema.Resources, shouldCascade bool) error {
ret := _m.Called(ctx, resources, shouldCascade)

var r0 error
if rf, ok := ret.Get(0).(func(context.Context, schema.Resources, bool, map[string]interface{}) error); ok {
r0 = rf(ctx, resources, shouldCascade, CascadeDeleteFilters)
if rf, ok := ret.Get(0).(func(context.Context, schema.Resources, bool) error); ok {
r0 = rf(ctx, resources, shouldCascade)
} else {
r0 = ret.Error(0)
}
Expand Down
2 changes: 1 addition & 1 deletion provider/execution/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Storage interface {
Insert(ctx context.Context, t *schema.Table, instance schema.Resources) error
Delete(ctx context.Context, t *schema.Table, kvFilters []interface{}) error
RemoveStaleData(ctx context.Context, t *schema.Table, executionStart time.Time, kvFilters []interface{}) error
CopyFrom(ctx context.Context, resources schema.Resources, shouldCascade bool, CascadeDeleteFilters map[string]interface{}) error
CopyFrom(ctx context.Context, resources schema.Resources, shouldCascade bool) error
Close()
Dialect() schema.Dialect
}
Expand Down
2 changes: 1 addition & 1 deletion provider/execution/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (f noopStorage) RemoveStaleData(ctx context.Context, t *schema.Table, execu
return nil
}

func (f noopStorage) CopyFrom(ctx context.Context, resources schema.Resources, shouldCascade bool, CascadeDeleteFilters map[string]interface{}) error {
func (f noopStorage) CopyFrom(ctx context.Context, resources schema.Resources, shouldCascade bool) error {
return nil
}

Expand Down
5 changes: 1 addition & 4 deletions provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ type Provider struct {
dbURL string
// meta is the provider's client created when configure is called
meta schema.ClientMeta
// Add extra fields to all resources, these fields don't show up in documentation and are used for internal CQ testing.
extraFields map[string]interface{}
// storageCreator creates a database based on requested engine
storageCreator func(ctx context.Context, logger hclog.Logger, dbURL string) (execution.Storage, error)
}
Expand Down Expand Up @@ -121,7 +119,6 @@ func (p *Provider) ConfigureProvider(_ context.Context, request *cqproto.Configu
}
}

p.extraFields = request.ExtraFields
p.dbURL = request.Connection.DSN
providerConfig := p.Config()
if err := defaults.Set(providerConfig); err != nil {
Expand Down Expand Up @@ -211,7 +208,7 @@ func (p *Provider) FetchResources(ctx context.Context, request *cqproto.FetchRes
if !ok {
return fmt.Errorf("plugin %s does not provide resource %s", p.Name, resource)
}
tableExec := execution.NewTableExecutor(resource, conn, p.Logger.With("table", table.Name), table, p.extraFields, request.Metadata, p.ErrorClassifier, goroutinesSem, request.Timeout)
tableExec := execution.NewTableExecutor(resource, conn, p.Logger.With("table", table.Name), table, request.Metadata, p.ErrorClassifier, goroutinesSem, request.Timeout)
p.Logger.Debug("fetching table...", "provider", p.Name, "table", table.Name)
// Save resource aside
r := resource
Expand Down
12 changes: 4 additions & 8 deletions provider/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,7 @@ func TestProvider_ConfigureProvider(t *testing.T) {
Connection: cqproto.ConnectionDetails{
DSN: "postgres://postgres:pass@localhost:5432/postgres?sslmode=disable",
},
Config: nil,
ExtraFields: nil,
Config: nil,
})
assert.Equal(t, "provider unitest logger not defined, make sure to run it with serve", resp.Diagnostics.Error())
assert.NoError(t, err)
Expand All @@ -285,8 +284,7 @@ func TestProvider_ConfigureProvider(t *testing.T) {
Connection: cqproto.ConnectionDetails{
DSN: "postgres://postgres:pass@localhost:5432/postgres?sslmode=disable",
},
Config: nil,
ExtraFields: nil,
Config: nil,
})
assert.True(t, resp.Diagnostics.HasErrors())
assert.Equal(t, "test error", resp.Diagnostics.Error())
Expand All @@ -313,8 +311,7 @@ func TestProvider_FetchResources(t *testing.T) {
Connection: cqproto.ConnectionDetails{
DSN: "postgres://postgres:pass@localhost:5432/postgres?sslmode=disable",
},
Config: nil,
ExtraFields: nil,
Config: nil,
})
ctrl := gomock.NewController(t)
var fetchCases = []FetchResourceTableTest{
Expand Down Expand Up @@ -440,8 +437,7 @@ func TestProvider_FetchResourcesParallelLimit(t *testing.T) {
Connection: cqproto.ConnectionDetails{
DSN: "postgres://postgres:pass@localhost:5432/postgres?sslmode=disable",
},
Config: nil,
ExtraFields: nil,
Config: nil,
})
assert.False(t, resp.Diagnostics.HasDiags())
assert.NoError(t, err)
Expand Down
8 changes: 4 additions & 4 deletions provider/schema/mock/mock_storage.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 31d3bcd

Please sign in to comment.