From 6b203f79457d78d5735a513721ab8181ddee9f1e Mon Sep 17 00:00:00 2001 From: Connor Date: Tue, 23 Apr 2024 10:21:38 -0700 Subject: [PATCH] commit changes --- .../data/datasource/file_based_datasource.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/python/ray/data/datasource/file_based_datasource.py b/python/ray/data/datasource/file_based_datasource.py index 94a31bdf1fec..abb85c84c48d 100644 --- a/python/ray/data/datasource/file_based_datasource.py +++ b/python/ray/data/datasource/file_based_datasource.py @@ -50,6 +50,10 @@ ) from ray.util.annotations import Deprecated, DeveloperAPI, PublicAPI +from ray.util.metrics import Histogram + +import time + if TYPE_CHECKING: import pandas as pd import pyarrow @@ -163,6 +167,12 @@ def __init__( self._ignore_missing_paths = ignore_missing_paths self._include_paths = include_paths paths, self._filesystem = _resolve_paths_and_filesystem(paths, filesystem) + + self.read_task_generation_latency = Histogram( + "read_tasks_generation_latency", + description="Latencies of requests in ms." + ) + paths, file_sizes = map( list, zip( @@ -233,6 +243,7 @@ def estimate_inmemory_data_size(self) -> Optional[int]: total_size += sz return total_size + # Need to understand how long this takes in total def get_read_tasks(self, parallelism: int) -> List[ReadTask]: import numpy as np @@ -272,6 +283,7 @@ def read_files( parse = PathPartitionParser(partitioning) partitions = parse(read_path) + # Put histogram here to see how long it takes to convert to pyarrow.NativeFile with _open_file_with_retry( read_path, lambda: open_input_source(fs, read_path, **open_stream_args), @@ -312,7 +324,11 @@ def read_task_fn(): # fix https://github.com/ray-project/ray/issues/24296 parallelism = min(parallelism, len(paths)) + # This happens at the start, which is why the starting of a given set takes so long + # Could this itself be parallelized? read_tasks = [] + print(f"[Connor] My number of read paths are: {len(read_tasks)}") + start = time.time() for read_paths, file_sizes in zip( np.array_split(paths, parallelism), np.array_split(file_sizes, parallelism) ): @@ -332,6 +348,9 @@ def read_task_fn(): read_tasks.append(read_task) + latency = time.time() - start + print(f"[Connor] Latency to generate read tasks in seconds is: {latency}") + self.read_task_generation_latency.observe(1000 * latency) return read_tasks def _open_input_source(