[Feature] Add Spark support #108

Merged · 4 commits · Aug 22, 2023
Changes from all commits
74 changes: 68 additions & 6 deletions .circleci/config.yml
@@ -2,7 +2,8 @@ version: 2.1

jobs:

integration-tests:
integration-tests-core:

docker:
- image: cimg/python:3.9.9
- image: cimg/postgres:14.0
@@ -13,19 +14,27 @@ jobs:
DBT_PROFILES_DIR: ./integration_tests/ci
DBT_PROJECT_DIR: ./integration_tests
BIGQUERY_SERVICE_KEY_PATH: "/home/circleci/bigquery-service-key.json"
DBT_VERSION: 1.6.0
DBT_VERSION: 1.6.*

steps:
- checkout
- run:
name: Install Python packages
- run: &pip-install-core
name: Install core Python packages & dbt-core
command: |
python3 -m venv venv
. venv/bin/activate
pip install -U pip setuptools wheel
pip install dbt-core==$DBT_VERSION dbt-postgres==$DBT_VERSION dbt-bigquery==$DBT_VERSION dbt-snowflake==$DBT_VERSION dbt-duckdb==$DBT_VERSION
pip install "dbt-core==$DBT_VERSION"

- run:
name: Install dbt adapter packages
command: |
python3 -m venv venv
. venv/bin/activate
pip install "dbt-postgres==$DBT_VERSION" "dbt-bigquery==$DBT_VERSION" "dbt-snowflake==$DBT_VERSION"
pip install "dbt-duckdb==$DBT_VERSION"

- run: &dbt-deps
name: Install dbt dependencies
command: |
. venv/bin/activate
@@ -74,12 +83,65 @@ jobs:
- store_artifacts:
path: ./logs

integration-tests-spark-thrift:

docker:
- image: cimg/python:3.9.9
- image: godatadriven/spark:3.1.1
environment:
WAIT_FOR: localhost:5432
command: >
--class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
--name Thrift JDBC/ODBC Server
- image: postgres:9.6.17-alpine
environment:
POSTGRES_USER: dbt
POSTGRES_PASSWORD: dbt
POSTGRES_DB: metastore

resource_class: small

environment:
DBT_PROFILES_DIR: ./integration_tests/ci
DBT_PROJECT_DIR: ./integration_tests
DBT_VERSION: 1.6.*

steps:
- checkout
- run:
name: Install Ubuntu packages
command: |
sudo apt-get update
sudo apt-get install libsasl2-dev libsasl2-2
- run: *pip-install-core
- run:
name: Install dbt adapter packages
command: |
python3 -m venv venv
. venv/bin/activate
pip install dbt-spark "dbt-spark[PyHive]"
- run: *dbt-deps
- run:
name: Wait for Spark-Thrift
command: dockerize -wait tcp://localhost:10000 -timeout 15m -wait-retry-interval 5s
- run:
name: "Run Tests - Spark"
command: |
. venv/bin/activate
dbt build -t spark --project-dir $DBT_PROJECT_DIR

- store_artifacts:
path: ./logs

workflows:
version: 2
test-all:
jobs:
- hold:
type: approval
- integration-tests:
- integration-tests-core:
requires:
- hold
- integration-tests-spark-thrift:
requires:
- hold
2 changes: 2 additions & 0 deletions .gitignore
@@ -3,3 +3,5 @@ target/
dbt_packages/
logs/
.python-version
integration_tests/.spark-warehouse
integration_tests/.hive-metastore
11 changes: 11 additions & 0 deletions integration_tests/ci/profiles.yml
@@ -37,4 +37,15 @@ integration_tests:
type: duckdb
path: ":memory:"

spark:
type: spark
method: thrift
host: 127.0.0.1
port: 10000
user: dbt
schema: analytics
connect_retries: 5
connect_timeout: 60
retry_all: true

target: postgres
28 changes: 28 additions & 0 deletions integration_tests/docker-compose.yml
@@ -0,0 +1,28 @@
version: "3.7"
services:

dbt-spark3-thrift:
image: godatadriven/spark:3.1.1
ports:
- "10000:10000"
- "4040:4040"
depends_on:
- dbt-hive-metastore
command: >
--class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
--name Thrift JDBC/ODBC Server
volumes:
- ./.spark-warehouse/:/spark-warehouse/
- ./docker/hive-site.xml:/usr/spark/conf/hive-site.xml
- ./docker/spark-defaults.conf:/usr/spark/conf/spark-defaults.conf
environment:
- WAIT_FOR=dbt-hive-metastore:5432

dbt-hive-metastore:
image: postgres:9.6.17-alpine
volumes:
- ./.hive-metastore/:/var/lib/postgresql/data
environment:
- POSTGRES_USER=dbt
- POSTGRES_PASSWORD=dbt
- POSTGRES_DB=metastore
1 change: 1 addition & 0 deletions integration_tests/docker-start.sh
@@ -0,0 +1 @@
docker-compose up -d
1 change: 1 addition & 0 deletions integration_tests/docker-stop.sh
@@ -0,0 +1 @@
docker-compose down && rm -rf ./.hive-metastore/ && rm -rf ./.spark-warehouse/
46 changes: 46 additions & 0 deletions integration_tests/docker/hive-site.xml
@@ -0,0 +1,46 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<configuration>

<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:postgresql://dbt-hive-metastore/metastore</value>
</property>

<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>org.postgresql.Driver</value>
</property>

<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>dbt</value>
</property>

<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>dbt</value>
</property>

<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
</configuration>
9 changes: 9 additions & 0 deletions integration_tests/docker/spark-defaults.conf
@@ -0,0 +1,9 @@
spark.driver.memory 2g
spark.executor.memory 2g
spark.hadoop.datanucleus.autoCreateTables true
spark.hadoop.datanucleus.schema.autoCreateTables true
spark.hadoop.datanucleus.fixedDatastore false
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.jars.packages org.apache.hudi:hudi-spark3-bundle_2.12:0.10.0
spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.driver.userClassPathFirst true
46 changes: 42 additions & 4 deletions integration_tests/macros/get_test_dates.sql
@@ -9,8 +9,8 @@ select
1 as day_of_week,
7 as iso_day_of_week,
334 as day_of_year,
cast('2020-11-29' as date) as week_start_date,
cast('2020-12-05' as date) as week_end_date,
cast('{{ get_test_week_start_date()[0] }}' as date) as week_start_date,
cast('{{ get_test_week_end_date()[0] }}' as date) as week_end_date,
{{ get_test_week_of_year()[0] }} as week_of_year,
-- in ISO terms, this is the end of the prior week
cast('2020-11-23' as date) as iso_week_start_date,
@@ -44,8 +44,8 @@ select
3 as day_of_week,
2 as iso_day_of_week,
336 as day_of_year,
cast('2020-11-29' as date) as week_start_date,
cast('2020-12-05' as date) as week_end_date,
cast('{{ get_test_week_start_date()[1] }}' as date) as week_start_date,
cast('{{ get_test_week_end_date()[1] }}' as date) as week_end_date,
{{ get_test_week_of_year()[1] }} as week_of_year,
cast('2020-11-30' as date) as iso_week_start_date,
cast('2020-12-06' as date) as iso_week_end_date,
@@ -83,6 +83,39 @@ select
{{ return([48,49]) }}
{%- endmacro %}

{% macro spark__get_test_week_of_year() -%}
{# weeks_of_year for '2020-11-29' and '2020-12-01', respectively #}
{# spark uses ISO year #}
{{ return([48,49]) }}
{%- endmacro %}


{% macro get_test_week_start_date() -%}
{{ return(adapter.dispatch('get_test_week_start_date', 'dbt_date_integration_tests') ()) }}
{%- endmacro %}

{% macro default__get_test_week_start_date() -%}
{{ return(['2020-11-29', '2020-11-29']) }}
{%- endmacro %}

{% macro spark__get_test_week_start_date() -%}
{# spark does not support non-iso weeks #}
{{ return(['2020-11-23', '2020-11-30']) }}
{%- endmacro %}


{% macro get_test_week_end_date() -%}
{{ return(adapter.dispatch('get_test_week_end_date', 'dbt_date_integration_tests') ()) }}
{%- endmacro %}

{% macro default__get_test_week_end_date() -%}
{{ return(['2020-12-05', '2020-12-05']) }}
{%- endmacro %}

{% macro spark__get_test_week_end_date() -%}
{# spark does not support non-iso weeks #}
{{ return(['2020-11-29', '2020-12-06']) }}
{%- endmacro %}
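
A quick way to sanity-check these Spark fixtures (a sketch, assuming a Spark 3.x session): date_trunc('week', ...) is Monday-based on Spark, which is why these values differ from the default Sunday-based fixtures.

select cast(date_trunc('week', date '2020-11-29') as date);  -- 2020-11-23: the Sunday falls at the end of its ISO week
select cast(date_trunc('week', date '2020-12-01') as date);  -- 2020-11-30
select date_add(cast(date_trunc('week', date '2020-12-01') as date), 6);  -- 2020-12-06, the matching week_end_date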


{% macro get_test_timestamps() -%}
@@ -108,3 +141,8 @@
{{ return(['2021-06-07 07:35:20.000000',
'2021-06-07 14:35:20.000000']) }}
{%- endmacro %}

{% macro spark__get_test_timestamps() -%}
{{ return(['2021-06-07 07:35:20.000000',
'2021-06-07 14:35:20.000000']) }}
{%- endmacro %}
14 changes: 7 additions & 7 deletions macros/calendar_date/convert_timezone.sql
@@ -14,13 +14,6 @@ convert_timezone('{{ source_tz }}', '{{ target_tz }}',
timestamp(datetime({{ column }}, '{{ target_tz}}'))
{%- endmacro -%}

{%- macro spark__convert_timezone(column, target_tz, source_tz) -%}
from_utc_timestamp(
to_utc_timestamp({{ column }}, '{{ source_tz }}'),
'{{ target_tz }}'
)
{%- endmacro -%}

{% macro postgres__convert_timezone(column, target_tz, source_tz) -%}
cast(
cast({{ column }} as {{ dbt.type_timestamp() }})
@@ -35,3 +28,10 @@
{% macro duckdb__convert_timezone(column, target_tz, source_tz) -%}
{{ return(dbt_date.postgres__convert_timezone(column, target_tz, source_tz)) }}
{%- endmacro -%}

{%- macro spark__convert_timezone(column, target_tz, source_tz) -%}
from_utc_timestamp(
to_utc_timestamp({{ column }}, '{{ source_tz }}'),
'{{ target_tz }}'
)
{%- endmacro -%}
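
For reference, a sketch of how Spark evaluates the rendered expression (timestamps chosen to mirror the integration-test values; not part of the diff):

select from_utc_timestamp(
         to_utc_timestamp(timestamp '2021-06-07 07:35:20', 'America/Los_Angeles'),
         'America/New_York');
-- 2021-06-07 10:35:20  (07:35 PDT -> 14:35 UTC -> 10:35 EDT)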
5 changes: 5 additions & 0 deletions macros/calendar_date/day_name.sql
@@ -43,3 +43,8 @@
dayname({{ date }})
{%- endif -%}
{%- endmacro %}

{%- macro spark__day_name(date, short) -%}
{%- set f = 'E' if short else 'EEEE' -%}
date_format({{ date }}, '{{ f }}')
{%- endmacro %}
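
A minimal sketch of the rendered call, assuming Spark 3.x date_format patterns ('E' abbreviated, 'EEEE' full):

select date_format(date '2020-12-01', 'EEEE');  -- Tuesday
select date_format(date '2020-12-01', 'E');     -- Tue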
9 changes: 9 additions & 0 deletions macros/calendar_date/day_of_week.sql
@@ -86,3 +86,12 @@
{%- macro duckdb__day_of_week(date, isoweek) -%}
{{ return(dbt_date.postgres__day_of_week(date, isoweek)) }}
{%- endmacro %}


{%- macro spark__day_of_week(date, isoweek) -%}

{%- set dow = "dayofweek_iso" if isoweek else "dayofweek" -%}

{{ dbt_date.date_part(dow, date) }}

{%- endmacro %}
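
For orientation, the values Spark's built-ins produce for the test date (a sketch; the actual rendering goes through dbt_date.date_part, which is defined outside this diff):

select dayofweek(date '2020-12-01');    -- 3 (1 = Sunday), the non-ISO fixture value
select weekday(date '2020-12-01') + 1;  -- 2, the ISO value (weekday() is 0-based from Monday)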
4 changes: 4 additions & 0 deletions macros/calendar_date/day_of_year.sql
@@ -13,3 +13,7 @@
{%- macro redshift__day_of_year(date) -%}
cast({{ dbt_date.date_part('dayofyear', date) }} as {{ dbt.type_bigint() }})
{%- endmacro %}

{%- macro spark__day_of_year(date) -%}
dayofyear({{ date }})
{%- endmacro %}
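
Sanity check against the fixtures (a sketch):

select dayofyear(date '2020-12-01');  -- 336, matching day_of_year in get_test_dates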
1 change: 0 additions & 1 deletion macros/calendar_date/iso_week_end.sql
@@ -15,4 +15,3 @@
{%- macro snowflake__iso_week_end(date) -%}
{{ dbt_date._iso_week_end(date, 'weekiso') }}
{%- endmacro %}

4 changes: 4 additions & 0 deletions macros/calendar_date/iso_week_of_year.sql
@@ -23,3 +23,7 @@ cast({{ dbt_date.date_part(week_type, date) }} as {{ dbt.type_int() }})
{%- macro duckdb__iso_week_of_year(date) -%}
{{ return(dbt_date.postgres__iso_week_of_year(date)) }}
{%- endmacro %}

{%- macro spark__iso_week_of_year(date) -%}
{{ dbt_date._iso_week_of_year(date, 'week') }}
{%- endmacro %}
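
Spark's week numbering is already ISO, so the 'week' part lines up with iso_week_of_year; the equivalent built-in (a sketch):

select weekofyear(date '2020-11-29');  -- 48
select weekofyear(date '2020-12-01');  -- 49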
4 changes: 4 additions & 0 deletions macros/calendar_date/iso_week_start.sql
@@ -22,3 +22,7 @@ cast({{ dbt.date_trunc(week_type, date) }} as date)
{%- macro duckdb__iso_week_start(date) -%}
{{ return(dbt_date.postgres__iso_week_start(date)) }}
{%- endmacro %}

{%- macro spark__iso_week_start(date) -%}
{{ dbt_date._iso_week_start(date, 'week') }}
{%- endmacro %}
5 changes: 5 additions & 0 deletions macros/calendar_date/month_name.sql
@@ -31,3 +31,8 @@
monthname({{ date }})
{%- endif -%}
{%- endmacro %}

{%- macro spark__month_name(date, short) -%}
{%- set f = 'LLL' if short else 'LLLL' -%}
date_format({{ date }}, '{{ f }}')
{%- endmacro %}
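
A sketch of the rendered call ('LLL'/'LLLL' are the standalone month-name patterns in Spark 3.x):

select date_format(date '2020-12-01', 'LLLL');  -- December
select date_format(date '2020-12-01', 'LLL');   -- Dec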
5 changes: 5 additions & 0 deletions macros/calendar_date/to_unixtimestamp.sql
@@ -13,3 +13,8 @@
{%- macro bigquery__to_unixtimestamp(timestamp) -%}
unix_seconds({{ timestamp }})
{%- endmacro %}

{%- macro spark__to_unixtimestamp(timestamp) -%}
unix_timestamp({{ timestamp }})
{%- endmacro %}
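
unix_timestamp() returns seconds since the epoch and interprets a bare timestamp in the session time zone, so the result depends on spark.sql.session.timeZone; a sketch assuming a UTC session:

select unix_timestamp(timestamp '2021-06-07 14:35:20');  -- 1623076520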

4 changes: 4 additions & 0 deletions macros/calendar_date/week_of_year.sql
@@ -16,3 +16,7 @@ cast(to_char({{ date }}, 'WW') as {{ dbt.type_int() }})
{%- macro duckdb__week_of_year(date) -%}
cast(ceil(dayofyear({{ date }}) / 7) as int)
{%- endmacro %}

{# {%- macro spark__week_of_year(date) -%}
weekofyear({{ date }})
{%- endmacro %} #}
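
The Spark override stays commented out, presumably because weekofyear() is ISO-based while week_of_year is the package's non-ISO variant (see the "spark does not support non-iso weeks" comments in the test macros). A sketch of the mismatch:

select weekofyear(date '2021-01-01');  -- 53 (ISO); a non-ISO week_of_year would report week 1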