[ML][Pipelines]Support parallel_for output mapping (#27778)
* add type mapping

* fix pipeline output

* fix local failed tests

* add ut

* add e2e test

* fix tests

* fix comment

* fix test
D-W- authored Dec 5, 2022
1 parent 52332fd commit 7cfcd65
Showing 11 changed files with 2,229 additions and 317 deletions.
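
The change in one sentence: outputs of a `parallel_for` node can now be referenced downstream and from the pipeline, with each body output re-exposed under an aggregated type — data ports (`uri_file`, `uri_folder`, `mltable`) become `mltable`, and primitive parameters (`number`, `string`, `boolean`, `integer`) become `string`. A minimal usage sketch based on the tests in this commit (the component YAML path and input path are illustrative; `parallel_for` is still a private-preview API):

```python
from azure.ai.ml import Input, load_component
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.dsl._parallel_for import parallel_for  # private preview

# Illustrative component with a uri_folder output named component_out_path.
basic_component = load_component(source="./helloworld_component.yml")

@pipeline
def my_pipeline():
    # The loop body runs once per entry in items.
    body = basic_component(component_in_path=Input(path="test_path1"))
    foreach_node = parallel_for(
        body=body,
        items=[
            {"component_in_number": 1},
            {"component_in_number": 2},
        ],
    )
    # The body's uri_folder output surfaces on the node as mltable,
    # aggregating one entry per iteration.
    return {"output": foreach_node.outputs.component_out_path}
```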
45 changes: 40 additions & 5 deletions sdk/ml/azure-ai-ml/azure/ai/ml/entities/_builders/parallel_for.py
@@ -8,13 +8,15 @@
from azure.ai.ml._schema import PathAwareSchema
from azure.ai.ml._schema.pipeline.control_flow_job import ParallelForSchema
from azure.ai.ml._utils.utils import is_data_binding_expression
from azure.ai.ml.constants._component import ControlFlowType
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.constants._component import ControlFlowType, ComponentParameterTypes
from azure.ai.ml.entities import Component
from azure.ai.ml.entities._builders import BaseNode
from azure.ai.ml.entities._builders.control_flow_node import LoopNode
from azure.ai.ml.entities._job.pipeline._io import NodeOutput, PipelineInput
from azure.ai.ml.entities._job.pipeline._io.mixin import NodeIOMixin
from azure.ai.ml.entities._util import validate_attribute_type
from azure.ai.ml.exceptions import UserErrorException


class ParallelFor(LoopNode, NodeIOMixin):
@@ -32,6 +34,16 @@ class ParallelFor(LoopNode, NodeIOMixin):
:type max_concurrency: int
"""

OUT_TYPE_MAPPING = {
AssetTypes.URI_FILE: AssetTypes.MLTABLE,
AssetTypes.URI_FOLDER: AssetTypes.MLTABLE,
AssetTypes.MLTABLE: AssetTypes.MLTABLE,
ComponentParameterTypes.NUMBER: ComponentParameterTypes.STRING,
ComponentParameterTypes.STRING: ComponentParameterTypes.STRING,
ComponentParameterTypes.BOOLEAN: ComponentParameterTypes.STRING,
ComponentParameterTypes.INTEGER: ComponentParameterTypes.STRING,
}

def __init__(
self,
*,
@@ -55,11 +67,12 @@ def __init__(
# parallel for node shares output meta with body
try:
outputs = self.body._component.outputs
# transform body outputs to aggregate types when available
self._outputs = self._build_outputs_dict(output_definition_dict=self._convert_output_meta(outputs),
outputs={})
except AttributeError:
outputs = {}

# TODO: handle when body doesn't have component or component.outputs
self._outputs = self._build_outputs_dict_without_meta(outputs, none_data=True)
# when body output not available, create default output builder without meta
self._outputs = self._build_outputs_dict_without_meta(outputs={}, none_data=True)

self._items = items
self._validate_items(raise_error=True)
@@ -108,6 +121,28 @@ def _create_instance_from_schema_dict(cls, pipeline_jobs, loaded_data):
**loaded_data
)

def _convert_output_meta(self, outputs):
"""Convert output meta to aggregate types."""
# pylint: disable=protected-access
aggregate_outputs = {}
for name, output in outputs.items():
if output.type in self.OUT_TYPE_MAPPING:
new_type = self.OUT_TYPE_MAPPING[output.type]
else:
raise UserErrorException(
"Referencing output with type {} is not supported in parallel_for node.".format(output.type)
)
if isinstance(output, NodeOutput):
output = output._to_job_output()
if isinstance(output, Output):
out_dict = output._to_dict()
out_dict["type"] = new_type
resolved_output = Output(**out_dict)
else:
resolved_output = Output(type=new_type)
aggregate_outputs[name] = resolved_output
return aggregate_outputs

def _validate_items(self, raise_error=True):
validation_result = self._create_empty_validation_result()
if self.items is not None:
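The mapping above is a plain dictionary over the SDK's type constants, so its effect is easy to show in isolation — a sketch assuming the constants resolve to their usual lowercase string values:

```python
from azure.ai.ml.entities._builders.parallel_for import ParallelFor

# Data ports aggregate to mltable; primitive parameters aggregate to string.
assert ParallelFor.OUT_TYPE_MAPPING["uri_folder"] == "mltable"
assert ParallelFor.OUT_TYPE_MAPPING["uri_file"] == "mltable"
assert ParallelFor.OUT_TYPE_MAPPING["number"] == "string"
assert ParallelFor.OUT_TYPE_MAPPING["boolean"] == "string"

# Anything outside the mapping (e.g. mlflow_model, custom_model) makes
# _convert_output_meta raise UserErrorException when the output is referenced.
assert "mlflow_model" not in ParallelFor.OUT_TYPE_MAPPING
```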
@@ -537,6 +537,11 @@ def _to_job_output(self):
if isinstance(self._data, Output):
# For pipeline output with type Output, always pass to backend.
return self._data
if self._data is None and self._meta and self._meta.type:
# For un-configured pipeline output with meta, we need to return Output with accurate type,
# so it won't default to uri_folder.
return Output(type=self._meta.type)

return super(PipelineOutput, self)._to_job_output()

def _data_binding(self):
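This small branch in `PipelineOutput._to_job_output` is what keeps the aggregated types from being lost at serialization time: an un-configured pipeline output previously fell through to the `uri_folder` default. A simplified, hypothetical sketch of the logic (not the SDK's actual call path):

```python
from azure.ai.ml import Output

def to_job_output(data, meta):
    """Simplified stand-in for PipelineOutput._to_job_output."""
    if isinstance(data, Output):
        # Explicitly configured output: pass through to the backend.
        return data
    if data is None and meta is not None and meta.type:
        # Un-configured output with known meta: keep the accurate type.
        return Output(type=meta.type)
    return Output(type="uri_folder")  # illustrative default fallback

print(to_job_output(None, Output(type="mltable")).type)  # -> mltable
```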
1 change: 1 addition & 0 deletions sdk/ml/azure-ai-ml/tests/conftest.py
@@ -617,6 +617,7 @@ def enable_pipeline_private_preview_features(mocker: MockFixture):
mocker.patch("azure.ai.ml.dsl._pipeline_component_builder.is_private_preview_enabled", return_value=True)
mocker.patch("azure.ai.ml._schema.pipeline.pipeline_component.is_private_preview_enabled", return_value=True)
mocker.patch("azure.ai.ml.entities._schedule.schedule.is_private_preview_enabled", return_value=True)
mocker.patch("azure.ai.ml.dsl._pipeline_decorator.is_private_preview_enabled", return_value=True)


@pytest.fixture()
122 changes: 121 additions & 1 deletion sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_controlflow_pipeline.py
@@ -25,6 +25,7 @@


@pytest.mark.usefixtures(
"enable_private_preview_schema_features",
"enable_environment_id_arm_expansion",
"enable_pipeline_private_preview_features",
"mock_code_hash",
@@ -335,7 +336,6 @@ def parallel_for_pipeline():

with include_private_preview_nodes_in_pipeline():
pipeline_job = assert_job_cancel(pipeline_job, client)

dsl_pipeline_job_dict = omit_with_wildcard(pipeline_job._to_rest_object().as_dict(), *omit_fields)
assert dsl_pipeline_job_dict["properties"]["jobs"] == {
'after_node': {
@@ -460,3 +460,123 @@ def parallel_for_pipeline():
'{"component_in_number": 2}]',
'type': 'parallel_for'}
}

def test_parallel_for_pipeline_with_port_outputs(self, client: MLClient):
hello_world_component = load_component(
source="./tests/test_configs/components/helloworld_component.yml",
params_override=[
{"outputs": {
"component_out_path": {"type": "uri_folder"},
"component_out_file": {"type": "uri_file"},
"component_out_table": {"type": "mltable"},
}}
]
)

@pipeline
def parallel_for_pipeline():
parallel_body = hello_world_component(component_in_path=test_input)
parallel_node = parallel_for(
body=parallel_body,
items=[
{"component_in_number": 1},
{"component_in_number": 2},
]
)
return {
"component_out_path": parallel_node.outputs.component_out_path,
"component_out_file": parallel_node.outputs.component_out_file,
"component_out_table": parallel_node.outputs.component_out_table,
}

pipeline_job = parallel_for_pipeline()
pipeline_job.settings.default_compute = "cpu-cluster"

with include_private_preview_nodes_in_pipeline():
pipeline_job = assert_job_cancel(pipeline_job, client)

dsl_pipeline_job_dict = omit_with_wildcard(pipeline_job._to_rest_object().as_dict(), *omit_fields)
assert dsl_pipeline_job_dict["properties"]["jobs"] == {
'parallel_body': {'_source': 'REMOTE.WORKSPACE.COMPONENT',
'inputs': {'component_in_path': {
'job_input_type': 'uri_file',
'uri': 'https://dprepdata.blob.core.windows.net/demo/Titanic.csv'}},
'name': 'parallel_body',
'type': 'command'},
'parallel_node': {'body': '${{parent.jobs.parallel_body}}',
'items': '[{"component_in_number": 1}, '
'{"component_in_number": 2}]',
'type': 'parallel_for'}
}
assert dsl_pipeline_job_dict["properties"]["outputs"] == {
'component_out_file': {'job_output_type': 'mltable',
'mode': 'ReadWriteMount'},
'component_out_path': {'job_output_type': 'mltable',
'mode': 'ReadWriteMount'},
'component_out_table': {'job_output_type': 'mltable',
'mode': 'ReadWriteMount'}
}

# parallel for pipeline component is correctly generated
@pipeline
def parent_pipeline():
parallel_for_pipeline()

pipeline_job = parent_pipeline()
pipeline_job.settings.default_compute = "cpu-cluster"

rest_pipeline_component = pipeline_job.jobs["parallel_for_pipeline"].component._to_rest_object().as_dict()
assert rest_pipeline_component["properties"]["component_spec"]["outputs"] == {
'component_out_file': {'type': 'mltable'},
'component_out_path': {'type': 'mltable'},
'component_out_table': {'type': 'mltable'}
}

with include_private_preview_nodes_in_pipeline():
assert_job_cancel(pipeline_job, client)

def test_parallel_for_pipeline_with_primitive_outputs(self, client: MLClient):
hello_world_component = load_component(
source="./tests/test_configs/components/helloworld_component.yml",
params_override=[
{"outputs": {
"component_out_path": {"type": "uri_folder"},
"component_out_number": {"type": "number"},
"component_out_boolean": {"type": "boolean", "is_control": True},
}}
]
)

@pipeline
def parallel_for_pipeline():
parallel_body = hello_world_component(component_in_path=test_input)
parallel_node = parallel_for(
body=parallel_body,
items=[
{"component_in_number": 1},
{"component_in_number": 2},
]
)
return {
"component_out_path": parallel_node.outputs.component_out_path,
"component_out_number": parallel_node.outputs.component_out_number,
"component_out_boolean": parallel_node.outputs.component_out_boolean,
}

@pipeline
def parent_pipeline():
parallel_for_pipeline()

pipeline_job = parent_pipeline()
pipeline_job.settings.default_compute = "cpu-cluster"

rest_pipeline_component = pipeline_job.jobs["parallel_for_pipeline"].component._to_rest_object().as_dict()
assert rest_pipeline_component["properties"]["component_spec"]["outputs"] == {
'component_out_boolean': {'is_control': True, 'type': 'string'},
'component_out_number': {'type': 'string'},
'component_out_path': {'type': 'mltable'}
}

# parallel for pipeline component is correctly generated
with include_private_preview_nodes_in_pipeline():
assert_job_cancel(pipeline_job, client)
2 changes: 1 addition & 1 deletion sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_dsl_pipeline.py
@@ -474,7 +474,7 @@ def mixed_pipeline(job_in_number, job_in_path):
"outputs": {
"pipeline_job_out": {
"mode": "ReadWriteMount",
"job_output_type": "uri_folder",
"job_output_type": "mlflow_model",
}
},
"settings": {},
@@ -4,12 +4,13 @@
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.dsl._parallel_for import parallel_for
from azure.ai.ml.entities import Command
from azure.ai.ml.exceptions import ValidationException
from azure.ai.ml.exceptions import ValidationException, UserErrorException
from .._util import _DSL_TIMEOUT_SECOND, include_private_preview_nodes_in_pipeline


@pytest.mark.usefixtures(
"enable_pipeline_private_preview_features",
"enable_private_preview_schema_features"
)
@pytest.mark.timeout(_DSL_TIMEOUT_SECOND)
@pytest.mark.unittest
@@ -234,3 +235,76 @@ def my_pipeline():
rest_job = my_job._to_rest_object().as_dict()
rest_items = rest_job["properties"]["jobs"]["parallelfor"]["items"]
assert rest_items == '[{"component_in_number": 1}, {"component_in_number": 2}]'

@pytest.mark.parametrize(
"output_dict, pipeline_out_dict, component_out_dict, check_pipeline_job",
[
({"type": "uri_file"}, {'job_output_type': 'mltable'}, {'type': 'mltable'}, True),
({"type": "uri_folder"}, {'job_output_type': 'mltable'}, {'type': 'mltable'}, True),
({"type": "mltable"}, {'job_output_type': 'mltable'}, {'type': 'mltable'}, True),
({"type": "number"}, {}, {'type': 'string'}, False),
({"type": "string", "is_control": True}, {}, {'type': 'string', "is_control": True}, False),
({"type": "boolean", "is_control": True}, {}, {'type': 'string', "is_control": True}, False),
({"type": "integer"}, {}, {'type': 'string'}, False),
]
)
def test_parallel_for_outputs(self, output_dict, pipeline_out_dict, component_out_dict, check_pipeline_job):
basic_component = load_component(
source="./tests/test_configs/components/helloworld_component.yml",
params_override=[
{"outputs.component_out_path": output_dict}
]
)

@pipeline
def my_pipeline():
body = basic_component(component_in_path=Input(path="test_path1"))

foreach_node = parallel_for(
body=body,
items={
"iter1": {"component_in_number": 1},
"iter2": {"component_in_number": 2}
}
)
return {
"output": foreach_node.outputs.component_out_path
}

my_job = my_pipeline()

if check_pipeline_job:
rest_job = my_job._to_rest_object().as_dict()
rest_outputs = rest_job["properties"]["outputs"]
assert rest_outputs == {'output': pipeline_out_dict}

pipeline_component = my_job.component
rest_component = pipeline_component._to_rest_object().as_dict()
assert rest_component["properties"]["component_spec"]["outputs"] == {'output': component_out_dict}

@pytest.mark.parametrize(
"out_type", ["mlflow_model", "triton_model", "custom_model"]
)
def test_parallel_for_output_unsupported_case(self, out_type):
basic_component = load_component(
source="./tests/test_configs/components/helloworld_component.yml",
params_override=[
{"outputs.component_out_path": {"type": out_type}}
]
)

@pipeline
def my_pipeline():
body = basic_component(component_in_path=Input(path="test_path1"))

parallel_for(
body=body,
items={
"iter1": {"component_in_number": 1},
"iter2": {"component_in_number": 2}
}
)

with pytest.raises(UserErrorException) as e:
my_pipeline()
assert f"Referencing output with type {out_type} is not supported" in str(e.value)