Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/apps/_config.py
469 views
1
import os
2
from dataclasses import dataclass
3
from typing import Optional
4
5
6
@dataclass
7
class AppConfig:
8
listen_port: int
9
base_url: str
10
base_path: str
11
notebook_server_id: str
12
app_token: Optional[str]
13
user_token: Optional[str]
14
running_interactively: bool
15
is_gateway_enabled: bool
16
is_local_dev: bool
17
18
@staticmethod
19
def _read_variable(name: str) -> str:
20
value = os.environ.get(name)
21
if value is None:
22
raise RuntimeError(
23
f'Missing {name} environment variable. '
24
'Is the code running outside SingleStoreDB notebook environment?',
25
)
26
return value
27
28
@classmethod
29
def from_env(cls) -> 'AppConfig':
30
port = cls._read_variable('SINGLESTOREDB_APP_LISTEN_PORT')
31
base_url = cls._read_variable('SINGLESTOREDB_APP_BASE_URL')
32
base_path = cls._read_variable('SINGLESTOREDB_APP_BASE_PATH')
33
notebook_server_id = cls._read_variable('SINGLESTOREDB_NOTEBOOK_SERVER_ID')
34
is_local_dev_env_var = cls._read_variable('SINGLESTOREDB_IS_LOCAL_DEV')
35
36
workload_type = os.environ.get('SINGLESTOREDB_WORKLOAD_TYPE')
37
running_interactively = workload_type == 'InteractiveNotebook'
38
39
is_gateway_enabled = 'SINGLESTOREDB_NOVA_GATEWAY_ENDPOINT' in os.environ
40
41
app_token = os.environ.get('SINGLESTOREDB_APP_TOKEN')
42
user_token = os.environ.get('SINGLESTOREDB_USER_TOKEN')
43
44
# Make sure the required variables are present
45
# and present useful error message if not
46
if running_interactively:
47
if is_gateway_enabled:
48
app_token = cls._read_variable('SINGLESTOREDB_APP_TOKEN')
49
else:
50
user_token = cls._read_variable('SINGLESTOREDB_USER_TOKEN')
51
52
return cls(
53
listen_port=int(port),
54
base_url=base_url,
55
base_path=base_path,
56
notebook_server_id=notebook_server_id,
57
app_token=app_token,
58
user_token=user_token,
59
running_interactively=running_interactively,
60
is_gateway_enabled=is_gateway_enabled,
61
is_local_dev=is_local_dev_env_var == 'true',
62
)
63
64
@property
65
def token(self) -> Optional[str]:
66
"""
67
Returns None if running non-interactively
68
"""
69
if self.is_gateway_enabled:
70
return self.app_token
71
else:
72
return self.user_token
73
74