Skip to content

Commit

Permalink
Revert "[data] Add usage tag for which block formats are used (ray-pr…
Browse files Browse the repository at this point in the history
…oject#34384)"

This reverts commit ffeedbf.
  • Loading branch information
xwjiang2010 committed Apr 19, 2023
1 parent ef5bff7 commit 7b1ceac
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 57 deletions.
2 changes: 1 addition & 1 deletion python/ray/data/_internal/execution/legacy_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import ray
from ray.data._internal.logical.optimizers import get_execution_plan
from ray.data._internal.usage import record_operators_usage
from ray.data._internal.logical.util import record_operators_usage
from ray.data.context import DataContext
from ray.types import ObjectRef
from ray.data.block import Block, BlockMetadata, List
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
from typing import Dict, TYPE_CHECKING
from typing import Dict
import json
import threading

from ray._private.usage.usage_lib import TagKey, record_extra_usage_tag
from ray.data._internal.logical.interfaces import LogicalOperator
from ray.data._internal.logical.operators.read_operator import Read
from ray.data._internal.logical.operators.write_operator import Write

if TYPE_CHECKING:
from ray.data._internal.logical.interfaces import LogicalOperator

# Guards the below dicts.
_recording_lock = threading.Lock()
# The dictionary for the operator name and count.
_recorded_operators = dict()
# The dictionary for the block format name and count.
_recorded_block_formats = dict()
_recorded_operators_lock = threading.Lock()

# The white list of operator names allowed to be recorded.
_op_name_white_list = [
Expand Down Expand Up @@ -62,21 +59,12 @@
]


def record_block_format_usage(block_format: str):
with _recording_lock:
_recorded_block_formats.setdefault(block_format, 0)
_recorded_block_formats[block_format] += 1
formats_json_str = json.dumps(_recorded_block_formats)

record_extra_usage_tag(TagKey.DATA_BLOCK_FORMATS, formats_json_str)


def record_operators_usage(op: "LogicalOperator"):
def record_operators_usage(op: LogicalOperator):
"""Record logical operator usage with Ray telemetry."""
ops_dict = dict()
_collect_operators_to_dict(op, ops_dict)
ops_json_str = ""
with _recording_lock:
with _recorded_operators_lock:
for op, count in ops_dict.items():
_recorded_operators.setdefault(op, 0)
_recorded_operators[op] += count
Expand All @@ -85,11 +73,8 @@ def record_operators_usage(op: "LogicalOperator"):
record_extra_usage_tag(TagKey.DATA_LOGICAL_OPS, ops_json_str)


def _collect_operators_to_dict(op: "LogicalOperator", ops_dict: Dict[str, int]):
def _collect_operators_to_dict(op: LogicalOperator, ops_dict: Dict[str, int]):
"""Collect the logical operator name and count into `ops_dict`."""
from ray.data._internal.logical.operators.read_operator import Read
from ray.data._internal.logical.operators.write_operator import Write

for child in op.input_dependencies:
_collect_operators_to_dict(child, ops_dict)

Expand Down
15 changes: 0 additions & 15 deletions python/ray/data/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import ray
from ray import ObjectRefGenerator
from ray.data._internal.util import _check_pyarrow_version, _truncated_repr
from ray.data._internal.usage import record_block_format_usage
from ray.types import ObjectRef
from ray.util.annotations import DeveloperAPI, PublicAPI

Expand Down Expand Up @@ -433,31 +432,17 @@ def for_block(block: Block) -> "BlockAccessor[T]":
if isinstance(block, pyarrow.Table):
from ray.data._internal.arrow_block import ArrowBlockAccessor

record_block_format_usage("arrow")
return ArrowBlockAccessor(block)
elif isinstance(block, pandas.DataFrame):
from ray.data._internal.pandas_block import PandasBlockAccessor

record_block_format_usage("pandas")
return PandasBlockAccessor(block)
elif isinstance(block, bytes):
from ray.data._internal.arrow_block import ArrowBlockAccessor

record_block_format_usage("arrow")
return ArrowBlockAccessor.from_bytes(block)
elif isinstance(block, list):
from ray.data._internal.simple_block import SimpleBlockAccessor

ctx = ray.data.DatasetContext.get_current()
if ctx.strict_mode:
raise StrictModeError(
f"Error validating {_truncated_repr(block)}: "
"Standalone Python objects are not "
"allowed in strict mode. To use Python objects in a datastream, "
"wrap them in a dict of numpy arrays, e.g., "
"return `{'item': np.array(batch)}` instead of just `batch`."
)
record_block_format_usage("simple")
return SimpleBlockAccessor(block)
else:
raise TypeError("Not a block type: {} ({})".format(block, type(block)))
Expand Down
8 changes: 4 additions & 4 deletions python/ray/data/tests/test_execution_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@
FlatMap,
)
from ray.data._internal.logical.operators.n_ary_operator import Zip
from ray.data._internal.usage import (
from ray.data._internal.logical.util import (
_recorded_operators,
_recording_lock,
_recorded_operators_lock,
_op_name_white_list,
)
from ray.data._internal.planner.planner import Planner
Expand All @@ -62,10 +62,10 @@ def _check_usage_record(op_names: List[str], clear_after_check: Optional[bool] =
(so that subsequent checks do not use existing records of operator usage)."""
for op_name in op_names:
assert op_name in _op_name_white_list
with _recording_lock:
with _recorded_operators_lock:
assert _recorded_operators.get(op_name, 0) > 0, _recorded_operators
if clear_after_check:
with _recording_lock:
with _recorded_operators_lock:
_recorded_operators.clear()


Expand Down
11 changes: 0 additions & 11 deletions python/ray/data/tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import numpy as np

from ray.data._internal.util import _check_pyarrow_version, _split_list
from ray.data._internal.usage import _recorded_block_formats
from ray.data._internal.memory_tracing import (
trace_allocation,
trace_deallocation,
Expand Down Expand Up @@ -88,16 +87,6 @@ def test_list_splits():
assert _split_list(["foo", 1, [0], None], 3) == [["foo", 1], [[0]], [None]]


def test_block_format_usage():
assert not _recorded_block_formats
ray.data.range(10).show()
assert set(_recorded_block_formats.keys()) == {"simple"}
ray.data.range_table(10).show()
assert set(_recorded_block_formats.keys()) == {"simple", "arrow"}
ray.data.range_table(10).map_batches(lambda x: x).show()
assert set(_recorded_block_formats.keys()) == {"simple", "arrow", "pandas"}


if __name__ == "__main__":
import sys

Expand Down
3 changes: 0 additions & 3 deletions src/ray/protobuf/usage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,6 @@ enum TagKey {
// Logical operators, stored in JSON format with operator name and count.
// Example: {"MapBatches": 2, "Filter": 1}
DATA_LOGICAL_OPS = 400;
// Block formats: simple, pandas, or arrow.
// Example: {"pandas": 2, "numpy": 1}
DATA_BLOCK_FORMATS = 401;

// AIR
// Name of AIR trainer, or "Custom" if user-defined.
Expand Down

0 comments on commit 7b1ceac

Please sign in to comment.