highly variable gene annotation (#511)
* initial implementation of highly_variable_genes

* add test marks

* add prebuffered iterator

* lint

* lint

* docstrings

* reduce expensive tests

* fix typo

* actually fix typo

* add test for get_highly_variable_genes

* lint

* reduce memory use in tests

* add example to docstring

* fix anon access in small memory context

* PR feedback

* loess jitter

* increase max loess noise to 1e-6 (see the sketch after this list)

* add tests
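
For orientation, the sketch below illustrates what the loess jitter bullets most likely refer to; it is an assumption, not the commit's actual implementation. It presumes the per-gene variance trend is fitted with skmisc.loess (the scikit-misc dependency added in pyproject.toml below) and that a small amount of uniform noise, capped at 1e-6, is added to break exact ties that can make the fit fail. The helper name fit_loess_with_jitter is illustrative only.

import numpy as np
from skmisc.loess import loess  # provided by the scikit-misc pin added below


def fit_loess_with_jitter(x: np.ndarray, y: np.ndarray, span: float = 0.3, max_noise: float = 1e-6) -> np.ndarray:
    # Illustrative only: tiny uniform jitter on x breaks exact ties, which can
    # otherwise cause the loess fit to error out; the 1e-6 ceiling mirrors the
    # value mentioned in the squash log above.
    rng = np.random.default_rng(0)
    x_jittered = x + rng.uniform(0.0, max_noise, size=x.shape)
    model = loess(x_jittered, y, span=span, degree=2)
    model.fit()
    return model.outputs.fitted_values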
Bruce Martin authored Jun 7, 2023
1 parent 58cb475 commit 379a8ca
Showing 8 changed files with 1,005 additions and 6 deletions.
1 change: 1 addition & 0 deletions api/python/cellxgene_census/pyproject.toml
@@ -47,6 +47,7 @@ experimental = [
"torch==2.0.1",
"torchdata==0.6.1",
"scikit-learn==1.2.2",
"scikit-misc==0.2.0",
]

[project.urls]
@@ -10,13 +10,8 @@

import anndata
import tiledbsoma as soma

# TODO: waiting on https://github.com/single-cell-data/TileDB-SOMA/issues/872.
from somacore.options import SparseDFCoord

# TODO: rm this import and use `soma.AxisColumnNames` after https://github.com/single-cell-data/TileDB-SOMA/issues/791
from somacore.query.query import AxisColumnNames

from ._experiment import _get_experiment


@@ -29,7 +24,7 @@ def get_anndata(
obs_coords: Optional[SparseDFCoord] = None,
var_value_filter: Optional[str] = None,
var_coords: Optional[SparseDFCoord] = None,
column_names: Optional[AxisColumnNames] = None,
column_names: Optional[soma.AxisColumnNames] = None,
) -> anndata.AnnData:
"""
    Convenience wrapper around ``soma.Experiment`` query, to build and execute a query,
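
For context, here is a rough usage sketch of get_anndata with the updated column_names type. Only part of the signature is visible in the hunk above, so the census, organism, and obs_value_filter arguments, and the "obs"/"var" keys of soma.AxisColumnNames, are assumptions:

import cellxgene_census

with cellxgene_census.open_soma() as census:
    adata = cellxgene_census.get_anndata(
        census,
        organism="Homo sapiens",  # assumed; not visible in the hunk above
        obs_value_filter='tissue_general == "lung" and is_primary_data == True',
        column_names={"obs": ["cell_type", "tissue_general"], "var": ["feature_name"]},
    )
print(adata)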
@@ -0,0 +1,10 @@
"""
API to facilitate preprocessing of SOMA datasets.
"""

from ._highly_variable_genes import get_highly_variable_genes, highly_variable_genes

__all__ = [
"get_highly_variable_genes",
"highly_variable_genes",
]
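
For orientation, a minimal usage sketch of the new module. The import path and parameter names (organism, n_top_genes, obs_value_filter) are assumptions here, since _highly_variable_genes.py itself is not shown in this excerpt:

import cellxgene_census
from cellxgene_census.experimental.pp import get_highly_variable_genes

with cellxgene_census.open_soma() as census:
    # Parameter names below are assumed, mirroring get_anndata-style filters.
    hvg_df = get_highly_variable_genes(
        census,
        organism="mus_musculus",
        n_top_genes=500,
        obs_value_filter='is_primary_data == True and tissue_general == "lung"',
    )
print(hvg_df.head())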
@@ -0,0 +1,92 @@
import threading
from collections import deque
from concurrent import futures
from typing import Deque, Iterator, Optional, TypeVar

_T = TypeVar("_T")


class EagerIterator(Iterator[_T]):
    """Wrap an iterator and eagerly compute its next element on a background thread."""
def __init__(
self,
iterator: Iterator[_T],
pool: Optional[futures.Executor] = None,
):
super().__init__()
self.iterator = iterator
self._pool = pool or futures.ThreadPoolExecutor()
self._own_pool = pool is None
self._future = self._pool.submit(self.iterator.__next__)

def __next__(self) -> _T:
try:
res = self._future.result()
self._future = self._pool.submit(self.iterator.__next__)
return res
except StopIteration:
self._cleanup()
raise

def _cleanup(self) -> None:
if self._own_pool:
self._pool.shutdown()

def __del__(self) -> None:
# Ensure the threadpool is cleaned up in the case where the
# iterator is not exhausted. For more information on __del__:
# https://docs.python.org/3/reference/datamodel.html#object.__del__
self._cleanup()
super_del = getattr(super(), "__del__", lambda: None)
super_del()


class EagerBufferedIterator(Iterator[_T]):
    """Like EagerIterator, but keeps up to ``max_pending`` results buffered ahead of the consumer."""
def __init__(
self,
iterator: Iterator[_T],
max_pending: int = 1,
pool: Optional[futures.Executor] = None,
):
super().__init__()
self.iterator = iterator
self.max_pending = max_pending
self._pool = pool or futures.ThreadPoolExecutor()
self._own_pool = pool is None
self._pending_results: Deque[futures.Future[_T]] = deque()
self._lock = threading.Lock()
self._begin_next()

def __next__(self) -> _T:
try:
res = self._pending_results[0].result()
self._pending_results.popleft()
self._begin_next()
return res
except StopIteration:
self._cleanup()
raise

    def _begin_next(self) -> None:
        def _fut_done(fut: futures.Future[_T]) -> None:
            if fut.exception() is None:
                self._begin_next()

        _future: Optional[futures.Future[_T]] = None
        with self._lock:
            not_running = len(self._pending_results) == 0 or self._pending_results[-1].done()
            if len(self._pending_results) < self.max_pending and not_running:
                _future = self._pool.submit(self.iterator.__next__)
                self._pending_results.append(_future)
            assert len(self._pending_results) <= self.max_pending
        if _future is not None:
            # Register the callback outside the lock: if the future has already
            # completed, add_done_callback invokes _fut_done immediately in this
            # thread, which re-enters _begin_next and would deadlock on the
            # non-reentrant lock if it were still held.
            _future.add_done_callback(_fut_done)

def _cleanup(self) -> None:
if self._own_pool:
self._pool.shutdown()

def __del__(self) -> None:
# Ensure the threadpool is cleaned up in the case where the
# iterator is not exhausted. For more information on __del__:
# https://docs.python.org/3/reference/datamodel.html#object.__del__
self._cleanup()
super_del = getattr(super(), "__del__", lambda: None)
super_del()
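
A small usage sketch of the prebuffered iterator above. slow_chunks is a hypothetical stand-in for an I/O-bound producer such as a SOMA table read iterator, and the class is assumed to be importable from wherever this module lives in the package:

import time
from concurrent.futures import ThreadPoolExecutor


def slow_chunks(n: int = 4):
    # Hypothetical I/O-bound producer standing in for a SOMA read iterator.
    for i in range(n):
        time.sleep(0.1)  # simulated read latency
        yield i


# Keep up to two chunks in flight so reads overlap with downstream work.
with ThreadPoolExecutor(max_workers=2) as pool:
    for chunk in EagerBufferedIterator(slow_chunks(), max_pending=2, pool=pool):
        time.sleep(0.1)  # simulated compute on the current chunk
        print(chunk)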