Path: blob/main/singlestoredb/mysql/tests/test_issues.py
469 views
# type: ignore
"""Regression tests for historical driver issues.

Each test reproduces one previously reported issue (the issue number is
part of the test name) against a live database reached through the
connection helpers of ``base.PyMySQLTestCase``.
"""
import datetime
import time
import warnings

import pytest

import singlestoredb.mysql as sv
from singlestoredb.mysql.tests import base

__all__ = ['TestOldIssues', 'TestNewIssues', 'TestGitHubIssues']


class TestOldIssues(base.PyMySQLTestCase):
    """Regression tests for the oldest batch of reported issues."""

    def test_issue_3(self):
        """Undefined methods datetime_or_None, date_or_None."""
        conn = self.connect()
        c = conn.cursor()
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore')
            c.execute('drop table if exists issue3')
        c.execute('create table issue3 (d date, t time, dt datetime, ts timestamp)')
        try:
            c.execute(
                'insert into issue3 (d, t, dt, ts) values (%s,%s,%s,%s)',
                (None, None, None, None),
            )
            c.execute('select d from issue3')
            self.assertIsNone(c.fetchone()[0])
            c.execute('select t from issue3')
            self.assertIsNone(c.fetchone()[0])
            c.execute('select dt from issue3')
            self.assertIsNone(c.fetchone()[0])
            c.execute('select ts from issue3')
            # The timestamp column may come back as NULL or as a datetime,
            # so both types are accepted.
            self.assertIn(
                type(c.fetchone()[0]),
                (type(None), datetime.datetime),
                'expected Python type None or datetime from SQL timestamp',
            )
        finally:
            c.execute('drop table issue3')

    def test_issue_4(self):
        """Can't retrieve TIMESTAMP fields."""
        conn = self.connect()
        c = conn.cursor()
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore')
            c.execute('drop table if exists issue4')
        c.execute('create table issue4 (ts timestamp)')
        try:
            c.execute('insert into issue4 (ts) values (now())')
            c.execute('select ts from issue4')
            self.assertIsInstance(c.fetchone()[0], datetime.datetime)
        finally:
            c.execute('drop table issue4')

    def test_issue_5(self):
        """Query on information_schema.tables fails."""
        con = self.connect()
        cur = con.cursor()
        cur.execute('select * from information_schema.tables')

    @pytest.mark.skip(reason='database is not created')
    def test_issue_6(self):
        """TypeError: ord() expected a character, but string of length 0 found."""
        # ToDo: this test requires access to db 'mysql'.
        kwargs = self.databases[0].copy()
        kwargs['database'] = 'mysql'
        conn = sv.connect(**kwargs)
        try:
            c = conn.cursor()
            c.execute('select * from user')
        finally:
            # Close the extra connection even when the query fails.
            conn.close()

    def test_issue_8(self):
        """Primary Key and Index error when selecting data."""
        conn = self.connect()
        c = conn.cursor()
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore')
            c.execute('drop table if exists test')
        c.execute(
            """CREATE TABLE `test` (`station` int NOT NULL DEFAULT '0', `dh`
datetime NOT NULL DEFAULT '2015-01-01 00:00:00', `echeance` int NOT NULL
DEFAULT '0', `me` double DEFAULT NULL, `mo` double DEFAULT NULL, PRIMARY
KEY (`station`,`dh`,`echeance`)) ENGINE=MyISAM DEFAULT CHARSET=latin1;""",
        )
        try:
            # Both 0 and -1 are accepted as the row count of an empty result.
            self.assertIn(c.execute('SELECT * FROM test'), [0, -1])
            c.execute('ALTER TABLE `test` ADD INDEX `idx_station` (`station`)')
            self.assertIn(c.execute('SELECT * FROM test'), [0, -1])
        finally:
            c.execute('drop table test')

    def test_issue_13(self):
        """Can't handle large result fields."""
        conn = self.connect()
        cur = conn.cursor()
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore')
            cur.execute('drop table if exists issue13')
        try:
            cur.execute('create table issue13 (t text)')
            # ticket says 18k
            size = 18 * 1024
            cur.execute('insert into issue13 (t) values (%s)', ('x' * size,))
            cur.execute('select t from issue13')
            # use assertTrue so that obscenely huge error messages don't print
            r = cur.fetchone()[0]
            self.assertTrue('x' * size == r)
        finally:
            cur.execute('drop table issue13')

    def test_issue_15(self):
        """Query should be expanded before perform character encoding."""
        conn = self.connect()
        c = conn.cursor()
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore')
            c.execute('drop table if exists issue15')
        c.execute('create table issue15 (t varchar(32))')
        try:
            c.execute('insert into issue15 (t) values (%s)', ('\xe4\xf6\xfc',))
            c.execute('select t from issue15')
            self.assertEqual('\xe4\xf6\xfc', c.fetchone()[0])
        finally:
            c.execute('drop table issue15')

    def test_issue_16(self):
        """Patch for string and tuple escaping."""
        conn = self.connect()
        c = conn.cursor()
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore')
            c.execute('drop table if exists issue16')
        c.execute(
            'create table issue16 (name varchar(32) primary key, email varchar(32))',
        )
        try:
            c.execute(
                "insert into issue16 (name, email) values ('pete', 'floydophone')",
            )
            c.execute('select email from issue16 where name=%s', ('pete',))
            self.assertEqual('floydophone', c.fetchone()[0])
        finally:
            c.execute('drop table issue16')

    @pytest.mark.skip(
        'test_issue_17() requires a custom, legacy MySQL configuration '
        'and will not be run.',
    )
    def test_issue_17(self):
        """Could not connect mysql use password."""
        conn = self.connect()
        host = self.databases[0]['host']
        db = self.databases[0]['database']
        c = conn.cursor()

        # grant access to a table to a user with a password
        try:
            with warnings.catch_warnings():
                warnings.filterwarnings('ignore')
                c.execute('drop table if exists issue17')
            c.execute('create table issue17 (x varchar(32) primary key)')
            c.execute("insert into issue17 (x) values ('hello, world!')")
            c.execute(
                (
                    "grant all privileges on %s.issue17 to 'issue17user'@'%%' "
                    "identified by '1234'"
                ) % db,
            )
            conn.commit()

            conn2 = sv.connect(host=host, user='issue17user', passwd='1234', db=db)
            try:
                c2 = conn2.cursor()
                c2.execute('select x from issue17')
                self.assertEqual('hello, world!', c2.fetchone()[0])
            finally:
                # Do not leak the second connection when the assertion fails.
                conn2.close()
        finally:
            c.execute('drop table issue17')


