Analysis schema for historic job data (#530)
Add UISJob to schema

* Changed the jobs query to use a list of tasks
* Refactored the get_task and get_job functions into one set of functions
* Added tests for schema.py
* Removed pytest_asyncio, as it is not needed
* Added unit tests for get_elements()
* Removed a flawed if condition

---------

Co-authored-by: JAllen42 <[email protected]>
Co-authored-by: Ronnie Dutta <[email protected]>
3 people authored Mar 8, 2024
1 parent 63e5baa commit d075928
Showing 2 changed files with 273 additions and 20 deletions.
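
For orientation, here is a sketch of how the new historic-jobs query might be used once this schema lands. The field and argument names (jobs, live, tasks, totalTime, and so on) are taken from the schema changes below; the workflow and task names are invented, and the camelCase field casing assumes graphene's default auto-conversion:

    # Hypothetical GraphQL document for the new "jobs" field; live: false
    # routes the resolver to the workflow database instead of the live store.
    JOBS_QUERY = '''
    query {
      jobs(workflows: ["~user/my-workflow"], live: false, tasks: ["foo"]) {
        id
        state
        platform
        totalTime
        queueTime
        runTime
      }
    }
    '''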
110 changes: 97 additions & 13 deletions cylc/uiserver/schema.py
@@ -26,12 +26,16 @@
 from graphene.types.generic import GenericScalar
 
 from cylc.flow.id import Tokens
+from cylc.flow.rundb import CylcWorkflowDAO
+from cylc.flow.pathutil import get_workflow_run_dir
+from cylc.flow.workflow_files import WorkflowFiles
 from cylc.flow.network.schema import (
     CyclePoint,
     GenericResponse,
     ID,
     SortArgs,
     Task,
+    Job,
     Mutations,
     Queries,
     process_resolver_info,
@@ -281,14 +285,12 @@ class Meta:
     result = GenericScalar()
 
 
-async def get_jobs(root, info, **kwargs):
+async def get_elements(root, info, **kwargs):
     if kwargs['live']:
         return await get_nodes_all(root, info, **kwargs)
 
     _, field_ids = process_resolver_info(root, info, kwargs)
 
-    if hasattr(kwargs, 'id'):
-        kwargs['ids'] = [kwargs.get('id')]
     if field_ids:
         if isinstance(field_ids, str):
             field_ids = [field_ids]
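
The "flawed if condition" from the commit message is the hasattr() check removed above: kwargs is a plain dict, and hasattr() tests attributes rather than keys, so the branch could never fire. A minimal demonstration (the ID string is invented):

    # hasattr() on a dict checks attributes, not keys, so the removed
    # condition was always False and the 'ids' assignment was dead code.
    kwargs = {'id': '~user/workflow//1/foo'}
    assert not hasattr(kwargs, 'id')   # attribute lookup: always False here
    assert 'id' in kwargs              # the key test presumably intended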
@@ -306,16 +308,13 @@ async def get_jobs(root, info, **kwargs):
         kwargs['exworkflows'] = [
             Tokens(w_id) for w_id in kwargs['exworkflows']]
 
-    return await list_jobs(kwargs)
+    return await list_elements(kwargs)
 
 
-async def list_jobs(args):
+async def list_elements(args):
     if not args['workflows']:
         raise Exception('At least one workflow must be provided.')
-    from cylc.flow.rundb import CylcWorkflowDAO
-    from cylc.flow.pathutil import get_workflow_run_dir
-    from cylc.flow.workflow_files import WorkflowFiles
-    jobs = []
+    elements = []
     for workflow in args['workflows']:
         db_file = get_workflow_run_dir(
             workflow['workflow'],
@@ -324,11 +323,15 @@ async def list_jobs(args):
         )
         with CylcWorkflowDAO(db_file, is_public=True) as dao:
             conn = dao.connect()
-            jobs.extend(make_query(conn, workflow))
-    return jobs
+            if 'tasks' in args:
+                elements.extend(
+                    run_jobs_query(conn, workflow, args.get('tasks')))
+            else:
+                elements.extend(run_task_query(conn, workflow))
+    return elements
 
 
-def make_query(conn, workflow):
+def run_task_query(conn, workflow):
 
     # TODO: support all arguments including states
     # https://github.com/cylc/cylc-uiserver/issues/440
@@ -425,6 +428,7 @@ def make_query(conn, workflow):
             'mean_queue_time': row[10],
             'max_queue_time': row[11],
             'std_dev_queue_time': (row[12] - row[10]**2)**0.5,
+            # Prevents null entries when there are too few tasks for quartiles
             'queue_quartiles': [row[13],
                                 row[13] if row[14] is None else row[14],
                                 row[13] if row[15] is None else row[15]],
@@ -433,6 +437,7 @@ def make_query(conn, workflow):
             'mean_run_time': row[17],
             'max_run_time': row[18],
             'std_dev_run_time': (row[19] - row[17]**2)**0.5,
+            # Prevents null entries when there are too few tasks for quartiles
             'run_quartiles': [row[20],
                               row[20] if row[21] is None else row[21],
                               row[20] if row[22] is None else row[22]],
@@ -441,6 +446,7 @@ def make_query(conn, workflow):
             'mean_total_time': row[24],
             'max_total_time': row[25],
             'std_dev_total_time': (row[26] - row[24] ** 2) ** 0.5,
+            # Prevents null entries when there are too few tasks for quartiles
             'total_quartiles': [row[27],
                                 row[27] if row[28] is None else row[28],
                                 row[27] if row[29] is None else row[29]],
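
The std_dev_* expressions above rely on the identity Var(X) = E[X^2] - (E[X])^2, which lets the query derive a population standard deviation from two SQL aggregates instead of a second pass over the rows. A small sketch, assuming the paired columns hold AVG(x * x) and AVG(x) respectively:

    # Population standard deviation from first and second moments,
    # mirroring (row[12] - row[10]**2)**0.5 in the query results above.
    times = [30.0, 42.0, 51.0]
    mean = sum(times) / len(times)                    # AVG(x)
    mean_sq = sum(t * t for t in times) / len(times)  # AVG(x * x)
    std_dev = (mean_sq - mean ** 2) ** 0.5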
@@ -451,6 +457,60 @@
     return tasks
 
 
+def run_jobs_query(conn, workflow, tasks):
+
+    # TODO: support all arguments including states
+    # https://github.com/cylc/cylc-uiserver/issues/440
+    jobs = []
+
+    # Create sql snippet used to limit which tasks are returned by query
+    if tasks:
+        where_clauses = "' OR name = '".join(tasks)
+        where_clauses = f" AND (name = '{where_clauses}')"
+    else:
+        where_clauses = ''
+    for row in conn.execute(f'''
+        SELECT
+            name,
+            cycle,
+            submit_num,
+            submit_status,
+            time_run,
+            time_run_exit,
+            job_id,
+            platform_name,
+            time_submit,
+            STRFTIME('%s', time_run_exit) - STRFTIME('%s', time_submit) AS total_time,
+            STRFTIME('%s', time_run_exit) - STRFTIME('%s', time_run) AS run_time,
+            STRFTIME('%s', time_run) - STRFTIME('%s', time_submit) AS queue_time
+        FROM
+            task_jobs
+        WHERE
+            run_status = 0
+            {where_clauses};
+    '''):
+        jobs.append({
+            'id': workflow.duplicate(
+                cycle=row[1],
+                task=row[0],
+                job=row[2]
+            ),
+            'name': row[0],
+            'cycle_point': row[1],
+            'submit_num': row[2],
+            'state': row[3],
+            'started_time': row[4],
+            'finished_time': row[5],
+            'job_ID': row[6],
+            'platform': row[7],
+            'submitted_time': row[8],
+            'total_time': row[9],
+            'run_time': row[10],
+            'queue_time': row[11]
+        })
+    return jobs
+
+
 class UISTask(Task):
 
     platform = graphene.String()
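
For a concrete picture of the WHERE-clause snippet that run_jobs_query() builds, here is what it produces for an invented task list. Note that it interpolates task names directly into the SQL, which assumes they arrive pre-validated; parameterized placeholders would be the usual alternative for untrusted input:

    # Reproduces the where_clauses construction above (task names invented).
    tasks = ['foo', 'bar']
    where_clauses = "' OR name = '".join(tasks)
    where_clauses = f" AND (name = '{where_clauses}')"
    print(where_clauses)  # ->  AND (name = 'foo' OR name = 'bar')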
@@ -484,6 +544,13 @@ class UISTask(Task):
     count = graphene.Int()
 
 
+class UISJob(Job):
+
+    total_time = graphene.Int()
+    queue_time = graphene.Int()
+    run_time = graphene.Int()
+
+
 class UISQueries(Queries):
 
     class LogFiles(graphene.ObjectType):
@@ -511,14 +578,31 @@ class LogFiles(graphene.ObjectType):
         description=Task._meta.description,
         live=graphene.Boolean(default_value=True),
         strip_null=STRIP_NULL_DEFAULT,
-        resolver=get_jobs,
+        resolver=get_elements,
         workflows=graphene.List(ID, default_value=[]),
         exworkflows=graphene.List(ID, default_value=[]),
         ids=graphene.List(ID, default_value=[]),
         exids=graphene.List(ID, default_value=[]),
         mindepth=graphene.Int(default_value=-1),
         maxdepth=graphene.Int(default_value=-1),
         sort=SortArgs(default_value=None),
-
     )
 
+    jobs = graphene.List(
+        UISJob,
+        description=Job._meta.description,
+        live=graphene.Boolean(default_value=True),
+        strip_null=STRIP_NULL_DEFAULT,
+        resolver=get_elements,
+        workflows=graphene.List(ID, default_value=[]),
+        exworkflows=graphene.List(ID, default_value=[]),
+        ids=graphene.List(ID, default_value=[]),
+        exids=graphene.List(ID, default_value=[]),
+        mindepth=graphene.Int(default_value=-1),
+        maxdepth=graphene.Int(default_value=-1),
+        sort=SortArgs(default_value=None),
+        tasks=graphene.List(ID, default_value=[])
+    )


(The second changed file, containing the new unit tests for schema.py, did not load in this view.)