Skip to content

Commit

Permalink
some more changes
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Nov 7, 2024
1 parent c669e15 commit 0078a74
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
7 changes: 4 additions & 3 deletions dlt/destinations/impl/bigquery/sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ def create_dataset(self) -> None:
def execute_sql(
self, sql: AnyStr, *args: Any, **kwargs: Any
) -> Optional[Sequence[Sequence[Any]]]:
print(sql)
with self.execute_query(sql, *args, **kwargs) as curr:
if not curr.description:
return None
Expand Down Expand Up @@ -274,6 +275,7 @@ def _make_database_exception(cls, ex: Exception) -> Exception:
return DatabaseTransientException(ex)

def truncate_tables(self, *tables: str) -> None:
"""NOTE: We only truncate tables that exist, for auto-detect schema we don't know which tables exist"""
statements: List[str] = ["DECLARE table_exists BOOL;"]
for t in tables:
table_name = self.make_qualified_table_name(t)
Expand All @@ -282,9 +284,8 @@ def truncate_tables(self, *tables: str) -> None:
f" `{self.project_id}.{self.dataset_name}.INFORMATION_SCHEMA.TABLES` WHERE"
f" table_name = '{t}');"
)
statements.append(
f"IF table_exists THEN EXECUTE IMMEDIATE 'TRUNCATE TABLE `{table_name}`'; END IF;"
)
truncate_stmt = self._truncate_table_sql(table_name).replace(";", "")
statements.append(f"IF table_exists THEN EXECUTE IMMEDIATE '{truncate_stmt}'; END IF;")
self.execute_many(statements)

@staticmethod
Expand Down
6 changes: 6 additions & 0 deletions dlt/destinations/sql_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,12 @@ def gen_merge_sql(
root_table["name"]
)

# NOTE: this is bigquery specific code! Move to bigquery merge job
# NOTE: we also need to create all child tables
# NOTE: this will not work if the schema of the staging table changes in the next run..
# in some cases we need to create final tables here
sql.append(f"CREATE TABLE IF NOT EXISTS {root_table_name} LIKE {staging_root_table_name};")

# get merge and primary keys from top level
primary_keys = cls._escape_list(
get_columns_names_with_prop(root_table, "primary_key"),
Expand Down

0 comments on commit 0078a74

Please sign in to comment.