Skip to content
This repository has been archived by the owner on Aug 12, 2022. It is now read-only.

feat: Testable Migrations (SDK) #146

Merged
merged 59 commits into from
Jan 18, 2022
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
3f488d6
Table Creator to migration generator
disq Jan 6, 2022
a720d16
Tear out sqlbuilder
disq Jan 6, 2022
79095d3
Diff migrations
disq Jan 7, 2022
46a8b1a
Add cli handler
disq Jan 7, 2022
598a1b2
Get ctx
disq Jan 7, 2022
1b16a49
Do the types in down migrations
disq Jan 7, 2022
c8e22ef
Find renames
disq Jan 7, 2022
14bd87b
Handle CREATE TABLE in UpgradeTable path
disq Jan 10, 2022
0377d8e
linting
disq Jan 10, 2022
4b5c794
Keep TableCreator signature for now, add plugin version 4 support
disq Jan 10, 2022
62ef6c8
Use consts for proto versions
disq Jan 10, 2022
5a3f99a
define Vunmanaged
disq Jan 10, 2022
42f9b75
Attempt to do the migrations hook
disq Jan 10, 2022
c23aadb
NotNull creation option, add cq_fetch_date to default columns
disq Jan 11, 2022
6cf040f
Fix tests for cq_fetch_date and meta->cq_meta rename
disq Jan 11, 2022
50f6035
not null cq_id
disq Jan 11, 2022
c01cb20
Always handle migrations in the same order
disq Jan 11, 2022
cbd96d4
Fix more tests
disq Jan 11, 2022
3e50d03
Add dialect
disq Jan 11, 2022
0d23e53
Dialectify migrations
disq Jan 11, 2022
4e6b8a8
cli migration helper: Generate separate files for separate dialects
disq Jan 11, 2022
c6dbfc8
Move migrations up, as migration
disq Jan 13, 2022
e4637ef
More dialect stuff
disq Jan 13, 2022
d810ee9
Move migrator elsewhere, use dialect directory
disq Jan 13, 2022
78523a5
migrator.NewMigrator -> migrator.New
disq Jan 13, 2022
51e2eb9
Register timescale scheme, use parser helper in DSNtoDialect
disq Jan 13, 2022
f5d125b
migrator: Add postHook to every call
disq Jan 13, 2022
3f0fc9a
Remove GetDefaultSDKColumns, fix tests
disq Jan 13, 2022
1c3be00
Remove inject fields test
disq Jan 13, 2022
f0f8148
dialect: Remove SupportsForeignKeys(), add Extra() for indexes
disq Jan 14, 2022
ae28cbb
tsdb: FK information as comments
disq Jan 14, 2022
40a1b0d
Fix dialect-specific migration handling
disq Jan 14, 2022
b50cd11
Use schema.QueryExecer instead of pool
disq Jan 14, 2022
ffd5b68
Remove V2 and V3 proto compat
disq Jan 14, 2022
5d437de
Fixes around constraint names and DSN replacements
disq Jan 14, 2022
2a1d236
Fix ResourceValues, resolve cqId last
disq Jan 14, 2022
587ed06
lint
disq Jan 14, 2022
ab91afb
CR feedback
disq Jan 15, 2022
86afefc
More CR feedback
disq Jan 15, 2022
75ef915
cqproto: Migrations by dialect
disq Jan 15, 2022
96199af
migration cli prefix fixes
disq Jan 16, 2022
b7a42f9
gocritic fix
disq Jan 16, 2022
69cdd90
Generate CQ ID ignoring internal column values, CR nits
disq Jan 16, 2022
3c20c3e
schema.Database => schema.Storage
disq Jan 16, 2022
a247d08
Update CLI migrator
disq Jan 16, 2022
8e7f5aa
cli migrator: Add -schema param to filter other schemas
disq Jan 16, 2022
fcf06f2
dialect: Remove FKs-as-comments, call DEFINE_FK instead
disq Jan 17, 2022
4e7f5c4
tsdb: Add parent-table FN, better fn names
disq Jan 17, 2022
a9e40bb
Generate CQ ID fix
disq Jan 17, 2022
f7316a3
setup_tsdb_trigger => setup_tsdb_child
disq Jan 17, 2022
5b2bfd7
Add migration test builder
disq Jan 17, 2022
9102a15
Add SetDSNElement to helpers
disq Jan 17, 2022
a99193f
Move DSN helpers to its own package: database/dsn
disq Jan 17, 2022
db9de39
Address CR
disq Jan 17, 2022
a200311
Column tweaking
disq Jan 17, 2022
7670f98
Update SQL formatting
disq Jan 17, 2022
b9c9ab8
CR: minor nits
disq Jan 18, 2022
832b00c
migrator: Use constants for "version"s
disq Jan 18, 2022
bdc9bc5
Merge branch 'main' into feat/all-migrations-as-files
roneli Jan 18, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cqproto/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ import (
"google.golang.org/grpc"
)

