Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/apps/_python_udfs.py
469 views
1
import asyncio
2
import os
3
import typing
4
5
from ..functions.ext.asgi import Application
6
from ._config import AppConfig
7
from ._connection_info import UdfConnectionInfo
8
from ._process import kill_process_by_port
9
10
if typing.TYPE_CHECKING:
11
from ._uvicorn_util import AwaitableUvicornServer
12
13
# Keep track of currently running server
14
# Assigned by run_udf_app() when it creates a server, and reset to None
# after run_udf_app() shuts that server down.
_running_server: 'typing.Optional[AwaitableUvicornServer]' = None
15
16
# Maximum number of UDFs allowed
17
# run_udf_app() raises ValueError when the application defines more
# endpoints than this.
MAX_UDFS_LIMIT = 10
18
19
20
# Strong references to in-flight ``serve()`` tasks. The event loop only keeps
# weak references to tasks, so a task that nothing else references may be
# garbage collected before it finishes.
_background_tasks: 'typing.Set[asyncio.Task[typing.Any]]' = set()


async def run_udf_app(
    log_level: str = 'error',
    kill_existing_app_server: bool = True,
) -> UdfConnectionInfo:
    """
    Start the Python UDF application server and return its connection info.

    The server is started as a background task on the running event loop;
    this coroutine returns once the server reports that startup finished.

    Parameters
    ----------
    log_level : str, optional
        Log level passed to the UDF ``Application``.
    kill_existing_app_server : bool, optional
        If True, shut down the server started by a previous call (if any)
        and kill any other process occupying the configured listen port
        before starting the new server.

    Returns
    -------
    UdfConnectionInfo
        Built from the base URL of the UDF endpoint and the function info
        reported by the application.

    Raises
    ------
    ImportError
        If the ``uvicorn`` package is not installed.
    ValueError
        If no functions are defined, or if more than ``MAX_UDFS_LIMIT``
        functions are defined.

    """
    global _running_server
    from ._uvicorn_util import AwaitableUvicornServer

    try:
        import uvicorn
    except ImportError as exc:
        raise ImportError(
            'package uvicorn is required to run python udfs',
        ) from exc

    app_config = AppConfig.from_env()

    if kill_existing_app_server:
        # Shutdown the server gracefully if it was started by us.
        # Since the uvicorn server doesn't start a new subprocess
        # killing the process would result in kernel dying.
        if _running_server is not None:
            await _running_server.shutdown()
            _running_server = None

        # Kill if any other process is occupying the port
        kill_process_by_port(app_config.listen_port)

    base_url = generate_base_url(app_config)

    udf_suffix = ''
    if app_config.running_interactively:
        udf_suffix = '_test'
    app = Application(
        url=base_url,
        app_mode='managed',
        name_suffix=udf_suffix,
        log_level=log_level,
    )

    if not app.endpoints:
        raise ValueError('You must define at least one function.')
    if len(app.endpoints) > MAX_UDFS_LIMIT:
        raise ValueError(
            f'You can only define a maximum of {MAX_UDFS_LIMIT} functions.',
        )

    config = uvicorn.Config(
        app,
        host='0.0.0.0',
        port=app_config.listen_port,
        log_config=app.get_uvicorn_log_config(),
    )

    # Register the functions only if the app is running interactively.
    if app_config.running_interactively:
        app.register_functions(replace=True)

    _running_server = AwaitableUvicornServer(config)

    # Hold a strong reference to the serve task until it completes so it
    # cannot be garbage collected while the server is still running.
    serve_task = asyncio.create_task(_running_server.serve())
    _background_tasks.add(serve_task)
    serve_task.add_done_callback(_background_tasks.discard)

    await _running_server.wait_for_startup()

    print(f'Python UDF registered at {base_url}')

    return UdfConnectionInfo(base_url, app.get_function_info())
82
83
84
def generate_base_url(app_config: AppConfig) -> str:
    """
    Return the base URL at which the Python UDF endpoints are served.

    Parameters
    ----------
    app_config : AppConfig
        Application configuration. Reads ``is_gateway_enabled``,
        ``running_interactively``, ``base_url``, ``is_local_dev`` and
        ``notebook_server_id``.

    Returns
    -------
    str
        ``app_config.base_url`` when not running interactively; otherwise
        a URL built from the gateway endpoint environment variable and the
        notebook server ID.

    Raises
    ------
    RuntimeError
        If the gateway is not enabled, or if the gateway endpoint
        environment variable needed for interactive mode is not set.

    """
    if not app_config.is_gateway_enabled:
        raise RuntimeError('Python UDFs are not available if Nova Gateway is not enabled')

    if not app_config.running_interactively:
        return app_config.base_url

    # generate python udf endpoint for interactive notebooks
    if app_config.is_local_dev:
        env_var = 'SINGLESTOREDB_NOVA_GATEWAY_DEV_ENDPOINT'
    else:
        env_var = 'SINGLESTOREDB_NOVA_GATEWAY_ENDPOINT'

    gateway_url = os.environ.get(env_var)
    # Fail loudly in both modes; otherwise a missing variable would be
    # formatted into the URL as the literal text "None".
    if gateway_url is None:
        raise RuntimeError(f'Missing {env_var} environment variable.')

    return f'{gateway_url}/pythonudfs/{app_config.notebook_server_id}/interactive/'
101
102