Skip to content

Commit

Permalink
work-around and diagnostics for Ctx excess allocation bug (#292)
Browse files Browse the repository at this point in the history
* work-around and diagnostics for Ctx excess allocation bug

* add additional logging
  • Loading branch information
Bruce Martin authored Mar 27, 2023
1 parent 7805203 commit 79c6240
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import concurrent.futures
import dataclasses
import gc
import logging
import os.path
import pathlib
Expand All @@ -16,7 +17,7 @@
from typing_extensions import Self

from ..build_state import CensusBuildArgs
from ..util import urlcat
from ..util import log_process_resource_status, urlcat
from .anndata import make_anndata_cell_filter, open_anndata
from .consolidate import list_uris_to_consolidate
from .datasets import Dataset
Expand Down Expand Up @@ -136,7 +137,9 @@ def validate_all_soma_objects_exist(soma_path: str, experiment_specifications: L
assert sum([c.non_zero_length for c in rna["feature_dataset_presence_matrix"].read().coos()]) > 0
# TODO(atolopko): validate 1) shape, 2) joinids exist in datasets and var

return True
gc.collect()
log_process_resource_status()
return True


def _validate_axis_dataframes(args: Tuple[str, str, Dataset, List[ExperimentSpecification]]) -> Dict[str, EbInfo]:
Expand Down Expand Up @@ -172,7 +175,9 @@ def _validate_axis_dataframes(args: Tuple[str, str, Dataset, List[ExperimentSpec
ad_obs = ad.obs[list(CXG_OBS_TERM_COLUMNS)].reset_index(drop=True)
assert (dataset_obs == ad_obs).all().all(), f"{dataset.dataset_id}/{eb.name} obs content, mismatch"

return eb_info
gc.collect()
log_process_resource_status()
return eb_info


def validate_axis_dataframes(
Expand Down Expand Up @@ -361,6 +366,8 @@ def _validate_X_layers_contents_by_dataset(args: Tuple[str, str, Dataset, List[E
f"{eb.name}:{dataset.dataset_id} unexpected False " "stored in presence matrix"
)

gc.collect()
log_process_resource_status()
return True


Expand Down Expand Up @@ -390,7 +397,9 @@ def _validate_X_layer_has_unique_coords(args: Tuple[ExperimentSpecification, str
unique_offsets = np.unique(offsets)
assert len(offsets) == len(unique_offsets)

return True
gc.collect()
log_process_resource_status()
return True


def validate_X_layers(
Expand Down
56 changes: 56 additions & 0 deletions tools/cell_census_builder/src/cell_census_builder/util.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import logging
import multiprocessing
import platform
import re
import urllib.parse

from .build_state import CensusBuildArgs
Expand Down Expand Up @@ -44,3 +47,56 @@ def process_init(args: CensusBuildArgs) -> None:
multiprocessing.set_start_method("spawn", True)

logging_init(args)


class ProcessResourceGetter:
    """
    Access to process resource state, primarily for diagnostic/debugging purposes. Currently
    provides current and high water mark for:
    * thread count
    * mmaps
    Linux-only at the moment; on any other platform each counter reads as -1.
    """

    # historical maxima (class-level defaults, shadowed per instance once a value is recorded)
    max_thread_count: int = -1
    max_map_count: int = -1

    @staticmethod
    def _parse_thread_count(status: str) -> int:
        """
        Extract the value of the ``Threads:`` line from the text of ``/proc/self/status``.

        Returns -1 (the same "unavailable" sentinel used on non-Linux platforms) when no
        such line is present, so that a diagnostics helper never raises on odd input.
        """
        # Raw string: "\d" in a normal string literal is an invalid escape sequence.
        # Anchored per line so only a field literally named "Threads" can match.
        match = re.search(r"^Threads:[ \t]+(\d+)[ \t]*$", status, re.MULTILINE)
        return int(match.group(1)) if match else -1

    @property
    def thread_count(self) -> int:
        """Return the thread count for the current process. Retain the historical maximum."""
        if platform.system() != "Linux":
            return -1

        with open("/proc/self/status") as f:
            thread_count = self._parse_thread_count(f.read())
        # A -1 (unparseable) reading can never raise the recorded maximum.
        self.max_thread_count = max(thread_count, self.max_thread_count)
        return thread_count

    @property
    def map_count(self) -> int:
        """Return the memory map count for the current process. Retain the historical maximum."""
        if platform.system() != "Linux":
            return -1

        with open("/proc/self/maps") as f:
            # The count is the number of newline characters in /proc/self/maps.
            map_count = f.read().count("\n")
        self.max_map_count = max(map_count, self.max_map_count)
        return map_count


# Module-level instance used by log_process_resource_status(); the historical maxima
# are stored on this object, so they accumulate across calls.
_resouce_getter = ProcessResourceGetter()


def log_process_resource_status(preface: str = "Resource use:") -> None:
    """
    Log, at DEBUG level, the current and historical-maximum thread and memory-map counts.

    Does nothing unless ``platform.system()`` reports "Linux". ``preface`` is the text
    placed at the start of the logged message.
    """
    if platform.system() != "Linux":
        return

    getter = _resouce_getter
    # Read each current value before its maximum: reading the current-value property
    # is what refreshes the corresponding maximum.
    threads_now = getter.thread_count
    threads_peak = getter.max_thread_count
    maps_now = getter.map_count
    maps_peak = getter.max_map_count

    logging.debug(
        f"{preface} threads: {threads_now} "
        f"[max: {threads_peak}], "
        f"maps: {maps_now} "
        f"[max: {maps_peak}]"
    )

0 comments on commit 79c6240

Please sign in to comment.