Commit: Leverage async batch run for first async-enabled evaluator - FluencyEvaluator (#3542)

# Description

This PR aims to improve the performance of the evaluate API by
leveraging async batch run to eliminate the overhead associated with
using multiple processes. The key changes include:

- Converting the FluencyEvaluator to an async-based implementation.
- Plumbing work in the BatchEngine to enable async batch runs.

For more details, please check the "Run Evaluators Asynchronously"
section in this
[document](https://microsoft-my.sharepoint.com/:w:/p/ninhu/ETB_zdMkFrdAuf3Lcg9ssrUB6RVmyuFs5Un1G74O1HlwSA?e=vtfp7w).

**Results:**

- Evaluation with 1 evaluator and 1 row used to take 16 seconds. Now it
takes only 2 seconds, an improvement of about 87%.
- The timing is very close to that of a pure thread pool implementation, but
with async batch run, we also get proper timeout handling.
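
For intuition on where the speedup comes from, here is a minimal, illustrative sketch (not the BatchEngine code): with an async batch run, the per-row evaluator calls share one process and one event loop, so I/O-bound LLM calls overlap instead of each run paying process start-up overhead.

```python
import asyncio
import time


async def eval_one_row(row_id: int) -> dict:
    # Stand-in for the awaitable LLM call an async evaluator would make.
    await asyncio.sleep(1.0)
    return {"row": row_id, "gpt_fluency": 4.0}


async def eval_batch(n_rows: int) -> list:
    # All rows are scheduled on a single event loop and awaited together.
    return await asyncio.gather(*(eval_one_row(i) for i in range(n_rows)))


start = time.perf_counter()
results = asyncio.run(eval_batch(5))
print(f"{len(results)} rows in {time.perf_counter() - start:.1f}s")  # ~1s total, not ~5s
```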


# All Promptflow Contribution checklist:
- [ ] **The pull request does not introduce [breaking changes].**
- [ ] **CHANGELOG is updated for new features, bug fixes or other
significant changes.**
- [ ] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [ ] **Create an issue and link to the pull request to get dedicated
review from promptflow team. Learn more: [suggested
workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [ ] Title of the pull request is clear and informative.
- [ ] There are a small number of commits, each of which has an
informative message. This means that previously merged commits do not
appear in the history of the PR. For more information on cleaning up the
commits in your PR, [see this
page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [ ] Pull request includes test coverage for the included changes.
ninghu authored Jul 17, 2024
1 parent 93c211d commit e8d564b
Showing 15 changed files with 188 additions and 56 deletions.
@@ -85,6 +85,14 @@ async def exec_aggregation_async(
) -> AggregationResult:
return self._flow_executor.exec_aggregation(batch_inputs, aggregation_inputs, run_id=run_id)

async def exec_line_async(
self,
inputs: Mapping[str, Any],
index: Optional[int] = None,
run_id: Optional[str] = None,
) -> LineResult:
return await self._flow_executor.exec_line_async(inputs, index, run_id)

async def _exec_batch(
self,
batch_inputs: List[Mapping[str, Any]],
@@ -131,7 +131,7 @@ def _run_bulk(self, run: Run, stream=False, **kwargs):
with flow_overwrite_context(
flow_obj, tuning_node, variant, connections=run.connections, init_kwargs=run.init
) as flow:
self._submit_bulk_run(flow=flow, run=run, local_storage=local_storage)
self._submit_bulk_run(flow=flow, run=run, local_storage=local_storage, **kwargs)

@classmethod
def _validate_inputs(cls, run: Run):
@@ -140,7 +140,7 @@ def _validate_inputs(cls, run: Run):
raise UserErrorException(message=str(error), error=error)

def _submit_bulk_run(
self, flow: Union[Flow, FlexFlow, Prompty], run: Run, local_storage: LocalStorageOperations
self, flow: Union[Flow, FlexFlow, Prompty], run: Run, local_storage: LocalStorageOperations, **kwargs
) -> dict:
logger.info(f"Submitting run {run.name}, log path: {local_storage.logger.file_path}")
run_id = run.name
@@ -183,6 +183,7 @@ def _submit_bulk_run(
storage=local_storage,
log_path=local_storage.logger.file_path,
init_kwargs=run.init,
**kwargs,
)
batch_result = batch_engine.run(
input_dirs=input_dirs,
7 changes: 5 additions & 2 deletions src/promptflow-devkit/promptflow/batch/_batch_engine.py
@@ -151,6 +151,7 @@ def __init__(
self._storage = storage if storage else DefaultRunStorage(base_dir=self._working_dir)
self._kwargs = kwargs

self._batch_use_async = kwargs.get("batch_use_async", False)
self._batch_timeout_sec = batch_timeout_sec or get_int_env_var("PF_BATCH_TIMEOUT_SEC")
self._line_timeout_sec = line_timeout_sec or get_int_env_var("PF_LINE_TIMEOUT_SEC", LINE_TIMEOUT_SEC)
self._worker_count = worker_count or get_int_env_var("PF_WORKER_COUNT")
@@ -472,7 +473,7 @@ async def _exec(

# execute lines
is_timeout = False
if isinstance(self._executor_proxy, PythonExecutorProxy):
if not self._batch_use_async and isinstance(self._executor_proxy, PythonExecutorProxy):
results, is_timeout = await self._executor_proxy._exec_batch(
inputs_to_run,
output_dir,
@@ -656,6 +657,8 @@ def _check_eager_flow_and_language_from_yaml(self):

def _batch_timeout_expired(self) -> bool:
# Currently, local PythonExecutorProxy will handle the batch timeout by itself.
if self._batch_timeout_sec is None or isinstance(self._executor_proxy, PythonExecutorProxy):
if self._batch_timeout_sec is None or (
not self._batch_use_async and isinstance(self._executor_proxy, PythonExecutorProxy)
):
return False
return (datetime.utcnow() - self._start_time).total_seconds() > self._batch_timeout_sec
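
To summarize the plumbing above, here is a simplified, assumed sketch (names other than `batch_use_async` and `PythonExecutorProxy` are illustrative): the flag travels through `**kwargs` from the run submitter into `BatchEngine`, and the timeout check only short-circuits for the process-based Python path when async is off.

```python
class PythonExecutorProxy:  # placeholder standing in for the real executor proxy
    pass


class BatchEngineSketch:
    """Simplified stand-in for BatchEngine, showing only the new flag handling."""

    def __init__(self, executor_proxy, batch_timeout_sec=None, **kwargs):
        self._executor_proxy = executor_proxy
        self._batch_timeout_sec = batch_timeout_sec
        # The flag rides along in **kwargs until the engine reads it here.
        self._batch_use_async = kwargs.get("batch_use_async", False)

    def _batch_timeout_expired(self, elapsed_sec: float) -> bool:
        # Only the process-based Python path handles the batch timeout itself;
        # the async path keeps the engine-side check, which is how the PR gets
        # proper timeout handling without extra processes.
        if self._batch_timeout_sec is None or (
            not self._batch_use_async and isinstance(self._executor_proxy, PythonExecutorProxy)
        ):
            return False
        return elapsed_sec > self._batch_timeout_sec


engine = BatchEngineSketch(PythonExecutorProxy(), batch_timeout_sec=60, batch_use_async=True)
print(engine._batch_timeout_expired(90))  # True: the async run honors the batch timeout
```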
24 changes: 13 additions & 11 deletions src/promptflow-evals/CHANGELOG.md
@@ -1,18 +1,20 @@
# promptflow-evals package
# Release History

Please insert change log into "Next Release" ONLY.

## Next release

## 0.3.2
## v0.3.2 (Upcoming)

### Features Added
- Introduced `JailbreakAdversarialSimulator` for customers who need to do run jailbreak and non jailbreak adversarial simulations at the same time. More info in the README.md in `/promptflow/evals/synthetic/README.md#jailbreak-simulator`

- The `AdversarialSimulator` responds with `category` of harm in the response.

- Large simulation was causing a jinja exception, this has been fixed
### Bugs Fixed
- Large simulation was causing a jinja exception, this has been fixed.

### Improvements
- Converted built-in evaluators to async-based implementation, leveraging async batch run for performance improvement.
- Parity between evals and Simulator on signature, passing credentials.
- The `AdversarialSimulator` responds with `category` of harm in the response.

## v0.3.1 (2022-07-09)
- This release contains minor bug fixes and improvements.

## 0.0.1
- Introduced package
## v0.3.0 (2024-05-17)
- Initial release of promptflow-evals package.
@@ -8,6 +8,7 @@
from promptflow.tracing._integrations._openai_injector import inject_openai_api, recover_openai_api

from ..._user_agent import USER_AGENT
from .._utils import set_event_loop_policy
from .code_client import CodeClient
from .proxy_client import ProxyClient

@@ -25,6 +26,9 @@ def __enter__(self):
os.environ[PF_FLOW_ENTRY_IN_TMP] = "true"
os.environ[PF_FLOW_META_LOAD_IN_SUBPROCESS] = "false"

# For addressing the issue of asyncio event loop closed on Windows
set_event_loop_policy()

def __exit__(self, exc_type, exc_val, exc_tb):
if isinstance(self.client, CodeClient):
recover_openai_api()
@@ -1,6 +1,7 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import inspect
import logging

import numpy as np
@@ -24,8 +25,18 @@ def __init__(self, pf_client: PFClient):
self._thread_pool = ThreadPoolExecutor(thread_name_prefix="evaluators_thread")

def run(self, flow, data, column_mapping=None, **kwargs):
flow_to_run = flow
if hasattr(flow, "_to_async"):
flow_to_run = flow._to_async()

batch_use_async = self._should_batch_use_async(flow_to_run)
eval_future = self._thread_pool.submit(
self._pf_client.run, flow, data=data, column_mapping=column_mapping, **kwargs
self._pf_client.run,
flow_to_run,
data=data,
column_mapping=column_mapping,
batch_use_async=batch_use_async,
**kwargs
)
return ProxyRun(run=eval_future)

@@ -38,3 +49,12 @@ def get_details(self, proxy_run, all_results=False):
def get_metrics(self, proxy_run):
run = proxy_run.run.result(timeout=BATCH_RUN_TIMEOUT)
return self._pf_client.get_metrics(run)

@staticmethod
def _should_batch_use_async(flow):
if hasattr(flow, "__call__") and inspect.iscoroutinefunction(flow.__call__):
return True
elif inspect.iscoroutinefunction(flow):
return True
else:
return False
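
A quick illustration of why `_should_batch_use_async` needs both branches (the example classes and functions below are made up for demonstration): `inspect.iscoroutinefunction` must be checked on `__call__` for evaluator instances and on the object itself for bare coroutine functions.

```python
import inspect


def should_batch_use_async(flow) -> bool:
    # Mirrors the check added in proxy_client.py.
    if hasattr(flow, "__call__") and inspect.iscoroutinefunction(flow.__call__):
        return True
    return inspect.iscoroutinefunction(flow)


class AsyncEvaluator:
    async def __call__(self, *, answer: str):
        return {"score": 1.0}


async def async_flow(answer: str):
    return {"score": 1.0}


def sync_flow(answer: str):
    return {"score": 1.0}


print(should_batch_use_async(AsyncEvaluator()))  # True: async __call__ on an instance
print(should_batch_use_async(async_flow))        # True: bare coroutine function
print(should_batch_use_async(sync_flow))         # False: stays on the existing path
```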
19 changes: 16 additions & 3 deletions src/promptflow-evals/promptflow/evals/evaluate/_utils.py
@@ -14,7 +14,6 @@
from promptflow.evals._constants import DEFAULT_EVALUATION_RESULTS_FILE_NAME, Prefixes
from promptflow.evals.evaluate._eval_run import EvalRun


LOGGER = logging.getLogger(__name__)

AZURE_WORKSPACE_REGEX_FORMAT = (
@@ -62,7 +61,11 @@ def _azure_pf_client_and_triad(trace_destination):


def _log_metrics_and_instance_results(
metrics, instance_results, trace_destination, run, evaluation_name,
metrics,
instance_results,
trace_destination,
run,
evaluation_name,
) -> str:
if trace_destination is None:
LOGGER.error("Unable to log traces as trace destination was not defined.")
@@ -175,7 +178,7 @@ def _apply_column_mapping(source_df: pd.DataFrame, mapping_config: dict, inplace
if match is not None:
pattern = match.group(1)
if pattern.startswith(pattern_prefix):
map_from_key = pattern[len(pattern_prefix):]
map_from_key = pattern[len(pattern_prefix) :]
elif pattern.startswith(run_outputs_prefix):
# Target-generated columns always starts from .outputs.
map_from_key = f"{Prefixes._TGT_OUTPUTS}{pattern[len(run_outputs_prefix) :]}"
@@ -199,3 +202,13 @@ def _apply_column_mapping(source_df: pd.DataFrame, mapping_config: dict, inplace

def _has_aggregator(evaluator):
return hasattr(evaluator, "__aggregate__")


def set_event_loop_policy():
import asyncio
import platform

if platform.system().lower() == "windows":
# Reference: https://stackoverflow.com/questions/45600579/asyncio-event-loop-is-closed-when-getting-loop
# On Windows seems to be a problem with EventLoopPolicy, use this snippet to work around it
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
@@ -7,14 +7,48 @@

import numpy as np

from promptflow.client import load_flow
from promptflow.core import AzureOpenAIModelConfiguration
from promptflow._utils.async_utils import async_run_allowing_running_loop
from promptflow.core import AsyncPrompty, AzureOpenAIModelConfiguration

try:
from ..._user_agent import USER_AGENT
except ImportError:
USER_AGENT = None


class _AsyncFluencyEvaluator:
def __init__(self, model_config: AzureOpenAIModelConfiguration):
if model_config.api_version is None:
model_config.api_version = "2024-02-15-preview"

prompty_model_config = {"configuration": model_config}
prompty_model_config.update(
{"parameters": {"extra_headers": {"x-ms-useragent": USER_AGENT}}}
) if USER_AGENT and isinstance(model_config, AzureOpenAIModelConfiguration) else None
current_dir = os.path.dirname(__file__)
prompty_path = os.path.join(current_dir, "fluency.prompty")
self._flow = AsyncPrompty.load(source=prompty_path, model=prompty_model_config)

async def __call__(self, *, question: str, answer: str, **kwargs):
# Validate input parameters
question = str(question or "")
answer = str(answer or "")

if not (question.strip() and answer.strip()):
raise ValueError("Both 'question' and 'answer' must be non-empty strings.")

# Run the evaluation flow
llm_output = await self._flow(question=question, answer=answer)

score = np.nan
if llm_output:
match = re.search(r"\d", llm_output)
if match:
score = float(match.group())

return {"gpt_fluency": float(score)}


class FluencyEvaluator:
"""
Initialize a fluency evaluator configured for a specific Azure OpenAI model.
@@ -41,17 +75,7 @@ class FluencyEvaluator:
"""

def __init__(self, model_config: AzureOpenAIModelConfiguration):
# TODO: Remove this block once the bug is fixed
# https://msdata.visualstudio.com/Vienna/_workitems/edit/3151324
if model_config.api_version is None:
model_config.api_version = "2024-02-15-preview"

prompty_model_config = {"configuration": model_config}
prompty_model_config.update({"parameters": {"extra_headers": {"x-ms-useragent": USER_AGENT}}}) \
if USER_AGENT and isinstance(model_config, AzureOpenAIModelConfiguration) else None
current_dir = os.path.dirname(__file__)
prompty_path = os.path.join(current_dir, "fluency.prompty")
self._flow = load_flow(source=prompty_path, model=prompty_model_config)
self._async_evaluator = _AsyncFluencyEvaluator(model_config)

def __call__(self, *, question: str, answer: str, **kwargs):
"""
@@ -64,20 +88,7 @@ def __call__(self, *, question: str, answer: str, **kwargs):
:return: The fluency score.
:rtype: dict
"""
# Validate input parameters
question = str(question or "")
answer = str(answer or "")
return async_run_allowing_running_loop(self._async_evaluator, question=question, answer=answer, **kwargs)

if not (question.strip() and answer.strip()):
raise ValueError("Both 'question' and 'answer' must be non-empty strings.")

# Run the evaluation flow
llm_output = self._flow(question=question, answer=answer)

score = np.nan
if llm_output:
match = re.search(r"\d", llm_output)
if match:
score = float(match.group())

return {"gpt_fluency": float(score)}
def _to_async(self):
return self._async_evaluator
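
The same facade pattern, reduced to a self-contained sketch (the echo evaluator below is hypothetical; a real evaluator would await an `AsyncPrompty` flow): the public class stays synchronous for direct callers via `async_run_allowing_running_loop`, while `_to_async()` hands the batch engine the awaitable twin.

```python
from promptflow._utils.async_utils import async_run_allowing_running_loop


class _AsyncEchoEvaluator:
    """Illustrative async core, standing in for _AsyncFluencyEvaluator."""

    async def __call__(self, *, question: str, answer: str, **kwargs):
        # A real evaluator would await self._flow(question=..., answer=...) here.
        return {"gpt_fluency": 5.0 if answer.strip() else float("nan")}


class EchoEvaluator:
    """Sync facade mirroring the refactored FluencyEvaluator structure."""

    def __init__(self):
        self._async_evaluator = _AsyncEchoEvaluator()

    def __call__(self, *, question: str, answer: str, **kwargs):
        # Works whether or not an event loop is already running.
        return async_run_allowing_running_loop(
            self._async_evaluator, question=question, answer=answer, **kwargs
        )

    def _to_async(self):
        return self._async_evaluator


print(EchoEvaluator()(question="How are you?", answer="I am doing well."))
# {'gpt_fluency': 5.0}
```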
11 changes: 7 additions & 4 deletions src/promptflow-evals/tests/evals/conftest.py
@@ -5,7 +5,6 @@
from unittest.mock import patch

import pytest

from pytest_mock import MockerFixture

from promptflow.client import PFClient
@@ -36,6 +35,7 @@ def is_record():
def is_replay():
return False


# Import of optional packages
AZURE_INSTALLED = True
try:
@@ -95,6 +95,8 @@ def model_config() -> dict:
raise ValueError(f"Connection '{conn_name}' not found in dev connections.")

model_config = AzureOpenAIModelConfiguration(**dev_connections[conn_name]["value"])
# Default to gpt-35-turbo for capacity reasons
model_config.azure_deployment = "gpt-35-turbo"

AzureOpenAIModelConfiguration.__repr__ = lambda self: "<sensitive data redacted>"

@@ -368,12 +370,13 @@ def pytest_collection_modifyitems(items):
parents = {}
for item in items:
# Check if parent contains 'localtest' marker and remove it.
if any(mark.name == 'localtest' for mark in item.parent.own_markers) or id(item.parent) in parents:
if any(mark.name == "localtest" for mark in item.parent.own_markers) or id(item.parent) in parents:
if id(item.parent) not in parents:
item.parent.own_markers = [
marker for marker in item.own_markers if getattr(marker, 'name', None) != 'localtest']
marker for marker in item.own_markers if getattr(marker, "name", None) != "localtest"
]
parents[id(item.parent)] = item.parent
if not item.get_closest_marker('azuretest'):
if not item.get_closest_marker("azuretest"):
# If item's parent was marked as 'localtest', mark the child as such, but not if
# it was marked as 'azuretest'.
item.add_marker(pytest.mark.localtest)
34 changes: 33 additions & 1 deletion src/promptflow-evals/tests/evals/e2etests/test_evaluate.py
@@ -1,14 +1,20 @@
import json
import os
import pathlib
import time

import numpy as np
import pandas as pd
import pytest
import requests

from promptflow.evals.evaluate import evaluate
from promptflow.evals.evaluators import ContentSafetyEvaluator, F1ScoreEvaluator, GroundednessEvaluator
from promptflow.evals.evaluators import (
ContentSafetyEvaluator,
F1ScoreEvaluator,
FluencyEvaluator,
GroundednessEvaluator,
)


@pytest.fixture
@@ -154,6 +160,32 @@ def test_evaluate_with_content_safety_evaluator(self, project_scope, data_file,
assert 0 <= metrics.get("content_safety.self_harm_defect_rate") <= 1
assert 0 <= metrics.get("content_safety.hate_unfairness_defect_rate") <= 1

@pytest.mark.performance_test
def test_evaluate_with_async_enabled_evaluator(self, model_config, data_file):
fluency_eval = FluencyEvaluator(model_config)

start_time = time.time()
result = evaluate(
data=data_file,
evaluators={
"fluency": fluency_eval,
},
)
end_time = time.time()
duration = end_time - start_time

row_result_df = pd.DataFrame(result["rows"])
metrics = result["metrics"]

# validate the results
assert result is not None
assert result["rows"] is not None
input_data = pd.read_json(data_file, lines=True)
assert row_result_df.shape[0] == len(input_data)
assert "outputs.fluency.gpt_fluency" in row_result_df.columns.to_list()
assert "fluency.gpt_fluency" in metrics.keys()
assert duration < 10, f"evaluate API call took too long: {duration} seconds"

@pytest.mark.parametrize(
"use_pf_client,function,column",
[