Commit 32cddae: [Core][2/n] Core structured logging: add Text formatter and pass log config to worker process (#45344)

Signed-off-by: kaihsun <[email protected]>
kevin85421 authored Jun 4, 2024
1 parent bec91a5 commit 32cddae
Showing 10 changed files with 312 additions and 20 deletions.
4 changes: 4 additions & 0 deletions python/ray/__init__.py
@@ -129,6 +129,8 @@ def _configure_system():
    wait,
)

from ray._private.structured_logging.logging_config import LoggingConfig  # noqa: E402

# We import ray.actor because some code is run in actor.py which initializes
# some functions in the worker.
import ray.actor  # noqa: E402,F401
@@ -201,6 +203,7 @@ def __getattr__(self, attr):
    "LOCAL_MODE",
    "SCRIPT_MODE",
    "WORKER_MODE",
    "LoggingConfig",
]

# Public APIs that should automatically trigger ray.init().
@@ -239,6 +242,7 @@ def __getattr__(self, attr):
    "show_in_dashboard",
    "shutdown",
    "timeline",
    "LoggingConfig",
}

assert set(__all__) == AUTO_INIT_APIS | NON_AUTO_INIT_APIS
30 changes: 30 additions & 0 deletions python/ray/_private/structured_logging/constants.py
@@ -45,3 +45,33 @@ class LogKey(str, Enum):
    FILENAME = "filename"
    LINENO = "lineno"
    EXC_TEXT = "exc_text"


LOG_MODE_DICT = {
    "TEXT": lambda log_level: {
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {
            "text": {
                "()": "ray._private.structured_logging.formatters.TextFormatter",
            },
        },
        "filters": {
            "core_context": {
                "()": "ray._private.structured_logging.filters.CoreContextFilter",
            },
        },
        "handlers": {
            "console": {
                "level": log_level,
                "class": "logging.StreamHandler",
                "formatter": "text",
                "filters": ["core_context"],
            },
        },
        "root": {
            "level": log_level,
            "handlers": ["console"],
        },
    },
}
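For orientation, here is a minimal sketch (not part of this commit's diff) of how the LOG_MODE_DICT factory above is meant to be consumed: the encoding key selects a factory, the factory is called with a log level, and the resulting dict goes to logging.config.dictConfig. It assumes a Ray build that includes this commit, and a driver or worker process so CoreContextFilter has Ray context to attach.

    # Sketch: apply the "TEXT" logging configuration to the root logger.
    import logging
    import logging.config

    from ray._private.structured_logging.constants import LOG_MODE_DICT

    config = LOG_MODE_DICT["TEXT"]("INFO")  # the factory takes the log level
    logging.config.dictConfig(config)       # installs TextFormatter + CoreContextFilter

    # Non-standard fields are appended by TextFormatter as key=value pairs.
    logging.getLogger(__name__).info("hello", extra={"user_key": "abc"})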
55 changes: 43 additions & 12 deletions python/ray/_private/structured_logging/formatters.py
@@ -1,24 +1,55 @@
import logging
import json
from ray._private.structured_logging.constants import LogKey, LOGRECORD_STANDARD_ATTRS
from ray._private.ray_constants import LOGGER_FORMAT


class JSONFormatter(logging.Formatter):
    def format(self, record):
        record_format = {
            LogKey.ASCTIME: self.formatTime(record),
def generate_record_format_attrs(
    formatter: logging.Formatter,
    record: logging.LogRecord,
    exclude_standard_attrs,
) -> dict:
    record_format_attrs = {}
    # If `exclude_standard_attrs` is False, include the standard attributes.
    # Otherwise, include only Ray and user-provided context.
    if not exclude_standard_attrs:
        record_format_attrs = {
            LogKey.ASCTIME: formatter.formatTime(record),
            LogKey.LEVELNAME: record.levelname,
            LogKey.MESSAGE: record.getMessage(),
            LogKey.FILENAME: record.filename,
            LogKey.LINENO: record.lineno,
        }
        if record.exc_info:
            if not record.exc_text:
                record.exc_text = self.formatException(record.exc_info)
            record_format[LogKey.EXC_TEXT] = record.exc_text

        for key, value in record.__dict__.items():
            # Both Ray and user-provided context are stored in `record_format`.
            if key not in LOGRECORD_STANDARD_ATTRS:
                record_format[key] = value
        return json.dumps(record_format)
                record.exc_text = formatter.formatException(record.exc_info)
            record_format_attrs[LogKey.EXC_TEXT] = record.exc_text

    for key, value in record.__dict__.items():
        # Both Ray and user-provided context are stored in `record_format`.
        if key not in LOGRECORD_STANDARD_ATTRS:
            record_format_attrs[key] = value
    return record_format_attrs


class JSONFormatter(logging.Formatter):
    def format(self, record):
        record_format_attrs = generate_record_format_attrs(
            self, record, exclude_standard_attrs=False
        )
        return json.dumps(record_format_attrs)


class TextFormatter(logging.Formatter):
    def __init__(self) -> None:
        self._inner_formatter = logging.Formatter(LOGGER_FORMAT)

    def format(self, record: logging.LogRecord) -> str:
        s = self._inner_formatter.format(record)
        record_format_attrs = generate_record_format_attrs(
            self, record, exclude_standard_attrs=True
        )
        additional_attrs = " ".join(
            [f"{key}={value}" for key, value in record_format_attrs.items()]
        )
        return f"{s} {additional_attrs}"
58 changes: 58 additions & 0 deletions python/ray/_private/structured_logging/logging_config.py
@@ -0,0 +1,58 @@
from ray._private.structured_logging.constants import LOG_MODE_DICT
from ray.util.annotations import PublicAPI

from dataclasses import dataclass


@PublicAPI(stability="alpha")
@dataclass
class LoggingConfig:
    """
    Logging configuration for a Ray job. These configurations are used to set up the
    root logger of the driver process and all Ray tasks and actor processes that belong
    to the job.

    Examples:
        .. testcode::

            import ray
            import logging

            ray.init(
                logging_config=ray.LoggingConfig(encoding="TEXT", log_level="INFO")
            )

            @ray.remote
            def f():
                logger = logging.getLogger(__name__)
                logger.info("This is a Ray task")

            ray.get(f.remote())

        .. testoutput::
            :options: +MOCK

            2024-06-03 07:53:50,815 INFO test.py:11 -- This is a Ray task job_id=01000000 worker_id=0dbbbd0f17d5343bbeee8228fa5ff675fe442445a1bc06ec899120a8 node_id=577706f1040ea8ebd76f7cf5a32338d79fe442e01455b9e7110cddfc task_id=c8ef45ccd0112571ffffffffffffffffffffffff01000000

    Args:
        encoding: Encoding type for the logs. The valid value is 'TEXT'
        log_level: Log level for the logs. Defaults to 'INFO'. You can set
            it to 'DEBUG' to receive more detailed debug logs.
    """  # noqa: E501

    encoding: str = "TEXT"
    log_level: str = "INFO"

    def __post_init__(self):
        if self.encoding not in LOG_MODE_DICT:
            raise ValueError(
                f"Invalid encoding type: {self.encoding}. "
                f"Valid encoding types are: {list(LOG_MODE_DICT.keys())}"
            )

    def _get_dict_config(self) -> dict:
        """Get the logging configuration based on the encoding type.

        Returns:
            dict: The logging configuration.
        """
        return LOG_MODE_DICT[self.encoding](self.log_level)
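A small usage sketch (not part of the diff) of the dataclass above, assuming a Ray build that includes this commit so ray.LoggingConfig is exported: the encoding is validated against LOG_MODE_DICT at construction time, and _get_dict_config() resolves the registered factory with the configured log level.

    import ray

    # Valid encoding: the derived dictConfig carries the chosen log level.
    config = ray.LoggingConfig(encoding="TEXT", log_level="DEBUG")
    dict_config = config._get_dict_config()  # private helper used by ray.init()
    assert dict_config["root"]["level"] == "DEBUG"

    # Invalid encoding: rejected in __post_init__.
    try:
        ray.LoggingConfig(encoding="JSON")  # only "TEXT" is registered in this commit
    except ValueError as e:
        print(e)  # Invalid encoding type: JSON. Valid encoding types are: ['TEXT']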
27 changes: 22 additions & 5 deletions python/ray/_private/worker.py
@@ -50,6 +50,8 @@
import ray._private.state
import ray._private.storage as storage

from ray._private.structured_logging.logging_config import LoggingConfig

# Ray modules
import ray.actor
import ray.cloudpickle as pickle  # noqa
@@ -1228,6 +1230,7 @@ def init(
    configure_logging: bool = True,
    logging_level: int = ray_constants.LOGGER_LEVEL,
    logging_format: Optional[str] = None,
    logging_config: Optional[LoggingConfig] = None,
    log_to_driver: bool = True,
    namespace: Optional[str] = None,
    runtime_env: Optional[Union[Dict[str, Any], "RuntimeEnv"]] = None,  # noqa: F821
@@ -1326,12 +1329,15 @@ def init(
        configure_logging: True (default) if configuration of logging is
            allowed here. Otherwise, the user may want to configure it
            separately.
        logging_level: Logging level, defaults to logging.INFO. Ignored unless
        logging_level: Logging level for the "ray" logger of the driver process,
            defaults to logging.INFO. Ignored unless "configure_logging" is true.
        logging_format: Logging format for the "ray" logger of the driver process,
            defaults to a string containing a timestamp, filename, line number, and
            message. See the source file ray_constants.py for details. Ignored unless
            "configure_logging" is true.
        logging_format: Logging format, defaults to string containing a
            timestamp, filename, line number, and message. See the source file
            ray_constants.py for details. Ignored unless "configure_logging"
            is true.
        logging_config: [Experimental] Logging configuration will be applied to the
            root loggers for both the driver process and all worker processes belonging
            to the current job. See :class:`~LoggingConfig` for details.
        log_to_driver: If true, the output from all of the worker
            processes on all nodes will be directed to the driver.
        namespace: A namespace is a logical grouping of jobs and named actors.
@@ -1384,11 +1390,17 @@
        Exception: An exception is raised if an inappropriate combination of
            arguments is passed in.
    """
    # Configure the "ray" logger for the driver process.
    if configure_logging:
        setup_logger(logging_level, logging_format or ray_constants.LOGGER_FORMAT)
    else:
        logging.getLogger("ray").handlers.clear()

    # Configure the logging settings for the driver process.
    if logging_config:
        dict_config = logging_config._get_dict_config()
        logging.config.dictConfig(dict_config)

    # Parse the hidden options:
    _enable_object_reconstruction: bool = kwargs.pop(
        "_enable_object_reconstruction", False
@@ -1561,6 +1573,11 @@ def sigterm_handler(signum, frame):
    # Set runtime_env in job_config if passed in as part of ray.init()
    job_config.set_runtime_env(runtime_env)

    # Pass the logging_config to job_config to configure loggers of all worker
    # processes belonging to the job.
    if logging_config:
        job_config.set_py_logging_config(logging_config)

    redis_address, gcs_address = None, None
    bootstrap_address = services.canonicalize_bootstrap_address(address, _temp_dir)
    if bootstrap_address is not None:
16 changes: 15 additions & 1 deletion python/ray/_raylet.pyx
@@ -2585,6 +2585,20 @@ def maybe_initialize_job_config():
        print(job_id_magic_token, end="")
        print(job_id_magic_token, file=sys.stderr, end="")

        # Configure worker process's Python logging.
        log_config_dict = {}
        serialized_py_logging_config = \
            core_worker.get_job_config().serialized_py_logging_config
        if serialized_py_logging_config:
            logging_config = pickle.loads(serialized_py_logging_config)
            log_config_dict = logging_config._get_dict_config()
        if log_config_dict:
            try:
                logging.config.dictConfig(log_config_dict)
            except Exception as e:
                backtrace = \
                    "".join(traceback.format_exception(type(e), e, e.__traceback__))
                core_worker.drain_and_exit_worker("user", backtrace)
        job_config_initialized = True


@@ -3391,7 +3405,7 @@ cdef class CoreWorker:

        if exit_type == "user":
            c_exit_type = WORKER_EXIT_TYPE_USER_ERROR
        if exit_type == "system":
        elif exit_type == "system":
            c_exit_type = WORKER_EXIT_TYPE_SYSTEM_ERROR
        elif exit_type == "intentional_system_exit":
            c_exit_type = WORKER_EXIT_TYPE_INTENTIONAL_SYSTEM_ERROR
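A simplified, pure-Python sketch (not part of the diff, using a stand-in variable for the proto field) of the worker-side path added in maybe_initialize_job_config above: the pickled LoggingConfig arrives via the job config, and each worker process applies the derived dictConfig to its own root logger.

    import logging.config

    import ray.cloudpickle as pickle
    from ray._private.structured_logging.logging_config import LoggingConfig

    # Stand-in for the JobConfig proto's serialized_py_logging_config bytes.
    serialized_py_logging_config = pickle.dumps(LoggingConfig(encoding="TEXT"))

    if serialized_py_logging_config:
        logging_config = pickle.loads(serialized_py_logging_config)
        logging.config.dictConfig(logging_config._get_dict_config())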
20 changes: 20 additions & 0 deletions python/ray/job_config.py
@@ -1,6 +1,8 @@
import uuid
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

import ray.cloudpickle as pickle
from ray._private.structured_logging.logging_config import LoggingConfig
from ray.util.annotations import PublicAPI

if TYPE_CHECKING:
@@ -70,6 +72,8 @@ def __init__(
        self.set_default_actor_lifetime(default_actor_lifetime)
        # A list of directories that specify the search path for python workers.
        self._py_driver_sys_path = _py_driver_sys_path or []
        # Python logging configurations that will be passed to Ray tasks/actors.
        self.py_logging_config = None

    def set_metadata(self, key: str, value: str) -> None:
        """Add key-value pair to the metadata dictionary.
@@ -115,6 +119,20 @@ def set_runtime_env(
        self.runtime_env = self._validate_runtime_env()
        self._cached_pb = None

    def set_py_logging_config(
        self,
        logging_config: Optional[LoggingConfig] = None,
    ):
        """Set the logging configuration for the job.

        The logging configuration will be applied to the root loggers of
        all Ray task and actor processes that belong to this job.

        Args:
            logging_config: The logging configuration to set.
        """
        self.py_logging_config = logging_config

    def set_ray_namespace(self, ray_namespace: str) -> None:
        """Set Ray :ref:`namespace <namespaces-guide>`.
@@ -188,6 +206,8 @@ def _get_proto_job_config(self):

        if self._default_actor_lifetime is not None:
            pb.default_actor_lifetime = self._default_actor_lifetime
        if self.py_logging_config:
            pb.serialized_py_logging_config = pickle.dumps(self.py_logging_config)
        self._cached_pb = pb

        return self._cached_pb
13 changes: 12 additions & 1 deletion python/ray/tests/test_job.py
@@ -11,6 +11,8 @@
from pathlib import Path
import pytest

import ray.cloudpickle as pickle

import ray
from ray._private.test_utils import (
    run_string_as_driver,
@@ -19,7 +21,7 @@
    format_web_url,
    wait_for_pid_to_exit,
)
from ray.job_config import JobConfig
from ray.job_config import JobConfig, LoggingConfig
from ray.job_submission import JobSubmissionClient
from ray.dashboard.modules.job.pydantic_models import JobDetails

@@ -189,6 +191,15 @@ def test_config_metadata(shutdown_only):
    assert dict(from_worker.metadata) == job_config.metadata


def test_logging_config_serialization():
    logging_config = LoggingConfig(encoding="TEXT")
    serialized_py_logging_config = pickle.dumps(logging_config)
    job_config = JobConfig()
    job_config.set_py_logging_config(logging_config)
    pb = job_config._get_proto_job_config()
    assert pb.serialized_py_logging_config == serialized_py_logging_config


def test_get_entrypoint():
    get_entrypoint = """
from ray._private.utils import get_entrypoint_name