Skip to content

Commit

Permalink
[ML][Pipelines] Validate pipeline node IO name on reserved word (Azure#28770)
Browse files Browse the repository at this point in the history

* validate keyword in IO of node(s) in pipeline

* add test

* move io name validation to builder and log warning

* fix warning error

* update warning message

* update warning message
  • Loading branch information
zhengfeiwang authored Feb 13, 2023
1 parent c2264a1 commit e94435e
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 1 deletion.
41 changes: 41 additions & 0 deletions sdk/ml/azure-ai-ml/azure/ai/ml/constants/_job/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,44 @@ class PipelineConstants:

class ValidationErrorCode:
    """Well-known error codes attached to pipeline validation results."""

    # Code used when a pipeline parameter's type cannot be determined.
    PARAMETER_TYPE_UNKNOWN = "ParameterTypeUnknown"


# Names of methods defined on Python's built-in dict. When one of these is used
# as an IO name, attribute access (d.key) resolves to the dict method instead of
# the IO object and validation fails, so we warn and suggest subscription
# access ("d[key]") instead.
# Reference: builtins.py::dict
COMPONENT_IO_KEYWORDS = set(
    # plain dict methods
    "clear copy fromkeys get items keys pop popitem setdefault update values".split()
) | set(
    # dunder methods on dict
    "__class_getitem__ __contains__ __delitem__ __eq__ __getattribute__ "
    "__getitem__ __ge__ __init__ __ior__ __iter__ __len__ __le__ __lt__ "
    "__new__ __ne__ __or__ __repr__ __reversed__ __ror__ __setitem__ "
    "__sizeof__ __hash__".split()
)
25 changes: 25 additions & 0 deletions sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_component_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# pylint: disable=protected-access
import copy
import inspect
import logging
import typing
from collections import OrderedDict
from inspect import Parameter, signature
Expand All @@ -18,6 +19,7 @@
)
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.constants._component import ComponentSource, IOConstants
from azure.ai.ml.constants._job.pipeline import COMPONENT_IO_KEYWORDS
from azure.ai.ml.dsl._utils import _sanitize_python_variable_name
from azure.ai.ml.entities import PipelineJob
from azure.ai.ml.entities._builders import BaseNode
Expand All @@ -26,6 +28,7 @@
from azure.ai.ml.entities._inputs_outputs import GroupInput, Input, Output, _get_param_with_standard_annotation
from azure.ai.ml.entities._inputs_outputs.utils import _get_annotation_by_value, is_group
from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob
from azure.ai.ml.entities._job.pipeline._attr_dict import has_attr_safe
from azure.ai.ml.entities._job.pipeline._io import NodeOutput, PipelineInput, PipelineOutput, _GroupAttrDict

# We need to limit the depth of pipeline to avoid the built graph goes too deep and prevent potential
Expand All @@ -34,6 +37,8 @@

_BUILDER_STACK_MAX_DEPTH = 100

module_logger = logging.getLogger(__name__)


class _PipelineComponentBuilderStack:
def __init__(self):
Expand Down Expand Up @@ -390,6 +395,9 @@ def _get_name_or_component_name(node: Union[BaseNode, AutoMLJob]):
final_name = id_name_dict[_id]
node.name = final_name
result[final_name] = node

# Validate IO name of node with correct node name, and log warning if there is keyword.
self._validate_keyword_in_node_io(node)
return result

def _update_inputs(self, pipeline_inputs):
Expand Down Expand Up @@ -468,6 +476,23 @@ def _validate_inferred_outputs(self, output_meta_dict: dict, output_dict: dict):
if unmatched_outputs:
raise UserErrorException(f"{error_prefix}: {unmatched_outputs}")

@staticmethod
def _validate_keyword_in_node_io(node: Union[BaseNode, AutoMLJob]):
    """Warn when a node IO name collides with a dict method name.

    Such names shadow dict attributes on the IO dict, so the IO object is
    only reachable via subscription (e.g. node.inputs["items"]), not via
    attribute access; emit a warning pointing users at the working syntax.
    """
    if has_attr_safe(node, "inputs"):
        reserved_input_names = COMPONENT_IO_KEYWORDS.intersection(node.inputs)
        for name in reserved_input_names:
            module_logger.warning(
                "Reserved word \"%s\" is used as input name in node \"%s\", "
                "can only be accessed with '%s.inputs[\"%s\"]'",
                name,
                node.name,
                node.name,
                name,
            )
    if has_attr_safe(node, "outputs"):
        reserved_output_names = COMPONENT_IO_KEYWORDS.intersection(node.outputs)
        for name in reserved_output_names:
            module_logger.warning(
                "Reserved word \"%s\" is used as output name in node \"%s\", "
                "can only be accessed with '%s.outputs[\"%s\"]'",
                name,
                node.name,
                node.name,
                name,
            )