const (
V4 = 4

Vunmanaged = -1
)

// CQPlugin This is the implementation of plugin.GRPCServer so we can serve/consume this.
type CQPlugin struct {
// GRPCPlugin must still implement the Stub interface
Expand Down
38 changes: 38 additions & 0 deletions database/database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package database

import (
"context"

"github.com/cloudquery/cq-provider-sdk/database/postgres"
"github.com/cloudquery/cq-provider-sdk/provider/schema"
"github.com/hashicorp/go-hclog"
)

type DB struct {
disq marked this conversation as resolved.
Show resolved Hide resolved
schema.Database

dialectType schema.DialectType
}

func New(ctx context.Context, logger hclog.Logger, dsn string) (*DB, error) {
disq marked this conversation as resolved.
Show resolved Hide resolved
dType, newDSN, err := DSNtoDialect(dsn)
if err != nil {
return nil, err
}

dialect := schema.GetDialect(dType)

db, err := postgres.NewPgDatabase(ctx, logger, newDSN, dialect)
if err != nil {
return nil, err
}

return &DB{
Database: db,
dialectType: dType,
}, nil
}

func (d *DB) DialectType() schema.DialectType {
return d.dialectType
}
24 changes: 24 additions & 0 deletions database/dialect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package database

import (
"strings"

"github.com/cloudquery/cq-provider-sdk/helpers"
"github.com/cloudquery/cq-provider-sdk/provider/schema"
)

func DSNtoDialect(dsn string) (d schema.DialectType, newDSN string, err error) {
disq marked this conversation as resolved.
Show resolved Hide resolved
u, err := helpers.ParseConnectionString(dsn)
if err != nil {
return schema.Postgres, dsn, err
}

switch u.Scheme {
case "timescaledb", "tsdb", "timescale":
// TODO remove
disq marked this conversation as resolved.
Show resolved Hide resolved
fixedDSN := strings.Replace(u.String(), u.Scheme+"://", "postgres://", 1)
return schema.TSDB, fixedDSN, nil
default:
return schema.Postgres, dsn, nil
}
}
29 changes: 29 additions & 0 deletions database/postgres/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package postgres

import (
"context"

"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
)

// Connect connects to the given DSN and returns a pgxpool
func Connect(ctx context.Context, dsn string) (*pgxpool.Pool, error) {
poolCfg, err := pgxpool.ParseConfig(dsn)
if err != nil {
return nil, err
}
poolCfg.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
UUIDType := pgtype.DataType{
Value: &UUID{},
Name: "uuid",
OID: pgtype.UUIDOID,
}

conn.ConnInfo().RegisterDataType(UUIDType)
return nil
}
poolCfg.LazyConnect = true
return pgxpool.ConnectConfig(ctx, poolCfg)
}
190 changes: 190 additions & 0 deletions database/postgres/pgdatabase.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
package postgres

import (
"context"
"fmt"
"strconv"
"time"

sq "github.com/Masterminds/squirrel"
"github.com/cloudquery/cq-provider-sdk/provider/schema"
"github.com/cloudquery/cq-provider-sdk/provider/schema/diag"
"github.com/doug-martin/goqu/v9"
_ "github.com/doug-martin/goqu/v9/dialect/postgres"
"github.com/hashicorp/go-hclog"
"github.com/jackc/pgconn"
"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/spf13/cast"
)

type PgDatabase struct {
pool *pgxpool.Pool
log hclog.Logger
sd schema.Dialect
}

func NewPgDatabase(ctx context.Context, logger hclog.Logger, dsn string, sd schema.Dialect) (*PgDatabase, error) {
pool, err := Connect(ctx, dsn)
if err != nil {
return nil, err
}
return &PgDatabase{
pool: pool,
log: logger,
sd: sd,
}, nil
}

var _ schema.Database = (*PgDatabase)(nil)

