diff --git a/README.md b/README.md index a6967c4..3ed0afe 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # python-eq3bt -Python library and a command line tool for EQ3 Bluetooth smart thermostats, uses bluepy for BTLE communication. +Python library and a command line tool for EQ3 Bluetooth smart thermostats, uses bluepy or gattlib for BTLE communication. This library is a simplified version of bluepy_devices from Markus Peter (https://github.com/bimbar/bluepy_devices.git) with support for more features and better device handling. diff --git a/eq3bt/eq3btsmart.py b/eq3bt/eq3btsmart.py index 4eb19de..5ed2ffa 100644 --- a/eq3bt/eq3btsmart.py +++ b/eq3bt/eq3btsmart.py @@ -15,7 +15,6 @@ from construct import Byte -from .connection import BTLEConnection from .structures import AwayDataAdapter, DeviceId, Schedule, Status _LOGGER = logging.getLogger(__name__) @@ -72,7 +71,7 @@ class TemperatureException(Exception): class Thermostat: """Representation of a EQ3 Bluetooth Smart thermostat.""" - def __init__(self, _mac, _iface=None, connection_cls=BTLEConnection): + def __init__(self, _mac, _iface=None, connection_cls=None): """Initialize the thermostat.""" self._target_temperature = Mode.Unknown @@ -95,6 +94,8 @@ def __init__(self, _mac, _iface=None, connection_cls=BTLEConnection): self._firmware_version = None self._device_serial = None + if connection_cls is None: + from .connection import BTLEConnection as connection_cls self._conn = connection_cls(_mac, _iface) self._conn.set_callback(PROP_NTFY_HANDLE, self.handle_notification) diff --git a/eq3bt/eq3cli.py b/eq3bt/eq3cli.py index a2c9cf4..1720545 100644 --- a/eq3bt/eq3cli.py +++ b/eq3bt/eq3cli.py @@ -19,15 +19,21 @@ def validate_mac(ctx, param, mac): @click.option("--mac", envvar="EQ3_MAC", required=True, callback=validate_mac) @click.option("--interface", default=None) @click.option("--debug/--normal", default=False) +@click.option("--backend", type=click.Choice(["bluepy", "gattlib"]), default="bluepy") @click.pass_context -def cli(ctx, mac, interface, debug): +def cli(ctx, mac, interface, debug, backend): """Tool to query and modify the state of EQ3 BT smart thermostat.""" if debug: logging.basicConfig(level=logging.DEBUG) else: logging.basicConfig(level=logging.INFO) - thermostat = Thermostat(mac, interface) + if backend == "gattlib": + from .gattlibconnection import BTLEConnection + + thermostat = Thermostat(mac, interface, BTLEConnection) + else: + thermostat = Thermostat(mac, interface) thermostat.update() ctx.obj = thermostat @@ -154,7 +160,7 @@ def offset(dev, offset): @cli.command() -@click.argument("away_end", type=click.Datetime(), default=None, required=False) +@click.argument("away_end", type=click.DateTime(), default=None, required=False) @click.argument("temperature", type=float, default=None, required=False) @pass_dev def away(dev, away_end, temperature): diff --git a/eq3bt/gattlibconnection.py b/eq3bt/gattlibconnection.py new file mode 100644 index 0000000..17ba2d3 --- /dev/null +++ b/eq3bt/gattlibconnection.py @@ -0,0 +1,95 @@ +""" +A simple adapter to gattlib. +Handles Connection duties (reconnecting etc.) transparently. +""" +import codecs +import logging +import threading + +import gattlib + +DEFAULT_TIMEOUT = 1 + +_LOGGER = logging.getLogger(__name__) + + +class BTLEConnection: + """Representation of a BTLE Connection.""" + + def __init__(self, mac, iface): + """Initialize the connection.""" + + self._conn = None + self._mac = mac + self._iface = iface + self._callbacks = {} + self._notifyevent = None + + def __enter__(self): + """ + Context manager __enter__ for connecting the device + :rtype: BTLEConnection + :return: + """ + _LOGGER.debug("Trying to connect to %s", self._mac) + if self._iface is None: + self._conn = gattlib.GATTRequester(self._mac, False) + else: + self._conn = gattlib.GATTRequester(self._mac, False, self._iface) + self._conn.on_notification = self.on_notification + try: + self._conn.connect() + except gattlib.BTBaseException as ex: + _LOGGER.debug( + "Unable to connect to the device %s, retrying: %s", self._mac, ex + ) + try: + self._conn.connect() + except Exception as ex2: + _LOGGER.debug("Second connection try to %s failed: %s", self._mac, ex2) + raise + + _LOGGER.debug("Connected to %s", self._mac) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if self._conn: + self._conn.disconnect() + self._conn = None + + def on_notification(self, handle, data): + """Handle Callback from a Bluetooth (GATT) request.""" + _LOGGER.debug( + "Got notification from %s: %s", handle, codecs.encode(data, "hex") + ) + if handle in self._callbacks: + self._callbacks[handle](data[3:]) + if self._notifyevent: + self._notifyevent.set() + + @property + def mac(self): + """Return the MAC address of the connected device.""" + return self._mac + + def set_callback(self, handle, function): + """Set the callback for a Notification handle. It will be called with the parameter data, which is binary.""" + self._callbacks[handle] = function + + def make_request(self, handle, value, timeout=DEFAULT_TIMEOUT, with_response=True): + """Write a GATT Command without callback - not utf-8.""" + try: + with self: + _LOGGER.debug( + "Writing %s to %s", + codecs.encode(value, "hex"), + handle, + ) + self._notifyevent = threading.Event() + self._conn.write_by_handle(handle, value) + if timeout: + _LOGGER.debug("Waiting for notifications for %s", timeout) + self._notifyevent.wait(timeout) + except gattlib.BTBaseException as ex: + _LOGGER.debug("Got exception from gattlib while making a request: %s", ex) + raise diff --git a/pyproject.toml b/pyproject.toml index 394e871..23f1007 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,10 @@ python = "^3.7" click = "*" bluepy = ">=1.0.5" construct = "*" +gattlib = { version = "*", optional = true } + +[tool.poetry.extras] +gattlib = ["gattlib"] [tool.poetry.dev-dependencies] pytest = "*"