Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

highly variable gene annotation #511

Merged
merged 22 commits into from
Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/python/cellxgene_census/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ experimental = [
"torch==2.0.1",
"torchdata==0.6.1",
"scikit-learn==1.2.2",
"scikit-misc==0.2.0",
]

[project.urls]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,8 @@

import anndata
import tiledbsoma as soma

# TODO: waiting on https://github.com/single-cell-data/TileDB-SOMA/issues/872.
from somacore.options import SparseDFCoord

# TODO: rm this import and use `soma.AxisColumnNames` after https://github.com/single-cell-data/TileDB-SOMA/issues/791
from somacore.query.query import AxisColumnNames

from ._experiment import _get_experiment


Expand All @@ -29,7 +24,7 @@ def get_anndata(
obs_coords: Optional[SparseDFCoord] = None,
var_value_filter: Optional[str] = None,
var_coords: Optional[SparseDFCoord] = None,
column_names: Optional[AxisColumnNames] = None,
column_names: Optional[soma.AxisColumnNames] = None,
) -> anndata.AnnData:
"""
Convenience wrapper around ``soma.Experiment`` query, to build and execute a query,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from ._highly_variable_genes import get_highly_variable_genes, highly_variable_genes

# Names exported by a star-import of this package; both are imported from
# ``._highly_variable_genes`` above.
__all__ = [
"get_highly_variable_genes",
"highly_variable_genes",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import threading
from collections import deque
from concurrent import futures
from typing import Deque, Iterator, Optional, TypeVar

_T = TypeVar("_T")


class EagerIterator(Iterator[_T]):
    """Wrap an iterator so its next item is fetched on an executor ahead of demand.

    A call to ``iterator.__next__`` is submitted to the executor as soon as the
    wrapper is built, and again each time an item is handed to the consumer, so
    the producer can work while the consumer processes the previous item.

    Args:
        iterator: The underlying iterator to read from.
        pool: Executor used to run ``iterator.__next__``. When ``None``, a
            private ``ThreadPoolExecutor`` is created; it is shut down when the
            underlying iterator is exhausted or when this object is finalized.
            A caller-supplied executor is never shut down by this class.
    """

    def __init__(
        self,
        iterator: Iterator[_T],
        pool: Optional[futures.Executor] = None,
    ):
        super().__init__()
        self.iterator = iterator
        # Decide ownership and pool selection with the same ``is None`` test so
        # the two can never disagree.
        self._own_pool = pool is None
        self._pool = futures.ThreadPoolExecutor() if pool is None else pool
        # Start fetching the first item immediately.
        self._future = self._pool.submit(self.iterator.__next__)

    def __next__(self) -> _T:
        """Return the prefetched item and schedule the fetch of the following one.

        Raises:
            StopIteration: When the underlying iterator is exhausted. An owned
                pool is shut down before the exception propagates.
        """
        try:
            item = self._future.result()
        except StopIteration:
            self._cleanup()
            raise
        self._future = self._pool.submit(self.iterator.__next__)
        return item

    def _cleanup(self) -> None:
        """Shut down the executor if, and only if, this object created it."""
        # ``getattr`` guards keep this safe when ``__init__`` failed before the
        # attributes were assigned and ``__del__`` still runs.
        pool = getattr(self, "_pool", None)
        if pool is not None and getattr(self, "_own_pool", False):
            pool.shutdown()

    def __del__(self) -> None:
        # Ensure the threadpool is cleaned up in the case where the
        # iterator is not exhausted. For more information on __del__:
        # https://docs.python.org/3/reference/datamodel.html#object.__del__
        self._cleanup()
        super_del = getattr(super(), "__del__", lambda: None)
        super_del()


class EagerBufferedIterator(Iterator[_T]):
    """Wrap an iterator and prefetch up to ``max_pending`` items on an executor.

    Calls to ``iterator.__next__`` are submitted one at a time: a new call is
    scheduled only when the most recently scheduled one has finished, so the
    underlying iterator is never advanced concurrently. Results are handed out
    in the order they were produced.

    NOTE: per the review thread on this change, nothing uses this class yet; it
    is kept as a possible means of further performance enhancement.

    Args:
        iterator: The underlying iterator to read from.
        max_pending: Maximum number of fetched-or-in-flight items buffered at
            any time. Must be at least 1.
        pool: Executor used to run ``iterator.__next__``. When ``None``, a
            private ``ThreadPoolExecutor`` is created; it is shut down when the
            underlying iterator is exhausted or when this object is finalized.
            A caller-supplied executor is never shut down by this class.

    Raises:
        ValueError: If ``max_pending`` is less than 1 (no fetch could ever be
            scheduled, so iteration could not make progress).
    """

    def __init__(
        self,
        iterator: Iterator[_T],
        max_pending: int = 1,
        pool: Optional[futures.Executor] = None,
    ):
        super().__init__()
        if max_pending < 1:
            raise ValueError(f"max_pending must be >= 1, got {max_pending}")
        self.iterator = iterator
        self.max_pending = max_pending
        # Decide ownership and pool selection with the same ``is None`` test so
        # the two can never disagree.
        self._own_pool = pool is None
        self._pool = futures.ThreadPoolExecutor() if pool is None else pool
        self._pending_results: Deque["futures.Future[_T]"] = deque()
        # Guards every structural change to ``_pending_results``.
        self._lock = threading.Lock()
        self._begin_next()

    def __next__(self) -> _T:
        """Return the oldest buffered item and top the buffer back up.

        Raises:
            StopIteration: When the underlying iterator is exhausted. An owned
                pool is shut down before the exception propagates.
        """
        try:
            # Only the consumer removes entries, so index 0 is stable while we
            # wait. The lock is deliberately NOT held across ``result()``.
            item = self._pending_results[0].result()
        except StopIteration:
            self._cleanup()
            raise
        # Worker callbacks inspect the deque under the lock; pop under it too so
        # their length check and ``[-1]`` lookup see a consistent deque.
        with self._lock:
            self._pending_results.popleft()
        self._begin_next()
        return item

    def _on_future_done(self, fut: "futures.Future[_T]") -> None:
        """Done-callback: keep prefetching after a fetch completes successfully."""
        # A cancelled future would make ``exception()`` raise; a failed future
        # (including StopIteration) must not trigger further fetches.
        if not fut.cancelled() and fut.exception() is None:
            self._begin_next()

    def _begin_next(self) -> None:
        """Schedule fetches while there is buffer room and no fetch is running."""
        while True:
            with self._lock:
                pending = self._pending_results
                producer_idle = not pending or pending[-1].done()
                if len(pending) >= self.max_pending or not producer_idle:
                    return
                fut = self._pool.submit(self.iterator.__next__)
                # Append before anything can react to completion, so results
                # stay in production order.
                pending.append(fut)
                assert len(pending) <= self.max_pending
            # The lock is released here on purpose: ``add_done_callback`` runs
            # the callback synchronously if the future has already finished,
            # and the callback re-enters this method. Doing that while holding
            # the non-reentrant lock would deadlock.
            if not fut.done():
                fut.add_done_callback(self._on_future_done)
                return
            if fut.cancelled() or fut.exception() is not None:
                return
            # Already finished successfully: loop to schedule the next fetch.

    def _cleanup(self) -> None:
        """Shut down the executor if, and only if, this object created it."""
        # ``getattr`` guards keep this safe when ``__init__`` failed before the
        # attributes were assigned and ``__del__`` still runs.
        pool = getattr(self, "_pool", None)
        if pool is not None and getattr(self, "_own_pool", False):
            pool.shutdown()

    def __del__(self) -> None:
        # Ensure the threadpool is cleaned up in the case where the
        # iterator is not exhausted. For more information on __del__:
        # https://docs.python.org/3/reference/datamodel.html#object.__del__
        self._cleanup()
        super_del = getattr(super(), "__del__", lambda: None)
        super_del()
Loading