Skip to content

Commit

Permalink
New Resource: azurerm_stream_analytics_stream_input_blob
Browse files Browse the repository at this point in the history
  • Loading branch information
tombuildsstuff committed Apr 11, 2019
1 parent c4e578f commit eb9170e
Show file tree
Hide file tree
Showing 5 changed files with 750 additions and 0 deletions.
1 change: 1 addition & 0 deletions azurerm/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ func Provider() terraform.ResourceProvider {
"azurerm_storage_share": resourceArmStorageShare(),
"azurerm_storage_table": resourceArmStorageTable(),
"azurerm_stream_analytics_job": resourceArmStreamAnalyticsJob(),
"azurerm_stream_analytics_stream_input_blob": resourceArmStreamAnalyticsStreamInputBlob(),
"azurerm_stream_analytics_stream_input_eventhub": resourceArmStreamAnalyticsStreamInputEventHub(),
"azurerm_stream_analytics_stream_input_iothub": resourceArmStreamAnalyticsStreamInputIoTHub(),
"azurerm_subnet_network_security_group_association": resourceArmSubnetNetworkSecurityGroupAssociation(),
Expand Down
243 changes: 243 additions & 0 deletions azurerm/resource_arm_stream_analytics_stream_input_blob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
package azurerm

import (
"fmt"
"log"

"github.com/terraform-providers/terraform-provider-azurerm/azurerm/helpers/response"

"github.com/terraform-providers/terraform-provider-azurerm/azurerm/helpers/azure"
"github.com/terraform-providers/terraform-provider-azurerm/azurerm/helpers/validate"

"github.com/Azure/azure-sdk-for-go/services/streamanalytics/mgmt/2016-03-01/streamanalytics"

"github.com/hashicorp/terraform/helper/schema"
"github.com/terraform-providers/terraform-provider-azurerm/azurerm/helpers/tf"
"github.com/terraform-providers/terraform-provider-azurerm/azurerm/utils"
)

func resourceArmStreamAnalyticsStreamInputBlob() *schema.Resource {
return &schema.Resource{
Create: resourceArmStreamAnalyticsStreamInputBlobCreateUpdate,
Read: resourceArmStreamAnalyticsStreamInputBlobRead,
Update: resourceArmStreamAnalyticsStreamInputBlobCreateUpdate,
Delete: resourceArmStreamAnalyticsStreamInputBlobDelete,
Importer: &schema.ResourceImporter{
State: schema.ImportStatePassthrough,
},

Schema: map[string]*schema.Schema{
"name": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
ValidateFunc: validate.NoEmptyStrings,
},

"stream_analytics_job_name": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
ValidateFunc: validate.NoEmptyStrings,
},

"resource_group_name": resourceGroupNameSchema(),

"date_format": {
Type: schema.TypeString,
Required: true,
ValidateFunc: validate.NoEmptyStrings,
},

"path_pattern": {
Type: schema.TypeString,
Required: true,
},

"storage_account_key": {
Type: schema.TypeString,
Required: true,
Sensitive: true,
ValidateFunc: validate.NoEmptyStrings,
},

"storage_account_name": {
Type: schema.TypeString,
Required: true,
ValidateFunc: validate.NoEmptyStrings,
},

"storage_container_name": {
Type: schema.TypeString,
Required: true,
ValidateFunc: validate.NoEmptyStrings,
},

"time_format": {
Type: schema.TypeString,
Required: true,
ValidateFunc: validate.NoEmptyStrings,
},

"serialization": azure.SchemaStreamAnalyticsStreamInputSerialization(),
},
}
}

