Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core][2/n] Core structured logging: add Text formatter and pass log config to worker process #45344

Merged
merged 23 commits into from
Jun 4, 2024
Merged
55 changes: 43 additions & 12 deletions python/ray/_private/structured_logging/formatters.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,55 @@
import logging
import json
from ray._private.structured_logging.constants import LogKey, LOGRECORD_STANDARD_ATTRS
from ray._private.ray_constants import LOGGER_FORMAT


class JSONFormatter(logging.Formatter):
def format(self, record):
record_format = {
LogKey.ASCTIME: self.formatTime(record),
def generate_record_format_attrs(
formatter: logging.Formatter,
record: logging.LogRecord,
exclude_standard_attrs,
) -> dict:
record_format_attrs = {}
# If `exclude_standard_attrs` is False, include the standard attributes.
# Otherwise, include only Ray and user-provided context.
if not exclude_standard_attrs:
record_format_attrs = {
LogKey.ASCTIME: formatter.formatTime(record),
LogKey.LEVELNAME: record.levelname,
LogKey.MESSAGE: record.getMessage(),
LogKey.FILENAME: record.filename,
LogKey.LINENO: record.lineno,
}
if record.exc_info:
if not record.exc_text:
record.exc_text = self.formatException(record.exc_info)
record_format[LogKey.EXC_TEXT] = record.exc_text

for key, value in record.__dict__.items():
# Both Ray and user-provided context are stored in `record_format`.
if key not in LOGRECORD_STANDARD_ATTRS:
record_format[key] = value
return json.dumps(record_format)
record.exc_text = formatter.formatException(record.exc_info)
record_format_attrs[LogKey.EXC_TEXT] = record.exc_text

for key, value in record.__dict__.items():
# Both Ray and user-provided context are stored in `record_format_attrs`.
if key not in LOGRECORD_STANDARD_ATTRS:
record_format_attrs[key] = value
return record_format_attrs


class JSONFormatter(logging.Formatter):
    """Formatter that renders each log record as one JSON document."""

    def format(self, record):
        """Return the record serialized as a JSON string.

        The attribute dictionary is built by `generate_record_format_attrs`
        with the standard attributes included, then dumped with `json.dumps`.
        """
        # `exclude_standard_attrs=False` asks the helper to emit the standard
        # fields in addition to any Ray or user-provided context.
        attrs = generate_record_format_attrs(
            formatter=self,
            record=record,
            exclude_standard_attrs=False,
        )
        return json.dumps(attrs)


class TextFormatter(logging.Formatter):
    """Formatter that emits a text line followed by `key=value` context pairs.

    The leading part of the line is produced by an inner `logging.Formatter`
    configured with `LOGGER_FORMAT`; every non-standard attribute found on the
    record is then appended as space-separated `key=value` pairs.
    """

    def __init__(self) -> None:
        # Initialize the base class so the inherited Formatter state exists.
        # This instance is handed to `generate_record_format_attrs` as a
        # formatter, so it must be a fully constructed `logging.Formatter`.
        super().__init__()
        self._inner_formatter = logging.Formatter(LOGGER_FORMAT)

    def format(self, record: logging.LogRecord) -> str:
        """Format `record` as text and append its additional attributes.

        Args:
            record: The log record to format.

        Returns:
            The line produced by the inner formatter, a single space, and the
            space-joined `key=value` pairs of the record's extra attributes.
        """
        s = self._inner_formatter.format(record)
        # Standard attributes are already rendered by the inner formatter, so
        # only the Ray and user-provided context is collected here.
        record_format_attrs = generate_record_format_attrs(
            self, record, exclude_standard_attrs=True
        )
        additional_attrs = " ".join(
            f"{key}={value}" for key, value in record_format_attrs.items()
        )
        return f"{s} {additional_attrs}"
31 changes: 31 additions & 0 deletions python/ray/_private/structured_logging/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from ray._private.structured_logging.formatters import TextFormatter
from ray._private.structured_logging.filters import CoreContextFilter

LOG_MODE_DICT = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think we can just define this in the constants.py?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving to constants.py will cause a circular import.

  • LOG_MODE_DICT requires TextFormatter and CoreContextFilter from formatters.py and filters.py.
  • Both filters.py and formatters.py also import constants.py.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated 2ec1b4f

"TEXT": lambda log_level: {
"version": 1,
kevin85421 marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"version - to be set to an integer value representing the schema version. The only valid value at present is 1, but having this key allows the schema to evolve while still preserving backwards compatibility."

https://docs.python.org/3/library/logging.config.html

"disable_existing_loggers": False,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

disable_existing_loggers – If specified as False, loggers which exist when this call is made are left enabled. The default is True because this enables old behaviour in a backward-compatible way. This behaviour is to disable any existing non-root loggers unless they or their ancestors are explicitly named in the logging configuration.

https://docs.python.org/3/library/logging.config.html

"formatters": {
"text": {
"()": TextFormatter,
},
},
"filters": {
"core_context": {
"()": CoreContextFilter,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"The key '()' has been used as the special key because it is not a valid keyword parameter name, and so will not clash with the names of the keyword arguments used in the call. The '()' also serves as a mnemonic that the corresponding value is a callable."

Ref: https://docs.python.org/3/library/logging.config.html#logging-config-dictschema

},
},
"handlers": {
"console": {
"level": log_level,
"class": "logging.StreamHandler",
"formatter": "text",
"filters": ["core_context"],
},
},
"root": {
"level": log_level,
"handlers": ["console"],
},
},
}
17 changes: 16 additions & 1 deletion python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2583,6 +2583,21 @@ def maybe_initialize_job_config():
print(job_id_magic_token, end="")
print(job_id_magic_token, file=sys.stderr, end="")

