diff --git a/python/ray/_private/usage/usage_lib.py b/python/ray/_private/usage/usage_lib.py index 1f503f06a97f..2e1c26a02879 100644 --- a/python/ray/_private/usage/usage_lib.py +++ b/python/ray/_private/usage/usage_lib.py @@ -41,9 +41,11 @@ To see collected/reported data, see `usage_stats.json` inside a temp folder (e.g., /tmp/ray/session_[id]/*). """ +import glob import json import logging import os +import re import sys import time import uuid @@ -158,6 +160,52 @@ class UsageStatsEnabledness(Enum): _recorded_library_usages = set() +# NOTE: Do not change the write / read protocol. That will cause +# version incompatibility issues. +class LibUsageRecorder: + """A class to put/get the library usage to the ray tmp folder. + + See https://github.com/ray-project/ray/pull/25842 for more details. + """ + + def __init__(self, temp_dir_path: str): + self._lib_usage_dir = Path(temp_dir_path) + self._lib_usage_prefix = "_ray_lib_usage-" + self._lib_usage_filename_match = re.compile( + f"{self._lib_usage_prefix}([0-9a-zA-Z_]+).txt" + ) + + def put_lib_usage(self, lib_name: str): + """Put the library usage to the ray tmp folder.""" + lib_usage_file = self._lib_usage_dir / self._lib_usage_filename(lib_name) + lib_usage_file.touch(exist_ok=True) + + def read_lib_usages(self) -> List[str]: + """Read a list of library usages from the ray tmp folder.""" + # For checking if the file exists, it is okay to have a minor chance of + # having race condition. + lib_usages = [] + file_paths = glob.glob(f"{self._lib_usage_dir}/{self._lib_usage_prefix}*") + for file_path in file_paths: + file_path = Path(file_path) + if file_path.exists(): + lib_usages.append(self._get_lib_usage_from_filename(file_path.name)) + return lib_usages + + def delete_lib_usages(self): + """Delete all usage files. Test only""" + file_paths = glob.glob(f"{self._lib_usage_dir}/{self._lib_usage_prefix}*") + for file_path in file_paths: + file_path = Path(file_path) + file_path.unlink() + + def _lib_usage_filename(self, lib_name: str) -> str: + return f"{self._lib_usage_prefix}{lib_name}.txt" + + def _get_lib_usage_from_filename(self, filename: str) -> str: + return self._lib_usage_filename_match.match(filename).group(1) + + def _put_library_usage(library_usage: str): assert _internal_kv_initialized() try: @@ -169,11 +217,25 @@ def _put_library_usage(library_usage: str): except Exception as e: logger.debug(f"Failed to put library usage, {e}") + # Record the library usage to the temp (e.g., /tmp/ray) folder. + # Note that although we always write this file, it is not + # reported when the usage stats is disabled. + if ray.worker.global_worker.mode == ray.SCRIPT_MODE: + try: + lib_usage_recorder = LibUsageRecorder(ray._private.utils.get_ray_temp_dir()) + lib_usage_recorder.put_lib_usage(library_usage) + except Exception as e: + logger.debug(f"Failed to write a library usage to the home folder, {e}") + def record_library_usage(library_usage: str): """Record library usage (e.g. which library is used)""" if library_usage in _recorded_library_usages: return + if "-" in library_usage: + # - is not permitted since it should be used as a separator + # of the lib usage file name. See LibUsageRecorder. + raise ValueError("The library name contains a char - which is not permitted.") _recorded_library_usages.add(library_usage) if not _internal_kv_initialized(): @@ -399,6 +461,16 @@ def get_library_usages_to_report(gcs_client, num_retries: int) -> List[str]: for library_usage in library_usages: library_usage = library_usage.decode("utf-8") result.append(library_usage[len(usage_constant.LIBRARY_USAGE_PREFIX) :]) + + try: + historical_lib_usages = LibUsageRecorder( + ray._private.utils.get_ray_temp_dir() + ).read_lib_usages() + for library_usage in historical_lib_usages: + if library_usage not in result: + result.append(library_usage) + except Exception as e: + logger.info(f"Failed to read historical library usage {e}") return result except Exception as e: logger.info(f"Failed to get library usages to report {e}") diff --git a/python/ray/tests/test_usage_stats.py b/python/ray/tests/test_usage_stats.py index 3cc404267b47..d84619d064e8 100644 --- a/python/ray/tests/test_usage_stats.py +++ b/python/ray/tests/test_usage_stats.py @@ -91,6 +91,10 @@ def print_dashboard_log(): @pytest.fixture def reset_lib_usage(): yield + # Remove the lib usage so that it will be reset for each test. + ray_usage_lib.LibUsageRecorder( + ray._private.utils.get_ray_temp_dir() + ).delete_lib_usages() ray.experimental.internal_kv._internal_kv_reset() ray_usage_lib._recorded_library_usages.clear() @@ -161,6 +165,37 @@ def test_set_usage_stats_enabled_via_config(monkeypatch, tmp_path, reset_lib_usa ray_usage_lib.set_usage_stats_enabled_via_config(True) +def test_lib_usage_recorder(tmp_path): + recorder = ray_usage_lib.LibUsageRecorder(tmp_path) + lib_tune = "tune" + lib_rllib = "rllib" + + filename = recorder._lib_usage_filename(lib_tune) + assert recorder._get_lib_usage_from_filename(filename) == lib_tune + + # Write tune. + assert recorder.read_lib_usages() == [] + recorder.put_lib_usage(lib_tune) + assert recorder.read_lib_usages() == [lib_tune] + recorder.put_lib_usage(lib_tune) + assert recorder.read_lib_usages() == [lib_tune] + + # Test write is idempotent + for _ in range(5): + recorder.put_lib_usage(lib_tune) + assert recorder.read_lib_usages() == [lib_tune] + + # Write rllib. + recorder.put_lib_usage(lib_rllib) + assert set(recorder.read_lib_usages()) == {lib_tune, lib_rllib} + + # Test idempotency when there is more than 1 lib. + recorder.put_lib_usage(lib_rllib) + recorder.put_lib_usage(lib_rllib) + recorder.put_lib_usage(lib_tune) + assert set(recorder.read_lib_usages()) == {lib_tune, lib_rllib} + + @pytest.fixture def clear_loggers(): """Remove handlers from all loggers""" @@ -358,13 +393,20 @@ def test_library_usages(shutdown_only, reset_lib_usage): library_usages = ray_usage_lib.get_library_usages_to_report( ray.experimental.internal_kv.internal_kv_get_gcs_client(), num_retries=20 ) - assert set(library_usages) == { + tmp_path = ray._private.utils.get_ray_temp_dir() + lib_usages_from_home_folder = ray_usage_lib.LibUsageRecorder( + tmp_path + ).read_lib_usages() + expected = { "pre_init", "post_init", "dataset", "workflow", "serve", } + assert set(library_usages) == expected + assert set(lib_usages_from_home_folder) == expected + serve.shutdown() @@ -928,6 +970,67 @@ def verify(): wait_for_condition(verify) +@pytest.mark.skipif( + os.environ.get("RAY_MINIMAL") == "1", + reason="Test depends on library that's not downloaded from a minimal install.", +) +def test_lib_usage_record_from_init_session( + monkeypatch, ray_start_cluster, reset_lib_usage +): + """ + Make sure we store a lib usage to the /tmp/ray folder and report them + when any instance that has usage stats enabled. + """ + + # Start a driver without usage stats enabled. This will record + # lib_usage.txt. + script = """ +import ray +import os +from ray import train # noqa: F401 +from ray import tune # noqa: F401 +from ray.rllib.algorithms.ppo import PPO # noqa: F401 + +# Start a instance that disables usage stats. +ray.init() +""" + + run_string_as_driver(script) + + # Run the cluster that reports the usage stats. Make sure the lib usage is reported. + with monkeypatch.context() as m: + m.setenv("RAY_USAGE_STATS_ENABLED", "1") + m.setenv("RAY_USAGE_STATS_REPORT_URL", "http://127.0.0.1:8000/usage") + m.setenv("RAY_USAGE_STATS_REPORT_INTERVAL_S", "1") + cluster = ray_start_cluster + cluster.add_node(num_cpus=3) + ray.init(address=cluster.address) + + """ + Verify the library usage is recorded to the ray folder. + """ + lib_usages = ray_usage_lib.LibUsageRecorder( + ray._private.utils.get_ray_temp_dir() + ).read_lib_usages() + assert set(lib_usages) == {"train", "rllib", "tune"} + + """ + Verify the library usage is reported from the current instance. + """ + print("Verifying lib usage report.") + global_node = ray.worker._global_node + temp_dir = pathlib.Path(global_node.get_session_dir_path()) + + wait_for_condition(lambda: file_exists(temp_dir), timeout=30) + + def verify(): + lib_usages = read_file(temp_dir, "usage_stats")["library_usages"] + print(lib_usages) + return set(lib_usages) == {"rllib", "train", "tune"} + + wait_for_condition(verify) + + if __name__ == "__main__": if os.environ.get("PARALLEL_CI"): sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))