Skip to content
This repository has been archived by the owner on Aug 12, 2022. It is now read-only.

fix: Delete by cq_id before insertion #266

Merged
merged 3 commits into from
May 23, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 36 additions & 12 deletions database/postgres/pgdatabase.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@ func NewPgDatabase(ctx context.Context, logger hclog.Logger, dsn string, sd sche
var _ execution.Storage = (*PgDatabase)(nil)

// Insert inserts all resources to given table, table and resources are assumed from same table.
func (p PgDatabase) Insert(ctx context.Context, t *schema.Table, resources schema.Resources) error {
func (p PgDatabase) Insert(ctx context.Context, t *schema.Table, resources schema.Resources, shouldCascade bool, cascadeDeleteFilters map[string]interface{}) error {
if len(resources) == 0 {
return nil
}

// It is safe to assume that all resources have the same columns
cols := quoteColumns(resources.ColumnNames())
psql := sq.StatementBuilder.PlaceholderFormat(sq.Dollar)
Expand Down Expand Up @@ -76,10 +77,25 @@ func (p PgDatabase) Insert(ctx context.Context, t *schema.Table, resources schem
if err != nil {
return diag.NewBaseError(err, diag.DATABASE, diag.WithResourceName(t.Name), diag.WithSummary("bad insert SQL statement created"), diag.WithDetails("SQL statement %q is invalid", s))
}
_, err = p.pool.Exec(ctx, s, args...)

err = p.pool.BeginTxFunc(ctx, pgx.TxOptions{
IsoLevel: pgx.ReadCommitted,
AccessMode: pgx.ReadWrite,
DeferrableMode: pgx.Deferrable,
}, func(tx pgx.Tx) error {
if shouldCascade {
if err := deleteResourceByCQId(ctx, tx, resources, cascadeDeleteFilters); err != nil {
return err
}
}

_, err := tx.Exec(ctx, s, args...)
return err
})
if err == nil {
return nil
}

if pgErr, ok := err.(*pgconn.PgError); ok {
// This should rarely occur, but if it occurs we want to print the SQL to debug it further
if pgerrcode.IsSyntaxErrororAccessRuleViolation(pgErr.Code) {
Expand All @@ -104,16 +120,7 @@ func (p PgDatabase) CopyFrom(ctx context.Context, resources schema.Resources, sh
DeferrableMode: pgx.Deferrable,
}, func(tx pgx.Tx) error {
if shouldCascade {
q := goqu.Dialect("postgres").Delete(resources.TableName()).Where(goqu.Ex{"cq_id": resources.GetIds()})
for k, v := range cascadeDeleteFilters {
q = q.Where(goqu.Ex{k: goqu.Op{"eq": v}})
}
sql, args, err := q.Prepared(true).ToSQL()
if err != nil {
return err
}
_, err = tx.Exec(ctx, sql, args...)
if err != nil {
if err := deleteResourceByCQId(ctx, tx, resources, cascadeDeleteFilters); err != nil {
return err
}
}
Expand Down Expand Up @@ -221,3 +228,20 @@ func quoteColumns(columns []string) []string {
}
return ret
}

type execer interface {
disq marked this conversation as resolved.
Show resolved Hide resolved
Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
}

func deleteResourceByCQId(ctx context.Context, db execer, resources schema.Resources, cascadeDeleteFilters map[string]interface{}) error {
q := goqu.Dialect("postgres").Delete(resources.TableName()).Where(goqu.Ex{"cq_id": resources.GetIds()})
for k, v := range cascadeDeleteFilters {
q = q.Where(goqu.Ex{k: goqu.Op{"eq": v}})
}
sql, args, err := q.Prepared(true).ToSQL()
if err != nil {
return err
}
_, err = db.Exec(ctx, sql, args...)
return err
}
4 changes: 2 additions & 2 deletions provider/execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func (e TableExecutor) saveToStorage(ctx context.Context, resources schema.Resou
e.Logger.Warn("failed copy-from to db", "error", err)

// fallback insert, copy from sometimes does problems, so we fall back with bulk insert
err = e.Db.Insert(ctx, e.Table, resources)
err = e.Db.Insert(ctx, e.Table, resources, shouldCascade, e.extraFields)
if err == nil {
return resources, nil
}
Expand All @@ -355,7 +355,7 @@ func (e TableExecutor) saveToStorage(ctx context.Context, resources schema.Resou
// Try to insert resource by resource if partial fetch is enabled and an error occurred
partialFetchResources := make(schema.Resources, 0)
for id := range resources {
if err := e.Db.Insert(ctx, e.Table, schema.Resources{resources[id]}); err != nil {
if err := e.Db.Insert(ctx, e.Table, schema.Resources{resources[id]}, shouldCascade, e.extraFields); err != nil {
e.Logger.Error("failed to insert resource into db", "error", err, "resource_keys", resources[id].PrimaryKeyValues())
diags = diags.Add(ClassifyError(err, diag.WithType(diag.DATABASE)))
continue
Expand Down
6 changes: 3 additions & 3 deletions provider/execution/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ func (_m *DatabaseMock) Exec(ctx context.Context, query string, args ...interfac
}

// Insert provides a mock function with given fields: ctx, t, instance
func (_m *DatabaseMock) Insert(ctx context.Context, t *schema.Table, instance schema.Resources) error {
func (_m *DatabaseMock) Insert(ctx context.Context, t *schema.Table, instance schema.Resources, shouldCascade bool, cascadeDeleteFilters map[string]interface{}) error {
ret := _m.Called(ctx, t, instance)

var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *schema.Table, schema.Resources) error); ok {
r0 = rf(ctx, t, instance)
if rf, ok := ret.Get(0).(func(context.Context, *schema.Table, schema.Resources, bool, map[string]interface{}) error); ok {
r0 = rf(ctx, t, instance, shouldCascade, cascadeDeleteFilters)
} else {
r0 = ret.Error(0)
}
Expand Down
4 changes: 2 additions & 2 deletions provider/execution/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import (
type Storage interface {
QueryExecer
Copier
Insert(ctx context.Context, t *schema.Table, instance schema.Resources) error
Insert(ctx context.Context, t *schema.Table, instance schema.Resources, shouldCascade bool, cascadeDeleteFilters map[string]interface{}) error
Delete(ctx context.Context, t *schema.Table, kvFilters []interface{}) error
RemoveStaleData(ctx context.Context, t *schema.Table, executionStart time.Time, kvFilters []interface{}) error
CopyFrom(ctx context.Context, resources schema.Resources, shouldCascade bool, CascadeDeleteFilters map[string]interface{}) error
CopyFrom(ctx context.Context, resources schema.Resources, shouldCascade bool, cascadeDeleteFilters map[string]interface{}) error
Close()
Dialect() schema.Dialect
}
Expand Down
4 changes: 2 additions & 2 deletions provider/execution/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (f noopStorage) Exec(ctx context.Context, query string, args ...interface{}
return nil
}

func (f noopStorage) Insert(ctx context.Context, t *schema.Table, instance schema.Resources) error {
func (f noopStorage) Insert(ctx context.Context, t *schema.Table, instance schema.Resources, shouldCascade bool, cascadeDeleteFilters map[string]interface{}) error {
return nil
}

Expand All @@ -33,7 +33,7 @@ func (f noopStorage) RemoveStaleData(ctx context.Context, t *schema.Table, execu
return nil
}

func (f noopStorage) CopyFrom(ctx context.Context, resources schema.Resources, shouldCascade bool, CascadeDeleteFilters map[string]interface{}) error {
func (f noopStorage) CopyFrom(ctx context.Context, resources schema.Resources, shouldCascade bool, cascadeDeleteFilters map[string]interface{}) error {
return nil
}

Expand Down
8 changes: 4 additions & 4 deletions provider/schema/mock/mock_storage.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.