Skip to content

Commit

Permalink
[data] Add usage tag for which block formats are used (ray-project#34384
Browse files Browse the repository at this point in the history
)

Signed-off-by: Jack He <[email protected]>
  • Loading branch information
ericl authored and ProjectsByJackHe committed May 4, 2023
1 parent e3bed7a commit c9263f2
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 13 deletions.
2 changes: 1 addition & 1 deletion python/ray/data/_internal/execution/legacy_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import ray
from ray.data._internal.logical.optimizers import get_execution_plan
from ray.data._internal.logical.util import record_operators_usage
from ray.data._internal.usage import record_operators_usage
from ray.data.context import DataContext
from ray.types import ObjectRef
from ray.data.block import Block, BlockMetadata, List
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
from typing import Dict
from typing import Dict, TYPE_CHECKING
import json
import threading

from ray._private.usage.usage_lib import TagKey, record_extra_usage_tag
from ray.data._internal.logical.interfaces import LogicalOperator
from ray.data._internal.logical.operators.read_operator import Read
from ray.data._internal.logical.operators.write_operator import Write

if TYPE_CHECKING:
from ray.data._internal.logical.interfaces import LogicalOperator

# Guards the below dicts.
_recording_lock = threading.Lock()
# The dictionary for the operator name and count.
_recorded_operators = dict()
_recorded_operators_lock = threading.Lock()
# The dictionary for the block format name and count.
_recorded_block_formats = dict()

# The white list of operator names allowed to be recorded.
_op_name_white_list = [
Expand Down Expand Up @@ -59,12 +62,21 @@
]


def record_operators_usage(op: LogicalOperator):
def record_block_format_usage(block_format: str):
with _recording_lock:
_recorded_block_formats.setdefault(block_format, 0)
_recorded_block_formats[block_format] += 1
formats_json_str = json.dumps(_recorded_block_formats)

record_extra_usage_tag(TagKey.DATA_BLOCK_FORMATS, formats_json_str)


def record_operators_usage(op: "LogicalOperator"):
"""Record logical operator usage with Ray telemetry."""
ops_dict = dict()
_collect_operators_to_dict(op, ops_dict)
ops_json_str = ""
with _recorded_operators_lock:
with _recording_lock:
for op, count in ops_dict.items():
_recorded_operators.setdefault(op, 0)
_recorded_operators[op] += count
Expand All @@ -73,8 +85,11 @@ def record_operators_usage(op: LogicalOperator):
record_extra_usage_tag(TagKey.DATA_LOGICAL_OPS, ops_json_str)


def _collect_operators_to_dict(op: LogicalOperator, ops_dict: Dict[str, int]):
def _collect_operators_to_dict(op: "LogicalOperator", ops_dict: Dict[str, int]):
"""Collect the logical operator name and count into `ops_dict`."""
from ray.data._internal.logical.operators.read_operator import Read
from ray.data._internal.logical.operators.write_operator import Write

for child in op.input_dependencies:
_collect_operators_to_dict(child, ops_dict)

Expand Down
5 changes: 5 additions & 0 deletions python/ray/data/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import ray
from ray import ObjectRefGenerator
from ray.data._internal.util import _check_pyarrow_version
from ray.data._internal.usage import record_block_format_usage
from ray.types import ObjectRef
from ray.util.annotations import DeveloperAPI

Expand Down Expand Up @@ -387,18 +388,22 @@ def for_block(block: Block) -> "BlockAccessor[T]":
if isinstance(block, pyarrow.Table):
from ray.data._internal.arrow_block import ArrowBlockAccessor

record_block_format_usage("arrow")
return ArrowBlockAccessor(block)
elif isinstance(block, pandas.DataFrame):
from ray.data._internal.pandas_block import PandasBlockAccessor

record_block_format_usage("pandas")
return PandasBlockAccessor(block)
elif isinstance(block, bytes):
from ray.data._internal.arrow_block import ArrowBlockAccessor

record_block_format_usage("arrow")
return ArrowBlockAccessor.from_bytes(block)
elif isinstance(block, list):
from ray.data._internal.simple_block import SimpleBlockAccessor

record_block_format_usage("simple")
return SimpleBlockAccessor(block)
else:
raise TypeError("Not a block type: {} ({})".format(block, type(block)))
Expand Down
8 changes: 4 additions & 4 deletions python/ray/data/tests/test_execution_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@
FlatMap,
)
from ray.data._internal.logical.operators.n_ary_operator import Zip
from ray.data._internal.logical.util import (
from ray.data._internal.usage import (
_recorded_operators,
_recorded_operators_lock,
_recording_lock,
_op_name_white_list,
)
from ray.data._internal.planner.planner import Planner
Expand All @@ -62,10 +62,10 @@ def _check_usage_record(op_names: List[str], clear_after_check: Optional[bool] =
(so that subsequent checks do not use existing records of operator usage)."""
for op_name in op_names:
assert op_name in _op_name_white_list
with _recorded_operators_lock:
with _recording_lock:
assert _recorded_operators.get(op_name, 0) > 0, _recorded_operators
if clear_after_check:
with _recorded_operators_lock:
with _recording_lock:
_recorded_operators.clear()


Expand Down
11 changes: 11 additions & 0 deletions python/ray/data/tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import numpy as np

from ray.data._internal.util import _check_pyarrow_version, _split_list
from ray.data._internal.usage import _recorded_block_formats
from ray.data._internal.memory_tracing import (
trace_allocation,
trace_deallocation,
Expand Down Expand Up @@ -87,6 +88,16 @@ def test_list_splits():
assert _split_list(["foo", 1, [0], None], 3) == [["foo", 1], [[0]], [None]]


def test_block_format_usage():
assert not _recorded_block_formats
ray.data.range(10).show()
assert set(_recorded_block_formats.keys()) == {"simple"}
ray.data.range_table(10).show()
assert set(_recorded_block_formats.keys()) == {"simple", "arrow"}
ray.data.range_table(10).map_batches(lambda x: x).show()
assert set(_recorded_block_formats.keys()) == {"simple", "arrow", "pandas"}


if __name__ == "__main__":
import sys

Expand Down
3 changes: 3 additions & 0 deletions src/ray/protobuf/usage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ enum TagKey {
// Logical operators, stored in JSON format with operator name and count.
// Example: {"MapBatches": 2, "Filter": 1}
DATA_LOGICAL_OPS = 400;
// Block formats: simple, pandas, or arrow.
// Example: {"pandas": 2, "numpy": 1}
DATA_BLOCK_FORMATS = 401;

// AIR
// Name of AIR trainer, or "Custom" if user-defined.
Expand Down

0 comments on commit c9263f2

Please sign in to comment.