-
Notifications
You must be signed in to change notification settings - Fork 43
/
authentication.py
144 lines (127 loc) · 6.24 KB
/
authentication.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import pickle
import os.path
import glob
from typing import List
from googleapiclient import discovery
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.auth.credentials import Credentials
class AuthenticatedService:
"""Handles authentication of the `GoogleCalendar`"""
_READ_WRITE_SCOPES = 'https://www.googleapis.com/auth/calendar'
_LIST_ORDERS = ("startTime", "updated")
def __init__(
self,
*,
credentials: Credentials = None,
credentials_path: str = None,
token_path: str = None,
save_token: bool = True,
read_only: bool = False,
authentication_flow_host: str = 'localhost',
authentication_flow_port: int = 8080,
authentication_flow_bind_addr: str = None
):
"""
Specify ``credentials`` to use in requests or ``credentials_path`` and ``token_path`` to get credentials from.
:param credentials:
Credentials with token and refresh token.
If specified, ``credentials_path``, ``token_path``, and ``save_token`` are ignored.
If not specified, credentials are retrieved from "token.pickle" file (specified in ``token_path`` or
default path) or with authentication flow using secret from "credentials.json" ("client_secret_*.json")
(specified in ``credentials_path`` or default path)
:param credentials_path:
Path to "credentials.json" ("client_secret_*.json") file.
Default: ~/.credentials/credentials.json or ~/.credentials/client_secret*.json
:param token_path:
Existing path to load the token from, or path to save the token after initial authentication flow.
Default: "token.pickle" in the same directory as the credentials_path
:param save_token:
Whether to pickle token after authentication flow for future uses
:param read_only:
If require read only access. Default: False
:param authentication_flow_host:
Host to receive response during authentication flow
:param authentication_flow_port:
Port to receive response during authentication flow
:param authentication_flow_bind_addr:
Optional IP address for the redirect server to listen on when it is not the same as host
(e.g. in a container)
"""
if credentials:
self.credentials = self._ensure_refreshed(credentials)
else:
credentials_path = credentials_path or self._get_default_credentials_path()
credentials_dir, credentials_file = os.path.split(credentials_path)
token_path = token_path or os.path.join(credentials_dir, 'token.pickle')
scopes = [self._READ_WRITE_SCOPES + ('.readonly' if read_only else '')]
self.credentials = self._get_credentials(
token_path,
credentials_dir,
credentials_file,
scopes,
save_token,
authentication_flow_host,
authentication_flow_port,
authentication_flow_bind_addr
)
self.service = discovery.build('calendar', 'v3', credentials=self.credentials)
@staticmethod
def _ensure_refreshed(
credentials: Credentials
) -> Credentials:
if not credentials.valid and credentials.expired:
credentials.refresh(Request())
return credentials
@staticmethod
def _get_credentials(
token_path: str,
credentials_dir: str,
credentials_file: str,
scopes: List[str],
save_token: bool,
host: str,
port: int,
bind_addr: str
) -> Credentials:
credentials = None
if os.path.exists(token_path):
with open(token_path, 'rb') as token_file:
credentials = pickle.load(token_file)
if not credentials or not credentials.valid:
if credentials and credentials.expired and credentials.refresh_token:
credentials.refresh(Request())
else:
credentials_path = os.path.join(credentials_dir, credentials_file)
flow = InstalledAppFlow.from_client_secrets_file(credentials_path, scopes)
credentials = flow.run_local_server(host=host, port=port, bind_addr=bind_addr)
if save_token:
with open(token_path, 'wb') as token_file:
pickle.dump(credentials, token_file)
return credentials
@staticmethod
def _get_default_credentials_path() -> str:
"""Checks if `.credentials` folder in home directory exists and contains `credentials.json` or
`client_secret*.json` file.
:raises ValueError: if `.credentials` folder does not exist, none of `credentials.json` or `client_secret*.json`
files do not exist, or there are multiple `client_secret*.json` files.
:return: expanded path to `credentials.json` or `client_secret*.json` file
"""
home_dir = os.path.expanduser('~')
credential_dir = os.path.join(home_dir, '.credentials')
if not os.path.exists(credential_dir):
raise FileNotFoundError(f'Default credentials directory "{credential_dir}" does not exist.')
credential_path = os.path.join(credential_dir, 'credentials.json')
if os.path.exists(credential_path):
return credential_path
else:
credentials_files = glob.glob(credential_dir + '/client_secret*.json')
if len(credentials_files) > 1:
raise ValueError(f"Multiple credential files found in {credential_dir}.\n"
f"Try specifying the credentials file, e.x.:\n"
f"GoogleCalendar(credentials_path='{credentials_files[0]}')")
elif not credentials_files:
raise FileNotFoundError(f'Credentials file (credentials.json or client_secret*.json)'
f'not found in the default path: "{credential_dir}".')
else:
return credentials_files[0]