Skip to content

Commit

Permalink
feat: Migrate streams to the new sdk (#2113)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-jcieslak committed Oct 16, 2023
1 parent f7d0d97 commit 521fde5
Show file tree
Hide file tree
Showing 16 changed files with 2,206 additions and 16 deletions.
2 changes: 2 additions & 0 deletions pkg/sdk/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Client struct {
SessionPolicies SessionPolicies
Sessions Sessions
Shares Shares
Streams Streams
Tags Tags
Tasks Tasks
Users Users
Expand Down Expand Up @@ -158,6 +159,7 @@ func (c *Client) initialize() {
c.SessionPolicies = &sessionPolicies{client: c}
c.Sessions = &sessions{client: c}
c.Shares = &shares{client: c}
c.Streams = &streams{client: c}
c.SystemFunctions = &systemFunctions{client: c}
c.Tags = &tags{client: c}
c.Tasks = &tasks{client: c}
Expand Down
8 changes: 4 additions & 4 deletions pkg/sdk/poc/example/database_role_def.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var (
dbRoleSet = g.QueryStruct("DatabaseRoleSet").
// Fields
TextAssignment("COMMENT", g.ParameterOptions().SingleQuotes().Required()).
OptionalQueryStructField("NestedThirdLevel", nestedThirdLevel, g.ListOptions().NoParens().SQL("NESTED"))
OptionalQueryStructField("NestedThirdLevel", nestedThirdLevel, g.ListOptions().NoParentheses().SQL("NESTED"))

dbRoleUnset = g.QueryStruct("DatabaseRoleUnset").
// Fields
Expand Down Expand Up @@ -57,9 +57,9 @@ var (
SQL("DATABASE ROLE").
IfExists().
Name().
OptionalQueryStructField("Rename", dbRoleRename, g.ListOptions().NoParens().SQL("RENAME TO")).
OptionalQueryStructField("Set", dbRoleSet, g.ListOptions().NoParens().SQL("SET")).
OptionalQueryStructField("Unset", dbRoleUnset, g.ListOptions().NoParens().SQL("UNSET")).
OptionalQueryStructField("Rename", dbRoleRename, g.ListOptions().NoParentheses().SQL("RENAME TO")).
OptionalQueryStructField("Set", dbRoleSet, g.ListOptions().NoParentheses().SQL("SET")).
OptionalQueryStructField("Unset", dbRoleUnset, g.ListOptions().NoParentheses().SQL("UNSET")).
// Validations
WithValidation(g.ValidIdentifier, "name").
WithValidation(g.ExactlyOneValueSet, "Rename", "Set", "Unset"),
Expand Down
12 changes: 11 additions & 1 deletion pkg/sdk/poc/generator/field_transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ func (v *ParameterTransformer) NoEquals() *ParameterTransformer {
return v
}

func (v *ParameterTransformer) ArrowEquals() *ParameterTransformer {
v.equals = "arrow_equals"
return v
}

func (v *ParameterTransformer) SingleQuotes() *ParameterTransformer {
v.quotes = "single_quotes"
return v
Expand Down Expand Up @@ -131,7 +136,12 @@ func (v *ListTransformer) Required() *ListTransformer {
return v
}

func (v *ListTransformer) NoParens() *ListTransformer {
func (v *ListTransformer) Parentheses() *ListTransformer {
v.parentheses = "parentheses"
return v
}

func (v *ListTransformer) NoParentheses() *ListTransformer {
v.parentheses = "no_parentheses"
return v
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sdk/poc/generator/keyword_builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,7 @@ func (v *queryStruct) OptionalLimit() *queryStruct {
v.fields = append(v.fields, NewField("Limit", "*LimitFrom", Tags().Keyword().SQL("LIMIT"), nil))
return v
}

func (v *queryStruct) OptionalCopyGrants() *queryStruct {
return v.OptionalSQL("COPY GRANTS")
}
8 changes: 8 additions & 0 deletions pkg/sdk/poc/generator/parameter_builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,11 @@ func (v *queryStruct) OptionalBooleanAssignment(sqlPrefix string, transformer *P
func (v *queryStruct) OptionalIdentifierAssignment(sqlPrefix string, identifierKind string, transformer *ParameterTransformer) *queryStruct {
return v.OptionalAssignment(sqlPrefix, identifierKind, transformer)
}

func (v *queryStruct) OptionalComment() *queryStruct {
return v.OptionalTextAssignment("COMMENT", ParameterOptions().SingleQuotes())
}

func (v *queryStruct) SetComment() *queryStruct {
return v.OptionalTextAssignment("SET COMMENT", ParameterOptions().SingleQuotes())
}
1 change: 1 addition & 0 deletions pkg/sdk/poc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var definitionMapping = map[string]*generator.Interface{
"network_policies_def.go": sdk.NetworkPoliciesDef,
"session_policies_def.go": sdk.SessionPoliciesDef,
"tasks_def.go": sdk.TasksDef,
"streams_def.go": sdk.StreamsDef,
}

func main() {
Expand Down
201 changes: 201 additions & 0 deletions pkg/sdk/streams_def.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package sdk

import g "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk/poc/generator"

//go:generate go run ./poc/main.go

var (
onStreamDef = g.QueryStruct("OnStream").
OptionalSQL("AT").
OptionalSQL("BEFORE").
QueryStructField(
"Statement",
g.QueryStruct("OnStreamStatement").
OptionalTextAssignment("TIMESTAMP", g.ParameterOptions().ArrowEquals()).
OptionalTextAssignment("OFFSET", g.ParameterOptions().ArrowEquals()).
OptionalTextAssignment("STATEMENT", g.ParameterOptions().ArrowEquals()).
OptionalTextAssignment("STREAM", g.ParameterOptions().ArrowEquals().SingleQuotes()).
WithValidation(g.ExactlyOneValueSet, "Timestamp", "Offset", "Statement", "Stream"),
g.ListOptions().Parentheses(),
).
WithValidation(g.ExactlyOneValueSet, "At", "Before")

showStreamDbRowDef = g.DbStruct("showStreamsDbRow").
Field("created_on", "time.Time").
Field("name", "string").
Field("database_name", "string").
Field("schema_name", "string").
Field("tableOn", "sql.NullString").
Field("owner", "sql.NullString").
Field("comment", "sql.NullString").
Field("table_name", "sql.NullString").
Field("source_type", "sql.NullString").
Field("base_tables", "sql.NullString").
Field("type", "sql.NullString").
Field("stale", "sql.NullString").
Field("mode", "sql.NullString").
Field("stale_after", "sql.NullTime").
Field("invalid_reason", "sql.NullString").
Field("owner_role_type", "sql.NullString")

streamPlainStructDef = g.PlainStruct("Stream").
Field("CreatedOn", "time.Time").
Field("Name", "string").
Field("DatabaseName", "string").
Field("SchemaName", "string").
Field("TableOn", "*string").
Field("Owner", "*string").
Field("Comment", "*string").
Field("TableName", "*string").
Field("SourceType", "*string").
Field("BaseTables", "*string").
Field("Type", "*string").
Field("Stale", "*string").
Field("Mode", "*string").
Field("StaleAfter", "*time.Time").
Field("InvalidReason", "*string").
Field("OwnerRoleType", "*string")

StreamsDef = g.NewInterface(
"Streams",
"Stream",
g.KindOfT[SchemaObjectIdentifier](),
).
CustomOperation(
"CreateOnTable",
"https://docs.snowflake.com/en/sql-reference/sql/create-stream",
g.QueryStruct("CreateStreamOnTable").
Create().
OrReplace().
SQL("STREAM").
IfNotExists().
Name().
OptionalCopyGrants().
SQL("ON TABLE").
Identifier("TableId", g.KindOfT[SchemaObjectIdentifier](), g.IdentifierOptions().Required()).
OptionalQueryStructField("On", onStreamDef, g.KeywordOptions()).
OptionalBooleanAssignment("APPEND_ONLY", nil).
OptionalBooleanAssignment("SHOW_INITIAL_ROWS", nil).
OptionalComment().
WithValidation(g.ValidIdentifier, "name").
WithValidation(g.ValidIdentifier, "TableId").
WithValidation(g.ConflictingFields, "IfNotExists", "OrReplace"),
).
CustomOperation(
"CreateOnExternalTable",
"https://docs.snowflake.com/en/sql-reference/sql/create-stream",
g.QueryStruct("CreateStreamOnExternalTable").
Create().
OrReplace().
SQL("STREAM").
IfNotExists().
Name().
OptionalCopyGrants().
SQL("ON EXTERNAL TABLE").
Identifier("ExternalTableId", g.KindOfT[SchemaObjectIdentifier](), g.IdentifierOptions().Required()).
OptionalQueryStructField("On", onStreamDef, g.KeywordOptions()).
OptionalBooleanAssignment("INSERT_ONLY", nil).
OptionalComment().
WithValidation(g.ValidIdentifier, "name").
WithValidation(g.ValidIdentifier, "ExternalTableId").
WithValidation(g.ConflictingFields, "IfNotExists", "OrReplace"),
).
CustomOperation(
"CreateOnDirectoryTable",
"https://docs.snowflake.com/en/sql-reference/sql/create-stream",
g.QueryStruct("CreateStreamOnDirectoryTable").
Create().
OrReplace().
SQL("STREAM").
IfNotExists().
Name().
OptionalCopyGrants().
SQL("ON STAGE").
Identifier("StageId", g.KindOfT[SchemaObjectIdentifier](), g.IdentifierOptions().Required()).
OptionalComment().
WithValidation(g.ValidIdentifier, "name").
WithValidation(g.ValidIdentifier, "StageId").
WithValidation(g.ConflictingFields, "IfNotExists", "OrReplace"),
).
CustomOperation(
"CreateOnView",
"https://docs.snowflake.com/en/sql-reference/sql/create-stream",
g.QueryStruct("CreateStreamOnView").
Create().
OrReplace().
SQL("STREAM").
IfNotExists().
Name().
OptionalCopyGrants().
SQL("ON VIEW").
Identifier("ViewId", g.KindOfT[SchemaObjectIdentifier](), g.IdentifierOptions().Required()).
OptionalQueryStructField("On", onStreamDef, g.KeywordOptions()).
OptionalBooleanAssignment("APPEND_ONLY", nil).
OptionalBooleanAssignment("SHOW_INITIAL_ROWS", nil).
OptionalComment().
WithValidation(g.ValidIdentifier, "name").
WithValidation(g.ValidIdentifier, "ViewId").
WithValidation(g.ConflictingFields, "IfNotExists", "OrReplace"),
).
CustomOperation(
"Clone",
"https://docs.snowflake.com/en/sql-reference/sql/create-stream#variant-syntax",
g.QueryStruct("CloneStream").
Create().
OrReplace().
SQL("STREAM").
Name().
Identifier("sourceStream", g.KindOfT[SchemaObjectIdentifier](), g.IdentifierOptions().SQL("CLONE").Required()).
OptionalCopyGrants().
WithValidation(g.ValidIdentifier, "name"),
).
AlterOperation(
"https://docs.snowflake.com/en/sql-reference/sql/alter-stream",
g.QueryStruct("AlterStream").
Alter().
SQL("STREAM").
IfExists().
Name().
OptionalTextAssignment("SET COMMENT", g.ParameterOptions().SingleQuotes()).
OptionalSQL("UNSET COMMENT").
SetTags().
UnsetTags().
WithValidation(g.ValidIdentifier, "name").
WithValidation(g.ConflictingFields, "IfExists", "UnsetTags").
WithValidation(g.ExactlyOneValueSet, "SetComment", "UnsetComment", "SetTags", "UnsetTags"),
).
DropOperation(
"https://docs.snowflake.com/en/sql-reference/sql/drop-stream",
g.QueryStruct("DropStream").
Drop().
SQL("STREAM").
IfExists().
Name().
WithValidation(g.ValidIdentifier, "name"),
).
ShowOperation(
"https://docs.snowflake.com/en/sql-reference/sql/show-streams",
showStreamDbRowDef,
streamPlainStructDef,
g.QueryStruct("ShowStreams").
Show().
Terse().
SQL("STREAMS").
OptionalLike().
OptionalIn().
OptionalStartsWith().
OptionalLimit(),
).
ShowByIdOperation().
DescribeOperation(
g.DescriptionMappingKindSingleValue,
"https://docs.snowflake.com/en/sql-reference/sql/desc-stream",
showStreamDbRowDef,
streamPlainStructDef,
g.QueryStruct("DescribeStream").
Describe().
SQL("STREAM").
Name().
WithValidation(g.ValidIdentifier, "name"),
)
)
Loading

0 comments on commit 521fde5

Please sign in to comment.