From 56cea8cbbdad373786e0495dfa99d729c0535c2e Mon Sep 17 00:00:00 2001
From: bogdankostic
Date: Thu, 3 Aug 2023 10:09:00 +0200
Subject: [PATCH] test: Add scripts to send benchmark results to datadog (#5432)

* Add config files

* log benchmarks to stdout

* Add top-k and batch size to configs

* Add batch size to configs

* fix: don't download files if they already exist

* Add batch size to configs

* refine script

* Remove configs using 1m docs

* update run script

* update run script

* update run script

* datadog integration

* remove out folder

* gitignore benchmarks output

* test: send benchmarks to datadog

* remove uncommented lines in script

* feat: take branch/tag argument for benchmark setup script

* fix: run.sh should ignore errors

* Remove changes unrelated to datadog

* Apply black

* Update test/benchmarks/utils.py

Co-authored-by: Silvano Cerza <3314350+silvanocerza@users.noreply.github.com>

* PR feedback

* Account for reader benchmarks not doing indexing

* Change key of reader metrics

* Apply PR feedback

* Remove whitespace

---------

Co-authored-by: rjanjua
Co-authored-by: Silvano Cerza <3314350+silvanocerza@users.noreply.github.com>
---
 test/benchmarks/datadog/metric_handler.py | 153 ++++++++++++++++++++
 test/benchmarks/datadog/requirements.txt  |   1 +
 test/benchmarks/datadog/send_metrics.py   | 165 ++++++++++++++++++++++
 test/benchmarks/reader.py                 |   2 +-
 test/benchmarks/run.py                    |   2 -
 test/benchmarks/utils.py                  |  15 +-
 6 files changed, 332 insertions(+), 6 deletions(-)
 create mode 100644 test/benchmarks/datadog/metric_handler.py
 create mode 100644 test/benchmarks/datadog/requirements.txt
 create mode 100644 test/benchmarks/datadog/send_metrics.py

diff --git a/test/benchmarks/datadog/metric_handler.py b/test/benchmarks/datadog/metric_handler.py
new file mode 100644
index 0000000000..7a0eb5e408
--- /dev/null
+++ b/test/benchmarks/datadog/metric_handler.py
@@ -0,0 +1,153 @@
+from enum import Enum
+from itertools import chain
+from time import time
+from typing import Dict, List, Optional
+
+import datadog
+from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
+import logging
+
+LOGGER = logging.getLogger(__name__)
+
+
+class Tag(Enum):
+    @classmethod
+    def values(cls):
+        return [e.value for e in cls]
+
+
+class NoneTag(Tag):
+    none = "none_none_none_none-1234"  # should not match any other tag
+
+
+class DatasetSizeTags(Tag):
+    size_100k = "dataset_size:100k"
+
+
+class ReaderModelTags(Tag):
+    debertabase = "reader:debertabase"
+    debertalarge = "reader:debertalarge"
+    tinyroberta = "reader:tinyroberta"
+
+
+class RetrieverModelTags(Tag):
+    bm25 = "retriever:bm25"
+    minilm = "retriever:minilm"
+    mpnetbase = "retriever:mpnetbase"
+
+
+class DocumentStoreModelTags(Tag):
+    opensearch = "documentstore:opensearch"
+    elasticsearch = "documentstore:elasticsearch"
+    weaviate = "documentstore:weaviate"
+
+
+class BenchmarkType(Tag):
+    retriever = "benchmark_type:retriever"
+    retriever_reader = "benchmark_type:retriever_reader"
+    reader = "benchmark_type:reader"
+
+
+class CustomDatadogMetric:
+    name: str
+    timestamp: float
+    value: float
+    tags: List[Tag]
+
+    def __init__(self, name: str, value: float, tags: Optional[List[Tag]] = None) -> None:
+        self.timestamp = time()
+        self.name = name
+        self.value = value
+        self.tags = self.validate_tags(tags) if tags is not None else []
+
+    def validate_tags(self, tags: List[Tag]) -> List[Tag]:
+        valid_tags: List[Tag] = []
+        for tag in tags:
+            if isinstance(
+                tag, (DatasetSizeTags, ReaderModelTags, RetrieverModelTags, DocumentStoreModelTags, BenchmarkType)
+            ):
+                valid_tags.append(tag)
+            elif tag != NoneTag.none:
+                # Log invalid tags as errors
+                LOGGER.error(f"Tag is not a valid dataset or environment tag: tag={tag}")
+
+        return valid_tags
+
+
+class IndexingDocsPerSecond(CustomDatadogMetric):
+    def __init__(self, value: float, tags: Optional[List[Tag]] = None) -> None:
+        name = "haystack.benchmarks.indexing.docs_per_second"
+        super().__init__(name=name, value=value, tags=tags)
+
+
+class QueryingExactMatchMetric(CustomDatadogMetric):
+    def __init__(self, value: float, tags: Optional[List[Tag]] = None) -> None:
+        name = "haystack.benchmarks.querying.exact_match"
+        super().__init__(name=name, value=value, tags=tags)
+
+
+class QueryingF1Metric(CustomDatadogMetric):
+    def __init__(self, value: float, tags: Optional[List[Tag]] = None) -> None:
+        name = "haystack.benchmarks.querying.f1_score"
+        super().__init__(name=name, value=value, tags=tags)
+
+
+class QueryingRecallMetric(CustomDatadogMetric):
+    def __init__(self, value: float, tags: Optional[List[Tag]] = None) -> None:
+        name = "haystack.benchmarks.querying.recall"
+        super().__init__(name=name, value=value, tags=tags)
+
+
+class QueryingMapMetric(CustomDatadogMetric):
+    def __init__(self, value: float, tags: Optional[List[Tag]] = None) -> None:
+        name = "haystack.benchmarks.querying.map"
+        super().__init__(name=name, value=value, tags=tags)
+
+
+class QueryingSecondsPerQueryMetric(CustomDatadogMetric):
+    def __init__(self, value: float, tags: Optional[List[Tag]] = None) -> None:
+        name = "haystack.benchmarks.querying.seconds_per_query"
+        super().__init__(name=name, value=value, tags=tags)
+
+
+class MetricsAPI:
+    def __init__(self, datadog_api_key: str, datadog_host: str):
+        self.datadog_api_key = datadog_api_key
+        self.datadog_host = datadog_host
+
+    @retry(retry=retry_if_exception_type(ConnectionError), wait=wait_fixed(5), stop=stop_after_attempt(3), reraise=True)
+    def send_custom_dd_metric(self, metric: CustomDatadogMetric) -> dict:
+        datadog.initialize(api_key=self.datadog_api_key, host_name=self.datadog_host)
+
+        tags: List[str] = list(map(lambda t: str(t.value), metric.tags))
+        post_metric_response: Dict = datadog.api.Metric.send(
+            metric=metric.name, points=[metric.timestamp, metric.value], tags=tags
+        )
+
+        if post_metric_response.get("status") != "ok":
+            LOGGER.error(
+                f"Could not send custom metric. Retrying. metric_name={metric.name}, metric_value={metric.value}, "
+                f"status={post_metric_response.get('status')}, error={post_metric_response.get('errors')}, "
+                f"{post_metric_response}"
+            )
+            raise ConnectionError(f"Could not send custom metric. {post_metric_response}")
+        else:
+            LOGGER.info(
+                f"Sent custom metric. metric_name={metric.name}, metric_value={metric.value}, "
+                f"status={post_metric_response.get('status')}"
+            )
+
+        return post_metric_response
+
+    def send_custom_dd_metrics(self, metrics: List[CustomDatadogMetric]) -> List[Dict]:
+        responses = []
+        for metric in metrics:
+            try:
+                response = self.send_custom_dd_metric(metric)
+                responses.append(response)
+            except ConnectionError as e:
+                LOGGER.error(
+                    f"Could not send custom metric even after retrying. "
+                    f"metric_name={metric.name}, metric_value={metric.value}"
+                )
+        return responses
diff --git a/test/benchmarks/datadog/requirements.txt b/test/benchmarks/datadog/requirements.txt
new file mode 100644
index 0000000000..3ec93e2859
--- /dev/null
+++ b/test/benchmarks/datadog/requirements.txt
@@ -0,0 +1 @@
+datadog==0.45.0
diff --git a/test/benchmarks/datadog/send_metrics.py b/test/benchmarks/datadog/send_metrics.py
new file mode 100644
index 0000000000..6d44449ed4
--- /dev/null
+++ b/test/benchmarks/datadog/send_metrics.py
@@ -0,0 +1,165 @@
+import argparse
+import os
+import json
+from typing import Dict
+
+from metric_handler import (
+    ReaderModelTags,
+    NoneTag,
+    RetrieverModelTags,
+    DocumentStoreModelTags,
+    BenchmarkType,
+    LOGGER,
+    DatasetSizeTags,
+    IndexingDocsPerSecond,
+    QueryingExactMatchMetric,
+    QueryingF1Metric,
+    QueryingRecallMetric,
+    QueryingSecondsPerQueryMetric,
+    QueryingMapMetric,
+    MetricsAPI,
+    Tag,
+)
+
+
+def parse_benchmark_files(folder_path: str) -> Dict:
+    metrics = {}
+    for filename in os.listdir(folder_path):
+        if filename.endswith(".json"):
+            file_path = os.path.join(folder_path, filename)
+            with open(file_path, "r") as file:
+                data = json.load(file)
+                indexing_metrics = data.get("indexing", {})
+                querying_metrics = data.get("querying")
+                config = data.get("config")
+                if indexing_metrics.get("error") is None and querying_metrics.get("error") is None:
+                    metrics[filename.split(".json")[0]] = {
+                        "indexing": indexing_metrics,
+                        "querying": querying_metrics,
+                        "config": config,
+                    }
+    return metrics
+
+
+def get_reader_tag(config: Dict) -> Tag:
+    for comp in config["components"]:
+        if comp["name"] == "Reader":
+            model = comp["params"]["model_name_or_path"]
+
+            if model == "deepset/tinyroberta-squad2":
+                return ReaderModelTags.tinyroberta
+
+            if model == "deepset/deberta-v3-base-squad2":
+                return ReaderModelTags.debertabase
+
+            if model == "deepset/deberta-v3-large-squad2":
+                return ReaderModelTags.debertalarge
+
+    return NoneTag.none
+
+
+def get_retriever_tag(config: Dict) -> Tag:
+    for comp in config["components"]:
+        if comp["name"] == "Retriever":
+            if comp["type"] == "BM25Retriever":
+                return RetrieverModelTags.bm25
+
+            model = comp["params"]["embedding_model"]
+            if "minilm" in model:
+                return RetrieverModelTags.minilm
+
+            if "mpnet-base" in model:
+                return RetrieverModelTags.mpnetbase
+
+    return NoneTag.none
+
+
+def get_documentstore_tag(config: Dict) -> Tag:
+    for comp in config["components"]:
+        if comp["name"] == "DocumentStore":
+            if comp["type"] == "ElasticsearchDocumentStore":
+                return DocumentStoreModelTags.elasticsearch
+
+            if comp["type"] == "WeaviateDocumentStore":
+                return DocumentStoreModelTags.weaviate
+
+            if comp["type"] == "OpenSearchDocumentStore":
+                return DocumentStoreModelTags.opensearch
+
+    return NoneTag.none
+
+
+def get_benchmark_type_tag(reader_tag, retriever_tag, document_store_tag):
+    if reader_tag != NoneTag.none and retriever_tag != NoneTag.none and document_store_tag != NoneTag.none:
+        return BenchmarkType.retriever_reader
+    elif retriever_tag != NoneTag.none and document_store_tag != NoneTag.none:
+        return BenchmarkType.retriever
+    elif reader_tag != NoneTag.none and retriever_tag == NoneTag.none:
+        return BenchmarkType.reader
+
+    LOGGER.warn(
+        f"Did not find benchmark_type for the combination of tags, retriever={retriever_tag}, reader={reader_tag}, "
+        f"document_store={document_store_tag}"
+    )
+    return NoneTag.none
+
+
+def collect_metrics_from_json_files(folder_path):
+    benchmark_metrics = parse_benchmark_files(folder_path)
+    metrics_to_send_to_dd = []
+    for benchmark_name, metrics in benchmark_metrics.items():
+        indexing_metrics = metrics["indexing"]
+        querying_metrics = metrics["querying"]
+        config = metrics["config"]
+
+        docs_per_second = indexing_metrics.get("docs_per_second")
+
+        exact_match = querying_metrics.get("exact_match")
+        f1_score = querying_metrics.get("f1")
+        recall = querying_metrics.get("recall")
+        seconds_per_query = querying_metrics.get("seconds_per_query")
+        map_query = querying_metrics.get("map")
+
+        size_tag = DatasetSizeTags.size_100k
+        reader_tag = get_reader_tag(config)
+        retriever_tag = get_retriever_tag(config)
+        document_store_tag = get_documentstore_tag(config)
+        benchmark_type_tag = get_benchmark_type_tag(reader_tag, retriever_tag, document_store_tag)
+
+        tags = [size_tag, reader_tag, retriever_tag, document_store_tag, benchmark_type_tag]
+
+        if docs_per_second:
+            metrics_to_send_to_dd.append(IndexingDocsPerSecond(docs_per_second, tags))
+
+        if exact_match or exact_match == 0:
+            metrics_to_send_to_dd.append(QueryingExactMatchMetric(exact_match, tags))
+
+        if f1_score or f1_score == 0:
+            metrics_to_send_to_dd.append(QueryingF1Metric(f1_score, tags))
+
+        if recall or recall == 0:
+            metrics_to_send_to_dd.append(QueryingRecallMetric(recall, tags))
+
+        if seconds_per_query:
+            metrics_to_send_to_dd.append(QueryingSecondsPerQueryMetric(seconds_per_query, tags))
+
+        if map_query or map_query == 0:
+            metrics_to_send_to_dd.append(QueryingMapMetric(map_query, tags))
+
+    return metrics_to_send_to_dd
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument("folder_path", type=str, help="Path to the folder with benchmark results")
+    parser.add_argument("datadog_api_key", type=str, help="Datadog API key")
+    parser.add_argument("datadog_api_host", type=str, help="Datadog API host")
+    args = parser.parse_args()
+
+    folder_path = args.folder_path
+    datadog_api_key = args.datadog_api_key
+    datadog_api_host = args.datadog_api_host
+
+    metrics_to_send_to_dd = collect_metrics_from_json_files(folder_path)
+    api = MetricsAPI(datadog_api_key=datadog_api_key, datadog_host=datadog_api_host)
+    api.send_custom_dd_metrics(metrics_to_send_to_dd)
diff --git a/test/benchmarks/reader.py b/test/benchmarks/reader.py
index 2bcbbed720..a2876123ab 100644
--- a/test/benchmarks/reader.py
+++ b/test/benchmarks/reader.py
@@ -34,7 +34,7 @@ def benchmark_reader(pipeline: Pipeline, labels_file: Path) -> Dict:
 
     reader_type, reader_model, reader_top_k = get_reader_config(pipeline)
     results = {
-        "reader": {
+        "querying": {
             "exact_match": metrics["exact_match"],
             "f1": metrics["f1"],
             "n_queries": len(eval_labels),
diff --git a/test/benchmarks/run.py b/test/benchmarks/run.py
index 1c20627993..6bd9d55911 100644
--- a/test/benchmarks/run.py
+++ b/test/benchmarks/run.py
@@ -4,7 +4,6 @@
 import json
 
 from haystack import Pipeline
-from haystack.nodes import BaseRetriever, BaseReader
 from haystack.pipelines.config import read_pipeline_config_from_yaml
 
 from utils import prepare_environment, contains_reader, contains_retriever
@@ -72,7 +71,6 @@ def run_benchmark(pipeline_yaml: Path) -> Dict:
 
     config_file = Path(args.config)
     output_file = f"{config_file.stem}_results.json" if args.output is None else args.output
-
     results = run_benchmark(config_file)
     with open(output_file, "w") as f:
         json.dump(results, f, indent=2)
diff --git a/test/benchmarks/utils.py b/test/benchmarks/utils.py
index 4fc6674373..04a4ebb29e 100644
--- a/test/benchmarks/utils.py
+++ b/test/benchmarks/utils.py
@@ -24,7 +24,6 @@ def prepare_environment(pipeline_config: Dict, benchmark_config: Dict):
     # Download data if specified in benchmark config
     if "data_url" in benchmark_config:
         download_from_url(url=benchmark_config["data_url"], target_dir="data/")
-
     n_docs = 0
     if "documents_directory" in benchmark_config:
         documents_dir = Path(benchmark_config["documents_directory"])
@@ -56,18 +55,28 @@ def launch_document_store(document_store: str, n_docs: int = 0):
         launch_weaviate(sleep=30, delete_existing=True)
 
 
-def download_from_url(url: str, target_dir: Union[str, Path]):
+def file_previously_downloaded(url_path: Path, target_dir: Union[str, Path]) -> bool:
+    if ".tar" in url_path.suffixes:
+        return Path(target_dir, url_path.parent).exists()
+    return Path(target_dir, url_path.name).exists()
+
+
+def download_from_url(url: str, target_dir: Union[str, Path]) -> None:
     """
     Download from a URL to a local file.
 
     :param url: URL
     :param target_dir: Local directory where the URL content will be saved.
     """
+    url_path = Path(url)
+
+    if file_previously_downloaded(url_path, target_dir):
+        logger.info(f"Skipping download of {url}, as a previous copy exists")
+        return
 
     if not os.path.exists(target_dir):
         os.makedirs(target_dir)
 
-    url_path = Path(url)
     logger.info("Downloading %s to %s", url_path.name, target_dir)
     with tempfile.NamedTemporaryFile() as temp_file:
         http_get(url=url, temp_file=temp_file)
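
Usage sketch (not part of the patch), assuming it is run from test/benchmarks/datadog/ so that send_metrics is importable; the out/ folder, the file name and every number below are made-up examples. It shows the JSON shape that parse_benchmark_files() accepts and how collect_metrics_from_json_files() turns it into tagged metric objects.

    # Hypothetical example: write one benchmark result file and collect metrics from it.
    import json
    from pathlib import Path

    from send_metrics import collect_metrics_from_json_files

    result = {
        # no "error" key under "indexing" or "querying", so the file is kept
        "indexing": {"docs_per_second": 218.4},
        "querying": {
            "exact_match": 0.61,
            "f1": 0.74,
            "recall": 0.92,
            "map": 0.55,
            "seconds_per_query": 0.31,
        },
        "config": {
            "components": [
                {"name": "Retriever", "type": "BM25Retriever", "params": {}},
                {"name": "DocumentStore", "type": "OpenSearchDocumentStore", "params": {}},
            ]
        },
    }

    out_dir = Path("out")  # placeholder folder name
    out_dir.mkdir(exist_ok=True)
    (out_dir / "bm25_opensearch.json").write_text(json.dumps(result))

    # Yields one IndexingDocsPerSecond metric plus the five querying metrics, each tagged
    # with dataset_size:100k, retriever:bm25, documentstore:opensearch and benchmark_type:retriever.
    metrics = collect_metrics_from_json_files(str(out_dir))
    print([m.name for m in metrics])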
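
A second sketch, also not part of the patch, pushes a single hand-built value through the MetricsAPI wrapper from metric_handler.py. The DATADOG_API_KEY and DATADOG_API_HOST environment variables are placeholders invented here; the patch itself only receives the key and host as positional arguments of send_metrics.py (python send_metrics.py <folder_path> <datadog_api_key> <datadog_api_host>).

    # Hypothetical standalone example: send one metric point directly.
    import os

    from metric_handler import (
        BenchmarkType,
        DatasetSizeTags,
        DocumentStoreModelTags,
        MetricsAPI,
        QueryingRecallMetric,
        RetrieverModelTags,
    )

    # Placeholder credentials, not defined anywhere in this patch.
    api = MetricsAPI(
        datadog_api_key=os.environ["DATADOG_API_KEY"],
        datadog_host=os.environ["DATADOG_API_HOST"],
    )

    metric = QueryingRecallMetric(
        value=0.92,  # illustrative value
        tags=[
            DatasetSizeTags.size_100k,
            RetrieverModelTags.bm25,
            DocumentStoreModelTags.opensearch,
            BenchmarkType.retriever,
        ],
    )

    # send_custom_dd_metric() calls datadog.initialize() and datadog.api.Metric.send(),
    # retrying up to three times on ConnectionError (wait_fixed(5), stop_after_attempt(3)).
    api.send_custom_dd_metric(metric)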