A small optimization to the node setup (#798)
marcelveldt authored Jul 11, 2024
1 parent 91e6c53 commit 0b85b49
Showing 1 changed file with 65 additions and 57 deletions.
122 changes: 65 additions & 57 deletions matter_server/server/device_controller.py
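
In short, this commit replaces the controller's global _node_setup_throttle semaphore with _thread_node_setup_throttle, which is acquired only for Thread-routed nodes (detected via the ROUTING_ROLE_ATTRIBUTE_PATH attribute) and released again in the finally block once setup completes, so IP-based nodes no longer wait behind Thread setups. A minimal, self-contained sketch of that pattern follows; setup_node, do_setup and THREAD_SETUP_LIMIT are illustrative names for this sketch, not the project's actual API.

import asyncio
import time

# Illustrative stand-ins for the controller internals touched by this commit.
THREAD_SETUP_LIMIT = 5
thread_node_setup_throttle = asyncio.Semaphore(THREAD_SETUP_LIMIT)


async def do_setup(node_id: int) -> None:
    """Placeholder for the real resolve / interview / subscribe steps."""
    await asyncio.sleep(0.1)


async def setup_node(node_id: int, is_thread_node: bool) -> None:
    """Throttle setup for Thread nodes only; other nodes run unthrottled."""
    if is_thread_node:
        # Only Thread (mesh) devices contend for a slot, so a burst of
        # Thread setups no longer delays Wi-Fi/Ethernet nodes.
        await thread_node_setup_throttle.acquire()
    time_start = time.time()
    try:
        await do_setup(node_id)
    finally:
        # Always hand the slot back, even when setup fails.
        if is_thread_node:
            thread_node_setup_throttle.release()
        print(f"node {node_id} finished in {time.time() - time_start:.2f}s")


async def main() -> None:
    # A mix of Thread and non-Thread nodes being set up concurrently.
    await asyncio.gather(
        *(setup_node(i, is_thread_node=i % 2 == 0) for i in range(10))
    )


if __name__ == "__main__":
    asyncio.run(main())
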
@@ -132,7 +132,7 @@ def __init__(
self._aiozc: AsyncZeroconf | None = None
self._fallback_node_scanner_timer: asyncio.TimerHandle | None = None
self._fallback_node_scanner_task: asyncio.Task | None = None
self._node_setup_throttle = asyncio.Semaphore(5)
self._thread_node_setup_throttle = asyncio.Semaphore(5)
self._mdns_event_timer: dict[str, asyncio.TimerHandle] = {}
self._polled_attributes: dict[int, set[str]] = {}
self._custom_attribute_poller_timer: asyncio.TimerHandle | None = None
@@ -1105,6 +1105,9 @@ async def _setup_node(self, node_id: int) -> None:
node_logger = LOGGER.getChild(f"node_{node_id}")
node_data = self._nodes[node_id]
log_timers: dict[int, asyncio.TimerHandle] = {}
is_thread_node = (
node_data.attributes.get(ROUTING_ROLE_ATTRIBUTE_PATH) is not None
)

async def log_node_long_setup(time_start: float) -> None:
"""Temporary measure to track a locked-up SDK issue in some (special) circumstances."""
@@ -1136,22 +1139,49 @@ async def log_node_long_setup(time_start: float) -> None:
log_timers[node_id] = self._loop.call_later(
15 * 60, lambda: asyncio.create_task(log_node_long_setup(time_start))
)
# release semaphore to give an additional free slot for setup
# otherwise no thread nodes will be setup if 5 are stuck in this state
if is_thread_node:
self._thread_node_setup_throttle.release()

# use semaphore for thread based devices to (somewhat)
# throttle the traffic that setup/initial subscription generates
if is_thread_node:
await self._thread_node_setup_throttle.acquire()
time_start = time.time()
# we want to track nodes that take too long so we log it when we detect that
log_timers[node_id] = self._loop.call_later(
15 * 60, lambda: asyncio.create_task(log_node_long_setup(time_start))
)
try:
node_logger.info("Setting-up node...")

async with self._node_setup_throttle:
time_start = time.time()
# we want to track nodes that take too long so we log it when we detect that
log_timers[node_id] = self._loop.call_later(
15 * 60, lambda: asyncio.create_task(log_node_long_setup(time_start))
)
# try to resolve the node using the sdk first before do anything else
try:
node_logger.info("Setting-up node...")
await self._chip_device_controller.find_or_establish_case_session(
node_id=node_id
)
except NodeNotResolving as err:
node_logger.warning(
"Setup for node failed: %s",
str(err) or err.__class__.__name__,
# log full stack trace if verbose logging is enabled
exc_info=err if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else None,
)
# NOTE: the node will be picked up by mdns discovery automatically
# when it comes available again.
return

# try to resolve the node using the sdk first before do anything else
# (re)interview node (only) if needed
if (
# re-interview if we dont have any node attributes (empty node)
not node_data.attributes
# re-interview if the data model schema has changed
or node_data.interview_version != DATA_MODEL_SCHEMA_VERSION
):
try:
await self._chip_device_controller.find_or_establish_case_session(
node_id=node_id
)
except NodeNotResolving as err:
await self.interview_node(node_id)
except NodeInterviewFailed as err:
node_logger.warning(
"Setup for node failed: %s",
str(err) or err.__class__.__name__,
@@ -1164,52 +1194,30 @@ async def log_node_long_setup(time_start: float) -> None:
# when it comes available again.
return

# (re)interview node (only) if needed
if (
# re-interview if we dont have any node attributes (empty node)
not node_data.attributes
# re-interview if the data model schema has changed
or node_data.interview_version != DATA_MODEL_SCHEMA_VERSION
):
try:
await self.interview_node(node_id)
except NodeInterviewFailed as err:
node_logger.warning(
"Setup for node failed: %s",
str(err) or err.__class__.__name__,
# log full stack trace if verbose logging is enabled
exc_info=err
if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL)
else None,
)
# NOTE: the node will be picked up by mdns discovery automatically
# when it comes available again.
return

# setup subscriptions for the node
try:
await self._subscribe_node(node_id)
except ChipStackError as err:
node_logger.warning(
"Unable to subscribe to Node: %s",
str(err) or err.__class__.__name__,
# log full stack trace if verbose logging is enabled
exc_info=err
if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL)
else None,
)
# NOTE: the node will be picked up by mdns discovery automatically
# when it becomes available again.
return
# setup subscriptions for the node
try:
await self._subscribe_node(node_id)
except ChipStackError as err:
node_logger.warning(
"Unable to subscribe to Node: %s",
str(err) or err.__class__.__name__,
# log full stack trace if verbose logging is enabled
exc_info=err if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else None,
)
# NOTE: the node will be picked up by mdns discovery automatically
# when it becomes available again.
return

# check if this node has any custom clusters that need to be polled
if polled_attributes := check_polled_attributes(node_data):
self._polled_attributes[node_id] = polled_attributes
self._schedule_custom_attributes_poller()
# check if this node has any custom clusters that need to be polled
if polled_attributes := check_polled_attributes(node_data):
self._polled_attributes[node_id] = polled_attributes
self._schedule_custom_attributes_poller()

finally:
log_timers[node_id].cancel()
self._nodes_in_setup.discard(node_id)
finally:
log_timers[node_id].cancel()
self._nodes_in_setup.discard(node_id)
if is_thread_node:
self._thread_node_setup_throttle.release()

def _handle_endpoints_removed(self, node_id: int, endpoints: Iterable[int]) -> None:
"""Handle callback for when bridge endpoint(s) get deleted."""
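
One detail worth calling out from the hunks above: log_node_long_setup now releases the Thread slot when a setup has been stuck for a long time, so a handful of locked-up nodes cannot starve every other Thread node waiting on the semaphore. Below is a hypothetical sketch of that watchdog idea; the timer interval, helper names and the early-release bookkeeping are illustrative and simplified, not lifted from the diff.

import asyncio


async def guarded_setup(
    throttle: asyncio.Semaphore, node_id: int, work_seconds: float
) -> None:
    """Sketch of the 'free a slot if setup hangs' watchdog idea."""
    await throttle.acquire()
    released_early = False

    def on_long_setup() -> None:
        nonlocal released_early
        # Setup is taking too long: give the slot back so other nodes are
        # not blocked behind this one (mirrors the release added to
        # log_node_long_setup in this commit).
        throttle.release()
        released_early = True

    # The real code re-arms a 15-minute timer; 1 second keeps the demo short.
    watchdog = asyncio.get_running_loop().call_later(1.0, on_long_setup)
    try:
        await asyncio.sleep(work_seconds)  # stand-in for the actual setup work
    finally:
        watchdog.cancel()
        if not released_early:
            throttle.release()


async def main() -> None:
    throttle = asyncio.Semaphore(2)
    # One "stuck" node (3s) plus several quick ones; the quick ones still
    # make progress because the watchdog frees the stuck node's slot.
    await asyncio.gather(
        guarded_setup(throttle, 1, 3.0),
        *(guarded_setup(throttle, i, 0.2) for i in range(2, 6)),
    )


if __name__ == "__main__":
    asyncio.run(main())
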
