[core][state] Task log - Improve log tailing from log_client and support tailing from offsets [2/4] #28188
Conversation
TODO: rerun the release test after review
Let me review this asap
@@ -276,7 +276,7 @@ def write_log(self, log_file_size_byte: int):
     time_taken = 0
     t_start = time.perf_counter()
-    for s in get_log(actor_id=actor._actor_id.hex(), tail=-1):
+    for s in get_log(actor_id=actor._actor_id.hex(), tail=1000000000):
I believe this was the original failure case
It was a bit over-engineered for the original tailing use cases, so I was planning to simplify it. But with the recent requirement of streaming logs from offsets (begin -> end) for task logs, the added complexity is actually needed, so I decided to just revive this PR. cc @rkooo567
Should I assume most of the logic has changed when reviewing the PR? It looks like there are lots of removals.
Yes, the end behavior should be the same.
I added quite a few tests to make sure the offset bookkeeping is correct.
Btw, is this API sufficient to implement the following logic?
Can you tell me how we can achieve this one?
So for pagination with lines: we will probably need some code (another PR on top of this) to translate line counts to specific offsets. I think the frontend needs to keep track of the line count; an example API I could think of would be:
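A hypothetical sketch of what such an API could look like (all names here are assumptions, not the actual Ray API):

# Hypothetical sketch only; the real signature may differ.
def get_log_page(
    actor_id: str,
    start_line: int,  # first line of the page; the frontend tracks this
    num_lines: int,   # page size in lines
) -> str:
    """Return `num_lines` lines starting at line `start_line`."""
    ...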
This will probably require finding the offsets corresponding to start_lines each time. Alternatively, we could return the offsets together with the file content:
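For instance, a hypothetical reply shape (names assumed, not the actual proto message) that carries the offsets back to the caller:

from dataclasses import dataclass

# Hypothetical reply shape; illustration only.
@dataclass
class LogPageReply:
    data: bytes        # log content for this page
    start_offset: int  # absolute byte offset of the first returned byte
    end_offset: int    # absolute byte offset just past the last returned byte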
Hmm, maybe the -1000 approach could return duplicated logs depending on the situation? Maybe we can make the API return the absolute offset, and we can do start = prev_absolute_offset - 1000, end = the absolute offset. But it makes sense that we can build on top of this as a follow-up!
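A quick worked example of that idea (hypothetical numbers):

# The frontend remembers the absolute start offset of the previous page
# and uses it as the exclusive end of the next request, so consecutive
# pages cannot overlap.
prev_absolute_offset = 5000  # where the previously fetched page began
page_size = 1000

start = prev_absolute_offset - page_size  # 4000
end = prev_absolute_offset                # 5000, exclusive: no duplicate bytes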
def tail(f: io.TextIOBase, lines: int):
    """Tails the given file (in 'rb' mode)

    # Default: stream entire file
    start_offset = 0
This should be configurable if we want to make it work with the task log, right?
yeah, will add fields when supporting it.
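A hypothetical sketch of the extra fields (names assumed, not the actual request message):

from dataclasses import dataclass
from typing import Optional

# Hypothetical request shape; the real gRPC message may differ.
@dataclass
class StreamLogRequest:
    log_file_name: str
    lines: int = 1000
    start_offset: Optional[int] = None  # for task logs: begin of byte range
    end_offset: Optional[int] = None    # for task logs: end of byte range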
    Return:
        Async generator of StreamReply
    """
    assert "b" in file.mode, "Only binary file is supported."
Is this correct? What are the other file formats? (Is a regular log also a binary file?)
We opened it in binary mode for offset handling. Added comments.
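For context, a small illustration of why binary mode matters here (a sketch, not code from the PR): in text mode, Python's tell()/seek() values are opaque cookies, while in 'rb' mode they are plain byte counts that can be stored and shipped to clients.

import io

with open("example.log", "rb") as f:  # "example.log" is a stand-in name
    f.seek(0, io.SEEK_END)
    size = f.tell()                    # a real byte offset
    f.seek(max(0, size - 1000))        # jump to ~1000 bytes before the end
    chunk = f.read()                   # bytes, safe to slice by offset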
    cur_offset = start_offset

    # Until gRPC is done
    while not context.done():
sorry I forgot the semantics lol... When is the context set to be "done"?
I guess it's just that the streaming context is not closed yet.
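Roughly, the loop keeps yielding newly appended bytes until the RPC finishes or is cancelled (e.g. the client disconnects); a simplified sketch of the pattern, not the actual agent code:

import asyncio

async def stream_log_in_chunks(file, start_offset, context):
    # `context` is a grpc.aio servicer context; done() flips to True once
    # the RPC is finished or cancelled.
    cur_offset = start_offset
    file.seek(cur_offset)
    while not context.done():
        data = file.read()            # whatever has been appended so far
        if data:
            cur_offset += len(data)
            yield data                # the real code wraps this in a reply message
        else:
            await asyncio.sleep(0.5)  # assumed poll interval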
Generally LGTM! I will look into it one more time tomorrow
    return end


def find_end_offset_next_n_lines_from_offset(
is it test only?
Not used for now, but I suppose it will be used to support pagination.
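For illustration, a hypothetical version of how such a helper could support forward pagination (a sketch; the actual implementation in log_agent.py may differ):

import io

def find_end_offset_next_n_lines_from_offset(
    file: io.BufferedIOBase, offset: int, n: int
) -> int:
    """Return the offset just past the n-th newline after `offset`."""
    file.seek(offset, io.SEEK_SET)
    for _ in range(n):
        if not file.readline():  # hit EOF before n lines
            break
    return file.tell()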
    old_pos = file.tell()  # store the old position
    file.seek(0, io.SEEK_END)  # move the file pointer to the end of the file
    end = file.tell()  # end-of-file offset
    file.seek(old_pos, io.SEEK_SET)  # restore the old position
consider making it a context manager?
To guarantee the API semantics.
wdym here?
Hmm, I thought we could make it seek back if any exception occurred. But I think it should be fine in this case, because if this raises an exception the whole operation will just fail anyway.
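A minimal sketch of the suggested context manager (an illustration, not the merged code): it restores the caller's file position even if the body raises.

import io
from contextlib import contextmanager

@contextmanager
def preserve_offset(file: io.IOBase):
    old_pos = file.tell()  # remember where the caller was
    try:
        yield file
    finally:
        file.seek(old_pos, io.SEEK_SET)  # seek back even on exception

# Usage: find the end-of-file offset without disturbing the cursor.
def find_end_offset_file(file: io.IOBase) -> int:
    with preserve_offset(file):
        file.seek(0, io.SEEK_END)
        return file.tell()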
dashboard/modules/log/log_agent.py (Outdated)
    # in the block.
    # Use `split` here to split away the last
    lines = block_data.split(b"\n", num_lines - lines_more)
    # len(lines[0]) + 1 for the newline character split
Hmm, not sure I understood this comment. Also, isn't this supposed to be len(sum(len(line) for line in lines))?
Oh, since it's tailing, only the last lines in the block need to be accounted for in nbytes_from_end. For example, given the block:

line 1 \n line 2 \n line 3 \n line 4\n

If lines_more = 1, only line 4 needs to be included in the tail, so we split with num_lines - lines_more = 3 splits, producing 4 parts:
- line 1
- line 2
- line 3
- line 4
And we will include line 4's length in the tail results.
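A runnable illustration of the split described above, using the same example (num_lines = 4, lines_more = 1):

block_data = b"line 1\nline 2\nline 3\nline 4\n"
num_lines, lines_more = 4, 1

# 3 splits produce 4 parts; only the last part belongs to the tail.
parts = block_data.split(b"\n", num_lines - lines_more)
print(parts)  # [b'line 1', b'line 2', b'line 3', b'line 4\n']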
…ort tailing from offsets [2/4] (ray-project#28188) With verbose logging, the log file size might grow significantly. This PR prevents gRPC buffer overflow when tailing with a large number of lines specified: instead of reading the last X lines into memory, it looks for the start of the last X lines and reads from there, always streaming log data in chunks.
Why are these changes needed?
With verbose logging, the log file size might grow significantly. This PR prevents gRPC buffer overflow when tailing with a large number of lines specified.
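A minimal sketch of the approach (an illustration under assumed names and block size, not the merged implementation; it assumes the log file is newline-terminated): scan backwards in fixed-size blocks to find where the last N lines begin, then stream from that offset instead of holding the whole tail in memory.

import io

BLOCK_SIZE = 1 << 16  # 64 KiB, an assumed chunk size

def find_start_offset_last_n_lines(file: io.BufferedIOBase, lines: int) -> int:
    file.seek(0, io.SEEK_END)
    pos = file.tell()
    newlines_seen = 0
    block = b""
    # Read blocks from the end until we have seen more newlines than
    # the number of requested lines (or we hit the start of the file).
    while pos > 0 and newlines_seen <= lines:
        read_size = min(BLOCK_SIZE, pos)
        pos -= read_size
        file.seek(pos, io.SEEK_SET)
        block = file.read(read_size)
        newlines_seen += block.count(b"\n")
    if newlines_seen <= lines:
        return 0  # the whole file has at most `lines` lines
    # Skip the extra newlines at the front of the earliest block read;
    # the offset just past them is where the last `lines` lines start.
    offset = pos
    for _ in range(newlines_seen - lines):
        idx = block.index(b"\n")
        offset += idx + 1
        block = block[idx + 1:]
    return offset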
Related issue number
Closes #27009
Checks
I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
I've run scripts/format.sh to lint the changes in this PR.