Commit
jrbourbeau committed Feb 16, 2024
1 parent 1d1f516 commit 13a2270
Showing 8 changed files with 49 additions and 82 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -14,7 +14,7 @@ Then run each command below in separate terminal windows:
python serve_model.py # Serve ML model
```
```bash
python pipeline.py # Run ETL pipeline
python workflow.py # Run ETL pipeline
```
```bash
streamlit run dashboard.py # Serve dashboard
Empty file removed pipeline/__init__.py
51 changes: 11 additions & 40 deletions pipeline/data.py
@@ -5,21 +5,22 @@
import coiled
import duckdb
import psutil
import pyarrow.compute as pc
from dask.distributed import print
from prefect import flow, task

from pipeline.settings import LOCAL, REGION, STAGING_JSON_DIR, fs
from .settings import LOCAL, REGION, STAGING_JSON_DIR, STAGING_PARQUET_DIR, fs


@task(log_prints=True)
@coiled.function(
name="data-generation",
name="data-etl",
local=LOCAL,
region=REGION,
keepalive="5 minutes",
tags={"workflow": "etl-tpch"},
)
def generate(scale: float, path: os.PathLike) -> None:
static_tables = ["customer", "nation", "part", "partsupp", "region", "supplier"]
with duckdb.connect() as con:
con.install_extension("tpch")
con.load_extension("tpch")
@@ -52,16 +53,18 @@ def generate(scale: float, path: os.PathLike) -> None:
con.sql(query)
print("Finished generating data, exporting...")

print("Converting types date -> timestamp_s and decimal -> double")
_alter_tables(con)
print("Done altering tables")

tables = (
con.sql("select * from information_schema.tables")
.arrow()
.column("table_name")
)
for table in map(str, tables):
if table in static_tables and (
list((STAGING_JSON_DIR / table).rglob("*.json"))
or list((STAGING_PARQUET_DIR / table).rglob("*.parquet"))
):
print(f"Static table {table} already exists")
continue
print(f"Exporting table: {table}")
stmt = f"""select * from {table}"""
df = con.sql(stmt).arrow()
@@ -82,41 +85,9 @@ def generate(scale: float, path: os.PathLike) -> None:
print("Finished exporting all data")


def _alter_tables(con):
"""
Temporary, used for debugging performance in data types.
ref discussion here: https://github.com/coiled/benchmarks/pull/1131
"""
tables = [
"nation",
"region",
"customer",
"supplier",
"lineitem",
"orders",
"partsupp",
"part",
]
for table in tables:
schema = con.sql(f"describe {table}").arrow()

# alter decimals to floats
for column in schema.filter(
pc.match_like(pc.field("column_type"), "DECIMAL%")
).column("column_name"):
con.sql(f"alter table {table} alter {column} type double")

# alter date to timestamp_s
for column in schema.filter(pc.field("column_type") == "DATE").column(
"column_name"
):
con.sql(f"alter table {table} alter {column} type timestamp_s")


@flow
def generate_data():
generate(
scale=0.02,
scale=0.01,
path=STAGING_JSON_DIR,
)
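
For context, here is a minimal, self-contained sketch of the export loop this commit introduces in `pipeline/data.py`: TPC-H data is generated with DuckDB's `tpch` extension, and static dimension tables are skipped when they have already been staged. The staging paths and the JSON export statement are illustrative assumptions, not the repository's exact code.

```python
from pathlib import Path

import duckdb

STAGING_JSON_DIR = Path("staged/json")        # assumption: local staging layout
STAGING_PARQUET_DIR = Path("staged/parquet")
STATIC_TABLES = ["customer", "nation", "part", "partsupp", "region", "supplier"]

with duckdb.connect() as con:
    con.install_extension("tpch")
    con.load_extension("tpch")
    con.execute("CALL dbgen(sf=0.01)")  # generate a small TPC-H dataset in memory

    tables = (
        con.sql("select * from information_schema.tables")
        .arrow()
        .column("table_name")
    )
    for table in map(str, tables):
        already_staged = list((STAGING_JSON_DIR / table).rglob("*.json")) or list(
            (STAGING_PARQUET_DIR / table).rglob("*.parquet")
        )
        if table in STATIC_TABLES and already_staged:
            print(f"Static table {table} already exists")
            continue
        outdir = STAGING_JSON_DIR / table
        outdir.mkdir(parents=True, exist_ok=True)
        # Export as newline-delimited JSON, mirroring the staging format above
        con.sql(f"copy (select * from {table}) to '{outdir / 'part.json'}' (format json)")
        print(f"Exported table: {table}")
```

The apparent intent of the skip check is to keep dimension tables stable across runs while fact tables such as `lineitem` and `orders` are regenerated on every invocation.
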
20 changes: 9 additions & 11 deletions pipeline/preprocess.py
@@ -1,11 +1,9 @@
import coiled
import dask
import deltalake
import pandas as pd
import pyarrow as pa
from dask.distributed import print
from prefect import flow, task
from prefect.concurrency.sync import concurrency
from prefect.tasks import exponential_backoff

from .settings import (
@@ -15,11 +13,11 @@
STAGING_JSON_DIR,
STAGING_PARQUET_DIR,
fs,
lock_compact,
lock_json_to_parquet,
storage_options,
)

dask.config.set({"coiled.use_aws_creds_endpoint": False})


@task(
log_prints=True,
@@ -28,10 +26,10 @@
retry_jitter_factor=1,
)
@coiled.function(
name="preprocessing",
name="data-etl",
local=LOCAL,
region=REGION,
keepalive="10 minutes",
keepalive="5 minutes",
tags={"workflow": "etl-tpch"},
)
def json_file_to_parquet(file):
@@ -63,7 +61,7 @@ def list_new_json_files():

@flow(log_prints=True)
def json_to_parquet():
with concurrency("json_to_parquet", occupy=1):
with lock_json_to_parquet:
files = list_new_json_files()
files = json_file_to_parquet.map(files)
futures = archive_json_file.map(files)
@@ -73,18 +71,18 @@ def json_to_parquet():

@task(log_prints=True)
@coiled.function(
name="preprocessing",
name="data-etl",
local=LOCAL,
region=REGION,
keepalive="10 minutes",
keepalive="5 minutes",
tags={"workflow": "etl-tpch"},
)
def compact(table):
print(f"Compacting table {table}")
table = table if LOCAL else f"s3://{table}"
t = deltalake.DeltaTable(table, storage_options=storage_options)
t.optimize.compact()
# t.vacuum(retention_hours=0, enforce_retention_duration=False, dry_run=False)
t.vacuum(retention_hours=0, enforce_retention_duration=False, dry_run=False)
return table


@@ -98,7 +96,7 @@ def list_tables():

@flow(log_prints=True)
def compact_tables():
with concurrency("compact", occupy=1):
with lock_compact:
tables = list_tables()
futures = compact.map(tables)
for f in futures:
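
The `compact` task in `pipeline/preprocess.py` now runs `vacuum` immediately after compaction instead of leaving it commented out. A minimal sketch of that pattern with the `deltalake` package, using an illustrative local table path:

```python
from typing import Optional

import deltalake


def compact_and_vacuum(table_uri: str, storage_options: Optional[dict] = None) -> None:
    """Merge small files in a Delta table, then drop files no longer referenced."""
    t = deltalake.DeltaTable(table_uri, storage_options=storage_options or {})
    t.optimize.compact()  # rewrite many small Parquet files into fewer, larger ones
    # retention_hours=0 removes stale files immediately; this looks safe here
    # because the surrounding flow holds lock_compact, so nothing else reads or
    # writes the table while this runs.
    t.vacuum(retention_hours=0, enforce_retention_duration=False, dry_run=False)


# Example usage (assumes a local Delta table already exists at this path):
# compact_and_vacuum("staged/parquet/lineitem")
```
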
28 changes: 11 additions & 17 deletions pipeline/reduce.py
@@ -1,9 +1,7 @@
import functools
import itertools
import uuid

import coiled
import dask
import dask_deltatable
from dask.distributed import LocalCluster
from prefect import flow, task
@@ -14,11 +12,10 @@
REGION,
STAGING_PARQUET_DIR,
fs,
lock_compact,
storage_options,
)

dask.config.set({"coiled.use_aws_creds_endpoint": False})


@task
def save_query(region, part_type):
@@ -33,7 +30,7 @@ def save_query(region, part_type):
n_workers=20,
tags={"workflow": "etl-tpch"},
shutdown_on_close=False,
idle_timeout="3 minutes",
idle_timeout="5 minutes",
wait_for_workers=True,
)

@@ -124,21 +121,18 @@ def save_query(region, part_type):
True,
],
)
.head(100, compute=False)
.head(100)
)

outdir = REDUCED_DATA_DIR / region / part_type
fs.makedirs(outdir, exist_ok=True)

def name(_):
return f"{uuid.uuid4()}.snappy.parquet"

result.to_parquet(outdir, compression="snappy", name_function=name)
outfile = REDUCED_DATA_DIR / region / part_type / "result.snappy.parquet"
fs.makedirs(outfile.parent, exist_ok=True)
result.to_parquet(outfile, compression="snappy")


@flow
def query_reduce():
regions = ["europe", "africa", "america", "asia", "middle east"]
part_types = ["copper", "brass", "tin", "nickel", "steel"]
for region, part_type in itertools.product(regions, part_types):
save_query(region, part_type)
with lock_compact:
regions = ["europe", "africa", "america", "asia", "middle east"]
part_types = ["copper", "brass", "tin", "nickel", "steel"]
for region, part_type in itertools.product(regions, part_types):
save_query(region, part_type)
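
Two changes stand out in `pipeline/reduce.py`: the flow now runs under `lock_compact`, and `.head(100, compute=False)` becomes `.head(100)`, so the result is an in-memory pandas DataFrame written as a single `result.snappy.parquet` file rather than a directory of UUID-named parts. A small sketch of the new write path with synthetic data (column names and output path are illustrative):

```python
import pathlib

import dask.dataframe as dd
import pandas as pd

# Illustrative stand-in for the joined and filtered query result
ddf = dd.from_pandas(
    pd.DataFrame({"o_orderkey": range(1_000), "revenue": range(1_000)}),
    npartitions=4,
)

result = ddf.head(100)  # without compute=False, .head() returns a pandas DataFrame

outfile = pathlib.Path("reduced/europe/copper/result.snappy.parquet")
outfile.parent.mkdir(parents=True, exist_ok=True)
result.to_parquet(outfile, compression="snappy")  # one file, no name_function needed
```
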
7 changes: 5 additions & 2 deletions pipeline/settings.py
@@ -1,5 +1,6 @@
import boto3
import fsspec
from filelock import FileLock
from upath import UPath as Path

LOCAL = True
@@ -12,8 +13,7 @@
storage_options = {}
else:
# TODO: Make the cloud path nicer (e.g. s3://coiled-datasets-rp)
ROOT = Path("s3://openscapes-scratch/jrbourbeau/etl-tpch/data-test")
# ROOT = Path("s3://oss-scratch-space/jrbourbeau/etl-tpch/data-test")
ROOT = Path("s3://openscapes-scratch/jrbourbeau/etl-tpch/data")
fs = fsspec.filesystem("s3", use_listings_cache=False)
# Find cloud region being used
bucket = str(ROOT).replace("s3://", "").split("/")[0]
@@ -29,3 +29,6 @@
REDUCED_DATA_DIR = ROOT / "reduced"
MODEL_FILE = ROOT.parent / "model.json"
MODEL_SERVER_FILE = ROOT.parent / "serve_model.py"

lock_json_to_parquet = FileLock("json_to_parquet.lock")
lock_compact = FileLock("compact.lock")
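
These new module-level locks replace the Prefect concurrency contexts previously used in `preprocess.py` and `reduce.py`. A minimal sketch of how `filelock.FileLock` serializes two flows that share a lock file (function bodies are placeholders):

```python
from filelock import FileLock

# Same lock-file name as in settings.py
lock_compact = FileLock("compact.lock")


def compact_tables():
    with lock_compact:  # blocks until any other holder releases the lock
        print("compacting tables...")


def query_reduce():
    with lock_compact:  # shares the same lock file as compact_tables
        print("running reduce queries...")


compact_tables()
query_reduce()
```

Because both functions acquire the same `compact.lock`, compaction and the reduce query cannot run against the Delta tables at the same time, even when they are scheduled from separate processes.
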
9 changes: 5 additions & 4 deletions pipeline/train.py
@@ -10,15 +10,16 @@

@task
@coiled.function(
name="train",
local=LOCAL,
region=REGION,
tags={"workflow": "etl-tpch"},
)
def train(files):
df = pd.read_parquet(files)
def train(file):
df = pd.read_parquet(file)
X = df[["p_partkey", "s_acctbal"]]
y = df["n_name"].map(
{"FRANCE": 0, "UNITED KINGDOM": 1, "RUSSIA": 2, "GERMANY": 3, "ROMANIA": 4}
{"GERMANY": 0, "ROMANIA": 1, "RUSSIA": 2, "FRANCE": 3, "UNITED KINGDOM": 4}
)
model = xgb.XGBClassifier()
if MODEL_FILE.exists():
@@ -45,5 +46,5 @@ def update_model():
if not files:
print("No training data available")
return
train(files)
train(files[0])
print(f"Updated model at {MODEL_FILE}")
14 changes: 7 additions & 7 deletions pipeline.py → workflow.py
@@ -1,4 +1,4 @@
# from datetime import timedelta
from datetime import timedelta

from prefect import serve

@@ -11,27 +11,27 @@
if __name__ == "__main__":
data = generate_data.to_deployment(
name="generate_data",
# interval=timedelta(seconds=30),
interval=timedelta(seconds=30),
)
preprocess = json_to_parquet.to_deployment(
name="preprocess",
# interval=timedelta(minutes=1),
interval=timedelta(minutes=1),
)
compact = compact_tables.to_deployment(
name="compact",
# interval=timedelta(minutes=5),
interval=timedelta(minutes=30),
)
reduce = query_reduce.to_deployment(
name="reduce",
# interval=timedelta(minutes=30),
interval=timedelta(hours=1),
)
train = update_model.to_deployment(
name="train",
# interval=timedelta(minutes=45),
interval=timedelta(hours=6),
)
monitor = check_model_endpoint.to_deployment(
name="monitor",
# interval=timedelta(seconds=30),
interval=timedelta(minutes=1),
)

serve(
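
Finally, `workflow.py` (renamed from `pipeline.py`) now passes real `interval=` schedules to `to_deployment()` instead of leaving them commented out. A minimal sketch of that `to_deployment` plus `serve` pattern, with placeholder flow bodies and only two of the six deployments shown; treat it as an illustration rather than the repository's exact code:

```python
from datetime import timedelta

from prefect import flow, serve


@flow(log_prints=True)
def generate_data():
    print("generating data")


@flow(log_prints=True)
def json_to_parquet():
    print("converting staged JSON files to Parquet")


if __name__ == "__main__":
    # serve() runs the deployments on their schedules until interrupted
    serve(
        generate_data.to_deployment(name="generate_data", interval=timedelta(seconds=30)),
        json_to_parquet.to_deployment(name="preprocess", interval=timedelta(minutes=1)),
    )
```
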