Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core][2/n] Core structured logging: add Text formatter and pass log config to worker process #45344

Merged
merged 23 commits into from
Jun 4, 2024
Merged
4 changes: 4 additions & 0 deletions python/ray/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ def _configure_system():
wait,
)

from ray._private.structured_logging.logging_config import LoggingConfig # noqa: E402

# We import ray.actor because some code is run in actor.py which initializes
# some functions in the worker.
import ray.actor # noqa: E402,F401
Expand Down Expand Up @@ -201,6 +203,7 @@ def __getattr__(self, attr):
"LOCAL_MODE",
"SCRIPT_MODE",
"WORKER_MODE",
"LoggingConfig",
]

# Public APIs that should automatically trigger ray.init().
Expand Down Expand Up @@ -239,6 +242,7 @@ def __getattr__(self, attr):
"show_in_dashboard",
"shutdown",
"timeline",
"LoggingConfig",
}

assert set(__all__) == AUTO_INIT_APIS | NON_AUTO_INIT_APIS
Expand Down
30 changes: 30 additions & 0 deletions python/ray/_private/structured_logging/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,33 @@ class LogKey(str, Enum):
FILENAME = "filename"
LINENO = "lineno"
EXC_TEXT = "exc_text"


LOG_MODE_DICT = {
"TEXT": lambda log_level: {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"text": {
"()": "ray._private.structured_logging.formatters.TextFormatter",
},
},
"filters": {
"core_context": {
"()": "ray._private.structured_logging.filters.CoreContextFilter",
},
},
"handlers": {
"console": {
"level": log_level,
"class": "logging.StreamHandler",
"formatter": "text",
"filters": ["core_context"],
},
},
"root": {
"level": log_level,
"handlers": ["console"],
},
},
}
55 changes: 43 additions & 12 deletions python/ray/_private/structured_logging/formatters.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,55 @@
import logging
import json
from ray._private.structured_logging.constants import LogKey, LOGRECORD_STANDARD_ATTRS
from ray._private.ray_constants import LOGGER_FORMAT


