Update data directory structure #22

Merged (1 commit) on Feb 27, 2024
4 changes: 2 additions & 2 deletions dashboard.py
@@ -2,13 +2,13 @@
import plotly.express as px
import streamlit as st

from pipeline.settings import REDUCED_DATA_DIR
from pipeline.settings import RESULTS_DIR


@st.cache_data
def get_data(region, part_type):
return dd.read_parquet(
REDUCED_DATA_DIR / region / part_type.upper() / "*.parquet"
RESULTS_DIR / region / part_type.upper() / "*.parquet"
).compute()


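For context, a minimal usage sketch of the updated get_data against the renamed results location (hedged: the europe/brass pair is borrowed from train.py below, and the widget wiring of the real dashboard is collapsed above):

import streamlit as st

df = get_data("europe", "brass")  # resolves to RESULTS_DIR / "europe" / "BRASS" / "*.parquet"
st.dataframe(df)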
6 changes: 6 additions & 0 deletions pipeline/config.yml
@@ -0,0 +1,6 @@
# Whether to run data-processing tasks locally
# or on the cloud with Coiled.
local: true
# Output location for data files. Can be a local directory
# or a remote path like "s3://path/to/bucket".
data-dir: ./data
Comment on lines +1 to +6 (Member Author):
Moving this configuration out into a standalone file (related to, but doesn't close, #2)

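For reference, the new file is consumed in pipeline/settings.py (see that diff below); a minimal sketch of the loading pattern:

import yaml
from upath import UPath as Path

with open(Path(__file__).parent / "config.yml", "rb") as f:
    data = yaml.safe_load(f)

LOCAL = data["local"]                     # run locally or on Coiled
ROOT = Path(data["data-dir"]).resolve()   # local directory or s3:// path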
33 changes: 5 additions & 28 deletions pipeline/data.py
@@ -1,21 +1,13 @@
import datetime
import os

import botocore.session
import coiled
import duckdb
import psutil
from dask.distributed import print
from prefect import flow, task

from .settings import (
LOCAL,
REGION,
STAGING_JSON_DIR,
STAGING_PARQUET_DIR,
fs,
lock_generate,
)
from .settings import LOCAL, PROCESSED_DIR, REGION, STAGING_DIR, fs, lock_generate


@task(log_prints=True)
@@ -31,21 +23,6 @@ def generate(scale: float, path: os.PathLike) -> None:
with duckdb.connect() as con:
con.install_extension("tpch")
con.load_extension("tpch")

if str(path).startswith("s3://"):
session = botocore.session.Session()
creds = session.get_credentials()
con.install_extension("httpfs")
con.load_extension("httpfs")
con.sql(
f"""
SET s3_region='{REGION}';
SET s3_access_key_id='{creds.access_key}';
SET s3_secret_access_key='{creds.secret_key}';
SET s3_session_token='{creds.token}';
"""
)

Comment on lines -34 to -48 (Member Author):
This is unrelated to the directory structure. We don't need to do this configuration because we're not reading / writing with duckdb.

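Since generation now stays local to DuckDB, the core of generate() only needs the tpch extension. A minimal sketch of that pattern, not part of this diff (hedged: the dbgen call and scale factor below are illustrative):

import duckdb

con = duckdb.connect()
con.install_extension("tpch")
con.load_extension("tpch")
con.sql("CALL dbgen(sf=0.01)")                      # populate the TPC-H tables at a small scale factor
lineitem = con.sql("SELECT * FROM lineitem").df()   # pull one table into pandas for staging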
con.sql(
f"""
SET memory_limit='{psutil.virtual_memory().available // 2**30 }G';
@@ -67,8 +44,8 @@ def generate(scale: float, path: os.PathLike) -> None:
)
for table in map(str, tables):
if table in static_tables and (
list((STAGING_JSON_DIR / table).rglob("*.json"))
or list((STAGING_PARQUET_DIR / table).rglob("*.parquet"))
list((STAGING_DIR / table).rglob("*.json"))
or list((PROCESSED_DIR / table).rglob("*.parquet"))
):
print(f"Static table {table} already exists")
continue
@@ -97,6 +74,6 @@ def generate_data():
with lock_generate:
generate(
scale=0.01,
path=STAGING_JSON_DIR,
path=STAGING_DIR,
)
generate.fn.client.restart()
generate.fn.client.restart(wait_for_workers=False)
Member Author:
This is an unrelated bugfix for running locally (xref dask/distributed#8534)

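The restart change above uses dask.distributed's Client.restart; a minimal sketch of the pattern on a local cluster:

from dask.distributed import Client

client = Client()  # local cluster
# Restart workers without blocking until they all rejoin, avoiding the
# local-run hang referenced in dask/distributed#8534.
client.restart(wait_for_workers=False)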
16 changes: 8 additions & 8 deletions pipeline/preprocess.py
@@ -7,11 +7,11 @@
from prefect.tasks import exponential_backoff

from .settings import (
ARCHIVE_DIR,
LOCAL,
RAW_JSON_DIR,
PROCESSED_DIR,
REGION,
STAGING_JSON_DIR,
STAGING_PARQUET_DIR,
STAGING_DIR,
fs,
lock_compact,
lock_json_to_parquet,
@@ -36,7 +36,7 @@ def json_file_to_parquet(file):
"""Convert raw JSON data file to Parquet."""
print(f"Processing {file}")
df = pd.read_json(file, lines=True)
outfile = STAGING_PARQUET_DIR / file.parent.name
outfile = PROCESSED_DIR / file.parent.name
fs.makedirs(outfile.parent, exist_ok=True)
data = pa.Table.from_pandas(df, preserve_index=False)
deltalake.write_deltalake(
@@ -48,15 +48,15 @@

@task
def archive_json_file(file):
outfile = RAW_JSON_DIR / file.relative_to(STAGING_JSON_DIR)
outfile = ARCHIVE_DIR / file.relative_to(STAGING_DIR)
fs.makedirs(outfile.parent, exist_ok=True)
fs.mv(str(file), str(outfile))

return outfile


def list_new_json_files():
return list(STAGING_JSON_DIR.rglob("*.json"))
return list(STAGING_DIR.rglob("*.json"))


@flow(log_prints=True)
@@ -88,9 +88,9 @@ def compact(table):

@task
def list_tables():
if not fs.exists(STAGING_PARQUET_DIR):
if not fs.exists(PROCESSED_DIR):
return []
directories = fs.ls(STAGING_PARQUET_DIR, refresh=True)
directories = fs.ls(PROCESSED_DIR, refresh=True)
return directories


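A minimal sketch of the JSON-to-Delta step under the new directory names, mirroring json_file_to_parquet above (hedged: the write mode and storage_options argument are assumptions, since that call is collapsed in the diff):

import deltalake
import pandas as pd
import pyarrow as pa

from pipeline.settings import PROCESSED_DIR, STAGING_DIR, fs, storage_options

file = next(STAGING_DIR.rglob("*.json"))     # one staged JSON file (raises StopIteration if none)
df = pd.read_json(file, lines=True)
outfile = PROCESSED_DIR / file.parent.name   # e.g. processed/<table>
fs.makedirs(outfile.parent, exist_ok=True)
deltalake.write_deltalake(
    str(outfile),
    pa.Table.from_pandas(df, preserve_index=False),
    mode="append",                           # assumed; the real mode is not shown here
    storage_options=storage_options,
)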
16 changes: 8 additions & 8 deletions pipeline/reduce.py
@@ -8,9 +8,9 @@

from .settings import (
LOCAL,
REDUCED_DATA_DIR,
PROCESSED_DIR,
REGION,
STAGING_PARQUET_DIR,
RESULTS_DIR,
fs,
lock_compact,
storage_options,
@@ -38,22 +38,22 @@ def save_query(region, part_type):
with cluster.get_client():
size = 15
region_ds = dask_deltatable.read_deltalake(
str(STAGING_PARQUET_DIR / "region"),
str(PROCESSED_DIR / "region"),
delta_storage_options=storage_options,
)
nation_filtered = dask_deltatable.read_deltalake(
str(STAGING_PARQUET_DIR / "nation"),
str(PROCESSED_DIR / "nation"),
delta_storage_options=storage_options,
)
supplier_filtered = dask_deltatable.read_deltalake(
str(STAGING_PARQUET_DIR / "supplier"),
str(PROCESSED_DIR / "supplier"),
delta_storage_options=storage_options,
)
part_filtered = dask_deltatable.read_deltalake(
str(STAGING_PARQUET_DIR / "part"), delta_storage_options=storage_options
str(PROCESSED_DIR / "part"), delta_storage_options=storage_options
)
partsupp_filtered = dask_deltatable.read_deltalake(
str(STAGING_PARQUET_DIR / "partsupp"),
str(PROCESSED_DIR / "partsupp"),
delta_storage_options=storage_options,
)

@@ -124,7 +124,7 @@ def save_query(region, part_type):
.head(100)
)

outfile = REDUCED_DATA_DIR / region / part_type / "result.snappy.parquet"
outfile = RESULTS_DIR / region / part_type / "result.snappy.parquet"
fs.makedirs(outfile.parent, exist_ok=True)
result.to_parquet(outfile, compression="snappy")

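For reference, a minimal sketch of reading one processed Delta table and writing a reduced result with the renamed constants (hedged: the filter is a placeholder, not the query in save_query above):

import dask_deltatable

from pipeline.settings import PROCESSED_DIR, RESULTS_DIR, fs, storage_options

part = dask_deltatable.read_deltalake(
    str(PROCESSED_DIR / "part"),
    delta_storage_options=storage_options,
)
result = part[part.p_type.str.endswith("BRASS")].head(100)   # placeholder reduction
outfile = RESULTS_DIR / "europe" / "brass" / "result.snappy.parquet"
fs.makedirs(outfile.parent, exist_ok=True)
result.to_parquet(outfile, compression="snappy")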
32 changes: 15 additions & 17 deletions pipeline/settings.py
@@ -1,35 +1,33 @@
import boto3
import fsspec
import yaml
from filelock import FileLock
from upath import UPath as Path

LOCAL = True
# LOCAL = False
with open(Path(__file__).parent / "config.yml", "rb") as f:
data = yaml.safe_load(f)

LOCAL = data["local"]
ROOT = Path(data["data-dir"]).resolve()
fs = fsspec.filesystem(ROOT.protocol, use_listings_cache=False)

if LOCAL:
ROOT = Path(__file__).parent.parent.resolve() / "data"
fs = fsspec.filesystem("local")
REGION = None
storage_options = {}
else:
# TODO: Make the cloud path nicer (e.g. s3://coiled-datasets-rp)
ROOT = Path("s3://openscapes-scratch/jrbourbeau/etl-tpch/data")
fs = fsspec.filesystem("s3", use_listings_cache=False)
# Find cloud region being used
bucket = str(ROOT).replace("s3://", "").split("/")[0]
resp = boto3.client("s3").get_bucket_location(Bucket=bucket)
REGION = resp["LocationConstraint"] or "us-east-1"
storage_options = {"AWS_REGION": REGION, "AWS_S3_ALLOW_UNSAFE_RENAME": "true"}

STAGING_JSON_DIR = ROOT / "staged" / "json"
STAGING_PARQUET_DIR = ROOT / "staged" / "parquet"
RAW_JSON_DIR = ROOT / "raw" / "json"
RAW_PARQUET_DIR = ROOT / "raw" / "parquet"
PROCESSED_DATA_DIR = ROOT / "processed"
REDUCED_DATA_DIR = ROOT / "reduced"
STAGING_DIR = ROOT / "staging" # Input JSON files
PROCESSED_DIR = ROOT / "processed" # Processed Parquet files
RESULTS_DIR = ROOT / "results" # Reduced/aggregated results
ARCHIVE_DIR = ROOT / "archive" # Archived JSON files
MODEL_FILE = ROOT.parent / "model.json"
MODEL_SERVER_FILE = ROOT.parent / "serve_model.py"

lock_generate = FileLock("generate.lock")
lock_json_to_parquet = FileLock("json_to_parquet.lock")
lock_compact = FileLock("compact.lock")
lock_dir = Path(__file__).parent.parent / ".locks"
lock_generate = FileLock(lock_dir / "generate.lock")
lock_json_to_parquet = FileLock(lock_dir / "json.lock")
lock_compact = FileLock(lock_dir / "compact.lock")
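With the default data-dir: ./data from config.yml, the constants above resolve to roughly this layout at the repository root:

data/
    staging/      # incoming JSON files written by generate
    processed/    # Parquet/Delta tables written by preprocess
    results/      # reduced outputs read by the dashboard and train
    archive/      # JSON files moved here after processing
model.json        # MODEL_FILE (next to, not inside, data/)
serve_model.py    # MODEL_SERVER_FILE
.locks/           # file locks for the generate/json/compact tasks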
4 changes: 2 additions & 2 deletions pipeline/train.py
@@ -5,7 +5,7 @@
import xgboost as xgb
from prefect import flow, task

from .settings import LOCAL, MODEL_FILE, REDUCED_DATA_DIR, REGION, Path, fs
from .settings import LOCAL, MODEL_FILE, REGION, RESULTS_DIR, Path, fs


@task
@@ -37,7 +37,7 @@ def train(file):


def list_training_data_files():
data_dir = REDUCED_DATA_DIR / "europe" / "brass"
data_dir = RESULTS_DIR / "europe" / "brass"
return list(data_dir.rglob("*.parquet"))


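A minimal sketch of consuming the results files for training (hedged: the train task body is collapsed above, so the target column and model choice here are assumptions):

import pandas as pd
import xgboost as xgb

from pipeline.settings import RESULTS_DIR

files = list((RESULTS_DIR / "europe" / "brass").rglob("*.parquet"))
df = pd.concat(pd.read_parquet(f) for f in files)
features = df.select_dtypes("number")
target = features.pop("p_retailprice")        # hypothetical target column
model = xgb.XGBRegressor().fit(features, target)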