Path: blob/main/singlestoredb/tests/utils.py
469 views
#!/usr/bin/env python1# type: ignore2"""Utilities for testing."""3import os4import uuid5from typing import Any6from typing import Dict7from urllib.parse import urlparse89import singlestoredb as s210from singlestoredb.connection import build_params111213def apply_template(content: str, vars: Dict[str, Any]) -> str:14for k, v in vars.items():15key = '{{%s}}' % k16if key in content:17content = content.replace(key, v)18return content192021def load_sql(sql_file: str) -> str:22"""23Load a file containing SQL code.2425Parameters26----------27sql_file : str28Name of the SQL file to load.2930Returns31-------32(str, bool)33Name of database created for SQL file and a boolean indicating34whether the database already existed (meaning that it should not35be deleted when tests are finished).3637"""38dbname = None3940# Use an existing database name if given.41if 'SINGLESTOREDB_URL' in os.environ:42dbname = build_params(host=os.environ['SINGLESTOREDB_URL']).get('database')43elif 'SINGLESTOREDB_HOST' in os.environ:44dbname = build_params(host=os.environ['SINGLESTOREDB_HOST']).get('database')45elif 'SINGLESTOREDB_DATABASE' in os.environ:46dbname = os.environ['SINGLESTOREDB_DATBASE']4748# If no database name was specified, use initializer URL if given.49# HTTP can't change databases, so you can't initialize from HTTP50# while also creating a database.51args = {'local_infile': True}52if not dbname and 'SINGLESTOREDB_INIT_DB_URL' in os.environ:53args['host'] = os.environ['SINGLESTOREDB_INIT_DB_URL']5455http_port = 056if 'SINGLESTOREDB_URL' in os.environ:57url = os.environ['SINGLESTOREDB_URL']58if url.startswith('http:') or url.startswith('https:'):59urlp = urlparse(url)60if urlp.port:61http_port = urlp.port6263if 'SINGLESTOREDB_HTTP_PORT' in os.environ:64http_port = int(os.environ['SINGLESTOREDB_HTTP_PORT'])6566dbexisted = bool(dbname)6768template_vars = dict(DATABASE_NAME=dbname, TEST_PATH=os.path.dirname(sql_file))6970# Always use the default driver since not all operations are71# permitted in the HTTP API.72with open(sql_file, 'r') as infile:73with s2.connect(**args) as conn:74with conn.cursor() as cur:75try:76cur.execute('SET GLOBAL default_partitions_per_leaf=2')77cur.execute('SET GLOBAL log_file_size_partitions=1048576')78cur.execute('SET GLOBAL log_file_size_ref_dbs=1048576')79except s2.OperationalError:80pass8182if not dbname:83dbname = 'TEST_{}'.format(uuid.uuid4()).replace('-', '_')84cur.execute(f'CREATE DATABASE {dbname};')85cur.execute(f'USE {dbname};')8687template_vars['DATABASE_NAME'] = dbname8889# Execute lines in SQL.90for cmd in infile.read().split(';\n'):91cmd = apply_template(cmd.strip(), template_vars)92if cmd:93cmd += ';'94cur.execute(cmd)9596elif not conn.driver.startswith('http'):97cur.execute(f'USE {dbname};')9899# Start HTTP server as needed.100if http_port and not conn.driver.startswith('http'):101cur.execute(f'SET GLOBAL HTTP_PROXY_PORT={http_port};')102cur.execute('SET GLOBAL HTTP_API=ON;')103cur.execute('RESTART PROXY;')104105return dbname, dbexisted106107108def drop_database(name: str) -> None:109"""Drop a database with the given name."""110if name:111args = {}112if 'SINGLESTOREDB_INIT_DB_URL' in os.environ:113args['host'] = os.environ['SINGLESTOREDB_INIT_DB_URL']114with s2.connect(**args) as conn:115with conn.cursor() as cur:116cur.execute(f'DROP DATABASE {name};')117118119def create_user(name: str, password: str, dbname: str) -> None:120"""Create a user for the test database."""121if name:122args = {}123if 'SINGLESTOREDB_INIT_DB_URL' in os.environ:124args['host'] = os.environ['SINGLESTOREDB_INIT_DB_URL']125with s2.connect(**args) as conn:126with conn.cursor() as cur:127cur.execute(f'DROP USER IF EXISTS {name};')128cur.execute(f'CREATE USER "{name}"@"%" IDENTIFIED BY "{password}"')129cur.execute(f'GRANT ALL ON {dbname}.* to "{name}"@"%"')130131132def drop_user(name: str) -> None:133"""Drop a database with the given name."""134if name:135args = {}136if 'SINGLESTOREDB_INIT_DB_URL' in os.environ:137args['host'] = os.environ['SINGLESTOREDB_INIT_DB_URL']138with s2.connect(**args) as conn:139with conn.cursor() as cur:140cur.execute(f'DROP USER IF EXISTS {name};')141142143