# Source: GitHub repository singlestore-labs/singlestoredb-python
# Path: blob/main/singlestoredb/mysql/tests/test_connection.py
1
# type: ignore
2
import datetime
3
import ssl
4
import time
5
from unittest import mock
6
7
import pytest
8
9
import singlestoredb.mysql as sv
10
from singlestoredb.mysql.constants import CLIENT
11
from singlestoredb.mysql.tests import base
12
13
14
class TempUser:
    """Context manager that provisions a temporary database user for a test.

    Construction issues ``CREATE USER`` followed by ``GRANT SELECT`` through
    the supplied cursor.  Leaving the ``with`` block undoes whichever of those
    two steps succeeded.
    """

    def __init__(self, c, user, db, auth=None, authdata=None, password=None):
        """Create the user and grant it SELECT on ``db``.

        Parameters
        ----------
        c : cursor-like
            Object exposing ``execute(sql)``; kept for the cleanup in
            ``__exit__``.
        user : str
            Account name, interpolated verbatim into the SQL text.
        db : str
            Database name used in the GRANT / REVOKE statements.
        auth : str, optional
            Authentication plugin name (``IDENTIFIED WITH``).  Ignored when
            ``password`` is given.
        authdata : str, optional
            Plugin data appended as ``AS '...'``; only used together with
            ``auth``.
        password : str, optional
            Plain password (``IDENTIFIED BY``); takes precedence over ``auth``.

        """
        self._c = c
        self._user = user
        self._db = db

        # NOTE: values are interpolated unescaped; pass trusted test
        # constants only.
        create = 'CREATE USER ' + user
        if password is not None:
            create += " IDENTIFIED BY '%s'" % password
        elif auth is not None:
            create += ' IDENTIFIED WITH %s' % auth
            if authdata is not None:
                create += " AS '%s'" % authdata

        try:
            c.execute(create)
            self._created = True
        except sv.err.InternalError:
            # already exists - TODO need to check the same plugin applies
            self._created = False

        try:
            c.execute('GRANT SELECT ON %s.* TO %s' % (db, user))
            self._grant = True
        except sv.err.InternalError:
            self._grant = False

    def __enter__(self):
        """Return this ``TempUser`` instance."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Revoke the grant and drop the user, if this object created them.

        The DROP runs in a ``finally`` clause so that a failing REVOKE does
        not leave the temporary user behind.  Exceptions are not suppressed.
        """
        try:
            if self._grant:
                self._c.execute(
                    'REVOKE SELECT ON %s.* FROM %s' % (self._db, self._user),
                )
        finally:
            if self._created:
                self._c.execute('DROP USER %s' % self._user)
