Skip to content

Commit

Permalink
Add code to support plugins
Browse files Browse the repository at this point in the history
This first implementation for plugins allows decode of non-standard tlv8 characteristics.

In addition:
* Fix decoder for supported audio configuration
* add decoder for supported-rtp-configuration characteristic
* Bump version to 0.17.0
  • Loading branch information
jlusiardi authored Apr 24, 2020
1 parent d37e1b7 commit c7de98d
Show file tree
Hide file tree
Showing 8 changed files with 253 additions and 54 deletions.
52 changes: 52 additions & 0 deletions doc/Plugins.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Plugins

Plugins for HomeKit Python will be used to offer support for vendor specific
services and characteristics.

Currently there are two use cases for plugins:

* decoders for vendor specific characteristics of format tlv
* commands to interact with vendor specific characteristics

As an example, let's have a look at
[the Koogeek P1EU](./tested_devices/Koogeek%20P1EU.md). This device has some
vendor specific characteristics mostly read only like the characteristic
with UUID `4AAAF93E-0DEC-11E5-B939-0800200C9A66` (`MONTH_DATA_LAST_YEAR`
holding information on power consumption on a monthly basis for the last year).
To read those characteristics with the `get_accessory` or `get_characteristic`
commands with the `-d` (decode) option set, we need special decoders. Other
characteristics can also be written, in case of this accessory its
`4AAAF942-0DEC-11E5-B939-0800200C9A66` (`TIMER_SETTINGS`). To manipulate the
setting of the independent timing function a new command would be useful.

## How to create a plugin

Simple, create a PyPI module. The name pattern of the module is not specified.
The dependency list of the module **must** contain `homekit_python`

### Plugins providing only additional commands

A plugin that provides just additional commands are not bound to anything else.

### Plugins providing vendor specific

A plugin that provides decoders for vendor specific characteristics **must**
have a dependency on `homekit_python>=0.17.0` (this is the version plugins
got implemented) and **must** contain a package whose name starts with
`homekit_`. Within the plugin, there **must** be this structure:

```
plugin_module_directory
\homekit_XXX
\model
\characteristics
\uuid_XXXXXXXX_YYYY_ZZZZ_AAAA_BBBBBBBBBBBB.py
\...
```

Each of these `uuid_XXXXXXXX_YYYY_ZZZZ_AAAA_BBBBBBBBBBBB.py` must contain a
function `decoder`:
```python
def decoder(bytes_data: bytes) -> tlv8.EntryList:
pass
```
45 changes: 7 additions & 38 deletions homekit/debug_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
from homekit.accessoryserver import AccessoryRequestHandler
from homekit.model import Accessory
from homekit.model.services import AbstractService, ServicesTypes
from homekit.model.characteristics import AbstractCharacteristic, CharacteristicFormats, CharacteristicsTypes
from homekit.model.characteristics import AbstractCharacteristic, CharacteristicFormats, CharacteristicsTypes, \
CharacteristicsDecoderLoader
from homekit.http_impl import HttpStatusCodes
from homekit.controller.tools import AbstractPairing
from homekit.log_support import setup_logging, add_log_arguments
Expand Down Expand Up @@ -150,41 +151,6 @@ def __init__(self, iid: int, characteristic_type: str, characteristic_format: st
AbstractCharacteristic.__init__(self, iid, characteristic_type, characteristic_format)


class CharacteristicsDecoderLoader:
"""
class to dynamically load decoders for tlv8 characteristics.
"""

def __init__(self):
self.decoders = {}

def load(self, char_type: str):
"""
This function loads a decoder for the specified characteristics:
- get the name of the characteristic via the given uuid (via `CharacteristicsTypes.get_short()`)
- load a module from `homekit.model.characteristics` plus the name of the characteristic
- the module must contain a function `decoder`
:param char_type: the uuid of the characteristic
:return: a function that decodes the value of the characteristic into a `tlv8.EntryList`
"""
characteristic_name = CharacteristicsTypes.get_short(char_type)
mod_name = characteristic_name.replace('-', '_')
if char_type not in self.decoders:
try:
logging.info('loading module %s for type %s', mod_name, char_type)
module = importlib.import_module('homekit.model.characteristics.' + mod_name)
decoder = getattr(module, 'decoder')
self.decoders[char_type] = decoder
return decoder
except Exception as e:
logging.error('Error loading decoder: %s for type %s', e, char_type)
return None
else:
logging.info('got decoder for %s from cache', char_type)
return self.decoders[char_type]


decoder_loader = CharacteristicsDecoderLoader()


Expand All @@ -207,8 +173,11 @@ def log_transferred_value(text: str, aid: int, characteristic: AbstractCharacter
filtered_bytes_value = base64.b64decode(filtered_value)
decoder = decoder_loader.load(characteristic.type)
if decoder:
debug_value = tlv8.format_string(decoder(bytes_value))
filtered_debug_value = tlv8.format_string(decoder(filtered_bytes_value))
try:
debug_value = tlv8.format_string(decoder(bytes_value))
filtered_debug_value = tlv8.format_string(decoder(filtered_bytes_value))
except Exception as e:
logging.error('problem decoding', e)
else:
debug_value = tlv8.format_string(tlv8.deep_decode(bytes_value))
filtered_debug_value = tlv8.format_string(tlv8.deep_decode(filtered_bytes_value))
Expand Down
57 changes: 49 additions & 8 deletions homekit/get_accessories.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
import sys
import argparse
import logging
import tlv8
import base64

from homekit.controller import Controller
from homekit.model.characteristics import CharacteristicsTypes
from homekit.model.characteristics import CharacteristicsTypes, CharacteristicsDecoderLoader
from homekit.model.services import ServicesTypes
from homekit.log_support import setup_logging, add_log_arguments
from homekit.model.characteristics.characteristic_formats import CharacteristicFormats


def setup_args_parser():
Expand All @@ -35,10 +38,38 @@ def setup_args_parser():
help='Specify output format')
parser.add_argument('--adapter', action='store', dest='adapter', default='hci0',
help='the bluetooth adapter to be used (defaults to hci0)')
parser.add_argument('-d', action='store_true', required=False, dest='decode',
help='If set, try to find a decoder for each characteristic and show the decoded value')
add_log_arguments(parser)
return parser.parse_args()


def decode_values(input_data):
"""
Try to decode the values from the given data structure. This depends on:
- if the field is of type `CharacteristicFormats.tlv8`
- a decoder was found for the characteristic
:param input_data: data returned from `AbstractPairing.list_accessories_and_characteristics`
:return: the input data with decoded value fields
"""
loader = CharacteristicsDecoderLoader()
for accessory in input_data:
for service in accessory['services']:
for characteristic in service['characteristics']:
c_format = characteristic['format']
c_type = characteristic['type']
# TODO what about CharacteristicFormats.data?
if c_format in [CharacteristicFormats.tlv8]:
decoder = loader.load(c_type)
if decoder:
try:
characteristic['value'] = decoder(base64.b64decode(characteristic['value']))
except Exception as e:
logging.error('error during decode %s: %s' % (c_type, characteristic['value']), e)
return input_data


if __name__ == '__main__':
args = setup_args_parser()

Expand All @@ -65,9 +96,12 @@ def setup_args_parser():
logging.debug(e, exc_info=True)
sys.exit(-1)

if args.decode:
data = decode_values(data)

# prepare output
if args.output == 'json':
print(json.dumps(data, indent=4))
print(json.dumps(data, indent=4, cls=tlv8.JsonEncoder))

if args.output == 'compact':
for accessory in data:
Expand All @@ -81,12 +115,19 @@ def setup_args_parser():
c_iid = characteristic['iid']
value = characteristic.get('value', '')
c_type = characteristic['type']
c_format = characteristic['format']
# we need to get the entry list from the decoder into a string and reformat it for better
# readability.
if args.decode and c_format in [CharacteristicFormats.tlv8] and isinstance(value, tlv8.EntryList):
value = tlv8.format_string(value)
value = ' '.join(value.splitlines(keepends=True))
value = '\n ' + value
perms = ','.join(characteristic['perms'])
desc = characteristic.get('description', '')
c_type = CharacteristicsTypes.get_short(c_type)
print(' {aid}.{iid}: {value} ({description}) >{ctype}< [{perms}]'.format(aid=aid,
iid=c_iid,
value=value,
ctype=c_type,
perms=perms,
description=desc))
print(' {aid}.{iid}: ({description}) >{ctype}< [{perms}]'.format(aid=aid,
iid=c_iid,
ctype=c_type,
perms=perms,
description=desc))
print(' Value: {value}'.format(value=value))
52 changes: 50 additions & 2 deletions homekit/get_characteristic.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@
import argparse
import sys
import logging
import tlv8
import base64

from homekit.controller import Controller
from homekit.log_support import setup_logging, add_log_arguments
from homekit.model.characteristics import CharacteristicsDecoderLoader
from homekit.controller.tools import AbstractPairing
from homekit.model.characteristics.characteristic_formats import CharacteristicFormats


def setup_args_parser():
Expand All @@ -40,12 +45,42 @@ def setup_args_parser():
help='read out the types for the characteristics as well')
parser.add_argument('-e', action='store_true', required=False, dest='events',
help='read out the events for the characteristics as well')
parser.add_argument('-d', action='store_true', required=False, dest='decode',
help='If set, try to find a decoder for each characteristic and show the decoded value')
parser.add_argument('--adapter', action='store', dest='adapter', default='hci0',
help='the bluetooth adapter to be used (defaults to hci0)')
add_log_arguments(parser)
return parser.parse_args()


def get_characteristic_decoders(pairing: AbstractPairing) -> dict:
"""
This function filters characteristics of an accessory for decodable types (currently only TLV characteristics) and
tries to load the decoder functions for said characteristic's uuid.
:param pairing: an implementation of `AbstractPairing` (either for IP or BLE)
:return: a dict of aid/cid to decoder functions
"""
loaded_decoders = {}
loader = CharacteristicsDecoderLoader()
for a in pairing.list_accessories_and_characteristics():
aid = a['aid']
for s in a['services']:
for c in s['characteristics']:
c_format = c['format']
# TODO what about CharacteristicFormats.data?
if c_format not in [CharacteristicFormats.tlv8]:
continue
c_id = c['iid']
key = '{}.{}'.format(aid, c_id)
c_type = c['type']
decoder = loader.load(c_type)
if decoder is not None:
loaded_decoders[key] = decoder

return loaded_decoders


if __name__ == '__main__':
args = setup_args_parser()

Expand All @@ -71,10 +106,23 @@ def setup_args_parser():
logging.debug(e, exc_info=True)
sys.exit(-1)

if args.decode:
decoders = get_characteristic_decoders(pairing)
else:
decoders = {}

# print the data
tmp = {}
for k in data:
nk = str(k[0]) + '.' + str(k[1])
tmp[nk] = data[k]
value = data[k]

if decoders.get(nk):
try:
value['value'] = decoders.get(nk)(base64.b64decode(value['value']))
except Exception as e:
logging.ERROR('could not decode', e)

tmp[nk] = value

print(json.dumps(tmp, indent=4))
print(json.dumps(tmp, indent=4, cls=tlv8.JsonEncoder))
61 changes: 60 additions & 1 deletion homekit/model/characteristics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@
'SaturationCharacteristicMixin', 'SerialNumberCharacteristic', 'TargetHeatingCoolingStateCharacteristic',
'TargetHeatingCoolingStateCharacteristicMixin', 'TargetTemperatureCharacteristic',
'TargetTemperatureCharacteristicMixin', 'TemperatureDisplayUnitCharacteristic', 'TemperatureDisplayUnitsMixin',
'VolumeCharacteristic', 'VolumeCharacteristicMixin'
'VolumeCharacteristic', 'VolumeCharacteristicMixin', 'CharacteristicsDecoderLoader'
]

import importlib
import logging
import pkgutil

from homekit.model.characteristics.characteristic_permissions import CharacteristicPermissions
from homekit.model.characteristics.characteristic_types import CharacteristicsTypes
from homekit.model.characteristics.characteristic_units import CharacteristicUnits
Expand Down Expand Up @@ -59,3 +63,58 @@
from homekit.model.characteristics.temperature_display_unit import TemperatureDisplayUnitsMixin, \
TemperatureDisplayUnitCharacteristic
from homekit.model.characteristics.volume import VolumeCharacteristic, VolumeCharacteristicMixin


class CharacteristicsDecoderLoader:
"""
class to dynamically load decoders for tlv8 characteristics.
"""

def __init__(self):
self.decoders = {}

def load(self, char_type: str):
"""
This function loads a decoder for the specified characteristics:
- get the name of the characteristic via the given uuid (via `CharacteristicsTypes.get_short()`)
- load a module from `homekit.model.characteristics` plus the name of the characteristic
- the module must contain a function `decoder`
:param char_type: the uuid of the characteristic
:return: a function that decodes the value of the characteristic into a `tlv8.EntryList`
"""
characteristic_name = CharacteristicsTypes.get_short(char_type)
if characteristic_name.startswith('Unknown'):
mod_name = 'uuid_{}'.format(char_type.replace('-', '_'))
logging.info('modname %s', mod_name)
else:
mod_name = characteristic_name.replace('-', '_')
if char_type not in self.decoders:

# try to dynamically load from the standard characteristics by name
try:
logging.info('loading module "%s" for type "%s"', mod_name, char_type)
module = importlib.import_module('homekit.model.characteristics.' + mod_name)
decoder = getattr(module, 'decoder')
self.decoders[char_type] = decoder
return decoder
except Exception as e:
logging.info('Error loading decoder: "%s" for type "%s"', e, char_type)

# try to load from all plugins, it may be a non-standard characteristic with vendor specific data
try:
for _, plugin_name, _ in pkgutil.iter_modules():
if not plugin_name.startswith('homekit_'):
continue
logging.info('loading module "%s" for type "%s" from plugin "%s"', mod_name, char_type, plugin_name)
module = importlib.import_module('.model.characteristics.' + mod_name, plugin_name)
decoder = getattr(module, 'decoder')
self.decoders[char_type] = decoder
return decoder
except Exception as e:
logging.info('Error loading decoder: "%s" for type "%s"', e, char_type)

return None
else:
logging.info('got decoder for %s from cache', char_type)
return self.decoders[char_type]
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,5 @@ def decoder(bytes_data):
AudioCodecParametersKeys.RTP_TIME: RtpTimeValues
}
},
SupportedAudioStreamConfigurationKeys.COMFORT_NOISE_SUPPORT: {

}
SupportedAudioStreamConfigurationKeys.COMFORT_NOISE_SUPPORT: tlv8.DataType.UNSIGNED_INTEGER
})
Loading

0 comments on commit c7de98d

Please sign in to comment.