func resourceArmStreamAnalyticsStreamInputBlobCreateUpdate(d *schema.ResourceData, meta interface{}) error {
client := meta.(*ArmClient).streamAnalyticsInputsClient
ctx := meta.(*ArmClient).StopContext

log.Printf("[INFO] preparing arguments for Azure Stream Analytics Stream Input Blob creation.")
name := d.Get("name").(string)
jobName := d.Get("stream_analytics_job_name").(string)
resourceGroup := d.Get("resource_group_name").(string)

if requireResourcesToBeImported && d.IsNewResource() {
existing, err := client.Get(ctx, resourceGroup, jobName, name)
if err != nil {
if !utils.ResponseWasNotFound(existing.Response) {
return fmt.Errorf("Error checking for presence of existing Stream Analytics Stream Input %q (Job %q / Resource Group %q): %s", name, jobName, resourceGroup, err)
}
}

if existing.ID != nil && *existing.ID != "" {
return tf.ImportAsExistsError("azurerm_stream_analytics_stream_input_blob", *existing.ID)
}
}

containerName := d.Get("storage_container_name").(string)
dateFormat := d.Get("date_format").(string)
pathPattern := d.Get("path_pattern").(string)
storageAccountKey := d.Get("storage_account_key").(string)
storageAccountName := d.Get("storage_account_name").(string)
timeFormat := d.Get("time_format").(string)

serializationRaw := d.Get("serialization").([]interface{})
serialization, err := azure.ExpandStreamAnalyticsStreamInputSerialization(serializationRaw)
if err != nil {
return fmt.Errorf("Error expanding `serialization`: %+v", err)
}

props := streamanalytics.Input{
Name: utils.String(name),
Properties: &streamanalytics.StreamInputProperties{
Type: streamanalytics.TypeStream,
Datasource: &streamanalytics.BlobStreamInputDataSource{
Type: streamanalytics.TypeBasicStreamInputDataSourceTypeMicrosoftStorageBlob,
BlobStreamInputDataSourceProperties: &streamanalytics.BlobStreamInputDataSourceProperties{
Container: utils.String(containerName),
DateFormat: utils.String(dateFormat),
PathPattern: utils.String(pathPattern),
TimeFormat: utils.String(timeFormat),
StorageAccounts: &[]streamanalytics.StorageAccount{
{
AccountName: utils.String(storageAccountName),
AccountKey: utils.String(storageAccountKey),
},
},
},
},
Serialization: serialization,
},
}

if d.IsNewResource() {
if _, err := client.CreateOrReplace(ctx, props, resourceGroup, jobName, name, "", ""); err != nil {
return fmt.Errorf("Error Creating Stream Analytics Stream Input Blob %q (Job %q / Resource Group %q): %+v", name, jobName, resourceGroup, err)
}

read, err := client.Get(ctx, resourceGroup, jobName, name)
if err != nil {
return fmt.Errorf("Error retrieving Stream Analytics Stream Input Blob %q (Job %q / Resource Group %q): %+v", name, jobName, resourceGroup, err)
}
if read.ID == nil {
return fmt.Errorf("Cannot read ID of Stream Analytics Stream Input Blob %q (Job %q / Resource Group %q)", name, jobName, resourceGroup)
}

d.SetId(*read.ID)
} else {
if _, err := client.Update(ctx, props, resourceGroup, jobName, name, ""); err != nil {
return fmt.Errorf("Error Updating Stream Analytics Stream Input Blob %q (Job %q / Resource Group %q): %+v", name, jobName, resourceGroup, err)
}
}

return resourceArmStreamAnalyticsStreamInputBlobRead(d, meta)
}

func resourceArmStreamAnalyticsStreamInputBlobRead(d *schema.ResourceData, meta interface{}) error {
client := meta.(*ArmClient).streamAnalyticsInputsClient
ctx := meta.(*ArmClient).StopContext

id, err := parseAzureResourceID(d.Id())
if err != nil {
return err
}
resourceGroup := id.ResourceGroup
jobName := id.Path["streamingjobs"]
name := id.Path["inputs"]

resp, err := client.Get(ctx, resourceGroup, jobName, name)
if err != nil {
if utils.ResponseWasNotFound(resp.Response) {
log.Printf("[DEBUG] Stream Input Blob %q was not found in Stream Analytics Job %q / Resource Group %q - removing from state!", name, jobName, resourceGroup)
d.SetId("")
return nil
}

return fmt.Errorf("Error retrieving Stream Input Blob %q (Stream Analytics Job %q / Resource Group %q): %+v", name, jobName, resourceGroup, err)
}

d.Set("name", name)
d.Set("resource_group_name", resourceGroup)
d.Set("stream_analytics_job_name", jobName)

if props := resp.Properties; props != nil {
v, ok := props.AsStreamInputProperties()
if !ok {
return fmt.Errorf("Error converting Stream Input Blob to an Stream Input: %+v", err)
}

eventHub, ok := v.Datasource.AsBlobStreamInputDataSource()
if !ok {
return fmt.Errorf("Error converting Stream Input Blob to an Blob Stream Input: %+v", err)
}

d.Set("date_format", eventHub.DateFormat)
d.Set("path_pattern", eventHub.PathPattern)
d.Set("storage_container_name", eventHub.Container)
d.Set("time_format", eventHub.TimeFormat)

if accounts := eventHub.StorageAccounts; accounts != nil && len(*accounts) > 0 {
account := (*accounts)[0]
d.Set("storage_account_name", account.AccountName)
}

if err := d.Set("serialization", azure.FlattenStreamAnalyticsStreamInputSerialization(v.Serialization)); err != nil {
return fmt.Errorf("Error setting `serialization`: %+v", err)
}
}

return nil
}

func resourceArmStreamAnalyticsStreamInputBlobDelete(d *schema.ResourceData, meta interface{}) error {
client := meta.(*ArmClient).streamAnalyticsInputsClient
ctx := meta.(*ArmClient).StopContext

id, err := parseAzureResourceID(d.Id())
if err != nil {
return err
}
resourceGroup := id.ResourceGroup
jobName := id.Path["streamingjobs"]
name := id.Path["inputs"]

if resp, err := client.Delete(ctx, resourceGroup, jobName, name); err != nil {
if !response.WasNotFound(resp.Response) {
return fmt.Errorf("Error deleting Stream Input Blob %q (Stream Analytics Job %q / Resource Group %q) %+v", name, jobName, resourceGroup, err)
}
}

return nil
}
Loading

0 comments on commit eb9170e

Please sign in to comment.