Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉Updated normalization to handle new datatypes #19721

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
b5ab136
Updated normalization simple stream processing to handle new datatypes
etsybaev Nov 22, 2022
19ca150
Updated normalization nested stream processing to handle new datatypes
etsybaev Nov 23, 2022
b743fd0
Updated normalization nested stream processing to handle new datatypes
etsybaev Nov 23, 2022
f76e126
Updated normalization drop_scd_catalog processing to handle new datat…
etsybaev Nov 23, 2022
64f2529
Updated normalization ephemeral test processing to handle new datatypes
etsybaev Nov 23, 2022
cc257b0
fixed more tests for normalization
etsybaev Nov 24, 2022
e4462ef
fixed more tests for normalization
etsybaev Nov 24, 2022
e4dc55d
fixed more tests for normalization
etsybaev Nov 25, 2022
724df92
fixed more tests for normalization
etsybaev Nov 25, 2022
d80179a
fixed more issues
etsybaev Nov 27, 2022
6dfad09
fixed more issues (clickhouse)
etsybaev Nov 28, 2022
6bf7eed
fixed more issues
etsybaev Nov 29, 2022
7244d91
fixed more issues
etsybaev Nov 29, 2022
d218245
fixed more issues
etsybaev Nov 29, 2022
d6a1d2a
added binary type processing for some DBs
etsybaev Dec 3, 2022
cb4a2ca
cleared commented code and moved some hardcodes to processing as macro
etsybaev Dec 3, 2022
f9052c6
fixed codestyle and cleared commented code
etsybaev Dec 4, 2022
138864f
minor refactor
etsybaev Dec 9, 2022
466b1d0
minor refactor
etsybaev Dec 9, 2022
85ab29f
minor refactor
etsybaev Dec 10, 2022
e523ce6
fixed bool cast error
etsybaev Dec 10, 2022
c4e4863
fixed dict->str cast error
etsybaev Dec 11, 2022
c3982bf
fixed is_combining_node cast py check
etsybaev Dec 11, 2022
4ec0ef8
removed commented code
etsybaev Dec 11, 2022
da8a520
removed commented code
etsybaev Dec 11, 2022
9bb26a8
committed autogenerated normalization_test_output files
etsybaev Dec 13, 2022
138335c
committed autogenerated normalization_test_output files (new files)
etsybaev Dec 14, 2022
45b6864
refactored utils.py
etsybaev Dec 14, 2022
6705501
Updated utils.py to use Callable functions and get rid of property_ty…
etsybaev Dec 15, 2022
9068ae1
committed autogenerated normalization_test_output files (new files)
etsybaev Dec 17, 2022
072871f
fixed typo in TIMESTAMP_WITH_TIMEZONE_TYPE
etsybaev Dec 22, 2022
b47d746
updated stream_processor to handle string type first as a wider type
etsybaev Dec 24, 2022
ce18d8b
fixed arrays normalization by updating is_simple_property method as p…
etsybaev Dec 27, 2022
b97f21f
format
edgao Jan 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@
string
{% endmacro %}

{%- macro type_binary() -%}
{{ adapter.dispatch('type_binary')() }}
{%- endmacro -%}

{%- macro default__type_binary() -%}
binary
{%- endmacro -%}

{%- macro redshift__type_json() -%}
{%- if redshift_super_type() -%}
super
Expand Down Expand Up @@ -72,6 +80,28 @@
char(1000)
{%- endmacro -%}

