Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Use DBResponse for get_run_ids and get_task_ids #391

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions services/data/postgres_async_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,20 @@ def get_table_by_name(self, table_name: str):
async def get_run_ids(self, flow_id: str, run_id: str):
run = await self.run_table_postgres.get_run(flow_id, run_id,
expanded=True)
return run.body['run_number'], run.body['run_id']
if run.response_code != 200:
return run
body = {'run_number': run.body['run_number'], 'run_id': run.body['run_id']}
return DBResponse(response_code=run.response_code, body=body)

async def get_task_ids(self, flow_id: str, run_id: str,
step_name: str, task_name: str):

task = await self.task_table_postgres.get_task(flow_id, run_id,
step_name, task_name,
expanded=True)
return task.body['task_id'], task.body['task_name']
if task.response_code != 200:
return task
body = {'task_id': task.body['task_id'], 'task_name': task.body['task_name']}
return DBResponse(response_code=task.response_code, body=body)


class AsyncPostgresDB(object):
Expand Down
24 changes: 12 additions & 12 deletions services/metadata_service/api/artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,18 +478,18 @@ async def create_artifacts(self, request):
body = await read_body(request.content)
count = 0

try:
run_number, run_id = await self._db.get_run_ids(flow_name, run_number)
task_id, task_name = await self._db.get_task_ids(
flow_name, run_number, step_name, task_id
)
except Exception:
return web.Response(
status=400,
body=json.dumps(
{"message": "need to register run_id and task_id first"}
),
)
db_response = await self._db.get_run_ids(flow_name, run_number)
if db_response.response_code != 200:
return web.Response(status=db_response.response_code,
body=json.dumps(http_500(db_response.body)))
run_number, run_id = db_response.body["run_number"], db_response.body["run_id"]

db_response = await self._db.get_task_ids(flow_name, run_number,
step_name, task_id)
if db_response.response_code != 200:
return web.Response(status=db_response.response_code,
body=json.dumps(http_500(db_response.body)))
task_id, task_name = db_response.body["task_id"], db_response.body["task_name"]

# todo change to bulk insert
for artifact in body:
Expand Down
22 changes: 14 additions & 8 deletions services/metadata_service/api/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json
from services.utils import read_body
from services.metadata_service.api.utils import format_response, \
handle_exceptions
handle_exceptions, http_500
import asyncio
from services.data.postgres_async_db import AsyncPostgresDB

Expand Down Expand Up @@ -173,13 +173,19 @@ async def create_metadata(self, request):

body = await read_body(request.content)
count = 0
try:
run_number, run_id = await self._db.get_run_ids(flow_name, run_number)
task_id, task_name = await self._db.get_task_ids(flow_name, run_number,
step_name, task_id)
except Exception:
return web.Response(status=400, body=json.dumps(
{"message": "need to register run_id and task_id first"}))

db_response = await self._db.get_run_ids(flow_name, run_number)
if db_response.response_code != 200:
return web.Response(status=db_response.response_code,
body=json.dumps(http_500(db_response.body)))
run_number, run_id = db_response.body["run_number"], db_response.body["run_id"]

db_response = await self._db.get_task_ids(flow_name, run_number,
step_name, task_id)
if db_response.response_code != 200:
return web.Response(status=db_response.response_code,
body=json.dumps(http_500(db_response.body)))
task_id, task_name = db_response.body["task_id"], db_response.body["task_name"]

