Initial stats framework for datasets #20867

Merged: 38 commits into ray-project:master on Dec 9, 2021

Conversation

@ericl (Contributor, Author) commented Dec 3, 2021

Why are these changes needed?

This adds an initial Dataset.stats() framework for debugging dataset performance. At a high level, execution stats for tasks (e.g., CPU time) are attached to block metadata objects. Datasets have stats objects that hold references to these stats and to the stats of parent datasets (this avoids stats holding references to the parent datasets themselves, allowing those datasets to be garbage-collected). Similarly, DatasetPipelines hold stats from recently computed datasets.

Currently only basic ops like map / map_batches are instrumented. TODO placeholders are left for future PRs.

In addition, we collect statistics about iterator timings (time spent waiting, processing, and in user code). Here's sample output from one of the most advanced use cases: iterating over a split of a dataset pipeline in a remote task:

import ray
import time

def pause(x):
    # Simulate a small amount of per-record work.
    time.sleep(.0001)
    return x

ds = ray.data.range(10000)
ds = ds.map(lambda x: str(x + 1))

# Repeat the dataset 5 times and transform each window.
pipe = ds.repeat(5).map(pause).random_shuffle_each_window()

@ray.remote
def consume(p, stats=False):
    for x in p.iter_batches():
        pass
    if stats:
        print(p.stats())

# Split the pipeline between two consumer tasks; only the
# second one prints the collected stats.
a, b = pipe.split(2)
ray.get([consume.remote(a), consume.remote(b, True)])

(consume pid=723600) == Pipeline Window 2 ==
(consume pid=723600) Stage 0 read: 200/200 blocks executed in 0.13s
(consume pid=723600) * Wall time: 117.63us min, 6.6ms max, 413.57us mean, 82.71ms total
(consume pid=723600) * CPU time: 116.3us min, 6.48ms max, 380.74us mean, 76.15ms total
(consume pid=723600) * Output num rows: 50 min, 50 max, 50 mean, 10000 total
(consume pid=723600) * Output size bytes: 456 min, 456 max, 456 mean, 91200 total
(consume pid=723600) * Tasks per node: 200 min, 200 max, 200 mean; 1 nodes used
(consume pid=723600) 
(consume pid=723600) Stage 1 map: 200/200 blocks executed in 0.3s
(consume pid=723600) * Wall time: 294.12us min, 2.55ms max, 918.35us mean, 183.67ms total
(consume pid=723600) * CPU time: 292.68us min, 829.32us max, 554.42us mean, 110.88ms total
(consume pid=723600) * Output num rows: 50 min, 50 max, 50 mean, 10000 total
(consume pid=723600) * Output size bytes: 456 min, 456 max, 456 mean, 91200 total
(consume pid=723600) * Tasks per node: 200 min, 200 max, 200 mean; 1 nodes used
(consume pid=723600) 
(consume pid=723600) Stage 2 map: 200/200 blocks executed in 0.41s
(consume pid=723600) * Wall time: 8.06ms min, 18.29ms max, 9.37ms mean, 1.87s total
(consume pid=723600) * CPU time: 572.05us min, 3.43ms max, 992.16us mean, 198.43ms total
(consume pid=723600) * Output num rows: 50 min, 50 max, 50 mean, 10000 total
(consume pid=723600) * Output size bytes: 456 min, 456 max, 456 mean, 91200 total
(consume pid=723600) * Tasks per node: 200 min, 200 max, 200 mean; 1 nodes used
(consume pid=723600) 
(consume pid=723600) Stage 3 random_shuffle_TODO: 0/0 blocks executed in -1s
(consume pid=723600) 
(consume pid=723600) Dataset iterator time breakdown:
(consume pid=723600) * In ray.wait(): 1.85ms
(consume pid=723600) * In format_batch(): 15.28ms
(consume pid=723600) * In user code: 429.99us
(consume pid=723600) * Total time: 18.07ms
(consume pid=723600) 
(consume pid=723600) == Pipeline Window 3 ==
(consume pid=723600) Stage 0 read: [execution cached]
(consume pid=723600) Stage 1 map: [execution cached]
(consume pid=723600) Stage 2 map: 200/200 blocks executed in 0.46s
(consume pid=723600) * Wall time: 7.97ms min, 27.64ms max, 9.79ms mean, 1.96s total
(consume pid=723600) * CPU time: 592.86us min, 1.75ms max, 1.01ms mean, 201.85ms total
(consume pid=723600) * Output num rows: 50 min, 50 max, 50 mean, 10000 total
(consume pid=723600) * Output size bytes: 456 min, 456 max, 456 mean, 91200 total
(consume pid=723600) * Tasks per node: 200 min, 200 max, 200 mean; 1 nodes used
(consume pid=723600) 
(consume pid=723600) Stage 3 random_shuffle_TODO: 0/0 blocks executed in -1s
(consume pid=723600) 
(consume pid=723600) Dataset iterator time breakdown:
(consume pid=723600) * In ray.wait(): 1.2ms
(consume pid=723600) * In format_batch(): 10.03ms
(consume pid=723600) * In user code: 292.8us
(consume pid=723600) * Total time: 11.86ms
(consume pid=723600) 
(consume pid=723600) == Pipeline Window 4 ==
(consume pid=723600) Stage 0 read: [execution cached]
(consume pid=723600) Stage 1 map: [execution cached]
(consume pid=723600) Stage 2 map: 200/200 blocks executed in 0.42s
(consume pid=723600) * Wall time: 8.04ms min, 16.93ms max, 9.48ms mean, 1.9s total
(consume pid=723600) * CPU time: 662.75us min, 3.33ms max, 972.26us mean, 194.45ms total
(consume pid=723600) * Output num rows: 50 min, 50 max, 50 mean, 10000 total
(consume pid=723600) * Output size bytes: 456 min, 456 max, 456 mean, 91200 total
(consume pid=723600) * Tasks per node: 200 min, 200 max, 200 mean; 1 nodes used
(consume pid=723600) 
(consume pid=723600) Stage 3 random_shuffle_TODO: 0/0 blocks executed in -1s
(consume pid=723600) 
(consume pid=723600) Dataset iterator time breakdown:
(consume pid=723600) * In ray.wait(): 1.18ms
(consume pid=723600) * In format_batch(): 9.98ms
(consume pid=723600) * In user code: 284.75us
(consume pid=723600) * Total time: 11.78ms
(consume pid=723600) 
(consume pid=723600) ##### Overall Pipeline Time Breakdown #####
(consume pid=723600) * Time stalled waiting for next dataset: 2.74ms min, 701.09ms max, 491.05ms mean, 1.96s total
(consume pid=723600) * Time in dataset iterator: 315.69ms
(consume pid=723600) * Time in user code: 1.21ms
(consume pid=723600) * Total time: 5.0s

@@ -161,9 +164,40 @@ def read_datasource(datasource: Datasource[T],
    read_tasks = datasource.prepare_read(parallelism, **read_args)
    context = DatasetContext.get_current()

    def remote_read(task: ReadTask) -> MaybeBlockPartition:

@ray.remote(num_cpus=0)
class StatsActor:
Contributor Author (@ericl):

One sad thing is I ended up making a StatsActor for the lazy block list (for datasource reads).

Alternatively we could refactor LazyBlockList to somehow get these stats to the parent dataset, but it seems tricky.
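
For reference, a minimal sketch of what such a zero-CPU stats-collecting actor can look like (hypothetical names; the actual actor in this PR may differ):

import ray

@ray.remote(num_cpus=0)
class StatsActor:
    # Hypothetical sketch: read tasks report per-block metadata here,
    # keyed by a stats UUID, because a LazyBlockList executes its read
    # tasks lazily and can't hand execution stats back to the parent
    # dataset synchronously.
    def __init__(self):
        self._metadata = {}

    def record(self, stats_uuid, block_idx, metadata):
        self._metadata.setdefault(stats_uuid, {})[block_idx] = metadata

    def get(self, stats_uuid):
        return self._metadata.get(stats_uuid, {})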

@ericl changed the title from "[WIP] Initial stats framework for datasets" to "Initial stats framework for datasets" on Dec 4, 2021

block_window = []  # Handle empty sliding window gracefully.
for block_window in _sliding_window(self._blocks.iter_blocks(),
                                    prefetch_blocks + 1):
    block_window = list(block_window)
    wait_start = time.monotonic()
Collaborator:

One random idea for this time-measuring pattern: we could create a scoped timer and use the with statement:

def _add_wait(seconds):
    _stats.iter_wait_s += seconds

with Timer(_add_wait) as t:
    ...  # code we want to measure

Sadly we cannot pass an int by reference, so I'm using a callback here (a lambda can't contain the += statement). It also doesn't seem flexible enough to cover all use cases.
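
A minimal sketch of that scoped timer, with the callback receiving the elapsed seconds (hypothetical; not necessarily the design that lands):

import time

class Timer:
    def __init__(self, on_exit):
        # on_exit is called with the elapsed time in seconds.
        self._on_exit = on_exit

    def __enter__(self):
        self._start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self._on_exit(time.perf_counter() - self._start)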

Contributor:

+1, but I actually think that you can just add this to the stats API, where you have DatasetStats.measure_*() methods that return context managers that measure and store the execution time:

        with self._stats.measure_iter_total():
            def format_batch(batch: Block, batch_format: str) -> BatchType:
                if batch_format == "native":
                    return batch
                elif batch_format == "pandas":
                    batch = BlockAccessor.for_block(batch)
                    return batch.to_pandas()
                elif batch_format == "pyarrow":
                    batch = BlockAccessor.for_block(batch)
                    return batch.to_arrow()
                else:
                    raise ValueError(
                        f"The given batch format: {batch_format} "
                        f"is invalid. Supported batch type: {BatchType}")

            batcher = Batcher(batch_size=batch_size)

            def batch_block(block: ObjectRef[Block]):
                with self._stats.measure_iter_process():
                    block = ray.get(block)
                    batcher.add(block)
                while batcher.has_batch():
                    with self._stats.measure_iter_process():
                        result = format_batch(batcher.next_batch(), batch_format)
                    with self._stats.measure_iter_user():
                        yield result

            block_window = []  # Handle empty sliding window gracefully.
            for block_window in _sliding_window(self._blocks.iter_blocks(),
                                                prefetch_blocks + 1):
                block_window = list(block_window)
                with self._stats.measure_iter_wait():
                    ray.wait(block_window, num_returns=1, fetch_local=True)
                yield from batch_block(block_window[0])

            # Consume remainder of final block window.
            for block in block_window[1:]:
                yield from batch_block(block)

            # Yield any remainder batches.
            if batcher.has_any() and not drop_last:
                with self._stats.measure_iter_user():
                    yield format_batch(batcher.next_batch(), batch_format)

Contributor Author:

Done

if self.iter_total_s or self.iter_wait_s or self.iter_process_s:
    out += "\nDataset iterator time breakdown:\n"
    out += "* In ray.wait(): {}\n".format(fmt(self.iter_wait_s))
    out += "* In format_batch(): {}\n".format(fmt(self.iter_process_s))
Collaborator:

Feels a bit confusing; if it's just format_batch, maybe rename it to iter_format_batch_s?

Contributor Author:

Done.

        return self.metadata, self.last_time - self.start_time


class DatasetStats:
Collaborator:

I have a high-level question: should we have per-dataset stats or per-operation (e.g., map) stats?

IIUC, for dataset.map(), the iterator time is recorded in the parent DatasetStats but the BlockExecStats is recorded in the child DatasetStats?

Should stats be operation-centric:

dataset.map(..., stats: Optional[Stats])
print(stats)

Contributor Author:

Generally, each operation == a dataset.

> IIUC, for dataset.map(), the iterator time is recorded in the parent DatasetStats but the BlockExecStats is recorded in the child DatasetStats?

That's not the case. Iterator time is only relevant for the last dataset in a chain of transforms; stats are always recorded for the child dataset.

> Should stats be operation-centric:

I don't think this is usable, since it makes stats recording very difficult for long pipelines.
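
In other words (a usage sketch; Dataset.stats() is the API this PR adds):

ds = ray.data.range(100)
ds2 = ds.map(lambda x: x + 1)
# The map stage's stats are recorded on the child dataset; ds2.stats()
# also folds in the stats of its parent (the read stage).
print(ds2.stats())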

Collaborator:

Oh, I think I misunderstood your code.

@@ -1836,6 +1846,8 @@ def iter_batches(self,
            A list of iterators over record batches.
        """

        time_start = time.monotonic()
@clarkzinzow (Contributor) commented Dec 6, 2021:

What's your reasoning for using time.monotonic() for measuring execution time? Do we care about monotonicity more than small-interval accuracy?

I've generally used timeit.default_timer(), which is the recommended timer API in the Python standard library (it currently points to time.perf_counter()). Beyond time.perf_counter() being the generally recommended timer for instrumenting execution, time.monotonic()'s resolution for short timing intervals can be far worse than time.perf_counter()'s on some platforms (like Windows), since it will choose monotonicity over clock resolution if needed. This is generally why time.monotonic() is primarily used for implementing event scheduling and timeouts that are resilient to system time updates, while time.perf_counter() is used for benchmarking. On macOS and Linux, time.perf_counter() is high-resolution and monotonic, and is the same as time.monotonic().

From the PEP introducing these time APIs:

* time.monotonic(): timeout and scheduling, not affected by system clock updates
* time.perf_counter(): benchmarking, most precise clock for short period
* time.process_time(): profiling, CPU time of the process
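
For example (on CPython 3.3+, where timeit.default_timer is an alias for time.perf_counter):

import time
import timeit

assert timeit.default_timer is time.perf_counter

start = time.perf_counter()
sum(range(1_000_000))  # code under measurement
print(time.perf_counter() - start)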

Contributor Author:

Let me change it.


Comment on lines 229 to 230
# Drop the first sample since there's no pipelining there.
wait_time_s = self.wait_time_s[1:]
@clarkzinzow (Contributor) commented Dec 6, 2021:

This might surprise users, who may be trying to figure out why their pipeline is slow when the culprit is the unavoidable latency of waiting for the first dataset in the pipeline. Could we add a flag to .summary_string() for including/excluding the first dataset's wait time?

Contributor Author:

Added.

Contributor:

Looks like the exclude_first_window argument isn't used; you probably forgot an if statement here.

Contributor Author:

Oh 🤦, fixed.
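
The fix presumably amounts to guarding the slice, e.g. (a sketch using the names quoted above):

# Drop the first sample since there's no pipelining there.
wait_time_s = self.wait_time_s
if exclude_first_window:
    wait_time_s = wait_time_s[1:]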

@ericl (Contributor, Author) commented Dec 8, 2021:

Pushed a change to re-use the stats actor, since it was slowing down tests a lot (more than 2x); this is not generally a problem in real usage. It's a bit tricky because we also restart Ray a bunch in tests, so we have to properly re-create the actor in those cases.
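
One way to implement that reuse is a named-actor lookup; a sketch (the name is illustrative, and real code would also need to handle the race where two callers create the actor at once):

import ray

_STATS_ACTOR_NAME = "datasets_stats_actor"  # illustrative name

def get_or_create_stats_actor():
    # Reuse the named actor if it already exists; after Ray is
    # restarted (as happens often in tests), the lookup fails and a
    # fresh actor is created.
    try:
        return ray.get_actor(_STATS_ACTOR_NAME)
    except ValueError:
        return StatsActor.options(
            name=_STATS_ACTOR_NAME, lifetime="detached").remote()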

@ericl added the tests-ok label (the tagger certifies test failures are unrelated and assumes personal liability) on Dec 8, 2021
@clarkzinzow self-requested a review on Dec 8, 2021
@clarkzinzow (Contributor) left a comment:

LGTM overall. Should we add a docs section for interpreting these stats, or save that for a future PR?

with self._stats.iter_user_s.timer():
    yield result

self._stats.iter_total_s.add(time.perf_counter() - time_start)
Contributor:

Any reason for not using the .timer() context manager for this one? Seems like all of this could be under that context manager:

with self._stats.iter_total_s.timer():
    def format_batch(batch: Block, format: str) -> BatchType:
        ...

Contributor Author:

Ah, it was too indented. Didn't seem worth the tradeoff!

    yield batch
    wait_start = time.perf_counter()

self._stats.iter_total_s.add(time.perf_counter() - time_start)
Contributor:

Same question here: any reason for not using the .timer() context manager for this one? Seems like all of this could be under that context manager.

with self._stats.iter_total_s.timer():
    for ds in self.iter_datasets():
        ...

Contributor Author:

Same.

    # TODO(ekl) add a builder to fill these out in a simpler way.
    def __init__(self):
        self.wall_time_s: Optional[float] = None
        self.cpu_time_s: Optional[float] = None
Contributor:

Any reason for not using the Timer context manager for these as well?

Contributor Author:

I was planning to address this in a future PR (adding a builder).
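
Such a builder might look like the following sketch (hypothetical, based on the fields above):

import time

class _BlockExecStatsBuilder:
    # Capture start times at construction; fill out a BlockExecStats
    # when the block finishes executing.
    def __init__(self):
        self._start_time = time.perf_counter()
        self._start_cpu = time.process_time()

    def build(self):
        stats = BlockExecStats()
        stats.wall_time_s = time.perf_counter() - self._start_time
        stats.cpu_time_s = time.process_time() - self._start_cpu
        return stats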

@jjyao (Collaborator) left a comment:

Will continue reviewing it this afternoon.

@@ -1836,6 +1846,8 @@ def iter_batches(self,
            A list of iterators over record batches.
        """

        time_start = time.perf_counter()
Collaborator:

Could the entire code go under a with statement: self._stats.iter_total_s.timer()?

@@ -89,6 +91,8 @@ def __next__(self):
        except StopIteration:
            pass

        self._pipeline._stats.wait_time_s.append(time.perf_counter() - start)
Collaborator:

with statement?

Contributor Author:

Too much indentation.

DatasetContext._set_current(context)
return task()
start_time, start_cpu = time.perf_counter(), time.process_time()
exec_stats = BlockExecStats()
Collaborator:

Maybe use with exec_stats.timer() for both CPU and wall time?

Contributor Author:

That's a TODO to add a builder here.

@ericl (Contributor, Author) commented Dec 8, 2021:

Yeah, for now I think we shouldn't document it, as it's pretty incomplete.

    return str(round(seconds * 1000 * 1000, 2)) + "us"


class Timer:
Contributor Author:

That one seems a bit different (more like a window gauge).
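
For reference, a Timer of the shape used in this PR's snippets (.add() plus a .timer() context manager) can be sketched as follows (hypothetical; the actual class may differ):

import time
from contextlib import contextmanager

class Timer:
    # Accumulates total elapsed seconds across many timed sections.
    def __init__(self):
        self._total = 0.0

    def add(self, seconds):
        self._total += seconds

    @contextmanager
    def timer(self):
        start = time.perf_counter()
        try:
            yield
        finally:
            self.add(time.perf_counter() - start)

    def get(self):
        return self._total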

@ericl merged commit 22ccc6b into ray-project:master on Dec 9, 2021