Updates from long-running tests (#19)
jrbourbeau committed Feb 19, 2024
1 parent ff013f1 commit bcfbb4d
Showing 10 changed files with 128 additions and 128 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -14,7 +14,7 @@ Then run each command below in separate terminal windows:
python serve_model.py # Serve ML model
```
```bash
python pipeline.py # Run ETL pipeline
python workflow.py # Run ETL pipeline
```
```bash
streamlit run dashboard.py # Serve dashboard
4 changes: 3 additions & 1 deletion environment.yml
@@ -15,5 +15,7 @@ dependencies:
- filelock
- streamlit
- s3fs
- universal_pathlib
- universal_pathlib <0.2.0
- boto3
# - awscli
- dask-deltatable
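
The new upper bound keeps `universal_pathlib` on the 0.1.x series that the pipeline's `UPath`-based path handling (see `pipeline/settings.py` below) was presumably written against. A minimal sketch of that usage, with a placeholder bucket and layout:

```python
# Minimal sketch of the UPath usage the pin is meant to keep stable.
# The bucket and directory layout are placeholders; the real root is set in pipeline/settings.py.
from upath import UPath as Path

ROOT = Path("s3://example-bucket/etl-tpch/data")    # placeholder bucket
STAGING_JSON_DIR = ROOT / "staged" / "json"         # pathlib-style joins work on s3:// URLs

print(STAGING_JSON_DIR)                             # s3://example-bucket/etl-tpch/data/staged/json
new_files = list(STAGING_JSON_DIR.rglob("*.json"))  # globbing is served by the underlying fsspec filesystem
```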
Empty file removed pipeline/__init__.py
29 changes: 24 additions & 5 deletions pipeline/data.py
@@ -5,18 +5,29 @@
import coiled
import duckdb
import psutil
from dask.distributed import print
from prefect import flow, task

from pipeline.settings import LOCAL, REGION, STAGING_JSON_DIR, fs
from .settings import (
LOCAL,
REGION,
STAGING_JSON_DIR,
STAGING_PARQUET_DIR,
fs,
lock_generate,
)


@task(log_prints=True)
@coiled.function(
name="data-generation",
local=LOCAL,
region=REGION,
keepalive="5 minutes",
tags={"workflow": "etl-tpch"},
)
def generate(scale: float, path: os.PathLike) -> None:
static_tables = ["customer", "nation", "part", "partsupp", "region", "supplier"]
with duckdb.connect() as con:
con.install_extension("tpch")
con.load_extension("tpch")
@@ -55,6 +66,12 @@ def generate(scale: float, path: os.PathLike) -> None:
.column("table_name")
)
for table in map(str, tables):
if table in static_tables and (
list((STAGING_JSON_DIR / table).rglob("*.json"))
or list((STAGING_PARQUET_DIR / table).rglob("*.parquet"))
):
print(f"Static table {table} already exists")
continue
print(f"Exporting table: {table}")
stmt = f"""select * from {table}"""
df = con.sql(stmt).arrow()
@@ -77,7 +94,9 @@ def generate(scale: float, path: os.PathLike) -> None:

@flow
def generate_data():
generate(
scale=0.01,
path=STAGING_JSON_DIR,
)
with lock_generate:
generate(
scale=0.01,
path=STAGING_JSON_DIR,
)
generate.fn.client.restart()
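
Two changes stand out in this file: static TPC-H tables are only generated once (any existing staged JSON or Parquet output short-circuits the export), and the whole generation step is serialized behind a file lock before the `coiled.function` cluster is restarted. A condensed sketch of that guard, assuming it runs from the repository root so the `pipeline` package is importable (`table_already_staged` is an illustrative helper, not a function in the codebase):

```python
# Condensed sketch of the new generation guard; illustrative, not the full module.
from prefect import flow

from pipeline.data import generate
from pipeline.settings import STAGING_JSON_DIR, STAGING_PARQUET_DIR, lock_generate


def table_already_staged(table: str) -> bool:
    # Static tables (customer, nation, part, ...) are exported once; any existing
    # staged JSON or Parquet output short-circuits regeneration.
    return bool(
        list((STAGING_JSON_DIR / table).rglob("*.json"))
        or list((STAGING_PARQUET_DIR / table).rglob("*.parquet"))
    )


@flow
def generate_data():
    with lock_generate:              # serialize against other flows touching the staging area
        generate(scale=0.01, path=STAGING_JSON_DIR)
    generate.fn.client.restart()     # recycle the coiled.function cluster between runs
```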
34 changes: 22 additions & 12 deletions pipeline/preprocess.py
@@ -1,8 +1,8 @@
import coiled
import dask
import deltalake
import pandas as pd
import pyarrow as pa
from dask.distributed import print
from prefect import flow, task
from prefect.tasks import exponential_backoff

@@ -13,11 +13,11 @@
STAGING_JSON_DIR,
STAGING_PARQUET_DIR,
fs,
lock_compact,
lock_json_to_parquet,
storage_options,
)

dask.config.set({"coiled.use_aws_creds_endpoint": False})


@task(
log_prints=True,
@@ -26,9 +26,10 @@
retry_jitter_factor=1,
)
@coiled.function(
name="data-etl",
local=LOCAL,
region=REGION,
keepalive="10 minutes",
keepalive="5 minutes",
tags={"workflow": "etl-tpch"},
)
def json_file_to_parquet(file):
@@ -50,7 +51,6 @@ def archive_json_file(file):
outfile = RAW_JSON_DIR / file.relative_to(STAGING_JSON_DIR)
fs.makedirs(outfile.parent, exist_ok=True)
fs.mv(str(file), str(outfile))
print(f"Archived {str(outfile)}")

return outfile

@@ -61,16 +61,20 @@ def list_new_json_files():

@flow(log_prints=True)
def json_to_parquet():
files = list_new_json_files()
files = json_file_to_parquet.map(files)
archive_json_file.map(files)
with lock_json_to_parquet:
files = list_new_json_files()
files = json_file_to_parquet.map(files)
futures = archive_json_file.map(files)
for f in futures:
print(f"Archived {str(f.result())}")


@task(log_prints=True)
@coiled.function(
name="data-etl",
local=LOCAL,
region=REGION,
keepalive="10 minutes",
keepalive="5 minutes",
tags={"workflow": "etl-tpch"},
)
def compact(table):
@@ -79,15 +83,21 @@ def compact(table):
t = deltalake.DeltaTable(table, storage_options=storage_options)
t.optimize.compact()
t.vacuum(retention_hours=0, enforce_retention_duration=False, dry_run=False)
return table


@task
def list_tables():
directories = fs.ls(STAGING_PARQUET_DIR)
if not fs.exists(STAGING_PARQUET_DIR):
return []
directories = fs.ls(STAGING_PARQUET_DIR, refresh=True)
return directories


@flow(log_prints=True)
def compact_tables():
tables = list_tables()
compact.map(tables)
with lock_compact:
tables = list_tables()
futures = compact.map(tables)
for f in futures:
print(f"Finished compacting {f.result()} table")
80 changes: 48 additions & 32 deletions pipeline/reduce.py
@@ -1,16 +1,20 @@
import functools
import itertools
import uuid

import coiled
import dask
import dask_deltatable
from dask.distributed import LocalCluster
from prefect import flow, task

from .settings import LOCAL, REDUCED_DATA_DIR, REGION, STAGING_PARQUET_DIR, fs

dask.config.set({"coiled.use_aws_creds_endpoint": False})
from .settings import (
LOCAL,
REDUCED_DATA_DIR,
REGION,
STAGING_PARQUET_DIR,
fs,
lock_compact,
storage_options,
)


@task
@@ -20,22 +24,37 @@ def save_query(region, part_type):
cluster = LocalCluster
else:
cluster = functools.partial(
coiled.Cluster, region=REGION, tags={"workflow": "etl-tpch"}
coiled.Cluster,
name="reduce",
region=REGION,
n_workers=10,
tags={"workflow": "etl-tpch"},
shutdown_on_close=False,
idle_timeout="5 minutes",
wait_for_workers=True,
)

with cluster() as cluster:
with cluster.get_client():
size = 15
region_ds = dask_deltatable.read_deltalake(STAGING_PARQUET_DIR / "region")
region_ds = dask_deltatable.read_deltalake(
str(STAGING_PARQUET_DIR / "region"),
delta_storage_options=storage_options,
)
nation_filtered = dask_deltatable.read_deltalake(
STAGING_PARQUET_DIR / "nation"
str(STAGING_PARQUET_DIR / "nation"),
delta_storage_options=storage_options,
)
supplier_filtered = dask_deltatable.read_deltalake(
STAGING_PARQUET_DIR / "supplier"
str(STAGING_PARQUET_DIR / "supplier"),
delta_storage_options=storage_options,
)
part_filtered = dask_deltatable.read_deltalake(
str(STAGING_PARQUET_DIR / "part"), delta_storage_options=storage_options
)
part_filtered = dask_deltatable.read_deltalake(STAGING_PARQUET_DIR / "part")
partsupp_filtered = dask_deltatable.read_deltalake(
STAGING_PARQUET_DIR / "partsupp"
str(STAGING_PARQUET_DIR / "partsupp"),
delta_storage_options=storage_options,
)

region_filtered = region_ds[(region_ds["r_name"] == region.upper())]
@@ -91,32 +110,29 @@ def save_query(region, part_type):
.sort_values(
by=[
"s_acctbal",
# "n_name",
# "s_name",
# "p_partkey",
"n_name",
"s_name",
"p_partkey",
],
ascending=[
False,
True,
True,
True,
],
# ascending=[
# False,
# True,
# True,
# True,
# ],
)
.head(100, compute=False)
.head(100)
)

outdir = REDUCED_DATA_DIR / region / part_type
fs.makedirs(outdir, exist_ok=True)

def name(_):
return f"{uuid.uuid4()}.snappy.parquet"

result.to_parquet(outdir, compression="snappy", name_function=name)
outfile = REDUCED_DATA_DIR / region / part_type / "result.snappy.parquet"
fs.makedirs(outfile.parent, exist_ok=True)
result.to_parquet(outfile, compression="snappy")


@flow
def query_reduce():
regions = ["europe", "africa", "america", "asia", "middle east"]
part_types = ["copper", "brass", "tin", "nickel", "steel"]
for region, part_type in itertools.product(regions, part_types):
save_query(region, part_type)
with lock_compact:
regions = ["europe", "africa", "america", "asia", "middle east"]
part_types = ["copper", "brass", "tin", "nickel", "steel"]
for region, part_type in itertools.product(regions, part_types):
save_query(region, part_type)
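
The Delta reads in this file now pass plain string paths plus `delta_storage_options`, the query result is computed eagerly with `.head(100)`, and each (region, part type) pair is written to one deterministic file instead of UUID-named parts. A stripped-down sketch of that read/write pattern under placeholder paths and credentials:

```python
# Stripped-down sketch of the new read/write pattern in save_query; paths and
# storage options are placeholders (the real values come from pipeline/settings.py).
import dask_deltatable
from upath import UPath as Path

STAGING_PARQUET_DIR = Path("s3://example-bucket/etl-tpch/data/staged")   # placeholder
REDUCED_DATA_DIR = Path("s3://example-bucket/etl-tpch/data/reduced")     # placeholder
storage_options = {"AWS_REGION": "us-east-2"}                            # placeholder

region_ds = dask_deltatable.read_deltalake(
    str(STAGING_PARQUET_DIR / "region"),        # the Delta reader expects a plain string path
    delta_storage_options=storage_options,
)

result = region_ds[region_ds["r_name"] == "EUROPE"].head(100)  # eager head() -> pandas DataFrame

outfile = REDUCED_DATA_DIR / "europe" / "copper" / "result.snappy.parquet"
result.to_parquet(str(outfile), compression="snappy")          # one fixed file per (region, part type)
```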
63 changes: 0 additions & 63 deletions pipeline/resize.py

This file was deleted.

9 changes: 7 additions & 2 deletions pipeline/settings.py
@@ -1,5 +1,6 @@
import boto3
import fsspec
from filelock import FileLock
from upath import UPath as Path

LOCAL = True
@@ -12,8 +13,8 @@
storage_options = {}
else:
# TODO: Make the cloud path nicer (e.g. s3://coiled-datasets-rp)
ROOT = Path("s3://oss-scratch-space/jrbourbeau/etl-tpch/data")
fs = fsspec.filesystem("s3")
ROOT = Path("s3://openscapes-scratch/jrbourbeau/etl-tpch/data")
fs = fsspec.filesystem("s3", use_listings_cache=False)
# Find cloud region being used
bucket = str(ROOT).replace("s3://", "").split("/")[0]
resp = boto3.client("s3").get_bucket_location(Bucket=bucket)
@@ -28,3 +29,7 @@
REDUCED_DATA_DIR = ROOT / "reduced"
MODEL_FILE = ROOT.parent / "model.json"
MODEL_SERVER_FILE = ROOT.parent / "serve_model.py"

lock_generate = FileLock("generate.lock")
lock_json_to_parquet = FileLock("json_to_parquet.lock")
lock_compact = FileLock("compact.lock")
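
The three `FileLock` objects give the generate, JSON-to-Parquet, and compact/reduce flows a simple cross-process mutex: whichever process acquires the lock file first runs, and the others block (or time out) until it is released. A small illustration of that behavior, separate from the pipeline code (the one-second timeout is only for the example):

```python
# Small illustration of how the lock files coordinate separate processes.
from filelock import FileLock, Timeout

lock_compact = FileLock("compact.lock")    # same lock file shared by compact_tables and query_reduce

try:
    with lock_compact.acquire(timeout=1):  # wait up to 1 s for any other holder to finish
        print("No other compaction/reduction is running; safe to rewrite the Delta tables")
except Timeout:
    print("compact.lock is held by another flow run; try again later")
```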