Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Builder: additional workflow steps #351

Merged
merged 6 commits into from
Apr 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ def do_build(args: CensusBuildArgs, skip_completed_steps: bool = False) -> int:
do_build_soma,
do_validate_soma,
do_create_reports,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also sync reports, and I don't think that's currently handled.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was not in the build process the last time I checked, but is easily added.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It wasn't, I just realized the omission today while doing the release follow-up tasks. Ty!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

do_data_copy,
do_the_release,
do_report_copy,
do_old_release_cleanup,
]
try:
for n, build_step in enumerate(build_steps, start=1):
Expand All @@ -75,11 +79,15 @@ def do_build(args: CensusBuildArgs, skip_completed_steps: bool = False) -> int:
args.state.commit(args.working_dir / CENSUS_BUILD_STATE)
logging.info(f"{step_n_of}: complete")

logging.info("Census build: completed")

# And last, last, last ... stash the logs
do_log_copy(args)

except Exception:
logging.critical("Caught exception, exiting", exc_info=True)
return 1

logging.info("Census build: completed")
return 0


Expand All @@ -100,7 +108,7 @@ def do_prebuild_checks(args: CensusBuildArgs) -> bool:
build_tag = args.config.build_tag
assert build_tag is not None
s3path = urlcat(args.config.cellxgene_census_S3_path, build_tag)
if s3fs.S3FileSystem(anon=True).exists(s3path):
if s3fs.S3FileSystem(anon=False).exists(s3path):
logging.error(f"Build tag {build_tag} already exists at {s3path}.")
return False

Expand Down Expand Up @@ -130,7 +138,7 @@ def do_validate_soma(args: CensusBuildArgs) -> bool:
def do_create_reports(args: CensusBuildArgs) -> bool:
from .census_summary import display_diff, display_summary

reports_dir = args.working_dir / "reports"
reports_dir = args.working_dir / args.config.reports_dir
reports_dir.mkdir(parents=True, exist_ok=True)

logging.info("Creating summary report")
Expand All @@ -144,6 +152,75 @@ def do_create_reports(args: CensusBuildArgs) -> bool:
return True


def do_data_copy(args: CensusBuildArgs) -> bool:
    """Sync the local build directory to the Census S3 location, ahead of a release.

    The source is `<working_dir>/<build_tag>` and the destination is the
    configured Census S3 path joined with the same build tag. Honors the
    `dryrun` config setting. Always returns True; failures surface as
    exceptions raised by `sync_to_S3`.
    """
    from .data_copy import sync_to_S3

    local_build_dir = args.working_dir / args.build_tag
    s3_build_url = urlcat(args.config.cellxgene_census_S3_path, args.build_tag)
    sync_to_S3(local_build_dir, s3_build_url, dryrun=args.config.dryrun)
    return True


def do_the_release(args: CensusBuildArgs) -> bool:
from .release_manifest import CensusVersionDescription, make_a_release