# Configure worker process's Python logging.
log_config_dict = {}
serialized_py_logging_config = \
core_worker.get_job_config().serialized_py_logging_config
if serialized_py_logging_config:
py_logging_config = pickle.loads(serialized_py_logging_config)
log_config_dict = py_logging_config.get_dict_config()
if log_config_dict:
try:
logging.config.dictConfig(log_config_dict)
except Exception as e:
backtrace = \
"".join(traceback.format_exception(type(e), e, e.__traceback__))
print(backtrace)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't add print here, users can't see any error message from the driver process. The following screenshot is an example of backtrace.

 import ray
 import logging
 from ray.job_config import LoggingConfig

 ray.init(
     job_config=ray.job_config.JobConfig(py_logging_config=LoggingConfig({"abc": "123"}))
 )

 def init_logger():
     return logging.getLogger()

 logger = logging.getLogger("ray")
 logger.info("Driver process", extra={"username": "johndoe"})

 @ray.remote
 def f():
     logger = init_logger()
     logger.info("A Ray task")

 task_obj_ref = f.remote()
 ray.get(task_obj_ref)
Screenshot 2024-05-27 at 11 58 05 PM

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove this print() for now to be consistent with the error handling of worker_process_startup_hook. Users are able to know the worker crash reason from ray list workers and ActorDiedException

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove this print() for now to be consistent with the error handling of worker_process_startup_hook. Users are able to know the worker crash reason from ray list workers and ActorDiedException

I don't think any user can troubleshoot this kind of issue on their own before we configure the root logger of the driver process. It's OK to remove print here because:

  • We currently only support the encoding option, and it is validated when the config is initialized. Hence, it is impossible to reach this except block.
  • We have currently decided to configure the root logger of the driver process, so the error message will also be printed in the driver process, even though we don't add print(backtrace).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated 36dd127

core_worker.drain_and_exit_worker("user", backtrace)
job_config_initialized = True


Expand Down Expand Up @@ -3389,7 +3404,7 @@ cdef class CoreWorker:

if exit_type == "user":
c_exit_type = WORKER_EXIT_TYPE_USER_ERROR
if exit_type == "system":
elif exit_type == "system":
c_exit_type = WORKER_EXIT_TYPE_SYSTEM_ERROR
elif exit_type == "intentional_system_exit":
c_exit_type = WORKER_EXIT_TYPE_INTENTIONAL_SYSTEM_ERROR
Expand Down
61 changes: 61 additions & 0 deletions python/ray/job_config.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,68 @@
import uuid
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

import ray.cloudpickle as pickle
from ray._private.structured_logging.utils import LOG_MODE_DICT
from ray.util.annotations import PublicAPI

if TYPE_CHECKING:
from ray.runtime_env import RuntimeEnv


