Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ZHA support for Bosch Twinguard and siren install QR codes #107460

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions homeassistant/components/zha/core/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ def error(self, msg, *args, **kwargs):
return self.log(logging.ERROR, msg, *args, **kwargs)


def convert_install_code(value: str) -> bytes:
def convert_install_code(value: str) -> zigpy.types.KeyData:
"""Convert string to install code bytes and validate length."""

try:
Expand All @@ -329,10 +329,11 @@ def convert_install_code(value: str) -> bytes:
if len(code) != 18: # 16 byte code + 2 crc bytes
raise vol.Invalid("invalid length of the install code")

if zigpy.util.convert_install_code(code) is None:
link_key = zigpy.util.convert_install_code(code)
if link_key is None:
raise vol.Invalid("invalid install code")

return code
return link_key


QR_CODES = (
Expand Down Expand Up @@ -360,13 +361,13 @@ def convert_install_code(value: str) -> bytes:
[0-9a-fA-F]{34}
([0-9a-fA-F]{16}) # IEEE address
DLK
([0-9a-fA-F]{36}) # install code
([0-9a-fA-F]{36}|[0-9a-fA-F]{32}) # install code / link key
$
""",
)


def qr_to_install_code(qr_code: str) -> tuple[zigpy.types.EUI64, bytes]:
def qr_to_install_code(qr_code: str) -> tuple[zigpy.types.EUI64, zigpy.types.KeyData]:
"""Try to parse the QR code.

if successful, return a tuple of a EUI64 address and install code.
Expand All @@ -379,10 +380,16 @@ def qr_to_install_code(qr_code: str) -> tuple[zigpy.types.EUI64, bytes]:

ieee_hex = binascii.unhexlify(match[1])
ieee = zigpy.types.EUI64(ieee_hex[::-1])

# Bosch supplies (A) device specific link key (DSLK) or (A) install code + crc
if "RB01SG" in code_pattern and len(match[2]) == 32:
link_key_hex = binascii.unhexlify(match[2])
link_key = zigpy.types.KeyData(link_key_hex)
return ieee, link_key
install_code = match[2]
# install_code sanity check
install_code = convert_install_code(install_code)
return ieee, install_code
link_key = convert_install_code(install_code)
return ieee, link_key

raise vol.Invalid(f"couldn't convert qr code: {qr_code}")

Expand Down
38 changes: 19 additions & 19 deletions homeassistant/components/zha/websocket_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import zigpy.backups
from zigpy.config import CONF_DEVICE
from zigpy.config.validators import cv_boolean
from zigpy.types.named import EUI64
from zigpy.types.named import EUI64, KeyData
from zigpy.zcl.clusters.security import IasAce
import zigpy.zdo.types as zdo_types

Expand Down Expand Up @@ -328,19 +328,19 @@ def async_cleanup() -> None:
connection.subscriptions[msg["id"]] = async_cleanup
zha_gateway.async_enable_debug_mode()
src_ieee: EUI64
code: bytes
link_key: KeyData
if ATTR_SOURCE_IEEE in msg:
src_ieee = msg[ATTR_SOURCE_IEEE]
code = msg[ATTR_INSTALL_CODE]
_LOGGER.debug("Allowing join for %s device with install code", src_ieee)
await zha_gateway.application_controller.permit_with_key(
time_s=duration, node=src_ieee, code=code
link_key = msg[ATTR_INSTALL_CODE]
_LOGGER.debug("Allowing join for %s device with link key", src_ieee)
await zha_gateway.application_controller.permit_with_link_key(
time_s=duration, node=src_ieee, link_key=link_key
)
elif ATTR_QR_CODE in msg:
src_ieee, code = msg[ATTR_QR_CODE]
_LOGGER.debug("Allowing join for %s device with install code", src_ieee)
await zha_gateway.application_controller.permit_with_key(
time_s=duration, node=src_ieee, code=code
src_ieee, link_key = msg[ATTR_QR_CODE]
_LOGGER.debug("Allowing join for %s device with link key", src_ieee)
await zha_gateway.application_controller.permit_with_link_key(
time_s=duration, node=src_ieee, link_key=link_key
)
else:
await zha_gateway.application_controller.permit(time_s=duration, node=ieee)
Expand Down Expand Up @@ -1249,21 +1249,21 @@ async def permit(service: ServiceCall) -> None:
duration: int = service.data[ATTR_DURATION]
ieee: EUI64 | None = service.data.get(ATTR_IEEE)
src_ieee: EUI64
code: bytes
link_key: KeyData
if ATTR_SOURCE_IEEE in service.data:
src_ieee = service.data[ATTR_SOURCE_IEEE]
code = service.data[ATTR_INSTALL_CODE]
_LOGGER.info("Allowing join for %s device with install code", src_ieee)
await application_controller.permit_with_key(
time_s=duration, node=src_ieee, code=code
link_key = service.data[ATTR_INSTALL_CODE]
_LOGGER.info("Allowing join for %s device with link key", src_ieee)
await application_controller.permit_with_link_key(
time_s=duration, node=src_ieee, link_key=link_key
)
return

if ATTR_QR_CODE in service.data:
src_ieee, code = service.data[ATTR_QR_CODE]
_LOGGER.info("Allowing join for %s device with install code", src_ieee)
await application_controller.permit_with_key(
time_s=duration, node=src_ieee, code=code
src_ieee, link_key = service.data[ATTR_QR_CODE]
_LOGGER.info("Allowing join for %s device with link key", src_ieee)
await application_controller.permit_with_link_key(
time_s=duration, node=src_ieee, link_key=link_key
)
return

Expand Down
66 changes: 45 additions & 21 deletions tests/components/zha/test_websocket_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import zigpy.profiles.zha
import zigpy.types
from zigpy.types.named import EUI64
import zigpy.util
import zigpy.zcl.clusters.general as general
from zigpy.zcl.clusters.general import Groups
import zigpy.zcl.clusters.security as security
Expand Down Expand Up @@ -528,7 +529,7 @@ async def test_permit_ha12(
assert app_controller.permit.await_count == 1
assert app_controller.permit.await_args[1]["time_s"] == duration
assert app_controller.permit.await_args[1]["node"] == node
assert app_controller.permit_with_key.call_count == 0
assert app_controller.permit_with_link_key.call_count == 0


IC_TEST_PARAMS = (
Expand All @@ -538,15 +539,19 @@ async def test_permit_ha12(
ATTR_INSTALL_CODE: "5279-7BF4-A508-4DAA-8E17-12B6-1741-CA02-4051",
},
zigpy.types.EUI64.convert(IEEE_SWITCH_DEVICE),
unhexlify("52797BF4A5084DAA8E1712B61741CA024051"),
zigpy.util.convert_install_code(
unhexlify("52797BF4A5084DAA8E1712B61741CA024051")
),
),
(
{
ATTR_SOURCE_IEEE: IEEE_SWITCH_DEVICE,
ATTR_INSTALL_CODE: "52797BF4A5084DAA8E1712B61741CA024051",
},
zigpy.types.EUI64.convert(IEEE_SWITCH_DEVICE),
unhexlify("52797BF4A5084DAA8E1712B61741CA024051"),
zigpy.util.convert_install_code(
unhexlify("52797BF4A5084DAA8E1712B61741CA024051")
),
),
)

Expand All @@ -566,10 +571,10 @@ async def test_permit_with_install_code(
DOMAIN, SERVICE_PERMIT, params, True, Context(user_id=hass_admin_user.id)
)
assert app_controller.permit.await_count == 0
assert app_controller.permit_with_key.call_count == 1
assert app_controller.permit_with_key.await_args[1]["time_s"] == 60
assert app_controller.permit_with_key.await_args[1]["node"] == src_ieee
assert app_controller.permit_with_key.await_args[1]["code"] == code
assert app_controller.permit_with_link_key.call_count == 1
assert app_controller.permit_with_link_key.await_args[1]["time_s"] == 60
assert app_controller.permit_with_link_key.await_args[1]["node"] == src_ieee
assert app_controller.permit_with_link_key.await_args[1]["link_key"] == code


IC_FAIL_PARAMS = (
Expand Down Expand Up @@ -621,19 +626,23 @@ async def test_permit_with_install_code_fail(
DOMAIN, SERVICE_PERMIT, params, True, Context(user_id=hass_admin_user.id)
)
assert app_controller.permit.await_count == 0
assert app_controller.permit_with_key.call_count == 0
assert app_controller.permit_with_link_key.call_count == 0


IC_QR_CODE_TEST_PARAMS = (
(
{ATTR_QR_CODE: "000D6FFFFED4163B|52797BF4A5084DAA8E1712B61741CA024051"},
zigpy.types.EUI64.convert("00:0D:6F:FF:FE:D4:16:3B"),
unhexlify("52797BF4A5084DAA8E1712B61741CA024051"),
zigpy.util.convert_install_code(
unhexlify("52797BF4A5084DAA8E1712B61741CA024051")
),
),
(
{ATTR_QR_CODE: "Z:000D6FFFFED4163B$I:52797BF4A5084DAA8E1712B61741CA024051"},
zigpy.types.EUI64.convert("00:0D:6F:FF:FE:D4:16:3B"),
unhexlify("52797BF4A5084DAA8E1712B61741CA024051"),
zigpy.util.convert_install_code(
unhexlify("52797BF4A5084DAA8E1712B61741CA024051")
),
),
(
{
Expand All @@ -643,7 +652,22 @@ async def test_permit_with_install_code_fail(
)
},
zigpy.types.EUI64.convert("04:CF:8C:DF:3C:3C:3C:3C"),
unhexlify("52797BF4A5084DAA8E1712B61741CA024051"),
zigpy.util.convert_install_code(
unhexlify("52797BF4A5084DAA8E1712B61741CA024051")
),
),
(
{
ATTR_QR_CODE: (
"RB01SG"
"0D836591B3CC0010000000000000000000"
"000D6F0019107BB1"
"DLK"
"E4636CB6C41617C3E08F7325FFBFE1F9"
)
},
zigpy.types.EUI64.convert("00:0D:6F:00:19:10:7B:B1"),
zigpy.types.KeyData.convert("E4:63:6C:B6:C4:16:17:C3:E0:8F:73:25:FF:BF:E1:F9"),
),
)

Expand All @@ -663,10 +687,10 @@ async def test_permit_with_qr_code(
DOMAIN, SERVICE_PERMIT, params, True, Context(user_id=hass_admin_user.id)
)
assert app_controller.permit.await_count == 0
assert app_controller.permit_with_key.call_count == 1
assert app_controller.permit_with_key.await_args[1]["time_s"] == 60
assert app_controller.permit_with_key.await_args[1]["node"] == src_ieee
assert app_controller.permit_with_key.await_args[1]["code"] == code
assert app_controller.permit_with_link_key.call_count == 1
assert app_controller.permit_with_link_key.await_args[1]["time_s"] == 60
assert app_controller.permit_with_link_key.await_args[1]["node"] == src_ieee
assert app_controller.permit_with_link_key.await_args[1]["link_key"] == code


@pytest.mark.parametrize(("params", "src_ieee", "code"), IC_QR_CODE_TEST_PARAMS)
Expand All @@ -685,10 +709,10 @@ async def test_ws_permit_with_qr_code(
assert msg["success"]

assert app_controller.permit.await_count == 0
assert app_controller.permit_with_key.call_count == 1
assert app_controller.permit_with_key.await_args[1]["time_s"] == 60
assert app_controller.permit_with_key.await_args[1]["node"] == src_ieee
assert app_controller.permit_with_key.await_args[1]["code"] == code
assert app_controller.permit_with_link_key.call_count == 1
assert app_controller.permit_with_link_key.await_args[1]["time_s"] == 60
assert app_controller.permit_with_link_key.await_args[1]["node"] == src_ieee
assert app_controller.permit_with_link_key.await_args[1]["link_key"] == code


@pytest.mark.parametrize("params", IC_FAIL_PARAMS)
Expand All @@ -707,7 +731,7 @@ async def test_ws_permit_with_install_code_fail(
assert msg["success"] is False

assert app_controller.permit.await_count == 0
assert app_controller.permit_with_key.call_count == 0
assert app_controller.permit_with_link_key.call_count == 0


@pytest.mark.parametrize(
Expand Down Expand Up @@ -744,7 +768,7 @@ async def test_ws_permit_ha12(
assert app_controller.permit.await_count == 1
assert app_controller.permit.await_args[1]["time_s"] == duration
assert app_controller.permit.await_args[1]["node"] == node
assert app_controller.permit_with_key.call_count == 0
assert app_controller.permit_with_link_key.call_count == 0


async def test_get_network_settings(
Expand Down
Loading