Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/apps/_cloud_functions.py
469 views
1
import asyncio
2
import textwrap
3
import typing
4
5
from ._config import AppConfig
6
from ._connection_info import ConnectionInfo
7
from ._process import kill_process_by_port
8
9
if typing.TYPE_CHECKING:
10
from fastapi import FastAPI
11
from ._uvicorn_util import AwaitableUvicornServer
12
13
# Keep track of currently running server
_running_server: 'typing.Optional[AwaitableUvicornServer]' = None

# Strong references to in-flight ``serve()`` tasks. The event loop only keeps
# weak references to tasks, so a task nobody else references could be garbage
# collected before it finishes. Finished tasks remove themselves from the set.
_background_tasks: 'typing.Set[asyncio.Task[typing.Any]]' = set()


async def run_function_app(
    app: 'FastAPI',
    log_level: str = 'error',
    kill_existing_app_server: bool = True,
) -> ConnectionInfo:
    """
    Start serving a FastAPI application in the current event loop.

    The application is served by a uvicorn server that runs as a background
    asyncio task; this coroutine returns once the server reports that it
    has started.

    Parameters
    ----------
    app : FastAPI
        The application to serve. A ``GET /`` route returning ``'Success!'``
        is added to it and its ``root_path`` is set from the environment
        configuration.
    log_level : str, optional
        Log level passed to ``uvicorn.Config``.
    kill_existing_app_server : bool, optional
        If True, shut down the server previously started by this function
        (if any) and kill any other process occupying the listen port
        before starting the new server.

    Returns
    -------
    ConnectionInfo
        Built from the configured base URL and token.

    Raises
    ------
    ImportError
        If ``uvicorn`` or ``fastapi`` is not installed.
    TypeError
        If ``app`` is not a ``fastapi.FastAPI`` instance.

    """
    global _running_server
    from ._uvicorn_util import AwaitableUvicornServer

    try:
        import uvicorn
    except ImportError as exc:
        raise ImportError(
            'package uvicorn is required to run cloud functions',
        ) from exc
    try:
        import fastapi
    except ImportError as exc:
        raise ImportError(
            'package fastapi is required to run cloud functions',
        ) from exc

    if not isinstance(app, fastapi.FastAPI):
        raise TypeError('app is not an instance of FastAPI')

    app_config = AppConfig.from_env()

    if kill_existing_app_server:
        # Shutdown the server gracefully if it was started by us.
        # Since the uvicorn server doesn't start a new subprocess
        # killing the process would result in kernel dying.
        if _running_server is not None:
            await _running_server.shutdown()
            _running_server = None

        # Kill if any other process is occupying the port
        kill_process_by_port(app_config.listen_port)

    # Add `GET /` route, used for liveness check
    @app.get('/')
    def ping() -> str:
        return 'Success!'

    app.root_path = app_config.base_path

    config = uvicorn.Config(
        app,
        host='0.0.0.0',
        port=app_config.listen_port,
        log_level=log_level,
    )
    _running_server = AwaitableUvicornServer(config)

    # Hold a strong reference to the task until it completes so that it
    # cannot be garbage collected while the server is still running.
    serve_task = asyncio.create_task(_running_server.serve())
    _background_tasks.add(serve_task)
    serve_task.add_done_callback(_background_tasks.discard)

    await _running_server.wait_for_startup()

    connection_info = ConnectionInfo(app_config.base_url, app_config.token)

    if app_config.running_interactively:
        if app_config.is_gateway_enabled:
            print(
                'Cloud function available at '
                f'{app_config.base_url}docs?authToken={app_config.token}',
            )
        else:
            curl_header = f'-H "Authorization: Bearer {app_config.token}"'
            curl_example = f'curl "{app_config.base_url}" {curl_header}'
            print(
                textwrap.dedent(f"""
                    Cloud function available at {app_config.base_url}

                    Auth Token: {app_config.token}

                    Curl example: {curl_example}

                """).strip(),
            )

    return connection_info
91
92