Skip to content

Commit

Permalink
Merge pull request #348 from oliver-sanders/workflow-event-logging
Browse files Browse the repository at this point in the history
data-store: log workflow events to info level
  • Loading branch information
hjoliver authored May 20, 2022
2 parents 45f4b94 + 03a38db commit d808078
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 9 deletions.
29 changes: 22 additions & 7 deletions cylc/uiserver/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,25 @@
)
from cylc.flow.workflow_status import WorkflowStatus

from .utils import fmt_call
from .workflows_mgr import workflow_request


def log_call(fcn):
"""Decorator for data store methods we want to log."""
fcn_name = f'[data-store] {fcn.__name__}'

def _inner(*args, **kwargs): # works for serial & async calls
nonlocal fcn
self = args[0]
# log this method call
self.log.info(fmt_call(fcn_name, args[1:], kwargs))
# then do it
return fcn(*args, **kwargs)

return _inner


class DataStoreMgr:
"""Manage the local data-store acquisition/updates for all workflows."""

Expand All @@ -75,13 +91,13 @@ def __init__(self, workflows_mgr, log):
self.executors = {}
self.delta_queues = {}

@log_call
async def register_workflow(self, w_id: str, is_active: bool) -> None:
"""Register a new workflow with the data store.
Call this when a new workflow is discovered on the file system
(e.g. installed).
"""
self.log.debug(f'register_workflow({w_id})')
self.delta_queues[w_id] = {}

# create new entry in the data store
Expand All @@ -95,12 +111,12 @@ async def register_workflow(self, w_id: str, is_active: bool) -> None:
status_msg=self._get_status_msg(w_id, is_active),
)

@log_call
async def unregister_workflow(self, w_id):
"""Remove a workflow from the data store entirely.
Call this when a workflow is deleted.
"""
self.log.debug(f'unregister_workflow({w_id})')
if w_id in self.data:
self._update_contact(w_id, pruned=True)
while any(
Expand All @@ -110,6 +126,7 @@ async def unregister_workflow(self, w_id):
await asyncio.sleep(self.PENDING_DELTA_CHECK_INTERVAL)
self._purge_workflow(w_id)

@log_call
async def connect_workflow(self, w_id, contact_data):
"""Initiate workflow subscriptions.
Expand All @@ -120,7 +137,6 @@ async def connect_workflow(self, w_id, contact_data):
blocking the main loop.
"""
self.log.debug(f'connect_workflow({w_id})')
if self.loop is None:
self.loop = asyncio.get_running_loop()

Expand All @@ -147,19 +163,19 @@ async def connect_workflow(self, w_id, contact_data):
if w_id not in successful_updates:
# something went wrong, undo any changes to allow for subsequent
# connection attempts
self.log.debug(f'failed to connect to {w_id}')
self.log.info(f'failed to connect to {w_id}')
self.disconnect_workflow(w_id)
return False
else:
# don't update the contact data until we have successfully updated
self._update_contact(w_id, contact_data)

@log_call
def disconnect_workflow(self, w_id):
"""Terminate workflow subscriptions.
Call this when a workflow has stopped.
"""
self.log.debug(f'disconnect_workflow({w_id})')
self._update_contact(
w_id,
status=WorkflowStatus.STOPPED.value,
Expand Down Expand Up @@ -190,9 +206,9 @@ def get_workflows(self):
active.add(w_id)
return active, inactive

@log_call
def _purge_workflow(self, w_id):
"""Purge the manager of a workflow's subscription and data."""
self.log.debug(f'delete_workflow({w_id})')
self.disconnect_workflow(w_id)
if w_id in self.data:
del self.data[w_id]
Expand Down Expand Up @@ -241,7 +257,6 @@ def _update_workflow_data(self, topic, delta, w_id):
loop_cnt += 1
continue
if topic == 'shutdown':
self.log.debug(f'shutdown({w_id})')
self._delta_store_to_queues(w_id, topic, delta)
# update the status to stopped and set the status message
self._update_contact(
Expand Down
4 changes: 2 additions & 2 deletions cylc/uiserver/tests/test_data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,9 @@ async def test_workflow_connect_fail(
# the connection should fail because our ZMQ socket is not a
# WorkflowRuntimeServer with the correct endpoints and auth
assert [record.message for record in caplog.records] == [
'connect_workflow(~user/workflow_id)',
"[data-store] connect_workflow('~user/workflow_id', <dict>)",
'failed to connect to ~user/workflow_id',
'disconnect_workflow(~user/workflow_id)',
"[data-store] disconnect_workflow('~user/workflow_id')",
]
finally:
# tidy up
Expand Down
49 changes: 49 additions & 0 deletions cylc/uiserver/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.


def _repr(value):
if isinstance(value, dict):
return '<dict>'
if isinstance(value, set):
return '<set>'
return repr(value)


def fmt_call(name, args, kwargs):
"""Format a Python function call.
Examples:
It formats calls at they would appear in Python code:
>>> fmt_call('foo', (1,), {'x': True})
'foo(1, x=True)'
It handles different data types:
>>> fmt_call('foo', ('str', 42, True, None), {})
"foo('str', 42, True, None)"
It puts in placeholders for dicts and sets (too long for log output):
>>> fmt_call('foo', tuple(), {'a': {'x': 1}, 'b': {'y',}})
'foo(a=<dict>, b=<set>)'
"""
return f'{name}(' + ', '.join(
[
_repr(arg) for arg in args
] + [
f'{key}={_repr(value)}'
for key, value in kwargs.items()
]
) + ')'

0 comments on commit d808078

Please sign in to comment.