Path: blob/main/singlestoredb/mysql/tests/test_basic.py
469 views
# type: ignore1import datetime2import json3import time45import pytest67import singlestoredb.mysql as sv8import singlestoredb.mysql.cursors as cursors9from singlestoredb.mysql.tests import base101112__all__ = ['TestConversion', 'TestCursor', 'TestBulkInserts']131415class TestConversion(base.PyMySQLTestCase):1617def test_datatypes(self):18"""Test every data type."""19conn = self.connect()20c = conn.cursor()21c.execute(22'create table test_datatypes (b bit, i int, l bigint, f real, '23' s varchar(32), u varchar(32), bb blob, '24' d date, dt datetime, ts timestamp, '25' td time, t time, st datetime)',26)27try:28# insert values2930v = (31True,32-3,33123456789012,345.7,35"hello'\" world",36'Espa\xc3\xb1ol',37'binary\x00data'.encode(conn.encoding),38datetime.date(1988, 2, 2),39datetime.datetime(2014, 5, 15, 7, 45, 57),40datetime.timedelta(5, 6),41datetime.time(16, 32),42time.localtime(),43)44c.execute(45'insert into test_datatypes (b,i,l,f,s,u,bb,d,dt,td,t,st) '46' values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)',47v,48)49c.execute('select b,i,l,f,s,u,bb,d,dt,td,t,st from test_datatypes')50r = c.fetchone()51self.assertEqual(b'\x00\x00\x00\x00\x00\x00\x00\x01', r[0])52self.assertEqual(v[1:10], r[1:10])53self.assertEqual(54datetime.timedelta(0, 60 * (v[10].hour * 60 + v[10].minute)), r[10],55)56self.assertEqual(datetime.datetime(*v[-1][:6]), r[-1])5758c.execute('delete from test_datatypes')5960# check nulls61c.execute(62'insert into test_datatypes (b,i,l,f,s,u,bb,d,dt,td,t,st) '63' values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)',64[None] * 12,65)66c.execute('select b,i,l,f,s,u,bb,d,dt,td,t,st from test_datatypes')67r = c.fetchone()68self.assertEqual(tuple([None] * 12), r)6970c.execute('delete from test_datatypes')7172# check sequences type73for seq_type in (tuple, list, set, frozenset):74c.execute(75'insert into test_datatypes (i, l) values (2,4), (6,8), (10,12)',76)77seq = seq_type([2, 6])78c.execute(79'select l from test_datatypes where i in %s order by i', (seq,),80)81r = c.fetchall()82# NOTE: C extension returns a list, not a tuple83self.assertEqual(((4,), (8,)), tuple(r))84c.execute('delete from test_datatypes')8586finally:87c.execute('drop table test_datatypes')8889def test_dict(self):90"""Test dict escaping."""91conn = self.connect()92c = conn.cursor()93c.execute('create table test_dict (a integer, b integer, c integer)')94try:95c.execute(96'insert into test_dict (a,b,c) values (%(a)s, %(b)s, %(c)s)',97{'a': 1, 'b': 2, 'c': 3},98)99c.execute('select a,b,c from test_dict')100self.assertEqual((1, 2, 3), c.fetchone())101finally:102c.execute('drop table test_dict')103104def test_string(self):105conn = self.connect()106c = conn.cursor()107c.execute('create table test_dict (a text)')108test_value = 'I am a test string'109try:110c.execute('insert into test_dict (a) values (%s)', test_value)111c.execute('select a from test_dict')112self.assertEqual((test_value,), c.fetchone())113finally:114c.execute('drop table test_dict')115116def test_integer(self):117conn = self.connect()118c = conn.cursor()119c.execute('create table test_dict (a integer)')120test_value = 12345121try:122c.execute('insert into test_dict (a) values (%s)', test_value)123c.execute('select a from test_dict')124self.assertEqual((test_value,), c.fetchone())125finally:126c.execute('drop table test_dict')127128def test_binary(self):129"""Test binary data."""130data = bytes(bytearray(range(255)))131conn = self.connect()132self.safe_create_table(133conn, 'test_binary', 'create table test_binary (b binary(255))',134)135136with conn.cursor() as c:137c.execute('insert into test_binary (b) values (_binary %s)', (data,))138c.execute('select b from test_binary')139self.assertEqual(data, c.fetchone()[0])140141def test_blob(self):142"""Test blob data."""143data = bytes(bytearray(range(256)) * 4)144conn = self.connect()145self.safe_create_table(conn, 'test_blob', 'create table test_blob (b blob)')146147with conn.cursor() as c:148c.execute('insert into test_blob (b) values (_binary %s)', (data,))149c.execute('select b from test_blob')150self.assertEqual(data, c.fetchone()[0])151152def test_untyped(self):153"""Test conversion of null, empty string."""154conn = self.connect()155c = conn.cursor()156c.execute("select null,''")157self.assertEqual((None, ''), c.fetchone())158c.execute("select '',null")159self.assertEqual(('', None), c.fetchone())160161def test_timedelta(self):162"""Test timedelta conversion."""163conn = self.connect()164c = conn.cursor()165c.execute(166"select time('12:30'), time('23:12:59'), time('23:12:59.05100'), "167" time('-12:30'), time('-23:12:59'), time('-23:12:59.05100'), "168" time('-00:30')",169)170self.assertEqual(171(172datetime.timedelta(0, 45000),173datetime.timedelta(0, 83579),174datetime.timedelta(0, 83579, 51000),175-datetime.timedelta(0, 45000),176-datetime.timedelta(0, 83579),177-datetime.timedelta(0, 83579, 51000),178-datetime.timedelta(0, 1800),179),180c.fetchone(),181)182183def test_datetime_microseconds(self):184"""Test datetime conversion w microseconds."""185186conn = self.connect()187c = conn.cursor()188dt = datetime.datetime(2013, 11, 12, 9, 9, 9, 123450)189c.execute('create table test_datetime (id int, ts datetime(6))')190try:191c.execute('insert into test_datetime values (%s, %s)', (1, dt))192c.execute('select ts from test_datetime')193self.assertEqual((dt,), c.fetchone())194finally:195c.execute('drop table test_datetime')196197198class TestCursor(base.PyMySQLTestCase):199# this test case does not work quite right yet, however,200# we substitute in None for the erroneous field which is201# compatible with the DB-API 2.0 spec and has not broken202# any unit tests for anything we've tried.203204# def test_description(self):205# """ test description attribute """206# # result is from MySQLdb module207# r = (('Host', 254, 11, 60, 60, 0, 0),208# ('User', 254, 16, 16, 16, 0, 0),209# ('Password', 254, 41, 41, 41, 0, 0),210# ('Select_priv', 254, 1, 1, 1, 0, 0),211# ('Insert_priv', 254, 1, 1, 1, 0, 0),212# ('Update_priv', 254, 1, 1, 1, 0, 0),213# ('Delete_priv', 254, 1, 1, 1, 0, 0),214# ('Create_priv', 254, 1, 1, 1, 0, 0),215# ('Drop_priv', 254, 1, 1, 1, 0, 0),216# ('Reload_priv', 254, 1, 1, 1, 0, 0),217# ('Shutdown_priv', 254, 1, 1, 1, 0, 0),218# ('Process_priv', 254, 1, 1, 1, 0, 0),219# ('File_priv', 254, 1, 1, 1, 0, 0),220# ('Grant_priv', 254, 1, 1, 1, 0, 0),221# ('References_priv', 254, 1, 1, 1, 0, 0),222# ('Index_priv', 254, 1, 1, 1, 0, 0),223# ('Alter_priv', 254, 1, 1, 1, 0, 0),224# ('Show_db_priv', 254, 1, 1, 1, 0, 0),225# ('Super_priv', 254, 1, 1, 1, 0, 0),226# ('Create_tmp_table_priv', 254, 1, 1, 1, 0, 0),227# ('Lock_tables_priv', 254, 1, 1, 1, 0, 0),228# ('Execute_priv', 254, 1, 1, 1, 0, 0),229# ('Repl_slave_priv', 254, 1, 1, 1, 0, 0),230# ('Repl_client_priv', 254, 1, 1, 1, 0, 0),231# ('Create_view_priv', 254, 1, 1, 1, 0, 0),232# ('Show_view_priv', 254, 1, 1, 1, 0, 0),233# ('Create_routine_priv', 254, 1, 1, 1, 0, 0),234# ('Alter_routine_priv', 254, 1, 1, 1, 0, 0),235# ('Create_user_priv', 254, 1, 1, 1, 0, 0),236# ('Event_priv', 254, 1, 1, 1, 0, 0),237# ('Trigger_priv', 254, 1, 1, 1, 0, 0),238# ('ssl_type', 254, 0, 9, 9, 0, 0),239# ('ssl_cipher', 252, 0, 65535, 65535, 0, 0),240# ('x509_issuer', 252, 0, 65535, 65535, 0, 0),241# ('x509_subject', 252, 0, 65535, 65535, 0, 0),242# ('max_questions', 3, 1, 11, 11, 0, 0),243# ('max_updates', 3, 1, 11, 11, 0, 0),244# ('max_connections', 3, 1, 11, 11, 0, 0),245# ('max_user_connections', 3, 1, 11, 11, 0, 0))246# conn = self.connect()247# c = conn.cursor()248# c.execute("select * from mysql.user")249#250# self.assertEqual(r, c.description)251252def test_fetch_no_result(self):253"""Test a fetchone() with no rows."""254conn = self.connect()255c = conn.cursor()256c.execute('create table test_nr (b varchar(32))')257try:258data = 'pymysql'259c.execute('insert into test_nr (b) values (%s)', (data,))260self.assertEqual(None, c.fetchone())261finally:262c.execute('drop table test_nr')263264def test_aggregates(self):265"""Test aggregate functions."""266conn = self.connect()267c = conn.cursor()268try:269c.execute('create table test_aggregates (i integer)')270for i in range(0, 10):271c.execute('insert into test_aggregates (i) values (%s)', (i,))272c.execute('select sum(i) from test_aggregates')273(r,) = c.fetchone()274self.assertEqual(sum(range(0, 10)), r)275finally:276c.execute('drop table test_aggregates')277278def test_single_tuple(self):279"""Test a single tuple."""280conn = self.connect()281c = conn.cursor()282self.safe_create_table(283conn, 'mystuff', 'create table mystuff (id integer primary key)',284)285c.execute('insert into mystuff (id) values (1)')286c.execute('insert into mystuff (id) values (2)')287c.execute('select id from mystuff where id in %s', ((1,),))288self.assertEqual([(1,)], list(c.fetchall()))289c.close()290291def test_json(self):292args = self.databases[0].copy()293args['charset'] = 'utf8mb4'294conn = sv.connect(**args)295# MariaDB only has limited JSON support, stores data as longtext296# https://mariadb.com/kb/en/json-data-type/297if not self.mysql_server_is(conn, (5, 7, 0)):298pytest.skip('JSON type is only supported on MySQL >= 5.7')299300self.safe_create_table(301conn,302'test_json',303"""\304create table test_json (305id int not null,306json JSON not null,307primary key (id)308);""",309)310cur = conn.cursor()311312json_str = '{"hello": "こんにちは"}'313cur.execute('INSERT INTO test_json (id, `json`) values (42, %s)', (json_str,))314cur.execute('SELECT `json` from `test_json` WHERE `id`=42')315res = cur.fetchone()[0]316self.assertEqual(res, json.loads(json_str))317318# cur.execute("SELECT CAST(%s AS JSON) AS x", (json_str,))319cur.execute('SELECT %s :> JSON AS x', (json_str,))320res = cur.fetchone()[0]321self.assertEqual(res, json.loads(json_str))322323324class TestBulkInserts(base.PyMySQLTestCase):325326cursor_type = cursors.DictCursor327328def setUp(self):329super(TestBulkInserts, self).setUp()330self.conn = conn = self.connect(cursorclass=self.cursor_type)331c = conn.cursor() # noqa: F841332333# create a table ane some data to query334self.safe_create_table(335conn,336'bulkinsert',337"""\338CREATE TABLE bulkinsert339(340id int,341name char(20),342age int,343height int,344PRIMARY KEY (id)345)346""",347)348349def _verify_records(self, data):350conn = self.connect()351cursor = conn.cursor()352cursor.execute('SELECT id, name, age, height from bulkinsert')353result = cursor.fetchall()354self.assertEqual(sorted(data), sorted(result))355356def test_bulk_insert(self):357conn = self.connect()358cursor = conn.cursor()359360data = [(0, 'bob', 21, 123), (1, 'jim', 56, 45), (2, 'fred', 100, 180)]361cursor.executemany(362'insert into bulkinsert (id, name, age, height) ' 'values (%s,%s,%s,%s)',363data,364)365self.assertEqual(366cursor._executed,367bytearray(368b'insert into bulkinsert (id, name, age, height) values '369b"(0,'bob',21,123),(1,'jim',56,45),(2,'fred',100,180)",370),371)372cursor.execute('commit')373self._verify_records(data)374375def test_bulk_insert_multiline_statement(self):376conn = self.connect()377cursor = conn.cursor()378data = [(0, 'bob', 21, 123), (1, 'jim', 56, 45), (2, 'fred', 100, 180)]379cursor.executemany(380"""insert381into bulkinsert (id, name,382age, height)383values (%s,384%s , %s,385%s )386""",387data,388)389self.assertEqual(390cursor._executed.strip(),391bytearray(392b"""insert393into bulkinsert (id, name,394age, height)395values (0,396'bob' , 21,397123 ),(1,398'jim' , 56,39945 ),(2,400'fred' , 100,401180 )""",402),403)404cursor.execute('commit')405self._verify_records(data)406407def test_bulk_insert_single_record(self):408conn = self.connect()409cursor = conn.cursor()410data = [(0, 'bob', 21, 123)]411cursor.executemany(412'insert into bulkinsert (id, name, age, height) ' 'values (%s,%s,%s,%s)',413data,414)415cursor.execute('commit')416self._verify_records(data)417418def test_issue_288(self):419"""Executemany should work with "insert ... on update"."""420conn = self.connect()421cursor = conn.cursor()422data = [(0, 'bob', 21, 123), (1, 'jim', 56, 45), (2, 'fred', 100, 180)]423cursor.executemany(424"""insert425into bulkinsert (id, name,426age, height)427values (%s,428%s , %s,429%s ) on duplicate key update430age = values(age)431""",432data,433)434self.assertEqual(435cursor._executed.strip(),436bytearray(437b"""insert438into bulkinsert (id, name,439age, height)440values (0,441'bob' , 21,442123 ),(1,443'jim' , 56,44445 ),(2,445'fred' , 100,446180 ) on duplicate key update447age = values(age)""",448),449)450cursor.execute('commit')451self._verify_records(data)452453454