Skip to content

Commit

Permalink
migrating and improving manual_test_create_device
Browse files Browse the repository at this point in the history
leads to improving write REGEX
  • Loading branch information
ChristianTremblay committed Sep 16, 2024
1 parent 642a41f commit bf1cbde
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 38 deletions.
2 changes: 1 addition & 1 deletion BAC0/core/io/Write.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def write()
# some debugging
_debug = 0
_LOG = ModuleLogger(globals())
WRITE_REGEX = r"(?P<address>\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b|(\b\d+:\d+\b)) (?P<objId>(@obj_)?[-\w:]*[: ]*\d*) (?P<propId>(@prop_)?\w*(-\w*)?)[ ]?(?P<value>-*\w*)?[ ]?(?P<indx>-|\d*)?[ ]?(?P<priority>(1[0-6]|[0-9]))?"
WRITE_REGEX = r"(?P<address>\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(?::\d+)?\b|(\b\d+:\d+\b)) (?P<objId>(@obj_)?[-\w:]*[: ]*\d*) (?P<propId>(@prop_)?\w*(-\w*)?)[ ]?(?P<value>-*\w*)?[ ]?(?P<indx>-|\d*)?[ ]?(?P<priority>(1[0-6]|[0-9]))?"
write_pattern = re.compile(WRITE_REGEX)


Expand Down
98 changes: 61 additions & 37 deletions tests/manual_test_create_device.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import random

import BAC0
from BAC0.core.devices.local.factory import (
Expand All @@ -14,7 +15,11 @@
make_state_text,
multistate_input,
multistate_output,
multistate_value,
)
from BAC0.scripts.script_runner import run

bacnet = None


def add_points(qty_per_type, device):
Expand All @@ -36,17 +41,17 @@ def add_points(qty_per_type, device):
)

states = make_state_text(["Normal", "Alarm", "Super Emergency"])
# _new_objects = multistate_value(
# description="An Alarm Value",
# properties={"stateText": states},
# name="BIG-ALARM",
# is_commandable=False,
# )
_new_objects = multistate_value(
description="An Alarm Value",
properties={"stateText": states},
name="BIG-ALARM",
is_commandable=False,
)

# All others using default implementation
for _ in range(qty_per_type):
# _new_objects = analog_output(presentValue=89.9)
_new_objects = analog_value(presentValue=79.9)
_new_objects = analog_value(presentValue=79.9, is_commandable=True)
_new_objects = binary_input()
_new_objects = binary_output()
_new_objects = binary_value()
Expand All @@ -60,36 +65,55 @@ def add_points(qty_per_type, device):


async def main():
bacnet = BAC0.lite()

# We'll use 3 devices with our first instance
device_app = BAC0.lite(port=47809, deviceId=101, localObjName="App1")
device30_app = BAC0.lite(port=47810, deviceId=102, localObjName="App2")
device300_app = BAC0.lite(port=47811, deviceId=103, localObjName="App3")

add_points(2, device_app)
add_points(3, device30_app)
add_points(4, device300_app)

ip = device_app.localIPAddr.addrTuple[0]
boid = device_app.Boid

ip_30 = device30_app.localIPAddr.addrTuple[0]
boid_30 = device30_app.Boid

ip_300 = device300_app.localIPAddr.addrTuple[0]
boid_300 = device300_app.Boid

# Connect to test device using main network
test_device = BAC0.device("{}:47809".format(ip), boid, bacnet, poll=10)
test_device_30 = BAC0.device("{}:47810".format(ip_30), boid_30, bacnet, poll=0)
test_device_300 = BAC0.device("{}:47811".format(ip_300), boid_300, bacnet, poll=0)
while True:
await asyncio.sleep(0.01)
# print(test_device.points)
# (test_device_30.points)
# (test_device_300.points)
# We'll use 3 devices plus our main instance
async with BAC0.start(localObjName="bacnet") as bacnet:
async with BAC0.start(port=47809, localObjName="App1") as device_app:
async with BAC0.start(port=47810, localObjName="App2") as device30_app:
async with BAC0.start(port=47811, localObjName="App3") as device300_app:
add_points(2, device_app)
add_points(3, device30_app)
add_points(4, device300_app)

ip = device_app.localIPAddr.addrTuple[0]
boid = device_app.Boid

ip_30 = device30_app.localIPAddr.addrTuple[0]
boid_30 = device30_app.Boid

ip_300 = device300_app.localIPAddr.addrTuple[0]
boid_300 = device300_app.Boid

# Connect to test device using main network
test_device = await BAC0.device(
f"{ip}:47809", boid, bacnet, poll=10
)
test_device_30 = await BAC0.device(
f"{ip_30}:47810", boid_30, bacnet, poll=0
)
test_device_300 = await BAC0.device(
f"{ip_300}:47811", boid_300, bacnet, poll=0
)
bacnet._log.info("CTRL-C to exit")

while True:
await asyncio.sleep(2)
bacnet._log.info(f"{test_device['BIG-ALARM']}")
new_val_for_av1 = random.randint(1, 100)
bacnet._log.info(f"Setting AV-1 to {new_val_for_av1}")
# test_device_30['AV-1'] = new_val_for_av1
await test_device_30["AV-1"].write(new_val_for_av1, priority=16)
await asyncio.sleep(2)
bacnet._log.info(
f"Forcing a read on test_device_30/AV-1 : {await test_device_30['AV-1'].value}"
)
test_device_300["AV-1"] = new_val_for_av1
await asyncio.sleep(2)
bacnet._log.info(
f"Forcing a read on test_device_300/AV-1 : {await test_device_300['AV-1'].value}"
)
# (test_device_300.points)


if __name__ == "__main__":
asyncio.run(main())
run(main, bacnet)
# asyncio.run(main())

0 comments on commit bf1cbde

Please sign in to comment.