class JSONFormatter(logging.Formatter):
def format(self, record):
record_format = {
LogKey.ASCTIME: self.formatTime(record),
def generate_record_format_attrs(
formatter: logging.Formatter,
record: logging.LogRecord,
exclude_standard_attrs,
) -> dict:
record_format_attrs = {}
# If `exclude_standard_attrs` is False, include the standard attributes.
# Otherwise, include only Ray and user-provided context.
if not exclude_standard_attrs:
record_format_attrs = {
LogKey.ASCTIME: formatter.formatTime(record),
LogKey.LEVELNAME: record.levelname,
LogKey.MESSAGE: record.getMessage(),
LogKey.FILENAME: record.filename,
LogKey.LINENO: record.lineno,
}
if record.exc_info:
if not record.exc_text:
record.exc_text = self.formatException(record.exc_info)
record_format[LogKey.EXC_TEXT] = record.exc_text

for key, value in record.__dict__.items():
# Both Ray and user-provided context are stored in `record_format`.
if key not in LOGRECORD_STANDARD_ATTRS:
record_format[key] = value
return json.dumps(record_format)
record.exc_text = formatter.formatException(record.exc_info)
record_format_attrs[LogKey.EXC_TEXT] = record.exc_text

for key, value in record.__dict__.items():
# Both Ray and user-provided context are stored in `record_format`.
if key not in LOGRECORD_STANDARD_ATTRS:
record_format_attrs[key] = value
return record_format_attrs


class JSONFormatter(logging.Formatter):
def format(self, record):
record_format_attrs = generate_record_format_attrs(
self, record, exclude_standard_attrs=False
)
return json.dumps(record_format_attrs)


class TextFormatter(logging.Formatter):
def __init__(self) -> None:
self._inner_formatter = logging.Formatter(LOGGER_FORMAT)

def format(self, record: logging.LogRecord) -> str:
s = self._inner_formatter.format(record)
record_format_attrs = generate_record_format_attrs(
self, record, exclude_standard_attrs=True
)
additional_attrs = " ".join(
[f"{key}={value}" for key, value in record_format_attrs.items()]
)
return f"{s} {additional_attrs}"
58 changes: 58 additions & 0 deletions python/ray/_private/structured_logging/logging_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from ray._private.structured_logging.constants import LOG_MODE_DICT
from ray.util.annotations import PublicAPI

from dataclasses import dataclass


@PublicAPI(stability="alpha")
@dataclass
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kw_only=True

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kw_only is added in Python 3.10. It may be incompatible with some Ray images.

Screenshot 2024-06-03 at 4 27 31 PM

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it.

class LoggingConfig:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I think we can make it a dataclass. Other configs in the codebase are dataclass such as FailureConfig

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the benefit of using dataclass here? It is not obvious to me, especially since we want to verify whether encoding is in LOG_MODE_DICT and raise a ValueError if it doesn't exist in the __init__ function. Hence, we can't benefit from the automatic generation of __init__ by dataclass.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dataclass has __post_init__() for validation purpose.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Python's dataclass is a decorator that provides an easy way to create classes that primarily store data without having to write boilerplate code. Here are some key advantages of using dataclass:

Advantages of Using dataclass
Reduced Boilerplate Code:

Automatically generates __init__, __repr__, __eq__, and other special methods based on the class attributes.
Saves time and effort by reducing the amount of code you need to write and maintain.
Improved Readability:

Enhances code readability and clarity by explicitly defining data structures in a concise manner.
Makes it clear that the class is intended to be used as a simple data container.
Built-in Methods:

Provides default implementations for common methods:
__init__: Initializes the class with the given attributes.
__repr__: Provides a string representation of the class instance, useful for debugging.
__eq__: Compares instances of the class based on attribute values.
__hash__: Allows instances to be used in hashed collections (e.g., sets, dictionaries) if frozen=True is set.
Type Annotations:

Encourages the use of type annotations, improving code quality and aiding in static analysis and IDE support.
Mutability Control:

Allows control over mutability with the frozen parameter:
frozen=True makes the instances of the class immutable, similar to tuples.
Ensures data integrity by preventing modification of attributes.
Default Values and Factory Functions:

Supports default values and default factory functions for attributes, providing flexibility in instance creation.
Automatic Order Methods:

With the order=True parameter, automatically generates comparison methods (__lt__, __le__, __gt__, __ge__) based on the order of attributes.
Easy Conversion to/from Other Formats:

Simplifies conversion between data classes and other formats (e.g., dictionaries, JSON) using utility functions.

Told by chatgpt

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dataclass has post_init() for validation purpose.

I knew it. That's why I said that "we can't benefit from the automatic generation of init by dataclass". I also asked ChatGPT and read something similar to what you pasted above, but didn't find the benefits we were looking for in this case. I will update it to dataclass to keep it consistent with other config classes, but I am still curious about the motivation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated 5083efb

"""
Logging configuration for a Ray job. These configurations are used to set up the
root logger of the driver process and all Ray tasks and actor processes that belong
to the job.
Examples:
.. testcode::
import ray
import logging
ray.init(
logging_config=ray.LoggingConfig(encoding="TEXT", log_level="INFO")
)
@ray.remote
def f():
logger = logging.getLogger(__name__)
logger.info("This is a Ray task")
ray.get(f.remote())
kevin85421 marked this conversation as resolved.
Show resolved Hide resolved
.. testoutput::
:options: +MOCK
2024-06-03 07:53:50,815 INFO test.py:11 -- This is a Ray task job_id=01000000 worker_id=0dbbbd0f17d5343bbeee8228fa5ff675fe442445a1bc06ec899120a8 node_id=577706f1040ea8ebd76f7cf5a32338d79fe442e01455b9e7110cddfc task_id=c8ef45ccd0112571ffffffffffffffffffffffff01000000
Args:
encoding: Encoding type for the logs. The valid value is 'TEXT'
log_level: Log level for the logs. Defaults to 'INFO'. You can set
it to 'DEBUG' to receive more detailed debug logs.
""" # noqa: E501

encoding: str = "TEXT"
log_level: str = "INFO"

def __post_init__(self):
if self.encoding not in LOG_MODE_DICT:
raise ValueError(
f"Invalid encoding type: {self.encoding}. "
f"Valid encoding types are: {list(LOG_MODE_DICT.keys())}"
)

def _get_dict_config(self) -> dict:
"""Get the logging configuration based on the encoding type.
Returns:
dict: The logging configuration.
"""
return LOG_MODE_DICT[self.encoding](self.log_level)
27 changes: 22 additions & 5 deletions python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import ray._private.state
import ray._private.storage as storage

from ray._private.structured_logging.logging_config import LoggingConfig

Comment on lines +53 to +54
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need since we just do ray.LoggingConfig?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will cause a circular import issue if we use ray.LoggingConfig.

ImportError while loading conftest '/home/ubuntu/workspace/ray/python/ray/tests/conftest.py'.
conftest.py:21: in <module>
    import ray
../__init__.py:113: in <module>
    from ray._private.worker import (  # noqa: E402,F401
../_private/worker.py:1233: in <module>
    logging_config: Optional[ray.LoggingConfig] = None,
E   AttributeError: partially initialized module 'ray' has no attribute 'LoggingConfig' (most likely due to a circular import)

# Ray modules
import ray.actor
import ray.cloudpickle as pickle # noqa
Expand Down Expand Up @@ -1228,6 +1230,7 @@ def init(
configure_logging: bool = True,
logging_level: int = ray_constants.LOGGER_LEVEL,
logging_format: Optional[str] = None,
logging_config: Optional[LoggingConfig] = None,
log_to_driver: bool = True,
namespace: Optional[str] = None,
runtime_env: Optional[Union[Dict[str, Any], "RuntimeEnv"]] = None, # noqa: F821
Expand Down Expand Up @@ -1326,12 +1329,15 @@ def init(
configure_logging: True (default) if configuration of logging is
allowed here. Otherwise, the user may want to configure it
separately.
logging_level: Logging level, defaults to logging.INFO. Ignored unless
logging_level: Logging level for the "ray" logger of the driver process,
defaults to logging.INFO. Ignored unless "configure_logging" is true.
logging_format: Logging format for the "ray" logger of the driver process,
defaults to a string containing a timestamp, filename, line number, and
message. See the source file ray_constants.py for details. Ignored unless
"configure_logging" is true.
logging_format: Logging format, defaults to string containing a
timestamp, filename, line number, and message. See the source file
ray_constants.py for details. Ignored unless "configure_logging"
is true.
logging_config: [Experimental] Logging configuration will be applied to the
root loggers for both the driver process and all worker processes belonging
to the current job. See :class:`~LoggingConfig` for details.
log_to_driver: If true, the output from all of the worker
processes on all nodes will be directed to the driver.
namespace: A namespace is a logical grouping of jobs and named actors.
Expand Down Expand Up @@ -1384,11 +1390,17 @@ def init(
Exception: An exception is raised if an inappropriate combination of
arguments is passed in.
"""
# Configure the "ray" logger for the driver process.
if configure_logging:
setup_logger(logging_level, logging_format or ray_constants.LOGGER_FORMAT)
else:
logging.getLogger("ray").handlers.clear()

# Configure the logging settings for the driver process.
if logging_config:
dict_config = logging_config._get_dict_config()
logging.config.dictConfig(dict_config)

# Parse the hidden options:
_enable_object_reconstruction: bool = kwargs.pop(
"_enable_object_reconstruction", False
Expand Down Expand Up @@ -1561,6 +1573,11 @@ def sigterm_handler(signum, frame):
# Set runtime_env in job_config if passed in as part of ray.init()
job_config.set_runtime_env(runtime_env)

# Pass the logging_config to job_config to configure loggers of all worker
# processes belonging to the job.
if logging_config:
job_config.set_py_logging_config(logging_config)

redis_address, gcs_address = None, None
bootstrap_address = services.canonicalize_bootstrap_address(address, _temp_dir)
if bootstrap_address is not None:
Expand Down
16 changes: 15 additions & 1 deletion python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2583,6 +2583,20 @@ def maybe_initialize_job_config():
print(job_id_magic_token, end="")
print(job_id_magic_token, file=sys.stderr, end="")

# Configure worker process's Python logging.
log_config_dict = {}
serialized_py_logging_config = \
core_worker.get_job_config().serialized_py_logging_config
if serialized_py_logging_config:
logging_config = pickle.loads(serialized_py_logging_config)
log_config_dict = logging_config._get_dict_config()
if log_config_dict:
try:
logging.config.dictConfig(log_config_dict)
except Exception as e:
backtrace = \
"".join(traceback.format_exception(type(e), e, e.__traceback__))
core_worker.drain_and_exit_worker("user", backtrace)
job_config_initialized = True


Expand Down Expand Up @@ -3389,7 +3403,7 @@ cdef class CoreWorker:

if exit_type == "user":
c_exit_type = WORKER_EXIT_TYPE_USER_ERROR
if exit_type == "system":
elif exit_type == "system":
c_exit_type = WORKER_EXIT_TYPE_SYSTEM_ERROR
elif exit_type == "intentional_system_exit":
c_exit_type = WORKER_EXIT_TYPE_INTENTIONAL_SYSTEM_ERROR
Expand Down
20 changes: 20 additions & 0 deletions python/ray/job_config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import uuid
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

import ray.cloudpickle as pickle
from ray._private.structured_logging.logging_config import LoggingConfig
from ray.util.annotations import PublicAPI

if TYPE_CHECKING:
Expand Down Expand Up @@ -70,6 +72,8 @@ def __init__(
self.set_default_actor_lifetime(default_actor_lifetime)
# A list of directories that specify the search path for python workers.
self._py_driver_sys_path = _py_driver_sys_path or []
# Python logging configurations that will be passed to Ray tasks/actors.
self.py_logging_config = None

def set_metadata(self, key: str, value: str) -> None:
"""Add key-value pair to the metadata dictionary.
Expand Down Expand Up @@ -115,6 +119,20 @@ def set_runtime_env(
self.runtime_env = self._validate_runtime_env()
self._cached_pb = None

def set_py_logging_config(
self,
logging_config: Optional[LoggingConfig] = None,
):
"""Set the logging configuration for the job.
The logging configuration will be applied to the root loggers of
all Ray task and actor processes that belong to this job.
Args:
logging_config: The logging configuration to set.
"""
self.py_logging_config = logging_config

def set_ray_namespace(self, ray_namespace: str) -> None:
"""Set Ray :ref:`namespace <namespaces-guide>`.
Expand Down Expand Up @@ -188,6 +206,8 @@ def _get_proto_job_config(self):

if self._default_actor_lifetime is not None:
pb.default_actor_lifetime = self._default_actor_lifetime
if self.py_logging_config:
pb.serialized_py_logging_config = pickle.dumps(self.py_logging_config)
self._cached_pb = pb

return self._cached_pb
Expand Down
13 changes: 12 additions & 1 deletion python/ray/tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from pathlib import Path
import pytest

import ray.cloudpickle as pickle

import ray
from ray._private.test_utils import (
run_string_as_driver,
Expand All @@ -19,7 +21,7 @@
format_web_url,
wait_for_pid_to_exit,
)
from ray.job_config import JobConfig
from ray.job_config import JobConfig, LoggingConfig
from ray.job_submission import JobSubmissionClient
from ray.dashboard.modules.job.pydantic_models import JobDetails

Expand Down Expand Up @@ -189,6 +191,15 @@ def test_config_metadata(shutdown_only):
assert dict(from_worker.metadata) == job_config.metadata


def test_logging_config_serialization():
logging_config = LoggingConfig(encoding="TEXT")
serialized_py_logging_config = pickle.dumps(logging_config)
job_config = JobConfig()
job_config.set_py_logging_config(logging_config)
pb = job_config._get_proto_job_config()
assert pb.serialized_py_logging_config == serialized_py_logging_config


def test_get_entrypoint():
get_entrypoint = """
from ray._private.utils import get_entrypoint_name
Expand Down
Loading
Loading