Skip to content

Commit

Permalink
feat: Developers can now more easily override the mapping from SQL co…
Browse files Browse the repository at this point in the history
…lumn type to JSON schema (#2618)

* feat: (WIP) Let developers more easily override SQL column type to JSON schema mapping

* Rename `SQLToJSONSchemaMap` -> `SQLToJSONSchema`

* Remove annotation from example

* Move class reference docs

* Fix example in docs
  • Loading branch information
edgarrmondragon authored Sep 6, 2024
1 parent efdc735 commit 7ea1422
Show file tree
Hide file tree
Showing 9 changed files with 283 additions and 9 deletions.
8 changes: 8 additions & 0 deletions docs/classes/singer_sdk.connectors.sql.SQLToJSONSchema.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
singer_sdk.connectors.sql.SQLToJSONSchema
=========================================

.. currentmodule:: singer_sdk.connectors.sql

.. autoclass:: SQLToJSONSchema
:members:
:special-members: __init__, __call__
4 changes: 4 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@
"https://json-schema.org/understanding-json-schema/reference/%s",
"%s",
),
"column_type": (
"https://docs.sqlalchemy.org/en/20/core/type_basics.html#sqlalchemy.types.%s",
"%s",
),
}

# -- Options for intersphinx -----------------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions docs/guides/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ porting
pagination-classes
custom-clis
config-schema
sql-tap
```
54 changes: 54 additions & 0 deletions docs/guides/sql-tap.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Building SQL taps

## Default type mapping

The Singer SDK automatically handles the most common SQLAlchemy column types, using [`functools.singledispatchmethod`](inv:python:py:class:#functools.singledispatchmethod) to process each type. See the [`SQLToJSONSchema`](connectors.sql.SQLToJSONSchema) reference documentation for details.

## Custom type mapping

If the class above doesn't cover all the types supported by the SQLAlchemy dialect in your tap, you can subclass it and either override an existing mapping or extend it with a new method for each additional type you need to support:

```python
import functools

from sqlalchemy import Numeric
from singer_sdk import typing as th
from singer_sdk.connectors import SQLConnector
from singer_sdk.connectors.sql import SQLToJSONSchema

from my_sqlalchemy_dialect import VectorType


class CustomSQLToJSONSchema(SQLToJSONSchema):
    """Type mapper whose overrides apply only to this subclass."""

    # Give the subclass its own dispatcher. Registering handlers directly on
    # `SQLToJSONSchema.to_jsonschema` would add them to the registry shared by
    # the base class, changing the mapping for every other mapper instance.
    @functools.singledispatchmethod
    def to_jsonschema(self, column_type):
        """Fall back to the default SDK mapping for unregistered types."""
        return super().to_jsonschema(column_type)

    @to_jsonschema.register
    def custom_number_to_jsonschema(self, column_type: Numeric):
        """Override the default mapping for NUMERIC columns.

        For example, a scale of 4 translates to a multipleOf 0.0001.
        """
        # Columns declared without a scale keep the plain number schema;
        # `10**-None` would raise a TypeError.
        if column_type.scale is None:
            return {"type": ["number"]}
        return {"type": ["number"], "multipleOf": 10**-column_type.scale}

    @to_jsonschema.register(VectorType)
    def vector_to_json_schema(self, column_type):
        """Custom vector to JSON schema."""
        return th.ArrayType(th.NumberType()).to_dict()
```

````{tip}
You can also use a type annotation to specify the type of the column when registering a new method:
```python
@SQLToJSONSchema.to_jsonschema.register
def vector_to_json_schema(self, column_type: VectorType):
return th.ArrayType(th.NumberType()).to_dict()
```
````

Then, you need to use your custom type mapping in your connector:

```python
class MyConnector(SQLConnector):
    """Connector that maps column types with ``CustomSQLToJSONSchema``."""

    @functools.cached_property
    def type_mapping(self):
        """Return the custom type mapper used by this connector."""
        return CustomSQLToJSONSchema()
