Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ETL-676] Create data loading procedure for each parquet datatype #132

Merged
merged 5 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions snowflake/objects/database/recover/schema/parquet/deploy.sql
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
/*
Create a parquet schema (if it doesn't yet exist) and deploy all child objects.

Jinja templating variables:
git_branch - The name of the git branch from which we are deploying.
*/
CREATE SCHEMA IF NOT EXISTS parquet;
USE SCHEMA parquet;

-- Session variables consumed by the child deploy scripts executed below.
SET parquet_file_format_name = 'parquet_format';
SET parquet_stage_name = 'parquet_s3';
-- NOTE(review): `parquet_stage_name` appears superseded by the prod/dev pair
-- below after the prod/dev stage split -- confirm it is still referenced
-- anywhere before keeping it.
SET parquet_prod_stage_name = 'parquet_prod_s3';
SET parquet_dev_stage_name = 'parquet_dev_s3';

EXECUTE IMMEDIATE
FROM './file_format/deploy.sql'
Expand All @@ -16,7 +20,29 @@ EXECUTE IMMEDIATE
FROM './stage/deploy.sql'
USING (
git_branch => '{{ git_branch }}',
parquet_stage_name => $parquet_stage_name
parquet_prod_stage_name => $parquet_prod_stage_name,
parquet_dev_stage_name => $parquet_dev_stage_name
);
-- Deploy all TABLE objects under the parquet schema.
EXECUTE IMMEDIATE
FROM './table/deploy.sql';
-- Deploy all PROCEDURE objects inside a Snowflake Scripting block so we can
-- branch on the Jinja-rendered git branch: deployments from `main` bind the
-- procedures to the prod stage; every other branch binds them to the dev
-- stage. The $-prefixed names are the session variables set in this script.
EXECUTE IMMEDIATE
$$
BEGIN
IF ('{{ git_branch }}' = 'main') THEN
-- Our procedures will reference the prod stage
EXECUTE IMMEDIATE
FROM './procedure/deploy.sql'
USING (
stage_name => $parquet_prod_stage_name,
file_format => $parquet_file_format_name
);
ELSE
-- Non-main (dev/feature) branches reference the dev stage instead.
EXECUTE IMMEDIATE
FROM './procedure/deploy.sql'
USING (
stage_name => $parquet_dev_stage_name,
file_format => $parquet_file_format_name
);
END IF;
END;
$$;
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
A stored procedure which copies Parquet data from a named stage into a table.

Because of limitations in how we can pass variables to stage names,
this procedure is specific to a stage location. That is, we cannot
use Snowflake scripting variables within the stage name, so we instead
use Jinja variables, which has the side effect of "fixing" the procedure
to use a specific stage location.

Jinja templating variables:
datatype - The datatype which our stage location refers to.
stage_name - The name of the stage where our data exists.
stage_path - The location within the stage where our data exists.
file_format - The name of the file format object used during copy.

Parameters:
target_table - The (already-existing) table to COPY the staged data into.

Returns:
The result set of the COPY INTO statement (one row per loaded file).
*/
CREATE OR REPLACE PROCEDURE copy_into_table_from_{{ datatype }}_parquet_stage(
    target_table VARCHAR
)
RETURNS TABLE ()
LANGUAGE SQL
AS
$$
DECLARE
    -- The COPY runs when the RESULTSET default expression is evaluated;
    -- IDENTIFIER() lets the caller-supplied table name be used safely.
    res RESULTSET DEFAULT (
        COPY INTO IDENTIFIER(:target_table)
        FROM @{{ stage_name }}/{{ stage_path }}
        FILE_FORMAT = (
            FORMAT_NAME = '{{ file_format }}'
        )
        -- Parquet column names are matched to table columns case-sensitively.
        MATCH_BY_COLUMN_NAME = CASE_SENSITIVE
    );
BEGIN
    RETURN TABLE(res);
END;
$$;
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
Deploy all PROCEDURE objects.

For every Parquet datatype we create one copy procedure whose stage
location is fixed at deploy time (see `copy_into_table_from_stage.sql`
for why the stage location cannot be parameterized at call time).

