Path: blob/main/singlestoredb/tests/test_basics.py
469 views
#!/usr/bin/env python1# type: ignore2"""Basic SingleStoreDB connection testing."""3import datetime4import decimal5import math6import os7import unittest8from typing import Optional910from requests.exceptions import InvalidJSONError1112try:13import numpy as np14has_numpy = True15except ImportError:16has_numpy = False1718try:19import shapely.wkt20has_shapely = True21except ImportError:22has_shapely = False2324try:25import pygeos26from pygeos.testing import assert_geometries_equal27has_pygeos = True28except ImportError:29has_pygeos = False3031try:32import pydantic33has_pydantic = True34except ImportError:35has_pydantic = False3637import singlestoredb as s238from . import utils39# import traceback404142class TestBasics(unittest.TestCase):4344dbname: str = ''45dbexisted: bool = False4647@classmethod48def setUpClass(cls):49sql_file = os.path.join(os.path.dirname(__file__), 'test.sql')50cls.dbname, cls.dbexisted = utils.load_sql(sql_file)5152@classmethod53def tearDownClass(cls):54if not cls.dbexisted:55utils.drop_database(cls.dbname)5657def setUp(self):58self.conn = s2.connect(database=type(self).dbname)59self.cur = self.conn.cursor()6061def tearDown(self):62try:63if self.cur is not None:64self.cur.close()65except Exception:66# traceback.print_exc()67pass6869try:70if self.conn is not None:71self.conn.close()72except Exception:73# traceback.print_exc()74pass7576def test_connection(self):77self.cur.execute('show databases')78dbs = set([x[0] for x in self.cur.fetchall()])79assert type(self).dbname in dbs, dbs8081def test_fetchall(self):82self.cur.execute('select * from data')8384out = self.cur.fetchall()8586desc = self.cur.description87rowcount = self.cur.rowcount88rownumber = self.cur.rownumber89lastrowid = self.cur.lastrowid9091assert sorted(out) == sorted([92('a', 'antelopes', 2),93('b', 'bears', 2),94('c', 'cats', 5),95('d', 'dogs', 4),96('e', 'elephants', 0),97]), out9899assert rowcount in (5, -1), rowcount100assert rownumber == 5, rownumber101assert lastrowid is None, lastrowid102assert len(desc) == 3, desc103assert desc[0].name == 'id', desc[0].name104assert desc[0].type_code in [253, 15], desc[0].type_code105assert desc[1].name == 'name', desc[1].name106assert desc[1].type_code in [253, 15], desc[1].type_code107assert desc[2].name == 'value', desc[2].name108assert desc[2].type_code == 8, desc[2].type_code109110def test_fetchone(self):111self.cur.execute('select * from data')112113out = []114while True:115row = self.cur.fetchone()116if row is None:117break118out.append(row)119assert self.cur.rownumber == len(out), self.cur.rownumber120121desc = self.cur.description122rowcount = self.cur.rowcount123rownumber = self.cur.rownumber124lastrowid = self.cur.lastrowid125126assert sorted(out) == sorted([127('a', 'antelopes', 2),128('b', 'bears', 2),129('c', 'cats', 5),130('d', 'dogs', 4),131('e', 'elephants', 0),132]), out133134assert rowcount in (5, -1), rowcount135assert rownumber == 5, rownumber136assert lastrowid is None, lastrowid137assert len(desc) == 3, desc138assert desc[0].name == 'id', desc[0].name139assert desc[0].type_code in [253, 15], desc[0].type_code140assert desc[1].name == 'name', desc[1].name141assert desc[1].type_code in [253, 15], desc[1].type_code142assert desc[2].name == 'value', desc[2].name143assert desc[2].type_code == 8, desc[2].type_code144145def test_fetchmany(self):146self.cur.execute('select * from data')147148out = []149while True:150rows = self.cur.fetchmany(size=3)151assert len(rows) <= 3, rows152if not rows:153break154out.extend(rows)155assert self.cur.rownumber == len(out), self.cur.rownumber156157desc = self.cur.description158rowcount = self.cur.rowcount159rownumber = self.cur.rownumber160lastrowid = self.cur.lastrowid161162assert sorted(out) == sorted([163('a', 'antelopes', 2),164('b', 'bears', 2),165('c', 'cats', 5),166('d', 'dogs', 4),167('e', 'elephants', 0),168]), out169170assert rowcount in (5, -1), rowcount171assert rownumber == 5, rownumber172assert lastrowid is None, lastrowid173assert len(desc) == 3, desc174assert desc[0].name == 'id'175assert desc[0].type_code in [253, 15]176assert desc[1].name == 'name'177assert desc[1].type_code in [253, 15]178assert desc[2].name == 'value'179assert desc[2].type_code == 8180181def test_arraysize(self):182self.cur.execute('select * from data')183184self.cur.arraysize = 3185assert self.cur.arraysize == 3186187rows = self.cur.fetchmany()188assert len(rows) == 3, rows189assert self.cur.rownumber == 3, self.cur.rownumber190191self.cur.arraysize = 1192assert self.cur.arraysize == 1193194rows = self.cur.fetchmany()195assert len(rows) == 1, rows196assert self.cur.rownumber == 4, self.cur.rownumber197198rows = self.cur.fetchmany()199assert len(rows) == 1, rows200assert self.cur.rownumber == 5, self.cur.rownumber201202rows = self.cur.fetchall()203assert len(rows) == 0, rows204assert self.cur.rownumber == 5, self.cur.rownumber205206def test_execute_with_dict_params(self):207self.cur.execute('select * from data where id < %(name)s', dict(name='d'))208out = self.cur.fetchall()209210desc = self.cur.description211rowcount = self.cur.rowcount212lastrowid = self.cur.lastrowid213214assert sorted(out) == sorted([215('a', 'antelopes', 2),216('b', 'bears', 2),217('c', 'cats', 5),218]), out219220assert rowcount in (3, -1), rowcount221assert lastrowid is None, lastrowid222assert len(desc) == 3, desc223assert desc[0].name == 'id', desc[0].name224assert desc[0].type_code in [253, 15], desc[0].type_code225assert desc[1].name == 'name', desc[1].name226assert desc[1].type_code in [253, 15], desc[1].type_code227assert desc[2].name == 'value', desc[2].name228assert desc[2].type_code == 8, desc[2].type_code229230with self.assertRaises(KeyError):231self.cur.execute('select * from data where id < %(name)s', dict(foo='d'))232233def test_execute_with_positional_params(self):234self.cur.execute('select * from data where id < %s', ['d'])235out = self.cur.fetchall()236237desc = self.cur.description238rowcount = self.cur.rowcount239lastrowid = self.cur.lastrowid240241assert sorted(out) == sorted([242('a', 'antelopes', 2),243('b', 'bears', 2),244('c', 'cats', 5),245]), out246247assert rowcount in (3, -1), rowcount248assert lastrowid is None, lastrowid249assert len(desc) == 3, desc250assert desc[0].name == 'id', desc[0].name251assert desc[0].type_code in [253, 15], desc[0].type_code252assert desc[1].name == 'name', desc[1].name253assert desc[1].type_code in [253, 15], desc[1].type_code254assert desc[2].name == 'value', desc[2].name255assert desc[2].type_code == 8, desc[2].type_code256257with self.assertRaises(TypeError):258self.cur.execute(259'select * from data where id < %s and id > %s', ['d', 'e', 'f'],260)261262with self.assertRaises(TypeError):263self.cur.execute('select * from data where id < %s and id > %s', ['d'])264265def test_execute_with_escaped_positional_substitutions(self):266self.cur.execute(267'select `id`, `time` from alltypes where `time` = %s', ['00:07:00'],268)269out = self.cur.fetchall()270assert out[0] == (0, datetime.timedelta(seconds=420)), out[0]271272self.cur.execute('select `id`, `time` from alltypes where `time` = "00:07:00"')273out = self.cur.fetchall()274assert out[0] == (0, datetime.timedelta(seconds=420)), out[0]275276# with self.assertRaises(IndexError):277# self.cur.execute(278# 'select `id`, `time` from alltypes where `id` = %1s '279# 'or `time` = "00:07:00"', [0],280# )281282self.cur.execute(283'select `id`, `time` from alltypes where `id` = %s '284'or `time` = "00:07:00"', [0],285)286out = self.cur.fetchall()287assert out[0] == (0, datetime.timedelta(seconds=420)), out[0]288289def test_execute_with_escaped_substitutions(self):290self.cur.execute(291'select `id`, `time` from alltypes where `time` = %(time)s',292dict(time='00:07:00'),293)294out = self.cur.fetchall()295assert out[0] == (0, datetime.timedelta(seconds=420)), out[0]296297self.cur.execute(298'select `id`, `time` from alltypes where `time` = %(time)s',299dict(time='00:07:00'),300)301out = self.cur.fetchall()302assert len(out) == 1, out303304with self.assertRaises(KeyError):305self.cur.execute(306'select `id`, `time`, `char_100` from alltypes '307'where `time` = %(time)s or `char_100` like "foo:bar"',308dict(x='00:07:00'),309)310311self.cur.execute(312'select `id`, `time`, `char_100` from alltypes '313'where `time` = %(time)s or `char_100` like "foo::bar"',314dict(time='00:07:00'),315)316out = self.cur.fetchall()317assert out[0][:2] == (0, datetime.timedelta(seconds=420)), out[0]318319def test_is_connected(self):320assert self.conn.is_connected()321assert self.cur.is_connected()322self.cur.close()323assert not self.cur.is_connected()324assert self.conn.is_connected()325self.conn.close()326assert not self.cur.is_connected()327assert not self.conn.is_connected()328329def test_connection_attr(self):330# Use context manager to get to underlying object (self.conn is a weakref.proxy)331with self.conn as conn:332assert conn is self.conn333334def test_executemany(self):335# NOTE: Doesn't actually do anything since no rows match336self.cur.executemany(337'delete from data where id > %(name)s',338[dict(name='z'), dict(name='y')],339)340341def test_executemany_no_args(self):342self.cur.executemany('select * from data where id > "z"')343344def test_context_managers(self):345with s2.connect() as conn:346with conn.cursor() as cur:347assert cur.is_connected()348assert conn.is_connected()349assert not cur.is_connected()350assert not conn.is_connected()351352def test_iterator(self):353self.cur.execute('select * from data')354355out = []356for row in self.cur:357out.append(row)358359assert sorted(out) == sorted([360('a', 'antelopes', 2),361('b', 'bears', 2),362('c', 'cats', 5),363('d', 'dogs', 4),364('e', 'elephants', 0),365]), out366367def test_urls(self):368from singlestoredb.connection import build_params369from singlestoredb.config import get_option370371# Full URL (without scheme)372url = 'me:[email protected]:3307/mydb'373out = build_params(host=url)374assert out['driver'] == get_option('driver'), out['driver']375assert out['host'] == 's2host.com', out['host']376assert out['port'] == 3307, out['port']377assert out['database'] == 'mydb', out['database']378assert out['user'] == 'me', out['user']379assert out['password'] == 'p455w0rd', out['password']380381# Full URL (with scheme)382url = 'http://me:[email protected]:3307/mydb'383out = build_params(host=url)384assert out['driver'] == 'http', out['driver']385assert out['host'] == 's2host.com', out['host']386assert out['port'] == 3307, out['port']387assert out['database'] == 'mydb', out['database']388assert out['user'] == 'me', out['user']389assert out['password'] == 'p455w0rd', out['password']390391# No port392url = 'me:[email protected]/mydb'393out = build_params(host=url)394assert out['driver'] == get_option('driver'), out['driver']395assert out['host'] == 's2host.com', out['host']396if out['driver'] in ['http', 'https']:397assert out['port'] in [get_option('http_port'), 80, 443], out['port']398else:399assert out['port'] in [get_option('port'), 3306], out['port']400assert out['database'] == 'mydb', out['database']401assert out['user'] == 'me', out['user']402assert out['password'] == 'p455w0rd', out['password']403404# No http port405url = 'http://me:[email protected]/mydb'406out = build_params(host=url)407assert out['driver'] == 'http', out['driver']408assert out['host'] == 's2host.com', out['host']409assert out['port'] in [get_option('http_port'), 80], out['port']410assert out['database'] == 'mydb', out['database']411assert out['user'] == 'me', out['user']412assert out['password'] == 'p455w0rd', out['password']413414# No https port415url = 'https://me:[email protected]/mydb'416out = build_params(host=url)417assert out['driver'] == 'https', out['driver']418assert out['host'] == 's2host.com', out['host']419assert out['port'] in [get_option('http_port'), 443], out['port']420assert out['database'] == 'mydb', out['database']421assert out['user'] == 'me', out['user']422assert out['password'] == 'p455w0rd', out['password']423424# Invalid port425url = 'https://me:[email protected]:foo/mydb'426with self.assertRaises(ValueError):427build_params(host=url)428429# Empty password430url = 'me:@s2host.com/mydb'431out = build_params(host=url)432assert out['driver'] == get_option('driver'), out['driver']433assert out['host'] == 's2host.com', out['host']434if out['driver'] in ['http', 'https']:435assert out['port'] in [get_option('http_port'), 80, 443], out['port']436else:437assert out['port'] in [get_option('port'), 3306], out['port']438assert out['database'] == 'mydb', out['database']439assert out['user'] == 'me', out['user']440assert out['password'] == '', out['password']441442# No user/password443url = 's2host.com/mydb'444out = build_params(host=url)445assert out['driver'] == get_option('driver'), out['driver']446assert out['host'] == 's2host.com', out['host']447if out['driver'] in ['http', 'https']:448assert out['port'] in [get_option('http_port'), 80, 443], out['port']449else:450assert out['port'] in [get_option('port'), 3306], out['port']451assert out['database'] == 'mydb', out['database']452assert 'user' not in out or out['user'] == get_option('user'), out['user']453assert 'password' not in out or out['password'] == get_option(454'password',455), out['password']456457# Just hostname458url = 's2host.com'459out = build_params(host=url)460assert out['driver'] == get_option('driver'), out['driver']461assert out['host'] == 's2host.com', out['host']462if out['driver'] in ['http', 'https']:463assert out['port'] in [get_option('http_port'), 80, 443], out['port']464else:465assert out['port'] in [get_option('port'), 3306], out['port']466assert 'database' not in out467assert 'user' not in out or out['user'] == get_option('user'), out['user']468assert 'password' not in out or out['password'] == get_option(469'password',470), out['password']471472# Just hostname and port473url = 's2host.com:1000'474out = build_params(host=url)475assert out['driver'] == get_option('driver'), out['driver']476assert out['host'] == 's2host.com', out['host']477assert out['port'] == 1000, out['port']478assert 'database' not in out479assert 'user' not in out or out['user'] == get_option('user'), out['user']480assert 'password' not in out or out['password'] == get_option(481'password',482), out['password']483484# Query options485url = 's2host.com:1000?local_infile=1&charset=utf8'486out = build_params(host=url)487assert out['driver'] == get_option('driver'), out['driver']488assert out['host'] == 's2host.com', out['host']489assert out['port'] == 1000, out['port']490assert 'database' not in out491assert 'user' not in out or out['user'] == get_option('user'), out['user']492assert 'password' not in out or out['password'] == get_option(493'password',494), out['password']495assert out['local_infile'] is True, out['local_infile']496assert out['charset'] == 'utf8', out['charset']497498# Bad query option499url = 's2host.com:1000?bad_param=10'500with self.assertRaises(ValueError):501build_params(host=url)502503def test_wrap_exc(self):504with self.assertRaises(s2.ProgrammingError) as cm:505self.cur.execute('garbage syntax')506507exc = cm.exception508assert exc.errno == 1064, exc.errno509assert 'You have an error in your SQL syntax' in exc.errmsg, exc.errmsg510511def test_extended_types(self):512if not has_numpy or not has_pygeos or not has_shapely:513self.skipTest('Test requires numpy, pygeos, and shapely')514515import uuid516517key = str(uuid.uuid4())518519# shapely data520data = [521(5221, 'POLYGON((1 1, 2 1, 2 2, 1 2, 1 1))', 'POINT(1.5 1.5)',523[0.5, 0.6], datetime.datetime(1950, 1, 2, 12, 13, 14),524datetime.date(1950, 1, 2), datetime.time(12, 13, 14),525datetime.timedelta(seconds=123456), key,526),527(5282, 'POLYGON((5 1, 6 1, 6 2, 5 2, 5 1))', 'POINT(5.5 1.5)',529[1.3, 2.5], datetime.datetime(1960, 3, 4, 15, 16, 17),530datetime.date(1960, 3, 4), datetime.time(15, 16, 17),531datetime.timedelta(seconds=2), key,532),533(5343, 'POLYGON((5 5, 6 5, 6 6, 5 6, 5 5))', 'POINT(5.5 5.5)',535[10.3, 11.1], datetime.datetime(1970, 6, 7, 18, 19, 20),536datetime.date(1970, 5, 6), datetime.time(18, 19, 20),537datetime.timedelta(seconds=-2), key,538),539(5404, 'POLYGON((1 5, 2 5, 2 6, 1 6, 1 5))', 'POINT(1.5 5.5)',541[3.3, 3.4], datetime.datetime(1980, 8, 9, 21, 22, 23),542datetime.date(1980, 7, 8), datetime.time(21, 22, 23),543datetime.timedelta(seconds=-123456), key,544),545(5465, 'POLYGON((3 3, 4 3, 4 4, 3 4, 3 3))', 'POINT(3.5 3.5)',547[2.9, 9.5], datetime.datetime(2010, 10, 11, 1, 2, 3),548datetime.date(2010, 8, 9), datetime.time(1, 2, 3),549datetime.timedelta(seconds=0), key,550),551]552553new_data = []554for i, row in enumerate(data):555row = list(row)556row[1] = shapely.wkt.loads(row[1])557row[2] = shapely.wkt.loads(row[2])558if 'http' in self.conn.driver:559row[3] = ''560else:561row[3] = np.array(row[3], dtype='<f4')562new_data.append(row)563564self.cur.executemany(565'INSERT INTO extended_types '566'(id, geography, geographypoint, vectors, dt, d, t, td, testkey) '567'VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)', new_data,568)569570self.cur.execute(571'SELECT * FROM extended_types WHERE testkey = %s ORDER BY id', [key],572)573574for data_row, row in zip(new_data, self.cur):575assert data_row[0] == row[0]576assert data_row[1].equals_exact(shapely.wkt.loads(row[1]), 1e-4)577assert data_row[2].equals_exact(shapely.wkt.loads(row[2]), 1e-4)578if 'http' in self.conn.driver:579assert row[3] == b''580else:581assert (data_row[3] == np.frombuffer(row[3], dtype='<f4')).all()582583# pygeos data584data = [585(5866, 'POLYGON((1 1, 2 1, 2 2, 1 2, 1 1))', 'POINT(1.5 1.5)',587[0.5, 0.6], datetime.datetime(1950, 1, 2, 12, 13, 14),588datetime.date(1950, 1, 2), datetime.time(12, 13, 14),589datetime.timedelta(seconds=123456), key,590),591(5927, 'POLYGON((5 1, 6 1, 6 2, 5 2, 5 1))', 'POINT(5.5 1.5)',593[1.3, 2.5], datetime.datetime(1960, 3, 4, 15, 16, 17),594datetime.date(1960, 3, 4), datetime.time(15, 16, 17),595datetime.timedelta(seconds=2), key,596),597(5988, 'POLYGON((5 5, 6 5, 6 6, 5 6, 5 5))', 'POINT(5.5 5.5)',599[10.3, 11.1], datetime.datetime(1970, 6, 7, 18, 19, 20),600datetime.date(1970, 5, 6), datetime.time(18, 19, 20),601datetime.timedelta(seconds=-2), key,602),603(6049, 'POLYGON((1 5, 2 5, 2 6, 1 6, 1 5))', 'POINT(1.5 5.5)',605[3.3, 3.4], datetime.datetime(1980, 8, 9, 21, 22, 23),606datetime.date(1980, 7, 8), datetime.time(21, 22, 23),607datetime.timedelta(seconds=-123456), key,608),609(61010, 'POLYGON((3 3, 4 3, 4 4, 3 4, 3 3))', 'POINT(3.5 3.5)',611[2.9, 9.5], datetime.datetime(2010, 10, 11, 1, 2, 3),612datetime.date(2010, 8, 9), datetime.time(1, 2, 3),613datetime.timedelta(seconds=0), key,614),615]616617new_data = []618for i, row in enumerate(data):619row = list(row)620row[1] = pygeos.io.from_wkt(row[1])621row[2] = pygeos.io.from_wkt(row[2])622if 'http' in self.conn.driver:623row[3] = ''624else:625row[3] = np.array(row[3], dtype='<f4')626new_data.append(row)627628self.cur.executemany(629'INSERT INTO extended_types '630'(id, geography, geographypoint, vectors, dt, d, t, td, testkey) '631'VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)', new_data,632)633634self.cur.execute(635'SELECT * FROM extended_types WHERE id >= 6 and testkey = %s ORDER BY id', [636key,637],638)639640for data_row, row in zip(new_data, self.cur):641assert data_row[0] == row[0]642assert_geometries_equal(data_row[1], pygeos.io.from_wkt(row[1]))643assert_geometries_equal(data_row[2], pygeos.io.from_wkt(row[2]))644if 'http' in self.conn.driver:645assert row[3] == b''646else:647assert (data_row[3] == np.frombuffer(row[3], dtype='<f4')).all()648649def test_alltypes(self):650self.cur.execute('select * from alltypes where id = 0')651names = [x[0] for x in self.cur.description]652types = [x[1] for x in self.cur.description]653out = self.cur.fetchone()654row = dict(zip(names, out))655typ = dict(zip(names, types))656657bits = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]658659def otype(x):660return x661662assert row['id'] == 0, row['id']663assert typ['id'] == otype(3), typ['id']664665assert row['tinyint'] == 80, row['tinyint']666assert typ['tinyint'] == otype(1), typ['tinyint']667668assert row['bool'] == 0, row['bool']669assert typ['bool'] == otype(1), typ['bool']670671assert row['boolean'] == 1, row['boolean']672assert typ['boolean'] == otype(1), typ['boolean']673674assert row['smallint'] == -27897, row['smallint']675assert typ['smallint'] == otype(2), typ['smallint']676677assert row['mediumint'] == 104729, row['mediumint']678assert typ['mediumint'] == otype(9), typ['mediumint']679680assert row['int24'] == -200899, row['int24']681assert typ['int24'] == otype(9), typ['int24']682683assert row['int'] == -1295369311, row['int']684assert typ['int'] == otype(3), typ['int']685686assert row['integer'] == -1741727421, row['integer']687assert typ['integer'] == otype(3), typ['integer']688689assert row['bigint'] == -266883847, row['bigint']690assert typ['bigint'] == otype(8), typ['bigint']691692assert row['float'] == -146487000.0, row['float']693assert typ['float'] == otype(4), typ['float']694695assert row['double'] == -474646154.719356, row['double']696assert typ['double'] == otype(5), typ['double']697698assert row['real'] == -901409776.279346, row['real']699assert typ['real'] == otype(5), typ['real']700701assert row['decimal'] == decimal.Decimal('28111097.610822'), row['decimal']702assert typ['decimal'] == otype(246), typ['decimal']703704assert row['dec'] == decimal.Decimal('389451155.931428'), row['dec']705assert typ['dec'] == otype(246), typ['dec']706707assert row['fixed'] == decimal.Decimal('-143773416.044092'), row['fixed']708assert typ['fixed'] == otype(246), typ['fixed']709710assert row['numeric'] == decimal.Decimal('866689461.300046'), row['numeric']711assert typ['numeric'] == otype(246), typ['numeric']712713assert row['date'] == datetime.date(8524, 11, 10), row['date']714assert typ['date'] == 10, typ['date']715716assert row['time'] == datetime.timedelta(minutes=7), row['time']717assert typ['time'] == 11, typ['time']718719assert row['time_6'] == datetime.timedelta(720hours=1, minutes=10, microseconds=2,721), row['time_6']722assert typ['time_6'] == 11, typ['time_6']723724assert row['datetime'] == datetime.datetime(7259948, 3, 11, 15, 29, 22,726), row['datetime']727assert typ['datetime'] == 12, typ['datetime']728729assert row['datetime_6'] == datetime.datetime(7301756, 10, 29, 2, 2, 42, 8,731), row['datetime_6']732assert typ['datetime_6'] == 12, typ['datetime_6']733734assert row['timestamp'] == datetime.datetime(7351980, 12, 31, 1, 10, 23,736), row['timestamp']737assert typ['timestamp'] == otype(7), typ['timestamp']738739assert row['timestamp_6'] == datetime.datetime(7401991, 1, 2, 22, 15, 10, 6,741), row['timestamp_6']742assert typ['timestamp_6'] == otype(7), typ['timestamp_6']743744assert row['year'] == 1923, row['year']745assert typ['year'] == otype(13), typ['year']746747assert row['char_100'] == \748'This is a test of a 100 character column.', row['char_100']749assert typ['char_100'] == otype(254), typ['char_100']750751assert row['binary_100'] == bytearray(bits + [0] * 84), row['binary_100']752assert typ['binary_100'] == otype(254), typ['binary_100']753754assert row['varchar_200'] == \755'This is a test of a variable character column.', row['varchar_200']756assert typ['varchar_200'] == otype(253), typ['varchar_200'] # why not 15?757758assert row['varbinary_200'] == bytearray(bits * 2), row['varbinary_200']759assert typ['varbinary_200'] == otype(253), typ['varbinary_200'] # why not 15?760761assert row['longtext'] == 'This is a longtext column.', row['longtext']762assert typ['longtext'] == otype(251), typ['longtext']763764assert row['mediumtext'] == 'This is a mediumtext column.', row['mediumtext']765assert typ['mediumtext'] == otype(250), typ['mediumtext']766767assert row['text'] == 'This is a text column.', row['text']768assert typ['text'] == otype(252), typ['text']769770assert row['tinytext'] == 'This is a tinytext column.'771assert typ['tinytext'] == otype(249), typ['tinytext']772773assert row['longblob'] == bytearray(bits * 3), row['longblob']774assert typ['longblob'] == otype(251), typ['longblob']775776assert row['mediumblob'] == bytearray(bits * 2), row['mediumblob']777assert typ['mediumblob'] == otype(250), typ['mediumblob']778779assert row['blob'] == bytearray(bits), row['blob']780assert typ['blob'] == otype(252), typ['blob']781782assert row['tinyblob'] == bytearray([10, 11, 12, 13, 14, 15]), row['tinyblob']783assert typ['tinyblob'] == otype(249), typ['tinyblob']784785assert row['json'] == {'a': 10, 'b': 2.75, 'c': 'hello world'}, row['json']786assert typ['json'] == otype(245), typ['json']787788assert row['enum'] == 'one', row['enum']789assert typ['enum'] == otype(253), typ['enum'] # mysql code: 247790791# TODO: HTTP sees this as a varchar, so it doesn't become a set.792assert row['set'] in [{'two'}, 'two'], row['set']793assert typ['set'] == otype(253), typ['set'] # mysql code: 248794795assert row['bit'] == b'\x00\x00\x00\x00\x00\x00\x00\x80', row['bit']796assert typ['bit'] == otype(16), typ['bit']797798def test_alltypes_nulls(self):799self.cur.execute('select * from alltypes where id = 1')800names = [x[0] for x in self.cur.description]801types = [x[1] for x in self.cur.description]802out = self.cur.fetchone()803row = dict(zip(names, out))804typ = dict(zip(names, types))805806def otype(x):807return x808809assert row['id'] == 1, row['id']810assert typ['id'] == otype(3), typ['id']811812assert row['tinyint'] is None, row['tinyint']813assert typ['tinyint'] == otype(1), typ['tinyint']814815assert row['bool'] is None, row['bool']816assert typ['bool'] == otype(1), typ['bool']817818assert row['boolean'] is None, row['boolean']819assert typ['boolean'] == otype(1), typ['boolean']820821assert row['smallint'] is None, row['smallint']822assert typ['smallint'] == otype(2), typ['smallint']823824assert row['mediumint'] is None, row['mediumint']825assert typ['mediumint'] == otype(9), typ['mediumint']826827assert row['int24'] is None, row['int24']828assert typ['int24'] == otype(9), typ['int24']829830assert row['int'] is None, row['int']831assert typ['int'] == otype(3), typ['int']832833assert row['integer'] is None, row['integer']834assert typ['integer'] == otype(3), typ['integer']835836assert row['bigint'] is None, row['bigint']837assert typ['bigint'] == otype(8), typ['bigint']838839assert row['float'] is None, row['float']840assert typ['float'] == otype(4), typ['float']841842assert row['double'] is None, row['double']843assert typ['double'] == otype(5), typ['double']844845assert row['real'] is None, row['real']846assert typ['real'] == otype(5), typ['real']847848assert row['decimal'] is None, row['decimal']849assert typ['decimal'] == otype(246), typ['decimal']850851assert row['dec'] is None, row['dec']852assert typ['dec'] == otype(246), typ['dec']853854assert row['fixed'] is None, row['fixed']855assert typ['fixed'] == otype(246), typ['fixed']856857assert row['numeric'] is None, row['numeric']858assert typ['numeric'] == otype(246), typ['numeric']859860assert row['date'] is None, row['date']861assert typ['date'] == 10, typ['date']862863assert row['time'] is None, row['time']864assert typ['time'] == 11, typ['time']865866assert row['time'] is None, row['time']867assert typ['time_6'] == 11, typ['time_6']868869assert row['datetime'] is None, row['datetime']870assert typ['datetime'] == 12, typ['datetime']871872assert row['datetime_6'] is None, row['datetime_6']873assert typ['datetime'] == 12, typ['datetime']874875assert row['timestamp'] is None, row['timestamp']876assert typ['timestamp'] == otype(7), typ['timestamp']877878assert row['timestamp_6'] is None, row['timestamp_6']879assert typ['timestamp_6'] == otype(7), typ['timestamp_6']880881assert row['year'] is None, row['year']882assert typ['year'] == otype(13), typ['year']883884assert row['char_100'] is None, row['char_100']885assert typ['char_100'] == otype(254), typ['char_100']886887assert row['binary_100'] is None, row['binary_100']888assert typ['binary_100'] == otype(254), typ['binary_100']889890assert row['varchar_200'] is None, typ['varchar_200']891assert typ['varchar_200'] == otype(253), typ['varchar_200'] # why not 15?892893assert row['varbinary_200'] is None, row['varbinary_200']894assert typ['varbinary_200'] == otype(253), typ['varbinary_200'] # why not 15?895896assert row['longtext'] is None, row['longtext']897assert typ['longtext'] == otype(251), typ['longtext']898899assert row['mediumtext'] is None, row['mediumtext']900assert typ['mediumtext'] == otype(250), typ['mediumtext']901902assert row['text'] is None, row['text']903assert typ['text'] == otype(252), typ['text']904905assert row['tinytext'] is None, row['tinytext']906assert typ['tinytext'] == otype(249), typ['tinytext']907908assert row['longblob'] is None, row['longblob']909assert typ['longblob'] == otype(251), typ['longblob']910911assert row['mediumblob'] is None, row['mediumblob']912assert typ['mediumblob'] == otype(250), typ['mediumblob']913914assert row['blob'] is None, row['blob']915assert typ['blob'] == otype(252), typ['blob']916917assert row['tinyblob'] is None, row['tinyblob']918assert typ['tinyblob'] == otype(249), typ['tinyblob']919920assert row['json'] is None, row['json']921assert typ['json'] == otype(245), typ['json']922923assert row['enum'] is None, row['enum']924assert typ['enum'] == otype(253), typ['enum'] # mysql code: 247925926assert row['set'] is None, row['set']927assert typ['set'] == otype(253), typ['set'] # mysql code: 248928929assert row['bit'] is None, row['bit']930assert typ['bit'] == otype(16), typ['bit']931932def test_alltypes_mins(self):933self.cur.execute('select * from alltypes where id = 2')934names = [x[0] for x in self.cur.description]935out = self.cur.fetchone()936row = dict(zip(names, out))937938expected = dict(939id=2,940tinyint=-128,941unsigned_tinyint=0,942bool=-128,943boolean=-128,944smallint=-32768,945unsigned_smallint=0,946mediumint=-8388608,947unsigned_mediumint=0,948int24=-8388608,949unsigned_int24=0,950int=-2147483648,951unsigned_int=0,952integer=-2147483648,953unsigned_integer=0,954bigint=-9223372036854775808,955unsigned_bigint=0,956float=0,957double=-1.7976931348623158e308,958real=-1.7976931348623158e308,959decimal=decimal.Decimal('-99999999999999.999999'),960dec=-decimal.Decimal('99999999999999.999999'),961fixed=decimal.Decimal('-99999999999999.999999'),962numeric=decimal.Decimal('-99999999999999.999999'),963date=datetime.date(1000, 1, 1),964time=-1 * datetime.timedelta(hours=838, minutes=59, seconds=59),965time_6=-1 * datetime.timedelta(hours=838, minutes=59, seconds=59),966datetime=datetime.datetime(1000, 1, 1, 0, 0, 0),967datetime_6=datetime.datetime(1000, 1, 1, 0, 0, 0, 0),968timestamp=datetime.datetime(1970, 1, 1, 0, 0, 1),969timestamp_6=datetime.datetime(1970, 1, 1, 0, 0, 1, 0),970year=1901,971char_100='',972binary_100=b'\x00' * 100,973varchar_200='',974varbinary_200=b'',975longtext='',976mediumtext='',977text='',978tinytext='',979longblob=b'',980mediumblob=b'',981blob=b'',982tinyblob=b'',983json={},984enum='one',985set='two',986bit=b'\x00\x00\x00\x00\x00\x00\x00\x00',987)988989for k, v in sorted(row.items()):990assert v == expected[k], '{} != {} in key {}'.format(v, expected[k], k)991992def test_alltypes_maxs(self):993self.cur.execute('select * from alltypes where id = 3')994names = [x[0] for x in self.cur.description]995out = self.cur.fetchone()996row = dict(zip(names, out))997998expected = dict(999id=3,1000tinyint=127,1001unsigned_tinyint=255,1002bool=127,1003boolean=127,1004smallint=32767,1005unsigned_smallint=65535,1006mediumint=8388607,1007unsigned_mediumint=16777215,1008int24=8388607,1009unsigned_int24=16777215,1010int=2147483647,1011unsigned_int=4294967295,1012integer=2147483647,1013unsigned_integer=4294967295,1014bigint=9223372036854775807,1015unsigned_bigint=18446744073709551615,1016float=0,1017double=1.7976931348623158e308,1018real=1.7976931348623158e308,1019decimal=decimal.Decimal('99999999999999.999999'),1020dec=decimal.Decimal('99999999999999.999999'),1021fixed=decimal.Decimal('99999999999999.999999'),1022numeric=decimal.Decimal('99999999999999.999999'),1023date=datetime.date(9999, 12, 31),1024time=datetime.timedelta(hours=838, minutes=59, seconds=59),1025time_6=datetime.timedelta(hours=838, minutes=59, seconds=59),1026datetime=datetime.datetime(9999, 12, 31, 23, 59, 59),1027datetime_6=datetime.datetime(9999, 12, 31, 23, 59, 59, 999999),1028timestamp=datetime.datetime(2038, 1, 19, 3, 14, 7),1029timestamp_6=datetime.datetime(2038, 1, 19, 3, 14, 7, 999999),1030year=2155,1031char_100='',1032binary_100=b'\x00' * 100,1033varchar_200='',1034varbinary_200=b'',1035longtext='',1036mediumtext='',1037text='',1038tinytext='',1039longblob=b'',1040mediumblob=b'',1041blob=b'',1042tinyblob=b'',1043json={},1044enum='one',1045set='two',1046bit=b'\xff\xff\xff\xff\xff\xff\xff\xff',1047)10481049for k, v in sorted(row.items()):1050# TODO: Figure out how to get time zones working1051if 'timestamp' in k:1052continue1053assert v == expected[k], '{} != {} in key {}'.format(v, expected[k], k)10541055def test_alltypes_zeros(self):1056self.cur.execute('select * from alltypes where id = 4')1057names = [x[0] for x in self.cur.description]1058out = self.cur.fetchone()1059row = dict(zip(names, out))10601061expected = dict(1062id=4,1063tinyint=0,1064unsigned_tinyint=0,1065bool=0,1066boolean=0,1067smallint=0,1068unsigned_smallint=0,1069mediumint=0,1070unsigned_mediumint=0,1071int24=0,1072unsigned_int24=0,1073int=0,1074unsigned_int=0,1075integer=0,1076unsigned_integer=0,1077bigint=0,1078unsigned_bigint=0,1079float=0,1080double=0,1081real=0,1082decimal=decimal.Decimal('0.0'),1083dec=decimal.Decimal('0.0'),1084fixed=decimal.Decimal('0.0'),1085numeric=decimal.Decimal('0.0'),1086date=None,1087time=datetime.timedelta(hours=0, minutes=0, seconds=0),1088time_6=datetime.timedelta(hours=0, minutes=0, seconds=0, microseconds=0),1089datetime=None,1090datetime_6=None,1091timestamp=None,1092timestamp_6=None,1093year=None,1094char_100='',1095binary_100=b'\x00' * 100,1096varchar_200='',1097varbinary_200=b'',1098longtext='',1099mediumtext='',1100text='',1101tinytext='',1102longblob=b'',1103mediumblob=b'',1104blob=b'',1105tinyblob=b'',1106json={},1107enum='one',1108set='two',1109bit=b'\x00\x00\x00\x00\x00\x00\x00\x00',1110)11111112for k, v in sorted(row.items()):1113assert v == expected[k], '{} != {} in key {}'.format(v, expected[k], k)11141115def _test_MySQLdb(self):1116try:1117import json1118import MySQLdb1119except (ModuleNotFoundError, ImportError):1120self.skipTest('MySQLdb is not installed')11211122self.cur.execute('select * from alltypes order by id')1123s2_out = self.cur.fetchall()11241125port = self.conn.connection_params['port']1126if 'http' in self.conn.driver:1127port = 330611281129args = dict(1130host=self.conn.connection_params['host'],1131port=port,1132user=self.conn.connection_params['user'],1133password=self.conn.connection_params['password'],1134database=type(self).dbname,1135)11361137with MySQLdb.connect(**args) as conn:1138conn.converter[245] = json.loads1139with conn.cursor() as cur:1140cur.execute('select * from alltypes order by id')1141mydb_out = cur.fetchall()11421143for a, b in zip(s2_out, mydb_out):1144assert a == b, (a, b)11451146def test_int_string(self):1147string = 'a' * 481148self.cur.execute(f"SELECT 1, '{string}'")1149self.assertEqual((1, string), self.cur.fetchone())11501151def test_double_string(self):1152string = 'a' * 491153self.cur.execute(f"SELECT 1.2 :> DOUBLE, '{string}'")1154self.assertEqual((1.2, string), self.cur.fetchone())11551156def test_year_string(self):1157string = 'a' * 491158self.cur.execute(f"SELECT 1999 :> YEAR, '{string}'")1159self.assertEqual((1999, string), self.cur.fetchone())11601161def test_nan_as_null(self):1162with self.assertRaises((s2.ProgrammingError, InvalidJSONError)):1163self.cur.execute('SELECT %s :> DOUBLE AS X', [math.nan])11641165with s2.connect(database=type(self).dbname, nan_as_null=True) as conn:1166with conn.cursor() as cur:1167cur.execute('SELECT %s :> DOUBLE AS X', [math.nan])1168self.assertEqual(None, list(cur)[0][0])11691170with s2.connect(database=type(self).dbname, nan_as_null=True) as conn:1171with conn.cursor() as cur:1172cur.execute('SELECT %s :> DOUBLE AS X', [1.234])1173self.assertEqual(1.234, list(cur)[0][0])11741175def test_inf_as_null(self):1176with self.assertRaises((s2.ProgrammingError, InvalidJSONError)):1177self.cur.execute('SELECT %s :> DOUBLE AS X', [math.inf])11781179with s2.connect(database=type(self).dbname, inf_as_null=True) as conn:1180with conn.cursor() as cur:1181cur.execute('SELECT %s :> DOUBLE AS X', [math.inf])1182self.assertEqual(None, list(cur)[0][0])11831184with s2.connect(database=type(self).dbname, inf_as_null=True) as conn:1185with conn.cursor() as cur:1186cur.execute('SELECT %s :> DOUBLE AS X', [1.234])1187self.assertEqual(1.234, list(cur)[0][0])11881189def test_encoding_errors(self):1190with s2.connect(1191database=type(self).dbname,1192encoding_errors='strict',1193) as conn:1194with conn.cursor() as cur:1195cur.execute('SELECT * FROM badutf8')1196list(cur)11971198with s2.connect(1199database=type(self).dbname,1200encoding_errors='backslashreplace',1201) as conn:1202with conn.cursor() as cur:1203cur.execute('SELECT * FROM badutf8')1204list(cur)12051206def test_character_lengths(self):1207if 'http' in self.conn.driver:1208self.skipTest('Character lengths too long for HTTP interface')12091210tbl_id = str(id(self))12111212self.cur.execute('DROP TABLE IF EXISTS test_character_lengths')1213self.cur.execute(rf'''1214CREATE TABLE `test_character_lengths_{tbl_id}` (1215`id` text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,1216`char_col` longtext CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,1217`int_col` INT,1218PRIMARY KEY (`id`),1219SORT KEY `id` (`id`)1220) AUTOSTATS_CARDINALITY_MODE=INCREMENTAL1221AUTOSTATS_HISTOGRAM_MODE=CREATE1222AUTOSTATS_SAMPLING=ON1223SQL_MODE='STRICT_ALL_TABLES'1224''')12251226CHAR_STR_SHORT = 'a'1227CHAR_STR_LONG = 'a' * (2**8-1)1228SHORT_STR_SHORT = 'a' * ((2**8-1) + 1)1229SHORT_STR_LONG = 'a' * (2**16-1)1230INT24_STR_SHORT = 'a' * ((2**16-1) + 1)1231INT24_STR_LONG = 'a' * (2**24-1)1232INT64_STR_SHORT = 'a' * ((2**24-1) + 1)1233INT64_STR_LONG = 'a' * ((2**24-1) + 100000)12341235data = [1236['CHAR_SHORT', CHAR_STR_SHORT, 123456],1237['CHAR_LONG', CHAR_STR_LONG, 123456],1238['SHORT_SHORT', SHORT_STR_SHORT, 123456],1239['SHORT_LONG', SHORT_STR_LONG, 123456],1240['INT24_SHORT', INT24_STR_SHORT, 123456],1241['INT24_LONG', INT24_STR_LONG, 123456],1242['INT64_SHORT', INT64_STR_SHORT, 123456],1243['INT64_LONG', INT64_STR_LONG, 123456],1244]12451246self.cur.executemany(1247f'INSERT INTO test_character_lengths_{tbl_id}(id, char_col, int_col) '1248'VALUES (%s, %s, %s)', data,1249)12501251for i, row in enumerate(data):1252self.cur.execute(1253f'SELECT id, char_col, int_col FROM test_character_lengths_{tbl_id} '1254'WHERE id = %s',1255[row[0]],1256)1257assert data[i] == list(list(self.cur)[0])12581259try:1260self.cur.execute(f'DROP TABLE test_character_lengths_{tbl_id}')1261except Exception:1262pass12631264def test_pydantic(self):1265if not has_pydantic:1266self.skipTest('Test requires pydantic')12671268tblname = 'foo_' + str(id(self))12691270class FooData(pydantic.BaseModel):1271x: Optional[int]1272y: Optional[float]1273z: Optional[str] = None12741275self.cur.execute(f'''1276CREATE TABLE {tblname}(1277x INT,1278y DOUBLE,1279z TEXT1280)1281''')12821283self.cur.execute(1284f'INSERT INTO {tblname}(x, y) VALUES (%(x)s, %(y)s)',1285FooData(x=2, y=3.23),1286)12871288self.cur.execute('SELECT * FROM ' + tblname)12891290assert list(sorted(self.cur.fetchall())) == \1291list(sorted([(2, 3.23, None)]))12921293self.cur.executemany(1294f'INSERT INTO {tblname}(x) VALUES (%(x)s)',1295[FooData(x=3, y=3.12), FooData(x=10, y=100.11)],1296)12971298self.cur.execute('SELECT * FROM ' + tblname)12991300assert list(sorted(self.cur.fetchall())) == \1301list(1302sorted([1303(2, 3.23, None),1304(3, None, None),1305(10, None, None),1306]),1307)13081309def test_charset(self):1310self.skipTest('Skip until charset commands are re-implemented')13111312with s2.connect(database=type(self).dbname) as conn:1313with conn.cursor() as cur:1314cur.execute('''1315select json_extract_string('{"foo":"😀"}', "bar");1316''')13171318if 'http' in self.conn.driver:1319self.skipTest('Charset is not use in HTTP interface')13201321with self.assertRaises(s2.OperationalError):1322with s2.connect(database=type(self).dbname, charset='utf8') as conn:1323with conn.cursor() as cur:1324cur.execute('''1325select json_extract_string('{"foo":"😀"}', "bar");1326''')132713281329if __name__ == '__main__':1330import nose21331nose2.main()133213331334