Path: blob/main/singlestoredb/pytest.py
798 views
#!/usr/bin/env python
"""Pytest plugin providing SingleStoreDB test fixtures."""
import logging
import os
import socket
import subprocess
import time
import uuid
from collections.abc import Iterator
from enum import Enum
from typing import Optional

import pytest

from . import connect
from .connection import Connection
from .connection import Cursor


logger = logging.getLogger(__name__)


# How many times to attempt to connect to the container
STARTUP_CONNECT_ATTEMPTS = 10
# How long to wait between connection attempts
STARTUP_CONNECT_TIMEOUT_SECONDS = 2
# How many times to check if all connections are closed
TEARDOWN_WAIT_ATTEMPTS = 20
# How long to wait between checking connections
TEARDOWN_WAIT_SECONDS = 2


def _find_free_port() -> int:
    """Find a free port by binding to port 0 and getting the assigned port.

    The socket is closed before returning, so the port is only known to
    have been free at the time of the call.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(('', 0))
        s.listen(1)
        return s.getsockname()[1]


class ExecutionMode(Enum):
    """Role of the current pytest process with respect to pytest-xdist."""

    SEQUENTIAL = 1
    LEADER = 2
    FOLLOWER = 3


@pytest.fixture(scope='session')
def execution_mode() -> ExecutionMode:
    """Determine the pytest mode for this process

    Returns
    -------
    ExecutionMode
        SEQUENTIAL when the xdist environment variables are absent or
        there is a single worker; LEADER for worker "gw0" when there are
        multiple workers; FOLLOWER for every other worker.
    """

    worker = os.environ.get('PYTEST_XDIST_WORKER')
    worker_count = os.environ.get('PYTEST_XDIST_WORKER_COUNT')

    # If we're not in pytest-xdist, the mode is Sequential
    if worker is None or worker_count is None:
        logger.debug('XDIST environment vars not found')
        return ExecutionMode.SEQUENTIAL

    logger.debug(f'PYTEST_XDIST_WORKER == {worker}')
    logger.debug(f'PYTEST_XDIST_WORKER_COUNT == {worker_count}')

    # If we're the only worker, then the mode is Sequential
    if worker_count == '1':
        return ExecutionMode.SEQUENTIAL

    # The first worker (named "gw0") is the leader
    # if there are multiple workers
    if worker == 'gw0':
        return ExecutionMode.LEADER
    return ExecutionMode.FOLLOWER


@pytest.fixture(scope='session')
def node_name() -> Iterator[str]:
    """Determine the name of this worker node

    Yields the value of PYTEST_XDIST_WORKER, or 'master' when that
    variable is not set.
    """

    worker = os.environ.get('PYTEST_XDIST_WORKER')

    if worker is None:
        logger.debug('XDIST environment vars not found')
        yield 'master'
    else:
        logger.debug(f'PYTEST_XDIST_WORKER == {worker}')
        yield worker


class _TestContainerManager():
    """Manages the setup and teardown of a SingleStoreDB Dev Container

    If SINGLESTOREDB_URL environment variable is set, the manager will use
    the existing server instead of starting a Docker container. This allows
    tests to run against either an existing server or an automatically
    managed Docker container.

    When an existing server is used, only ``existing_url``, ``use_existing``
    and ``url`` are set; the Docker-related attributes (``container_name``,
    ports, ...) are not defined on the instance.
    """

    def __init__(self) -> None:
        # Check if SINGLESTOREDB_URL is already set - if so, use existing server
        self.existing_url = os.environ.get('SINGLESTOREDB_URL')
        self.use_existing = self.existing_url is not None

        if self.use_existing:
            logger.info('Using existing SingleStore server from SINGLESTOREDB_URL')
            self.url = self.existing_url
            # No need to initialize Docker-related attributes
            return

        logger.info('SINGLESTOREDB_URL not set, will start Docker container')

        # Generate unique container name using UUID and worker ID
        worker = os.environ.get('PYTEST_XDIST_WORKER', 'master')
        unique_id = uuid.uuid4().hex[:8]
        self.container_name = f'singlestoredb-test-{worker}-{unique_id}'

        self.dev_image_name = 'ghcr.io/singlestore-labs/singlestoredb-dev'

        # Use SINGLESTORE_LICENSE from environment, or empty string as fallback
        # Empty string works for the client SDK
        license_key = os.environ.get('SINGLESTORE_LICENSE', '')
        if not license_key:
            logger.info('SINGLESTORE_LICENSE not set, using empty string')

        self.root_password = 'Q8r4D7yXR8oqn'
        self.environment_vars = {
            'SINGLESTORE_LICENSE': license_key,
            # The embedded double quotes are consumed by the shell, because
            # start() joins the command into one string and runs it with
            # shell=True.
            'ROOT_PASSWORD': f"\"{self.root_password}\"",
            'SINGLESTORE_SET_GLOBAL_DEFAULT_PARTITIONS_PER_LEAF': '1',
        }

        # Use dynamic port allocation to avoid conflicts
        self.mysql_port = _find_free_port()
        self.http_port = _find_free_port()
        self.studio_port = _find_free_port()
        self.ports = [
            (self.mysql_port, '3306'),  # External port -> Internal port
            (self.studio_port, '8080'),  # Studio
            (self.http_port, '9000'),  # Data API
        ]

        self.url = f'root:{self.root_password}@127.0.0.1:{self.mysql_port}'

    @property
    def http_connection_url(self) -> Optional[str]:
        """HTTP connection URL for the SingleStoreDB server using Data API.

        Returns None when an existing server is used, since the manager
        does not know that server's HTTP port.
        """
        if self.use_existing:
            # If using existing server, HTTP URL not available from manager
            return None
        return (
            f'singlestoredb+http://root:{self.root_password}@'
            f'127.0.0.1:{self.http_port}'
        )

    def _container_exists(self) -> bool:
        """Check if a container with this name already exists.

        Returns False if the ``docker ps`` command itself fails.
        """
        try:
            result = subprocess.run(
                [
                    'docker', 'ps', '-a', '--filter',
                    f'name={self.container_name}',
                    '--format', '{{.Names}}',
                ],
                capture_output=True,
                text=True,
                check=True,
            )
            return self.container_name in result.stdout
        except subprocess.CalledProcessError:
            return False

    def _cleanup_existing_container(self) -> None:
        """Stop and remove any existing container with the same name.

        Failures to remove the container are logged and otherwise ignored.
        """
        if not self._container_exists():
            return

        logger.info(f'Found existing container {self.container_name}, cleaning up')
        try:
            # Try to stop the container (ignore if it's already stopped)
            subprocess.run(
                ['docker', 'stop', self.container_name],
                capture_output=True,
                check=False,
            )
            # Remove the container
            subprocess.run(
                ['docker', 'rm', self.container_name],
                capture_output=True,
                check=True,
            )
            logger.debug(f'Cleaned up existing container {self.container_name}')
        except subprocess.CalledProcessError as e:
            logger.warning(f'Failed to cleanup existing container: {e}')
            # Continue anyway - the unique name should prevent most conflicts

    def start(self) -> None:
        """Start the dev container in detached mode.

        Raises
        ------
        RuntimeError
            If the ``docker run`` command fails.
        """
        # Clean up any existing container with the same name
        self._cleanup_existing_container()

        command = ' '.join(self._start_command())

        logger.info(
            f'Starting container {self.container_name} on ports {self.mysql_port}, '
            f'{self.http_port}, {self.studio_port}',
        )
        try:
            license_key = os.environ.get('SINGLESTORE_LICENSE', '')
            # Extend the inherited environment instead of replacing it, so
            # the docker client keeps PATH, DOCKER_HOST, HOME, ... and talks
            # to the same daemon as the other docker calls in this class.
            env = {
                **os.environ,
                'SINGLESTORE_LICENSE': license_key,
            }
            # Capture output to avoid printing the container ID hash
            subprocess.check_call(
                command, shell=True, env=env,
                stdout=subprocess.DEVNULL,
            )

        except Exception as e:
            logger.exception(e)
            raise RuntimeError(
                f'Failed to start container {self.container_name}. '
                f'Command: {command}',
            ) from e
        logger.debug('Container started')

    def _start_command(self) -> Iterator[str]:
        """Yield the pieces of the ``docker run`` command line.

        The pieces are meant to be joined with spaces by the caller.
        """
        yield 'docker run -d --name'
        yield self.container_name
        for key, value in self.environment_vars.items():
            yield '-e'
            if value is None:
                # Name only: no explicit value is put on the command line
                yield key
            else:
                yield f'{key}={value}'

        for external_port, internal_port in self.ports:
            yield '-p'
            yield f'{external_port}:{internal_port}'

        yield self.dev_image_name

    def print_logs(self) -> None:
        """Log the container's output (best effort).

        This is a diagnostic helper called from error paths, so it never
        raises: it is a no-op when an existing server is used (there is no
        container), and failures of ``docker logs`` are only logged.
        """
        if self.use_existing:
            logger.info('Using existing server, no container logs available')
            return

        logs_command = ['docker', 'logs', self.container_name]
        logger.info('Getting logs')
        try:
            output = subprocess.check_output(
                logs_command,
                stderr=subprocess.STDOUT,
                text=True,
                errors='replace',
            )
        except (OSError, subprocess.CalledProcessError) as e:
            logger.warning(f'Could not retrieve container logs: {e}')
            return
        logger.info(output)

    def connect(self) -> Connection:
        """Connect to the server, retrying while it starts up.

        Makes up to STARTUP_CONNECT_ATTEMPTS attempts, sleeping
        STARTUP_CONNECT_TIMEOUT_SECONDS after each failed attempt except
        the last.

        Raises
        ------
        RuntimeError
            If the final attempt fails as well.
        """
        # Run all but one attempts trying again if they fail
        for i in range(STARTUP_CONNECT_ATTEMPTS - 1):
            try:
                return connect(self.url)
            except Exception:
                logger.debug(f'Database not available yet (attempt #{i}).')
                time.sleep(STARTUP_CONNECT_TIMEOUT_SECONDS)

        # Try one last time and report error if it fails
        try:
            return connect(self.url)
        except Exception as e:
            logger.error('Timed out while waiting to connect to database.')
            logger.exception(e)
            self.print_logs()
            raise RuntimeError('Failed to connect to database') from e

    def wait_till_connections_closed(self) -> None:
        """Wait until no connection other than our own heartbeat is open.

        Polls the server up to TEARDOWN_WAIT_ATTEMPTS times, sleeping
        TEARDOWN_WAIT_SECONDS between polls. Returns as soon as only the
        heartbeat connection remains; logs a warning (and the container
        logs) if other connections are still open after the last poll.

        Raises
        ------
        RuntimeError
            If the number of open connections cannot be determined.
        """
        heart_beat = connect(self.url)
        try:
            for i in range(TEARDOWN_WAIT_ATTEMPTS):
                connections = self.get_open_connections(heart_beat)
                if connections is None:
                    raise RuntimeError(
                        'Could not determine the number of open connections.',
                    )
                # The heartbeat connection itself is included in the count
                if connections <= 1:
                    logger.debug('All other connections are closed')
                    return
                logger.debug(
                    f'Waiting for other connections (n={connections-1}) '
                    f'to close (attempt #{i})',
                )
                time.sleep(TEARDOWN_WAIT_SECONDS)

            logger.warning('Timed out while waiting for other connections to close')
            self.print_logs()
        finally:
            # Don't leave the heartbeat connection open on the server
            heart_beat.close()

    def get_open_connections(self, conn: Connection) -> Optional[int]:
        """Return the server's 'Threads_connected' status value.

        Every status row seen before the match is logged. Returns None if
        no row named 'Threads_connected' is found.
        """
        for row in conn.show.status(extended=True):
            name = row['Name']
            value = row['Value']
            logger.info(f'{name} = {value}')
            if name == 'Threads_connected':
                return int(value)

        return None

    def stop(self) -> None:
        """Stop and then remove the container.

        Raises
        ------
        RuntimeError
            If either ``docker stop`` or ``docker rm`` fails.
        """
        logger.info('Cleaning up SingleStore DB dev container')
        logger.debug('Stopping container')
        try:
            subprocess.check_call(
                f'docker stop {self.container_name}',
                shell=True,
                stdout=subprocess.DEVNULL,
            )

        except Exception as e:
            logger.exception(e)
            raise RuntimeError('Failed to stop container.') from e

        logger.debug('Removing container')
        try:
            subprocess.check_call(
                f'docker rm {self.container_name}',
                shell=True,
                stdout=subprocess.DEVNULL,
            )

        except Exception as e:
            logger.exception(e)
            raise RuntimeError('Failed to remove container.') from e


@pytest.fixture(scope='session')
def singlestoredb_test_container(
    execution_mode: ExecutionMode,
) -> Iterator[_TestContainerManager]:
    """Sets up and tears down the test container

    If SINGLESTOREDB_URL is set in the environment, uses the existing server
    and skips Docker container lifecycle management. Otherwise, automatically
    starts a Docker container for testing.

    Raises
    ------
    TypeError
        If ``execution_mode`` is not an ExecutionMode.
    """

    if not isinstance(execution_mode, ExecutionMode):
        raise TypeError(f"Invalid execution mode '{execution_mode}'")

    container_manager = _TestContainerManager()

    # If using existing server, skip all Docker lifecycle management
    if container_manager.use_existing:
        logger.info('Using existing server, skipping Docker container lifecycle')
        yield container_manager
        return

    # In distributed execution as a non-leader,
    # don't worry about the container lifecycle
    if execution_mode == ExecutionMode.FOLLOWER:
        logger.debug('Distributed follower')
        # NOTE(review): this manager allocated its own random ports in
        # __init__, so its url presumably does not point at the container
        # started by the leader process - confirm how followers are
        # expected to reach it when SINGLESTOREDB_URL is not set.
        yield container_manager
        return

    is_leader = execution_mode == ExecutionMode.LEADER
    if is_leader:
        logger.debug('Distributed leader')
    else:
        logger.debug('Not distributed')

    container_manager.start()
    yield container_manager
    try:
        # In distributed execution as leader,
        # wait for other workers before stopping
        if is_leader:
            container_manager.wait_till_connections_closed()
    finally:
        # Always stop the container, even if waiting failed
        container_manager.stop()


@pytest.fixture(scope='session')
def singlestoredb_connection(
    singlestoredb_test_container: _TestContainerManager,
) -> Iterator[Connection]:
    """Creates and closes the connection"""

    connection = singlestoredb_test_container.connect()
    logger.debug('Connected to database.')

    yield connection

    logger.debug('Closing connection')
    connection.close()


class _NameAllocator():
    """Generates unique names for each database

    Names have the form ``x_db_<id>_<n>`` where ``n`` counts up from 0.
    """

    def __init__(self, id: str) -> None:
        self.id = id
        # Number of names handed out so far
        self.names = 0

    def get_name(self) -> str:
        """Return the next unused database name."""
        name = f'x_db_{self.id}_{self.names}'
        self.names += 1
        return name


@pytest.fixture(scope='session')
def name_allocator(node_name: str) -> Iterator[_NameAllocator]:
    """Makes a worker-local name allocator using the node name"""

    yield _NameAllocator(node_name)


@pytest.fixture
def singlestoredb_tempdb(
    singlestoredb_connection: Connection, name_allocator: _NameAllocator,
) -> Iterator[Cursor]:
    """Provides a cursor on a unique temporary test database

    The database is created and selected before the test and dropped
    afterwards.
    """

    assert singlestoredb_connection.is_connected(), 'Database is no longer connected'
    db = name_allocator.get_name()

    with singlestoredb_connection.cursor() as cursor:
        logger.debug(f"Creating temporary DB \"{db}\"")
        cursor.execute(f'CREATE DATABASE {db}')
        cursor.execute(f'USE {db}')

        yield cursor

        logger.debug(f"Dropping temporary DB \"{db}\"")
        cursor.execute(f'DROP DATABASE {db}')