Jinja templating variables:
stage_name - The name of the stage where our data exists.
file_format - The name of the file format object used by the
`copy_into_table_from_stage.sql` procedure.
*/

WITH create_procedure_for_each_parquet_table AS PROCEDURE ()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
DECLARE
    -- One entry per Parquet dataset; each maps to a stage path of the
    -- form `dataset_<datatype>` (see the CONCAT below).
    parquet_datatypes ARRAY := [
        'enrolledparticipants_customfields_symptoms',
        'enrolledparticipants_customfields_treatments',
        'enrolledparticipants',
        'fitbitactivitylogs',
        'fitbitdailydata',
        'fitbitdevices',
        'fitbitecg',
        'fitbitecg_waveformsamples',
        'fitbitintradaycombined',
        'fitbitrestingheartrates',
        'fitbitsleeplogs',
        'fitbitsleeplogs_sleeplogdetails',
        'googlefitsamples',
        'healthkitv2activitysummaries',
        'healthkitv2electrocardiogram',
        'healthkitv2electrocardiogram_subsamples',
        'healthkitv2heartbeat',
        'healthkitv2heartbeat_subsamples',
        'healthkitv2samples',
        'healthkitv2statistics',
        'healthkitv2workouts_events',
        'healthkitv2workouts',
        'symptomlog',
        'symptomlog_value_symptoms',
        'symptomlog_value_treatments'
    ];
    datatype VARCHAR;
    dataset_name VARCHAR;
BEGIN
    -- FOR ... TO is inclusive in Snowflake Scripting, so this visits
    -- every index of the array exactly once.
    FOR i IN 0 TO ARRAY_SIZE(:parquet_datatypes) - 1 DO
        datatype := GET(:parquet_datatypes, :i);
        dataset_name := CONCAT('dataset_', :datatype);
        -- Create a stored procedure which uses this data type's stage location
        EXECUTE IMMEDIATE
        FROM './copy_into_table_from_stage.sql'
        USING (
            datatype => :datatype,
            stage_name => '{{ stage_name }}',
            stage_path => :dataset_name,
            file_format => '{{ file_format }}'
        );
    END FOR;
END;
$$
CALL create_procedure_for_each_parquet_table();
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@
Deploy all stages under the `parquet` schema.
*/
-- Create the stage over the production S3 bucket.
EXECUTE IMMEDIATE
FROM './parquet_prod_s3.sql'
USING (
git_branch => '{{ git_branch }}',
parquet_stage_name => '{{ parquet_prod_stage_name }}'
);
-- Create the stage over the dev S3 bucket.
EXECUTE IMMEDIATE
FROM './parquet_dev_s3.sql'
USING (
git_branch => '{{ git_branch }}',
parquet_stage_name => '{{ parquet_dev_stage_name }}'
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
/*
Create an external stage over the dev Parquet data in S3

Jinja templating variables:
parquet_stage_name - The name to give the stage object.
git_branch - The git branch whose S3 prefix the stage points at.
*/
CREATE OR REPLACE STAGE {{ parquet_stage_name }}
URL = 's3://recover-dev-processed-data/{{ git_branch }}/parquet/'
STORAGE_INTEGRATION = recover_dev_s3;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Create an external stage over the Parquet data in S3
Create an external stage over the production Parquet data in S3
*/
CREATE OR REPLACE STAGE {{ parquet_stage_name }}
URL = 's3://recover-processed-data/{{ git_branch }}/parquet/'
Expand Down
3 changes: 3 additions & 0 deletions snowflake/objects/deploy.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
- STORAGE INTEGRATION `RECOVER_PROD_S3`
* An S3 storage integration which allows access to the
S3 buckets in the RECOVER production account.
- STORAGE INTEGRATION `RECOVER_DEV_S3`
* An S3 storage integration which allows access to the
S3 buckets in the RECOVER dev account.

Additionally, we assume that the following databases have already been created
when deploying to the "staging" or "main" environment, respectively:
Expand Down
Loading