47
48
49
class TestAuthentication(base.PyMySQLTestCase):
    """Tests for server authentication plugins (socket, dialog, PAM, ...).

    ``socket_auth`` and every ``*_found`` flag below are hard-coded to
    ``False`` because the plugin-discovery code is commented out.  The
    ``skipif`` decorators read these flags while the class body executes.
    """

    socket_auth = False
    socket_found = False
    two_questions_found = False
    three_attempts_found = False
    pam_found = False
    mysql_old_password_found = False
    sha256_password_found = False
    ed25519_found = False

    # Imported at class scope: ``os`` is used by ``osuser`` and by the
    # ``skipif`` decorators below, which run while the class body executes.
    import os

    osuser = os.environ.get('USER')

    # # socket auth requires the current user and for the connection to be a socket
    # # rest do grants @localhost due to incomplete logic - TODO change to @% then
    # db = base.PyMySQLTestCase.databases[0].copy()

    # socket_auth = db.get('unix_socket') is not None and db.get('host') in (
    #     'localhost',
    #     '127.0.0.1',
    # )

    # dbname = db['database']

    # cur = sv.connect(**db).cursor()
    # db.pop('user', None)
    # cur.execute('SHOW PLUGINS')
    # for r in cur:
    #     if (r[1], r[2]) != ('ACTIVE', 'AUTHENTICATION'):
    #         continue
    #     if r[3] == 'auth_socket.so' or r[0] == 'unix_socket':
    #         socket_plugin_name = r[0]
    #         socket_found = True
    #     elif r[3] == 'dialog_examples.so':
    #         if r[0] == 'two_questions':
    #             two_questions_found = True
    #         elif r[0] == 'three_attempts':
    #             three_attempts_found = True
    #     elif r[0] == 'pam':
    #         pam_found = True
    #         pam_plugin_name = r[3].split('.')[0]
    #         if pam_plugin_name == 'auth_pam':
    #             pam_plugin_name = 'pam'
    #         # MySQL: authentication_pam
    #         # https://dev.mysql.com/doc/refman/5.5/en/pam-authentication-plugin.html

    #         # MariaDB: pam
    #         # https://mariadb.com/kb/en/mariadb/pam-authentication-plugin/

    #         # Names differ but functionality is close
    #     elif r[0] == 'mysql_old_password':
    #         mysql_old_password_found = True
    #     elif r[0] == 'sha256_password':
    #         sha256_password_found = True
    #     elif r[0] == 'ed25519':
    #         ed25519_found = True
    #     # else:
    #     #     print("plugin: %r" % r[0])

    @pytest.mark.skip(reason='not currently supported in SingleStoreDB')
    def test_plugin(self):
        """The connection's auth plugin matches mysql.user (or the native one)."""
        conn = self.connect()
        cur = conn.cursor()
        cur.execute(
            "select plugin from mysql.user where concat(user, '@', host)=current_user()",
        )
        for r in cur:
            self.assertIn(conn._auth_plugin_name, (r[0], 'mysql_native_password'))

    @pytest.mark.skip(reason='not currently supported by SingleStoreDB')
    @pytest.mark.skipif(not socket_auth, reason='connection to unix_socket required')
    @pytest.mark.skipif(socket_found, reason='socket plugin already installed')
    def testSocketAuthInstallPlugin(self):
        """Install a socket auth plugin, run the socket test, then uninstall it."""
        # needs plugin. lets install it.
        cur = self.connect().cursor()
        try:
            cur.execute("install plugin auth_socket soname 'auth_socket.so'")
            TestAuthentication.socket_found = True
            self.socket_plugin_name = 'auth_socket'
            self.realtestSocketAuth()
        except sv.err.InternalError:
            # First install statement failed; try the alternative syntax.
            try:
                cur.execute("install soname 'auth_socket'")
                TestAuthentication.socket_found = True
                self.socket_plugin_name = 'unix_socket'
                self.realtestSocketAuth()
            except sv.err.InternalError:
                TestAuthentication.socket_found = False
                pytest.skip("we couldn't install the socket plugin")
        finally:
            if TestAuthentication.socket_found:
                cur.execute('uninstall plugin %s' % self.socket_plugin_name)

    @pytest.mark.skip(reason='not currently supported by SingleStoreDB')
    @pytest.mark.skipif(not socket_auth, reason='connection to unix_socket required')
    @pytest.mark.skipif(not socket_found, reason='no socket plugin')
    def testSocketAuth(self):
        """Run the socket auth scenario against an already-installed plugin."""
        self.realtestSocketAuth()

    def realtestSocketAuth(self):
        """Create ``<osuser>@localhost`` with the socket plugin and connect as it."""
        with TempUser(
            self.connect().cursor(),
            TestAuthentication.osuser + '@localhost',
            self.databases[0]['database'],
            self.socket_plugin_name,
        ) as _:
            sv.connect(user=TestAuthentication.osuser, **self.db)

    class Dialog:
        """Prompt-style handler passed via ``auth_plugin_map``.

        Answers come from the class attribute ``m`` (a prompt -> reply
        mapping the tests assign before connecting).  When the class-level
        ``fail`` flag is set, the first prompt of each instance gets a
        deliberately wrong reply.
        """

        fail = False

        def __init__(self, con):
            # Snapshot the class-level flag so each instance fails at most once.
            self.fail = TestAuthentication.Dialog.fail

        def prompt(self, echo, prompt):
            """Return the reply for ``prompt`` (``None`` if it is not in ``m``)."""
            if self.fail:
                self.fail = False
                return b'bad guess at a password'
            return self.m.get(prompt)

    class DialogHandler:
        """Packet-level handler that drives the dialog exchange itself."""

        def __init__(self, con):
            self.con = con

        def authenticate(self, pkt):
            """Answer prompts until an OK packet or the 'last' flag is seen.

            Returns the last packet read from the connection.
            """
            while True:
                flag = pkt.read_uint8()
                echo = (flag & 0x06) == 0x02  # noqa: F841
                last = (flag & 0x01) == 0x01
                prompt = pkt.read_all()

                if prompt == b'Password, please:':
                    self.con.write_packet(b'stillnotverysecret\0')
                else:
                    self.con.write_packet(b'no idea what to do with this prompt\0')
                pkt = self.con._read_packet()
                pkt.check_error()
                if pkt.is_ok_packet() or last:
                    break
            return pkt

    class DefectiveHandler:
        """Handler with neither ``authenticate`` nor ``prompt``.

        The tests pass it in ``auth_plugin_map`` and expect the connection
        attempt to raise.
        """

        def __init__(self, con):
            self.con = con

    @pytest.mark.skip(reason='not currently supported by SingleStoreDB')
    @pytest.mark.skipif(not socket_auth, reason='connection to unix_socket required')
    @pytest.mark.skipif(
        two_questions_found, reason='two_questions plugin already installed',
    )
    def testDialogAuthTwoQuestionsInstallPlugin(self):
        """Install ``two_questions``, run the dialog test, then uninstall it."""
        # needs plugin. lets install it.
        cur = self.connect().cursor()
        try:
            cur.execute("install plugin two_questions soname 'dialog_examples.so'")
            TestAuthentication.two_questions_found = True
            self.realTestDialogAuthTwoQuestions()
        except sv.err.InternalError:
            pytest.skip("we couldn't install the two_questions plugin")
        finally:
            if TestAuthentication.two_questions_found:
                cur.execute('uninstall plugin two_questions')

    @pytest.mark.skip(reason='not currently supported by SingleStoreDB')
    @pytest.mark.skipif(not socket_auth, reason='connection to unix_socket required')
    @pytest.mark.skipif(not two_questions_found, reason='no two questions auth plugin')
    def testDialogAuthTwoQuestions(self):
        """Run the two-questions scenario against an installed plugin."""
        self.realTestDialogAuthTwoQuestions()

    def realTestDialogAuthTwoQuestions(self):
        """Connecting without a dialog handler must fail; with ``Dialog`` it works."""
        TestAuthentication.Dialog.fail = False
        TestAuthentication.Dialog.m = {
            b'Password, please:': b'notverysecret',
            b'Are you sure ?': b'yes, of course',
        }
        with TempUser(
            self.connect().cursor(),
            'singlestoredb_2q@localhost',
            self.databases[0]['database'],
            'two_questions',
            'notverysecret',
        ) as _:
            with self.assertRaises(sv.err.OperationalError):
                sv.connect(user='singlestoredb_2q', **self.db)
            sv.connect(
                user='singlestoredb_2q',
                auth_plugin_map={b'dialog': TestAuthentication.Dialog},
                **self.db,
            )

    @pytest.mark.skip(reason='not currently supported by SingleStoreDB')
    @pytest.mark.skipif(not socket_auth, reason='connection to unix_socket required')
    @pytest.mark.skipif(
        three_attempts_found, reason='three_attempts plugin already installed',
    )
    def testDialogAuthThreeAttemptsQuestionsInstallPlugin(self):
        """Install ``three_attempts``, run the dialog test, then uninstall it."""
        # needs plugin. lets install it.
        cur = self.connect().cursor()
        try:
            cur.execute("install plugin three_attempts soname 'dialog_examples.so'")
            TestAuthentication.three_attempts_found = True
            self.realTestDialogAuthThreeAttempts()
        except sv.err.InternalError:
            pytest.skip("we couldn't install the three_attempts plugin")
        finally:
            if TestAuthentication.three_attempts_found:
                cur.execute('uninstall plugin three_attempts')

    @pytest.mark.skip(reason='not currently supported by SingleStoreDB')
    @pytest.mark.skipif(not socket_auth, reason='connection to unix_socket required')
    @pytest.mark.skipif(not three_attempts_found, reason='no three attempts plugin')
    def testDialogAuthThreeAttempts(self):
        """Run the three-attempts scenario against an installed plugin."""
        self.realTestDialogAuthThreeAttempts()

    def realTestDialogAuthThreeAttempts(self):
        """Exercise good, retrying, defective and wrong-answer dialog handlers."""
        TestAuthentication.Dialog.m = {b'Password, please:': b'stillnotverysecret'}
        TestAuthentication.Dialog.fail = (
            True  # fail just once. We've got three attempts after all
        )
        with TempUser(
            self.connect().cursor(),
            'singlestoredb_3a@localhost',
            self.databases[0]['database'],
            'three_attempts',
            'stillnotverysecret',
        ) as _:
            sv.connect(
                user='singlestoredb_3a',
                auth_plugin_map={b'dialog': TestAuthentication.Dialog},
                **self.db,
            )
            sv.connect(
                user='singlestoredb_3a',
                auth_plugin_map={b'dialog': TestAuthentication.DialogHandler},
                **self.db,
            )
            with self.assertRaises(sv.err.OperationalError):
                sv.connect(
                    user='singlestoredb_3a',
                    auth_plugin_map={b'dialog': object},
                    **self.db,
                )

            with self.assertRaises(sv.err.OperationalError):
                sv.connect(
                    user='singlestoredb_3a',
                    auth_plugin_map={b'dialog': TestAuthentication.DefectiveHandler},
                    **self.db,
                )
            with self.assertRaises(sv.err.OperationalError):
                sv.connect(
                    user='singlestoredb_3a',
                    auth_plugin_map={b'notdialogplugin': TestAuthentication.Dialog},
                    **self.db,
                )
            TestAuthentication.Dialog.m = {b'Password, please:': b'I do not know'}
            with self.assertRaises(sv.err.OperationalError):
                sv.connect(
                    user='singlestoredb_3a',
                    auth_plugin_map={b'dialog': TestAuthentication.Dialog},
                    **self.db,
                )
            TestAuthentication.Dialog.m = {b'Password, please:': None}
            with self.assertRaises(sv.err.OperationalError):
                sv.connect(
                    user='singlestoredb_3a',
                    auth_plugin_map={b'dialog': TestAuthentication.Dialog},
                    **self.db,
                )

    @pytest.mark.skipif(not socket_auth, reason='connection to unix_socket required')
    @pytest.mark.skipif(pam_found, reason='pam plugin already installed')
    @pytest.mark.skipif(
        os.environ.get('PASSWORD') is None, reason='PASSWORD env var required',
    )
    @pytest.mark.skipif(
        os.environ.get('PAMSERVICE') is None, reason='PAMSERVICE env var required',
    )
    def testPamAuthInstallPlugin(self):
        """Install the ``pam`` plugin, run the PAM test, then uninstall it."""
        # needs plugin. lets install it.
        cur = self.connect().cursor()
        try:
            cur.execute("install plugin pam soname 'auth_pam.so'")
            TestAuthentication.pam_found = True
            self.realTestPamAuth()
        except sv.err.InternalError:
            pytest.skip("we couldn't install the auth_pam plugin")
        finally:
            if TestAuthentication.pam_found:
                cur.execute('uninstall plugin pam')

    @pytest.mark.skipif(not socket_auth, reason='connection to unix_socket required')
    @pytest.mark.skipif(not pam_found, reason='no pam plugin')
    @pytest.mark.skipif(
        os.environ.get('PASSWORD') is None, reason='PASSWORD env var required',
    )
    @pytest.mark.skipif(
        os.environ.get('PAMSERVICE') is None, reason='PAMSERVICE env var required',
    )
    def testPamAuth(self):
        """Run the PAM scenario against an already-installed plugin."""
        self.realTestPamAuth()

    # NOTE(review): this is a helper invoked directly by the two tests above;
    # the skip mark presumably has no effect on such direct calls - confirm.
    @pytest.mark.skip(reason='skip PAM tests on SingleStoreDB')
    def realTestPamAuth(self):
        """Recreate ``<osuser>@localhost`` with PAM auth and try to connect."""
        db = self.db.copy()
        # Local import: the class-level ``os`` is a class attribute and is
        # not visible as a bare name inside methods.
        import os

        db['password'] = os.environ.get('PASSWORD')
        cur = self.connect().cursor()
        try:
            cur.execute('show grants for ' + TestAuthentication.osuser + '@localhost')
            grants = cur.fetchone()[0]
            cur.execute('drop user ' + TestAuthentication.osuser + '@localhost')
        except sv.OperationalError as e:
            # assuming the user doesn't exist which is ok too
            self.assertEqual(1045, e.args[0])
            grants = None
        with TempUser(
            cur,
            TestAuthentication.osuser + '@localhost',
            self.databases[0]['database'],
            'pam',
            os.environ.get('PAMSERVICE'),
        ) as _:
            try:
                c = sv.connect(user=TestAuthentication.osuser, **db)  # noqa: F841
                # NOTE(review): the bad password is stored in ``db`` but the
                # connect calls below pass ``**self.db`` - confirm intent.
                db['password'] = 'very bad guess at password'
                with self.assertRaises(sv.err.OperationalError):
                    sv.connect(
                        user=TestAuthentication.osuser,
                        auth_plugin_map={
                            b'mysql_cleartext_password':
                                TestAuthentication.DefectiveHandler,
                        },
                        **self.db,
                    )
            except sv.OperationalError as e:
                self.assertEqual(1045, e.args[0])
                # we had 'bad guess at password' work with pam. Well at least
                # we get a permission denied here
                with self.assertRaises(sv.err.OperationalError):
                    sv.connect(
                        user=TestAuthentication.osuser,
                        auth_plugin_map={
                            b'mysql_cleartext_password':
                                TestAuthentication.DefectiveHandler,
                        },
                        **self.db,
                    )
        if grants:
            # recreate the user
            cur.execute(grants)

    @pytest.mark.skip(reason='not currently supported by SingleStoreDB')
    @pytest.mark.skipif(not socket_auth, reason='connection to unix_socket required')
    @pytest.mark.skipif(
        not sha256_password_found,
        reason='no sha256 password authentication plugin found',
    )
    def testAuthSHA256(self):
        """Connecting as a sha256_password user is expected to raise."""
        conn = self.connect()
        c = conn.cursor()
        with TempUser(
            c,
            'singlestoredb_sha256@localhost',
            self.databases[0]['database'],
            'sha256_password',
        ) as _:
            c.execute("SET PASSWORD FOR 'singlestoredb_sha256'@'localhost' ='Sh@256Pa33'")
            c.execute('FLUSH PRIVILEGES')
            db = self.db.copy()
            db['password'] = 'Sh@256Pa33'
            # Although SHA256 is supported, need the configuration of public
            # key of the mysql server. Currently will get error by this test.
            with self.assertRaises(sv.err.OperationalError):
                sv.connect(user='singlestoredb_sha256', **db)

    @pytest.mark.skipif(not ed25519_found, reason='no ed25519 authention plugin')
    def testAuthEd25519(self):
        """Connect as an ed25519 user with an empty and a non-empty password."""
        db = self.db.copy()
        db.pop('password', None)
        conn = self.connect()
        c = conn.cursor()
        c.execute("select ed25519_password(''), ed25519_password('ed25519_password')")
        # The last row iterated supplies both password hashes.
        for r in c:
            empty_pass = r[0].decode('ascii')
            non_empty_pass = r[1].decode('ascii')

        with TempUser(
            c,
            'singlestoredb_ed25519',
            self.databases[0]['database'],
            'ed25519',
            empty_pass,
        ) as _:
            sv.connect(user='singlestoredb_ed25519', password='', **db)

        with TempUser(
            c,
            'singlestoredb_ed25519',
            self.databases[0]['database'],
            'ed25519',
            non_empty_pass,
        ) as _:
            sv.connect(user='singlestoredb_ed25519', password='ed25519_password', **db)
