Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/auth.py
469 views
1
#!/usr/bin/env python
2
import datetime
3
from typing import Any
4
from typing import List
5
from typing import Optional
6
from typing import Union
7
8
import jwt
9
10
11
# Credential types
PASSWORD = 'password'
JWT = 'jwt'
BROWSER_SSO = 'browser_sso'

# Single Sign-On URL
SSO_URL = 'https://portal.singlestore.com/engine-sso'


class JSONWebToken(object):
    """
    Container for JWT information.

    Parameters
    ----------
    token : str
        The encoded JWT
    expires : datetime.datetime
        Time at which the token expires
    email : str
        Value of the 'email' claim
    username : str
        Value of the 'username' claim (or 'sub' if 'username' is not set)
    url : str, optional
        The URL of the single-sign-on token generator; used by `refresh`
    clusters : str or list[str], optional
        Cluster name(s) passed along when refreshing the token
    databases : str or list[str], optional
        Database name(s) passed along when refreshing the token
    timeout : int, optional
        Number of seconds to wait when refreshing the token

    """

    def __init__(
        self, token: str, expires: datetime.datetime,
        email: str, username: str, url: str = SSO_URL,
        clusters: Optional[Union[str, List[str]]] = None,
        databases: Optional[Union[str, List[str]]] = None,
        timeout: int = 60,
    ):
        self.token = token
        self.expires = expires
        self.email = email
        self.username = username
        self.model_version_number = 1

        # Attributes needed for refreshing tokens
        self.url = url
        self.clusters = clusters
        self.databases = databases
        self.timeout = timeout

    @classmethod
    def from_token(
        cls, token: Union[bytes, str], verify_signature: bool = False,
    ) -> 'JSONWebToken':
        """
        Decode an encoded JWT and validate the claims it must contain.

        Parameters
        ----------
        token : bytes or str
            The encoded JWT
        verify_signature : bool, optional
            Value of the 'verify_signature' option given to ``jwt.decode``

        Raises
        ------
        ValueError
            If 'email' or 'exp' is missing, if both 'sub' and 'username'
            are missing, or if 'exp' can not be converted to a datetime

        Returns
        -------
        JSONWebToken

        """
        info = jwt.decode(token, options={'verify_signature': verify_signature})

        if not info.get('sub', None) and not info.get('username', None):
            raise ValueError("Missing 'sub' and 'username' in claims")
        if not info.get('email', None):
            raise ValueError("Missing 'email' in claims")
        if not info.get('exp', None):
            raise ValueError("Missing 'exp' in claims")
        try:
            expires = datetime.datetime.fromtimestamp(info['exp'], datetime.timezone.utc)
        except Exception as exc:
            raise ValueError("Invalid 'exp' in claims: {}".format(str(exc))) from exc

        # Use `or` rather than a `get` default so that an empty / null
        # 'username' claim still falls back to 'sub'.
        username = info.get('username', None) or info.get('sub', None)
        email = info['email']

        # Store the token as text whether it arrived as bytes or str
        text = token.decode('utf-8') if isinstance(token, bytes) else token

        return cls(text, expires=expires, email=email, username=username)

    def __str__(self) -> str:
        return self.token

    def __repr__(self) -> str:
        return repr(self.token)

    @property
    def is_expired(self) -> bool:
        """Determine if the token has expired."""
        expires = self.expires
        # `from_token` produces timezone-aware datetimes, which can not be
        # compared to a naive `now()`; pick the matching flavor of "now".
        if expires.tzinfo is not None and expires.utcoffset() is not None:
            now = datetime.datetime.now(datetime.timezone.utc)
        else:
            now = datetime.datetime.now()
        # Expired means the expiration time is at or before the current time
        return expires <= now

    def refresh(self, force: bool = False) -> bool:
        """
        Refresh the token as needed.

        Parameters
        ----------
        force : bool, optional
            Should a new token be generated even if the existing
            one has not expired yet?

        Returns
        -------
        bool : Indicating whether the token was refreshed or not

        """
        if force or self.is_expired:
            out = get_jwt(
                self.email, url=self.url, clusters=self.clusters,
                databases=self.databases, timeout=self.timeout,
            )
            # Only the token and its expiration are replaced
            self.token = out.token
            self.expires = out.expires
            return True
        return False
98
99
100
def _listify(s: Optional[Union[str, List[str]]]) -> Optional[str]:
    """
    Collapse a string or a list of strings into one comma-separated string.

    None and plain strings are returned unchanged; anything else is
    joined with commas.

    """
    if s is None or isinstance(s, str):
        return s
    return ','.join(s)
