Skip to content

Commit

Permalink
Asa sql output (#3567)
Browse files Browse the repository at this point in the history
  • Loading branch information
rlekni authored and katbyte committed Jun 13, 2019
1 parent abf8f47 commit 975e2e9
Show file tree
Hide file tree
Showing 5 changed files with 574 additions and 0 deletions.
1 change: 1 addition & 0 deletions azurerm/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ func Provider() terraform.ResourceProvider {
"azurerm_stream_analytics_job": resourceArmStreamAnalyticsJob(),
"azurerm_stream_analytics_function_javascript_udf": resourceArmStreamAnalyticsFunctionUDF(),
"azurerm_stream_analytics_output_blob": resourceArmStreamAnalyticsOutputBlob(),
"azurerm_stream_analytics_output_mssql": resourceArmStreamAnalyticsOutputSql(),
"azurerm_stream_analytics_output_eventhub": resourceArmStreamAnalyticsOutputEventHub(),
"azurerm_stream_analytics_output_servicebus_queue": resourceArmStreamAnalyticsOutputServiceBusQueue(),
"azurerm_stream_analytics_stream_input_blob": resourceArmStreamAnalyticsStreamInputBlob(),
Expand Down
212 changes: 212 additions & 0 deletions azurerm/resource_arm_stream_analytics_output_mssql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
package azurerm

import (
"fmt"
"log"

"github.com/Azure/azure-sdk-for-go/services/streamanalytics/mgmt/2016-03-01/streamanalytics"
"github.com/hashicorp/terraform/helper/schema"
"github.com/terraform-providers/terraform-provider-azurerm/azurerm/helpers/response"
"github.com/terraform-providers/terraform-provider-azurerm/azurerm/helpers/tf"
"github.com/terraform-providers/terraform-provider-azurerm/azurerm/helpers/validate"
"github.com/terraform-providers/terraform-provider-azurerm/azurerm/utils"
)

func resourceArmStreamAnalyticsOutputSql() *schema.Resource {
return &schema.Resource{
Create: resourceArmStreamAnalyticsOutputSqlCreateUpdate,
Read: resourceArmStreamAnalyticsOutputSqlRead,
Update: resourceArmStreamAnalyticsOutputSqlCreateUpdate,
Delete: resourceArmStreamAnalyticsOutputSqlDelete,
Importer: &schema.ResourceImporter{
State: schema.ImportStatePassthrough,
},

Schema: map[string]*schema.Schema{
"name": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
ValidateFunc: validate.NoEmptyStrings,
},

"stream_analytics_job_name": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
ValidateFunc: validate.NoEmptyStrings,
},

"resource_group_name": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
ValidateFunc: validate.NoEmptyStrings,
},

"server": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
ValidateFunc: validate.NoEmptyStrings,
},

"database": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
ValidateFunc: validate.NoEmptyStrings,
},

"table": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
ValidateFunc: validate.NoEmptyStrings,
},

"user": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
ValidateFunc: validate.NoEmptyStrings,
},

"password": {
Type: schema.TypeString,
Required: true,
Sensitive: true,
ValidateFunc: validate.NoEmptyStrings,
},
},
}
}