for datum in body:
values = {
Expand Down
6 changes: 4 additions & 2 deletions services/metadata_service/api/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
handle_exceptions
from services.data.postgres_async_db import AsyncPostgresDB


class StepApi(object):
_step_table = None

Expand Down Expand Up @@ -157,7 +156,10 @@ async def create_step(self, request):
tags = body.get("tags")
system_tags = body.get("system_tags")

run_number, run_id = await self._db.get_run_ids(flow_id, run_number)
db_response = await self._db.get_run_ids(flow_id, run_number)
if db_response.response_code != 200:
return db_response
run_number, run_id = db_response.body["run_number"], db_response.body["run_id"]

step_row = StepRow(
flow_id, run_number, run_id, user, step_name, tags=tags,
Expand Down
5 changes: 4 additions & 1 deletion services/metadata_service/api/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,10 @@ async def create_task(self, request):
return web.Response(status=400, body=json.dumps(
{"message": "provided task_name may not be a numeric"}))

run_number, run_id = await self._db.get_run_ids(flow_id, run_number)
db_response = await self._db.get_run_ids(flow_id, run_number)
if db_response.response_code != 200:
return db_response
run_number, run_id = db_response.body["run_number"], db_response.body["run_id"]

task = TaskRow(
flow_id=flow_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,35 +88,31 @@ async def test_artifact_post(cli, db):
cli,
path="/flows/NonExistentFlow/runs/{run_number}/steps/{step_name}/tasks/{task_id}/artifact".format(**_task),
payload=payload,
status=400,
expected_body={"message": "need to register run_id and task_id first"}
status=404,
)

# posting on a non-existent run number should result in an error
await assert_api_post_response(
cli,
path="/flows/{flow_id}/runs/1234/steps/{step_name}/tasks/{task_id}/artifact".format(**_task),
payload=payload,
status=400,
expected_body={"message": "need to register run_id and task_id first"}
status=404,
)

# posting on a non-existent step_name should result in an error
await assert_api_post_response(
cli,
path="/flows/{flow_id}/runs/{run_number}/steps/nonexistent/tasks/{task_id}/artifact".format(**_task),
payload=payload,
status=400,
expected_body={"message": "need to register run_id and task_id first"}
status=404,
)

# posting on a non-existent task_id should result in an error
await assert_api_post_response(
cli,
path="/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/1234/artifact".format(**_task),
payload=payload,
status=400,
expected_body={"message": "need to register run_id and task_id first"}
status=404,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,35 +68,31 @@ async def test_metadata_post(cli, db):
cli,
path="/flows/NonExistentFlow/runs/{run_number}/steps/{step_name}/tasks/{task_id}/metadata".format(**_task),
payload=payload,
status=400,
expected_body={"message": "need to register run_id and task_id first"}
status=404,
)

# posting on a non-existent run number should result in an error
await assert_api_post_response(
cli,
path="/flows/{flow_id}/runs/1234/steps/{step_name}/tasks/{task_id}/metadata".format(**_task),
payload=payload,
status=400,
expected_body={"message": "need to register run_id and task_id first"}
status=404,
)

# posting on a non-existent step_name should result in an error
await assert_api_post_response(
cli,
path="/flows/{flow_id}/runs/{run_number}/steps/nonexistent/tasks/{task_id}/metadata".format(**_task),
payload=payload,
status=400,
expected_body={"message": "need to register run_id and task_id first"}
status=404,
)

# posting on a non-existent task_id should result in an error
await assert_api_post_response(
cli,
path="/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/1234/metadata".format(**_task),
payload=payload,
status=400,
expected_body={"message": "need to register run_id and task_id first"}
status=404,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ def _check_response_body(body):
cli,
path="/flows/NonExistentFlow/runs/{run_number}/steps/test_step/step".format(**_run),
payload=payload,
status=500
status=404
)

# posting on a non-existent run number should result in an error
await assert_api_post_response(
cli,
path="/flows/{flow_id}/runs/1234/steps/test_step/step".format(**_run),
payload=payload,
status=500
status=404
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ def _check_response_body(body):
cli,
path="/flows/NonExistentFlow/runs/{run_number}/steps/{step_name}/task".format(**_task),
payload=payload,
status=500
status=404
)

# posting on a non-existent run number should result in an error
await assert_api_post_response(
cli,
path="/flows/{flow_id}/runs/1234/steps/{step_name}/task".format(**_task),
payload=payload,
status=500
status=404
)

# posting on a non-existent step_name should result in a 404 due to foreign key constraint
Expand Down