-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Serve] Add actor id and worker id to Serve structured logs #43725
Changes from 4 commits
aa21e5a
0ab01ce
ad62669
074c511
356d544
59ef04f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,7 @@ | |
RAY_SERVE_ENABLE_JSON_LOGGING, | ||
RAY_SERVE_ENABLE_MEMORY_PROFILING, | ||
RAY_SERVE_LOG_TO_STDERR, | ||
SERVE_LOG_ACTOR_ID, | ||
SERVE_LOG_APPLICATION, | ||
SERVE_LOG_COMPONENT, | ||
SERVE_LOG_COMPONENT_ID, | ||
|
@@ -26,6 +27,7 @@ | |
SERVE_LOG_REQUEST_ID, | ||
SERVE_LOG_ROUTE, | ||
SERVE_LOG_TIME, | ||
SERVE_LOG_WORKER_ID, | ||
SERVE_LOGGER_NAME, | ||
) | ||
from ray.serve.schema import EncodingType, LoggingConfig | ||
|
@@ -47,16 +49,38 @@ class ServeJSONFormatter(logging.Formatter): | |
based on the field of record. | ||
""" | ||
|
||
ADD_IF_EXIST_FIELDS = [ | ||
SERVE_LOG_REQUEST_ID, | ||
SERVE_LOG_ROUTE, | ||
SERVE_LOG_APPLICATION, | ||
] | ||
|
||
def __init__( | ||
self, | ||
component_name: str, | ||
component_id: str, | ||
component_type: Optional[ServeComponentType] = None, | ||
): | ||
runtime_context = ray.get_runtime_context() | ||
self.component_log_fmt = { | ||
SERVE_LOG_LEVEL_NAME: SERVE_LOG_RECORD_FORMAT[SERVE_LOG_LEVEL_NAME], | ||
SERVE_LOG_TIME: SERVE_LOG_RECORD_FORMAT[SERVE_LOG_TIME], | ||
SERVE_LOG_ACTOR_ID: runtime_context.get_actor_id(), | ||
SERVE_LOG_WORKER_ID: runtime_context.get_worker_id(), | ||
} | ||
try: | ||
runtime_context = ray.get_runtime_context() | ||
actor_id = runtime_context.get_actor_id() | ||
if actor_id: | ||
self.component_log_fmt[SERVE_LOG_ACTOR_ID] = actor_id | ||
worker_id = runtime_context.get_worker_id() | ||
if worker_id: | ||
self.component_log_fmt[SERVE_LOG_WORKER_ID] = worker_id | ||
except Exception: | ||
# If get_runtime_context() fails for any reason, do nothing (no adding | ||
# actor_id and/or worker_id to the fmt) | ||
pass | ||
|
||
if component_type and component_type == ServeComponentType.REPLICA: | ||
self.component_log_fmt[SERVE_LOG_DEPLOYMENT] = component_name | ||
self.component_log_fmt[SERVE_LOG_REPLICA] = component_id | ||
|
@@ -82,21 +106,12 @@ def format(self, record: logging.LogRecord) -> str: | |
record_attributes = copy.deepcopy(record.__dict__) | ||
record_format[SERVE_LOG_LEVEL_NAME] = record.levelname | ||
record_format[SERVE_LOG_TIME] = self.asctime_formatter.format(record) | ||
|
||
if SERVE_LOG_REQUEST_ID in record_attributes: | ||
record_format[SERVE_LOG_REQUEST_ID] = record_attributes[ | ||
SERVE_LOG_REQUEST_ID | ||
] | ||
if SERVE_LOG_ROUTE in record_attributes: | ||
record_format[SERVE_LOG_ROUTE] = record_attributes[SERVE_LOG_ROUTE] | ||
|
||
if SERVE_LOG_APPLICATION in record_attributes: | ||
record_format[SERVE_LOG_APPLICATION] = record_attributes[ | ||
SERVE_LOG_APPLICATION | ||
] | ||
|
||
record_format[SERVE_LOG_MESSAGE] = self.message_formatter.format(record) | ||
|
||
for field in ServeJSONFormatter.ADD_IF_EXIST_FIELDS: | ||
if field in record_attributes: | ||
record_format[field] = record_attributes[field] | ||
Comment on lines
+107
to
+109
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe the attributes added here can be included in the base There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That would require us to pass more things to the constructor, but I guess it makes more sense there since they won't change between each log messages. Let me update it :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These ones are just calling There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can't you just call it in there? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh that's right. Let me just call those in init 😅 |
||
|
||
if SERVE_LOG_EXTRA_FIELDS in record_attributes: | ||
if not isinstance(record_attributes[SERVE_LOG_EXTRA_FIELDS], dict): | ||
raise ValueError( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -248,6 +248,8 @@ def fn(*args): | |
"app_name": request_context.app_name, | ||
"log_file": logger.handlers[1].baseFilename, | ||
"replica": serve.get_replica_context().replica_tag, | ||
"actor_id": ray.get_runtime_context().get_actor_id(), | ||
"worker_id": ray.get_runtime_context().get_worker_id(), | ||
} | ||
|
||
@serve.deployment( | ||
|
@@ -263,6 +265,8 @@ def __call__(self, req: starlette.requests.Request): | |
"app_name": request_context.app_name, | ||
"log_file": logger.handlers[1].baseFilename, | ||
"replica": serve.get_replica_context().replica_tag, | ||
"actor_id": ray.get_runtime_context().get_actor_id(), | ||
"worker_id": ray.get_runtime_context().get_worker_id(), | ||
} | ||
|
||
serve.run(fn.bind(), name="app1", route_prefix="/fn") | ||
|
@@ -306,21 +310,28 @@ def check_log(): | |
class_method_replica_id = resp2["replica"].split("#")[-1] | ||
if json_log_format: | ||
user_method_log_regex = ( | ||
f'.*"deployment": "{resp["app_name"]}_fn", ' | ||
".*" | ||
f'"actor_id": "{resp["actor_id"]}", ' | ||
f'"worker_id": "{resp["worker_id"]}", ' | ||
f'"deployment": "{resp["app_name"]}_fn", ' | ||
f'"replica": "{method_replica_id}", ' | ||
f'"component_name": "replica", ' | ||
'"message":.* user func.*, ' | ||
f'"request_id": "{resp["request_id"]}", ' | ||
f'"route": "{resp["route"]}", ' | ||
f'"application": "{resp["app_name"]}", "message":.* user func.*' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why are the message fields removed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's just reordered to before the request id. This is due to my refactor for drying up the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nope doesn't matter |
||
f'"application": "{resp["app_name"]}"' | ||
) | ||
user_class_method_log_regex = ( | ||
f'.*"deployment": "{resp2["app_name"]}_Model", ' | ||
".*" | ||
f'"actor_id": "{resp2["actor_id"]}", ' | ||
f'"worker_id": "{resp2["worker_id"]}", ' | ||
f'"deployment": "{resp2["app_name"]}_Model", ' | ||
f'"replica": "{class_method_replica_id}", ' | ||
f'"component_name": "replica", ' | ||
'"message":.* user log message from class method.*' | ||
f'"request_id": "{resp2["request_id"]}", ' | ||
f'"route": "{resp2["route"]}", ' | ||
f'"application": "{resp2["app_name"]}", "message":.* user log ' | ||
"message from class method.*" | ||
f'"application": "{resp2["app_name"]}"' | ||
) | ||
else: | ||
user_method_log_regex = ( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you probably meant to put the
try-except
block above the dict construction :)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, I meant to remove them in line 68-69 and just have they added to the dictionary in the try-except block 😅