From 0078a74ff8ad4bbd5ff6ae9561f47b6da4bac00a Mon Sep 17 00:00:00 2001 From: dave Date: Thu, 7 Nov 2024 13:56:09 +0100 Subject: [PATCH] some more changes --- dlt/destinations/impl/bigquery/sql_client.py | 7 ++++--- dlt/destinations/sql_jobs.py | 6 ++++++ 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/dlt/destinations/impl/bigquery/sql_client.py b/dlt/destinations/impl/bigquery/sql_client.py index 953d9a90cf..627bcf54df 100644 --- a/dlt/destinations/impl/bigquery/sql_client.py +++ b/dlt/destinations/impl/bigquery/sql_client.py @@ -199,6 +199,7 @@ def create_dataset(self) -> None: def execute_sql( self, sql: AnyStr, *args: Any, **kwargs: Any ) -> Optional[Sequence[Sequence[Any]]]: + print(sql) with self.execute_query(sql, *args, **kwargs) as curr: if not curr.description: return None @@ -274,6 +275,7 @@ def _make_database_exception(cls, ex: Exception) -> Exception: return DatabaseTransientException(ex) def truncate_tables(self, *tables: str) -> None: + """NOTE: We only truncate tables that exist, for auto-detect schema we don't know which tables exist""" statements: List[str] = ["DECLARE table_exists BOOL;"] for t in tables: table_name = self.make_qualified_table_name(t) @@ -282,9 +284,8 @@ def truncate_tables(self, *tables: str) -> None: f" `{self.project_id}.{self.dataset_name}.INFORMATION_SCHEMA.TABLES` WHERE" f" table_name = '{t}');" ) - statements.append( - f"IF table_exists THEN EXECUTE IMMEDIATE 'TRUNCATE TABLE `{table_name}`'; END IF;" - ) + truncate_stmt = self._truncate_table_sql(table_name).replace(";", "") + statements.append(f"IF table_exists THEN EXECUTE IMMEDIATE '{truncate_stmt}'; END IF;") self.execute_many(statements) @staticmethod diff --git a/dlt/destinations/sql_jobs.py b/dlt/destinations/sql_jobs.py index f59f087f4f..faa6b50531 100644 --- a/dlt/destinations/sql_jobs.py +++ b/dlt/destinations/sql_jobs.py @@ -501,6 +501,12 @@ def gen_merge_sql( root_table["name"] ) + # NOTE: this is bigquery specific code! Move to bigquery merge job + # NOTE: we also need to create all child tables + # NOTE: this will not work if the schema of the staging table changes in the next run.. + # in some cases we need to create final tables here + sql.append(f"CREATE TABLE IF NOT EXISTS {root_table_name} LIKE {staging_root_table_name};") + # get merge and primary keys from top level primary_keys = cls._escape_list( get_columns_names_with_prop(root_table, "primary_key"),