test: Add scripts to send benchmark results to datadog (#5432)
* Add config files

* log benchmarks to stdout

* Add top-k and batch size to configs

* Add batch size to configs

* fix: don't download files if they already exist

* Add batch size to configs

* refine script

* Remove configs using 1m docs

* update run script

* update run script

* update run script

* datadog integration

* remove out folder

* gitignore benchmarks output

* test: send benchmarks to datadog

* remove uncommented lines in script

* feat: take branch/tag argument for benchmark setup script

* fix: run.sh should ignore errors

* Remove changes unrelated to datadog

* Apply black

* Update test/benchmarks/utils.py

Co-authored-by: Silvano Cerza <[email protected]>

* PR feedback

* Account for reader benchmarks not doing indexing

* Change key of reader metrics

* Apply PR feedback

* Remove whitespace

---------

Co-authored-by: rjanjua <[email protected]>
Co-authored-by: Silvano Cerza <[email protected]>
3 people authored Aug 3, 2023
1 parent a26859f commit 56cea8c
Showing 6 changed files with 332 additions and 6 deletions.
153 changes: 153 additions & 0 deletions test/benchmarks/datadog/metric_handler.py
@@ -0,0 +1,153 @@
from enum import Enum
from itertools import chain
from time import time
from typing import Dict, List, Optional

import datadog
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
import logging

LOGGER = logging.getLogger(__name__)


class Tag(Enum):
    @classmethod
    def values(cls):
        return [e.value for e in cls]


class NoneTag(Tag):
    none = "none_none_none_none-1234"  # should not match any other tag


class DatasetSizeTags(Tag):
    size_100k = "dataset_size:100k"


class ReaderModelTags(Tag):
    debertabase = "reader:debertabase"
    debertalarge = "reader:debertalarge"
    tinyroberta = "reader:tinyroberta"


class RetrieverModelTags(Tag):
    bm25 = "retriever:bm25"
    minilm = "retriever:minilm"
    mpnetbase = "retriever:mpnetbase"


class DocumentStoreModelTags(Tag):
    opensearch = "documentstore:opensearch"
    elasticsearch = "documentstore:elasticsearch"
    weaviate = "documentstore:weaviate"


class BenchmarkType(Tag):
    retriever = "benchmark_type:retriever"
    retriever_reader = "benchmark_type:retriever_reader"
    reader = "benchmark_type:reader"


class CustomDatadogMetric:
    name: str
    timestamp: float
    value: float
    tags: List[Tag]

    def __init__(self, name: str, value: float, tags: Optional[List[Tag]] = None) -> None:
        self.timestamp = time()
        self.name = name
        self.value = value
        self.tags = self.validate_tags(tags) if tags is not None else []

    def validate_tags(self, tags: List[Tag]) -> List[Tag]:
        valid_tags: List[Tag] = []
        for tag in tags:
            if isinstance(
                tag, (DatasetSizeTags, ReaderModelTags, RetrieverModelTags, DocumentStoreModelTags, BenchmarkType)
            ):
                valid_tags.append(tag)
            elif tag != NoneTag.none:
                # Log invalid tags as errors
                LOGGER.error(f"Tag is not a valid dataset or environment tag: tag={tag}")

        return valid_tags


class IndexingDocsPerSecond(CustomDatadogMetric):
    def __init__(self, value: float, tags: Optional[List[Tag]] = None) -> None:
        name = "haystack.benchmarks.indexing.docs_per_second"
        super().__init__(name=name, value=value, tags=tags)


class QueryingExactMatchMetric(CustomDatadogMetric):
    def __init__(self, value: float, tags: Optional[List[Tag]] = None) -> None:
        name = "haystack.benchmarks.querying.exact_match"
        super().__init__(name=name, value=value, tags=tags)


class QueryingF1Metric(CustomDatadogMetric):
    def __init__(self, value: float, tags: Optional[List[Tag]] = None) -> None:
        name = "haystack.benchmarks.querying.f1_score"
        super().__init__(name=name, value=value, tags=tags)


class QueryingRecallMetric(CustomDatadogMetric):
    def __init__(self, value: float, tags: Optional[List[Tag]] = None) -> None:
        name = "haystack.benchmarks.querying.recall"
        super().__init__(name=name, value=value, tags=tags)


class QueryingMapMetric(CustomDatadogMetric):
    def __init__(self, value: float, tags: Optional[List[Tag]] = None) -> None:
        name = "haystack.benchmarks.querying.map"
        super().__init__(name=name, value=value, tags=tags)


class QueryingSecondsPerQueryMetric(CustomDatadogMetric):
    def __init__(self, value: float, tags: Optional[List[Tag]] = None) -> None:
        name = "haystack.benchmarks.querying.seconds_per_query"
        super().__init__(name=name, value=value, tags=tags)


class MetricsAPI:
    def __init__(self, datadog_api_key: str, datadog_host: str):
        self.datadog_api_key = datadog_api_key
        self.datadog_host = datadog_host

    @retry(retry=retry_if_exception_type(ConnectionError), wait=wait_fixed(5), stop=stop_after_attempt(3), reraise=True)
    def send_custom_dd_metric(self, metric: CustomDatadogMetric) -> dict:
        datadog.initialize(api_key=self.datadog_api_key, host_name=self.datadog_host)

        tags: List[str] = list(map(lambda t: str(t.value), metric.tags))
        post_metric_response: Dict = datadog.api.Metric.send(
            metric=metric.name, points=[metric.timestamp, metric.value], tags=tags
        )

        if post_metric_response.get("status") != "ok":
            LOGGER.error(
                f"Could not send custom metric. Retrying. metric_name={metric.name}, metric_value={metric.value}, "
                f"status={post_metric_response.get('status')}, error={post_metric_response.get('errors')}, "
                f"{post_metric_response}"
            )
            raise ConnectionError(f"Could not send custom metric. {post_metric_response}")
        else:
            LOGGER.info(
                f"Sent custom metric. metric_name={metric.name}, metric_value={metric.value}, "
                f"status={post_metric_response.get('status')}"
            )

        return post_metric_response

    def send_custom_dd_metrics(self, metrics: List[CustomDatadogMetric]) -> List[Dict]:
        responses = []
        for metric in metrics:
            try:
                response = self.send_custom_dd_metric(metric)
                responses.append(response)
            except ConnectionError as e:
                LOGGER.error(
                    f"Could not send custom metric even after retrying. "
                    f"metric_name={metric.name}, metric_value={metric.value}"
                )
        return responses
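
Not part of the commit: a minimal usage sketch of metric_handler.py above, assuming it is importable from the same directory. The API key and host are placeholders, and the final call issues a real Datadog request when executed.

# Sketch only: build one tagged metric and submit it through MetricsAPI.
from metric_handler import (
    DatasetSizeTags,
    DocumentStoreModelTags,
    IndexingDocsPerSecond,
    MetricsAPI,
    RetrieverModelTags,
)

metric = IndexingDocsPerSecond(
    value=123.4,  # illustrative value, not a real benchmark result
    tags=[DatasetSizeTags.size_100k, RetrieverModelTags.bm25, DocumentStoreModelTags.elasticsearch],
)
api = MetricsAPI(datadog_api_key="<DATADOG_API_KEY>", datadog_host="<DATADOG_HOST>")  # placeholders
api.send_custom_dd_metrics([metric])  # retried up to 3 times on ConnectionError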
1 change: 1 addition & 0 deletions test/benchmarks/datadog/requirements.txt
@@ -0,0 +1 @@
datadog==0.45.0
165 changes: 165 additions & 0 deletions test/benchmarks/datadog/send_metrics.py
@@ -0,0 +1,165 @@
import argparse
import os
import json
from typing import Dict

from metric_handler import (
    ReaderModelTags,
    NoneTag,
    RetrieverModelTags,
    DocumentStoreModelTags,
    BenchmarkType,
    LOGGER,
    DatasetSizeTags,
    IndexingDocsPerSecond,
    QueryingExactMatchMetric,
    QueryingF1Metric,
    QueryingRecallMetric,
    QueryingSecondsPerQueryMetric,
    QueryingMapMetric,
    MetricsAPI,
    Tag,
)


def parse_benchmark_files(folder_path: str) -> Dict:
    metrics = {}
    for filename in os.listdir(folder_path):
        if filename.endswith(".json"):
            file_path = os.path.join(folder_path, filename)
            with open(file_path, "r") as file:
                data = json.load(file)
                indexing_metrics = data.get("indexing", {})
                querying_metrics = data.get("querying")
                config = data.get("config")
                if indexing_metrics.get("error") is None and querying_metrics.get("error") is None:
                    metrics[filename.split(".json")[0]] = {
                        "indexing": indexing_metrics,
                        "querying": querying_metrics,
                        "config": config,
                    }
    return metrics


def get_reader_tag(config: Dict) -> Tag:
    for comp in config["components"]:
        if comp["name"] == "Reader":
            model = comp["params"]["model_name_or_path"]

            if model == "deepset/tinyroberta-squad2":
                return ReaderModelTags.tinyroberta

            if model == "deepset/deberta-v3-base-squad2":
                return ReaderModelTags.debertabase

            if model == "deepset/deberta-v3-large-squad2":
                return ReaderModelTags.debertalarge

    return NoneTag.none


def get_retriever_tag(config: Dict) -> Tag:
    for comp in config["components"]:
        if comp["name"] == "Retriever":
            if comp["type"] == "BM25Retriever":
                return RetrieverModelTags.bm25

            model = comp["params"]["embedding_model"]
            if "minilm" in model:
                return RetrieverModelTags.minilm

            if "mpnet-base" in model:
                return RetrieverModelTags.mpnetbase

    return NoneTag.none


def get_documentstore_tag(config: Dict) -> Tag:
    for comp in config["components"]:
        if comp["name"] == "DocumentStore":
            if comp["type"] == "ElasticsearchDocumentStore":
                return DocumentStoreModelTags.elasticsearch

            if comp["type"] == "WeaviateDocumentStore":
                return DocumentStoreModelTags.weaviate

            if comp["type"] == "OpenSearchDocumentStore":
                return DocumentStoreModelTags.opensearch

    return NoneTag.none


def get_benchmark_type_tag(reader_tag, retriever_tag, document_store_tag):
    if reader_tag != NoneTag.none and retriever_tag != NoneTag.none and document_store_tag != NoneTag.none:
        return BenchmarkType.retriever_reader
    elif retriever_tag != NoneTag.none and document_store_tag != NoneTag.none:
        return BenchmarkType.retriever
    elif reader_tag != NoneTag.none and retriever_tag == NoneTag.none:
        return BenchmarkType.reader

    LOGGER.warn(
        f"Did not find benchmark_type for the combination of tags, retriever={retriever_tag}, reader={reader_tag}, "
        f"document_store={document_store_tag}"
    )
    return NoneTag.none


def collect_metrics_from_json_files(folder_path):
    benchmark_metrics = parse_benchmark_files(folder_path)
    metrics_to_send_to_dd = []
    for benchmark_name, metrics in benchmark_metrics.items():
        indexing_metrics = metrics["indexing"]
        querying_metrics = metrics["querying"]
        config = metrics["config"]

        docs_per_second = indexing_metrics.get("docs_per_second")

        exact_match = querying_metrics.get("exact_match")
        f1_score = querying_metrics.get("f1")
        recall = querying_metrics.get("recall")
        seconds_per_query = querying_metrics.get("seconds_per_query")
        map_query = querying_metrics.get("map")

        size_tag = DatasetSizeTags.size_100k
        reader_tag = get_reader_tag(config)
        retriever_tag = get_retriever_tag(config)
        document_store_tag = get_documentstore_tag(config)
        benchmark_type_tag = get_benchmark_type_tag(reader_tag, retriever_tag, document_store_tag)

        tags = [size_tag, reader_tag, retriever_tag, document_store_tag, benchmark_type_tag]

        if docs_per_second:
            metrics_to_send_to_dd.append(IndexingDocsPerSecond(docs_per_second, tags))

        if exact_match or exact_match == 0:
            metrics_to_send_to_dd.append(QueryingExactMatchMetric(exact_match, tags))

        if f1_score or f1_score == 0:
            metrics_to_send_to_dd.append(QueryingF1Metric(f1_score, tags))

        if recall or recall == 0:
            metrics_to_send_to_dd.append(QueryingRecallMetric(recall, tags))

        if seconds_per_query:
            metrics_to_send_to_dd.append(QueryingSecondsPerQueryMetric(seconds_per_query, tags))

        if map_query or map_query == 0:
            metrics_to_send_to_dd.append(QueryingMapMetric(map_query, tags))

    return metrics_to_send_to_dd


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("folder_path", type=str, help="Path to the folder with benchmark results")
    parser.add_argument("datadog_api_key", type=str, help="Datadog API key")
    parser.add_argument("datadog_api_host", type=str, help="Datadog API host")
    args = parser.parse_args()

    folder_path = args.folder_path
    datadog_api_key = args.datadog_api_key
    datadog_api_host = args.datadog_api_host

    metrics_to_send_to_dd = collect_metrics_from_json_files(folder_path)
    api = MetricsAPI(datadog_api_key=datadog_api_key, datadog_host=datadog_api_host)
    api.send_custom_dd_metrics(metrics_to_send_to_dd)
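
Not part of the commit: a sketch of the result-file shape that parse_benchmark_files() and collect_metrics_from_json_files() read, with made-up numbers, a hypothetical file name, and a hypothetical output folder. The invocation noted at the end mirrors the argparse arguments above.

# Sketch only: write one illustrative benchmark result file that the script
# above could pick up. All values, names, and paths are invented.
import json
import os

example_result = {
    "indexing": {"docs_per_second": 350.0},
    "querying": {
        "exact_match": 0.71,
        "f1": 0.78,
        "recall": 0.92,
        "map": 0.65,
        "seconds_per_query": 0.4,
    },
    "config": {
        "components": [
            {"name": "DocumentStore", "type": "ElasticsearchDocumentStore", "params": {}},
            {"name": "Retriever", "type": "BM25Retriever", "params": {}},
        ]
    },
}

os.makedirs("out", exist_ok=True)
with open("out/bm25_elasticsearch_100k.json", "w") as f:
    json.dump(example_result, f, indent=2)

# Then, roughly: python send_metrics.py out/ <DATADOG_API_KEY> <DATADOG_API_HOST>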
2 changes: 1 addition & 1 deletion test/benchmarks/reader.py
@@ -34,7 +34,7 @@ def benchmark_reader(pipeline: Pipeline, labels_file: Path) -> Dict:
 
         reader_type, reader_model, reader_top_k = get_reader_config(pipeline)
         results = {
-            "reader": {
+            "querying": {
                 "exact_match": metrics["exact_match"],
                 "f1": metrics["f1"],
                 "n_queries": len(eval_labels),
2 changes: 0 additions & 2 deletions test/benchmarks/run.py
@@ -4,7 +4,6 @@
 import json
 
 from haystack import Pipeline
-from haystack.nodes import BaseRetriever, BaseReader
 from haystack.pipelines.config import read_pipeline_config_from_yaml
 
 from utils import prepare_environment, contains_reader, contains_retriever
@@ -72,7 +71,6 @@ def run_benchmark(pipeline_yaml: Path) -> Dict:
 
     config_file = Path(args.config)
     output_file = f"{config_file.stem}_results.json" if args.output is None else args.output
-
     results = run_benchmark(config_file)
     with open(output_file, "w") as f:
         json.dump(results, f, indent=2)
15 changes: 12 additions & 3 deletions test/benchmarks/utils.py
@@ -24,7 +24,6 @@ def prepare_environment(pipeline_config: Dict, benchmark_config: Dict):
     # Download data if specified in benchmark config
     if "data_url" in benchmark_config:
         download_from_url(url=benchmark_config["data_url"], target_dir="data/")
-
     n_docs = 0
     if "documents_directory" in benchmark_config:
         documents_dir = Path(benchmark_config["documents_directory"])
@@ -56,18 +55,28 @@ def launch_document_store(document_store: str, n_docs: int = 0):
         launch_weaviate(sleep=30, delete_existing=True)
 
 
-def download_from_url(url: str, target_dir: Union[str, Path]):
+def file_previously_downloaded(url_path: Path, target_dir: Union[str, Path]) -> bool:
+    if ".tar" in url_path.suffixes:
+        return Path(target_dir, url_path.parent).exists()
+    return Path(target_dir, url_path.name).exists()
 
 
+def download_from_url(url: str, target_dir: Union[str, Path]) -> None:
     """
     Download from a URL to a local file.
     :param url: URL
     :param target_dir: Local directory where the URL content will be saved.
     """
+    url_path = Path(url)
 
+    if file_previously_downloaded(url_path, target_dir):
+        logger.info(f"Skipping download of {url}, as a previous copy exists")
+        return
 
     if not os.path.exists(target_dir):
         os.makedirs(target_dir)
 
-    url_path = Path(url)
     logger.info("Downloading %s to %s", url_path.name, target_dir)
     with tempfile.NamedTemporaryFile() as temp_file:
         http_get(url=url, temp_file=temp_file)
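
Not part of the commit: a tiny sketch of the new skip logic when the same file is requested twice, assuming the first call leaves the file under the target directory; the URL and directory are placeholders.

# Sketch only: the second call should return early because
# file_previously_downloaded() finds the copy left by the first call.
from utils import download_from_url

download_from_url(url="https://example.com/benchmarks/labels.json", target_dir="data/")
download_from_url(url="https://example.com/benchmarks/labels.json", target_dir="data/")  # logs "Skipping download ..."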