release: CensusVersionDescription = {
"release_date": None,
"release_build": args.build_tag,
"soma": {
"uri": urlcat(args.config.cellxgene_census_S3_path, args.build_tag, "soma/"),
"s3_region": "us-west-2",
},
"h5ads": {
"uri": urlcat(args.config.cellxgene_census_S3_path, args.build_tag, "h5ads/"),
"s3_region": "us-west-2",
},
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose this release declaration could be moved into make_a_release, which would allow it to be unit tested.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pushing all of the data layout assumptions down to the module feels like it pollutes the abstraction. I'll add some tests for the main.do_* methods instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added tests for all of the new workflow steps, called from the top-level

make_a_release(
args.config.cellxgene_census_S3_path, args.build_tag, release, make_latest=True, dryrun=args.config.dryrun
)
return True


def do_report_copy(args: CensusBuildArgs) -> bool:
    """Sync the build summary reports directory to the logs S3 location.

    The source is `<working_dir>/<reports_dir>`; the destination is the
    configured logs S3 path joined with the build tag and the reports
    directory name. Honors the `dryrun` config setting. Always returns True;
    failures surface as exceptions raised by `sync_to_S3`.
    """
    from .data_copy import sync_to_S3

    local_reports_dir = args.working_dir / args.config.reports_dir
    s3_reports_url = urlcat(args.config.logs_S3_path, args.build_tag, args.config.reports_dir)
    sync_to_S3(local_reports_dir, s3_reports_url, dryrun=args.config.dryrun)
    return True


def do_old_release_cleanup(args: CensusBuildArgs) -> bool:
    """Remove releases older than the configured number of days.

    Delegates to `remove_releases_older_than`, passing the retention period,
    Census base URL and `dryrun` flag from the build config. Always returns
    True; failures surface as exceptions from the delegate.
    """
    from .release_cleanup import remove_releases_older_than

    config = args.config
    remove_releases_older_than(
        days=config.release_cleanup_days,
        census_base_url=config.cellxgene_census_S3_path,
        dryrun=config.dryrun,
    )
    return True


def do_log_copy(args: CensusBuildArgs) -> bool:
    """Sync the local log directory to the logs S3 location.

    Intended to run as the very last step so the copied logs capture the full
    output of the build. The source is `<working_dir>/<log_dir>`; the
    destination is the configured logs S3 path joined with the build tag and
    the log directory name. Honors the `dryrun` config setting. Always returns
    True; failures surface as exceptions raised by `sync_to_S3`.
    """
    from .data_copy import sync_to_S3

    local_log_dir = args.working_dir / args.config.log_dir
    s3_log_url = urlcat(args.config.logs_S3_path, args.build_tag, args.config.log_dir)
    sync_to_S3(local_log_dir, s3_log_url, dryrun=args.config.dryrun)
    return True


def create_args_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(prog="cellxgene_census_builder", description="Build the official Census.")
parser.add_argument("working_dir", type=str, help="Working directory for the build")
Expand All @@ -156,4 +233,5 @@ def create_args_parser() -> argparse.ArgumentParser:
return parser


sys.exit(main())
if __name__ == "__main__":
sys.exit(main())
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@
"verbose": 1,
"log_dir": "logs",
"log_file": "build.log",
"reports_dir": "reports",
"consolidate": True,
"disable_dirty_git_check": True,
"dryrun": False, # if True, will disable copy of data/logs/reports/release.json to S3 buckets. Will NOT disable local build, etc.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

#
# Paths and census version name determined by spec.
"cellxgene_census_S3_path": "s3://cellxgene-data-public/cell-census",
"logs_S3_path": "s3://cellxgene-data-public-logs/builder",
"build_tag": datetime.now().astimezone().date().isoformat(),
#
# Default multi-process. Memory scaling based on empirical tests.
Expand All @@ -42,6 +45,9 @@
"host_validation_min_swap_memory": 2 * 1024**4, # 2TiB
"host_validation_min_free_disk_space": 1 * 1024**4, # 1 TiB
#
# Release clean up
"release_cleanup_days": 32, # Census builds older than this are deleted
#
# For testing convenience only
"manifest": None,
"test_first_n": None,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import argparse
import logging
import pathlib
import subprocess
import sys
from typing import Union

from .logging import logging_init_params


def sync_to_S3(from_path: Union[str, pathlib.PosixPath], to_path: str, dryrun: bool = False) -> None:
    """Copy (sync) local directory to S3.

    Equivalent of `aws s3 sync local_directory_path S3_path`. Output of the
    `aws` command (stdout and stderr combined) is forwarded to the log, one
    log record per output line.

    Args:
        from_path: Local directory to copy. Resolved to an absolute path.
        to_path: Destination; must start with ``s3://``.
        dryrun: If True, ``--dryrun`` is passed to the `aws` command.

    Raises:
        ValueError: if `from_path` is not a directory, or `to_path` does not
            start with ``s3://``.
        subprocess.CalledProcessError: if the `aws` command exits with a
            non-zero return code.
    """
    from_path = pathlib.PosixPath(from_path).absolute()
    if not from_path.is_dir():
        raise ValueError(f"Local path is not a directory: {from_path.as_posix()}")
    if not to_path.startswith("s3://"):
        raise ValueError(f"S3_path argument does not appear to be an S3 path: {to_path}")

    cmd = ["aws", "s3", "sync", from_path.as_posix(), to_path, "--no-progress"]
    if dryrun:
        cmd += ["--dryrun"]

    # Sentinel reported by the completion message if the subprocess never ran to completion.
    returncode = -1
    try:
        _log_it(f"Starting copy {from_path.as_posix()} -> {to_path}", dryrun)
        with subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, text=True) as proc:
            if proc.stdout is not None:
                for line in proc.stdout:
                    # Drop the line terminator so each record does not end in a blank line.
                    logging.info(line.rstrip("\n"))

        # The return code is only populated once the context manager has waited on the process.
        returncode = proc.returncode
        if returncode:
            raise subprocess.CalledProcessError(returncode, proc.args)

    finally:
        _log_it(f"Completed copy, return code {returncode}, {from_path.as_posix()} -> {to_path}", dryrun)


def _log_it(msg: str, dryrun: bool) -> None:
    """Log `msg` at INFO level, prefixed with "(dryrun) " when `dryrun` is set."""
    prefix = "(dryrun) " if dryrun else ""
    logging.info(f"{prefix}{msg}")