def _build_pipeline_parameter(func, *, user_provided_kwargs, group_default_kwargs=None, non_pipeline_inputs=None):
# Pass group defaults into kwargs to support group.item can be used even if no default on function.
Expand Down
25 changes: 24 additions & 1 deletion sdk/ml/azure-ai-ml/tests/dsl/unittests/test_dsl_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import os
from io import StringIO
from pathlib import Path
Expand Down Expand Up @@ -2803,4 +2804,26 @@ def register_node_output():
pipeline.settings.default_compute = "azureml:cpu-cluster"
with pytest.raises(UserErrorException) as e:
assert_job_cancel(pipeline, client)
assert 'The output name @ can only contain alphanumeric characters, dashes and underscores, with a limit of 255 characters.' in str(e.value)
assert 'The output name @ can only contain alphanumeric characters, dashes and underscores, with a limit of 255 characters.' in str(e.value)

def test_validate_pipeline_node_io_name_has_keyword(self, caplog):
    # Swap in a fresh logger so pytest's caplog captures the records;
    # otherwise the captured result is empty.
    from azure.ai.ml.dsl import _pipeline_component_builder

    _pipeline_component_builder.module_logger = logging.getLogger(__file__)
    with caplog.at_level(logging.WARNING):
        from test_configs.dsl_pipeline.pipeline_with_keyword_in_node_io.pipeline import pipeline_job

        # reserved words in IO names should only warn, never fail validation
        assert pipeline_job._customized_validate().passed

    expected_template = (
        "Reserved word \"{io_name}\" is used as {io} name in node \"{node_name}\", "
        "can only be accessed with '{node_name}.{io}s[\"{io_name}\"]'"
    )
    # (io_name, io kind, node name) for each expected warning, in emission order
    expected_cases = [
        ("__contains__", "output", "node"),
        ("items", "output", "upstream_node"),
        ("keys", "input", "downstream_node"),
        ("__hash__", "output", "pipeline_component_func"),
    ]
    assert caplog.messages == [
        expected_template.format(io_name=io_name, io=io, node_name=node_name)
        for io_name, io, node_name in expected_cases
    ]
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Test asset: command component whose input is named "keys" — a reserved
# Python dict method name — used to exercise the reserved-word IO warning.
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
type: command
name: component_with_keys_in_inputs
command: echo ${{inputs.keys}}
inputs:
  keys:
    type: uri_folder
environment: azureml:AzureML-sklearn-1.0-ubuntu20.04-py38-cpu:1
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Test asset: command component whose output is named "__contains__" — a
# reserved dict dunder method name — used to exercise the reserved-word IO warning.
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
type: command
name: component_with_keyword_in_outputs
command: echo ${{outputs.__contains__}}
outputs:
  __contains__:
    type: uri_folder
environment: azureml:AzureML-sklearn-1.0-ubuntu20.04-py38-cpu:1
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""Test config: pipeline whose node IO names collide with dict method names.

NOTE(review): local variable names below become node names in the built
pipeline, and the unit test asserts warning messages that reference them —
do not rename.
"""
from pathlib import Path

from azure.ai.ml import load_component
from azure.ai.ml.dsl import pipeline

upstream_component = load_component(Path(__file__).parent / "upstream_node.yml")
downstream_component = load_component(Path(__file__).parent / "downstream_node.yml")
inner_component = load_component(Path(__file__).parent / "inner_node.yml")


@pipeline
def pipeline_component_func():
    # inner component output "__contains__" and pipeline output "__hash__"
    # are both reserved dict method names.
    node = inner_component()
    return {"__hash__": node.outputs["__contains__"]}


@pipeline
def pipeline_func():
    # "items" (upstream output) and "keys" (downstream input) are reserved
    # dict method names, so both must be accessed by subscription here.
    upstream_node = upstream_component()
    downstream_node = downstream_component(keys=upstream_node.outputs["items"])  # noqa: F841
    pipeline_component_func()


pipeline_job = pipeline_func()
pipeline_job.settings.default_compute = "cpu-cluster"
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Test asset: command component whose output is named "items" — a reserved
# Python dict method name — used to exercise the reserved-word IO warning.
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
type: command
name: component_with_items_in_outputs
command: echo ${{outputs.items}}
outputs:
  items:
    type: uri_folder
environment: azureml:AzureML-sklearn-1.0-ubuntu20.04-py38-cpu:1

0 comments on commit e94435e

Please sign in to comment.