Skip to content
This repository has been archived by the owner on Aug 12, 2022. It is now read-only.

Commit

Permalink
fix: Delete by cq_id before insertion (#266)
Browse files Browse the repository at this point in the history
Co-authored-by: Kemal Hadimli <[email protected]>
  • Loading branch information
disq and disq authored May 23, 2022
1 parent 79f98ce commit 1f74be7
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 24 deletions.
44 changes: 32 additions & 12 deletions database/postgres/pgdatabase.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@ func NewPgDatabase(ctx context.Context, logger hclog.Logger, dsn string, sd sche
var _ execution.Storage = (*PgDatabase)(nil)

// Insert inserts all resources to given table, table and resources are assumed from same table.
func (p PgDatabase) Insert(ctx context.Context, t *schema.Table, resources schema.Resources) error {
func (p PgDatabase) Insert(ctx context.Context, t *schema.Table, resources schema.Resources, shouldCascade bool, cascadeDeleteFilters map[string]interface{}) error {
if len(resources) == 0 {
return nil
}

// It is safe to assume that all resources have the same columns
cols := quoteColumns(resources.ColumnNames())
psql := sq.StatementBuilder.PlaceholderFormat(sq.Dollar)
Expand Down Expand Up @@ -76,10 +77,25 @@ func (p PgDatabase) Insert(ctx context.Context, t *schema.Table, resources schem
if err != nil {
return diag.NewBaseError(err, diag.DATABASE, diag.WithResourceName(t.Name), diag.WithSummary("bad insert SQL statement created"), diag.WithDetails("SQL statement %q is invalid", s))
}
_, err = p.pool.Exec(ctx, s, args...)

err = p.pool.BeginTxFunc(ctx, pgx.TxOptions{
IsoLevel: pgx.ReadCommitted,
AccessMode: pgx.ReadWrite,
DeferrableMode: pgx.Deferrable,
}, func(tx pgx.Tx) error {
if shouldCascade {
if err := deleteResourceByCQId(ctx, tx, resources, cascadeDeleteFilters); err != nil {
return err
}
}

_, err := tx.Exec(ctx, s, args...)
return err
})
if err == nil {
return nil
}

if pgErr, ok := err.(*pgconn.PgError); ok {
// This should rarely occur, but if it occurs we want to print the SQL to debug it further
if pgerrcode.IsSyntaxErrororAccessRuleViolation(pgErr.Code) {
Expand All @@ -104,16 +120,7 @@ func (p PgDatabase) CopyFrom(ctx context.Context, resources schema.Resources, sh
DeferrableMode: pgx.Deferrable,
}, func(tx pgx.Tx) error {
if shouldCascade {
q := goqu.Dialect("postgres").Delete(resources.TableName()).Where(goqu.Ex{"cq_id": resources.GetIds()})
for k, v := range cascadeDeleteFilters {
q = q.Where(goqu.Ex{k: goqu.Op{"eq": v}})
}
sql, args, err := q.Prepared(true).ToSQL()
if err != nil {
return err
}
_, err = tx.Exec(ctx, sql, args...)
if err != nil {
if err := deleteResourceByCQId(ctx, tx, resources, cascadeDeleteFilters); err != nil {
return err
}
}
Expand Down Expand Up @@ -221,3 +228,16 @@ func quoteColumns(columns []string) []string {
}
return ret
}

func deleteResourceByCQId(ctx context.Context, tx pgx.Tx, resources schema.Resources, cascadeDeleteFilters map[string]interface{}) error {
q := goqu.Dialect("postgres").Delete(resources.TableName()).Where(goqu.Ex{"cq_id": resources.GetIds()})
for k, v := range cascadeDeleteFilters {
q = q.Where(goqu.Ex{k: goqu.Op{"eq": v}})
}
sql, args, err := q.Prepared(true).ToSQL()
if err != nil {
return err
}
_, err = tx.Exec(ctx, sql, args...)
return err
}
4 changes: 2 additions & 2 deletions provider/execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func (e TableExecutor) saveToStorage(ctx context.Context, resources schema.Resou
e.Logger.Warn("failed copy-from to db", "error", err)

// fallback insert, copy from sometimes does problems, so we fall back with bulk insert
err = e.Db.Insert(ctx, e.Table, resources)
err = e.Db.Insert(ctx, e.Table, resources, shouldCascade, e.extraFields)
if err == nil {
return resources, nil
}
Expand All @@ -342,7 +342,7 @@ func (e TableExecutor) saveToStorage(ctx context.Context, resources schema.Resou
// Try to insert resource by resource if partial fetch is enabled and an error occurred
partialFetchResources := make(schema.Resources, 0)
for id := range resources {
if err := e.Db.Insert(ctx, e.Table, schema.Resources{resources[id]}); err != nil {
if err := e.Db.Insert(ctx, e.Table, schema.Resources{resources[id]}, shouldCascade, e.extraFields); err != nil {
e.Logger.Error("failed to insert resource into db", "error", err, "resource_keys", resources[id].PrimaryKeyValues())
diags = diags.Add(ClassifyError(err, diag.WithType(diag.DATABASE)))
continue
Expand Down
6 changes: 3 additions & 3 deletions provider/execution/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ func (_m *DatabaseMock) Exec(ctx context.Context, query string, args ...interfac
}

// Insert provides a mock function with given fields: ctx, t, instance
func (_m *DatabaseMock) Insert(ctx context.Context, t *schema.Table, instance schema.Resources) error {
func (_m *DatabaseMock) Insert(ctx context.Context, t *schema.Table, instance schema.Resources, shouldCascade bool, cascadeDeleteFilters map[string]interface{}) error {
ret := _m.Called(ctx, t, instance)

var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *schema.Table, schema.Resources) error); ok {
r0 = rf(ctx, t, instance)
if rf, ok := ret.Get(0).(func(context.Context, *schema.Table, schema.Resources, bool, map[string]interface{}) error); ok {
r0 = rf(ctx, t, instance, shouldCascade, cascadeDeleteFilters)
} else {
r0 = ret.Error(0)
}
Expand Down
4 changes: 2 additions & 2 deletions provider/execution/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import (
type Storage interface {
QueryExecer
Copier
Insert(ctx context.Context, t *schema.Table, instance schema.Resources) error
Insert(ctx context.Context, t *schema.Table, instance schema.Resources, shouldCascade bool, cascadeDeleteFilters map[string]interface{}) error
Delete(ctx context.Context, t *schema.Table, kvFilters []interface{}) error
RemoveStaleData(ctx context.Context, t *schema.Table, executionStart time.Time, kvFilters []interface{}) error
CopyFrom(ctx context.Context, resources schema.Resources, shouldCascade bool, CascadeDeleteFilters map[string]interface{}) error
CopyFrom(ctx context.Context, resources schema.Resources, shouldCascade bool, cascadeDeleteFilters map[string]interface{}) error
Close()
Dialect() schema.Dialect
}
Expand Down
2 changes: 1 addition & 1 deletion provider/execution/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (f noopStorage) Exec(ctx context.Context, query string, args ...interface{}
return nil
}

func (f noopStorage) Insert(ctx context.Context, t *schema.Table, instance schema.Resources) error {
func (f noopStorage) Insert(ctx context.Context, t *schema.Table, instance schema.Resources, shouldCascade bool, cascadeDeleteFilters map[string]interface{}) error {
return nil
}

Expand Down
8 changes: 4 additions & 4 deletions provider/schema/mock/mock_storage.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 1f74be7

Please sign in to comment.