{# binary data ------------------------------------------------- #}

{%- macro postgres__type_binary() -%}
bytea
{%- endmacro -%}

{%- macro bigquery__type_binary() -%}
bytes
{%- endmacro -%}

{%- macro mssql__type_binary() -%}
VARBINARY(MAX)
{%- endmacro -%}

{%- macro snowflake__type_binary() -%}
VARBINARY
{%- endmacro -%}

{%- macro clickhouse__type_binary() -%}
VARBINARY
{%- endmacro -%}

{# float ------------------------------------------------- #}
{% macro mysql__type_float() %}
float
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@
from (
select distinct _airbyte_unique_key as unique_key
from {{ this }}
where 1=1 {{ incremental_clause('_airbyte_normalized_at', this.schema + '.' + adapter.quote('nested_stream_with_complex_columns_resulting_into_long_names')) }}
where 1=1 {{ incremental_clause('_airbyte_normalized_at', adapter.quote(this.schema) + '.' + adapter.quote('nested_stream_with_complex_columns_resulting_into_long_names')) }}
) recent_records
left join (
select _airbyte_unique_key as unique_key, count(_airbyte_unique_key) as active_count
from {{ this }}
where _airbyte_active_row = 1 {{ incremental_clause('_airbyte_normalized_at', this.schema + '.' + adapter.quote('nested_stream_with_complex_columns_resulting_into_long_names')) }}
where _airbyte_active_row = 1 {{ incremental_clause('_airbyte_normalized_at', adapter.quote(this.schema) + '.' + adapter.quote('nested_stream_with_complex_columns_resulting_into_long_names')) }}
group by _airbyte_unique_key
) active_counts
on recent_records.unique_key = active_counts.unique_key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ select
json_extract_scalar(_airbyte_data, "$['datetime_no_tz']") as datetime_no_tz,
json_extract_scalar(_airbyte_data, "$['time_tz']") as time_tz,
json_extract_scalar(_airbyte_data, "$['time_no_tz']") as time_no_tz,
json_extract_scalar(_airbyte_data, "$['property_binary_data']") as property_binary_data,
_airbyte_ab_id,
_airbyte_emitted_at,
CURRENT_TIMESTAMP() as _airbyte_normalized_at
Expand Down Expand Up @@ -74,6 +75,7 @@ select
cast(nullif(time_no_tz, '') as
time
) as time_no_tz,
cast(FROM_BASE64(property_binary_data) as bytes) as property_binary_data,
_airbyte_ab_id,
_airbyte_emitted_at,
CURRENT_TIMESTAMP() as _airbyte_normalized_at
Expand Down Expand Up @@ -111,6 +113,8 @@ select
string
), ''), '-', coalesce(cast(time_no_tz as
string
), ''), '-', coalesce(cast(property_binary_data as
string
), '')) as
string
))) as _airbyte_exchange_rate_hashid,
Expand All @@ -134,6 +138,7 @@ select
datetime_no_tz,
time_tz,
time_no_tz,
property_binary_data,
_airbyte_ab_id,
_airbyte_emitted_at,
CURRENT_TIMESTAMP() as _airbyte_normalized_at,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@
from (
select distinct _airbyte_unique_key as unique_key
from {{ this }}
where 1=1 {{ incremental_clause('_airbyte_normalized_at', this.schema + '.' + adapter.quote('dedup_exchange_rate')) }}
where 1=1 {{ incremental_clause('_airbyte_normalized_at', adapter.quote(this.schema) + '.' + adapter.quote('dedup_exchange_rate')) }}
) recent_records
left join (
select _airbyte_unique_key as unique_key, count(_airbyte_unique_key) as active_count
from {{ this }}
where _airbyte_active_row = 1 {{ incremental_clause('_airbyte_normalized_at', this.schema + '.' + adapter.quote('dedup_exchange_rate')) }}
where _airbyte_active_row = 1 {{ incremental_clause('_airbyte_normalized_at', adapter.quote(this.schema) + '.' + adapter.quote('dedup_exchange_rate')) }}
group by _airbyte_unique_key
) active_counts
on recent_records.unique_key = active_counts.unique_key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,22 @@
partition_by = {"field": "_airbyte_emitted_at", "data_type": "timestamp", "granularity": "day"},
unique_key = '_airbyte_ab_id',
schema = "test_normalization",
post_hook = ["
{%
set scd_table_relation = adapter.get_relation(
database=this.database,
schema=this.schema,
identifier='exchange_rate_scd'
)
%}
{%
if scd_table_relation is not none
%}
{%
do adapter.drop_relation(scd_table_relation)
%}
{% endif %}
"],
tags = [ "top-level" ]
) }}
-- Final base SQL model
Expand All @@ -21,6 +37,7 @@ select
datetime_no_tz,
time_tz,
time_no_tz,
property_binary_data,
_airbyte_ab_id,
_airbyte_emitted_at,
{{ current_timestamp() }} as _airbyte_normalized_at,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@
from (
select distinct _airbyte_unique_key as unique_key
from {{ this }}
where 1=1 {{ incremental_clause('_airbyte_normalized_at', this.schema + '.' + adapter.quote('dedup_exchange_rate')) }}
where 1=1 {{ incremental_clause('_airbyte_normalized_at', adapter.quote(this.schema) + '.' + adapter.quote('dedup_exchange_rate')) }}
) recent_records
left join (
select _airbyte_unique_key as unique_key, count(_airbyte_unique_key) as active_count
from {{ this }}
where _airbyte_active_row = 1 {{ incremental_clause('_airbyte_normalized_at', this.schema + '.' + adapter.quote('dedup_exchange_rate')) }}
where _airbyte_active_row = 1 {{ incremental_clause('_airbyte_normalized_at', adapter.quote(this.schema) + '.' + adapter.quote('dedup_exchange_rate')) }}
group by _airbyte_unique_key
) active_counts
on recent_records.unique_key = active_counts.unique_key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,22 @@
partition_by = {"field": "_airbyte_emitted_at", "data_type": "timestamp", "granularity": "day"},
unique_key = '_airbyte_ab_id',
schema = "test_normalization",
post_hook = ["
{%
set scd_table_relation = adapter.get_relation(
database=this.database,
schema=this.schema,
identifier='exchange_rate_scd'
)
%}
{%
if scd_table_relation is not none
%}
{%
do adapter.drop_relation(scd_table_relation)
%}
{% endif %}
"],
tags = [ "top-level" ]
) }}
-- Final base SQL model
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ select
json_extract_scalar(_airbyte_data, "$['datetime_no_tz']") as datetime_no_tz,
json_extract_scalar(_airbyte_data, "$['time_tz']") as time_tz,
json_extract_scalar(_airbyte_data, "$['time_no_tz']") as time_no_tz,
json_extract_scalar(_airbyte_data, "$['property_binary_data']") as property_binary_data,
_airbyte_ab_id,
_airbyte_emitted_at,
CURRENT_TIMESTAMP() as _airbyte_normalized_at
Expand Down Expand Up @@ -74,6 +75,7 @@ select
cast(nullif(time_no_tz, '') as
time
) as time_no_tz,
cast(FROM_BASE64(property_binary_data) as bytes) as property_binary_data,
_airbyte_ab_id,
_airbyte_emitted_at,
CURRENT_TIMESTAMP() as _airbyte_normalized_at
Expand Down Expand Up @@ -111,6 +113,8 @@ select
string
), ''), '-', coalesce(cast(time_no_tz as
string
), ''), '-', coalesce(cast(property_binary_data as
string
), '')) as
string
))) as _airbyte_exchange_rate_hashid,
Expand All @@ -134,6 +138,7 @@ select
datetime_no_tz,
time_tz,
time_no_tz,
property_binary_data,
_airbyte_ab_id,
_airbyte_emitted_at,
CURRENT_TIMESTAMP() as _airbyte_normalized_at,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@


