feature: update builder tiledbsoma release (#154)
* update builder for new soma create api

child objects created directly from parents

* update builder for new soma create api

- child objects created directly from parents
- validator performs reads using open() context manager (see the sketch below)
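
A minimal sketch of the two patterns, assuming the tiledbsoma 1.0-style API (the URI and collection name here are illustrative only):

```python
import tiledbsoma as soma

# Creation: child objects are created directly from the parent, rather than
# being created standalone and then attached to the parent collection.
with soma.Collection.create("/tmp/census-demo") as root:
    root.add_new_collection("census_info")

# Reads: open() as a context manager guarantees the handle is closed again.
with soma.open("/tmp/census-demo") as root:
    print(root["census_info"].uri)
```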

* fixes

* lint

* typing

* update unit tests for soma api updates

* fix ordering of builder steps, enhance builder unit test

* lint, typing

* move functions

* reinstate writing of presence matrix; improve comments

* upgrade to tiledbsoma=0.5.0a10

* lint

* more localized open experiment context managers

less sharing of the context managers; mainly, this avoids passing opened
state into child processes, which does not work (open handles cannot be
pickled); see the sketch below
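
A hypothetical sketch of the resulting pattern: workers receive only a URI and open the experiment themselves, inside the child process (function name and URI are illustrative, not the builder's actual code):

```python
from concurrent.futures import ProcessPoolExecutor

import tiledbsoma as soma


def count_obs(experiment_uri: str) -> int:
    # Open locally, inside the worker: only the URI string crosses the
    # process boundary, so nothing unpicklable is ever sent to the child.
    with soma.Experiment.open(experiment_uri) as exp:
        return len(exp.obs.read().concat())


if __name__ == "__main__":
    uris = ["file:///tmp/census/census_data/homo_sapiens"]  # illustrative URI
    with ProcessPoolExecutor() as pool:
        print(list(pool.map(count_obs, uris)))
```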

* fix log output

* pickling fix

* tweaks: logging, imports, comments, @attrs

* pickling fix for validator

* fixes for standalone validator usage

* add TODOs for CR feedback

* validation improvements from CR feedback

* refactored build steps to move anndata iterations together

* combine filtering and axis steps for single dataset iteration

* lint

* type checking fixes

* tweaks from CR comments

* builder steps doc & log tweaks

* log tweak

* fix presence matrix build

include all census datasets in each presence matrix

* upgrade tiledbsoma to 0.5.0a11

* fix builder log messages

* presence matrix only includes datasets from parent experiment

* allow for missing presence data

(for dev-time testing, when only one organism's data is present)
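
An illustrative sketch of what a dataset presence matrix encodes: a sparse boolean matrix with one row per dataset in the parent experiment and one column per gene, set where the dataset measured that gene (the names and shapes below are hypothetical, not the builder's actual layout):

```python
import scipy.sparse as sp

n_datasets, n_vars = 3, 5  # datasets in this experiment x genes in var
presence = sp.lil_matrix((n_datasets, n_vars), dtype=bool)
presence[0, [0, 1, 4]] = True  # dataset 0 measured genes 0, 1, and 4
presence[2, [1, 2]] = True     # dataset 2 measured genes 1 and 2
# per this PR, rows cover only datasets in the parent experiment, and a
# dataset with no presence data simply contributes an empty row
print(presence.toarray())
```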

* disable status checks for codecov

* create TileDB arrays with allow_dups enabled

* update census schema version to match spec - 0.1.1

* add broken process pool logger

* add broken process pool logging

* fix memory usage and several other bugs

* simplify consolidation code

* lint

* lint

* port to 1.0rc0

* tune logging levels

* revert allows_duplicates change

---------

Co-authored-by: bkmartinjr <[email protected]>
atolopko-czi and bkmartinjr authored Feb 23, 2023
1 parent 2e6a569 commit 85bf654
Showing 14 changed files with 635 additions and 544 deletions.
4 changes: 4 additions & 0 deletions codecov.yml
@@ -0,0 +1,4 @@
+coverage:
+  status:
+    project: off
+    patch: off
11 changes: 7 additions & 4 deletions tools/cell_census_builder/README.md
@@ -20,10 +20,13 @@ TL;DR:
 
 The build process:
 
-- Pass 1: stage all source H5AD files
-- Pass 2: build the axis dataframes for each experiment. This is a single-threaded pass, building dense dataframes.
-- Pass 3: build the X layers for each experiment. This is a concurrent pass, reading/writing X layers in parallel.
-- Pass 4: optional, validate the above
+- Step 1: Retrieve all source H5AD files, storing locally (parallelized, I/O-bound)
+- Step 2: Create root collection and child objects (fast).
+- Step 3: Write the axis dataframes for each experiment, filtering the datasets and cells to include (serialized iteration of dataset H5ADs).
+- Step 4: Write the X layers for each experiment (parallelized iteration of filtered dataset H5ADs).
+- Step 5: Write datasets manifest and summary info.
+- (Optional) Consolidate TileDB data
+- (Optional) Validate the entire Cell Census, re-reading from storage.
 
 Modes of operation:
 a) (default) creating the entire "cell census" using all files currently in the CELLxGENE repository.
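
The new steps map directly onto the driver in tools/cell_census_builder/__main__.py (next file). As a rough sketch of the control flow, with signatures abbreviated from the code below:

```python
def build_sketch(args, soma_path, assets_path, experiment_builders):
    # Step 1: stage source H5ADs locally
    datasets = build_step1_get_source_datasets(args, assets_path)
    # Step 2: create the root collection and all child objects (no data yet)
    root_collection = build_step2_create_root_collection(soma_path, experiment_builders)
    # Step 3: one serial pass over the H5ADs writes obs and accumulates var
    filtered_datasets = build_step3_populate_obs_and_var_axes(assets_path, datasets, experiment_builders)
    # Step 4: parallel pass writes the X layers and the presence matrix
    build_step4_populate_X_layers(assets_path, filtered_datasets, experiment_builders, args)
    # Step 5: datasets manifest and summary info
    build_step5_populate_summary_info(root_collection, experiment_builders, filtered_datasets, args.build_tag)
    if args.consolidate:  # optional TileDB consolidation
        consolidate(args, root_collection.uri)
```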
216 changes: 124 additions & 92 deletions tools/cell_census_builder/__main__.py
@@ -5,15 +5,15 @@
 import os.path
 import sys
 from datetime import datetime, timezone
-from typing import List, Tuple
+from typing import List
 
 import tiledbsoma as soma
 
 from .anndata import open_anndata
 from .census_summary import create_census_summary
 from .consolidate import consolidate
-from .datasets import Dataset, assign_soma_joinids, create_dataset_manifest
-from .experiment_builder import ExperimentBuilder, populate_X_layers
+from .datasets import Dataset, assign_dataset_soma_joinids, create_dataset_manifest
+from .experiment_builder import ExperimentBuilder, populate_X_layers, reopen_experiment_builders
 from .globals import (
     CENSUS_DATA_NAME,
     CENSUS_INFO_NAME,
@@ -30,7 +30,7 @@
 from .validate import validate
 
 
-def make_experiment_builders(base_uri: str, args: argparse.Namespace) -> List[ExperimentBuilder]:
+def make_experiment_builders() -> List[ExperimentBuilder]:
     """
     Define all soma.Experiments to build in the census.
@@ -48,13 +48,11 @@ def make_experiment_builders(base_uri: str, args: argparse.Namespace) -> List[ExperimentBuilder]:
     ]
     experiment_builders = [  # The soma.Experiments we want to build
         ExperimentBuilder(
-            base_uri=base_uri,
             name="homo_sapiens",
             anndata_cell_filter_spec=dict(organism_ontology_term_id="NCBITaxon:9606", assay_ontology_term_ids=RNA_SEQ),
             gene_feature_length_uris=GENE_LENGTH_URIS,
         ),
         ExperimentBuilder(
-            base_uri=base_uri,
             name="mus_musculus",
             anndata_cell_filter_spec=dict(organism_ontology_term_id="NCBITaxon:10090", assay_ontology_term_ids=RNA_SEQ),
             gene_feature_length_uris=GENE_LENGTH_URIS,
@@ -76,15 +74,12 @@ def main() -> int:
     assets_path = uricat(args.uri, args.build_tag, "h5ads")
 
     # create the experiment builders
-    experiment_builders = make_experiment_builders(uricat(soma_path, CENSUS_DATA_NAME), args)
+    experiment_builders = make_experiment_builders()
 
     cc = 0
     if args.subcommand == "build":
         cc = build(args, soma_path, assets_path, experiment_builders)
 
-    # sanity check for build completion
-    assert cc != 0 or all(e.is_finished() for e in experiment_builders)
-
     if cc == 0 and (args.subcommand == "validate" or args.validate):
         validate(args, soma_path, assets_path, experiment_builders)
 
@@ -133,63 +128,56 @@ def build(
         logging.error(e)
         return 1
 
-    # Step 1 - get all source assets
-    datasets = build_step1_get_source_assets(args, assets_path)
+    # Step 1 - get all source datasets
+    datasets = build_step1_get_source_datasets(args, assets_path)
 
-    # Step 2 - build axis dataframes
-    top_level_collection, filtered_datasets = build_step2_create_axis(
-        soma_path, assets_path, datasets, experiment_builders, args
-    )
-    assign_soma_joinids(filtered_datasets)
-    logging.info(f"({len(filtered_datasets)} of {len(datasets)}) suitable for processing.")
+    # Step 2 - create root collection, and all child objects, but do not populate any dataframes or matrices
+    root_collection = build_step2_create_root_collection(soma_path, experiment_builders)
     gc.collect()
 
-    # Step 3- create X layers
-    build_step3_create_X_layers(assets_path, filtered_datasets, experiment_builders, args)
+    # Step 3 - populate axes
+    filtered_datasets = build_step3_populate_obs_and_var_axes(assets_path, datasets, experiment_builders)
+
+    # Step 4 - populate X layers
+    build_step4_populate_X_layers(assets_path, filtered_datasets, experiment_builders, args)
     gc.collect()
 
-    # Write out dataset manifest and summary information
-    create_dataset_manifest(top_level_collection[CENSUS_INFO_NAME], filtered_datasets)
-    create_census_summary_cell_counts(
-        top_level_collection[CENSUS_INFO_NAME], [e.census_summary_cell_counts for e in experiment_builders]
-    )
-    create_census_summary(top_level_collection[CENSUS_INFO_NAME], experiment_builders, args.build_tag)
+    # Step 5 - write out dataset manifest and summary information
+    build_step5_populate_summary_info(root_collection, experiment_builders, filtered_datasets, args.build_tag)
 
-    for eb in experiment_builders:
-        eb.build_completed = True
-
     # consolidate TileDB data
     if args.consolidate:
-        consolidate(args, top_level_collection.uri)
+        consolidate(args, root_collection.uri)
 
     return 0
 
 
-def create_top_level_collections(soma_path: str) -> soma.Collection:
+def populate_root_collection(root_collection: soma.Collection) -> soma.Collection:
     """
-    Create the top-level SOMA collections for the Census.
-    Returns the top-most collection.
+    Create the root SOMA collection for the Census.
+    Returns the root collection.
     """
-    top_level_collection = soma.Collection(soma_path, context=SOMA_TileDB_Context())
-    if top_level_collection.exists():
-        logging.error("Census already exists - aborting")
-        raise Exception("Census already exists - aborting")
-
-    top_level_collection.create()
-    # Set top-level metadata for the experiment
-    top_level_collection.metadata["created_on"] = datetime.now(tz=timezone.utc).isoformat(timespec="seconds")
-    top_level_collection.metadata["cxg_schema_version"] = CXG_SCHEMA_VERSION
-    top_level_collection.metadata["census_schema_version"] = CENSUS_SCHEMA_VERSION
+    # Set root metadata for the experiment
+    root_collection.metadata["created_on"] = datetime.now(tz=timezone.utc).isoformat(timespec="seconds")
+    root_collection.metadata["cxg_schema_version"] = CXG_SCHEMA_VERSION
+    root_collection.metadata["census_schema_version"] = CENSUS_SCHEMA_VERSION
 
     sha = get_git_commit_sha()
-    top_level_collection.metadata["git_commit_sha"] = sha
+    root_collection.metadata["git_commit_sha"] = sha
 
     # Create sub-collections for experiments, etc.
     for n in [CENSUS_INFO_NAME, CENSUS_DATA_NAME]:
-        cltn = soma.Collection(uricat(top_level_collection.uri, n), context=SOMA_TileDB_Context()).create()
-        top_level_collection.set(n, cltn, relative=True)
+        root_collection.add_new_collection(n)
 
-    return top_level_collection
+    return root_collection
 
 
-def build_step1_get_source_assets(args: argparse.Namespace, assets_path: str) -> List[Dataset]:
+def build_step1_get_source_datasets(args: argparse.Namespace, assets_path: str) -> List[Dataset]:
     logging.info("Build step 1 - get source assets - started")
 
     # Load manifest defining the datasets
@@ -210,78 +198,122 @@ def build_step1_get_source_assets(args: argparse.Namespace, assets_path: str) -> List[Dataset]:
     return datasets
 
 
-def build_step2_create_axis(
-    soma_path: str,
-    assets_path: str,
-    datasets: List[Dataset],
-    experiment_builders: List[ExperimentBuilder],
-    args: argparse.Namespace,
-) -> Tuple[soma.Collection, List[Dataset]]:
-    """
-    Create all objects, and populate the axis dataframes.
-    Returns: the filtered datasets that will be included. This is simply
-    an optimization to allow subsequent X matrix writing to skip unused
-    datasets.
-    """
-    logging.info("Build step 2 - axis creation - started")
-
-    top_level_collection = create_top_level_collections(soma_path)
-
-    # Create axis
-    for e in experiment_builders:
-        e.create(data_collection=top_level_collection[CENSUS_DATA_NAME])
-        assert soma.Experiment(e.se_uri).exists()
-
-    # Write obs axis and accumulate var axis (and remember the datasets that pass our filter)
+def populate_obs_axis(
+    assets_path: str, datasets: List[Dataset], experiment_builders: List[ExperimentBuilder]
+) -> List[Dataset]:
     filtered_datasets = []
     N = len(datasets) * len(experiment_builders)
-    n = 1
+    n = 0
 
     for dataset, ad in open_anndata(assets_path, datasets, backed="r"):
         dataset_total_cell_count = 0
-        for e in experiment_builders:
-            dataset_total_cell_count += e.accumulate_axes(dataset, ad, progress=(n, N))
+        for eb in reopen_experiment_builders(experiment_builders):
+            n += 1
+            logging.info(f"{eb.name}: filtering dataset '{dataset.dataset_id}' ({n} of {N})")
+            ad_filtered = eb.filter_anndata_cells(ad)
+
+            if len(ad_filtered.obs) == 0:  # type:ignore
+                logging.info(f"{eb.name} - H5AD has no data after filtering, skipping {dataset.dataset_h5ad_path}")
+                continue
+
+            # append to `obs`; accumulate `var` data
+            dataset_total_cell_count += eb.accumulate_axes(dataset, ad_filtered)
 
-        dataset.dataset_total_cell_count = dataset_total_cell_count
+        # dataset passes filter if either experiment includes cells from the dataset
         if dataset_total_cell_count > 0:
             filtered_datasets.append(dataset)
+            dataset.dataset_total_cell_count = dataset_total_cell_count
+
+    for eb in experiment_builders:
+        logging.info(f"Experiment {eb.name} will contain {eb.n_obs} cells from {eb.n_datasets} datasets")
+
+    return filtered_datasets
+
+
+def populate_var_axis_and_presence(experiment_builders: List[ExperimentBuilder]) -> None:
+    for eb in reopen_experiment_builders(experiment_builders):
+        # populate `var`; create empty `presence` now that we have its dimensions
+        eb.populate_var_axis()
+
+
+def build_step2_create_root_collection(soma_path: str, experiment_builders: List[ExperimentBuilder]) -> soma.Collection:
+    """
+    Create all objects
+    Returns: the root collection.
+    """
+    logging.info("Build step 2 - Create root collection - started")
+
+    with soma.Collection.create(soma_path, context=SOMA_TileDB_Context()) as root_collection:
+        populate_root_collection(root_collection)
+
+        for e in experiment_builders:
+            e.create(census_data=root_collection[CENSUS_DATA_NAME])
+
+    logging.info("Build step 2 - Create root collection - finished")
+    return root_collection
+
+
+def build_step3_populate_obs_and_var_axes(
+    assets_path: str,
+    datasets: List[Dataset],
+    experiment_builders: List[ExperimentBuilder],
+) -> List[Dataset]:
+    """
+    Populate obs and var axes. Filter cells from datasets for each experiment, as obs is built.
+    """
+    logging.info("Build step 3 - Populate obs and var axes - started")
+
+    filtered_datasets = populate_obs_axis(assets_path, datasets, experiment_builders)
+    logging.info(f"({len(filtered_datasets)} of {len(datasets)}) datasets suitable for processing.")
+
+    populate_var_axis_and_presence(experiment_builders)
+
+    assign_dataset_soma_joinids(filtered_datasets)
 
-    # Commit / write var
-    for e in experiment_builders:
-        e.commit_axis()
-        logging.info(f"Experiment {e.name} will contain {e.n_obs} cells from {e.n_datasets} datasets")
+    logging.info("Build step 3 - Populate obs and var axes - finished")
 
-    logging.info("Build step 2 - axis creation - finished")
-    return top_level_collection, filtered_datasets
+    return filtered_datasets


-def build_step3_create_X_layers(
+def build_step4_populate_X_layers(
     assets_path: str,
     filtered_datasets: List[Dataset],
     experiment_builders: List[ExperimentBuilder],
     args: argparse.Namespace,
 ) -> None:
     """
-    Create and populate all X layers
+    Populate X layers.
     """
-    logging.info("Build step 3 - X layer creation - started")
-    # base_path = args.uri
-
-    # Create X layers
-    for e in experiment_builders:
-        e.create_X_layers(filtered_datasets)
-        e.create_joinid_metadata()
+    logging.info("Build step 4 - Populate X layers - started")
 
-    # Process all X data
+    for eb in reopen_experiment_builders(experiment_builders):
+        eb.create_X_with_layers()
+
     populate_X_layers(assets_path, filtered_datasets, experiment_builders, args)
 
-    # tidy up and finish
-    for e in experiment_builders:
-        e.commit_X(consolidate=args.consolidate)
-        e.commit_presence_matrix(filtered_datasets)
+    for eb in reopen_experiment_builders(experiment_builders):
+        eb.populate_presence_matrix()
+
+    logging.info("Build step 4 - Populate X layers - finished")
+
+
+def build_step5_populate_summary_info(
+    root_collection: soma.Collection,
+    experiment_builders: List[ExperimentBuilder],
+    filtered_datasets: List[Dataset],
+    build_tag: str,
+) -> None:
+    logging.info("Build step 5 - Populate summary info - started")
+
+    with soma.Collection.open(root_collection[CENSUS_INFO_NAME].uri, "w", context=SOMA_TileDB_Context()) as census_info:
+        create_dataset_manifest(census_info, filtered_datasets)
+        create_census_summary_cell_counts(census_info, [e.census_summary_cell_counts for e in experiment_builders])
+        create_census_summary(census_info, experiment_builders, build_tag)
 
-    logging.info("Build step 3 - X layer creation - finished")
+    logging.info("Build step 5 - Populate summary info - finished")
 
 
 def create_args_parser() -> argparse.ArgumentParser:
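
The driver above repeatedly calls reopen_experiment_builders(), which is imported from experiment_builder.py and not shown in this diff. A plausible sketch of such a helper, assuming each builder records its experiment's URI (the attribute names here are hypothetical, not the real implementation):

```python
from typing import Iterator, List

import tiledbsoma as soma


def reopen_experiment_builders(experiment_builders: List["ExperimentBuilder"]) -> Iterator["ExperimentBuilder"]:
    """Yield each builder with its experiment freshly opened for writing,
    scoping the open handle to a single build step."""
    for eb in experiment_builders:
        with soma.Experiment.open(eb.experiment_uri, "w") as exp:
            eb.experiment = exp
            yield eb
        eb.experiment = None  # drop the closed handle so the builder pickles cleanly
```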
13 changes: 5 additions & 8 deletions tools/cell_census_builder/census_summary.py
@@ -6,8 +6,7 @@
 import tiledbsoma as soma
 
 from .experiment_builder import ExperimentBuilder, get_summary_stats
-from .globals import CENSUS_SCHEMA_VERSION, CENSUS_SUMMARY_NAME, SOMA_TileDB_Context
-from .util import uricat
+from .globals import CENSUS_SCHEMA_VERSION, CENSUS_SUMMARY_NAME
 
 
 def create_census_summary(
@@ -29,9 +28,7 @@ def create_census_summary(
     df["soma_joinid"] = range(len(df))
 
     # write to a SOMA dataframe
-    summary_uri = uricat(info_collection.uri, CENSUS_SUMMARY_NAME)
-    summary = soma.DataFrame(summary_uri, context=SOMA_TileDB_Context())
-    summary.create(pa.Schema.from_pandas(df, preserve_index=False), index_column_names=["soma_joinid"])
-    for batch in pa.Table.from_pandas(df, preserve_index=False).to_batches():
-        summary.write(batch)
-    info_collection.set(CENSUS_SUMMARY_NAME, summary, relative=True)
+    with info_collection.add_new_dataframe(
+        CENSUS_SUMMARY_NAME, schema=pa.Schema.from_pandas(df, preserve_index=False), index_column_names=["soma_joinid"]
+    ) as summary:
+        summary.write(pa.Table.from_pandas(df, preserve_index=False))
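
As a hedged read-back sketch, the summary dataframe written above can be re-read with the open() context-manager pattern this PR adopts (the URI is illustrative):

```python
import tiledbsoma as soma

with soma.open("file:///tmp/census/census_info/summary") as summary:
    print(summary.read().concat().to_pandas())
```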
(10 more changed files not shown)