diff --git a/.env.example b/.env.example index 01dc40e8..3c63d969 100644 --- a/.env.example +++ b/.env.example @@ -1,30 +1,33 @@ # Azure Subscription Variables SUBSCRIPTION_ID = '' -LOCATION = 'westeurope' +LOCATION = '' TENANT_ID = '' BASE_NAME = '' SP_APP_ID = '' SP_APP_SECRET = '' -RESOUCE_GROUP = 'mlops-rg' +RESOURCE_GROUP = 'mlops-RG' # Mock build/release ID for local testing BUILD_BUILDID = '001' # Azure ML Workspace Variables -WORKSPACE_NAME = 'aml-workspace' -EXPERIMENT_NAME = '' +WORKSPACE_NAME = 'mlops-aml-ws' +EXPERIMENT_NAME = 'mlopspython' # AML Compute Cluster Config AML_ENV_NAME='diabetes_regression_training_env' +AML_ENV_TRAIN_CONDA_DEP_FILE="conda_dependencies.yml" AML_COMPUTE_CLUSTER_NAME = 'train-cluster' AML_COMPUTE_CLUSTER_CPU_SKU = 'STANDARD_DS2_V2' AML_CLUSTER_MAX_NODES = '4' AML_CLUSTER_MIN_NODES = '0' AML_CLUSTER_PRIORITY = 'lowpriority' # Training Config -MODEL_NAME = 'sklearn_regression_model.pkl' +MODEL_NAME = 'diabetes_regression_model.pkl' MODEL_VERSION = '1' TRAIN_SCRIPT_PATH = 'training/train.py' + + # AML Pipeline Config TRAINING_PIPELINE_NAME = 'Training Pipeline' MODEL_PATH = '' @@ -51,3 +54,28 @@ ALLOW_RUN_CANCEL = 'true' # Flag to allow rebuilding the AML Environment after it was built for the first time. This enables dependency updates from conda_dependencies.yaml. AML_REBUILD_ENVIRONMENT = 'false' + + + +USE_GPU_FOR_SCORING = "false" +AML_ENV_SCORE_CONDA_DEP_FILE="conda_dependencies_scoring.yml" +AML_ENV_SCORECOPY_CONDA_DEP_FILE="conda_dependencies_scorecopy.yml" +# AML Compute Cluster Config for parallel batch scoring +AML_ENV_NAME_SCORING='diabetes_regression_scoring_env' +AML_ENV_NAME_SCORE_COPY='diabetes_regression_score_copy_env' +AML_COMPUTE_CLUSTER_NAME_SCORING = 'score-cluster' +AML_COMPUTE_CLUSTER_CPU_SKU_SCORING = 'STANDARD_DS2_V2' +AML_CLUSTER_MAX_NODES_SCORING = '4' +AML_CLUSTER_MIN_NODES_SCORING = '0' +AML_CLUSTER_PRIORITY_SCORING = 'lowpriority' +AML_REBUILD_ENVIRONMENT_SCORING = 'true' +BATCHSCORE_SCRIPT_PATH = 'scoring/parallel_batchscore.py' +BATCHSCORE_COPY_SCRIPT_PATH = 'scoring/parallel_batchscore_copyoutput.py' + + +SCORING_DATASTORE_INPUT_CONTAINER = 'input' +SCORING_DATASTORE_INPUT_FILENAME = 'diabetes_scoring_input.csv' +SCORING_DATASTORE_OUTPUT_CONTAINER = 'output' +SCORING_DATASTORE_OUTPUT_FILENAME = 'diabetes_scoring_output.csv' +SCORING_DATASET_NAME = 'diabetes_scoring_ds' +SCORING_PIPELINE_NAME = 'diabetes-scoring-pipeline' \ No newline at end of file diff --git a/.pipelines/diabetes_regression-batchscoring-ci.yml b/.pipelines/diabetes_regression-batchscoring-ci.yml new file mode 100644 index 00000000..79a7f46f --- /dev/null +++ b/.pipelines/diabetes_regression-batchscoring-ci.yml @@ -0,0 +1,64 @@ +# Continuous Integration (CI) pipeline that orchestrates the batch scoring of the diabetes_regression model. 
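+# The pipeline runs as a single stage with two jobs: a containerized job that
+# lint-checks the code and publishes the batch scoring AML pipeline, followed
+# by an agentless job that invokes the published pipeline and waits for it to
+# complete.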
+
+resources:
+  containers:
+  - container: mlops
+    image: mcr.microsoft.com/mlops/python:latest
+
+
+pr: none
+trigger:
+  branches:
+    include:
+    - master
+  paths:
+    include:
+    - diabetes_regression/scoring/parallel_batchscore.py
+    - ml_service/pipelines/diabetes_regression_build_parallel_batchscore_pipeline.py
+    - ml_service/pipelines/run_parallel_batchscore_pipeline.py
+
+variables:
+- template: diabetes_regression-variables-template.yml
+- group: devopsforai-aml-vg
+
+pool:
+  vmImage: ubuntu-latest
+
+stages:
+- stage: 'Batch_Scoring_Pipeline_CI'
+  displayName: 'Batch Scoring Pipeline CI'
+  jobs:
+  - job: "Build_Batch_Scoring_Pipeline"
+    displayName: "Build Batch Scoring Pipeline"
+    container: mlops
+    timeoutInMinutes: 0
+    steps:
+    - template: code-quality-template.yml
+    - task: AzureCLI@1
+      name: publish_batchscore
+      inputs:
+        azureSubscription: '$(WORKSPACE_SVC_CONNECTION)'
+        scriptLocation: inlineScript
+        workingDirectory: $(Build.SourcesDirectory)
+        inlineScript: |
+          set -e # fail on error
+          export SUBSCRIPTION_ID=$(az account show --query id -o tsv)
+          # Invoke the Python script that builds and publishes the batch scoring pipeline
+          python -m ml_service.pipelines.diabetes_regression_build_parallel_batchscore_pipeline
+
+  - job: "Run_Batch_Score_Pipeline"
+    displayName: "Run Batch Scoring Pipeline"
+    dependsOn: "Build_Batch_Scoring_Pipeline"
+    timeoutInMinutes: 240
+    pool: server
+    variables:
+      pipeline_id: $[ dependencies.Build_Batch_Scoring_Pipeline.outputs['publish_batchscore.pipeline_id']]
+    steps:
+    - task: ms-air-aiagility.vss-services-azureml.azureml-restApi-task.MLPublishedPipelineRestAPITask@0
+      displayName: 'Invoke Batch Scoring pipeline'
+      inputs:
+        azureSubscription: '$(WORKSPACE_SVC_CONNECTION)'
+        PipelineId: '$(pipeline_id)'
+        ExperimentName: '$(EXPERIMENT_NAME)'
+        PipelineParameters: '"ParameterAssignments": {"model_name": "$(MODEL_NAME)"}'
+ 
\ No newline at end of file
diff --git a/.pipelines/diabetes_regression-variables-template.yml b/.pipelines/diabetes_regression-variables-template.yml
index 159f76af..cf4ef9cf 100644
--- a/.pipelines/diabetes_regression-variables-template.yml
+++ b/.pipelines/diabetes_regression-variables-template.yml
@@ -16,6 +16,7 @@ variables:
   # The path to the model scoring script relative to SOURCES_DIR_TRAIN
   - name: SCORE_SCRIPT
     value: scoring/score.py
+
 
   # Azure ML Variables
   - name: EXPERIMENT_NAME
@@ -35,6 +36,8 @@ variables:
   # AML Compute Cluster Config
   - name: AML_ENV_NAME
     value: diabetes_regression_training_env
+  - name: AML_ENV_TRAIN_CONDA_DEP_FILE
+    value: "conda_dependencies.yml"
   - name: AML_COMPUTE_CLUSTER_CPU_SKU
     value: STANDARD_DS2_V2
   - name: AML_COMPUTE_CLUSTER_NAME
@@ -69,3 +72,62 @@ variables:
   # Flag to allow rebuilding the AML Environment after it was built for the first time. This enables dependency updates from conda_dependencies.yaml.
# - name: AML_REBUILD_ENVIRONMENT # value: "false" + + # Variables below are used for controlling various aspects of batch scoring + - name: USE_GPU_FOR_SCORING + value: False + # Conda dependencies for the batch scoring step + - name: AML_ENV_SCORE_CONDA_DEP_FILE + value: "conda_dependencies_scoring.yml" + # Conda dependencies for the score copying step + - name: AML_ENV_SCORECOPY_CONDA_DEP_FILE + value: "conda_dependencies_scorecopy.yml" + # AML Compute Cluster Config for parallel batch scoring + - name: AML_ENV_NAME_SCORING + value: diabetes_regression_scoring_env + - name: AML_ENV_NAME_SCORE_COPY + value: diabetes_regression_score_copy_env + - name: AML_COMPUTE_CLUSTER_CPU_SKU_SCORING + value: STANDARD_DS2_V2 + - name: AML_COMPUTE_CLUSTER_NAME_SCORING + value: score-cluster + - name: AML_CLUSTER_MIN_NODES_SCORING + value: 0 + - name: AML_CLUSTER_MAX_NODES_SCORING + value: 4 + - name: AML_CLUSTER_PRIORITY_SCORING + value: lowpriority + # The path to the batch scoring script relative to SOURCES_DIR_TRAIN + - name: BATCHSCORE_SCRIPT_PATH + value: scoring/parallel_batchscore.py + - name: BATCHSCORE_COPY_SCRIPT_PATH + value: scoring/parallel_batchscore_copyoutput.py + # Flag to allow rebuilding the AML Environment after it was built for the first time. + # This enables dependency updates from the conda dependencies yaml for scoring activities. + - name: AML_REBUILD_ENVIRONMENT_SCORING + value: "true" + + # Datastore config for scoring + # The storage account name and key are supplied as variables in a variable group + # in the Azure Pipelines library for this project. Please refer to repo docs for + # more details + + # Blob container where the input data for scoring can be found + - name: SCORING_DATASTORE_INPUT_CONTAINER + value: "input" + # Blobname for the input data - include any applicable path in the string + - name: SCORING_DATASTORE_INPUT_FILENAME + value: "diabetes_scoring_input.csv" + # Blob container where the output data for scoring can be found + - name: SCORING_DATASTORE_OUTPUT_CONTAINER + value: "output" + # Blobname for the output data - include any applicable path in the string + - name: SCORING_DATASTORE_OUTPUT_FILENAME + value: "diabetes_scoring_output.csv" + # Dataset name for input data for scoring + - name: SCORING_DATASET_NAME + value: "diabetes_scoring_ds" + # Scoring pipeline name + - name: SCORING_PIPELINE_NAME + value: "diabetes-scoring-pipeline" + \ No newline at end of file diff --git a/bootstrap/bootstrap.py b/bootstrap/bootstrap.py index 6e51b503..cbc49e18 100644 --- a/bootstrap/bootstrap.py +++ b/bootstrap/bootstrap.py @@ -87,10 +87,12 @@ def replace_project_name(project_dir, project_name, rename_name): r".pipelines/diabetes_regression-ci.yml", r".pipelines/abtest.yml", r".pipelines/diabetes_regression-ci-image.yml", + r".pipelines/diabetes_regression-batchscoring-ci.yml", r".pipelines/diabetes_regression-get-model-version-template.yml", # NOQA: E501 r".pipelines/diabetes_regression-variables-template.yml", r"environment_setup/Dockerfile", r"environment_setup/install_requirements.sh", + r"ml_service/pipelines/diabetes_regression_build_parallel_batchscore_pipeline.py", # NOQA: E501 r"ml_service/pipelines/diabetes_regression_build_train_pipeline_with_r_on_dbricks.py", # NOQA: E501 r"ml_service/pipelines/diabetes_regression_build_train_pipeline_with_r.py", # NOQA: E501 r"ml_service/pipelines/diabetes_regression_build_train_pipeline.py", # NOQA: E501 diff --git a/diabetes_regression/conda_dependencies_scorecopy.yml 
b/diabetes_regression/conda_dependencies_scorecopy.yml new file mode 100644 index 00000000..dffafd08 --- /dev/null +++ b/diabetes_regression/conda_dependencies_scorecopy.yml @@ -0,0 +1,31 @@ +# Conda environment specification. The dependencies defined in this file will +# be automatically provisioned for managed runs. These include runs against +# the localdocker, remotedocker, and cluster compute targets. + +# Note that this file is NOT used to automatically manage dependencies for the +# local compute target. To provision these dependencies locally, run: +# conda env update --file conda_dependencies.yml + +# Details about the Conda environment file format: +# https://conda.io/docs/using/envs.html#create-environment-file-by-hand + +# For managing Spark packages and configuration, see spark_dependencies.yml. +# Version of this configuration file's structure and semantics in AzureML. +# This directive is stored in a comment to preserve the Conda file structure. +# [AzureMlVersion] = 2 + +# These dependencies are used to create the environment used by the batch score +# copy pipeline step +name: diabetes_regression_score_copy_env +dependencies: + # The python interpreter version. + # Currently Azure ML Workbench only supports 3.5.2 and later. + - python=3.7.* + - pip + + - pip: + # Base AzureML SDK + - azureml-sdk==1.6.* + + # Score copying deps + - azure-storage-blob diff --git a/diabetes_regression/conda_dependencies_scoring.yml b/diabetes_regression/conda_dependencies_scoring.yml new file mode 100644 index 00000000..60c45c44 --- /dev/null +++ b/diabetes_regression/conda_dependencies_scoring.yml @@ -0,0 +1,32 @@ +# Conda environment specification. The dependencies defined in this file will +# be automatically provisioned for managed runs. These include runs against +# the localdocker, remotedocker, and cluster compute targets. + +# Note that this file is NOT used to automatically manage dependencies for the +# local compute target. To provision these dependencies locally, run: +# conda env update --file conda_dependencies.yml + +# Details about the Conda environment file format: +# https://conda.io/docs/using/envs.html#create-environment-file-by-hand + +# For managing Spark packages and configuration, see spark_dependencies.yml. +# Version of this configuration file's structure and semantics in AzureML. +# This directive is stored in a comment to preserve the Conda file structure. +# [AzureMlVersion] = 2 + +# These dependencies are used to create the environment used by the batch score +# pipeline step +name: diabetes_regression_scoring_env +dependencies: + # The python interpreter version. + # Currently Azure ML Workbench only supports 3.5.2 and later. + - python=3.7.* + - pip + + - pip: + # Base AzureML SDK + - azureml-sdk==1.6.* + + # Scoring deps + - scikit-learn + - pandas diff --git a/diabetes_regression/scoring/parallel_batchscore.py b/diabetes_regression/scoring/parallel_batchscore.py new file mode 100644 index 00000000..aef6f3fb --- /dev/null +++ b/diabetes_regression/scoring/parallel_batchscore.py @@ -0,0 +1,139 @@ +""" +Copyright (C) Microsoft Corporation. All rights reserved.​ + ​ +Microsoft Corporation (“Microsoft”) grants you a nonexclusive, perpetual, +royalty-free right to use, copy, and modify the software code provided by us +("Software Code"). You may not sublicense the Software Code or any use of it +(except to your affiliates and to vendors to perform work on your behalf) +through distribution, network access, service agreement, lease, rental, or +otherwise. 
This license does not purport to express any claim of ownership over
+data you may have shared with Microsoft in the creation of the Software Code.
+Unless applicable law gives you more rights, Microsoft reserves all other
+rights not expressly granted herein, whether by implication, estoppel or
+otherwise.
+
+THE SOFTWARE CODE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS
+OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+MICROSOFT OR ITS LICENSORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
+IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THE SOFTWARE CODE, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
+"""
+
+import numpy as np
+import pandas as pd
+import joblib
+import sys
+from typing import List
+from util.model_helper import get_latest_model
+
+model = None
+
+
+def parse_args() -> List[str]:
+    """
+    The AML pipeline calls this script with a set of additional command
+    line arguments whose names are not documented. Using ArgumentParser,
+    which requires the names of all arguments to be declared up front,
+    would therefore break should those undocumented names change, so the
+    arguments are parsed manually.
+
+    :returns: List of model filters
+
+    :raises: ValueError
+    """
+    model_name_param = [
+        (sys.argv[idx], sys.argv[idx + 1])
+        for idx, itm in enumerate(sys.argv)
+        if itm == "--model_name"
+    ]
+
+    if len(model_name_param) == 0:
+        raise ValueError(
+            "Model name is required but no model name parameter was passed to the script"  # NOQA: E501
+        )
+
+    model_name = model_name_param[0][1]
+
+    model_tag_name_param = [
+        (sys.argv[idx], sys.argv[idx + 1])
+        for idx, itm in enumerate(sys.argv)
+        if itm == "--model_tag_name"
+    ]
+    model_tag_name = (
+        None
+        if len(model_tag_name_param) < 1
+        or len(model_tag_name_param[0][1].strip()) == 0  # NOQA: E501
+        else model_tag_name_param[0][1]
+    )
+
+    model_tag_value_param = [
+        (sys.argv[idx], sys.argv[idx + 1])
+        for idx, itm in enumerate(sys.argv)
+        if itm == "--model_tag_value"
+    ]
+    model_tag_value = (
+        None
+        if len(model_tag_value_param) < 1
+        or len(model_tag_value_param[0][1].strip()) == 0
+        else model_tag_value_param[0][1]
+    )
+
+    return [model_name, model_tag_name, model_tag_value]
+
+
+def init():
+    """
+    Initializer called once per node that runs the scoring job. Parse command
+    line arguments and get the right model to use for scoring.
+    """
+    try:
+        print("Initializing batch scoring script...")
+
+        model_filter = parse_args()
+        amlmodel = get_latest_model(
+            model_filter[0], model_filter[1], model_filter[2]
+        )  # NOQA: E501
+
+        global model
+        modelpath = amlmodel.get_model_path(model_name=model_filter[0])
+        model = joblib.load(modelpath)
+        print("Loaded model {}".format(model_filter[0]))
+    except Exception as ex:
+        print("Error: {}".format(ex))
+
+
+def run(mini_batch: pd.DataFrame) -> pd.DataFrame:
+    """
+    The run method is called multiple times by the runtime. Each time
+    a mini-batch consisting of a portion of the input data is passed
+    in as a pandas DataFrame. The run method should return the scoring
+    results as a List or a pandas DataFrame.
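+    Here each row of the mini-batch is scored individually with
+    model.predict, and the predictions are returned joined to the
+    mini-batch as an additional "score" column.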
+ + :param mini_batch: Dataframe containing a portion of the scoring data + + :returns: array containing the scores. + """ + + try: + result = None + + for _, sample in mini_batch.iterrows(): + # prediction + pred = model.predict(sample.values.reshape(1, -1)) + result = ( + np.array(pred) if result is None else np.vstack((result, pred)) + ) # NOQA: E501 + + return ( + [] + if result is None + else mini_batch.join(pd.DataFrame(result, columns=["score"])) + ) + + except Exception as ex: + print(ex) diff --git a/diabetes_regression/scoring/parallel_batchscore_copyoutput.py b/diabetes_regression/scoring/parallel_batchscore_copyoutput.py new file mode 100644 index 00000000..cc4af42c --- /dev/null +++ b/diabetes_regression/scoring/parallel_batchscore_copyoutput.py @@ -0,0 +1,91 @@ +""" +Copyright (C) Microsoft Corporation. All rights reserved.​ + ​ +Microsoft Corporation (“Microsoft”) grants you a nonexclusive, perpetual, +royalty-free right to use, copy, and modify the software code provided by us +("Software Code"). You may not sublicense the Software Code or any use of it +(except to your affiliates and to vendors to perform work on your behalf) +through distribution, network access, service agreement, lease, rental, or +otherwise. This license does not purport to express any claim of ownership over +data you may have shared with Microsoft in the creation of the Software Code. +Unless applicable law gives you more rights, Microsoft reserves all other +rights not expressly granted herein, whether by implication, estoppel or +otherwise. ​ + ​ +THE SOFTWARE CODE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS +OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +MICROSOFT OR ITS LICENSORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR +BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER +IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THE SOFTWARE CODE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. 
+""" + +from azure.storage.blob import ContainerClient +from datetime import datetime, date, timezone +import argparse +import os + + +def parse_args(): + parser = argparse.ArgumentParser() + parser.add_argument("--output_path", type=str, default=None) + parser.add_argument("--scoring_datastore", type=str, default=None) + parser.add_argument("--score_container", type=str, default=None) + parser.add_argument("--scoring_datastore_key", type=str, default=None) + parser.add_argument("--scoring_output_filename", type=str, default=None) + + return parser.parse_args() + + +def copy_output(args): + print("Output : {}".format(args.output_path)) + + accounturl = "https://{}.blob.core.windows.net".format( + args.scoring_datastore + ) # NOQA E501 + + containerclient = ContainerClient( + accounturl, args.score_container, args.scoring_datastore_key + ) + + destfolder = date.today().isoformat() + filetime = ( + datetime.now(timezone.utc) + .time() + .isoformat("milliseconds") + .replace(":", "_") + .replace(".", "_") + ) # noqa E501 + destfilenameparts = args.scoring_output_filename.split(".") + destblobname = "{}/{}_{}.{}".format( + destfolder, destfilenameparts[0], filetime, destfilenameparts[1] + ) + + destblobclient = containerclient.get_blob_client(destblobname) + with open( + os.path.join(args.output_path, "parallel_run_step.txt"), "rb" + ) as scorefile: # noqa E501 + destblobclient.upload_blob(scorefile, blob_type="BlockBlob") + + +if __name__ == "__main__": + args = parse_args() + if ( + args.scoring_datastore is None + or args.scoring_datastore.strip() == "" + or args.score_container is None + or args.score_container.strip() == "" + or args.scoring_datastore_key is None + or args.scoring_datastore_key.strip() == "" + or args.scoring_output_filename is None + or args.scoring_output_filename.strip() == "" + or args.output_path is None + or args.output_path.strip() == "" + ): + print("Missing parameters") + else: + copy_output(args) diff --git a/docs/getting_started.md b/docs/getting_started.md index 89981fa3..cba7e424 100644 --- a/docs/getting_started.md +++ b/docs/getting_started.md @@ -64,6 +64,8 @@ The variable group should contain the following required variables. **Azure reso | AZURE_RM_SVC_CONNECTION | azure-resource-connection | [Azure Resource Manager Service Connection](#create-an-azure-devops-service-connection-for-the-azure-resource-manager) name | | WORKSPACE_SVC_CONNECTION | aml-workspace-connection | [Azure ML Workspace Service Connection](#create-an-azure-devops-azure-ml-workspace-service-connection) name | | ACI_DEPLOYMENT_NAME | mlops-aci | [Azure Container Instances](https://azure.microsoft.com/en-us/services/container-instances/) name | +| SCORING_DATASTORE_STORAGE_NAME | [your project name]scoredata | [Azure Blob Storage Account](https://docs.microsoft.com/en-us/azure/storage/blobs/) name (optional) | +| SCORING_DATASTORE_ACCESS_KEY | | [Azure Storage Account Key](https://docs.microsoft.com/en-us/rest/api/storageservices/authorize-requests-to-azure-storage) (optional) | Make sure you select the **Allow access to all pipelines** checkbox in the variable group configuration. @@ -85,10 +87,17 @@ More variables are available for further tweaking, but the above variables are a **ACI_DEPLOYMENT_NAME** is used for naming the scoring service during deployment to [Azure Container Instances](https://azure.microsoft.com/en-us/services/container-instances/). 
+**SCORING_DATASTORE_STORAGE_NAME** is the name for an Azure Blob Storage account that will contain both data used as input to batch scoring, as well as the batch scoring outputs. This variable is optional and only needed if you intend to use the batch scoring facility. Note that since this resource is optional, the resource provisioning pipelines mentioned below do not create this resource automatically, and manual creation is required before use. + +**SCORING_DATASTORE_ACCESS_KEY** is the access key for the scoring data Azure storage account mentioned above. You may want to consider linking this variable to Azure KeyVault to avoid storing the access key in plain text. This variable is optional and only needed if you intend to use the batch scoring facility. + + ## Provisioning resources using Azure Pipelines The easiest way to create all required Azure resources (Resource Group, Azure ML Workspace, Container Registry, and others) is to use the **Infrastructure as Code (IaC)** [pipeline with ARM templates](../environment_setup/iac-create-environment-pipeline-arm.yml) or the [pipeline with Terraform templates](../environment_setup/iac-create-environment-pipeline-tf.yml). The pipeline takes care of setting up all required resources based on these [Azure Resource Manager templates](../environment_setup/arm-templates/cloud-environment.json), or based on these [Terraform templates](../environment_setup/tf-templates). +**Note:** Since Azure Blob storage account required for batch scoring is optional, the resource provisioning pipelines mentioned above do not create this resource automatically, and manual creation is required before use. + ### Create an Azure DevOps Service Connection for the Azure Resource Manager The [IaC provisioning pipeline](../environment_setup/iac-create-environment-pipeline.yml) requires an **Azure Resource Manager** [service connection](https://docs.microsoft.com/en-us/azure/devops/pipelines/library/service-endpoints?view=azure-devops&tabs=yaml#create-a-service-connection). @@ -133,14 +142,19 @@ You'll need sufficient permissions to register an application with your Azure AD ## Set up Build, Release Trigger, and Release Multi-Stage Pipeline -Now that you've provisioned all the required Azure resources and service connections, you can set up the pipeline for deploying your machine learning model to production. The pipeline has a sequence of stages for: +Now that you've provisioned all the required Azure resources and service connections, you can set up the pipelines for deploying your machine learning model to production. + +**There are two main Azure pipelines - one to handle model training and another to handle batch scoring of the model.** + +### **Azure [pipeline](../.pipelines/diabetes_regression-ci.yml) for model training and deployment** +This pipeline has a sequence of stages for: 1. **Model Code Continuous Integration:** triggered on code changes to master branch on GitHub. Runs linting, unit tests, code coverage and publishes a training pipeline. 1. **Train Model**: invokes the Azure ML service to trigger the published training pipeline to train, evaluate, and register a model. 1. **Release Deployment:** deploys a model to either [Azure Container Instances (ACI)](https://azure.microsoft.com/en-us/services/container-instances/), [Azure Kubernetes Service (AKS)](https://azure.microsoft.com/en-us/services/kubernetes-service), or [Azure App Service](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-deploy-app-service) environments. 
For simplicity, you're going to initially focus on Azure Container Instances. See [Further Exploration](#further-exploration) for other deployment types. 1. **Note:** Edit the pipeline definition to remove unused stages. For example, if you're deploying to Azure Container Instances and Azure Kubernetes Service only, delete the unused `Deploy_Webapp` stage. -### Set up the Pipeline +### Set up the Training Pipeline In your Azure DevOps project, create and run a new build pipeline based on the [diabetes_regression-ci.yml](../.pipelines/diabetes_regression-ci.yml) pipeline definition in your forked repository. @@ -155,7 +169,7 @@ Also check the published training pipeline in the **mlops-AML-WS** workspace in ![Training pipeline](./images/training-pipeline.png) -Great, you now have the build pipeline set up which automatically triggers every time there's a change in the master branch! +Great, you now have the build pipeline for training set up which automatically triggers every time there's a change in the master branch! The pipeline stages are summarized below: @@ -191,6 +205,46 @@ To disable the automatic trigger of the training pipeline, change the `auto-trig To skip model training and registration, and deploy a model successfully registered by a previous build (for testing changes to the score file or inference configuration), add the variable `MODEL_BUILD_ID` when the pipeline is queued, and set the value to the ID of the previous build. + +### **Azure [pipeline](../.pipelines/diabetes_regression-batchscoring-ci.yml) for batch scoring** +This pipeline has a sequence of stages for: +1. **Batch Scoring Code Continuous Integration:** triggered on code changes to master branch on GitHub. Runs linting, unit tests, code coverage and publishes a batch scoring pipeline. +1. **Run Batch Scoring**: invokes the published batch scoring pipeline to score a model. + +### Set up the Batch Scoring Pipeline + +In your Azure DevOps project, create and run a new build pipeline based on the [diabetes_regression-batchscoring-ci.yml](../.pipelines/diabetes_regression-batchscoring-ci.yml) +pipeline definition in your forked repository. + +Once the pipeline is finished, check the execution result: + +![Build](./images/batchscoring-ci-result.png) + +Also check the published batch scoring pipeline in the **mlops-AML-WS** workspace in [Azure Portal](https://portal.azure.com/): + +![Batch scoring pipeline](./images/batchscoring-pipeline.png) + +Great, you now have the build pipeline set up for batch scoring which automatically triggers every time there's a change in the master branch! + +The pipeline stages are summarized below: + +#### Batch Scoring CI + +- Linting (code quality analysis) +- Unit tests and code coverage analysis +- Build and publish *ML Batch Scoring Pipeline* in an *ML Workspace* + +#### Batch Score model + +- Determine the model to be used based on the model name, model tag name and model tag value bound pipeline parameters. +- Trigger the *ML Batch Scoring Pipeline* and waits for it to complete. + - This is an **agentless** job. The CI pipeline can wait for ML pipeline completion for hours or even days without using agent resources. +- Use the scoring input data supplied via the SCORING_DATASTORE_INPUT_* configuration variables. +- Once scoring is completed, the scores are made available in the same blob storage at the locations specified via the SCORING_DATASTORE_OUTPUT_* configuration variables. 
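+
+For example, with the Azure DevOps CLI extension installed, a previously
+completed build could be reused by queueing the pipeline as follows (the
+pipeline name here is hypothetical, and `MODEL_BUILD_ID` must be settable at
+queue time):
+
+```bash
+az pipelines run --name diabetes_regression-ci --variables MODEL_BUILD_ID=123
+```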
+
+### **Azure [pipeline](../.pipelines/diabetes_regression-batchscoring-ci.yml) for batch scoring**
+This pipeline has a sequence of stages for:
+1. **Batch Scoring Code Continuous Integration:** triggered on code changes to master branch on GitHub. Runs linting, unit tests, code coverage and publishes a batch scoring pipeline.
+1. **Run Batch Scoring**: invokes the published batch scoring pipeline to score a model.
+
+### Set up the Batch Scoring Pipeline
+
+In your Azure DevOps project, create and run a new build pipeline based on the [diabetes_regression-batchscoring-ci.yml](../.pipelines/diabetes_regression-batchscoring-ci.yml)
+pipeline definition in your forked repository.
+
+Once the pipeline is finished, check the execution result:
+
+![Build](./images/batchscoring-ci-result.png)
+
+Also check the published batch scoring pipeline in the **mlops-AML-WS** workspace in [Azure Portal](https://portal.azure.com/):
+
+![Batch scoring pipeline](./images/batchscoring-pipeline.png)
+
+Great, you now have the build pipeline for batch scoring set up, and it automatically triggers every time there's a change in the master branch!
+
+The pipeline stages are summarized below:
+
+#### Batch Scoring CI
+
+- Linting (code quality analysis)
+- Unit tests and code coverage analysis
+- Build and publish *ML Batch Scoring Pipeline* in an *ML Workspace*
+
+#### Batch Score model
+
+- Determines the model to use based on the model name, model tag name, and model tag value bound as pipeline parameters.
+- Triggers the *ML Batch Scoring Pipeline* and waits for it to complete.
+  - This is an **agentless** job. The CI pipeline can wait for ML pipeline completion for hours or even days without using agent resources.
+- Uses the scoring input data supplied via the SCORING_DATASTORE_INPUT_* configuration variables.
+- Once scoring completes, the scores are made available in the same blob storage at the locations specified via the SCORING_DATASTORE_OUTPUT_* configuration variables.
+
+**Note:** If a scoring datastore is not yet configured, you can still try out batch scoring by supplying a scoring input data file within the data folder. Make sure to set the SCORING_DATASTORE_INPUT_FILENAME variable to the name of that file. In this case the scoring output is written to the ML workspace's default datastore.
+
+
 ## Further Exploration
 
 You should now have a working pipeline that can get you started with MLOpsPython. Below are some additional features offered that might suit your scenario.
diff --git a/docs/images/batchscoring-ci-result.png b/docs/images/batchscoring-ci-result.png
new file mode 100644
index 00000000..d07d41a8
Binary files /dev/null and b/docs/images/batchscoring-ci-result.png differ
diff --git a/docs/images/batchscoring-pipeline.png b/docs/images/batchscoring-pipeline.png
new file mode 100644
index 00000000..2b79fe03
Binary files /dev/null and b/docs/images/batchscoring-pipeline.png differ
diff --git a/ml_service/pipelines/diabetes_regression_build_parallel_batchscore_pipeline.py b/ml_service/pipelines/diabetes_regression_build_parallel_batchscore_pipeline.py
new file mode 100644
index 00000000..d7acbf46
--- /dev/null
+++ b/ml_service/pipelines/diabetes_regression_build_parallel_batchscore_pipeline.py
@@ -0,0 +1,482 @@
+"""
+Copyright (C) Microsoft Corporation. All rights reserved.
+
+Microsoft Corporation (“Microsoft”) grants you a nonexclusive, perpetual,
+royalty-free right to use, copy, and modify the software code provided by us
+("Software Code"). You may not sublicense the Software Code or any use of it
+(except to your affiliates and to vendors to perform work on your behalf)
+through distribution, network access, service agreement, lease, rental, or
+otherwise. This license does not purport to express any claim of ownership over
+data you may have shared with Microsoft in the creation of the Software Code.
+Unless applicable law gives you more rights, Microsoft reserves all other
+rights not expressly granted herein, whether by implication, estoppel or
+otherwise.
+
+THE SOFTWARE CODE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS
+OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+MICROSOFT OR ITS LICENSORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
+IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THE SOFTWARE CODE, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
+""" +import os +from azureml.pipeline.steps import ParallelRunConfig, ParallelRunStep +from ml_service.util.manage_environment import get_environment +from ml_service.pipelines.load_sample_data import create_sample_data_csv +from ml_service.util.env_variables import Env +from ml_service.util.attach_compute import get_compute +from azureml.core import ( + Workspace, + Dataset, + Datastore, + Model, + RunConfiguration, +) +from azureml.pipeline.core import Pipeline, PipelineData, PipelineParameter +from azureml.core.compute import ComputeTarget +from azureml.data.datapath import DataPath +from azureml.pipeline.steps import PythonScriptStep +from argparse import ArgumentParser, Namespace +from typing import Tuple + + +def parse_args() -> Namespace: + """ + Parse arguments supplied to the pipeline creation script. + The only allowed arguments are model_tag_name and model_tag_value + specifying a custom tag/value pair to help locate a specific model. + + + :returns: Namespace with two attributes model_tag_name and model_tag_value + and corresponding values + + """ + parser = ArgumentParser() + parser.add_argument("--model_tag_name", default=None, type=str) + parser.add_argument("--model_tag_value", default=None, type=str) + args = parser.parse_args() + return args + + +def get_model( + ws: Workspace, env: Env, tagname: str = None, tagvalue: str = None +) -> Model: + """ + Gets a model from the models registered with the AML workspace. + If a tag/value pair is supplied, uses it to filter. + + :param ws: Current AML workspace + :param env: Environment variables + :param tagname: Optional tag name, default is None + :param tagvalue: Optional tag value, default is None + + :returns: Model + + :raises: ValueError + """ + if tagname is not None and tagvalue is not None: + model = Model(ws, name=env.model_name, tags=[[tagname, tagvalue]]) + elif (tagname is None and tagvalue is not None) or ( + tagvalue is None and tagname is not None + ): + raise ValueError( + "model_tag_name and model_tag_value should both be supplied" + + "or excluded" # NOQA: E501 + ) + else: + model = Model(ws, name=env.model_name) + return model + + +def get_or_create_datastore( + datastorename: str, ws: Workspace, env: Env, input: bool = True +) -> Datastore: + """ + Obtains a datastore with matching name. Creates it if none exists. 
+
+    :param datastorename: Name of the datastore
+    :param ws: Current AML Workspace
+    :param env: Environment variables
+    :param input: Datastore points to the input container if
+    this is True (default) or the output storage container otherwise
+
+    :returns: Datastore
+
+    :raises: ValueError
+    """
+    if datastorename is None:
+        raise ValueError("Datastore name is required.")
+
+    containername = (
+        env.scoring_datastore_input_container
+        if input
+        else env.scoring_datastore_output_container
+    )
+
+    if datastorename in ws.datastores:
+
+        datastore = ws.datastores[datastorename]
+
+    # the datastore is not registered but we have all details to register it
+    elif (
+        env.scoring_datastore_access_key is not None
+        and containername is not None  # NOQA: E501
+    ):  # NOQA:E501
+
+        datastore = Datastore.register_azure_blob_container(
+            workspace=ws,
+            datastore_name=datastorename,
+            account_name=env.scoring_datastore_storage_name,
+            account_key=env.scoring_datastore_access_key,
+            container_name=containername,
+        )
+    else:
+        raise ValueError(
+            "No existing datastore named {} nor was enough information supplied to create one.".format(  # NOQA: E501
+                datastorename
+            )
+        )
+
+    return datastore
+
+
+def get_input_dataset(ws: Workspace, ds: Datastore, env: Env) -> Dataset:
+    """
+    Gets an input dataset wrapped around an input data file. The input
+    data file is assumed to exist in the supplied datastore.
+
+
+    :param ws: AML Workspace
+    :param ds: Datastore containing the data file
+    :param env: Environment variables
+
+    :returns: Input Dataset
+    """
+
+    scoringinputds = Dataset.Tabular.from_delimited_files(
+        path=DataPath(ds, env.scoring_datastore_input_filename)
+    )
+
+    scoringinputds = scoringinputds.register(
+        ws,
+        name=env.scoring_dataset_name,
+        tags={"purpose": "scoring input", "format": "csv"},
+        create_new_version=True,
+    ).as_named_input(env.scoring_dataset_name)
+
+    return scoringinputds
+
+
+def get_fallback_input_dataset(ws: Workspace, env: Env) -> Dataset:
+    """
+    Called when an input datastore does not exist or no input data file
+    exists at that location. Creates a sample dataset using the diabetes
+    dataset from scikit-learn. Useful when debugging this code in the
+    absence of the input data location Azure blob.
+
+
+    :param ws: AML Workspace
+    :param env: Environment Variables
+
+    :returns: Fallback input dataset
+
+    :raises: FileNotFoundError
+    """
+    # This call creates an example CSV from sklearn sample data. If you
+    # have already bootstrapped your project, you can comment this line
+    # out and use your own CSV.
+    create_sample_data_csv(
+        file_name=env.scoring_datastore_input_filename, for_scoring=True
+    )
+
+    if not os.path.exists(env.scoring_datastore_input_filename):
+        error_message = (
+            "Could not find CSV dataset for scoring at {}. "
+            + "No alternate data store location was provided either."
+        ).format(env.scoring_datastore_input_filename)  # NOQA: E501
+
+        raise FileNotFoundError(error_message)
+
+    # upload the input data to the workspace default datastore
+    default_datastore = ws.get_default_datastore()
+    scoreinputdataref = default_datastore.upload_files(
+        [env.scoring_datastore_input_filename],
+        target_path="scoringinput",
+        overwrite=False,
+    )
+
+    scoringinputds = (
+        Dataset.Tabular.from_delimited_files(scoreinputdataref)
+        .register(ws, env.scoring_dataset_name, create_new_version=True)
+        .as_named_input(env.scoring_dataset_name)
+    )
+
+    return scoringinputds
+
+
+def get_output_location(
+    ws: Workspace, env: Env, outputdatastore: Datastore = None
+) -> PipelineData:
+    """
+    Returns a Datastore wrapped as a PipelineData instance suitable
+    for passing into a pipeline step. Represents the location where
+    the scoring output should be written. Uses the default workspace
+    blob store if no output datastore is supplied.
+
+
+    :param ws: AML Workspace
+    :param env: Environment Variables
+    :param outputdatastore: AML Datastore, optional, default is None
+
+    :returns: PipelineData wrapping the output datastore
+    """
+
+    if outputdatastore is None:
+        output_loc = PipelineData(
+            name="defaultoutput", datastore=ws.get_default_datastore()
+        )
+    else:
+        output_loc = PipelineData(
+            name=outputdatastore.name, datastore=outputdatastore
+        )  # NOQA: E501
+
+    return output_loc
+
+
+def get_inputds_outputloc(
+    ws: Workspace, env: Env
+) -> Tuple[Dataset, PipelineData]:  # NOQA: E501
+    """
+    Prepares the input and output for the scoring step. The input is a
+    tabular dataset wrapped around the scoring data. The output is a
+    PipelineData instance representing the location the scores are
+    written to.
+
+    :param ws: AML Workspace
+    :param env: Environment Variables
+
+    :returns: Input dataset and output location
+    """
+
+    if env.scoring_datastore_storage_name is None:
+        # fall back to default
+        scoringinputds = get_fallback_input_dataset(ws, env)
+        output_loc = get_output_location(ws, env)
+    else:
+        inputdatastore = get_or_create_datastore(
+            "{}_in".format(env.scoring_datastore_storage_name), ws, env
+        )
+        outputdatastore = get_or_create_datastore(
+            "{}_out".format(env.scoring_datastore_storage_name),
+            ws,
+            env,
+            input=False,  # NOQA: E501
+        )
+        scoringinputds = get_input_dataset(ws, inputdatastore, env)
+        output_loc = get_output_location(ws, env, outputdatastore)
+
+    return (scoringinputds, output_loc)
+
+
+def get_run_configs(
+    ws: Workspace, computetarget: ComputeTarget, env: Env
+) -> Tuple[ParallelRunConfig, RunConfiguration]:
+    """
+    Creates the necessary run configurations required by the
+    pipeline to enable parallelized scoring.
+ + :param ws: AML Workspace + :param computetarget: AML Compute target + :param env: Environment Variables + + :returns: Tuple[Scoring Run configuration, Score copy run configuration] + """ + + # get a conda environment for scoring + environment = get_environment( + ws, + env.aml_env_name_scoring, + conda_dependencies_file=env.aml_env_score_conda_dep_file, + enable_docker=True, + use_gpu=env.use_gpu_for_scoring, + create_new=env.rebuild_env_scoring, + ) + + score_run_config = ParallelRunConfig( + entry_script=env.batchscore_script_path, + source_directory=env.sources_directory_train, + error_threshold=10, + output_action="append_row", + compute_target=computetarget, + node_count=env.max_nodes_scoring, + environment=environment, + run_invocation_timeout=300, + ) + + copy_run_config = RunConfiguration() + copy_run_config.environment = get_environment( + ws, + env.aml_env_name_score_copy, + conda_dependencies_file=env.aml_env_scorecopy_conda_dep_file, + enable_docker=True, + use_gpu=env.use_gpu_for_scoring, + create_new=env.rebuild_env_scoring, + ) + return (score_run_config, copy_run_config) + + +def get_scoring_pipeline( + model: Model, + scoring_dataset: Dataset, + output_loc: PipelineData, + score_run_config: ParallelRunConfig, + copy_run_config: RunConfiguration, + computetarget: ComputeTarget, + ws: Workspace, + env: Env, +) -> Pipeline: + """ + Creates the scoring pipeline. + + :param model: The model to use for scoring + :param scoring_dataset: Data to score + :param output_loc: Location to save the scoring results + :param score_run_config: Parallel Run configuration to support + parallelized scoring + :param copy_run_config: Script Run configuration to support + score copying + :param computetarget: AML Compute target + :param ws: AML Workspace + :param env: Environment Variables + + :returns: Scoring pipeline instance + """ + # To help filter the model make the model name, model version and a + # tag/value pair bindable parameters so that they can be passed to + # the pipeline when invoked either over REST or via the AML SDK. 
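+    # For example, a caller using the SDK (as run_parallel_batchscore_pipeline.py
+    # does) might submit the published pipeline with:
+    #   experiment.submit(
+    #       scoringpipeline,
+    #       pipeline_parameters={"model_name": "diabetes_regression_model.pkl",
+    #                            "model_tag_name": " ",
+    #                            "model_tag_value": " "},
+    #   )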
+ model_name_param = PipelineParameter( + "model_name", default_value=env.model_name + ) # NOQA: E501 + model_tag_name_param = PipelineParameter( + "model_tag_name", default_value=" " + ) # NOQA: E501 + model_tag_value_param = PipelineParameter( + "model_tag_value", default_value=" " + ) # NOQA: E501 + + scoring_step = ParallelRunStep( + name="scoringstep", + inputs=[scoring_dataset], + output=output_loc, + arguments=[ + "--model_name", + model_name_param, + "--model_tag_name", + model_tag_name_param, + "--model_tag_value", + model_tag_value_param, + ], + parallel_run_config=score_run_config, + allow_reuse=False, + ) + + copying_step = PythonScriptStep( + name="scorecopystep", + script_name=env.batchscore_copy_script_path, + source_directory=env.sources_directory_train, + arguments=[ + "--output_path", + output_loc, + "--scoring_output_filename", + env.scoring_datastore_output_filename + if env.scoring_datastore_output_filename is not None + else "", + "--scoring_datastore", + env.scoring_datastore_storage_name + if env.scoring_datastore_storage_name is not None + else "", + "--score_container", + env.scoring_datastore_output_container + if env.scoring_datastore_output_container is not None + else "", + "--scoring_datastore_key", + env.scoring_datastore_access_key + if env.scoring_datastore_access_key is not None + else "", + ], + inputs=[output_loc], + allow_reuse=False, + compute_target=computetarget, + runconfig=copy_run_config, + ) + return Pipeline(workspace=ws, steps=[scoring_step, copying_step]) + + +def build_batchscore_pipeline(): + """ + Main method that builds and publishes a scoring pipeline. + """ + + try: + env = Env() + + args = parse_args() + + # Get Azure machine learning workspace + aml_workspace = Workspace.get( + name=env.workspace_name, + subscription_id=env.subscription_id, + resource_group=env.resource_group, + ) + + # Get Azure machine learning cluster + aml_compute_score = get_compute( + aml_workspace, + env.compute_name_scoring, + env.vm_size_scoring, + for_batch_scoring=True, + ) + + input_dataset, output_location = get_inputds_outputloc( + aml_workspace, env + ) # NOQA: E501 + + scoring_runconfig, score_copy_runconfig = get_run_configs( + aml_workspace, aml_compute_score, env + ) + + trained_model = get_model( + aml_workspace, env, args.model_tag_name, args.model_tag_value + ) + + scoring_pipeline = get_scoring_pipeline( + trained_model, + input_dataset, + output_location, + scoring_runconfig, + score_copy_runconfig, + aml_compute_score, + aml_workspace, + env, + ) + + published_pipeline = scoring_pipeline.publish( + name=env.scoring_pipeline_name, + description="Diabetes Batch Scoring Pipeline", + ) + pipeline_id_string = "##vso[task.setvariable variable=pipeline_id;isOutput=true]{}".format( # NOQA: E501 + published_pipeline.id + ) + print(pipeline_id_string) + except Exception as e: + print(e) + exit(1) + + +if __name__ == "__main__": + build_batchscore_pipeline() diff --git a/ml_service/pipelines/diabetes_regression_build_train_pipeline.py b/ml_service/pipelines/diabetes_regression_build_train_pipeline.py index dfe3f5b3..03937186 100644 --- a/ml_service/pipelines/diabetes_regression_build_train_pipeline.py +++ b/ml_service/pipelines/diabetes_regression_build_train_pipeline.py @@ -16,81 +16,90 @@ def main(): aml_workspace = Workspace.get( name=e.workspace_name, subscription_id=e.subscription_id, - resource_group=e.resource_group + resource_group=e.resource_group, ) print("get_workspace:") print(aml_workspace) # Get Azure machine learning cluster - aml_compute 
= get_compute( - aml_workspace, - e.compute_name, - e.vm_size) + aml_compute = get_compute(aml_workspace, e.compute_name, e.vm_size) if aml_compute is not None: print("aml_compute:") print(aml_compute) # Create a reusable Azure ML environment environment = get_environment( - aml_workspace, e.aml_env_name, create_new=e.rebuild_env) # + aml_workspace, + e.aml_env_name, + conda_dependencies_file=e.aml_env_train_conda_dep_file, + create_new=e.rebuild_env, + ) # run_config = RunConfiguration() run_config.environment = environment - if (e.datastore_name): + if e.datastore_name: datastore_name = e.datastore_name else: datastore_name = aml_workspace.get_default_datastore().name - run_config.environment.environment_variables["DATASTORE_NAME"] = datastore_name # NOQA: E501 + run_config.environment.environment_variables[ + "DATASTORE_NAME" + ] = datastore_name # NOQA: E501 - model_name_param = PipelineParameter( - name="model_name", default_value=e.model_name) + model_name_param = PipelineParameter(name="model_name", default_value=e.model_name) # NOQA: E501 dataset_version_param = PipelineParameter( - name="dataset_version", default_value=e.dataset_version) + name="dataset_version", default_value=e.dataset_version + ) data_file_path_param = PipelineParameter( - name="data_file_path", default_value="none") - caller_run_id_param = PipelineParameter( - name="caller_run_id", default_value="none") + name="data_file_path", default_value="none" + ) + caller_run_id_param = PipelineParameter(name="caller_run_id", default_value="none") # NOQA: E501 # Get dataset name dataset_name = e.dataset_name # Check to see if dataset exists - if (dataset_name not in aml_workspace.datasets): + if dataset_name not in aml_workspace.datasets: # This call creates an example CSV from sklearn sample data. If you # have already bootstrapped your project, you can comment this line # out and use your own CSV. create_sample_data_csv() # Use a CSV to read in the data set. - file_name = 'diabetes.csv' + file_name = "diabetes.csv" - if (not os.path.exists(file_name)): - raise Exception("Could not find CSV dataset at \"%s\". If you have bootstrapped your project, you will need to provide a CSV." % file_name) # NOQA: E501 + if not os.path.exists(file_name): + raise Exception( + 'Could not find CSV dataset at "%s". If you have bootstrapped your project, you will need to provide a CSV.' 
# NOQA: E501 + % file_name + ) # NOQA: E501 # Upload file to default datastore in workspace datatstore = Datastore.get(aml_workspace, datastore_name) - target_path = 'training-data/' + target_path = "training-data/" datatstore.upload_files( files=[file_name], target_path=target_path, overwrite=True, - show_progress=False) + show_progress=False, + ) # Register dataset path_on_datastore = os.path.join(target_path, file_name) dataset = Dataset.Tabular.from_delimited_files( - path=(datatstore, path_on_datastore)) + path=(datatstore, path_on_datastore) + ) dataset = dataset.register( workspace=aml_workspace, name=dataset_name, - description='diabetes training data', - tags={'format': 'CSV'}, - create_new_version=True) + description="diabetes training data", + tags={"format": "CSV"}, + create_new_version=True, + ) # Create a PipelineData to pass data between steps pipeline_data = PipelineData( - 'pipeline_data', - datastore=aml_workspace.get_default_datastore()) + "pipeline_data", datastore=aml_workspace.get_default_datastore() + ) train_step = PythonScriptStep( name="Train Model", @@ -99,12 +108,18 @@ def main(): source_directory=e.sources_directory_train, outputs=[pipeline_data], arguments=[ - "--model_name", model_name_param, - "--step_output", pipeline_data, - "--dataset_version", dataset_version_param, - "--data_file_path", data_file_path_param, - "--caller_run_id", caller_run_id_param, - "--dataset_name", dataset_name, + "--model_name", + model_name_param, + "--step_output", + pipeline_data, + "--dataset_version", + dataset_version_param, + "--data_file_path", + data_file_path_param, + "--caller_run_id", + caller_run_id_param, + "--dataset_name", + dataset_name, ], runconfig=run_config, allow_reuse=True, @@ -117,8 +132,10 @@ def main(): compute_target=aml_compute, source_directory=e.sources_directory_train, arguments=[ - "--model_name", model_name_param, - "--allow_run_cancel", e.allow_run_cancel, + "--model_name", + model_name_param, + "--allow_run_cancel", + e.allow_run_cancel, ], runconfig=run_config, allow_reuse=False, @@ -131,16 +148,13 @@ def main(): compute_target=aml_compute, source_directory=e.sources_directory_train, inputs=[pipeline_data], - arguments=[ - "--model_name", model_name_param, - "--step_input", pipeline_data, - ], + arguments=["--model_name", model_name_param, "--step_input", pipeline_data, ], # NOQA: E501 runconfig=run_config, allow_reuse=False, ) print("Step Register created") # Check run_evaluation flag to include or exclude evaluation step. 
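+    # When evaluation is included the steps run train -> evaluate ->
+    # register; otherwise the register step runs directly after train.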
- if ((e.run_evaluation).lower() == 'true'): + if (e.run_evaluation).lower() == "true": print("Include evaluation step before register step.") evaluate_step.run_after(train_step) register_step.run_after(evaluate_step) @@ -156,11 +170,11 @@ def main(): published_pipeline = train_pipeline.publish( name=e.pipeline_name, description="Model training/retraining pipeline", - version=e.build_id + version=e.build_id, ) - print(f'Published pipeline: {published_pipeline.name}') - print(f'for build {published_pipeline.version}') + print(f"Published pipeline: {published_pipeline.name}") + print(f"for build {published_pipeline.version}") -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/ml_service/pipelines/diabetes_regression_build_train_pipeline_with_r.py b/ml_service/pipelines/diabetes_regression_build_train_pipeline_with_r.py index b49ae53d..254f22eb 100644 --- a/ml_service/pipelines/diabetes_regression_build_train_pipeline_with_r.py +++ b/ml_service/pipelines/diabetes_regression_build_train_pipeline_with_r.py @@ -13,16 +13,13 @@ def main(): aml_workspace = Workspace.get( name=e.workspace_name, subscription_id=e.subscription_id, - resource_group=e.resource_group + resource_group=e.resource_group, ) print("get_workspace:") print(aml_workspace) # Get Azure machine learning cluster - aml_compute = get_compute( - aml_workspace, - e.compute_name, - e.vm_size) + aml_compute = get_compute(aml_workspace, e.compute_name, e.vm_size) if aml_compute is not None: print("aml_compute:") print(aml_compute) @@ -31,7 +28,11 @@ def main(): # Make sure to include `r-essentials' # in diabetes_regression/conda_dependencies.yml environment = get_environment( - aml_workspace, e.aml_env_name, create_new=e.rebuild_env) # NOQA: E501 + aml_workspace, + e.aml_env_name, + conda_dependencies_file=e.aml_env_train_conda_dep_file, + create_new=e.rebuild_env, + ) # NOQA: E501 run_config = RunConfiguration() run_config.environment = environment @@ -52,11 +53,11 @@ def main(): published_pipeline = train_pipeline.publish( name=e.pipeline_name, description="Model training/retraining pipeline", - version=e.build_id + version=e.build_id, ) - print(f'Published pipeline: {published_pipeline.name}') - print(f'for build {published_pipeline.version}') + print(f"Published pipeline: {published_pipeline.name}") + print(f"for build {published_pipeline.version}") -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/ml_service/pipelines/load_sample_data.py b/ml_service/pipelines/load_sample_data.py index 717fc7ab..304a8e7b 100644 --- a/ml_service/pipelines/load_sample_data.py +++ b/ml_service/pipelines/load_sample_data.py @@ -5,12 +5,14 @@ # Loads the diabetes sample data from sklearn and produces a csv file that can # be used by the build/train pipeline script. -def create_sample_data_csv(): +def create_sample_data_csv(file_name: str = "diabetes.csv", + for_scoring: bool = False): sample_data = load_diabetes() df = pd.DataFrame( data=sample_data.data, columns=sample_data.feature_names) - df['Y'] = sample_data.target + if not for_scoring: + df['Y'] = sample_data.target # Hard code to diabetes so we fail fast if the project has been # bootstrapped. 
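+    # When for_scoring is True the target column Y is omitted so that the
+    # generated CSV matches the shape of real scoring input.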
- df.to_csv('diabetes.csv', index=False) + df.to_csv(file_name, index=False) diff --git a/ml_service/pipelines/run_parallel_batchscore_pipeline.py b/ml_service/pipelines/run_parallel_batchscore_pipeline.py new file mode 100644 index 00000000..ec6cebae --- /dev/null +++ b/ml_service/pipelines/run_parallel_batchscore_pipeline.py @@ -0,0 +1,133 @@ +""" +Copyright (C) Microsoft Corporation. All rights reserved.​ + ​ +Microsoft Corporation (“Microsoft”) grants you a nonexclusive, perpetual, +royalty-free right to use, copy, and modify the software code provided by us +("Software Code"). You may not sublicense the Software Code or any use of it +(except to your affiliates and to vendors to perform work on your behalf) +through distribution, network access, service agreement, lease, rental, or +otherwise. This license does not purport to express any claim of ownership over +data you may have shared with Microsoft in the creation of the Software Code. +Unless applicable law gives you more rights, Microsoft reserves all other +rights not expressly granted herein, whether by implication, estoppel or +otherwise. ​ + ​ +THE SOFTWARE CODE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS +OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +MICROSOFT OR ITS LICENSORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR +BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER +IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THE SOFTWARE CODE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. 
+""" + +from azure.storage.blob import ContainerClient +from ml_service.util.env_variables import Env +from azureml.core import Experiment, Workspace +from azureml.pipeline.core import PublishedPipeline +import argparse + + +def parse_args(): + parser = argparse.ArgumentParser() + parser.add_argument("--pipeline_id", type=str, default=None) + return parser.parse_args() + + +def get_pipeline(pipeline_id, ws: Workspace, env: Env): + if pipeline_id is not None: + scoringpipeline = PublishedPipeline.get(ws, pipeline_id) + else: + pipelines = PublishedPipeline.list(ws) + scoringpipelinelist = [ + pl for pl in pipelines if pl.name == env.scoring_pipeline_name + ] # noqa E501 + + if scoringpipelinelist.count == 0: + raise Exception( + "No pipeline found matching name:{}".format(env.scoring_pipeline_name) # NOQA: E501 + ) + else: + # latest published + scoringpipeline = scoringpipelinelist[0] + + return scoringpipeline + + +def copy_output(step_id: str, env: Env): + accounturl = "https://{}.blob.core.windows.net".format( + env.scoring_datastore_storage_name + ) + + srcblobname = "azureml/{}/{}_out/parallel_run_step.txt".format( + step_id, env.scoring_datastore_storage_name + ) + + srcbloburl = "{}/{}/{}".format( + accounturl, env.scoring_datastore_output_container, srcblobname + ) + + containerclient = ContainerClient( + accounturl, + env.scoring_datastore_output_container, + env.scoring_datastore_access_key, + ) + srcblobproperties = containerclient.get_blob_client( + srcblobname + ).get_blob_properties() # noqa E501 + + destfolder = srcblobproperties.last_modified.date().isoformat() + filetime = ( + srcblobproperties.last_modified.time() + .isoformat("milliseconds") + .replace(":", "_") + .replace(".", "_") + ) # noqa E501 + destfilenameparts = env.scoring_datastore_output_filename.split(".") + destblobname = "{}/{}_{}.{}".format( + destfolder, destfilenameparts[0], filetime, destfilenameparts[1] + ) + + destblobclient = containerclient.get_blob_client(destblobname) + destblobclient.start_copy_from_url(srcbloburl) + + +def run_batchscore_pipeline(): + try: + env = Env() + + args = parse_args() + + aml_workspace = Workspace.get( + name=env.workspace_name, + subscription_id=env.subscription_id, + resource_group=env.resource_group, + ) + + scoringpipeline = get_pipeline(args.pipeline_id, aml_workspace, env) + + experiment = Experiment(workspace=aml_workspace, name=env.experiment_name) # NOQA: E501 + + run = experiment.submit( + scoringpipeline, + pipeline_parameters={ + "model_name": env.model_name, + "model_tag_name": " ", + "model_tag_value": " ", + }, + ) + + run.wait_for_completion(show_output=True) + + if run.get_status() == "Finished": + copy_output(list(run.get_steps())[0].id, env) + + except Exception as ex: + print("Error: {}".format(ex)) + + +if __name__ == "__main__": + run_batchscore_pipeline() diff --git a/ml_service/util/attach_compute.py b/ml_service/util/attach_compute.py index bcff58da..ad9668db 100644 --- a/ml_service/util/attach_compute.py +++ b/ml_service/util/attach_compute.py @@ -1,3 +1,4 @@ + from azureml.core import Workspace from azureml.core.compute import AmlCompute from azureml.core.compute import ComputeTarget @@ -5,38 +6,33 @@ from ml_service.util.env_variables import Env -def get_compute( - workspace: Workspace, - compute_name: str, - vm_size: str -): +def get_compute(workspace: Workspace, compute_name: str, vm_size: str, for_batch_scoring: bool = False): # NOQA E501 try: if compute_name in workspace.compute_targets: compute_target = 
diff --git a/ml_service/util/attach_compute.py b/ml_service/util/attach_compute.py
index bcff58da..ad9668db 100644
--- a/ml_service/util/attach_compute.py
+++ b/ml_service/util/attach_compute.py
@@ -1,3 +1,4 @@
+
 from azureml.core import Workspace
 from azureml.core.compute import AmlCompute
 from azureml.core.compute import ComputeTarget
@@ -5,38 +6,33 @@
 from ml_service.util.env_variables import Env
 
 
-def get_compute(
-    workspace: Workspace,
-    compute_name: str,
-    vm_size: str
-):
+def get_compute(workspace: Workspace, compute_name: str, vm_size: str, for_batch_scoring: bool = False):  # NOQA E501
     try:
         if compute_name in workspace.compute_targets:
             compute_target = workspace.compute_targets[compute_name]
             if compute_target and type(compute_target) is AmlCompute:
-                print('Found existing compute target ' + compute_name
-                      + ' so using it.')
+                print("Found existing compute target " + compute_name + " so using it.")  # NOQA
         else:
             e = Env()
             compute_config = AmlCompute.provisioning_configuration(
                 vm_size=vm_size,
-                vm_priority=e.vm_priority,
-                min_nodes=e.min_nodes,
-                max_nodes=e.max_nodes,
+                vm_priority=e.vm_priority if not for_batch_scoring else e.vm_priority_scoring,  # NOQA E501
+                min_nodes=e.min_nodes if not for_batch_scoring else e.min_nodes_scoring,  # NOQA E501
+                max_nodes=e.max_nodes if not for_batch_scoring else e.max_nodes_scoring,  # NOQA E501
                 idle_seconds_before_scaledown="300"
                 # #Uncomment the below lines for VNet support
                 # vnet_resourcegroup_name=vnet_resourcegroup_name,
                 # vnet_name=vnet_name,
                 # subnet_name=subnet_name
             )
-            compute_target = ComputeTarget.create(workspace, compute_name,
-                                                  compute_config)
+            compute_target = ComputeTarget.create(
+                workspace, compute_name, compute_config
+            )
             compute_target.wait_for_completion(
-                show_output=True,
-                min_node_count=None,
-                timeout_in_minutes=10)
+                show_output=True, min_node_count=None, timeout_in_minutes=10
+            )
         return compute_target
-    except ComputeTargetException as e:
-        print(e)
-        print('An error occurred trying to provision compute.')
+    except ComputeTargetException as ex:
+        print(ex)
+        print("An error occurred trying to provision compute.")
         exit(1)
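A minimal sketch of calling the updated helper for the scoring cluster (the
workspace lookup mirrors run_parallel_batchscore_pipeline.py; the *_SCORING
values come from the new environment variables below):

    from azureml.core import Workspace
    from ml_service.util.attach_compute import get_compute
    from ml_service.util.env_variables import Env

    e = Env()
    ws = Workspace.get(
        name=e.workspace_name,
        subscription_id=e.subscription_id,
        resource_group=e.resource_group,
    )
    # for_batch_scoring=True makes provisioning read the *_SCORING
    # priority/min/max node settings instead of the training ones.
    compute = get_compute(
        ws, e.compute_name_scoring, e.vm_size_scoring, for_batch_scoring=True
    )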
os.environ.get("BUILD_URI") @@ -45,8 +50,77 @@ class Env: datastore_name: Optional[str] = os.environ.get("DATASTORE_NAME") dataset_version: Optional[str] = os.environ.get("DATASET_VERSION") run_evaluation: Optional[str] = os.environ.get("RUN_EVALUATION", "true") - allow_run_cancel: Optional[str] = os.environ.get("ALLOW_RUN_CANCEL", - "true") + allow_run_cancel: Optional[str] = os.environ.get( + "ALLOW_RUN_CANCEL", "true" + ) # NOQA: E501 aml_env_name: Optional[str] = os.environ.get("AML_ENV_NAME") + aml_env_train_conda_dep_file: Optional[str] = os.environ.get( + "AML_ENV_TRAIN_CONDA_DEP_FILE", "conda_dependencies.yml" + ) rebuild_env: Optional[bool] = os.environ.get( - "AML_REBUILD_ENVIRONMENT", "false").lower().strip() == "true" + "AML_REBUILD_ENVIRONMENT", "false" + ).lower().strip() == "true" + + use_gpu_for_scoring: Optional[bool] = os.environ.get( + "USE_GPU_FOR_SCORING", "false" + ).lower().strip() == "true" + aml_env_score_conda_dep_file: Optional[str] = os.environ.get( + "AML_ENV_SCORE_CONDA_DEP_FILE", "conda_dependencies_scoring.yml" + ) + aml_env_scorecopy_conda_dep_file: Optional[str] = os.environ.get( + "AML_ENV_SCORECOPY_CONDA_DEP_FILE", "conda_dependencies_scorecopy.yml" + ) + vm_size_scoring: Optional[str] = os.environ.get( + "AML_COMPUTE_CLUSTER_CPU_SKU_SCORING" + ) + compute_name_scoring: Optional[str] = os.environ.get( + "AML_COMPUTE_CLUSTER_NAME_SCORING" + ) + vm_priority_scoring: Optional[str] = os.environ.get( + "AML_CLUSTER_PRIORITY_SCORING", "lowpriority" + ) + min_nodes_scoring: int = int( + os.environ.get("AML_CLUSTER_MIN_NODES_SCORING", 0) + ) # NOQA: E501 + max_nodes_scoring: int = int( + os.environ.get("AML_CLUSTER_MAX_NODES_SCORING", 4) + ) # NOQA: E501 + rebuild_env_scoring: Optional[bool] = os.environ.get( + "AML_REBUILD_ENVIRONMENT_SCORING", "false" + ).lower().strip() == "true" + scoring_datastore_storage_name: Optional[str] = os.environ.get( + "SCORING_DATASTORE_STORAGE_NAME" + ) + scoring_datastore_access_key: Optional[str] = os.environ.get( + "SCORING_DATASTORE_ACCESS_KEY" + ) + scoring_datastore_input_container: Optional[str] = os.environ.get( + "SCORING_DATASTORE_INPUT_CONTAINER" + ) + scoring_datastore_input_filename: Optional[str] = os.environ.get( + "SCORING_DATASTORE_INPUT_FILENAME" + ) + scoring_datastore_output_container: Optional[str] = os.environ.get( + "SCORING_DATASTORE_OUTPUT_CONTAINER" + ) + scoring_datastore_output_filename: Optional[str] = os.environ.get( + "SCORING_DATASTORE_OUTPUT_FILENAME" + ) + scoring_dataset_name: Optional[str] = os.environ.get( + "SCORING_DATASET_NAME" + ) # NOQA: E501 + scoring_pipeline_name: Optional[str] = os.environ.get( + "SCORING_PIPELINE_NAME" + ) # NOQA: E501 + aml_env_name_scoring: Optional[str] = os.environ.get( + "AML_ENV_NAME_SCORING" + ) # NOQA: E501 + aml_env_name_score_copy: Optional[str] = os.environ.get( + "AML_ENV_NAME_SCORE_COPY" + ) # NOQA: E501 + batchscore_script_path: Optional[str] = os.environ.get( + "BATCHSCORE_SCRIPT_PATH" + ) # NOQA: E501 + batchscore_copy_script_path: Optional[str] = os.environ.get( + "BATCHSCORE_COPY_SCRIPT_PATH" + ) # NOQA: E501 diff --git a/ml_service/util/manage_environment.py b/ml_service/util/manage_environment.py index 43749f3f..54c5a72f 100644 --- a/ml_service/util/manage_environment.py +++ b/ml_service/util/manage_environment.py @@ -1,12 +1,17 @@ + +import os from azureml.core import Workspace, Environment from ml_service.util.env_variables import Env -import os +from azureml.core.runconfig import DEFAULT_CPU_IMAGE, DEFAULT_GPU_IMAGE def get_environment( 
diff --git a/ml_service/util/manage_environment.py b/ml_service/util/manage_environment.py
index 43749f3f..54c5a72f 100644
--- a/ml_service/util/manage_environment.py
+++ b/ml_service/util/manage_environment.py
@@ -1,12 +1,17 @@
+
+import os
 from azureml.core import Workspace, Environment
 from ml_service.util.env_variables import Env
-import os
+from azureml.core.runconfig import DEFAULT_CPU_IMAGE, DEFAULT_GPU_IMAGE
 
 
 def get_environment(
     workspace: Workspace,
     environment_name: str,
-    create_new: bool = False
+    conda_dependencies_file: str,
+    create_new: bool = False,
+    enable_docker: bool = None,
+    use_gpu: bool = False
 ):
     try:
         e = Env()
@@ -17,8 +22,14 @@
         restored_environment = environments[environment_name]
 
         if restored_environment is None or create_new:
-            new_env = Environment.from_conda_specification(environment_name, os.path.join(e.sources_directory_train, "conda_dependencies.yml"))  # NOQA: E501
+            new_env = Environment.from_conda_specification(
+                environment_name,
+                os.path.join(e.sources_directory_train, conda_dependencies_file),  # NOQA: E501
+            )  # NOQA: E501
             restored_environment = new_env
+            if enable_docker is not None:
+                restored_environment.docker.enabled = enable_docker
+                restored_environment.docker.base_image = DEFAULT_GPU_IMAGE if use_gpu else DEFAULT_CPU_IMAGE  # NOQA: E501
             restored_environment.register(workspace)
 
         if restored_environment is not None: