From 69bf248ace63e1be081c06f72671d01d27289007 Mon Sep 17 00:00:00 2001 From: John Toniutti Date: Fri, 27 Sep 2024 21:03:39 +0200 Subject: [PATCH] Implement My Open Web Net Introduction document - Add support for the various types of messages - Various improvements --- README.md | 2 +- pyown/messages/__init__.py | 3 + pyown/messages/ack.py | 8 +- pyown/messages/base.py | 2 +- pyown/messages/dimension.py | 125 +++++++++++++++++++++++++++++++ pyown/messages/nack.py | 8 +- pyown/messages/normal.py | 13 +++- pyown/messages/status.py | 48 ++++++++++++ pyown/tags/base.py | 9 +-- pyown/tags/who.py | 3 + tests/messages/test_ack.py | 1 + tests/messages/test_dimension.py | 91 ++++++++++++++++++++++ tests/messages/test_nack.py | 1 + tests/messages/test_normal.py | 31 ++++++++ tests/messages/test_status.py | 27 +++++++ tests/tags/test_tag.py | 26 +++---- 16 files changed, 368 insertions(+), 30 deletions(-) create mode 100644 pyown/messages/dimension.py create mode 100644 pyown/messages/status.py create mode 100644 tests/messages/test_dimension.py create mode 100644 tests/messages/test_normal.py create mode 100644 tests/messages/test_status.py diff --git a/README.md b/README.md index 5215f51..83adcdb 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ OpenWebNet is a home automation protocol developed by Bticino (now part of Legra lights, shutters, heating, etc. It was developed around 2000, and it's still used today in many installations. It does not implement any encryption, so it's not secure to use it over the internet. -Also many devices implement only the old password algorithm, which is easily bruteforceable. +Also, many devices implement only the old password algorithm, which is easily bruteforceable. So, when using OpenWebNet, be sure to use it only in a trusted network and taking security measures, like vlan separation. diff --git a/pyown/messages/__init__.py b/pyown/messages/__init__.py index 9883e4b..ede06c7 100644 --- a/pyown/messages/__init__.py +++ b/pyown/messages/__init__.py @@ -1,3 +1,6 @@ from .ack import ACK from .base import MessageType, BaseMessage, parse_message from .nack import NACK +from .status import * +from .dimension import * +from .normal import * diff --git a/pyown/messages/ack.py b/pyown/messages/ack.py index c76ce4c..b1107e5 100644 --- a/pyown/messages/ack.py +++ b/pyown/messages/ack.py @@ -10,8 +10,12 @@ class ACK(BaseMessage): - """Represent an ACK message""" - _type = MessageType.ACK + """ + Represent an ACK message + + Syntax: *#*1## + """ + _type: Final[MessageType] = MessageType.ACK _tags: Final[tuple[str]] = ("#", "1") _regex: Pattern[AnyStr] = re.compile(r"^\*#\*1##$") diff --git a/pyown/messages/base.py b/pyown/messages/base.py index fea6780..b8c0c35 100644 --- a/pyown/messages/base.py +++ b/pyown/messages/base.py @@ -20,8 +20,8 @@ class MessageType(StrEnum): NACK = "NACK" NORMAL = "NORMAL" STATUS_REQUEST = "STATUS REQUEST" - STATUS_RESPONSE = "STATUS RESPONSE" DIMENSION_REQUEST = "DIMENSION REQUEST" + DIMENSION_WRITING = "DIMENSION WRITING" DIMENSION_RESPONSE = "DIMENSION RESPONSE" GENERIC = "GENERIC" diff --git a/pyown/messages/dimension.py b/pyown/messages/dimension.py new file mode 100644 index 0000000..073f1e9 --- /dev/null +++ b/pyown/messages/dimension.py @@ -0,0 +1,125 @@ +import re +from typing import Self, Pattern, AnyStr, Final + +from .base import BaseMessage, MessageType +from ..tags import Who, What, Where, Dimension, Value + +__all__ = [ + "DimensionRequest", + "DimensionWriting", + "DimensionResponse", +] + + +class DimensionRequest(BaseMessage): + """ + Represent a dimension request message + + Syntax: *#who*where*dimension## + + This is sent by the client to the server + """ + _type: Final[MessageType] = MessageType.DIMENSION_REQUEST + _tags: Final[tuple[Who, Where, Dimension]] + + _regex: Pattern[AnyStr] = re.compile(r"^\*#[0-9#]+\*[0-9#]*\*[0-9]*##$") + + def __init__(self, tags: tuple[Who, Where, Dimension]): + self._tags = tags + + @property + def who(self) -> Who: + return self._tags[0] + + @property + def where(self) -> Where: + return self._tags[1] + + @property + def dimension(self) -> Dimension: + return self._tags[2] + + @property + def message(self) -> str: + return f"*#{self.who}*{self.where}*{self.dimension}##" + + @classmethod + def parse(cls, tags: list[str]) -> Self: + """Parse the tags of a message from the OpenWebNet bus.""" + + return cls( + tags=( + Who(tags[0].removeprefix("#")), + Where(tags[1]), + Dimension(tags[2]) + ) + ) + + +class DimensionWriting(BaseMessage): + """ + Represent a dimension writing message + + Syntax: *#who*where*#dimension*value1*value2*...*valueN## + + This is sent by the client to the server + """ + _type: MessageType = MessageType.DIMENSION_WRITING + _tags: Final[tuple[Who, Where, Dimension, Value, ...]] + + _regex: Pattern[AnyStr] = re.compile(r"^\*#[0-9#]+\*[0-9#]*\*#[0-9]*(?:\*[0-9#]*)*##$") + + def __init__(self, tags: tuple[Who, Where, Dimension, Value, ...]): + self._tags = tags + + @property + def who(self) -> Who: + return self._tags[0] + + @property + def where(self) -> Where: + return self._tags[1] + + @property + def dimension(self) -> Dimension: + return self._tags[2] + + @property + def values(self) -> tuple[Value]: + return self._tags[3:] + + @property + def message(self) -> str: + return f"*#{self.who}*{self.where}*#{self.dimension}*{'*'.join(self.values)}##" + + @classmethod + def parse(cls, tags: list[str]) -> Self: + """Parse the tags of a message from the OpenWebNet bus.""" + + values: list[Value] = [Value(t) for t in tags[3:]] + + # noinspection PyTypeChecker + return cls( + tags=( + Who(tags[0].removeprefix("#")), + Where(tags[1]), + Dimension(tags[2]).removeprefix("#"), + *values + ) + ) + + +class DimensionResponse(DimensionWriting, BaseMessage): + """ + Represent a dimension writing message + + Syntax: *#who*where*dimension*value1*value2*...*valueN## + + This is sent by the server to the client + """ + _type: Final[MessageType] = MessageType.DIMENSION_RESPONSE + _regex: Pattern[AnyStr] = re.compile(r"^\*#[0-9#]+\*[0-9#]*\*[0-9]*(?:\*[0-9#]*)*##$") + + @property + def message(self) -> str: + return f"*#{self.who}*{self.where}*{self.dimension}*{'*'.join(self.values)}##" diff --git a/pyown/messages/nack.py b/pyown/messages/nack.py index 58830d8..b002fff 100644 --- a/pyown/messages/nack.py +++ b/pyown/messages/nack.py @@ -10,8 +10,12 @@ class NACK(BaseMessage): - """Represent an NACK message""" - _type = MessageType.NACK + """ + Represent an NACK message + + Syntax: *#*0## + """ + _type: Final[MessageType] = MessageType.NACK _tags: Final[tuple[str]] = ("#", "0") _regex: Pattern[AnyStr] = re.compile(r"^\*#\*0##$") diff --git a/pyown/messages/normal.py b/pyown/messages/normal.py index c55de6e..abe8fd3 100644 --- a/pyown/messages/normal.py +++ b/pyown/messages/normal.py @@ -1,5 +1,5 @@ import re -from typing import Self, Pattern, AnyStr +from typing import Self, Pattern, AnyStr, Final from .base import BaseMessage, MessageType from ..tags import Who, What, Where @@ -10,9 +10,14 @@ class NormalMessage(BaseMessage): - """Represent an NACK message""" - _type = MessageType.NORMAL - _tags: tuple[Who, What, Where] + """ + Represent a Normal message + + Syntax: *who*what*where## + + """ + _type: Final[MessageType] = MessageType.NORMAL + _tags: Final[tuple[Who, What, Where]] _regex: Pattern[AnyStr] = re.compile(r"^\*[0-9#]+\*[0-9#]*\*[0-9#]*##$") diff --git a/pyown/messages/status.py b/pyown/messages/status.py new file mode 100644 index 0000000..8325d1c --- /dev/null +++ b/pyown/messages/status.py @@ -0,0 +1,48 @@ +import re +from typing import Self, Pattern, AnyStr, Final + +from .base import BaseMessage, MessageType +from ..tags import Who, What, Where + +__all__ = [ + "StatusRequest" +] + + +class StatusRequest(BaseMessage): + """ + Represent a status request message + + Syntax: *#who*where## + """ + _type: Final[MessageType] = MessageType.STATUS_REQUEST + _tags: Final[tuple[Who, Where]] + + _regex: Pattern[AnyStr] = re.compile(r"^\*#[0-9#]+\*[0-9#]*##$") + + def __init__(self, tags: tuple[Who, Where]): + self._tags = tags + + @property + def who(self) -> Who: + return self._tags[0] + + @property + def where(self) -> Where: + return self._tags[1] + + @property + def message(self) -> str: + return f"*#{self.who}*{self.where}##" + + @classmethod + def parse(cls, tags: list[str]) -> Self: + """Parse the tags of a message from the OpenWebNet bus.""" + + return cls( + tags=( + Who(tags[0].removeprefix("#")), + Where(tags[1]) + ) + ) + diff --git a/pyown/tags/base.py b/pyown/tags/base.py index 71294a9..7ad78f3 100644 --- a/pyown/tags/base.py +++ b/pyown/tags/base.py @@ -22,7 +22,7 @@ def __init__(self, string: str): super().__init__() @property - def value(self) -> int | None: + def tag(self) -> int | None: """Return the value of the tag without its parameters or prefix""" val = self.removeprefix("#") if len(val) > 0: @@ -35,15 +35,10 @@ def parameters(self) -> list[str] | None: """Return the parameters of the tag""" return None - @property - def tag(self) -> str: - """Return the tag""" - return self - class TagWithParameters(Tag): @property - def value(self) -> int | None: + def tag(self) -> int | None: """Return the value of the tag without its parameters or prefix""" val = self.split("#")[0] if len(val) > 0: diff --git a/pyown/tags/who.py b/pyown/tags/who.py index 43a2139..5c54255 100644 --- a/pyown/tags/who.py +++ b/pyown/tags/who.py @@ -28,6 +28,9 @@ class Who(Tag, enum.Enum): DEVICE_DIAGNOSTICS: str = "1013" ENERGY_DIAGNOSTICS: str = "1018" + def __str__(self) -> str: + return self.value + @property def name(self) -> str: return who_map[self] diff --git a/tests/messages/test_ack.py b/tests/messages/test_ack.py index be3178b..cff10b4 100644 --- a/tests/messages/test_ack.py +++ b/tests/messages/test_ack.py @@ -6,6 +6,7 @@ def test_nack(): ack = parse_message(message) + assert isinstance(ack, ACK) assert str(ack) == message assert ack.tags == ("#", "1") assert ack.type == MessageType.ACK diff --git a/tests/messages/test_dimension.py b/tests/messages/test_dimension.py new file mode 100644 index 0000000..3d6a4dc --- /dev/null +++ b/tests/messages/test_dimension.py @@ -0,0 +1,91 @@ +from pyown.messages import DimensionRequest, DimensionWriting, DimensionResponse, MessageType, parse_message + + +def test_dimension_request(): + msg = "*#13**1##" + + dimension_request = parse_message(msg) + + assert isinstance(dimension_request, DimensionRequest) + assert str(dimension_request) == msg + assert dimension_request.who == "13" + assert dimension_request.where == "" + assert dimension_request.dimension == "1" + assert dimension_request.type == MessageType.DIMENSION_REQUEST + + +def test_dimension_request_with_params(): + msg = "*#13*7#3*1##" # not a real message + + dimension_request = parse_message(msg) + + assert isinstance(dimension_request, DimensionRequest) + assert str(dimension_request) == msg + assert dimension_request.who == "13" + assert dimension_request.where == "7#3" + assert dimension_request.where.tag == 7 + assert dimension_request.where.parameters == ["3"] + assert dimension_request.dimension == "1" + assert dimension_request.type == MessageType.DIMENSION_REQUEST + + +def test_dimension_writing(): + msg = "*#13**#0*21*10*00*01##" + + dimension_writing = parse_message(msg) + + assert isinstance(dimension_writing, DimensionWriting) + assert str(dimension_writing) == msg + assert dimension_writing.who == "13" + assert dimension_writing.where == "" + assert dimension_writing.dimension == "0" + assert dimension_writing.values == ("21", "10", "00", "01") + assert dimension_writing.type == MessageType.DIMENSION_WRITING + + +# noinspection DuplicatedCode +def test_dimension_writing_with_params(): + msg = "*#13*7#3*#0*21*10*00*01##" + + dimension_writing = parse_message(msg) + + assert isinstance(dimension_writing, DimensionWriting) + assert str(dimension_writing) == msg + assert dimension_writing.who == "13" + assert dimension_writing.where == "7#3" + assert dimension_writing.where.tag == 7 + assert dimension_writing.where.parameters == ["3"] + assert dimension_writing.dimension == "0" + assert dimension_writing.values == ("21", "10", "00", "01") + assert dimension_writing.type == MessageType.DIMENSION_WRITING + + +def test_dimension_response(): + msg = "*#13**1*1*1*1*2012##" + + dimension_response = parse_message(msg) + + assert isinstance(dimension_response, DimensionResponse) + assert str(dimension_response) == msg + assert dimension_response.who == "13" + assert dimension_response.where == "" + assert dimension_response.dimension == "1" + assert dimension_response.values == ("1", "1", "1", "2012") + assert dimension_response.type == MessageType.DIMENSION_RESPONSE + + +# noinspection DuplicatedCode +def test_dimension_response_with_params(): + msg = "*#13*7#3*1*1*1*1*2012##" + + dimension_response = parse_message(msg) + + assert isinstance(dimension_response, DimensionResponse) + assert str(dimension_response) == msg + assert dimension_response.who == "13" + assert dimension_response.where == "7#3" + assert dimension_response.where.tag == 7 + assert dimension_response.where.parameters == ["3"] + assert dimension_response.dimension == "1" + assert dimension_response.values == ("1", "1", "1", "2012") + assert dimension_response.type == MessageType.DIMENSION_RESPONSE diff --git a/tests/messages/test_nack.py b/tests/messages/test_nack.py index f33db4c..1646883 100644 --- a/tests/messages/test_nack.py +++ b/tests/messages/test_nack.py @@ -6,6 +6,7 @@ def test_nack(): nack = parse_message(message) + assert isinstance(nack, NACK) assert str(nack) == message assert nack.tags == ("#", "0") assert nack.type == MessageType.NACK diff --git a/tests/messages/test_normal.py b/tests/messages/test_normal.py new file mode 100644 index 0000000..6eb8b7f --- /dev/null +++ b/tests/messages/test_normal.py @@ -0,0 +1,31 @@ +from pyown.messages import NormalMessage, MessageType, parse_message + + +def test_normal(): + msg = "*1*1*12##" + + normal = parse_message(msg) + + assert isinstance(normal, NormalMessage) + assert str(normal) == msg + assert normal.who == "1" + assert normal.what == "1" + assert normal.where == "12" + assert normal.type == MessageType.NORMAL + + +def test_normal_with_params(): + msg = "*2*1*41#4#2##" + + normal = parse_message(msg) + + assert isinstance(normal, NormalMessage) + assert str(normal) == msg + assert normal.who == "2" + assert normal.what == "1" + assert normal.where == "41#4#2" + assert normal.where.tag == 41 + assert normal.where.parameters == ["4", "2"] + assert normal.type == MessageType.NORMAL + + diff --git a/tests/messages/test_status.py b/tests/messages/test_status.py new file mode 100644 index 0000000..ed20c80 --- /dev/null +++ b/tests/messages/test_status.py @@ -0,0 +1,27 @@ +from pyown.messages import StatusRequest, MessageType, parse_message + + +def test_status_request(): + message = "*#1*12##" + + status_request = parse_message(message) + + assert isinstance(status_request, StatusRequest) + assert str(status_request) == message + assert status_request.who == "1" + assert status_request.where == "12" + assert status_request.type == MessageType.STATUS_REQUEST + + +def test_status_request_with_params(): + message = "*#1*41#4#2##" + + status_request = parse_message(message) + + assert isinstance(status_request, StatusRequest) + assert str(status_request) == message + assert status_request.who == "1" + assert status_request.where == "41#4#2" + assert status_request.where.tag == 41 + assert status_request.where.parameters == ["4", "2"] + assert status_request.type == MessageType.STATUS_REQUEST diff --git a/tests/tags/test_tag.py b/tests/tags/test_tag.py index 5109c8e..15d6bc8 100644 --- a/tests/tags/test_tag.py +++ b/tests/tags/test_tag.py @@ -7,25 +7,25 @@ def test_tag_with_params() -> None: # Check if value is correctly parsed tag = TagWithParameters("123") - assert tag.value == 123 + assert tag.tag == 123 assert tag.parameters == [] - assert tag.tag == "123" + assert tag == "123" # Check if parameters are correctly parsed tag = TagWithParameters("123#512#123#1#22#1213") - assert tag.value == 123 + assert tag.tag == 123 assert tag.parameters == ["512", "123", "1", "22", "1213"] - assert tag.tag == "123#512#123#1#22#1213" + assert tag == "123#512#123#1#22#1213" # Check if tag is parsed when value is missing tag = TagWithParameters("#512#123#1#22#1213") - assert tag.value is None + assert tag.tag is None # Check the empty tag tag = TagWithParameters("") - assert tag.value is None + assert tag.tag is None assert tag.parameters == [] - assert tag.tag == "" + assert tag == "" # Check if invalid characters raise an exception with pytest.raises(InvalidTag): @@ -40,21 +40,21 @@ def test_tag_with_params() -> None: def test_tag() -> None: tag = Tag("123") - assert tag.value == 123 + assert tag.tag == 123 assert tag.parameters is None - assert tag.tag == "123" + assert tag == "123" # In some type of messages the tag is prefixed with a hash even if it doesn't allow for parameters tag = Tag("#123") - assert tag.value == 123 + assert tag.tag == 123 assert tag.parameters is None - assert tag.tag == "#123" + assert tag == "#123" # Check the empty tag tag = Tag("") - assert tag.value is None + assert tag.tag is None assert tag.parameters is None - assert tag.tag == "" + assert tag == "" # Check if invalid characters raise an exception with pytest.raises(InvalidTag):