Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Resources: Stream Analytics Inputs/Outputs #3250

Merged
merged 10 commits into from
Apr 16, 2019
3 changes: 3 additions & 0 deletions azurerm/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,9 @@ func Provider() terraform.ResourceProvider {
"azurerm_storage_table": resourceArmStorageTable(),
"azurerm_stream_analytics_job": resourceArmStreamAnalyticsJob(),
"azurerm_stream_analytics_function_javascript_udf": resourceArmStreamAnalyticsFunctionUDF(),
"azurerm_stream_analytics_output_blob": resourceArmStreamAnalyticsOutputBlob(),
"azurerm_stream_analytics_output_eventhub": resourceArmStreamAnalyticsOutputEventHub(),
"azurerm_stream_analytics_output_servicebus_queue": resourceArmStreamAnalyticsOutputServiceBusQueue(),
"azurerm_subnet_network_security_group_association": resourceArmSubnetNetworkSecurityGroupAssociation(),
"azurerm_subnet_route_table_association": resourceArmSubnetRouteTableAssociation(),
"azurerm_subnet": resourceArmSubnet(),
Expand Down
234 changes: 234 additions & 0 deletions azurerm/resource_arm_stream_analytics_output_blob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
package azurerm

import (
"fmt"
"log"

"github.com/Azure/azure-sdk-for-go/services/streamanalytics/mgmt/2016-03-01/streamanalytics"
"github.com/hashicorp/terraform/helper/schema"
"github.com/terraform-providers/terraform-provider-azurerm/azurerm/helpers/azure"
"github.com/terraform-providers/terraform-provider-azurerm/azurerm/helpers/response"
"github.com/terraform-providers/terraform-provider-azurerm/azurerm/helpers/tf"
"github.com/terraform-providers/terraform-provider-azurerm/azurerm/helpers/validate"
"github.com/terraform-providers/terraform-provider-azurerm/azurerm/utils"
)

func resourceArmStreamAnalyticsOutputBlob() *schema.Resource {
return &schema.Resource{
Create: resourceArmStreamAnalyticsOutputBlobCreateUpdate,
Read: resourceArmStreamAnalyticsOutputBlobRead,
Update: resourceArmStreamAnalyticsOutputBlobCreateUpdate,
Delete: resourceArmStreamAnalyticsOutputBlobDelete,
Importer: &schema.ResourceImporter{
State: schema.ImportStatePassthrough,
},

Schema: map[string]*schema.Schema{
"name": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
ValidateFunc: validate.NoEmptyStrings,
},

"stream_analytics_job_name": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
ValidateFunc: validate.NoEmptyStrings,
},

"resource_group_name": resourceGroupNameSchema(),

"date_format": {
Type: schema.TypeString,
Required: true,
ValidateFunc: validate.NoEmptyStrings,
},

"path_pattern": {
Type: schema.TypeString,
Required: true,
},

"storage_account_key": {
Type: schema.TypeString,
Required: true,
Sensitive: true,
ValidateFunc: validate.NoEmptyStrings,
},

"storage_account_name": {
Type: schema.TypeString,
Required: true,
ValidateFunc: validate.NoEmptyStrings,
},

"storage_container_name": {
Type: schema.TypeString,
Required: true,
ValidateFunc: validate.NoEmptyStrings,
},

"time_format": {
Type: schema.TypeString,
Required: true,
ValidateFunc: validate.NoEmptyStrings,
},

"serialization": azure.SchemaStreamAnalyticsOutputSerialization(),
},
}
}

