Skip to content

Commit

Permalink
Cleanup code
Browse files Browse the repository at this point in the history
  • Loading branch information
jotonedev committed Sep 17, 2024
1 parent 767ae16 commit 7323fc9
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 129 deletions.
35 changes: 0 additions & 35 deletions pyown/__main__.py
Original file line number Diff line number Diff line change
@@ -1,51 +1,16 @@
import asyncio
import argparse

from tqdm import tqdm

from .client import OWNClient
from .message import *
from .constants.who import *


def build_active_power_request(device: str) -> RawMessage:
"""Build the active power request message."""
return RawMessage(["#" + ENERGY_MANAGEMENT, device, "113"])


async def main(host: str, port: int, password: int) -> None:
"""Run the OpenWebNet client."""
client = OWNClient(host, port, password)
await client.connect()


print(f"Connected to {host}:{port}")

# Scan the OpenWebNet bus for energy management devices
while True:
await asyncio.sleep(0.5)
print()
try:
for i in range(1, 4):
who = f"5{i}"

message = build_active_power_request(who)
await client.send(message)
response = await client.recv()

if response == ACK:
continue

if await client.recv() != ACK:
continue

device = int(response.tags[-3])
power = int(response.tags[-1])

print(f"Device: {device}, Power: {power}W")
except KeyboardInterrupt:
break

await client.close()
print("Disconnected")

Expand Down
35 changes: 20 additions & 15 deletions pyown/auth/open.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
def ownCalcPass(password: str | int, nonce: str, test: bool = False) -> str:
start = True
def ownCalcPass(password: str | int, nonce: str) -> str:
"""
Encode the password using the OPEN algorithm.
Source: https://rosettacode.org/wiki/OpenWebNet_password#Python
Parameters:
password (str | int): The password to encode, must be composed of only digits.
nonce (str): The nonce received from the gateway.
Returns:
str: The encoded password.
"""
start = True
num1 = 0
num2 = 0

if isinstance(password, str):
password = int(password)

if test:
print("password: %08x" % (password))
for c in nonce :

for c in nonce:
if c != "0":
if start:
num2 = password
start = False
if test:
print("c: %s num1: %08x num2: %08x" % (c, num1, num2))
if c == '1':
num1 = (num2 & 0xFFFFFF80) >> 7
num2 = num2 << 25
Expand All @@ -34,22 +41,20 @@ def ownCalcPass(password: str | int, nonce: str, test: bool = False) -> str:
num1 = num2 << 12
num2 = num2 >> 20
elif c == '7':
num1 = num2 & 0x0000FF00 | (( num2 & 0x000000FF ) << 24 ) | (( num2 & 0x00FF0000 ) >> 16 )
num2 = ( num2 & 0xFF000000 ) >> 8
num1 = num2 & 0x0000FF00 | ((num2 & 0x000000FF) << 24) | ((num2 & 0x00FF0000) >> 16)
num2 = (num2 & 0xFF000000) >> 8
elif c == '8':
num1 = (num2 & 0x0000FFFF) << 16 | ( num2 >> 24 )
num1 = (num2 & 0x0000FFFF) << 16 | (num2 >> 24)
num2 = (num2 & 0x00FF0000) >> 8
elif c == '9':
num1 = ~num2
else :
else:
num1 = num2

num1 &= 0xFFFFFFFF
num2 &= 0xFFFFFFFF
if (c not in "09"):
if c not in "09":
num1 |= num2
if test:
print(" num1: %08x num2: %08x" % (num1, num2))
num2 = num1

return str(num1)
14 changes: 7 additions & 7 deletions pyown/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
from asyncio import StreamReader, StreamWriter

from typing import Final, Literal

Expand All @@ -17,16 +18,19 @@


class OWNClient:
def __init__(self, host: str, port: str, password: str):
def __init__(self, host: str, port: str | int, password: str | int):
"""
Initialize the OpenWebNet client.
:param host: the IP address of the openwebnet gateway.
:param port: the port of the openwebnet gateway.
:param password: the password used to connect to the openwebnet gateway.
"""
self.host = host
self.port = port
self.password = password
self.port = port if isinstance(port, int) else int(port)
self.password = password if isinstance(password, int) else int(password)

self.reader: None | StreamReader = None
self.writer: None | StreamWriter = None

