Introduce Deltalake (#11)
Co-authored-by: James Bourbeau <[email protected]>
mrocklin and jrbourbeau committed Feb 15, 2024
1 parent d3d406a commit 16e753e
Showing 4 changed files with 158 additions and 99 deletions.
13 changes: 6 additions & 7 deletions pipeline.py
@@ -3,19 +3,18 @@
from prefect import serve

from pipeline.monitor import check_model_endpoint
from pipeline.preprocess import json_to_parquet
from pipeline.preprocess import compact_tables, json_to_parquet
from pipeline.reduce import query_reduce
from pipeline.resize import resize_parquet
from pipeline.train import update_model

if __name__ == "__main__":
preprocess = json_to_parquet.to_deployment(
name="preprocess",
interval=timedelta(seconds=30),
interval=timedelta(seconds=60),
)
resize = resize_parquet.to_deployment(
name="resize",
interval=timedelta(seconds=30),
compact = compact_tables.to_deployment(
name="compact",
interval=timedelta(minutes=5),
)
reduce = query_reduce.to_deployment(
name="reduce",
@@ -32,7 +31,7 @@

serve(
preprocess,
resize,
compact,
reduce,
train,
monitor,
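
Note: the scheduling pattern in the updated pipeline.py boils down to turning each flow into a deployment with its own interval and serving them together. A minimal standalone sketch of that pattern, with a placeholder flow body (the flow here is illustrative, not the repo's implementation):

from datetime import timedelta

from prefect import flow, serve


@flow(log_prints=True)
def compact_tables():
    # Placeholder body; the real flow lives in pipeline/preprocess.py.
    print("compacting staged Delta tables")


if __name__ == "__main__":
    compact = compact_tables.to_deployment(
        name="compact",
        interval=timedelta(minutes=5),  # run every five minutes
    )
    serve(compact)  # blocks and runs the served deployments on their schedules
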
74 changes: 57 additions & 17 deletions pipeline/preprocess.py
@@ -1,7 +1,10 @@
import coiled
import dask
import deltalake
import pandas as pd
from filelock import FileLock
import pyarrow as pa
from prefect import flow, task
from prefect.tasks import exponential_backoff

from .settings import (
LOCAL,
@@ -10,44 +13,81 @@
STAGING_JSON_DIR,
STAGING_PARQUET_DIR,
fs,
storage_options,
)

# TODO: Couldn't figure out how to limit concurrent flow runs
# in Prefect, so am using a file lock...
lock = FileLock("preprocess.lock")
dask.config.set({"coiled.use_aws_creds_endpoint": False})


@task(log_prints=True)
@task(
log_prints=True,
retries=10,
retry_delay_seconds=exponential_backoff(10),
retry_jitter_factor=1,
)
@coiled.function(
local=LOCAL,
region=REGION,
keepalive="10 minutes",
tags={"workflow": "etl-tpch"},
)
def convert_to_parquet(file):
def json_file_to_parquet(file):
"""Convert raw JSON data file to Parquet."""
print(f"Processing {file}")
df = pd.read_json(file, lines=True, engine="pyarrow")
outfile = STAGING_PARQUET_DIR / file.relative_to(STAGING_JSON_DIR).with_suffix(
".snappy.parquet"
)
df = pd.read_json(file, lines=True)
outfile = STAGING_PARQUET_DIR / file.parent.name
fs.makedirs(outfile.parent, exist_ok=True)
df.to_parquet(outfile, compression="snappy")
data = pa.Table.from_pandas(df, preserve_index=False)
deltalake.write_deltalake(
outfile, data, mode="append", storage_options=storage_options
)
print(f"Saved {outfile}")
return outfile
return file


@task
def archive_json_file(file):
outfile = RAW_JSON_DIR / file.relative_to(STAGING_JSON_DIR)
fs.makedirs(outfile.parent, exist_ok=True)
# Need str(...), otherwise, `TypeError: 'S3Path' object is not iterable`
fs.mv(str(file), str(outfile))
print(f"Archived {str(outfile)}")

return outfile


def list_new_json_files():
return list(STAGING_JSON_DIR.rglob("*.json"))


@flow(log_prints=True)
def json_to_parquet():
with lock:
files = list(STAGING_JSON_DIR.rglob("*.json"))
parquet_files = convert_to_parquet.map(files)
archive_json_file.map(files, wait_for=parquet_files)
files = list_new_json_files()
files = json_file_to_parquet.map(files)
archive_json_file.map(files)


@task(log_prints=True)
@coiled.function(
local=LOCAL,
region=REGION,
keepalive="10 minutes",
tags={"workflow": "etl-tpch"},
)
def compact(table):
print(f"Compacting table {table}")
table = table if LOCAL else f"s3://{table}"
t = deltalake.DeltaTable(table, storage_options=storage_options)
t.optimize.compact()
t.vacuum(retention_hours=0, enforce_retention_duration=False, dry_run=False)


@task
def list_tables():
directories = fs.ls(STAGING_PARQUET_DIR)
return directories


@flow(log_prints=True)
def compact_tables():
tables = list_tables()
compact.map(tables)
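
Note: the Delta Lake pieces introduced in preprocess.py follow the usual delta-rs pattern: append pyarrow tables with write_deltalake, then periodically compact the many small files and vacuum unreferenced ones. A rough standalone sketch under assumed inputs (the local table path and DataFrame below are made up for illustration):

import deltalake
import pandas as pd
import pyarrow as pa

table_path = "data/staged/parquet/lineitem"  # hypothetical local Delta table path

# Append a batch of records to the Delta table (created on first write).
df = pd.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "c"]})
data = pa.Table.from_pandas(df, preserve_index=False)
deltalake.write_deltalake(table_path, data, mode="append")

# Later, merge the small files produced by frequent appends and drop files
# left behind by superseded table versions.
dt = deltalake.DeltaTable(table_path)
dt.optimize.compact()
dt.vacuum(retention_hours=0, enforce_retention_duration=False, dry_run=False)
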
168 changes: 93 additions & 75 deletions pipeline/reduce.py
@@ -1,99 +1,117 @@
import functools
import itertools
import uuid

import coiled
import dask_expr as dd
import dask
import dask_deltatable
from dask.distributed import LocalCluster
from prefect import flow, task

from .settings import LOCAL, PROCESSED_DATA_DIR, REDUCED_DATA_DIR, REGION, fs
from .settings import LOCAL, REDUCED_DATA_DIR, REGION, STAGING_PARQUET_DIR, fs

dask.config.set({"coiled.use_aws_creds_endpoint": False})


@task
def save_query(region, part_type):

if LOCAL:
cluster = LocalCluster()
cluster = LocalCluster
else:
cluster = coiled.Cluster(
region=REGION,
tags={"workflow": "etl-tpch"},
cluster = functools.partial(
coiled.Cluster, region=REGION, tags={"workflow": "etl-tpch"}
)

client = cluster.get_client() # noqa: F841
size = 15
region_ds = dd.read_parquet(PROCESSED_DATA_DIR / "region")
nation_filtered = dd.read_parquet(PROCESSED_DATA_DIR / "nation")
supplier_filtered = dd.read_parquet(PROCESSED_DATA_DIR / "supplier")
part_filtered = dd.read_parquet(PROCESSED_DATA_DIR / "part")
partsupp_filtered = dd.read_parquet(PROCESSED_DATA_DIR / "partsupp")

region_filtered = region_ds[(region_ds["r_name"] == region.upper())]
r_n_merged = nation_filtered.merge(
region_filtered, left_on="n_regionkey", right_on="r_regionkey", how="inner"
)
s_r_n_merged = r_n_merged.merge(
supplier_filtered,
left_on="n_nationkey",
right_on="s_nationkey",
how="inner",
)
ps_s_r_n_merged = s_r_n_merged.merge(
partsupp_filtered, left_on="s_suppkey", right_on="ps_suppkey", how="inner"
)
part_filtered = part_filtered[
(part_filtered["p_size"] == size)
& (part_filtered["p_type"].astype(str).str.endswith(part_type.upper()))
]
merged_df = part_filtered.merge(
ps_s_r_n_merged, left_on="p_partkey", right_on="ps_partkey", how="inner"
)
min_values = merged_df.groupby("p_partkey")["ps_supplycost"].min().reset_index()
min_values.columns = ["P_PARTKEY_CPY", "MIN_SUPPLYCOST"]
merged_df = merged_df.merge(
min_values,
left_on=["p_partkey", "ps_supplycost"],
right_on=["P_PARTKEY_CPY", "MIN_SUPPLYCOST"],
how="inner",
)
with cluster() as cluster:
with cluster.get_client():
size = 15
region_ds = dask_deltatable.read_deltalake(STAGING_PARQUET_DIR / "region")
nation_filtered = dask_deltatable.read_deltalake(
STAGING_PARQUET_DIR / "nation"
)
supplier_filtered = dask_deltatable.read_deltalake(
STAGING_PARQUET_DIR / "supplier"
)
part_filtered = dask_deltatable.read_deltalake(STAGING_PARQUET_DIR / "part")
partsupp_filtered = dask_deltatable.read_deltalake(
STAGING_PARQUET_DIR / "partsupp"
)

result = (
merged_df[
[
"s_acctbal",
"s_name",
"n_name",
"p_partkey",
"p_mfgr",
"s_address",
"s_phone",
"s_comment",
region_filtered = region_ds[(region_ds["r_name"] == region.upper())]
r_n_merged = nation_filtered.merge(
region_filtered,
left_on="n_regionkey",
right_on="r_regionkey",
how="inner",
)
s_r_n_merged = r_n_merged.merge(
supplier_filtered,
left_on="n_nationkey",
right_on="s_nationkey",
how="inner",
)
ps_s_r_n_merged = s_r_n_merged.merge(
partsupp_filtered,
left_on="s_suppkey",
right_on="ps_suppkey",
how="inner",
)
part_filtered = part_filtered[
(part_filtered["p_size"] == size)
& (part_filtered["p_type"].astype(str).str.endswith(part_type.upper()))
]
]
.sort_values(
by=[
"s_acctbal",
# "n_name",
# "s_name",
# "p_partkey",
],
# ascending=[
# False,
# True,
# True,
# True,
# ],
)
.head(100, compute=False)
)
merged_df = part_filtered.merge(
ps_s_r_n_merged, left_on="p_partkey", right_on="ps_partkey", how="inner"
)
min_values = (
merged_df.groupby("p_partkey")["ps_supplycost"].min().reset_index()
)
min_values.columns = ["P_PARTKEY_CPY", "MIN_SUPPLYCOST"]
merged_df = merged_df.merge(
min_values,
left_on=["p_partkey", "ps_supplycost"],
right_on=["P_PARTKEY_CPY", "MIN_SUPPLYCOST"],
how="inner",
)

result = (
merged_df[
[
"s_acctbal",
"s_name",
"n_name",
"p_partkey",
"p_mfgr",
"s_address",
"s_phone",
"s_comment",
]
]
.sort_values(
by=[
"s_acctbal",
# "n_name",
# "s_name",
# "p_partkey",
],
# ascending=[
# False,
# True,
# True,
# True,
# ],
)
.head(100, compute=False)
)

outdir = REDUCED_DATA_DIR / region / part_type
fs.makedirs(outdir, exist_ok=True)
outdir = REDUCED_DATA_DIR / region / part_type
fs.makedirs(outdir, exist_ok=True)

def name(_):
return f"{uuid.uuid4()}.snappy.parquet"
def name(_):
return f"{uuid.uuid4()}.snappy.parquet"

result.to_parquet(outdir, compression="snappy", name_function=name)
result.to_parquet(outdir, compression="snappy", name_function=name)


@flow
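
Note: the reduce.py refactor swaps dd.read_parquet for dask_deltatable.read_deltalake and builds the cluster lazily, so LocalCluster and coiled.Cluster share one code path. A condensed sketch of that shape, with placeholder table paths and a hard-coded LOCAL flag standing in for pipeline/settings.py:

import functools

import dask_deltatable
from dask.distributed import LocalCluster

LOCAL = True  # assumption for this sketch; the real flag comes from pipeline/settings.py

if LOCAL:
    make_cluster = LocalCluster  # called later, so nothing starts at import time
else:
    import coiled

    make_cluster = functools.partial(
        coiled.Cluster, region="us-east-1", tags={"workflow": "etl-tpch"}
    )

with make_cluster() as cluster:
    with cluster.get_client():
        # Lazily read Delta tables as Dask DataFrames and join them on the cluster.
        region = dask_deltatable.read_deltalake("data/staged/parquet/region")
        nation = dask_deltatable.read_deltalake("data/staged/parquet/nation")
        joined = nation.merge(
            region, left_on="n_regionkey", right_on="r_regionkey", how="inner"
        )
        print(joined.head())  # triggers computation on the active cluster
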
2 changes: 2 additions & 0 deletions pipeline/settings.py
@@ -9,6 +9,7 @@
ROOT = Path(__file__).parent.parent.resolve() / "data"
fs = fsspec.filesystem("local")
REGION = None
storage_options = {}
else:
# TODO: Make the cloud path nicer (e.g. s3://coiled-datasets-rp)
ROOT = Path("s3://oss-scratch-space/jrbourbeau/etl-tpch/data")
@@ -17,6 +18,7 @@
bucket = str(ROOT).replace("s3://", "").split("/")[0]
resp = boto3.client("s3").get_bucket_location(Bucket=bucket)
REGION = resp["LocationConstraint"] or "us-east-1"
storage_options = {"AWS_REGION": REGION, "AWS_S3_ALLOW_UNSAFE_RENAME": "true"}

STAGING_JSON_DIR = ROOT / "staged" / "json"
STAGING_PARQUET_DIR = ROOT / "staged" / "parquet"
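
Note: the new storage_options dictionary in settings.py is what delta-rs (and dask-deltatable) pass through to the underlying object store; it stays empty locally, while on S3 the bucket region and the unsafe-rename flag are supplied explicitly. Roughly, with a placeholder bucket name:

import boto3

bucket = "my-etl-bucket"  # placeholder; the repo derives this from its cloud ROOT path
resp = boto3.client("s3").get_bucket_location(Bucket=bucket)
region = resp["LocationConstraint"] or "us-east-1"  # us-east-1 reports None

storage_options = {"AWS_REGION": region, "AWS_S3_ALLOW_UNSAFE_RENAME": "true"}
# Passed to deltalake.write_deltalake(...) / deltalake.DeltaTable(...) for S3-backed tables.
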