// Insert inserts all resources to given table, table and resources are assumed from same table.
func (p PgDatabase) Insert(ctx context.Context, t *schema.Table, resources schema.Resources) error {
if len(resources) == 0 {
return nil
}
// It is safe to assume that all resources have the same columns
cols := quoteColumns(resources.ColumnNames())
psql := sq.StatementBuilder.PlaceholderFormat(sq.Dollar)
sqlStmt := psql.Insert(t.Name).Columns(cols...)
for _, res := range resources {
if res.TableName() != t.Name {
return fmt.Errorf("resource table expected %s got %s", t.Name, res.TableName())
}
values, err := res.Values()
if err != nil {
return fmt.Errorf("table %s insert failed %w", t.Name, err)
}
sqlStmt = sqlStmt.Values(values...)
}

s, args, err := sqlStmt.ToSql()
if err != nil {
return diag.FromError(err, diag.ERROR, diag.DATABASE, t.Name, "bad insert SQL statement created", "")
}
_, err = p.pool.Exec(ctx, s, args...)
if err == nil {
return nil
}
if pgErr, ok := err.(*pgconn.PgError); ok {
// This should rarely occur, but if it occurs we want to print the SQL to debug it further
if pgerrcode.IsSyntaxErrororAccessRuleViolation(pgErr.Code) {
p.log.Debug("insert syntax error", "sql", s)
}
if pgerrcode.IsIntegrityConstraintViolation(pgErr.Code) {
p.log.Debug("insert integrity violation error", "constraint", pgErr.ConstraintName, "errMsg", pgErr.Message)
}
return diag.FromError(err, diag.ERROR, diag.DATABASE, t.Name, fmt.Sprintf("insert failed for table %s", t.Name), pgErr.Message)
}
return diag.FromError(err, diag.ERROR, diag.DATABASE, t.Name, err.Error(), "")
}

// CopyFrom copies all resources from []*Resource
func (p PgDatabase) CopyFrom(ctx context.Context, resources schema.Resources, shouldCascade bool, cascadeDeleteFilters map[string]interface{}) error {
if len(resources) == 0 {
return nil
}
err := p.pool.BeginTxFunc(ctx, pgx.TxOptions{
IsoLevel: pgx.ReadCommitted,
AccessMode: pgx.ReadWrite,
DeferrableMode: pgx.Deferrable,
}, func(tx pgx.Tx) error {
if shouldCascade {
q := goqu.Dialect("postgres").Delete(resources.TableName()).Where(goqu.Ex{"cq_id": resources.GetIds()})
for k, v := range cascadeDeleteFilters {
q = q.Where(goqu.Ex{k: goqu.Op{"eq": v}})
}
sql, args, err := q.Prepared(true).ToSQL()
if err != nil {
return err
}
_, err = tx.Exec(ctx, sql, args...)
if err != nil {
return err
}
}
copied, err := tx.CopyFrom(
ctx, pgx.Identifier{resources.TableName()}, resources.ColumnNames(),
pgx.CopyFromSlice(len(resources), func(i int) ([]interface{}, error) {
// use getResourceValues instead of Resource.Values since values require some special encoding for CopyFrom
return p.sd.GetResourceValues(resources[i])
}))
if err != nil {
return err
}
if copied != int64(len(resources)) {
return fmt.Errorf("not all resources copied %d != %d to %s", copied, len(resources), resources.TableName())
}
return nil
})
return err
}

// Exec allows executions of postgres queries with given args returning error of execution
func (p PgDatabase) Exec(ctx context.Context, query string, args ...interface{}) error {
_, err := p.pool.Exec(ctx, query, args...)
return err
}

// Query allows execution of postgres queries with given args returning data result
func (p PgDatabase) Query(ctx context.Context, query string, args ...interface{}) (pgx.Rows, error) {
rows, err := p.pool.Query(ctx, query, args...)
return rows, err
}

// QueryOne allows execution of postgres queries with given args returning data result
func (p PgDatabase) QueryOne(ctx context.Context, query string, args ...interface{}) pgx.Row {
row := p.pool.QueryRow(ctx, query, args...)
return row
}

func (p PgDatabase) Delete(ctx context.Context, t *schema.Table, kvFilters []interface{}) error {
nc := len(kvFilters)
if nc%2 != 0 {
return fmt.Errorf("number of args to delete should be even. Got %d", nc)
}
psql := sq.StatementBuilder.PlaceholderFormat(sq.Dollar)
ds := psql.Delete(t.Name)
for i := 0; i < nc; i += 2 {
ds = ds.Where(sq.Eq{kvFilters[i].(string): kvFilters[i+1]})
}
sql, args, err := ds.ToSql()
if err != nil {
return err
}

_, err = p.pool.Exec(ctx, sql, args...)
return err
}

