Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python SDK: Add Gopro-specific advertisement parsing #633

Merged
merged 3 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Demo to retrieve and parse bleak-level advertisements"""

import asyncio

from bleak import BleakScanner

from open_gopro.models.ble_advertisement import AdvData, GoProAdvData


async def main() -> None:
adv_data = AdvData()

async with BleakScanner(service_uuids=["0000fea6-0000-1000-8000-00805f9b34fb"]) as scanner:
async for _, data in scanner.advertisement_data():
adv_data.update(data)
if adv_data.local_name: # Once we've received the scan response...
break

print(f"GoPro Data: {GoProAdvData.fromAdvData(adv_data)}")


if __name__ == "__main__":
asyncio.run(main())
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
"""GoPro specific advertisement entities and parsing structures"""

from __future__ import annotations

import json
from dataclasses import asdict, dataclass, field
from typing import Any

from bleak.backends.scanner import AdvertisementData
from construct import (
Adapter,
BitStruct,
Byte,
Bytes,
Enum,
Flag,
GreedyString,
Hex,
Int16ub,
Int16ul,
PaddedString,
Padding,
Struct,
this,
)

from open_gopro.util import deeply_update_dict


class Hexlify(Adapter):
"""Construct adapter for pretty hex representation"""

def _decode(self, obj: bytes, context: Any, path: Any) -> str:
return obj.hex(":")

def _encode(self, obj: str, context: Any, path: Any) -> list[int]:
return list(map(int, obj.split(":")))


camera_status_struct = BitStruct(
"processor_state" / Flag,
"wifi_ap_state" / Flag,
"peripheral_pairing_state" / Flag,
"central_role_enabled" / Flag,
"is_new_media_available" / Flag,
"reserved" / Padding(3),
)

camera_id_struct = Enum(
Byte,
Hero11Black=56,
Fraction=66,
)


camera_capability_struct = BitStruct(
"cnc" / Flag,
"ble_metadata" / Flag,
"wideband_audio" / Flag,
"concurrent_master_slave" / Flag,
"onboarding" / Flag,
"new_media_available" / Flag,
"reserved" / Padding(10),
)

media_offload_status_struct = BitStruct(
"available" / Flag,
"new_media_available" / Flag,
"battery_ok" / Flag,
"sd_card_ok" / Flag,
"busy" / Flag,
"paused" / Flag,
"reserved" / Padding(2),
)

manuf_data_struct = Struct(
"schema_version" / Byte,
"camera_status" / camera_status_struct,
"camera_id" / camera_id_struct,
"camera_capabilities" / camera_capability_struct,
"id_hash" / Hexlify(Bytes(6)),
"media_offload_status" / media_offload_status_struct,
)

adv_data_struct = Struct(
"flags_length" / Byte,
"flags" / Hex(Int16ub),
"uuids_length" / Byte,
"uuids_type" / Hex(Byte),
"uuids" / Hex(Int16ul),
"manuf_length" / Byte,
"manuf_type" / Hex(Byte),
"company_id" / Hex(Int16ub),
"manuf_data" / manuf_data_struct,
)

service_data_struct = Struct(
"ap_mac_address" / Hexlify(Bytes(4)),
"serial_number" / GreedyString("utf8"),
)

scan_response_struct = Struct(
"name_length" / Byte,
"name_type" / Hex(Byte),
"name" / PaddedString(this.name_length - 1, encoding="utf8"),
"service_length" / Byte,
"service_type" / Hex(Byte),
"service_uuid" / Hex(Int16ul),
"service_data" / service_data_struct,
)


@dataclass
class Jsonable:
"""Mixin to use pretty hex presentation for JSON decoding"""

def __str__(self) -> str:
def default_decode(obj: Any) -> Any:
if isinstance(obj, (bytes, bytearray)):
return obj.hex(":")
return str(obj)

return json.dumps(asdict(self), indent=4, default=default_decode)


@dataclass
class GoProAdvData(Jsonable):
"""GoPro-specific advertising data"""

name: str
schema_version: int
processor_state: bool
wifi_ap_state: bool
peripheral_pairing_state: bool
is_new_media_available: bool
camera_id: str
supports_cnc: bool
supports_ble_metadata: bool
supports_wideband_audio: bool
supports_concurrent_master_slave: bool
supports_onboarding: bool
supports_new_media_available: bool
id_hash: bytes
is_media_upload_new_media_available: bool
is_media_upload_available: bool
is_media_upload_battery_ok: bool
is_media_upload_sd_card_ok: bool
is_media_upload_busy: bool
is_media_upload_paused: bool
ap_mac_address: bytes
partial_serial_number: bytes

@classmethod
def fromAdvData(cls, data: AdvData) -> GoProAdvData:
"""Build GoPro specific advertisement from standard BLE advertisement data

