Skip to content

Commit

Permalink
Shutdown subscription after exactly 30 minutes (#876)
Browse files Browse the repository at this point in the history
Co-authored-by: Marcel van der Veldt <[email protected]>
  • Loading branch information
agners and marcelveldt committed Sep 4, 2024
1 parent 5c130ec commit b3fd089
Showing 1 changed file with 28 additions and 21 deletions.
49 changes: 28 additions & 21 deletions matter_server/server/device_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from chip.clusters.ClusterObjects import ALL_ATTRIBUTES, ALL_CLUSTERS, Cluster
from chip.discovery import DiscoveryType
from chip.exceptions import ChipStackError
from chip.native import PyChipError
from zeroconf import BadTypeInNameException, IPVersion, ServiceStateChange, Zeroconf
from zeroconf.asyncio import AsyncServiceBrowser, AsyncServiceInfo, AsyncZeroconf

Expand Down Expand Up @@ -80,8 +81,8 @@
NODE_SUBSCRIPTION_CEILING_WIFI = 60
NODE_SUBSCRIPTION_CEILING_THREAD = 60
NODE_SUBSCRIPTION_CEILING_BATTERY_POWERED = 600
NODE_RESUBSCRIBE_ATTEMPTS_UNAVAILABLE = 3
NODE_RESUBSCRIBE_TIMEOUT_OFFLINE = 30 * 60 * 1000
NODE_RESUBSCRIBE_ATTEMPTS_UNAVAILABLE = 2
NODE_RESUBSCRIBE_TIMEOUT_OFFLINE = 30 * 60
NODE_RESUBSCRIBE_FORCE_TIMEOUT = 5
NODE_PING_TIMEOUT = 10
NODE_PING_TIMEOUT_BATTERY_POWERED = 60
Expand Down Expand Up @@ -149,7 +150,8 @@ def __init__(
self._node_last_seen_on_mdns: dict[int, float] = {}
self._nodes: dict[int, MatterNodeData] = {}
self._last_known_ip_addresses: dict[int, list[str]] = {}
self._last_subscription_attempt: dict[int, int] = {}
self._resubscription_attempt: dict[int, int] = {}
self._first_resubscribe_attempt: dict[int, float] = {}
self._known_commissioning_params: dict[int, CommissioningParameters] = {}
self._known_commissioning_params_timers: dict[int, asyncio.TimerHandle] = {}
self._aiobrowser: AsyncServiceBrowser | None = None
Expand Down Expand Up @@ -1188,29 +1190,37 @@ def resubscription_attempted(
nextResubscribeIntervalMsec: int,
) -> None:
# pylint: disable=unused-argument, invalid-name
resubscription_attempt = self._resubscription_attempt[node_id]
node_logger.info(
"Previous subscription failed with Error: %s, re-subscribing in %s ms...",
terminationError,
nextResubscribeIntervalMsec,
"Subscription failed with %s, resubscription attempt %s",
str(PyChipError(code=terminationError)),
resubscription_attempt,
)
resubscription_attempt = self._last_subscription_attempt[node_id] + 1
self._last_subscription_attempt[node_id] = resubscription_attempt
self._resubscription_attempt[node_id] = resubscription_attempt + 1
if resubscription_attempt == 0:
self._first_resubscribe_attempt[node_id] = time.time()
# Mark node as unavailable and signal consumers.
# We debounce it a bit so we only mark the node unavailable
# after some resubscription attempts and we shutdown the subscription
# if the resubscription interval exceeds 30 minutes (TTL of mdns).
# The node will be auto picked up by mdns if it's alive again.
# after some resubscription attempts.
if resubscription_attempt >= NODE_RESUBSCRIBE_ATTEMPTS_UNAVAILABLE:
self._node_unavailable(node_id)
if nextResubscribeIntervalMsec > NODE_RESUBSCRIBE_TIMEOUT_OFFLINE:
# Shutdown the subscription if we tried to resubscribe for more than 30
# minutes (typical TTL of mDNS). We assume this device got powered off.
# When the device gets powered on again, it typically announces itself via
# mDNS again. The mDNS browsing code will setup the subscription again.
if (
time.time() - self._first_resubscribe_attempt[node_id]
> NODE_RESUBSCRIBE_TIMEOUT_OFFLINE
):
asyncio.create_task(self._node_offline(node_id))

def resubscription_succeeded(
transaction: Attribute.SubscriptionTransaction,
) -> None:
# pylint: disable=unused-argument, invalid-name
node_logger.info("Re-Subscription succeeded")
self._last_subscription_attempt[node_id] = 0
self._resubscription_attempt[node_id] = 0
self._first_resubscribe_attempt.pop(node_id, None)
# mark node as available and signal consumers
node = self._nodes[node_id]
if not node.available:
Expand All @@ -1233,7 +1243,7 @@ def resubscription_succeeded(
interval_ceiling = NODE_SUBSCRIPTION_CEILING_BATTERY_POWERED
else:
interval_ceiling = NODE_SUBSCRIPTION_CEILING_THREAD
self._last_subscription_attempt[node_id] = 0
self._resubscription_attempt[node_id] = 0
# set-up the actual subscription
sub: Attribute.SubscriptionTransaction = (
await self._chip_device_controller.read_attribute(
Expand Down Expand Up @@ -1606,15 +1616,12 @@ def _node_unavailable(
async def _node_offline(self, node_id: int) -> None:
"""Mark node as offline."""
# shutdown existing subscriptions
node_logger = self.get_node_logger(LOGGER, node_id)
node_logger.info("Node considered offline, shutdown subscription")
await self._chip_device_controller.shutdown_subscription(node_id)

# mark node as unavailable (if it wasn't already)
node = self._nodes[node_id]
if not node.available:
return # nothing to do to
node.available = False
self.server.signal_event(EventType.NODE_UPDATED, node)
node_logger = self.get_node_logger(LOGGER, node_id)
node_logger.info("Marked node as offline")
self._node_unavailable(node_id)

async def _fallback_node_scanner(self) -> None:
"""Scan for operational nodes in the background that are missed by mdns."""
Expand Down

0 comments on commit b3fd089

Please sign in to comment.