457
458
459
class TestConnection(base.PyMySQLTestCase):
    """Connection-level tests: charset, large payloads, autocommit, SSL options."""

    def test_utf8mb4(self):
        """This test requires MySQL >= 5.5."""
        arg = self.databases[0].copy()
        arg['charset'] = 'utf8mb4'
        conn = sv.connect(**arg)  # noqa: F841

    def test_largedata(self):
        """Large query and response (>=16MB)."""
        cur = self.connect().cursor()
        cur.execute('SELECT @@max_allowed_packet')
        if cur.fetchone()[0] < 16 * 1024 * 1024 + 10:
            print('Set max_allowed_packet to bigger than 17MB')
            return
        t = 'a' * (16 * 1024 * 1024)
        cur.execute("SELECT '" + t + "'")
        assert cur.fetchone()[0] == t

    def test_autocommit(self):
        """``get_autocommit`` tracks both SQL and API changes to autocommit."""
        con = self.connect()
        self.assertFalse(con.get_autocommit())

        cur = con.cursor()
        cur.execute('SET AUTOCOMMIT=1')
        self.assertTrue(con.get_autocommit())

        con.autocommit(False)
        self.assertFalse(con.get_autocommit())
        cur.execute('SELECT @@AUTOCOMMIT')
        self.assertEqual(cur.fetchone()[0], 0)

    def test_select_db(self):
        """``select_db`` switches the connection's current database."""
        con = self.connect()
        current_db = self.databases[0]['database']
        other_db = self.databases[1]['database']

        cur = con.cursor()
        cur.execute('SELECT database()')
        self.assertEqual(cur.fetchone()[0], current_db)

        con.select_db(other_db)
        cur.execute('SELECT database()')
        self.assertEqual(cur.fetchone()[0], other_db)

    @pytest.mark.skip(reason='wait_timeout= does not work')
    def test_connection_gone_away(self):
        """
        http://dev.mysql.com/doc/refman/5.0/en/gone-away.html
        http://dev.mysql.com/doc/refman/5.0/en/error-messages-client.html#error_cr_server_gone_error

        """
        con = self.connect()
        cur = con.cursor()
        cur.execute('SET wait_timeout=1')
        time.sleep(2)
        with self.assertRaises(sv.OperationalError) as cm:
            cur.execute('SELECT 1+1')
        # error occurs while reading, not writing because of socket buffer.
        # self.assertEqual(cm.exception.args[0], 2006)
        self.assertIn(cm.exception.args[0], (2006, 2013))

    def test_init_command(self):
        """A multi-statement ``init_command`` leaves the connection usable."""
        conn = self.connect(
            init_command='SELECT "bar"; SELECT "baz"',
            client_flag=CLIENT.MULTI_STATEMENTS,
        )
        c = conn.cursor()
        c.execute('select "foobar";')
        self.assertEqual(('foobar',), c.fetchone())
        conn.close()
        with self.assertRaises(sv.err.Error):
            conn.ping(reconnect=False)

    def test_read_default_group(self):
        """Connecting with ``read_default_group`` yields an open connection."""
        conn = self.connect(
            read_default_group='client',
        )
        self.assertTrue(conn.open)

    def test_set_charset(self):
        """``set_charset`` can be called on an open connection."""
        c = self.connect()
        c.set_charset('utf8mb4')
        # TODO validate setting here

    def test_defer_connect(self):
        """``defer_connect=True`` stays closed until ``connect(sock)`` is called."""
        import socket

        d = self.databases[0].copy()
        try:
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            sock.connect(d['unix_socket'])
        except KeyError:
            # No unix socket configured: fall back to TCP.
            sock.close()
            sock = socket.create_connection(
                (d.get('host', 'localhost'), d.get('port', 3306)),
            )
        # The pre-connected socket replaces any address settings.
        for k in ['unix_socket', 'host', 'port']:
            try:
                del d[k]
            except KeyError:
                pass

        c = sv.connect(defer_connect=True, **d)
        self.assertFalse(c.open)
        c.connect(sock)
        c.close()
        sock.close()

    def _connect_with_mocked_ssl(self, **connect_kwargs):
        """Call ``sv.connect`` with connecting and SSL-context creation mocked.

        Returns ``(create_default_context, dummy_ssl_context)``: the mock that
        replaced ``ssl.create_default_context`` and the mock context it
        returns.  Both keep their recorded calls after the patches are undone.
        """
        dummy_ssl_context = mock.Mock(options=0)
        with mock.patch(
            'singlestoredb.connections.Connection.connect',
        ), mock.patch(
            'singlestoredb.connections.ssl.create_default_context',
            new=mock.Mock(return_value=dummy_ssl_context),
        ) as create_default_context:
            sv.connect(**connect_kwargs)
        return create_default_context, dummy_ssl_context

    @pytest.mark.skip(reason='disable local user tests')
    def test_ssl_connect(self):
        """SSL keyword arguments are translated into the expected context setup."""
        # ``ssl`` dict including a cipher.
        create_default_context, ctx = self._connect_with_mocked_ssl(
            ssl={
                'ca': 'ca',
                'cert': 'cert',
                'key': 'key',
                'cipher': 'cipher',
            },
        )
        assert create_default_context.called
        assert ctx.check_hostname
        assert ctx.verify_mode == ssl.CERT_REQUIRED
        ctx.load_cert_chain.assert_called_with('cert', keyfile='key')
        ctx.set_ciphers.assert_called_with('cipher')

        # ``ssl`` dict without a cipher.
        create_default_context, ctx = self._connect_with_mocked_ssl(
            ssl={
                'ca': 'ca',
                'cert': 'cert',
                'key': 'key',
            },
        )
        assert create_default_context.called
        assert ctx.check_hostname
        assert ctx.verify_mode == ssl.CERT_REQUIRED
        ctx.load_cert_chain.assert_called_with('cert', keyfile='key')
        ctx.set_ciphers.assert_not_called()

        # Only ``ssl_ca``.
        create_default_context, ctx = self._connect_with_mocked_ssl(
            ssl_ca='ca',
        )
        assert create_default_context.called
        assert not ctx.check_hostname
        assert ctx.verify_mode == ssl.CERT_NONE
        ctx.load_cert_chain.assert_not_called()
        ctx.set_ciphers.assert_not_called()

        # ``ssl_ca`` plus client certificate and key.
        create_default_context, ctx = self._connect_with_mocked_ssl(
            ssl_ca='ca',
            ssl_cert='cert',
            ssl_key='key',
        )
        assert create_default_context.called
        assert not ctx.check_hostname
        assert ctx.verify_mode == ssl.CERT_NONE
        ctx.load_cert_chain.assert_called_with('cert', keyfile='key')
        ctx.set_ciphers.assert_not_called()

        # Truthy spellings of ``ssl_verify_cert``.
        for ssl_verify_cert in (True, '1', 'yes', 'true'):
            create_default_context, ctx = self._connect_with_mocked_ssl(
                ssl_cert='cert',
                ssl_key='key',
                ssl_verify_cert=ssl_verify_cert,
            )
            assert create_default_context.called
            assert not ctx.check_hostname
            assert ctx.verify_mode == ssl.CERT_REQUIRED
            ctx.load_cert_chain.assert_called_with(
                'cert', keyfile='key',
            )
            ctx.set_ciphers.assert_not_called()

        # Falsy spellings of ``ssl_verify_cert``.
        for ssl_verify_cert in (None, False, '0', 'no', 'false'):
            create_default_context, ctx = self._connect_with_mocked_ssl(
                ssl_cert='cert',
                ssl_key='key',
                ssl_verify_cert=ssl_verify_cert,
            )
            assert create_default_context.called
            assert not ctx.check_hostname
            assert ctx.verify_mode == ssl.CERT_NONE
            ctx.load_cert_chain.assert_called_with(
                'cert', keyfile='key',
            )
            ctx.set_ciphers.assert_not_called()

        # Unrecognised ``ssl_verify_cert`` values: the expected mode depends
        # on whether a CA was supplied.
        for ssl_ca in ('ca', None):
            for ssl_verify_cert in ('foo', 'bar', ''):
                create_default_context, ctx = self._connect_with_mocked_ssl(
                    ssl_ca=ssl_ca,
                    ssl_cert='cert',
                    ssl_key='key',
                    ssl_verify_cert=ssl_verify_cert,
                )
                assert create_default_context.called
                assert not ctx.check_hostname
                assert ctx.verify_mode == (
                    ssl.CERT_REQUIRED if ssl_ca is not None else ssl.CERT_NONE
                ), (ssl_ca, ssl_verify_cert)
                ctx.load_cert_chain.assert_called_with(
                    'cert', keyfile='key',
                )
                ctx.set_ciphers.assert_not_called()

        # ``ssl_verify_identity`` turns on hostname checking.
        create_default_context, ctx = self._connect_with_mocked_ssl(
            ssl_ca='ca',
            ssl_cert='cert',
            ssl_key='key',
            ssl_verify_identity=True,
        )
        assert create_default_context.called
        assert ctx.check_hostname
        assert ctx.verify_mode == ssl.CERT_NONE
        ctx.load_cert_chain.assert_called_with('cert', keyfile='key')
        ctx.set_ciphers.assert_not_called()

        # ``ssl_disabled`` wins over an ``ssl`` dict ...
        create_default_context, ctx = self._connect_with_mocked_ssl(
            ssl_disabled=True,
            ssl={
                'ca': 'ca',
                'cert': 'cert',
                'key': 'key',
            },
        )
        assert not create_default_context.called

        # ... and over the individual ``ssl_*`` keyword arguments.
        create_default_context, ctx = self._connect_with_mocked_ssl(
            ssl_disabled=True,
            ssl_ca='ca',
            ssl_cert='cert',
            ssl_key='key',
        )
        assert not create_default_context.called
