Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[data] Add usage tag for which block formats are used #34384

Merged
merged 4 commits into from
Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/ray/data/_internal/execution/legacy_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import ray
from ray.data._internal.logical.optimizers import get_execution_plan
from ray.data._internal.logical.util import record_operators_usage
from ray.data._internal.usage import record_operators_usage
from ray.data.context import DataContext
from ray.types import ObjectRef
from ray.data.block import Block, BlockMetadata, List
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
from typing import Dict
from typing import Dict, TYPE_CHECKING
import json
import threading

from ray._private.usage.usage_lib import TagKey, record_extra_usage_tag
from ray.data._internal.logical.interfaces import LogicalOperator
from ray.data._internal.logical.operators.read_operator import Read
from ray.data._internal.logical.operators.write_operator import Write

if TYPE_CHECKING:
from ray.data._internal.logical.interfaces import LogicalOperator

# Guards the below dicts.
_recording_lock = threading.Lock()
# The dictionary for the operator name and count.
_recorded_operators = dict()
_recorded_operators_lock = threading.Lock()
# The dictionary for the block format name and count.
_recorded_block_formats = dict()

# The white list of operator names allowed to be recorded.
_op_name_white_list = [
Expand Down Expand Up @@ -59,12 +62,21 @@
]


def record_operators_usage(op: LogicalOperator):
def record_block_format_usage(block_format: str):
    """Record block format usage with Ray telemetry.

    Increments the per-format counter for `block_format` and reports the
    full format-name -> count mapping, JSON-encoded, under the
    `DATA_BLOCK_FORMATS` usage tag.
    """
    with _recording_lock:
        previous_count = _recorded_block_formats.get(block_format, 0)
        _recorded_block_formats[block_format] = previous_count + 1
        # Serialize while still holding the lock so the snapshot cannot be
        # mutated by another thread mid-dump.
        formats_json_str = json.dumps(_recorded_block_formats)

    record_extra_usage_tag(TagKey.DATA_BLOCK_FORMATS, formats_json_str)


def record_operators_usage(op: "LogicalOperator"):
"""Record logical operator usage with Ray telemetry."""
ops_dict = dict()
_collect_operators_to_dict(op, ops_dict)
ops_json_str = ""
with _recorded_operators_lock:
with _recording_lock:
for op, count in ops_dict.items():
_recorded_operators.setdefault(op, 0)
_recorded_operators[op] += count
Expand All @@ -73,8 +85,11 @@ def record_operators_usage(op: LogicalOperator):
record_extra_usage_tag(TagKey.DATA_LOGICAL_OPS, ops_json_str)


def _collect_operators_to_dict(op: LogicalOperator, ops_dict: Dict[str, int]):
def _collect_operators_to_dict(op: "LogicalOperator", ops_dict: Dict[str, int]):
"""Collect the logical operator name and count into `ops_dict`."""
from ray.data._internal.logical.operators.read_operator import Read
from ray.data._internal.logical.operators.write_operator import Write

for child in op.input_dependencies:
_collect_operators_to_dict(child, ops_dict)

Expand Down
5 changes: 5 additions & 0 deletions python/ray/data/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import ray
from ray import ObjectRefGenerator
from ray.data._internal.util import _check_pyarrow_version
from ray.data._internal.usage import record_block_format_usage
from ray.types import ObjectRef
from ray.util.annotations import DeveloperAPI

Expand Down Expand Up @@ -386,18 +387,22 @@ def for_block(block: Block) -> "BlockAccessor[T]":
if isinstance(block, pyarrow.Table):
from ray.data._internal.arrow_block import ArrowBlockAccessor

record_block_format_usage("arrow")
return ArrowBlockAccessor(block)
elif isinstance(block, pandas.DataFrame):
from ray.data._internal.pandas_block import PandasBlockAccessor

record_block_format_usage("pandas")
return PandasBlockAccessor(block)
elif isinstance(block, bytes):
from ray.data._internal.arrow_block import ArrowBlockAccessor

record_block_format_usage("arrow")
return ArrowBlockAccessor.from_bytes(block)
elif isinstance(block, list):
from ray.data._internal.simple_block import SimpleBlockAccessor

record_block_format_usage("simple")
return SimpleBlockAccessor(block)
else:
raise TypeError("Not a block type: {} ({})".format(block, type(block)))
Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/tests/test_execution_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
FlatMap,
)
from ray.data._internal.logical.operators.n_ary_operator import Zip
from ray.data._internal.logical.util import (
from ray.data._internal.usage import (
_recorded_operators,
_recorded_operators_lock,
_op_name_white_list,
Expand Down
11 changes: 11 additions & 0 deletions python/ray/data/tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import numpy as np

from ray.data._internal.util import _check_pyarrow_version, _split_list
from ray.data._internal.usage import _recorded_block_formats
from ray.data._internal.memory_tracing import (
trace_allocation,
trace_deallocation,
Expand All @@ -11,6 +12,16 @@
from ray.data.tests.conftest import * # noqa: F401, F403


def test_block_format_usage():
    """Check that each block format is recorded once a dataset using it runs.

    The three pipelines below are expected to add the "simple", "arrow" and
    "pandas" formats, in that order, to the recorded-formats dictionary.
    """
    # Nothing should have been recorded before any dataset is executed.
    assert not _recorded_block_formats

    ray.data.range(10).show()
    seen_formats = set(_recorded_block_formats.keys())
    assert seen_formats == {"simple"}

    ray.data.range_table(10).show()
    seen_formats = set(_recorded_block_formats.keys())
    assert seen_formats == {"simple", "arrow"}

    ray.data.range_table(10).map_batches(lambda x: x).show()
    seen_formats = set(_recorded_block_formats.keys())
    assert seen_formats == {"simple", "arrow", "pandas"}


def test_check_pyarrow_version_bounds(unsupported_pyarrow_version):
# Test that pyarrow versions outside of the defined bounds cause an ImportError to
# be raised.
Expand Down
3 changes: 3 additions & 0 deletions src/ray/protobuf/usage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ enum TagKey {
// Logical operators, stored in JSON format with operator name and count.
// Example: {"MapBatches": 2, "Filter": 1}
DATA_LOGICAL_OPS = 400;
// Block formats (simple, pandas, or arrow), stored in JSON format with format name and count.
// Example: {"arrow": 2, "pandas": 1}
DATA_BLOCK_FORMATS = 401;

// AIR
// Name of AIR trainer, or "Custom" if user-defined.
Expand Down