-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Core][2/n] Core structured logging: add Text formatter and pass log config to worker process #45344
[Core][2/n] Core structured logging: add Text formatter and pass log config to worker process #45344
Changes from 17 commits
99024ee
b380e9a
6e78f48
9113d25
9218481
def7635
a16f762
571e2bc
8ec3107
1b37390
dc62d5c
69f8b57
d9e63aa
2ec1b4f
9ef1eee
f5f5dec
d811347
229f9d4
c8621a0
2893b84
5083efb
36dd127
83cf819
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,24 +1,55 @@ | ||
import logging | ||
import json | ||
from ray._private.structured_logging.constants import LogKey, LOGRECORD_STANDARD_ATTRS | ||
from ray._private.ray_constants import LOGGER_FORMAT | ||
|
||
|
||
class JSONFormatter(logging.Formatter): | ||
def format(self, record): | ||
record_format = { | ||
LogKey.ASCTIME: self.formatTime(record), | ||
def generate_record_format_attrs( | ||
formatter: logging.Formatter, | ||
record: logging.LogRecord, | ||
exclude_standard_attrs, | ||
) -> dict: | ||
record_format_attrs = {} | ||
# If `exclude_standard_attrs` is False, include the standard attributes. | ||
# Otherwise, include only Ray and user-provided context. | ||
if not exclude_standard_attrs: | ||
record_format_attrs = { | ||
LogKey.ASCTIME: formatter.formatTime(record), | ||
LogKey.LEVELNAME: record.levelname, | ||
LogKey.MESSAGE: record.getMessage(), | ||
LogKey.FILENAME: record.filename, | ||
LogKey.LINENO: record.lineno, | ||
} | ||
if record.exc_info: | ||
if not record.exc_text: | ||
record.exc_text = self.formatException(record.exc_info) | ||
record_format[LogKey.EXC_TEXT] = record.exc_text | ||
|
||
for key, value in record.__dict__.items(): | ||
# Both Ray and user-provided context are stored in `record_format`. | ||
if key not in LOGRECORD_STANDARD_ATTRS: | ||
record_format[key] = value | ||
return json.dumps(record_format) | ||
record.exc_text = formatter.formatException(record.exc_info) | ||
record_format_attrs[LogKey.EXC_TEXT] = record.exc_text | ||
|
||
for key, value in record.__dict__.items(): | ||
# Both Ray and user-provided context are stored in `record_format`. | ||
if key not in LOGRECORD_STANDARD_ATTRS: | ||
record_format_attrs[key] = value | ||
return record_format_attrs | ||
|
||
|
||
class JSONFormatter(logging.Formatter): | ||
def format(self, record): | ||
record_format_attrs = generate_record_format_attrs( | ||
self, record, exclude_standard_attrs=False | ||
) | ||
return json.dumps(record_format_attrs) | ||
|
||
|
||
class TextFormatter(logging.Formatter): | ||
def __init__(self) -> None: | ||
self._inner_formatter = logging.Formatter(LOGGER_FORMAT) | ||
|
||
def format(self, record: logging.LogRecord) -> str: | ||
s = self._inner_formatter.format(record) | ||
record_format_attrs = generate_record_format_attrs( | ||
self, record, exclude_standard_attrs=True | ||
) | ||
additional_attrs = " ".join( | ||
[f"{key}={value}" for key, value in record_format_attrs.items()] | ||
) | ||
return f"{s} {additional_attrs}" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
from typing import Optional | ||
|
||
from ray._private.structured_logging.constants import LOG_MODE_DICT | ||
from ray.util.annotations import PublicAPI | ||
|
||
|
||
@PublicAPI(stability="alpha") | ||
class LoggingConfig: | ||
def __init__(self, encoding: Optional[str] = "TEXT", log_level: str = "INFO"): | ||
kevin85421 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
""" | ||
kevin85421 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
The class is used to store the Python logging configuration. It will be applied | ||
to the root loggers of the driver process and all Ray task and actor processes | ||
that belong to the this job. | ||
kevin85421 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
Examples: | ||
.. testcode:: | ||
|
||
import ray | ||
import logging | ||
|
||
ray.init( | ||
logging_config=ray.LoggingConfig(encoding="TEXT", log_level="INFO") | ||
) | ||
|
||
@ray.remote | ||
def f(): | ||
logger = logging.getLogger() | ||
kevin85421 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
logger.info("This is a Ray task") | ||
|
||
obj_ref = f.remote() | ||
ray.get(obj_ref) | ||
kevin85421 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
kevin85421 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Args: | ||
encoding: encoding is a string, and it should be one of the keys in | ||
LOG_MODE_DICT, which has the corresponding predefined logging | ||
configuration. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "Encoding type for the logs. The valid value is 'TEXT'". Ideally we can auto generate the docstring based on the value of LOG_MODE_DICT by doing
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Updated c8621a0
TODO There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Do we need to include this in this PR? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can do it next |
||
log_level: The log level for the logging configuration. | ||
kevin85421 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
""" | ||
if encoding not in LOG_MODE_DICT: | ||
raise ValueError( | ||
f"Invalid encoding type: {encoding}. " | ||
f"Valid encoding types are: {list(LOG_MODE_DICT.keys())}" | ||
) | ||
self.encoding = encoding | ||
self.log_level = log_level | ||
|
||
def get_dict_config(self) -> dict: | ||
kevin85421 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"""Get the logging configuration based on the encoding type. | ||
Returns: | ||
dict: The logging configuration. | ||
""" | ||
return LOG_MODE_DICT[self.encoding](self.log_level) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -50,6 +50,8 @@ | |
import ray._private.state | ||
import ray._private.storage as storage | ||
|
||
from ray._private.structured_logging.logging_config import LoggingConfig | ||
|
||
Comment on lines
+53
to
+54
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no need since we just do There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It will cause a circular import issue if we use
|
||
# Ray modules | ||
import ray.actor | ||
import ray.cloudpickle as pickle # noqa | ||
|
@@ -1228,6 +1230,7 @@ def init( | |
configure_logging: bool = True, | ||
logging_level: int = ray_constants.LOGGER_LEVEL, | ||
logging_format: Optional[str] = None, | ||
logging_config: Optional[LoggingConfig] = None, | ||
log_to_driver: bool = True, | ||
namespace: Optional[str] = None, | ||
runtime_env: Optional[Union[Dict[str, Any], "RuntimeEnv"]] = None, # noqa: F821 | ||
|
@@ -1332,6 +1335,8 @@ def init( | |
timestamp, filename, line number, and message. See the source file | ||
ray_constants.py for details. Ignored unless "configure_logging" | ||
is true. | ||
kevin85421 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
logging_config: Logging configuration will be applied to both the driver | ||
kevin85421 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
process and all worker processes belonging to the same job. | ||
kevin85421 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
log_to_driver: If true, the output from all of the worker | ||
processes on all nodes will be directed to the driver. | ||
namespace: A namespace is a logical grouping of jobs and named actors. | ||
|
@@ -1384,11 +1389,17 @@ def init( | |
Exception: An exception is raised if an inappropriate combination of | ||
arguments is passed in. | ||
""" | ||
# Configure the "ray" logger for the driver process. | ||
if configure_logging: | ||
setup_logger(logging_level, logging_format or ray_constants.LOGGER_FORMAT) | ||
else: | ||
logging.getLogger("ray").handlers.clear() | ||
|
||
# Configure the logging settings for the driver process. | ||
if logging_config: | ||
dict_config = logging_config.get_dict_config() | ||
logging.config.dictConfig(dict_config) | ||
|
||
# Parse the hidden options: | ||
_enable_object_reconstruction: bool = kwargs.pop( | ||
"_enable_object_reconstruction", False | ||
|
@@ -1561,6 +1572,11 @@ def sigterm_handler(signum, frame): | |
# Set runtime_env in job_config if passed in as part of ray.init() | ||
job_config.set_runtime_env(runtime_env) | ||
|
||
# Pass the logging_config to job_config to configure loggers of all worker | ||
# processes belonging to the job. | ||
if logging_config: | ||
job_config.set_logging_config(logging_config) | ||
|
||
redis_address, gcs_address = None, None | ||
bootstrap_address = services.canonicalize_bootstrap_address(address, _temp_dir) | ||
if bootstrap_address is not None: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2583,6 +2583,21 @@ def maybe_initialize_job_config(): | |
print(job_id_magic_token, end="") | ||
print(job_id_magic_token, file=sys.stderr, end="") | ||
|
||
# Configure worker process's Python logging. | ||
log_config_dict = {} | ||
serialized_logging_config = \ | ||
core_worker.get_job_config().serialized_logging_config | ||
if serialized_logging_config: | ||
logging_config = pickle.loads(serialized_logging_config) | ||
log_config_dict = logging_config.get_dict_config() | ||
if log_config_dict: | ||
try: | ||
logging.config.dictConfig(log_config_dict) | ||
except Exception as e: | ||
backtrace = \ | ||
"".join(traceback.format_exception(type(e), e, e.__traceback__)) | ||
print(backtrace) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we don't add import ray
import logging
from ray.job_config import LoggingConfig
ray.init(
job_config=ray.job_config.JobConfig(py_logging_config=LoggingConfig({"abc": "123"}))
)
def init_logger():
return logging.getLogger()
logger = logging.getLogger("ray")
logger.info("Driver process", extra={"username": "johndoe"})
@ray.remote
def f():
logger = init_logger()
logger.info("A Ray task")
task_obj_ref = f.remote()
ray.get(task_obj_ref) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's remove this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I don't think any user can troubleshoot this kind of issue on their own before we configure the root logger of the driver process. It's OK to remove
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated 36dd127 |
||
core_worker.drain_and_exit_worker("user", backtrace) | ||
job_config_initialized = True | ||
|
||
|
||
|
@@ -3389,7 +3404,7 @@ cdef class CoreWorker: | |
|
||
if exit_type == "user": | ||
c_exit_type = WORKER_EXIT_TYPE_USER_ERROR | ||
if exit_type == "system": | ||
elif exit_type == "system": | ||
c_exit_type = WORKER_EXIT_TYPE_SYSTEM_ERROR | ||
elif exit_type == "intentional_system_exit": | ||
c_exit_type = WORKER_EXIT_TYPE_INTENTIONAL_SYSTEM_ERROR | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually I think we can make it a dataclass. Other configs in the codebase are dataclass such as
FailureConfig
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the benefit of using
dataclass
here? It is not obvious to me, especially since we want to verify whetherencoding
is inLOG_MODE_DICT
and raise aValueError
if it doesn't exist in the__init__
function. Hence, we can't benefit from the automatic generation of__init__
bydataclass
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dataclass has
__post_init__()
for validation purpose.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Told by chatgpt
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I knew it. That's why I said that "we can't benefit from the automatic generation of init by dataclass". I also asked ChatGPT and read something similar to what you pasted above, but didn't find the benefits we were looking for in this case. I will update it to
dataclass
to keep it consistent with other config classes, but I am still curious about the motivation.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated 5083efb