Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/functions/ext/mmap.py
469 views
1
#!/usr/bin/env python
2
"""
3
Module for creating collocated Python UDFs
4
5
This module implements the collocated form of external functions for
6
SingleStoreDB. This mode uses a socket for control communications
7
and memory mapped files for passing the data to and from the UDF.
8
9
The command below is a sample invocation. It exports all functions
10
within the `myfuncs` Python module that have a `@udf` decorator on
11
them. The `--db` option specifies a database connection string.
12
If it is given (it defaults to the `SINGLESTOREDB_URL` environment variable), the UDF application will connect to the database
13
and register all functions. The `--replace-existing` option indicates
14
that existing functions should be replaced::
15
16
python -m singlestoredb.functions.ext.mmap \
17
--db=root:@127.0.0.1:9306/cosmeticshop --replace-existing \
18
myfuncs
19
20
The `myfuncs` package can be any Python package in your Python path.
21
It must contain functions marked with a `@udf` decorator and the
22
types must be annotated or specified using the `@udf` decorator
23
similar to the following::
24
25
import numpy as np
from singlestoredb.functions import udf
26
27
@udf
28
def print_it(x2: float, x3: str) -> str:
29
return int(x2) * x3
30
31
@udf.pandas
32
def print_it_pandas(x2: float, x3: str) -> str:
33
return x2.astype(np.int64) * x3.astype(str)
34
35
With the functions registered, you can now run the UDFs::
36
37
SELECT print_it(3.14, 'my string');
38
SELECT print_it_pandas(3.14, 'my string');
39
40
"""
41
import argparse
42
import array
43
import asyncio
44
import io
45
import logging
46
import mmap
47
import multiprocessing
48
import os
49
import secrets
50
import socket
51
import struct
52
import sys
53
import tempfile
54
import threading
55
import traceback
56
import urllib
57
import zipfile
58
from typing import Any
59
from typing import Dict
60
from typing import List
61
from typing import Optional
62
63
from . import asgi
64
from . import utils
65
from ... import manage_workspaces
66
from ...config import get_option
67
68
69
# Module-level logger used by the request handler and by main().
logger = utils.get_logger('singlestoredb.functions.ext.mmap')
70
71
72
def _recv_exact(connection: Any, size: int) -> bytes:
    """
    Receive exactly `size` bytes from a stream socket.

    A single ``recv`` call on a stream socket may return fewer bytes than
    requested. This helper keeps reading until `size` bytes have arrived or
    the peer closes the connection.

    Parameters
    ----------
    connection : socket connection
        Socket to read from
    size : int
        Number of bytes to read

    Returns
    -------
    bytes
        The received data. It is shorter than `size` only if the peer
        closed the connection first, and empty if it was already closed.

    """
    chunks: List[bytes] = []
    remaining = size
    while remaining > 0:
        chunk = connection.recv(remaining)
        if not chunk:
            break
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def _handle_request(app: Any, connection: Any, client_address: Any) -> None:
    """
    Handle function call requests arriving on one control connection.

    The exchange proceeds in three steps:

    1. The database sends a header (server version and function-name
       length).
    2. It sends the function name together with two file descriptors: one
       for the input shared-memory file and one for the output file.
    3. Each request is then announced by an input length on the socket.
       The function is run on that many bytes of input, and the reply is a
       status code plus the size of the output written. An input length of
       zero, or the socket being closed, ends the session.

    Parameters
    ----------
    app : ASGI app
        An ASGI application from the singlestoredb.functions.ext.asgi module
    connection : socket connection
        Socket connection for function control messages. It is always
        closed before this function returns.
    client_address : string
        Address of connecting client (not used by this handler)

    """
    logger.info('connection from {}'.format(str(connection).split(', ')[0][-4:]))

    ifile = None
    ofile = None
    try:
        # Receive the request header. Format:
        #   server version:          uint64
        #   length of function name: uint64
        header = _recv_exact(connection, 16)
        if len(header) != 16:
            logger.warning('connection closed before the request header arrived')
            return
        version, namelen = struct.unpack('<qq', header)

        # Python's recvmsg returns a tuple; only the first two parts matter
        # here. The ancillary buffer size is given in bytes, so reserve room
        # for two 32-bit ints (the input and output file descriptors).
        fd_model = array.array('i', [0, 0])
        msg, ancdata, _flags, _addr = connection.recvmsg(
            namelen,
            socket.CMSG_LEN(2 * fd_model.itemsize),
        )
        if len(ancdata) != 1:
            # Validate explicitly; an ``assert`` is stripped under ``-O``.
            logger.error(
                'expected one ancillary data item carrying file descriptors, '
                f'got {len(ancdata)}',
            )
            return

        # The function's name is in the "message" area of the recvmsg
        # response. A stream socket may return fewer than `namelen` bytes.
        # The descriptors were already received above, so read any
        # remainder with plain recv calls.
        if len(msg) < namelen:
            msg += _recv_exact(connection, namelen - len(msg))
        name = msg.decode('utf8')

        # Two file descriptors are transferred to us from the database via
        # the `sendmsg` protocol. They are used for reading the input rows and
        # writing the output rows, respectively.
        fd0, fd1 = struct.unpack('<ii', ancdata[0][2])
        ifile = os.fdopen(fd0, 'rb')
        ofile = os.fdopen(fd1, 'wb')

        # Keep receiving requests on this socket until we run out.
        while True:
            # Read the length of this request's input, a uint64. No data,
            # or a truncated length, means we are done receiving.
            buf = _recv_exact(connection, 8)
            if len(buf) != 8:
                break
            length = struct.unpack('<q', buf)[0]
            if not length:
                break

            mem = None
            try:
                # Map in the input shared memory segment from the fd received
                # via recvmsg, and copy this request's input out of it.
                # Reading through the mapping instead of the buffered `ifile`
                # avoids a stale read: a BufferedReader may satisfy
                # ``seek(0)`` from its read-ahead buffer and return the
                # previous request's bytes.
                mem = mmap.mmap(
                    ifile.fileno(),
                    length,
                    mmap.MAP_SHARED,
                    mmap.PROT_READ,
                )
                data = mem[:length]

                # Run the function
                out = io.BytesIO()
                asyncio.run(
                    app.call(
                        name,
                        io.BytesIO(data),
                        out,
                        data_format='rowdat_1',
                        data_version='1.0',
                    ),
                )

                # Write results into the output file. The file is truncated
                # to at least 128 KiB.
                result = out.getbuffer()
                response_size = len(result)
                ofile.truncate(max(128 * 1024, response_size))
                ofile.seek(0)
                ofile.write(result)
                ofile.flush()

                # Complete the request by sending back the status as two
                # uint64s on the socket:
                #     - http status
                #     - size of data in output shared memory
                connection.sendall(struct.pack('<qq', 200, response_size))

            except Exception as exc:
                errmsg = f'error occurred in executing function `{name}`: {exc}\n'
                logger.error(errmsg.rstrip())
                # The three-argument form works on every Python 3 version;
                # the single-argument form needs Python 3.10+.
                for line in traceback.format_exception(
                    type(exc), exc, exc.__traceback__,
                ):
                    logger.error(line.rstrip())
                # The length field must count encoded bytes, not characters.
                # Otherwise the `s` format truncates non-ASCII messages.
                payload = errmsg.encode('utf8')
                connection.sendall(
                    struct.pack(
                        f'<qq{len(payload)}s', 500,
                        len(payload), payload,
                    ),
                )
                break

            finally:
                # Close the shared memory object.
                if mem is not None:
                    mem.close()

    finally:
        # Close the shared memory files and the connection on every exit
        # path, including errors raised outside the per-request handler.
        if ifile is not None:
            ifile.close()
        if ofile is not None:
            ofile.close()
        connection.close()
