Include orphans in manifest when filtering by only project/dataset (#…
hannes-ucsc committed Nov 10, 2024
2 parents 0b3b6b4 + dfb41a2 commit 17dadfa
Showing 13 changed files with 117 additions and 48 deletions.
8 changes: 8 additions & 0 deletions src/azul/indexer/document_service.py
@@ -49,6 +49,14 @@ def metadata_plugin(self, catalog: CatalogName) -> MetadataPlugin:
def aggregate_class(self, catalog: CatalogName) -> Type[Aggregate]:
return self.metadata_plugin(catalog).aggregate_class()

@property
def always_limit_access(self) -> bool:
"""
True if access restrictions are enforced unconditionally. False if the
filter stage is allowed to weaken them, e.g., based on the entity type.
"""
return True

def transformer_types(self,
catalog: CatalogName
) -> Iterable[Type[Transformer]]:
32 changes: 23 additions & 9 deletions src/azul/plugins/__init__.py
@@ -127,11 +127,25 @@ def order(self) -> str:

@attr.s(auto_attribs=True, frozen=True, kw_only=True)
class SpecialFields:
"""
Azul defines a number of fields in each /index/{entity_type} response that
are synthetic (not directly taken from the metadata) and/or are used
internally. Their naming is inconsistent between metadata plugin
implementations. This class encapsulates the naming of these fields so that
we don't need to litter the source with string literals and conditionals.
It is an incomplete abstraction in that it does not express the name of the
inner entity the field is a property of in the /index/{entity_type}
response. In that way, the values of the attributes of instances of this
class are more akin to a facet name than a field name. However, not
every field represented here is actually a facet.
"""
accessible: ClassVar[FieldName] = 'accessible'
source_id: FieldName
source_spec: FieldName
bundle_uuid: FieldName
bundle_version: FieldName
implicit_hub_id: FieldName


class ManifestFormat(Enum):
@@ -409,6 +423,9 @@ def _field_mapping(self) -> _FieldMapping:
@property
@abstractmethod
def special_fields(self) -> SpecialFields:
"""
See :py:class:`SpecialFields`.
"""
raise NotImplementedError

@property
@@ -433,8 +450,8 @@ def manifest_config(self) -> ManifestConfig:
raise NotImplementedError

def verbatim_pfb_schema(self,
replicas: Iterable[JSON]
) -> tuple[Iterable[JSON], Sequence[str], JSON]:
replicas: list[JSON]
) -> list[JSON]:
"""
Generate a PFB schema for the verbatim manifest. The default,
metadata-agnostic implementation loads all replica documents into memory
@@ -445,17 +462,14 @@ def verbatim_pfb_schema(self,
:param replicas: The replica documents to be described by the PFB schema
:return: a triple of
1. the same set of replicas passed to this method
2. the set of entity types defined by the PFB schema
3. a PFB schema describing the provided replicas
:return: a tuple of
1. the set of entity types defined by the PFB schema
2. a PFB schema describing the provided replicas
"""
from azul.service import (
avro_pfb,
)
replicas = list(replicas)
replica_types, pfb_schema = avro_pfb.pfb_schema_from_replicas(replicas)
return replicas, replica_types, pfb_schema
return avro_pfb.pfb_schema_from_replicas(replicas)

@abstractmethod
def document_slice(self, entity_type: str) -> DocumentSlice | None:
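
To make the SpecialFields docstring above concrete, here is a minimal usage sketch. The attribute names and the two plugin configurations are taken from this commit; the helper function and the example filters are hypothetical, purely for illustration.

from typing import ClassVar

import attr

FieldName = str


@attr.s(auto_attribs=True, frozen=True, kw_only=True)
class SpecialFields:
    accessible: ClassVar[FieldName] = 'accessible'
    source_id: FieldName
    source_spec: FieldName
    bundle_uuid: FieldName
    bundle_version: FieldName
    implicit_hub_id: FieldName


# The two metadata plugins in this commit configure the same abstraction
# with their catalog-specific field names …
hca = SpecialFields(source_id='sourceId',
                    source_spec='sourceSpec',
                    bundle_uuid='bundleUuid',
                    bundle_version='bundleVersion',
                    implicit_hub_id='projectId')

anvil = SpecialFields(source_id='source_id',
                      source_spec='source_spec',
                      bundle_uuid='bundle_uuid',
                      bundle_version='bundle_version',
                      implicit_hub_id='datasets.dataset_id')


# … so shared code can ask questions such as "does this request filter only
# by the implicit hub?" without hard-coding 'projectId' or 'datasets.dataset_id'.
def filters_only_by_hub(filters: dict, special: SpecialFields) -> bool:
    return filters.keys() == {special.implicit_hub_id}


assert filters_only_by_hub({'projectId': {'is': ['some-uuid']}}, hca)
assert not filters_only_by_hub({'datasets.title': {'is': ['some-title']}}, anvil)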
30 changes: 18 additions & 12 deletions src/azul/plugins/metadata/anvil/__init__.py
@@ -53,9 +53,6 @@
AnvilSearchResponseStage,
AnvilSummaryResponseStage,
)
from azul.service.avro_pfb import (
avro_pfb_schema,
)
from azul.service.manifest_service import (
ManifestFormat,
)
@@ -246,7 +243,8 @@ def special_fields(self) -> SpecialFields:
return SpecialFields(source_id='source_id',
source_spec='source_spec',
bundle_uuid='bundle_uuid',
bundle_version='bundle_version')
bundle_version='bundle_version',
implicit_hub_id='datasets.dataset_id')

@property
def implicit_hub_type(self) -> str:
@@ -327,16 +325,24 @@ def recurse(mapping: MetadataPlugin._FieldMapping, path: FieldPath):
return result

def verbatim_pfb_schema(self,
replicas: Iterable[JSON]
) -> tuple[Iterable[JSON], Sequence[str], JSON]:
entity_schemas = []
entity_types = []
for table_schema in sorted(anvil_schema['tables'], key=itemgetter('name')):
table_name = table_schema['name']
replicas: list[JSON]
) -> list[JSON]:
table_schemas_by_name = {
schema['name']: schema
for schema in anvil_schema['tables']
}
non_schema_replicas = [
r for r in replicas
if r['replica_type'] not in table_schemas_by_name
]
# For tables not described by the AnVIL schema, fall back to building
# their PFB schema dynamically from the shapes of the replicas
entity_schemas = super().verbatim_pfb_schema(non_schema_replicas)
# For the rest, use the AnVIL schema as the basis of the PFB schema
for table_name, table_schema in table_schemas_by_name.items():
# FIXME: Improve handling of DUOS replicas
# https://github.com/DataBiosphere/azul/issues/6139
is_duos_type = table_name == 'anvil_dataset'
entity_types.append(table_name)
field_schemas = [
self._pfb_schema_from_anvil_column(table_name=table_name,
column_name='datarepo_row_id',
@@ -369,7 +375,7 @@ def verbatim_pfb_schema(self,
'type': 'record',
'fields': field_schemas
})
return replicas, entity_types, avro_pfb_schema(entity_schemas)
return entity_schemas

def _pfb_schema_from_anvil_column(self,
*,
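
The reworked AnVIL override now mixes two schema sources: tables known to the static AnVIL schema get hand-built PFB entity schemas, while replicas of any other type fall back to the metadata-agnostic base implementation. A rough sketch of that split, with stand-in helpers and made-up table names rather than the real plugin code:

anvil_tables = {'anvil_dataset', 'anvil_donor', 'anvil_file'}  # stand-in for anvil_schema['tables']


def schema_from_static_table(table_name: str) -> dict:
    # Placeholder for the per-column translation that
    # _pfb_schema_from_anvil_column performs in the real plugin
    return {'name': table_name, 'type': 'record', 'fields': []}


def schema_from_replica_shapes(replicas: list[dict]) -> list[dict]:
    # Placeholder for the metadata-agnostic base implementation, which
    # derives one record schema per replica type from the documents
    return [{'name': t, 'type': 'record', 'fields': []}
            for t in sorted({r['replica_type'] for r in replicas})]


def verbatim_pfb_schema(replicas: list[dict]) -> list[dict]:
    non_schema_replicas = [r for r in replicas
                           if r['replica_type'] not in anvil_tables]
    schemas = schema_from_replica_shapes(non_schema_replicas)
    schemas.extend(schema_from_static_table(name) for name in sorted(anvil_tables))
    return schemas


replicas = [{'replica_type': 'anvil_file', 'contents': {}},
            {'replica_type': 'some_orphaned_type', 'contents': {}}]
print([s['name'] for s in verbatim_pfb_schema(replicas)])

The caller is now responsible for sorting the combined list and wrapping it into a single PFB schema, as shown in manifest_service.py further down.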
1 change: 1 addition & 0 deletions src/azul/plugins/metadata/anvil/service/filter.py
@@ -5,5 +5,6 @@

class AnvilFilterStage(FilterStage):

@property
def _limit_access(self) -> bool:
return self.entity_type != 'datasets'
3 changes: 2 additions & 1 deletion src/azul/plugins/metadata/hca/__init__.py
@@ -282,7 +282,8 @@ def special_fields(self) -> SpecialFields:
return SpecialFields(source_id='sourceId',
source_spec='sourceSpec',
bundle_uuid='bundleUuid',
bundle_version='bundleVersion')
bundle_version='bundleVersion',
implicit_hub_id='projectId')

@property
def implicit_hub_type(self) -> str:
1 change: 1 addition & 0 deletions src/azul/plugins/metadata/hca/service/filter.py
@@ -5,5 +5,6 @@

class HCAFilterStage(FilterStage):

@property
def _limit_access(self) -> bool:
return self.entity_type != 'projects'
10 changes: 3 additions & 7 deletions src/azul/service/avro_pfb.py
@@ -17,7 +17,6 @@
ClassVar,
MutableSet,
Self,
Sequence,
)
from uuid import (
UUID,
@@ -315,18 +314,15 @@ def pfb_schema_from_field_types(field_types: FieldTypes) -> JSON:
return avro_pfb_schema(entity_schemas)


def pfb_schema_from_replicas(replicas: Iterable[JSON]
) -> tuple[Sequence[str], JSON]:
schemas_by_replica_type = {}
def pfb_schema_from_replicas(replicas: Iterable[JSON]) -> list[JSON]:
schemas_by_replica_type: dict[str, MutableJSON] = {}
for replica in replicas:
replica_type, replica_contents = replica['replica_type'], replica['contents']
_update_replica_schema(schema=schemas_by_replica_type,
path=(replica_type,),
key=replica_type,
value=replica_contents)
schemas_by_replica_type = sorted(schemas_by_replica_type.items())
keys, values = zip(*schemas_by_replica_type)
return keys, avro_pfb_schema(values)
return list(schemas_by_replica_type.values())


def avro_pfb_schema(azul_avro_schema: Iterable[JSON]) -> JSON:
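
After this change, pfb_schema_from_replicas returns only the per-type entity schemas; sorting them and wrapping them with avro_pfb_schema now happens at the call site. A toy version of the new contract, with a deliberately naive stand-in for _update_replica_schema:

def toy_pfb_schema_from_replicas(replicas: list[dict]) -> list[dict]:
    schemas_by_type: dict[str, dict] = {}
    for replica in replicas:
        replica_type, contents = replica['replica_type'], replica['contents']
        # Naive stand-in for _update_replica_schema: one string-typed field
        # per top-level key of the replica contents
        schemas_by_type.setdefault(replica_type, {
            'name': replica_type,
            'type': 'record',
            'fields': [{'name': k, 'type': 'string'} for k in sorted(contents)]
        })
    return list(schemas_by_type.values())


schemas = toy_pfb_schema_from_replicas([
    {'replica_type': 'anvil_donor', 'contents': {'organism_type': 'human'}}
])
# The caller sorts by record name and builds the complete PFB schema from the
# result, as create_file does in manifest_service.py below.
schemas.sort(key=lambda s: s['name'])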
8 changes: 6 additions & 2 deletions src/azul/service/elasticsearch_service.py
@@ -192,14 +192,18 @@ def process_response(self, response: Response) -> Response:

@cached_property
def prepared_filters(self) -> TranslatedFilters:
filters_json = self.filters.reify(self.plugin, limit_access=self._limit_access())
limit_access = self.service.always_limit_access or self._limit_access
filters_json = self.filters.reify(self.plugin, limit_access=limit_access)
return self._translate_filters(filters_json)

@property
@abstractmethod
def _limit_access(self) -> bool:
"""
Whether to enforce the managed access controls during filter
reification.
reification, provided that the service allows such conditional
enforcement of access. If it doesn't, the return value should be
ignored, and access must be enforced unconditionally.
"""
raise NotImplementedError

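
The effect of the new prepared_filters logic boils down to a single boolean expression: the service-level switch dominates, and only when it is off does the per-stage hook matter. A minimal sketch with stand-in arguments (not the actual stage class):

def effective_limit_access(always_limit_access: bool,
                           stage_limit_access: bool) -> bool:
    # Mirrors: limit_access = self.service.always_limit_access or self._limit_access
    return always_limit_access or stage_limit_access


# Indexer-side document service (always_limit_access is True): access is
# enforced no matter what the filter stage says.
assert effective_limit_access(True, False) is True

# Repository service (always_limit_access is False): a filter stage such as
# AnvilFilterStage or HCAFilterStage may skip enforcement when listing the
# 'datasets' or 'projects' index, respectively.
assert effective_limit_access(False, False) is False
assert effective_limit_access(False, True) is True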
19 changes: 15 additions & 4 deletions src/azul/service/manifest_service.py
@@ -1985,7 +1985,7 @@ class VerbatimManifestGenerator(FileBasedManifestGenerator, metaclass=ABCMeta):

@property
def entity_type(self) -> str:
return 'files'
return self.implicit_hub_type if self.include_orphans else 'files'

@property
def included_fields(self) -> list[FieldPath]:
@@ -2001,6 +2001,11 @@ def included_fields(self) -> list[FieldPath]:
def implicit_hub_type(self) -> str:
return self.service.metadata_plugin(self.catalog).implicit_hub_type

@property
def include_orphans(self) -> bool:
special_fields = self.service.metadata_plugin(self.catalog).special_fields
return self.filters.explicit.keys() == {special_fields.implicit_hub_id}

@attrs.frozen(kw_only=True)
class ReplicaKeys:
"""
@@ -2019,8 +2024,11 @@ def _replica_keys(self) -> Iterable[ReplicaKeys]:
hub_type = self.implicit_hub_type
request = self._create_request()
for hit in request.scan():
replica_id = one(hit['contents'][hub_type])['document_id']
if self.entity_type != hub_type:
replica_id = one(replica_id)
yield self.ReplicaKeys(hub_id=hit['entity_id'],
replica_id=one(one(hit['contents'][hub_type])['document_id']))
replica_id=replica_id)

def _all_replicas(self) -> Iterable[JSON]:
emitted_replica_ids = set()
@@ -2100,8 +2108,11 @@ def format(cls) -> ManifestFormat:

def create_file(self) -> tuple[str, Optional[str]]:
plugin = self.service.metadata_plugin(self.catalog)
replicas = self._all_replicas()
replicas, replica_types, pfb_schema = plugin.verbatim_pfb_schema(replicas)
replicas = list(self._all_replicas())
replica_schemas = plugin.verbatim_pfb_schema(replicas)
replica_schemas.sort(key=itemgetter('name'))
replica_types = [s['name'] for s in replica_schemas]
pfb_schema = avro_pfb.avro_pfb_schema(replica_schemas)
pfb_metadata_entity = avro_pfb.pfb_metadata_entity(replica_types, links=False)

def pfb_entities():
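
The orphan-inclusion switch added above hinges entirely on the shape of the request filters: orphans are pulled in only when the sole explicit filter is the implicit hub field (projectId for HCA, datasets.dataset_id for AnVIL), in which case the generator scans hub documents rather than file documents. A small sketch of that decision, assuming the AnVIL hub type is 'datasets' (the helper functions are illustrative, not the generator's actual methods):

def include_orphans(explicit_filters: dict, implicit_hub_id: str) -> bool:
    # Mirrors: self.filters.explicit.keys() == {special_fields.implicit_hub_id}
    return explicit_filters.keys() == {implicit_hub_id}


def entity_type(explicit_filters: dict,
                implicit_hub_id: str,
                implicit_hub_type: str) -> str:
    # Scan the hub index when orphans are wanted, the file index otherwise
    if include_orphans(explicit_filters, implicit_hub_id):
        return implicit_hub_type
    else:
        return 'files'


# Filtering only by dataset ID includes orphans (see the new test case below)
filters = {'datasets.dataset_id': {'is': ['52ee7665-7033-63f2-a8d9-ce8e32666739']}}
assert entity_type(filters, 'datasets.dataset_id', 'datasets') == 'datasets'

# Filtering by any other field, or by nothing at all, does not
filters = {'datasets.title': {'is': ['ANVIL_CMG_UWASH_DS_BDIS']}}
assert entity_type(filters, 'datasets.dataset_id', 'datasets') == 'files'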
4 changes: 4 additions & 0 deletions src/azul/service/repository_service.py
@@ -374,3 +374,7 @@ def _hit_to_doc(hit: Hit) -> JSON:
if file_version is not None:
assert file_version == file['version']
return file

@property
def always_limit_access(self) -> bool:
return False
3 changes: 2 additions & 1 deletion test/service/test_filters.py
@@ -33,7 +33,8 @@ class TestFilterReification(AzulTestCase):
source_id='sourceId',
source_spec=MagicMock(),
bundle_uuid=MagicMock(),
bundle_version=MagicMock()
bundle_version=MagicMock(),
implicit_hub_id=MagicMock()
)

@property
43 changes: 32 additions & 11 deletions test/service/test_manifest.py
@@ -2087,18 +2087,39 @@ def test_compact_manifest(self):
self._assert_tsv(expected, response)

def test_verbatim_jsonl_manifest(self):
response = self._get_manifest(ManifestFormat.verbatim_jsonl, filters={})
self.assertEqual(200, response.status_code)
expected = {
# Consolidate entities with the same replica (i.e. datasets)
json_hash(entity).digest(): {
'type': entity_ref.entity_type,
'value': entity,
all_entities, linked_entities = self._canned_entities()
cases = [
({}, False),
({'datasets.title': {'is': ['ANVIL_CMG_UWASH_DS_BDIS']}}, False),
# Orphans should be included only when filtering by dataset ID
({'datasets.dataset_id': {'is': ['52ee7665-7033-63f2-a8d9-ce8e32666739']}}, True)
]
for filters, expect_orphans in cases:
with self.subTest(filters=filters):
response = self._get_manifest(ManifestFormat.verbatim_jsonl, filters=filters)
self.assertEqual(200, response.status_code)
expected_rows = list(all_entities if expect_orphans else linked_entities)
self._assert_jsonl(expected_rows, response)

def _canned_entities(self):

def hash_entities(entities: dict[EntityReference, JSON]) -> dict[str, JSON]:
return {
json_hash(contents).digest(): {
'type': ref.entity_type,
'value': contents
}
for ref, contents in entities.items()
}
for bundle in self.bundles()
for entity_ref, entity in self._load_canned_bundle(bundle).entities.items()
}.values()
self._assert_jsonl(list(expected), response)

linked_entities_by_hash, all_entities_by_hash = {}, {}
for bundle_fqid in self.bundles():
bundle = self._load_canned_bundle(bundle_fqid)
linked_entities_by_hash.update(hash_entities(bundle.entities))
all_entities_by_hash.update(hash_entities(bundle.orphans))
all_entities_by_hash.update(linked_entities_by_hash)

return all_entities_by_hash.values(), linked_entities_by_hash.values()

def test_verbatim_pfb_manifest(self):
response = self._get_manifest(ManifestFormat.verbatim_pfb, filters={})
3 changes: 2 additions & 1 deletion test/service/test_request_builder.py
@@ -65,7 +65,8 @@ def special_fields(self) -> SpecialFields:
return SpecialFields(source_id='sourceId',
source_spec='sourceSpec',
bundle_uuid='bundleUuid',
bundle_version='bundleVersion')
bundle_version='bundleVersion',
implicit_hub_id='projectId')

@property
def _field_mapping(self) -> MetadataPlugin._FieldMapping:
