Commit

Refactored the get_tasks and get_jobs functions into one set of functions (get_elements and list_elements); linting.
ChrisPaulBennett authored and MetRonnie committed Dec 5, 2023
1 parent b779063 commit 5816c53
Showing 1 changed file with 20 additions and 62 deletions.
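In short, the previously duplicated task/job code paths now funnel through a single resolver and a single database-listing helper, with the jobs branch selected by the presence of a 'tasks' argument. A condensed sketch of the merged flow (a paraphrase of the diff below, not a verbatim excerpt):

    async def get_elements(root, info, **kwargs):
        # Live data still comes from the in-memory data store.
        if kwargs['live']:
            return await get_nodes_all(root, info, **kwargs)
        # ... normalise ids / workflows into Tokens (see the diff) ...
        return await list_elements(kwargs)

    async def list_elements(args):
        # One database walk serves both element types: a 'tasks'
        # argument means the caller wants jobs (optionally filtered
        # by task name); otherwise task statistics are returned.
        elements = []
        for workflow in args['workflows']:
            db_file = get_workflow_run_dir(
                workflow['workflow'], WorkflowFiles.LogDir.DIRNAME, "db")
            with CylcWorkflowDAO(db_file, is_public=True) as dao:
                conn = dao.connect()
                if 'tasks' in args:
                    elements.extend(
                        make_jobs_query(conn, workflow, args.get('tasks')))
                else:
                    elements.extend(make_task_query(conn, workflow))
        return elements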
82 changes: 20 additions & 62 deletions cylc/uiserver/schema.py
@@ -26,6 +26,9 @@
from graphene.types.generic import GenericScalar

from cylc.flow.id import Tokens
from cylc.flow.rundb import CylcWorkflowDAO
from cylc.flow.pathutil import get_workflow_run_dir
from cylc.flow.workflow_files import WorkflowFiles
from cylc.flow.network.schema import (
CyclePoint,
GenericResponse,
@@ -282,7 +285,7 @@ class Meta:
result = GenericScalar()


async def get_tasks(root, info, **kwargs):
async def get_elements(root, info, **kwargs):
if kwargs['live']:
return await get_nodes_all(root, info, **kwargs)

@@ -296,7 +299,7 @@ async def get_tasks(root, info, **kwargs):
elif isinstance(field_ids, dict):
field_ids = list(field_ids)
kwargs['ids'] = field_ids
elif field_ids == []:
elif not field_ids:
return []

for arg in ('ids', 'exids'):
@@ -307,16 +310,13 @@ async def get_tasks(root, info, **kwargs):
kwargs['exworkflows'] = [
Tokens(w_id) for w_id in kwargs['exworkflows']]

return await list_tasks(kwargs)
return await list_elements(kwargs)


async def list_tasks(args):
async def list_elements(args):
if not args['workflows']:
raise Exception('At least one workflow must be provided.')
from cylc.flow.rundb import CylcWorkflowDAO
from cylc.flow.pathutil import get_workflow_run_dir
from cylc.flow.workflow_files import WorkflowFiles
tasks = []
elements = []
for workflow in args['workflows']:
db_file = get_workflow_run_dir(
workflow['workflow'],
@@ -325,58 +325,15 @@ async def list_tasks(args):
)
with CylcWorkflowDAO(db_file, is_public=True) as dao:
conn = dao.connect()
tasks.extend(make_query(conn, workflow))
return tasks

if 'tasks' in args:
elements.extend(
make_jobs_query(conn, workflow, args.get('tasks')))
else:
elements.extend(make_task_query(conn, workflow))
return elements

async def get_jobs(root, info, **kwargs):
if kwargs['live']:
return await get_nodes_all(root, info, **kwargs)

_, field_ids = process_resolver_info(root, info, kwargs)

if hasattr(kwargs, 'id'):
kwargs['ids'] = [kwargs.get('id')]
if field_ids:
if isinstance(field_ids, str):
field_ids = [field_ids]
elif isinstance(field_ids, dict):
field_ids = list(field_ids)
kwargs['ids'] = field_ids
elif field_ids == []:
return []

for arg in ('ids', 'exids'):
# live objects can be represented by a universal ID
kwargs[arg] = [Tokens(n_id, relative=True) for n_id in kwargs[arg]]
kwargs['workflows'] = [
Tokens(w_id) for w_id in kwargs['workflows']]
kwargs['exworkflows'] = [
Tokens(w_id) for w_id in kwargs['exworkflows']]

return await list_jobs(kwargs)


async def list_jobs(args):
if not args['workflows']:
raise Exception('At least one workflow must be provided.')
from cylc.flow.rundb import CylcWorkflowDAO
from cylc.flow.pathutil import get_workflow_run_dir
from cylc.flow.workflow_files import WorkflowFiles
jobs = []
for workflow in args['workflows']:
db_file = get_workflow_run_dir(
workflow['workflow'],
WorkflowFiles.LogDir.DIRNAME,
"db"
)
with CylcWorkflowDAO(db_file, is_public=True) as dao:
conn = dao.connect()
jobs.extend(make_jobs_query(conn, workflow, args.get('tasks')))
return jobs


def make_query(conn, workflow):
def make_task_query(conn, workflow):

# TODO: support all arguments including states
# https://github.com/cylc/cylc-uiserver/issues/440
@@ -473,6 +430,7 @@ def make_query(conn, workflow):
'mean_queue_time': row[10],
'max_queue_time': row[11],
'std_dev_queue_time': (row[12] - row[10]**2)**0.5,
# Prevents null entries when there are too few tasks for quartiles
'queue_quartiles': [row[13],
row[13] if row[14] is None else row[14],
row[13] if row[15] is None else row[15]],
@@ -481,6 +439,7 @@ def make_query(conn, workflow):
'mean_run_time': row[17],
'max_run_time': row[18],
'std_dev_run_time': (row[19] - row[17]**2)**0.5,
# Prevents null entries when there are too few tasks for quartiles
'run_quartiles': [row[20],
row[20] if row[21] is None else row[21],
row[20] if row[22] is None else row[22]],
@@ -489,6 +448,7 @@ def make_query(conn, workflow):
'mean_total_time': row[24],
'max_total_time': row[25],
'std_dev_total_time': (row[26] - row[24] ** 2) ** 0.5,
# Prevents null entries when there are too few tasks for quartiles
'total_quartiles': [row[27],
row[27] if row[28] is None else row[28],
row[27] if row[29] is None else row[29]],
@@ -511,7 +471,6 @@ def make_jobs_query(conn, workflow, tasks):
where_clauses = f" AND (name = '{where_clauses}')"
else:
where_clauses = ''

for row in conn.execute(f'''
SELECT
name,
@@ -551,7 +510,6 @@ def make_jobs_query(conn, workflow, tasks):
'run_time': row[10],
'queue_time': row[11]
})

return jobs


@@ -622,7 +580,7 @@ class LogFiles(graphene.ObjectType):
description=Task._meta.description,
live=graphene.Boolean(default_value=True),
strip_null=STRIP_NULL_DEFAULT,
resolver=get_tasks,
resolver=get_elements,
workflows=graphene.List(ID, default_value=[]),
exworkflows=graphene.List(ID, default_value=[]),
ids=graphene.List(ID, default_value=[]),
@@ -638,7 +596,7 @@ class LogFiles(graphene.ObjectType):
description=Job._meta.description,
live=graphene.Boolean(default_value=True),
strip_null=STRIP_NULL_DEFAULT,
resolver=get_jobs,
resolver=get_elements,
workflows=graphene.List(ID, default_value=[]),
exworkflows=graphene.List(ID, default_value=[]),
ids=graphene.List(ID, default_value=[]),

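For orientation only, a hypothetical GraphQL query against the merged resolver with live data disabled. The top-level field name (tasks) and the selected result fields are assumptions — they are collapsed out of this diff — while the live, workflows and ids arguments are confirmed by the field definitions above:

    # Hypothetical query; field names are assumptions, not taken from this diff.
    query {
      tasks(live: false, workflows: ["~me/my-workflow/run1"]) {
        name
        meanRunTime
      }
    }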