Skip to content
/ pqxd Public

pqxd, the `database/sql` driver for PartiQL in Amazon DynamoDB

License

Notifications You must be signed in to change notification settings

miyamo2/pqxd

Repository files navigation

Go Reference CI GitHub go.mod Go version (subdirectory of monorepo) GitHub release (latest by date) Go Report Card GitHub License

Quick Start

Install

go get github.com/miyamo2/pqxd

Usage

package main

import (
	"context"
	"database/sql"
	"fmt"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/miyamo2/pqxd"
	"log"
	"time"
)

func main() {
	ctx := context.Background()
	cfg, err := config.LoadDefaultConfig(ctx, config.WithRegion("ap-northeast-1"))
	if err != nil {
		log.Fatalf("unable to load SDK config, %v", err)
	}
	db := sql.OpenDB(pqxd.NewConnector(cfg))
	if db == nil {
		log.Fatal(err)
	}
	if err := db.Ping(); err != nil {
		log.Fatal(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	rows, err := db.QueryContext(ctx, `SELECT id, name FROM "users"`)
	if err != nil {
		fmt.Printf("something happend. err: %s\n", err.Error())
		return
	}

	for rows.NextResultSet() { // page feed with next token
		for rows.Next() {
			var (
				id   string
				name string
			)
			if err := rows.Scan(&id, &name); err != nil {
				fmt.Printf("something happend. err: %s\n", err.Error())
				continue
			}
			fmt.Printf("id: %s, name: %s\n", id, name)
		}
	}
}

SELECT

Tip

If * is specified in the select column list,
the results of the rows are automatically sorted by column name(asc).

However, if specified with *, the number of attributes may differ from row to row.

Therefore, it is recommended that the selection column list specify the attribute names.

Scan
rows, err := db.QueryContext(context.Background(), `SELECT id, name FROM "users"`)
for rows.NextResultSet() { // page feed with next token
    for rows.Next() {
        var (
            id string
            name string
        )
        if err := rows.Scan(&id, &name); err != nil {
            fmt.Printf("something happend. err: %s\n", err.Error())
            continue
        }
        fmt.Printf("id: %s, name: %s\n", id, name)
    }
}
GetItem
row := db.QueryRowContext(context.Background(), `SELECT id, name FROM "users" WHERE id = ?`, "1")
var (
    id string
    name string
)
if err := row.Scan(&id, &name); err != nil {
    fmt.Printf("something happend. err: %s\n", err.Error())
    return
}
fmt.Printf("id: %s, name: %s\n", id, name)
GetItem with Global Secondary Index
row := db.QueryRowContext(context.Background(), `SELECT id, name FROM "users"."gsi_pk-gsi-sk_index" WHERE gsi_pk = ? AND gsi_sk = ?`, "foo", "bar")

var (
    id string
    name string
)
if err := row.Scan(&id, &name); err != nil {
    fmt.Printf("something happend. err: %s\n", err.Error())
    return
}
fmt.Printf("id: %s, name: %s\n", id, name)
With Prepared Statement
ctx := context.Background()

stmt, err := db.PrepareContext(ctx, `SELECT id, name FROM "users" WHERE id = ?`)
if err != nil {
    fmt.Printf("something happend. err: %s\n", err.Error())
    return
}
defer stmt.Close()

rows, err := stmt.QueryRowContext(ctx, "1")
if err != nil {
    fmt.Printf("something happend. err: %s\n", err.Error())
    return
}

var (
    id string
    name string
)
if err := row.Scan(&id, &name); err != nil {
    fmt.Printf("something happend. err: %s\n", err.Error())
    return
}
fmt.Printf("id: %s, name: %s\n", id, name)
With Transaction
tx, err := db.Begin()
if err != nil {
    return err
}

ctx := context.Background()

rows, err := tx.QueryContext(ctx, `SELECT id, name FROM "users" WHERE id = ?`, "1")
if err != nil {
    tx.Rollback()
    return err
}

row := tx.QueryRowContext(ctx, `SELECT id, name FROM "users" WHERE id = ?`, "2")

// WARNING: Do not use `tx.Commit()` when using `SELECT` statement.
//
// Each `sql.Rows` or `sql.Row` is resolved 
// the first time `rows.NextResultSet()`, `rows.Next()` or `row.Scan()` 
// is performed within that transaction.
// So, after the `rows.NextResultSet()`, `rows.Next()` or `row.Scan()` is performed,
// the transaction is automatically committed.
for rows.Next() {
    var (
        id string
        name string
    )
    if err := rows.Scan(&id, &name); err != nil {
        fmt.Printf("something happend. err: %s\n", err.Error())
        continue
    }
    fmt.Printf("id: %s, name: %s\n", id, name)
}

var (
    id string
    name string
)
if err := row.Scan(&id, &name); err != nil {
    fmt.Printf("something happend. err: %s\n", err.Error())
    return
}
fmt.Printf("id: %s, name: %s\n", id, name)
RETURNING

pqxd supports the RETURNING clause.

row := db.QueryRowContext(context.Background(), `UPDATE "users" SET name = ? SET nickname = ? WHERE id = ? RETURNING MODIFIED OLD *`, "David", "Dave", "3")

var name, nickname sql.NullString
if err := row.Scan(&name, &nickname); err != nil {
    fmt.Printf("something happend. err: %s\n", err.Error())
    return
}
if name.Valid {
    fmt.Printf("name: %s\n", name.String)
}
if nickname.Valid {
    fmt.Printf("nickname: %s\n", nickname.String)
}

And provides individual syntax for specifying a column list instead of *.

row := db.QueryRowContext(context.Background(), `UPDATE "users" SET name = ? SET nickname = ? WHERE id = ? RETURNING ALL OLD id`, "Robert", "Bob", "2")

var id string
if err := row.Scan(&id); err != nil {
    fmt.Printf("something happend. err: %s\n", err.Error())
    return
}
fmt.Printf("id: %s\n", id)
Describe Table

pqxd supports the DescribeTable API with !pqxd_describe_table, the meta-table.

row := db.QueryRowContext(context.Background(), `SELECT TableStatus FROM "!pqxd_describe_table" WHERE table_name = ?`, "users")

var tableStatus pqxd.TableStatus
if err := row.Scan(&tableStatus); err != nil {
    fmt.Println(err.Error())
    return
}
fmt.Printf("TableStatus: %v\n", tableStatus)
List Tables

pqxd supports the ListTables API with !pqxd_list_tables, the meta-table.

rows, err := db.QueryContext(context.Background(), `SELECT * FROM "!pqxd_list_tables"`)

for rows.NextResultSet() { // page feed with last-evaluated-key
    for rows.Next() {
        var tableName string
        if err := rows.Scan(&tableName); err != nil {
            fmt.Println(err.Error())
            continue
        }
        fmt.Printf("tableName: %s\n", tableName)
    }
}

INSERT/UPDATE/DELETE

insertResult, err := db.Exec(`INSERT INTO "users" VALUE { 'id': ?, 'name': ? }`, "3", "Alice")
if err != nil {
    return err
}
affected, err := insertResult.RowsAffected()
if err != nil {
    return err
}
if affected != 1 {
    return fmt.Errorf("expected 1 row affected, got %d", affected)
}

updateResult, err := db.Exec(`UPDATE "users" SET name = ? WHERE id = ?`, "Bob", "2")
if err != nil {
    return err
}
affected, err = updateResult.RowsAffected()
if err != nil {
    return err
}
if affected != 1 {
    return fmt.Errorf("expected 1 row affected, got %d", affected)
}

deleteResult, err := db.Exec(`DELETE FROM "users" WHERE id = ?`, "1")
if err != nil {
    return err
}
affected, err = deleteResult.RowsAffected()
if err != nil {
    return err
}
if affected != 1 {
    return fmt.Errorf("expected 1 row affected, got %d", affected)
}
With Prepared Statement
stmt, err := db.Prepare(`INSERT INTO "users" VALUE { 'id': ?, 'name': ? }`)
if err != nil {
    return err
}
defer stmt.Close()

insertResult, err := stmt.Exec("3", "Alice")
if err != nil {
    return err
}
affected, err := insertResult.RowsAffected()
if err != nil {
    return err
}
if affected != 1 {
    return fmt.Errorf("expected 1 row affected, got %d", affected)
}
With Transaction
tx, err := db.Begin()
if err != nil {
    return err
}

insertResult, err := tx.Exec(`INSERT INTO "users" VALUE { 'id': ?, 'name': ? }`, "3", "Alice")
if err != nil {
    tx.Rollback()
    return err
}

updateResult, err := tx.Exec(`UPDATE "users" SET name = ? WHERE id = ?`, "Bob", "2")
if err != nil {
    tx.Rollback()
    return err
}

deleteResult, err := tx.Exec(`DELETE FROM "users" WHERE id = ?`, "1")
if err != nil {
    tx.Rollback()
    return err
}

// RowsAffected is available after commit
tx.Commit()

// RowsAffected might return 0 or 1. If 0, it means statement is not successful.
if affected, err := insertResult.RowsAffected(); err != nil || affected != 1 {
    return err
}

if affected, err := updateResult.RowsAffected(); err != nil || affected != 1 {
    return err
}

if affected, err := deleteResult.RowsAffected(); err != nil || affected != 1 {
    return err
}

DSN(Data Source Name) String

We recommend using sql.OpenDB with pqxd.NewConnector instead of sql.Open. But if you want to use sql.Open, you can use the following DSN string.

AWS_REGION=<aws region>
;AWS_ACCESS_KEY_ID=<access key ID>
;AWS_SECRET_ACCESS_KEY=<secret access key>
[;ENDPOINT=<amazon dynamodb endpoint>]
Key description
AWS_REGION AWS Region. If not supplied, it is resolved from one of the following environment variables; AWS_REGION or AWS_DEFAULT_REGION.
AWS_ACCESS_KEY_ID AWS Access Key ID. If not supplied, it is resolved from one of the following environment variables; AWS_ACCESS_KEY or AWS_ACCESS_KEY_ID.
AWS_SECRET_ACCESS_KEY AWS Secret Access Key. If not supplied, it is resolved from one of the following environment variables; AWS_SECRET_KEY or AWS_SECRET_ACCESS_KEY.
ENDPOINT Endpoint of DynamoDB. Used to connect locally to an emulator or to a DynamoDB compatible interface.
db, err := sql.Open(pqxd.DriverName, "AWS_REGION=ap-northeast-1;AWS_ACCESS_KEY_ID=AKIA...;AWS_SECRET_ACCESS_KEY=...;")

Tip

If the application is run on AWS Lambda, connections can be obtained even if the DSN is an empty string. This is because the region, access key, and secret key are defined as runtime environment variables.

O11y

New Relic
package main

import (
	"context"
	"database/sql"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/miyamo2/pqxd"
	nraws "github.com/newrelic/go-agent/v3/integrations/nrawssdk-v2"
	"log"
)

func main() {
	ctx := context.Background()
	cfg, err := config.LoadDefaultConfig(ctx, config.WithRegion("ap-northeast-1"))
	if err != nil {
		log.Fatalf("unable to load SDK config, %v", err)
	}
	
	// Instrumenting New Relic
	nraws.AppendMiddlewares(&cfg.APIOptions, nil)
	
	db := sql.OpenDB(pqxd.NewConnector(cfg))
	if db == nil {
		log.Fatal(err)
	}
	db.Ping()
}
Datadog
package main

import (
	"context"
	"database/sql"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/miyamo2/pqxd"
	awstrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/aws/aws-sdk-go-v2/aws"
	"log"
)

func main() {
	ctx := context.Background()
	cfg, err := config.LoadDefaultConfig(ctx, config.WithRegion("ap-northeast-1"))
	if err != nil {
		log.Fatalf("unable to load SDK config, %v", err)
	}
	
	// Instrumenting Datadog
	awstrace.AppendMiddleware(&cfg)

	db := sql.OpenDB(pqxd.NewConnector(cfg))
	if db == nil {
		log.Fatal(err)
	}
	db.Ping()
}
AWS X-Ray
package main

import (
	"context"
	"database/sql"
	"github.com/miyamo2/pqxd"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/aws/aws-xray-sdk-go/instrumentation/awsv2"
	"github.com/aws/aws-xray-sdk-go/xray"
)

func main() {
	ctx := context.Background()
	cfg, err := config.LoadDefaultConfig(ctx, config.WithRegion("ap-northeast-1"))
	if err != nil {
		log.Fatalf("unable to load SDK config, %v", err)
	}
	
	// Instrumenting X-Ray
	awsv2.AWSV2Instrumentor(&cfg.APIOptions)

	db := sql.OpenDB(pqxd.NewConnector(cfg))
	if db == nil {
		log.Fatal(err)
	}
	db.Ping()
}

Contributing

Feel free to open a PR or an Issue.

However, you must promise to follow our Code of Conduct.

Tasks

We recommend that this section be run with xc

test:unit

go test -v -coverpkg='github.com/miyamo2/pqxd' -coverprofile=coverage.out

test:integration

cd tests/integration
xc test

License

pqxd released under the MIT License

Special Thanks

pqxd is inspired by the following projects.
With the utmost respect, we would like to thank the authors and contributors of these projects.