diff --git a/CHANGES.md b/CHANGES.md
index 52019d35..0fa31ce5 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -10,6 +10,15 @@ creating a new release entry be sure to copy & paste the span tag with the
`actions:bind` attribute, which is used by a regex to find the text to be
updated. Only the first match gets replaced, so it's fine to leave the old
ones in. -->
+-------------------------------------------------------------------------------
+## __cylc-uiserver-1.3.1 (Upcoming)__
+
+
+
+### Fixes
+
+[#379](https://github.com/cylc/cylc-uiserver/pull/379) - Fixed lack of info
+for errors recorded in logs.
-------------------------------------------------------------------------------
## __cylc-uiserver-1.3.0 (Released 2023-07-21)__
diff --git a/cylc/uiserver/data_store_mgr.py b/cylc/uiserver/data_store_mgr.py
index 19047425..101fc361 100644
--- a/cylc/uiserver/data_store_mgr.py
+++ b/cylc/uiserver/data_store_mgr.py
@@ -38,9 +38,9 @@
import time
from typing import Dict, Optional, Set
+from cylc.flow.exceptions import WorkflowStopped
from cylc.flow.id import Tokens
from cylc.flow.network.server import PB_METHOD_MAP
-from cylc.flow.network import MSG_TIMEOUT
from cylc.flow.network.subscriber import WorkflowSubscriber, process_delta_msg
from cylc.flow.data_store_mgr import (
EDGES, DATA_TEMPLATE, ALL_DELTAS, DELTAS_MAP, WORKFLOW,
@@ -337,7 +337,7 @@ def _reconcile_update(self, topic, delta, w_id):
),
self.loop
)
- _, new_delta_msg = future.result(self.RECONCILE_TIMEOUT)
+ new_delta_msg = future.result(self.RECONCILE_TIMEOUT)
new_delta = DELTAS_MAP[topic]()
new_delta.ParseFromString(new_delta_msg)
self._clear_data_field(w_id, topic)
@@ -366,52 +366,41 @@ async def _entire_workflow_update(
# Request new data
req_method = 'pb_entire_workflow'
- req_kwargs = (
- {
- 'client': info['req_client'],
- 'command': req_method,
- 'req_context': w_id
- }
+
+ requests = {
+ w_id: workflow_request(
+ client=info['req_client'], command=req_method, log=self.log
+ )
for w_id, info in self.workflows_mgr.workflows.items()
if info.get('req_client') # skip stopped workflows
+ and (not ids or w_id in ids)
+ }
+ results = await asyncio.gather(
+ *requests.values(), return_exceptions=True
)
-
- gathers = [
- workflow_request(**kwargs)
- for kwargs in req_kwargs
- if not ids or kwargs['req_context'] in ids
- ]
- items = await asyncio.gather(*gathers, return_exceptions=True)
-
successes: Set[str] = set()
- for item in items:
- if isinstance(item, Exception):
- self.log.exception(
- 'Failed to update entire local data-store '
- 'of a workflow', exc_info=item
- )
- else:
- w_id, result = item
- if result is not None and result != MSG_TIMEOUT:
- pb_data = PB_METHOD_MAP[req_method]()
- pb_data.ParseFromString(result)
- new_data = deepcopy(DATA_TEMPLATE)
- for field, value in pb_data.ListFields():
- if field.name == WORKFLOW:
- new_data[field.name].CopyFrom(value)
- new_data['delta_times'] = {
- key: value.last_updated
- for key in DATA_TEMPLATE
- }
- continue
- new_data[field.name] = {n.id: n for n in value}
- self.data[w_id] = new_data
- successes.add(w_id)
- else:
+ for w_id, result in zip(requests, results):
+ if isinstance(result, Exception):
+ if not isinstance(result, WorkflowStopped):
self.log.error(
- f'Error: communicating with {w_id} - {result}'
+ 'Failed to update entire local data-store '
+ f'of a workflow: {result}'
)
-
+ continue
+ pb_data = PB_METHOD_MAP[req_method]()
+ pb_data.ParseFromString(result)
+ new_data = deepcopy(DATA_TEMPLATE)
+ for field, value in pb_data.ListFields():
+ if field.name == WORKFLOW:
+ new_data[field.name].CopyFrom(value)
+ new_data['delta_times'] = {
+ key: value.last_updated
+ for key in DATA_TEMPLATE
+ }
+ continue
+ new_data[field.name] = {n.id: n for n in value}
+ self.data[w_id] = new_data
+ successes.add(w_id)
return successes
def _update_contact(
@@ -488,12 +477,12 @@ def _get_status_msg(self, w_id: str, is_active: bool) -> str:
if is_active:
# this will get overridden when we sync with the workflow
-            # set a sensible default here incase the sync takes a while
+            # set a sensible default here in case the sync takes a while
- return 'Running'
+ return 'running'
w_id = Tokens(w_id)['workflow']
db_file = Path(get_workflow_srv_dir(w_id), WorkflowFiles.Service.DB)
if db_file.exists():
# the workflow has previously run
- return 'Stopped'
+ return 'stopped'
else:
# the workflow has not yet run
- return 'Not yet run'
+ return 'not yet run'
diff --git a/cylc/uiserver/tests/conftest.py b/cylc/uiserver/tests/conftest.py
index fc9f7da9..978d3ede 100644
--- a/cylc/uiserver/tests/conftest.py
+++ b/cylc/uiserver/tests/conftest.py
@@ -14,7 +14,6 @@
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""Test code and fixtures."""
-import asyncio
from getpass import getuser
import inspect
import logging
@@ -47,6 +46,7 @@ class AsyncClientFixture(WorkflowRuntimeClient):
pattern = zmq.REQ
host = ''
port = 0
+ workflow = 'myflow'
def __init__(self):
self.returns = None
@@ -57,6 +57,8 @@ def will_return(self, returns):
async def async_request(
self, command, args=None, timeout=None, req_meta=None
):
+ if isinstance(self.returns, Exception):
+ raise self.returns
if (
inspect.isclass(self.returns)
and issubclass(self.returns, Exception)
diff --git a/cylc/uiserver/tests/test_data_store_mgr.py b/cylc/uiserver/tests/test_data_store_mgr.py
index 8920cfb3..ad95cd5a 100644
--- a/cylc/uiserver/tests/test_data_store_mgr.py
+++ b/cylc/uiserver/tests/test_data_store_mgr.py
@@ -20,8 +20,9 @@
import pytest
import zmq
+from cylc.flow.exceptions import ClientTimeout, WorkflowStopped
from cylc.flow.id import Tokens
-from cylc.flow.network import (MSG_TIMEOUT, ZMQSocketBase)
+from cylc.flow.network import ZMQSocketBase
from cylc.flow.workflow_files import ContactFileFields as CFF
from cylc.uiserver.data_store_mgr import DataStoreMgr
@@ -69,7 +70,7 @@ async def test_entire_workflow_update_ignores_timeout_message(
-    receives a ``MSG_TIMEOUT`` message.
+    receives a ``ClientTimeout`` error.
"""
w_id = 'workflow_id'
- async_client.will_return(MSG_TIMEOUT)
+ async_client.will_return(ClientTimeout)
# Set the client used by our test workflow.
data_store_mgr.workflows_mgr.workflows[w_id] = {
@@ -81,7 +82,7 @@ async def test_entire_workflow_update_ignores_timeout_message(
# calling ``workflow_request``.
await data_store_mgr._entire_workflow_update()
- # When a ``MSG_TIMEOUT`` happens, the ``DataStoreMgr`` object ignores
+ # When a ClientTimeout happens, the ``DataStoreMgr`` object ignores
# that message. So it means that its ``.data`` dictionary MUST NOT
# have an entry for the Workflow ID.
assert w_id not in data_store_mgr.data
@@ -90,7 +91,7 @@ async def test_entire_workflow_update_ignores_timeout_message(
async def test_entire_workflow_update_gather_error(
async_client: AsyncClientFixture,
data_store_mgr: DataStoreMgr,
- caplog,
+ caplog: pytest.LogCaptureFixture,
):
"""
Test that if ``asyncio.gather`` in ``entire_workflow_update``
@@ -102,8 +103,7 @@ async def test_entire_workflow_update_gather_error(
#
# This test wants to confirm this is not raised, but instead the
# error is returned, so that we can inspect, log, etc.
- error_type = ValueError
- async_client.will_return(error_type)
+ async_client.will_return(ValueError)
# Set the client used by our test workflow.
data_store_mgr.workflows_mgr.workflows['workflow_id'] = {
@@ -113,16 +113,33 @@ async def test_entire_workflow_update_gather_error(
# Call the entire_workflow_update function.
# This should use the client defined above (``async_client``) when
# calling ``workflow_request``.
- caplog.clear()
await data_store_mgr._entire_workflow_update()
assert caplog.record_tuples == [
- (
- 'cylc',
- 40,
- 'Failed to update entire local data-store of a workflow'
- )
+ ('cylc', 40, 'Error communicating with myflow'),
+ ('cylc', 40, 'x'),
+ ('cylc', 40,
+ 'Failed to update entire local data-store of a workflow: x'),
+ ]
+ exc_info = caplog.records[1].exc_info
+ assert exc_info and exc_info[0] == ValueError
+
+
+async def test_entire_workflow_update__stopped_workflow(
+ async_client: AsyncClientFixture,
+ data_store_mgr: DataStoreMgr,
+ caplog: pytest.LogCaptureFixture,
+):
+ """Test that DataStoreMgr._entire_workflow_update() handles a stopped
+ workflow reasonably."""
+ exc = WorkflowStopped('myflow')
+ async_client.will_return(exc)
+ data_store_mgr.workflows_mgr.workflows['workflow_id'] = {
+ 'req_client': async_client
+ }
+ await data_store_mgr._entire_workflow_update()
+ assert caplog.record_tuples == [
+ ('cylc', 40, f'WorkflowStopped: {exc}'),
]
- assert caplog.records[0].exc_info[0] == error_type
async def test_register_workflow(
diff --git a/cylc/uiserver/tests/test_workflows_mgr.py b/cylc/uiserver/tests/test_workflows_mgr.py
index ed5408ce..f922ac8e 100644
--- a/cylc/uiserver/tests/test_workflows_mgr.py
+++ b/cylc/uiserver/tests/test_workflows_mgr.py
@@ -16,6 +16,7 @@
from itertools import product
import logging
from random import random
+from typing import Type
import pytest
@@ -40,49 +41,27 @@
# --- workflow_request
-async def test_workflow_request_client_timeout(
- async_client: AsyncClientFixture):
- async_client.will_return(ClientTimeout)
- ctx, msg = await workflow_request(client=async_client, command='')
- assert not ctx
- assert 'timeout' in msg.lower() # type: ignore[attr-defined]
-
-
+@pytest.mark.parametrize(
+ 'exc', [ClientError, ClientTimeout]
+)
async def test_workflow_request_client_error(
- async_client: AsyncClientFixture, caplog):
- caplog.set_level(logging.CRITICAL, logger='cylc')
- async_client.will_return(ClientError)
- ctx, msg = await workflow_request(client=async_client, command='')
- assert not ctx
- assert not msg
+ exc: Type[Exception],
+ async_client: AsyncClientFixture,
+ caplog: pytest.LogCaptureFixture
+):
+ caplog.set_level(logging.ERROR, logger='cylc')
+ logger = logging.getLogger('cylc')
+ async_client.will_return(exc)
+ with pytest.raises(exc):
+ await workflow_request(client=async_client, command='', log=logger)
+ assert exc.__name__ in caplog.text
-@pytest.mark.parametrize(
- "returns,command,req_context,expected_ctx,expected_msg",
- [
- pytest.param(
- 42, 'cmd', None, 'cmd', 42
- ),
- pytest.param(
- 42, '', None, '', 42
- ),
- pytest.param(
- 42, 'cmd', 'some-context', 'some-context', 42
- )
- ])
-async def test_workflow_request(
- async_client: AsyncClientFixture,
- returns,
- command,
- req_context,
- expected_ctx,
- expected_msg
-):
- async_client.will_return(returns)
- ctx, msg = await workflow_request(
- client=async_client, command=command, req_context=req_context)
- assert expected_ctx == ctx
- assert expected_msg == msg
+async def test_workflow_request(async_client: AsyncClientFixture):
+ """Test normal response of workflow_request matches async_request"""
+ async_client.will_return(42)
+ res = await workflow_request(client=async_client, command='')
+ assert res == 42
# --- WorkflowsManager
@@ -226,7 +205,7 @@ async def test_workflow_state_change_uuid(
async def test_multi_request(
- workflows_manager,
+ workflows_manager: WorkflowsManager,
async_client: AsyncClientFixture
):
workflow_id = 'multi-request-workflow'
@@ -251,17 +230,18 @@ async def test_multi_request(
response = await workflows_manager.multi_request(
'', [workflow_id], None, multi_args)
assert len(response) == 1
- assert value == response[0]
+ assert response[0] == res
async def test_multi_request_gather_errors(
workflows_manager,
async_client: AsyncClientFixture,
- caplog
+ caplog: pytest.LogCaptureFixture
):
workflow_id = 'gather-error-workflow'
error_type = ValueError
async_client.will_return(error_type)
+ async_client.workflow = workflow_id
workflows_manager.workflows[workflow_id] = {
'req_client': async_client
@@ -270,9 +250,11 @@ async def test_multi_request_gather_errors(
caplog.clear()
await workflows_manager.multi_request('', [workflow_id], None, None)
assert caplog.record_tuples == [
- ('cylc', 40, 'Failed to send requests to multiple workflows')
+ ('cylc', 40, f'Error communicating with {workflow_id}'),
+ ('cylc', 40, 'x'),
]
- assert caplog.records[0].exc_info[0] == error_type
+ exc_info = caplog.records[1].exc_info
+ assert exc_info and exc_info[0] == error_type
async def test_crashed_workflow(one_workflow_aiter, caplog, uis_caplog):
diff --git a/cylc/uiserver/workflows_mgr.py b/cylc/uiserver/workflows_mgr.py
index 5d036481..e441c388 100644
--- a/cylc/uiserver/workflows_mgr.py
+++ b/cylc/uiserver/workflows_mgr.py
@@ -28,14 +28,14 @@
from pathlib import Path
import sys
from typing import (
- TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, Union
+ TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Union
)
import zmq.asyncio
from cylc.flow.id import Tokens
from cylc.flow.exceptions import ClientError, ClientTimeout
-from cylc.flow.network import API, MSG_TIMEOUT
+from cylc.flow.network import API
from cylc.flow.network.client import WorkflowRuntimeClient
from cylc.flow.network.scan import (
api_version,
@@ -61,11 +61,10 @@ async def workflow_request(
command: str,
args: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
- req_context: Optional[str] = None,
*,
log: Optional['Logger'] = None,
req_meta: Optional[Dict[str, Any]] = None
-) -> Tuple[str, object]:
+) -> Union[bytes, object]:
"""Workflow request command.
Args:
@@ -73,30 +72,30 @@ async def workflow_request(
command: Command/Endpoint name.
args: Endpoint arguments.
timeout: Client request timeout (secs).
- req_context: A string to identifier.
req_meta: Meta data related to request, e.g. auth_user
-
- Returns:
- tuple: (req_context, result)
"""
- if req_context is None:
- req_context = command
try:
- result = await client.async_request(
- command, args, timeout, req_meta=req_meta)
- return (req_context, result)
- except ClientTimeout as exc:
+ return await client.async_request(
+ command, args, timeout, req_meta=req_meta
+ )
+ except (ClientTimeout, ClientError) as exc:
+ # Expected error
if log:
- log.exception(exc)
+ log.error(f"{type(exc).__name__}: {exc}")
else:
print(exc, file=sys.stderr)
- return (req_context, MSG_TIMEOUT)
- except ClientError as exc:
+ exc.workflow = client.workflow
+ raise exc
+ except Exception as exc:
+ # Unexpected error
+ msg = f"Error communicating with {client.workflow}"
if log:
+ log.error(msg)
log.exception(exc)
else:
+ print(msg, file=sys.stderr)
print(exc, file=sys.stderr)
- return (req_context, None)
+ raise exc
async def run_coros_in_order(*coros):
@@ -367,7 +366,7 @@ async def multi_request(
multi_args: Optional[Dict[str, Any]] = None,
timeout=None,
req_meta: Optional[Dict[str, Any]] = None
- ) -> List[object]:
+ ) -> List[Union[bytes, object, Exception]]:
"""Send requests to multiple workflows."""
if args is None:
args = {}
@@ -375,43 +374,20 @@ async def multi_request(
multi_args = {}
if req_meta is None:
req_meta = {}
- req_args = {
- w_id: (
+ gathers = [
+ workflow_request(
self.workflows[w_id]['req_client'],
command,
multi_args.get(w_id, args),
timeout,
+ log=self.log,
+ req_meta=req_meta
)
for w_id in workflows
# skip stopped workflows
if self.workflows.get(w_id, {}).get('req_client')
- }
- gathers = [
- workflow_request(
- *request_args,
- req_context=info,
- log=self.log,
- req_meta=req_meta
- )
- for info, request_args in req_args.items()
]
- results = await asyncio.gather(*gathers, return_exceptions=True)
- res = []
- for result in results:
- if isinstance(result, Exception):
- self.log.exception(
- 'Failed to send requests to multiple workflows',
- exc_info=result
- )
- else:
- _, val = result
- res.extend([
- msg_core
- for msg_core in list(val.values())[0].get('result')
- if isinstance(val, dict)
- and list(val.values())
- ])
- return res
+ return await asyncio.gather(*gathers, return_exceptions=True)
async def scan(self):
"""Request a new workflow scan."""