diff --git a/jarbas_hive_mind/discovery/__init__.py b/jarbas_hive_mind/discovery/__init__.py index b09d47f..adf8831 100644 --- a/jarbas_hive_mind/discovery/__init__.py +++ b/jarbas_hive_mind/discovery/__init__.py @@ -1,220 +1 @@ -import upnpclient -import threading -from time import sleep -from ovos_utils.log import LOG -from ovos_utils.xml_helper import xml2dict -from jarbas_hive_mind.slave import HiveMindSlave -from jarbas_hive_mind.slave.terminal import HiveMindTerminal -import requests - - -class _Device: - def __init__(self, host, device_type='HiveMind-websocket'): - self.host = host - self.device_type = device_type - - @property - def services(self): - return {} - - @property - def location(self): - return None - - @property - def device_name(self): - return self.host - - @property - def friendly_name(self): - return self.device_name - - @property - def model_description(self): - return self.device_name - - @property - def model_name(self): - return self.device_type - - @property - def model_number(self): - return "0.1" - - @property - def udn(self): - return self.model_name + ":" + self.model_number - - @property - def address(self): - return self.location - - @property - def data(self): - return {"host": self.host, - "type": self.device_type} - - -class HiveMindNode: - def __init__(self, d=None): - self.device = d - self._data = None - - @property - def services(self): - return self.device.service_map - - @property - def xml(self): - return self.device.location - - @property - def device_name(self): - return self.device.device_name - - @property - def friendly_name(self): - return self.device.friendly_name - - @property - def description(self): - return self.device.model_description - - @property - def node_tyoe(self): - return self.device.model_name - - @property - def version(self): - return self.device.model_number - - @property - def device_id(self): - return self.device.udn - - @property - def data(self): - if self.xml and self._data is None: - LOG.info("Fetching Node data: {url}".format(url=self.xml)) - xml = requests.get(self.xml).text - self._data = xml2dict(xml) - return self._data - - @property - def address(self): - try: - if self.device.location: - services = self.data["root"]["device"]['serviceList'] - for s in services: - service = services[s] - if service["serviceType"] == \ - 'urn:jarbasAi:HiveMind:service:Master': - return service["URLBase"] - except: - pass - return self.device.data.get("host") - - @property - def host(self): - return ":".join(self.address.split(":")[:-1]) - - @property - def port(self): - return int(self.address.split(":")[-1]) - - def connect(self, headers, crypto_key=None, bus=None, - node_type=None): - try: - # TODO cyclic import - from jarbas_hive_mind import HiveMindConnection - con = HiveMindConnection(self.host, self.port) - if node_type: - clazz = node_type - else: - if bus: - clazz = HiveMindSlave - else: - clazz = HiveMindTerminal - if bus: - component = clazz(bus=bus, - headers=headers, - crypto_key=crypto_key) - else: - component = clazz(headers=headers, - crypto_key=crypto_key) - # will check url for ssl - LOG.debug("Connecting to HiveMind websocket @ {url}".format( - url=self.address)) - con.connect(component) - except Exception as e: - LOG.error("Connection failed") - LOG.exception(e) - - -class LocalDiscovery(threading.Thread): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.pause = 20 # scan every 20 seconds - self._nodes = {} - self.blacklist = [] - self.running = False - self.connected = False - - def on_new_upnp_node(self, node): - LOG.info("UpNp Node Found: " + node.xml) - self._nodes[node.xml] = node - self.on_new_node(node) - - def on_new_node(self, node): - pass - - @property - def nodes(self): - return self._nodes - - def scan(self): - for node_url in self._nodes: - if node_url in self.blacklist: - continue - yield node_url - for node_url in self.upnp_scan(): - if node_url in self.blacklist: - continue - self.blacklist.append(node_url) - yield node_url - - def upnp_scan(self): - devices = upnpclient.discover() - for d in devices: - if d.location in self.nodes: - continue - if d.model_name == "HiveMind-core": - node = HiveMindNode(d) - self.on_new_upnp_node(node) - yield node.xml - - def run(self) -> None: - self.running = True - while self.running: - self.scan() - sleep(self.pause) - self.stop() - - def stop(self): - self.running = False - - def search_and_connect(self, *args, **kwargs): - while True: - # allow zeroconf to get result before upnp check - sleep(0.5) - if self.connected: - continue - for node_url in self.scan(): - if node_url in self.blacklist: - continue - - self.blacklist.append(node_url) - node = self.nodes[node_url] - node.connect(*args, **kwargs) - self.connected = True - +from HiveMind_presence import * \ No newline at end of file diff --git a/jarbas_hive_mind/discovery/ssdp.py b/jarbas_hive_mind/discovery/ssdp.py index c30eb78..f00bc73 100644 --- a/jarbas_hive_mind/discovery/ssdp.py +++ b/jarbas_hive_mind/discovery/ssdp.py @@ -1,232 +1 @@ -# Licensed under the MIT license -# http://opensource.org/licenses/mit-license.php - -# Copyright 2005, Tim Potter -# Copyright 2006 John-Mark Gurney -# Copyright (C) 2006 Fluendo, S.A. (www.fluendo.com). -# Copyright 2006,2007,2008,2009 Frank Scholz -# Copyright 2016 Erwan Martin -# -# Implementation of a SSDP server. -# - -import random -import time -import socket -from email.utils import formatdate -from errno import ENOPROTOOPT -import threading -from ovos_utils.log import LOG - -SSDP_PORT = 1900 -SSDP_ADDR = '239.255.255.250' -SERVER_ID = 'HiveMind SSDP Server V0.1' - - -class SSDPServer(threading.Thread): - """A class implementing a SSDP server. The notify_received and - searchReceived methods are called when the appropriate type of - datagram is received by the server.""" - known = {} - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.sock = None - self.running = False - - def run(self): - self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - if hasattr(socket, "SO_REUSEPORT"): - try: - self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) - except socket.error as le: - # RHEL6 defines SO_REUSEPORT but it doesn't work - if le.errno == ENOPROTOOPT: - pass - else: - raise - - addr = socket.inet_aton(SSDP_ADDR) - interface = socket.inet_aton('0.0.0.0') - cmd = socket.IP_ADD_MEMBERSHIP - self.sock.setsockopt(socket.IPPROTO_IP, cmd, addr + interface) - self.sock.bind(('0.0.0.0', SSDP_PORT)) - self.sock.settimeout(1) - self.running = True - while self.running: - try: - data, addr = self.sock.recvfrom(1024) - self.datagram_received(data, addr) - except socket.timeout: - continue - self.shutdown() - self.running = False - - def shutdown(self): - for st in self.known: - if self.known[st]['MANIFESTATION'] == 'local': - self.do_byebye(st) - - def datagram_received(self, data, host_port): - """Handle a received multicast datagram.""" - - (host, port) = host_port - - try: - header, payload = data.decode().split('\r\n\r\n')[:2] - except ValueError as err: - LOG.error(err) - return - - lines = header.split('\r\n') - cmd = lines[0].split(' ') - lines = map(lambda x: x.replace(': ', ':', 1), lines[1:]) - lines = filter(lambda x: len(x) > 0, lines) - - headers = [x.split(':', 1) for x in lines] - headers = dict(map(lambda x: (x[0].lower(), x[1]), headers)) - - #LOG.info('SSDP command %s %s - from %s:%d' % (cmd[0], cmd[1], host, - # port)) - #LOG.debug('with headers: {}.'.format(headers)) - if cmd[0] == 'M-SEARCH' and cmd[1] == '*': - # SSDP discovery - self.discovery_request(headers, (host, port)) - elif cmd[0] == 'NOTIFY' and cmd[1] == '*': - # SSDP presence - # LOG.debug('NOTIFY *') - pass - else: - LOG.warning('Unknown SSDP command %s %s' % (cmd[0], cmd[1])) - - def register(self, manifestation, usn, st, location, server=SERVER_ID, cache_control='max-age=1800', silent=False, - host=None): - """Register a service or device that this SSDP server will - respond to.""" - - LOG.info('Registering %s (%s)' % (st, location)) - - self.known[usn] = {} - self.known[usn]['USN'] = usn - self.known[usn]['LOCATION'] = location - self.known[usn]['ST'] = st - self.known[usn]['EXT'] = '' - self.known[usn]['SERVER'] = server - self.known[usn]['CACHE-CONTROL'] = cache_control - - self.known[usn]['MANIFESTATION'] = manifestation - self.known[usn]['SILENT'] = silent - self.known[usn]['HOST'] = host - self.known[usn]['last-seen'] = time.time() - - if manifestation == 'local' and self.sock: - self.do_notify(usn) - - def unregister(self, usn): - LOG.info("Un-registering %s" % usn) - del self.known[usn] - - def is_known(self, usn): - return usn in self.known - - def send_it(self, response, destination, delay, usn): - #LOG.debug('send discovery response delayed by %ds for %s to %r' % ( - # delay, usn, destination)) - try: - self.sock.sendto(response.encode(), destination) - except (AttributeError, socket.error) as msg: - LOG.warning("failure sending out byebye notification: %r" % msg) - - def discovery_request(self, headers, host_port): - """Process a discovery request. The response must be sent to - the address specified by (host, port).""" - - (host, port) = host_port - - #LOG.info('Discovery request from (%s,%d) for %s' % (host, port, - # headers['st'])) - #LOG.info('Discovery request for %s' % headers['st']) - - # Do we know about this service? - for i in self.known.values(): - if i['MANIFESTATION'] == 'remote': - continue - if headers['st'] == 'ssdp:all' and i['SILENT']: - continue - if i['ST'] == headers['st'] or headers['st'] == 'ssdp:all': - response = ['HTTP/1.1 200 OK'] - - usn = None - for k, v in i.items(): - if k == 'USN': - usn = v - if k not in ('MANIFESTATION', 'SILENT', 'HOST'): - response.append('%s: %s' % (k, v)) - - if usn: - response.append('DATE: %s' % formatdate(timeval=None, localtime=False, usegmt=True)) - - response.extend(('', '')) - delay = random.randint(0, int(headers['mx'])) - - self.send_it('\r\n'.join(response), (host, port), delay, usn) - - def do_notify(self, usn): - """Do notification""" - - if self.known[usn]['SILENT']: - return - LOG.info('Sending alive notification for %s' % usn) - - resp = [ - 'NOTIFY * HTTP/1.1', - 'HOST: %s:%d' % (SSDP_ADDR, SSDP_PORT), - 'NTS: ssdp:alive', - ] - stcpy = dict(self.known[usn].items()) - stcpy['NT'] = stcpy['ST'] - del stcpy['ST'] - del stcpy['MANIFESTATION'] - del stcpy['SILENT'] - del stcpy['HOST'] - del stcpy['last-seen'] - - resp.extend(map(lambda x: ': '.join(x), stcpy.items())) - resp.extend(('', '')) - LOG.debug('do_notify content', resp) - try: - self.sock.sendto('\r\n'.join(resp).encode(), (SSDP_ADDR, SSDP_PORT)) - self.sock.sendto('\r\n'.join(resp).encode(), (SSDP_ADDR, SSDP_PORT)) - except (AttributeError, socket.error) as msg: - LOG.warning("failure sending out alive notification: %r" % msg) - - def do_byebye(self, usn): - """Do byebye""" - - LOG.info('Sending byebye notification for %s' % usn) - - resp = [ - 'NOTIFY * HTTP/1.1', - 'HOST: %s:%d' % (SSDP_ADDR, SSDP_PORT), - 'NTS: ssdp:byebye', - ] - try: - stcpy = dict(self.known[usn].items()) - stcpy['NT'] = stcpy['ST'] - del stcpy['ST'] - del stcpy['MANIFESTATION'] - del stcpy['SILENT'] - del stcpy['HOST'] - del stcpy['last-seen'] - resp.extend(map(lambda x: ': '.join(x), stcpy.items())) - resp.extend(('', '')) - #LOG.debug('do_byebye content', resp) - if self.sock: - try: - self.sock.sendto('\r\n'.join(resp), (SSDP_ADDR, SSDP_PORT)) - except (AttributeError, socket.error) as msg: - LOG.error("failure sending out byebye notification: %r" % - msg) - except KeyError as msg: - LOG.error("error building byebye notification: %r" % msg) \ No newline at end of file +from HiveMind_presence.ssdp import * diff --git a/jarbas_hive_mind/discovery/upnp_server.py b/jarbas_hive_mind/discovery/upnp_server.py index 9700b1e..bfb2cd5 100644 --- a/jarbas_hive_mind/discovery/upnp_server.py +++ b/jarbas_hive_mind/discovery/upnp_server.py @@ -1,162 +1 @@ -from http.server import BaseHTTPRequestHandler, HTTPServer -import threading - -PORT_NUMBER = 8080 - - -class UPNPHTTPServerHandler(BaseHTTPRequestHandler): - """ - A HTTP handler that serves the UPnP XML files. - """ - - # Handler for the GET requests - def do_GET(self): - if self.path == "/" + self.server.scpd_xml_path: - self.send_response(200) - self.send_header('Content-type', 'application/xml') - self.end_headers() - self.wfile.write(self.scpd_xml.encode()) - return - if self.path == "/" + self.server.device_xml_path: - self.send_response(200) - self.send_header('Content-type', 'application/xml') - self.end_headers() - self.wfile.write(self.device_xml.encode()) - return - else: - self.send_response(404) - self.send_header('Content-type', 'text/html') - self.end_headers() - self.wfile.write(b"Not found.") - return - - @property - def services_xml(self): - xml = """ - - {hive_url} - urn:jarbasAi:HiveMind:service:Master - urn:jarbasAi:HiveMind:serviceId:HiveMindNode - /HiveMind - - {scpd_path} - - """ - return xml.format(scpd_path=self.server.scpd_xml_path, - hive_url=self.server.presentation_url) - - @property - def device_xml(self): - """ - Get the main device descriptor xml file. - """ - xml = """ - - {major} - {minor} - - - urn:schemas-upnp-org:device:Basic:1 - {friendly_name} - {manufacturer} - {manufacturer_url} - {model_description} - {model_name} - {model_number} - {model_url} - {serial_number} - uuid:{uuid} - {services_xml} - {presentation_url} - -""" - return xml.format(friendly_name=self.server.friendly_name, - manufacturer=self.server.manufacturer, - manufacturer_url=self.server.manufacturer_url, - model_description=self.server.model_description, - model_name=self.server.model_name, - model_number=self.server.model_number, - model_url=self.server.model_url, - serial_number=self.server.serial_number, - uuid=self.server.uuid, - presentation_url=self.server.presentation_url, - scpd_path=self.server.scpd_xml_path, - services_xml=self.services_xml, - major=self.server.major_version, - minor=self.server.minor_version - # device_xml_path=self.device_xml_path - ) - - @property - def scpd_xml(self): - """ - Get the device WSD file. - """ - return """ - -1 -0 - -""" - - -class UPNPHTTPServerBase(HTTPServer): - """ - A simple HTTP server that knows the information about a UPnP device. - """ - - def __init__(self, server_address, request_handler_class): - HTTPServer.__init__(self, server_address, request_handler_class) - self.port = None - self.friendly_name = None - self.manufacturer = None - self.manufacturer_url = None - self.model_description = None - self.model_name = None - self.model_url = None - self.serial_number = None - self.uuid = None - self.presentation_url = None - self.scpd_xml_path = None - self.device_xml_path = None - self.major_version = None - self.minor_version = None - - -class UPNPHTTPServer(threading.Thread): - """ - A thread that runs UPNPHTTPServerBase. - """ - - def __init__(self, port, friendly_name, manufacturer, manufacturer_url, - model_description, model_name, - model_number, model_url, serial_number, uuid, - presentation_url, host=""): - threading.Thread.__init__(self, daemon=True) - self.server = UPNPHTTPServerBase(('', port), UPNPHTTPServerHandler) - self.server.port = port - self.server.friendly_name = friendly_name - self.server.manufacturer = manufacturer - self.server.manufacturer_url = manufacturer_url - self.server.model_description = model_description - self.server.model_name = model_name - self.server.model_number = model_number - self.server.model_url = model_url - self.server.serial_number = serial_number - self.server.uuid = uuid - self.server.presentation_url = presentation_url - self.server.scpd_xml_path = 'scpd.xml' - self.server.device_xml_path = "device.xml" - self.server.major_version = 0 - self.server.minor_version = 1 - self.host = host - - @property - def path(self): - path = 'http://{ip}:{port}/{path}'.format(ip=self.host, - port=8088, - path=self.server.device_xml_path) - return path - - def run(self): - self.server.serve_forever() +from HiveMind_presence.upnp_server import * diff --git a/jarbas_hive_mind/nodes/master.py b/jarbas_hive_mind/nodes/master.py index 0978d6b..6fdee96 100644 --- a/jarbas_hive_mind/nodes/master.py +++ b/jarbas_hive_mind/nodes/master.py @@ -10,11 +10,10 @@ from jarbas_hive_mind.interface import HiveMindMasterInterface import json from jarbas_hive_mind.message import HiveMessage, HiveMessageType -from jarbas_hive_mind.discovery.ssdp import SSDPServer -from jarbas_hive_mind.discovery.upnp_server import UPNPHTTPServer from jarbas_hive_mind.nodes import HiveMindNodeType import uuid from ovos_utils.messagebus import FakeBus +from HiveMind_presence import LocalPresence # protocol @@ -167,35 +166,11 @@ def __init__(self, bus=None, announce=True, *args, **kwargs): self.interface = HiveMindMasterInterface(self) self.announce = announce - self.upnp_server = None - self.ssdp = None + self.presence = None def start_announcing(self): - device_uuid = uuid.uuid4() - local_ip_address = get_ip() - hivemind_socket = self.listener.address.replace("0.0.0.0", - local_ip_address) - if self.ssdp is None or self.upnp_server is None: - self.upnp_server = UPNPHTTPServer(8088, - friendly_name="JarbasHiveMind Master", - manufacturer='JarbasAI', - manufacturer_url='https://ai-jarbas.gitbook.io/jarbasai/', - model_description='Jarbas HiveMind', - model_name="HiveMind-core", - model_number="0.9", - model_url="https://github.com/OpenJarbas/HiveMind-core", - serial_number=self.protocol.platform, - uuid=device_uuid, - presentation_url=hivemind_socket, - host=local_ip_address) - self.upnp_server.start() - - self.ssdp = SSDPServer() - self.ssdp.register('local', - 'uuid:{}::upnp:HiveMind-websocket'.format(device_uuid), - 'upnp:HiveMind-websocket', - self.upnp_server.path) - self.ssdp.start() + self.presence = LocalPresence(port=self.listener.port, ssl=self.listener.is_secure) + self.presence.start() def bind(self, listener): self.listener = listener @@ -226,6 +201,8 @@ def register_mycroft_messages(self): self.bus.on('hive.send', self.handle_send) def shutdown(self): + if self.presence: + self.presence.stop() self.bus.remove('message', self.handle_outgoing_mycroft) self.bus.remove('hive.send', self.handle_send) diff --git a/requirements.txt b/requirements.txt index 008068b..1382f7a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,5 +6,5 @@ twisted ovos_utils>=0.0.6 json_database>=0.2.6 pycryptodomex -upnpclient>=0.0.8 +HiveMind_presence~=0.0.2a2 mycroft-messagebus-client>=0.9.1 diff --git a/setup.py b/setup.py index e5c562d..e6c57f8 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ "ovos_utils>=0.0.6", "json_database>=0.2.6", "pycryptodomex", - "upnpclient>=0.0.8"], + "HiveMind_presence~=0.0.2a2"], url='https://github.com/JarbasAl/hive_mind', license='MIT', author='jarbasAI',