[Data] [hotfix] Fix test_stats (#33788)
#33713 changed test_dataset_stats_basic to check for iteration stats when using the streaming executor.

#33620 changed the stats behavior for the streaming executor but had not pulled in #33713, so test_stats passed on that branch.

Once both were merged into master, test_stats started failing. This PR fixes it.

---------

Signed-off-by: amogkam <[email protected]>
amogkam authored Mar 28, 2023
1 parent 97f8ba1 commit 25d9917
Showing 1 changed file with 82 additions and 8 deletions.
90 changes: 82 additions & 8 deletions python/ray/data/tests/test_stats.py
@@ -121,9 +121,46 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats):
stats = canonicalize(ds.cache().stats())

if context.new_execution_backend:
assert (
stats
== """Stage N ReadRange->MapBatches(dummy_map_batches): N/N blocks executed in T
if context.use_streaming_executor:
assert (
stats
== """Stage N ReadRange->MapBatches(dummy_map_batches): N/N blocks executed in T
* Remote wall time: T min, T max, T mean, T total
* Remote cpu time: T min, T max, T mean, T total
* Peak heap memory usage (MiB): N min, N max, N mean
* Output num rows: N min, N max, N mean, N total
* Output size bytes: N min, N max, N mean, N total
* Tasks per node: N min, N max, N mean; N nodes used
* Extra metrics: {'obj_store_mem_alloc': N, 'obj_store_mem_freed': N, \
'obj_store_mem_peak': N}
Stage N Map: N/N blocks executed in T
* Remote wall time: T min, T max, T mean, T total
* Remote cpu time: T min, T max, T mean, T total
* Peak heap memory usage (MiB): N min, N max, N mean
* Output num rows: N min, N max, N mean, N total
* Output size bytes: N min, N max, N mean, N total
* Tasks per node: N min, N max, N mean; N nodes used
* Extra metrics: {'obj_store_mem_alloc': N, 'obj_store_mem_freed': N, \
'obj_store_mem_peak': N}
Dataset iterator time breakdown:
* Total time user code is blocked: T
* Total time in user code: T
* Total time overall: T
* Num blocks local: Z
* Num blocks remote: Z
* Num blocks unknown location: N
* Batch iteration time breakdown (summed across prefetch threads):
* In ray.get(): T min, T max, T avg, T total
* In batch creation: T min, T max, T avg, T total
* In batch formatting: T min, T max, T avg, T total
"""
)
else:
assert (
stats
== """Stage N ReadRange->MapBatches(dummy_map_batches): N/N blocks executed in T
* Remote wall time: T min, T max, T mean, T total
* Remote cpu time: T min, T max, T mean, T total
* Peak heap memory usage (MiB): N min, N max, N mean
@@ -154,11 +191,48 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats):
* In user code: T
* Total time: T
"""
)
)
else:
assert (
stats
== """Stage N Read->MapBatches(dummy_map_batches): N/N blocks executed in T
if context.use_streaming_executor:
assert (
stats
== """Stage N ReadRange->MapBatches(dummy_map_batches): N/N blocks executed in T
* Remote wall time: T min, T max, T mean, T total
* Remote cpu time: T min, T max, T mean, T total
* Peak heap memory usage (MiB): N min, N max, N mean
* Output num rows: N min, N max, N mean, N total
* Output size bytes: N min, N max, N mean, N total
* Tasks per node: N min, N max, N mean; N nodes used
* Extra metrics: {'obj_store_mem_alloc': N, 'obj_store_mem_freed': N, \
'obj_store_mem_peak': N}
Stage N Map: N/N blocks executed in T
* Remote wall time: T min, T max, T mean, T total
* Remote cpu time: T min, T max, T mean, T total
* Peak heap memory usage (MiB): N min, N max, N mean
* Output num rows: N min, N max, N mean, N total
* Output size bytes: N min, N max, N mean, N total
* Tasks per node: N min, N max, N mean; N nodes used
* Extra metrics: {'obj_store_mem_alloc': N, 'obj_store_mem_freed': N, \
'obj_store_mem_peak': N}
Dataset iterator time breakdown:
* Total time user code is blocked: T
* Total time in user code: T
* Total time overall: T
* Num blocks local: Z
* Num blocks remote: Z
* Num blocks unknown location: N
* Batch iteration time breakdown (summed across prefetch threads):
* In ray.get(): T min, T max, T avg, T total
* In batch creation: T min, T max, T avg, T total
* In batch formatting: T min, T max, T avg, T total
"""
)
else:
assert (
stats
== """Stage N Read->MapBatches(dummy_map_batches): N/N blocks executed in T
* Remote wall time: T min, T max, T mean, T total
* Remote cpu time: T min, T max, T mean, T total
* Peak heap memory usage (MiB): N min, N max, N mean
@@ -182,7 +256,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats):
* In user code: T
* Total time: T
"""
)
)


def test_dataset_stats_shuffle(ray_start_regular_shared):
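
The expected strings in this diff use the placeholders N, T, and Z rather than concrete values, because the test compares the output of canonicalize(...) and not the raw stats text. The body of canonicalize is not shown in this diff, so the following is only a minimal sketch of how such normalization could work, assuming regex substitution. The name canonicalize_sketch and the exact patterns are hypothetical and may differ from the helper in test_stats.py.

```python
import re


def canonicalize_sketch(stats: str) -> str:
    """Hypothetical stand-in for the test helper; not the actual Ray code."""
    # Durations such as "1.5ms", "7us" or "2s" become "T".
    out = re.sub(r"[0-9.]+(ms|us|s)", "T", stats)
    # Standalone zero values become "Z", so an expected zero can be
    # told apart from an arbitrary number.
    out = re.sub(r" 0+(\.0+)?(?![0-9]|\.[0-9])", " Z", out)
    # Every remaining number becomes "N".
    out = re.sub(r"[0-9]+(\.[0-9]+)?", "N", out)
    return out


if __name__ == "__main__":
    print(canonicalize_sketch("Stage 1 Map: 4/4 blocks executed in 0.12s"))
    print(canonicalize_sketch("* Num blocks local: 0"))
```

Under this scheme, an expected line such as "Num blocks local: Z" would assert that the count is exactly zero, while "Num blocks unknown location: N" would accept any nonzero number. That is one reading of why the streaming-executor branch needs its own expected string.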