190
191
192
def _build_parser(defaults: Dict[str, Any]) -> argparse.ArgumentParser:
    """
    Build the command-line parser for the collocated UDF server.

    Parameters
    ----------
    defaults : Dict[str, Any]
        Settings read from an environment file. When a key is present, its
        value replaces the matching ``external_function.*`` option as the
        argument default. Explicit command-line arguments still take
        precedence over both.

    Returns
    -------
    argparse.ArgumentParser

    """
    def option_default(key: str, option: str) -> Any:
        # Environment-file settings take precedence over global options.
        return defaults.get(key, get_option(option))

    parser = argparse.ArgumentParser(
        prog='python -m singlestoredb.functions.ext.mmap',
        description='Run a collocated Python UDF server',
    )
    parser.add_argument(
        '--max-connections', metavar='n', type=int,
        default=option_default(
            'max_connections', 'external_function.max_connections',
        ),
        help='maximum number of server connections before refusing them',
    )
    parser.add_argument(
        '--single-thread', action='store_true',
        default=option_default(
            'single_thread', 'external_function.single_thread',
        ),
        help='should the server run in single-thread mode?',
    )
    parser.add_argument(
        '--socket-path', metavar='file-path',
        default=(
            option_default('socket_path', 'external_function.socket_path') or
            os.path.join(tempfile.gettempdir(), secrets.token_hex(16))
        ),
        help='path to communications socket',
    )
    parser.add_argument(
        '--db', metavar='conn-str',
        default=os.environ.get('SINGLESTOREDB_URL', ''),
        help='connection string to use for registering functions',
    )
    parser.add_argument(
        '--replace-existing', action='store_true',
        help='should existing functions of the same name '
             'in the database be replaced?',
    )
    parser.add_argument(
        '--log-level', metavar='[info|debug|warning|error]',
        default=option_default('log_level', 'external_function.log_level'),
        help='logging level',
    )
    parser.add_argument(
        '--process-mode', metavar='[thread|subprocess]',
        default=option_default(
            'process_mode', 'external_function.process_mode',
        ),
        help='how to handle concurrent handlers',
    )
    parser.add_argument(
        'functions', metavar='module.or.func.path', nargs='*',
        help='functions or modules to export in UDF server',
    )
    return parser


