Speed up log lookup in UI backend #394

Closed
48 changes: 36 additions & 12 deletions services/ui_backend_service/api/log.py
@@ -1,7 +1,6 @@
-
 from typing import Optional, Tuple

-from services.data.db_utils import DBResponse, translate_run_key, translate_task_key, DBPagination, DBResponse
+from services.data.db_utils import translate_run_key, translate_task_key, DBPagination, DBResponse
 from services.utils import handle_exceptions, web_response
 from .utils import format_response_list, get_pathspec_from_request

@@ -210,14 +209,35 @@ async def get_task_by_request(self, request):
             return db_response.body
         return None

+    async def _fetch_task_dict(self, request):
+        flow_id, run_number, step_name, task_id, attempt_id = \
+            get_pathspec_from_request(request)
+        if attempt_id is None:
+            task = await self.get_task_by_request(request)
+            if not task:
+                return web_response(404, {'data': []})
+        else:
+            run_key, run_value = translate_run_key(run_number)
+            task_key, task_value = translate_task_key(task_id)
+            task = {"flow_id": flow_id,
+                    "step_name": step_name,
+                    "attempt_id": int(attempt_id),
+                    run_key: run_value,
+                    task_key: task_value}
+        return task
+
     async def get_task_log(self, request, logtype=STDOUT):
-        "fetches log and emits it as a list of rows wrapped in json"
-        task = await self.get_task_by_request(request)
-        if not task:
-            return web_response(404, {'data': []})
+        """Fetches the log and emits it as a list of rows wrapped in JSON."""
+        task = await self._fetch_task_dict(request)
         limit, page, reverse_order = get_pagination_params(request)

-        lines, page_count = await read_and_output(self.cache, task, logtype, limit, page, reverse_order)
+        try:
+            lines, page_count = await read_and_output(self.cache, task, logtype, limit, page, reverse_order)
+        except LogException as e:
+            if e.id == 'MetaflowNotFound':
+                return web_response(404, {'data': []})
+            else:
+                raise

         # paginated response
         response = DBResponse(200, lines)
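`LogException` is defined elsewhere in `log.py` and does not appear in this diff. A minimal sketch of the assumed contract — failures from the cache layer carry a machine-readable `id`, and only `MetaflowNotFound` maps to a 404 — could look like this:

```python
# Sketch only: the real LogException lives in log.py outside this diff;
# the assumption is that cache-layer errors carry a machine-readable id.
class LogException(Exception):
    def __init__(self, msg="failed to read log", id="log-error"):
        self.message = msg
        self.id = id
        super().__init__(msg)

def fetch_lines(task_exists):
    if not task_exists:
        raise LogException("no such task", id="MetaflowNotFound")
    return ["line 1", "line 2"]

try:
    lines = fetch_lines(task_exists=False)
except LogException as e:
    if e.id == "MetaflowNotFound":
        print("404: task not found")  # mirrors web_response(404, {'data': []})
    else:
        raise  # anything unexpected still propagates
```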
@@ -226,10 +246,8 @@ async def get_task_log(self, request, logtype=STDOUT):
         return web_response(status, body)

     async def get_task_log_file(self, request, logtype=STDOUT):
-        "fetches log and emits it as a single file download response"
-        task = await self.get_task_by_request(request)
-        if not task:
-            return web_response(404, {'data': []})
+        """Fetches the log and emits it as a single file download response."""
+        task = await self._fetch_task_dict(request)

         log_filename = "{type}_{flow_id}_{run_number}_{step_name}_{task_id}-{attempt}.txt".format(
             type="stdout" if logtype == STDOUT else "stderr",
@@ -240,7 +258,13 @@ async def get_task_log_file(self, request, logtype=STDOUT):
             attempt=task['attempt_id']
         )

-        lines, _ = await read_and_output(self.cache, task, logtype, output_raw=True)
+        try:
+            lines, _ = await read_and_output(self.cache, task, logtype, output_raw=True)
+        except LogException as e:
+            if e.id == 'MetaflowNotFound':
+                return web_response(404, {'data': []})
+            else:
+                raise
         return file_download_response(log_filename, lines)
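As a worked example of the filename template above (all values illustrative), stdout for attempt 0 of task 1337 would download as:

```python
# Illustrative values only; the template comes from get_task_log_file above.
print("{type}_{flow_id}_{run_number}_{step_name}_{task_id}-{attempt}.txt".format(
    type="stdout", flow_id="HelloFlow", run_number=42,
    step_name="start", task_id=1337, attempt=0))
# -> stdout_HelloFlow_42_start_1337-0.txt
```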


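The speedup itself comes from `_fetch_task_dict`: when the request already names an attempt, the handler no longer needs the database round-trip in `get_task_by_request` just to resolve the task, and can assemble the task dict straight from the pathspec. A standalone sketch of that fast path, with assumed stand-ins for `translate_run_key` and `translate_task_key` (the real helpers live in `services.data.db_utils`):

```python
# Stand-ins for the services.data.db_utils helpers (assumed behavior:
# numeric values are primary keys, anything else is treated as a name/id).
def translate_run_key(run_number):
    return ("run_number", int(run_number)) if str(run_number).isdigit() else ("run_id", run_number)

def translate_task_key(task_id):
    return ("task_id", int(task_id)) if str(task_id).isdigit() else ("task_name", task_id)

def task_dict_from_pathspec(flow_id, run_number, step_name, task_id, attempt_id):
    """Build the task dict for the cache lookup without querying the database."""
    run_key, run_value = translate_run_key(run_number)
    task_key, task_value = translate_task_key(task_id)
    return {"flow_id": flow_id,
            "step_name": step_name,
            "attempt_id": int(attempt_id),
            run_key: run_value,
            task_key: task_value}

print(task_dict_from_pathspec("HelloFlow", "42", "start", "1337", "0"))
# -> {'flow_id': 'HelloFlow', 'step_name': 'start', 'attempt_id': 0,
#     'run_number': 42, 'task_id': 1337}
```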
@@ -5,6 +5,8 @@
     add_flow, add_run, add_step, add_task,
     _test_list_resources
 )
+from ...api.log import LogException
+
 pytestmark = [pytest.mark.integration_tests]


@@ -37,7 +39,10 @@ async def read_and_output(cache_client, task, logtype, limit=0, page=1, reverse_
         assert page == 1
         assert reverse_order is False
         assert output_raw is False
-        return [], 1
+        if task['attempt_id'] == 0:
+            return [], 1
+        else:
+            raise LogException(id='MetaflowNotFound')

     with mock.patch("services.ui_backend_service.api.log.read_and_output", new=read_and_output):
         _, data = await _test_list_resources(cli, db, "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}/logs/out".format(**_task), 200, None)
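The mock above exercises both paths: attempt 0 resolves normally, while any other attempt raises `MetaflowNotFound`. A hypothetical companion request for the missing attempt — assuming the attempt is passed as an `attempt_id` query parameter, which is how `get_pathspec_from_request` appears to receive it — would then assert the new 404 behavior:

```python
# Hypothetical follow-up assertion inside the same mock.patch block:
# attempt 1 makes the mocked read_and_output raise MetaflowNotFound,
# which the handler should translate into a 404.
path = "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}/logs/out?attempt_id=1"
await _test_list_resources(cli, db, path.format(**_task), 404, None)
```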
4 changes: 2 additions & 2 deletions services/ui_backend_service/tests/integration_tests/utils.py
@@ -323,14 +323,14 @@ async def _test_single_resource(cli, db: AsyncPostgresDB, path: str, expected_st
     return resp.status, data


-def _test_dict_approx(actual, expected, approx_keys, threshold=1000):
+def _test_dict_approx(actual, expected, approx_keys, threshold=5000):
     "Assert that two dicts are almost equal, allowing for some leeway on specified keys"
     # NOTE: This is mainly required for testing resources that produce data during query execution. For example
     # when using extract(epoch from now()) we can not accurately expect what the timestamp returned from the api should be.
     # TODO: If possible, a less error prone solution would be to somehow mock/freeze the now() on a per-test basis.
     for k, v in actual.items():
         if k in approx_keys:
-            assert v == pytest.approx(expected[k], rel=threshold)
+            assert v == pytest.approx(expected[k], abs=threshold)
         else:
             assert v == expected[k]
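The `rel=` to `abs=` switch is the substantive fix here: `rel` is a fraction of the expected value, so `rel=1000` meant "within 1000× of expected" and effectively disabled the check for large epoch timestamps, whereas `abs=5000` enforces a fixed window (±5000, i.e. five seconds if the timestamps are in milliseconds, which is an assumption about the units). A quick demonstration:

```python
import pytest

# rel= scales with the expected value: tolerance = rel * expected,
# so rel=1000 accepts nearly anything for large epoch timestamps.
assert 1 == pytest.approx(1_700_000_000_000, rel=1000)

# abs= is a fixed window: within 5000 passes, beyond it fails.
assert 1_700_000_000_123 == pytest.approx(1_700_000_004_999, abs=5000)        # diff 4876
assert not (1_700_000_000_123 == pytest.approx(1_700_000_006_000, abs=5000))  # diff 5877
```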
