Skip to content

Commit

Permalink
Introduce --dataset-only flag for ferc_to_sqlite.
Browse files Browse the repository at this point in the history
Restrict processing to, say, ferc1_dbf or ferc2_xbrl dataset. This is
intended for ci-integration parallelism.
  • Loading branch information
rousik committed Dec 13, 2023
1 parent 88c3255 commit 7531882
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 8 deletions.
13 changes: 11 additions & 2 deletions src/pudl/extract/dbf.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from typing import IO, Any, Protocol, Self

import pandas as pd
from pudl.resources import RuntimeSettings
import sqlalchemy as sa
from dagster import op
from dbfread import DBF, FieldParser
Expand Down Expand Up @@ -472,19 +473,27 @@ def get_dagster_op(cls) -> Callable:
"""Returns dagster op that runs this extractor."""

@op(
name=f"dbf_{cls.DATASET}",
name=f"{cls.DATASET}_dbf",
required_resource_keys={
"ferc_to_sqlite_settings",
"datastore",
"runtime_settings",
},
)
def inner_method(context) -> None:
rs: RuntimeSettings = context.resources.runtime_settings
"""Instantiates dbf extractor and runs it."""
if rs.dataset_only and rs.dataset_only.lower() != f"{cls.DATASET.lower()}_dbf":
logger.info(
f"Skipping dataset {cls.DATASET} because it is not in the "
f"dataset_only list."
)
return

dbf_extractor = cls(
datastore=context.resources.datastore,
settings=context.resources.ferc_to_sqlite_settings,
clobber=context.resources.runtime_settings.clobber,
clobber=rs.clobber,
output_path=PudlPaths().output_dir,
)
dbf_extractor.execute()
Expand Down
15 changes: 11 additions & 4 deletions src/pudl/extract/xbrl.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,21 +59,28 @@ def xbrl2sqlite_op_factory(form: XbrlFormNumber) -> Callable:
)
def inner_op(context) -> None:
output_path = PudlPaths().output_dir
runtime_settings: RuntimeSettings = context.resources.runtime_settings
rs: RuntimeSettings = context.resources.runtime_settings
settings = context.resources.ferc_to_sqlite_settings.get_xbrl_dataset_settings(
form
)
datastore = FercXbrlDatastore(context.resources.datastore)

logger.info(f"====== xbrl2sqlite runtime_settings: {rs}")
if settings is None or settings.disabled:
logger.info(
f"Skipping dataset ferc{form.value}_xbrl: no config or is disabled."
)
return

if rs.dataset_only and rs.dataset_only.lower() != f"ferc{form.value}_xbrl":
logger.info(
f"Skipping dataset ferc{form.value}_xbrl because of dataset_only exclusion."
)
return

sql_path = PudlPaths().sqlite_db_path(f"ferc{form.value}_xbrl")
if sql_path.exists():
if runtime_settings.clobber:
if rs.clobber:
sql_path.unlink()
else:
raise RuntimeError(
Expand All @@ -86,8 +93,8 @@ def inner_op(context) -> None:
datastore,
output_path=output_path,
sql_path=sql_path,
batch_size=runtime_settings.xbrl_batch_size,
workers=runtime_settings.xbrl_num_workers,
batch_size=rs.xbrl_batch_size,
workers=rs.xbrl_num_workers,
)

return inner_op
Expand Down
18 changes: 16 additions & 2 deletions src/pudl/ferc_to_sqlite/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,18 @@ def get_ferc_to_sqlite_job():
),
default="INFO",
)
@click.option(
"--dataset-only",
type=str,
help=(
"If specified, restricts processing to only a given dataset. This is "
"expected to be in the form of ferc1_dbf, ferc1_xbrl. "
"This is intended for ci-integration purposes where we fan-out the "
"execution into several parallel small jobs that should finish faster. "
"Other operations are still going to be invoked, but they will terminate "
"early if this setting is in use."
)
)
def main(
etl_settings_yml: pathlib.Path,
batch_size: int,
Expand All @@ -149,6 +161,7 @@ def main(
gcs_cache_path: str,
logfile: pathlib.Path,
loglevel: str,
dataset_only: str,
):
"""Use Dagster to convert FERC data from DBF and XBRL to SQLite databases.
Expand Down Expand Up @@ -178,9 +191,10 @@ def main(
},
"runtime_settings": {
"config": {
"workers": workers,
"batch_size": batch_size,
"xbrl_num_workers": workers,
"xbrl_batch_size": batch_size,
"clobber": clobber,
"dataset_only": dataset_only,
},
},
},
Expand Down
1 change: 1 addition & 0 deletions src/pudl/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class RuntimeSettings(ConfigurableResource):
clobber: bool = False
xbrl_num_workers: None | int = None
xbrl_batch_size: int = 50
dataset_only: str = ""


@resource(config_schema=create_dagster_config(DatasetsSettings()))
Expand Down

0 comments on commit 7531882

Please sign in to comment.