Args:
data (AdvData): standard BLE advertisement data

Returns:
GoProAdvData: parsed GoPro specific advertising data
"""
manuf_data = manuf_data_struct.parse(list(data.manufacturer_data.values())[0])
service_data = service_data_struct.parse(list(data.service_data.values())[0])
return GoProAdvData(
# Name from scan response data
name=data.local_name,
# Schema version from advertising data manufacturer data
schema_version=manuf_data.schema_version,
# Camera status from advertising data manufacturer data
processor_state=manuf_data.camera_status.processor_state,
wifi_ap_state=manuf_data.camera_status.wifi_ap_state,
peripheral_pairing_state=manuf_data.camera_status.peripheral_pairing_state,
is_new_media_available=manuf_data.camera_status.is_new_media_available,
# Camera ID from advertising data manufacturer data
camera_id=manuf_data.camera_id,
# Camera capabilities from advertising data manufacturer data
supports_ble_metadata=manuf_data.camera_capabilities.ble_metadata,
supports_cnc=manuf_data.camera_capabilities.cnc,
supports_onboarding=manuf_data.camera_capabilities.onboarding,
supports_wideband_audio=manuf_data.camera_capabilities.wideband_audio,
supports_concurrent_master_slave=manuf_data.camera_capabilities.concurrent_master_slave,
supports_new_media_available=manuf_data.camera_capabilities.new_media_available,
# ID Hash from advertising data manufacturer's data
id_hash=manuf_data.id_hash,
# Media offload status status from advertising data manufacturer's data
is_media_upload_new_media_available=manuf_data.media_offload_status.new_media_available,
is_media_upload_available=manuf_data.media_offload_status.available,
is_media_upload_battery_ok=manuf_data.media_offload_status.battery_ok,
is_media_upload_sd_card_ok=manuf_data.media_offload_status.sd_card_ok,
is_media_upload_busy=manuf_data.media_offload_status.busy,
is_media_upload_paused=manuf_data.media_offload_status.paused,
# Mac address from scan response data service data
ap_mac_address=service_data.ap_mac_address,
# Partial serial number from scan response data service data
partial_serial_number=service_data.serial_number,
)


@dataclass
class AdvData(Jsonable):
"""Standard BLE advertising data

Only contains fields that are currently used by GoPro
"""

local_name: str = ""
manufacturer_data: dict[str, Any] = field(default_factory=dict)
service_uuids: list[str] = field(default_factory=list)
service_data: dict = field(default_factory=dict)

def update(self, data: AdvertisementData) -> None:
"""Update with a (potentially incomplete) advertisement

Args:
data (AdvertisementData): advertisement to use for updating
"""
self_dict = asdict(self)
for k, v in data._asdict().items():
if not v:
continue
if isinstance(v, dict):
setattr(self, k, deeply_update_dict(self_dict[k], v))
elif isinstance(v, list):
setattr(self, k, [*self_dict[k], v])
else:
setattr(self, k, v)
18 changes: 18 additions & 0 deletions demos/python/sdk_wireless_camera_control/open_gopro/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,3 +330,21 @@ def get_current_dst_aware_time() -> tuple[datetime, int, bool]:
if is_dst:
offset += 60
return (now, int(offset), is_dst)


def deeply_update_dict(d: dict, u: dict) -> dict:
"""Recursively update a dict