762
763
764
# A custom type and function to escape it
765
class Foo:
    """Custom type used by the escape tests; carries a fixed ``value``."""

    value = 'bar'
767
768
769
def escape_foo(x, d):
    """Return ``x.value`` unchanged; the second argument ``d`` is ignored."""
    return x.value
771
772
773
class TestEscape(base.PyMySQLTestCase):
    """Tests for ``Connection.escape`` plus two multi-statement cursor checks."""

    def test_escape_string(self):
        """A single quote inside a string is backslash-escaped."""
        con = self.connect()
        cur = con.cursor()  # noqa: F841

        self.assertEqual(con.escape("foo'bar"), "'foo\\'bar'")
        # # added NO_AUTO_CREATE_USER as not including it in 5.7 generates warnings
        # # mysql-8.0 removes the option however
        # if self.mysql_server_is(con, (8, 0, 0)):
        #     cur.execute("SET sql_mode='NO_BACKSLASH_ESCAPES'")
        # else:
        #     cur.execute("SET sql_mode='NO_BACKSLASH_ESCAPES,NO_AUTO_CREATE_USER'")
        # self.assertEqual(con.escape("foo'bar"), "'foo''bar'")

    def test_escape_builtin_encoders(self):
        """The connection's own encoders render a datetime as a quoted string."""
        con = self.connect()
        cur = con.cursor()  # noqa: F841

        val = datetime.datetime(2012, 3, 4, 5, 6)
        self.assertEqual(con.escape(val, con.encoders), "'2012-03-04 05:06:00'")

    def test_escape_custom_object(self):
        """A mapping entry for ``Foo`` is used to escape a ``Foo`` instance."""
        con = self.connect()
        cur = con.cursor()  # noqa: F841

        mapping = {Foo: escape_foo}
        self.assertEqual(con.escape(Foo(), mapping), 'bar')

    def test_escape_fallback_encoder(self):
        """A ``str`` subclass is escaped with the encoder registered for ``str``."""
        con = self.connect()
        cur = con.cursor()  # noqa: F841

        class Custom(str):
            pass

        mapping = {str: sv.converters.escape_string}
        self.assertEqual(con.escape(Custom('foobar'), mapping), "'foobar'")

    def test_escape_no_default(self):
        """Escaping with an empty mapping raises ``TypeError``."""
        con = self.connect()
        cur = con.cursor()  # noqa: F841

        self.assertRaises(TypeError, con.escape, 42, {})

    def test_escape_dict_value(self):
        """Dict values are escaped individually and returned as a dict."""
        con = self.connect()
        cur = con.cursor()  # noqa: F841

        mapping = con.encoders.copy()
        mapping[Foo] = escape_foo
        self.assertEqual(con.escape({'foo': Foo()}, mapping), {'foo': 'bar'})

    def test_escape_list_item(self):
        """List items are escaped and wrapped in parentheses."""
        con = self.connect()
        cur = con.cursor()  # noqa: F841

        mapping = con.encoders.copy()
        mapping[Foo] = escape_foo
        self.assertEqual(con.escape([Foo()], mapping), '(bar)')

    def test_previous_cursor_not_closed(self):
        """A second cursor works while the first has unread result sets."""
        con = self.connect(
            init_command='SELECT "bar"; SELECT "baz"',
            client_flag=CLIENT.MULTI_STATEMENTS,
        )
        cur1 = con.cursor()
        cur1.execute('SELECT 1; SELECT 2')
        cur2 = con.cursor()
        cur2.execute('SELECT 3')
        self.assertEqual(cur2.fetchone()[0], 3)

    def test_commit_during_multi_result(self):
        """``commit`` between multi-statement results leaves the cursor usable."""
        con = self.connect(client_flag=CLIENT.MULTI_STATEMENTS)
        cur = con.cursor()
        cur.execute('SELECT 1; SELECT 2')
        con.commit()
        cur.execute('SELECT 3')
        self.assertEqual(cur.fetchone()[0], 3)
852
853