From 0d020047ca4fe31a36c5133546883b8440a8abff Mon Sep 17 00:00:00 2001 From: Benjamin Kane Date: Wed, 25 May 2022 16:18:36 -0400 Subject: [PATCH] ranged media file respones --- fiftyone/server/routes/media.py | 129 ++++++++++++++++++++++++++++++-- 1 file changed, 122 insertions(+), 7 deletions(-) diff --git a/fiftyone/server/routes/media.py b/fiftyone/server/routes/media.py index e4871664fd..45194b4909 100644 --- a/fiftyone/server/routes/media.py +++ b/fiftyone/server/routes/media.py @@ -5,19 +5,134 @@ | `voxel51.com `_ | """ +import typing as t + +import aiofiles +from aiofiles.threadpool.binary import AsyncBufferedReader +from aiofiles.os import stat as aio_stat from starlette.endpoints import HTTPEndpoint from starlette.requests import Request -from starlette.responses import FileResponse +from starlette.responses import ( + FileResponse, + Response, + StreamingResponse, + guess_type, +) + + +async def ranged( + file: AsyncBufferedReader, + start: int = 0, + end: int = None, + block_size: int = 8192, +) -> t.AsyncGenerator: + consumed = 0 + + await file.seek(start) + + while True: + data_length = ( + min(block_size, end - start - consumed) if end else block_size + ) + + if data_length <= 0: + break + + data = await file.read(data_length) + + if not data: + break + + consumed += data_length + + yield data + + if hasattr(file, "close"): + await file.close() class Media(HTTPEndpoint): - def get(self, request: Request): + async def get( + self, request: Request + ) -> t.Union[FileResponse, StreamingResponse]: path = request.query_params["filepath"] - return FileResponse( - path, - headers={ + response: t.Union[FileResponse, StreamingResponse] + if request.headers.get("range"): + response = await self.ranged_file_response(path, request) + else: + response = FileResponse( + path, + ) + response.headers["Accept-Ranges"] = "bytes" + + return response + + async def ranged_file_response( + self, path: str, request: Request + ) -> StreamingResponse: + file = await aiofiles.open(path, "rb") + file_size = (await aio_stat(path)).st_size + content_range = request.headers.get("range") + content_length = file_size + status_code = 200 + headers = {} + + if content_range is not None: + content_range = content_range.strip().lower() + + content_ranges = content_range.split("=")[-1] + + range_start, range_end, *_ = map( + str.strip, (content_ranges + "-").split("-") + ) + + start, end = ( + int(range_start) if range_start else 0, + int(range_end) if range_end else file_size - 1, + ) + range_start = max(0, start) + range_end = min(file_size - 1, int(end)) + + content_length = (end - start) + 1 + + file_response = ranged(file, start=start, end=end + 1) + + status_code = 206 + + headers["Content-Range"] = f"bytes {start}-{end}/{file_size}" + + response = StreamingResponse( + file_response, + media_type=guess_type(path)[0], + status_code=status_code, + ) + + response.headers.update( + { "Accept-Ranges": "bytes", - "Access-Control-Allow-Origin": "*", - }, + "Content-Length": str(content_length), + **headers, + } ) + + return response + + async def head(self, request: Request) -> Response: + path = request.query_params["filepath"] + response = Response() + size = (await aio_stat(path)).st_size + response.headers.update( + { + "Accept-Ranges": "bytes", + "Content-Type": guess_type(path)[0], + "Content-Length": size, + } + ) + return response + + async def options(self, request: Request) -> Response: + response = Response() + response.headers["Accept-Ranges"] = "bytes" + response.headers["Allow"] = "OPTIONS, GET, HEAD" + return response