Args:
d (dict): original dict
u (dict): dict to apply updates from

Returns:
dict: updated original dict
"""
for k, v in u.items():
if isinstance(v, dict):
d[k] = deeply_update_dict(d.get(k, {}), v)
else:
d[k] = v
return d
114 changes: 113 additions & 1 deletion demos/python/sdk_wireless_camera_control/tests/unit/test_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from open_gopro.api.parsers import ByteParserBuilders
from open_gopro.communicator_interface import GoProBle
from open_gopro.constants import CmdId
from open_gopro.models.response import BleRespBuilder, GlobalParsers
from open_gopro.models.ble_advertisement import adv_data_struct, scan_response_struct
from open_gopro.models.response import GlobalParsers
from open_gopro.parser_interface import Parser
from open_gopro.proto import EnumResultGeneric, ResponseGetApEntries

Expand All @@ -32,3 +33,114 @@ def test_recursive_protobuf_proxying():
assert len(parsed.entries) == 2
assert parsed.entries[0].ssid == "one"
assert parsed.entries[1].ssid == "two"


def test_ble_advertisement_parsing():
# GIVEN
adv_data = bytes(
[
0x02,
0x01,
0x02,
0x03,
0x02,
0xA6,
0xFE,
0x0F,
0xFF,
0xF2,
0x02,
0x02,
0x01,
0x38,
0x33,
0x00,
0xB3,
0xFE,
0x2A,
0x79,
0xDC,
0xEB,
0x0F,
]
)

# WHEN
adv = adv_data_struct.parse(adv_data)
manuf_data = adv.manuf_data
camera_status = manuf_data.camera_status
camera_capabilities = manuf_data.camera_capabilities
media_offload_status = manuf_data.media_offload_status

# THEN
assert adv.flags == 0x0102
assert adv.uuids == 0xFEA6
assert adv.manuf_type == 0xFF
assert adv.company_id == 0xF202

assert manuf_data.schema_version == 2
assert str(manuf_data.camera_id) == "Hero11Black"
assert manuf_data.id_hash == "b3:fe:2a:79:dc:eb"

assert camera_status.processor_state == False
assert camera_status.wifi_ap_state == False
assert camera_status.peripheral_pairing_state == False
assert camera_status.central_role_enabled == False
assert camera_status.is_new_media_available == False

assert camera_capabilities.cnc == False
assert camera_capabilities.ble_metadata == False
assert camera_capabilities.wideband_audio == True
assert camera_capabilities.concurrent_master_slave == True
assert camera_capabilities.onboarding == False
assert camera_capabilities.new_media_available == False

assert media_offload_status.available == False
assert media_offload_status.new_media_available == False
assert media_offload_status.battery_ok == False
assert media_offload_status.sd_card_ok == False
assert media_offload_status.busy == True
assert media_offload_status.paused == True


def test_ble_scan_response_parsing():
# GIVEN
scan_response_data = bytes(
[
0x0B,
0x09,
0x47,
0x6F,
0x50,
0x72,
0x6F,
0x20,
0x31,
0x30,
0x35,
0x38,
0x0B,
0x16,
0xA6,
0xFE,
0xF7,
0xA9,
0x76,
0x88,
0x31,
0x30,
0x35,
0x38,
]
)

# WHEN
scan_response = scan_response_struct.parse(scan_response_data)
print(scan_response)

# THEN
assert scan_response.name == "GoPro 1058"
assert scan_response.service_type == 0x16
assert scan_response.service_uuid == 0xFEA6
assert scan_response.service_data.ap_mac_address == "f7:a9:76:88"
assert scan_response.service_data.serial_number == "1058"
Loading