Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/pytest.py
469 views
1
#!/usr/bin/env python
2
"""Pytest plugin"""
3
import logging
4
import os
5
import subprocess
6
import time
7
from enum import Enum
8
from typing import Iterator
9
from typing import Optional
10
11
import pytest
12
13
from . import connect
14
from .connection import Connection
15
from .connection import Cursor
16
17
18
logger = logging.getLogger(__name__)
19
20
21
# How many times to attempt to connect to the container
22
STARTUP_CONNECT_ATTEMPTS = 10
23
# How long to wait between connection attempts
24
STARTUP_CONNECT_TIMEOUT_SECONDS = 2
25
# How many times to check if all connections are closed
26
TEARDOWN_WAIT_ATTEMPTS = 20
27
# How long to wait between checking connections
28
TEARDOWN_WAIT_SECONDS = 2
29
30
31
class ExecutionMode(Enum):
32
SEQUENTIAL = 1
33
LEADER = 2
34
FOLLOWER = 3
35
36
37
@pytest.fixture(scope='session')
38
def execution_mode() -> ExecutionMode:
39
"""Determine the pytest mode for this process"""
40
41
worker = os.environ.get('PYTEST_XDIST_WORKER')
42
worker_count = os.environ.get('PYTEST_XDIST_WORKER_COUNT')
43
44
# If we're not in pytest-xdist, the mode is Sequential
45
if worker is None or worker_count is None:
46
logger.debug('XDIST environment vars not found')
47
return ExecutionMode.SEQUENTIAL
48
49
logger.debug(f'PYTEST_XDIST_WORKER == {worker}')
50
logger.debug(f'PYTEST_XDIST_WORKER_COUNT == {worker_count}')
51
52
# If we're the only worker, than the mode is Sequential
53
if worker_count == '1':
54
return ExecutionMode.SEQUENTIAL
55
else:
56
# The first worker (named "gw0") is the leader
57
# if there are multiple workers
58
if worker == 'gw0':
59
return ExecutionMode.LEADER
60
else:
61
return ExecutionMode.FOLLOWER
62
63
64
@pytest.fixture(scope='session')
65
def node_name() -> Iterator[str]:
66
"""Determine the name of this worker node"""
67
68
worker = os.environ.get('PYTEST_XDIST_WORKER')
69
70
if worker is None:
71
logger.debug('XDIST environment vars not found')
72
yield 'master'
73
else:
74
logger.debug(f'PYTEST_XDIST_WORKER == {worker}')
75
yield worker
76
77
78
class _TestContainerManager():
79
"""Manages the setup and teardown of a SingleStoreDB Dev Container"""
80
81
def __init__(self) -> None:
82
self.container_name = 'singlestoredb-test-container'
83
self.dev_image_name = 'ghcr.io/singlestore-labs/singlestoredb-dev'
84
85
assert 'SINGLESTORE_LICENSE' in os.environ, 'SINGLESTORE_LICENSE not set'
86
87
self.root_password = 'Q8r4D7yXR8oqn'
88
self.environment_vars = {
89
'SINGLESTORE_LICENSE': None,
90
'ROOT_PASSWORD': f"\"{self.root_password}\"",
91
'SINGLESTORE_SET_GLOBAL_DEFAULT_PARTITIONS_PER_LEAF': '1',
92
}
93
94
self.ports = ['3306', '8080', '9000']
95
96
self.url = f'root:{self.root_password}@127.0.0.1:3306'
97
98
def start(self) -> None:
99
command = ' '.join(self._start_command())
100
101
logger.info(f'Starting container {self.container_name}')
102
try:
103
license = os.environ['SINGLESTORE_LICENSE']
104
env = {
105
'SINGLESTORE_LICENSE': license,
106
}
107
subprocess.check_call(command, shell=True, env=env)
108
except Exception as e:
109
logger.exception(e)
110
raise RuntimeError(
111
'Failed to start container. '
112
'Is one already running?',
113
) from e
114
logger.debug('Container started')
115
116
def _start_command(self) -> Iterator[str]:
117
yield 'docker run -d --name'
118
yield self.container_name
119
for key, value in self.environment_vars.items():
120
yield '-e'
121
if value is None:
122
yield key
123
else:
124
yield f'{key}={value}'
125
126
for port in self.ports:
127
yield '-p'
128
yield f'{port}:{port}'
129
130
yield self.dev_image_name
131
132
def print_logs(self) -> None:
133
logs_command = ['docker', 'logs', self.container_name]
134
logger.info('Getting logs')
135
logger.info(subprocess.check_output(logs_command))
136
137
def connect(self) -> Connection:
138
# Run all but one attempts trying again if they fail
139
for i in range(STARTUP_CONNECT_ATTEMPTS - 1):
140
try:
141
return connect(self.url)
142
except Exception:
143
logger.debug(f'Database not available yet (attempt #{i}).')
144
time.sleep(STARTUP_CONNECT_TIMEOUT_SECONDS)
145
else:
146
# Try one last time and report error if it fails
147
try:
148
return connect(self.url)
149
except Exception as e:
150
logger.error('Timed out while waiting to connect to database.')
151
logger.exception(e)
152
self.print_logs()
153
raise RuntimeError('Failed to connect to database') from e
154
155
def wait_till_connections_closed(self) -> None:
156
heart_beat = connect(self.url)
157
for i in range(TEARDOWN_WAIT_ATTEMPTS):
158
connections = self.get_open_connections(heart_beat)
159
if connections is None:
160
raise RuntimeError('Could not determine the number of open connections.')
161
logger.debug(
162
f'Waiting for other connections (n={connections-1}) '
163
f'to close (attempt #{i})',
164
)
165
time.sleep(TEARDOWN_WAIT_SECONDS)
166
else:
167
logger.warning('Timed out while waiting for other connections to close')
168
self.print_logs()
169
170
def get_open_connections(self, conn: Connection) -> Optional[int]:
171
for row in conn.show.status(extended=True):
172
name = row['Name']
173
value = row['Value']
174
logger.info(f'{name} = {value}')
175
if name == 'Threads_connected':
176
return int(value)
177
178
return None
179
180
def stop(self) -> None:
181
logger.info('Cleaning up SingleStore DB dev container')
182
logger.debug('Stopping container')
183
try:
184
subprocess.check_call(f'docker stop {self.container_name}', shell=True)
185
except Exception as e:
186
logger.exception(e)
187
raise RuntimeError('Failed to stop container.') from e
188
189
logger.debug('Removing container')
190
try:
191
subprocess.check_call(f'docker rm {self.container_name}', shell=True)
192
except Exception as e:
193
logger.exception(e)
194
raise RuntimeError('Failed to stop container.') from e
195
196
197
@pytest.fixture(scope='session')
198
def singlestoredb_test_container(
199
execution_mode: ExecutionMode,
200
) -> Iterator[_TestContainerManager]:
201
"""Sets up and tears down the test container"""
202
203
if not isinstance(execution_mode, ExecutionMode):
204
raise TypeError(f"Invalid execution mode '{execution_mode}'")
205
206
container_manager = _TestContainerManager()
207
208
# In sequential operation do all the steps
209
if execution_mode == ExecutionMode.SEQUENTIAL:
210
logger.debug('Not distributed')
211
container_manager.start()
212
yield container_manager
213
container_manager.stop()
214
215
# In distributed execution as leader,
216
# do the steps but wait for other workers before stopping
217
elif execution_mode == ExecutionMode.LEADER:
218
logger.debug('Distributed leader')
219
container_manager.start()
220
yield container_manager
221
container_manager.wait_till_connections_closed()
222
container_manager.stop()
223
224
# In distributed exeuction as a non-leader,
225
# don't worry about the container lifecycle
226
elif execution_mode == ExecutionMode.FOLLOWER:
227
logger.debug('Distributed follower')
228
yield container_manager
229
230
231
@pytest.fixture(scope='session')
232
def singlestoredb_connection(
233
singlestoredb_test_container: _TestContainerManager,
234
) -> Iterator[Connection]:
235
"""Creates and closes the connection"""
236
237
connection = singlestoredb_test_container.connect()
238
logger.debug('Connected to database.')
239
240
yield connection
241
242
logger.debug('Closing connection')
243
connection.close()
244
245
246
class _NameAllocator():
247
"""Generates unique names for each database"""
248
249
def __init__(self, id: str) -> None:
250
self.id = id
251
self.names = 0
252
253
def get_name(self) -> str:
254
name = f'x_db_{self.id}_{self.names}'
255
self.names += 1
256
return name
257
258
259
@pytest.fixture(scope='session')
260
def name_allocator(node_name: str) -> Iterator[_NameAllocator]:
261
"""Makes a worker-local name allocator using the node name"""
262
263
yield _NameAllocator(node_name)
264
265
266
@pytest.fixture
267
def singlestoredb_tempdb(
268
singlestoredb_connection: Connection, name_allocator: _NameAllocator,
269
) -> Iterator[Cursor]:
270
"""Provides a connection to a unique temporary test database"""
271
272
assert singlestoredb_connection.is_connected(), 'Database is no longer connected'
273
db = name_allocator.get_name()
274
275
with singlestoredb_connection.cursor() as cursor:
276
logger.debug(f"Creating temporary DB \"{db}\"")
277
cursor.execute(f'CREATE DATABASE {db}')
278
cursor.execute(f'USE {db}')
279
280
yield cursor
281
282
logger.debug(f"Dropping temporary DB \"{db}\"")
283
cursor.execute(f'DROP DATABASE {db}')
284
285