Skip to content

Commit

Permalink
feat: Emit target metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Jun 18, 2024
1 parent 6334091 commit 94a642b
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 20 deletions.
53 changes: 34 additions & 19 deletions singer_sdk/io_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
import decimal
import json
import logging
import os
import sys
import typing as t
from collections import Counter, defaultdict

from singer_sdk import metrics
from singer_sdk._singerlib.messages import Message, SingerMessageType
from singer_sdk._singerlib.messages import format_message as singer_format_message
from singer_sdk._singerlib.messages import write_message as singer_write_message
Expand All @@ -33,8 +35,14 @@ def listen(self, file_input: t.IO[str] | None = None) -> None:
if not file_input:
file_input = sys.stdin

self._process_lines(file_input)
self._process_endofpipe()
load_timer = metrics.Timer(
metrics.Metric.SYNC_DURATION,
{"pid": os.getpid()},
)

with load_timer:
self._process_lines(file_input)
self._process_endofpipe()

@staticmethod
def _assert_line_requires(line_dict: dict, requires: set[str]) -> None:
Expand Down Expand Up @@ -83,30 +91,37 @@ def _process_lines(self, file_input: t.IO[str]) -> t.Counter[str]:
A counter object for the processed lines.
"""
stats: dict[str, int] = defaultdict(int)
for line in file_input:
line_dict = self.deserialize_json(line)
self._assert_line_requires(line_dict, requires={"type"})
record_message_counter = metrics.Counter(
metrics.Metric.MESSAGE_COUNT,
{"pid": os.getpid()},
log_interval=metrics.DEFAULT_LOG_INTERVAL,
)
with record_message_counter:
for line in file_input:
line_dict = self.deserialize_json(line)
self._assert_line_requires(line_dict, requires={"type"})

record_type: SingerMessageType = line_dict["type"]
if record_type == SingerMessageType.SCHEMA:
self._process_schema_message(line_dict)
record_type: SingerMessageType = line_dict["type"]
if record_type == SingerMessageType.SCHEMA:
self._process_schema_message(line_dict)

elif record_type == SingerMessageType.RECORD:
self._process_record_message(line_dict)
elif record_type == SingerMessageType.RECORD:
record_message_counter.increment()
self._process_record_message(line_dict)

elif record_type == SingerMessageType.ACTIVATE_VERSION:
self._process_activate_version_message(line_dict)
elif record_type == SingerMessageType.ACTIVATE_VERSION:
self._process_activate_version_message(line_dict)

elif record_type == SingerMessageType.STATE:
self._process_state_message(line_dict)
elif record_type == SingerMessageType.STATE:
self._process_state_message(line_dict)

elif record_type == SingerMessageType.BATCH:
self._process_batch_message(line_dict)
elif record_type == SingerMessageType.BATCH:
self._process_batch_message(line_dict)

else:
self._process_unknown_message(line_dict)
else:
self._process_unknown_message(line_dict)

Check warning on line 122 in singer_sdk/io_base.py

View check run for this annotation

Codecov / codecov/patch

singer_sdk/io_base.py#L122

Added line #L122 was not covered by tests

stats[record_type] += 1
stats[record_type] += 1

return Counter(**stats)

Expand Down
8 changes: 7 additions & 1 deletion singer_sdk/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class Tag(str, enum.Enum):
JOB_TYPE = "job_type"
HTTP_STATUS_CODE = "http_status_code"
STATUS = "status"
PID = "pid"


class Metric(str, enum.Enum):
Expand All @@ -56,6 +57,7 @@ class Metric(str, enum.Enum):
HTTP_REQUEST_COUNT = "http_request_count"
JOB_DURATION = "job_duration"
SYNC_DURATION = "sync_duration"
MESSAGE_COUNT = "message_count"


@dataclass
Expand Down Expand Up @@ -180,6 +182,10 @@ def __init__(
self.log_interval = log_interval
self.last_log_time = time()

def exit(self) -> None:
"""Exit the counter context."""
self._pop()

def __enter__(self) -> Counter:
"""Enter the counter context.
Expand All @@ -202,7 +208,7 @@ def __exit__(
exc_val: The exception value.
exc_tb: The exception traceback.
"""
self._pop()
self.exit()

def _pop(self) -> None:
"""Log and reset the counter."""
Expand Down
20 changes: 20 additions & 0 deletions singer_sdk/sinks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import datetime
import importlib.util
import json
import os
import time
import typing as t
from functools import cached_property
Expand All @@ -17,6 +18,7 @@
import jsonschema
from typing_extensions import override

from singer_sdk import metrics
from singer_sdk.exceptions import (
InvalidJSONSchema,
InvalidRecord,
Expand Down Expand Up @@ -188,6 +190,23 @@ def __init__(
)

self._validator: BaseJSONSchemaValidator | None = self.get_validator()
self._record_counter: metrics.Counter = metrics.Counter(
metrics.Metric.RECORD_COUNT,
{
metrics.Tag.STREAM: stream_name,
metrics.Tag.PID: os.getpid(),
},
log_interval=metrics.DEFAULT_LOG_INTERVAL,
)

@property
def record_counter_metric(self) -> metrics.Counter:
"""Get the record counter metric.
Returns:
The record counter metric.
"""
return self._record_counter

@cached_property
def validate_schema(self) -> bool:
Expand Down Expand Up @@ -680,6 +699,7 @@ def clean_up(self) -> None:
should not be relied on, it's recommended to use a uuid as well.
"""
self.logger.info("Cleaning up %s", self.stream_name)
self.record_counter_metric.exit()

def process_batch_files(
self,
Expand Down
1 change: 1 addition & 0 deletions singer_sdk/target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ def _process_record_message(self, message_dict: dict) -> None:

sink.tally_record_read()
sink.process_record(transformed_record, context)
sink.record_counter_metric.increment()
sink._after_process_record(context) # noqa: SLF001

if sink.is_full:
Expand Down

0 comments on commit 94a642b

Please sign in to comment.