```
9 changes: 9 additions & 0 deletions docs/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,12 @@ Batch

batch.BaseBatcher
batch.JSONLinesBatcher

Other
-----

.. autosummary::
:toctree: classes
:template: class.rst

connectors.sql.SQLToJSONSchema
110 changes: 107 additions & 3 deletions singer_sdk/connectors/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import functools
import logging
import sys
import typing as t
Expand Down Expand Up @@ -109,6 +110,83 @@ def prepare_part(self, part: str) -> str: # noqa: PLR6301
return part


class SQLToJSONSchema:
    """SQLAlchemy to JSON Schema type mapping helper.

    This class provides a mapping from SQLAlchemy types to JSON Schema types.
    Handlers are registered on :meth:`to_jsonschema` through
    ``functools.singledispatchmethod``, keyed by the annotated column type.
    """

    @functools.singledispatchmethod
    def to_jsonschema(self, column_type: sa.types.TypeEngine) -> dict: # noqa: ARG002, D102, PLR6301
        """Convert a SQLAlchemy column type to a JSON Schema dictionary.

        The call is dispatched on the type of ``column_type``. Types without a
        registered handler fall back to a plain string schema.

        Args:
            column_type: The SQLAlchemy column type instance.

        Returns:
            The JSON Schema dictionary for the column type.
        """
        return th.StringType.type_dict # type: ignore[no-any-return]

    @to_jsonschema.register
    def datetime_to_jsonschema(self, column_type: sa.types.DateTime) -> dict: # noqa: ARG002, PLR6301
        """Return a JSON Schema representation of a generic datetime type.

        Args:
            column_type (:column_type:`DateTime`): The column type.

        Returns:
            The date-time schema dictionary.
        """
        return th.DateTimeType.type_dict # type: ignore[no-any-return]

    @to_jsonschema.register
    def date_to_jsonschema(self, column_type: sa.types.Date) -> dict: # noqa: ARG002, PLR6301
        """Return a JSON Schema representation of a date type.

        Args:
            column_type (:column_type:`Date`): The column type.

        Returns:
            The date schema dictionary.
        """
        return th.DateType.type_dict # type: ignore[no-any-return]

    @to_jsonschema.register
    def time_to_jsonschema(self, column_type: sa.types.Time) -> dict: # noqa: ARG002, PLR6301
        """Return a JSON Schema representation of a time type.

        Args:
            column_type (:column_type:`Time`): The column type.

        Returns:
            The time schema dictionary.
        """
        return th.TimeType.type_dict # type: ignore[no-any-return]

    @to_jsonschema.register
    def integer_to_jsonschema(self, column_type: sa.types.Integer) -> dict: # noqa: ARG002, PLR6301
        """Return a JSON Schema representation of an integer type.

        Args:
            column_type (:column_type:`Integer`): The column type.

        Returns:
            The integer schema dictionary.
        """
        return th.IntegerType.type_dict # type: ignore[no-any-return]

    @to_jsonschema.register
    def float_to_jsonschema(self, column_type: sa.types.Numeric) -> dict: # noqa: ARG002, PLR6301
        """Return a JSON Schema representation of a generic number type.

        Args:
            column_type (:column_type:`Numeric`): The column type.

        Returns:
            The number schema dictionary.
        """
        return th.NumberType.type_dict # type: ignore[no-any-return]

    @to_jsonschema.register
    def string_to_jsonschema(self, column_type: sa.types.String) -> dict: # noqa: ARG002, PLR6301
        """Return a JSON Schema representation of a generic string type.

        The column length is currently ignored, so no ``maxLength`` is emitted.

        Args:
            column_type (:column_type:`String`): The column type.

        Returns:
            The string schema dictionary.
        """
        # TODO: Enable support for maxLength, e.g. by returning a string schema
        # built with `max_length=column_type.length` when a length is set.
        return th.StringType.type_dict # type: ignore[no-any-return]

    @to_jsonschema.register
    def boolean_to_jsonschema(self, column_type: sa.types.Boolean) -> dict: # noqa: ARG002, PLR6301
        """Return a JSON Schema representation of a boolean type.

        Args:
            column_type (:column_type:`Boolean`): The column type.

        Returns:
            The boolean schema dictionary.
        """
        return th.BooleanType.type_dict # type: ignore[no-any-return]


class SQLConnector: # noqa: PLR0904
"""Base class for SQLAlchemy-based connectors.
Expand Down Expand Up @@ -162,6 +240,17 @@ def logger(self) -> logging.Logger:
"""
return logging.getLogger("sqlconnector")

@functools.cached_property
def type_mapping(self) -> SQLToJSONSchema:
    """Return the type mapper object.

    Override this property to provide a custom mapping for your SQL dialect.
    Because it is a ``functools.cached_property``, the mapper is created on
    first access and then reused for this connector instance.

    Returns:
        The type mapper object.
    """
    return SQLToJSONSchema()

@contextmanager
def _connect(self) -> t.Iterator[sa.engine.Connection]:
with self._engine.connect().execution_options(stream_results=True) as conn:
Expand Down Expand Up @@ -266,8 +355,8 @@ def get_sqlalchemy_url(self, config: dict[str, t.Any]) -> str: # noqa: PLR6301

return t.cast(str, config["sqlalchemy_url"])

@staticmethod
def to_jsonschema_type(
self,
sql_type: (
str # noqa: ANN401
| sa.types.TypeEngine
Expand All @@ -293,10 +382,25 @@ def to_jsonschema_type(
Returns:
The JSON Schema representation of the provided type.
"""
if isinstance(sql_type, (str, sa.types.TypeEngine)):
if isinstance(sql_type, sa.types.TypeEngine):
return self.type_mapping.to_jsonschema(sql_type)

if isinstance(sql_type, str): # pragma: no cover
warnings.warn(
"Passing string types to `to_jsonschema_type` is deprecated. "
"Please pass a SQLAlchemy type object instead.",
DeprecationWarning,
stacklevel=2,
)
return th.to_jsonschema_type(sql_type)

if isinstance(sql_type, type):
if isinstance(sql_type, type): # pragma: no cover
warnings.warn(
"Passing type classes to `to_jsonschema_type` is deprecated. "
"Please pass a SQLAlchemy type object instead.",
DeprecationWarning,
stacklevel=2,
)
if issubclass(sql_type, sa.types.TypeEngine):
return th.to_jsonschema_type(sql_type)

Expand Down
17 changes: 13 additions & 4 deletions singer_sdk/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
from __future__ import annotations

import json
import sys
import typing as t

import sqlalchemy as sa
Expand All @@ -65,9 +66,13 @@
get_datelike_property_type,
)

if t.TYPE_CHECKING:
import sys
if sys.version_info < (3, 13):
from typing_extensions import deprecated
else:
from typing import deprecated # noqa: ICN003 # pragma: no cover


if t.TYPE_CHECKING:
from jsonschema.protocols import Validator

if sys.version_info >= (3, 10):
Expand Down Expand Up @@ -1086,6 +1091,10 @@ def __iter__(self) -> t.Iterator[Property]:
return self.wrapped.values().__iter__()


@deprecated(
"Use `SQLToJSONSchema` instead.",
category=DeprecationWarning,
)
def to_jsonschema_type(
from_type: str | sa.types.TypeEngine | type[sa.types.TypeEngine],
) -> dict:
Expand Down Expand Up @@ -1119,9 +1128,9 @@ def to_jsonschema_type(
"bool": BooleanType.type_dict,
"variant": StringType.type_dict,
}
if isinstance(from_type, str):
if isinstance(from_type, str): # pragma: no cover
type_name = from_type
elif isinstance(from_type, sa.types.TypeEngine):
elif isinstance(from_type, sa.types.TypeEngine): # pragma: no cover
type_name = type(from_type).__name__
elif issubclass(from_type, sa.types.TypeEngine):
type_name = from_type.__name__
Expand Down
85 changes: 84 additions & 1 deletion tests/core/test_connector_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from samples.sample_duckdb import DuckDBConnector
from singer_sdk.connectors import SQLConnector
from singer_sdk.connectors.sql import FullyQualifiedName
from singer_sdk.connectors.sql import FullyQualifiedName, SQLToJSONSchema
from singer_sdk.exceptions import ConfigValidationError

if t.TYPE_CHECKING:
Expand All @@ -22,6 +22,10 @@ def stringify(in_dict):
return {k: str(v) for k, v in in_dict.items()}


class MyType(sa.types.TypeDecorator):
    """Binary-backed custom type used to test custom type mappings."""

    # Underlying SQLAlchemy type wrapped by this decorator.
    impl = sa.types.LargeBinary


class TestConnectorSQL: # noqa: PLR0904
"""Test the SQLConnector class."""

Expand Down Expand Up @@ -392,3 +396,82 @@ def prepare_part(self, part: str) -> str:
def test_fully_qualified_name_empty_error():
with pytest.raises(ValueError, match="Could not generate fully qualified name"):
FullyQualifiedName()


@pytest.mark.parametrize(
    "sql_type, expected_jsonschema_type",
    [
        pytest.param(sa.types.VARCHAR(), {"type": ["string"]}, id="varchar"),
        # Expected to fail: the string handler does not emit `maxLength` yet.
        pytest.param(
            sa.types.VARCHAR(length=127),
            {"type": ["string"], "maxLength": 127},
            marks=pytest.mark.xfail,
            id="varchar-length",
        ),
        pytest.param(sa.types.TEXT(), {"type": ["string"]}, id="text"),
        pytest.param(sa.types.INTEGER(), {"type": ["integer"]}, id="integer"),
        pytest.param(sa.types.BOOLEAN(), {"type": ["boolean"]}, id="boolean"),
        pytest.param(sa.types.DECIMAL(), {"type": ["number"]}, id="decimal"),
        pytest.param(sa.types.FLOAT(), {"type": ["number"]}, id="float"),
        pytest.param(sa.types.REAL(), {"type": ["number"]}, id="real"),
        pytest.param(sa.types.NUMERIC(), {"type": ["number"]}, id="numeric"),
        pytest.param(
            sa.types.DATE(),
            {"type": ["string"], "format": "date"},
            id="date",
        ),
        pytest.param(
            sa.types.DATETIME(),
            {"type": ["string"], "format": "date-time"},
            id="datetime",
        ),
        pytest.param(
            sa.types.TIMESTAMP(),
            {"type": ["string"], "format": "date-time"},
            id="timestamp",
        ),
        pytest.param(
            sa.types.TIME(),
            {"type": ["string"], "format": "time"},
            id="time",
        ),
        # A type with no registered handler falls back to the string schema.
        pytest.param(
            sa.types.BLOB(),
            {"type": ["string"]},
            id="unknown",
        ),
    ],
)
def test_sql_to_json_schema_map(
    sql_type: sa.types.TypeEngine,
    expected_jsonschema_type: dict,
):
    """Check the default mapping from SQLAlchemy types to JSON Schema."""
    m = SQLToJSONSchema()
    assert m.to_jsonschema(sql_type) == expected_jsonschema_type


def test_custom_type():
    """Custom handlers apply to the subclass without leaking into the base."""
    # Imported locally so the test does not depend on the module import block.
    import functools  # noqa: PLC0415

    class MyMap(SQLToJSONSchema):
        # A subclass-local dispatcher keeps the registrations below out of the
        # registry shared by every `SQLToJSONSchema` instance. Registering on
        # `SQLToJSONSchema.to_jsonschema` would change the base mapping for
        # all later tests.
        @functools.singledispatchmethod
        def to_jsonschema(self, column_type: sa.types.TypeEngine) -> dict:
            """Fall back to the default mapping for unregistered types."""
            return super().to_jsonschema(column_type)

        @to_jsonschema.register
        def custom_number_to_jsonschema(self, column_type: sa.types.NUMERIC) -> dict:
            """Custom number to JSON schema.

            For example, a scale of 4 translates to a multipleOf 0.0001.
            """
            return {"type": ["number"], "multipleOf": 10**-column_type.scale}

        @to_jsonschema.register(MyType)
        def my_type_to_jsonschema(self, column_type) -> dict:  # noqa: ARG002
            """Map the custom binary type to a base64-encoded string."""
            return {"type": ["string"], "contentEncoding": "base64"}

    m = MyMap()

    assert m.to_jsonschema(MyType()) == {
        "type": ["string"],
        "contentEncoding": "base64",
    }
    assert m.to_jsonschema(sa.types.NUMERIC(scale=2)) == {
        "type": ["number"],
        "multipleOf": 0.01,
    }
    # Unregistered types still resolve through the default mapping.
    assert m.to_jsonschema(sa.types.BOOLEAN()) == {"type": ["boolean"]}

    # The base mapper must be unaffected by the subclass registrations.
    assert SQLToJSONSchema().to_jsonschema(sa.types.NUMERIC(scale=2)) == {
        "type": ["number"],
    }
Loading

0 comments on commit 7ea1422

Please sign in to comment.