Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added partition support for Glue #324

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 37 additions & 3 deletions dbt/adapters/duckdb/plugins/glue.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Sequence

Expand Down Expand Up @@ -162,14 +163,34 @@ def _get_column_type_def(
return None


def _add_partition_columns(table_def: TableInputTypeDef, partition_columns) -> TableInputTypeDef:
partition_keys = []
if "PartitionKeys" not in table_def:
table_def["PartitionKeys"] = []
for column in partition_columns:
partition_column = ColumnTypeDef(Name=column["Name"], Type=column["Type"])
partition_keys.append(partition_column)
table_def["PartitionKeys"] = partition_keys
# Remove columns from StorageDescriptor if they match with partition columns to avoid duplicate columns
for partition_column in partition_columns:
table_def["StorageDescriptor"]["Columns"] = [
column
for column in table_def["StorageDescriptor"]["Columns"]
if not (
column["Name"] == partition_column["Name"]
and column["Type"] == partition_column["Type"]
)
]
return table_def


def _get_table_def(
table: str,
s3_path: str,
s3_parent: str,
columns: Sequence["ColumnTypeDef"],
file_format: str,
delimiter: str,
):
s3_parent = "/".join(s3_path.split("/")[:-1])
if file_format == "csv":
table_def = _get_csv_table_def(
table=table,
Expand Down Expand Up @@ -205,17 +226,27 @@ def create_or_update_table(
s3_path: str,
file_format: str,
delimiter: str,
partition_columns: List[Dict[str, str]] = [],
) -> None:
# Set s3 original path if partitioning is used, else use parent path
if partition_columns != []:
s3_parent = s3_path
if partition_columns == []:
s3_parent = "/".join(s3_path.split("/")[:-1])

# Existing table in AWS Glue catalog
glue_table = _get_table(client=client, database=database, table=table)
columns = _convert_columns(column_list)
table_def = _get_table_def(
table=table,
s3_path=s3_path,
s3_parent=s3_parent,
columns=columns,
file_format=file_format,
delimiter=delimiter,
)
# Add partition columns
if partition_columns != []:
table_def = _add_partition_columns(table_def, partition_columns)
if glue_table:
# Existing columns in AWS Glue catalog
glue_columns = _get_column_type_def(glue_table)
Expand All @@ -236,6 +267,8 @@ def store(self, target_config: TargetConfig):
assert target_config.location is not None
assert target_config.relation.identifier is not None
table: str = target_config.relation.identifier
partition_columns = target_config.config.get("partition_columns", [])

create_or_update_table(
self.client,
self.database,
Expand All @@ -244,4 +277,5 @@ def store(self, target_config: TargetConfig):
target_config.location.path,
target_config.location.format,
self.delimiter,
partition_columns,
)
1 change: 1 addition & 0 deletions dbt/include/duckdb/macros/materializations/external.sql
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
-- register table into glue
{%- set plugin_name = config.get('plugin') -%}
{%- set glue_register = config.get('glue_register', default=false) -%}
{%- set partition_columns = config.get('partition_columns', []) -%}
{% if plugin_name is not none or glue_register is true %}
{% if glue_register %}
{# legacy hack to set the glue database name, deprecate this #}
Expand Down
Loading