[r] Index orphaned replicas (#6626, PR #6627)
achave11-ucsc committed Nov 9, 2024
2 parents cfe7acf + 9b6cf31 commit 0b3b6b4
Showing 30 changed files with 703 additions and 276 deletions.
8 changes: 8 additions & 0 deletions scripts/can_bundle.py
@@ -61,6 +61,9 @@ def main(argv):
help='The version of the bundle to can. Required for HCA, ignored for AnVIL.')
parser.add_argument('--table-name',
help='The BigQuery table of the bundle to can. Only applicable for AnVIL.')
parser.add_argument('--batch-prefix',
help='The batch prefix of the bundle to can. Only applicable for AnVIL. '
'Use "null" for non-batched bundle formats.')
parser.add_argument('--output-dir', '-O',
default=os.path.join(config.project_root, 'test', 'indexer', 'data'),
help='The path to the output directory (default: %(default)s).')
@@ -78,6 +81,11 @@ def parse_fqid_fields(args: argparse.Namespace) -> JSON:
fields = {'uuid': args.uuid, 'version': args.version}
if args.table_name is not None:
fields['table_name'] = args.table_name
batch_prefix = args.batch_prefix
if batch_prefix is not None:
if batch_prefix == 'null':
batch_prefix = None
fields['batch_prefix'] = batch_prefix
return fields
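For illustration, a minimal sketch of the FQID fields this produces for an AnVIL bundle, assuming the argument handling shown above; the literal string "null" on the command line becomes a JSON null (Python None), selecting the non-batched bundle formats.

# Hypothetical values; only the 'null' translation is taken from the hunk above.
args = {'uuid': '824d6c84-8b5d-4f7e-9dd7-e191b2cf44f1',
        'version': None,                      # ignored for AnVIL
        'table_name': 'anvil_file',
        'batch_prefix': 'null'}
batch_prefix = args['batch_prefix']
if batch_prefix == 'null':
    batch_prefix = None                       # non-batched bundle format
fields = {'uuid': args['uuid'], 'version': args['version'],
          'table_name': args['table_name'], 'batch_prefix': batch_prefix}
assert fields['batch_prefix'] is None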


8 changes: 4 additions & 4 deletions scripts/post_deploy_tdr.py
@@ -96,13 +96,13 @@ def verify_source(self,
plugin = self.repository_plugin(catalog)
ref = plugin.resolve_source(str(source_spec))
log.info('TDR client is authorized for API access to %s.', source_spec)
ref = plugin.partition_source(catalog, ref)
prefix = ref.spec.prefix
if config.deployment.is_main:
require(prefix.common == '', source_spec)
if source_spec.prefix is not None:
require(source_spec.prefix.common == '', source_spec)
self.tdr.check_bigquery_access(source_spec)
else:
subgraph_count = len(plugin.list_bundles(ref, prefix.common))
ref = plugin.partition_source(catalog, ref)
subgraph_count = plugin.count_bundles(ref.spec)
require(subgraph_count > 0, 'Common prefix is too long', ref.spec)
require(subgraph_count <= 512, 'Common prefix is too short', ref.spec)
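As a rough illustration of the bounds enforced here (a sketch, assuming subgraph UUIDs are distributed uniformly over hexadecimal prefixes): lengthening the common prefix by one character divides the listed subgraph count by about 16, and verify_source accepts only counts between 1 and 512.

# Back-of-the-envelope check mirroring the two require() calls above;
# the 16**n factor is an assumption about hex UUID prefixes, not code from the PR.
def common_prefix_is_acceptable(total_subgraphs: int, prefix_length: int) -> bool:
    expected = total_subgraphs // 16 ** prefix_length
    return 0 < expected <= 512        # too long if nothing is left, too short if more than 512 remain

assert common_prefix_is_acceptable(100_000, prefix_length=2)       # ~390 subgraphs
assert not common_prefix_is_acceptable(100_000, prefix_length=1)   # ~6250 subgraphs, prefix too short
assert not common_prefix_is_acceptable(100, prefix_length=4)       # 0 subgraphs, prefix too long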

6 changes: 5 additions & 1 deletion src/azul/azulclient.py
@@ -225,7 +225,11 @@ def list_bundles(self,
source = plugin.resolve_source(source)
else:
assert isinstance(source, SourceRef), source
return plugin.list_bundles(source, prefix)
log.info('Listing bundles with prefix %r in source %r.', prefix, source)
bundle_fqids = plugin.list_bundles(source, prefix)
log.info('There are %i bundle(s) with prefix %r in source %r.',
len(bundle_fqids), prefix, source)
return bundle_fqids

@property
def sqs(self):
13 changes: 6 additions & 7 deletions src/azul/indexer/document.py
@@ -504,13 +504,12 @@ class ContributionCoordinates(DocumentCoordinates[E], Generic[E]):
subgraph ("bundle") and represent either the addition of metadata to an
entity or the removal of metadata from an entity.
Contributions produced by
transformers don't specify a catalog, the catalog is supplied when the
contributions are written to the index and it is guaranteed to be the same
for all contributions produced in response to one notification. When
contributions are read back during aggregation, they specify a catalog, the
catalog they were read from. Because of that duality this class has to be
generic in E, the type of EntityReference.
Contributions produced by transformers don't specify a catalog. The catalog
is supplied when the contributions are written to the index and it is
guaranteed to be the same for all contributions produced in response to one
notification. When contributions are read back during aggregation, they
specify a catalog, the catalog they were read from. Because of that duality
this class has to be generic in E, the type of EntityReference.
"""

doc_type: ClassVar[DocumentType] = DocumentType.contribution
3 changes: 2 additions & 1 deletion src/azul/indexer/index_service.py
@@ -212,7 +212,8 @@ def index(self, catalog: CatalogName, bundle: Bundle) -> None:
for contributions, replicas in transforms:
tallies.update(self.contribute(catalog, contributions))
self.replicate(catalog, replicas)
self.aggregate(tallies)
if tallies:
self.aggregate(tallies)

def delete(self, catalog: CatalogName, bundle: Bundle) -> None:
"""
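The new guard matters for bundles that contain only orphans: their transformers emit replicas but no contributions, so the tallies mapping stays empty and the aggregation step can be skipped. A minimal sketch of the falsy-empty behavior this relies on (treating tallies as a Counter is an assumption, not taken from the source):

from collections import Counter

tallies = Counter()               # per-entity contribution counts
tallies.update({})                # an orphan-only bundle contributes nothing
if tallies:
    raise AssertionError('aggregation would have been scheduled needlessly')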
3 changes: 2 additions & 1 deletion src/azul/indexer/transform.py
@@ -133,6 +133,7 @@ def _contribution(self,
def _replica(self,
entity: EntityReference,
*,
root_hub: EntityID,
file_hub: EntityID | None,
) -> Replica:
replica_type, contents = self._replicate(entity)
@@ -144,7 +145,7 @@ def _replica(self,
contents=contents,
# The other hubs will be added when the indexer
# consolidates duplicate replicas.
hub_ids=alist(file_hub))
hub_ids=alist(file_hub, root_hub))
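The hub_ids change assumes that alist drops None arguments, so a replica that is not hubbed on a file still records the new root hub. A minimal stand-in sketch (azul's real alist may differ in detail):

def alist(*args):
    # assumed behavior: keep only the non-None arguments, in order
    return [arg for arg in args if arg is not None]

root_hub = 'dataset-0001'                                               # e.g. the parent dataset's entity ID
assert alist('file-0001', root_hub) == ['file-0001', 'dataset-0001']   # replica hubbed on a file
assert alist(None, root_hub) == ['dataset-0001']                       # file_hub=None, root hub only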

@classmethod
@abstractmethod
7 changes: 4 additions & 3 deletions src/azul/plugins/__init__.py
@@ -615,9 +615,10 @@ def _lookup_source_id(self, spec: SOURCE_SPEC) -> str:
raise NotImplementedError

@abstractmethod
def _count_subgraphs(self, source: SOURCE_SPEC) -> int:
def count_bundles(self, source: SOURCE_SPEC) -> int:
"""
The total number of subgraphs in the given source, ignoring its prefix.
The total number of subgraphs in the given source. The source's prefix
may be None.
"""
raise NotImplementedError

Expand All @@ -631,7 +632,7 @@ def partition_source(self,
should be appropriate for indexing in the given catalog.
"""
if source.spec.prefix is None:
count = self._count_subgraphs(source.spec)
count = self.count_bundles(source.spec)
is_main = config.deployment.is_main
is_it = catalog in config.integration_test_catalogs
# We use the "lesser" heuristic during IT to avoid indexing an
27 changes: 18 additions & 9 deletions src/azul/plugins/metadata/anvil/bundle.py
@@ -122,28 +122,37 @@ class AnvilBundle(Bundle[BUNDLE_FQID], ABC):
# the former to the latter during transformation.
entities: dict[EntityReference, MutableJSON] = attrs.field(factory=dict)
links: set[EntityLink] = attrs.field(factory=set)
orphans: dict[EntityReference, MutableJSON] = attrs.field(factory=dict)

def reject_joiner(self, catalog: CatalogName):
# FIXME: Optimize joiner rejection and re-enable it for AnVIL
# https://github.com/DataBiosphere/azul/issues/5256
pass

def to_json(self) -> MutableJSON:
return {
'entities': {
def serialize_entities(entities):
return {
str(entity_ref): entity
for entity_ref, entity in sorted(self.entities.items())
},
for entity_ref, entity in sorted(entities.items())
}

return {
'entities': serialize_entities(self.entities),
'orphans': serialize_entities(self.orphans),
'links': [link.to_json() for link in sorted(self.links)]
}

@classmethod
def from_json(cls, fqid: BUNDLE_FQID, json_: JSON) -> Self:
def deserialize_entities(json_entities):
return {
EntityReference.parse(entity_ref): entity
for entity_ref, entity in json_entities.items()
}

return cls(
fqid=fqid,
entities={
EntityReference.parse(entity_ref): entity
for entity_ref, entity in json_['entities'].items()
},
links=set(map(EntityLink.from_json, json_['links']))
entities=deserialize_entities(json_['entities']),
links=set(map(EntityLink.from_json, json_['links'])),
orphans=deserialize_entities(json_['orphans'])
)
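A sketch of the canned JSON shape after this change, with hypothetical keys and contents; entity references are shown here as 'entity_type/entity_id' strings, which is an assumption about how EntityReference serializes. Orphans sit alongside entities and links, and from_json restores all three symmetrically.

canned_bundle_json = {
    'entities': {
        'anvil_file/0001': {'file_name': 'sample.vcf.gz'},       # linked, contributes to the index
    },
    'orphans': {
        'anvil_donor/0002': {'organism_type': 'Homo sapiens'},   # replica-only, not linked in this subgraph
    },
    'links': [],
}
# AnvilBundle.from_json(fqid, canned_bundle_json) would rebuild .entities,
# .orphans and .links from these three keys; to_json() writes the same shape back.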
43 changes: 32 additions & 11 deletions src/azul/plugins/metadata/anvil/indexer/transform.py
@@ -169,6 +169,9 @@ def aggregator(cls, entity_type) -> EntityAggregator:
assert False, entity_type

def estimate(self, partition: BundlePartition) -> int:
# Orphans are not considered when deciding whether to partition the
# bundle, but if the bundle is partitioned then each orphan will be
# replicated in a single partition
return sum(map(partial(self._contains, partition), self.bundle.entities))

def transform(self,
@@ -188,7 +191,8 @@ def _transform(self,
raise NotImplementedError

def _replicate(self, entity: EntityReference) -> tuple[str, JSON]:
return entity.entity_type, self.bundle.entities[entity]
content = ChainMap(self.bundle.entities, self.bundle.orphans)[entity]
return entity.entity_type, content
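ChainMap gives _replicate the fall-through lookup it needs: an entity is first resolved from bundle.entities and, failing that, from bundle.orphans, without copying either dict. A standard-library sketch with hypothetical keys:

from collections import ChainMap

entities = {'anvil_file/0001': {'label': 'linked file'}}
orphans = {'anvil_donor/0002': {'label': 'orphaned donor'}}
combined = ChainMap(entities, orphans)

assert combined['anvil_file/0001']['label'] == 'linked file'      # found in the first mapping
assert combined['anvil_donor/0002']['label'] == 'orphaned donor'  # falls through to orphans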

def _convert_entity_type(self, entity_type: str) -> str:
assert entity_type == 'bundle' or entity_type.startswith('anvil_'), entity_type
@@ -406,7 +410,10 @@ def _file(self, file: EntityReference) -> MutableJSON:
uuid=file.entity_id)

def _only_dataset(self) -> EntityReference:
return one(self._entities_by_type['anvil_dataset'])
try:
return one(self._entities_by_type['anvil_dataset'])
except ValueError:
return one(o for o in self.bundle.orphans if o.entity_type == 'anvil_dataset')

@cached_property
def _activity_polymorphic_types(self) -> AbstractSet[str]:
@@ -506,7 +513,9 @@ def _dataset(self, dataset: EntityReference) -> MutableJSON:
return super()._dataset(dataset)

def _list_entities(self) -> Iterable[EntityReference]:
yield self._singleton()
# Suppress contributions for bundles that only contain orphans
if self.bundle.entities:
yield self._singleton()

@abstractmethod
def _singleton(self) -> EntityReference:
@@ -564,6 +573,23 @@ def _singleton(self) -> EntityReference:
return EntityReference(entity_type='bundle',
entity_id=self.bundle.uuid)

def transform(self,
partition: BundlePartition
) -> Iterable[Contribution | Replica]:
yield from super().transform(partition)
if config.enable_replicas:
# Replicas are only emitted by the file transformer for entities
# that are linked to at least one file. This excludes all orphans,
# and a small number of linked entities, usually from primary
# bundles that don't include any files. Some of the replicas we emit here
# will be redundant with those emitted by the file transformer, but
# these will be coalesced by the index service before they are
# written to ElasticSearch.
dataset = self._only_dataset()
for entity in chain(self.bundle.orphans, self.bundle.entities):
if partition.contains(UUID(entity.entity_id)):
yield self._replica(entity, file_hub=None, root_hub=dataset.entity_id)


class DatasetTransformer(SingletonTransformer):

@@ -574,13 +600,6 @@ def entity_type(cls) -> str:
def _singleton(self) -> EntityReference:
return self._only_dataset()

def _transform(self,
entity: EntityReference
) -> Iterable[Contribution | Replica]:
yield from super()._transform(entity)
if self._is_duos(entity):
yield self._replica(entity, file_hub=None)


class DonorTransformer(BaseTransformer):

@@ -614,6 +633,7 @@ def _transform(self,
entity: EntityReference
) -> Iterable[Contribution | Replica]:
linked = self._linked_entities(entity)
dataset = self._only_dataset()
contents = dict(
activities=self._entities(self._activity, chain.from_iterable(
linked[activity_type]
@@ -627,7 +647,7 @@
)
yield self._contribution(contents, entity.entity_id)
if config.enable_replicas:
yield self._replica(entity, file_hub=entity.entity_id)
yield self._replica(entity, file_hub=entity.entity_id, root_hub=dataset.entity_id)
for linked_entity in linked:
yield self._replica(
linked_entity,
@@ -637,4 +657,5 @@
# hub IDs field empty for datasets and rely on the tenet
# that every file is an implicit hub of its parent dataset.
file_hub=None if linked_entity.entity_type == 'anvil_dataset' else entity.entity_id,
root_hub=dataset.entity_id
)
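Taken together with the alist behavior sketched earlier, the FileTransformer would now attach hubs roughly as follows (hypothetical IDs, derived from the calls above rather than taken from the source):

file_id, donor_id, dataset_id = 'file-0001', 'donor-0001', 'dataset-0001'

expected_hub_ids = {
    'anvil_file/file-0001': [file_id, dataset_id],      # file_hub=file_id, root_hub=dataset_id
    'anvil_donor/donor-0001': [file_id, dataset_id],    # linked entity, hubbed on the file it links to
    'anvil_dataset/dataset-0001': [dataset_id],         # file_hub=None; files stay implicit hubs of the dataset
}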
8 changes: 5 additions & 3 deletions src/azul/plugins/metadata/hca/indexer/transform.py
@@ -1470,17 +1470,19 @@ def _transform(self,
for entity_type, values in additional_contents.items():
contents[entity_type].extend(values)
file_id = file.ref.entity_id
project_ref = self._api_project.ref
project_id = project_ref.entity_id
yield self._contribution(contents, file_id)
if config.enable_replicas:
yield self._replica(self.api_bundle.ref, file_hub=file_id)
yield self._replica(self.api_bundle.ref, file_hub=file_id, root_hub=project_id)
# Projects are linked to every file in their snapshot,
# making an explicit list of hub IDs for the project both
# redundant and impractically large. Therefore, we leave the
# hub IDs field empty for projects and rely on the tenet
# that every file is an implicit hub of its parent project.
yield self._replica(self._api_project.ref, file_hub=None)
yield self._replica(project_ref, file_hub=None, root_hub=project_id)
for linked_entity in visitor.entities:
yield self._replica(linked_entity, file_hub=file_id)
yield self._replica(linked_entity, file_hub=file_id, root_hub=project_id)
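The HCA transformer mirrors the AnVIL change, with the project as the root hub. Again a sketch with hypothetical IDs: the bundle and every linked entity are hubbed on the file plus the project, while the project replica keeps only the root hub because every file is an implicit hub of its parent project.

file_id, project_id = 'file-0001', 'project-0001'

expected_hub_ids = {
    'bundle': [file_id, project_id],          # self._replica(self.api_bundle.ref, file_hub=file_id, ...)
    'project': [project_id],                  # file_hub=None, root_hub=project_id
    'linked_entity': [file_id, project_id],   # e.g. a cell suspension visited for this file
}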

def matrix_stratification_values(self, file: api.File) -> JSON:
"""
32 changes: 16 additions & 16 deletions src/azul/plugins/repository/canned/__init__.py
@@ -26,6 +26,9 @@
from furl import (
furl,
)
from more_itertools import (
ilen,
)

from azul import (
CatalogName,
@@ -61,9 +64,6 @@
from azul.types import (
JSON,
)
from azul.uuids import (
validate_uuid_prefix,
)
from humancellatlas.data.metadata.helpers.staging_area import (
CannedStagingAreaFactory,
StagingArea,
@@ -163,27 +163,27 @@ def staging_area(self, url: str) -> StagingArea:
ref)
return factory.load_staging_area(path)

def _count_subgraphs(self, source: SOURCE_SPEC) -> int:
def count_bundles(self, source: SOURCE_SPEC) -> int:
staging_area = self.staging_area(source.spec.name)
return len(staging_area.links)
return ilen(
links_id
for links_id in staging_area.links
if source.prefix is None or links_id.startswith(source.prefix.common)
)
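more_itertools.ilen just counts the items of an iterable without materializing a list, which is all count_bundles needs here. A small sketch with hypothetical links IDs and a one-character common prefix:

from more_itertools import ilen

links_ids = ['aa04', 'ab9f', '42c1', 'a0ee']        # hypothetical subgraph ("links") IDs
common = 'a'                                         # the source's common prefix; may be None

count = ilen(links_id
             for links_id in links_ids
             if common is None or links_id.startswith(common))
assert count == 3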

def list_bundles(self,
source: CannedSourceRef,
prefix: str
) -> list[CannedBundleFQID]:
self._assert_source(source)
validate_uuid_prefix(prefix)
log.info('Listing bundles with prefix %r in source %r.', prefix, source)
bundle_fqids = []
staging_area = self.staging_area(source.spec.name)
for link in staging_area.links.values():
if link.uuid.startswith(prefix):
bundle_fqids.append(CannedBundleFQID(source=source,
uuid=link.uuid,
version=link.version))
log.info('There are %i bundle(s) with prefix %r in source %r.',
len(bundle_fqids), prefix, source)
return bundle_fqids
return [
CannedBundleFQID(source=source,
uuid=link.uuid,
version=link.version)
for link in staging_area.links.values()
if link.uuid.startswith(prefix)
]

def fetch_bundle(self, bundle_fqid: CannedBundleFQID) -> CannedBundle:
self._assert_source(bundle_fqid.source)
2 changes: 1 addition & 1 deletion src/azul/plugins/repository/dss/__init__.py
@@ -118,7 +118,7 @@ def sources(self) -> AbstractSet[SimpleSourceSpec]:
def _lookup_source_id(self, spec: SimpleSourceSpec) -> str:
return DSSSourceRef.id_from_spec(spec)

def _count_subgraphs(self, source: SimpleSourceSpec) -> NoReturn:
def count_bundles(self, source: SimpleSourceSpec) -> NoReturn:
assert False, 'DSS is EOL'

def list_sources(self,
18 changes: 0 additions & 18 deletions src/azul/plugins/repository/tdr.py
@@ -191,17 +191,6 @@ def _drs_client(cls,
def _lookup_source_id(self, spec: TDRSourceSpec) -> str:
return self.tdr.lookup_source(spec)

def list_bundles(self,
source: TDRSourceRef,
prefix: str
) -> list[TDRBundleFQID]:
self._assert_source(source)
log.info('Listing bundles with prefix %r in source %r.', prefix, source)
bundle_fqids = self._list_bundles(source, prefix)
log.info('There are %i bundle(s) with prefix %r in source %r.',
len(bundle_fqids), prefix, source)
return bundle_fqids

def fetch_bundle(self, bundle_fqid: TDRBundleFQID) -> TDR_BUNDLE:
self._assert_source(bundle_fqid.source)
now = time.time()
@@ -223,13 +212,6 @@ def _run_sql(self, query):
def _full_table_name(self, source: TDRSourceSpec, table_name: str) -> str:
return source.qualify_table(table_name)

@abstractmethod
def _list_bundles(self,
source: TDRSourceRef,
prefix: str
) -> list[TDRBundleFQID]:
raise NotImplementedError

@abstractmethod
def _emulate_bundle(self, bundle_fqid: TDRBundleFQID) -> TDR_BUNDLE:
raise NotImplementedError