create view _airbyte_test_normalization.dedup_exchange_rate_ab1__dbt_tmp
create view _airbyte_test_normalization.dedup_exchange_rate_ab1

as (

Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@


create view _airbyte_test_normalization.dedup_exchange_rate_ab2__dbt_tmp
create view _airbyte_test_normalization.dedup_exchange_rate_ab2

as (

-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
-- depends_on: _airbyte_test_normalization.dedup_exchange_rate_ab1
select
accurateCastOrNull(id, '
accurateCastOrNull(trim(BOTH '"' from id), '
BIGINT
') as id,
nullif(accurateCastOrNull(trim(BOTH '"' from currency), 'String'), 'null') as currency,
toDate(parseDateTimeBestEffortOrNull(trim(BOTH '"' from nullif(date, '')))) as date,
parseDateTime64BestEffortOrNull(trim(BOTH '"' from nullif(timestamp_col, ''))) as timestamp_col,
accurateCastOrNull("HKD@spéçiäl & characters", '
accurateCastOrNull(trim(BOTH '"' from "HKD@spéçiäl & characters"), '
Float64
') as "HKD@spéçiäl & characters",
nullif(accurateCastOrNull(trim(BOTH '"' from HKD_special___characters), 'String'), 'null') as HKD_special___characters,
accurateCastOrNull(NZD, '
accurateCastOrNull(trim(BOTH '"' from NZD), '
Float64
') as NZD,
accurateCastOrNull(USD, '
accurateCastOrNull(trim(BOTH '"' from USD), '
Float64
') as USD,
_airbyte_ab_id,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@







insert into test_normalization.dedup_cdc_excluded_scd ("_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "name", "_ab_cdc_lsn", "_ab_cdc_updated_at", "_ab_cdc_deleted_at", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_dedup_cdc_excluded_hashid")
Expand Down Expand Up @@ -101,4 +103,4 @@ select
_airbyte_dedup_cdc_excluded_hashid
from dedup_data where _airbyte_row_num = 1


Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@







insert into test_normalization.dedup_exchange_rate_scd ("_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "currency", "date", "timestamp_col", "HKD@spéçiäl & characters", "HKD_special___characters", "NZD", "USD", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_dedup_exchange_rate_hashid")
Expand Down Expand Up @@ -105,4 +107,4 @@ select
_airbyte_dedup_exchange_rate_hashid
from dedup_data where _airbyte_row_num = 1


Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@







insert into test_normalization.renamed_dedup_cdc_excluded_scd ("_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "_ab_cdc_updated_at", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_renamed_dedup_cdc_excluded_hashid")
Expand Down Expand Up @@ -87,4 +89,4 @@ select
_airbyte_renamed_dedup_cdc_excluded_hashid
from dedup_data where _airbyte_row_num = 1


Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@







insert into test_normalization.dedup_exchange_rate ("_airbyte_unique_key", "id", "currency", "date", "timestamp_col", "HKD@spéçiäl & characters", "HKD_special___characters", "NZD", "USD", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_dedup_exchange_rate_hashid")
Expand All @@ -26,4 +28,4 @@ where 1 = 1
and _airbyte_active_row = 1



Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@







insert into test_normalization.renamed_dedup_cdc_excluded ("_airbyte_unique_key", "id", "_ab_cdc_updated_at", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_renamed_dedup_cdc_excluded_hashid")
Expand All @@ -20,4 +22,4 @@ where 1 = 1
and _airbyte_active_row = 1



Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@





insert into test_normalization.exchange_rate__dbt_tmp ("id", "currency", "date", "timestamp_col", "HKD@spéçiäl & characters", "HKD_special___characters", "NZD", "USD", "column___with__quotes", "datetime_tz", "datetime_no_tz", "time_tz", "time_no_tz", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_exchange_rate_hashid")
insert into test_normalization.exchange_rate ("id", "currency", "date", "timestamp_col", "HKD@spéçiäl & characters", "HKD_special___characters", "NZD", "USD", "column___with__quotes", "datetime_tz", "datetime_no_tz", "time_tz", "time_no_tz", "property_binary_data", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_exchange_rate_hashid")

-- Final base SQL model
-- depends_on: _airbyte_test_normalization.exchange_rate_ab3
Expand All @@ -19,6 +21,7 @@ select
datetime_no_tz,
time_tz,
time_no_tz,
property_binary_data,
_airbyte_ab_id,
_airbyte_emitted_at,
now() as _airbyte_normalized_at,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@


create view _airbyte_test_normalization.dedup_exchange_rate_stg__dbt_tmp
create view _airbyte_test_normalization.dedup_exchange_rate_stg

as (

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@


create view _airbyte_test_normalization.multiple_column_names_conflicts_stg__dbt_tmp
create view _airbyte_test_normalization.multiple_column_names_conflicts_stg

as (

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
-- depends_on: {{ ref('dedup_exchange_rate_ab1') }}
select
accurateCastOrNull(id, '{{ dbt_utils.type_bigint() }}') as id,
accurateCastOrNull(trim(BOTH '"' from id), '{{ dbt_utils.type_bigint() }}') as id,
nullif(accurateCastOrNull(trim(BOTH '"' from currency), '{{ dbt_utils.type_string() }}'), 'null') as currency,
toDate(parseDateTimeBestEffortOrNull(trim(BOTH '"' from {{ empty_string_to_null('date') }}))) as date,
parseDateTime64BestEffortOrNull(trim(BOTH '"' from {{ empty_string_to_null('timestamp_col') }})) as timestamp_col,
accurateCastOrNull({{ quote('HKD@spéçiäl & characters') }}, '{{ dbt_utils.type_float() }}') as {{ quote('HKD@spéçiäl & characters') }},
accurateCastOrNull(trim(BOTH '"' from {{ quote('HKD@spéçiäl & characters') }}), '{{ dbt_utils.type_float() }}') as {{ quote('HKD@spéçiäl & characters') }},
nullif(accurateCastOrNull(trim(BOTH '"' from HKD_special___characters), '{{ dbt_utils.type_string() }}'), 'null') as HKD_special___characters,
accurateCastOrNull(NZD, '{{ dbt_utils.type_float() }}') as NZD,
accurateCastOrNull(USD, '{{ dbt_utils.type_float() }}') as USD,
accurateCastOrNull(trim(BOTH '"' from NZD), '{{ dbt_utils.type_float() }}') as NZD,
accurateCastOrNull(trim(BOTH '"' from USD), '{{ dbt_utils.type_float() }}') as USD,
_airbyte_ab_id,
_airbyte_emitted_at,
{{ current_timestamp() }} as _airbyte_normalized_at
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
-- depends_on: {{ ref('renamed_dedup_cdc_excluded_ab1') }}
select
accurateCastOrNull(id, '{{ dbt_utils.type_bigint() }}') as id,
accurateCastOrNull(_ab_cdc_updated_at, '{{ dbt_utils.type_float() }}') as _ab_cdc_updated_at,
accurateCastOrNull(trim(BOTH '"' from id), '{{ dbt_utils.type_bigint() }}') as id,
accurateCastOrNull(trim(BOTH '"' from _ab_cdc_updated_at), '{{ dbt_utils.type_float() }}') as _ab_cdc_updated_at,
_airbyte_ab_id,
_airbyte_emitted_at,
{{ current_timestamp() }} as _airbyte_normalized_at
Expand Down
Loading