From 93776aa33b55ac5509f3568aae8857562bdada6d Mon Sep 17 00:00:00 2001 From: Derek Visch Date: Thu, 12 Jan 2023 09:46:28 -0500 Subject: [PATCH] Revert "Stream Name splits toggle added, target now works with upper case stream names (#59)" This reverts commit 355bceb4ccd35a5c0e6ba74b136cc09455ad6b9f. --- README.md | 1 - target_postgres/connector.py | 10 +++++----- target_postgres/sinks.py | 28 +++++++--------------------- target_postgres/target.py | 11 ----------- 4 files changed, 12 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index ffd86506..eca58914 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,6 @@ Built with the [Meltano SDK](https://sdk.meltano.com) for Singer Taps and Target | database | False | None | Database name. Note if sqlalchemy_url is set this will be ignored. | | sqlalchemy_url | False | None | SQLAlchemy connection string. This will override using host, user, password, port, dialect. Note that you must esacpe password special characters properly see https://docs.sqlalchemy.org/en/20/core/engines.html#escaping-special-characters-such-as-signs-in-passwords | | dialect+driver | False | postgresql+psycopg2 | Dialect+driver see https://docs.sqlalchemy.org/en/20/core/engines.html. Generally just leave this alone. Note if sqlalchemy_url is set this will be ignored. | -| stream_name_splits | True | False | When stream names have dashes in them then use the format of [schema]-[table]. If you do not want schemas to be determined from the stream name then Disable this feature. | stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). | | stream_map_config | False | None | User-defined config values to be used within map expressions. | | flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. | diff --git a/target_postgres/connector.py b/target_postgres/connector.py index 36c911b1..4c74694c 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -54,17 +54,17 @@ def get_sqlalchemy_url(self, config: dict) -> str: def truncate_table(self, name): """Clear table data.""" - self.connection.execute(f'TRUNCATE TABLE "{name}"') + self.connection.execute(f"TRUNCATE TABLE {name}") def drop_table(self, name): """Drop table data.""" - self.connection.execute(f'DROP TABLE "{name}"') + self.connection.execute(f"DROP TABLE {name}") def create_temp_table_from_table(self, from_table_name, temp_table_name): """Temp table from another table.""" ddl = sqlalchemy.DDL( - 'CREATE TEMP TABLE "%(temp_table_name)s" AS ' - 'SELECT * FROM "%(from_table_name)s" LIMIT 0', + "CREATE TEMP TABLE %(temp_table_name)s AS " + "SELECT * FROM %(from_table_name)s LIMIT 0", {"temp_table_name": temp_table_name, "from_table_name": from_table_name}, ) self.connection.execute(ddl) @@ -166,7 +166,7 @@ def get_column_add_ddl( column = sqlalchemy.Column(column_name, column_type) return sqlalchemy.DDL( - 'ALTER TABLE "%(table_name)s" ADD COLUMN "%(column_name)s" %(column_type)s', + "ALTER TABLE %(table_name)s ADD COLUMN %(column_name)s %(column_type)s", { "table_name": table_name, "column_name": column.compile(dialect=self._engine.dialect), diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py index 92882901..779e8d99 100644 --- a/target_postgres/sinks.py +++ b/target_postgres/sinks.py @@ -123,11 +123,11 @@ def merge_upsert_from_table( where_condition = " and ".join([f'target."{key}" is null' for key in join_keys]) insert_sql = f""" - INSERT INTO \"{to_table_name}\" + INSERT INTO {to_table_name} SELECT temp.* - FROM \"{from_table_name}\" AS temp - LEFT JOIN \"{to_table_name}\" AS target ON {join_condition} + FROM {from_table_name} AS temp + LEFT JOIN {to_table_name} AS target ON {join_condition} WHERE {where_condition} """ self.connection.execute(insert_sql) @@ -141,9 +141,9 @@ def merge_upsert_from_table( ) where_condition = join_condition update_sql = f""" - UPDATE \"{to_table_name}\" AS target + UPDATE {to_table_name} AS target SET {columns} - FROM \"{from_table_name}\" AS temp + FROM {from_table_name} AS temp WHERE {where_condition} """ self.connection.execute(update_sql) @@ -223,7 +223,7 @@ def generate_insert_statement( def conform_name(self, name: str, object_type: Optional[str] = None) -> str: """Conforming names of tables, schemas, column names.""" - return name.replace("-", "_") + return name @property def schema_name(self) -> Optional[str]: @@ -246,7 +246,7 @@ def schema_name(self) -> Optional[str]: if default_target_schema: return default_target_schema - if self.config["stream_name_splits"] is True and len(parts) in {2, 3}: + if len(parts) in {2, 3}: # Stream name is a two-part or three-part identifier. # Use the second-to-last part as the schema name. stream_schema = self.conform_name(parts[-2], "schema") @@ -254,17 +254,3 @@ def schema_name(self) -> Optional[str]: # Schema name not detected. return None - - @property - def table_name(self) -> str: - """Return the table name, with no schema or database part. - - Returns: - The target table name. - """ - parts = self.stream_name.split("-") - if self.config["stream_name_splits"] is True: - table = self.stream_name if len(parts) == 1 else parts[-1] - else: - table = self.stream_name - return self.conform_name(table, "table") diff --git a/target_postgres/target.py b/target_postgres/target.py index 880d0826..0f2add8e 100644 --- a/target_postgres/target.py +++ b/target_postgres/target.py @@ -116,18 +116,7 @@ def __init__( th.StringType, description="Postgres schema to send data to, example: tap-clickup", ), - th.Property( - "stream_name_splits", - th.BooleanType, - default=True, - description=( - "When stream names have dashes in them then use " - + "the format of [schema]-[table]. If you do not want schemas " - + "to be determined from the stream name then Disable this feature." - ), - ), ).to_dict() - default_sink_class = PostgresSink @property