async def connect(self, session_type: Literal["9", "1"] = COMMAND_SESSION):
self.reader, self.writer = await asyncio.open_connection(
Expand All @@ -40,10 +44,6 @@ async def connect(self, session_type: Literal["9", "1"] = COMMAND_SESSION):
raise ConnectionError(f"Unexpected response: {msg}")
# Send the session type
await self.send(RawMessage(tags=["99", session_type]))

# Authenticate



async def _open_auth(self, msg: OWNMessage):
"""
Expand Down
1 change: 1 addition & 0 deletions pyown/constants/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .who import WHO
118 changes: 46 additions & 72 deletions pyown/constants/who.py
Original file line number Diff line number Diff line change
@@ -1,83 +1,57 @@
from typing import Final, Union
from enum import StrEnum

__all__ = [
"SCENE",
"LIGHTING",
"AUTOMATION",
"LOAD_CONTROL",
"THERMOREGULATION",
"BURGLAR_ALARM",
"VIDEO_DOOR_ENTRY",
"GATEWAY",
"CEN_1",
"SOUND_DIFFUSION_1",
"MH200N_SCENE",
"ENERGY_MANAGEMENT",
"SOUND_DIFFUSION_2",
"CEN_2",
"AUTOMATION_DIAGNOSTICS",
"THERMOREGULATION_DIAGNOSTICS",
"DEVICE_DIAGNOSTICS",
"WHO",
"WHO_MAP",
]

SCENE: Final = "0"
LIGHTING: Final = "1"
AUTOMATION: Final = "2"
LOAD_CONTROL: Final = "3"
THERMOREGULATION: Final = "4"
BURGLAR_ALARM: Final = "5"
VIDEO_DOOR_ENTRY: Final = "6"
GATEWAY: Final = "13"
CEN_1: Final = "15"
SOUND_DIFFUSION_1: Final = "16"
MH200N_SCENE: Final = "17"
ENERGY_MANAGEMENT: Final = "18"
SOUND_DIFFUSION_2: Final = "22"
CEN_2: Final = "25"
AUTOMATION_DIAGNOSTICS: Final = "1001"
THERMOREGULATION_DIAGNOSTICS: Final = "1004"
DEVICE_DIAGNOSTICS: Final = "1013"
ENERGY_DIAGNOSTICS: Final = "1018"

WHO = Union[
SCENE,
LIGHTING,
AUTOMATION,
LOAD_CONTROL,
THERMOREGULATION,
BURGLAR_ALARM,
VIDEO_DOOR_ENTRY,
GATEWAY,
CEN_1,
SOUND_DIFFUSION_1,
MH200N_SCENE,
ENERGY_MANAGEMENT,
SOUND_DIFFUSION_2,
CEN_2,
AUTOMATION_DIAGNOSTICS,
THERMOREGULATION_DIAGNOSTICS,
DEVICE_DIAGNOSTICS,
]
class WHO(StrEnum):
SCENE: str = "0"
LIGHTING: str = "1"
AUTOMATION: str = "2"
LOAD_CONTROL: str = "3"
THERMOREGULATION: str = "4"
BURGLAR_ALARM: str = "5"
VIDEO_DOOR_ENTRY: str = "6"
GATEWAY: str = "13"
CEN_1: str = "15"
SOUND_DIFFUSION_1: str = "16"
MH200N_SCENE: str = "17"
ENERGY_MANAGEMENT: str = "18"
SOUND_DIFFUSION_2: str = "22"
CEN_2: str = "25"
AUTOMATION_DIAGNOSTICS: str = "1001"
THERMOREGULATION_DIAGNOSTICS: str = "1004"
DEVICE_DIAGNOSTICS: str = "1013"
ENERGY_DIAGNOSTICS: str = "1018"

@property
def name(self) -> str:
return WHO_MAP[self]

@property
def number(self) -> int:
return int(self)


WHO_MAP = {
SCENE: "Scene",
LIGHTING: "Lighting",
AUTOMATION: "Automation",
LOAD_CONTROL: "Load control",
THERMOREGULATION: "Thermoregulation",
BURGLAR_ALARM: "Burglar alarm",
VIDEO_DOOR_ENTRY: "Video door entry",
GATEWAY: "Gateway management",
CEN_1: "CEN",
SOUND_DIFFUSION_1: "Sound diffusion 1",
MH200N_SCENE: "MH200N Scene",
ENERGY_MANAGEMENT: "Energy management",
SOUND_DIFFUSION_2: "Sound diffusion 2",
CEN_2: "CEN plus / scenarios plus / dry contacts",
AUTOMATION_DIAGNOSTICS: "Automation diagnostics",
THERMOREGULATION_DIAGNOSTICS: "Thermoregulation diagnostics",
DEVICE_DIAGNOSTICS: "Device diagnostics",
WHO.SCENE: "Scene",
WHO.LIGHTING: "Lighting",
WHO.AUTOMATION: "Automation",
WHO.LOAD_CONTROL: "Load control",
WHO.THERMOREGULATION: "Thermoregulation",
WHO.BURGLAR_ALARM: "Burglar alarm",
WHO.VIDEO_DOOR_ENTRY: "Video door entry",
WHO.GATEWAY: "Gateway management",
WHO.CEN_1: "CEN",
WHO.SOUND_DIFFUSION_1: "Sound diffusion 1",
WHO.MH200N_SCENE: "MH200N Scene",
WHO.ENERGY_MANAGEMENT: "Energy management",
WHO.SOUND_DIFFUSION_2: "Sound diffusion 2",
WHO.CEN_2: "CEN plus / scenarios plus / dry contacts",
WHO.AUTOMATION_DIAGNOSTICS: "Automation diagnostics",
WHO.THERMOREGULATION_DIAGNOSTICS: "Thermoregulation diagnostics",
WHO.DEVICE_DIAGNOSTICS: "Device diagnostics",
}

0 comments on commit 7323fc9

Please sign in to comment.