107
108
109
def get_jwt(
    email: str, url: str = SSO_URL,
    clusters: Optional[Union[str, List[str]]] = None,
    databases: Optional[Union[str, List[str]]] = None,
    timeout: int = 60, browser: Optional[Union[str, List[str]]] = None,
) -> JSONWebToken:
    """
    Retrieve a JWT token from the SingleStoreDB single-sign-on URL.

    A temporary HTTP server is started on 127.0.0.1 using an OS-assigned
    port, a web browser is opened on the single-sign-on URL with that
    server's address as the ``returnTo`` parameter, and the function then
    polls until a token has been POSTed to the server or the timeout
    is reached.

    Parameters
    ----------
    email : str
        EMail of the database user
    url : str, optional
        The URL of the single-sign-on token generator
    clusters : str or list[str], optional
        The name of the cluster being connected to
    databases : str or list[str], optional
        The name of the database being connected to
    timeout : int, optional
        Number of seconds to wait before timing out the authentication request
    browser : str or list[str], optional
        Browser to use instead of the default. This value can be any of the
        names specified in Python's `webbrowser` module. This includes
        'google-chrome', 'chrome', 'chromium', 'chromium-browser', 'firefox',
        etc. Note that at the time of this writing, Safari was not
        compatible. If a list of names is specified, each one is tried until
        a working browser is located.

    Raises
    ------
    RuntimeError
        If none of the requested browsers could be used, or if no token
        arrived before the timeout
    Exception
        Whatever error was raised while decoding the POSTed token

    Returns
    -------
    JSONWebToken

    """
    import platform
    import webbrowser
    import time
    import threading
    import urllib.parse
    from http.server import BaseHTTPRequestHandler, HTTPServer

    from .config import get_option

    # Filled in by the request handler (running in the server thread)
    # and polled by the loop below.
    token: List[JSONWebToken] = []
    error: List[Exception] = []

    class AuthServer(BaseHTTPRequestHandler):

        def log_message(self, format: str, *args: Any) -> None:
            # Suppress the default per-request logging
            return

        def do_POST(self) -> None:
            content_len = int(self.headers.get('Content-Length', 0))
            post_body = self.rfile.read(content_len)

            try:
                out = JSONWebToken.from_token(post_body)
            except Exception as exc:
                # str(exc) is safe even for exceptions without args
                self.send_response(400, str(exc))
                self.send_header('Content-Type', 'text/plain')
                self.end_headers()
                error.append(exc)
                return

            token.append(out)

            self.send_response(204)
            self.send_header('Access-Control-Allow-Origin', '*')
            self.send_header('Content-Type', 'text/plain')
            self.end_headers()

    # Port 0 lets the operating system choose a free port
    server = HTTPServer(('127.0.0.1', 0), AuthServer)
    serving = False

    try:
        threading.Thread(target=server.serve_forever).start()
        serving = True

        host = server.server_address[0]
        if isinstance(host, bytes):
            host = host.decode('utf-8')

        query = urllib.parse.urlencode({
            k: v for k, v in dict(
                email=email,
                returnTo=f'http://{host}:{server.server_address[1]}',
                db=_listify(databases),
                cluster=_listify(clusters),
            ).items() if v is not None
        })
        target = f'{url}?{query}'

        if browser is None:
            browser = get_option('sso_browser')

        # On Mac, always specify a list of browsers to check because Safari
        # is not compatible. `platform.platform()` starts with 'Darwin' on
        # some Python versions, so `platform.system()` is checked as well.
        on_mac = (
            platform.system() == 'Darwin'
            or platform.platform().lower().startswith('mac')
        )
        if browser is None and on_mac:
            browser = [
                'chrome', 'google-chrome', 'chromium',
                'chromium-browser', 'firefox',
            ]

        if browser and isinstance(browser, str):
            browser = [browser]

        if browser:
            for item in browser:
                try:
                    webbrowser.get(item).open(target)
                    break
                except webbrowser.Error:
                    # Try the next candidate
                    continue
            else:
                # Only reached when no candidate could be opened; an early
                # failure followed by a success must not raise.
                raise RuntimeError(
                    'Could not find compatible web browser for accessing JWT',
                )
        else:
            webbrowser.open(target)

        # Poll twice per second until the handler reports a result
        for _ in range(timeout * 2):
            if error:
                raise error[0]
            if token:
                out = token[0]
                out.url = url
                out.clusters = clusters
                out.databases = databases
                out.timeout = timeout
                return out
            time.sleep(0.5)

    finally:
        # shutdown() waits for serve_forever() to finish, so only call it
        # when the serving thread was actually started.
        if serving:
            server.shutdown()
        # Release the listening socket
        server.server_close()

    raise RuntimeError('Timeout waiting for token')
246
247