Skip to content

Commit

Permalink
fix: Fix SQL type merging for pre-existing target tables (#898)
Browse files Browse the repository at this point in the history
* new check added to streams sql _adapt_column_type and comments to merge_sql_types

* Removed issubclass check did work as intended

* Changed to a single string to string check

* update of merge_sql_types to match _adapt_column_type changes

Co-authored-by: Edgar R. M <[email protected]>
  • Loading branch information
BuzzCutNorman and edgarrmondragon authored Sep 9, 2022
1 parent 4d92a61 commit 950e9ef
Showing 1 changed file with 53 additions and 15 deletions.
68 changes: 53 additions & 15 deletions singer_sdk/streams/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -721,27 +721,53 @@ def merge_sql_types(
if len(sql_types) == 1:
return sql_types[0]

# Gathering Type to match variables
# sent in _adapt_column_type
current_type = sql_types[0]
# sql_type = sql_types[1]

# Getting the length of each type
# current_type_len: int = getattr(sql_types[0], "length", 0)
sql_type_len: int = getattr(sql_types[1], "length", 0)
if sql_type_len is None:
sql_type_len = 0

# Convert the two types given into a sorted list
# containing the best conversion classes
sql_types = self._sort_types(sql_types)

# If greater than two evaluate the first pair then on down the line
if len(sql_types) > 2:
return self.merge_sql_types(
[self.merge_sql_types([sql_types[0], sql_types[1]])] + sql_types[2:]
)

assert len(sql_types) == 2
generic_type = type(sql_types[0].as_generic())
if isinstance(generic_type, type):
if issubclass(
generic_type,
(sqlalchemy.types.String, sqlalchemy.types.Unicode),
):
return sql_types[0]

elif isinstance(
generic_type,
(sqlalchemy.types.String, sqlalchemy.types.Unicode),
):
return sql_types[0]
# Get the generic type class
for opt in sql_types:
# Get the length
opt_len: int = getattr(opt, "length", 0)
generic_type = type(opt.as_generic())

if isinstance(generic_type, type):
if issubclass(
generic_type,
(sqlalchemy.types.String, sqlalchemy.types.Unicode),
):
# If length None or 0 then is varchar max ?
if (opt_len is None) or (opt_len == 0):
return opt
elif isinstance(
generic_type,
(sqlalchemy.types.String, sqlalchemy.types.Unicode),
):
# If length None or 0 then is varchar max ?
if (opt_len is None) or (opt_len == 0):
return opt
# If best conversion class is equal to current type
# return the best conversion class
elif str(opt) == str(current_type):
return opt

raise ValueError(
f"Unable to merge sql types: {', '.join([str(t) for t in sql_types])}"
Expand Down Expand Up @@ -827,9 +853,21 @@ def _adapt_column_type(
Raises:
NotImplementedError: if altering columns is not supported.
"""
current_type = self._get_column_type(full_table_name, column_name)
current_type: sqlalchemy.types.TypeEngine = self._get_column_type(
full_table_name, column_name
)

# Check if the existing column type and the sql type are the same
if str(sql_type) == str(current_type):
# The current column and sql type are the same
# Nothing to do
return

# Not the same type, generic type or compatible types
# calling merge_sql_types for assistnace
compatible_sql_type = self.merge_sql_types([current_type, sql_type])
if current_type == compatible_sql_type:

if str(compatible_sql_type) == str(current_type):
# Nothing to do
return

Expand Down

0 comments on commit 950e9ef

Please sign in to comment.