
[Data] Time ray data tasks (total time and udf time) #43241

Merged
16 commits merged Mar 4, 2024

Conversation

@omatthew98 omatthew98 (Contributor) commented Feb 17, 2024

Why are these changes needed?

This allows us to track both the total time spent in Ray Data tasks and the time spent specifically in UDFs (primarily map and filter functions applied to the dataset). An example output is shown below:

Operator 1 ReadRange->Map(sleep): 25 tasks executed, 25 blocks produced in 49.54s
* Remote wall time: 8.07s min, 20.68s max, 20.16s mean, 503.97s total
* Remote cpu time: 42.02ms min, 113.24ms max, 96.57ms mean, 2.41s total
* UDF time: 8.07s min, 20.67s max, 20.15s mean, 503.75s total
* Peak heap memory usage (MiB): 143890.62 min, 145140.62 max, 144508 mean
* Output num rows per block: 16 min, 41 max, 40 mean, 1000 total
* Output size bytes per block: 128 min, 328 max, 320 mean, 8000 total
* Output rows per task: 16 min, 41 max, 40 mean, 25 tasks used
* Tasks per node: 25 min, 25 max, 25 mean; 1 nodes used
* Extra metrics: {'num_inputs_received': 12, 'bytes_inputs_received': 30893, 'num_task_inputs_processed': 1, 'bytes_task_inputs_processed': 2574, 'bytes_inputs_of_submitted_tasks': 30893, 'num_task_outputs_generated': 1, 'bytes_task_outputs_generated': 328, 'rows_task_outputs_generated': 41, 'num_outputs_taken': 1, 'bytes_outputs_taken': 328, 'num_outputs_of_finished_tasks': 1, 'bytes_outputs_of_finished_tasks': 328, 'num_tasks_submitted': 12, 'num_tasks_running': 11, 'num_tasks_have_outputs': 1, 'num_tasks_finished': 1, 'num_tasks_failed': 0, 'obj_store_mem_internal_inqueue': 0, 'obj_store_mem_internal_outqueue': 0, 'obj_store_mem_pending_task_inputs': 28319, 'obj_store_mem_freed': 2574, 'obj_store_mem_spilled': 0, 'block_generation_time': 20.659513541, 'total_data_tasks_time': 20.75380225, 'total_data_udfs_time': 0, 'cpu_usage': 11, 'gpu_usage': 0, 'ray_remote_args': {'num_cpus': 1, 'scheduling_strategy': 'SPREAD'}}

Dataset iterator time breakdown:
* Total time overall: 49.59s
    * Total time in Ray Data iterator initialization code: 20.79s
    * Total time user thread is blocked by Ray Data iter_batches: 28.79s
    * Total execution time for user thread: 2.71ms
* Batch iteration time breakdown (summed across prefetch threads):
    * In ray.get(): 52.83us min, 484.83us max, 144.99us avg, 3.62ms total
    * In batch creation: 10.71us min, 2.49ms max, 19.49us avg, 19.49ms total
    * In batch formatting: 8.38us min, 355.29us max, 17.13us avg, 17.13ms total

Code to generate the above:

import ray
import sys
import time
from ray.data import DataContext

context = DataContext.get_current()
context.verbose_stats_logs = True

def sleep(x):
    time.sleep(0.5)
    return x

num_rows = int(sys.argv[1]) if len(sys.argv) > 1 else 10

# This passes a plain function; the UDF could also be a callable (actor) class.
ds = ray.data.range(num_rows).map(sleep)

for _ in ds.iter_batches(batch_size=1):
    continue

print(ds.stats())
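
As a sanity check on the reported "UDF time" (not part of the PR), the per-call wall time can also be measured by hand inside the UDF and returned as an extra column; the column name udf_seconds is made up for this sketch:

import time

import ray

def sleep_timed(row):
    # Record this call's wall time alongside the row itself.
    start = time.perf_counter()
    time.sleep(0.5)
    row["udf_seconds"] = time.perf_counter() - start
    return row

ds = ray.data.range(10).map(sleep_timed)
total_udf = sum(row["udf_seconds"] for row in ds.take_all())
print(f"Hand-measured UDF time: {total_udf:.2f}s")

The hand-measured total should land close to the "UDF time ... total" line in the stats, modulo the small wrapper overhead that the stats also capture.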

Related issue number

Closes #42801

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

jjyao and others added 3 commits February 15, 2024 14:09
Those methods have been deprecated for 3 years and they are DeveloperAPI so it's safe to remove them.

Signed-off-by: Jiajun Yao <[email protected]>
Signed-off-by: Matthew Owen <[email protected]>
Signed-off-by: Matthew Owen <[email protected]>
@scottjlee scottjlee self-assigned this Feb 17, 2024
@@ -112,6 +114,14 @@ class OpRuntimeMetrics:
default=0, metadata={"map_only": True, "export_metric": True}
)

# TODO(mowen): Is this actually different than the wall time already computed
Contributor:

did you verify this TODO / is it resolved?

Contributor Author (omatthew98):

After some offline investigation, I noticed that this (1) very closely tracked the existing wall time stats and (2) was not properly being synced to the dataset stats (i.e., the time was aggregated correctly in the OpRuntimeMetrics object, but the dataset stats never saw the correct value). As such, removing this for now; might revisit post-GA.

total_data_tasks_time: float = field(default=0, metadata={"export_metric": False})

# Total time spent in UDFs
total_data_udfs_time: float = field(default=0, metadata={"export": False})
Contributor:

should we change these export metric fields to True before merging?

Contributor Author (omatthew98):

Actually going to remove this from OpRuntimeMetrics since it is being tracked in the BlockExecStats and then aggregated in the DatasetStats.
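
For readers unfamiliar with the pattern under discussion: each OpRuntimeMetrics metric is a dataclass field whose metadata flags control downstream handling. A minimal sketch of how an "export_metric" flag can gate which fields are exported (the exported_metrics helper is hypothetical, not Ray's API):

from dataclasses import dataclass, field, fields

@dataclass
class MetricsSketch:
    # Flagged for export to external metrics backends.
    bytes_task_outputs_generated: int = field(
        default=0, metadata={"export_metric": True}
    )
    # Internal only: export_metric is False.
    total_data_tasks_time: float = field(
        default=0, metadata={"export_metric": False}
    )

def exported_metrics(m: MetricsSketch) -> dict:
    # Collect only the fields whose metadata opts them in.
    return {
        f.name: getattr(m, f.name)
        for f in fields(m)
        if f.metadata.get("export_metric", False)
    }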

is_map=False,
extra_metrics=[
"'num_output_N': N",
],
Contributor:

unrelated change?

Comment on lines 120 to 123
total_data_tasks_time: float = field(default=0, metadata={"export_metric": False})

# Total time spent in UDFs
total_data_udfs_time: float = field(default=0, metadata={"export": False})
Contributor:

since all the metrics are related to Ray Data, we can probably omit "data" from the name/output. How about total_ray_tasks_time and total_udfs_time?

@omatthew98 omatthew98 changed the title Time ray data tasks (total time and udf time) [Data] Time ray data tasks (total time and udf time) Feb 29, 2024
) -> Iterable[MapTransformFnData]:
while True:
try:
start = time.perf_counter()
@raulchen raulchen (Contributor) commented Mar 4, 2024:

If _is_udf is False, we can skip creating the counter object to reduce overhead.
Also, I'm wondering whether we should do the timing at the MapTransformer level, so we don't need to wrap every subclass of MapTransformFn. Basically, if fn._is_udf is True, we create a wrapper around it at runtime.

Contributor Author (omatthew98):

I was able to push the timing up into the MapTransformer level and it does seem much cleaner that way. The timing wrappers are only added if the MapTransformFn has _is_udf set to True.
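
A simplified sketch of that final shape (illustrative only; the real MapTransformer internals differ, and the udf_time accumulator stands in for the actual stats object): the transformer decorates a transform fn with a timing generator at runtime, and only when the fn is flagged as a UDF:

import time
from typing import Callable, Iterable

def maybe_wrap_with_timing(
    fn: Callable[[Iterable], Iterable],
    is_udf: bool,
    udf_time: list,
) -> Callable[[Iterable], Iterable]:
    if not is_udf:
        # Non-UDF transforms are returned untouched, so they pay no overhead.
        return fn

    def timed_fn(blocks: Iterable) -> Iterable:
        # Generator-style transforms do their work lazily, so time the
        # initial call and every step of the returned iterator.
        start = time.perf_counter()
        it = iter(fn(blocks))
        udf_time[0] += time.perf_counter() - start
        while True:
            start = time.perf_counter()
            try:
                out = next(it)
            except StopIteration:
                udf_time[0] += time.perf_counter() - start
                return
            udf_time[0] += time.perf_counter() - start
            yield out

    return timed_fn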

@omatthew98 omatthew98 added the release-blocker P0 Issue that blocks the release label Mar 4, 2024
@omatthew98 omatthew98 requested a review from raulchen March 4, 2024 21:28
@raulchen raulchen merged commit be5acfb into ray-project:master Mar 4, 2024
8 of 9 checks passed
Labels
release-blocker P0 Issue that blocks the release
Development

Successfully merging this pull request may close these issues.

[Data] Measure time spent in Ray Data tasks
5 participants