Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

start implementation of UPnP-gw-DeviceProtection-V1 #35

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,64 @@ device.Layer3Forwarding1.GetDefaultConnectionService(

If you've set either at `Device` level, they can be overridden per-call by
setting them to `None`.


#### HTTPS Certificate

UPnP DeviceProtection:1 Standardized secured SSL connection to Devices (server):
[UPnP-gw-DeviceProtection-V1-Service](http://upnp.org/specs/gw/UPnP-gw-DeviceProtection-V1-Service.pdf)

From §1.1.2: `Devices and Control Points will generate their own CA certificates`.

This means two things:
- your control-point (client) must accept Device (server) certificate, which might not be signed by trusted autorithy - eg self-signed.
- your control-point (client) must provide a certificate to the Device (server), wich also can be self-signed.

In order to do that, two paramters have been added to kwargs:
- `AllowSelfSignedSSL`: a boolean allowing upnpclient to connect to not-trusted devices
- `cert`: to allow user-provided certificate to be used for connection

```python
mycert = ("C:\\fooo.crt", "C:\\fooo.key")
device = upnpclient.Device(
"https://192.168.1.1:5000/rootDesc.xml",
AllowSelfSignedSSL = True,
cert = mycert,
)
```

Or

```python
devices = upnpclient.discover(AllowSelfSignedSSL=True,AllowSelfSignedSSL = True,cert = mycert)
```

Note: At the moment, upnpclient will not try to access the SSL URL in discover mode (described in §2.3.1 as `SECURELOCATION.UPNP.ORG` header extension)


#### Custom SSDP inbound port

SSDP protocol is not well supported by firewalls (like netfilter/conntrack) so if you run this control-point client on a critical device, you may have problems setting filter rules.

Main problem is the defaut SSDP behavior which use random inbound UDP port to receive SSDP responses.

To address that problem, we add a workaround option that let you fix this udp input port:

```python
device = upnpclient.Device(
"https://192.168.1.1:5000/rootDesc.xml",
SSDPInPort=20000
)
```

Or

```python
devices = upnpclient.discover(AllowSelfSignedSSL=True,SSDPInPort=30000)
```

Then you can allow this path in your firewall configuration.

Example for iptables:

```iptables -A INPUT [-i <<control-point input interface>>] -d <<control-point ip address>> -p udp --dport <<control-point fixed ssdp input port>> -j ACCEPT```
15 changes: 12 additions & 3 deletions upnpclient/soap.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,23 @@ class SOAP(object):
This class defines a simple SOAP client.
"""

def __init__(self, url, service_type):
def __init__(self,action, url, service_type,**kwargs):
self.action = action
self.url = url
self.service_type = service_type
# FIXME: Use urlparse for this:
self._host = self.url.split("//", 1)[1].split("/", 1)[
0
] # Get hostname portion of url
self._log = _getLogger("SOAP")

self.ClientCert = None
if "ClientCert" in kwargs:
self.ClientCert = kwargs["ClientCert"]

self.AllowSelfSignedSSL = False
if "AllowSelfSignedSSL" in kwargs:
self.AllowSelfSignedSSL = kwargs["AllowSelfSignedSSL"]

def _extract_upnperror(self, err_xml):
"""
Expand Down Expand Up @@ -104,8 +113,8 @@ def call(self, action_name, arg_in=None, http_auth=None, http_headers=None):
headers.update(http_headers or {})

try:
resp = requests.post(
self.url, body, headers=headers, timeout=SOAP_TIMEOUT, auth=http_auth
resp = self.action.service.device.session.post(
self.url, body, headers=headers, timeout=SOAP_TIMEOUT, auth=http_auth,cert=self.ClientCert, verify=not(self.AllowSelfSignedSSL)
)
resp.raise_for_status()
except requests.exceptions.HTTPError as exc:
Expand Down
16 changes: 10 additions & 6 deletions upnpclient/ssdp.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,21 @@ def ssdp_request(ssdp_st, ssdp_mx=SSDP_MX):
).encode("utf-8")


def scan(timeout=5):
def scan(timeout=5,**kwargs):
urls = []
sockets = []
ssdp_requests = [ssdp_request(ST_ALL), ssdp_request(ST_ROOTDEVICE)]
stop_wait = datetime.now() + timedelta(seconds=timeout)

SSDPInPort = 0
if "SSDPInPort" in kwargs:
SSDPInPort = kwargs["SSDPInPort"]

for addr in get_addresses_ipv4():
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, SSDP_MX)
sock.bind((addr, 0))
sock.bind((addr, SSDPInPort))
sockets.append(sock)
except socket.error:
pass
Expand Down Expand Up @@ -109,18 +113,18 @@ def get_addresses_ipv4():
)


