Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "[data] Add usage tag for which block formats are used (#34384)" #34569

Merged
merged 3 commits into from
Apr 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/ray/data/_internal/execution/legacy_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import ray
from ray.data._internal.logical.optimizers import get_execution_plan
from ray.data._internal.usage import record_operators_usage
from ray.data._internal.logical.util import record_operators_usage
from ray.data.context import DataContext
from ray.types import ObjectRef
from ray.data.block import Block, BlockMetadata, List
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
from typing import Dict, TYPE_CHECKING
from typing import Dict
import json
import threading

from ray._private.usage.usage_lib import TagKey, record_extra_usage_tag
from ray.data._internal.logical.interfaces import LogicalOperator
from ray.data._internal.logical.operators.read_operator import Read
from ray.data._internal.logical.operators.write_operator import Write

if TYPE_CHECKING:
from ray.data._internal.logical.interfaces import LogicalOperator

# Guards the below dicts.
_recording_lock = threading.Lock()
# The dictionary for the operator name and count.
_recorded_operators = dict()
# The dictionary for the block format name and count.
_recorded_block_formats = dict()
_recorded_operators_lock = threading.Lock()

# The white list of operator names allowed to be recorded.
_op_name_white_list = [
Expand Down Expand Up @@ -62,21 +59,12 @@
]


def record_block_format_usage(block_format: str):
with _recording_lock:
_recorded_block_formats.setdefault(block_format, 0)
_recorded_block_formats[block_format] += 1
formats_json_str = json.dumps(_recorded_block_formats)

record_extra_usage_tag(TagKey.DATA_BLOCK_FORMATS, formats_json_str)


def record_operators_usage(op: "LogicalOperator"):
def record_operators_usage(op: LogicalOperator):
"""Record logical operator usage with Ray telemetry."""
ops_dict = dict()
_collect_operators_to_dict(op, ops_dict)
ops_json_str = ""
with _recording_lock:
with _recorded_operators_lock:
for op, count in ops_dict.items():
_recorded_operators.setdefault(op, 0)
_recorded_operators[op] += count
Expand All @@ -85,11 +73,8 @@ def record_operators_usage(op: "LogicalOperator"):
record_extra_usage_tag(TagKey.DATA_LOGICAL_OPS, ops_json_str)


def _collect_operators_to_dict(op: "LogicalOperator", ops_dict: Dict[str, int]):
def _collect_operators_to_dict(op: LogicalOperator, ops_dict: Dict[str, int]):
"""Collect the logical operator name and count into `ops_dict`."""
from ray.data._internal.logical.operators.read_operator import Read
from ray.data._internal.logical.operators.write_operator import Write

for child in op.input_dependencies:
_collect_operators_to_dict(child, ops_dict)

Expand Down
14 changes: 0 additions & 14 deletions python/ray/data/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import ray
from ray import ObjectRefGenerator
from ray.data._internal.util import _check_pyarrow_version, _truncated_repr
from ray.data._internal.usage import record_block_format_usage
from ray.types import ObjectRef
from ray.util.annotations import DeveloperAPI, PublicAPI

Expand Down Expand Up @@ -433,31 +432,18 @@ def for_block(block: Block) -> "BlockAccessor[T]":
if isinstance(block, pyarrow.Table):
from ray.data._internal.arrow_block import ArrowBlockAccessor

record_block_format_usage("arrow")
return ArrowBlockAccessor(block)
elif isinstance(block, pandas.DataFrame):
from ray.data._internal.pandas_block import PandasBlockAccessor

record_block_format_usage("pandas")
return PandasBlockAccessor(block)
elif isinstance(block, bytes):
from ray.data._internal.arrow_block import ArrowBlockAccessor

record_block_format_usage("arrow")
return ArrowBlockAccessor.from_bytes(block)
elif isinstance(block, list):
from ray.data._internal.simple_block import SimpleBlockAccessor

ctx = ray.data.DatasetContext.get_current()
if ctx.strict_mode:
raise StrictModeError(
f"Error validating {_truncated_repr(block)}: "
"Standalone Python objects are not "
"allowed in strict mode. To use Python objects in a datastream, "
"wrap them in a dict of numpy arrays, e.g., "
"return `{'item': np.array(batch)}` instead of just `batch`."
)
record_block_format_usage("simple")
return SimpleBlockAccessor(block)
else:
raise TypeError("Not a block type: {} ({})".format(block, type(block)))
Expand Down
8 changes: 4 additions & 4 deletions python/ray/data/tests/test_execution_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@
FlatMap,
)
from ray.data._internal.logical.operators.n_ary_operator import Zip
from ray.data._internal.usage import (
from ray.data._internal.logical.util import (
_recorded_operators,
_recording_lock,
_recorded_operators_lock,
_op_name_white_list,
)
from ray.data._internal.planner.planner import Planner
Expand All @@ -62,10 +62,10 @@ def _check_usage_record(op_names: List[str], clear_after_check: Optional[bool] =
(so that subsequent checks do not use existing records of operator usage)."""
for op_name in op_names:
assert op_name in _op_name_white_list
with _recording_lock:
with _recorded_operators_lock:
assert _recorded_operators.get(op_name, 0) > 0, _recorded_operators
if clear_after_check:
with _recording_lock:
with _recorded_operators_lock:
_recorded_operators.clear()


Expand Down
11 changes: 0 additions & 11 deletions python/ray/data/tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import numpy as np

from ray.data._internal.util import _check_pyarrow_version, _split_list
from ray.data._internal.usage import _recorded_block_formats
from ray.data._internal.memory_tracing import (
trace_allocation,
trace_deallocation,
Expand Down Expand Up @@ -88,16 +87,6 @@ def test_list_splits():
assert _split_list(["foo", 1, [0], None], 3) == [["foo", 1], [[0]], [None]]


def test_block_format_usage():
assert not _recorded_block_formats
ray.data.range(10).show()
assert set(_recorded_block_formats.keys()) == {"simple"}
ray.data.range_table(10).show()
assert set(_recorded_block_formats.keys()) == {"simple", "arrow"}
ray.data.range_table(10).map_batches(lambda x: x).show()
assert set(_recorded_block_formats.keys()) == {"simple", "arrow", "pandas"}


if __name__ == "__main__":
import sys

Expand Down
3 changes: 0 additions & 3 deletions src/ray/protobuf/usage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,6 @@ enum TagKey {
// Logical operators, stored in JSON format with operator name and count.
// Example: {"MapBatches": 2, "Filter": 1}
DATA_LOGICAL_OPS = 400;
// Block formats: simple, pandas, or arrow.
// Example: {"pandas": 2, "numpy": 1}
DATA_BLOCK_FORMATS = 401;

// AIR
// Name of AIR trainer, or "Custom" if user-defined.
Expand Down