func (p PgDatabase) RemoveStaleData(ctx context.Context, t *schema.Table, executionStart time.Time, kvFilters []interface{}) error {
q := goqu.Delete(t.Name).WithDialect("postgres").Where(goqu.L(`extract(epoch from (cq_meta->>'last_updated')::timestamp)`).Lt(executionStart.Unix()))
if len(kvFilters)%2 != 0 {
return fmt.Errorf("expected even number of k,v delete filters received %s", kvFilters)
}
for i := 0; i < len(kvFilters); i += 2 {
q = q.Where(goqu.Ex{cast.ToString(kvFilters[i]): goqu.Op{"eq": kvFilters[i+1]}})
}
sql, args, err := q.Prepared(true).ToSQL()
if err != nil {
return fmt.Errorf("failed building query: %w", err)
}
_, err = p.pool.Exec(ctx, sql, args...)
return err
}

func (p PgDatabase) Close() {
p.pool.Close()
}

func (p PgDatabase) Dialect() schema.Dialect {
return p.sd
}

func quoteColumns(columns []string) []string {
for i, v := range columns {
columns[i] = strconv.Quote(v)
}
return columns
}
38 changes: 38 additions & 0 deletions database/postgres/uuid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package postgres

import (
"encoding/hex"

"github.com/jackc/pgtype"
)

type UUID struct {
pgtype.UUID
}

func (dst UUID) Get() interface{} {
switch dst.Status {
case pgtype.Present:
// CQ-Change: Return entire object, not just Bytes
return dst
case pgtype.Null:
return nil
default:
return dst.Status
}
}

func (u UUID) String() string {
buf := make([]byte, 36)

hex.Encode(buf[0:8], u.Bytes[0:4])
buf[8] = '-'
hex.Encode(buf[9:13], u.Bytes[4:6])
buf[13] = '-'
hex.Encode(buf[14:18], u.Bytes[6:8])
buf[18] = '-'
hex.Encode(buf[19:23], u.Bytes[8:10])
buf[23] = '-'
hex.Encode(buf[24:], u.Bytes[10:])
return string(buf)
}
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ require (
github.com/hashicorp/go-plugin v1.4.3
github.com/hashicorp/go-version v1.3.0
github.com/hashicorp/hcl/v2 v2.10.1
github.com/huandu/go-sqlbuilder v1.13.0
github.com/iancoleman/strcase v0.2.0
github.com/jackc/pgconn v1.10.0
github.com/jackc/pgerrcode v0.0.0-20201024163028-a0d42d470451
github.com/jackc/pgtype v1.8.1
github.com/jackc/pgx/v4 v4.13.0
github.com/mitchellh/hashstructure v1.1.0
github.com/modern-go/reflect2 v1.0.2
Expand Down Expand Up @@ -47,13 +47,11 @@ require (
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/yamux v0.0.0-20210826001029-26ff87cf9493 // indirect
github.com/huandu/xstrings v1.3.2 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.1.1 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/pgtype v1.8.1 // indirect
github.com/jackc/puddle v1.1.4 // indirect
github.com/kr/pretty v0.2.1 // indirect
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
Expand Down
6 changes: 0 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -568,12 +568,6 @@ github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb/go.mod h1:+NfK9FKe
github.com/hashicorp/yamux v0.0.0-20210826001029-26ff87cf9493 h1:brI5vBRUlAlM34VFmnLPwjnCL/FxAJp9XvOdX6Zt+XE=
github.com/hashicorp/yamux v0.0.0-20210826001029-26ff87cf9493/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/huandu/go-assert v1.1.5 h1:fjemmA7sSfYHJD7CUqs9qTwwfdNAx7/j2/ZlHXzNB3c=
github.com/huandu/go-assert v1.1.5/go.mod h1:yOLvuqZwmcHIC5rIzrBhT7D3Q9c3GFnd0JrPVhn/06U=
github.com/huandu/go-sqlbuilder v1.13.0 h1:IN1VRzcyQ+Kx74L0g5ZAY5qDaRJjwMWVmb6GrFAF8Jc=
github.com/huandu/go-sqlbuilder v1.13.0/go.mod h1:LILlbQo0MOYjlIiGgOSR3UcWQpd5Y/oZ7HLNGyAUz0E=
github.com/huandu/xstrings v1.3.2 h1:L18LIDzqlW6xN2rEkpdV8+oL/IXWJ1APd+vsdYy4Wdw=
github.com/huandu/xstrings v1.3.2/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/iancoleman/strcase v0.2.0 h1:05I4QRnGpI0m37iZQRuskXh+w77mr6Z41lwQzuHLwW0=
github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
Expand Down
Loading