Added support for BigQuery #9

Merged · 7 commits · May 7, 2024

Changes from all commits
28 changes: 27 additions & 1 deletion client.go
@@ -5,8 +5,10 @@ import (
	"fmt"

	"github.com/thesaas-company/xray/config"
	"github.com/thesaas-company/xray/databases/bigquery"
	"github.com/thesaas-company/xray/databases/mysql"
	"github.com/thesaas-company/xray/databases/postgres"
	"github.com/thesaas-company/xray/databases/redshift"
	"github.com/thesaas-company/xray/databases/snowflake"
	"github.com/thesaas-company/xray/logger"
	"github.com/thesaas-company/xray/types"

@@ -34,14 +36,26 @@ func NewClientWithConfig(dbConfig *config.Config, dbType types.DbType) (types.ISQL, error) {
			return nil, err
		}
		return logger.NewLogger(sqlClient), nil
	case types.BigQuery:
		bigqueryClient, err := bigquery.NewBigQueryWithConfig(dbConfig)
		if err != nil {
			return nil, err
		}
		return logger.NewLogger(bigqueryClient), nil
	case types.Redshift:
		redshiftClient, err := redshift.NewRedshiftWithConfig(dbConfig)
		if err != nil {
			return nil, err
		}
		return logger.NewLogger(redshiftClient), nil
	default:
		return nil, fmt.Errorf("unsupported database type: %s", dbType)
	}
}

// NewClient creates a new SQL client from an existing connection or service client
// (sql.DB, BigQuery, or Redshift) for the given database type.
// It returns an error if the database type is not supported or if there is a problem creating the client.
func NewClient(dbClient *sql.DB, bq *bigquery.BigQuery, rs *redshift.Redshift, dbType types.DbType) (types.ISQL, error) {

	switch dbType {
	case types.MySQL:

@@ -62,6 +76,18 @@ func NewClient(dbClient *sql.DB, dbType types.DbType) (types.ISQL, error) {
			return nil, err
		}
		return logger.NewLogger(sqlClient), nil
	case types.BigQuery:
		bigqueryClient, err := bigquery.NewBigQuery(bq.Client)
		if err != nil {
			return nil, err
		}
		return logger.NewLogger(bigqueryClient), nil
	case types.Redshift:
		redshiftClient, err := redshift.NewRedshift(rs.Client)
		if err != nil {
			return nil, err
		}
		return logger.NewLogger(redshiftClient), nil
	default:
		return nil, fmt.Errorf("unsupported database type: %s", dbType)
	}
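For reviewers who want to try the new path end to end, here is a minimal usage sketch. It assumes the repository root package is importable as github.com/thesaas-company/xray, that types.ISQL exposes a Tables method (as the mock in this PR suggests), and that the project, dataset, and key-file values are placeholders only.

package main

import (
	"fmt"
	"log"

	"github.com/thesaas-company/xray"
	"github.com/thesaas-company/xray/config"
	"github.com/thesaas-company/xray/types"
)

func main() {
	// Placeholder values; replace with a real project, dataset, and key file.
	cfg := &config.Config{
		ProjectID:    "my-gcp-project",
		DatabaseName: "my_dataset", // Schema() resolves the dataset from DatabaseName
		JSONKeyPath:  "/path/to/service-account.json",
	}

	// NewClientWithConfig builds the BigQuery-backed client and wraps it in the logger.
	client, err := xray.NewClientWithConfig(cfg, types.BigQuery)
	if err != nil {
		log.Fatal(err)
	}

	tables, err := client.Tables("my_dataset")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(tables)
}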
67 changes: 43 additions & 24 deletions config/config.go
@@ -4,39 +4,58 @@ package config

// Once we are done with MySQL and Postgres, let's rethink the config structure.

// Config holds the configuration details for various databases.
type Config struct {
	// Host is the database host URL.
	Host string `yaml:"host" pflag:",Database host url"`

	// Username is the database username.
	Username string `yaml:"username" pflag:",Database username"`

	// DatabaseName is the name of the database.
	DatabaseName string `yaml:"database" pflag:",Database name"`

	// Port is the database port.
	Port string `yaml:"port" pflag:",Database Port"`

	// SSL is used to enable or disable SSL for the database connection.
	SSL string `yaml:"ssl" pflag:",Database ssl enable/disable"`

	// ProjectID is the BigQuery project ID.
	ProjectID string `yaml:"project_id" pflag:",BigQuery project ID"`

	// JSONKeyPath is the path to the BigQuery JSON key file.
	JSONKeyPath string `yaml:"json_key_path" pflag:",Path to BigQuery JSON key file"`

	// Warehouse is the Snowflake warehouse.
	Warehouse string `yaml:"warehouse" pflag:",Snowflake warehouse"`

	// Schema is the Snowflake database schema.
	Schema string `yaml:"schema" pflag:",Snowflake database schema"`

	// Account is the Snowflake account ID.
	Account string `yaml:"account" pflag:",Snowflake account ID"`

	// Debug is used to enable or disable debug mode.
	Debug bool `yaml:"debug" pflag:",Debug mode"`

	// AWS holds the AWS configuration details.
	AWS AWS `yaml:"aws"`
}

// AWS holds the AWS settings used for Redshift.
type AWS struct {
	// Region is the AWS region.
	Region string `yaml:"region" pflag:",AWS region"`

	// AccessKey is the AWS access key.
	AccessKey string `yaml:"access_key" pflag:",AWS access key"`

	// SecretAccessKey is the AWS secret access key.
	SecretAccessKey string `yaml:"secret_key" pflag:",AWS secret key"`

	// ClusterIdentifier is the Redshift cluster identifier.
	ClusterIdentifier string `yaml:"cluster_identifier" pflag:",AWS cluster identifier"`

	// SecretArn is the AWS secret ARN.
	SecretArn string `yaml:"secret_arn" pflag:",AWS secret ARN"`
}
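Since the same Config now serves several engines, a sketch of how a Redshift caller might fill the new AWS block may help review. Which of these fields the Redshift driver actually reads is not visible in this diff, so the field selection and all values below are assumptions and placeholders.

package main

import "github.com/thesaas-company/xray/config"

// Placeholder values only; treat the chosen fields as an assumption about the
// Redshift driver, which is not part of this file.
var redshiftCfg = &config.Config{
	DatabaseName: "dev",
	AWS: config.AWS{
		Region:            "us-east-1",
		ClusterIdentifier: "example-cluster",
		SecretArn:         "arn:aws:secretsmanager:us-east-1:123456789012:secret:example",
	},
}

func main() { _ = redshiftCfg }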
145 changes: 145 additions & 0 deletions databases/bigquery/bigquery.go
@@ -0,0 +1,145 @@
package bigquery

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"

	"cloud.google.com/go/bigquery"
	"github.com/thesaas-company/xray/config"
	"github.com/thesaas-company/xray/types"
	"google.golang.org/api/iterator"
	"google.golang.org/api/option"
)

// The BigQuery struct is responsible for holding the BigQuery client and configuration.
type BigQuery struct {
	Client *bigquery.Client
	Config *config.Config
}

// NewBigQuery wraps an existing bigquery.Client in a BigQuery instance.
func NewBigQuery(client *bigquery.Client) (types.ISQL, error) {
	return &BigQuery{
		Client: client,
		Config: &config.Config{},
	}, nil
}

// NewBigQueryWithConfig creates a BigQuery client from the given config,
// using the project ID and JSON key file path in cfg.
func NewBigQueryWithConfig(cfg *config.Config) (types.ISQL, error) {
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, cfg.ProjectID, option.WithCredentialsFile(cfg.JSONKeyPath))
	if err != nil {
		return nil, err
	}

	return &BigQuery{
		Client: client,
		Config: cfg,
	}, nil
}

// Schema extracts the schema of a table in BigQuery.
// It takes a table name as input and returns a Table struct and an error.
func (b *BigQuery) Schema(table string) (types.Table, error) {
	ctx := context.Background()
	var schema types.Table

	tableRef := b.Client.Dataset(b.Config.DatabaseName).Table(table)
	schemaInfo, err := tableRef.Metadata(ctx)
	if err != nil {
		return types.Table{}, fmt.Errorf("error getting table metadata: %v", err)
	}

	schema.Name = schemaInfo.Name
	schema.Description = schemaInfo.Description
	schema.Columns = make([]types.Column, len(schemaInfo.Schema))
	for i, fieldSchema := range schemaInfo.Schema {
		schema.Columns[i] = types.Column{
			Name:        fieldSchema.Name,
			Type:        string(fieldSchema.Type),
			Description: fieldSchema.Description,
			CharacterMaximumLength: sql.NullInt64{
				Int64: fieldSchema.MaxLength,
				Valid: fieldSchema.MaxLength != 0,
			},
			DefaultValue: sql.NullString{
				String: fieldSchema.DefaultValueExpression,
				Valid:  fieldSchema.DefaultValueExpression != "",
			},
		}
	}

	fmt.Println(schema)
	return schema, nil
}

// Execute runs a query in BigQuery and returns the results as a byte slice.
// It takes a query string as input and returns a byte slice and an error.
func (b *BigQuery) Execute(query string) ([]byte, error) {
	ctx := context.Background()
	q := b.Client.Query(query)

	exe, err := q.Run(ctx)
	if err != nil {
		return nil, fmt.Errorf("error running query: %v", err)
	}

	status, err := exe.Wait(ctx)
	if err != nil {
		return nil, fmt.Errorf("error while waiting for query to complete: %v", err)
	}

	if err := status.Err(); err != nil {
		return nil, fmt.Errorf("expected nil, found err: %v", err)
	}

	it, err := exe.Read(ctx)
	if err != nil {
		return nil, fmt.Errorf("error reading query results: %v", err)
	}

	// RowIterator.Next expects a *map[string]bigquery.Value (not map[string]interface{}).
	var result []map[string]bigquery.Value
	for {
		var values map[string]bigquery.Value
		err := it.Next(&values)
		if err == iterator.Done {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("error scanning row: %v", err)
		}
		result = append(result, values)
	}

	// Marshal the rows into JSON.
	jsonData, err := json.Marshal(result)
	if err != nil {
		return nil, fmt.Errorf("error marshaling query results: %v", err)
	}

	return jsonData, nil
}

// Tables returns a list of tables in a dataset.
// It takes a dataset name as input and returns a slice of strings and an error.
func (b *BigQuery) Tables(dataset string) ([]string, error) {
	res := b.Client.Dataset(dataset).Tables(context.Background())
	var tables []string

	for {
		table, err := res.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("error scanning dataset: %v", err)
		}
		tables = append(tables, table.TableID)
	}

	return tables, nil
}
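Because Execute marshals the result rows into a JSON array of objects, callers can unmarshal the returned bytes back into maps. A minimal sketch, assuming types.ISQL exposes Execute and using placeholder project, dataset, and key-file values:

package main

import (
	"encoding/json"
	"fmt"
	"log"

	"github.com/thesaas-company/xray/config"
	"github.com/thesaas-company/xray/databases/bigquery"
)

func main() {
	// Placeholder project, dataset, and key path.
	bq, err := bigquery.NewBigQueryWithConfig(&config.Config{
		ProjectID:    "my-gcp-project",
		DatabaseName: "my_dataset",
		JSONKeyPath:  "/path/to/service-account.json",
	})
	if err != nil {
		log.Fatal(err)
	}

	// Execute returns the result rows marshaled as a JSON array of objects.
	raw, err := bq.Execute("SELECT 1 AS one")
	if err != nil {
		log.Fatal(err)
	}

	var rows []map[string]interface{}
	if err := json.Unmarshal(raw, &rows); err != nil {
		log.Fatal(err)
	}
	fmt.Println(rows)
}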
106 changes: 106 additions & 0 deletions databases/bigquery/bigquery_test.go
@@ -0,0 +1,106 @@
package bigquery

import (
	"fmt"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/mock"
	"github.com/thesaas-company/xray/types"
)

// MockBigQuery is a mock implementation of the BigQuery struct.
type MockBigQuery struct {
	mock.Mock
}

// Schema is a mock implementation of the Schema method.
func (m *MockBigQuery) Schema(table string) (types.Table, error) {
	args := m.Called(table)
	return args.Get(0).(types.Table), args.Error(1)
}

// Execute is a mock implementation of the Execute method.
func (m *MockBigQuery) Execute(query string) ([]byte, error) {
	args := m.Called(query)
	return args.Get(0).([]byte), args.Error(1)
}

// Tables is a mock implementation of the Tables method.
func (m *MockBigQuery) Tables(dataset string) ([]string, error) {
	args := m.Called(dataset)
	return args.Get(0).([]string), args.Error(1)
}

func TestBigQuery_Schema(t *testing.T) {
	// Create a new instance of the mock.
	mockBigQuery := new(MockBigQuery)

	// Set the expected return values.
	expectedSchema := types.Table{
		Name:        "table_name",
		Description: "table_description",
		Columns: []types.Column{
			{
				Name: "column1",
				Type: "string",
			},
			{
				Name: "column2",
				Type: "int",
			},
		},
	}
	mockBigQuery.On("Schema", "table_name").Return(expectedSchema, nil)

	// Call the method under test.
	actualSchema, err := mockBigQuery.Schema("table_name")

	// Assert the expected return values.
	assert.NoError(t, err)
	assert.Equal(t, expectedSchema, actualSchema)

	// Assert that the method was called with the correct arguments.
	mockBigQuery.AssertCalled(t, "Schema", "table_name")
	fmt.Println(expectedSchema, actualSchema)
}

func TestBigQuery_Execute(t *testing.T) {
	// Create a new instance of the mock.
	mockBigQuery := new(MockBigQuery)

	// Set the expected return values.
	expectedResult := []byte(`{"result": "success"}`)
	mockBigQuery.On("Execute", "SELECT * FROM table").Return(expectedResult, nil)

	// Call the method under test.
	actualResult, err := mockBigQuery.Execute("SELECT * FROM table")

	// Assert the expected return values.
	assert.NoError(t, err)
	assert.Equal(t, expectedResult, actualResult)

	// Assert that the method was called with the correct arguments.
	mockBigQuery.AssertCalled(t, "Execute", "SELECT * FROM table")
	fmt.Println(expectedResult, actualResult)
}

func TestBigQuery_Tables(t *testing.T) {
	// Create a new instance of the mock.
	mockBigQuery := new(MockBigQuery)

	// Set the expected return values.
	expectedTables := []string{"table1", "table2"}
	mockBigQuery.On("Tables", "dataset").Return(expectedTables, nil)

	// Call the method under test.
	actualTables, err := mockBigQuery.Tables("dataset")

	// Assert the expected return values.
	assert.NoError(t, err)
	assert.Equal(t, expectedTables, actualTables)

	// Assert that the method was called with the correct arguments.
	mockBigQuery.AssertCalled(t, "Tables", "dataset")
	fmt.Println(expectedTables, actualTables)
}
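As a follow-up, testify's AssertExpectations can replace per-call AssertCalled checks when a test registers several expectations. A small variant of the Execute test, reusing the MockBigQuery defined above; the query and payload are illustrative only:

func TestBigQuery_Execute_AssertExpectations(t *testing.T) {
	m := new(MockBigQuery)
	m.On("Execute", "SELECT 1").Return([]byte(`[{"f0_":1}]`), nil)

	out, err := m.Execute("SELECT 1")

	// JSONEq compares the payloads as JSON rather than as raw bytes.
	assert.NoError(t, err)
	assert.JSONEq(t, `[{"f0_":1}]`, string(out))

	// Verifies that every expectation registered with On was met.
	m.AssertExpectations(t)
}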