Sync 1.3x -> master #486

Merged (1 commit, Aug 21, 2023)
9 changes: 9 additions & 0 deletions CHANGES.md
@@ -10,6 +10,15 @@
creating a new release entry be sure to copy & paste the span tag with the
`actions:bind` attribute, which is used by a regex to find the text to be
updated. Only the first match gets replaced, so it's fine to leave the old
ones in. -->
-------------------------------------------------------------------------------
## __cylc-uiserver-1.3.1 (<span actions:bind='release-date'>Upcoming</span>)__

<!-- [Updated cylc-ui to x.y.z](https://github.com/cylc/cylc-ui/blob/master/CHANGES.md) -->

### Fixes

[#379](https://github.com/cylc/cylc-uiserver/pull/379) - Fixed lack of info
for errors recorded in logs.

-------------------------------------------------------------------------------
## __cylc-uiserver-1.3.0 (<span actions:bind='release-date'>Released 2023-07-21</span>)__
79 changes: 34 additions & 45 deletions cylc/uiserver/data_store_mgr.py
@@ -38,9 +38,9 @@
import time
from typing import Dict, Optional, Set

from cylc.flow.exceptions import WorkflowStopped
from cylc.flow.id import Tokens
from cylc.flow.network.server import PB_METHOD_MAP
from cylc.flow.network import MSG_TIMEOUT
from cylc.flow.network.subscriber import WorkflowSubscriber, process_delta_msg
from cylc.flow.data_store_mgr import (
EDGES, DATA_TEMPLATE, ALL_DELTAS, DELTAS_MAP, WORKFLOW,
@@ -337,7 +337,7 @@ def _reconcile_update(self, topic, delta, w_id):
),
self.loop
)
_, new_delta_msg = future.result(self.RECONCILE_TIMEOUT)
new_delta_msg = future.result(self.RECONCILE_TIMEOUT)
new_delta = DELTAS_MAP[topic]()
new_delta.ParseFromString(new_delta_msg)
self._clear_data_field(w_id, topic)
@@ -366,52 +366,41 @@ async def _entire_workflow_update(

# Request new data
req_method = 'pb_entire_workflow'
req_kwargs = (
{
'client': info['req_client'],
'command': req_method,
'req_context': w_id
}

requests = {
w_id: workflow_request(
client=info['req_client'], command=req_method, log=self.log
)
for w_id, info in self.workflows_mgr.workflows.items()
if info.get('req_client') # skip stopped workflows
and (not ids or w_id in ids)
}
results = await asyncio.gather(
*requests.values(), return_exceptions=True
)

gathers = [
workflow_request(**kwargs)
for kwargs in req_kwargs
if not ids or kwargs['req_context'] in ids
]
items = await asyncio.gather(*gathers, return_exceptions=True)

successes: Set[str] = set()
for item in items:
if isinstance(item, Exception):
self.log.exception(
'Failed to update entire local data-store '
'of a workflow', exc_info=item
)
else:
w_id, result = item
if result is not None and result != MSG_TIMEOUT:
pb_data = PB_METHOD_MAP[req_method]()
pb_data.ParseFromString(result)
new_data = deepcopy(DATA_TEMPLATE)
for field, value in pb_data.ListFields():
if field.name == WORKFLOW:
new_data[field.name].CopyFrom(value)
new_data['delta_times'] = {
key: value.last_updated
for key in DATA_TEMPLATE
}
continue
new_data[field.name] = {n.id: n for n in value}
self.data[w_id] = new_data
successes.add(w_id)
else:
for w_id, result in zip(requests, results):
if isinstance(result, Exception):
if not isinstance(result, WorkflowStopped):
self.log.error(
f'Error: communicating with {w_id} - {result}'
'Failed to update entire local data-store '
f'of a workflow: {result}'
)

continue
pb_data = PB_METHOD_MAP[req_method]()
pb_data.ParseFromString(result)
new_data = deepcopy(DATA_TEMPLATE)
for field, value in pb_data.ListFields():
if field.name == WORKFLOW:
new_data[field.name].CopyFrom(value)
new_data['delta_times'] = {
key: value.last_updated
for key in DATA_TEMPLATE
}
continue
new_data[field.name] = {n.id: n for n in value}
self.data[w_id] = new_data
successes.add(w_id)
return successes

def _update_contact(
@@ -488,12 +477,12 @@ def _get_status_msg(self, w_id: str, is_active: bool) -> str:
if is_active:
# this will get overridden when we sync with the workflow
        # set a sensible default here in case the sync takes a while
return 'Running'
return 'running'
w_id = Tokens(w_id)['workflow']
db_file = Path(get_workflow_srv_dir(w_id), WorkflowFiles.Service.DB)
if db_file.exists():
# the workflow has previously run
return 'Stopped'
return 'stopped'
else:
# the workflow has not yet run
return 'Not yet run'
return 'not yet run'
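
> Note: the heart of the `data_store_mgr.py` change above is the switch to a dict of requests keyed by workflow ID, gathered with `return_exceptions=True` and zipped back to their IDs so each failure can be logged against the workflow that caused it (the real method additionally stays quiet about `WorkflowStopped`). A minimal sketch of that pattern, with `fetch` as a hypothetical stand-in for the real `workflow_request` call:

```python
import asyncio
import logging

log = logging.getLogger('cylc')


async def fetch(w_id: str):
    """Hypothetical stand-in for workflow_request(); returns data or raises."""
    ...


async def update_all(workflows: dict, ids=None) -> set:
    # One coroutine per running workflow, keyed by workflow ID so results
    # can be matched back to the workflow that produced them.
    requests = {
        w_id: fetch(w_id)
        for w_id, info in workflows.items()
        if info.get('req_client')  # skip stopped workflows
        and (not ids or w_id in ids)
    }
    # return_exceptions=True turns per-workflow failures into return values
    # instead of letting one bad workflow cancel the whole gather.
    results = await asyncio.gather(*requests.values(), return_exceptions=True)
    successes = set()
    # dicts preserve insertion order and gather preserves argument order,
    # so zip() pairs each workflow ID with its own result.
    for w_id, result in zip(requests, results):
        if isinstance(result, Exception):
            log.error(f'Failed to update the data store of {w_id}: {result}')
            continue
        successes.add(w_id)
    return successes
```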
4 changes: 3 additions & 1 deletion cylc/uiserver/tests/conftest.py
@@ -14,7 +14,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Test code and fixtures."""

import asyncio
from getpass import getuser
import inspect
import logging
@@ -47,6 +46,7 @@ class AsyncClientFixture(WorkflowRuntimeClient):
pattern = zmq.REQ
host = ''
port = 0
workflow = 'myflow'

def __init__(self):
self.returns = None
@@ -57,6 +57,8 @@ def will_return(self, returns):
async def async_request(
self, command, args=None, timeout=None, req_meta=None
):
if isinstance(self.returns, Exception):
raise self.returns
if (
inspect.isclass(self.returns)
and issubclass(self.returns, Exception)
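
> Note: the `conftest.py` tweak above lets a test hand the mock client either an exception class or a ready-made exception instance. A self-contained sketch of how such a test double can dispatch on what it was told to return (the class and method names here are illustrative only, not the real fixture):

```python
import inspect


class FakeClient:
    """Test double that replays a canned value or a canned failure."""

    def __init__(self):
        self.returns = None

    def will_return(self, returns):
        self.returns = returns

    async def async_request(self, command, args=None, timeout=None):
        # An exception *instance*, e.g. WorkflowStopped('myflow'), is raised
        # as-is, so its message and arguments survive intact.
        if isinstance(self.returns, Exception):
            raise self.returns
        # An exception *class*, e.g. ClientTimeout, is instantiated with no
        # arguments and raised.
        if inspect.isclass(self.returns) and issubclass(self.returns, Exception):
            raise self.returns
        # Anything else is just the canned response payload.
        return self.returns
```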
43 changes: 30 additions & 13 deletions cylc/uiserver/tests/test_data_store_mgr.py
@@ -20,8 +20,9 @@
import pytest
import zmq

from cylc.flow.exceptions import ClientTimeout, WorkflowStopped
from cylc.flow.id import Tokens
from cylc.flow.network import (MSG_TIMEOUT, ZMQSocketBase)
from cylc.flow.network import ZMQSocketBase
from cylc.flow.workflow_files import ContactFileFields as CFF

from cylc.uiserver.data_store_mgr import DataStoreMgr
@@ -69,7 +70,7 @@ async def test_entire_workflow_update_ignores_timeout_message(
receives a ``MSG_TIMEOUT`` message.
"""
w_id = 'workflow_id'
async_client.will_return(MSG_TIMEOUT)
async_client.will_return(ClientTimeout)

# Set the client used by our test workflow.
data_store_mgr.workflows_mgr.workflows[w_id] = {
@@ -81,7 +82,7 @@
# calling ``workflow_request``.
await data_store_mgr._entire_workflow_update()

# When a ``MSG_TIMEOUT`` happens, the ``DataStoreMgr`` object ignores
# When a ClientTimeout happens, the ``DataStoreMgr`` object ignores
# that message. So it means that its ``.data`` dictionary MUST NOT
# have an entry for the Workflow ID.
assert w_id not in data_store_mgr.data
@@ -90,7 +91,7 @@
async def test_entire_workflow_update_gather_error(
async_client: AsyncClientFixture,
data_store_mgr: DataStoreMgr,
caplog,
caplog: pytest.LogCaptureFixture,
):
"""
Test that if ``asyncio.gather`` in ``entire_workflow_update``
@@ -102,8 +103,7 @@ async def test_entire_workflow_update_gather_error(
#
# This test wants to confirm this is not raised, but instead the
# error is returned, so that we can inspect, log, etc.
error_type = ValueError
async_client.will_return(error_type)
async_client.will_return(ValueError)

# Set the client used by our test workflow.
data_store_mgr.workflows_mgr.workflows['workflow_id'] = {
@@ -113,16 +113,33 @@
# Call the entire_workflow_update function.
# This should use the client defined above (``async_client``) when
# calling ``workflow_request``.
caplog.clear()
await data_store_mgr._entire_workflow_update()
assert caplog.record_tuples == [
(
'cylc',
40,
'Failed to update entire local data-store of a workflow'
)
('cylc', 40, 'Error communicating with myflow'),
('cylc', 40, 'x'),
('cylc', 40,
'Failed to update entire local data-store of a workflow: x'),
]
exc_info = caplog.records[1].exc_info
assert exc_info and exc_info[0] == ValueError


async def test_entire_workflow_update__stopped_workflow(
async_client: AsyncClientFixture,
data_store_mgr: DataStoreMgr,
caplog: pytest.LogCaptureFixture,
):
"""Test that DataStoreMgr._entire_workflow_update() handles a stopped
workflow reasonably."""
exc = WorkflowStopped('myflow')
async_client.will_return(exc)
data_store_mgr.workflows_mgr.workflows['workflow_id'] = {
'req_client': async_client
}
await data_store_mgr._entire_workflow_update()
assert caplog.record_tuples == [
('cylc', 40, f'WorkflowStopped: {exc}'),
]
assert caplog.records[0].exc_info[0] == error_type


async def test_register_workflow(
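
> Note: the rewritten tests above assert on logging via pytest's `caplog` fixture: `record_tuples` yields `(logger_name, level, message)` triples, and a record's `exc_info` exposes the exception attached with `exc_info=`. A small, self-contained illustration of the same assertion style, with made-up logger and message names:

```python
import logging

import pytest


def emit_error(log: logging.Logger) -> None:
    """Log an error with the exception attached to the record."""
    try:
        raise ValueError('x')
    except ValueError as exc:
        log.error('Failed to update a workflow: %s', exc, exc_info=exc)


def test_emit_error(caplog: pytest.LogCaptureFixture) -> None:
    caplog.set_level(logging.ERROR, logger='demo')
    emit_error(logging.getLogger('demo'))
    # record_tuples -> (logger name, numeric level, formatted message);
    # logging.ERROR == 40, which is why the tests compare against 40.
    assert caplog.record_tuples == [
        ('demo', 40, 'Failed to update a workflow: x'),
    ]
    # exc_info is the (type, value, traceback) triple for the attached error.
    exc_info = caplog.records[0].exc_info
    assert exc_info and exc_info[0] is ValueError
```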
72 changes: 27 additions & 45 deletions cylc/uiserver/tests/test_workflows_mgr.py
@@ -16,6 +16,7 @@
from itertools import product
import logging
from random import random
from typing import Type

import pytest

@@ -40,49 +41,27 @@

# --- workflow_request

async def test_workflow_request_client_timeout(
async_client: AsyncClientFixture):
async_client.will_return(ClientTimeout)
ctx, msg = await workflow_request(client=async_client, command='')
assert not ctx
assert 'timeout' in msg.lower() # type: ignore[attr-defined]


@pytest.mark.parametrize(
'exc', [ClientError, ClientTimeout]
)
async def test_workflow_request_client_error(
async_client: AsyncClientFixture, caplog):
caplog.set_level(logging.CRITICAL, logger='cylc')
async_client.will_return(ClientError)
ctx, msg = await workflow_request(client=async_client, command='')
assert not ctx
assert not msg
exc: Type[Exception],
async_client: AsyncClientFixture,
caplog: pytest.LogCaptureFixture
):
caplog.set_level(logging.ERROR, logger='cylc')
logger = logging.getLogger('cylc')
async_client.will_return(exc)
with pytest.raises(exc):
await workflow_request(client=async_client, command='', log=logger)
assert exc.__name__ in caplog.text


@pytest.mark.parametrize(
"returns,command,req_context,expected_ctx,expected_msg",
[
pytest.param(
42, 'cmd', None, 'cmd', 42
),
pytest.param(
42, '', None, '', 42
),
pytest.param(
42, 'cmd', 'some-context', 'some-context', 42
)
])
async def test_workflow_request(
async_client: AsyncClientFixture,
returns,
command,
req_context,
expected_ctx,
expected_msg
):
async_client.will_return(returns)
ctx, msg = await workflow_request(
client=async_client, command=command, req_context=req_context)
assert expected_ctx == ctx
assert expected_msg == msg
async def test_workflow_request(async_client: AsyncClientFixture):
"""Test normal response of workflow_request matches async_request"""
async_client.will_return(42)
res = await workflow_request(client=async_client, command='')
assert res == 42


# --- WorkflowsManager
@@ -226,7 +205,7 @@ async def test_workflow_state_change_uuid(


async def test_multi_request(
workflows_manager,
workflows_manager: WorkflowsManager,
async_client: AsyncClientFixture
):
workflow_id = 'multi-request-workflow'
@@ -251,17 +230,18 @@
    response = await workflows_manager.multi_request(
response = await workflows_manager.multi_request(
'', [workflow_id], None, multi_args)
assert len(response) == 1
assert value == response[0]
assert response[0] == res


async def test_multi_request_gather_errors(
workflows_manager,
async_client: AsyncClientFixture,
caplog
caplog: pytest.LogCaptureFixture
):
workflow_id = 'gather-error-workflow'
error_type = ValueError
async_client.will_return(error_type)
async_client.workflow = workflow_id

workflows_manager.workflows[workflow_id] = {
'req_client': async_client
@@ -270,9 +250,11 @@ async def test_multi_request_gather_errors(
caplog.clear()
await workflows_manager.multi_request('', [workflow_id], None, None)
assert caplog.record_tuples == [
('cylc', 40, 'Failed to send requests to multiple workflows')
('cylc', 40, f'Error communicating with {workflow_id}'),
('cylc', 40, 'x'),
]
assert caplog.records[0].exc_info[0] == error_type
exc_info = caplog.records[1].exc_info
assert exc_info and exc_info[0] == error_type


async def test_crashed_workflow(one_workflow_aiter, caplog, uis_caplog):
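
> Note: the slimmed-down `test_workflow_request_client_error` above folds `ClientError` and `ClientTimeout` into one parametrised test that expects the exception to propagate while still being logged. The same shape, reduced to a self-contained sketch with stand-in exception and client classes (not the real `cylc.flow` ones):

```python
import asyncio
import logging
from typing import Type

import pytest


class ClientError(Exception): ...
class ClientTimeout(Exception): ...


class FakeClient:
    def __init__(self, exc: Exception):
        self.exc = exc

    async def async_request(self, command: str):
        raise self.exc


async def request(client: FakeClient, command: str, log: logging.Logger):
    """Stand-in for workflow_request: log the failure, then re-raise it."""
    try:
        return await client.async_request(command)
    except Exception as exc:
        log.error(f'{type(exc).__name__}: {exc}')
        raise


@pytest.mark.parametrize('exc', [ClientError, ClientTimeout])
def test_request_error(exc: Type[Exception], caplog: pytest.LogCaptureFixture):
    caplog.set_level(logging.ERROR, logger='demo')
    with pytest.raises(exc):
        asyncio.run(
            request(FakeClient(exc('boom')), '', logging.getLogger('demo'))
        )
    # The error both propagates to the caller and leaves a log record behind.
    assert exc.__name__ in caplog.text
```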