func resourceArmStreamAnalyticsOutputSqlCreateUpdate(d *schema.ResourceData, meta interface{}) error {
client := meta.(*ArmClient).streamAnalyticsOutputsClient
ctx := meta.(*ArmClient).StopContext

log.Printf("[INFO] Preparing arguments for Azure Stream Analytics SQL Output creation.")
name := d.Get("name").(string)
jobName := d.Get("stream_analytics_job_name").(string)
resourceGroup := d.Get("resource_group_name").(string)

if requireResourcesToBeImported && d.IsNewResource() {
existing, err := client.Get(ctx, resourceGroup, jobName, name)
if err != nil && !utils.ResponseWasNotFound(existing.Response) {
return fmt.Errorf("Error checking for existing Azure Stream Analytics SQL Output %q (Job %q / Resource Group %q): %s", name, jobName, resourceGroup, err)
}

if existing.ID != nil && *existing.ID != "" {
return tf.ImportAsExistsError("azurerm_stream_analytics_output_mssql", *existing.ID)
}
}

server := d.Get("server").(string)
databaseName := d.Get("database").(string)
tableName := d.Get("table").(string)
sqlUser := d.Get("user").(string)
sqlUserPassword := d.Get("password").(string)

props := streamanalytics.Output{
Name: utils.String(name),
OutputProperties: &streamanalytics.OutputProperties{
Datasource: &streamanalytics.AzureSQLDatabaseOutputDataSource{
Type: streamanalytics.TypeMicrosoftSQLServerDatabase,
AzureSQLDatabaseOutputDataSourceProperties: &streamanalytics.AzureSQLDatabaseOutputDataSourceProperties{
Server: utils.String(server),
Database: utils.String(databaseName),
User: utils.String(sqlUser),
Password: utils.String(sqlUserPassword),
Table: utils.String(tableName),
},
},
},
}

if d.IsNewResource() {
if _, err := client.CreateOrReplace(ctx, props, resourceGroup, jobName, name, "", ""); err != nil {
return fmt.Errorf("Error Creating Stream Analytics Output SQL %q (Job %q / Resource Group %q): %+v", name, jobName, resourceGroup, err)
}

read, err := client.Get(ctx, resourceGroup, jobName, name)
if err != nil {
return fmt.Errorf("Error retrieving Stream Analytics Output SQL %q (Job %q / Resource Group %q): %+v", name, jobName, resourceGroup, err)
}
if read.ID == nil {
return fmt.Errorf("Cannot read ID of Stream Analytics Output SQL %q (Job %q / Resource Group %q)", name, jobName, resourceGroup)
}

d.SetId(*read.ID)
} else {
if _, err := client.Update(ctx, props, resourceGroup, jobName, name, ""); err != nil {
return fmt.Errorf("Error Updating Stream Analytics Output SQL %q (Job %q / Resource Group %q): %+v", name, jobName, resourceGroup, err)
}
}

return resourceArmStreamAnalyticsOutputSqlRead(d, meta)
}

func resourceArmStreamAnalyticsOutputSqlRead(d *schema.ResourceData, meta interface{}) error {
client := meta.(*ArmClient).streamAnalyticsOutputsClient
ctx := meta.(*ArmClient).StopContext

id, err := parseAzureResourceID(d.Id())
if err != nil {
return err
}
resourceGroup := id.ResourceGroup
jobName := id.Path["streamingjobs"]
name := id.Path["outputs"]

resp, err := client.Get(ctx, resourceGroup, jobName, name)
if err != nil {
if utils.ResponseWasNotFound(resp.Response) {
log.Printf("[DEBUG] Output SQL %q was not found in Stream Analytics Job %q / Resource Group %q - removing from state!", name, jobName, resourceGroup)
d.SetId("")
return nil
}

return fmt.Errorf("Error retrieving Stream Output SQL %q (Stream Analytics Job %q / Resource Group %q): %+v", name, jobName, resourceGroup, err)
}

d.Set("name", name)
d.Set("resource_group_name", resourceGroup)
d.Set("stream_analytics_job_name", jobName)

if props := resp.OutputProperties; props != nil {
v, ok := props.Datasource.AsAzureSQLDatabaseOutputDataSource()
if !ok {
return fmt.Errorf("Error converting Output Data Source to SQL Output: %+v", err)
}

d.Set("server", v.Server)
d.Set("database", v.Database)
d.Set("table", v.Table)
d.Set("user", v.User)

}

return nil
}

func resourceArmStreamAnalyticsOutputSqlDelete(d *schema.ResourceData, meta interface{}) error {
client := meta.(*ArmClient).streamAnalyticsOutputsClient
ctx := meta.(*ArmClient).StopContext

id, err := parseAzureResourceID(d.Id())
if err != nil {
return err
}
resourceGroup := id.ResourceGroup
jobName := id.Path["streamingjobs"]
name := id.Path["outputs"]

if resp, err := client.Delete(ctx, resourceGroup, jobName, name); err != nil {
if !response.WasNotFound(resp.Response) {
return fmt.Errorf("Error deleting Output SQL %q (Stream Analytics Job %q / Resource Group %q) %+v", name, jobName, resourceGroup, err)
}
}

return nil
}
Loading

0 comments on commit 975e2e9

Please sign in to comment.