azurerm_data_factory_pipeline: Support for activities #6224

Merged · 9 commits · Jun 2, 2020
37 changes: 37 additions & 0 deletions azurerm/internal/services/datafactory/data_factory.go
@@ -1,6 +1,7 @@
package datafactory

import (
"encoding/json"
"fmt"
"log"
"regexp"
@@ -9,6 +10,7 @@ import (

"github.com/Azure/azure-sdk-for-go/services/datafactory/mgmt/2018-06-01/datafactory"
"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
"github.com/terraform-providers/terraform-provider-azurerm/azurerm/helpers/azure"
)

func validateAzureRMDataFactoryLinkedServiceDatasetName(v interface{}, k string) (warnings []string, errors []error) {
@@ -190,3 +192,38 @@ func flattenDataFactoryStructureColumns(input interface{}) []interface{} {
}
return output
}

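// deserializeDataFactoryPipelineActivities parses a raw JSON array of
// activities. The SDK's polymorphic activity decoding is only reachable via
// Pipeline.UnmarshalJSON, so the array is wrapped in a minimal pipeline
// document first.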
func deserializeDataFactoryPipelineActivities(jsonData string) (*[]datafactory.BasicActivity, error) {
jsonData = fmt.Sprintf(`{ "activities": %s }`, jsonData)
pipeline := &datafactory.Pipeline{}
err := pipeline.UnmarshalJSON([]byte(jsonData))
if err != nil {
return nil, err
}
return pipeline.Activities, nil
}

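// serializeDataFactoryPipelineActivities marshals activities through the same
// Pipeline envelope, then extracts the raw "activities" member so the caller
// gets back a bare JSON array.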
func serializeDataFactoryPipelineActivities(activities *[]datafactory.BasicActivity) (string, error) {
pipeline := &datafactory.Pipeline{Activities: activities}
result, err := pipeline.MarshalJSON()
if err != nil {
return "nil", err
}

var m map[string]*json.RawMessage
err = json.Unmarshal(result, &m)
if err != nil {
return "", err
}

activitiesJson, err := json.Marshal(m["activities"])
if err != nil {
return "", err
}

return string(activitiesJson), nil
}

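// suppressJsonOrderingDifference reports whether two JSON strings are
// equivalent once normalized, so diffs caused only by key ordering or
// whitespace are suppressed.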
func suppressJsonOrderingDifference(_, old, new string, _ *schema.ResourceData) bool {
return azure.NormalizeJson(old) == azure.NormalizeJson(new)
}
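
suppressJsonOrderingDifference compares documents rather than raw strings. As a rough sketch of what azure.NormalizeJson is assumed to do here (illustrative only, not the provider's implementation):

func normalizeJsonSketch(input string) string {
	var parsed interface{}
	if err := json.Unmarshal([]byte(input), &parsed); err != nil {
		// Not valid JSON: return the input unchanged, so the comparison
		// degrades to plain string equality.
		return input
	}
	// encoding/json marshals map keys in sorted order, so two documents that
	// differ only in key ordering or whitespace normalize identically.
	normalized, _ := json.Marshal(parsed)
	return string(normalized)
}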
@@ -72,6 +72,13 @@ func resourceArmDataFactoryPipeline() *schema.Resource {
Optional: true,
},

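// "activities_json" accepts the raw JSON array of pipeline activities;
// NormalizeJson keeps the value stored in state canonical, and the paired
// DiffSuppressFunc ignores ordering-only differences against the API response.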
"activities_json": {
Type: schema.TypeString,
Optional: true,
StateFunc: azure.NormalizeJson,
DiffSuppressFunc: suppressJsonOrderingDifference,
},

"annotations": {
Type: schema.TypeList,
Optional: true,
@@ -98,7 +105,7 @@ func resourceArmDataFactoryPipelineCreateUpdate(d *schema.ResourceData, meta interface{}) error {
existing, err := client.Get(ctx, resourceGroupName, dataFactoryName, name, "")
if err != nil {
if !utils.ResponseWasNotFound(existing.Response) {
return fmt.Errorf("Error checking for presence of existing Data Factory Pipeline %q (Resource Group %q / Data Factory %q): %s", name, resourceGroupName, dataFactoryName, err)
return fmt.Errorf("checking for presence of existing Data Factory Pipeline %q (Resource Group %q / Data Factory %q): %s", name, resourceGroupName, dataFactoryName, err)
}
}

@@ -114,6 +121,14 @@ func resourceArmDataFactoryPipelineCreateUpdate(d *schema.ResourceData, meta interface{}) error {
Description: &description,
}

if v, ok := d.GetOk("activities_json"); ok {
activities, err := deserializeDataFactoryPipelineActivities(v.(string))
if err != nil {
return fmt.Errorf("parsing 'activities_json' for Data Factory Pipeline %q (Resource Group %q / Data Factory %q) ID: %+v", name, resourceGroupName, dataFactoryName, err)
}
pipeline.Activities = activities
}

if v, ok := d.GetOk("annotations"); ok {
annotations := v.([]interface{})
pipeline.Annotations = &annotations
@@ -127,16 +142,16 @@ func resourceArmDataFactoryPipelineCreateUpdate(d *schema.ResourceData, meta interface{}) error {
}

if _, err := client.CreateOrUpdate(ctx, resourceGroupName, dataFactoryName, name, config, ""); err != nil {
return fmt.Errorf("Error creating Data Factory Pipeline %q (Resource Group %q / Data Factory %q): %+v", name, resourceGroupName, dataFactoryName, err)
return fmt.Errorf("creating Data Factory Pipeline %q (Resource Group %q / Data Factory %q): %+v", name, resourceGroupName, dataFactoryName, err)
}

read, err := client.Get(ctx, resourceGroupName, dataFactoryName, name, "")
if err != nil {
return fmt.Errorf("Error retrieving Data Factory Pipeline %q (Resource Group %q / Data Factory %q): %+v", name, resourceGroupName, dataFactoryName, err)
return fmt.Errorf("retrieving Data Factory Pipeline %q (Resource Group %q / Data Factory %q): %+v", name, resourceGroupName, dataFactoryName, err)
}

if read.ID == nil {
return fmt.Errorf("Cannot read Data Factory Pipeline %q (Resource Group %q / Data Factory %q) ID", name, resourceGroupName, dataFactoryName)
return fmt.Errorf("cannot read Data Factory Pipeline %q (Resource Group %q / Data Factory %q) ID", name, resourceGroupName, dataFactoryName)
}

d.SetId(*read.ID)
@@ -163,7 +178,7 @@ func resourceArmDataFactoryPipelineRead(d *schema.ResourceData, meta interface{}) error {
log.Printf("[DEBUG] Data Factory Pipeline %q was not found in Resource Group %q - removing from state!", name, id.ResourceGroup)
return nil
}
return fmt.Errorf("Error reading the state of Data Factory Pipeline %q: %+v", name, err)
return fmt.Errorf("reading the state of Data Factory Pipeline %q: %+v", name, err)
}

d.Set("name", resp.Name)
@@ -175,17 +190,27 @@ func resourceArmDataFactoryPipelineRead(d *schema.ResourceData, meta interface{}) error {

parameters := flattenDataFactoryParameters(props.Parameters)
if err := d.Set("parameters", parameters); err != nil {
return fmt.Errorf("Error setting `parameters`: %+v", err)
return fmt.Errorf("setting `parameters`: %+v", err)
}

annotations := flattenDataFactoryAnnotations(props.Annotations)
if err := d.Set("annotations", annotations); err != nil {
return fmt.Errorf("Error setting `annotations`: %+v", err)
return fmt.Errorf("setting `annotations`: %+v", err)
}

variables := flattenDataFactoryVariables(props.Variables)
if err := d.Set("variables", variables); err != nil {
return fmt.Errorf("Error setting `variables`: %+v", err)
return fmt.Errorf("setting `variables`: %+v", err)
}

if activities := props.Activities; activities != nil {
activitiesJson, err := serializeDataFactoryPipelineActivities(activities)
if err != nil {
return fmt.Errorf("serializing `activities_json`: %+v", err)
}
if err := d.Set("activities_json", activitiesJson); err != nil {
return fmt.Errorf("setting `activities_json`: %+v", err)
}
}
}

@@ -206,7 +231,7 @@ func resourceArmDataFactoryPipelineDelete(d *schema.ResourceData, meta interface{}) error {
resourceGroupName := id.ResourceGroup

if _, err = client.Delete(ctx, resourceGroupName, dataFactoryName, name); err != nil {
return fmt.Errorf("Error deleting Data Factory Pipeline %q (Resource Group %q / Data Factory %q): %+v", name, resourceGroupName, dataFactoryName, err)
return fmt.Errorf("deleting Data Factory Pipeline %q (Resource Group %q / Data Factory %q): %+v", name, resourceGroupName, dataFactoryName, err)
}

return nil
165 changes: 165 additions & 0 deletions azurerm/internal/services/datafactory/data_factory_test.go
@@ -38,3 +38,168 @@ func TestAzureRmDataFactoryLinkedServiceConnectionStringDiff(t *testing.T) {
}
}
}

func TestAzureRmDataFactoryDeserializePipelineActivities(t *testing.T) {
cases := []struct {
Json string
ExpectActivityCount int
ExpectErr bool
}{
{
Json: "{}",
ExpectActivityCount: 0,
ExpectErr: true,
},
{
Json: `[
{
"type": "ForEach",
"typeProperties": {
"isSequential": true,
"items": {
"value": "@pipeline().parameters.OutputBlobNameList",
"type": "Expression"
},
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource"
},
"sink": {
"type": "BlobSink"
},
"dataIntegrationUnits": 32
},
"inputs": [
{
"referenceName": "exampleDataset",
"parameters": {
"MyFolderPath": "examplecontainer",
"MyFileName": "examplecontainer.csv"
},
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "exampleDataset",
"parameters": {
"MyFolderPath": "examplecontainer",
"MyFileName": {
"value": "@item()",
"type": "Expression"
}
},
"type": "DatasetReference"
}
],
"name": "ExampleCopyActivity"
}
]
},
"name": "ExampleForeachActivity"
}
]`,
ExpectActivityCount: 1,
ExpectErr: false,
},
}

for _, tc := range cases {
items, err := deserializeDataFactoryPipelineActivities(tc.Json)
if err != nil {
if tc.ExpectErr {
t.Log("Expected error and got error")
continue
}

t.Fatal(err)
}

if tc.ExpectErr {
t.Fatal("Expected an error but got none")
}

if items == nil {
t.Fatal("Expected items, got nil")
}

if len(*items) != tc.ExpectActivityCount {
t.Fatalf("Expected %d activities, got %d", tc.ExpectActivityCount, len(*items))
}
}
}

func TestNormalizeJSON(t *testing.T) {
cases := []struct {
Old string
New string
Suppress bool
}{
{
Old: `[
{
"name": "Append variable1",
"type": "AppendVariable",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"variableName": "bob",
"value": "something"
}
}
]`,
New: `[
{
"name": "Append variable1",
"type": "AppendVariable",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"value": "something",
"variableName": "bob"
}
}
]`,
Suppress: true,
},
{
Old: `[
{
"name": "Append variable1",
"type": "AppendVariable",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"variableName": "bobdifferent",
"value": "something"
}
}
]`,
New: `[
{
"name": "Append variable1",
"type": "AppendVariable",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"value": "something",
"variableName": "bob"
}
}
]`,
Suppress: false,
},
{
Old: `{ "notbob": "notbill" }`,
New: `{ "bob": "bill" }`,
Suppress: false,
},
}

for _, tc := range cases {
suppress := suppressJsonOrderingDifference("test", tc.Old, tc.New, nil)

if suppress != tc.Suppress {
t.Fatalf("Expected JsonOrderingDifference to be '%t' for '%s' '%s' - got '%t'", tc.Suppress, tc.Old, tc.New, suppress)
}
}
}
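
A natural companion test (hypothetical, not part of this PR) ties the helpers together: a document that survives a round trip through the SDK can come back with its keys reordered, and the suppress function should report no change:

func TestSuppressAfterActivitiesRoundTrip(t *testing.T) {
	// Illustrative payload: a single Wait activity.
	userJson := `[{"name":"Wait1","type":"Wait","typeProperties":{"waitTimeInSeconds":15}}]`

	activities, err := deserializeDataFactoryPipelineActivities(userJson)
	if err != nil {
		t.Fatal(err)
	}

	apiJson, err := serializeDataFactoryPipelineActivities(activities)
	if err != nil {
		t.Fatal(err)
	}

	// Ordering-only differences between what the user wrote and what the SDK
	// emits must not surface as a diff.
	if !suppressJsonOrderingDifference("activities_json", userJson, apiJson, nil) {
		t.Fatalf("expected ordering-only difference to be suppressed:\nold: %s\nnew: %s", userJson, apiJson)
	}
}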