Commit

More updates (#25)
jrbourbeau authored Mar 22, 2024
1 parent a00d25c commit 8dca03a
Showing 6 changed files with 35 additions and 17 deletions.
pipeline/config.yml: 9 changes (6 additions, 3 deletions)
@@ -1,6 +1,9 @@
# Output location for data files. Can be a local directory
# or a remote path like "s3://path/to/bucket".
data-dir: ./data-test
# Whether to run data-processing tasks locally
# or on the cloud with Coiled.
local: true
# Output location for data files. Can be a local directory
# or a remote path like "s3://path/to/bucket".
data-dir: ./data
# If using Coiled (`local: false`), use this Coiled workspace.
# Defaults to your default workspace.
workspace: null
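
For a cloud run, these same keys would typically point at a remote bucket and an explicit workspace. A minimal sketch of what that looks like once loaded, assuming placeholder bucket and workspace names (they are not values from this repository):

import yaml

# Hedged sketch of a non-local configuration; the bucket path and workspace
# name below are placeholders.
cloud_config = yaml.safe_load(
    "local: false\n"
    "data-dir: s3://path/to/bucket\n"
    "workspace: my-coiled-workspace\n"
)
assert cloud_config["local"] is False
assert cloud_config["workspace"] == "my-coiled-workspace"
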
pipeline/dashboard.py: 8 changes (4 additions, 4 deletions)
@@ -7,11 +7,11 @@
from prefect import flow
from rich import print

from .settings import DASHBOARD_FILE, LOCAL, REGION
from .settings import DASHBOARD_FILE, LOCAL, REGION, WORKSPACE

port = 8080
name = "etl-tpch-dashboard"
subdomain = "etl-tpch"
name = "dashboard"
subdomain = "dashboard"


def deploy():
@@ -22,6 +22,7 @@ def deploy():
else:
cmd = f"""
coiled run \
--workspace {WORKSPACE} \
--region {REGION} \
--vm-type t3.medium \
-f dashboard.py \
@@ -31,7 +32,6 @@
-e AWS_ACCESS_KEY_ID={os.environ['AWS_ACCESS_KEY_ID']} \
-e AWS_SECRET_ACCESS_KEY={os.environ['AWS_SECRET_ACCESS_KEY']} \
--detach \
--keepalive '520 weeks' \
--name {name} \
-- \
{cmd}
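
Pieced together, the non-local branch of deploy() now shells out to a command along these lines. This is only a rough sketch: the WORKSPACE, REGION, and dashboard-serving values are illustrative placeholders, and flags hidden in the collapsed diff context above are omitted:

import os

# Hypothetical stand-ins; the real values come from pipeline/settings.py
WORKSPACE = "my-workspace"
REGION = "us-east-2"
name = "dashboard"
cmd = "python dashboard_app.py"  # placeholder only, not the real serve command

deploy_cmd = f"""
coiled run \\
    --workspace {WORKSPACE} \\
    --region {REGION} \\
    --vm-type t3.medium \\
    -f dashboard.py \\
    -e AWS_ACCESS_KEY_ID={os.environ.get('AWS_ACCESS_KEY_ID', '')} \\
    -e AWS_SECRET_ACCESS_KEY={os.environ.get('AWS_SECRET_ACCESS_KEY', '')} \\
    --detach \\
    --name {name} \\
    -- \\
    {cmd}
"""
print(deploy_cmd)
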
pipeline/data.py: 21 changes (16 additions, 5 deletions)
@@ -4,16 +4,24 @@

import coiled
import duckdb
import numpy as np
import pandas as pd
import psutil
from dask.distributed import print
from prefect import flow, task

from .settings import LOCAL, PROCESSED_DIR, REGION, STAGING_DIR, fs, lock_generate
from .settings import (
LOCAL,
PROCESSED_DIR,
REGION,
STAGING_DIR,
WORKSPACE,
fs,
lock_generate,
)


def new_time(t, t_start=None, t_end=None):

d = pd.Timestamp("1998-12-31") - pd.Timestamp("1992-01-01")
return t_start + (t - pd.Timestamp("1992-01-01")) * ((t_end - t_start) / d)

@@ -24,7 +32,7 @@ def new_time(t, t_start=None, t_end=None):
local=LOCAL,
region=REGION,
vm_type="m6i.2xlarge",
tags={"workflow": "etl-tpch"},
account=WORKSPACE,
)
def generate(scale: float, path: os.PathLike) -> None:
static_tables = ["customer", "nation", "part", "partsupp", "region", "supplier"]
@@ -84,12 +92,15 @@ def generate(scale: float, path: os.PathLike) -> None:
.rename(columns={"o_orderkey_new": "l_orderkey"})
)

# Shift times to be more recent
# Shift times to be more recent and lineitem prices to be non-uniform
if table == "lineitem":
df["l_shipdate"] = new_time(
df["l_shipdate"], t_start=now, t_end=now + pd.Timedelta("7 days")
df["l_shipdate"], t_start=now, t_end=now + pd.Timedelta("3 days")
)
df = df.rename(columns={"l_shipdate": "l_ship_time"})
df["l_extendedprice"] = (
np.random.rand(df.shape[0]) * df["l_extendedprice"]
)
cols = [c for c in df.columns if "date" in c]
df[cols] = new_time(
df[cols], t_start=now - pd.Timedelta("15 minutes"), t_end=now
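
Two behavioural changes land in this file: l_shipdate is now rescaled into a three-day window instead of seven, and l_extendedprice is multiplied by uniform random noise so prices are no longer uniform. A small self-contained sketch of the rescaling helper and the price tweak (the sample values are illustrative):

import numpy as np
import pandas as pd

def new_time(t, t_start=None, t_end=None):
    # Linearly map timestamps from the TPC-H date range
    # [1992-01-01, 1998-12-31] onto [t_start, t_end].
    d = pd.Timestamp("1998-12-31") - pd.Timestamp("1992-01-01")
    return t_start + (t - pd.Timestamp("1992-01-01")) * ((t_end - t_start) / d)

now = pd.Timestamp.now()
ship_dates = pd.Series(pd.to_datetime(["1992-01-01", "1995-06-30", "1998-12-31"]))

# The earliest TPC-H date maps to `now`, the latest to `now + 3 days`.
shifted = new_time(ship_dates, t_start=now, t_end=now + pd.Timedelta("3 days"))

# Prices scaled by uniform noise in [0, 1), as generate() now does for lineitem.
prices = pd.Series([1000.0, 2500.0, 4000.0])
noisy_prices = np.random.rand(len(prices)) * prices
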
pipeline/preprocess.py: 8 changes (5 additions, 3 deletions)
@@ -11,6 +11,7 @@
PROCESSED_DIR,
REGION,
STAGING_DIR,
WORKSPACE,
fs,
lock_compact,
lock_json_to_parquet,
@@ -25,11 +26,11 @@
retry_jitter_factor=1,
)
@coiled.function(
name="data-etl",
name="json-to-parquet",
local=LOCAL,
region=REGION,
vm_type="m6i.2xlarge",
tags={"workflow": "etl-tpch"},
account=WORKSPACE,
)
def json_file_to_parquet(file):
"""Convert raw JSON data file to Parquet."""
@@ -60,10 +61,11 @@ def json_to_parquet():

@task(log_prints=True)
@coiled.function(
name="compact",
local=LOCAL,
region=REGION,
vm_type="m6i.xlarge",
tags={"workflow": "etl-tpch"},
account=WORKSPACE,
)
def compact(table):
print(f"Compacting table {table}")
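
Each preprocessing task now follows the same decorator pattern: a Prefect task wrapped in a Coiled serverless function with its own name and an explicit workspace. A minimal sketch, assuming placeholder values for the settings and eliding the task body:

import coiled
from prefect import task

# Placeholder stand-ins for values pipeline/settings.py reads from config.yml
LOCAL = False
REGION = "us-east-2"
WORKSPACE = "my-workspace"

@task(log_prints=True)
@coiled.function(
    name="json-to-parquet",   # each task now gets its own Coiled function name
    local=LOCAL,              # run in-process when `local: true` in config.yml
    region=REGION,
    vm_type="m6i.2xlarge",
    account=WORKSPACE,        # pin the Coiled workspace selected in config.yml
)
def json_file_to_parquet(file):
    """Convert a raw JSON data file to Parquet (body elided in this sketch)."""
    ...
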
pipeline/reduce.py: 5 changes (3 additions, 2 deletions)
@@ -12,6 +12,7 @@
PROCESSED_DIR,
REGION,
RESULTS_DIR,
WORKSPACE,
fs,
lock_compact,
storage_options,
@@ -29,9 +30,9 @@ def unshipped_orders_by_revenue(segment):
cluster = functools.partial(
coiled.Cluster,
name="reduce",
workspace=WORKSPACE,
region=REGION,
n_workers=10,
tags={"workflow": "etl-tpch"},
n_workers=30,
shutdown_on_close=False,
idle_timeout="1 minute",
wait_for_workers=True,
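
The reduction step keeps its functools.partial-wrapped cluster factory, now pinned to the configured workspace and bumped from 10 to 30 workers. A sketch of how such a factory is typically used, with placeholder workspace and region values; the final two lines are illustrative and not part of this diff:

import functools

import coiled

WORKSPACE = "my-workspace"  # placeholder; read from config.yml via settings.py
REGION = "us-east-2"        # placeholder

cluster = functools.partial(
    coiled.Cluster,
    name="reduce",
    workspace=WORKSPACE,      # new in this commit
    region=REGION,
    n_workers=30,             # previously 10
    shutdown_on_close=False,  # keep the named cluster alive between flow runs
    idle_timeout="1 minute",
    wait_for_workers=True,
)

# Calling the partial creates (or reuses) the named cluster, and a Dask
# client can then be attached to it.
c = cluster()
client = c.get_client()
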
pipeline/settings.py: 1 change (1 addition, 0 deletions)
@@ -10,6 +10,7 @@
data = yaml.safe_load(f)

LOCAL = data["local"]
WORKSPACE = data["workspace"]
ROOT = Path(data["data-dir"]).resolve()
fs = fsspec.filesystem(ROOT.protocol, use_listings_cache=False)

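
The new `workspace` key flows straight through: leaving it as `null` in config.yml loads as Python None, and per the config comment above, Coiled then falls back to your default workspace. A tiny sketch of that flow using an inline config string:

import yaml

config = yaml.safe_load("local: true\ndata-dir: ./data\nworkspace: null\n")

WORKSPACE = config["workspace"]
assert WORKSPACE is None  # handed to Coiled, which then uses the default workspace
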
