Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/mysql/tests/test_basic.py
469 views
1
# type: ignore
2
import datetime
3
import json
4
import time
5
6
import pytest
7
8
import singlestoredb.mysql as sv
9
import singlestoredb.mysql.cursors as cursors
10
from singlestoredb.mysql.tests import base
11
12
13
__all__ = ['TestConversion', 'TestCursor', 'TestBulkInserts']
14
15
16
class TestConversion(base.PyMySQLTestCase):
17
18
def test_datatypes(self):
19
"""Test every data type."""
20
conn = self.connect()
21
c = conn.cursor()
22
c.execute(
23
'create table test_datatypes (b bit, i int, l bigint, f real, '
24
' s varchar(32), u varchar(32), bb blob, '
25
' d date, dt datetime, ts timestamp, '
26
' td time, t time, st datetime)',
27
)
28
try:
29
# insert values
30
31
v = (
32
True,
33
-3,
34
123456789012,
35
5.7,
36
"hello'\" world",
37
'Espa\xc3\xb1ol',
38
'binary\x00data'.encode(conn.encoding),
39
datetime.date(1988, 2, 2),
40
datetime.datetime(2014, 5, 15, 7, 45, 57),
41
datetime.timedelta(5, 6),
42
datetime.time(16, 32),
43
time.localtime(),
44
)
45
c.execute(
46
'insert into test_datatypes (b,i,l,f,s,u,bb,d,dt,td,t,st) '
47
' values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)',
48
v,
49
)
50
c.execute('select b,i,l,f,s,u,bb,d,dt,td,t,st from test_datatypes')
51
r = c.fetchone()
52
self.assertEqual(b'\x00\x00\x00\x00\x00\x00\x00\x01', r[0])
53
self.assertEqual(v[1:10], r[1:10])
54
self.assertEqual(
55
datetime.timedelta(0, 60 * (v[10].hour * 60 + v[10].minute)), r[10],
56
)
57
self.assertEqual(datetime.datetime(*v[-1][:6]), r[-1])
58
59
c.execute('delete from test_datatypes')
60
61
# check nulls
62
c.execute(
63
'insert into test_datatypes (b,i,l,f,s,u,bb,d,dt,td,t,st) '
64
' values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)',
65
[None] * 12,
66
)
67
c.execute('select b,i,l,f,s,u,bb,d,dt,td,t,st from test_datatypes')
68
r = c.fetchone()
69
self.assertEqual(tuple([None] * 12), r)
70
71
c.execute('delete from test_datatypes')
72
73
# check sequences type
74
for seq_type in (tuple, list, set, frozenset):
75
c.execute(
76
'insert into test_datatypes (i, l) values (2,4), (6,8), (10,12)',
77
)
78
seq = seq_type([2, 6])
79
c.execute(
80
'select l from test_datatypes where i in %s order by i', (seq,),
81
)
82
r = c.fetchall()
83
# NOTE: C extension returns a list, not a tuple
84
self.assertEqual(((4,), (8,)), tuple(r))
85
c.execute('delete from test_datatypes')
86
87
finally:
88
c.execute('drop table test_datatypes')
89
90
def test_dict(self):
91
"""Test dict escaping."""
92
conn = self.connect()
93
c = conn.cursor()
94
c.execute('create table test_dict (a integer, b integer, c integer)')
95
try:
96
c.execute(
97
'insert into test_dict (a,b,c) values (%(a)s, %(b)s, %(c)s)',
98
{'a': 1, 'b': 2, 'c': 3},
99
)
100
c.execute('select a,b,c from test_dict')
101
self.assertEqual((1, 2, 3), c.fetchone())
102
finally:
103
c.execute('drop table test_dict')
104
105
def test_string(self):
106
conn = self.connect()
107
c = conn.cursor()
108
c.execute('create table test_dict (a text)')
109
test_value = 'I am a test string'
110
try:
111
c.execute('insert into test_dict (a) values (%s)', test_value)
112
c.execute('select a from test_dict')
113
self.assertEqual((test_value,), c.fetchone())
114
finally:
115
c.execute('drop table test_dict')
116
117
def test_integer(self):
118
conn = self.connect()
119
c = conn.cursor()
120
c.execute('create table test_dict (a integer)')
121
test_value = 12345
122
try:
123
c.execute('insert into test_dict (a) values (%s)', test_value)
124
c.execute('select a from test_dict')
125
self.assertEqual((test_value,), c.fetchone())
126
finally:
127
c.execute('drop table test_dict')
128
129
def test_binary(self):
130
"""Test binary data."""
131
data = bytes(bytearray(range(255)))
132
conn = self.connect()
133
self.safe_create_table(
134
conn, 'test_binary', 'create table test_binary (b binary(255))',
135
)
136
137
with conn.cursor() as c:
138
c.execute('insert into test_binary (b) values (_binary %s)', (data,))
139
c.execute('select b from test_binary')
140
self.assertEqual(data, c.fetchone()[0])
141
142
def test_blob(self):
143
"""Test blob data."""
144
data = bytes(bytearray(range(256)) * 4)
145
conn = self.connect()
146
self.safe_create_table(conn, 'test_blob', 'create table test_blob (b blob)')
147
148
with conn.cursor() as c:
149
c.execute('insert into test_blob (b) values (_binary %s)', (data,))
150
c.execute('select b from test_blob')
151
self.assertEqual(data, c.fetchone()[0])
152
153
def test_untyped(self):
154
"""Test conversion of null, empty string."""
155
conn = self.connect()
156
c = conn.cursor()
157
c.execute("select null,''")
158
self.assertEqual((None, ''), c.fetchone())
159
c.execute("select '',null")
160
self.assertEqual(('', None), c.fetchone())
161
162
def test_timedelta(self):
163
"""Test timedelta conversion."""
164
conn = self.connect()
165
c = conn.cursor()
166
c.execute(
167
"select time('12:30'), time('23:12:59'), time('23:12:59.05100'), "
168
" time('-12:30'), time('-23:12:59'), time('-23:12:59.05100'), "
169
" time('-00:30')",
170
)
171
self.assertEqual(
172
(
173
datetime.timedelta(0, 45000),
174
datetime.timedelta(0, 83579),
175
datetime.timedelta(0, 83579, 51000),
176
-datetime.timedelta(0, 45000),
177
-datetime.timedelta(0, 83579),
178
-datetime.timedelta(0, 83579, 51000),
179
-datetime.timedelta(0, 1800),
180
),
181
c.fetchone(),
182
)
183
184
def test_datetime_microseconds(self):
185
"""Test datetime conversion w microseconds."""
186
187
conn = self.connect()
188
c = conn.cursor()
189
dt = datetime.datetime(2013, 11, 12, 9, 9, 9, 123450)
190
c.execute('create table test_datetime (id int, ts datetime(6))')
191
try:
192
c.execute('insert into test_datetime values (%s, %s)', (1, dt))
193
c.execute('select ts from test_datetime')
194
self.assertEqual((dt,), c.fetchone())
195
finally:
196
c.execute('drop table test_datetime')
197
198
199
class TestCursor(base.PyMySQLTestCase):
200
# this test case does not work quite right yet, however,
201
# we substitute in None for the erroneous field which is
202
# compatible with the DB-API 2.0 spec and has not broken
203
# any unit tests for anything we've tried.
204
205
# def test_description(self):
206
# """ test description attribute """
207
# # result is from MySQLdb module
208
# r = (('Host', 254, 11, 60, 60, 0, 0),
209
# ('User', 254, 16, 16, 16, 0, 0),
210
# ('Password', 254, 41, 41, 41, 0, 0),
211
# ('Select_priv', 254, 1, 1, 1, 0, 0),
212
# ('Insert_priv', 254, 1, 1, 1, 0, 0),
213
# ('Update_priv', 254, 1, 1, 1, 0, 0),
214
# ('Delete_priv', 254, 1, 1, 1, 0, 0),
215
# ('Create_priv', 254, 1, 1, 1, 0, 0),
216
# ('Drop_priv', 254, 1, 1, 1, 0, 0),
217
# ('Reload_priv', 254, 1, 1, 1, 0, 0),
218
# ('Shutdown_priv', 254, 1, 1, 1, 0, 0),
219
# ('Process_priv', 254, 1, 1, 1, 0, 0),
220
# ('File_priv', 254, 1, 1, 1, 0, 0),
221
# ('Grant_priv', 254, 1, 1, 1, 0, 0),
222
# ('References_priv', 254, 1, 1, 1, 0, 0),
223
# ('Index_priv', 254, 1, 1, 1, 0, 0),
224
# ('Alter_priv', 254, 1, 1, 1, 0, 0),
225
# ('Show_db_priv', 254, 1, 1, 1, 0, 0),
226
# ('Super_priv', 254, 1, 1, 1, 0, 0),
227
# ('Create_tmp_table_priv', 254, 1, 1, 1, 0, 0),
228
# ('Lock_tables_priv', 254, 1, 1, 1, 0, 0),
229
# ('Execute_priv', 254, 1, 1, 1, 0, 0),
230
# ('Repl_slave_priv', 254, 1, 1, 1, 0, 0),
231
# ('Repl_client_priv', 254, 1, 1, 1, 0, 0),
232
# ('Create_view_priv', 254, 1, 1, 1, 0, 0),
233
# ('Show_view_priv', 254, 1, 1, 1, 0, 0),
234
# ('Create_routine_priv', 254, 1, 1, 1, 0, 0),
235
# ('Alter_routine_priv', 254, 1, 1, 1, 0, 0),
236
# ('Create_user_priv', 254, 1, 1, 1, 0, 0),
237
# ('Event_priv', 254, 1, 1, 1, 0, 0),
238
# ('Trigger_priv', 254, 1, 1, 1, 0, 0),
239
# ('ssl_type', 254, 0, 9, 9, 0, 0),
240
# ('ssl_cipher', 252, 0, 65535, 65535, 0, 0),
241
# ('x509_issuer', 252, 0, 65535, 65535, 0, 0),
242
# ('x509_subject', 252, 0, 65535, 65535, 0, 0),
243
# ('max_questions', 3, 1, 11, 11, 0, 0),
244
# ('max_updates', 3, 1, 11, 11, 0, 0),
245
# ('max_connections', 3, 1, 11, 11, 0, 0),
246
# ('max_user_connections', 3, 1, 11, 11, 0, 0))
247
# conn = self.connect()
248
# c = conn.cursor()
249
# c.execute("select * from mysql.user")
250
#
251
# self.assertEqual(r, c.description)
252
253
def test_fetch_no_result(self):
254
"""Test a fetchone() with no rows."""
255
conn = self.connect()
256
c = conn.cursor()
257
c.execute('create table test_nr (b varchar(32))')
258
try:
259
data = 'pymysql'
260
c.execute('insert into test_nr (b) values (%s)', (data,))
261
self.assertEqual(None, c.fetchone())
262
finally:
263
c.execute('drop table test_nr')
264
265
def test_aggregates(self):
266
"""Test aggregate functions."""
267
conn = self.connect()
268
c = conn.cursor()
269
try:
270
c.execute('create table test_aggregates (i integer)')
271
for i in range(0, 10):
272
c.execute('insert into test_aggregates (i) values (%s)', (i,))
273
c.execute('select sum(i) from test_aggregates')
274
(r,) = c.fetchone()
275
self.assertEqual(sum(range(0, 10)), r)
276
finally:
277
c.execute('drop table test_aggregates')
278
279
def test_single_tuple(self):
280
"""Test a single tuple."""
281
conn = self.connect()
282
c = conn.cursor()
283
self.safe_create_table(
284
conn, 'mystuff', 'create table mystuff (id integer primary key)',
285
)
286
c.execute('insert into mystuff (id) values (1)')
287
c.execute('insert into mystuff (id) values (2)')
288
c.execute('select id from mystuff where id in %s', ((1,),))
289
self.assertEqual([(1,)], list(c.fetchall()))
290
c.close()
291
292
def test_json(self):
293
args = self.databases[0].copy()
294
args['charset'] = 'utf8mb4'
295
conn = sv.connect(**args)
296
# MariaDB only has limited JSON support, stores data as longtext
297
# https://mariadb.com/kb/en/json-data-type/
298
if not self.mysql_server_is(conn, (5, 7, 0)):
299
pytest.skip('JSON type is only supported on MySQL >= 5.7')
300
301
self.safe_create_table(
302
conn,
303
'test_json',
304
"""\
305
create table test_json (
306
id int not null,
307
json JSON not null,
308
primary key (id)
309
);""",
310
)
311
cur = conn.cursor()
312
313
json_str = '{"hello": "こんにちは"}'
314
cur.execute('INSERT INTO test_json (id, `json`) values (42, %s)', (json_str,))
315
cur.execute('SELECT `json` from `test_json` WHERE `id`=42')
316
res = cur.fetchone()[0]
317
self.assertEqual(res, json.loads(json_str))
318
319
# cur.execute("SELECT CAST(%s AS JSON) AS x", (json_str,))
320
cur.execute('SELECT %s :> JSON AS x', (json_str,))
321
res = cur.fetchone()[0]
322
self.assertEqual(res, json.loads(json_str))
323
324
325
class TestBulkInserts(base.PyMySQLTestCase):
326
327
cursor_type = cursors.DictCursor
328
329
def setUp(self):
330
super(TestBulkInserts, self).setUp()
331
self.conn = conn = self.connect(cursorclass=self.cursor_type)
332
c = conn.cursor() # noqa: F841
333
334
# create a table ane some data to query
335
self.safe_create_table(
336
conn,
337
'bulkinsert',
338
"""\
339
CREATE TABLE bulkinsert
340
(
341
id int,
342
name char(20),
343
age int,
344
height int,
345
PRIMARY KEY (id)
346
)
347
""",
348
)
349
350
def _verify_records(self, data):
351
conn = self.connect()
352
cursor = conn.cursor()
353
cursor.execute('SELECT id, name, age, height from bulkinsert')
354
result = cursor.fetchall()
355
self.assertEqual(sorted(data), sorted(result))
356
357
def test_bulk_insert(self):
358
conn = self.connect()
359
cursor = conn.cursor()
360
361
data = [(0, 'bob', 21, 123), (1, 'jim', 56, 45), (2, 'fred', 100, 180)]
362
cursor.executemany(
363
'insert into bulkinsert (id, name, age, height) ' 'values (%s,%s,%s,%s)',
364
data,
365
)
366
self.assertEqual(
367
cursor._executed,
368
bytearray(
369
b'insert into bulkinsert (id, name, age, height) values '
370
b"(0,'bob',21,123),(1,'jim',56,45),(2,'fred',100,180)",
371
),
372
)
373
cursor.execute('commit')
374
self._verify_records(data)
375
376
def test_bulk_insert_multiline_statement(self):
377
conn = self.connect()
378
cursor = conn.cursor()
379
data = [(0, 'bob', 21, 123), (1, 'jim', 56, 45), (2, 'fred', 100, 180)]
380
cursor.executemany(
381
"""insert
382
into bulkinsert (id, name,
383
age, height)
384
values (%s,
385
%s , %s,
386
%s )
387
""",
388
data,
389
)
390
self.assertEqual(
391
cursor._executed.strip(),
392
bytearray(
393
b"""insert
394
into bulkinsert (id, name,
395
age, height)
396
values (0,
397
'bob' , 21,
398
123 ),(1,
399
'jim' , 56,
400
45 ),(2,
401
'fred' , 100,
402
180 )""",
403
),
404
)
405
cursor.execute('commit')
406
self._verify_records(data)
407
408
def test_bulk_insert_single_record(self):
409
conn = self.connect()
410
cursor = conn.cursor()
411
data = [(0, 'bob', 21, 123)]
412
cursor.executemany(
413
'insert into bulkinsert (id, name, age, height) ' 'values (%s,%s,%s,%s)',
414
data,
415
)
416
cursor.execute('commit')
417
self._verify_records(data)
418
419
def test_issue_288(self):
420
"""Executemany should work with "insert ... on update"."""
421
conn = self.connect()
422
cursor = conn.cursor()
423
data = [(0, 'bob', 21, 123), (1, 'jim', 56, 45), (2, 'fred', 100, 180)]
424
cursor.executemany(
425
"""insert
426
into bulkinsert (id, name,
427
age, height)
428
values (%s,
429
%s , %s,
430
%s ) on duplicate key update
431
age = values(age)
432
""",
433
data,
434
)
435
self.assertEqual(
436
cursor._executed.strip(),
437
bytearray(
438
b"""insert
439
into bulkinsert (id, name,
440
age, height)
441
values (0,
442
'bob' , 21,
443
123 ),(1,
444
'jim' , 56,
445
45 ),(2,
446
'fred' , 100,
447
180 ) on duplicate key update
448
age = values(age)""",
449
),
450
)
451
cursor.execute('commit')
452
self._verify_records(data)
453
454