[core][state] Task log - Improve log tailing from log_client and support tailing from offsets [2/4] (ray-project#28188)

With verbose logging, the log file size can grow significantly. This PR prevents gRPC buffer overflow when tailing with a large number of lines specified:

Instead of reading the last N lines into memory, it looks for the start offset of the last N lines and reads forward from there.
Log data is always streamed in chunks (sketched below).
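In short, the handler now finds the end-of-file offset, walks backwards to the start offset of the last N lines, and streams that byte range in fixed-size chunks. A minimal sketch of the flow, using the helper names introduced in log_agent.py below (the log file name is a placeholder):

    with open("raylet.out", "rb") as f:  # placeholder log file
        end_offset = find_end_offset_file(f)  # current end of the file
        start_offset = find_start_offset_last_n_lines_from_offset(
            f, offset=end_offset, n=1000  # start of the last 1000 lines
        )
        f.seek(start_offset)
        while start_offset < end_offset:  # stream the range in fixed-size chunks
            chunk = f.read(min(BLOCK_SIZE, end_offset - start_offset))
            start_offset += len(chunk)
            # each chunk becomes one StreamLogReply on the gRPC stream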
rickyyx authored May 5, 2023
1 parent d8321a7 commit 0a15649
Showing 5 changed files with 481 additions and 99 deletions.
296 changes: 228 additions & 68 deletions dashboard/modules/log/log_agent.py
@@ -5,9 +5,11 @@
import ray.dashboard.utils as dashboard_utils
import ray.dashboard.optional_utils as dashboard_optional_utils
import asyncio
import grpc
import io
import os


from pathlib import Path

from ray.core.generated import reporter_pb2
@@ -16,6 +18,193 @@
logger = logging.getLogger(__name__)
routes = dashboard_optional_utils.ClassMethodRouteTable

# 64 KB
BLOCK_SIZE = 1 << 16

# Keep-alive interval for reading the file
DEFAULT_KEEP_ALIVE_INTERVAL_SEC = 1


def find_end_offset_file(file: io.BufferedIOBase) -> int:
"""
Find the offset of the end of a file without changing the file pointer.
Args:
file: File object
Returns:
Offset of the end of a file.
"""
old_pos = file.tell() # store old position
file.seek(0, io.SEEK_END) # move file pointer to end of file
end = file.tell() # return end of file offset
file.seek(old_pos, io.SEEK_SET)
return end


def find_end_offset_next_n_lines_from_offset(
file: io.BufferedIOBase, start_offset: int, n: int
) -> int:
"""
    Find the end offset of the next n lines from a start offset.
Args:
file: File object
start_offset: Start offset to read from, inclusive.
n: Number of lines to find.
Returns:
        Offset of the end of the next n lines (exclusive).
"""
file.seek(start_offset) # move file pointer to start offset
end_offset = None
for _ in range(n): # loop until we find n lines or reach end of file
line = file.readline() # read a line and consume new line character
if not line: # end of file
break
end_offset = file.tell() # end offset.

logger.debug(f"Found next {n} lines from {start_offset} offset")
return (
end_offset if end_offset is not None else file.seek(0, io.SEEK_END)
) # return last line offset or end of file offset if no lines found
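

# Hypothetical usage sketch (not part of this diff): exercise the two helpers
# above on an in-memory file; `io` is imported at the top of this module.
def _example_offset_helpers():
    buf = io.BytesIO(b"line 1\nline 2\nline 3\n")
    assert find_end_offset_file(buf) == 21  # total file size in bytes
    # End offset of the first two lines, i.e. just past b"line 2\n"
    assert find_end_offset_next_n_lines_from_offset(buf, 0, 2) == 14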


def find_start_offset_last_n_lines_from_offset(
file: io.BufferedIOBase, offset: int, n: int, block_size: int = BLOCK_SIZE
) -> int:
"""
    Find the start offset of the last n lines ending at the given offset.
    Args:
        file: File object
        offset: Offset from which to look backwards for the last n lines,
            -1 means end of file. The offset is exclusive, i.e. data at the
            offset is not included in the result.
        n: Number of lines to find
        block_size: Block size to read from file
    Returns:
        Offset of the beginning of the first of the last n lines, relative
        to the start of the file.
"""
logger.debug(f"Finding last {n} lines from {offset} offset")
if offset == -1:
offset = file.seek(0, io.SEEK_END) # move file pointer to end of file
else:
file.seek(offset, io.SEEK_SET) # move file pointer to start offset

if n == 0:
return offset
    # Number of bytes that should be tailed from the end of the file
    nbytes_from_end = 0
    # If the data before `offset` does not end with a newline, the trailing
    # partial line (e.g. b"line 1\nline 2") counts as the last line, so adjust
    # the remaining line count.
file.seek(max(0, offset - 1), os.SEEK_SET)
if file.read(1) != b"\n":
n -= 1

# Remaining number of lines to tail
lines_more = n
read_offset = max(0, offset - block_size)
# So that we know how much to read on the last block (the block 0)
prev_offset = offset

while lines_more >= 0 and read_offset >= 0:
# Seek to the current block start
file.seek(read_offset, 0)
# Read the current block (or less than block) data
block_data = file.read(min(block_size, prev_offset - read_offset))
num_lines = block_data.count(b"\n")
if num_lines > lines_more:
# This is the last block to read.
# Need to find the offset of exact number of lines to tail
# in the block.
# Use `split` here to split away the extra lines, i.e.
# first `num_lines - lines_more` lines.
lines = block_data.split(b"\n", num_lines - lines_more)
            # Add the length of the data in this block that belongs to the tail.
nbytes_from_end += len(lines[-1])
break

# Need to read more blocks.
lines_more -= num_lines
nbytes_from_end += len(block_data)

if read_offset == 0:
# We have read all blocks (since the start)
break
# Continuing with the previous block
prev_offset = read_offset
read_offset = max(0, read_offset - block_size)

offset_read_start = offset - nbytes_from_end
assert (
offset_read_start >= 0
), f"Read start offset({offset_read_start}) should be non-negative"
return offset_read_start
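

# Hypothetical sketch (not part of this diff): tail the last two lines of a
# small in-memory file, using a tiny block_size so the backward scan above
# crosses several blocks.
def _example_tail_start_offset():
    data = b"line 1\nline 2\nline 3\n"
    start = find_start_offset_last_n_lines_from_offset(
        io.BytesIO(data), offset=-1, n=2, block_size=4
    )
    assert data[start:] == b"line 2\nline 3\n"  # start == 7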


async def _stream_log_in_chunk(
context: grpc.aio.ServicerContext,
file: io.BufferedIOBase,
start_offset: int,
end_offset: int = -1,
keep_alive_interval_sec: int = -1,
block_size: int = BLOCK_SIZE,
):
"""Streaming log in chunk from start to end offset.
Stream binary file content in chunks from start offset to an end
offset if provided, else to the end of the file.
Args:
context: gRPC server side context
file: Binary file to stream
start_offset: File offset where streaming starts
        end_offset: Offset at which to stop reading (exclusive); -1 means
            stream until the end of the file.
        keep_alive_interval_sec: Interval to sleep before retrying the read
            after reaching the end of the file; -1 means no retry.
block_size: Number of bytes per chunk, exposed for testing
Return:
Async generator of StreamReply
"""
assert "b" in file.mode, "Only binary file is supported."
    assert not (
        keep_alive_interval_sec >= 0 and end_offset != -1
    ), "Keep-alive is not allowed when specifying an end offset"

file.seek(start_offset, 0)
cur_offset = start_offset

# Until gRPC is done
while not context.done():
# Read in block
if end_offset != -1:
to_read = min(end_offset - cur_offset, block_size)
else:
to_read = block_size

bytes = file.read(to_read)

if bytes == b"":
# Stop reading
if keep_alive_interval_sec >= 0:
await asyncio.sleep(keep_alive_interval_sec)
# Try reading again
continue

# Have read the entire file, done
break
logger.debug(f"Sending {len(bytes)} bytes at {cur_offset}")
yield reporter_pb2.StreamLogReply(data=bytes)

# Have read the requested section [start_offset, end_offset), done
cur_offset += len(bytes)
if end_offset != -1 and cur_offset >= end_offset:
break
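

# Hypothetical sketch (not part of this diff): drive the chunked streamer with
# a stand-in for the gRPC server context; `_FakeContext` and the log path are
# placeholders for illustration only.
class _FakeContext:
    def done(self) -> bool:
        return False  # pretend the client never cancels the stream


async def _example_stream_whole_file(path: str):
    with open(path, "rb") as f:
        async for reply in _stream_log_in_chunk(
            context=_FakeContext(),
            file=f,
            start_offset=0,
            end_offset=find_end_offset_file(f),  # stop at the current file end
        ):
            print(f"received {len(reply.data)} bytes")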


class LogAgent(dashboard_utils.DashboardAgentModule):
def __init__(self, dashboard_agent):
@@ -31,29 +220,25 @@ def is_minimal_module():
return False


# 64 KB
BLOCK_SIZE = 1 << 16


class LogAgentV1Grpc(
dashboard_utils.DashboardAgentModule, reporter_pb2_grpc.ReporterServiceServicer
):
class LogAgentV1Grpc(dashboard_utils.DashboardAgentModule):
def __init__(self, dashboard_agent):
super().__init__(dashboard_agent)

async def run(self, server):
if server:
reporter_pb2_grpc.add_LogServiceServicer_to_server(self, server)

# TODO: should this return True
@staticmethod
def is_minimal_module():
# Dashboard is only available with non-minimal install now.
return False

async def ListLogs(self, request, context):
"""
Lists all files in the active Ray logs directory.
Part of `LogService` gRPC.
NOTE: These RPCs are used by state_head.py, not log_head.py
"""
path = Path(self._dashboard_agent.log_dir)
@@ -73,82 +258,57 @@ async def StreamLog(self, request, context):
the end of the file if `request.keep_alive == True`. Else, it terminates the
stream once there are no more bytes to read from the log file.
Part of `LogService` gRPC.
NOTE: These RPCs are used by state_head.py, not log_head.py
"""
# NOTE: If the client side connection is closed, this handler will
# be automatically terminated.
lines = request.lines if request.lines else 1000

filepath = f"{self._dashboard_agent.log_dir}/{request.log_file_name}"
if "/" in request.log_file_name or not os.path.isfile(filepath):
if not os.path.isfile(filepath):
await context.send_initial_metadata(
[[log_consts.LOG_GRPC_ERROR, log_consts.FILE_NOT_FOUND]]
)
else:
with open(filepath, "rb") as f:
await context.send_initial_metadata([])
# If requesting the whole file, we stream it since it may be large.
if lines == -1:
while not context.done():
bytes = f.read(BLOCK_SIZE)
if bytes == b"":
end = f.tell()
break
yield reporter_pb2.StreamLogReply(data=bytes)
else:
bytes, end = tail(f, lines)
yield reporter_pb2.StreamLogReply(data=bytes + b"\n")
if request.keep_alive:
interval = request.interval if request.interval else 1
f.seek(end)
while not context.done():
await asyncio.sleep(interval)
bytes = f.read()
if bytes != b"":
yield reporter_pb2.StreamLogReply(data=bytes)

# Default stream entire file
start_offset = 0
end_offset = find_end_offset_file(f)

def tail(f: io.TextIOBase, lines: int):
"""Tails the given file (in 'rb' mode)
if lines != -1:
                    # If a tail line count was specified, find the offset
                    # where the last `lines` lines begin
start_offset = find_start_offset_last_n_lines_from_offset(
f, offset=end_offset, n=lines
)

We assume that any "lines" parameter is not significant (<100,000 lines)
and will result in a buffer with a small memory profile (<1MB)
Taken from: https://stackoverflow.com/a/136368/8299684
Examples:
Args:
f: text file in 'rb' mode
lines: The number of lines to read from the end of the file.
Returns:
string containing the lines of the file,
the position of the last byte read in units of bytes
"""

total_lines_wanted = lines
# If keep alive: following the log every 'interval'
keep_alive_interval_sec = -1
if request.keep_alive:
keep_alive_interval_sec = (
request.interval
if request.interval
else DEFAULT_KEEP_ALIVE_INTERVAL_SEC
)

# Seek to the end of the file
f.seek(0, 2)
block_end_byte = f.tell()
# When following (keep_alive), it will read beyond the end
end_offset = -1

last_byte_read = block_end_byte
lines_to_go = total_lines_wanted
block_number = -1
blocks = []
logger.info(
f"Tailing logs from {start_offset} to {end_offset} for {lines}, "
f"with keep_alive={keep_alive_interval_sec}"
)

# Read blocks into memory until we have seen at least
# `total_lines_wanted` number of lines. Then, return a string
# containing the last `total_lines_wanted` number of lines
while lines_to_go > 0 and block_end_byte > 0:
if block_end_byte - BLOCK_SIZE > 0:
f.seek(block_number * BLOCK_SIZE, 2)
blocks.append(f.read(BLOCK_SIZE))
else:
f.seek(0, 0)
blocks.append(f.read(block_end_byte))
lines_found = blocks[-1].count(b"\n")
lines_to_go -= lines_found
block_end_byte -= BLOCK_SIZE
block_number -= 1
all_read_text = b"".join(reversed(blocks))
return b"\n".join(all_read_text.splitlines()[-total_lines_wanted:]), last_byte_read
                # Read and send the file data in chunks
async for chunk_res in _stream_log_in_chunk(
context=context,
file=f,
start_offset=start_offset,
end_offset=end_offset,
keep_alive_interval_sec=keep_alive_interval_sec,
):
yield chunk_res
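For reference, a client reaches this handler through the `LogService` stub generated from reporter.proto. A minimal async sketch, assuming the request message generated for this RPC is `StreamLogRequest` (as suggested by the fields read above); the agent address and file name are placeholders:

    import grpc

    from ray.core.generated import reporter_pb2, reporter_pb2_grpc


    async def tail_log(agent_address: str) -> None:
        async with grpc.aio.insecure_channel(agent_address) as channel:
            stub = reporter_pb2_grpc.LogServiceStub(channel)
            stream = stub.StreamLog(
                reporter_pb2.StreamLogRequest(
                    log_file_name="raylet.out",  # placeholder file name
                    lines=100,  # tail the last 100 lines
                    keep_alive=False,
                )
            )
            async for reply in stream:
                print(reply.data.decode(errors="replace"), end="")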
4 changes: 2 additions & 2 deletions dashboard/utils.py
@@ -58,7 +58,7 @@ async def run(self, server):
def is_minimal_module():
"""
Return True if the module is minimal, meaning it
should work with `pip install ray` that doesn't requires additonal
should work with `pip install ray` that doesn't requires additional
dependencies.
"""

@@ -87,7 +87,7 @@ async def run(self, server):
def is_minimal_module():
"""
Return True if the module is minimal, meaning it
should work with `pip install ray` that doesn't requires additonal
should work with `pip install ray` that doesn't requires additional
dependencies.
"""

20 changes: 20 additions & 0 deletions python/ray/tests/conftest.py
@@ -1143,3 +1143,23 @@ def enable_syncer_test(request, monkeypatch):
yield
monkeypatch.delenv("RAY_use_ray_syncer")
ray._raylet.Config.initialize("")


@pytest.fixture(scope="function")
def temp_file(request):
with tempfile.NamedTemporaryFile("r+b") as fp:
yield fp


@pytest.fixture(scope="module")
def random_ascii_file(request):
import random
import string

file_size = getattr(request, "param", 1 << 10)

with tempfile.NamedTemporaryFile(mode="r+b") as fp:
fp.write("".join(random.choices(string.ascii_letters, k=file_size)).encode())
fp.flush()

yield fp
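
A hypothetical test (not part of this diff) showing how the fixture above can be sized through indirect parametrization:

    import os

    import pytest


    @pytest.mark.parametrize("random_ascii_file", [1 << 20], indirect=True)
    def test_streams_large_random_file(random_ascii_file):
        # The fixture yields an open NamedTemporaryFile; `.name` is its path.
        assert os.path.getsize(random_ascii_file.name) == 1 << 20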