class TestNewIssues(base.PyMySQLTestCase):
    """Regression tests for the second batch of reported issues."""

    def test_issue_34(self):
        """Connecting to a closed port raises OperationalError 2003."""
        with self.assertRaises(sv.OperationalError) as ctx:
            sv.connect(host='localhost', port=1237, user='root')
        self.assertEqual(2003, ctx.exception.args[0])

    def test_issue_33(self):
        """Non-ASCII table names and values round-trip with charset utf8."""
        conn = sv.connect(charset='utf8', **self.databases[0])
        # Registered before the table is created so that, cleanups being
        # LIFO, the connection is closed after any cleanup that uses it.
        self.addCleanup(conn.close)
        self.safe_create_table(
            conn, 'hei\xdfe', 'create table hei\xdfe (name varchar(32))',
        )
        c = conn.cursor()
        c.execute("insert into hei\xdfe (name) values ('Pi\xdfata')")
        c.execute('select name from hei\xdfe')
        self.assertEqual('Pi\xdfata', c.fetchone()[0])

    @pytest.mark.skip('This test requires manual intervention')
    def test_issue_35(self):
        """Killing the server mid-query raises OperationalError 2013."""
        conn = self.connect()
        c = conn.cursor()
        print('sudo killall -9 mysqld within the next 10 seconds')
        with self.assertRaises(sv.OperationalError) as ctx:
            c.execute('select sleep(10)')
        self.assertEqual(2013, ctx.exception.args[0])

    def test_issue_36(self):
        """A connection killed from another session stops working."""
        # connection 0 is super user, connection 1 isn't
        conn = self.connections[1]
        c = conn.cursor()
        c.execute('show processlist')
        kill_id = None
        for row in c.fetchall():
            process_id = row[0]
            info = row[7]
            if info == 'show processlist':
                kill_id = process_id
                break
        self.assertEqual(kill_id, conn.thread_id())
        # now nuke the connection
        self.connections[0].kill(kill_id)
        # make sure this connection has broken
        # assertRaises is used instead of try / self.fail() / except Exception,
        # because that pattern also catches the AssertionError raised by
        # self.fail() and therefore can never fail.
        with self.assertRaises(Exception):
            c.execute('show tables')
        c.close()
        conn.close()

        # check the process list from the other connection
        try:
            # Wait since Travis-CI sometimes fail this test.
            time.sleep(0.1)

            c = self.connections[0].cursor()
            c.execute('show processlist')
            ids = [row[0] for row in c.fetchall()]
            self.assertNotIn(kill_id, ids)
        finally:
            # The killed connection is unusable; drop it from the shared list.
            del self.connections[1]

    @pytest.mark.skip(reason='@foo is not set in SingleStoreDB')
    def test_issue_37(self):
        """Selecting and setting a user variable returns the expected counts."""
        conn = self.connect()
        c = conn.cursor()
        self.assertEqual(1, c.execute('SELECT @foo'))
        self.assertEqual((None,), c.fetchone())
        self.assertEqual(0, c.execute("SET @foo = 'bar'"))
        c.execute("set @foo = 'bar'")

    def test_issue_38(self):
        """A value of roughly one megabyte can be inserted into a mediumblob."""
        conn = self.connect()
        c = conn.cursor()
        datum = 'a' * 1024 * 1023  # reduced size for most default mysql installs

        try:
            with warnings.catch_warnings():
                warnings.filterwarnings('ignore')
                c.execute('drop table if exists issue38')
            c.execute('create table issue38 (id integer, data mediumblob)')
            c.execute('insert into issue38 values (1, %s)', (datum,))
        finally:
            c.execute('drop table issue38')

    def disabled_test_issue_54(self):
        """Run a query with a very long WHERE clause.

        Not collected by the test runner because the name lacks the
        ``test`` prefix.
        """
        conn = self.connect()
        c = conn.cursor()
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore')
            c.execute('drop table if exists issue54')
        big_sql = 'select * from issue54 where '
        big_sql += ' and '.join('%d=%d' % (i, i) for i in range(0, 100000))

        try:
            c.execute('create table issue54 (id integer primary key)')
            c.execute('insert into issue54 (id) values (7)')
            c.execute(big_sql)
            self.assertEqual(7, c.fetchone()[0])
        finally:
            c.execute('drop table issue54')