@PublicAPI(stability="alpha")
class LoggingConfig:
    def __init__(self, dict_config: Union[dict, str] = "TEXT", log_level: str = "INFO"):
        """
        Holds the Python logging configuration for a job. It will be passed
        to all Ray tasks and actors that belong to this job.

        Examples:
            .. testcode::

                import ray
                import logging
                from ray.job_config import LoggingConfig

                ray.init(
                    job_config=ray.job_config.JobConfig(py_logging_config=LoggingConfig("TEXT"))
                )

                @ray.remote
                def f():
                    logger = logging.getLogger()
                    logger.info("This is a Ray task")

                obj_ref = f.remote()
                ray.get(obj_ref)

        Args:
            dict_config: Either a string or a dictionary. A string must be one
                of the keys in LOG_MODE_DICT, each of which maps to a
                predefined logging configuration. A dictionary should be a
                valid logging configuration that can be passed to the
                function `logging.config.dictConfig`.
            log_level: The log level for the logging configuration. It only
                takes effect when dict_config is a string.

        Raises:
            ValueError: If dict_config is a string that is not a key of
                LOG_MODE_DICT.
        """
        # Only string values are validated here; dictionaries are stored as-is.
        is_named_mode = isinstance(dict_config, str)
        if is_named_mode and dict_config not in LOG_MODE_DICT:
            valid_modes = list(LOG_MODE_DICT.keys())
            raise ValueError(
                f"Invalid encoding type: {dict_config}. "
                f"Valid encoding types are: {valid_modes}"
            )
        self.log_level = log_level
        self.dict_config = dict_config

    def get_dict_config(self) -> dict:
        """Return the logging configuration dictionary for this config.

        Returns:
            dict: The stored dictionary when one was supplied; otherwise the
                result of calling the LOG_MODE_DICT entry for the stored
                string with the stored log level.
        """
        config = self.dict_config
        if not isinstance(config, str):
            return config
        build_config = LOG_MODE_DICT[config]
        return build_config(self.log_level)


@PublicAPI
class JobConfig:
"""A class used to store the configurations of a job.
Expand Down Expand Up @@ -50,6 +106,7 @@ def __init__(
ray_namespace: Optional[str] = None,
default_actor_lifetime: str = "non_detached",
_py_driver_sys_path: Optional[List[str]] = None,
py_logging_config: Optional[LoggingConfig] = None,
):
#: The jvm options for java workers of the job.
self.jvm_options = jvm_options or []
Expand All @@ -70,6 +127,8 @@ def __init__(
self.set_default_actor_lifetime(default_actor_lifetime)
# A list of directories that specify the search path for python workers.
self._py_driver_sys_path = _py_driver_sys_path or []
# Python logging configurations that will be passed to Ray tasks/actors.
self.py_logging_config = py_logging_config

def set_metadata(self, key: str, value: str) -> None:
"""Add key-value pair to the metadata dictionary.
Expand Down Expand Up @@ -188,6 +247,8 @@ def _get_proto_job_config(self):

if self._default_actor_lifetime is not None:
pb.default_actor_lifetime = self._default_actor_lifetime
if self.py_logging_config is not None:
pb.serialized_py_logging_config = pickle.dumps(self.py_logging_config)
self._cached_pb = pb

return self._cached_pb
Expand Down
20 changes: 19 additions & 1 deletion python/ray/tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from pathlib import Path
import pytest

import ray.cloudpickle as pickle

import ray
from ray._private.test_utils import (
run_string_as_driver,
Expand All @@ -19,7 +21,7 @@
format_web_url,
wait_for_pid_to_exit,
)
from ray.job_config import JobConfig
from ray.job_config import JobConfig, LoggingConfig
from ray.job_submission import JobSubmissionClient
from ray.dashboard.modules.job.pydantic_models import JobDetails

Expand Down Expand Up @@ -189,6 +191,22 @@ def test_config_metadata(shutdown_only):
assert dict(from_worker.metadata) == job_config.metadata


