diff --git a/pyown/__main__.py b/pyown/__main__.py index 59dd73a..f8de86f 100644 --- a/pyown/__main__.py +++ b/pyown/__main__.py @@ -1,16 +1,7 @@ import asyncio import argparse -from tqdm import tqdm - from .client import OWNClient -from .message import * -from .constants.who import * - - -def build_active_power_request(device: str) -> RawMessage: - """Build the active power request message.""" - return RawMessage(["#" + ENERGY_MANAGEMENT, device, "113"]) async def main(host: str, port: int, password: int) -> None: @@ -18,34 +9,8 @@ async def main(host: str, port: int, password: int) -> None: client = OWNClient(host, port, password) await client.connect() - print(f"Connected to {host}:{port}") - # Scan the OpenWebNet bus for energy management devices - while True: - await asyncio.sleep(0.5) - print() - try: - for i in range(1, 4): - who = f"5{i}" - - message = build_active_power_request(who) - await client.send(message) - response = await client.recv() - - if response == ACK: - continue - - if await client.recv() != ACK: - continue - - device = int(response.tags[-3]) - power = int(response.tags[-1]) - - print(f"Device: {device}, Power: {power}W") - except KeyboardInterrupt: - break - await client.close() print("Disconnected") diff --git a/pyown/auth/open.py b/pyown/auth/open.py index b317aff..e6bb6c8 100644 --- a/pyown/auth/open.py +++ b/pyown/auth/open.py @@ -1,20 +1,27 @@ -def ownCalcPass(password: str | int, nonce: str, test: bool = False) -> str: - start = True +def ownCalcPass(password: str | int, nonce: str) -> str: + """ + Encode the password using the OPEN algorithm. + Source: https://rosettacode.org/wiki/OpenWebNet_password#Python + + Parameters: + password (str | int): The password to encode, must be composed of only digits. + nonce (str): The nonce received from the gateway. + + Returns: + str: The encoded password. + """ + start = True num1 = 0 num2 = 0 if isinstance(password, str): password = int(password) - - if test: - print("password: %08x" % (password)) - for c in nonce : + + for c in nonce: if c != "0": if start: num2 = password start = False - if test: - print("c: %s num1: %08x num2: %08x" % (c, num1, num2)) if c == '1': num1 = (num2 & 0xFFFFFF80) >> 7 num2 = num2 << 25 @@ -34,22 +41,20 @@ def ownCalcPass(password: str | int, nonce: str, test: bool = False) -> str: num1 = num2 << 12 num2 = num2 >> 20 elif c == '7': - num1 = num2 & 0x0000FF00 | (( num2 & 0x000000FF ) << 24 ) | (( num2 & 0x00FF0000 ) >> 16 ) - num2 = ( num2 & 0xFF000000 ) >> 8 + num1 = num2 & 0x0000FF00 | ((num2 & 0x000000FF) << 24) | ((num2 & 0x00FF0000) >> 16) + num2 = (num2 & 0xFF000000) >> 8 elif c == '8': - num1 = (num2 & 0x0000FFFF) << 16 | ( num2 >> 24 ) + num1 = (num2 & 0x0000FFFF) << 16 | (num2 >> 24) num2 = (num2 & 0x00FF0000) >> 8 elif c == '9': num1 = ~num2 - else : + else: num1 = num2 num1 &= 0xFFFFFFFF num2 &= 0xFFFFFFFF - if (c not in "09"): + if c not in "09": num1 |= num2 - if test: - print(" num1: %08x num2: %08x" % (num1, num2)) num2 = num1 return str(num1) diff --git a/pyown/client.py b/pyown/client.py index e0b62c5..aa4fd7c 100644 --- a/pyown/client.py +++ b/pyown/client.py @@ -1,4 +1,5 @@ import asyncio +from asyncio import StreamReader, StreamWriter from typing import Final, Literal @@ -17,7 +18,7 @@ class OWNClient: - def __init__(self, host: str, port: str, password: str): + def __init__(self, host: str, port: str | int, password: str | int): """ Initialize the OpenWebNet client. :param host: the IP address of the openwebnet gateway. @@ -25,8 +26,11 @@ def __init__(self, host: str, port: str, password: str): :param password: the password used to connect to the openwebnet gateway. """ self.host = host - self.port = port - self.password = password + self.port = port if isinstance(port, int) else int(port) + self.password = password if isinstance(password, int) else int(password) + + self.reader: None | StreamReader = None + self.writer: None | StreamWriter = None async def connect(self, session_type: Literal["9", "1"] = COMMAND_SESSION): self.reader, self.writer = await asyncio.open_connection( @@ -40,10 +44,6 @@ async def connect(self, session_type: Literal["9", "1"] = COMMAND_SESSION): raise ConnectionError(f"Unexpected response: {msg}") # Send the session type await self.send(RawMessage(tags=["99", session_type])) - - # Authenticate - - async def _open_auth(self, msg: OWNMessage): """ diff --git a/pyown/constants/__init__.py b/pyown/constants/__init__.py index e69de29..72320f2 100644 --- a/pyown/constants/__init__.py +++ b/pyown/constants/__init__.py @@ -0,0 +1 @@ +from .who import WHO diff --git a/pyown/constants/who.py b/pyown/constants/who.py index d6446c4..4d31c40 100644 --- a/pyown/constants/who.py +++ b/pyown/constants/who.py @@ -1,83 +1,57 @@ -from typing import Final, Union +from enum import StrEnum __all__ = [ - "SCENE", - "LIGHTING", - "AUTOMATION", - "LOAD_CONTROL", - "THERMOREGULATION", - "BURGLAR_ALARM", - "VIDEO_DOOR_ENTRY", - "GATEWAY", - "CEN_1", - "SOUND_DIFFUSION_1", - "MH200N_SCENE", - "ENERGY_MANAGEMENT", - "SOUND_DIFFUSION_2", - "CEN_2", - "AUTOMATION_DIAGNOSTICS", - "THERMOREGULATION_DIAGNOSTICS", - "DEVICE_DIAGNOSTICS", "WHO", "WHO_MAP", ] -SCENE: Final = "0" -LIGHTING: Final = "1" -AUTOMATION: Final = "2" -LOAD_CONTROL: Final = "3" -THERMOREGULATION: Final = "4" -BURGLAR_ALARM: Final = "5" -VIDEO_DOOR_ENTRY: Final = "6" -GATEWAY: Final = "13" -CEN_1: Final = "15" -SOUND_DIFFUSION_1: Final = "16" -MH200N_SCENE: Final = "17" -ENERGY_MANAGEMENT: Final = "18" -SOUND_DIFFUSION_2: Final = "22" -CEN_2: Final = "25" -AUTOMATION_DIAGNOSTICS: Final = "1001" -THERMOREGULATION_DIAGNOSTICS: Final = "1004" -DEVICE_DIAGNOSTICS: Final = "1013" -ENERGY_DIAGNOSTICS: Final = "1018" -WHO = Union[ - SCENE, - LIGHTING, - AUTOMATION, - LOAD_CONTROL, - THERMOREGULATION, - BURGLAR_ALARM, - VIDEO_DOOR_ENTRY, - GATEWAY, - CEN_1, - SOUND_DIFFUSION_1, - MH200N_SCENE, - ENERGY_MANAGEMENT, - SOUND_DIFFUSION_2, - CEN_2, - AUTOMATION_DIAGNOSTICS, - THERMOREGULATION_DIAGNOSTICS, - DEVICE_DIAGNOSTICS, -] +class WHO(StrEnum): + SCENE: str = "0" + LIGHTING: str = "1" + AUTOMATION: str = "2" + LOAD_CONTROL: str = "3" + THERMOREGULATION: str = "4" + BURGLAR_ALARM: str = "5" + VIDEO_DOOR_ENTRY: str = "6" + GATEWAY: str = "13" + CEN_1: str = "15" + SOUND_DIFFUSION_1: str = "16" + MH200N_SCENE: str = "17" + ENERGY_MANAGEMENT: str = "18" + SOUND_DIFFUSION_2: str = "22" + CEN_2: str = "25" + AUTOMATION_DIAGNOSTICS: str = "1001" + THERMOREGULATION_DIAGNOSTICS: str = "1004" + DEVICE_DIAGNOSTICS: str = "1013" + ENERGY_DIAGNOSTICS: str = "1018" + + @property + def name(self) -> str: + return WHO_MAP[self] + + @property + def number(self) -> int: + return int(self) + WHO_MAP = { - SCENE: "Scene", - LIGHTING: "Lighting", - AUTOMATION: "Automation", - LOAD_CONTROL: "Load control", - THERMOREGULATION: "Thermoregulation", - BURGLAR_ALARM: "Burglar alarm", - VIDEO_DOOR_ENTRY: "Video door entry", - GATEWAY: "Gateway management", - CEN_1: "CEN", - SOUND_DIFFUSION_1: "Sound diffusion 1", - MH200N_SCENE: "MH200N Scene", - ENERGY_MANAGEMENT: "Energy management", - SOUND_DIFFUSION_2: "Sound diffusion 2", - CEN_2: "CEN plus / scenarios plus / dry contacts", - AUTOMATION_DIAGNOSTICS: "Automation diagnostics", - THERMOREGULATION_DIAGNOSTICS: "Thermoregulation diagnostics", - DEVICE_DIAGNOSTICS: "Device diagnostics", + WHO.SCENE: "Scene", + WHO.LIGHTING: "Lighting", + WHO.AUTOMATION: "Automation", + WHO.LOAD_CONTROL: "Load control", + WHO.THERMOREGULATION: "Thermoregulation", + WHO.BURGLAR_ALARM: "Burglar alarm", + WHO.VIDEO_DOOR_ENTRY: "Video door entry", + WHO.GATEWAY: "Gateway management", + WHO.CEN_1: "CEN", + WHO.SOUND_DIFFUSION_1: "Sound diffusion 1", + WHO.MH200N_SCENE: "MH200N Scene", + WHO.ENERGY_MANAGEMENT: "Energy management", + WHO.SOUND_DIFFUSION_2: "Sound diffusion 2", + WHO.CEN_2: "CEN plus / scenarios plus / dry contacts", + WHO.AUTOMATION_DIAGNOSTICS: "Automation diagnostics", + WHO.THERMOREGULATION_DIAGNOSTICS: "Thermoregulation diagnostics", + WHO.DEVICE_DIAGNOSTICS: "Device diagnostics", }