Skip to content

Commit

Permalink
Stream Name splits toggle added, target now works with upper case str…
Browse files Browse the repository at this point in the history
…eam names (#59)

* Stream Name splits toggle added, target now works with upper case stream names

* Lint fixed
  • Loading branch information
visch authored Jan 10, 2023
1 parent b02e03a commit 355bceb
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 12 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Built with the [Meltano SDK](https://sdk.meltano.com) for Singer Taps and Target
| database | False | None | Database name. Note if sqlalchemy_url is set this will be ignored. |
| sqlalchemy_url | False | None | SQLAlchemy connection string. This will override using host, user, password, port, dialect. Note that you must esacpe password special characters properly see https://docs.sqlalchemy.org/en/20/core/engines.html#escaping-special-characters-such-as-signs-in-passwords |
| dialect+driver | False | postgresql+psycopg2 | Dialect+driver see https://docs.sqlalchemy.org/en/20/core/engines.html. Generally just leave this alone. Note if sqlalchemy_url is set this will be ignored. |
| stream_name_splits | True | False | When stream names have dashes in them then use the format of [schema]-[table]. If you do not want schemas to be determined from the stream name then Disable this feature.
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
Expand Down
10 changes: 5 additions & 5 deletions target_postgres/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,17 @@ def get_sqlalchemy_url(self, config: dict) -> str:

def truncate_table(self, name):
"""Clear table data."""
self.connection.execute(f"TRUNCATE TABLE {name}")
self.connection.execute(f'TRUNCATE TABLE "{name}"')

def drop_table(self, name):
"""Drop table data."""
self.connection.execute(f"DROP TABLE {name}")
self.connection.execute(f'DROP TABLE "{name}"')

def create_temp_table_from_table(self, from_table_name, temp_table_name):
"""Temp table from another table."""
ddl = sqlalchemy.DDL(
"CREATE TEMP TABLE %(temp_table_name)s AS "
"SELECT * FROM %(from_table_name)s LIMIT 0",
'CREATE TEMP TABLE "%(temp_table_name)s" AS '
'SELECT * FROM "%(from_table_name)s" LIMIT 0',
{"temp_table_name": temp_table_name, "from_table_name": from_table_name},
)
self.connection.execute(ddl)
Expand Down Expand Up @@ -166,7 +166,7 @@ def get_column_add_ddl(
column = sqlalchemy.Column(column_name, column_type)

return sqlalchemy.DDL(
"ALTER TABLE %(table_name)s ADD COLUMN %(column_name)s %(column_type)s",
'ALTER TABLE "%(table_name)s" ADD COLUMN "%(column_name)s" %(column_type)s',
{
"table_name": table_name,
"column_name": column.compile(dialect=self._engine.dialect),
Expand Down
28 changes: 21 additions & 7 deletions target_postgres/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,11 @@ def merge_upsert_from_table(
where_condition = " and ".join([f'target."{key}" is null' for key in join_keys])

insert_sql = f"""
INSERT INTO {to_table_name}
INSERT INTO \"{to_table_name}\"
SELECT
temp.*
FROM {from_table_name} AS temp
LEFT JOIN {to_table_name} AS target ON {join_condition}
FROM \"{from_table_name}\" AS temp
LEFT JOIN \"{to_table_name}\" AS target ON {join_condition}
WHERE {where_condition}
"""
self.connection.execute(insert_sql)
Expand All @@ -141,9 +141,9 @@ def merge_upsert_from_table(
)
where_condition = join_condition
update_sql = f"""
UPDATE {to_table_name} AS target
UPDATE \"{to_table_name}\" AS target
SET {columns}
FROM {from_table_name} AS temp
FROM \"{from_table_name}\" AS temp
WHERE {where_condition}
"""
self.connection.execute(update_sql)
Expand Down Expand Up @@ -223,7 +223,7 @@ def generate_insert_statement(

def conform_name(self, name: str, object_type: Optional[str] = None) -> str:
"""Conforming names of tables, schemas, column names."""
return name
return name.replace("-", "_")

@property
def schema_name(self) -> Optional[str]:
Expand All @@ -246,11 +246,25 @@ def schema_name(self) -> Optional[str]:
if default_target_schema:
return default_target_schema

if len(parts) in {2, 3}:
if self.config["stream_name_splits"] is True and len(parts) in {2, 3}:
# Stream name is a two-part or three-part identifier.
# Use the second-to-last part as the schema name.
stream_schema = self.conform_name(parts[-2], "schema")
return stream_schema

# Schema name not detected.
return None

@property
def table_name(self) -> str:
"""Return the table name, with no schema or database part.
Returns:
The target table name.
"""
parts = self.stream_name.split("-")
if self.config["stream_name_splits"] is True:
table = self.stream_name if len(parts) == 1 else parts[-1]
else:
table = self.stream_name
return self.conform_name(table, "table")
11 changes: 11 additions & 0 deletions target_postgres/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,18 @@ def __init__(
th.StringType,
description="Postgres schema to send data to, example: tap-clickup",
),
th.Property(
"stream_name_splits",
th.BooleanType,
default=True,
description=(
"When stream names have dashes in them then use "
+ "the format of [schema]-[table]. If you do not want schemas "
+ "to be determined from the stream name then Disable this feature."
),
),
).to_dict()

default_sink_class = PostgresSink

@property
Expand Down

0 comments on commit 355bceb

Please sign in to comment.