Skip to content

Commit

Permalink
Fxing cov. There is still something wrong when cancelling it....it fr…
Browse files Browse the repository at this point in the history
…eezes. Devices must be awaited now. New BAC0.start() fucntion to replace lite, connect, etc... Fix arg for iam.
  • Loading branch information
ChristianTremblay committed Aug 21, 2024
1 parent 73cae11 commit 4855ab8
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 93 deletions.
7 changes: 2 additions & 5 deletions BAC0/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from . import core, tasks
from .core.devices.Device import DeviceLoad as load
from .core.devices.Device import device as device
from .core.devices.Device import device_async as aDevice
from .core.devices.Trends import TrendLog as TrendLog
from .core.utils.notes import update_log_level as log_level
from .infos import __version__ as version
Expand All @@ -36,10 +35,8 @@
from .tasks.Poll import SimplePoll as poll

from .scripts.Lite import Lite as lite # to maintain compatibility with old code

# from .scripts.Lite import Lite as app
# Import proprietary classes
# from .core.proprietary_objects.legacy import jci
from .scripts.Lite import Lite as connect # to maintain compatibility with old code
from .scripts.Lite import Lite as start # this would be the new preferred way to start a BAC0 app

except ImportError as error:
print("=" * 80)
Expand Down
18 changes: 9 additions & 9 deletions BAC0/core/devices/Device.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,15 +438,15 @@ def __repr__(self) -> str:
return f"{self.properties.name} / Undefined"


def device(*args: Any, **kwargs: Any) -> Device:
dev = Device(*args, **kwargs)
t = asyncio.create_task(dev.new_state(DeviceDisconnected))
dev.creation_task = t
while not t.done:
pass
return dev

async def device_async(*args: Any, **kwargs: Any) -> Device:
#def device(*args: Any, **kwargs: Any) -> Device:
# dev = Device(*args, **kwargs)
# t = asyncio.create_task(dev.new_state(DeviceDisconnected))
# dev.creation_task = t
# while not t.done:
# pass
# return dev

async def device(*args: Any, **kwargs: Any) -> Device:
dev = Device(*args, **kwargs)
await dev.new_state(DeviceDisconnected)
return dev
Expand Down
155 changes: 87 additions & 68 deletions BAC0/core/devices/Points.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ class Point:
"""

_cache_delta = timedelta(seconds=5)
_cov_identifier = 0
_last_cov_identifier = 0
_running_cov_tasks = {}

def __init__(
self,
Expand Down Expand Up @@ -636,9 +637,7 @@ def __len__(self):
"""
return len(self.history)

def subscribe_cov(
self, confirmed: bool = True, lifetime: int = None, callback=None
):
def subscribe_cov(self, confirmed: bool = False, lifetime: int = 900):
"""
Subscribes to the Change of Value (COV) service for this point.
Expand All @@ -659,73 +658,22 @@ def subscribe_cov(
Returns:
None
"""
address = Address(self.properties.device.properties.address)
obj_identifier = ObjectIdentifier(
(self.properties.type, int(self.properties.address))
)
_app = self.properties.device.properties.network.this_application.app
process_identifier = Point._cov_identifier + 1
Point._cov_identifier = process_identifier
self.cov_registered = True

async def cov_ctxmgr(
address: Address = None,
obj_identifier: ObjectIdentifier = None,
confirmed: bool = False,
lifetime: int = None,
identifier: int = None,
):
self.log(f"Subscribing to COV for {self.properties.name}", level="info")
try:
async with _app.change_of_value(
address,
obj_identifier,
identifier,
confirmed,
lifetime,
) as scm:
while self.cov_registered is True:
incoming = asyncio.ensure_future(scm.get_value())
done, pending = await asyncio.wait(
[incoming],
return_when=asyncio.FIRST_COMPLETED,
)
for task in pending:
self._log.info(
f"Canceling COV subscription for {self.properties.name}"
)
task.cancel()
if incoming in done:
property_identifier, property_value = incoming.result()
self.log(
f"COV notification received for {self.properties.name} | {property_identifier} -> {type(property_identifier)} with value {property_value} | {property_value} -> {type(property_value)}",
level="debug",
)
if property_identifier == PropertyIdentifier.presentValue:
val = extract_value_from_primitive_data(property_value)
self._trend(val)
elif property_identifier == PropertyIdentifier.statusFlags:
self.properties.status_flags = property_value
else:
self._log.warning(
f"Unsupported COV property identifier {property_identifier}"
)
except Exception as e:
self.log(f"Error in COV subscription : {e}", level="error")

