forked from tehkillerbee/mopidy-tidal
-
Notifications
You must be signed in to change notification settings - Fork 0
/
auth_http_server.py
executable file
·112 lines (94 loc) · 3.66 KB
/
auth_http_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import threading
from functools import partial
from string import whitespace
from mopidy_tidal.session import PersistentSession
try:
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
from urllib import unquote
except ImportError:
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import unquote
# Page template for the login helper page. The name is bound to the
# template's ``str.format`` method, so it is used as a callable:
# ``HTML_BODY(authurl=..., interactive=...)``.
# The href value is quoted: login URLs carry query strings, and characters
# such as ``=`` are not valid inside an unquoted HTML attribute value.
HTML_BODY = """<!DOCTYPE html>
<html>
<head>
<title>TIDAL OAuth Login</title>
</head>
<body>
<h1>KEEP THIS TAB OPEN</h1>
<a href="{authurl}" target="_blank" rel="noopener noreferrer">Click here to be forwarded to TIDAL Login page</a>
{interactive}
</body>
</html>
""".format
# HTML fragment substituted into the ``{interactive}`` slot of the login page
# when the session uses PKCE (see ``HTTPHandler.do_GET``). The form posts the
# pasted URL back to this server in a field named ``code``.
INTERACTIVE_HTML_BODY = """
<p>...then, after login, copy the whole final URL of the page you ended up to.</p>
<p>Probably a "not found" page, nevertheless we need the whole URL as is.</p>
<form method="post">
<label for="code">Paste here your final URL location:</label>
<input type="url" id="code" name="code">
<input type="submit" value="Submit">
</form>
"""
def start_oauth_deamon(session, port, login_result):
    """Start the login web server on a background daemon thread.

    Args:
        session: Session object handed to every ``HTTPHandler`` instance.
        port: TCP port the server listens on (bound with an empty host).
        login_result: Holder object handed to every ``HTTPHandler``
            instance; the login outcome is delivered through it.
    """
    # Each incoming request gets a handler pre-loaded with session + holder.
    make_handler = partial(HTTPHandler, session, login_result)
    server = HTTPServer(('', port), make_handler)
    # Daemon thread: it dies together with the main thread.
    worker = threading.Thread(
        target=server.serve_forever,
        name="TidalOAuthLogin",
        daemon=True,
    )
    worker.start()
class HTTPHandler(BaseHTTPRequestHandler, object):
    """Serve the TIDAL login helper page and accept the pasted redirect URL.

    GET returns an HTML page linking to the login URL obtained from the
    login handler; when the session uses PKCE the page also contains a form.
    POST receives that form, extracts its ``code`` field and passes it to
    the login handler's ``set_login_result``.
    """

    def __init__(self, session: "PersistentSession", login_result_holder, *args, **kwargs):
        # The attribute must exist before the base constructor runs: the
        # base request handler processes the request (and therefore calls
        # do_GET/do_POST) from inside its own __init__.
        self.login_handler = LoginHandler(session, login_result_holder)
        super().__init__(*args, **kwargs)

    def do_GET(self):
        """Reply with the login page; the paste form is included in PKCE mode."""
        # NOTE(review): every GET asks the login handler for a login URL; in
        # non-PKCE mode that calls session.login_oauth() again each time.
        # Confirm repeated requests (e.g. page reloads) are harmless.
        interactive = INTERACTIVE_HTML_BODY if self.login_handler.is_pkce else ''
        # Build the page before sending the status line, so a failure while
        # obtaining the login URL does not leave a 200 with an empty body.
        page = HTML_BODY(authurl=self.login_handler.get_login_url(), interactive=interactive)
        self._respond(200, page.encode(), content_type="text/html; charset=utf-8")

    def do_POST(self):
        """Handle the submitted form carrying the post-login redirect URL.

        Responds 400 when the request cannot be parsed, 401 when the login
        handler reports a failure, and 200 otherwise. Exceptions raised while
        parsing or while handing over the URL are re-raised after the error
        response has been written.
        """
        try:
            body = self.rfile.read(self._content_length()).decode()
            code_url = self._parse_code_url(body)
        except Exception:
            self._respond(400, b"Malformed request")
            raise

        try:
            outcome = self.login_handler.set_login_result(code_url)
        except Exception:
            self._respond(401, b"Failed to get final key! :(")
            raise

        # Also treat a returned exception object as a failure, in case the
        # login handler reports errors by value instead of raising.
        if isinstance(outcome, BaseException):
            self._respond(401, b"Failed to get final key! :(")
            return
        self._respond(200, b"Success!\nCredentials auto-refresh is on.\nEnjoy your music!")

    def _content_length(self):
        """Return the request's Content-Length as an int, 0 when absent.

        Raises:
            ValueError: if the header is not a decimal integer or is negative.
        """
        # The default belongs to .get(); passing 0 to int() would be a base.
        length = int(self.headers.get("Content-Length", 0))
        if length < 0:
            raise ValueError("negative Content-Length")
        return length

    @staticmethod
    def _parse_code_url(body):
        """Extract and decode the ``code`` field of a urlencoded form body.

        Raises:
            KeyError: if the body has no ``code`` field.
        """
        fields = {}
        for pair in body.split("&"):
            key, _, value = pair.partition("=")
            fields[key] = value
        # Form encoding writes spaces as "+"; decode first, then trim, so
        # that encoded leading/trailing whitespace is removed as well.
        return unquote(fields['code'].replace("+", " ")).strip(whitespace)

    def _respond(self, status, payload, content_type="text/plain; charset=utf-8"):
        """Send a complete response: status line, headers and ``payload`` bytes."""
        self.send_response(status)
        self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)
class LoginHandler:
    """Produce the login URL for a session and deliver the login outcome.

    The outcome is put on ``login_result_holder``: ``None`` for success, or
    the exception describing the failure.
    """

    def __init__(self, session: "PersistentSession", login_result_holder):
        self._session = session
        self._login_result_holder = login_result_holder
        # Cached once; selects between the PKCE and the OAuth code paths.
        self.is_pkce = session.is_pkce

    def _login_oauth(self):
        """Start an OAuth login on the session and return its verification URL."""
        login, future = self._session.login_oauth()
        # When the future completes, forward its exception (presumably None
        # on success -- confirm against the session's future type).
        future.add_done_callback(lambda f: self.set_login_result(f.exception()))
        return login.verification_uri_complete

    def _login_pkce(self):
        """Return the session's PKCE login URL."""
        return self._session.pkce_login_url()

    def get_login_url(self):
        """Return the URL the user must open to log in."""
        if self.is_pkce:
            return self._login_pkce()
        return self._login_oauth()

    def set_login_result(self, data):
        """Publish the login outcome to the result holder.

        In PKCE mode ``data`` is the URL the user was redirected to after
        logging in; it is exchanged for an auth token which is then handed
        to the session. Otherwise ``data`` is already the outcome and is
        published unchanged.

        Returns:
            The published outcome: ``None`` on success, or the exception
            that occurred. Returning it lets the caller tell the two apart
            instead of a PKCE failure being silently absorbed.
        """
        if self.is_pkce:
            try:
                token = self._session.pkce_get_auth_token(data)
                self._session.process_auth_token(token)
            except Exception as exc:
                data = exc
            else:
                data = None
        self._login_result_holder.put(data)
        return data