Path: blob/main/singlestoredb/pytest.py
469 views
#!/usr/bin/env python1"""Pytest plugin"""2import logging3import os4import subprocess5import time6from enum import Enum7from typing import Iterator8from typing import Optional910import pytest1112from . import connect13from .connection import Connection14from .connection import Cursor151617logger = logging.getLogger(__name__)181920# How many times to attempt to connect to the container21STARTUP_CONNECT_ATTEMPTS = 1022# How long to wait between connection attempts23STARTUP_CONNECT_TIMEOUT_SECONDS = 224# How many times to check if all connections are closed25TEARDOWN_WAIT_ATTEMPTS = 2026# How long to wait between checking connections27TEARDOWN_WAIT_SECONDS = 2282930class ExecutionMode(Enum):31SEQUENTIAL = 132LEADER = 233FOLLOWER = 3343536@pytest.fixture(scope='session')37def execution_mode() -> ExecutionMode:38"""Determine the pytest mode for this process"""3940worker = os.environ.get('PYTEST_XDIST_WORKER')41worker_count = os.environ.get('PYTEST_XDIST_WORKER_COUNT')4243# If we're not in pytest-xdist, the mode is Sequential44if worker is None or worker_count is None:45logger.debug('XDIST environment vars not found')46return ExecutionMode.SEQUENTIAL4748logger.debug(f'PYTEST_XDIST_WORKER == {worker}')49logger.debug(f'PYTEST_XDIST_WORKER_COUNT == {worker_count}')5051# If we're the only worker, than the mode is Sequential52if worker_count == '1':53return ExecutionMode.SEQUENTIAL54else:55# The first worker (named "gw0") is the leader56# if there are multiple workers57if worker == 'gw0':58return ExecutionMode.LEADER59else:60return ExecutionMode.FOLLOWER616263@pytest.fixture(scope='session')64def node_name() -> Iterator[str]:65"""Determine the name of this worker node"""6667worker = os.environ.get('PYTEST_XDIST_WORKER')6869if worker is None:70logger.debug('XDIST environment vars not found')71yield 'master'72else:73logger.debug(f'PYTEST_XDIST_WORKER == {worker}')74yield worker757677class _TestContainerManager():78"""Manages the setup and teardown of a SingleStoreDB Dev Container"""7980def __init__(self) -> None:81self.container_name = 'singlestoredb-test-container'82self.dev_image_name = 'ghcr.io/singlestore-labs/singlestoredb-dev'8384assert 'SINGLESTORE_LICENSE' in os.environ, 'SINGLESTORE_LICENSE not set'8586self.root_password = 'Q8r4D7yXR8oqn'87self.environment_vars = {88'SINGLESTORE_LICENSE': None,89'ROOT_PASSWORD': f"\"{self.root_password}\"",90'SINGLESTORE_SET_GLOBAL_DEFAULT_PARTITIONS_PER_LEAF': '1',91}9293self.ports = ['3306', '8080', '9000']9495self.url = f'root:{self.root_password}@127.0.0.1:3306'9697def start(self) -> None:98command = ' '.join(self._start_command())99100logger.info(f'Starting container {self.container_name}')101try:102license = os.environ['SINGLESTORE_LICENSE']103env = {104'SINGLESTORE_LICENSE': license,105}106subprocess.check_call(command, shell=True, env=env)107except Exception as e:108logger.exception(e)109raise RuntimeError(110'Failed to start container. '111'Is one already running?',112) from e113logger.debug('Container started')114115def _start_command(self) -> Iterator[str]:116yield 'docker run -d --name'117yield self.container_name118for key, value in self.environment_vars.items():119yield '-e'120if value is None:121yield key122else:123yield f'{key}={value}'124125for port in self.ports:126yield '-p'127yield f'{port}:{port}'128129yield self.dev_image_name130131def print_logs(self) -> None:132logs_command = ['docker', 'logs', self.container_name]133logger.info('Getting logs')134logger.info(subprocess.check_output(logs_command))135136def connect(self) -> Connection:137# Run all but one attempts trying again if they fail138for i in range(STARTUP_CONNECT_ATTEMPTS - 1):139try:140return connect(self.url)141except Exception:142logger.debug(f'Database not available yet (attempt #{i}).')143time.sleep(STARTUP_CONNECT_TIMEOUT_SECONDS)144else:145# Try one last time and report error if it fails146try:147return connect(self.url)148except Exception as e:149logger.error('Timed out while waiting to connect to database.')150logger.exception(e)151self.print_logs()152raise RuntimeError('Failed to connect to database') from e153154def wait_till_connections_closed(self) -> None:155heart_beat = connect(self.url)156for i in range(TEARDOWN_WAIT_ATTEMPTS):157connections = self.get_open_connections(heart_beat)158if connections is None:159raise RuntimeError('Could not determine the number of open connections.')160logger.debug(161f'Waiting for other connections (n={connections-1}) '162f'to close (attempt #{i})',163)164time.sleep(TEARDOWN_WAIT_SECONDS)165else:166logger.warning('Timed out while waiting for other connections to close')167self.print_logs()168169def get_open_connections(self, conn: Connection) -> Optional[int]:170for row in conn.show.status(extended=True):171name = row['Name']172value = row['Value']173logger.info(f'{name} = {value}')174if name == 'Threads_connected':175return int(value)176177return None178179def stop(self) -> None:180logger.info('Cleaning up SingleStore DB dev container')181logger.debug('Stopping container')182try:183subprocess.check_call(f'docker stop {self.container_name}', shell=True)184except Exception as e:185logger.exception(e)186raise RuntimeError('Failed to stop container.') from e187188logger.debug('Removing container')189try:190subprocess.check_call(f'docker rm {self.container_name}', shell=True)191except Exception as e:192logger.exception(e)193raise RuntimeError('Failed to stop container.') from e194195196@pytest.fixture(scope='session')197def singlestoredb_test_container(198execution_mode: ExecutionMode,199) -> Iterator[_TestContainerManager]:200"""Sets up and tears down the test container"""201202if not isinstance(execution_mode, ExecutionMode):203raise TypeError(f"Invalid execution mode '{execution_mode}'")204205container_manager = _TestContainerManager()206207# In sequential operation do all the steps208if execution_mode == ExecutionMode.SEQUENTIAL:209logger.debug('Not distributed')210container_manager.start()211yield container_manager212container_manager.stop()213214# In distributed execution as leader,215# do the steps but wait for other workers before stopping216elif execution_mode == ExecutionMode.LEADER:217logger.debug('Distributed leader')218container_manager.start()219yield container_manager220container_manager.wait_till_connections_closed()221container_manager.stop()222223# In distributed exeuction as a non-leader,224# don't worry about the container lifecycle225elif execution_mode == ExecutionMode.FOLLOWER:226logger.debug('Distributed follower')227yield container_manager228229230@pytest.fixture(scope='session')231def singlestoredb_connection(232singlestoredb_test_container: _TestContainerManager,233) -> Iterator[Connection]:234"""Creates and closes the connection"""235236connection = singlestoredb_test_container.connect()237logger.debug('Connected to database.')238239yield connection240241logger.debug('Closing connection')242connection.close()243244245class _NameAllocator():246"""Generates unique names for each database"""247248def __init__(self, id: str) -> None:249self.id = id250self.names = 0251252def get_name(self) -> str:253name = f'x_db_{self.id}_{self.names}'254self.names += 1255return name256257258@pytest.fixture(scope='session')259def name_allocator(node_name: str) -> Iterator[_NameAllocator]:260"""Makes a worker-local name allocator using the node name"""261262yield _NameAllocator(node_name)263264265@pytest.fixture266def singlestoredb_tempdb(267singlestoredb_connection: Connection, name_allocator: _NameAllocator,268) -> Iterator[Cursor]:269"""Provides a connection to a unique temporary test database"""270271assert singlestoredb_connection.is_connected(), 'Database is no longer connected'272db = name_allocator.get_name()273274with singlestoredb_connection.cursor() as cursor:275logger.debug(f"Creating temporary DB \"{db}\"")276cursor.execute(f'CREATE DATABASE {db}')277cursor.execute(f'USE {db}')278279yield cursor280281logger.debug(f"Dropping temporary DB \"{db}\"")282cursor.execute(f'DROP DATABASE {db}')283284285