#!/usr/bin/env python3
"""
Web application for SingleStoreDB external functions.

This module supplies a function that can create web apps intended for use
with the external function feature of SingleStoreDB. The application
function is a standard ASGI <https://asgi.readthedocs.io/en/latest/index.html>
request handler for use with servers such as Uvicorn <https://www.uvicorn.org>.

An external function web application can be created using the `create_app`
function. By default, the exported Python functions are specified by
environment variables starting with SINGLESTOREDB_EXT_FUNCTIONS. See the
documentation in `create_app` for the full syntax. If the application is
created in Python code rather than from the command-line, exported
functions can be specified in the parameters.

An example of starting a server is shown below.

Example
-------
> SINGLESTOREDB_EXT_FUNCTIONS='myfuncs.[percentile_90,percentile_95]' \
    python3 -m singlestoredb.functions.ext.asgi

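The same server can also be started from Python code. Below is a minimal
sketch, assuming ``myfuncs`` is an importable module containing the
decorated functions; the object returned by `create_app` is a standard
ASGI callable, so it can be passed directly to Uvicorn:

    import uvicorn
    from singlestoredb.functions.ext.asgi import create_app

    app = create_app(functions='myfuncs.[percentile_90,percentile_95]')
    uvicorn.run(app, host='0.0.0.0', port=8000)
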
"""
import argparse
import asyncio
import contextvars
import dataclasses
import datetime
import functools
import importlib.util
import inspect
import io
import itertools
import json
import logging
import os
import re
import secrets
import sys
import tempfile
import textwrap
import threading
import time
import typing
import urllib
import uuid
import zipfile
import zipimport
from types import ModuleType
from typing import Any
from typing import Awaitable
from typing import Callable
from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional
from typing import Sequence
from typing import Set
from typing import Tuple
from typing import Union

from . import arrow
from . import json as jdata
from . import rowdat_1
from . import utils
from ... import connection
from ... import manage_workspaces
from ...config import get_option
from ...mysql.constants import FIELD_TYPE as ft
from ..signature import get_signature
from ..signature import signature_to_sql
from ..typing import Masked
from ..typing import Table
from .timer import Timer
from singlestoredb.docstring.parser import parse
from singlestoredb.functions.dtypes import escape_name

try:
    import cloudpickle
    has_cloudpickle = True
except ImportError:
    has_cloudpickle = False

try:
    from pydantic import BaseModel
    has_pydantic = True
except ImportError:
    has_pydantic = False


logger = utils.get_logger('singlestoredb.functions.ext.asgi')

# If a number of processes is specified, create a pool of workers
num_processes = max(0, int(os.environ.get('SINGLESTOREDB_EXT_NUM_PROCESSES', 0)))
if num_processes > 1:
    try:
        from ray.util.multiprocessing import Pool
    except ImportError:
        from multiprocessing import Pool
    func_map = Pool(num_processes).starmap
else:
    func_map = itertools.starmap


async def to_thread(
    func: Any, /, *args: Any, **kwargs: Dict[str, Any],
) -> Any:
    loop = asyncio.get_running_loop()
    ctx = contextvars.copy_context()
    func_call = functools.partial(ctx.run, func, *args, **kwargs)
    return await loop.run_in_executor(None, func_call)


# Use negative values to indicate unsigned ints / binary data / usec time precision
rowdat_1_type_map = {
    'bool': ft.LONGLONG,
    'int8': ft.LONGLONG,
    'int16': ft.LONGLONG,
    'int32': ft.LONGLONG,
    'int64': ft.LONGLONG,
    'uint8': -ft.LONGLONG,
    'uint16': -ft.LONGLONG,
    'uint32': -ft.LONGLONG,
    'uint64': -ft.LONGLONG,
    'float32': ft.DOUBLE,
    'float64': ft.DOUBLE,
    'str': ft.STRING,
    'bytes': -ft.STRING,
}


def get_func_names(funcs: str) -> List[Tuple[str, str]]:
    """
    Parse all function names from string.

    Parameters
    ----------
    funcs : str
        String containing one or more function names. The syntax is
        as follows: [func-name-1@func-alias-1,func-name-2@func-alias-2,...].
        The optional '@name' portion is an alias if you want the function
        to be renamed.

    Returns
    -------
    List[Tuple[str, str]] : a list of tuples containing the names and aliases
        of each function.

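    Examples
    --------
    A hypothetical spec string using the syntax described above
    (illustrative only):

    >>> get_func_names('[percentile_90@p90,percentile_95]')
    [('percentile_90', 'p90'), ('percentile_95', 'percentile_95')]
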
"""
151
if funcs.startswith('['):
152
func_names = funcs.replace('[', '').replace(']', '').split(',')
153
func_names = [x.strip() for x in func_names]
154
else:
155
func_names = [funcs]
156
157
out = []
158
for name in func_names:
159
alias = name
160
if '@' in name:
161
name, alias = name.split('@', 1)
162
out.append((name, alias))
163
164
return out
165
166
167
def as_tuple(x: Any) -> Any:
168
"""Convert object to tuple."""
169
if has_pydantic and isinstance(x, BaseModel):
170
return tuple(x.model_dump().values())
171
if dataclasses.is_dataclass(x):
172
return dataclasses.astuple(x) # type: ignore
173
if isinstance(x, dict):
174
return tuple(x.values())
175
return tuple(x)
176
177
178
def as_list_of_tuples(x: Any) -> Any:
179
"""Convert object to a list of tuples."""
180
if isinstance(x, Table):
181
x = x[0]
182
if isinstance(x, (list, tuple)) and len(x) > 0:
183
if isinstance(x[0], (list, tuple)):
184
return x
185
if has_pydantic and isinstance(x[0], BaseModel):
186
return [tuple(y.model_dump().values()) for y in x]
187
if dataclasses.is_dataclass(x[0]):
188
return [dataclasses.astuple(y) for y in x]
189
if isinstance(x[0], dict):
190
return [tuple(y.values()) for y in x]
191
return [(y,) for y in x]
192
return x
193
194
195
def get_dataframe_columns(df: Any) -> List[Any]:
    """Return columns of data from a dataframe/table."""
    if isinstance(df, Table):
        if len(df) == 1:
            df = df[0]
        else:
            return list(df)

    if isinstance(df, Masked):
        return [df]

    if isinstance(df, tuple):
        return list(df)

    rtype = str(type(df)).lower()
    if 'dataframe' in rtype:
        return [df[x] for x in df.columns]
    elif 'table' in rtype:
        return df.columns
    elif 'series' in rtype:
        return [df]
    elif 'array' in rtype:
        return [df]
    elif 'tuple' in rtype:
        return list(df)

    raise TypeError(
        'Unsupported data type for dataframe columns: '
        f'{rtype}',
    )


def get_array_class(data_format: str) -> Callable[..., Any]:
    """
    Get the array class for the current data format.

    """
    if data_format == 'polars':
        import polars as pl
        array_cls = pl.Series
    elif data_format == 'arrow':
        import pyarrow as pa
        array_cls = pa.array
    elif data_format == 'pandas':
        import pandas as pd
        array_cls = pd.Series
    else:
        import numpy as np
        array_cls = np.array
    return array_cls


def get_masked_params(func: Callable[..., Any]) -> List[bool]:
    """
    Get the list of masked parameters for the function.

    Parameters
    ----------
    func : Callable
        The function to call as the endpoint

    Returns
    -------
    List[bool]
        Boolean list of masked parameters

    """
    params = inspect.signature(func).parameters
    return [typing.get_origin(x.annotation) is Masked for x in params.values()]


def build_tuple(x: Any) -> Any:
    """Convert object to tuple."""
    return tuple(x) if isinstance(x, Masked) else (x, None)


def cancel_on_event(
    cancel_event: threading.Event,
) -> None:
    """
    Cancel the function call if the cancel event is set.

    Parameters
    ----------
    cancel_event : threading.Event
        The event to check for cancellation

    Raises
    ------
    asyncio.CancelledError
        If the cancel event is set

    """
    if cancel_event.is_set():
        task = asyncio.current_task()
        if task is not None:
            task.cancel()
        raise asyncio.CancelledError(
            'Function call was cancelled by client',
        )


def build_udf_endpoint(
    func: Callable[..., Any],
    returns_data_format: str,
) -> Callable[..., Any]:
    """
    Build a UDF endpoint for scalar / list types (row-based).

    Parameters
    ----------
    func : Callable
        The function to call as the endpoint
    returns_data_format : str
        The format of the return values

    Returns
    -------
    Callable
        The function endpoint

    """
    if returns_data_format in ['scalar', 'list']:

        is_async = asyncio.iscoroutinefunction(func)

        async def do_func(
            cancel_event: threading.Event,
            timer: Timer,
            row_ids: Sequence[int],
            rows: Sequence[Sequence[Any]],
        ) -> Tuple[Sequence[int], List[Tuple[Any, ...]]]:
            '''Call function on given rows of data.'''
            out = []
            async with timer('call_function'):
                for row in rows:
                    cancel_on_event(cancel_event)
                    if is_async:
                        out.append(await func(*row))
                    else:
                        out.append(func(*row))
            return row_ids, list(zip(out))

        return do_func

    return build_vector_udf_endpoint(func, returns_data_format)


def build_vector_udf_endpoint(
    func: Callable[..., Any],
    returns_data_format: str,
) -> Callable[..., Any]:
    """
    Build a UDF endpoint for vector formats (column-based).

    Parameters
    ----------
    func : Callable
        The function to call as the endpoint
    returns_data_format : str
        The format of the return values

    Returns
    -------
    Callable
        The function endpoint

    """
    masks = get_masked_params(func)
    array_cls = get_array_class(returns_data_format)
    is_async = asyncio.iscoroutinefunction(func)

    async def do_func(
        cancel_event: threading.Event,
        timer: Timer,
        row_ids: Sequence[int],
        cols: Sequence[Tuple[Sequence[Any], Optional[Sequence[bool]]]],
    ) -> Tuple[
        Sequence[int],
        List[Tuple[Sequence[Any], Optional[Sequence[bool]]]],
    ]:
        '''Call function on given columns of data.'''
        row_ids = array_cls(row_ids)

        # Call the function with `cols` as the function parameters
        async with timer('call_function'):
            if cols and cols[0]:
                if is_async:
                    out = await func(*[x if m else x[0] for x, m in zip(cols, masks)])
                else:
                    out = func(*[x if m else x[0] for x, m in zip(cols, masks)])
            else:
                if is_async:
                    out = await func()
                else:
                    out = func()

        cancel_on_event(cancel_event)

        # Single masked value
        if isinstance(out, Masked):
            return row_ids, [tuple(out)]

        # Multiple return values
        if isinstance(out, tuple):
            return row_ids, [build_tuple(x) for x in out]

        # Single return value
        return row_ids, [(out, None)]

    return do_func


def build_tvf_endpoint(
    func: Callable[..., Any],
    returns_data_format: str,
) -> Callable[..., Any]:
    """
    Build a TVF endpoint for scalar / list types (row-based).

    Parameters
    ----------
    func : Callable
        The function to call as the endpoint
    returns_data_format : str
        The format of the return values

    Returns
    -------
    Callable
        The function endpoint

    """
    if returns_data_format in ['scalar', 'list']:

        is_async = asyncio.iscoroutinefunction(func)

        async def do_func(
            cancel_event: threading.Event,
            timer: Timer,
            row_ids: Sequence[int],
            rows: Sequence[Sequence[Any]],
        ) -> Tuple[Sequence[int], List[Tuple[Any, ...]]]:
            '''Call function on given rows of data.'''
            out_ids: List[int] = []
            out = []
            # Call function on each row of data
            async with timer('call_function'):
                for i, row in zip(row_ids, rows):
                    cancel_on_event(cancel_event)
                    if is_async:
                        res = await func(*row)
                    else:
                        res = func(*row)
                    out.extend(as_list_of_tuples(res))
                    out_ids.extend([row_ids[i]] * (len(out)-len(out_ids)))
            return out_ids, out

        return do_func

    return build_vector_tvf_endpoint(func, returns_data_format)


def build_vector_tvf_endpoint(
    func: Callable[..., Any],
    returns_data_format: str,
) -> Callable[..., Any]:
    """
    Build a TVF endpoint for vector formats (column-based).

    Parameters
    ----------
    func : Callable
        The function to call as the endpoint
    returns_data_format : str
        The format of the return values

    Returns
    -------
    Callable
        The function endpoint

    """
    masks = get_masked_params(func)
    array_cls = get_array_class(returns_data_format)

    async def do_func(
        cancel_event: threading.Event,
        timer: Timer,
        row_ids: Sequence[int],
        cols: Sequence[Tuple[Sequence[Any], Optional[Sequence[bool]]]],
    ) -> Tuple[
        Sequence[int],
        List[Tuple[Sequence[Any], Optional[Sequence[bool]]]],
    ]:
        '''Call function on given columns of data.'''
        # NOTE: There is no way to determine which row ID belongs to
        #       each result row, so we just have to use the same
        #       row ID for all rows in the result.

        is_async = asyncio.iscoroutinefunction(func)

        # Call function on each column of data
        async with timer('call_function'):
            if cols and cols[0]:
                if is_async:
                    func_res = await func(
                        *[x if m else x[0] for x, m in zip(cols, masks)],
                    )
                else:
                    func_res = func(
                        *[x if m else x[0] for x, m in zip(cols, masks)],
                    )
            else:
                if is_async:
                    func_res = await func()
                else:
                    func_res = func()

        res = get_dataframe_columns(func_res)

        cancel_on_event(cancel_event)

        # Generate row IDs
        if isinstance(res[0], Masked):
            row_ids = array_cls([row_ids[0]] * len(res[0][0]))
        else:
            row_ids = array_cls([row_ids[0]] * len(res[0]))

        return row_ids, [build_tuple(x) for x in res]

    return do_func


def make_func(
    name: str,
    func: Callable[..., Any],
) -> Tuple[Callable[..., Any], Dict[str, Any]]:
    """
    Make a function endpoint.

    Parameters
    ----------
    name : str
        Name of the function to create
    func : Callable
        The function to call as the endpoint

    Returns
    -------
    (Callable, Dict[str, Any])

    """
    info: Dict[str, Any] = {}

    sig = get_signature(func, func_name=name)

    function_type = sig.get('function_type', 'udf')
    args_data_format = sig.get('args_data_format', 'scalar')
    returns_data_format = sig.get('returns_data_format', 'scalar')
    timeout = (
        func._singlestoredb_attrs.get('timeout') or  # type: ignore
        get_option('external_function.timeout')
    )

    if function_type == 'tvf':
        do_func = build_tvf_endpoint(func, returns_data_format)
    else:
        do_func = build_udf_endpoint(func, returns_data_format)

    do_func.__name__ = name
    do_func.__doc__ = func.__doc__

    # Store signature for generating CREATE FUNCTION calls
    info['signature'] = sig

    # Set data format
    info['args_data_format'] = args_data_format
    info['returns_data_format'] = returns_data_format

    # Set function type
    info['function_type'] = function_type

    # Set timeout
    info['timeout'] = max(timeout, 1)

    # Set async flag
    info['is_async'] = asyncio.iscoroutinefunction(func)

    # Setup argument types for rowdat_1 parser
    colspec = []
    for x in sig['args']:
        dtype = x['dtype'].replace('?', '')
        if dtype not in rowdat_1_type_map:
            raise TypeError(f'no data type mapping for {dtype}')
        colspec.append((x['name'], rowdat_1_type_map[dtype]))
    info['colspec'] = colspec

    # Setup return type
    returns = []
    for x in sig['returns']:
        dtype = x['dtype'].replace('?', '')
        if dtype not in rowdat_1_type_map:
            raise TypeError(f'no data type mapping for {dtype}')
        returns.append((x['name'], rowdat_1_type_map[dtype]))
    info['returns'] = returns

    return do_func, info


async def cancel_on_timeout(timeout: int) -> None:
    """Cancel request if it takes too long."""
    await asyncio.sleep(timeout)
    raise asyncio.CancelledError(
        'Function call was cancelled due to timeout',
    )


async def cancel_on_disconnect(
    receive: Callable[..., Awaitable[Any]],
) -> None:
    """Cancel request if client disconnects."""
    while True:
        message = await receive()
        if message.get('type', '') == 'http.disconnect':
            raise asyncio.CancelledError(
                'Function call was cancelled by client',
            )


async def cancel_all_tasks(tasks: Iterable[asyncio.Task[Any]]) -> None:
    """Cancel all tasks."""
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)


def start_counter() -> float:
    """Start a timer and return the start time."""
    return time.perf_counter()


def end_counter(start: float) -> float:
    """End a timer and return the elapsed time."""
    return time.perf_counter() - start


class Application(object):
    """
    Create an external function application.

    If `functions` is None, the environment is searched for function
    specifications in variables starting with `SINGLESTOREDB_EXT_FUNCTIONS`.
    Any number of environment variables can be specified as long as they
    have this prefix. The format of the environment variable value is the
    same as for the `functions` parameter.

    Parameters
    ----------
    functions : str or Iterable[str], optional
        Python functions are specified using a string format as follows:
        * Single function : <pkg1>.<func1>
        * Multiple functions : <pkg1>.[<func1>,<func2>,...]
        * Function aliases : <pkg1>.[<func1>@<alias1>,<func2>@<alias2>,...]
        * Multiple packages : <pkg1>.<func1>:<pkg2>.<func2>
    app_mode : str, optional
        The mode of operation for the application: remote, managed, or collocated
    url : str, optional
        The URL of the function API
    data_format : str, optional
        The format of the data rows: 'rowdat_1' or 'json'
    data_version : str, optional
        The version of the call format to expect: '1.0'
    link_name : str, optional
        The link name to use for the external function application. This is
        only for pre-existing links, and can only be used without
        ``link_config`` and ``link_credentials``.
    link_config : Dict[str, Any], optional
        The CONFIG section of a LINK definition. This dictionary gets
        converted to JSON for the CREATE LINK call.
    link_credentials : Dict[str, Any], optional
        The CREDENTIALS section of a LINK definition. This dictionary gets
        converted to JSON for the CREATE LINK call.
    name_prefix : str, optional
        Prefix to add to function names when registering with the database
    name_suffix : str, optional
        Suffix to add to function names when registering with the database
    function_database : str, optional
        The database to use for external function definitions.
    log_file : str, optional
        File path to write logs to instead of console. If None, logs are
        written to console. When specified, application logger handlers
        are replaced with a file handler.
    log_level : str, optional
        Logging level for the application logger. Valid values are 'info',
        'debug', 'warning', 'error'. Defaults to 'info'.
    disable_metrics : bool, optional
        Disable logging of function call metrics. Defaults to False.
    app_name : str, optional
        Name for the application instance. Used to create a logger-specific
        name. If not provided, a random name will be generated.

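    Examples
    --------
    A minimal sketch (illustrative only; it assumes ``myfuncs`` is an
    importable module whose functions were declared with the package's
    UDF decorator)::

        app = Application(functions='myfuncs.[percentile_90,percentile_95]')

        # Print the CREATE FUNCTION statements that would be registered
        for stmt in app.get_create_functions():
            print(stmt)
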
"""
700
701
# Plain text response start
702
text_response_dict: Dict[str, Any] = dict(
703
type='http.response.start',
704
status=200,
705
headers=[(b'content-type', b'text/plain')],
706
)
707
708
# Error response start
709
error_response_dict: Dict[str, Any] = dict(
710
type='http.response.start',
711
status=401,
712
headers=[(b'content-type', b'text/plain')],
713
)
714
715
# JSON response start
716
json_response_dict: Dict[str, Any] = dict(
717
type='http.response.start',
718
status=200,
719
headers=[(b'content-type', b'application/json')],
720
)
721
722
# ROWDAT_1 response start
723
rowdat_1_response_dict: Dict[str, Any] = dict(
724
type='http.response.start',
725
status=200,
726
headers=[(b'content-type', b'x-application/rowdat_1')],
727
)
728
729
# Apache Arrow response start
730
arrow_response_dict: Dict[str, Any] = dict(
731
type='http.response.start',
732
status=200,
733
headers=[(b'content-type', b'application/vnd.apache.arrow.file')],
734
)
735
736
# Path not found response start
737
path_not_found_response_dict: Dict[str, Any] = dict(
738
type='http.response.start',
739
status=404,
740
)
741
742
# Response body template
743
body_response_dict: Dict[str, Any] = dict(
744
type='http.response.body',
745
)
746
747
# Data format + version handlers
748
handlers = {
749
(b'application/octet-stream', b'1.0', 'scalar'): dict(
750
load=rowdat_1.load,
751
dump=rowdat_1.dump,
752
response=rowdat_1_response_dict,
753
),
754
(b'application/octet-stream', b'1.0', 'list'): dict(
755
load=rowdat_1.load,
756
dump=rowdat_1.dump,
757
response=rowdat_1_response_dict,
758
),
759
(b'application/octet-stream', b'1.0', 'pandas'): dict(
760
load=rowdat_1.load_pandas,
761
dump=rowdat_1.dump_pandas,
762
response=rowdat_1_response_dict,
763
),
764
(b'application/octet-stream', b'1.0', 'numpy'): dict(
765
load=rowdat_1.load_numpy,
766
dump=rowdat_1.dump_numpy,
767
response=rowdat_1_response_dict,
768
),
769
(b'application/octet-stream', b'1.0', 'polars'): dict(
770
load=rowdat_1.load_polars,
771
dump=rowdat_1.dump_polars,
772
response=rowdat_1_response_dict,
773
),
774
(b'application/octet-stream', b'1.0', 'arrow'): dict(
775
load=rowdat_1.load_arrow,
776
dump=rowdat_1.dump_arrow,
777
response=rowdat_1_response_dict,
778
),
779
(b'application/json', b'1.0', 'scalar'): dict(
780
load=jdata.load,
781
dump=jdata.dump,
782
response=json_response_dict,
783
),
784
(b'application/json', b'1.0', 'list'): dict(
785
load=jdata.load,
786
dump=jdata.dump,
787
response=json_response_dict,
788
),
789
(b'application/json', b'1.0', 'pandas'): dict(
790
load=jdata.load_pandas,
791
dump=jdata.dump_pandas,
792
response=json_response_dict,
793
),
794
(b'application/json', b'1.0', 'numpy'): dict(
795
load=jdata.load_numpy,
796
dump=jdata.dump_numpy,
797
response=json_response_dict,
798
),
799
(b'application/json', b'1.0', 'polars'): dict(
800
load=jdata.load_polars,
801
dump=jdata.dump_polars,
802
response=json_response_dict,
803
),
804
(b'application/json', b'1.0', 'arrow'): dict(
805
load=jdata.load_arrow,
806
dump=jdata.dump_arrow,
807
response=json_response_dict,
808
),
809
(b'application/vnd.apache.arrow.file', b'1.0', 'scalar'): dict(
810
load=arrow.load,
811
dump=arrow.dump,
812
response=arrow_response_dict,
813
),
814
(b'application/vnd.apache.arrow.file', b'1.0', 'pandas'): dict(
815
load=arrow.load_pandas,
816
dump=arrow.dump_pandas,
817
response=arrow_response_dict,
818
),
819
(b'application/vnd.apache.arrow.file', b'1.0', 'numpy'): dict(
820
load=arrow.load_numpy,
821
dump=arrow.dump_numpy,
822
response=arrow_response_dict,
823
),
824
(b'application/vnd.apache.arrow.file', b'1.0', 'polars'): dict(
825
load=arrow.load_polars,
826
dump=arrow.dump_polars,
827
response=arrow_response_dict,
828
),
829
(b'application/vnd.apache.arrow.file', b'1.0', 'arrow'): dict(
830
load=arrow.load_arrow,
831
dump=arrow.dump_arrow,
832
response=arrow_response_dict,
833
),
834
}
835
836
# Valid URL paths
837
invoke_path = ('invoke',)
838
show_create_function_path = ('show', 'create_function')
839
show_function_info_path = ('show', 'function_info')
840
status = ('status',)
841
842
    def __init__(
        self,
        functions: Optional[
            Union[
                str,
                Iterable[str],
                Callable[..., Any],
                Iterable[Callable[..., Any]],
                ModuleType,
                Iterable[ModuleType],
            ]
        ] = None,
        app_mode: str = get_option('external_function.app_mode'),
        url: str = get_option('external_function.url'),
        data_format: str = get_option('external_function.data_format'),
        data_version: str = get_option('external_function.data_version'),
        link_name: Optional[str] = get_option('external_function.link_name'),
        link_config: Optional[Dict[str, Any]] = None,
        link_credentials: Optional[Dict[str, Any]] = None,
        name_prefix: str = get_option('external_function.name_prefix'),
        name_suffix: str = get_option('external_function.name_suffix'),
        function_database: Optional[str] = None,
        log_file: Optional[str] = get_option('external_function.log_file'),
        log_level: str = get_option('external_function.log_level'),
        disable_metrics: bool = get_option('external_function.disable_metrics'),
        app_name: Optional[str] = get_option('external_function.app_name'),
    ) -> None:
        if link_name and (link_config or link_credentials):
            raise ValueError(
                '`link_name` can not be used with `link_config` or `link_credentials`',
            )

        if link_config is None:
            link_config = json.loads(
                get_option('external_function.link_config') or '{}',
            ) or None

        if link_credentials is None:
            link_credentials = json.loads(
                get_option('external_function.link_credentials') or '{}',
            ) or None

        # Generate application name if not provided
        if app_name is None:
            app_name = f'udf_app_{secrets.token_hex(4)}'

        self.name = app_name

        # Create logger instance specific to this application
        self.logger = utils.get_logger(f'singlestoredb.functions.ext.asgi.{self.name}')

        # List of functions specs
        specs: List[Union[str, Callable[..., Any], ModuleType]] = []

        # Look up Python function specifications
        if functions is None:
            env_vars = [
                x for x in os.environ.keys()
                if x.startswith('SINGLESTOREDB_EXT_FUNCTIONS')
            ]
            if env_vars:
                specs = [os.environ[x] for x in env_vars]
            else:
                import __main__
                specs = [__main__]

        elif isinstance(functions, ModuleType):
            specs = [functions]

        elif isinstance(functions, str):
            specs = [functions]

        elif callable(functions):
            specs = [functions]

        else:
            specs = list(functions)

        # Add functions to application
        endpoints = dict()
        external_functions = dict()
        for funcs in itertools.chain(specs):

            if isinstance(funcs, str):
                # Module name
                if importlib.util.find_spec(funcs) is not None:
                    items = importlib.import_module(funcs)
                    for x in vars(items).values():
                        if not hasattr(x, '_singlestoredb_attrs'):
                            continue
                        name = x._singlestoredb_attrs.get('name', x.__name__)
                        name = f'{name_prefix}{name}{name_suffix}'
                        external_functions[x.__name__] = x
                        func, info = make_func(name, x)
                        endpoints[name.encode('utf-8')] = func, info

                # Fully qualified function name
                elif '.' in funcs:
                    pkg_path, func_names = funcs.rsplit('.', 1)
                    pkg = importlib.import_module(pkg_path)

                    if pkg is None:
                        raise RuntimeError(f'Could not locate module: {pkg}')

                    # Add endpoint for each exported function
                    for name, alias in get_func_names(func_names):
                        item = getattr(pkg, name)
                        alias = f'{name_prefix}{name}{name_suffix}'
                        external_functions[name] = item
                        func, info = make_func(alias, item)
                        endpoints[alias.encode('utf-8')] = func, info

                else:
                    raise RuntimeError(f'Could not locate module: {funcs}')

            elif isinstance(funcs, ModuleType):
                for x in vars(funcs).values():
                    if not hasattr(x, '_singlestoredb_attrs'):
                        continue
                    name = x._singlestoredb_attrs.get('name', x.__name__)
                    name = f'{name_prefix}{name}{name_suffix}'
                    external_functions[x.__name__] = x
                    func, info = make_func(name, x)
                    endpoints[name.encode('utf-8')] = func, info

            else:
                alias = funcs.__name__
                external_functions[funcs.__name__] = funcs
                alias = f'{name_prefix}{alias}{name_suffix}'
                func, info = make_func(alias, funcs)
                endpoints[alias.encode('utf-8')] = func, info

        self.app_mode = app_mode
        self.url = url
        self.data_format = data_format
        self.data_version = data_version
        self.link_name = link_name
        self.link_config = link_config
        self.link_credentials = link_credentials
        self.endpoints = endpoints
        self.external_functions = external_functions
        self.function_database = function_database
        self.log_file = log_file
        self.log_level = log_level
        self.disable_metrics = disable_metrics

        # Configure logging
        self._configure_logging()

    def _configure_logging(self) -> None:
        """Configure logging based on the log_file settings."""
        # Set logger level
        self.logger.setLevel(getattr(logging, self.log_level.upper()))

        # Remove all existing handlers to ensure clean configuration
        self.logger.handlers.clear()

        # Configure log file if specified
        if self.log_file:
            # Create file handler
            file_handler = logging.FileHandler(self.log_file)
            file_handler.setLevel(getattr(logging, self.log_level.upper()))

            # Use JSON formatter for file logging
            formatter = utils.JSONFormatter()
            file_handler.setFormatter(formatter)

            # Add the handler to the logger
            self.logger.addHandler(file_handler)
        else:
            # For console logging, create a new stream handler with JSON formatter
            console_handler = logging.StreamHandler()
            console_handler.setLevel(getattr(logging, self.log_level.upper()))
            console_handler.setFormatter(utils.JSONFormatter())
            self.logger.addHandler(console_handler)

        # Prevent propagation to avoid duplicate or differently formatted messages
        self.logger.propagate = False

    def get_uvicorn_log_config(self) -> Dict[str, Any]:
        """
        Create uvicorn log config that matches the Application's logging format.

        This method returns the log configuration used by uvicorn, allowing external
        users to match the logging format of the Application class.

        Returns
        -------
        Dict[str, Any]
            Log configuration dictionary compatible with uvicorn's log_config parameter

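        Examples
        --------
        A sketch of passing the configuration through to Uvicorn; this
        mirrors what ``main()`` does below::

            uvicorn.run(app, log_config=app.get_uvicorn_log_config())
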
"""
1034
log_config = {
1035
'version': 1,
1036
'disable_existing_loggers': False,
1037
'formatters': {
1038
'json': {
1039
'()': 'singlestoredb.functions.ext.utils.JSONFormatter',
1040
},
1041
},
1042
'handlers': {
1043
'default': {
1044
'class': (
1045
'logging.FileHandler' if self.log_file
1046
else 'logging.StreamHandler'
1047
),
1048
'formatter': 'json',
1049
},
1050
},
1051
'loggers': {
1052
'uvicorn': {
1053
'handlers': ['default'],
1054
'level': self.log_level.upper(),
1055
'propagate': False,
1056
},
1057
'uvicorn.error': {
1058
'handlers': ['default'],
1059
'level': self.log_level.upper(),
1060
'propagate': False,
1061
},
1062
'uvicorn.access': {
1063
'handlers': ['default'],
1064
'level': self.log_level.upper(),
1065
'propagate': False,
1066
},
1067
},
1068
}
1069
1070
# Add filename to file handler if log file is specified
1071
if self.log_file:
1072
log_config['handlers']['default']['filename'] = self.log_file # type: ignore
1073
1074
return log_config
1075
1076
    async def __call__(
        self,
        scope: Dict[str, Any],
        receive: Callable[..., Awaitable[Any]],
        send: Callable[..., Awaitable[Any]],
    ) -> None:
        '''
        Application request handler.

        Parameters
        ----------
        scope : dict
            ASGI request scope
        receive : Callable
            Function to receive request information
        send : Callable
            Function to send response information

        '''
        request_id = str(uuid.uuid4())

        timer = Timer(
            app_name=self.name,
            id=request_id,
            timestamp=datetime.datetime.now(
                datetime.timezone.utc,
            ).strftime('%Y-%m-%dT%H:%M:%S.%fZ'),
        )
        call_timer = Timer(
            app_name=self.name,
            id=request_id,
            timestamp=datetime.datetime.now(
                datetime.timezone.utc,
            ).strftime('%Y-%m-%dT%H:%M:%S.%fZ'),
        )

        if scope['type'] != 'http':
            raise ValueError(f"Expected HTTP scope, got {scope['type']}")

        method = scope['method']
        path = tuple(x for x in scope['path'].split('/') if x)
        headers = dict(scope['headers'])

        content_type = headers.get(
            b'content-type',
            b'application/octet-stream',
        )
        accepts = headers.get(b'accepts', content_type)
        func_name = headers.get(b's2-ef-name', b'')
        func_endpoint = self.endpoints.get(func_name)
        ignore_cancel = headers.get(b's2-ef-ignore-cancel', b'false') == b'true'

        timer.metadata['function'] = func_name.decode('utf-8') if func_name else ''
        call_timer.metadata['function'] = timer.metadata['function']

        func = None
        func_info: Dict[str, Any] = {}
        if func_endpoint is not None:
            func, func_info = func_endpoint

        # Call the endpoint
        if method == 'POST' and func is not None and path == self.invoke_path:

            self.logger.info(
                'Function call initiated',
                extra={
                    'app_name': self.name,
                    'request_id': request_id,
                    'function_name': func_name.decode('utf-8'),
                    'content_type': content_type.decode('utf-8'),
                    'accepts': accepts.decode('utf-8'),
                },
            )

            args_data_format = func_info['args_data_format']
            returns_data_format = func_info['returns_data_format']
            data = []
            more_body = True
            with timer('receive_data'):
                while more_body:
                    request = await receive()
                    if request.get('type', '') == 'http.disconnect':
                        raise RuntimeError('client disconnected')
                    data.append(request['body'])
                    more_body = request.get('more_body', False)

            data_version = headers.get(b's2-ef-version', b'')
            input_handler = self.handlers[(content_type, data_version, args_data_format)]
            output_handler = self.handlers[(accepts, data_version, returns_data_format)]

            try:
                all_tasks = []
                result = []

                cancel_event = threading.Event()

                with timer('parse_input'):
                    inputs = input_handler['load'](  # type: ignore
                        func_info['colspec'], b''.join(data),
                    )

                func_task = asyncio.create_task(
                    func(cancel_event, call_timer, *inputs)
                    if func_info['is_async']
                    else to_thread(
                        lambda: asyncio.run(
                            func(cancel_event, call_timer, *inputs),
                        ),
                    ),
                )
                disconnect_task = asyncio.create_task(
                    asyncio.sleep(int(1e9))
                    if ignore_cancel else cancel_on_disconnect(receive),
                )
                timeout_task = asyncio.create_task(
                    cancel_on_timeout(func_info['timeout']),
                )

                all_tasks += [func_task, disconnect_task, timeout_task]

                async with timer('function_wrapper'):
                    done, pending = await asyncio.wait(
                        all_tasks, return_when=asyncio.FIRST_COMPLETED,
                    )

                    await cancel_all_tasks(pending)

                    for task in done:
                        if task is disconnect_task:
                            cancel_event.set()
                            raise asyncio.CancelledError(
                                'Function call was cancelled by client disconnect',
                            )

                        elif task is timeout_task:
                            cancel_event.set()
                            raise asyncio.TimeoutError(
                                'Function call was cancelled due to timeout',
                            )

                        elif task is func_task:
                            result.extend(task.result())

                with timer('format_output'):
                    body = output_handler['dump'](
                        [x[1] for x in func_info['returns']], *result,  # type: ignore
                    )

                await send(output_handler['response'])

            except asyncio.TimeoutError:
                self.logger.exception(
                    'Function call timeout',
                    extra={
                        'app_name': self.name,
                        'request_id': request_id,
                        'function_name': func_name.decode('utf-8'),
                        'timeout': func_info['timeout'],
                    },
                )
                body = (
                    '[TimeoutError] Function call timed out after ' +
                    str(func_info['timeout']) +
                    ' seconds'
                ).encode('utf-8')
                await send(self.error_response_dict)

            except asyncio.CancelledError:
                self.logger.exception(
                    'Function call cancelled',
                    extra={
                        'app_name': self.name,
                        'request_id': request_id,
                        'function_name': func_name.decode('utf-8'),
                    },
                )
                body = b'[CancelledError] Function call was cancelled'
                await send(self.error_response_dict)

            except Exception as e:
                self.logger.exception(
                    'Function call error',
                    extra={
                        'app_name': self.name,
                        'request_id': request_id,
                        'function_name': func_name.decode('utf-8'),
                        'exception_type': type(e).__name__,
                    },
                )
                body = f'[{type(e).__name__}] {str(e).strip()}'.encode('utf-8')
                await send(self.error_response_dict)

            finally:
                await cancel_all_tasks(all_tasks)

        # Handle api reflection
        elif method == 'GET' and path == self.show_create_function_path:
            host = headers.get(b'host', b'localhost:80')
            reflected_url = f'{scope["scheme"]}://{host.decode("utf-8")}/invoke'

            syntax = []
            for key, (endpoint, endpoint_info) in self.endpoints.items():
                if not func_name or key == func_name:
                    syntax.append(
                        signature_to_sql(
                            endpoint_info['signature'],
                            url=self.url or reflected_url,
                            data_format=self.data_format,
                            database=self.function_database or None,
                        ),
                    )
            body = '\n'.join(syntax).encode('utf-8')

            await send(self.text_response_dict)

        # Return function info
        elif method == 'GET' and (path == self.show_function_info_path or not path):
            functions = self.get_function_info()
            body = json.dumps(dict(functions=functions)).encode('utf-8')
            await send(self.text_response_dict)

        # Return status
        elif method == 'GET' and path == self.status:
            body = json.dumps(dict(status='ok')).encode('utf-8')
            await send(self.text_response_dict)

        # Path not found
        else:
            body = b''
            await send(self.path_not_found_response_dict)

        # Send body
        with timer('send_response'):
            out = self.body_response_dict.copy()
            out['body'] = body
            await send(out)

        for k, v in call_timer.metrics.items():
            timer.metrics[k] = v

        if not self.disable_metrics:
            metrics = timer.finish()
            self.logger.info(
                'Function call metrics',
                extra={
                    'app_name': self.name,
                    'request_id': request_id,
                    'function_name': timer.metadata.get('function', ''),
                    'metrics': metrics,
                },
            )

    def _create_link(
        self,
        config: Optional[Dict[str, Any]],
        credentials: Optional[Dict[str, Any]],
    ) -> Tuple[str, str]:
        """Generate CREATE LINK command."""
        if self.link_name:
            return self.link_name, ''

        if not config and not credentials:
            return '', ''

        link_name = f'py_ext_func_link_{secrets.token_hex(14)}'
        out = [f'CREATE LINK {link_name} AS HTTP']

        if config:
            out.append(f"CONFIG '{json.dumps(config)}'")

        if credentials:
            out.append(f"CREDENTIALS '{json.dumps(credentials)}'")

        return link_name, ' '.join(out) + ';'

    def _locate_app_functions(self, cur: Any) -> Tuple[Set[str], Set[str]]:
        """Locate all current functions and links belonging to this app."""
        funcs, links = set(), set()
        if self.function_database:
            database_prefix = escape_name(self.function_database) + '.'
            cur.execute(f'SHOW FUNCTIONS IN {escape_name(self.function_database)}')
        else:
            database_prefix = ''
            cur.execute('SHOW FUNCTIONS')

        for row in list(cur):
            name, ftype, link = row[0], row[1], row[-1]
            # Only look at external functions
            if 'external' not in ftype.lower():
                continue
            # See if function URL matches url
            cur.execute(f'SHOW CREATE FUNCTION {database_prefix}{escape_name(name)}')
            for fname, _, code, *_ in list(cur):
                m = re.search(r" (?:\w+) (?:SERVICE|MANAGED) '([^']+)'", code)
                if m and m.group(1) == self.url:
                    funcs.add(f'{database_prefix}{escape_name(fname)}')
                    if link and re.match(r'^py_ext_func_link_\S{14}$', link):
                        links.add(link)

        return funcs, links

    def get_function_info(
        self,
        func_name: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Return the functions and function signature information.

        Returns
        -------
        Dict[str, Any]

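            A mapping of function name to metadata about that function. A
            hedged sketch of the per-function keys, based on the construction
            below (values shown are placeholders)::

                {
                    'args': [...],
                    'returns': [...],
                    'function_type': 'udf',
                    'sql_statement': 'CREATE EXTERNAL FUNCTION ...',
                    'summary': '...',
                    'long_description': '...',
                    'examples': [...],
                }
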
"""
1389
functions = {}
1390
no_default = object()
1391
1392
# Generate CREATE FUNCTION SQL for each function using get_create_functions
1393
create_sqls = self.get_create_functions(replace=True)
1394
sql_map = {}
1395
for (_, info), sql in zip(self.endpoints.values(), create_sqls):
1396
sig = info['signature']
1397
sql_map[sig['name']] = sql
1398
1399
for key, (func, info) in self.endpoints.items():
1400
# Get info from docstring
1401
doc_summary = ''
1402
doc_long_description = ''
1403
doc_params = {}
1404
doc_returns = None
1405
doc_examples = []
1406
if func.__doc__:
1407
try:
1408
docs = parse(func.__doc__)
1409
doc_params = {p.arg_name: p for p in docs.params}
1410
doc_returns = docs.returns
1411
if not docs.short_description and docs.long_description:
1412
doc_summary = docs.long_description or ''
1413
else:
1414
doc_summary = docs.short_description or ''
1415
doc_long_description = docs.long_description or ''
1416
for ex in docs.examples:
1417
ex_dict: Dict[str, Any] = {
1418
'description': None,
1419
'code': None,
1420
'output': None,
1421
}
1422
if ex.description:
1423
ex_dict['description'] = ex.description
1424
if ex.snippet:
1425
code, output = [], []
1426
for line in ex.snippet.split('\n'):
1427
line = line.rstrip()
1428
if re.match(r'^(\w+>|>>>|\.\.\.)', line):
1429
code.append(line)
1430
else:
1431
output.append(line)
1432
ex_dict['code'] = '\n'.join(code) or None
1433
ex_dict['output'] = '\n'.join(output) or None
1434
if ex.post_snippet:
1435
ex_dict['postscript'] = ex.post_snippet
1436
doc_examples.append(ex_dict)
1437
1438
except Exception as e:
1439
self.logger.warning(
1440
'Could not parse docstring for function',
1441
extra={
1442
'app_name': self.name,
1443
'function_name': key.decode('utf-8'),
1444
'error': str(e),
1445
},
1446
)
1447
1448
if not func_name or key == func_name:
1449
sig = info['signature']
1450
args = []
1451
1452
# Function arguments
1453
for i, a in enumerate(sig.get('args', [])):
1454
name = a['name']
1455
dtype = a['dtype']
1456
nullable = '?' in dtype
1457
args.append(
1458
dict(
1459
name=name,
1460
dtype=dtype.replace('?', ''),
1461
nullable=nullable,
1462
description=(doc_params[name].description or '')
1463
if name in doc_params else '',
1464
),
1465
)
1466
if a.get('default', no_default) is not no_default:
1467
args[-1]['default'] = a['default']
1468
1469
# Return values
1470
ret = sig.get('returns', [])
1471
returns = []
1472
1473
for a in ret:
1474
dtype = a['dtype']
1475
nullable = '?' in dtype
1476
returns.append(
1477
dict(
1478
dtype=dtype.replace('?', ''),
1479
nullable=nullable,
1480
description=doc_returns.description
1481
if doc_returns else '',
1482
),
1483
)
1484
if a.get('name', None):
1485
returns[-1]['name'] = a['name']
1486
if a.get('default', no_default) is not no_default:
1487
returns[-1]['default'] = a['default']
1488
1489
sql = sql_map.get(sig['name'], '')
1490
functions[sig['name']] = dict(
1491
args=args,
1492
returns=returns,
1493
function_type=info['function_type'],
1494
sql_statement=sql,
1495
summary=doc_summary,
1496
long_description=doc_long_description,
1497
examples=doc_examples,
1498
)
1499
1500
return functions
1501
1502
    def get_create_functions(
        self,
        replace: bool = False,
    ) -> List[str]:
        """
        Generate CREATE FUNCTION code for all functions.

        Parameters
        ----------
        replace : bool, optional
            Should existing functions be replaced?

        Returns
        -------
        List[str]

        """
        if not self.endpoints:
            return []

        out = []
        link = ''
        if self.app_mode.lower() == 'remote':
            link, link_str = self._create_link(self.link_config, self.link_credentials)
            if link and link_str:
                out.append(link_str)

        for key, (endpoint, endpoint_info) in self.endpoints.items():
            out.append(
                signature_to_sql(
                    endpoint_info['signature'],
                    url=self.url,
                    data_format=self.data_format,
                    app_mode=self.app_mode,
                    replace=replace,
                    link=link or None,
                    database=self.function_database or None,
                ),
            )

        return out

    def register_functions(
        self,
        *connection_args: Any,
        replace: bool = False,
        **connection_kwargs: Any,
    ) -> None:
        """
        Register functions with the database.

        Parameters
        ----------
        *connection_args : Any
            Database connection parameters
        replace : bool, optional
            Should existing functions be replaced?
        **connection_kwargs : Any
            Database connection parameters

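        Examples
        --------
        A hedged sketch; the connection URL below is purely illustrative::

            app.register_functions('user:pw@dbhost:3306/dbname', replace=True)
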
"""
1563
with connection.connect(*connection_args, **connection_kwargs) as conn:
1564
with conn.cursor() as cur:
1565
if replace:
1566
funcs, links = self._locate_app_functions(cur)
1567
for fname in funcs:
1568
cur.execute(f'DROP FUNCTION IF EXISTS {fname}')
1569
for link in links:
1570
cur.execute(f'DROP LINK {link}')
1571
for func in self.get_create_functions(replace=replace):
1572
cur.execute(func)
1573
1574
def drop_functions(
1575
self,
1576
*connection_args: Any,
1577
**connection_kwargs: Any,
1578
) -> None:
1579
"""
1580
Drop registered functions from database.
1581
1582
Parameters
1583
----------
1584
*connection_args : Any
1585
Database connection parameters
1586
**connection_kwargs : Any
1587
Database connection parameters
1588
1589
"""
1590
with connection.connect(*connection_args, **connection_kwargs) as conn:
1591
with conn.cursor() as cur:
1592
funcs, links = self._locate_app_functions(cur)
1593
for fname in funcs:
1594
cur.execute(f'DROP FUNCTION IF EXISTS {fname}')
1595
for link in links:
1596
cur.execute(f'DROP LINK {link}')
1597
1598
    async def call(
        self,
        name: str,
        data_in: io.BytesIO,
        data_out: io.BytesIO,
        data_format: Optional[str] = None,
        data_version: Optional[str] = None,
    ) -> None:
        """
        Call a function in the application.

        Parameters
        ----------
        name : str
            Name of the function to call
        data_in : io.BytesIO
            The input data rows
        data_out : io.BytesIO
            The output data rows
        data_format : str, optional
            The format of the input and output data
        data_version : str, optional
            The version of the data format

        """
        data_format = data_format or self.data_format
        data_version = data_version or self.data_version

        async def receive() -> Dict[str, Any]:
            return dict(body=data_in.read())

        async def send(content: Dict[str, Any]) -> None:
            status = content.get('status', 200)
            if status != 200:
                raise KeyError(f'error occurred when calling `{name}`: {status}')
            data_out.write(content.get('body', b''))

        accepts = dict(
            json=b'application/json',
            rowdat_1=b'application/octet-stream',
            arrow=b'application/vnd.apache.arrow.file',
        )

        # Mock an ASGI scope
        scope = dict(
            type='http',
            path='invoke',
            method='POST',
            headers={
                b'content-type': accepts[data_format.lower()],
                b'accepts': accepts[data_format.lower()],
                b's2-ef-name': name.encode('utf-8'),
                b's2-ef-version': data_version.encode('utf-8'),
                b's2-ef-ignore-cancel': b'true',
            },
        )

        await self(scope, receive, send)

    def to_environment(
        self,
        name: str,
        destination: str = '.',
        version: Optional[str] = None,
        dependencies: Optional[List[str]] = None,
        authors: Optional[List[Dict[str, str]]] = None,
        maintainers: Optional[List[Dict[str, str]]] = None,
        description: Optional[str] = None,
        container_service: Optional[Dict[str, Any]] = None,
        external_function: Optional[Dict[str, Any]] = None,
        external_function_remote: Optional[Dict[str, Any]] = None,
        external_function_collocated: Optional[Dict[str, Any]] = None,
        overwrite: bool = False,
    ) -> None:
        """
        Convert application to an environment file.

        Parameters
        ----------
        name : str
            Name of the output environment
        destination : str, optional
            Location of the output file
        version : str, optional
            Version of the package
        dependencies : List[str], optional
            List of dependency specifications like in a requirements.txt file
        authors : List[Dict[str, Any]], optional
            Dictionaries of author information. Keys may include: email, name
        maintainers : List[Dict[str, Any]], optional
            Dictionaries of maintainer information. Keys may include: email, name
        description : str, optional
            Description of package
        container_service : Dict[str, Any], optional
            Container service specifications
        external_function : Dict[str, Any], optional
            External function specifications (applies to both remote and collocated)
        external_function_remote : Dict[str, Any], optional
            Remote external function specifications
        external_function_collocated : Dict[str, Any], optional
            Collocated external function specifications
        overwrite : bool, optional
            Should destination file be overwritten if it exists?

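        Examples
        --------
        A hedged sketch of exporting the app's functions to an environment
        file in the current directory (all argument values are illustrative)::

            app.to_environment(
                'myfuncs',
                destination='.',
                dependencies=['numpy'],
                overwrite=True,
            )
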
"""
1703
if not has_cloudpickle:
1704
raise RuntimeError('the cloudpicke package is required for this operation')
1705
1706
# Write to temporary location if a remote destination is specified
1707
tmpdir = None
1708
if destination.startswith('stage://'):
1709
tmpdir = tempfile.TemporaryDirectory()
1710
local_path = os.path.join(tmpdir.name, f'{name}.env')
1711
else:
1712
local_path = os.path.join(destination, f'{name}.env')
1713
if not overwrite and os.path.exists(local_path):
1714
raise OSError(f'path already exists: {local_path}')
1715
1716
with zipfile.ZipFile(local_path, mode='w') as z:
1717
# Write metadata
1718
z.writestr(
1719
'pyproject.toml', utils.to_toml({
1720
'project': dict(
1721
name=name,
1722
version=version,
1723
dependencies=dependencies,
1724
requires_python='== ' +
1725
'.'.join(str(x) for x in sys.version_info[:3]),
1726
authors=authors,
1727
maintainers=maintainers,
1728
description=description,
1729
),
1730
'tool.container-service': container_service,
1731
'tool.external-function': external_function,
1732
'tool.external-function.remote': external_function_remote,
1733
'tool.external-function.collocated': external_function_collocated,
1734
}),
1735
)
1736
1737
# Write Python package
1738
z.writestr(
1739
f'{name}/__init__.py',
1740
textwrap.dedent(f'''
1741
import pickle as _pkl
1742
globals().update(
1743
_pkl.loads({cloudpickle.dumps(self.external_functions)}),
1744
)
1745
__all__ = {list(self.external_functions.keys())}''').strip(),
1746
)
1747
1748
# Upload to Stage as needed
1749
if destination.startswith('stage://'):
1750
url = urllib.parse.urlparse(re.sub(r'/+$', r'', destination) + '/')
1751
if not url.path or url.path == '/':
1752
raise ValueError(f'no stage path was specified: {destination}')
1753
1754
mgr = manage_workspaces()
1755
if url.hostname:
1756
wsg = mgr.get_workspace_group(url.hostname)
1757
elif os.environ.get('SINGLESTOREDB_WORKSPACE_GROUP'):
1758
wsg = mgr.get_workspace_group(
1759
os.environ['SINGLESTOREDB_WORKSPACE_GROUP'],
1760
)
1761
else:
1762
raise ValueError(f'no workspace group specified: {destination}')
1763
1764
# Make intermediate directories
1765
if url.path.count('/') > 1:
1766
wsg.stage.mkdirs(os.path.dirname(url.path))
1767
1768
wsg.stage.upload_file(
1769
local_path, url.path + f'{name}.env',
1770
overwrite=overwrite,
1771
)
1772
os.remove(local_path)
1773
1774
1775
def main(argv: Optional[List[str]] = None) -> None:
    """
    Main program for HTTP-based Python UDFs

    Parameters
    ----------
    argv : List[str], optional
        List of command-line parameters

    """
    try:
        import uvicorn
    except ImportError:
        raise ImportError('the uvicorn package is required to run this command')

    # Should we run in embedded mode (typically for Jupyter)
    try:
        asyncio.get_running_loop()
        use_async = True
    except RuntimeError:
        use_async = False

    # Temporary directory for Stage environment files
    tmpdir = None

    # Depending on whether we find an environment file specified, we
    # may have to process the command line twice.
    functions = []
    defaults: Dict[str, Any] = {}
    for i in range(2):

        parser = argparse.ArgumentParser(
            prog='python -m singlestoredb.functions.ext.asgi',
            description='Run an HTTP-based Python UDF server',
        )
        parser.add_argument(
            '--url', metavar='url',
            default=defaults.get(
                'url',
                get_option('external_function.url'),
            ),
            help='URL of the UDF server endpoint',
        )
        parser.add_argument(
            '--host', metavar='host',
            default=defaults.get(
                'host',
                get_option('external_function.host'),
            ),
            help='bind socket to this host',
        )
        parser.add_argument(
            '--port', metavar='port', type=int,
            default=defaults.get(
                'port',
                get_option('external_function.port'),
            ),
            help='bind socket to this port',
        )
        parser.add_argument(
            '--db', metavar='conn-str',
            default=defaults.get(
                'connection',
                get_option('external_function.connection'),
            ),
            help='connection string to use for registering functions',
        )
        parser.add_argument(
            '--replace-existing', action='store_true',
            help='should existing functions of the same name '
                 'in the database be replaced?',
        )
        parser.add_argument(
            '--data-format', metavar='format',
            default=defaults.get(
                'data_format',
                get_option('external_function.data_format'),
            ),
            choices=['rowdat_1', 'json'],
            help='format of the data rows',
        )
        parser.add_argument(
            '--data-version', metavar='version',
            default=defaults.get(
                'data_version',
                get_option('external_function.data_version'),
            ),
            help='version of the data row format',
        )
        parser.add_argument(
            '--link-name', metavar='name',
            default=defaults.get(
                'link_name',
                get_option('external_function.link_name'),
            ) or '',
            help='name of the link to use for connections',
        )
        parser.add_argument(
            '--link-config', metavar='json',
            default=str(
                defaults.get(
                    'link_config',
                    get_option('external_function.link_config'),
                ) or '{}',
            ),
            help='link config in JSON format',
        )
        parser.add_argument(
            '--link-credentials', metavar='json',
            default=str(
                defaults.get(
                    'link_credentials',
                    get_option('external_function.link_credentials'),
                ) or '{}',
            ),
            help='link credentials in JSON format',
        )
        parser.add_argument(
            '--log-level', metavar='[info|debug|warning|error]',
            default=defaults.get(
                'log_level',
                get_option('external_function.log_level'),
            ),
            help='logging level',
        )
        parser.add_argument(
            '--log-file', metavar='filepath',
            default=defaults.get(
                'log_file',
                get_option('external_function.log_file'),
            ),
            help='File path to write logs to instead of console',
        )
        parser.add_argument(
            '--disable-metrics', action='store_true',
            default=defaults.get(
                'disable_metrics',
                get_option('external_function.disable_metrics'),
            ),
            help='Disable logging of function call metrics',
        )
        parser.add_argument(
            '--name-prefix', metavar='name_prefix',
            default=defaults.get(
                'name_prefix',
                get_option('external_function.name_prefix'),
            ),
            help='Prefix to add to function names',
        )
        parser.add_argument(
            '--name-suffix', metavar='name_suffix',
            default=defaults.get(
                'name_suffix',
                get_option('external_function.name_suffix'),
            ),
            help='Suffix to add to function names',
        )
        parser.add_argument(
            '--function-database', metavar='function_database',
            default=defaults.get(
                'function_database',
                get_option('external_function.function_database'),
            ),
            help='Database to use for the function definition',
        )
        parser.add_argument(
            '--app-name', metavar='app_name',
            default=defaults.get(
                'app_name',
                get_option('external_function.app_name'),
            ),
            help='Name for the application instance',
        )
        parser.add_argument(
            'functions', metavar='module.or.func.path', nargs='*',
            help='functions or modules to export in UDF server',
        )

        args = parser.parse_args(argv)

        if i > 0:
            break

        # Download Stage files as needed
        for i, f in enumerate(args.functions):
            if f.startswith('stage://'):
                url = urllib.parse.urlparse(f)
                if not url.path or url.path == '/':
                    raise ValueError(f'no stage path was specified: {f}')
                if url.path.endswith('/'):
                    raise ValueError(f'an environment file must be specified: {f}')

                mgr = manage_workspaces()
                if url.hostname:
                    wsg = mgr.get_workspace_group(url.hostname)
                elif os.environ.get('SINGLESTOREDB_WORKSPACE_GROUP'):
                    wsg = mgr.get_workspace_group(
                        os.environ['SINGLESTOREDB_WORKSPACE_GROUP'],
                    )
                else:
                    raise ValueError(f'no workspace group specified: {f}')

                if tmpdir is None:
                    tmpdir = tempfile.TemporaryDirectory()

                local_path = os.path.join(tmpdir.name, url.path.split('/')[-1])
                wsg.stage.download_file(url.path, local_path)
                args.functions[i] = local_path

            elif f.startswith('http://') or f.startswith('https://'):
                if tmpdir is None:
                    tmpdir = tempfile.TemporaryDirectory()

                local_path = os.path.join(tmpdir.name, f.split('/')[-1])
                urllib.request.urlretrieve(f, local_path)
                args.functions[i] = local_path

        # See if any of the args are zip files (assume they are environment files)
        modules = [(x, zipfile.is_zipfile(x)) for x in args.functions]
        envs = [x[0] for x in modules if x[1]]
        others = [x[0] for x in modules if not x[1]]

        if envs and len(envs) > 1:
            raise RuntimeError('only one environment file may be specified')

        if envs and others:
            raise RuntimeError('environment files and other modules can not be mixed.')

        # See if an environment file was specified. If so, use those settings
        # as the defaults and reprocess command line.
        if envs:
            # Add pyproject.toml variables and redo command-line processing
            defaults = utils.read_config(
                envs[0],
                ['tool.external-function', 'tool.external-function.remote'],
            )

            # Load zip file as a module
            modname = os.path.splitext(os.path.basename(envs[0]))[0]
            zi = zipimport.zipimporter(envs[0])
            mod = zi.load_module(modname)
            if mod is None:
                raise RuntimeError(f'environment file could not be imported: {envs[0]}')
            functions = [mod]

        if defaults:
            continue

    args.functions = functions or args.functions or None
    args.replace_existing = args.replace_existing \
        or defaults.get('replace_existing') \
        or get_option('external_function.replace_existing')

    # Substitute in host / port if specified
    if args.host != defaults.get('host') or args.port != defaults.get('port'):
        u = urllib.parse.urlparse(args.url)
        args.url = u._replace(netloc=f'{args.host}:{args.port}').geturl()

    # Create application from functions / module
    app = Application(
        functions=args.functions,
        url=args.url,
        data_format=args.data_format,
        data_version=args.data_version,
        link_name=args.link_name or None,
        link_config=json.loads(args.link_config) or None,
        link_credentials=json.loads(args.link_credentials) or None,
        app_mode='remote',
        name_prefix=args.name_prefix,
        name_suffix=args.name_suffix,
        function_database=args.function_database or None,
        log_file=args.log_file,
        log_level=args.log_level,
        disable_metrics=args.disable_metrics,
        app_name=args.app_name,
    )

    funcs = app.get_create_functions(replace=args.replace_existing)
    if not funcs:
        raise RuntimeError('no functions specified')

    for f in funcs:
        app.logger.info(f)

    try:
        if args.db:
            app.logger.info('Registering functions with database')
            app.register_functions(
                args.db,
                replace=args.replace_existing,
            )

        app_args = {
            k: v for k, v in dict(
                host=args.host or None,
                port=args.port or None,
                log_level=args.log_level,
                lifespan='off',
            ).items() if v is not None
        }

        # Configure uvicorn logging to use JSON format matching Application's format
        app_args['log_config'] = app.get_uvicorn_log_config()

        if use_async:
            asyncio.create_task(_run_uvicorn(uvicorn, app, app_args, db=args.db))
        else:
            uvicorn.run(app, **app_args)

    finally:
        if not use_async and args.db:
            app.logger.info('Dropping functions from database')
            app.drop_functions(args.db)


async def _run_uvicorn(
    uvicorn: Any,
    app: Any,
    app_args: Any,
    db: Optional[str] = None,
) -> None:
    """Run uvicorn server and clean up functions after shutdown."""
    await uvicorn.Server(uvicorn.Config(app, **app_args)).serve()
    if db:
        app.logger.info('Dropping functions from database')
        app.drop_functions(db)


create_app = Application


if __name__ == '__main__':
    try:
        main()
    except RuntimeError as exc:
        logger.error(str(exc))
        sys.exit(1)
    except KeyboardInterrupt:
        pass