def discover(timeout=5):
def discover(timeout=5,**kwargs):
"""
Convenience method to discover UPnP devices on the network. Returns a
list of `upnp.Device` instances. Any invalid servers are silently
ignored.
"""
"""
devices = {}
for entry in scan(timeout):
for entry in scan(timeout,**kwargs):
if entry.location in devices:
continue
try:
devices[entry.location] = Device(entry.location)
devices[entry.location] = Device(entry.location,**kwargs)
except Exception as exc:
log = _getLogger("ssdp")
log.error("Error '%s' for %s", exc, entry)
Expand Down
125 changes: 109 additions & 16 deletions upnpclient/upnp.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from collections import OrderedDict

import six
import requests
from requests import Session
from requests.compat import urljoin, urlparse
from dateutil.parser import parse as parse_date
from lxml import etree
Expand All @@ -18,6 +18,83 @@
from .marshal import marshal_value




"""
Subclassing HTTP / requests to get peer_certificate back from lower levels
"""
from typing import Optional, Mapping, Any
from http.client import HTTPSConnection
from requests.adapters import HTTPAdapter, DEFAULT_POOLBLOCK
from urllib3.poolmanager import PoolManager,key_fn_by_scheme
from urllib3.connectionpool import HTTPSConnectionPool,HTTPConnectionPool
from urllib3.connection import HTTPSConnection,HTTPConnection
from urllib3.response import HTTPResponse as URLLIB3_HTTPResponse

#force urllib3 to use pyopenssl
import urllib3.contrib.pyopenssl
urllib3.contrib.pyopenssl.inject_into_urllib3()

class HTTPSConnection_withcert(HTTPSConnection):
def __init__(self, *args, **kw):
self.peer_certificate = None
super().__init__(*args, **kw)
def connect(self):
res = super().connect()
self.peer_certificate = self.sock.connection.get_peer_certificate()
return res

class HTTPResponse_withcert(URLLIB3_HTTPResponse):
def __init__(self, *args, **kwargs):
self.peer_certificate = None
res = super().__init__( *args, **kwargs)
self.peer_certificate = self._connection.peer_certificate
return res

class HTTPSConnectionPool_withcert(HTTPSConnectionPool):
ConnectionCls = HTTPSConnection_withcert
ResponseCls = HTTPResponse_withcert

class PoolManager_withcert(PoolManager):
def __init__(
self,
num_pools: int = 10,
headers: Optional[Mapping[str, str]] = None,
**connection_pool_kw: Any,
) -> None:
super().__init__(num_pools,headers,**connection_pool_kw)
self.pool_classes_by_scheme = {"http": HTTPConnectionPool, "https": HTTPSConnectionPool_withcert}
self.key_fn_by_scheme = key_fn_by_scheme.copy()

class HTTPAdapter_withcert(HTTPAdapter):
_clsHTTPResponse = HTTPResponse_withcert
def build_response(self, request, resp):
response = super().build_response( request, resp)
response.peer_certificate = resp.peer_certificate
return response

def init_poolmanager(self, connections, maxsize, block=DEFAULT_POOLBLOCK, **pool_kwargs):
#do not call super() to not initialize PoolManager twice
# save these values for pickling
self._pool_connections = connections
self._pool_maxsize = maxsize
self._pool_block = block

self.poolmanager = PoolManager_withcert(num_pools=connections,
maxsize=maxsize,
block=block,
strict=True,
**pool_kwargs)

class Session_withcert(Session):
def __init__(self):
super().__init__()
self.mount('https://', HTTPAdapter_withcert())





class UPNPError(Exception):
"""
Exception class for UPnP errors.
Expand Down Expand Up @@ -86,14 +163,14 @@ class Device(CallActionMixin):
urn:upnp-org:serviceId:wandsllc:pvc_Internet
urn:upnp-org:serviceId:wanipc:Internet
"""

def __init__(
self,
location,
device_name=None,
ignore_urlbase=False,
http_auth=None,
http_headers=None,
**kwargs
):
"""
Create a new Device instance. `location` is an URL to an XML file
Expand All @@ -107,10 +184,23 @@ def __init__(

self.http_auth = http_auth
self.http_headers = http_headers

resp = requests.get(
location, timeout=HTTP_TIMEOUT, auth=self.http_auth, headers=self.http_headers

self.ClientCert = None
if "ClientCert" in kwargs:
self.ClientCert = kwargs["ClientCert"]

self.AllowSelfSignedSSL = False
if "AllowSelfSignedSSL" in kwargs:
self.AllowSelfSignedSSL = kwargs["AllowSelfSignedSSL"]
print(self.ClientCert)

self.session = Session_withcert()
resp = self.session.get(
location, timeout=HTTP_TIMEOUT, auth=self.http_auth, headers=self.http_headers,cert=self.ClientCert, verify=not(self.AllowSelfSignedSSL)
)

print(resp.peer_certificate.get_subject())

resp.raise_for_status()

root = etree.fromstring(resp.content)
Expand All @@ -136,7 +226,8 @@ def __init__(
self._find = partial(root.find, namespaces=root.nsmap)
self._findall = partial(root.findall, namespaces=root.nsmap)
self._read_services()

print(self._url_base)
pass
def __repr__(self):
return "<Device '%s'>" % (self.friendly_name)

Expand Down Expand Up @@ -191,7 +282,7 @@ def _read_services(self):
)
self.services.append(svc)
self.service_map[svc.name] = svc

pass
def find_action(self, action_name):
"""Find an action by name.
Convenience method that searches through all the services offered by
Expand Down Expand Up @@ -239,14 +330,16 @@ def __init__(
self._log.debug("%s SCPDURL: %s", self.service_id, self.scpd_url)
self._log.debug("%s controlURL: %s", self.service_id, self._control_url)
self._log.debug("%s eventSubURL: %s", self.service_id, self._event_sub_url)

print(self._url_base)
url = urljoin(self._url_base, self.scpd_url)
self._log.debug("Reading %s", url)
resp = requests.get(
resp = self.device.session.get(
url,
timeout=HTTP_TIMEOUT,
auth=self.device.http_auth,
headers=self.device.http_headers,
cert=self.device.ClientCert,
verify=not(self.device.AllowSelfSignedSSL)
)
resp.raise_for_status()
self.scpd_xml = etree.fromstring(resp.content)
Expand Down Expand Up @@ -398,8 +491,8 @@ def subscribe(self, callback_url, timeout=None):
)
if timeout is not None:
headers["TIMEOUT"] = "Second-%s" % timeout
resp = requests.request(
"SUBSCRIBE", url, headers=headers, auth=self.device.http_auth
resp = self.device.session.post(
"SUBSCRIBE", url, headers=headers, auth=self.device.http_auth,cert=self.ClientCert, verify=not(self.device.AllowSelfSignedSSL)
)
resp.raise_for_status()
return Service.validate_subscription_response(resp)
Expand All @@ -412,8 +505,8 @@ def renew_subscription(self, sid, timeout=None):
headers = dict(HOST=urlparse(url).netloc, SID=sid)
if timeout is not None:
headers["TIMEOUT"] = "Second-%s" % timeout
resp = requests.request(
"SUBSCRIBE", url, headers=headers, auth=self.device.http_auth
resp = self.device.session.post(
"SUBSCRIBE", url, headers=headers, auth=self.device.http_auth,cert=self.ClientCert, verify=not(self.device.AllowSelfSignedSSL)
)
resp.raise_for_status()
return Service.validate_subscription_renewal_response(resp)
Expand All @@ -424,8 +517,8 @@ def cancel_subscription(self, sid):
"""
url = urljoin(self._url_base, self._event_sub_url)
headers = dict(HOST=urlparse(url).netloc, SID=sid)
resp = requests.request(
"UNSUBSCRIBE", url, headers=headers, auth=self.device.http_auth
resp = self.device.session.post(
"UNSUBSCRIBE", url, headers=headers, auth=self.device.http_auth,cert=self.ClientCert, verify=not(self.device.AllowSelfSignedSSL)
)
resp.raise_for_status()

Expand Down Expand Up @@ -468,7 +561,7 @@ def __call__(self, http_auth=None, http_headers=None, **kwargs):

# Make the actual call
self._log.debug(">> %s (%s)", self.name, call_kwargs)
soap_client = SOAP(self.url, self.service_type)
soap_client = SOAP(self,self.url, self.service_type,AllowSelfSignedSSL=self.service.device.AllowSelfSignedSSL,ClientCert=self.service.device.ClientCert)

soap_response = soap_client.call(
self.name,
Expand Down