Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/mysql/_auth.py
469 views
1
# type: ignore
2
"""
3
Implements auth methods
4
"""
5
from .err import OperationalError
6
7
try:
8
from cryptography.hazmat.backends import default_backend
9
from cryptography.hazmat.primitives import serialization, hashes
10
from cryptography.hazmat.primitives.asymmetric import padding
11
12
_have_cryptography = True
13
except ImportError:
14
_have_cryptography = False
15
16
from functools import partial
17
import hashlib
18
19
from ..config import get_option
20
21
22
DEBUG = get_option('debug.connection')
23
SCRAMBLE_LENGTH = 20
24
sha1_new = partial(hashlib.new, 'sha1')
25
26
27
# mysql_native_password
28
# https://dev.mysql.com/doc/internals/en/secure-password-authentication.html#packet-Authentication::Native41
29
30
31
def scramble_native_password(password, message):
32
"""Scramble used for mysql_native_password."""
33
if not password:
34
return b''
35
36
stage1 = sha1_new(password).digest()
37
stage2 = sha1_new(stage1).digest()
38
s = sha1_new()
39
s.update(message[:SCRAMBLE_LENGTH])
40
s.update(stage2)
41
result = s.digest()
42
return _my_crypt(result, stage1)
43
44
45
def _my_crypt(message1, message2):
46
result = bytearray(message1)
47
48
for i in range(len(result)):
49
result[i] ^= message2[i]
50
51
return bytes(result)
52
53
54
# MariaDB's client_ed25519-plugin
55
# https://mariadb.com/kb/en/library/connection/#client_ed25519-plugin
56
57
_nacl_bindings = False
58
59
60
def _init_nacl():
61
global _nacl_bindings
62
try:
63
from nacl import bindings
64
65
_nacl_bindings = bindings
66
except ImportError:
67
raise RuntimeError(
68
"'pynacl' package is required for ed25519_password auth method",
69
)
70
71
72
def _scalar_clamp(s32):
73
ba = bytearray(s32)
74
ba0 = bytes(bytearray([ba[0] & 248]))
75
ba31 = bytes(bytearray([(ba[31] & 127) | 64]))
76
return ba0 + bytes(s32[1:31]) + ba31
77
78
79
def ed25519_password(password, scramble):
80
"""
81
Sign a random scramble with elliptic curve Ed25519.
82
83
Secret and public key are derived from password.
84
85
"""
86
# variable names based on rfc8032 section-5.1.6
87
#
88
if not _nacl_bindings:
89
_init_nacl()
90
91
# h = SHA512(password)
92
h = hashlib.sha512(password).digest()
93
94
# s = prune(first_half(h))
95
s = _scalar_clamp(h[:32])
96
97
# r = SHA512(second_half(h) || M)
98
r = hashlib.sha512(h[32:] + scramble).digest()
99
100
# R = encoded point [r]B
101
r = _nacl_bindings.crypto_core_ed25519_scalar_reduce(r)
102
R = _nacl_bindings.crypto_scalarmult_ed25519_base_noclamp(r)
103
104
# A = encoded point [s]B
105
A = _nacl_bindings.crypto_scalarmult_ed25519_base_noclamp(s)
106
107
# k = SHA512(R || A || M)
108
k = hashlib.sha512(R + A + scramble).digest()
109
110
# S = (k * s + r) mod L
111
k = _nacl_bindings.crypto_core_ed25519_scalar_reduce(k)
112
ks = _nacl_bindings.crypto_core_ed25519_scalar_mul(k, s)
113
S = _nacl_bindings.crypto_core_ed25519_scalar_add(ks, r)
114
115
# signature = R || S
116
return R + S
117
118
119
# sha256_password
120
121
122
def _roundtrip(conn, send_data):
123
conn.write_packet(send_data)
124
pkt = conn._read_packet()
125
pkt.check_error()
126
return pkt
127
128
129
def _xor_password(password, salt):
130
# Trailing NUL character will be added in Auth Switch Request.
131
# See https://github.com/mysql/mysql-server/blob/
132
# 7d10c82196c8e45554f27c00681474a9fb86d137/sql/auth/sha2_password.cc#L939-L945
133
salt = salt[:SCRAMBLE_LENGTH]
134
password_bytes = bytearray(password)
135
# salt = bytearray(salt) # for PY2 compat.
136
salt_len = len(salt)
137
for i in range(len(password_bytes)):
138
password_bytes[i] ^= salt[i % salt_len]
139
return bytes(password_bytes)
140
141
142
def sha2_rsa_encrypt(password, salt, public_key):
143
"""
144
Encrypt password with salt and public_key.
145
146
Used for sha256_password and caching_sha2_password.
147
148
"""
149
if not _have_cryptography:
150
raise RuntimeError(
151
"'cryptography' package is required for sha256_password or "
152
'caching_sha2_password auth methods',
153
)
154
message = _xor_password(password + b'\0', salt)
155
rsa_key = serialization.load_pem_public_key(public_key, default_backend())
156
return rsa_key.encrypt(
157
message,
158
padding.OAEP(
159
mgf=padding.MGF1(algorithm=hashes.SHA1()),
160
algorithm=hashes.SHA1(),
161
label=None,
162
),
163
)
164
165
166
def sha256_password_auth(conn, pkt):
167
if conn._secure:
168
if DEBUG:
169
print('sha256: Sending plain password')
170
data = conn.password + b'\0'
171
return _roundtrip(conn, data)
172
173
if pkt.is_auth_switch_request():
174
conn.salt = pkt.read_all()
175
if not conn.server_public_key and conn.password:
176
# Request server public key
177
if DEBUG:
178
print('sha256: Requesting server public key')
179
pkt = _roundtrip(conn, b'\1')
180
181
if pkt.is_extra_auth_data():
182
conn.server_public_key = pkt._data[1:]
183
if DEBUG:
184
print('Received public key:\n', conn.server_public_key.decode('ascii'))
185
186
if conn.password:
187
if not conn.server_public_key:
188
raise OperationalError("Couldn't receive server's public key")
189
190
data = sha2_rsa_encrypt(conn.password, conn.salt, conn.server_public_key)
191
else:
192
data = b''
193
194
return _roundtrip(conn, data)
195
196
197
def scramble_caching_sha2(password, nonce):
198
# (bytes, bytes) -> bytes
199
"""
200
Scramble algorithm used in cached_sha2_password fast path.
201
202
XOR(SHA256(password), SHA256(SHA256(SHA256(password)), nonce))
203
204
"""
205
if not password:
206
return b''
207
208
p1 = hashlib.sha256(password).digest()
209
p2 = hashlib.sha256(p1).digest()
210
p3 = hashlib.sha256(p2 + nonce).digest()
211
212
res = bytearray(p1)
213
for i in range(len(p3)):
214
res[i] ^= p3[i]
215
216
return bytes(res)
217
218
219
def caching_sha2_password_auth(conn, pkt):
220
# No password fast path
221
if not conn.password:
222
return _roundtrip(conn, b'')
223
224
if pkt.is_auth_switch_request():
225
# Try from fast auth
226
if DEBUG:
227
print('caching sha2: Trying fast path')
228
conn.salt = pkt.read_all()
229
scrambled = scramble_caching_sha2(conn.password, conn.salt)
230
pkt = _roundtrip(conn, scrambled)
231
# else: fast auth is tried in initial handshake
232
233
if not pkt.is_extra_auth_data():
234
raise OperationalError(
235
'caching sha2: Unknown packet for fast auth: %s' % pkt._data[:1],
236
)
237
238
# magic numbers:
239
# 2 - request public key
240
# 3 - fast auth succeeded
241
# 4 - need full auth
242
243
pkt.advance(1)
244
n = pkt.read_uint8()
245
246
if n == 3:
247
if DEBUG:
248
print('caching sha2: succeeded by fast path.')
249
pkt = conn._read_packet()
250
pkt.check_error() # pkt must be OK packet
251
return pkt
252
253
if n != 4:
254
raise OperationalError('caching sha2: Unknwon result for fast auth: %s' % n)
255
256
if DEBUG:
257
print('caching sha2: Trying full auth...')
258
259
if conn._secure:
260
if DEBUG:
261
print('caching sha2: Sending plain password via secure connection')
262
return _roundtrip(conn, conn.password + b'\0')
263
264
if not conn.server_public_key:
265
pkt = _roundtrip(conn, b'\x02') # Request public key
266
if not pkt.is_extra_auth_data():
267
raise OperationalError(
268
'caching sha2: Unknown packet for public key: %s' % pkt._data[:1],
269
)
270
271
conn.server_public_key = pkt._data[1:]
272
if DEBUG:
273
print(conn.server_public_key.decode('ascii'))
274
275
data = sha2_rsa_encrypt(conn.password, conn.salt, conn.server_public_key)
276
pkt = _roundtrip(conn, data)
277
278
279
def gssapi_auth(data):
280
try:
281
import gssapi.raw
282
except ImportError:
283
raise ImportError(
284
'The `gssapi` package must be '
285
'installed for Kerberos authentication',
286
)
287
288
ctx = None
289
try:
290
ctx = gssapi.raw.init_sec_context(
291
gssapi.raw.import_name(data, gssapi.raw.NameType.user),
292
flags=[0], lifetime=0,
293
)
294
return ctx.token
295
296
finally:
297
if ctx is not None:
298
gssapi.raw.delete_sec_context(ctx.context)
299
300