Skip to content

Commit

Permalink
[Serve] Add actor id and worker id to Serve structured logs (#43725)
Browse files Browse the repository at this point in the history
Add actor_id and worker_id to Serve structured json logs.

---------

Signed-off-by: Gene Su <[email protected]>
  • Loading branch information
GeneDer authored Mar 6, 2024
1 parent 87ec8f6 commit 245df53
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 16 deletions.
2 changes: 2 additions & 0 deletions python/ray/serve/_private/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@
SERVE_LOG_COMPONENT = "component_name"
SERVE_LOG_COMPONENT_ID = "component_id"
SERVE_LOG_MESSAGE = "message"
SERVE_LOG_ACTOR_ID = "actor_id"
SERVE_LOG_WORKER_ID = "worker_id"
# This is a reserved for python logging module attribute, it should not be changed.
SERVE_LOG_LEVEL_NAME = "levelname"
SERVE_LOG_TIME = "asctime"
Expand Down
35 changes: 24 additions & 11 deletions python/ray/serve/_private/logging_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
RAY_SERVE_ENABLE_JSON_LOGGING,
RAY_SERVE_ENABLE_MEMORY_PROFILING,
RAY_SERVE_LOG_TO_STDERR,
SERVE_LOG_ACTOR_ID,
SERVE_LOG_APPLICATION,
SERVE_LOG_COMPONENT,
SERVE_LOG_COMPONENT_ID,
Expand All @@ -26,6 +27,7 @@
SERVE_LOG_REQUEST_ID,
SERVE_LOG_ROUTE,
SERVE_LOG_TIME,
SERVE_LOG_WORKER_ID,
SERVE_LOGGER_NAME,
)
from ray.serve.schema import EncodingType, LoggingConfig
Expand All @@ -47,6 +49,12 @@ class ServeJSONFormatter(logging.Formatter):
based on the field of record.
"""

ADD_IF_EXIST_FIELDS = [
SERVE_LOG_REQUEST_ID,
SERVE_LOG_ROUTE,
SERVE_LOG_APPLICATION,
]

def __init__(
self,
component_name: str,
Expand All @@ -57,6 +65,19 @@ def __init__(
SERVE_LOG_LEVEL_NAME: SERVE_LOG_RECORD_FORMAT[SERVE_LOG_LEVEL_NAME],
SERVE_LOG_TIME: SERVE_LOG_RECORD_FORMAT[SERVE_LOG_TIME],
}
try:
runtime_context = ray.get_runtime_context()
actor_id = runtime_context.get_actor_id()
if actor_id:
self.component_log_fmt[SERVE_LOG_ACTOR_ID] = actor_id
worker_id = runtime_context.get_worker_id()
if worker_id:
self.component_log_fmt[SERVE_LOG_WORKER_ID] = worker_id
except Exception:
# If get_runtime_context() fails for any reason, do nothing (no adding
# actor_id and/or worker_id to the fmt)
pass

if component_type and component_type == ServeComponentType.REPLICA:
self.component_log_fmt[SERVE_LOG_DEPLOYMENT] = component_name
self.component_log_fmt[SERVE_LOG_REPLICA] = component_id
Expand All @@ -83,17 +104,9 @@ def format(self, record: logging.LogRecord) -> str:
record_format[SERVE_LOG_LEVEL_NAME] = record.levelname
record_format[SERVE_LOG_TIME] = self.asctime_formatter.format(record)

if SERVE_LOG_REQUEST_ID in record_attributes:
record_format[SERVE_LOG_REQUEST_ID] = record_attributes[
SERVE_LOG_REQUEST_ID
]
if SERVE_LOG_ROUTE in record_attributes:
record_format[SERVE_LOG_ROUTE] = record_attributes[SERVE_LOG_ROUTE]

if SERVE_LOG_APPLICATION in record_attributes:
record_format[SERVE_LOG_APPLICATION] = record_attributes[
SERVE_LOG_APPLICATION
]
for field in ServeJSONFormatter.ADD_IF_EXIST_FIELDS:
if field in record_attributes:
record_format[field] = record_attributes[field]

record_format[SERVE_LOG_MESSAGE] = self.message_formatter.format(record)

Expand Down
21 changes: 16 additions & 5 deletions python/ray/serve/tests/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,8 @@ def fn(*args):
"app_name": request_context.app_name,
"log_file": logger.handlers[1].baseFilename,
"replica": serve.get_replica_context().replica_tag,
"actor_id": ray.get_runtime_context().get_actor_id(),
"worker_id": ray.get_runtime_context().get_worker_id(),
}

@serve.deployment(
Expand All @@ -263,6 +265,8 @@ def __call__(self, req: starlette.requests.Request):
"app_name": request_context.app_name,
"log_file": logger.handlers[1].baseFilename,
"replica": serve.get_replica_context().replica_tag,
"actor_id": ray.get_runtime_context().get_actor_id(),
"worker_id": ray.get_runtime_context().get_worker_id(),
}

serve.run(fn.bind(), name="app1", route_prefix="/fn")
Expand Down Expand Up @@ -306,21 +310,28 @@ def check_log():
class_method_replica_id = resp2["replica"].split("#")[-1]
if json_log_format:
user_method_log_regex = (
f'.*"deployment": "{resp["app_name"]}_fn", '
".*"
f'"actor_id": "{resp["actor_id"]}", '
f'"worker_id": "{resp["worker_id"]}", '
f'"deployment": "{resp["app_name"]}_fn", '
f'"replica": "{method_replica_id}", '
f'"component_name": "replica", '
f'"request_id": "{resp["request_id"]}", '
f'"route": "{resp["route"]}", '
f'"application": "{resp["app_name"]}", "message":.* user func.*'
f'"application": "{resp["app_name"]}", '
'"message":.* user func.*'
)
user_class_method_log_regex = (
f'.*"deployment": "{resp2["app_name"]}_Model", '
".*"
f'"actor_id": "{resp2["actor_id"]}", '
f'"worker_id": "{resp2["worker_id"]}", '
f'"deployment": "{resp2["app_name"]}_Model", '
f'"replica": "{class_method_replica_id}", '
f'"component_name": "replica", '
f'"request_id": "{resp2["request_id"]}", '
f'"route": "{resp2["route"]}", '
f'"application": "{resp2["app_name"]}", "message":.* user log '
"message from class method.*"
f'"application": "{resp2["app_name"]}", '
'"message":.* user log message from class method.*'
)
else:
user_method_log_regex = (
Expand Down

0 comments on commit 245df53

Please sign in to comment.