-
Notifications
You must be signed in to change notification settings - Fork 22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use GET /datasets endpoint for builder #378
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,3 @@ | ||
import concurrent.futures | ||
import csv | ||
import io | ||
import logging | ||
|
@@ -53,81 +52,42 @@ def load_manifest_from_CxG() -> List[Dataset]: | |
logging.info("Loading manifest from CELLxGENE data portal...") | ||
|
||
# Load all collections and extract dataset_id | ||
collections = fetch_json(f"{CXG_BASE_URI}curation/v1/collections") | ||
assert isinstance(collections, list), "Unexpected REST API response, /curation/v1/collections" | ||
datasets = { | ||
dataset["dataset_id"]: { | ||
"collection_id": collection["collection_id"], | ||
"collection_name": null_to_empty_str(collection["name"]), | ||
"collection_doi": null_to_empty_str(collection["doi"]), | ||
"dataset_id": dataset["dataset_id"], | ||
} | ||
for collection in collections | ||
for dataset in collection["datasets"] | ||
} | ||
logging.info(f"Found {len(datasets)} datasets, in {len(collections)} collections") | ||
|
||
# load per-dataset schema version | ||
with concurrent.futures.ThreadPoolExecutor() as tp: | ||
dataset_metadata = tp.map( | ||
lambda d: fetch_json( | ||
f"{CXG_BASE_URI}curation/v1/collections/{d['collection_id']}/datasets/{d['dataset_id']}" | ||
), | ||
datasets.values(), | ||
) | ||
for d in dataset_metadata: | ||
assert ( | ||
isinstance(d, dict) and "dataset_id" in d | ||
), "Unexpected REST API response, /curation/v1/collections/.../datasets/..." | ||
datasets[d["dataset_id"]].update( | ||
{ | ||
"schema_version": d["schema_version"], | ||
"dataset_title": null_to_empty_str(d["title"]), | ||
} | ||
) | ||
datasets = fetch_json(f"{CXG_BASE_URI}curation/v1/datasets") | ||
assert isinstance(datasets, list), "Unexpected REST API response, /curation/v1/datasets" | ||
|
||
# Remove any datasets that don't match our target schema version | ||
obsolete_dataset_ids = [id for id in datasets if datasets[id]["schema_version"] not in CXG_SCHEMA_VERSION_IMPORT] | ||
if len(obsolete_dataset_ids) > 0: | ||
logging.warning(f"Dropping {len(obsolete_dataset_ids)} datasets due to unsupported schema version") | ||
for id in obsolete_dataset_ids: | ||
logging.info(f"Dropping dataset_id {id} due to schema version.") | ||
datasets.pop(id) | ||
|
||
# Grab the asset URI for each dataset | ||
with concurrent.futures.ThreadPoolExecutor(max_workers=16) as tp: | ||
dataset_assets = tp.map( | ||
lambda d: ( | ||
d["dataset_id"], | ||
fetch_json( | ||
f"{CXG_BASE_URI}curation/v1/collections/{d['collection_id']}/datasets/{d['dataset_id']}/assets" | ||
), | ||
), | ||
datasets.values(), | ||
) | ||
no_asset_found = [] | ||
for dataset_id, assets in dataset_assets: | ||
assert isinstance( | ||
assets, list | ||
), "Unexpected REST API response, /curation/v1/collections/.../datasets/.../assets" | ||
response = [] | ||
|
||
for dataset in datasets: | ||
dataset_id = dataset["dataset_id"] | ||
schema_version = dataset["schema_version"] | ||
|
||
if schema_version not in CXG_SCHEMA_VERSION_IMPORT: | ||
logging.warning(f"Dropping dataset {dataset_id} due to unsupported schema version") | ||
continue | ||
|
||
assets = dataset.get("assets", []) | ||
assets_h5ad = [a for a in assets if a["filetype"] == "H5AD"] | ||
if len(assets_h5ad) == 0: | ||
if not assets_h5ad: | ||
logging.error(f"Unable to find H5AD asset for dataset id {dataset_id} - ignoring this dataset") | ||
no_asset_found.append(dataset_id) | ||
else: | ||
asset = assets_h5ad[0] | ||
datasets[dataset_id].update( | ||
{ | ||
"corpora_asset_h5ad_uri": asset["presigned_url"], | ||
"asset_h5ad_filesize": asset["filesize"], | ||
} | ||
) | ||
|
||
# drop any datasets where we could not find an asset | ||
for id in no_asset_found: | ||
datasets.pop(id, None) | ||
|
||
return [Dataset(**d) for d in datasets.values()] | ||
continue | ||
asset_h5ad_uri = assets[0]["url"] | ||
asset_h5ad_filesize = assets[0]["filesize"] | ||
|
||
d = Dataset( | ||
dataset_id=dataset_id, | ||
corpora_asset_h5ad_uri=asset_h5ad_uri, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @danieljhegeman title is a required (non-null) field, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems like a good time to rename |
||
dataset_title=null_to_empty_str(dataset["title"]), | ||
collection_id=dataset["collection_id"], | ||
collection_name=null_to_empty_str(dataset["collection_name"]), | ||
collection_doi=null_to_empty_str(dataset["collection_doi"]), | ||
asset_h5ad_filesize=asset_h5ad_filesize, | ||
schema_version=schema_version, | ||
) | ||
response.append(d) | ||
|
||
logging.info(f"Found {len(datasets)} datasets") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. out-of-scope, but it would be nice to report all the warning type counts here (datasets excluded for schema, missing h5ad asset) |
||
|
||
return response | ||
|
||
|
||
def load_manifest(manifest_fp: Optional[Union[str, io.TextIOBase]] = None) -> List[Dataset]: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,7 @@ | ||
import pathlib | ||
import re | ||
from unittest.mock import patch | ||
|
||
from cellxgene_census_builder.build_soma.manifest import CXG_BASE_URI, load_manifest | ||
from cellxgene_census_builder.build_soma.manifest import load_manifest | ||
|
||
|
||
def test_load_manifest_from_file(tmp_path: pathlib.Path, manifest_csv: str) -> None: | ||
|
@@ -42,23 +41,26 @@ def test_load_manifest_from_cxg() -> None: | |
If no parameters are specified, `load_manifest` should load the dataset list from Discover API. | ||
""" | ||
with patch("cellxgene_census_builder.build_soma.manifest.fetch_json") as m: | ||
|
||
def mock_call_fn(uri): # type: ignore | ||
if uri == f"{CXG_BASE_URI}curation/v1/collections": | ||
return [ | ||
{ | ||
"collection_id": "collection_1", | ||
"doi": None, | ||
"name": "1", | ||
"datasets": [{"dataset_id": "dataset_id_1"}, {"dataset_id": "dataset_id_2"}], | ||
} | ||
] | ||
elif m := re.match(rf"{CXG_BASE_URI}curation/v1/collections/(\w+)/datasets/(\w+)$", uri): | ||
return {"dataset_id": m[2], "schema_version": "3.0.0", "title": f"dataset #{m[2]}"} | ||
elif m := re.match(rf"{CXG_BASE_URI}curation/v1/collections/(\w+)/datasets/(\w+)/assets$", uri): | ||
return [{"filetype": "H5AD", "filesize": 1024, "presigned_url": f"https://fake.url/{m[2]}.h5ad"}] | ||
|
||
m.side_effect = mock_call_fn | ||
m.return_value = [ | ||
{ | ||
"dataset_id": "dataset_id_1", | ||
"collection_id": "collection_1", | ||
"collection_name": "1", | ||
"collection_doi": None, | ||
"title": "dataset #1", | ||
"schema_version": "3.0.0", | ||
"assets": [{"filesize": 123, "filetype": "H5AD", "url": "https://fake.url/dataset_id_1.h5ad"}], | ||
}, | ||
{ | ||
"dataset_id": "dataset_id_2", | ||
"collection_id": "collection_1", | ||
"collection_name": "1", | ||
"collection_doi": None, | ||
"title": "dataset #2", | ||
"schema_version": "3.0.0", | ||
"assets": [{"filesize": 456, "filetype": "H5AD", "url": "https://fake.url/dataset_id_2.h5ad"}], | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. worth adding a non-H5AD asset as well, either here or in an appropriate test |
||
}, | ||
] | ||
|
||
manifest = load_manifest(None) | ||
assert len(manifest) == 2 | ||
|
@@ -73,27 +75,26 @@ def test_load_manifest_from_cxg_excludes_datasets_with_old_schema() -> None: | |
`load_manifest` should exclude datasets that do not have a current schema version. | ||
""" | ||
with patch("cellxgene_census_builder.build_soma.manifest.fetch_json") as m: | ||
|
||
def mock_call_fn(uri): # type: ignore | ||
if uri == f"{CXG_BASE_URI}curation/v1/collections": | ||
return [ | ||
{ | ||
"collection_id": "collection_1", | ||
"doi": None, | ||
"name": "1", | ||
"datasets": [{"dataset_id": "dataset_id_1"}, {"dataset_id": "dataset_id_2"}], | ||
} | ||
] | ||
elif m := re.match(rf"{CXG_BASE_URI}curation/v1/collections/(\w+)/datasets/(\w+)$", uri): | ||
return { | ||
"dataset_id": m[2], | ||
"schema_version": "3.0.0" if m[2] == "dataset_id_1" else "2.0.0", | ||
"title": f"dataset #{m[2]}", | ||
} | ||
elif m := re.match(rf"{CXG_BASE_URI}curation/v1/collections/(\w+)/datasets/(\w+)/assets$", uri): | ||
return [{"filetype": "H5AD", "filesize": 1024, "presigned_url": f"https://fake.url/{m[2]}.h5ad"}] | ||
|
||
m.side_effect = mock_call_fn | ||
m.return_value = [ | ||
{ | ||
"dataset_id": "dataset_id_1", | ||
"collection_id": "collection_1", | ||
"collection_name": "1", | ||
"collection_doi": None, | ||
"title": "dataset #1", | ||
"schema_version": "3.0.0", | ||
"assets": [{"filesize": 123, "filetype": "H5AD", "url": "https://fake.url/dataset_id_1.h5ad"}], | ||
}, | ||
{ | ||
"dataset_id": "dataset_id_2", | ||
"collection_id": "collection_1", | ||
"collection_name": "1", | ||
"collection_doi": None, | ||
"title": "dataset #2", | ||
"schema_version": "2.0.0", # Old schema version | ||
"assets": [{"filesize": 456, "filetype": "H5AD", "url": "https://fake.url/dataset_id_2.h5ad"}], | ||
}, | ||
] | ||
|
||
manifest = load_manifest(None) | ||
assert len(manifest) == 1 | ||
|
@@ -106,25 +107,26 @@ def test_load_manifest_from_cxg_excludes_datasets_with_no_assets() -> None: | |
`load_manifest` should exclude datasets that do not have assets | ||
""" | ||
with patch("cellxgene_census_builder.build_soma.manifest.fetch_json") as m: | ||
|
||
def mock_call_fn(uri): # type: ignore | ||
if uri == f"{CXG_BASE_URI}curation/v1/collections": | ||
return [ | ||
{ | ||
"collection_id": "collection_1", | ||
"doi": None, | ||
"name": "1", | ||
"datasets": [{"dataset_id": "dataset_id_1"}, {"dataset_id": "dataset_id_2"}], | ||
} | ||
] | ||
elif m := re.match(rf"{CXG_BASE_URI}curation/v1/collections/(\w+)/datasets/(\w+)$", uri): | ||
return {"dataset_id": m[2], "schema_version": "3.0.0", "title": f"dataset #{m[2]}"} | ||
elif m := re.match(rf"{CXG_BASE_URI}curation/v1/collections/(\w+)/datasets/dataset_id_1/assets$", uri): | ||
return [{"filetype": "H5AD", "filesize": 1024, "presigned_url": "https://fake.url/dataset_id_1.h5ad"}] | ||
elif m := re.match(rf"{CXG_BASE_URI}curation/v1/collections/(\w+)/datasets/dataset_id_2/assets$", uri): | ||
return [] | ||
|
||
m.side_effect = mock_call_fn | ||
m.return_value = [ | ||
{ | ||
"dataset_id": "dataset_id_1", | ||
"collection_id": "collection_1", | ||
"collection_name": "1", | ||
"collection_doi": None, | ||
"title": "dataset #1", | ||
"schema_version": "3.0.0", | ||
"assets": [{"filesize": 123, "filetype": "H5AD", "url": "https://fake.url/dataset_id_1.h5ad"}], | ||
}, | ||
{ | ||
"dataset_id": "dataset_id_2", | ||
"collection_id": "collection_1", | ||
"collection_name": "1", | ||
"collection_doi": None, | ||
"title": "dataset #2", | ||
"schema_version": "3.0.0", | ||
"assets": [], | ||
}, | ||
] | ||
|
||
manifest = load_manifest(None) | ||
assert len(manifest) == 1 | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
worth asserting that
len(assets) == 1
as a sanity check? Or at least logging this unexpected case as a warning.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a
logging.error
and acontinue
.