Add simple metric to measure file read latency #4

Draft · wants to merge 1 commit into base: pinterest/main-2.9.3
19 changes: 19 additions & 0 deletions python/ray/data/datasource/file_based_datasource.py
@@ -50,6 +50,10 @@
)
from ray.util.annotations import Deprecated, DeveloperAPI, PublicAPI

from ray.util.metrics import Histogram

import time

if TYPE_CHECKING:
import pandas as pd
import pyarrow
@@ -163,6 +167,12 @@ def __init__(
self._ignore_missing_paths = ignore_missing_paths
self._include_paths = include_paths
paths, self._filesystem = _resolve_paths_and_filesystem(paths, filesystem)

self.read_task_generation_latency = Histogram(
"read_tasks_generation_latency",
description="Latency of read task generation, in ms.",
# Histogram requires bucket boundaries; these values (in ms) are placeholders to tune.
# See the standalone usage sketch after this hunk.
boundaries=[10, 50, 100, 500, 1000, 5000, 10000],
)

paths, file_sizes = map(
list,
zip(
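
A minimal standalone sketch of constructing and recording this kind of metric, reusing the metric name from the diff; the bucket boundaries and the sample observation are illustrative assumptions, not part of the PR:

import ray
from ray.util.metrics import Histogram

ray.init()

# Bucket edges are required by Histogram; these values (in ms) are placeholders.
read_task_generation_latency = Histogram(
    "read_tasks_generation_latency",
    description="Latency of read task generation, in ms.",
    boundaries=[10, 50, 100, 500, 1000, 5000, 10000],
)

# Record one observation (42 ms); the value is then exported through Ray's Prometheus endpoint.
read_task_generation_latency.observe(42.0)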
@@ -233,6 +243,7 @@ def estimate_inmemory_data_size(self) -> Optional[int]:
total_size += sz
return total_size

# Need to understand how long this takes in total
def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
import numpy as np

@@ -272,6 +283,7 @@ def read_files(
parse = PathPartitionParser(partitioning)
partitions = parse(read_path)

# Put histogram here to see how long it takes to convert to pyarrow.NativeFile
with _open_file_with_retry(
read_path,
lambda: open_input_source(fs, read_path, **open_stream_args),
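
The comment above proposes measuring how long opening a file (converting the path into a pyarrow NativeFile) takes. A minimal sketch of that idea; timed_call and the file_open_latency histogram mentioned in the trailing comment are hypothetical names, not part of the PR:

import time

def timed_call(fn, histogram):
    """Run fn(), record its wall-clock duration on `histogram` in ms, and return the result."""
    start = time.perf_counter()
    result = fn()
    histogram.observe(1000 * (time.perf_counter() - start))
    return result

# Inside read_files, wrapping the lambda passed to _open_file_with_retry would time a
# single open attempt against a hypothetical file_open_latency Histogram:
#     f = _open_file_with_retry(
#         read_path,
#         lambda: timed_call(
#             lambda: open_input_source(fs, read_path, **open_stream_args),
#             file_open_latency,
#         ),
#     )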
@@ -312,7 +324,11 @@ def read_task_fn():
# fix https://github.com/ray-project/ray/issues/24296
parallelism = min(parallelism, len(paths))

# This runs once at the start of a read, which is why kicking off a given dataset takes so long.
# Could this loop itself be parallelized? (See the thread-pool sketch after this hunk.)
read_tasks = []
print(f"[Connor] Number of read paths: {len(paths)}")
start = time.time()
for read_paths, file_sizes in zip(
np.array_split(paths, parallelism), np.array_split(file_sizes, parallelism)
):
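
On the question of parallelizing read task generation: each iteration only slices metadata and builds a ReadTask, so one option is a thread pool over the splits. A minimal sketch, where make_read_task is a hypothetical callable standing in for the loop body elided from this hunk; whether this actually helps depends on how much of the loop body does I/O or releases the GIL:

from concurrent.futures import ThreadPoolExecutor

import numpy as np

def build_read_tasks_parallel(paths, file_sizes, parallelism, make_read_task, max_workers=8):
    """Build read tasks for each (paths, sizes) split on a thread pool.

    make_read_task is a hypothetical (read_paths, file_sizes) -> ReadTask callable.
    """
    splits = list(
        zip(np.array_split(paths, parallelism), np.array_split(file_sizes, parallelism))
    )
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves input order, so results match the serial loop's ordering.
        return list(pool.map(lambda split: make_read_task(split[0], split[1]), splits))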
@@ -332,6 +348,9 @@

read_tasks.append(read_task)

latency = time.time() - start
print(f"[Connor] Latency to generate read tasks in seconds is: {latency}")
self.read_task_generation_latency.observe(1000 * latency)
return read_tasks

def _open_input_source(
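
The measurement pattern in the last hunk (capture time.time() before the loop, observe the delta in milliseconds afterwards) could be factored into a reusable context manager; a minimal sketch, where record_latency_ms is an illustrative helper name rather than part of the PR:

import time
from contextlib import contextmanager

@contextmanager
def record_latency_ms(histogram):
    """Time the enclosed block and record its duration on `histogram` in milliseconds."""
    start = time.perf_counter()
    try:
        yield
    finally:
        histogram.observe(1000 * (time.perf_counter() - start))

# Sketch of usage inside get_read_tasks:
#     with record_latency_ms(self.read_task_generation_latency):
#         for read_paths, file_sizes in ...:
#             ...
#             read_tasks.append(read_task)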