diff --git a/python/ray/__init__.py b/python/ray/__init__.py index 21fee309313b..5e4d44ddd751 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -129,6 +129,8 @@ def _configure_system(): wait, ) +from ray._private.structured_logging.logging_config import LoggingConfig # noqa: E402 + # We import ray.actor because some code is run in actor.py which initializes # some functions in the worker. import ray.actor # noqa: E402,F401 @@ -201,6 +203,7 @@ def __getattr__(self, attr): "LOCAL_MODE", "SCRIPT_MODE", "WORKER_MODE", + "LoggingConfig", ] # Public APIs that should automatically trigger ray.init(). @@ -239,6 +242,7 @@ def __getattr__(self, attr): "show_in_dashboard", "shutdown", "timeline", + "LoggingConfig", } assert set(__all__) == AUTO_INIT_APIS | NON_AUTO_INIT_APIS diff --git a/python/ray/_private/structured_logging/constants.py b/python/ray/_private/structured_logging/constants.py index 7f299f35e6a8..a0dac22302b4 100644 --- a/python/ray/_private/structured_logging/constants.py +++ b/python/ray/_private/structured_logging/constants.py @@ -45,3 +45,33 @@ class LogKey(str, Enum): FILENAME = "filename" LINENO = "lineno" EXC_TEXT = "exc_text" + + +LOG_MODE_DICT = { + "TEXT": lambda log_level: { + "version": 1, + "disable_existing_loggers": False, + "formatters": { + "text": { + "()": "ray._private.structured_logging.formatters.TextFormatter", + }, + }, + "filters": { + "core_context": { + "()": "ray._private.structured_logging.filters.CoreContextFilter", + }, + }, + "handlers": { + "console": { + "level": log_level, + "class": "logging.StreamHandler", + "formatter": "text", + "filters": ["core_context"], + }, + }, + "root": { + "level": log_level, + "handlers": ["console"], + }, + }, +} diff --git a/python/ray/_private/structured_logging/formatters.py b/python/ray/_private/structured_logging/formatters.py index 7938f1974a15..c00718c40948 100644 --- a/python/ray/_private/structured_logging/formatters.py +++ b/python/ray/_private/structured_logging/formatters.py @@ -1,12 +1,20 @@ import logging import json from ray._private.structured_logging.constants import LogKey, LOGRECORD_STANDARD_ATTRS +from ray._private.ray_constants import LOGGER_FORMAT -class JSONFormatter(logging.Formatter): - def format(self, record): - record_format = { - LogKey.ASCTIME: self.formatTime(record), +def generate_record_format_attrs( + formatter: logging.Formatter, + record: logging.LogRecord, + exclude_standard_attrs, +) -> dict: + record_format_attrs = {} + # If `exclude_standard_attrs` is False, include the standard attributes. + # Otherwise, include only Ray and user-provided context. + if not exclude_standard_attrs: + record_format_attrs = { + LogKey.ASCTIME: formatter.formatTime(record), LogKey.LEVELNAME: record.levelname, LogKey.MESSAGE: record.getMessage(), LogKey.FILENAME: record.filename, @@ -14,11 +22,34 @@ def format(self, record): } if record.exc_info: if not record.exc_text: - record.exc_text = self.formatException(record.exc_info) - record_format[LogKey.EXC_TEXT] = record.exc_text - - for key, value in record.__dict__.items(): - # Both Ray and user-provided context are stored in `record_format`. - if key not in LOGRECORD_STANDARD_ATTRS: - record_format[key] = value - return json.dumps(record_format) + record.exc_text = formatter.formatException(record.exc_info) + record_format_attrs[LogKey.EXC_TEXT] = record.exc_text + + for key, value in record.__dict__.items(): + # Both Ray and user-provided context are stored in `record_format`. + if key not in LOGRECORD_STANDARD_ATTRS: + record_format_attrs[key] = value + return record_format_attrs + + +class JSONFormatter(logging.Formatter): + def format(self, record): + record_format_attrs = generate_record_format_attrs( + self, record, exclude_standard_attrs=False + ) + return json.dumps(record_format_attrs) + + +class TextFormatter(logging.Formatter): + def __init__(self) -> None: + self._inner_formatter = logging.Formatter(LOGGER_FORMAT) + + def format(self, record: logging.LogRecord) -> str: + s = self._inner_formatter.format(record) + record_format_attrs = generate_record_format_attrs( + self, record, exclude_standard_attrs=True + ) + additional_attrs = " ".join( + [f"{key}={value}" for key, value in record_format_attrs.items()] + ) + return f"{s} {additional_attrs}" diff --git a/python/ray/_private/structured_logging/logging_config.py b/python/ray/_private/structured_logging/logging_config.py new file mode 100644 index 000000000000..825fcd232921 --- /dev/null +++ b/python/ray/_private/structured_logging/logging_config.py @@ -0,0 +1,58 @@ +from ray._private.structured_logging.constants import LOG_MODE_DICT +from ray.util.annotations import PublicAPI + +from dataclasses import dataclass + + +@PublicAPI(stability="alpha") +@dataclass +class LoggingConfig: + """ + Logging configuration for a Ray job. These configurations are used to set up the + root logger of the driver process and all Ray tasks and actor processes that belong + to the job. + + Examples: + .. testcode:: + + import ray + import logging + + ray.init( + logging_config=ray.LoggingConfig(encoding="TEXT", log_level="INFO") + ) + + @ray.remote + def f(): + logger = logging.getLogger(__name__) + logger.info("This is a Ray task") + + ray.get(f.remote()) + + .. testoutput:: + :options: +MOCK + + 2024-06-03 07:53:50,815 INFO test.py:11 -- This is a Ray task job_id=01000000 worker_id=0dbbbd0f17d5343bbeee8228fa5ff675fe442445a1bc06ec899120a8 node_id=577706f1040ea8ebd76f7cf5a32338d79fe442e01455b9e7110cddfc task_id=c8ef45ccd0112571ffffffffffffffffffffffff01000000 + + Args: + encoding: Encoding type for the logs. The valid value is 'TEXT' + log_level: Log level for the logs. Defaults to 'INFO'. You can set + it to 'DEBUG' to receive more detailed debug logs. + """ # noqa: E501 + + encoding: str = "TEXT" + log_level: str = "INFO" + + def __post_init__(self): + if self.encoding not in LOG_MODE_DICT: + raise ValueError( + f"Invalid encoding type: {self.encoding}. " + f"Valid encoding types are: {list(LOG_MODE_DICT.keys())}" + ) + + def _get_dict_config(self) -> dict: + """Get the logging configuration based on the encoding type. + Returns: + dict: The logging configuration. + """ + return LOG_MODE_DICT[self.encoding](self.log_level) diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index d4bba87fc4e8..df1155315b73 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -50,6 +50,8 @@ import ray._private.state import ray._private.storage as storage +from ray._private.structured_logging.logging_config import LoggingConfig + # Ray modules import ray.actor import ray.cloudpickle as pickle # noqa @@ -1228,6 +1230,7 @@ def init( configure_logging: bool = True, logging_level: int = ray_constants.LOGGER_LEVEL, logging_format: Optional[str] = None, + logging_config: Optional[LoggingConfig] = None, log_to_driver: bool = True, namespace: Optional[str] = None, runtime_env: Optional[Union[Dict[str, Any], "RuntimeEnv"]] = None, # noqa: F821 @@ -1326,12 +1329,15 @@ def init( configure_logging: True (default) if configuration of logging is allowed here. Otherwise, the user may want to configure it separately. - logging_level: Logging level, defaults to logging.INFO. Ignored unless + logging_level: Logging level for the "ray" logger of the driver process, + defaults to logging.INFO. Ignored unless "configure_logging" is true. + logging_format: Logging format for the "ray" logger of the driver process, + defaults to a string containing a timestamp, filename, line number, and + message. See the source file ray_constants.py for details. Ignored unless "configure_logging" is true. - logging_format: Logging format, defaults to string containing a - timestamp, filename, line number, and message. See the source file - ray_constants.py for details. Ignored unless "configure_logging" - is true. + logging_config: [Experimental] Logging configuration will be applied to the + root loggers for both the driver process and all worker processes belonging + to the current job. See :class:`~LoggingConfig` for details. log_to_driver: If true, the output from all of the worker processes on all nodes will be directed to the driver. namespace: A namespace is a logical grouping of jobs and named actors. @@ -1384,11 +1390,17 @@ def init( Exception: An exception is raised if an inappropriate combination of arguments is passed in. """ + # Configure the "ray" logger for the driver process. if configure_logging: setup_logger(logging_level, logging_format or ray_constants.LOGGER_FORMAT) else: logging.getLogger("ray").handlers.clear() + # Configure the logging settings for the driver process. + if logging_config: + dict_config = logging_config._get_dict_config() + logging.config.dictConfig(dict_config) + # Parse the hidden options: _enable_object_reconstruction: bool = kwargs.pop( "_enable_object_reconstruction", False @@ -1561,6 +1573,11 @@ def sigterm_handler(signum, frame): # Set runtime_env in job_config if passed in as part of ray.init() job_config.set_runtime_env(runtime_env) + # Pass the logging_config to job_config to configure loggers of all worker + # processes belonging to the job. + if logging_config: + job_config.set_py_logging_config(logging_config) + redis_address, gcs_address = None, None bootstrap_address = services.canonicalize_bootstrap_address(address, _temp_dir) if bootstrap_address is not None: diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index e2a0754db893..cdfad9f89c2a 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -2585,6 +2585,20 @@ def maybe_initialize_job_config(): print(job_id_magic_token, end="") print(job_id_magic_token, file=sys.stderr, end="") + # Configure worker process's Python logging. + log_config_dict = {} + serialized_py_logging_config = \ + core_worker.get_job_config().serialized_py_logging_config + if serialized_py_logging_config: + logging_config = pickle.loads(serialized_py_logging_config) + log_config_dict = logging_config._get_dict_config() + if log_config_dict: + try: + logging.config.dictConfig(log_config_dict) + except Exception as e: + backtrace = \ + "".join(traceback.format_exception(type(e), e, e.__traceback__)) + core_worker.drain_and_exit_worker("user", backtrace) job_config_initialized = True @@ -3391,7 +3405,7 @@ cdef class CoreWorker: if exit_type == "user": c_exit_type = WORKER_EXIT_TYPE_USER_ERROR - if exit_type == "system": + elif exit_type == "system": c_exit_type = WORKER_EXIT_TYPE_SYSTEM_ERROR elif exit_type == "intentional_system_exit": c_exit_type = WORKER_EXIT_TYPE_INTENTIONAL_SYSTEM_ERROR diff --git a/python/ray/job_config.py b/python/ray/job_config.py index b3083876f8e3..43e109cd31c8 100644 --- a/python/ray/job_config.py +++ b/python/ray/job_config.py @@ -1,6 +1,8 @@ import uuid from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union +import ray.cloudpickle as pickle +from ray._private.structured_logging.logging_config import LoggingConfig from ray.util.annotations import PublicAPI if TYPE_CHECKING: @@ -70,6 +72,8 @@ def __init__( self.set_default_actor_lifetime(default_actor_lifetime) # A list of directories that specify the search path for python workers. self._py_driver_sys_path = _py_driver_sys_path or [] + # Python logging configurations that will be passed to Ray tasks/actors. + self.py_logging_config = None def set_metadata(self, key: str, value: str) -> None: """Add key-value pair to the metadata dictionary. @@ -115,6 +119,20 @@ def set_runtime_env( self.runtime_env = self._validate_runtime_env() self._cached_pb = None + def set_py_logging_config( + self, + logging_config: Optional[LoggingConfig] = None, + ): + """Set the logging configuration for the job. + + The logging configuration will be applied to the root loggers of + all Ray task and actor processes that belong to this job. + + Args: + logging_config: The logging configuration to set. + """ + self.py_logging_config = logging_config + def set_ray_namespace(self, ray_namespace: str) -> None: """Set Ray :ref:`namespace `. @@ -188,6 +206,8 @@ def _get_proto_job_config(self): if self._default_actor_lifetime is not None: pb.default_actor_lifetime = self._default_actor_lifetime + if self.py_logging_config: + pb.serialized_py_logging_config = pickle.dumps(self.py_logging_config) self._cached_pb = pb return self._cached_pb diff --git a/python/ray/tests/test_job.py b/python/ray/tests/test_job.py index 021d36da2aa6..8c17f099b92b 100644 --- a/python/ray/tests/test_job.py +++ b/python/ray/tests/test_job.py @@ -11,6 +11,8 @@ from pathlib import Path import pytest +import ray.cloudpickle as pickle + import ray from ray._private.test_utils import ( run_string_as_driver, @@ -19,7 +21,7 @@ format_web_url, wait_for_pid_to_exit, ) -from ray.job_config import JobConfig +from ray.job_config import JobConfig, LoggingConfig from ray.job_submission import JobSubmissionClient from ray.dashboard.modules.job.pydantic_models import JobDetails @@ -189,6 +191,15 @@ def test_config_metadata(shutdown_only): assert dict(from_worker.metadata) == job_config.metadata +def test_logging_config_serialization(): + logging_config = LoggingConfig(encoding="TEXT") + serialized_py_logging_config = pickle.dumps(logging_config) + job_config = JobConfig() + job_config.set_py_logging_config(logging_config) + pb = job_config._get_proto_job_config() + assert pb.serialized_py_logging_config == serialized_py_logging_config + + def test_get_entrypoint(): get_entrypoint = """ from ray._private.utils import get_entrypoint_name diff --git a/python/ray/tests/test_structured_logging.py b/python/ray/tests/test_structured_logging.py index 8880d1db234b..5c5e6a5461e9 100644 --- a/python/ray/tests/test_structured_logging.py +++ b/python/ray/tests/test_structured_logging.py @@ -1,3 +1,4 @@ +import logging.config import pytest import ray import os @@ -6,7 +7,9 @@ import json from ray._private.structured_logging.filters import CoreContextFilter -from ray._private.structured_logging.formatters import JSONFormatter +from ray._private.structured_logging.formatters import JSONFormatter, TextFormatter +from ray.job_config import LoggingConfig +from ray._private.test_utils import run_string_as_driver class TestCoreContextFilter: @@ -122,6 +125,108 @@ def test_record_with_user_provided_context(self, shutdown_only): assert "exc_text" not in record_dict +class TestTextFormatter: + def test_record_with_user_provided_context(self): + formatter = TextFormatter() + record = logging.makeLogRecord({"user": "ray"}) + formatted = formatter.format(record) + assert "user=ray" in formatted + + def test_record_with_exception(self): + formatter = TextFormatter() + record = logging.LogRecord( + name="test_logger", + level=logging.INFO, + pathname="test.py", + lineno=1000, + msg="Test message", + args=None, + exc_info=None, + ) + formatted = formatter.format(record) + for s in ["INFO", "Test message", "test.py:1000", "--"]: + assert s in formatted + + +class TestLoggingConfig: + def test_log_level(self): + log_level = "DEBUG" + logging_config = LoggingConfig(log_level=log_level) + dict_config = logging_config._get_dict_config() + assert dict_config["handlers"]["console"]["level"] == log_level + assert dict_config["root"]["level"] == log_level + + def test_invalid_dict_config(self): + with pytest.raises(ValueError): + LoggingConfig(encoding="INVALID")._get_dict_config() + + +class TestTextModeE2E: + def test_text_mode_task(self, shutdown_only): + script = """ +import ray +import logging + +ray.init( + logging_config=ray.LoggingConfig(encoding="TEXT") +) + +@ray.remote +def f(): + logger = logging.getLogger(__name__) + logger.info("This is a Ray task") + +obj_ref = f.remote() +ray.get(obj_ref) +""" + stderr = run_string_as_driver(script) + should_exist = [ + "job_id", + "worker_id", + "node_id", + "task_id", + "INFO", + "This is a Ray task", + ] + for s in should_exist: + assert s in stderr + assert "actor_id" not in stderr + + def test_text_mode_actor(self, shutdown_only): + script = """ +import ray +import logging + +ray.init( + logging_config=ray.LoggingConfig(encoding="TEXT") +) + +@ray.remote +class actor: + def __init__(self): + pass + + def print_message(self): + logger = logging.getLogger(__name__) + logger.info("This is a Ray actor") + +actor_instance = actor.remote() +ray.get(actor_instance.print_message.remote()) +""" + stderr = run_string_as_driver(script) + should_exist = [ + "job_id", + "worker_id", + "node_id", + "actor_id", + "task_id", + "INFO", + "This is a Ray actor", + ] + for s in should_exist: + assert s in stderr + + if __name__ == "__main__": if os.environ.get("PARALLEL_CI"): sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 8f6d9c27cdee..ae321a8a7cad 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -408,6 +408,8 @@ message JobConfig { // System paths of the driver scripts. Python workers need to search // these paths to load modules. repeated string py_driver_sys_path = 8; + // Python logging configurations that will be passed to Ray tasks/actors. + bytes serialized_py_logging_config = 9; } message StreamingGeneratorReturnIdInfo {