diff --git a/singer_sdk/io_base.py b/singer_sdk/io_base.py index 2c5698e29..3ccec3738 100644 --- a/singer_sdk/io_base.py +++ b/singer_sdk/io_base.py @@ -6,10 +6,12 @@ import decimal import json import logging +import os import sys import typing as t from collections import Counter, defaultdict +from singer_sdk import metrics from singer_sdk._singerlib.messages import Message, SingerMessageType from singer_sdk._singerlib.messages import format_message as singer_format_message from singer_sdk._singerlib.messages import write_message as singer_write_message @@ -33,8 +35,14 @@ def listen(self, file_input: t.IO[str] | None = None) -> None: if not file_input: file_input = sys.stdin - self._process_lines(file_input) - self._process_endofpipe() + load_timer = metrics.Timer( + metrics.Metric.SYNC_DURATION, + {"pid": os.getpid()}, + ) + + with load_timer: + self._process_lines(file_input) + self._process_endofpipe() @staticmethod def _assert_line_requires(line_dict: dict, requires: set[str]) -> None: @@ -83,30 +91,37 @@ def _process_lines(self, file_input: t.IO[str]) -> t.Counter[str]: A counter object for the processed lines. """ stats: dict[str, int] = defaultdict(int) - for line in file_input: - line_dict = self.deserialize_json(line) - self._assert_line_requires(line_dict, requires={"type"}) + record_message_counter = metrics.Counter( + metrics.Metric.MESSAGE_COUNT, + {"pid": os.getpid()}, + log_interval=metrics.DEFAULT_LOG_INTERVAL, + ) + with record_message_counter: + for line in file_input: + line_dict = self.deserialize_json(line) + self._assert_line_requires(line_dict, requires={"type"}) - record_type: SingerMessageType = line_dict["type"] - if record_type == SingerMessageType.SCHEMA: - self._process_schema_message(line_dict) + record_type: SingerMessageType = line_dict["type"] + if record_type == SingerMessageType.SCHEMA: + self._process_schema_message(line_dict) - elif record_type == SingerMessageType.RECORD: - self._process_record_message(line_dict) + elif record_type == SingerMessageType.RECORD: + record_message_counter.increment() + self._process_record_message(line_dict) - elif record_type == SingerMessageType.ACTIVATE_VERSION: - self._process_activate_version_message(line_dict) + elif record_type == SingerMessageType.ACTIVATE_VERSION: + self._process_activate_version_message(line_dict) - elif record_type == SingerMessageType.STATE: - self._process_state_message(line_dict) + elif record_type == SingerMessageType.STATE: + self._process_state_message(line_dict) - elif record_type == SingerMessageType.BATCH: - self._process_batch_message(line_dict) + elif record_type == SingerMessageType.BATCH: + self._process_batch_message(line_dict) - else: - self._process_unknown_message(line_dict) + else: + self._process_unknown_message(line_dict) # pragma: no cover - stats[record_type] += 1 + stats[record_type] += 1 return Counter(**stats) diff --git a/singer_sdk/metrics.py b/singer_sdk/metrics.py index 990285ae0..9bb5dd759 100644 --- a/singer_sdk/metrics.py +++ b/singer_sdk/metrics.py @@ -45,6 +45,7 @@ class Tag(str, enum.Enum): JOB_TYPE = "job_type" HTTP_STATUS_CODE = "http_status_code" STATUS = "status" + PID = "pid" class Metric(str, enum.Enum): @@ -56,6 +57,7 @@ class Metric(str, enum.Enum): HTTP_REQUEST_COUNT = "http_request_count" JOB_DURATION = "job_duration" SYNC_DURATION = "sync_duration" + MESSAGE_COUNT = "message_count" @dataclass @@ -180,6 +182,10 @@ def __init__( self.log_interval = log_interval self.last_log_time = time() + def exit(self) -> None: + """Exit the counter context.""" + self._pop() + def __enter__(self) -> Counter: """Enter the counter context. @@ -202,7 +208,7 @@ def __exit__( exc_val: The exception value. exc_tb: The exception traceback. """ - self._pop() + self.exit() def _pop(self) -> None: """Log and reset the counter.""" diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index a2f54c8ca..c4787ce56 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -7,6 +7,7 @@ import datetime import importlib.util import json +import os import time import typing as t from functools import cached_property @@ -17,6 +18,7 @@ import jsonschema from typing_extensions import override +from singer_sdk import metrics from singer_sdk.exceptions import ( InvalidJSONSchema, InvalidRecord, @@ -188,6 +190,23 @@ def __init__( ) self._validator: BaseJSONSchemaValidator | None = self.get_validator() + self._record_counter: metrics.Counter = metrics.Counter( + metrics.Metric.RECORD_COUNT, + { + metrics.Tag.STREAM: stream_name, + metrics.Tag.PID: os.getpid(), + }, + log_interval=metrics.DEFAULT_LOG_INTERVAL, + ) + + @property + def record_counter_metric(self) -> metrics.Counter: + """Get the record counter metric. + + Returns: + The record counter metric. + """ + return self._record_counter @cached_property def validate_schema(self) -> bool: @@ -680,6 +699,7 @@ def clean_up(self) -> None: should not be relied on, it's recommended to use a uuid as well. """ self.logger.info("Cleaning up %s", self.stream_name) + self.record_counter_metric.exit() def process_batch_files( self, diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index d560a555e..8e5622c94 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -360,6 +360,7 @@ def _process_record_message(self, message_dict: dict) -> None: sink.tally_record_read() sink.process_record(transformed_record, context) + sink.record_counter_metric.increment() sink._after_process_record(context) # noqa: SLF001 if sink.is_full: