diff --git a/pipeline/config.yml b/pipeline/config.yml
index b996b07..0119a35 100644
--- a/pipeline/config.yml
+++ b/pipeline/config.yml
@@ -1,6 +1,9 @@
+# Output location for data files. Can be a local directory
+# or a remote path like "s3://path/to/bucket".
+data-dir: ./data-test
 # Whether to run data-processing tasks locally
 # or on the cloud with Coiled.
 local: true
-# Output location for data files. Can be a local directory
-# or a remote path like "s3://path/to/bucket".
-data-dir: ./data
+# If using Coiled (`local: false`), use this Coiled workspace.
+# Defaults to your default workspace.
+workspace: null
diff --git a/pipeline/dashboard.py b/pipeline/dashboard.py
index 6d72c3b..8048d9c 100644
--- a/pipeline/dashboard.py
+++ b/pipeline/dashboard.py
@@ -7,11 +7,11 @@
 from prefect import flow
 from rich import print
 
-from .settings import DASHBOARD_FILE, LOCAL, REGION
+from .settings import DASHBOARD_FILE, LOCAL, REGION, WORKSPACE
 
 port = 8080
-name = "etl-tpch-dashboard"
-subdomain = "etl-tpch"
+name = "dashboard"
+subdomain = "dashboard"
 
 
 def deploy():
@@ -22,6 +22,7 @@ def deploy():
     else:
         cmd = f"""
         coiled run \
+            --workspace {WORKSPACE} \
             --region {REGION} \
             --vm-type t3.medium \
             -f dashboard.py \
@@ -31,7 +32,6 @@
             -e AWS_ACCESS_KEY_ID={os.environ['AWS_ACCESS_KEY_ID']} \
             -e AWS_SECRET_ACCESS_KEY={os.environ['AWS_SECRET_ACCESS_KEY']} \
             --detach \
-            --keepalive '520 weeks' \
             --name {name} \
             -- \
             {cmd}
diff --git a/pipeline/data.py b/pipeline/data.py
index 93f1278..d2abbc9 100644
--- a/pipeline/data.py
+++ b/pipeline/data.py
@@ -4,16 +4,24 @@
 
 import coiled
 import duckdb
+import numpy as np
 import pandas as pd
 import psutil
 from dask.distributed import print
 from prefect import flow, task
 
-from .settings import LOCAL, PROCESSED_DIR, REGION, STAGING_DIR, fs, lock_generate
+from .settings import (
+    LOCAL,
+    PROCESSED_DIR,
+    REGION,
+    STAGING_DIR,
+    WORKSPACE,
+    fs,
+    lock_generate,
+)
 
 
 def new_time(t, t_start=None, t_end=None):
-    d = pd.Timestamp("1998-12-31") - pd.Timestamp("1992-01-01")
     return t_start + (t - pd.Timestamp("1992-01-01")) * ((t_end - t_start) / d)
 
 
@@ -24,7 +32,7 @@ def new_time(t, t_start=None, t_end=None):
     local=LOCAL,
     region=REGION,
     vm_type="m6i.2xlarge",
-    tags={"workflow": "etl-tpch"},
+    account=WORKSPACE,
 )
 def generate(scale: float, path: os.PathLike) -> None:
     static_tables = ["customer", "nation", "part", "partsupp", "region", "supplier"]
@@ -84,12 +92,15 @@ def generate(scale: float, path: os.PathLike) -> None:
                 .rename(columns={"o_orderkey_new": "l_orderkey"})
             )
 
-            # Shift times to be more recent
+            # Shift times to be more recent and lineitem prices to be non-uniform
            if table == "lineitem":
                 df["l_shipdate"] = new_time(
-                    df["l_shipdate"], t_start=now, t_end=now + pd.Timedelta("7 days")
+                    df["l_shipdate"], t_start=now, t_end=now + pd.Timedelta("3 days")
                 )
                 df = df.rename(columns={"l_shipdate": "l_ship_time"})
+                df["l_extendedprice"] = (
+                    np.random.rand(df.shape[0]) * df["l_extendedprice"]
+                )
             cols = [c for c in df.columns if "date" in c]
             df[cols] = new_time(
                 df[cols], t_start=now - pd.Timedelta("15 minutes"), t_end=now
diff --git a/pipeline/preprocess.py b/pipeline/preprocess.py
index 0ed1e57..ab5f5e0 100644
--- a/pipeline/preprocess.py
+++ b/pipeline/preprocess.py
@@ -11,6 +11,7 @@
     PROCESSED_DIR,
     REGION,
     STAGING_DIR,
+    WORKSPACE,
     fs,
     lock_compact,
     lock_json_to_parquet,
@@ -25,11 +26,11 @@
     retry_jitter_factor=1,
 )
 @coiled.function(
-    name="data-etl",
+    name="json-to-parquet",
     local=LOCAL,
     region=REGION,
     vm_type="m6i.2xlarge",
-    tags={"workflow": "etl-tpch"},
+    account=WORKSPACE,
 )
 def json_file_to_parquet(file):
     """Convert raw JSON data file to Parquet."""
@@ -60,10 +61,11 @@ def json_to_parquet():
 
 @task(log_prints=True)
 @coiled.function(
+    name="compact",
     local=LOCAL,
     region=REGION,
     vm_type="m6i.xlarge",
-    tags={"workflow": "etl-tpch"},
+    account=WORKSPACE,
 )
 def compact(table):
     print(f"Compacting table {table}")
diff --git a/pipeline/reduce.py b/pipeline/reduce.py
index 4cf5fe0..340539e 100644
--- a/pipeline/reduce.py
+++ b/pipeline/reduce.py
@@ -12,6 +12,7 @@
     PROCESSED_DIR,
     REGION,
     RESULTS_DIR,
+    WORKSPACE,
     fs,
     lock_compact,
     storage_options,
@@ -29,9 +30,9 @@ def unshipped_orders_by_revenue(segment):
     cluster = functools.partial(
         coiled.Cluster,
         name="reduce",
+        workspace=WORKSPACE,
         region=REGION,
-        n_workers=10,
-        tags={"workflow": "etl-tpch"},
+        n_workers=30,
         shutdown_on_close=False,
         idle_timeout="1 minute",
         wait_for_workers=True,
diff --git a/pipeline/settings.py b/pipeline/settings.py
index e9dced8..2a30f58 100644
--- a/pipeline/settings.py
+++ b/pipeline/settings.py
@@ -10,6 +10,7 @@
     data = yaml.safe_load(f)
 
 LOCAL = data["local"]
+WORKSPACE = data["workspace"]
 ROOT = Path(data["data-dir"]).resolve()
 
 fs = fsspec.filesystem(ROOT.protocol, use_listings_cache=False)