Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/pytest.py
798 views
1
#!/usr/bin/env python
2
"""Pytest plugin"""
3
import logging
4
import os
5
import socket
6
import subprocess
7
import time
8
import uuid
9
from collections.abc import Iterator
10
from enum import Enum
11
from typing import Optional
12
13
import pytest
14
15
from . import connect
16
from .connection import Connection
17
from .connection import Cursor
18
19
20
# Module-level logger shared by every fixture and helper in this plugin.
logger = logging.getLogger(__name__)


# --- Startup polling -------------------------------------------------------
# Maximum number of connection attempts made while the server comes up.
STARTUP_CONNECT_ATTEMPTS = 10
# Pause, in seconds, between two consecutive connection attempts.
STARTUP_CONNECT_TIMEOUT_SECONDS = 2

# --- Teardown polling ------------------------------------------------------
# Maximum number of checks for lingering connections before giving up.
TEARDOWN_WAIT_ATTEMPTS = 20
# Pause, in seconds, between two consecutive lingering-connection checks.
TEARDOWN_WAIT_SECONDS = 2
31
32
33
def _find_free_port() -> int:
    """Return a TCP port number that the OS reported as free.

    A throwaway IPv4 socket is bound to port 0 so the operating system
    picks an unused port; that port number is returned after the socket
    is closed. The port is not reserved once this function returns.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        probe.bind(('', 0))
        probe.listen(1)
        _host, port = probe.getsockname()
        return port
    finally:
        probe.close()
39
40
41
class ExecutionMode(Enum):
    """Role of the current pytest process with respect to pytest-xdist."""

    # Not distributed, or distributed with a single worker.
    SEQUENTIAL = 1
    # The first xdist worker ("gw0") when several workers are running.
    LEADER = 2
    # Any other xdist worker when several workers are running.
    FOLLOWER = 3
45
46
47
@pytest.fixture(scope='session')
def execution_mode() -> ExecutionMode:
    """Work out which ExecutionMode applies to this pytest process.

    The decision is based on the ``PYTEST_XDIST_WORKER`` and
    ``PYTEST_XDIST_WORKER_COUNT`` environment variables:

    * either variable missing -> ``SEQUENTIAL``
    * worker count equal to ``'1'`` -> ``SEQUENTIAL``
    * worker named ``'gw0'`` -> ``LEADER``
    * anything else -> ``FOLLOWER``
    """
    xdist_worker = os.environ.get('PYTEST_XDIST_WORKER')
    xdist_worker_count = os.environ.get('PYTEST_XDIST_WORKER_COUNT')

    # Without both xdist variables we are running as a plain pytest session.
    if xdist_worker is None or xdist_worker_count is None:
        logger.debug('XDIST environment vars not found')
        return ExecutionMode.SEQUENTIAL

    logger.debug(f'PYTEST_XDIST_WORKER == {xdist_worker}')
    logger.debug(f'PYTEST_XDIST_WORKER_COUNT == {xdist_worker_count}')

    # A single worker behaves exactly like a sequential run.
    if xdist_worker_count == '1':
        return ExecutionMode.SEQUENTIAL

    # With several workers, the first one ("gw0") is the leader.
    if xdist_worker == 'gw0':
        return ExecutionMode.LEADER

    return ExecutionMode.FOLLOWER
72
73
74
@pytest.fixture(scope='session')
def node_name() -> Iterator[str]:
    """Yield the name of this worker node.

    The name is the value of ``PYTEST_XDIST_WORKER`` when that variable is
    set, and ``'master'`` otherwise.
    """
    xdist_worker = os.environ.get('PYTEST_XDIST_WORKER')

    if xdist_worker is not None:
        logger.debug(f'PYTEST_XDIST_WORKER == {xdist_worker}')
        yield xdist_worker
        return

    logger.debug('XDIST environment vars not found')
    yield 'master'
86
87
88
class _TestContainerManager():
    """Manages the setup and teardown of a SingleStoreDB Dev Container

    If SINGLESTOREDB_URL environment variable is set, the manager will use
    the existing server instead of starting a Docker container. This allows
    tests to run against either an existing server or an automatically
    managed Docker container.

    When an existing server is used, none of the Docker-related attributes
    (``container_name``, ``ports``, ``http_port``, ...) are created; only
    ``existing_url``, ``use_existing`` and ``url`` are set.
    """

    def __init__(self) -> None:
        # Check if SINGLESTOREDB_URL is already set - if so, use existing server
        self.existing_url = os.environ.get('SINGLESTOREDB_URL')
        self.use_existing = self.existing_url is not None

        if self.use_existing:
            logger.info('Using existing SingleStore server from SINGLESTOREDB_URL')
            self.url = self.existing_url
            # No need to initialize Docker-related attributes
            return

        logger.info('SINGLESTOREDB_URL not set, will start Docker container')

        # Generate unique container name using UUID and worker ID
        worker = os.environ.get('PYTEST_XDIST_WORKER', 'master')
        unique_id = uuid.uuid4().hex[:8]
        self.container_name = f'singlestoredb-test-{worker}-{unique_id}'

        self.dev_image_name = 'ghcr.io/singlestore-labs/singlestoredb-dev'

        # Use SINGLESTORE_LICENSE from environment, or empty string as fallback
        # Empty string works for the client SDK
        license_key = os.environ.get('SINGLESTORE_LICENSE', '')
        if not license_key:
            logger.info('SINGLESTORE_LICENSE not set, using empty string')

        self.root_password = 'Q8r4D7yXR8oqn'
        self.environment_vars = {
            'SINGLESTORE_LICENSE': license_key,
            # The literal double quotes are consumed by the shell that runs
            # the command assembled from _start_command().
            'ROOT_PASSWORD': f"\"{self.root_password}\"",
            'SINGLESTORE_SET_GLOBAL_DEFAULT_PARTITIONS_PER_LEAF': '1',
        }

        # Use dynamic port allocation to avoid conflicts
        self.mysql_port = _find_free_port()
        self.http_port = _find_free_port()
        self.studio_port = _find_free_port()
        self.ports = [
            (self.mysql_port, '3306'),  # External port -> Internal port
            (self.studio_port, '8080'),  # Studio
            (self.http_port, '9000'),  # Data API
        ]

        self.url = f'root:{self.root_password}@127.0.0.1:{self.mysql_port}'

    @property
    def http_connection_url(self) -> Optional[str]:
        """HTTP connection URL for the SingleStoreDB server using Data API.

        Returns None when an existing server is used, because the manager
        only knows the URL given in SINGLESTOREDB_URL in that case.
        """
        if self.use_existing:
            # If using existing server, HTTP URL not available from manager
            return None
        return (
            f'singlestoredb+http://root:{self.root_password}@'
            f'127.0.0.1:{self.http_port}'
        )

    def _container_exists(self) -> bool:
        """Check if a container with this name already exists.

        Returns False when the ``docker ps`` call fails or the docker
        executable cannot be launched at all.
        """
        try:
            result = subprocess.run(
                [
                    'docker', 'ps', '-a', '--filter',
                    f'name={self.container_name}',
                    '--format', '{{.Names}}',
                ],
                capture_output=True,
                text=True,
                check=True,
            )
            return self.container_name in result.stdout
        except (subprocess.CalledProcessError, OSError):
            # OSError covers a missing docker binary; start() then reports
            # the failure as a RuntimeError with the full command.
            return False

    def _cleanup_existing_container(self) -> None:
        """Stop and remove any existing container with the same name."""
        if not self._container_exists():
            return

        logger.info(f'Found existing container {self.container_name}, cleaning up')
        try:
            # Try to stop the container (ignore if it's already stopped)
            subprocess.run(
                ['docker', 'stop', self.container_name],
                capture_output=True,
                check=False,
            )
            # Remove the container
            subprocess.run(
                ['docker', 'rm', self.container_name],
                capture_output=True,
                check=True,
            )
            logger.debug(f'Cleaned up existing container {self.container_name}')
        except subprocess.CalledProcessError as e:
            logger.warning(f'Failed to cleanup existing container: {e}')
            # Continue anyway - the unique name should prevent most conflicts

    def start(self) -> None:
        """Start the dev container in detached mode.

        Raises:
            RuntimeError: if the ``docker run`` command fails.
        """
        # Clean up any existing container with the same name
        self._cleanup_existing_container()

        command = ' '.join(self._start_command())

        logger.info(
            f'Starting container {self.container_name} on ports {self.mysql_port}, '
            f'{self.http_port}, {self.studio_port}',
        )
        try:
            # Inherit the parent environment so PATH, HOME, DOCKER_HOST and
            # similar settings still reach the docker CLI; passing a dict
            # with only the license would replace the whole environment.
            env = dict(os.environ)
            env['SINGLESTORE_LICENSE'] = os.environ.get('SINGLESTORE_LICENSE', '')
            # Discard stdout to avoid printing the container ID hash
            subprocess.check_call(
                command, shell=True, env=env,
                stdout=subprocess.DEVNULL,
            )

        except Exception as e:
            logger.exception(e)
            raise RuntimeError(
                f'Failed to start container {self.container_name}. '
                f'Command: {command}',
            ) from e
        logger.debug('Container started')

    def _start_command(self) -> Iterator[str]:
        """Yield the pieces of the ``docker run`` shell command.

        The pieces are meant to be joined with spaces and run through a
        shell (see start()); they are not individual argv entries.
        """
        yield 'docker run -d --name'
        yield self.container_name
        for key, value in self.environment_vars.items():
            yield '-e'
            if value is None:
                # Key only: docker takes the value from its own environment
                yield key
            else:
                yield f'{key}={value}'

        for external_port, internal_port in self.ports:
            yield '-p'
            yield f'{external_port}:{internal_port}'

        yield self.dev_image_name

    def print_logs(self) -> None:
        """Write the container's ``docker logs`` output to the logger.

        This is a diagnostic helper called from error paths, so it never
        raises: it is a no-op when an existing server is used (there is no
        container), and a failing ``docker logs`` is only logged.
        """
        if self.use_existing:
            logger.info('Using existing server, no container logs to show')
            return

        logs_command = ['docker', 'logs', self.container_name]
        logger.info('Getting logs')
        try:
            # Fold stderr into stdout so both streams end up in the log
            output = subprocess.check_output(
                logs_command, stderr=subprocess.STDOUT,
            )
        except (subprocess.CalledProcessError, OSError) as e:
            logger.warning(f'Could not get container logs: {e}')
            return
        logger.info(output.decode('utf-8', errors='replace'))

    def connect(self) -> Connection:
        """Connect to the server, retrying while it starts up.

        Up to STARTUP_CONNECT_ATTEMPTS attempts are made, sleeping
        STARTUP_CONNECT_TIMEOUT_SECONDS after each failed attempt except
        the last one.

        Raises:
            RuntimeError: if the final attempt fails as well.
        """
        # Run all but one attempts trying again if they fail
        for i in range(STARTUP_CONNECT_ATTEMPTS - 1):
            try:
                # Inside a method body the bare name refers to the
                # module-level connect function, not to this method.
                return connect(self.url)
            except Exception:
                logger.debug(f'Database not available yet (attempt #{i}).')
                time.sleep(STARTUP_CONNECT_TIMEOUT_SECONDS)

        # Try one last time and report error if it fails
        try:
            return connect(self.url)
        except Exception as e:
            logger.error('Timed out while waiting to connect to database.')
            logger.exception(e)
            self.print_logs()
            raise RuntimeError('Failed to connect to database') from e

    def wait_till_connections_closed(self) -> None:
        """Block until no connection other than our own heartbeat remains.

        Polls the server up to TEARDOWN_WAIT_ATTEMPTS times, sleeping
        TEARDOWN_WAIT_SECONDS between polls. Returns as soon as the reported
        count drops to one (the heartbeat connection itself) or below. On
        timeout a warning and the container logs are emitted, and the method
        returns normally.

        Raises:
            RuntimeError: if the number of open connections cannot be read.
        """
        heart_beat = connect(self.url)
        try:
            for i in range(TEARDOWN_WAIT_ATTEMPTS):
                connections = self.get_open_connections(heart_beat)
                if connections is None:
                    raise RuntimeError(
                        'Could not determine the number of open connections.',
                    )
                # The heartbeat connection is included in the count, so one
                # remaining connection means everybody else has disconnected.
                if connections <= 1:
                    logger.debug('All other connections are closed')
                    return
                logger.debug(
                    f'Waiting for other connections (n={connections-1}) '
                    f'to close (attempt #{i})',
                )
                time.sleep(TEARDOWN_WAIT_SECONDS)

            logger.warning('Timed out while waiting for other connections to close')
            self.print_logs()
        finally:
            # Do not leave the heartbeat connection open on the server
            heart_beat.close()

    def get_open_connections(self, conn: Connection) -> Optional[int]:
        """Return the server's ``Threads_connected`` status value.

        Every status row seen before the match is logged. Returns None when
        the status output contains no ``Threads_connected`` row.
        """
        for row in conn.show.status(extended=True):
            name = row['Name']
            value = row['Value']
            logger.info(f'{name} = {value}')
            if name == 'Threads_connected':
                return int(value)

        return None

    def stop(self) -> None:
        """Stop and then remove the container.

        Raises:
            RuntimeError: if either ``docker stop`` or ``docker rm`` fails.
                The container is not removed when stopping it failed.
        """
        logger.info('Cleaning up SingleStore DB dev container')
        logger.debug('Stopping container')
        try:
            subprocess.check_call(
                ['docker', 'stop', self.container_name],
                stdout=subprocess.DEVNULL,
            )

        except Exception as e:
            logger.exception(e)
            raise RuntimeError('Failed to stop container.') from e

        logger.debug('Removing container')
        try:
            subprocess.check_call(
                ['docker', 'rm', self.container_name],
                stdout=subprocess.DEVNULL,
            )

        except Exception as e:
            logger.exception(e)
            raise RuntimeError('Failed to remove container.') from e
312
313
314
@pytest.fixture(scope='session')
def singlestoredb_test_container(
    execution_mode: ExecutionMode,
) -> Iterator[_TestContainerManager]:
    """Sets up and tears down the test container

    If SINGLESTOREDB_URL is set in the environment, uses the existing server
    and skips Docker container lifecycle management. Otherwise, automatically
    starts a Docker container for testing.

    Lifecycle per execution mode when a container is managed:

    * SEQUENTIAL: start, yield, stop.
    * LEADER: start, yield, wait for other connections to close, stop.
    * FOLLOWER: yield only; the container lifecycle is left to the leader.

    Once the container has been started, it is stopped even if waiting for
    the other connections raises.

    Raises:
        TypeError: if ``execution_mode`` is not an ExecutionMode.
    """

    if not isinstance(execution_mode, ExecutionMode):
        raise TypeError(f"Invalid execution mode '{execution_mode}'")

    container_manager = _TestContainerManager()

    # If using existing server, skip all Docker lifecycle management
    if container_manager.use_existing:
        logger.info('Using existing server, skipping Docker container lifecycle')
        yield container_manager
        return

    # In sequential operation do all the steps
    if execution_mode == ExecutionMode.SEQUENTIAL:
        logger.debug('Not distributed')
        container_manager.start()
        try:
            yield container_manager
        finally:
            container_manager.stop()

    # In distributed execution as leader,
    # do the steps but wait for other workers before stopping
    elif execution_mode == ExecutionMode.LEADER:
        logger.debug('Distributed leader')
        container_manager.start()
        try:
            yield container_manager
            container_manager.wait_till_connections_closed()
        finally:
            # Always remove the container, even when the wait above raised
            container_manager.stop()

    # In distributed execution as a non-leader,
    # don't worry about the container lifecycle
    elif execution_mode == ExecutionMode.FOLLOWER:
        logger.debug('Distributed follower')
        # NOTE(review): this manager was built in the follower's own process
        # and picked its own random ports, so its url presumably does not
        # point at the container started by the leader - confirm how
        # followers are expected to reach the leader's server.
        yield container_manager
357
358
359
@pytest.fixture(scope='session')
def singlestoredb_connection(
    singlestoredb_test_container: _TestContainerManager,
) -> Iterator[Connection]:
    """Open one session-wide connection and close it at teardown.

    The connection is obtained through the container manager's
    ``connect()`` method and closed once the session is over.
    """
    session_conn = singlestoredb_test_container.connect()
    logger.debug('Connected to database.')

    yield session_conn

    # Teardown: release the shared connection.
    logger.debug('Closing connection')
    session_conn.close()
372
373
374
class _NameAllocator():
    """Hands out database names of the form ``x_db_<id>_<counter>``.

    ``id`` is the prefix given at construction time and ``names`` counts
    how many names have been handed out so far.
    """

    def __init__(self, id: str) -> None:
        self.names = 0
        self.id = id

    def get_name(self) -> str:
        """Return the next unused name and advance the counter by one."""
        index = self.names
        self.names = index + 1
        return f'x_db_{self.id}_{index}'
385
386
387
@pytest.fixture(scope='session')
def name_allocator(node_name: str) -> Iterator[_NameAllocator]:
    """Yield a name allocator whose names carry this worker's node name."""
    allocator = _NameAllocator(node_name)
    yield allocator
392
393
394
@pytest.fixture
def singlestoredb_tempdb(
    singlestoredb_connection: Connection, name_allocator: _NameAllocator,
) -> Iterator[Cursor]:
    """Yield a cursor that is using a freshly created temporary database.

    A database with a newly allocated name is created and selected before
    the cursor is handed to the test; the database is dropped afterwards.
    """
    assert singlestoredb_connection.is_connected(), 'Database is no longer connected'
    temp_db_name = name_allocator.get_name()

    with singlestoredb_connection.cursor() as cursor:
        logger.debug(f"Creating temporary DB \"{temp_db_name}\"")
        # Create the database, then make it the cursor's current database.
        setup_statements = (
            f'CREATE DATABASE {temp_db_name}',
            f'USE {temp_db_name}',
        )
        for statement in setup_statements:
            cursor.execute(statement)

        yield cursor

        # Teardown: remove the database created above.
        logger.debug(f"Dropping temporary DB \"{temp_db_name}\"")
        cursor.execute(f'DROP DATABASE {temp_db_name}')
412
413