asyncio.create_task(
cov_ctxmgr(
address=address,
obj_identifier=obj_identifier,
confirmed=confirmed,
lifetime=lifetime,
identifier=process_identifier,
)
self.cov_task = COVSubscription(
point=self, confirmed=confirmed, lifetime=lifetime
)
Point._running_cov_tasks[self.cov_task.process_identifier] = self.cov_task
self.cov_task.task = asyncio.create_task(self.cov_task.run())

def cancel_cov(self):
async def cancel_cov(self):
self.log(f"Canceling COV subscription for {self.properties.name}", level="info")
self.cov_registered = False
process_identifer = self.cov_task.process_identifier

if process_identifer not in Point._running_cov_tasks:
await self.response("COV subscription not found")
return
cov_subscription = Point._running_cov_tasks.pop(process_identifer)
cov_subscription.stop()
await cov_subscription.task

def update_description(self, value):
asyncio.create_task(self._update_description(value=value))
Expand Down Expand Up @@ -1530,6 +1478,77 @@ def release(self, value, *, prop="presentValue", priority=""):
raise OfflineException("Must be online to write")


class COVSubscription:
def __init__(
self, point: Point = None, lifetime: int = 900, confirmed: bool = False
):
self.address = Address(point.properties.device.properties.address)
self.cov_fini = asyncio.Event()
self.task = None
self.obj_identifier = ObjectIdentifier(
(point.properties.type, int(point.properties.address))
)
self._app = point.properties.device.properties.network.this_application.app
self.process_identifier = Point._last_cov_identifier + 1
Point._last_cov_identifier = self.process_identifier


self.point = point
self.lifetime = lifetime
self.confirmed = confirmed

async def run(self):
self.point.cov_registered = True
self.point.log(
f"Subscribing to COV for {self.point.properties.name}", level="info"
)
try:
async with self._app.change_of_value(
self.address,
self.obj_identifier,
self.process_identifier,
self.confirmed,
self.lifetime,
) as scm:
while self.point.cov_registered:
incoming: asyncio.Future = asyncio.ensure_future(scm.get_value())
done, pending = await asyncio.wait(
[incoming],
return_when=asyncio.FIRST_COMPLETED,
)
for task in pending:
self.point._log.info(
f"Canceling COV subscription for {self.point.properties.name}"
)
task.cancel()
if incoming in done:
property_identifier, property_value = incoming.result()
self.point.log(
f"COV notification received for {self.point.properties.name} | {property_identifier} -> {type(property_identifier)} with value {property_value} | {property_value} -> {type(property_value)}",
level="info",
)
if property_identifier == PropertyIdentifier.presentValue:
val = extract_value_from_primitive_data(
property_value
)
self.point._trend(val)
elif property_identifier == PropertyIdentifier.statusFlags:
self.point.properties.status_flags = property_value
else:
self.point._log.warning(
f"Unsupported COV property identifier {property_identifier}"
)
except Exception as e:
self.point.log(f"Error in COV subscription : {e}", level="error")

def stop(self):
self.point.log(
f"Stopping COV subscription for {self.point.properties.name}", level="info"
)
#self.cov_fini.set()
self.point.cov_registered = False


class OfflineException(Exception):
pass

Expand Down
35 changes: 24 additions & 11 deletions BAC0/core/functions/Alias.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,22 @@ class Alias:

async def who_is(self, address=None, low_limit=0, high_limit=4194303, timeout=3):
"""
Build a WhoIs request. WhoIs are sent to discover devices on the network.
If an address is specified, the request is sent to that address. Otherwise,
Build a WhoIs request. WhoIs requests are sent to discover devices on the network.
If an address is specified, the request is sent to that address. Otherwise,
the request is broadcast to the local network.
:param address: (optional) the address to send the request to
:param destination: (optional) the destination address
:param address: (optional) The address to send the request to.
:param destination: (optional) The destination address.
:returns: list of IAm responses
:returns: List of IAm responses.
Example::
whois()
whois('
import BAC0
bacnet = BAC0.lite()
bacnet.whois()
bacnet.whois('2:5')
"""
_iams = await self.this_application.app.who_is(
address=Address(address),
Expand All @@ -38,14 +41,12 @@ async def who_is(self, address=None, low_limit=0, high_limit=4194303, timeout=3)
)
return _iams

def iam(self, destination=None):
def iam(self, address=None):
"""
Build an IAm response. IAm are sent in response to a WhoIs request that;
matches our device ID, whose device range includes us, or is a broadcast.
Content is defined by the script (deviceId, vendor, etc...)
:returns: bool
Example::
iam()
Expand All @@ -54,9 +55,21 @@ def iam(self, destination=None):
_app: Application = _this_application.app
self.log("do_iam", level="debug")

_app.i_am()
_app.i_am(address=address)

async def whois_router_to_network(self, network=None, *, destination=None, timeout=3):
"""
Send a Who-Is-Router-To-Network request. This request is used to discover routers
on the network that can route messages to a specific network.
The function sends a broadcast message to the local network to find routers that
can route messages to the specified network. The response will contain information
about the routers that can handle the routing.
Example::
whois_router_to_network()
"""
# build a request
_this_application: BAC0Application = self.this_application
_app: Application = _this_application.app
Expand Down

0 comments on commit 4855ab8

Please sign in to comment.