committing solution containing: #11

Open · wants to merge 1 commit into base: main
41 changes: 40 additions & 1 deletion README.md
@@ -72,4 +72,43 @@ This should produce customers, products and transaction data under `./input_data

Please save Snowflake model code in the `snowflake` folder and infrastructure code in the `infra` folder.

Update this README as code evolves.

### Setting up infrastructure

You need to store your AWS account access details and configuration in environment variables.
To do that, run the following in your terminal:
```
export AWS_ACCESS_KEY_ID="<your access key here>"
export AWS_SECRET_ACCESS_KEY="<your secret access here>"
export REGION="<region for your environment here>"  # e.g. "eu-west-2"
```

You can set up your production environment in a different region; however, it's recommended to choose a region that is geographically close to your Snowflake account.
Please note that the current setup does not yet support running the DEV and PROD environments under different AWS accounts.

Navigate to the `infra` folder and run `terraform init`.
After that, run `terraform apply`.

### Setting up Snowflake

Please sign up for a Snowflake free trial account via https://signup.snowflake.com.
Once done, export your Snowflake account name, username and password as environment variables, as follows:
```
export ACCOUNT="<your snowflake account here>"  # e.g. "gfxxxxx.eu-west-2.aws"
export PASSWORD="<your snowflake password here>"
export USER_NAME="<your snowflake username here>"
```

To set up Snowflake end to end with data and create the view, navigate to the snowflake folder (`cd snowflake`) and run `python main.py -env <your environment here>`, e.g. `python main.py -env DEV` or `python main.py -env PROD`.
To destroy what you have created in Snowflake, run `python main.py -env <your environment here> -ac destroy`, e.g. `python main.py -env DEV -ac destroy` or `python main.py -env PROD -ac destroy`.
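
For reference, here is a minimal sketch of how the exported credentials can be used to open a session with `snowflake-connector-python`; it is an illustration only, and the actual wiring lives in `main.py`, which is not part of this diff:
```
import os

import snowflake.connector

# Illustrative sketch only: read the credentials exported above and open a session.
conn = snowflake.connector.connect(
    account=os.environ["ACCOUNT"],
    user=os.environ["USER_NAME"],
    password=os.environ["PASSWORD"],
)

cur = conn.cursor()
try:
    cur.execute("SELECT CURRENT_VERSION();")
    print(cur.fetchone())
finally:
    cur.close()
    conn.close()
```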


What I'd change in the future:
- modularise the Terraform so one can choose to create the S3 dev bucket, the prod bucket, or both
- manage Terraform state files in S3
- authenticate to Snowflake with key pairs
- create an IAM role for the Snowflake <-> S3 integration
- introduce Secrets Manager and store credentials and other sensitive information there
- introduce dbt for modelling and automate its setup
- refactor some code into classes, and add type annotations and docstrings
1 change: 1 addition & 0 deletions infra/main.tf
@@ -0,0 +1 @@
provider "aws" {}
11 changes: 11 additions & 0 deletions infra/s3-dev.tf
@@ -0,0 +1,11 @@
resource "aws_s3_bucket" "raw-data-dev" {
bucket = "convex-raw-data-dev"
acl = "private"

tags = {
Terraform = "true"
Project = "convex"
Environment = "dev"
}
}

10 changes: 10 additions & 0 deletions infra/s3-prod.tf
@@ -0,0 +1,10 @@
resource "aws_s3_bucket" "raw-data-prod" {
bucket = "convex-raw-data-prod"
acl = "private"

tags = {
Terraform = "true"
Project = "convex"
Environment = "prod"
}
}
10 changes: 3 additions & 7 deletions requirements.txt
@@ -1,7 +1,8 @@
boto3==1.11.17
snowflake-connector-python==2.4.3
requests==2.23.0
atomicwrites==1.3.0
attrs==19.3.0
boto3==1.10.8
botocore==1.13.8
docutils==0.15.2
importlib-metadata==0.23
jmespath==0.9.4
@@ -14,15 +15,10 @@ pluggy==0.13.0
py==1.8.0
py4j==0.10.7
pyparsing==2.4.2
pyspark==2.4.4
pyspark-stubs==2.4.0.post6
pytest==5.2.2
python-dateutil==2.8.1
pytz==2019.3
s3transfer==0.2.1
scipy==1.3.1
simplejson==3.16.0
six==1.12.0
urllib3==1.25.6
wcwidth==0.1.7
zipp==0.6.0
4 changes: 4 additions & 0 deletions snowflake/commands/sf_destroy_commands.py
@@ -0,0 +1,4 @@
DESTROY = [
    "DROP DATABASE IF EXISTS {DB};",
    "DROP WAREHOUSE IF EXISTS STAGING_WH;"
]
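
As a side note, here is a minimal sketch (not part of this PR) of how these templated statements could be run against an open `snowflake.connector` connection, substituting the target database name:
```
# Illustrative sketch only: run the DESTROY statements for a given environment.
# `conn` is assumed to be an open snowflake.connector connection.
def run_statements(conn, statements, db_name):
    cur = conn.cursor()
    try:
        for statement in statements:
            cur.execute(statement.format(DB=db_name))
    finally:
        cur.close()


# e.g. run_statements(conn, DESTROY, "DEV")
```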
51 changes: 51 additions & 0 deletions snowflake/commands/sf_ingest_commands.py
@@ -0,0 +1,51 @@
INGEST_TRANSACTIONS = """
COPY INTO {DB}.STAGING.TRANSACTIONS(
PAYLOAD,
FILE_KEY,
INSERTED_TIMESTAMP
)
FROM (
select
$1::variant,
METADATA$FILENAME::VARCHAR,
convert_timezone('Europe/London',CURRENT_TIMESTAMP())::TIMESTAMP_TZ
from @{DB}.STAGING.TRANSACTIONS_JSON
);
"""


INGEST_CUSTOMERS = """
COPY INTO {DB}.STAGING.CUSTOMERS (
customer_id,
loyalty_score,
FILE_KEY,
INSERTED_TIMESTAMP
)
from (
select
$1::variant as customer_id,
$2::variant as loyalty_score,
METADATA$FILENAME::VARCHAR,
convert_timezone('Europe/London',CURRENT_TIMESTAMP())::TIMESTAMP_TZ
from @{DB}.STAGING.CUSTOMERS_CSV
);
"""


INGEST_PRODUCTS = """
COPY INTO {DB}.STAGING.PRODUCTS (
product_id,
product_description,
product_category,
FILE_KEY,
INSERTED_TIMESTAMP
)
from (
select
$1::variant as product_id,
$2::variant as product_description,
$3::variant as product_category,
METADATA$FILENAME::VARCHAR,
convert_timezone('Europe/London',CURRENT_TIMESTAMP())::TIMESTAMP_TZ
from @{DB}.STAGING.PRODUCTS_CSV);
"""
79 changes: 79 additions & 0 deletions snowflake/commands/sf_setup_commands.py
@@ -0,0 +1,79 @@

SETUP_USER_AND_ROLE = ["USE ROLE SYSADMIN;",
"create WAREHOUSE IF NOT EXISTS STAGING_WH WAREHOUSE_SIZE='X-SMALL' INITIALLY_SUSPENDED=TRUE;",
"create DATABASE IF NOT EXISTS {DB};",
"CREATE SCHEMA IF NOT EXISTS {DB}.STAGING;",
"CREATE SCHEMA IF NOT EXISTS {DB}.PERSIST ;",
"CREATE SCHEMA IF NOT EXISTS {DB}.ANALYTICS;",
]

SETUP_DB_OBJECTS = [
"""create table if not exists {DB}.STAGING.TRANSACTIONS(
PAYLOAD VARIANT,
FILE_KEY VARCHAR,
INSERTED_TIMESTAMP TIMESTAMP_TZ(9));""",

"""create table if not exists {DB}.STAGING.CUSTOMERS(
customer_id VARIANT,
loyalty_score VARIANT,
FILE_KEY VARCHAR,
INSERTED_TIMESTAMP TIMESTAMP_TZ(9));""",

"""create table if not exists {DB}.STAGING.PRODUCTS(
product_id VARIANT,
product_description VARIANT,
product_category VARIANT,
FILE_KEY VARCHAR,
INSERTED_TIMESTAMP TIMESTAMP_TZ(9));""",

"""
create table if not exists {DB}.PERSIST.TRANSACTIONS(
customer_id string,
price number,
product_id string,
date_of_purchase datetime,
FILE_KEY VARCHAR,
INSERTED_TIMESTAMP TIMESTAMP_TZ(9));
""",


"""CREATE or replace STAGE {DB}.STAGING.TRANSACTIONS_JSON
url='s3://{BUCKET}/TRANSACTIONS'
COMMENT = 'Stage for transactions data from external S3 location in JSON format'
credentials = (AWS_KEY_ID = '{AWS_KEY_ID}' AWS_SECRET_KEY = '{AWS_SECRET_KEY}')
file_format = (type = 'json', compression = gzip)
COPY_OPTIONS = (ON_ERROR = SKIP_FILE);""",

"""CREATE or replace STAGE {DB}.STAGING.CUSTOMERS_CSV
url='s3://{BUCKET}/CUSTOMERS'
COMMENT = 'Stage for customers data from external S3 location in CSV format'
credentials = (AWS_KEY_ID = '{AWS_KEY_ID}' AWS_SECRET_KEY = '{AWS_SECRET_KEY}')
file_format = (type = 'csv', empty_field_as_null = true, SKIP_HEADER = 1)
COPY_OPTIONS = (ON_ERROR = SKIP_FILE);""",

"""CREATE or replace STAGE {DB}.STAGING.PRODUCTS_CSV
url='s3://{BUCKET}/PRODUCTS'
COMMENT = 'Stage for products data from external S3 location in CSV format'
credentials = (AWS_KEY_ID = '{AWS_KEY_ID}' AWS_SECRET_KEY = '{AWS_SECRET_KEY}')
file_format = (type = 'csv', empty_field_as_null = true, SKIP_HEADER = 1)
COPY_OPTIONS = (ON_ERROR = SKIP_FILE);""",

"""
create table if not exists {DB}.PERSIST.CUSTOMERS(
customer_id string,
loyalty_score integer,
FILE_KEY VARCHAR,
INSERTED_TIMESTAMP TIMESTAMP_TZ(9));
""",

"""
create table if not exists {DB}.PERSIST.PRODUCTS(
product_id string,
product_description string,
product_category string,
FILE_KEY VARCHAR,
INSERTED_TIMESTAMP TIMESTAMP_TZ(9));
"""
]
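
For context, here is a hypothetical sketch of how these templates might be rendered per environment before execution; the real orchestration is in `main.py`, which is not shown in this diff, and the bucket name below mirrors `conf-repo/config-dev.json`:
```
import os

# Illustrative sketch only: fill in the placeholders used by the setup statements.
def render(statements, db="DEV", bucket="convex-raw-data-dev"):
    return [
        statement.format(
            DB=db,
            BUCKET=bucket,
            AWS_KEY_ID=os.environ.get("AWS_ACCESS_KEY_ID", ""),
            AWS_SECRET_KEY=os.environ.get("AWS_SECRET_ACCESS_KEY", ""),
        )
        for statement in statements
    ]


# e.g. for sql in render(SETUP_USER_AND_ROLE + SETUP_DB_OBJECTS): cur.execute(sql)
```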


54 changes: 54 additions & 0 deletions snowflake/commands/sf_transform_commands.py
@@ -0,0 +1,54 @@
PERSIST = ["""
insert into {DB}.persist.customers
select
s.CUSTOMER_ID,
s.LOYALTY_SCORE,
s.file_key,
convert_timezone('Europe/London',CURRENT_TIMESTAMP())::TIMESTAMP_TZ
from {DB}.staging.customers s
where
s.INSERTED_TIMESTAMP >= (select ifnull(max(INSERTED_TIMESTAMP),(select min(INSERTED_TIMESTAMP) from {DB}.staging.customers)) from {DB}.persist.CUSTOMERS);
""",

"""
insert into {DB}.persist.products
select
s.product_id,
s.product_description,
s.product_category,
s.file_key,
convert_timezone('Europe/London',CURRENT_TIMESTAMP())::TIMESTAMP_TZ
from {DB}.staging.products s
where
s.INSERTED_TIMESTAMP >= (select ifnull(max(INSERTED_TIMESTAMP),(select min(INSERTED_TIMESTAMP) from {DB}.staging.products)) from {DB}.persist.products);
""",

"""
insert into {DB}.persist.transactions
select
s.payload:customer_id::string,
f1.value:price::number,
f1.value:product_id::string,
s.payload:date_of_purchase::datetime,
s.file_key,
convert_timezone('Europe/London',CURRENT_TIMESTAMP())::TIMESTAMP_TZ
from {DB}.staging.transactions s,
lateral flatten(input => s.payload, path => 'basket') f1
where
s.INSERTED_TIMESTAMP >= (select ifnull(max(INSERTED_TIMESTAMP),(select min(INSERTED_TIMESTAMP) from {DB}.staging.transactions)) from {DB}.persist.transactions);
"""]

ANALYTICS = ["""
create or replace view {DB}.analytics.single_view as
select
t.customer_id,
c.loyalty_score,
t.product_id,
p.product_category,
count(t.file_Key) purchase_count
from {DB}.persist.transactions t
join {DB}.persist.customers c on t.customer_id = c.customer_id
join {DB}.persist.products p on p.product_id = t.product_id
group by 1,2,3,4
;
"""]
5 changes: 5 additions & 0 deletions snowflake/conf-repo/config-dev.json
@@ -0,0 +1,5 @@
{
"ENVIRONMENT": "DEV",
"BUCKET_NAME": "convex-raw-data-dev",
"DB": "DEV"
}
5 changes: 5 additions & 0 deletions snowflake/conf-repo/config-prod.json
@@ -0,0 +1,5 @@
{
"ENVIRONMENT": "PROD",
"BUCKET_NAME": "convex-raw-data-prod",
"DB": "PROD"
}
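
These per-environment configs are read by the helpers in `snowflake/utils` (`get_conf_file_path_by_name` / `get_conf_file_by_path`, not shown in this diff); a hypothetical loader, for illustration only:
```
import json

# Illustrative sketch only: load one of the conf-repo files by environment name,
# e.g. load_config("DEV")["BUCKET_NAME"] -> "convex-raw-data-dev"
def load_config(env):
    path = f"snowflake/conf-repo/config-{env.lower()}.json"
    with open(path) as fh:
        return json.load(fh)
```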
Empty file added snowflake/ingestion/__init__.py
Empty file.
82 changes: 82 additions & 0 deletions snowflake/ingestion/raw_to_s3.py
@@ -0,0 +1,82 @@
import os
import sys
import datetime
import argparse

project_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
sys.path.append(project_path)
input_path = os.path.join(project_path, 'input_data')

from snowflake.utils.s3_operators import transfer_to_s3, generate_file_name
from snowflake.utils.generate_conf import get_conf_file_path_by_name, get_conf_file_by_path
from snowflake.utils.argument_parser import get_arguments


def process_raw(source, destination):
    decode = {
        'TRANSACTIONS': 'transactions',
        'CUSTOMERS': 'customers.csv',
        'PRODUCTS': 'products.csv',
        'ALL': ['transactions', 'customers.csv', 'products.csv']
    }

    if decode.get(source) == 'transactions' or source == 'ALL':
        walk_dir = os.path.abspath(os.path.join(input_path, 'starter/transactions'))
        for root, subdirs, files in os.walk(walk_dir):
            print('--\nroot = ' + root)
            for subdir in subdirs:
                print('\t- subdirectory ' + subdir)
                list_file_path = os.path.join(root, subdir)

                print('list_file_path = ' + list_file_path)
                list_files = [os.path.join(list_file_path, f) for f in os.listdir(list_file_path)]
                for list_file in list_files:
                    with open(list_file, 'r') as lf:
                        f_content = lf.read()
                        loadtime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                        filename = generate_file_name('json', suffix=loadtime)
                        filekey = '/'.join(['TRANSACTIONS', subdir, filename])
                        transfer_to_s3(f_content, destination, filekey, 'overwrite', 1)

    if decode.get(source) == 'customers.csv' or source == 'ALL':
        file = os.path.abspath(os.path.join(input_path, 'starter/customers.csv'))
        with open(file, 'r') as lf:
            f_content = lf.read()
            loadtime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            filename = generate_file_name('csv', suffix=loadtime)
            filekey = '/'.join(['CUSTOMERS', filename])
            transfer_to_s3(f_content, destination, filekey, 'overwrite')

    if decode.get(source) == 'products.csv' or source == 'ALL':
        file = os.path.abspath(os.path.join(input_path, 'starter/products.csv'))
        with open(file, 'r') as lf:
            f_content = lf.read()
            loadtime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            filename = generate_file_name('csv', suffix=loadtime)
            filekey = '/'.join(['PRODUCTS', filename])
            transfer_to_s3(f_content, destination, filekey, 'overwrite')


if __name__ == '__main__':

    description = 'Ingestion for product, customer and transactions'
    parser = argparse.ArgumentParser(description=description)

    for argument in get_arguments("environment", "transfer_mode", "data_source", "bucket", "secret_name"):
        default = argument.get('default', None)
        choices = argument.get('choices', None)

        parser.add_argument(
            argument['short_command'], argument["command"], help=argument["help"], default=default, choices=choices,
            required=argument['required'], nargs=argument['nargs'], type=argument['type']
        )

    args = parser.parse_args()
    env = args.environment
    data_source = args.data_source
    config_path = get_conf_file_path_by_name(env)
    config = get_conf_file_by_path(config_path)

    bucket_name = args.bucket or config.get('BUCKET_NAME', args.bucket)

    process_raw(data_source, bucket_name)
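
For a quick local test, `process_raw` can also be called directly, bypassing the CLI; this is an illustration only and assumes AWS credentials are already exported as described in the README:
```
# Illustrative sketch only: push everything in ./input_data/starter to the dev bucket.
from snowflake.ingestion.raw_to_s3 import process_raw

process_raw('ALL', 'convex-raw-data-dev')
```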