def main(argv: Optional[List[str]] = None) -> None:
    """
    Main program for collocated Python UDFs

    The program runs through these steps:

    1. Parse the command line. It is re-parsed once if an environment zip
       file supplies configuration defaults.
    2. Download any ``stage://``, ``http://`` or ``https://`` function
       sources to a temporary directory.
    3. Create the ASGI application and, if ``--db`` is given, register the
       functions with that database.
    4. Accept connections on a Unix domain socket until interrupted.

    On exit, the functions are dropped from the database (when ``--db`` is
    given) and the socket file is removed.

    Parameters
    ----------
    argv : List[str], optional
        List of command-line parameters

    Raises
    ------
    RuntimeError
        If more than one environment file is given, environment files are
        mixed with other modules, no functions are found, or a stale socket
        file cannot be removed
    ValueError
        If a ``stage://`` URL lacks a path, file name, or workspace group,
        or an HTTP(S) URL has no file name

    """
    # A bare ``import urllib`` does not load these submodules, so import
    # them explicitly before using urllib.parse / urllib.request.
    import urllib.parse
    import urllib.request

    tmpdir = None
    functions: List[str] = []
    defaults: Dict[str, Any] = {}

    # Parse the command line at most twice. The first pass uses the
    # built-in option defaults. A second pass happens only when an
    # environment file supplies settings, which then become the defaults.
    for parse_pass in range(2):
        args = _build_parser(defaults).parse_args(argv)

        logger.setLevel(getattr(logging, args.log_level.upper()))

        if parse_pass > 0:
            break

        # Download Stage / HTTP(S) files as needed
        for idx, source in enumerate(args.functions):
            if source.startswith('stage://'):
                url = urllib.parse.urlparse(source)
                if not url.path or url.path == '/':
                    raise ValueError(f'no stage path was specified: {source}')
                if url.path.endswith('/'):
                    raise ValueError(
                        f'an environment file must be specified: {source}',
                    )

                mgr = manage_workspaces()
                if url.hostname:
                    wsg = mgr.get_workspace_group(url.hostname)
                elif os.environ.get('SINGLESTOREDB_WORKSPACE_GROUP'):
                    wsg = mgr.get_workspace_group(
                        os.environ['SINGLESTOREDB_WORKSPACE_GROUP'],
                    )
                else:
                    raise ValueError(f'no workspace group specified: {source}')

                if tmpdir is None:
                    tmpdir = tempfile.TemporaryDirectory()

                local_path = os.path.join(tmpdir.name, url.path.split('/')[-1])
                wsg.stage.download_file(url.path, local_path)
                args.functions[idx] = local_path

            elif source.startswith(('http://', 'https://')):
                # Name the local file after the URL path only, so a query
                # string or fragment never ends up in the file name.
                filename = urllib.parse.urlparse(source).path.rsplit('/', 1)[-1]
                if not filename:
                    raise ValueError(f'no file name was specified in URL: {source}')

                if tmpdir is None:
                    tmpdir = tempfile.TemporaryDirectory()

                local_path = os.path.join(tmpdir.name, filename)
                urllib.request.urlretrieve(source, local_path)
                args.functions[idx] = local_path

        # See if any of the args are zip files (assume they are environment files)
        zip_flags = [(x, zipfile.is_zipfile(x)) for x in args.functions]
        envs = [x for x, is_zip in zip_flags if is_zip]
        others = [x for x, is_zip in zip_flags if not is_zip]

        if len(envs) > 1:
            raise RuntimeError('only one environment file may be specified.')

        if envs and others:
            raise RuntimeError('environment files and other modules can not be mixed.')

        # See if an environment file was specified. If so, use its settings
        # as the defaults and reprocess the command line.
        if envs:
            # Add zip file to the Python path
            sys.path.insert(0, envs[0])
            functions = [os.path.splitext(os.path.basename(envs[0]))[0]]

            # Read pyproject.toml variables from the environment file
            defaults = utils.read_config(
                envs[0],
                ['tool.external_function', 'tool.external-function.collocated'],
            ) or {}
            if defaults:
                continue

        # No new defaults to apply. Keep this parse, including any downloaded
        # local paths it now holds, instead of re-parsing the raw arguments.
        break

    args.functions = functions or args.functions or None
    args.replace_existing = (
        args.replace_existing
        or defaults.get('replace_existing')
        or get_option('external_function.replace_existing')
    )

    # Remove a socket file left over from a previous run.
    try:
        os.unlink(args.socket_path)
    except FileNotFoundError:
        pass
    except OSError as exc:
        raise RuntimeError(
            f'could not remove existing socket path: {args.socket_path}',
        ) from exc

    # Create application from functions / module
    app = asgi.create_app(
        functions=args.functions,
        url=args.socket_path,
        data_format='rowdat_1',
        app_mode='collocated',
    )

    funcs = app.get_create_functions(replace=args.replace_existing)
    if not funcs:
        raise RuntimeError('no functions specified')

    for f in funcs:
        logger.info(f'function: {f}')

    # Create the Unix socket server and bind it to the path.
    server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    server.bind(args.socket_path)

    logger.info(f'using socket path: {args.socket_path}')

    # Listen for incoming connections. The argument is the number of
    # connections to keep in the backlog before we begin refusing them.
    server.listen(args.max_connections)

    use_threads = args.process_mode == 'thread'
    handler_cls: Any = threading.Thread if use_threads else multiprocessing.Process

    try:
        # Register functions with database
        if args.db:
            logger.info('registering functions with database')
            app.register_functions(args.db, replace=args.replace_existing)

        # Accept connections forever.
        while True:
            # Wait for the next connection on the socket.
            connection, client_address = server.accept()

            handler = handler_cls(
                target=_handle_request,
                args=(app, connection, client_address),
            )
            handler.start()

            if not use_threads:
                # The child process has its own copy of the socket, either
                # inherited or duplicated by multiprocessing. Close ours so
                # descriptors do not pile up in the server process.
                connection.close()

            # In single-thread mode, finish each request before accepting
            # the next one.
            if args.single_thread:
                handler.join()

    except KeyboardInterrupt:
        return

    finally:
        server.close()
        try:
            if args.db:
                logger.info('dropping functions from database')
                app.drop_functions(args.db)
        finally:
            # Remove the socket file before we exit.
            try:
                os.unlink(args.socket_path)
            except OSError:
                logger.error(f'could not remove socket path: {args.socket_path}')
404
405
406
if __name__ == '__main__':
    # Report expected start-up failures as a one-line error and a non-zero
    # exit status rather than a traceback. These include a missing or
    # conflicting function specification, an unremovable socket file, and
    # an invalid stage URL.
    try:
        main()
    except (RuntimeError, ValueError) as exc:
        logger.error(str(exc))
        sys.exit(1)
    except KeyboardInterrupt:
        pass
414
415