From 36bcd72e33616529945f3bf3af8121fad877e8e6 Mon Sep 17 00:00:00 2001
From: James Bourbeau
Date: Tue, 27 Feb 2024 10:33:58 -0600
Subject: [PATCH] Update data directory structure (#22)

---
 dashboard.py           |  4 ++--
 pipeline/config.yml    |  6 ++++++
 pipeline/data.py       | 33 +++++----------------------------
 pipeline/preprocess.py | 16 ++++++++--------
 pipeline/reduce.py     | 16 ++++++++--------
 pipeline/settings.py   | 32 +++++++++++++++-----------------
 pipeline/train.py      |  4 ++--
 7 files changed, 46 insertions(+), 65 deletions(-)
 create mode 100644 pipeline/config.yml

diff --git a/dashboard.py b/dashboard.py
index 8401380..c54f0a4 100644
--- a/dashboard.py
+++ b/dashboard.py
@@ -2,13 +2,13 @@
 import plotly.express as px
 import streamlit as st
 
-from pipeline.settings import REDUCED_DATA_DIR
+from pipeline.settings import RESULTS_DIR
 
 
 @st.cache_data
 def get_data(region, part_type):
     return dd.read_parquet(
-        REDUCED_DATA_DIR / region / part_type.upper() / "*.parquet"
+        RESULTS_DIR / region / part_type.upper() / "*.parquet"
     ).compute()
 
 
diff --git a/pipeline/config.yml b/pipeline/config.yml
new file mode 100644
index 0000000..b996b07
--- /dev/null
+++ b/pipeline/config.yml
@@ -0,0 +1,6 @@
+# Whether to run data-processing tasks locally
+# or on the cloud with Coiled.
+local: true
+# Output location for data files. Can be a local directory
+# or a remote path like "s3://path/to/bucket".
+data-dir: ./data
diff --git a/pipeline/data.py b/pipeline/data.py
index 9f2eb45..b22db73 100644
--- a/pipeline/data.py
+++ b/pipeline/data.py
@@ -1,21 +1,13 @@
 import datetime
 import os
 
-import botocore.session
 import coiled
 import duckdb
 import psutil
 from dask.distributed import print
 from prefect import flow, task
 
-from .settings import (
-    LOCAL,
-    REGION,
-    STAGING_JSON_DIR,
-    STAGING_PARQUET_DIR,
-    fs,
-    lock_generate,
-)
+from .settings import LOCAL, PROCESSED_DIR, REGION, STAGING_DIR, fs, lock_generate
 
 
 @task(log_prints=True)
@@ -31,21 +23,6 @@ def generate(scale: float, path: os.PathLike) -> None:
     with duckdb.connect() as con:
         con.install_extension("tpch")
         con.load_extension("tpch")
-
-        if str(path).startswith("s3://"):
-            session = botocore.session.Session()
-            creds = session.get_credentials()
-            con.install_extension("httpfs")
-            con.load_extension("httpfs")
-            con.sql(
-                f"""
-                SET s3_region='{REGION}';
-                SET s3_access_key_id='{creds.access_key}';
-                SET s3_secret_access_key='{creds.secret_key}';
-                SET s3_session_token='{creds.token}';
-                """
-            )
-
         con.sql(
             f"""
             SET memory_limit='{psutil.virtual_memory().available // 2**30 }G';
@@ -67,8 +44,8 @@ def generate(scale: float, path: os.PathLike) -> None:
         )
         for table in map(str, tables):
             if table in static_tables and (
-                list((STAGING_JSON_DIR / table).rglob("*.json"))
-                or list((STAGING_PARQUET_DIR / table).rglob("*.parquet"))
+                list((STAGING_DIR / table).rglob("*.json"))
+                or list((PROCESSED_DIR / table).rglob("*.parquet"))
             ):
                 print(f"Static table {table} already exists")
                 continue
@@ -97,6 +74,6 @@ def generate_data():
     with lock_generate:
         generate(
             scale=0.01,
-            path=STAGING_JSON_DIR,
+            path=STAGING_DIR,
         )
-        generate.fn.client.restart()
+        generate.fn.client.restart(wait_for_workers=False)
diff --git a/pipeline/preprocess.py b/pipeline/preprocess.py
index 0a64787..2c15cf6 100644
--- a/pipeline/preprocess.py
+++ b/pipeline/preprocess.py
@@ -7,11 +7,11 @@
 from prefect.tasks import exponential_backoff
 
 from .settings import (
+    ARCHIVE_DIR,
     LOCAL,
-    RAW_JSON_DIR,
+    PROCESSED_DIR,
     REGION,
-    STAGING_JSON_DIR,
-    STAGING_PARQUET_DIR,
+    STAGING_DIR,
     fs,
     lock_compact,
     lock_json_to_parquet,
@@ -36,7 +36,7 @@ def json_file_to_parquet(file):
     """Convert raw JSON data file to Parquet."""
     print(f"Processing {file}")
     df = pd.read_json(file, lines=True)
-    outfile = STAGING_PARQUET_DIR / file.parent.name
+    outfile = PROCESSED_DIR / file.parent.name
     fs.makedirs(outfile.parent, exist_ok=True)
     data = pa.Table.from_pandas(df, preserve_index=False)
     deltalake.write_deltalake(
@@ -48,7 +48,7 @@ def json_file_to_parquet(file):
 
 @task
 def archive_json_file(file):
-    outfile = RAW_JSON_DIR / file.relative_to(STAGING_JSON_DIR)
+    outfile = ARCHIVE_DIR / file.relative_to(STAGING_DIR)
     fs.makedirs(outfile.parent, exist_ok=True)
     fs.mv(str(file), str(outfile))
 
@@ -56,7 +56,7 @@
 
 
 def list_new_json_files():
-    return list(STAGING_JSON_DIR.rglob("*.json"))
+    return list(STAGING_DIR.rglob("*.json"))
 
 
 @flow(log_prints=True)
@@ -88,9 +88,9 @@ def compact(table):
 
 
 @task
 def list_tables():
-    if not fs.exists(STAGING_PARQUET_DIR):
+    if not fs.exists(PROCESSED_DIR):
         return []
-    directories = fs.ls(STAGING_PARQUET_DIR, refresh=True)
+    directories = fs.ls(PROCESSED_DIR, refresh=True)
     return directories
 
diff --git a/pipeline/reduce.py b/pipeline/reduce.py
index 18c0a62..cd9dd0b 100644
--- a/pipeline/reduce.py
+++ b/pipeline/reduce.py
@@ -8,9 +8,9 @@
 from .settings import (
     LOCAL,
-    REDUCED_DATA_DIR,
+    PROCESSED_DIR,
     REGION,
-    STAGING_PARQUET_DIR,
+    RESULTS_DIR,
     fs,
     lock_compact,
     storage_options,
 )
@@ -38,22 +38,22 @@ def save_query(region, part_type):
     with cluster.get_client():
         size = 15
         region_ds = dask_deltatable.read_deltalake(
-            str(STAGING_PARQUET_DIR / "region"),
+            str(PROCESSED_DIR / "region"),
             delta_storage_options=storage_options,
         )
         nation_filtered = dask_deltatable.read_deltalake(
-            str(STAGING_PARQUET_DIR / "nation"),
+            str(PROCESSED_DIR / "nation"),
             delta_storage_options=storage_options,
         )
         supplier_filtered = dask_deltatable.read_deltalake(
-            str(STAGING_PARQUET_DIR / "supplier"),
+            str(PROCESSED_DIR / "supplier"),
             delta_storage_options=storage_options,
         )
         part_filtered = dask_deltatable.read_deltalake(
-            str(STAGING_PARQUET_DIR / "part"), delta_storage_options=storage_options
+            str(PROCESSED_DIR / "part"), delta_storage_options=storage_options
         )
         partsupp_filtered = dask_deltatable.read_deltalake(
-            str(STAGING_PARQUET_DIR / "partsupp"),
+            str(PROCESSED_DIR / "partsupp"),
             delta_storage_options=storage_options,
         )
 
@@ -124,7 +124,7 @@ def save_query(region, part_type):
             .head(100)
         )
 
-        outfile = REDUCED_DATA_DIR / region / part_type / "result.snappy.parquet"
+        outfile = RESULTS_DIR / region / part_type / "result.snappy.parquet"
         fs.makedirs(outfile.parent, exist_ok=True)
         result.to_parquet(outfile, compression="snappy")
 
diff --git a/pipeline/settings.py b/pipeline/settings.py
index 9824e1f..137f5a7 100644
--- a/pipeline/settings.py
+++ b/pipeline/settings.py
@@ -1,35 +1,33 @@
 import boto3
 import fsspec
+import yaml
 from filelock import FileLock
 from upath import UPath as Path
 
-LOCAL = True
-# LOCAL = False
+with open(Path(__file__).parent / "config.yml", "rb") as f:
+    data = yaml.safe_load(f)
+
+LOCAL = data["local"]
+ROOT = Path(data["data-dir"]).resolve()
+fs = fsspec.filesystem(ROOT.protocol, use_listings_cache=False)
 
 if LOCAL:
-    ROOT = Path(__file__).parent.parent.resolve() / "data"
-    fs = fsspec.filesystem("local")
     REGION = None
     storage_options = {}
 else:
-    # TODO: Make the cloud path nicer (e.g. s3://coiled-datasets-rp)
-    ROOT = Path("s3://openscapes-scratch/jrbourbeau/etl-tpch/data")
-    fs = fsspec.filesystem("s3", use_listings_cache=False)
-    # Find cloud region being used
     bucket = str(ROOT).replace("s3://", "").split("/")[0]
     resp = boto3.client("s3").get_bucket_location(Bucket=bucket)
     REGION = resp["LocationConstraint"] or "us-east-1"
     storage_options = {"AWS_REGION": REGION, "AWS_S3_ALLOW_UNSAFE_RENAME": "true"}
 
-STAGING_JSON_DIR = ROOT / "staged" / "json"
-STAGING_PARQUET_DIR = ROOT / "staged" / "parquet"
-RAW_JSON_DIR = ROOT / "raw" / "json"
-RAW_PARQUET_DIR = ROOT / "raw" / "parquet"
-PROCESSED_DATA_DIR = ROOT / "processed"
-REDUCED_DATA_DIR = ROOT / "reduced"
+STAGING_DIR = ROOT / "staging"  # Input JSON files
+PROCESSED_DIR = ROOT / "processed"  # Processed Parquet files
+RESULTS_DIR = ROOT / "results"  # Reduced/aggregated results
+ARCHIVE_DIR = ROOT / "archive"  # Archived JSON files
 
 MODEL_FILE = ROOT.parent / "model.json"
 MODEL_SERVER_FILE = ROOT.parent / "serve_model.py"
-lock_generate = FileLock("generate.lock")
-lock_json_to_parquet = FileLock("json_to_parquet.lock")
-lock_compact = FileLock("compact.lock")
+lock_dir = Path(__file__).parent.parent / ".locks"
+lock_generate = FileLock(lock_dir / "generate.lock")
+lock_json_to_parquet = FileLock(lock_dir / "json.lock")
+lock_compact = FileLock(lock_dir / "compact.lock")
diff --git a/pipeline/train.py b/pipeline/train.py
index c3a6f91..e96c557 100644
--- a/pipeline/train.py
+++ b/pipeline/train.py
@@ -5,7 +5,7 @@
 import xgboost as xgb
 from prefect import flow, task
 
-from .settings import LOCAL, MODEL_FILE, REDUCED_DATA_DIR, REGION, Path, fs
+from .settings import LOCAL, MODEL_FILE, REGION, RESULTS_DIR, Path, fs
 
 
 @task
@@ -37,7 +37,7 @@ def train(file):
 
 
 def list_training_data_files():
-    data_dir = REDUCED_DATA_DIR / "europe" / "brass"
+    data_dir = RESULTS_DIR / "europe" / "brass"
     return list(data_dir.rglob("*.parquet"))
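
Editor's note, not part of the patch: a minimal sketch of how the new config-driven layout could be exercised from outside the pipeline, for example to check what the reduce step has written. It assumes pyyaml, fsspec, and universal_pathlib are installed and that it runs from the repository root so "pipeline/config.yml" resolves; the directory names mirror the constants added in pipeline/settings.py above, everything else is illustrative.

import fsspec
import yaml
from upath import UPath as Path

# Read the same config file the pipeline loads in pipeline/settings.py.
with open("pipeline/config.yml", "rb") as f:
    config = yaml.safe_load(f)

# "data-dir" may be a local directory or a remote URL such as "s3://bucket/prefix".
root = Path(config["data-dir"]).resolve()

# The directory layout introduced by this patch.
staging_dir = root / "staging"      # raw JSON written by generate_data
processed_dir = root / "processed"  # Parquet/Delta tables written by preprocess
results_dir = root / "results"      # reduced query results read by the dashboard
archive_dir = root / "archive"      # JSON files moved here after conversion

# List any result files that exist so far (an empty protocol means a local path).
fs = fsspec.filesystem(results_dir.protocol or "file", use_listings_cache=False)
if fs.exists(str(results_dir)):
    print(fs.ls(str(results_dir)))
else:
    print(f"No results yet under {results_dir}")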