Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat: Emit target metrics #2486

Merged
merged 10 commits into from
Aug 13, 2024
12 changes: 9 additions & 3 deletions docs/implementation/metrics.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
# Tap Metrics
# Tap and Target Metrics

Metrics logging is specified in the
[Singer Spec](https://hub.meltano.com/singer/spec#metrics). The SDK will automatically
emit metrics for `record_count`, `http_request_duration` and `sync_duration`.
[Singer Spec](https://hub.meltano.com/singer/spec#metrics).

The SDK will automatically emit the following metrics:

- `record_count`: The number of records processed by the tap or target.
- `http_request_duration`: The duration of HTTP requests made by the tap.
- `sync_duration`: The duration of the sync operation.
- `batch_processing_time`: The duration of processing a batch of records.

## Customization options

Expand Down
9 changes: 8 additions & 1 deletion singer_sdk/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class Tag(str, enum.Enum):
JOB_TYPE = "job_type"
HTTP_STATUS_CODE = "http_status_code"
STATUS = "status"
PID = "pid"


class Metric(str, enum.Enum):
Expand All @@ -58,6 +59,7 @@ class Metric(str, enum.Enum):
HTTP_REQUEST_COUNT = "http_request_count"
JOB_DURATION = "job_duration"
SYNC_DURATION = "sync_duration"
BATCH_PROCESSING_TIME = "batch_processing_time"


@dataclass
Expand Down Expand Up @@ -116,6 +118,7 @@ def __init__(self, metric: Metric, tags: dict | None = None) -> None:
"""
self.metric = metric
self.tags = tags or {}
self.tags[Tag.PID] = os.getpid()
self.logger = get_metrics_logger()

@property
Expand Down Expand Up @@ -182,6 +185,10 @@ def __init__(
self.log_interval = log_interval
self.last_log_time = time()

def exit(self) -> None:
    """Exit the counter context.

    Flushes the accumulated count to the metrics log and resets the
    counter, exactly as leaving the ``with`` block would.
    """
    self._pop()

def __enter__(self) -> Counter:
"""Enter the counter context.

Expand All @@ -204,7 +211,7 @@ def __exit__(
exc_val: The exception value.
exc_tb: The exception traceback.
"""
self._pop()
self.exit()

def _pop(self) -> None:
"""Log and reset the counter."""
Expand Down
27 changes: 27 additions & 0 deletions singer_sdk/sinks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import jsonschema.validators
from typing_extensions import override

from singer_sdk import metrics
from singer_sdk._singerlib.json import deserialize_json
from singer_sdk.exceptions import (
InvalidJSONSchema,
Expand Down Expand Up @@ -193,6 +194,31 @@ def __init__(
)

self._validator: BaseJSONSchemaValidator | None = self.get_validator()
self._record_counter: metrics.Counter = metrics.record_counter(self.stream_name)
self._batch_timer = metrics.Timer(
metrics.Metric.BATCH_PROCESSING_TIME,
tags={
metrics.Tag.STREAM: self.stream_name,
},
)

@property
def record_counter_metric(self) -> metrics.Counter:
    """Counter meter tracking records processed by this sink.

    Returns:
        The ``metrics.Counter`` instance created for this sink's stream.
    """
    return self._record_counter

@property
def batch_processing_timer(self) -> metrics.Timer:
    """Timer meter measuring how long this sink takes to process a batch.

    Returns:
        The ``metrics.Timer`` instance created for this sink's stream.
    """
    return self._batch_timer

@cached_property
def validate_schema(self) -> bool:
Expand Down Expand Up @@ -685,6 +711,7 @@ def clean_up(self) -> None:
should not be relied on, it's recommended to use a uuid as well.
"""
self.logger.info("Cleaning up %s", self.stream_name)
self.record_counter_metric.exit()

def process_batch_files(
self,
Expand Down
4 changes: 3 additions & 1 deletion singer_sdk/target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ def _process_record_message(self, message_dict: dict) -> None:

sink.tally_record_read()
sink.process_record(transformed_record, context)
sink.record_counter_metric.increment()
sink._after_process_record(context) # noqa: SLF001

if sink.is_full:
Expand Down Expand Up @@ -510,7 +511,8 @@ def drain_one(self, sink: Sink) -> None: # noqa: PLR6301
return

draining_status = sink.start_drain()
sink.process_batch(draining_status)
with sink.batch_processing_timer:
sink.process_batch(draining_status)
sink.mark_drained()

def _drain_all(self, sink_list: list[Sink], parallelism: int) -> None:
Expand Down
14 changes: 12 additions & 2 deletions tests/core/test_metrics.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
import os

import pytest
import time_machine
Expand All @@ -18,6 +19,8 @@ def __str__(self) -> str:


def test_meter():
pid = os.getpid()

class _MyMeter(metrics.Meter):
def __enter__(self):
return self
Expand All @@ -27,18 +30,22 @@ def __exit__(self, exc_type, exc_val, exc_tb):

meter = _MyMeter(metrics.Metric.RECORD_COUNT)

assert meter.tags == {}
assert meter.tags == {metrics.Tag.PID: pid}

stream_context = {"parent_id": 1}
meter.context = stream_context
assert meter.tags == {metrics.Tag.CONTEXT: stream_context}
assert meter.tags == {
metrics.Tag.CONTEXT: stream_context,
metrics.Tag.PID: pid,
}

meter.context = None
assert metrics.Tag.CONTEXT not in meter.tags


def test_record_counter(caplog: pytest.LogCaptureFixture):
caplog.set_level(logging.INFO, logger=metrics.METRICS_LOGGER_NAME)
pid = os.getpid()
custom_object = CustomObject("test", 1)

with metrics.record_counter(
Expand Down Expand Up @@ -68,6 +75,7 @@ def test_record_counter(caplog: pytest.LogCaptureFixture):
assert point.tags == {
metrics.Tag.STREAM: "test_stream",
metrics.Tag.ENDPOINT: "test_endpoint",
metrics.Tag.PID: pid,
"custom_tag": "pytest",
"custom_obj": custom_object,
}
Expand All @@ -79,6 +87,7 @@ def test_record_counter(caplog: pytest.LogCaptureFixture):

def test_sync_timer(caplog: pytest.LogCaptureFixture):
caplog.set_level(logging.INFO, logger=metrics.METRICS_LOGGER_NAME)
pid = os.getpid()
traveler = time_machine.travel(0, tick=False)
traveler.start()

Expand All @@ -100,6 +109,7 @@ def test_sync_timer(caplog: pytest.LogCaptureFixture):
assert point.tags == {
metrics.Tag.STREAM: "test_stream",
metrics.Tag.STATUS: "succeeded",
metrics.Tag.PID: pid,
"custom_tag": "pytest",
}

Expand Down
Loading