class TestGitHubIssues(base.PyMySQLTestCase):
    """Regression tests for issues reported on GitHub."""

    def test_issue_66(self):
        """'Connection' object has no attribute 'insert_id'."""
        conn = self.connect()
        c = conn.cursor()
        self.assertEqual(0, conn.insert_id())
        try:
            with warnings.catch_warnings():
                warnings.filterwarnings('ignore')
                c.execute('drop table if exists issue66')
            c.execute(
                'create table issue66 (id integer primary key auto_increment, x integer)',
            )
            c.execute('insert into issue66 (x) values (1)')
            c.execute('insert into issue66 (x) values (1)')
            self.assertEqual(2, conn.insert_id())
        finally:
            c.execute('drop table issue66')

    def test_issue_79(self):
        """Duplicate field overwrites the previous one in the result of DictCursor."""
        conn = self.connect(cursorclass=sv.cursors.DictCursor)
        c = conn.cursor()

        with warnings.catch_warnings():
            warnings.filterwarnings('ignore')
            c.execute('drop table if exists a')
            c.execute('drop table if exists b')
        c.execute("""CREATE TABLE a (id int, value int)""")
        c.execute("""CREATE TABLE b (id int, value int)""")

        a = (1, 11)
        b = (1, 22)
        try:
            c.execute('insert into a values (%s, %s)', a)
            c.execute('insert into b values (%s, %s)', b)

            c.execute('SELECT * FROM a inner join b on a.id = b.id')
            r = c.fetchall()[0]
            self.assertEqual(r['id'], 1)
            self.assertEqual(r['value'], 11)
            # The clashing column from table b is expected under 'b.value'.
            self.assertEqual(r['b.value'], 22)
        finally:
            c.execute('drop table a')
            c.execute('drop table b')

    def test_issue_95(self):
        """Leftover trailing OK packet for "CALL my_sp" queries."""
        conn = self.connect()
        cur = conn.cursor()
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore')
            cur.execute('DROP PROCEDURE IF EXISTS `foo`')
        cur.execute(
            """CREATE PROCEDURE `foo` () AS
            BEGIN
                ECHO SELECT 1;
            END""",
        )
        try:
            cur.execute("""CALL foo()""")
            cur.execute("""SELECT 1""")
            self.assertEqual(cur.fetchone()[0], 1)
        finally:
            with warnings.catch_warnings():
                warnings.filterwarnings('ignore')
                cur.execute('DROP PROCEDURE IF EXISTS `foo`')

    def test_issue_114(self):
        """Autocommit is not set after reconnecting with ping()."""
        conn = sv.connect(charset='utf8', **self.databases[0])
        conn.autocommit(False)
        c = conn.cursor()

        if type(c).__name__.startswith('SS'):
            self.skipTest('Test hangs with unbuffered cursor')

        c.execute("""select @@autocommit;""")
        self.assertFalse(c.fetchone()[0])
        conn.close()
        conn.ping()
        c.execute("""select @@autocommit;""")
        self.assertFalse(c.fetchone()[0])
        conn.close()

        # Ensure autocommit() is still working
        conn = sv.connect(charset='utf8', **self.databases[0])
        c = conn.cursor()
        c.execute("""select @@autocommit;""")
        self.assertFalse(c.fetchone()[0])
        conn.close()
        conn.ping()
        conn.autocommit(True)
        c.execute("""select @@autocommit;""")
        self.assertTrue(c.fetchone()[0])
        conn.close()

    def test_issue_175(self):
        """The number of fields returned by server is read in wrong way."""
        conn = self.connect()
        cur = conn.cursor()
        for length in (200, 300):
            columns = ', '.join('c{0} integer'.format(i) for i in range(length))
            sql = 'create table test_field_count ({0})'.format(columns)
            try:
                cur.execute(sql)
                cur.execute('select * from test_field_count')
                # A unittest assertion rather than a bare assert, which is
                # stripped when Python runs with -O.
                self.assertEqual(len(cur.description), length)
            finally:
                with warnings.catch_warnings():
                    warnings.filterwarnings('ignore')
                    cur.execute('drop table if exists test_field_count')

    def test_issue_321(self):
        """Test iterable as query argument."""
        conn = sv.connect(charset='utf8', **self.databases[0])
        # Registered before the table is created so that, cleanups being
        # LIFO, the connection is closed after any cleanup that uses it.
        self.addCleanup(conn.close)
        self.safe_create_table(
            conn,
            'issue321',
            'create table issue321 (value_1 varchar(1), value_2 varchar(1))',
        )

        sql_insert = 'insert into issue321 (value_1, value_2) values (%s, %s)'
        sql_dict_insert = (
            'insert into issue321 (value_1, value_2) '
            'values (%(value_1)s, %(value_2)s)'
        )
        sql_select = (
            'select * from issue321 '
            'where value_1 in %s and value_2=%s order by value_1'
        )
        data = [
            [('a',), '\u0430'],
            [['b'], '\u0430'],
            {'value_1': [['c']], 'value_2': '\u0430'},
        ]
        cur = conn.cursor()
        self.assertEqual(cur.execute(sql_insert, data[0]), 1)
        self.assertEqual(cur.execute(sql_insert, data[1]), 1)
        self.assertEqual(cur.execute(sql_dict_insert, data[2]), 1)
        # Either 3 or -1 is accepted as the reported row count.
        self.assertIn(cur.execute(sql_select, [('a', 'b', 'c'), '\u0430']), [3, -1])
        self.assertEqual(cur.fetchone(), ('a', '\u0430'))
        self.assertEqual(cur.fetchone(), ('b', '\u0430'))
        self.assertEqual(cur.fetchone(), ('c', '\u0430'))

    def test_issue_364(self):
        """Test mixed unicode/binary arguments in executemany."""
        conn = sv.connect(charset='utf8mb4', **self.databases[0])
        # Registered before the table is created so that, cleanups being
        # LIFO, the connection is closed after any cleanup that uses it.
        self.addCleanup(conn.close)
        self.safe_create_table(
            conn,
            'issue364',
            'create table issue364 (value_1 binary(3), value_2 varchar(3)) '
            'engine=InnoDB default charset=utf8mb4',
        )

        sql = 'insert into issue364 (value_1, value_2) values (_binary %s, %s)'
        usql = 'insert into issue364 (value_1, value_2) values (_binary %s, %s)'
        values = [sv.Binary(b'\x00\xff\x00'), '\xe4\xf6\xfc']

        # test single insert and select
        cur = conn.cursor()
        cur.execute(sql, args=values)
        cur.execute('select * from issue364')
        self.assertEqual(cur.fetchone(), tuple(values))

        # test single insert unicode query
        cur.execute(usql, args=values)

        # test multi insert and select
        cur.executemany(sql, args=(values, values, values))
        cur.execute('select * from issue364')
        for row in cur.fetchall():
            self.assertEqual(row, tuple(values))

        # test multi insert with unicode query
        cur.executemany(usql, args=(values, values, values))

    @pytest.mark.skip(reason='syntax not supported by SingleStoreDB')
    def test_issue_363(self):
        """Test binary / geometry types."""
        conn = sv.connect(charset='utf8', **self.databases[0])
        # Registered before the table is created so that, cleanups being
        # LIFO, the connection is closed after any cleanup that uses it.
        self.addCleanup(conn.close)
        self.safe_create_table(
            conn,
            'issue363',
            'CREATE TABLE issue363 ( '
            'id INTEGER PRIMARY KEY, geom LINESTRING NOT NULL /*!80003 SRID 0 */, '
            'SPATIAL KEY geom (geom)) '
            'ENGINE=MyISAM',
        )

        cur = conn.cursor()
        query = (
            'INSERT INTO issue363 (id, geom) VALUES'
            "(1998, ST_GeomFromText('LINESTRING(1.1 1.1,2.2 2.2)'))"
        )
        cur.execute(query)

        # select WKT
        query = 'SELECT ST_AsText(geom) FROM issue363'
        cur.execute(query)
        row = cur.fetchone()
        self.assertEqual(row, ('LINESTRING(1.1 1.1,2.2 2.2)',))

        # select WKB
        query = 'SELECT ST_AsBinary(geom) FROM issue363'
        cur.execute(query)
        row = cur.fetchone()
        self.assertEqual(
            row,
            (
                b'\x01\x02\x00\x00\x00\x02\x00\x00\x00'
                b'\x9a\x99\x99\x99\x99\x99\xf1?'
                b'\x9a\x99\x99\x99\x99\x99\xf1?'
                b'\x9a\x99\x99\x99\x99\x99\x01@'
                b'\x9a\x99\x99\x99\x99\x99\x01@',
            ),
        )

        # select internal binary
        cur.execute('SELECT geom FROM issue363')
        row = cur.fetchone()
        # don't assert the exact internal binary value, as it could
        # vary across implementations
        self.assertIsInstance(row[0], bytes)