Update data directory structure (#22)
jrbourbeau committed Feb 27, 2024
1 parent 2dd4eff commit 36bcd72
Showing 7 changed files with 46 additions and 65 deletions.
4 changes: 2 additions & 2 deletions dashboard.py
@@ -2,13 +2,13 @@
import plotly.express as px
import streamlit as st

- from pipeline.settings import REDUCED_DATA_DIR
+ from pipeline.settings import RESULTS_DIR


@st.cache_data
def get_data(region, part_type):
return dd.read_parquet(
- REDUCED_DATA_DIR / region / part_type.upper() / "*.parquet"
+ RESULTS_DIR / region / part_type.upper() / "*.parquet"
).compute()


6 changes: 6 additions & 0 deletions pipeline/config.yml
@@ -0,0 +1,6 @@
# Whether to run data-processing tasks locally
# or on the cloud with Coiled.
local: true
# Output location for data files. Can be a local directory
# or a remote path like "s3://path/to/bucket".
data-dir: ./data
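For reference, a rough sketch of how these two keys get consumed (it mirrors the updated pipeline/settings.py further down; the path and printed value are illustrative and assume the defaults above):

import fsspec
import yaml
from upath import UPath as Path  # universal_pathlib

# Read the config file that ships alongside the pipeline package.
with open("pipeline/config.yml", "rb") as f:
    config = yaml.safe_load(f)

local = config["local"]                    # True -> run tasks locally, False -> run on Coiled
root = Path(config["data-dir"]).resolve()  # accepts "./data" as well as "s3://bucket/prefix"

# The path's protocol (e.g. "s3" for an S3 path) tells fsspec which filesystem to use.
fs = fsspec.filesystem(root.protocol, use_listings_cache=False)
print(root / "staging")  # e.g. <repo>/data/staging with the defaults above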
33 changes: 5 additions & 28 deletions pipeline/data.py
@@ -1,21 +1,13 @@
import datetime
import os

- import botocore.session
import coiled
import duckdb
import psutil
from dask.distributed import print
from prefect import flow, task

- from .settings import (
- LOCAL,
- REGION,
- STAGING_JSON_DIR,
- STAGING_PARQUET_DIR,
- fs,
- lock_generate,
- )
+ from .settings import LOCAL, PROCESSED_DIR, REGION, STAGING_DIR, fs, lock_generate


@task(log_prints=True)
@@ -31,21 +23,6 @@ def generate(scale: float, path: os.PathLike) -> None:
with duckdb.connect() as con:
con.install_extension("tpch")
con.load_extension("tpch")

- if str(path).startswith("s3://"):
- session = botocore.session.Session()
- creds = session.get_credentials()
- con.install_extension("httpfs")
- con.load_extension("httpfs")
- con.sql(
- f"""
- SET s3_region='{REGION}';
- SET s3_access_key_id='{creds.access_key}';
- SET s3_secret_access_key='{creds.secret_key}';
- SET s3_session_token='{creds.token}';
- """
- )

con.sql(
f"""
SET memory_limit='{psutil.virtual_memory().available // 2**30 }G';
@@ -67,8 +44,8 @@ def generate(scale: float, path: os.PathLike) -> None:
)
for table in map(str, tables):
if table in static_tables and (
- list((STAGING_JSON_DIR / table).rglob("*.json"))
- or list((STAGING_PARQUET_DIR / table).rglob("*.parquet"))
+ list((STAGING_DIR / table).rglob("*.json"))
+ or list((PROCESSED_DIR / table).rglob("*.parquet"))
):
print(f"Static table {table} already exists")
continue
@@ -97,6 +74,6 @@ def generate_data():
with lock_generate:
generate(
scale=0.01,
- path=STAGING_JSON_DIR,
+ path=STAGING_DIR,
)
- generate.fn.client.restart()
+ generate.fn.client.restart(wait_for_workers=False)
16 changes: 8 additions & 8 deletions pipeline/preprocess.py
@@ -7,11 +7,11 @@
from prefect.tasks import exponential_backoff

from .settings import (
+ ARCHIVE_DIR,
LOCAL,
- RAW_JSON_DIR,
+ PROCESSED_DIR,
REGION,
- STAGING_JSON_DIR,
- STAGING_PARQUET_DIR,
+ STAGING_DIR,
fs,
lock_compact,
lock_json_to_parquet,
@@ -36,7 +36,7 @@ def json_file_to_parquet(file):
"""Convert raw JSON data file to Parquet."""
print(f"Processing {file}")
df = pd.read_json(file, lines=True)
- outfile = STAGING_PARQUET_DIR / file.parent.name
+ outfile = PROCESSED_DIR / file.parent.name
fs.makedirs(outfile.parent, exist_ok=True)
data = pa.Table.from_pandas(df, preserve_index=False)
deltalake.write_deltalake(
@@ -48,15 +48,15 @@

@task
def archive_json_file(file):
- outfile = RAW_JSON_DIR / file.relative_to(STAGING_JSON_DIR)
+ outfile = ARCHIVE_DIR / file.relative_to(STAGING_DIR)
fs.makedirs(outfile.parent, exist_ok=True)
fs.mv(str(file), str(outfile))

return outfile


def list_new_json_files():
- return list(STAGING_JSON_DIR.rglob("*.json"))
+ return list(STAGING_DIR.rglob("*.json"))


@flow(log_prints=True)
@@ -88,9 +88,9 @@ def compact(table):

@task
def list_tables():
- if not fs.exists(STAGING_PARQUET_DIR):
+ if not fs.exists(PROCESSED_DIR):
return []
- directories = fs.ls(STAGING_PARQUET_DIR, refresh=True)
+ directories = fs.ls(PROCESSED_DIR, refresh=True)
return directories


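The archive_json_file change above keeps the per-table directory structure when moving files out of staging. A minimal sketch of the path arithmetic, using plain pathlib (the pipeline itself uses universal_pathlib's UPath) and a hypothetical file name:

from pathlib import Path

STAGING_DIR = Path("data/staging")  # values assume the defaults in config.yml
ARCHIVE_DIR = Path("data/archive")

# Hypothetical input file, used only to illustrate the mapping.
file = STAGING_DIR / "lineitem" / "chunk-0001.json"

# Same expression as in archive_json_file: strip the staging prefix, re-root under archive.
outfile = ARCHIVE_DIR / file.relative_to(STAGING_DIR)
print(outfile)  # data/archive/lineitem/chunk-0001.json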
16 changes: 8 additions & 8 deletions pipeline/reduce.py
@@ -8,9 +8,9 @@

from .settings import (
LOCAL,
- REDUCED_DATA_DIR,
+ PROCESSED_DIR,
REGION,
- STAGING_PARQUET_DIR,
+ RESULTS_DIR,
fs,
lock_compact,
storage_options,
@@ -38,22 +38,22 @@ def save_query(region, part_type):
with cluster.get_client():
size = 15
region_ds = dask_deltatable.read_deltalake(
str(STAGING_PARQUET_DIR / "region"),
str(PROCESSED_DIR / "region"),
delta_storage_options=storage_options,
)
nation_filtered = dask_deltatable.read_deltalake(
str(STAGING_PARQUET_DIR / "nation"),
str(PROCESSED_DIR / "nation"),
delta_storage_options=storage_options,
)
supplier_filtered = dask_deltatable.read_deltalake(
str(STAGING_PARQUET_DIR / "supplier"),
str(PROCESSED_DIR / "supplier"),
delta_storage_options=storage_options,
)
part_filtered = dask_deltatable.read_deltalake(
str(STAGING_PARQUET_DIR / "part"), delta_storage_options=storage_options
str(PROCESSED_DIR / "part"), delta_storage_options=storage_options
)
partsupp_filtered = dask_deltatable.read_deltalake(
str(STAGING_PARQUET_DIR / "partsupp"),
str(PROCESSED_DIR / "partsupp"),
delta_storage_options=storage_options,
)

@@ -124,7 +124,7 @@ def save_query(region, part_type):
.head(100)
)

- outfile = REDUCED_DATA_DIR / region / part_type / "result.snappy.parquet"
+ outfile = RESULTS_DIR / region / part_type / "result.snappy.parquet"
fs.makedirs(outfile.parent, exist_ok=True)
result.to_parquet(outfile, compression="snappy")

32 changes: 15 additions & 17 deletions pipeline/settings.py
@@ -1,35 +1,33 @@
import boto3
import fsspec
+ import yaml
from filelock import FileLock
from upath import UPath as Path

- LOCAL = True
- # LOCAL = False
+ with open(Path(__file__).parent / "config.yml", "rb") as f:
+ data = yaml.safe_load(f)

+ LOCAL = data["local"]
+ ROOT = Path(data["data-dir"]).resolve()
+ fs = fsspec.filesystem(ROOT.protocol, use_listings_cache=False)

if LOCAL:
- ROOT = Path(__file__).parent.parent.resolve() / "data"
- fs = fsspec.filesystem("local")
REGION = None
storage_options = {}
else:
- # TODO: Make the cloud path nicer (e.g. s3://coiled-datasets-rp)
- ROOT = Path("s3://openscapes-scratch/jrbourbeau/etl-tpch/data")
- fs = fsspec.filesystem("s3", use_listings_cache=False)
# Find cloud region being used
bucket = str(ROOT).replace("s3://", "").split("/")[0]
resp = boto3.client("s3").get_bucket_location(Bucket=bucket)
REGION = resp["LocationConstraint"] or "us-east-1"
storage_options = {"AWS_REGION": REGION, "AWS_S3_ALLOW_UNSAFE_RENAME": "true"}

- STAGING_JSON_DIR = ROOT / "staged" / "json"
- STAGING_PARQUET_DIR = ROOT / "staged" / "parquet"
- RAW_JSON_DIR = ROOT / "raw" / "json"
- RAW_PARQUET_DIR = ROOT / "raw" / "parquet"
- PROCESSED_DATA_DIR = ROOT / "processed"
- REDUCED_DATA_DIR = ROOT / "reduced"
+ STAGING_DIR = ROOT / "staging"  # Input JSON files
+ PROCESSED_DIR = ROOT / "processed"  # Processed Parquet files
+ RESULTS_DIR = ROOT / "results"  # Reduced/aggregated results
+ ARCHIVE_DIR = ROOT / "archive"  # Archived JSON files
MODEL_FILE = ROOT.parent / "model.json"
MODEL_SERVER_FILE = ROOT.parent / "serve_model.py"

lock_generate = FileLock("generate.lock")
lock_json_to_parquet = FileLock("json_to_parquet.lock")
lock_compact = FileLock("compact.lock")
lock_dir = Path(__file__).parent.parent / ".locks"
lock_generate = FileLock(lock_dir / "generate.lock")
lock_json_to_parquet = FileLock(lock_dir / "json.lock")
lock_compact = FileLock(lock_dir / "compact.lock")
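Taken together with the rest of the diff, the renamed constants map onto the old ones roughly as follows (a summary sketch; the exact resolved paths depend on data-dir in config.yml):

from pipeline.settings import ARCHIVE_DIR, PROCESSED_DIR, RESULTS_DIR, STAGING_DIR

# Old constant         -> new constant   location under data-dir  written / read by
# STAGING_JSON_DIR     -> STAGING_DIR    <data-dir>/staging       generate() writes raw JSON here
# STAGING_PARQUET_DIR  -> PROCESSED_DIR  <data-dir>/processed     json_file_to_parquet() writes Delta/Parquet tables
# REDUCED_DATA_DIR     -> RESULTS_DIR    <data-dir>/results       reduce.py writes; dashboard.py and train.py read
# RAW_JSON_DIR         -> ARCHIVE_DIR    <data-dir>/archive       archive_json_file() moves processed JSON here
print(STAGING_DIR, PROCESSED_DIR, RESULTS_DIR, ARCHIVE_DIR)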
4 changes: 2 additions & 2 deletions pipeline/train.py
Expand Up @@ -5,7 +5,7 @@
import xgboost as xgb
from prefect import flow, task

- from .settings import LOCAL, MODEL_FILE, REDUCED_DATA_DIR, REGION, Path, fs
+ from .settings import LOCAL, MODEL_FILE, REGION, RESULTS_DIR, Path, fs


@task
@@ -37,7 +37,7 @@ def train(file):


def list_training_data_files():
- data_dir = REDUCED_DATA_DIR / "europe" / "brass"
+ data_dir = RESULTS_DIR / "europe" / "brass"
return list(data_dir.rglob("*.parquet"))