class TestPyLoggingConfig:
    """Checks that `JobConfig` stores the pickled `LoggingConfig` in its proto."""

    def test_serialized_log_config_dict(self):
        """A dictionary-based config is pickled into the proto unchanged."""
        config = LoggingConfig({"abc": "xyz"})
        expected = pickle.dumps(config)
        proto = JobConfig(py_logging_config=config)._get_proto_job_config()
        assert proto.serialized_py_logging_config == expected

    def test_log_config_str(self):
        """A string-based ("TEXT") config is pickled into the proto unchanged."""
        config = LoggingConfig("TEXT")
        expected = pickle.dumps(config)
        proto = JobConfig(py_logging_config=config)._get_proto_job_config()
        assert proto.serialized_py_logging_config == expected


def test_get_entrypoint():
get_entrypoint = """
from ray._private.utils import get_entrypoint_name
Expand Down
109 changes: 108 additions & 1 deletion python/ray/tests/test_structured_logging.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging.config
import pytest
import ray
import os
Expand All @@ -6,7 +7,9 @@
import json

from ray._private.structured_logging.filters import CoreContextFilter
from ray._private.structured_logging.formatters import JSONFormatter
from ray._private.structured_logging.formatters import JSONFormatter, TextFormatter
from ray.job_config import LoggingConfig
from ray._private.test_utils import run_string_as_driver


class TestCoreContextFilter:
Expand Down Expand Up @@ -122,6 +125,110 @@ def test_record_with_user_provided_context(self, shutdown_only):
assert "exc_text" not in record_dict


class TestTextFormatter:
def test_record_with_user_provided_context(self):
formatter = TextFormatter()
record = logging.makeLogRecord({"user": "ray"})
formatted = formatter.format(record)
kevin85421 marked this conversation as resolved.
Show resolved Hide resolved
assert "user=ray" in formatted

def test_record_with_exception(self):
    """Format a hand-built INFO record and check the expected text fragments.

    NOTE(review): the record is created with `exc_info=None`, so despite the
    test name no exception information is attached to it.
    """
    record = logging.LogRecord(
        name="test_logger",
        level=logging.INFO,
        pathname="test.py",
        lineno=1000,
        msg="Test message",
        args=None,
        exc_info=None,
    )
    formatted = TextFormatter().format(record)
    expected_fragments = ("INFO", "Test message", "test.py:1000", "--")
    for fragment in expected_fragments:
        assert fragment in formatted


class TestLoggingConfig:
    """Unit tests for `LoggingConfig` construction and `get_dict_config`."""

    def test_log_level(self):
        """The requested level appears on both the console handler and root."""
        level = "DEBUG"
        generated = LoggingConfig(log_level=level).get_dict_config()
        assert generated["handlers"]["console"]["level"] == level
        assert generated["root"]["level"] == level

    def test_invalid_dict_config(self):
        """An unknown string mode is rejected with a ValueError."""
        with pytest.raises(ValueError):
            LoggingConfig(dict_config="INVALID").get_dict_config()


class TestTextModeE2E:
def test_text_mode_task(self, shutdown_only):
script = """
import ray
import logging
from ray.job_config import LoggingConfig

ray.init(
job_config=ray.job_config.JobConfig(py_logging_config=LoggingConfig("TEXT"))
)

@ray.remote
def f():
logger = logging.getLogger()
kevin85421 marked this conversation as resolved.
Show resolved Hide resolved
logger.info("This is a Ray task")

obj_ref = f.remote()
ray.get(obj_ref)
"""
stderr = run_string_as_driver(script)
should_exist = [
"job_id",
"worker_id",
"node_id",
"task_id",
"INFO",
"This is a Ray task",
]
for s in should_exist:
assert s in stderr
assert "actor_id" not in stderr

def test_text_mode_actor(self, shutdown_only):
script = """
import ray
import logging
from ray.job_config import LoggingConfig

ray.init(
job_config=ray.job_config.JobConfig(py_logging_config=LoggingConfig("TEXT"))
)

@ray.remote
class actor:
def __init__(self):
pass

def print_message(self):
logger = logging.getLogger()
kevin85421 marked this conversation as resolved.
Show resolved Hide resolved
logger.info("This is a Ray actor")

actor_instance = actor.remote()
ray.get(actor_instance.print_message.remote())
"""
stderr = run_string_as_driver(script)
should_exist = [
"job_id",
"worker_id",
"node_id",
"actor_id",
"task_id",
"INFO",
"This is a Ray actor",
]
for s in should_exist:
assert s in stderr


if __name__ == "__main__":
if os.environ.get("PARALLEL_CI"):
sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
Expand Down
Loading
Loading