From 0850ee12e71ce1cd350507b7a19ac320edc57bd2 Mon Sep 17 00:00:00 2001 From: Nihal Harish Date: Fri, 28 Aug 2020 13:03:30 -0700 Subject: [PATCH] Revert "Add ability to only save shapes of tensors (#328)" (#337) This reverts commit c9eb76984ab42bbfe17e6b72b25c1edb1bfdfbc2. --- docs/api.md | 1 - smdebug/core/hook.py | 78 ++---------- smdebug/core/index_reader.py | 100 +++++++-------- smdebug/core/locations.py | 14 --- smdebug/core/reduction_config.py | 30 +---- smdebug/core/tensor.py | 35 +----- smdebug/core/tfevent/index_file_writer.py | 36 ++---- smdebug/core/writer.py | 114 +++++------------- smdebug/exceptions.py | 15 --- smdebug/tensorflow/base_hook.py | 86 +++++-------- smdebug/trials/local_trial.py | 8 +- smdebug/trials/s3_trial.py | 8 +- smdebug/trials/trial.py | 102 ++++++++-------- tests/mxnet/test_hook_reduce_config.py | 20 --- .../test_hook_save_shape.json | 9 -- tests/pytorch/test_reduce_config.py | 82 ------------- .../tensorflow/hooks/test_estimator_modes.py | 32 +---- tests/tensorflow/hooks/test_reductions.py | 16 --- tests/tensorflow/keras/test_keras.py | 15 --- tests/tensorflow2/test_keras.py | 26 +--- tests/tensorflow2/test_keras_mirrored.py | 15 --- tests/utils.py | 44 ------- 22 files changed, 205 insertions(+), 681 deletions(-) delete mode 100644 tests/pytorch/test_json_configs/test_hook_save_shape.json diff --git a/docs/api.md b/docs/api.md index fe01532df..92ac9ecc0 100644 --- a/docs/api.md +++ b/docs/api.md @@ -96,7 +96,6 @@ include_workers include_regex reductions save_raw_tensor -save_shape save_interval save_steps start_step diff --git a/smdebug/core/hook.py b/smdebug/core/hook.py index 758782620..10577fed6 100644 --- a/smdebug/core/hook.py +++ b/smdebug/core/hook.py @@ -46,7 +46,7 @@ size_and_shape, validate_custom_tensor_value, ) -from smdebug.core.writer import FileWriter, ShapeWriter +from smdebug.core.writer import FileWriter from smdebug.exceptions import InvalidCollectionConfiguration try: @@ -222,7 +222,7 @@ def __init__( self.mode = ModeKeys.GLOBAL self.mode_steps = {ModeKeys.GLOBAL: init_step} self.writer = None - self.shape_writer = None + if is_sagemaker_job() and SageMakerFileMetricsWriter is not None: self.metrics_writer = SageMakerFileMetricsWriter() else: @@ -343,12 +343,6 @@ def _get_collections_to_save_for_step(self) -> Set["Collection"]: ) return self._collections_to_save_for_step - def _saving_shapes_in_step(self) -> bool: - for coll in self._get_collections_to_save_for_step(): - if coll.reduction_config.save_shape is True: - return True - return False - def _get_collections_with_tensor(self, tensor_name) -> Set["Collection"]: self._assert_prep() # for tf this will be prepopulated in check_and_add_tensor @@ -410,17 +404,6 @@ def _prepare_collections(self): self.prepared_collections = True #### End of Save Manager methods #### - @staticmethod - def _close_given_writer_map(writer_dict): - # Delete all the dist training writers - to_delete_writers = [] - for key, writer in writer_dict.items(): - # close calls flush - writer.close() - to_delete_writers.append(key) - - for key in to_delete_writers: - del writer_dict[key] def _close_writers(self) -> None: if self.dry_run: @@ -434,11 +417,16 @@ def _close_writers(self) -> None: self.writer.close() self.writer = None - self._close_given_writer_map(self.tb_writers) + to_delete_writers = [] - if self.shape_writer is not None: - self.shape_writer.close() - self.shape_writer = None + # Delete all the tb writers + for mode, writer in self.tb_writers.items(): + if writer is not None: + writer.flush() + 
writer.close() + to_delete_writers.append(mode) + for mode in to_delete_writers: + del self.tb_writers[mode] def _initialize_writers(self, only_initialize_if_missing=False) -> None: # Function is overridden in smdebug/tensorflow/base_hook.py @@ -466,24 +454,9 @@ def _initialize_writers(self, only_initialize_if_missing=False) -> None: if self.save_all_workers is False: if self.worker != self.chief_worker: return - self.writer = FileWriter(trial_dir=self.out_dir, step=self.step, worker=self.worker) - if self._saving_shapes_in_step(): - self.shape_writer = ShapeWriter( - trial_dir=self.out_dir, - step=self.step, - worker=self.worker, - index_writer=self.writer.index_writer, - ) - - def _get_single_process_writers(self, shape_writers=False) -> List[FileWriter]: - if shape_writers is False: - return [self.writer] if self.writer else [] - else: - return [self.shape_writer] if self.shape_writer else [] - - def _get_writers(self, tensor_name, tensor_ref=None, shape_writers=False) -> List[FileWriter]: + def _get_writers(self, tensor_name, tensor_ref=None) -> List[FileWriter]: """ :param tensor_name: :param tensor_ref: used by TF @@ -491,7 +464,7 @@ def _get_writers(self, tensor_name, tensor_ref=None, shape_writers=False) -> Lis """ if self.save_all_workers is False and self.worker != self.chief_worker: return [] - return self._get_single_process_writers(shape_writers) + return [self.writer] if self.writer else [] def _maybe_get_tb_writer(self) -> Optional[FileWriter]: """ Returns a FileWriter object if `hook.tensorboard_dir` has been specified, else None. @@ -753,28 +726,6 @@ def _write_raw_tensor(self, tensor_name, tensor_value, save_collections, tensor_ self._write_raw_tensor_simple(tensor_name, tensor_value, tensor_ref=tensor_ref) break - def _write_shape(self, tensor_name, tensor_value, save_collections, tensor_ref=None): - shape_writers = self._get_writers(tensor_name, tensor_ref=tensor_ref, shape_writers=True) - for s_col in save_collections: - reduction_config = s_col.reduction_config - if self.dry_run is False and reduction_config.save_shape is True: - numpy_tensor_value = self._make_numpy_array(tensor_value) - this_size, this_shape = size_and_shape(numpy_tensor_value) - if tensor_ref is not None and tensor_ref.tf_obj is not None: - original_name = tensor_ref.tf_obj.name - else: - original_name = None - - for writer in shape_writers: - writer.write_shape( - tensor_name, - this_shape, - self.mode, - self.mode_steps[self.mode], - original_name=original_name, - ) - break - def _write_raw_tensor_simple(self, tensor_name, tensor_value, tensor_ref=None, timestamp=None): # tensor_ref is used by TF # todo: if fp16, check perf of saving as fp16 in proto vs as fp32 @@ -854,9 +805,6 @@ def _write_for_tensor(self, tensor_name, tensor_value, save_collections, tensor_ :param save_collections: list of collections which are being saved for this step """ self._log_save(tensor_name, save_collections) - - self._write_shape(tensor_name, tensor_value, save_collections, tensor_ref=tensor_ref) - # write reductions defined for collections this tensor may be part of self._write_reductions(tensor_name, tensor_value, save_collections, tensor_ref=tensor_ref) diff --git a/smdebug/core/index_reader.py b/smdebug/core/index_reader.py index 2c182d0d0..3b23e1671 100644 --- a/smdebug/core/index_reader.py +++ b/smdebug/core/index_reader.py @@ -16,7 +16,7 @@ MISSING_EVENT_FILE_RETRY_LIMIT, MISSING_EVENT_FILE_RETRY_LIMIT_KEY, ) -from smdebug.core.locations import IndexFileLocationUtils, TensorLocation, TensorShape +from 
smdebug.core.locations import IndexFileLocationUtils, TensorLocation from smdebug.core.logger import get_logger from smdebug.core.modes import ModeKeys from smdebug.core.s3_utils import list_s3_objects @@ -120,22 +120,12 @@ def fetch_tensor_value(self, tensor_location: TensorLocation): def list_event_files(self, start_after_prefix): pass + @abstractmethod def load_tensor_data_from_index_files( self, start_after_key=None, range_steps=None ) -> Tuple[Dict[str, Dict[int, Dict[str, TensorLocation]]], str]: """Return a triply nested dict referring to tensor data.""" - responses, steps, last_index_token, workers = self.read_index_files( - start_after_key, range_steps - ) - - tensor_data = {} - for step, response, worker in zip(steps, responses, workers): - tensor_data = self._update_tensors_from_json( - tensor_data, step, response, self.path, worker - ) - return tensor_data, last_index_token - @abstractmethod def _is_event_file_present(self, file_name) -> bool: pass @@ -213,10 +203,8 @@ def _validate(index_dict): raise IndexReaderException("meta section is not present") if len(index_dict["meta"]) == 0: raise IndexReaderException("meta section is empty") - if "tensor_payload" not in index_dict and "shape_payload" not in index_dict: - raise IndexReaderException( - "neither tensor_payload nor shape_payload sections are present" - ) + if "tensor_payload" not in index_dict: + raise IndexReaderException("tensor_payload section is not present") def _update_tensors_from_json( self, index_tensors_dict, step, response: bytes, path, worker @@ -245,41 +233,28 @@ def _update_tensors_from_json( mode = index_meta["mode"] mode = ModeKeys[mode.strip()] mode_step = index_meta["mode_step"] - - to_update_index_dict = [] - - if "tensor_payload" in index_dict and len(index_dict["tensor_payload"]): - event_file_name = os.path.join(path, index_meta["event_file_name"]) - for tensor in index_dict["tensor_payload"]: - tensor_name = tensor["tensorname"] - start_idx = tensor["start_idx"] - length = tensor["length"] - tensor_location = TensorLocation( - tensor_name, mode, mode_step, event_file_name, start_idx, length, worker - ) - to_update_index_dict.append((tensor_name, step, tensor_location)) - - if "shape_payload" in index_dict and len(index_dict["shape_payload"]): - for tensor in index_dict["shape_payload"]: - tensor_name = tensor["tensorname"] - original_name = tensor["originalname"] - shape = tensor["shape"] - ts = TensorShape(tensor_name, mode, mode_step, shape, original_name) - to_update_index_dict.append((tensor_name, step, ts)) - - for tu in to_update_index_dict: - tensor_name, step, obj = tu - if isinstance(obj, TensorLocation): - obj_dict = {"tensor_location": obj} - elif isinstance(obj, TensorShape): - obj_dict = {"tensor_shape": obj} + event_file_name = os.path.join(path, index_meta["event_file_name"]) + tensors = index_dict["tensor_payload"] + for tensor in tensors: + tensor_name = tensor["tensorname"] + start_idx = tensor["start_idx"] + length = tensor["length"] + tensor_location = TensorLocation( + tensor_name, mode, mode_step, event_file_name, start_idx, length, worker + ) if tensor_name in index_tensors_dict: if step in index_tensors_dict[tensor_name]: - index_tensors_dict[tensor_name][step].update({worker: obj_dict}) + index_tensors_dict[tensor_name][step].update( + {worker: {"tensor_location": tensor_location}} + ) else: - index_tensors_dict[tensor_name].update({step: {worker: obj_dict}}) + index_tensors_dict[tensor_name].update( + {step: {worker: {"tensor_location": tensor_location}}} + ) else: - 
index_tensors_dict[tensor_name] = {step: {worker: obj_dict}} + index_tensors_dict[tensor_name] = { + step: {worker: {"tensor_location": tensor_location}} + } return index_tensors_dict @@ -310,6 +285,22 @@ def fetch_tensor_value(self, tensor_location: TensorLocation) -> np.ndarray: tensor_name, step, tensor_data, mode, mode_step = tensor_tuple return tensor_data + def load_tensor_data_from_index_files( + self, start_after_key=None, range_steps=None + ) -> Tuple[Dict[str, Dict[int, Dict[str, TensorLocation]]], str]: + """Return a triply nested dict referring to tensor data.""" + + responses, steps, last_index_token, workers = self.read_index_files( + start_after_key, range_steps + ) + + tensor_data = {} + for step, response, worker in zip(steps, responses, workers): + tensor_data = self._update_tensors_from_json( + tensor_data, step, response, self.path, worker + ) + return tensor_data, last_index_token + def read_index_files( self, start_after_key: str, range_steps=None ) -> Tuple[List[bytes], list, str, List[str]]: @@ -407,6 +398,21 @@ def fetch_tensor_value(self, tensor_location: TensorLocation) -> np.ndarray: tensor_name, step, tensor_data, mode, mode_step = tensor_tuple return tensor_data + def load_tensor_data_from_index_files( + self, start_after_key=None, range_steps=None + ) -> Tuple[Dict[str, Dict[int, Dict[str, TensorLocation]]], str]: + """Return a triply nested dict referring to tensor data.""" + + responses, steps, last_index_token, workers = self.read_index_files( + start_after_key, range_steps + ) + tensor_data = {} + for step, response, worker in zip(steps, responses, workers): + tensor_data = self._update_tensors_from_json( + tensor_data, step, response, self.path, worker + ) + return tensor_data, last_index_token + def read_index_files( self, start_after_key: str, range_steps=None ) -> Tuple[List[bytes], list, str, List[str]]: diff --git a/smdebug/core/locations.py b/smdebug/core/locations.py index 9712f58a2..af703e514 100644 --- a/smdebug/core/locations.py +++ b/smdebug/core/locations.py @@ -24,20 +24,6 @@ def to_dict(self): return {"tensorname": self.tensorname, "start_idx": self.start_idx, "length": self.length} -class TensorShape: - def __init__(self, name, mode, mode_step, shape, original_name=None): - if original_name is None: - original_name = name - self.name = name - self.original_name = original_name - self.mode = mode - self.mode_step = mode_step - self.shape = tuple(shape) - - def to_dict(self): - return {"tensorname": self.name, "originalname": self.original_name, "shape": self.shape} - - STEP_NUMBER_FORMATTING_LENGTH = "012" diff --git a/smdebug/core/reduction_config.py b/smdebug/core/reduction_config.py index 1fa6121b6..9a24ff6d6 100644 --- a/smdebug/core/reduction_config.py +++ b/smdebug/core/reduction_config.py @@ -3,23 +3,12 @@ from typing import Any, Dict # First Party -from smdebug.core.logger import get_logger from smdebug.core.utils import split -logger = get_logger() - - ALLOWED_REDUCTIONS = ["min", "max", "mean", "std", "variance", "sum", "prod"] ALLOWED_NORMS = ["l1", "l2"] REDUCTION_CONFIG_VERSION_NUM = "v0" -ALLOWED_PARAMS = [ - "reductions", - "abs_reductions", - "norms", - "abs_norms", - "save_raw_tensor", - "save_shape", -] +ALLOWED_PARAMS = ["reductions", "abs_reductions", "norms", "abs_norms", "save_raw_tensor"] class ReductionConfig: @@ -60,14 +49,12 @@ def __init__( norms=None, abs_norms=None, save_raw_tensor=False, - save_shape=False, ): self.reductions = reductions if reductions is not None else [] self.abs_reductions = abs_reductions if 
abs_reductions is not None else [] self.norms = norms if norms is not None else [] self.abs_norms = abs_norms if abs_norms is not None else [] self.save_raw_tensor = save_raw_tensor - self.save_shape = save_shape ## DO NOT REMOVE, if you add anything here, please make sure that _check & from_json is updated accordingly self._check() @@ -88,8 +75,6 @@ def _check(self): raise ValueError("abs_norms can only be one of " + ",".join(ALLOWED_NORMS)) if not isinstance(self.save_raw_tensor, bool): raise ValueError(f"save_raw_tensor={self.save_raw_tensor} must be a boolean") - if not isinstance(self.save_shape, bool): - raise ValueError(f"save_shape={self.save_shape} must be a boolean") @classmethod def from_dict(cls, params: Dict[str, Any]) -> "ReductionConfig": @@ -98,7 +83,7 @@ def from_dict(cls, params: Dict[str, Any]) -> "ReductionConfig": return None if not isinstance(params, dict): raise ValueError(f"params={params} must be dict") - save_shape = params.get("save_shape", False) + save_raw_tensor = params.get("save_raw_tensor", False) # Parse comma-separated string into array all_reductions = split(params.get("reductions", "")) @@ -123,7 +108,6 @@ def from_dict(cls, params: Dict[str, Any]) -> "ReductionConfig": norms=norms, abs_norms=abs_norms, save_raw_tensor=save_raw_tensor, - save_shape=save_shape, ) @classmethod @@ -132,6 +116,7 @@ def from_json(cls, json_str: str) -> "ReductionConfig": return cls.from_dict(d) def to_json_dict(self) -> Dict[str, Any]: + save_raw_tensor = self.save_raw_tensor # Convert reductions from various arrays into single comma-separated string all_reductions = [] for red in self.reductions: @@ -144,11 +129,7 @@ def to_json_dict(self) -> Dict[str, Any]: all_reductions.append(f"abs_{red}_norm") all_reductions_str = ",".join(all_reductions) # Return the dict - return { - "save_raw_tensor": self.save_raw_tensor, - "reductions": all_reductions_str, - "save_shape": self.save_shape, - } + return {"save_raw_tensor": save_raw_tensor, "reductions": all_reductions_str} def to_json(self) -> str: return json.dumps(self.to_json_dict()) @@ -163,11 +144,10 @@ def __eq__(self, other): and self.norms == other.norms and self.abs_norms == other.abs_norms and self.save_raw_tensor == other.save_raw_tensor - and self.save_shape == other.save_shape ) def __repr__(self): return ( f", save_shape={self.save_shape}, save_raw_tensor={self.save_raw_tensor}" + f"abs_reductions={self.abs_reductions}, norms={self.norms}, abs_norms={self.abs_norms}>" ) diff --git a/smdebug/core/tensor.py b/smdebug/core/tensor.py index de52f7858..c268c48fc 100644 --- a/smdebug/core/tensor.py +++ b/smdebug/core/tensor.py @@ -10,7 +10,6 @@ from smdebug.exceptions import ( InvalidWorker, NoMoreData, - ShapeUnavailableForStep, StepNotYetAvailable, StepUnavailable, TensorUnavailableForStep, @@ -63,16 +62,6 @@ def set_step_location(self, step_num, worker, location): s = self._steps[step_num][worker] s.location = location - def set_step_shape(self, step_num, worker, shape): - step = Step(step_num, shape=shape) - if step_num not in self._steps: - self._steps[step_num] = {worker: step} - elif worker not in self._steps[step_num]: - self._steps[step_num].update({worker: step}) - - s = self._steps[step_num][worker] - s.shape = shape - def set_step_reduction_value(self, step_num, worker, red_name, abs, red_value): if step_num not in self._steps: s = Step(step_num) @@ -99,11 +88,10 @@ def step(self, step_num): class Step: """Contains the step number, value, location, and reduction values/locations.""" - def __init__(self, step_num, 
value=None, location=None, shape=None): + def __init__(self, step_num, value=None, location=None): self.step_num = step_num self.value = value self.location = location - self.shape = shape # mapping from (red_name, abs) to value self._reduction_values = {} @@ -138,9 +126,6 @@ class Tensor: def __init__(self, name, trial, cache): self._mode_steps = {} self.name = name - # SMdebug modifies some names of tensors to be more descriptive - # In such cases we save here the original name - self.original_name = None self.trial = trial self.cache = cache @@ -279,16 +264,6 @@ def value(self, step_num, mode=ModeKeys.GLOBAL, worker=None): has_reductions = has_reduction_locations or has_reduction_values raise TensorUnavailableForStep(self.name, step_num, mode, has_reductions) - def shape(self, step_num, mode=ModeKeys.GLOBAL, worker=None): - s = self._step(step_num=step_num, mode=mode, worker=worker) - if s.shape is not None: - return s.shape - try: - value = self.value(step_num, mode, worker) - return value.shape - except TensorUnavailableForStep: - raise ShapeUnavailableForStep(self.name, step_num, mode) - def reduction_values(self, step_num, mode=ModeKeys.GLOBAL, worker=None): s = self._step(step_num=step_num, mode=mode, worker=worker) if s is not None: @@ -359,13 +334,9 @@ def _create_mode_step(self, mode, mode_step): if mode not in self._mode_steps: self._mode_steps[mode] = ModeSteps(mode) - def add_step(self, mode, mode_step, worker, tensor_location, tensor_shape): + def add_step(self, mode, mode_step, worker, location): self._create_mode_step(mode, mode_step) - if tensor_location is not None: - self._mode_steps[mode].set_step_location(mode_step, worker, tensor_location) - if tensor_shape is not None: - self._mode_steps[mode].set_step_shape(mode_step, worker, tensor_shape.shape) - self.original_name = tensor_shape.original_name + self._mode_steps[mode].set_step_location(mode_step, worker, location) def add_reduction_step(self, mode, mode_step, worker, red_name, abs, red_location): self._create_mode_step(mode, mode_step) diff --git a/smdebug/core/tfevent/index_file_writer.py b/smdebug/core/tfevent/index_file_writer.py index 886c7760e..80cb54b68 100644 --- a/smdebug/core/tfevent/index_file_writer.py +++ b/smdebug/core/tfevent/index_file_writer.py @@ -13,7 +13,6 @@ def __init__(self, file_path): self.file_path = file_path self.index_payload = [] self.index_meta = {} - self.shape_payload = [] self.writer = None def __exit__(self): @@ -29,7 +28,7 @@ def _init_writer(self): def add_index(self, tensorlocation): if not self.writer: self._init_writer() - if not self.index_meta or not "event_file_name" in self.index_meta: + if not self.index_meta: self.index_meta = { "mode": tensorlocation.mode, "mode_step": tensorlocation.mode_step, @@ -37,13 +36,6 @@ def add_index(self, tensorlocation): } self.index_payload.append(tensorlocation.to_dict()) - def add_shape(self, tensorshape): - if not self.writer: - self._init_writer() - if not self.index_meta: - self.index_meta = {"mode": tensorshape.mode, "mode_step": tensorshape.mode_step} - self.shape_payload.append(tensorshape.to_dict()) - def flush(self): """Flushes the event string to file.""" if not self.writer: @@ -52,45 +44,33 @@ def flush(self): raise ValueError( f"Cannot write empty index_meta={self.index_meta} to file {self.file_path}" ) - if not self.index_payload and not self.shape_payload: + if not self.index_payload: raise ValueError( - f"Cannot write empty payload: index_payload={self.index_payload}, shape_payload={self.shape_payload} to file 
{self.file_path}" + f"Cannot write empty index_payload={self.index_payload} to file {self.file_path}" ) - index = Index( - meta=self.index_meta, - tensor_payload=self.index_payload, - shape_payload=self.shape_payload, - ) + index = Index(meta=self.index_meta, tensor_payload=self.index_payload) self.writer.write(index.to_json()) self.writer.flush() self.index_meta = {} - self.index_payload = [] - self.shape_payload = [] + self.index_payload = {} def close(self): """Closes the record writer.""" if self.writer is not None: - if self.index_meta and (self.index_payload or self.shape_payload): + if self.index_meta and self.index_payload: self.flush() self.writer.close() self.writer = None class Index: - def __init__(self, meta=None, tensor_payload=None, shape_payload=None): + def __init__(self, meta=None, tensor_payload=None): self.meta = meta self.tensor_payload = tensor_payload - self.shape_payload = shape_payload def to_json(self): - return json.dumps( - { - "meta": self.meta, - "tensor_payload": self.tensor_payload, - "shape_payload": self.shape_payload, - } - ) + return json.dumps({"meta": self.meta, "tensor_payload": self.tensor_payload}) class EventWithIndex(object): diff --git a/smdebug/core/writer.py b/smdebug/core/writer.py index 3966da82b..a342cf433 100644 --- a/smdebug/core/writer.py +++ b/smdebug/core/writer.py @@ -16,9 +16,6 @@ # under the License. """APIs for logging data in the event file.""" -# Standard Library -from typing import Tuple - # First Party from smdebug.core.modes import MODE_PLUGIN_NAME, MODE_STEP_PLUGIN_NAME from smdebug.core.tfevent.event_file_writer import EventFileWriter @@ -34,64 +31,14 @@ from smdebug.core.tfevent.util import make_tensor_proto # Local -from .locations import ( - IndexFileLocationUtils, - TensorboardFileLocation, - TensorFileLocation, - TensorShape, -) +from .locations import IndexFileLocationUtils, TensorboardFileLocation, TensorFileLocation from .logger import get_logger from .modes import ModeKeys logger = get_logger() -class BaseWriter: - def __init__(self, trial_dir, worker, step=0, mode=ModeKeys.GLOBAL): - self.trial_dir = trial_dir - self.step = step - self.worker = worker - if worker is None: - assert False, "Worker should not be none. Check worker name initialization" - self.mode = mode - self._writer = None - self._index_writer = None - - def name(self): - return self._writer.name() - - def __enter__(self): - """Make usable with "with" statement.""" - return self - - def __exit__(self, unused_type, unused_value, unused_traceback): - """Make usable with "with" statement.""" - self.close() - - def flush(self): - """Flushes the event file to disk. - Call this method to make sure that all pending events have been written to disk. - """ - self._writer.flush() - # don't flush index writer as we only want to flush on close - - @classmethod - def create_index_writer(cls, trial_dir, worker, step): - el = TensorFileLocation(step_num=step, worker_name=worker) - event_file_path = el.get_file_location(trial_dir=trial_dir) - index_file_path = IndexFileLocationUtils.get_index_key_for_step(trial_dir, step, worker) - return IndexWriter(index_file_path) - - @property - def index_writer(self): - return self._index_writer - - @index_writer.setter - def index_writer(self, iw): - self._index_writer = iw - - -class FileWriter(BaseWriter): +class FileWriter: def __init__( self, trial_dir, @@ -103,7 +50,6 @@ def __init__( flush_secs=120, verbose=False, write_checksum=False, - index_writer=None, ): """Creates a `FileWriter` and an file. 
On construction the summary writer creates a new event file in `trial_dir`. @@ -125,16 +71,19 @@ def __init__( verbose : bool Determines whether to print logging messages. """ - super(FileWriter, self).__init__(trial_dir, worker, step, mode) + self.trial_dir = trial_dir + self.step = step + self.worker = worker + if worker is None: + assert False, "Worker should not be none. Check worker name initialization" + self.mode = mode if wtype == "events": - if index_writer is None: - self.index_writer = self.create_index_writer( - trial_dir=trial_dir, worker=worker, step=step - ) - else: - self.index_writer = index_writer el = TensorFileLocation(step_num=self.step, worker_name=self.worker) event_file_path = el.get_file_location(trial_dir=self.trial_dir) + index_file_path = IndexFileLocationUtils.get_index_key_for_step( + self.trial_dir, self.step, self.worker + ) + self.index_writer = IndexWriter(index_file_path) elif wtype == "tensorboard": el = TensorboardFileLocation( step_num=self.step, worker_name=self.worker, mode=self.mode @@ -154,6 +103,14 @@ def __init__( ) self._default_bins = _get_default_bins() + def __enter__(self): + """Make usable with "with" statement.""" + return self + + def __exit__(self, unused_type, unused_value, unused_traceback): + """Make usable with "with" statement.""" + self.close() + @staticmethod def _get_metadata(mode, mode_step): sm2 = SummaryMetadata.PluginData(plugin_name=MODE_STEP_PLUGIN_NAME, content=str(mode_step)) @@ -230,6 +187,13 @@ def write_scalar_summary(self, name, value, global_step, timestamp: float = None s = scalar_summary(name, value) self._writer.write_summary(s, global_step, timestamp=timestamp) + def flush(self): + """Flushes the event file to disk. + Call this method to make sure that all pending events have been written to disk. + """ + self._writer.flush() + # don't flush index writer as we only want to flush on close + def close(self): """Flushes the event file to disk and close the file. Call this method when you do not need the summary writer anymore. @@ -238,6 +202,9 @@ def close(self): if self.index_writer is not None: self.index_writer.close() + def name(self): + return self._writer.name() + @staticmethod def _check_mode_step(mode, mode_step, global_step): if mode_step is None: @@ -249,24 +216,3 @@ def _check_mode_step(mode, mode_step, global_step): ex_str = "mode can be one of " + ", ".join(mode_keys) raise ValueError(ex_str) return mode, mode_step - - -class ShapeWriter(BaseWriter): - def __init__(self, trial_dir, worker, index_writer, step=0, mode=ModeKeys.GLOBAL): - super(ShapeWriter, self).__init__(trial_dir, worker, step, mode) - self._index_writer = index_writer - - def write_shape( - self, name, shape: Tuple[int], mode=ModeKeys.GLOBAL, mode_step=None, original_name=None - ): - self._index_writer.add_shape( - TensorShape(name, mode.name, mode_step, shape, original_name=original_name) - ) - - def flush(self): - self._index_writer.flush() - - def close(self): - """Flushes the event file to disk and close the file. - """ - self._index_writer.close() diff --git a/smdebug/exceptions.py b/smdebug/exceptions.py index e2ed43da9..6d917e6bf 100644 --- a/smdebug/exceptions.py +++ b/smdebug/exceptions.py @@ -68,21 +68,6 @@ def __str__(self): return msg -class ShapeUnavailableForStep(Exception): - def __init__(self, tname, step, mode=modes.GLOBAL): - self.step = step - self.mode = mode - self.tname = tname - - def __str__(self): - msg = ( - "Shape for tensor {} is not available for step {} " - "with mode {} as it was not saved." 
- "".format(self.tname, self.step, self.mode.name) - ) - return msg - - class TensorUnavailable(Exception): def __init__(self, tname): self.tname = tname diff --git a/smdebug/tensorflow/base_hook.py b/smdebug/tensorflow/base_hook.py index ad6557ade..a8cc9f679 100644 --- a/smdebug/tensorflow/base_hook.py +++ b/smdebug/tensorflow/base_hook.py @@ -16,7 +16,7 @@ from smdebug.core.reductions import get_numpy_reduction, get_reduction_tensor_name from smdebug.core.tfevent.util import make_numpy_array from smdebug.core.utils import serialize_tf_device -from smdebug.core.writer import FileWriter, ShapeWriter +from smdebug.core.writer import FileWriter # Local from .collection import CollectionKeys, CollectionManager @@ -86,7 +86,6 @@ def __init__( Example -> /job:worker/replica:0/task:1/device:GPU:0 : _job-worker_replica-0_task-1_device-GPU-0""" self.device_map = {} self.writer_map = {} - self.shape_writer_map = {} # This will be None if the var wasn't set, i.e. not param server self.tf_config_json = load_tf_config_json(os.getenv("TF_CONFIG")) self._hook_supported = None @@ -262,7 +261,7 @@ def _set_chief_worker(self): elif self.distribution_strategy == TFDistributionStrategy.UNSUPPORTED: raise NotImplementedError - def _get_writers(self, tensor_name, tensor_ref, shape_writers=False) -> List[FileWriter]: + def _get_writers(self, tensor_name, tensor_ref) -> List[FileWriter]: """ For tensors generated during distributed tf jobs, we map the tensor to a writer with its device attribute. @@ -278,8 +277,8 @@ def _get_writers(self, tensor_name, tensor_ref, shape_writers=False) -> List[Fil TFDistributionStrategy.PARAMETER_SERVER, TFDistributionStrategy.HOROVOD, ]: - if self.save_all_workers is True or self.worker == self.chief_worker: - return self._get_single_process_writers(shape_writers) + if (self.save_all_workers is True or self.worker == self.chief_worker) and self.writer: + return [self.writer] elif self.distribution_strategy == TFDistributionStrategy.MIRRORED: if len(self.device_map): # else is for metrics in Keras @@ -290,25 +289,17 @@ def _get_writers(self, tensor_name, tensor_ref, shape_writers=False) -> List[Fil # if device str is empty or cpu in worker if not bool(worker) or "CPU" in worker: if self.save_all_workers: - if shape_writers is False: - return list(self.writer_map.values()) - else: - return list(self.shape_writer_map.values()) + return list(self.writer_map.values()) else: - if shape_writers is False: - return [self.writer_map[self.device_map[self.chief_worker]]] - else: - return [self.shape_writer_map[self.device_map[self.chief_worker]]] + return [self.writer_map[self.device_map[self.chief_worker]]] elif self.save_all_workers or worker == self.chief_worker: - if shape_writers is False: - return [self.writer_map[self.device_map[worker]]] - else: - return [self.shape_writer_map[self.device_map[worker]]] - else: + return [self.writer_map[self.device_map[worker]]] + elif self.writer: # training on CPU when all device strings have cpu - return self._get_single_process_writers(shape_writers) + return [self.writer] elif self.distribution_strategy == TFDistributionStrategy.NONE: - return self._get_single_process_writers(shape_writers) + if self.writer: + return [self.writer] else: raise NotImplementedError # when self.writer is None, returns empty list @@ -329,13 +320,6 @@ def _initialize_writers(self, only_initialize_if_missing=False) -> None: self.writer = FileWriter( trial_dir=self.out_dir, step=self.step, worker=self.worker ) - if self._saving_shapes_in_step(): - self.shape_writer = 
ShapeWriter( - trial_dir=self.out_dir, - step=self.step, - worker=self.worker, - index_writer=self.writer.index_writer, - ) elif self.distribution_strategy == TFDistributionStrategy.MIRRORED: if len(self.device_map): for device, device_string in self.device_map.items(): @@ -345,37 +329,15 @@ def _initialize_writers(self, only_initialize_if_missing=False) -> None: self.writer_map[device_string] = FileWriter( trial_dir=self.out_dir, step=self.step, worker=device_string ) - if self._saving_shapes_in_step(): - self.shape_writer_map[device_string] = ShapeWriter( - trial_dir=self.out_dir, - step=self.step, - worker=self.worker, - index_writer=self.writer_map[device_string].index_writer, - ) else: # training on CPU when all device strings have cpu if self.writer is None or only_initialize_if_missing is False: self.writer = FileWriter( trial_dir=self.out_dir, step=self.step, worker=self.worker ) - if self._saving_shapes_in_step(): - self.shape_writer = ShapeWriter( - trial_dir=self.out_dir, - step=self.step, - worker=self.worker, - index_writer=self.writer.index_writer, - ) - elif self.distribution_strategy == TFDistributionStrategy.NONE: if self.writer is None or only_initialize_if_missing is False: self.writer = FileWriter(trial_dir=self.out_dir, step=self.step, worker=self.worker) - if self._saving_shapes_in_step(): - self.shape_writer = ShapeWriter( - trial_dir=self.out_dir, - step=self.step, - worker=self.worker, - index_writer=self.writer.index_writer, - ) else: raise NotImplementedError @@ -391,13 +353,25 @@ def _close_writers(self) -> None: self.writer.close() self.writer = None - self._close_given_writer_map(self.writer_map) - self._close_given_writer_map(self.shape_writer_map) - self._close_given_writer_map(self.tb_writers) - - if self.shape_writer is not None: - self.shape_writer.close() - self.shape_writer = None + # Delete all the dist training writers + to_delete_writers = [] + for device, writer in self.writer_map.items(): + writer.flush() + writer.close() + to_delete_writers.append(device) + + for device in to_delete_writers: + del self.writer_map[device] + + to_delete_writers = [] + # Delete all the tb writers + for mode, writer in self.tb_writers.items(): + if writer is not None: + writer.flush() + writer.close() + to_delete_writers.append(mode) + for mode in to_delete_writers: + del self.tb_writers[mode] def _export_model(self): tb_writer = self._maybe_get_tb_writer() diff --git a/smdebug/trials/local_trial.py b/smdebug/trials/local_trial.py index 272acff83..0fa4e76ba 100644 --- a/smdebug/trials/local_trial.py +++ b/smdebug/trials/local_trial.py @@ -34,11 +34,17 @@ def __init__( self.index_reader = LocalIndexReader(self.path) self.logger.info(f"Loading trial {name} at path {self.trial_dir}") self._load_collections() - self.refresh_data() + self._load_tensors() def _get_collection_files(self) -> list: return list_collection_files_in_directory(get_path_to_collections(self.path)) + def _load_tensors_from_index_tensors(self, index_tensors_dict): + for tname in index_tensors_dict: + for step, itds in index_tensors_dict[tname].items(): + for worker in itds: + self._add_tensor(int(step), worker, itds[worker]["tensor_location"]) + def _read_collections(self, collection_files): first_collection_file = collection_files[0] # First Collection File self.collection_manager = CollectionManager.load(first_collection_file) diff --git a/smdebug/trials/s3_trial.py b/smdebug/trials/s3_trial.py index 374ecdfad..0a3cb6389 100644 --- a/smdebug/trials/s3_trial.py +++ b/smdebug/trials/s3_trial.py @@ 
-45,7 +45,7 @@ def __init__( self.path = "s3://" + os.path.join(self.bucket_name, self.prefix_name) self.index_reader = S3IndexReader(self.path) self._load_collections() - self.refresh_data() + self._load_tensors() def _get_collection_files(self) -> list: collection_files, _ = list_s3_objects( @@ -56,6 +56,12 @@ def _get_collection_files(self) -> list: ) return collection_files + def _load_tensors_from_index_tensors(self, index_tensors_dict): + for tname in index_tensors_dict: + for step, itds in index_tensors_dict[tname].items(): + for worker in itds: + self._add_tensor(int(step), worker, itds[worker]["tensor_location"]) + def _read_collections(self, collection_files): first_collection_file = collection_files[0] # First Collection File key = os.path.join(first_collection_file) diff --git a/smdebug/trials/trial.py b/smdebug/trials/trial.py index c1c8d9c84..5008ef6a7 100644 --- a/smdebug/trials/trial.py +++ b/smdebug/trials/trial.py @@ -14,7 +14,7 @@ TRAINING_END_DELAY_REFRESH_DEFAULT, TRAINING_END_DELAY_REFRESH_KEY, ) -from smdebug.core.locations import IndexFileLocationUtils, TensorLocation, TensorShape +from smdebug.core.locations import IndexFileLocationUtils, TensorLocation from smdebug.core.logger import get_logger from smdebug.core.modes import ModeKeys from smdebug.core.reductions import REDUCTIONS_PREFIX, reverse_reduction_tensor_name @@ -190,7 +190,7 @@ def maybe_refresh(self, name=None): retry_count = 2 while retry_count > 0: if name is None: - self.refresh_data() + self.refresh_tensors() else: self.refresh_tensor(name) if retry_count > 1: @@ -215,7 +215,7 @@ def maybe_refresh(self, name=None): def refresh_tensor(self, tname, steps=None): # for now we load all tensors at once - self.refresh_data() + self.refresh_tensors() def tensor(self, tname): # will not show tensor if it was not written yet @@ -231,14 +231,14 @@ def has_tensor(self, tname): self.maybe_refresh(tname) return tname in self._tensors - def _populate_step_dict(self, mode, mode_step, step_num): - if mode != ModeKeys.GLOBAL: - if mode not in self._mode_to_global: - self._mode_to_global[mode] = {} - if mode_step not in self._mode_to_global[mode]: - self._mode_to_global[mode][mode_step] = int(step_num) + def _populate_step_dict(self, tensor_object, step_num): + if tensor_object.mode != ModeKeys.GLOBAL: + if tensor_object.mode not in self._mode_to_global: + self._mode_to_global[tensor_object.mode] = {} + if tensor_object.mode_step not in self._mode_to_global[tensor_object.mode]: + self._mode_to_global[tensor_object.mode][tensor_object.mode_step] = int(step_num) if step_num not in self._global_to_mode: - self._global_to_mode[step_num] = (mode, mode_step) + self._global_to_mode[step_num] = (tensor_object.mode, tensor_object.mode_step) def _populate_workers_for_global_step(self, step, worker) -> None: """ @@ -263,7 +263,7 @@ def _populate_workers_for_global_step(self, step, worker) -> None: self.last_complete_step = step self.logger.debug(f"Populating last completing step to: {step}") - def _populate_global_step_to_tensor_name_map(self, tensorname: str, step_num) -> None: + def _populate_global_step_to_tensor_name_map(self, tensor: TensorLocation, step_num) -> None: """ The self.global_step_to_tensors_map dictionary holds a mapping of step number and a set of all the tensor names that have been written for the step. 
@@ -274,67 +274,47 @@ def _populate_global_step_to_tensor_name_map(self, tensorname: str, step_num) -> """ if step_num not in self.global_step_to_tensors_map: self.global_step_to_tensors_map[step_num] = set() - self.global_step_to_tensors_map[step_num].add(tensorname) + self.global_step_to_tensors_map[step_num].add(tensor.tensorname) - def _populate_mode_to_tensor_name_map(self, tensorname, mode) -> None: + def _populate_mode_to_tensor_name_map(self, tensor: TensorLocation) -> None: """ The self.mode_to_tensors_map dictionary holds a mapping of mode and a set of all the tensor names that have been written for the mode. :param tensor: :return: """ - if mode != ModeKeys.GLOBAL: - if mode not in self.mode_to_tensors_map: - self.mode_to_tensors_map[mode] = set() - self.mode_to_tensors_map[mode].add(tensorname) + if tensor.mode != ModeKeys.GLOBAL: + if tensor.mode not in self.mode_to_tensors_map: + self.mode_to_tensors_map[tensor.mode] = set() + self.mode_to_tensors_map[tensor.mode].add(tensor.tensorname) - def _load_tensors_from_index_tensors(self, index_tensors_dict): - for tname in index_tensors_dict: - for step, itds in index_tensors_dict[tname].items(): - for worker in itds: - self._add_tensor( - int(step), - worker, - itds[worker].get("tensor_location", None), - itds[worker].get("tensor_shape", None), - ) - - def _add_tensor( - self, step_num, worker, tensor_location: TensorLocation, tensor_shape: TensorShape - ): + def _add_tensor(self, step_num, worker, tensor_object: TensorLocation): is_reduction = False - if tensor_location is not None: - tensorname = tensor_location.tensorname - mode = tensor_location.mode - mode_step = tensor_location.mode_step - elif tensor_shape is not None: - tensorname = tensor_shape.name - mode = tensor_shape.mode - mode_step = tensor_shape.mode_step - else: - raise RuntimeError("both tensor_location and tensor_shape can't be None") - - if REDUCTIONS_PREFIX in tensorname: - tensorname, red_name, abs = reverse_reduction_tensor_name(tensorname) + if REDUCTIONS_PREFIX in tensor_object.tensorname: + tname, red_name, abs = reverse_reduction_tensor_name(tensor_object.tensorname) + tensor_object.tensorname = tname is_reduction = True + else: + tname = tensor_object.tensorname - if tensorname not in self._tensors: - tensor = Tensor(tensorname, trial=self, cache=self.cache) - self._tensors[tensorname] = tensor + if tname not in self._tensors: + tensor = Tensor(tname, trial=self, cache=self.cache) + self._tensors[tname] = tensor - tensor = self._tensors[tensorname] + tensor = self._tensors[tname] if is_reduction: - tensor.add_reduction_step(mode, mode_step, worker, red_name, abs, tensor_location) + tensor.add_reduction_step( + tensor_object.mode, tensor_object.mode_step, worker, red_name, abs, tensor_object + ) else: - # shape can only be passed for actual tensor, not reductions - tensor.add_step(mode, mode_step, worker, tensor_location, tensor_shape) + tensor.add_step(tensor_object.mode, tensor_object.mode_step, worker, tensor_object) - self._populate_step_dict(mode, mode_step, step_num) - self._populate_global_step_to_tensor_name_map(tensorname, step_num) + self._populate_step_dict(tensor_object, step_num) + self._populate_global_step_to_tensor_name_map(tensor_object, step_num) self._populate_workers_for_global_step(step_num, worker) - self._populate_mode_to_tensor_name_map(tensorname, mode) + self._populate_mode_to_tensor_name_map(tensor_object) def _tensors_matching_regex(self, regex_list) -> set: matched_tensornames = set() @@ -580,6 +560,10 @@ def 
has_passed_step(self, step, mode=ModeKeys.GLOBAL) -> StepState: return StepState.UNAVAILABLE return StepState.NOT_YET_AVAILABLE + def _load_tensors(self): + if self.index_mode: + self._load_tensors_from_index_files() + def _update_last_index_token(self, new_index_token: str) -> None: """ This function updates the last_index_token in the following scenarios: @@ -641,7 +625,15 @@ def _update_last_index_token(self, new_index_token: str) -> None: f"Updating last_complete_step to: {self.last_complete_step}. " ) - def refresh_data(self): + def _load_tensors_from_index_files(self): + self.index_tensors_dict, new_index_token = self.index_reader.load_tensor_data_from_index_files( + start_after_key=self.last_index_token, range_steps=self.range_steps + ) + self._load_tensors_from_index_tensors(self.index_tensors_dict) + if new_index_token: # new index token can be None if there are no new index files + self._update_last_index_token(new_index_token) + + def refresh_tensors(self): # TODO if job finished if self.index_mode: index_tensors_dict, new_index_token = self.index_reader.load_tensor_data_from_index_files( diff --git a/tests/mxnet/test_hook_reduce_config.py b/tests/mxnet/test_hook_reduce_config.py index 46476414d..245a16a80 100644 --- a/tests/mxnet/test_hook_reduce_config.py +++ b/tests/mxnet/test_hook_reduce_config.py @@ -2,9 +2,6 @@ import shutil from datetime import datetime -# Third Party -from tests.utils import verify_shapes - # First Party from smdebug.mxnet import ReductionConfig, SaveConfig from smdebug.mxnet.hook import Hook as t_hook @@ -27,7 +24,6 @@ def test_save_config(hook=None, out_dir=None): hook = t_hook( out_dir=out_dir, save_config=global_save_config, - save_all=True, include_collections=[ "weights", "biases", @@ -86,22 +82,6 @@ def test_save_config(hook=None, out_dir=None): shutil.rmtree(out_dir) -def test_save_shapes(out_dir): - global_reduce_config = ReductionConfig(save_shape=True) - global_save_config = SaveConfig(save_steps=[0, 1]) - - hook = t_hook( - out_dir=out_dir, - save_config=global_save_config, - save_all=True, - reduction_config=global_reduce_config, - ) - run_mnist_gluon_model(hook=hook, num_steps_train=5) - verify_shapes(out_dir, 0) - verify_shapes(out_dir, 1) - shutil.rmtree(out_dir) - - def test_save_config_hook_from_json(): from smdebug.core.json_config import CONFIG_FILE_PATH_ENV_STR import os diff --git a/tests/pytorch/test_json_configs/test_hook_save_shape.json b/tests/pytorch/test_json_configs/test_hook_save_shape.json deleted file mode 100644 index 166051af4..000000000 --- a/tests/pytorch/test_json_configs/test_hook_save_shape.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "S3Path": "s3://kjndjknd_bucket/prefix", - "LocalPath": "/tmp/test_output/test_hook_save_shape/jsonloading", - "HookParameters": { - "save_all": true, - "save_shape": true, - "save_steps": "0,1" - } - } diff --git a/tests/pytorch/test_reduce_config.py b/tests/pytorch/test_reduce_config.py index de97b30da..230e0410c 100644 --- a/tests/pytorch/test_reduce_config.py +++ b/tests/pytorch/test_reduce_config.py @@ -5,10 +5,7 @@ # Third Party import torch -import torch.nn as nn -import torch.nn.functional as F import torch.optim as optim -from tests.utils import verify_shapes # First Party from smdebug.pytorch import ReductionConfig, SaveConfig @@ -89,85 +86,6 @@ def test_reduce_config(hook=None, out_dir=None): shutil.rmtree(out_dir) -def test_save_shapes(hook=None, out_dir=None): - class ChildA(nn.Module): - def __init__(self): - super(ChildA, self).__init__() - self.child2 = ChildB() - self.relu0 
= nn.ReLU() - - def forward(self, x): - return self.relu0(self.child2(x)) - - class ChildB(nn.Module): - def __init__(self): - super(ChildB, self).__init__() - self.conv1 = nn.Conv2d(1, 20, 5, 1) - - def forward(self, x): - return self.conv1(x) - - class NestedNet(nn.Module): - def __init__(self): - super(NestedNet, self).__init__() - self.child1 = ChildA() - self.max_pool = nn.MaxPool2d(2, stride=2) - self.conv2 = nn.Conv2d(20, 50, 5, 1) - relu_module = nn.ReLU() - self.relu1 = nn.ReLU() - self.max_pool2 = nn.MaxPool2d(2, stride=2) - self.fc1 = nn.Linear(4 * 4 * 50, 500) - self.relu2 = nn.ReLU() - self.fc2 = nn.Linear(500, 10) - - def forward(self, x): - x = self.child1(x) - x = self.max_pool(x) - x = self.relu1(self.conv2(x)) - x = self.max_pool2(x) - x = x.view(-1, 4 * 4 * 50) - x = self.relu2(self.fc1(x)) - x = self.fc2(x) - return F.log_softmax(x, dim=1) - - hook_created = False - if hook is None: - global_reduce_config = ReductionConfig(save_shape=True) - global_save_config = SaveConfig(save_steps=[0]) - - run_id = "trial_" + datetime.now().strftime("%Y%m%d-%H%M%S%f") - out_dir = "/tmp/" + run_id - hook = t_hook( - out_dir=out_dir, - save_config=global_save_config, - save_all=True, - reduction_config=global_reduce_config, - ) - hook_created = True - - model = NestedNet().to(torch.device("cpu")) - hook.register_module(model) - optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9) - train(model, hook, torch.device("cpu"), optimizer, num_steps=10) - # different versions seem to output different number of loss tensors - verify_shapes(out_dir, 0) - if hook_created: - shutil.rmtree(out_dir) - - -def test_save_shapes_json(): - from smdebug.core.json_config import CONFIG_FILE_PATH_ENV_STR - - out_dir = "/tmp/test_output/test_hook_save_shape/jsonloading" - shutil.rmtree(out_dir, True) - os.environ[ - CONFIG_FILE_PATH_ENV_STR - ] = "tests/pytorch/test_json_configs/test_hook_save_shape.json" - hook = t_hook.create_from_json_file() - test_save_shapes(hook=hook, out_dir=out_dir) - shutil.rmtree(out_dir, True) - - # Test creating hook by loading the json file with reduction configs. 
def test_reduce_config_with_json(): from smdebug.core.json_config import CONFIG_FILE_PATH_ENV_STR diff --git a/tests/tensorflow/hooks/test_estimator_modes.py b/tests/tensorflow/hooks/test_estimator_modes.py index b7de19d6e..e7bd40945 100644 --- a/tests/tensorflow/hooks/test_estimator_modes.py +++ b/tests/tensorflow/hooks/test_estimator_modes.py @@ -18,7 +18,6 @@ import pytest import tensorflow as tf from tests.analysis.utils import delete_s3_prefix -from tests.utils import verify_shapes # First Party import smdebug.tensorflow as smd @@ -31,12 +30,10 @@ def help_test_mnist( path, save_config=None, - reduction_config=None, hook=None, set_modes=True, num_steps=10, num_eval_steps=None, - save_all=False, steps=None, include_collections=None, ): @@ -128,11 +125,7 @@ def cnn_model_fn(features, labels, mode): if include_collections is None: include_collections = ["weights", "gradients", "default", "losses"] hook = smd.SessionHook( - out_dir=trial_dir, - save_config=save_config, - include_collections=include_collections, - save_all=save_all, - reduction_config=reduction_config, + out_dir=trial_dir, save_config=save_config, include_collections=include_collections ) if num_eval_steps is None: @@ -194,29 +187,6 @@ def test_mnist(out_dir, on_s3=False): helper_test_mnist_trial(out_dir) -@pytest.mark.slow # 0:02 to run -def test_mnist_shapes(out_dir, on_s3=False): - if on_s3: - run_id = "trial_" + datetime.now().strftime("%Y%m%d-%H%M%S%f") - bucket = "smdebug-testing" - prefix = "outputs/hooks/estimator_modes/" + run_id - out_dir = f"s3://{bucket}/{prefix}" - help_test_mnist( - out_dir, - save_all=True, - save_config=smd.SaveConfig(save_steps=[0]), - num_steps=1, - steps=None, - reduction_config=smd.ReductionConfig(save_shape=True), - ) - verify_shapes(out_dir, 0) - - -@pytest.mark.slow # 0:02 to run -def test_mnist_shapes_s3(out_dir): - test_mnist_shapes(out_dir, on_s3=True) - - @pytest.mark.slow # 0:02 to run def test_mnist_local_json(out_dir, monkeypatch): monkeypatch.setenv( diff --git a/tests/tensorflow/hooks/test_reductions.py b/tests/tensorflow/hooks/test_reductions.py index e009f4565..4fde66a49 100644 --- a/tests/tensorflow/hooks/test_reductions.py +++ b/tests/tensorflow/hooks/test_reductions.py @@ -1,8 +1,5 @@ # Standard Library -# Third Party -from tests.utils import verify_shapes - # First Party import smdebug.tensorflow as smd from smdebug.core.json_config import CONFIG_FILE_PATH_ENV_STR @@ -60,19 +57,6 @@ def test_reductions(out_dir, save_raw_tensor=False): helper_test_reductions(out_dir, hook, save_raw_tensor) -def test_shapes(out_dir, save_raw_tensor=False): - pre_test_clean_up() - rdnc = smd.ReductionConfig(save_shape=True, save_raw_tensor=save_raw_tensor) - hook = smd.SessionHook( - out_dir=out_dir, - save_config=smd.SaveConfig(save_interval=1), - reduction_config=rdnc, - include_collections=["weights", "gradients", "losses"], - ) - simple_model(hook) - verify_shapes(out_dir, 0) - - def test_reductions_with_raw_tensor(out_dir): test_reductions(out_dir, save_raw_tensor=True) diff --git a/tests/tensorflow/keras/test_keras.py b/tests/tensorflow/keras/test_keras.py index bfd5e7cc7..2fcb379f6 100644 --- a/tests/tensorflow/keras/test_keras.py +++ b/tests/tensorflow/keras/test_keras.py @@ -5,7 +5,6 @@ import pytest import tensorflow as tf from tests.tensorflow.utils import create_trial_fast_refresh -from tests.utils import verify_shapes # First Party from smdebug.core.access_layer import has_training_ended @@ -225,20 +224,6 @@ def test_tf_keras(out_dir): exhaustive_check(out_dir, True) 
-@pytest.mark.slow # 0:07 to run -def test_tf_keras_shapes(out_dir): - train_model( - out_dir, - save_all=True, - reduction_config=ReductionConfig(save_shape=True), - use_tf_keras=True, - save_config=SaveConfig(save_steps=[0, 10]), - eager=False, - steps=["train", "eval", "predict", "train"], - ) - verify_shapes(out_dir, 0) - - @pytest.mark.slow # 0:03 to run def test_tf_keras_non_keras_opt(out_dir): include_collections = [ diff --git a/tests/tensorflow2/test_keras.py b/tests/tensorflow2/test_keras.py index ab799b4fa..089cc2d9e 100644 --- a/tests/tensorflow2/test_keras.py +++ b/tests/tensorflow2/test_keras.py @@ -16,7 +16,7 @@ from tests.constants import TEST_DATASET_S3_PATH from tests.tensorflow2.utils import is_tf_2_2, is_tf_2_3 from tests.tensorflow.utils import create_trial_fast_refresh -from tests.utils import use_s3_datasets, verify_shapes +from tests.utils import use_s3_datasets # First Party import smdebug.tensorflow as smd @@ -186,18 +186,6 @@ def helper_keras_gradtape( hook.close() -def test_keras_gradtape_shapes(out_dir): - hook = smd.KerasHook( - out_dir=out_dir, - save_all=True, - save_config=SaveConfig(save_steps=[0]), - reduction_config=ReductionConfig(save_shape=True), - ) - helper_keras_gradtape(trial_dir=out_dir, hook=hook) - verify_shapes(out_dir, 0) - verify_shapes(out_dir, 500) - - @pytest.mark.skip_if_non_eager @pytest.mark.slow @pytest.mark.parametrize("saveall", [True, False]) @@ -468,18 +456,6 @@ def test_keras_fit(out_dir, tf_eager_mode, saveall): assert trial.tensor(tname).value(0) is not None -def test_keras_fit_shapes(out_dir): - hook = smd.KerasHook( - out_dir=out_dir, - save_all=True, - save_config=SaveConfig(save_steps=[0]), - reduction_config=ReductionConfig(save_shape=True), - ) - helper_keras_fit(trial_dir=out_dir, hook=hook) - print(create_trial_fast_refresh(out_dir).tensor_names(step=0)) - verify_shapes(out_dir, 0) - - @pytest.mark.slow def test_base_reductions(out_dir, tf_eager_mode): helper_keras_fit( diff --git a/tests/tensorflow2/test_keras_mirrored.py b/tests/tensorflow2/test_keras_mirrored.py index 3f0bafe4f..d857218ab 100644 --- a/tests/tensorflow2/test_keras_mirrored.py +++ b/tests/tensorflow2/test_keras_mirrored.py @@ -13,7 +13,6 @@ from tests.core.utils import verify_files from tests.tensorflow2.utils import is_tf_2_2, is_tf_2_3 from tests.tensorflow.utils import create_trial_fast_refresh -from tests.utils import verify_shapes # First Party import smdebug.tensorflow as smd @@ -291,20 +290,6 @@ def test_save_all(out_dir, tf_eager_mode, workers): verify_files(out_dir, save_config, saved_scalars) -@pytest.mark.slow -def test_shapes(out_dir, tf_eager_mode): - strategy, _ = train_model( - out_dir, - save_all=True, - save_config=SaveConfig(save_steps=[0]), - reduction_config=ReductionConfig(save_shape=True), - steps=["train"], - eager=tf_eager_mode, - ) - multiworker = strategy.num_replicas_in_sync > 1 - verify_shapes(out_dir, 0, multiworker=multiworker) - - @pytest.mark.slow def test_base_reductions(out_dir, tf_eager_mode): train_model( diff --git a/tests/utils.py b/tests/utils.py index af827f264..d5db2a8ba 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -5,20 +5,16 @@ # Third Party import boto3 -import numpy as np from tests.constants import TEST_DATASET_S3_PATH -from tests.tensorflow.utils import create_trial_fast_refresh # First Party from smdebug.core.config_constants import ( CONFIG_FILE_PATH_ENV_STR, DEFAULT_SAGEMAKER_OUTDIR, DEFAULT_SAGEMAKER_TENSORBOARD_PATH, - DEFAULT_WORKER_NAME, TENSORBOARD_CONFIG_FILE_PATH_ENV_STR, ) from 
smdebug.core.utils import is_s3, remove_file_if_exists -from smdebug.exceptions import TensorUnavailableForStep def use_s3_datasets(): @@ -31,46 +27,6 @@ def use_s3_datasets(): return False -def is_scalar(x): - if isinstance(x, list): - if len(x) == 1: - return True - elif isinstance(x, np.ndarray): - return True - return False - - -def verify_shapes(out_dir, step_num, multiworker=False): - trial = create_trial_fast_refresh(out_dir) - for tname in trial.tensor_names(step=step_num): - tensor = trial.tensor(tname) - if multiworker is False: - assert isinstance(tensor.shape(step_num), tuple), (tname, tensor.shape(step_num)) - try: - if not is_scalar(tensor.value(step_num)): - # test did not save value except scalars which dont use reduction config - # so it should raise the below exception - assert False - except TensorUnavailableForStep: - pass - else: - workers = tensor.workers(step_num) - assert len(workers) > 1 - for w in workers: - try: - if not is_scalar(tensor.value(step_num, worker=w)): - # test did not save value so it should raise the below exception - assert False - except TensorUnavailableForStep: - pass - - assert isinstance(tensor.shape(step_num, worker=w), tuple), ( - tname, - w, - tensor.shape(step_num, worker=w), - ) - - class SagemakerSimulator(object): """ Creates an environment variable pointing to a JSON config file, and creates the config file.
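
For reference, after this revert ReductionConfig accepts only the parameters in ALLOWED_PARAMS above (reductions, abs_reductions, norms, abs_norms, save_raw_tensor); shapes are no longer a first-class saving mode. A minimal sketch of configuring a hook with the post-revert API, modeled on tests/tensorflow/hooks/test_reductions.py; the out_dir path is hypothetical:

    import smdebug.tensorflow as smd

    # Reductions come from ALLOWED_REDUCTIONS, norms from ALLOWED_NORMS.
    reduction_config = smd.ReductionConfig(
        reductions=["min", "max", "mean"],  # per-step scalar summaries of each tensor
        norms=["l2"],                       # l1/l2 are the only allowed norms
        save_raw_tensor=False,              # do not store full tensor values
    )
    hook = smd.SessionHook(
        out_dir="/tmp/smdebug_demo",        # hypothetical output directory
        save_config=smd.SaveConfig(save_interval=1),
        reduction_config=reduction_config,
        include_collections=["weights", "gradients", "losses"],
    )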
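
The index records written by IndexWriter now contain only the "meta" and "tensor_payload" sections, which is exactly what IndexReader._validate checks for; the "shape_payload" section is gone. A small sketch of parsing such a record the way _update_tensors_from_json does; the event file name and tensor name are made up for illustration:

    import json

    sample_record = json.dumps({
        "meta": {
            "mode": "TRAIN",
            "mode_step": 0,
            "event_file_name": "events/000000000000/000000000000_worker_0.tfevents",
        },
        "tensor_payload": [
            {"tensorname": "dense/kernel:0", "start_idx": 0, "length": 456}
        ],
    })

    index_dict = json.loads(sample_record)
    assert "meta" in index_dict and "tensor_payload" in index_dict
    for tensor in index_dict["tensor_payload"]:
        # start_idx/length locate the serialized tensor bytes inside the event file
        print(tensor["tensorname"], tensor["start_idx"], tensor["length"])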
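
load_tensor_data_from_index_files (now implemented separately by the local and S3 index readers) returns the nested mapping tensor name -> step -> worker -> {"tensor_location": TensorLocation}; the "tensor_shape" entries from #328 no longer appear. A sketch of building one entry of that structure by hand, with a hypothetical event file path:

    from smdebug.core.locations import TensorLocation
    from smdebug.core.modes import ModeKeys

    location = TensorLocation(
        "dense/kernel:0",                 # tensorname
        ModeKeys.TRAIN,                   # mode
        0,                                # mode_step
        "/tmp/smdebug_demo/events/000000000000/000000000000_worker_0.tfevents",
        0,                                # start_idx into the event file
        456,                              # length of the serialized record
        "worker_0",                       # worker that wrote it
    )
    index_tensors_dict = {
        "dense/kernel:0": {0: {"worker_0": {"tensor_location": location}}}
    }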
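
On the read side, Tensor.shape() and the ShapeUnavailableForStep exception are removed by this revert, so a trial consumer gets shapes from the saved values themselves, and only for steps where the raw value (not just reductions) was saved. A hedged sketch, assuming a local trial directory that a hook has already written to:

    from smdebug.trials import create_trial
    from smdebug.exceptions import TensorUnavailableForStep

    trial = create_trial("/tmp/smdebug_demo")      # hypothetical trial path
    for tname in trial.tensor_names(step=0):
        try:
            value = trial.tensor(tname).value(0)   # numpy array for step 0
            print(tname, value.shape)
        except TensorUnavailableForStep:
            # only reductions were saved for this tensor at this step
            print(tname, trial.tensor(tname).reduction_values(0))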
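
The deleted tests/pytorch/test_json_configs/test_hook_save_shape.json above was the only JSON config exercising "save_shape". A configuration like it, minus that key, remains valid after the revert; a sketch of loading one through the environment variable, mirroring test_save_config_hook_from_json, with hypothetical paths and the PyTorch hook import assumed:

    import json
    import os

    from smdebug.core.json_config import CONFIG_FILE_PATH_ENV_STR
    from smdebug.pytorch.hook import Hook

    config = {
        "LocalPath": "/tmp/test_output/demo/jsonloading",   # hypothetical output path
        "HookParameters": {"save_all": True, "save_steps": "0,1"},
    }
    config_path = "/tmp/demo_hook_config.json"
    with open(config_path, "w") as f:
        json.dump(config, f)

    os.environ[CONFIG_FILE_PATH_ENV_STR] = config_path
    hook = Hook.create_from_json_file()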