Skip to content

Commit

Permalink
feat: ✨ add maxLength to schema
Browse files Browse the repository at this point in the history
if the column has a length (e.g. varchar(256), propagate this length to maxLength in the schema. This can then be used in sql targets to create columns with the right type and size.
  • Loading branch information
tobiascadee committed Aug 21, 2024
1 parent 3d0eedd commit bd09bf4
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 2 deletions.
7 changes: 5 additions & 2 deletions meltano.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
default_environment: dev
environments:
- name: dev
version: 1
send_anonymous_usage_stats: true
project_id: "tap-postgres"
Expand All @@ -11,7 +14,7 @@ plugins:
- catalog
- discover
config:
sqlalchemy_url: "postgresql://postgres:postgres@localhost:5432/postgres"
sqlalchemy_url: "postgresql://tobiascadee:password@localhost:5432/postgres"
settings:
- name: sqlalchemy_url
kind: password
Expand All @@ -37,7 +40,7 @@ plugins:
- name: ssl_client_private_key
kind: password
select:
- "*.*"
- "public-test.*"
loaders:
- name: target-jsonl
variant: andyh1203
Expand Down
91 changes: 91 additions & 0 deletions tap_postgres/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
from psycopg2 import extras
from singer_sdk import SQLConnector, SQLStream
from singer_sdk import typing as th
from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema
from singer_sdk.helpers._state import increment_state
from singer_sdk.helpers._typing import TypeConformanceLevel
from singer_sdk.streams.core import REPLICATION_INCREMENTAL
from sqlalchemy.engine import Engine
from sqlalchemy.engine.reflection import Inspector

if TYPE_CHECKING:
from singer_sdk.helpers.types import Context
Expand Down Expand Up @@ -255,6 +258,94 @@ def get_schema_names(self, engine: Engine, inspected: Inspector) -> list[str]:
return self.config["filter_schemas"]
return super().get_schema_names(engine, inspected)

def discover_catalog_entry(
self,
engine: Engine,
inspected: Inspector,
schema_name: str,
table_name: str,
is_view: bool,
) -> CatalogEntry:
"""Create `CatalogEntry` object for the given table or a view.
Args:
engine: SQLAlchemy engine
inspected: SQLAlchemy inspector instance for engine
schema_name: Schema name to inspect
table_name: Name of the table or a view
is_view: Flag whether this object is a view, returned by `get_object_names`
Returns:
`CatalogEntry` object for the given table or a view
"""
# Initialize unique stream name
unique_stream_id = f"{schema_name}-{table_name}"

# Detect key properties
possible_primary_keys: list[list[str]] = []
pk_def = inspected.get_pk_constraint(table_name, schema=schema_name)
if pk_def and "constrained_columns" in pk_def:
possible_primary_keys.append(pk_def["constrained_columns"])

# An element of the columns list is ``None`` if it's an expression and is
# returned in the ``expressions`` list of the reflected index.
possible_primary_keys.extend(
index_def["column_names"] # type: ignore[misc]
for index_def in inspected.get_indexes(table_name, schema=schema_name)
if index_def.get("unique", False)
)

key_properties = next(iter(possible_primary_keys), None)

# Initialize columns list
table_schema = th.PropertiesList()
for column_def in inspected.get_columns(table_name, schema=schema_name):
column_name = column_def["name"]
is_nullable = column_def.get("nullable", False)
jsonschema_type: dict = self.to_jsonschema_type(column_def["type"])
if hasattr(column_def["type"], "length"):
jsonschema_type["maxLength"] = column_def["type"].length
table_schema.append(
th.Property(
name=column_name,
wrapped=th.CustomType(jsonschema_type),
nullable=is_nullable,
required=column_name in key_properties if key_properties else False,
),
)
schema = table_schema.to_dict()

# Initialize available replication methods
addl_replication_methods: list[str] = [""] # By default an empty list.
# Notes regarding replication methods:
# - 'INCREMENTAL' replication must be enabled by the user by specifying
# a replication_key value.
# - 'LOG_BASED' replication must be enabled by the developer, according
# to source-specific implementation capabilities.
replication_method = next(reversed(["FULL_TABLE", *addl_replication_methods]))

# Create the catalog entry object
return CatalogEntry(
tap_stream_id=unique_stream_id,
stream=unique_stream_id,
table=table_name,
key_properties=key_properties,
schema=Schema.from_dict(schema),
is_view=is_view,
replication_method=replication_method,
metadata=MetadataMapping.get_standard_metadata(
schema_name=schema_name,
schema=schema,
replication_method=replication_method,
key_properties=key_properties,
valid_replication_keys=None, # Must be defined by user
),
database=None, # Expects single-database context
row_count=None,
stream_alias=None,
replication_key=None, # Must be defined by user
)


class PostgresStream(SQLStream):
"""Stream class for Postgres streams."""
Expand Down

0 comments on commit bd09bf4

Please sign in to comment.