From 463c925b34d2e870bd9e9c0d2327b53152e3eaaf Mon Sep 17 00:00:00 2001 From: Yuhang Liu Date: Mon, 18 Dec 2023 17:11:17 +0800 Subject: [PATCH 01/12] use pfazure submit flow run --- .github/workflows/promptflow-ci.yml | 8 +- scripts/promptflow-ci/promptflow_ci.py | 327 +++--------------- scripts/promptflow-ci/utils/authentication.py | 22 -- scripts/promptflow-ci/utils/flow_utils.py | 258 ++++---------- scripts/promptflow-ci/utils/mt_client.py | 131 ------- scripts/promptflow-ci/utils/retry_helper.py | 47 --- scripts/promptflow-ci/utils/utils.py | 2 +- 7 files changed, 111 insertions(+), 684 deletions(-) delete mode 100644 scripts/promptflow-ci/utils/authentication.py delete mode 100644 scripts/promptflow-ci/utils/mt_client.py delete mode 100644 scripts/promptflow-ci/utils/retry_helper.py diff --git a/.github/workflows/promptflow-ci.yml b/.github/workflows/promptflow-ci.yml index 734422f5b3..5eb139b870 100644 --- a/.github/workflows/promptflow-ci.yml +++ b/.github/workflows/promptflow-ci.yml @@ -6,9 +6,9 @@ on: branches: - main paths: - - 'assets/promptflow/models/**' - - '.github/workflows/promptflow_ci.yml' - - 'scripts/promptflow_ci/**' + - assets/promptflow/models/** + - .github/workflows/promptflow-ci.yml + - scripts/promptflow-ci/** env: PROMPTFLOW_DIR: "assets/promptflow/models" @@ -72,7 +72,7 @@ jobs: - name: Validate prompt flows run: | - python scripts/promptflow-ci/promptflow_ci.py --local + python scripts/promptflow-ci/promptflow_ci.py - name: Run cspell for typo check working-directory: ${{ env.PROMPTFLOW_DIR }} diff --git a/scripts/promptflow-ci/promptflow_ci.py b/scripts/promptflow-ci/promptflow_ci.py index 684ef77e84..2581a026f5 100644 --- a/scripts/promptflow-ci/promptflow_ci.py +++ b/scripts/promptflow-ci/promptflow_ci.py @@ -7,29 +7,22 @@ import os from pathlib import Path import yaml -import tempfile import json -import shutil -from markdown import markdown -from bs4 import BeautifulSoup import time import copy from azureml.core import Workspace 
from utils.utils import get_diff_files -from utils.logging_utils import log_debug, log_error, log_warning, debug_output +from utils.logging_utils import log_debug, log_error, log_warning from azure.storage.blob import BlobServiceClient -from utils.mt_client import get_mt_client -from promptflow.azure import PFClient -from azure.identity import AzureCliCredential, DefaultAzureCredential +from azure.identity import AzureCliCredential from utils import flow_utils -from promptflow.azure._load_functions import load_flow -from promptflow._sdk._utils import is_remote_uri TEST_FOLDER = "test" MODEL_FILE = "model.yaml" MODELS_ROOT = "assets/promptflow/models/" +RUN_YAML = 'run.yml' def validate_downlaod(model_dir): @@ -85,6 +78,9 @@ def get_changed_models(diff_files): changed_models.append(os.path.join( MODELS_ROOT, git_diff_file_path.split("/")[-2])) + changed_models = ('assets/promptflow/models/template-chat-flow', + 'assets/promptflow/models/template-eval-flow', 'assets/promptflow/models/template-standard-flow') + log_debug( f"Find {len(deleted_models_path)} deleted models: {deleted_models_path}.") log_debug(f"Find {len(changed_models)} changed models: {changed_models}.") @@ -113,175 +109,30 @@ def _dump_workspace_config(subscription_id, resource_group, workspace_name): config_file.write(json.dumps(workspace_config, indent=4)) -def create_flows(flows_dirs): - """Create flows from flows dir.""" - # key: flow dir name, value: (flow graph, flow create result, flow type) - flows_creation_info = {} - flow_validation_errors = [] - for flow_dir in flows_dirs: - log_debug(f"\nChecking flow dir: {flow_dir}") - with open(Path(flow_dir) / "flow.dag.yaml", "r") as dag_file: - flow_dag = yaml.safe_load(dag_file) - with open(Path(flow_dir) / "flow.meta.yaml", "r") as meta_file: - flow_meta = yaml.safe_load(meta_file) - flow_type = flow_meta["type"] - - flow_utils._validate_meta(flow_meta, flow_dir) - - # check if the flow.dag.yaml exits - if not os.path.exists(Path(flow_dir) / 
"flow.dag.yaml"): - log_warning( - f"flow.dag.yaml not found in {flow_dir}. Skip this flow.") - continue - - section_type = "gallery" - if "properties" in flow_meta.keys() and "promptflow.section" in flow_meta["properties"].keys(): - section_type = flow_meta["properties"]["promptflow.section"] - # check if the README.md exits - # skip checking README exists due to template flows don't have README.md. - if section_type != "template": - if not os.path.exists(Path(flow_dir) / "README.md"): - flow_validation_errors.append( - f"README.md not found in {flow_dir}. Please add README.md to the flow.") - continue - else: - # Check Links in Markdown Files of Flows, - # make sure it opens a new browser tab instead of refreshing the current page. - def extract_links_from_file(file_path): - with open(file_path, "r") as file: - content = file.read() - html = markdown(content) - soup = BeautifulSoup(html, "html.parser") - return soup - - def check_links(soup): - valid_links = True - links = soup.find_all("a") - for link in links: - if link.get("target") != "_blank": - log_debug(f'Invalid link syntax: {link}') - valid_links = False - return valid_links - - readme_file = os.path.join(flow_dir, "README.md") - log_debug(f"Checking links in {readme_file}") - soup = extract_links_from_file(readme_file) - valid_links = check_links(soup) - if not valid_links: - flow_validation_errors.append( - f"Some links in {flow_dir}'s README file do not follow the required syntax. " - "To ensure that links in the flow's README file open in a new browser tab " - "instead of refreshing the current page when users view the sample introduction, " - "please use the following syntax: link text." - ) - continue - # Call MT to create flow - log_debug( - f"Starting to create/update flow. 
Flow dir: {Path(flow_dir).name}.") - flow = load_flow(source=flow_dir) - properties = flow_meta.get("properties", None) - if properties and "promptflow.batch_inputs" in properties: - input_path = properties["promptflow.batch_inputs"] - samples_file = Path(flow_dir) / input_path - if samples_file.exists(): - with open(samples_file, "r", encoding="utf-8") as fp: - properties["update_promptflow.batch_inputs"] = json.loads( - fp.read()) - - flow_operations._resolve_arm_id_or_upload_dependencies_to_file_share( - flow) - log_debug(f"FlowDefinitionFilePath: {flow.path}") - - create_flow_payload = flow_utils.construct_create_flow_payload_of_new_contract( - flow, flow_meta, properties) - debug_output(create_flow_payload, "create_flow_payload", - Path(flow_dir).name, args.local) - create_flow_result = mt_client.create_or_update_flow( - create_flow_payload) - experiment_id = create_flow_result["experimentId"] - flows_creation_info.update({Path(flow_dir).name: ( - flow_dag, create_flow_result, flow_type, section_type)}) - - if create_flow_result['flowId'] is None: - raise Exception( - f"Flow id is None when creating/updating mode {flow_dir}. 
Please make sure the flow is valid") - debug_output(create_flow_result, "create_flow_result", - Path(flow_dir).name, args.local) - flow_link = flow_utils.get_flow_link(create_flow_result, ux_endpoint, args.subscription_id, - args.resource_group, args.workspace_name, experiment_id, args.ux_flight) - log_debug(f"Flow link to Azure Machine Learning Portal: {flow_link}") - - if len(flow_validation_errors) > 0: - log_debug( - "Promptflow CI failed due to the following flow validation errors:", True) - for failure in flow_validation_errors: - log_error(failure) - exit(1) - return flows_creation_info - - -def _resolve_data_to_asset_id(test_data): - """Resolve data to asset id.""" - from azure.ai.ml._artifacts._artifact_utilities import _upload_and_generate_remote_uri - from azure.ai.ml.constants._common import AssetTypes - - def _get_data_type(_data): - if os.path.isdir(_data): - return AssetTypes.URI_FOLDER - else: - return AssetTypes.URI_FILE - - if is_remote_uri(test_data): - # Pass through ARM id or remote url - return test_data - - if os.path.exists(test_data): # absolute local path, upload, transform to remote url - data_type = _get_data_type(test_data) - test_data = _upload_and_generate_remote_uri( - run_operations._operation_scope, - run_operations._datastore_operations, - test_data, - datastore_name=run_operations._workspace_default_datastore, - show_progress=run_operations._show_progress, - ) - if data_type == AssetTypes.URI_FOLDER and test_data and not test_data.endswith("/"): - test_data = test_data + "/" - else: - raise ValueError( - f"Local path {test_data!r} not exist. " - "If it's remote data, only data with azureml prefix or remote url is supported." 
- ) - return test_data - - def check_flow_run_status( flow_runs_to_check, - submitted_run_identifiers, + submitted_flow_run_links, + submitted_flow_run_ids, check_run_status_interval, check_run_status_max_attempts ): """Check flow run status.""" - for flow_run_identifier in flow_runs_to_check: - flow_id, flow_run_id = flow_utils.resolve_flow_run_identifier( - flow_run_identifier) - flow_run_link = flow_utils.construct_flow_run_link(ux_endpoint, args.subscription_id, - args.resource_group, args.workspace_name, - experiment_id, flow_id, flow_run_id) - log_debug(f"Start checking flow run {flow_run_id} run, link to Azure Machine Learning Portal: " - f"{flow_run_link}") + for flow_run_id, flow_run_link in zip(flow_runs_to_check, submitted_flow_run_links): + log_debug( + f"Start checking flow run {flow_run_id} run, {flow_run_link}") current_attempt = 0 while current_attempt < check_run_status_max_attempts: bulk_test_run = run_workspace.get_run(run_id=flow_run_id) if bulk_test_run.status == "Completed": - submitted_run_identifiers.remove(flow_run_identifier) + submitted_flow_run_ids.remove(flow_run_id) break elif bulk_test_run.status == "Failed": - submitted_run_identifiers.remove(flow_run_identifier) - failed_flow_runs.update({flow_run_identifier: flow_run_link}) + submitted_flow_run_ids.remove(flow_run_id) + failed_flow_runs.update({flow_run_id: flow_run_link}) break elif bulk_test_run.status == "Canceled": - submitted_run_identifiers.remove(flow_run_identifier) - failed_flow_runs.update({flow_run_identifier: flow_run_link}) + submitted_flow_run_ids.remove(flow_run_id) + failed_flow_runs.update({flow_run_id: flow_run_link}) break current_attempt += 1 @@ -306,16 +157,11 @@ def check_flow_run_status( default="https://eastus.api.azureml.ms/flow") parser.add_argument('--flow_submit_mode', type=str, default="sync") parser.add_argument('--run_time', type=str, default="default-mir") - parser.add_argument('--skipped_flows', type=str, default="bring_your_own_data_qna") + 
parser.add_argument('--skipped_flows', type=str, + default="bring_your_own_data_qna,playground_ayod_rag,template_chat_flow") # Skip bring_your_own_data_qna test, the flow has a bug. # Bug 2773738: Add retry when ClientAuthenticationError # https://msdata.visualstudio.com/Vienna/_workitems/edit/2773738 - parser.add_argument( - "--local", - help="local debug mode, will use interactive login authentication, and output the request " - "response to local files", - action='store_true' - ) args = parser.parse_args() # Get changed models folder or all models folder @@ -332,9 +178,15 @@ def check_flow_run_status( log_error(f"No change in {MODELS_ROOT}, skip flow testing.") exit(0) + if args.skipped_flows != "": + skipped_flows = args.skipped_flows.split(",") + skipped_flows = [flow.replace("_", "-") for flow in skipped_flows] + log_debug(f"Skipped flows: {skipped_flows}") + flows_dirs = [flow_dir for flow_dir in changed_models if Path( + flow_dir).name not in skipped_flows] # Check download models errors = [] - for model_dir in changed_models: + for model_dir in flows_dirs: try: validate_downlaod(model_dir) except Exception as e: @@ -349,14 +201,6 @@ def check_flow_run_status( # Check run flows handled_failures = [] - # Filter out skipped flows - if args.skipped_flows != "": - skipped_flows = args.skipped_flows.split(",") - skipped_flows = [flow.replace("_", "-") for flow in skipped_flows] - log_debug(f"Skipped flows: {skipped_flows}") - flows_dirs = [flow_dir for flow_dir in changed_models if Path( - flow_dir).name not in skipped_flows] - flows_dirs = [Path(os.path.join(dir, TEST_FOLDER)) for dir in flows_dirs] @@ -365,104 +209,33 @@ def check_flow_run_status( log_debug("No flow code change, skip flow testing.") exit(0) - ux_endpoint = args.ux_endpoint - runtime_name = args.run_time - - mt_client = get_mt_client( - args.subscription_id, - args.resource_group, - args.workspace_name, - args.tenant_id, - args.client_id, - args.client_secret, - args.local, - 
args.mt_service_route - ) - _dump_workspace_config(args.subscription_id, args.resource_group, args.workspace_name) - credential = DefaultAzureCredential(additionally_allowed_tenants=[ - "*"], exclude_shared_token_cache_credential=True) - pf_client = PFClient.from_config(credential=credential) - flow_operations = pf_client._flows - run_operations = pf_client._runs - # region: Step1. create/update flows, and store flow creation info in flows_creation_info + # region: Step1. create/update flow yamls, run yamls. try: - # add node variant for llm node - tmp_folder_path = Path(tempfile.mkdtemp()) - log_debug(f"tmp folder path: {tmp_folder_path}") - flows_dirs = flow_utils._assign_flow_values( - flows_dirs, tmp_folder_path) - - flows_creation_info = create_flows(flows_dirs) + flows_dirs = flow_utils._assign_flow_values(flows_dirs) + flow_utils._create_run_yamls(flows_dirs) except Exception as e: log_error("Error when creating flow") raise e - finally: - shutil.rmtree(tmp_folder_path) # endregion - # region: Step2. submit bulk test runs and evaluation flow bulk test runs asynchronously based on the - # flows_creation_info - submitted_flow_run_identifiers = set() - submit_interval = 2 # seconds - for flow_dir_name, creation_info in flows_creation_info.items(): - time.sleep(submit_interval) - flow_dir = Path(os.path.join(MODELS_ROOT, flow_dir_name, TEST_FOLDER)) - flow_dag = creation_info[0] - flow_create_result = creation_info[1] - flow_type = creation_info[2] - section_type = creation_info[3] - flow_id = flow_create_result['flowId'] - flow_resource_id = flow_create_result["flowResourceId"] - flow_name = flow_create_result["flowName"] - # Call MT to submit flow - # Skip template flow - if (section_type == 'template'): - log_debug(f"Skipped template flow: {flow_dir}. 
Flow id: {flow_id}") - continue - sample_path = flow_dir / "samples.json" - log_debug(f"Sample input file path: {sample_path}") - if not sample_path.exists(): - raise Exception( - f"Sample input file path doesn't exist when submitting flow {flow_dir}") - batch_data_inputs = _resolve_data_to_asset_id(sample_path) - log_debug( - f"\nStarting to submit bulk test run. Flow dir: {flow_dir_name}. Flow id: {flow_id}") - submit_flow_payload = flow_utils.construct_submit_flow_payload_of_new_contract( - flow_id, batch_data_inputs, runtime_name, flow_dag, args.flow_submit_mode - ) - - experiment_id = flow_create_result["experimentId"] - try: - submit_flow_result, flow_run_id, _ = mt_client.submit_flow( - submit_flow_payload, experiment_id) - bulk_test_run_id = submit_flow_result["bulkTestId"] - flow_run_ids = flow_utils.get_flow_run_ids(submit_flow_result) - except Exception as e: - failure_message = f"Submit bulk test run failed. Flow dir: {flow_dir}. Flow id: {flow_id}. Error: {e}" - log_error(failure_message) - handled_failures.append(failure_message) - else: - debug_output(submit_flow_result, "submit_flow_result", - flow_dir.name, args.local) - log_debug( - f"All the flow run links for bulk test: {bulk_test_run_id}") - for run_id in flow_run_ids: - submitted_flow_run_identifiers.add( - flow_utils.create_flow_run_identifier(flow_id, run_id)) - flow_run_link = flow_utils.get_flow_run_link(submit_flow_result, ux_endpoint, - args.subscription_id, args.resource_group, - args.workspace_name, experiment_id, run_id) - log_debug( - f"Flow run link for run {run_id} to Azure Machine Learning Portal: {flow_run_link}") + # region: Step2. 
submit flow runs using pfazure + submitted_flow_run_ids = [] + submitted_flow_run_links = [] + results, handled_failures = flow_utils.submit_flow_runs_using_pfazure(flows_dirs, args.subscription_id, + args.resource_group, args.workspace_name) + for key, val in results.items(): + submitted_flow_run_ids.append(key) + submitted_flow_run_links.append(val) # endregion # region: Step3. check the submitted run status check_run_status_interval = 30 # seconds check_run_status_max_attempts = 30 # times - flow_runs_count = len(submitted_flow_run_identifiers) + flow_runs_count = len(submitted_flow_run_ids) + flow_runs_to_check = copy.deepcopy(submitted_flow_run_ids) failed_flow_runs = {} # run key : flow_run_link failed_evaluation_runs = {} # run key : evaluation_run_link if flow_runs_count == 0: @@ -470,17 +243,16 @@ def check_flow_run_status( "\nNo bulk test run or bulk test evaluation run need to check status") run_workspace = Workspace.get( - name=run_operations._operation_scope.workspace_name, - subscription_id=run_operations._operation_scope.subscription_id, - resource_group=run_operations._operation_scope.resource_group_name, + name=args.workspace_name, + subscription_id=args.subscription_id, + resource_group=args.resource_group, ) - flow_runs_to_check = copy.deepcopy(submitted_flow_run_identifiers) log_debug(f"\n{flow_runs_count} bulk test runs need to check status.") - check_flow_run_status(flow_runs_to_check, submitted_flow_run_identifiers, + check_flow_run_status(flow_runs_to_check, submitted_flow_run_links, submitted_flow_run_ids, check_run_status_interval, check_run_status_max_attempts) - if len(submitted_flow_run_identifiers) > 0: + if len(submitted_flow_run_ids) > 0: failure_message = f"Not all bulk test runs or bulk test evaluation runs are completed after " \ f"{check_run_status_max_attempts} attempts. " \ f"Please check the run status on Azure Machine Learning Portal." 
@@ -491,12 +263,7 @@ def check_flow_run_status( else: handled_failures.append(failure_message) - for flow_run_identifier in submitted_flow_run_identifiers: - flow_id, flow_run_id = flow_utils.resolve_flow_run_identifier( - flow_run_identifier) - flow_run_link = flow_utils.construct_flow_run_link(ux_endpoint, args.subscription_id, - args.resource_group, args.workspace_name, - experiment_id, flow_id, flow_run_id) + for flow_run_id, flow_run_link in submitted_flow_run_ids.items(): log_debug( f"Flow run link for run {flow_run_id} to Azure Machine Learning Portal: {flow_run_link}") @@ -505,13 +272,13 @@ def check_flow_run_status( "Please check the run error on Azure Machine Learning Portal." log_error(failure_message, True) handled_failures.append(failure_message) - for flow_run_key, flow_run_link in failed_flow_runs.items(): + for flow_run_id, flow_run_link in failed_flow_runs.items(): log_error( f"Bulk test run link to Azure Machine Learning Portal: {flow_run_link}") - for flow_run_key, evaluation_run_link in failed_evaluation_runs.items(): + for flow_run_id, evaluation_run_link in failed_evaluation_runs.items(): log_error( f"Bulk test evaluation run link to Azure Machine Learning Portal: {evaluation_run_link}") - elif len(submitted_flow_run_identifiers) == 0: + elif len(submitted_flow_run_ids) == 0: log_debug( f"\nRun status checking completed. {flow_runs_count} flow runs completed.") # Fail CI if there are failures. diff --git a/scripts/promptflow-ci/utils/authentication.py b/scripts/promptflow-ci/utils/authentication.py deleted file mode 100644 index 70c25a098f..0000000000 --- a/scripts/promptflow-ci/utils/authentication.py +++ /dev/null @@ -1,22 +0,0 @@ -# Copyright (c) Microsoft Corporation. -# Licensed under the MIT License. 
- -"""Auth utils.""" - -import uuid - -from azureml.core.authentication import AzureCliAuthentication - - -def get_azure_cli_authentication_header(request_id=None): - """Get login auth header.""" - interactive_auth = AzureCliAuthentication() - header = interactive_auth.get_authentication_header() - if request_id is None: - request_id = str(uuid.uuid4()) - # add request id to header for tracking - header["x-ms-client-request-id"] = request_id - header["Content-Type"] = "application/json" - header["Accept"] = "application/json" - - return header diff --git a/scripts/promptflow-ci/utils/flow_utils.py b/scripts/promptflow-ci/utils/flow_utils.py index 878d77ab2b..848ab709b0 100644 --- a/scripts/promptflow-ci/utils/flow_utils.py +++ b/scripts/promptflow-ci/utils/flow_utils.py @@ -4,76 +4,27 @@ """Flow utils.""" import os -import random import re -from datetime import datetime from pathlib import Path -import shutil -from unittest import mock import yaml +import concurrent.futures -from utils.logging_utils import log_error, log_debug +from utils.logging_utils import log_debug, log_error +from utils.utils import run_command -def create_flow_run_identifier(flow_id, flow_run_id): - """Generate the global unique flow run identifier.""" - return f"{flow_id}:{flow_run_id}" - - -def resolve_flow_run_identifier(flow_run_identifier): - """Resolve the flow run identifier to flow id and flow run id.""" - return flow_run_identifier.split(":")[0], flow_run_identifier.split(":")[1] - - -def _validate_meta(meta, flow_dir): - """Validate meta type.""" - if meta["type"] not in ["standard", "evaluate", "chat", "rag"]: - raise ValueError(f"Unknown type in meta.json. model dir: {flow_dir}.") - stage = meta["properties"]["promptflow.stage"] - if stage not in ["test", "prod", "disabled"]: - raise ValueError(f"Unknown stage in meta.json. 
flow dir: {flow_dir}.") - - -def _general_copy(src, dst, make_dirs=True): - """Call _copy to copy.""" - if make_dirs: - os.makedirs(os.path.dirname(dst), exist_ok=True) - if hasattr(os, "listxattr"): - with mock.patch("shutil._copyxattr", return_value=[]): - shutil.copy2(src, dst) - else: - shutil.copy2(src, dst) - - -def _copy(src: Path, dst: Path) -> None: - """Copy files.""" - if not src.exists(): - raise ValueError(f"Path {src} does not exist.") - if src.is_file(): - _general_copy(src, dst) - if src.is_dir(): - for name in src.glob("*"): - _copy(name, dst / name.name) - - -def _assign_flow_values(flow_dirs, tmp_folder_path): +def _assign_flow_values(flow_dirs): """Assign the flow values and update flow.dag.yaml.""" log_debug("\n=======Start overriding values for flows=======") - updated_bulk_test_main_flows_dirs = [] - for flow_dir in flow_dirs: - dst_path = (tmp_folder_path / flow_dir.parents[0].name).resolve() - _copy(Path(flow_dir), dst_path) - log_debug(dst_path) - updated_bulk_test_main_flows_dirs.append(dst_path) - for flow_dir in updated_bulk_test_main_flows_dirs: + for flow_dir in flow_dirs: flow_dir_name = flow_dir.name flow_dir_name = flow_dir_name.replace("-", "_") with open(Path(flow_dir) / "flow.dag.yaml", "r") as dag_file: flow_dag = yaml.safe_load(dag_file) # Override connection/inputs in nodes - log_debug(f"Start overriding values for nodes for '{flow_dir.name}'.") + log_debug(f"Start overriding values for nodes for '{flow_dir}'.") for flow_node in flow_dag["nodes"]: if "connection" in flow_node: flow_node["connection"] = "aoai_connection" @@ -88,150 +39,59 @@ def _assign_flow_values(flow_dirs, tmp_folder_path): with open(flow_dir / "flow.dag.yaml", "w", encoding="utf-8") as dag_file: yaml.dump(flow_dag, dag_file, allow_unicode=True) log_debug("=======Complete overriding values for flows=======\n") - return updated_bulk_test_main_flows_dirs - - -def construct_create_flow_payload_of_new_contract(flow, flow_meta, properties): - """Construct create 
flow payload.""" - flow_type = flow_meta.get("type", None) - if flow_type: - mapping = { - "standard": "default", - "evaluate": "evaluation", - "chat": "chat", - "rag": "rag" - } - flow_type = mapping[flow_type] - - return { - "flowName": flow_meta.get("display_name", None), - "description": flow_meta.get("description", None), - "tags": flow_meta.get("tags", None), - "flowType": flow_type, - "details": properties.get("promptflow.details.source", None) if properties else None, - "flowRunSettings": { - "batch_inputs": properties.get("update_promptflow.batch_inputs", None) if properties else None, - }, - "flowDefinitionFilePath": flow.path, - "isArchived": False, - } + return flow_dirs -def construct_submit_flow_payload_of_new_contract( - flow_id, - batch_data_inputs, - runtime_name, - flow_dag, - flow_submit_mode -): - """Construct submit flow payload.""" - flow_run_id = f"run_{datetime.now().strftime('%Y%m%d%H%M%S')}_{random.randint(100000, 999999)}" - tuning_node_names = [node["name"] - for node in flow_dag["nodes"] if "use_variants" in node] - submit_flow_payload = { - "flowId": flow_id, - "flowRunId": flow_run_id, - "flowSubmitRunSettings": { - "runtimeName": runtime_name, - "runMode": "BulkTest", - "batchDataInput": {"dataUri": batch_data_inputs}, - # Need to populate this field for the LLM node with variants - "tuningNodeNames": tuning_node_names, - }, - "asyncSubmission": True if flow_submit_mode == "async" else False, - "useWorkspaceConnection": True, - "useFlowSnapshotToSubmit": True, +def _create_run_yamls(flow_dirs): + log_debug("\n=======Start creating run.yaml for flows=======") + run_yaml = { + "$schema": "https://azuremlschemas.azureedge.net/promptflow/latest/Run.schema.json", + "flow": '.', + "data": 'samples.json' } - return submit_flow_payload - - -def construct_flow_link(aml_resource_uri, subscription, resource_group, workspace, experiment_id, flow_id, ux_flight): - """Construct flow link.""" - flow_link_format = ( - 
"{aml_resource_uri}/prompts/flow/{experiment_id}/{flow_id}/details?wsid=/subscriptions/" - "{subscription}/resourceGroups/{resource_group}/providers/Microsoft.MachineLearningServices/" - "workspaces/{workspace}&flight={ux_flight}" - ) - return flow_link_format.format( - aml_resource_uri=aml_resource_uri, - subscription=subscription, - resource_group=resource_group, - workspace=workspace, - experiment_id=experiment_id, - flow_id=flow_id, - ux_flight=ux_flight, - ) - - -def get_flow_link(create_flow_response_json, aml_resource_uri, subscription, resource_group, workspace, experiment_id, - ux_flight): - """Get flow link.""" - flow_id = create_flow_response_json["flowId"] - return construct_flow_link(aml_resource_uri, subscription, resource_group, workspace, experiment_id, flow_id, - ux_flight) - - -def get_flow_run_ids(bulk_test_response_json): - """Get flow run ids from response.""" - bulk_test_id = bulk_test_response_json["bulkTestId"] - flow_run_logs = bulk_test_response_json["flowRunLogs"] - flow_run_ids = [run_id for run_id in list( - flow_run_logs.keys()) if run_id != bulk_test_id] - log_debug(f"flow_run_ids in utils: {flow_run_ids}") - return flow_run_ids - - -def construct_flow_run_link( - aml_resource_uri, subscription, resource_group, workspace, experiment_id, flow_id, flow_run_id -): - """Construct flow run link.""" - bulk_test_run_link_format = ( - "{aml_resource_uri}/prompts/flow/{experiment_id}/{flow_id}/run/{flow_run_id}/details?wsid=/" - "subscriptions/{subscription}/resourceGroups/{resource_group}/providers/" - "Microsoft.MachineLearningServices/workspaces/{workspace}&flight=promptflow" - ) - return bulk_test_run_link_format.format( - aml_resource_uri=aml_resource_uri, - subscription=subscription, - resource_group=resource_group, - workspace=workspace, - experiment_id=experiment_id, - flow_id=flow_id, - flow_run_id=flow_run_id, - ) - - -def get_flow_run_link( - bulk_test_response_json, aml_resource_uri, subscription, resource_group, workspace, 
experiment_id, flow_run_id -): - """Get flow run link.""" - flow_run_resource_id = bulk_test_response_json["flowRunResourceId"] - flow_id, _ = _resolve_flow_run_resource_id(flow_run_resource_id) - link = construct_flow_run_link( - aml_resource_uri=aml_resource_uri, - subscription=subscription, - resource_group=resource_group, - workspace=workspace, - experiment_id=experiment_id, - flow_id=flow_id, - flow_run_id=flow_run_id, - ) - return link - - -def _resolve_flow_run_resource_id(flow_run_resource_id): - """Get flow id and flow run id from flow run resource id.""" - if flow_run_resource_id.startswith("azureml://"): - flow_run_resource_id = flow_run_resource_id[len("azureml://"):] - elif flow_run_resource_id.startswith("azureml:/"): - flow_run_resource_id = flow_run_resource_id[len("azureml:/"):] - - pairs = re.findall(r"([^\/]+)\/([^\/]+)", flow_run_resource_id) - flows = [pair for pair in pairs if pair[0] == "flows"] - flow_runs = [pair for pair in pairs if pair[0] == "flowRuns"] - if len(flows) == 0 or len(flow_runs) == 0: - log_error( - f"Resolve flow run resource id [{flow_run_resource_id}] failed") - return None, None - else: - return flows[0][1], flow_runs[0][1] + for flow_dir in flow_dirs: + flow_dir_name = flow_dir.name + flow_dir_name = flow_dir_name.replace("-", "_") + with open(flow_dir / "run.yml", "w", encoding="utf-8") as dag_file: + yaml.dump(run_yaml, dag_file, allow_unicode=True) + log_debug("=======Complete creating run.yaml for flows=======\n") + return + + +def submit_func(run_path, sub, rg, ws): + command = f"pfazure run create --file {run_path} --subscription {sub} -g {rg} -w {ws}" + res = run_command(command) + res = res.stdout.split('\n') + return res + + +def get_run_id_and_url(res): + run_id = "" + portal_url = "" + for line in res: + log_debug(line) + if ('"portal_url":' in line): + match = re.search(r'/run/(.*?)/details', line) + if match: + portal_url = line.strip() + run_id = match.group(1) + log_debug(f"runId: {run_id}") + return 
run_id, portal_url + + +def submit_flow_runs_using_pfazure(flow_dirs, sub, rg, ws): + results = {} + handled_failures = [] + with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor: + futures = {executor.submit(submit_func, os.path.join(flow_dir, 'run.yml'), sub, rg, ws): flow_dir for flow_dir in flow_dirs} + for future in concurrent.futures.as_completed(futures): + try: + flow_dir = futures[future] + res = future.result() + run_id, portal_url = get_run_id_and_url(res) + results[run_id] = portal_url + except Exception as exc: + failure_message = f"Submit test run failed. Flow dir: {flow_dir}. Error: {exc}." + log_error(failure_message) + handled_failures.append(failure_message) + return results, handled_failures diff --git a/scripts/promptflow-ci/utils/mt_client.py b/scripts/promptflow-ci/utils/mt_client.py deleted file mode 100644 index 5721998dbf..0000000000 --- a/scripts/promptflow-ci/utils/mt_client.py +++ /dev/null @@ -1,131 +0,0 @@ -# Copyright (c) Microsoft Corporation. -# Licensed under the MIT License. 
- -"""MTClient class.""" - -from datetime import datetime - -import requests - -from .authentication import get_azure_cli_authentication_header -from .retry_helper import retry_helper -from .logging_utils import log_debug - - -class MTClient: - """MTClient class.""" - - flow_api_endpoint = "{MTServiceRoute}/api/subscriptions/{SubscriptionId}/resourceGroups/{ResourceGroupName}/" \ - "providers/Microsoft.MachineLearningServices/workspaces/{WorkspaceName}/flows" - create_flow_api_format = "{0}/?experimentId={1}" - create_flow_from_sample_api_format = "{0}/fromsample?experimentId={1}" - list_flows_api_format = "{0}/?experimentId={1}&ownedOnly={2}&flowType={3}" - submit_flow_api_format = "{0}/submit?experimentId={1}&endpointName={2}" - submit_flow_api_without_endpoint_name_format = "{0}/submit?experimentId={1}" - list_flow_runs_api_format = "{0}/{1}/runs?experimentId={2}&bulkTestId={3}" - get_flow_run_status_api_format = "{0}/{1}/{2}/status?experimentId={3}" - list_bulk_tests_api_format = "{0}/{1}/bulkTests" - get_bulk_tests_api_format = "{0}/{1}/bulkTests/{2}" - deploy_flow_api_format = "{0}/deploy?asyncCall={1}" - get_samples_api_format = "{0}/samples" - - def __init__(self, mt_service_route, - subscription_id, resource_group_name, workspace_name, - tenant_id=None, client_id=None, client_secret=None): - """MT Client init.""" - self.mt_service_route = mt_service_route - self.subscription_id = subscription_id - self.resource_group_name = resource_group_name - self.workspace_name = workspace_name - self.api_endpoint = self.flow_api_endpoint.format( - MTServiceRoute=self.mt_service_route, - SubscriptionId=self.subscription_id, - ResourceGroupName=self.resource_group_name, - WorkspaceName=self.workspace_name) - self.tenant_id = tenant_id - self.client_id = client_id - self.client_secret = client_secret - - def _request(self, method, url, **kwargs): - """MT client request call.""" - header = get_azure_cli_authentication_header() - - resp = method(url, **{**kwargs, 
"headers": header}) - if method.__name__ == "post": - log_debug( - f"[Request] {method.__name__} API Request id: {header['x-ms-client-request-id']}") - if resp.status_code != 200: - raise requests.exceptions.HTTPError( - f"{method.__name__} on url {url} failed with status code [{resp.status_code}. Error: {resp.json()}].", - response=resp) - return resp.json() - - def _get(self, url): - """Mt client get request.""" - return self._request(requests.get, url) - - def _post(self, url, json_body): - """Mt client post request.""" - return self._request(requests.post, url, json=json_body) - - @retry_helper() - def create_or_update_flow(self, json_body): - """Create flow.""" - url = self.api_endpoint - result = self._post(url, json_body) - return result - - @retry_helper() - def create_flow_from_sample(self, json_body, experiment_id): - """Create flow from sample json.""" - url = self.create_flow_from_sample_api_format.format( - self.api_endpoint, experiment_id) - result = self._post(url, json_body) - return result - - @retry_helper() - def submit_flow(self, json_body, experiment_id): - """Submit flow with a created flow run id or evaluation flow run id.""" - url = self.submit_flow_api_without_endpoint_name_format.format( - self.api_endpoint, experiment_id) - # We need to update flow run id in case retry happens, submit same json body with same flow run id will cause - # 409 error. 
- # Update flow run id - flow_run_id = f"run_{datetime.now().strftime('%Y%m%d%H%M%S')}" - evaluation_run_id = f"evaluate_{datetime.now().strftime('%Y%m%d%H%M%S')}" - json_body['flowRunId'] = flow_run_id - # Update evaluation run id for BulkTest run - if "evaluationFlowRunSettings" in json_body['flowSubmitRunSettings'] and "evaluation" in \ - json_body['flowSubmitRunSettings']['evaluationFlowRunSettings']: - json_body['flowSubmitRunSettings']['evaluationFlowRunSettings']["evaluation"]["flowRunId"] = \ - evaluation_run_id - result = self._post(url, json_body) - - return result, flow_run_id, evaluation_run_id - - @retry_helper() - def get_run_status(self, experiment_id, flow_id, run_id): - """Get run status.""" - url = self.get_flow_run_status_api_format.format( - self.api_endpoint, flow_id, run_id, experiment_id) - result = self._get(url) - return result - - -def get_mt_client( - subscription_id, - resource_group, - workspace_name, - tenant_id, - client_id, - client_secret, - is_local=False, - mt_service_route="https://eastus2euap.api.azureml.ms/flow") -> MTClient: - """Get mt client.""" - if (is_local): - mt_client = MTClient(mt_service_route, subscription_id, - resource_group, workspace_name) - else: - mt_client = MTClient(mt_service_route, subscription_id, resource_group, workspace_name, tenant_id, client_id, - client_secret) - return mt_client diff --git a/scripts/promptflow-ci/utils/retry_helper.py b/scripts/promptflow-ci/utils/retry_helper.py deleted file mode 100644 index b31d675120..0000000000 --- a/scripts/promptflow-ci/utils/retry_helper.py +++ /dev/null @@ -1,47 +0,0 @@ -# Copyright (c) Microsoft Corporation. -# Licensed under the MIT License. 
- -"""Retry helper.""" - -import requests -import time -from functools import wraps -from .logging_utils import log_debug, log_error, log_warning - - -def retry_helper(retry_count=3): - """Retry helper wrapper.""" - def retry(func): - @wraps(func) - def wrapper(*args, **kwargs): - sleep_time = [1, 2, 4] - if retry_count > 3: - sleep_time.extend([10 for i in range(3, retry_count)]) - for i in range(retry_count): - try: - result = func(*args, **kwargs) - if result is None: - log_debug(f"{func.__name__} returns None, sleep {sleep_time[i]}s and will retry for {i + 1} " - f"attempt") - time.sleep(sleep_time[i]) - else: - return result - except Exception as e: - # do not retry for 401, 403 - if (e is requests.exceptions.HTTPError and e.response.status_code in [401, 403]) or \ - i == retry_count - 1: - log_error( - f"{func.__name__} failed after {retry_count} retry attempts. Error: {e}") - raise e - else: - log_warning(f"{func.__name__} failed, will sleep {sleep_time[i]} seconds and retry for the " - f"{i + 1} attempt. 
Error: {e}") - time.sleep(sleep_time[i]) - - result = func(*args, **kwargs) - assert result is not None, f"Failed to {func.__name__}, which returns None" - return result - - return wrapper - - return retry diff --git a/scripts/promptflow-ci/utils/utils.py b/scripts/promptflow-ci/utils/utils.py index 95e2f4ce60..e3f1643064 100644 --- a/scripts/promptflow-ci/utils/utils.py +++ b/scripts/promptflow-ci/utils/utils.py @@ -19,4 +19,4 @@ def get_diff_files(): """Run git diff.""" run_command('git fetch origin') diff_files = run_command('git diff --name-status remotes/origin/main ./') - return diff_files.stdout.split('\n') + return diff_files.stdout.split('\n') \ No newline at end of file From e0f0ec9c4496e9209ceb5c8ba2a89b9f11c5c217 Mon Sep 17 00:00:00 2001 From: Yuhang Liu Date: Mon, 18 Dec 2023 17:17:44 +0800 Subject: [PATCH 02/12] fix format --- scripts/promptflow-ci/promptflow_ci.py | 8 ++++++-- scripts/promptflow-ci/utils/flow_utils.py | 3 ++- scripts/promptflow-ci/utils/utils.py | 2 +- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/scripts/promptflow-ci/promptflow_ci.py b/scripts/promptflow-ci/promptflow_ci.py index 2581a026f5..2487fce77d 100644 --- a/scripts/promptflow-ci/promptflow_ci.py +++ b/scripts/promptflow-ci/promptflow_ci.py @@ -224,8 +224,12 @@ def check_flow_run_status( # region: Step2. 
submit flow runs using pfazure submitted_flow_run_ids = [] submitted_flow_run_links = [] - results, handled_failures = flow_utils.submit_flow_runs_using_pfazure(flows_dirs, args.subscription_id, - args.resource_group, args.workspace_name) + results, handled_failures = flow_utils.submit_flow_runs_using_pfazure( + flows_dirs, + args.subscription_id, + args.resource_group, + args.workspace_name + ) for key, val in results.items(): submitted_flow_run_ids.append(key) submitted_flow_run_links.append(val) diff --git a/scripts/promptflow-ci/utils/flow_utils.py b/scripts/promptflow-ci/utils/flow_utils.py index 848ab709b0..8a981850da 100644 --- a/scripts/promptflow-ci/utils/flow_utils.py +++ b/scripts/promptflow-ci/utils/flow_utils.py @@ -83,7 +83,8 @@ def submit_flow_runs_using_pfazure(flow_dirs, sub, rg, ws): results = {} handled_failures = [] with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor: - futures = {executor.submit(submit_func, os.path.join(flow_dir, 'run.yml'), sub, rg, ws): flow_dir for flow_dir in flow_dirs} + futures = {executor.submit(submit_func, os.path.join(flow_dir, 'run.yml'), sub, rg, ws): + flow_dir for flow_dir in flow_dirs} for future in concurrent.futures.as_completed(futures): try: flow_dir = futures[future] diff --git a/scripts/promptflow-ci/utils/utils.py b/scripts/promptflow-ci/utils/utils.py index e3f1643064..95e2f4ce60 100644 --- a/scripts/promptflow-ci/utils/utils.py +++ b/scripts/promptflow-ci/utils/utils.py @@ -19,4 +19,4 @@ def get_diff_files(): """Run git diff.""" run_command('git fetch origin') diff_files = run_command('git diff --name-status remotes/origin/main ./') - return diff_files.stdout.split('\n') \ No newline at end of file + return diff_files.stdout.split('\n') From 4b290d1f65602d6bd37f37ca263ad498e8f99166 Mon Sep 17 00:00:00 2001 From: Yuhang Liu Date: Mon, 18 Dec 2023 17:30:56 +0800 Subject: [PATCH 03/12] fix cspell --- assets/promptflow/.cspell.json | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) 
diff --git a/assets/promptflow/.cspell.json b/assets/promptflow/.cspell.json index abae42a22a..ab90d69888 100644 --- a/assets/promptflow/.cspell.json +++ b/assets/promptflow/.cspell.json @@ -139,7 +139,11 @@ "uionly", "llmops", "Abhishek", - "restx" + "restx", + "ayod", + "AYOD", + "Featur", + "showno" ], "flagWords": [ "Prompt Flow" From 5a03b055afecd6cd16688c940f7e86dcbddd3fe5 Mon Sep 17 00:00:00 2001 From: Yuhang Liu Date: Mon, 18 Dec 2023 17:35:42 +0800 Subject: [PATCH 04/12] pep8 update --- scripts/promptflow-ci/utils/flow_utils.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/scripts/promptflow-ci/utils/flow_utils.py b/scripts/promptflow-ci/utils/flow_utils.py index 8a981850da..7f1cb8791e 100644 --- a/scripts/promptflow-ci/utils/flow_utils.py +++ b/scripts/promptflow-ci/utils/flow_utils.py @@ -83,8 +83,10 @@ def submit_flow_runs_using_pfazure(flow_dirs, sub, rg, ws): results = {} handled_failures = [] with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor: - futures = {executor.submit(submit_func, os.path.join(flow_dir, 'run.yml'), sub, rg, ws): - flow_dir for flow_dir in flow_dirs} + futures = { + executor.submit(submit_func, os.path.join(flow_dir, 'run.yml'), sub, rg, ws): flow_dir + for flow_dir in flow_dirs + } for future in concurrent.futures.as_completed(futures): try: flow_dir = futures[future] From 4562472d7f5a4f5638cedb270e4d0b08d67d28f6 Mon Sep 17 00:00:00 2001 From: Yuhang Liu Date: Mon, 18 Dec 2023 17:41:58 +0800 Subject: [PATCH 05/12] fix syntax check --- scripts/promptflow-ci/utils/flow_utils.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/scripts/promptflow-ci/utils/flow_utils.py b/scripts/promptflow-ci/utils/flow_utils.py index 7f1cb8791e..3ead30038c 100644 --- a/scripts/promptflow-ci/utils/flow_utils.py +++ b/scripts/promptflow-ci/utils/flow_utils.py @@ -43,6 +43,7 @@ def _assign_flow_values(flow_dirs): def _create_run_yamls(flow_dirs): + """Create run.yml.""" log_debug("\n=======Start 
creating run.yaml for flows=======") run_yaml = { "$schema": "https://azuremlschemas.azureedge.net/promptflow/latest/Run.schema.json", @@ -59,6 +60,7 @@ def _create_run_yamls(flow_dirs): def submit_func(run_path, sub, rg, ws): + """Worker function to submit flow run.""" command = f"pfazure run create --file {run_path} --subscription {sub} -g {rg} -w {ws}" res = run_command(command) res = res.stdout.split('\n') @@ -66,6 +68,7 @@ def submit_func(run_path, sub, rg, ws): def get_run_id_and_url(res): + """Resolve run_id and url from log.""" run_id = "" portal_url = "" for line in res: @@ -80,6 +83,7 @@ def get_run_id_and_url(res): def submit_flow_runs_using_pfazure(flow_dirs, sub, rg, ws): + """Multi thread submit flow run using pfazure.""" results = {} handled_failures = [] with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor: From b491c0f75570411f0c0bc39daaaa15f164882c24 Mon Sep 17 00:00:00 2001 From: Yuhang Liu Date: Mon, 18 Dec 2023 17:44:54 +0800 Subject: [PATCH 06/12] update --- assets/promptflow/.cspell.json | 1 - scripts/promptflow-ci/promptflow_ci.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/assets/promptflow/.cspell.json b/assets/promptflow/.cspell.json index ab90d69888..9731e0aa88 100644 --- a/assets/promptflow/.cspell.json +++ b/assets/promptflow/.cspell.json @@ -142,7 +142,6 @@ "restx", "ayod", "AYOD", - "Featur", "showno" ], "flagWords": [ diff --git a/scripts/promptflow-ci/promptflow_ci.py b/scripts/promptflow-ci/promptflow_ci.py index 2487fce77d..48bbf4d34d 100644 --- a/scripts/promptflow-ci/promptflow_ci.py +++ b/scripts/promptflow-ci/promptflow_ci.py @@ -158,7 +158,7 @@ def check_flow_run_status( parser.add_argument('--flow_submit_mode', type=str, default="sync") parser.add_argument('--run_time', type=str, default="default-mir") parser.add_argument('--skipped_flows', type=str, - default="bring_your_own_data_qna,playground_ayod_rag,template_chat_flow") + default="bring_your_own_data_qna,template_chat_flow") # 
Skip bring_your_own_data_qna test, the flow has a bug. # Bug 2773738: Add retry when ClientAuthenticationError # https://msdata.visualstudio.com/Vienna/_workitems/edit/2773738 From 32e3624acf84a2ee018c9ea1e774c3236fa6b067 Mon Sep 17 00:00:00 2001 From: Yuhang Liu Date: Mon, 18 Dec 2023 18:12:39 +0800 Subject: [PATCH 07/12] update --- assets/promptflow/.cspell.json | 1 + scripts/promptflow-ci/utils/flow_utils.py | 10 ++++++++++ 2 files changed, 11 insertions(+) diff --git a/assets/promptflow/.cspell.json b/assets/promptflow/.cspell.json index 9731e0aa88..ab90d69888 100644 --- a/assets/promptflow/.cspell.json +++ b/assets/promptflow/.cspell.json @@ -142,6 +142,7 @@ "restx", "ayod", "AYOD", + "Featur", "showno" ], "flagWords": [ diff --git a/scripts/promptflow-ci/utils/flow_utils.py b/scripts/promptflow-ci/utils/flow_utils.py index 3ead30038c..e6ed3f2849 100644 --- a/scripts/promptflow-ci/utils/flow_utils.py +++ b/scripts/promptflow-ci/utils/flow_utils.py @@ -8,6 +8,7 @@ from pathlib import Path import yaml import concurrent.futures +import json from utils.logging_utils import log_debug, log_error from utils.utils import run_command @@ -38,6 +39,15 @@ def _assign_flow_values(flow_dirs): flow_node["inputs"]["connection"] = "aoai_connection" with open(flow_dir / "flow.dag.yaml", "w", encoding="utf-8") as dag_file: yaml.dump(flow_dag, dag_file, allow_unicode=True) + + if not os.path.exists(Path(flow_dir)/"samples.json"): + with open(flow_dir/"samples.json", 'w', encoding="utf-8") as sample_file: + samples = [] + sample = {} + for key, val in flow_dag["inputs"].items(): + sample[key] = val.get("default") + samples.append(sample) + json.dump(sample, sample_file, indent=4) log_debug("=======Complete overriding values for flows=======\n") return flow_dirs From 216c470fc12014bfe99d6785582dbd2239fe722a Mon Sep 17 00:00:00 2001 From: Yuhang Liu Date: Tue, 19 Dec 2023 11:39:28 +0800 Subject: [PATCH 08/12] update --- scripts/promptflow-ci/utils/flow_utils.py | 2 +- 1 file 
changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/promptflow-ci/utils/flow_utils.py b/scripts/promptflow-ci/utils/flow_utils.py index e6ed3f2849..91a0a45512 100644 --- a/scripts/promptflow-ci/utils/flow_utils.py +++ b/scripts/promptflow-ci/utils/flow_utils.py @@ -71,7 +71,7 @@ def _create_run_yamls(flow_dirs): def submit_func(run_path, sub, rg, ws): """Worker function to submit flow run.""" - command = f"pfazure run create --file {run_path} --subscription {sub} -g {rg} -w {ws}" + command = f"pfazure run create --file {run_path} --subscription {sub} -g {rg} -w {ws}" res = run_command(command) res = res.stdout.split('\n') return res From 7e2670fcf1772a4d3de1e37bf878be0eefdc4153 Mon Sep 17 00:00:00 2001 From: Yuhang Liu Date: Tue, 19 Dec 2023 11:43:56 +0800 Subject: [PATCH 09/12] update utils --- scripts/promptflow-ci/utils/flow_utils.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/scripts/promptflow-ci/utils/flow_utils.py b/scripts/promptflow-ci/utils/flow_utils.py index 91a0a45512..81d29f50ca 100644 --- a/scripts/promptflow-ci/utils/flow_utils.py +++ b/scripts/promptflow-ci/utils/flow_utils.py @@ -37,6 +37,8 @@ def _assign_flow_values(flow_dirs): flow_node["inputs"]["deployment_name"] = "gpt-35-turbo" if "connection" in flow_node["inputs"]: flow_node["inputs"]["connection"] = "aoai_connection" + if "searchConnection" in flow_node["inputs"]: + flow_node["inputs"]["searchConnection"] = "AzureAISearch" with open(flow_dir / "flow.dag.yaml", "w", encoding="utf-8") as dag_file: yaml.dump(flow_dag, dag_file, allow_unicode=True) From 169a69da18643ade201b4c2cecb2864f64614bb4 Mon Sep 17 00:00:00 2001 From: Yuhang Liu Date: Tue, 19 Dec 2023 15:42:09 +0800 Subject: [PATCH 10/12] update --- scripts/promptflow-ci/utils/flow_utils.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/scripts/promptflow-ci/utils/flow_utils.py b/scripts/promptflow-ci/utils/flow_utils.py index 81d29f50ca..c4b9efac32 100644 --- 
a/scripts/promptflow-ci/utils/flow_utils.py +++ b/scripts/promptflow-ci/utils/flow_utils.py @@ -37,7 +37,7 @@ def _assign_flow_values(flow_dirs): flow_node["inputs"]["deployment_name"] = "gpt-35-turbo" if "connection" in flow_node["inputs"]: flow_node["inputs"]["connection"] = "aoai_connection" - if "searchConnection" in flow_node["inputs"]: + if "searchConnection" in flow_node["inputs"]: flow_node["inputs"]["searchConnection"] = "AzureAISearch" with open(flow_dir / "flow.dag.yaml", "w", encoding="utf-8") as dag_file: yaml.dump(flow_dag, dag_file, allow_unicode=True) @@ -47,7 +47,14 @@ def _assign_flow_values(flow_dirs): samples = [] sample = {} for key, val in flow_dag["inputs"].items(): - sample[key] = val.get("default") + value = val.get("default") + if isinstance(value, list): + if not value: + value.append("default") + elif isinstance(value, str): + if value == "": + value = "default" + sample[key] = value samples.append(sample) json.dump(sample, sample_file, indent=4) log_debug("=======Complete overriding values for flows=======\n") From 6a78de8db9a6cd24231cc576488bcd6f3216cec9 Mon Sep 17 00:00:00 2001 From: Yuhang Liu Date: Wed, 20 Dec 2023 19:17:50 +0800 Subject: [PATCH 11/12] fix comment --- scripts/promptflow-ci/promptflow_ci.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/scripts/promptflow-ci/promptflow_ci.py b/scripts/promptflow-ci/promptflow_ci.py index 48bbf4d34d..82aea723b7 100644 --- a/scripts/promptflow-ci/promptflow_ci.py +++ b/scripts/promptflow-ci/promptflow_ci.py @@ -162,6 +162,7 @@ def check_flow_run_status( # Skip bring_your_own_data_qna test, the flow has a bug. # Bug 2773738: Add retry when ClientAuthenticationError # https://msdata.visualstudio.com/Vienna/_workitems/edit/2773738 + # Skip template_chat_flow because not able to extract samples.json for test. 
args = parser.parse_args() # Get changed models folder or all models folder @@ -282,6 +283,10 @@ def check_flow_run_status( for flow_run_id, evaluation_run_link in failed_evaluation_runs.items(): log_error( f"Bulk test evaluation run link to Azure Machine Learning Portal: {evaluation_run_link}") + log_error(f"The links are scrubbed due to compliance, for how to debug the flow, please refer " + f"to https://msdata.visualstudio.com/Vienna/_git/PromptFlow?path=/docs/sharing-your-flows-in-prompt-flow-gallery.md" + f"&_a=preview&anchor=2.-how-to-debug-a-failed-run-in--%60validate-prompt-flows%60-step-of-%5Bpromptflow-ci" + f"%5D(https%3A//github.com/azure/azureml-assets/actions/workflows/promptflow-ci.yml)") elif len(submitted_flow_run_ids) == 0: log_debug( f"\nRun status checking completed. {flow_runs_count} flow runs completed.") From d3389eff1b77a8f27e0f6f3f7a89ffbfc31ab48b Mon Sep 17 00:00:00 2001 From: Yuhang Liu Date: Wed, 20 Dec 2023 19:34:04 +0800 Subject: [PATCH 12/12] update --- scripts/promptflow-ci/promptflow_ci.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/scripts/promptflow-ci/promptflow_ci.py b/scripts/promptflow-ci/promptflow_ci.py index 82aea723b7..f6d30ffe4a 100644 --- a/scripts/promptflow-ci/promptflow_ci.py +++ b/scripts/promptflow-ci/promptflow_ci.py @@ -283,10 +283,11 @@ def check_flow_run_status( for flow_run_id, evaluation_run_link in failed_evaluation_runs.items(): log_error( f"Bulk test evaluation run link to Azure Machine Learning Portal: {evaluation_run_link}") - log_error(f"The links are scrubbed due to compliance, for how to debug the flow, please refer " - f"to https://msdata.visualstudio.com/Vienna/_git/PromptFlow?path=/docs/sharing-your-flows-in-prompt-flow-gallery.md" - f"&_a=preview&anchor=2.-how-to-debug-a-failed-run-in--%60validate-prompt-flows%60-step-of-%5Bpromptflow-ci" - f"%5D(https%3A//github.com/azure/azureml-assets/actions/workflows/promptflow-ci.yml)") + log_error("The links are scrubbed due to 
compliance, for how to debug the flow, please refer " + "to https://msdata.visualstudio.com/Vienna/_git/PromptFlow?path=/docs/" + "sharing-your-flows-in-prompt-flow-gallery.md&_a=preview&anchor=2.-how-to-debug-a-failed" + "-run-in--%60validate-prompt-flows%60-step-of-%5Bpromptflow-ci" + "%5D(https%3A//github.com/azure/azureml-assets/actions/workflows/promptflow-ci.yml)") elif len(submitted_flow_run_ids) == 0: log_debug( f"\nRun status checking completed. {flow_runs_count} flow runs completed.")