diff --git a/singer_sdk/helpers/capabilities.py b/singer_sdk/helpers/capabilities.py index 151cd1c833..b48d290a45 100644 --- a/singer_sdk/helpers/capabilities.py +++ b/singer_sdk/helpers/capabilities.py @@ -144,6 +144,14 @@ default=False, ), ).to_dict() +TARGET_VALIDATE_RECORDS_CONFIG = PropertiesList( + Property( + "validate_records", + BooleanType(), + description="Whether to validate the schema of the incoming streams.", + default=True, + ), +).to_dict() class TargetLoadMethods(str, Enum): @@ -355,3 +363,6 @@ class TargetCapabilities(CapabilitiesEnum): #: Allow setting the target schema. TARGET_SCHEMA = "target-schema" + + #: Validate the schema of the incoming records. + VALIDATE_RECORDS = "validate-records" diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index abf5f82cd7..7cff956726 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -9,6 +9,7 @@ import json import time import typing as t +from functools import cached_property from gzip import GzipFile from gzip import open as gzip_open from types import MappingProxyType @@ -131,9 +132,6 @@ class Sink(metaclass=abc.ABCMeta): MAX_SIZE_DEFAULT = 10000 - validate_schema = True - """Enable JSON schema record validation.""" - validate_field_string_format = False """Enable JSON schema format validation, for example `date-time` string fields.""" @@ -186,6 +184,15 @@ def __init__( self._validator: BaseJSONSchemaValidator | None = self.get_validator() + @cached_property + def validate_schema(self) -> bool: + """Enable JSON schema record validation. + + Returns: + True if JSON schema validation is enabled. + """ + return self.config.get("validate_records", True) + def get_validator(self) -> BaseJSONSchemaValidator | None: """Get a record validator for this sink. diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index 4ab959ff70..5a78bf362f 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -21,6 +21,7 @@ TARGET_HARD_DELETE_CONFIG, TARGET_LOAD_METHOD_CONFIG, TARGET_SCHEMA_CONFIG, + TARGET_VALIDATE_RECORDS_CONFIG, CapabilitiesEnum, PluginCapabilities, TargetCapabilities, @@ -104,6 +105,7 @@ def capabilities(self) -> list[CapabilitiesEnum]: PluginCapabilities.ABOUT, PluginCapabilities.STREAM_MAPS, PluginCapabilities.FLATTENING, + TargetCapabilities.VALIDATE_RECORDS, ] @property @@ -614,6 +616,9 @@ def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None: if PluginCapabilities.BATCH in capabilities: _merge_missing(BATCH_CONFIG, config_jsonschema) + if TargetCapabilities.VALIDATE_RECORDS in capabilities: + _merge_missing(TARGET_VALIDATE_RECORDS_CONFIG, config_jsonschema) + super().append_builtin_config(config_jsonschema)