def main() -> int:
    """Command-line entry point: sync a local directory to an S3 location.

    Parses `from_path`, `to_path`, `--dryrun/--no-dryrun` (defaults to True)
    and `-v/--verbose`, initializes logging, then calls `sync_to_S3`.
    Returns 0 on success; errors propagate as exceptions.
    """
    parser = argparse.ArgumentParser(
        prog="cellxgene_census_builder.data_copy",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="""Sync (copy) a local directory to an S3 location.""",
        epilog="""Example:

python -m cellxgene_census_builder.data_copy /tmp/data/ s3://bucket/path/ --dryrun
""",
    )
    parser.add_argument("from_path", type=str, help="Data source, specified as a local path, e.g., /home/me/files/")
    parser.add_argument("to_path", type=str, help="S3 path data is copied to, e.g., s3://bucket/path/")
    parser.add_argument(
        "--dryrun",
        action=argparse.BooleanOptionalAction,
        default=True,
        help="Skips S3 data copies. Useful for previewing actions. Default: True",
    )
    parser.add_argument("-v", "--verbose", action="count", default=1, help="Increase logging verbosity")
    cli_args = parser.parse_args()

    # Logging must be configured before the sync so its output is captured.
    logging_init_params(cli_args.verbose)

    sync_to_S3(cli_args.from_path, cli_args.to_path, cli_args.dryrun)
    return 0


if __name__ == "__main__":
sys.exit(main())
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

def remove_releases_older_than(days: int, census_base_url: str, dryrun: bool, s3_anon: bool = False) -> None:
"""
Remove old releases, commiting the change to release.json.
Remove old releases, committing the change to release.json.

Current rules - delete releases where:
* Tag is a date older than `days` in age
Expand Down Expand Up @@ -74,7 +74,7 @@ def _update_release_manifest(
latest_tag = new_manifest["latest"]
_log_it(f"Commiting updated release.json with latest={latest_tag}", dryrun)
if not dryrun:
commit_release_manifest(census_base_url, new_manifest)
commit_release_manifest(census_base_url, new_manifest, dryrun=dryrun)


def _perform_recursive_delete(rls_tag: CensusVersionName, uri: str, dryrun: bool) -> None:
Expand Down Expand Up @@ -117,7 +117,7 @@ def main() -> int:
python -m cellxgene_census_builder.release_cleanup s3://cellxgene-data-public/cell-census/ --days 32 --dryrun
"""
parser = argparse.ArgumentParser(
prog="cellxgene_census_summary.release_cleanup",
prog="cellxgene_census_builder.release_cleanup",
formatter_class=argparse.RawDescriptionHelpFormatter,
description=description,
epilog=epilog,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,23 @@ def get_release_manifest(census_base_url: str, s3_anon: bool = False) -> CensusR
return cast(CensusReleaseManifest, json.loads(f.read()))


def commit_release_manifest(census_base_url: str, release_manifest: CensusReleaseManifest) -> None:
def commit_release_manifest(
census_base_url: str, release_manifest: CensusReleaseManifest, dryrun: bool = False
) -> None:
"""
Write a new release manifest to the Census.
"""
# Out of an abundance of caution, validate the contents
validate_release_manifest(census_base_url, release_manifest)
_overwrite_release_manifest(census_base_url, release_manifest)
if not dryrun:
_overwrite_release_manifest(census_base_url, release_manifest)


def _overwrite_release_manifest(census_base_url: str, release_manifest: CensusReleaseManifest) -> None:
# This is a stand-alone function for ease of testing/mocking.
s3 = s3fs.S3FileSystem(anon=False)
with s3.open(urlcat(census_base_url, CENSUS_RELEASE_FILE), mode="w") as f:
f.write(json.dumps(release_manifest))
f.write(json.dumps(release_manifest, indent=2))


def validate_release_manifest(
Expand Down Expand Up @@ -128,3 +131,26 @@ def _validate_exists(rls_info: CensusVersionDescription, s3_anon: bool) -> None:
uri = rls_info["h5ads"]["uri"]
if not s3.isdir(uri):
raise ValueError(f"H5ADS URL in release.json does not exist {uri}")


def make_a_release(
    census_base_url: str,
    rls_tag: CensusVersionName,
    rls_info: CensusVersionDescription,
    make_latest: bool,
    dryrun: bool = False,
) -> None:
    """
    Add a release to the release manifest, optionally aliasing it as `latest`.

    Fetches the current manifest, adds `rls_info` under `rls_tag`, points
    `latest` at `rls_tag` when `make_latest` is true, and commits the result
    via `commit_release_manifest`, forwarding `dryrun`.

    Raises ValueError if `rls_tag` is already present in the manifest.
    """
    release_manifest = get_release_manifest(census_base_url)

    # Refuse to overwrite an existing entry.
    if rls_tag in release_manifest:
        raise ValueError(f"Release version {rls_tag} is already in the release manifest")

    release_manifest[rls_tag] = rls_info
    if make_latest:
        release_manifest["latest"] = rls_tag

    # The commit step validates the manifest and raises on anything suspicious.
    commit_release_manifest(census_base_url, release_manifest, dryrun=dryrun)
Loading