Path: blob/main/singlestoredb/mysql/tests/base.py
469 views
# type: ignore
import json
import os
import platform
import re
import unittest
import warnings

import singlestoredb.mysql as sv
from singlestoredb.connection import build_params

# Prefix for test database names; it embeds the Python major/minor version,
# the OS name and the machine type reported by `platform`.
DBNAME_BASE = 'singlestoredb__test_%s_%s_%s_%s_' % \
    (
        *platform.python_version_tuple()[:2],
        platform.system(), platform.machine(),
    )


class PyMySQLTestCase(unittest.TestCase):
    """
    Base test case providing database connections and table helpers.

    Connection parameters are taken from a ``databases.json`` file located
    next to this module when it exists; otherwise two parameter sets are
    derived from ``build_params()``.

    """

    # You can specify your test environment creating a file named
    # "databases.json" or editing the `databases` variable below.
    fname = os.path.join(os.path.dirname(__file__), 'databases.json')
    if os.path.exists(fname):
        # JSON is read explicitly as UTF-8 instead of the platform default.
        with open(fname, encoding='utf-8') as f:
            databases = json.load(f)
    else:
        params = build_params()
        databases = [
            {
                'host': params['host'],
                'port': params['port'],
                'user': params['user'],
                'passwd': params['password'],
                'database': DBNAME_BASE + '1',
                'use_unicode': True,
                'local_infile': True,
                'buffered': params['buffered'],
            },
            {
                'host': params['host'], 'user': params['user'],
                'port': params['port'], 'passwd': params['password'],
                'database': DBNAME_BASE + '2',
                'buffered': params['buffered'],
            },
        ]

    def mysql_server_is(self, conn, version_tuple):
        """
        Return True if the given connection is on the version given or greater.

        This only checks the server version string provided when the
        connection is established, therefore any check for a version tuple
        greater than (5, 5, 5) will always fail on MariaDB, as it always
        starts with 5.5.5, e.g. 5.5.5-10.7.1-MariaDB-1:10.7.1+maria~focal.

        Parameters
        ----------
        conn
            Connection object exposing ``get_server_info()``.
        version_tuple : tuple of int
            Minimum ``(major, minor, patch)`` version to compare against.

        Returns
        -------
        bool

        Raises
        ------
        ValueError
            If the server version string does not start with three
            dot-separated integers.

        Examples
        --------

            if self.mysql_server_is(conn, (5, 6, 4)):
                # do something for MySQL 5.6.4 and above

        """
        server_version = conn.get_server_info()
        match = re.match(r'(\d+)\.(\d+)\.(\d+)', server_version)
        if match is None:
            # Fail with a descriptive error instead of an AttributeError
            # raised by calling `.group()` on None.
            raise ValueError(
                'cannot parse server version: %r' % (server_version,),
            )
        # All three groups are mandatory in the pattern, so none can be None.
        server_version_tuple = tuple(int(dig) for dig in match.groups())
        return server_version_tuple >= version_tuple

    # Lazily-populated list of connections; see the `connections` property.
    _connections = None

    @property
    def connections(self):
        """
        Return one open connection per entry in `databases`.

        The connections are created on first access, cached on the
        instance, and closed by a cleanup registered with `addCleanup`.
        If any connection attempt fails, the connections opened so far
        are closed and the cache is left unset before the error is
        re-raised.

        """
        if self._connections is None:
            opened = []
            try:
                for db_params in self.databases:
                    opened.append(sv.connect(**db_params))
            except Exception:
                # Do not leak the connections that were already opened.
                for conn in opened:
                    if conn.open:
                        conn.close()
                raise
            self._connections = opened
            self.addCleanup(self._teardown_connections)
        return self._connections

    def connect(self, **params):
        """
        Open a new connection that is closed when the test finishes.

        Parameters
        ----------
        **params
            Overrides applied on top of a copy of the first entry of
            `databases`.

        Returns
        -------
        The connection returned by ``sv.connect``.

        """
        p = self.databases[0].copy()
        p.update(params)
        conn = sv.connect(**p)

        def teardown():
            # The test itself may already have closed the connection.
            if conn.open:
                conn.close()

        self.addCleanup(teardown)

        return conn

    def _teardown_connections(self):
        """Close every cached connection that is still open and reset the cache."""
        if self._connections:
            for connection in self._connections:
                if connection.open:
                    connection.close()
            self._connections = None

    def safe_create_table(self, connection, tablename, ddl, cleanup=True):
        """
        Create a table.

        Ensures any existing version of that table is first dropped.

        Also adds a cleanup rule to drop the table after the test
        completes.

        Parameters
        ----------
        connection
            Connection used to run the statements.
        tablename : str
            Name of the table; interpolated into the ``drop table``
            statement between backticks.
        ddl : str
            Statement that creates the table.
        cleanup : bool, optional
            When true, register `drop_table` as a test cleanup.

        """
        cursor = connection.cursor()
        try:
            # Suppress warnings emitted while dropping the table.
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                cursor.execute('drop table if exists `%s`' % (tablename,))
            cursor.execute(ddl)
        finally:
            # Release the cursor even if one of the statements fails.
            cursor.close()
        if cleanup:
            self.addCleanup(self.drop_table, connection, tablename)

    def drop_table(self, connection, tablename):
        """
        Drop a table if it exists, suppressing any warnings.

        Parameters
        ----------
        connection
            Connection used to run the statement.
        tablename : str
            Name of the table; interpolated into the statement between
            backticks.

        """
        cursor = connection.cursor()
        try:
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                cursor.execute('drop table if exists `%s`' % (tablename,))
        finally:
            # Release the cursor even if the statement fails.
            cursor.close()