Skip to content

Commit

Permalink
tests for api listeners
Browse files Browse the repository at this point in the history
  • Loading branch information
genesiscrew committed May 23, 2024
1 parent e3ecced commit 9e139af
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 1 deletion.
2 changes: 1 addition & 1 deletion tests/api/test_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async def test_api_close(connected_zboss, mocker):
def dict_minus(d, minus):
return {k: v for k, v in d.items() if k not in minus}

ignored_keys = ["_blocking_request_lock", "nvram", "_listeners"]
ignored_keys = ["_blocking_request_lock", "nvram"]

# Closing ZBOSS should reset it completely to that of a fresh object
# We have to ignore our mocked method and the lock
Expand Down
182 changes: 182 additions & 0 deletions tests/api/test_listeners.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
import asyncio
from unittest.mock import call

import pytest

import zigpy_zboss.types as t
import zigpy_zboss.commands as c
from zigpy_zboss.api import OneShotResponseListener, IndicationListener


@pytest.mark.asyncio
async def test_resolve(event_loop, mocker):
callback = mocker.Mock()
callback_listener = IndicationListener(
[c.NcpConfig.GetZigbeeRole.Rsp(
TSN=10,
StatusCat=t.StatusCategory(1),
StatusCode=20,
DeviceRole=t.DeviceRole(1)
)], callback
)

future = event_loop.create_future()
one_shot_listener = OneShotResponseListener([c.NcpConfig.GetZigbeeRole.Rsp(
TSN=10,
StatusCat=t.StatusCategory(1),
StatusCode=20,
DeviceRole=t.DeviceRole(1)
)], future)

match = c.NcpConfig.GetZigbeeRole.Rsp(
TSN=10,
StatusCat=t.StatusCategory(1),
StatusCode=20,
DeviceRole=t.DeviceRole(1)
)
no_match = c.NcpConfig.GetModuleVersion.Rsp(
TSN=10,
StatusCat=t.StatusCategory(1),
StatusCode=20,
FWVersion=1,
StackVersion=2,
ProtocolVersion=3,
)

assert callback_listener.resolve(match)
assert not callback_listener.resolve(no_match)
assert callback_listener.resolve(match)
assert not callback_listener.resolve(no_match)

assert one_shot_listener.resolve(match)
assert not one_shot_listener.resolve(no_match)

callback.assert_has_calls([call(match), call(match)])
assert callback.call_count == 2

assert (await future) == match

# Cancelling a callback will have no effect
assert not callback_listener.cancel()

# Cancelling a one-shot listener does not throw any errors
assert one_shot_listener.cancel()
assert one_shot_listener.cancel()
assert one_shot_listener.cancel()


@pytest.mark.asyncio
async def test_cancel(event_loop):
# Cancelling a one-shot listener prevents it from being fired
future = event_loop.create_future()
one_shot_listener = OneShotResponseListener([c.NcpConfig.GetZigbeeRole.Rsp(
TSN=10,
StatusCat=t.StatusCategory(1),
StatusCode=20,
partial=True
)], future)
one_shot_listener.cancel()

match = c.NcpConfig.GetZigbeeRole.Rsp(
TSN=10,
StatusCat=t.StatusCategory(1),
StatusCode=20,
DeviceRole=t.DeviceRole(1)
)
assert not one_shot_listener.resolve(match)

with pytest.raises(asyncio.CancelledError):
await future


@pytest.mark.asyncio
async def test_multi_cancel(event_loop, mocker):
callback = mocker.Mock()
callback_listener = IndicationListener(
[c.NcpConfig.GetZigbeeRole.Rsp(
TSN=10,
StatusCat=t.StatusCategory(1),
StatusCode=20,
partial=True
)], callback
)

future = event_loop.create_future()
one_shot_listener = OneShotResponseListener([c.NcpConfig.GetZigbeeRole.Rsp(
TSN=10,
StatusCat=t.StatusCategory(1),
StatusCode=20,
partial=True
)], future)

match = c.NcpConfig.GetZigbeeRole.Rsp(
TSN=10,
StatusCat=t.StatusCategory(1),
StatusCode=20,
DeviceRole=t.DeviceRole(1)
)
no_match = c.NcpConfig.GetModuleVersion.Rsp(
TSN=10,
StatusCat=t.StatusCategory(1),
StatusCode=20,
FWVersion=1,
StackVersion=2,
ProtocolVersion=3,
)

assert callback_listener.resolve(match)
assert not callback_listener.resolve(no_match)

assert one_shot_listener.resolve(match)
assert not one_shot_listener.resolve(no_match)

callback.assert_called_once_with(match)
assert (await future) == match


@pytest.mark.asyncio
async def test_api_cancel_listeners(connected_zboss, mocker):
zboss, zboss_server = connected_zboss

callback = mocker.Mock()

zboss.register_indication_listener(
c.NcpConfig.GetZigbeeRole.Rsp(
TSN=10,
StatusCat=t.StatusCategory(1),
StatusCode=20,
DeviceRole=t.DeviceRole(1)
), callback
)
future = zboss.wait_for_responses(
[
c.NcpConfig.GetZigbeeRole.Rsp(
TSN=10,
StatusCat=t.StatusCategory(1),
StatusCode=20,
DeviceRole=t.DeviceRole(1)
),
c.NcpConfig.GetModuleVersion.Rsp(
TSN=10,
StatusCat=t.StatusCategory(1),
StatusCode=20,
FWVersion=1,
StackVersion=2,
ProtocolVersion=3,
),
]
)

assert not future.done()
zboss.close()

with pytest.raises(asyncio.CancelledError):
await future

# add_done_callback won't be executed immediately
await asyncio.sleep(0.1)

# only one shot listerner is cleared
# we do not remove indication listeners
# because
assert len(zboss._listeners) == 0
1 change: 1 addition & 0 deletions zigpy_zboss/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ def close(self) -> None:
for _header, listeners in self._listeners.items():
for listener in listeners:
listener.cancel()
self._listeners.clear()

self.version = None
self.capabilities = None
Expand Down

0 comments on commit 9e139af

Please sign in to comment.