Skip to content

Commit

Permalink
feat: Validate parsed/transformed record against schema message (#1769)
Browse files Browse the repository at this point in the history
Co-authored-by: Edgar R. M <[email protected]>
  • Loading branch information
Ken Payne and edgarrmondragon authored Jul 5, 2023
1 parent efd9fb6 commit 77c3f90
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 2 deletions.
4 changes: 4 additions & 0 deletions singer_sdk/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,7 @@ class ConformedNameClashException(Exception):
e.g. two columns conformed to the same name
"""


class MissingKeyPropertiesError(Exception):
"""Raised when a recieved (and/or transformed) record is missing key properties."""
20 changes: 20 additions & 0 deletions singer_sdk/sinks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from dateutil import parser
from jsonschema import Draft7Validator, FormatChecker

from singer_sdk.exceptions import MissingKeyPropertiesError
from singer_sdk.helpers._batch import (
BaseBatchFileEncoding,
BatchConfig,
Expand Down Expand Up @@ -321,6 +322,25 @@ def _validate_and_parse(self, record: dict) -> dict:
)
return record

def _singer_validate_message(self, record: dict) -> None:
"""Ensure record conforms to Singer Spec.
Args:
record: Record (after parsing, schema validations and transformations).
Raises:
MissingKeyPropertiesError: If record is missing one or more key properties.
"""
if not all(key_property in record for key_property in self.key_properties):
msg = (
f"Record is missing one or more key_properties. \n"
f"Key Properties: {self.key_properties}, "
f"Record Keys: {list(record.keys())}"
)
raise MissingKeyPropertiesError(
msg,
)

def _parse_timestamps_in_record(
self,
record: dict,
Expand Down
3 changes: 2 additions & 1 deletion singer_sdk/target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,10 @@ def _process_record_message(self, message_dict: dict) -> None:
sink._remove_sdc_metadata_from_record(transformed_record)

sink._validate_and_parse(transformed_record)
transformed_record = sink.preprocess_record(transformed_record, context)
sink._singer_validate_message(transformed_record)

sink.tally_record_read()
transformed_record = sink.preprocess_record(transformed_record, context)
sink.process_record(transformed_record, context)
sink._after_process_record(context)

Expand Down
10 changes: 9 additions & 1 deletion singer_sdk/testing/target_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

import pytest

from singer_sdk.exceptions import RecordsWithoutSchemaException
from singer_sdk.exceptions import (
MissingKeyPropertiesError,
RecordsWithoutSchemaException,
)

from .templates import TargetFileTestTemplate, TargetTestTemplate

Expand Down Expand Up @@ -108,6 +111,11 @@ class TargetRecordMissingKeyProperty(TargetFileTestTemplate):

name = "record_missing_key_property"

def test(self) -> None:
"""Run test."""
with pytest.raises(MissingKeyPropertiesError):
super().test()


class TargetRecordMissingRequiredProperty(TargetFileTestTemplate):
"""Test Target handles record missing required property."""
Expand Down

0 comments on commit 77c3f90

Please sign in to comment.