Skip to content

Commit

Permalink
fix: limit max length edge case (#68)
Browse files Browse the repository at this point in the history
Closes #67
  • Loading branch information
pnadolny13 authored Jul 6, 2023
1 parent 86a9ad0 commit 3e95012
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 1 deletion.
13 changes: 12 additions & 1 deletion target_snowflake/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from target_snowflake.snowflake_types import NUMBER, TIMESTAMP_NTZ, VARIANT


SNOWFLAKE_MAX_STRING_LENGTH = 16777216

class TypeMap:
def __init__(self, operator, map_value, match_value=None):
self.operator = operator
Expand Down Expand Up @@ -231,6 +233,14 @@ def get_column_alter_ddl(
},
)

@staticmethod
def _conform_max_length(jsonschema_type):
"""Alter jsonschema representations to limit max length to Snowflake's VARCHAR length."""
max_length = jsonschema_type.get("maxLength")
if max_length and max_length > SNOWFLAKE_MAX_STRING_LENGTH:
jsonschema_type["maxLength"] = SNOWFLAKE_MAX_STRING_LENGTH
return jsonschema_type

@staticmethod
def to_sql_type(jsonschema_type: dict) -> sqlalchemy.types.TypeEngine:
"""Return a JSON Schema representation of the provided type.
Expand All @@ -244,10 +254,11 @@ def to_sql_type(jsonschema_type: dict) -> sqlalchemy.types.TypeEngine:
The SQLAlchemy type representation of the data type.
"""
# start with default implementation
jsonschema_type = SnowflakeConnector._conform_max_length(jsonschema_type)
target_type = SQLConnector.to_sql_type(jsonschema_type)
# snowflake max and default varchar length
# https://docs.snowflake.com/en/sql-reference/intro-summary-data-types.html
maxlength = jsonschema_type.get("maxLength", 16777216)
maxlength = jsonschema_type.get("maxLength", SNOWFLAKE_MAX_STRING_LENGTH)
# define type maps
string_submaps = [
TypeMap(eq, TIMESTAMP_NTZ(), "date-time"),
Expand Down
33 changes: 33 additions & 0 deletions tests/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,38 @@ def setup(self) -> None:
"""
)


class SnowflakeTargetTypeEdgeCasesTest(TargetFileTestTemplate):
name = "type_edge_cases"

@property
def singer_filepath(self) -> Path:
current_dir = Path(__file__).resolve().parent
return current_dir / "target_test_streams" / f"{self.name}.singer"

def validate(self) -> None:
connector = self.target.default_sink_class.connector_class(self.target.config)
table = f"{self.target.config['database']}.{self.target.config['default_target_schema']}.{self.name}".upper()
connector.connection.execute(
f"select * from {table} order by 1",
)

table_schema = connector.get_table(table)
expected_types = {
"id": sct.NUMBER,
"col_max_length_str": sct.STRING,
"_sdc_extracted_at": sct.TIMESTAMP_NTZ,
"_sdc_batched_at": sct.TIMESTAMP_NTZ,
"_sdc_received_at": sct.TIMESTAMP_NTZ,
"_sdc_deleted_at": sct.TIMESTAMP_NTZ,
"_sdc_table_version": sct.NUMBER,
"_sdc_sequence": sct.NUMBER,
}
for column in table_schema.columns:
assert column.name in expected_types
isinstance(column.type, expected_types[column.name])


target_tests = TestSuite(
kind="target",
tests=[
Expand All @@ -445,5 +477,6 @@ def setup(self) -> None:
SnowflakeTargetColonsInColName,
SnowflakeTargetExistingTable,
SnowflakeTargetExistingTableAlter,
SnowflakeTargetTypeEdgeCasesTest,
],
)
2 changes: 2 additions & 0 deletions tests/target_test_streams/type_edge_cases.singer
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"type": "SCHEMA", "stream": "type_edge_cases", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "col_max_length_str": {"maxLength": 4294967295, "type": [ "null", "string" ] }}}}
{"type": "RECORD", "stream": "type_edge_cases", "record": {"id": 1, "col_max_length_str": "foo"}}

0 comments on commit 3e95012

Please sign in to comment.