Skip to content

Commit

Permalink
In global discovery, now removing services that are served-by some other
Browse files Browse the repository at this point in the history
service.

Also, improving discovery logging a bit

Also, adding an admonition to not use utils.testing outside of pyVO.
(which attemtps to address
astropy#470 (review))

Also, removing stale test inputs for global discovery.
  • Loading branch information
msdemlei committed Feb 5, 2024
1 parent bf6ca82 commit e7a1e53
Show file tree
Hide file tree
Showing 20 changed files with 127 additions and 198 deletions.
4 changes: 4 additions & 0 deletions docs/utils/testing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ Helpers for Testing (`pyvo.utils.testing`)
******************************************

This package contains a few helpers to make testing pyvo code simpler.
This is *not* intended to be used by user code or other libraries at
this point; the API might change at any time depending on the testing
needs of pyVO itself, and this documentation (mainly) addresses pyVO
developers.


The LearnableRequestMocker
Expand Down
2 changes: 1 addition & 1 deletion pyvo/discover/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
Various functions dealing with global data disovery.
"""

from .image import images_globally
from .image import images_globally, ImageDiscoverer
115 changes: 88 additions & 27 deletions pyvo/discover/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@
import functools

from astropy import units as u
from astropy import table
from astropy import time
from astropy.coordinates import SkyCoord

from ..dam import obscore
from .. import dal
from .. import registry
from ..registry import regtap


# imports for type hints
from typing import List, Optional, Set, Tuple
from typing import Callable, List, Optional, Set, Tuple
from astropy.units.quantity import Quantity


Expand All @@ -38,6 +40,7 @@ class Queriable:
def __init__(self, res_rec):
self.res_rec = res_rec
self.ivoid = res_rec.ivoid
self.title = self.res_rec.res_title

def __str__(self):
return f"<{self.ivoid}>"
Expand Down Expand Up @@ -115,7 +118,7 @@ def _clean_for(records: List[Queriable], ivoids_to_remove: Set[str]):
return [r for r in records if r.ivoid not in ivoids_to_remove]


class _ImageDiscoverer:
class ImageDiscoverer:
"""A management class for VO global image discovery.
This encapsulates all of constraints, service lists, results, and
Expand All @@ -125,7 +128,7 @@ class _ImageDiscoverer:
The normal usage is do call discover_services(), which will locate
all VO services that may have relevant data. Alternatively, call
set_services(registry_results) with some result of a registry.search()
call. _ImageDiscoverer will then pick capabilities it can use out
call. ImageDiscoverer will then pick capabilities it can use out
of the resource records. Records without usable capabilities are
silently ignored.
Expand All @@ -145,7 +148,9 @@ class _ImageDiscoverer:
radius = None

def __init__(self,
space=None, spectrum=None, time=None, inclusive=False):
space=None, spectrum=None, time=None,
inclusive=False,
watcher=None):
if space:
self.center = (space[0], space[1])
self.radius = space[2]
Expand All @@ -158,9 +163,24 @@ def __init__(self,

self.inclusive = inclusive
self.results: List[obscore.ObsCoreMetadata] = []
self.log: List[str] = []
self.watcher = watcher
self.log_messages: List[str] = []
self.sia1_recs, self.sia2_recs, self.obscore_recs = [], [], []

def _info(self, message: str) -> None:
"""sends message to our watcher (if there is any)
"""
if self.watcher is not None:
self.watcher(message)

def _log(self, message: str) -> None:
"""logs message.
This will also do whatever _info does.
"""
self.log_messages.append(message)
self._info(message)

def _purge_redundant_services(self):
"""removes services querying data already covered by more capable
services from our current services lists.
Expand All @@ -169,12 +189,38 @@ def ids(recs):
return set(r.ivoid for r in recs)

self.sia1_recs = _clean_for(self.sia1_recs,
ids(self.sia2_recs)|ids(self.obscore_recs))
ids(self.sia2_recs) | ids(self.obscore_recs))
self.sia2_recs = _clean_for(self.sia2_recs, ids(self.obscore_recs))

# TODO: use futher heuristics to further cut down on dupes:
# Use relationships. I think we should tell people to use
# IsServiceFor for (say) SIA2 services built on top of TAP services.
# In addition, now throw out all services that have an
# IsServedBy relationship to another service we will also
# query. That's particularly valuable if there are large
# obscore services covering data from many SIA1 services.

ids_present = table.Table([
table.Column(name="id",
data=list(ids(self.sia1_recs)
| ids(self.sia2_recs)
| ids(self.obscore_recs)),
description="ivoids of candiate services",
meta={"ucd": "meta.ref.ivoid"}),])
services_for = regtap.get_RegTAP_service().run_sync(
"""SELECT ivoid, related_id
FROM rr.relationship
JOIN tap_upload.ids AS leftids ON (ivoid=leftids.id)
JOIN tap_upload.ids AS rightids ON (related_id=rightids.id)
WHERE relationship_type='isservedby'
""", uploads={'ids': ids_present})

for rec in services_for:
self._log(f"Skipping {rec['ivoid']} because"
f" it is served by {rec['related_id']}")

collections_to_remove = set(r["ivoid"] for r in services_for)
self.sia1_recs = _clean_for(self.sia1_recs, collections_to_remove)
self.sia2_recs = _clean_for(self.sia2_recs, collections_to_remove)
self.obscore_recs = _clean_for(self.obscore_recs, collections_to_remove)


def discover_services(self):
"""fills the X_recs attributes with resources declaring coverage
Expand Down Expand Up @@ -228,6 +274,8 @@ def set_services(self,
self.sia1_recs.append(Queriable(rsc))
# else ignore this record

self._purge_redundant_services()

def _query_one_sia1(self, rec: Queriable):
"""runs our query against a SIA1 capability of rec.
Expand All @@ -252,12 +300,17 @@ def non_spatial_filter(sia1_rec):
return False
return True

self._info("Querying SIA1 {}...".format(rec.title))
svc = rec.res_rec.get_service("sia")
self.results.extend(
ImageFound.from_sia1_recs(
found = list(ImageFound.from_sia1_recs(
svc.search(
pos=self.center, size=self.radius, intersect='overlaps'),
non_spatial_filter))
self._log("SIA1 {} {} records".format(
rec.title,
len(found)))

self.results.extend(found)

def _query_sia1(self):
"""runs the SIA1 part of our discovery.
Expand All @@ -266,20 +319,21 @@ def _query_sia1(self):
limitations of SIA1.
"""
if self.center is None:
self.log.append("SIA1 service skipped do to missing space"
self._log("SIA1 service skipped do to missing space"
" constraint")
return

for rec in self.sia1_recs:
try:
self._query_one_sia1(rec)
except Exception as msg:
self.log.append(f"SIA1 {rec.ivoid} skipped: {msg}")
raise
self._log(f"SIA1 {rec.ivoid} skipped: {msg}")

def _query_one_sia2(self, rec: Queriable):
"""runs our query against a SIA2 capability of rec.
"""
self._info("Querying SIA2 {}...".format(rec.title))

svc = rec.res_rec.get_service("sia2")
constraints = {}
if self.center is not None:
Expand All @@ -291,11 +345,9 @@ def _query_one_sia2(self, rec: Queriable):

matches = list(
ImageFound.from_obscore_recs(svc.search(**constraints)))
if len(matches):
self.log.append(f"SIA2 service {rec}: {len(matches)} recs")
else:
self.log.append(f"SIA2 service {rec}: no matches")

self._log("SIA2 {}: {} records".format(
rec.title,
len(matches)))
self.results.extend(matches)

def _query_sia2(self):
Expand All @@ -305,16 +357,20 @@ def _query_sia2(self):
try:
self._query_one_sia2(rec)
except Exception as msg:
self.log.append(f"SIA2 {rec.ivoid} skipped: {msg}")
raise
self._log(f"SIA2 {rec.ivoid} skipped: {msg}")

def _query_one_obscore(self, rec: Queriable, where_clause:str):
"""runs our query against a Obscore capability of rec.
"""
self._info("Querying Obscore {}...".format(rec.title))
svc = rec.res_rec.get_service("tap")
recs = svc.query("select * from ivoa.obscore"+where_clause)
self.results.extend(
ImageFound.from_obscore_recs(recs))
matches = ImageFound.from_obscore_recs(recs)

self._log("Obscore {}: {} records".format(
rec.title,
len(matches)))
self.results.extend(matches)

def _query_obscore(self):
"""runs the Obscore part of our discovery.
Expand All @@ -338,7 +394,8 @@ def _query_obscore(self):
try:
self._query_one_obscore(rec, where_clause)
except Exception as msg:
self.log.append(f"Obscore {rec['ivoid']} skipped: {msg}")
self._log("Obscore {} skipped: {}".format(
rec.res_rec['ivoid'], msg))

def query_services(self):
"""queries the discovered image services according to our
Expand All @@ -361,7 +418,8 @@ def images_globally(
space: Optional[Tuple[float, float, float]]=None,
spectrum: Optional[Quantity]=None,
time: Optional[float]=None,
inclusive: bool=False
inclusive: bool=False,
watcher: Optional[Callable[[str], None]]=None
) -> Tuple[List[obscore.ObsCoreMetadata], List[str]]:
"""returns a collection of ObsCoreMetadata-s matching certain constraints
and a list of log lines.
Expand All @@ -383,14 +441,17 @@ def images_globally(
Set to True to incluse services that do not declare their
STC coverage. By 2023, it's a good idea to do that as many
relevant archives do not do that.
watcher :
A callable that will be called with strings perhaps suitable
for displaying to a human.
When an image has insufficient metadata to evaluate a constraint, it
is excluded; this mimics the behaviour of SQL engines that consider
comparisons with NULL-s false.
"""
discoverer = _ImageDiscoverer(space, spectrum, time, inclusive)
discoverer = ImageDiscoverer(space, spectrum, time, inclusive, watcher)
discoverer.discover_services()
discoverer.query_services()
# TODO: We should un-dupe by image access URL
# TODO: We could compute SODA cutout URLs here in addition.
return discoverer.results, discoverer.log
return discoverer.results, discoverer.log_messages

This file was deleted.

Binary file not shown.

This file was deleted.

Binary file not shown.

This file was deleted.

Binary file not shown.
Loading

0 comments on commit e7a1e53

Please sign in to comment.