From 7ea1422907ea7ee6ca396127a20a6a29fee776d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= <16805946+edgarrmondragon@users.noreply.github.com> Date: Fri, 6 Sep 2024 10:53:19 -0600 Subject: [PATCH] feat: Developers can now more easily override the mapping from SQL column type to JSON schema (#2618) * feat: (WIP) Let developers more easily override SQL column type to JSON schema mapping * Rename `SQLToJSONSchemaMap` -> `SQLToJSONSchema` * Remove annotation from example * Move class reference docs * Fix example in docs --- ...ger_sdk.connectors.sql.SQLToJSONSchema.rst | 8 ++ docs/conf.py | 4 + docs/guides/index.md | 1 + docs/guides/sql-tap.md | 54 +++++++++ docs/reference.rst | 9 ++ singer_sdk/connectors/sql.py | 110 +++++++++++++++++- singer_sdk/typing.py | 17 ++- tests/core/test_connector_sql.py | 85 +++++++++++++- tests/core/test_sql_typing.py | 4 +- 9 files changed, 283 insertions(+), 9 deletions(-) create mode 100644 docs/classes/singer_sdk.connectors.sql.SQLToJSONSchema.rst create mode 100644 docs/guides/sql-tap.md diff --git a/docs/classes/singer_sdk.connectors.sql.SQLToJSONSchema.rst b/docs/classes/singer_sdk.connectors.sql.SQLToJSONSchema.rst new file mode 100644 index 000000000..5f2acf693 --- /dev/null +++ b/docs/classes/singer_sdk.connectors.sql.SQLToJSONSchema.rst @@ -0,0 +1,8 @@ +singer_sdk.connectors.sql.SQLToJSONSchema +========================================= + +.. currentmodule:: singer_sdk.connectors.sql + +.. 
autoclass:: SQLToJSONSchema + :members: + :special-members: __init__, __call__ \ No newline at end of file diff --git a/docs/conf.py b/docs/conf.py index fec62280e..947bedeb6 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -158,6 +158,10 @@ "https://json-schema.org/understanding-json-schema/reference/%s", "%s", ), + "column_type": ( + "https://docs.sqlalchemy.org/en/20/core/type_basics.html#sqlalchemy.types.%s", + "%s", + ), } # -- Options for intersphinx ----------------------------------------------------------- diff --git a/docs/guides/index.md b/docs/guides/index.md index 268f27957..60a94e8d5 100644 --- a/docs/guides/index.md +++ b/docs/guides/index.md @@ -9,4 +9,5 @@ porting pagination-classes custom-clis config-schema +sql-tap ``` diff --git a/docs/guides/sql-tap.md b/docs/guides/sql-tap.md new file mode 100644 index 000000000..4609e2721 --- /dev/null +++ b/docs/guides/sql-tap.md @@ -0,0 +1,54 @@ +# Building SQL taps + +## Default type mapping + +The Singer SDK automatically handles the most common SQLAlchemy column types, using [`functools.singledispatchmethod`](inv:python:py:class:#functools.singledispatchmethod) to process each type. See the [`SQLToJSONSchema`](connectors.sql.SQLToJSONSchema) reference documentation for details. + +## Custom type mapping + +If the class above doesn't cover all the types supported by the SQLAlchemy dialect in your tap, you can subclass it and override or extend with a new method for the type you need to support: + +```python +import functools + +from sqlalchemy import Numeric +from singer_sdk import typing as th +from singer_sdk.connectors import SQLConnector +from singer_sdk.connectors.sql import SQLToJSONSchema + +from my_sqlalchemy_dialect import VectorType + + +class CustomSQLToJSONSchema(SQLToJSONSchema): + @SQLToJSONSchema.to_jsonschema.register + def custom_number_to_jsonschema(self, column_type: Numeric): + """Override the default mapping for NUMERIC columns. 
+ + For example, a scale of 4 translates to a multipleOf 0.0001. + """ + return {"type": ["number"], "multipleOf": 10**-column_type.scale} + + @SQLToJSONSchema.to_jsonschema.register(VectorType) + def vector_to_json_schema(self, column_type): + """Custom vector to JSON schema.""" + return th.ArrayType(th.NumberType()).to_dict() +``` + +````{tip} +You can also use a type annotation to specify the type of the column when registering a new method: + +```python +@SQLToJSONSchema.to_jsonschema.register +def vector_to_json_schema(self, column_type: VectorType): + return th.ArrayType(th.NumberType()).to_dict() +``` +```` + +Then, you need to use your custom type mapping in your connector: + +```python +class MyConnector(SQLConnector): + @functools.cached_property + def type_mapping(self): + return CustomSQLToJSONSchema() +``` diff --git a/docs/reference.rst b/docs/reference.rst index b59bd6651..eeaf1b53a 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -143,3 +143,12 @@ Batch batch.BaseBatcher batch.JSONLinesBatcher + +Other +----- + +.. autosummary:: + :toctree: classes + :template: class.rst + + connectors.sql.SQLToJSONSchema diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py index c79c6308f..53ba1f8dc 100644 --- a/singer_sdk/connectors/sql.py +++ b/singer_sdk/connectors/sql.py @@ -2,6 +2,7 @@ from __future__ import annotations +import functools import logging import sys import typing as t @@ -109,6 +110,83 @@ def prepare_part(self, part: str) -> str: # noqa: PLR6301 return part +class SQLToJSONSchema: + """SQLAlchemy to JSON Schema type mapping helper. + + This class provides a mapping from SQLAlchemy types to JSON Schema types. 
+ """ + + @functools.singledispatchmethod + def to_jsonschema(self, column_type: sa.types.TypeEngine) -> dict: # noqa: ARG002, D102, PLR6301 + return th.StringType.type_dict # type: ignore[no-any-return] + + @to_jsonschema.register + def datetime_to_jsonschema(self, column_type: sa.types.DateTime) -> dict: # noqa: ARG002, PLR6301 + """Return a JSON Schema representation of a generic datetime type. + + Args: + column_type (:column_type:`DateTime`): The column type. + """ + return th.DateTimeType.type_dict # type: ignore[no-any-return] + + @to_jsonschema.register + def date_to_jsonschema(self, column_type: sa.types.Date) -> dict: # noqa: ARG002, PLR6301 + """Return a JSON Schema representation of a date type. + + Args: + column_type (:column_type:`Date`): The column type. + """ + return th.DateType.type_dict # type: ignore[no-any-return] + + @to_jsonschema.register + def time_to_jsonschema(self, column_type: sa.types.Time) -> dict: # noqa: ARG002, PLR6301 + """Return a JSON Schema representation of a time type. + + Args: + column_type (:column_type:`Time`): The column type. + """ + return th.TimeType.type_dict # type: ignore[no-any-return] + + @to_jsonschema.register + def integer_to_jsonschema(self, column_type: sa.types.Integer) -> dict: # noqa: ARG002, PLR6301 + """Return a JSON Schema representation of an integer type. + + Args: + column_type (:column_type:`Integer`): The column type. + """ + return th.IntegerType.type_dict # type: ignore[no-any-return] + + @to_jsonschema.register + def float_to_jsonschema(self, column_type: sa.types.Numeric) -> dict: # noqa: ARG002, PLR6301 + """Return a JSON Schema representation of a generic number type. + + Args: + column_type (:column_type:`Numeric`): The column type. + """ + return th.NumberType.type_dict # type: ignore[no-any-return] + + @to_jsonschema.register + def string_to_jsonschema(self, column_type: sa.types.String) -> dict: # noqa: ARG002, PLR6301 + """Return a JSON Schema representation of a generic string type. 
+ + Args: + column_type (:column_type:`String`): The column type. + """ + # TODO: Enable support for maxLength. + # if column_type.length: + # return th.StringType(max_length=column_type.length).type_dict # noqa: ERA001 + return th.StringType.type_dict # type: ignore[no-any-return] + + @to_jsonschema.register + def boolean_to_jsonschema(self, column_type: sa.types.Boolean) -> dict: # noqa: ARG002, PLR6301 + """Return a JSON Schema representation of a boolean type. + + Args: + column_type (:column_type:`Boolean`): The column type. + """ + return th.BooleanType.type_dict # type: ignore[no-any-return] + + class SQLConnector: # noqa: PLR0904 """Base class for SQLAlchemy-based connectors. @@ -162,6 +240,17 @@ def logger(self) -> logging.Logger: """ return logging.getLogger("sqlconnector") + @functools.cached_property + def type_mapping(self) -> SQLToJSONSchema: + """Return the type mapper object. + + Override this method to provide a custom mapping for your SQL dialect. + + Returns: + The type mapper object. + """ + return SQLToJSONSchema() + @contextmanager def _connect(self) -> t.Iterator[sa.engine.Connection]: with self._engine.connect().execution_options(stream_results=True) as conn: @@ -266,8 +355,8 @@ def get_sqlalchemy_url(self, config: dict[str, t.Any]) -> str: # noqa: PLR6301 return t.cast(str, config["sqlalchemy_url"]) - @staticmethod def to_jsonschema_type( + self, sql_type: ( str # noqa: ANN401 | sa.types.TypeEngine @@ -293,10 +382,25 @@ def to_jsonschema_type( Returns: The JSON Schema representation of the provided type. """ - if isinstance(sql_type, (str, sa.types.TypeEngine)): + if isinstance(sql_type, sa.types.TypeEngine): + return self.type_mapping.to_jsonschema(sql_type) + + if isinstance(sql_type, str): # pragma: no cover + warnings.warn( + "Passing string types to `to_jsonschema_type` is deprecated. 
" + "Please pass a SQLAlchemy type object instead.", + DeprecationWarning, + stacklevel=2, + ) return th.to_jsonschema_type(sql_type) - if isinstance(sql_type, type): + if isinstance(sql_type, type): # pragma: no cover + warnings.warn( + "Passing type classes to `to_jsonschema_type` is deprecated. " + "Please pass a SQLAlchemy type object instead.", + DeprecationWarning, + stacklevel=2, + ) if issubclass(sql_type, sa.types.TypeEngine): return th.to_jsonschema_type(sql_type) diff --git a/singer_sdk/typing.py b/singer_sdk/typing.py index 2910125f8..31788989b 100644 --- a/singer_sdk/typing.py +++ b/singer_sdk/typing.py @@ -53,6 +53,7 @@ from __future__ import annotations import json +import sys import typing as t import sqlalchemy as sa @@ -65,9 +66,13 @@ get_datelike_property_type, ) -if t.TYPE_CHECKING: - import sys +if sys.version_info < (3, 13): + from typing_extensions import deprecated +else: + from typing import deprecated # noqa: ICN003 # pragma: no cover + +if t.TYPE_CHECKING: from jsonschema.protocols import Validator if sys.version_info >= (3, 10): @@ -1086,6 +1091,10 @@ def __iter__(self) -> t.Iterator[Property]: return self.wrapped.values().__iter__() +@deprecated( + "Use `SQLToJSONSchema` instead.", + category=DeprecationWarning, +) def to_jsonschema_type( from_type: str | sa.types.TypeEngine | type[sa.types.TypeEngine], ) -> dict: @@ -1119,9 +1128,9 @@ def to_jsonschema_type( "bool": BooleanType.type_dict, "variant": StringType.type_dict, } - if isinstance(from_type, str): + if isinstance(from_type, str): # pragma: no cover type_name = from_type - elif isinstance(from_type, sa.types.TypeEngine): + elif isinstance(from_type, sa.types.TypeEngine): # pragma: no cover type_name = type(from_type).__name__ elif issubclass(from_type, sa.types.TypeEngine): type_name = from_type.__name__ diff --git a/tests/core/test_connector_sql.py b/tests/core/test_connector_sql.py index f37fb953b..5b866366a 100644 --- a/tests/core/test_connector_sql.py +++ 
b/tests/core/test_connector_sql.py @@ -11,7 +11,7 @@ from samples.sample_duckdb import DuckDBConnector from singer_sdk.connectors import SQLConnector -from singer_sdk.connectors.sql import FullyQualifiedName +from singer_sdk.connectors.sql import FullyQualifiedName, SQLToJSONSchema from singer_sdk.exceptions import ConfigValidationError if t.TYPE_CHECKING: @@ -22,6 +22,10 @@ def stringify(in_dict): return {k: str(v) for k, v in in_dict.items()} +class MyType(sa.types.TypeDecorator): + impl = sa.types.LargeBinary + + class TestConnectorSQL: # noqa: PLR0904 """Test the SQLConnector class.""" @@ -392,3 +396,82 @@ def prepare_part(self, part: str) -> str: def test_fully_qualified_name_empty_error(): with pytest.raises(ValueError, match="Could not generate fully qualified name"): FullyQualifiedName() + + +@pytest.mark.parametrize( + "sql_type, expected_jsonschema_type", + [ + pytest.param(sa.types.VARCHAR(), {"type": ["string"]}, id="varchar"), + pytest.param( + sa.types.VARCHAR(length=127), + {"type": ["string"], "maxLength": 127}, + marks=pytest.mark.xfail, + id="varchar-length", + ), + pytest.param(sa.types.TEXT(), {"type": ["string"]}, id="text"), + pytest.param(sa.types.INTEGER(), {"type": ["integer"]}, id="integer"), + pytest.param(sa.types.BOOLEAN(), {"type": ["boolean"]}, id="boolean"), + pytest.param(sa.types.DECIMAL(), {"type": ["number"]}, id="decimal"), + pytest.param(sa.types.FLOAT(), {"type": ["number"]}, id="float"), + pytest.param(sa.types.REAL(), {"type": ["number"]}, id="real"), + pytest.param(sa.types.NUMERIC(), {"type": ["number"]}, id="numeric"), + pytest.param( + sa.types.DATE(), + {"type": ["string"], "format": "date"}, + id="date", + ), + pytest.param( + sa.types.DATETIME(), + {"type": ["string"], "format": "date-time"}, + id="datetime", + ), + pytest.param( + sa.types.TIMESTAMP(), + {"type": ["string"], "format": "date-time"}, + id="timestamp", + ), + pytest.param( + sa.types.TIME(), + {"type": ["string"], "format": "time"}, + id="time", + ), + 
pytest.param( + sa.types.BLOB(), + {"type": ["string"]}, + id="unknown", + ), + ], +) +def test_sql_to_json_schema_map( + sql_type: sa.types.TypeEngine, + expected_jsonschema_type: dict, +): + m = SQLToJSONSchema() + assert m.to_jsonschema(sql_type) == expected_jsonschema_type + + +def test_custom_type(): + class MyMap(SQLToJSONSchema): + @SQLToJSONSchema.to_jsonschema.register + def custom_number_to_jsonschema(self, column_type: sa.types.NUMERIC) -> dict: + """Custom number to JSON schema. + + For example, a scale of 4 translates to a multipleOf 0.0001. + """ + return {"type": ["number"], "multipleOf": 10**-column_type.scale} + + @SQLToJSONSchema.to_jsonschema.register(MyType) + def my_type_to_jsonschema(self, column_type) -> dict: # noqa: ARG002 + return {"type": ["string"], "contentEncoding": "base64"} + + m = MyMap() + + assert m.to_jsonschema(MyType()) == { + "type": ["string"], + "contentEncoding": "base64", + } + assert m.to_jsonschema(sa.types.NUMERIC(scale=2)) == { + "type": ["number"], + "multipleOf": 0.01, + } + assert m.to_jsonschema(sa.types.BOOLEAN()) == {"type": ["boolean"]} diff --git a/tests/core/test_sql_typing.py b/tests/core/test_sql_typing.py index 4248ea06d..5662d7d0d 100644 --- a/tests/core/test_sql_typing.py +++ b/tests/core/test_sql_typing.py @@ -70,5 +70,7 @@ def test_convert_sql_type_to_jsonschema_type( sql_type: sa.types.TypeEngine, is_of_jsonschema_type: dict, ): - result = th.to_jsonschema_type(sql_type) + with pytest.warns(DeprecationWarning): + result = th.to_jsonschema_type(sql_type) + assert result == is_of_jsonschema_type