func resourceArmStreamAnalyticsOutputBlobCreateUpdate(d *schema.ResourceData, meta interface{}) error {
client := meta.(*ArmClient).streamAnalyticsOutputsClient
ctx := meta.(*ArmClient).StopContext

log.Printf("[INFO] preparing arguments for Azure Stream Analytics Output Blob creation.")
name := d.Get("name").(string)
jobName := d.Get("stream_analytics_job_name").(string)
resourceGroup := d.Get("resource_group_name").(string)

if requireResourcesToBeImported && d.IsNewResource() {
existing, err := client.Get(ctx, resourceGroup, jobName, name)
if err != nil {
if !utils.ResponseWasNotFound(existing.Response) {
return fmt.Errorf("Error checking for presence of existing Stream Analytics Output Blob %q (Job %q / Resource Group %q): %s", name, jobName, resourceGroup, err)
}
}

if existing.ID != nil && *existing.ID != "" {
return tf.ImportAsExistsError("azurerm_stream_analytics_output_blob", *existing.ID)
}
}

containerName := d.Get("storage_container_name").(string)
dateFormat := d.Get("date_format").(string)
pathPattern := d.Get("path_pattern").(string)
storageAccountKey := d.Get("storage_account_key").(string)
storageAccountName := d.Get("storage_account_name").(string)
timeFormat := d.Get("time_format").(string)

serializationRaw := d.Get("serialization").([]interface{})
serialization, err := azure.ExpandStreamAnalyticsOutputSerialization(serializationRaw)
if err != nil {
return fmt.Errorf("Error expanding `serialization`: %+v", err)
}

props := streamanalytics.Output{
Name: utils.String(name),
OutputProperties: &streamanalytics.OutputProperties{
Datasource: &streamanalytics.BlobOutputDataSource{
Type: streamanalytics.TypeMicrosoftStorageBlob,
BlobOutputDataSourceProperties: &streamanalytics.BlobOutputDataSourceProperties{
StorageAccounts: &[]streamanalytics.StorageAccount{
{
AccountKey: utils.String(storageAccountKey),
AccountName: utils.String(storageAccountName),
},
},
Container: utils.String(containerName),
DateFormat: utils.String(dateFormat),
PathPattern: utils.String(pathPattern),
TimeFormat: utils.String(timeFormat),
},
},
Serialization: serialization,
},
}

if d.IsNewResource() {
if _, err := client.CreateOrReplace(ctx, props, resourceGroup, jobName, name, "", ""); err != nil {
return fmt.Errorf("Error Creating Stream Analytics Output Blob %q (Job %q / Resource Group %q): %+v", name, jobName, resourceGroup, err)
}

read, err := client.Get(ctx, resourceGroup, jobName, name)
if err != nil {
return fmt.Errorf("Error retrieving Stream Analytics Output Blob %q (Job %q / Resource Group %q): %+v", name, jobName, resourceGroup, err)
}
if read.ID == nil {
return fmt.Errorf("Cannot read ID of Stream Analytics Output Blob %q (Job %q / Resource Group %q)", name, jobName, resourceGroup)
}

d.SetId(*read.ID)
} else {
if _, err := client.Update(ctx, props, resourceGroup, jobName, name, ""); err != nil {
return fmt.Errorf("Error Updating Stream Analytics Output Blob %q (Job %q / Resource Group %q): %+v", name, jobName, resourceGroup, err)
}
}

return resourceArmStreamAnalyticsOutputBlobRead(d, meta)
}

func resourceArmStreamAnalyticsOutputBlobRead(d *schema.ResourceData, meta interface{}) error {
client := meta.(*ArmClient).streamAnalyticsOutputsClient
ctx := meta.(*ArmClient).StopContext

id, err := parseAzureResourceID(d.Id())
if err != nil {
return err
}
resourceGroup := id.ResourceGroup
jobName := id.Path["streamingjobs"]
name := id.Path["outputs"]

resp, err := client.Get(ctx, resourceGroup, jobName, name)
if err != nil {
if utils.ResponseWasNotFound(resp.Response) {
log.Printf("[DEBUG] Output Blob %q was not found in Stream Analytics Job %q / Resource Group %q - removing from state!", name, jobName, resourceGroup)
d.SetId("")
return nil
}

return fmt.Errorf("Error retrieving Stream Output EventHub %q (Stream Analytics Job %q / Resource Group %q): %+v", name, jobName, resourceGroup, err)
}

d.Set("name", name)
d.Set("resource_group_name", resourceGroup)
d.Set("stream_analytics_job_name", jobName)

if props := resp.OutputProperties; props != nil {
v, ok := props.Datasource.AsBlobOutputDataSource()
if !ok {
return fmt.Errorf("Error converting Output Data Source to a Blob Output: %+v", err)
}

d.Set("date_format", v.DateFormat)
d.Set("path_pattern", v.PathPattern)
d.Set("storage_container_name", v.Container)
d.Set("time_format", v.TimeFormat)

if accounts := v.StorageAccounts; accounts != nil && len(*accounts) > 0 {
account := (*accounts)[0]
d.Set("storage_account_name", account.AccountName)
}

if err := d.Set("serialization", azure.FlattenStreamAnalyticsOutputSerialization(props.Serialization)); err != nil {
return fmt.Errorf("Error setting `serialization`: %+v", err)
}
}

return nil
}

func resourceArmStreamAnalyticsOutputBlobDelete(d *schema.ResourceData, meta interface{}) error {
client := meta.(*ArmClient).streamAnalyticsOutputsClient
ctx := meta.(*ArmClient).StopContext

id, err := parseAzureResourceID(d.Id())
if err != nil {
return err
}
resourceGroup := id.ResourceGroup
jobName := id.Path["streamingjobs"]
name := id.Path["outputs"]

if resp, err := client.Delete(ctx, resourceGroup, jobName, name); err != nil {
if !response.WasNotFound(resp.Response) {
return fmt.Errorf("Error deleting Output Blob %q (Stream Analytics Job %q / Resource Group %q) %+v", name, jobName, resourceGroup, err)
}
}

return nil
}
Loading