Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/functions/ext/utils.py
469 views
1
#!/usr/bin/env python
2
import datetime
3
import json
4
import logging
5
import re
6
import sys
7
import zipfile
8
from copy import copy
9
from typing import Any
10
from typing import Dict
11
from typing import List
12
from typing import Union
13
14
try:
15
import tomllib
16
except ImportError:
17
import tomli as tomllib # type: ignore
18
19
try:
    from uvicorn.logging import DefaultFormatter

except ImportError:

    class DefaultFormatter(logging.Formatter):  # type: ignore
        """
        Stand-in formatter used when ``uvicorn.logging`` cannot be imported.

        Makes a ``levelprefix`` field available to format strings: the
        level name followed by a colon and padding spaces.

        """

        def formatMessage(self, record: logging.LogRecord) -> str:
            # Work on a shallow copy so the caller's record is not modified.
            shadow = copy(record)
            name = shadow.levelname
            padding = ' ' * (8 - len(name))
            shadow.__dict__['levelprefix'] = f'{name}:{padding}'
            return super().formatMessage(shadow)
32
33
34
class JSONFormatter(logging.Formatter):
    """
    Logging formatter that renders each record as one JSON object.

    The object always carries ``timestamp``, ``level``, ``logger`` and
    ``message``. Attributes named in ``EXTRA_FIELDS`` are copied over when
    they exist on the record, and ``exception`` holds the formatted
    traceback when the record has exception info.

    """

    # Record attributes copied into the output when present on the record
    EXTRA_FIELDS = (
        'app_name', 'request_id', 'function_name',
        'content_type', 'accepts', 'metrics',
    )

    def format(self, record: logging.LogRecord) -> str:
        """
        Serialize a log record to a JSON string.

        Parameters
        ----------
        record : logging.LogRecord
            The record to serialize

        Returns
        -------
        str
            JSON document describing the record

        """
        # UTC timestamp; ``%f`` always yields six digits, so dropping the
        # last three leaves millisecond precision.
        timestamp = datetime.datetime.fromtimestamp(
            record.created, tz=datetime.timezone.utc,
        )
        iso_timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'

        log_entry: Dict[str, Any] = {
            'timestamp': iso_timestamp,
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
        }

        # Copy over the recognised extra fields that were set on the record
        for field in self.EXTRA_FIELDS:
            if hasattr(record, field):
                log_entry[field] = getattr(record, field)

        if record.exc_info:
            log_entry['exception'] = self.formatException(record.exc_info)

        # Extra fields are caller-supplied and may hold values the json
        # module cannot encode. Fall back to ``str()`` for those so the
        # record is still emitted instead of raising TypeError.
        return json.dumps(log_entry, default=str)
66
67
68
def get_logger(name: str) -> logging.Logger:
    """
    Fetch the named logger, making sure it emits JSON-formatted records.

    If none of the logger's handlers uses a ``JSONFormatter``, its handler
    list is replaced by a single stream handler with that formatter and
    the level is set to INFO. Propagation is switched off on every call.

    """
    log = logging.getLogger(name)

    already_json = False
    for existing in log.handlers:
        if isinstance(getattr(existing, 'formatter', None), JSONFormatter):
            already_json = True
            break

    # An empty handler list cannot contain a JSON formatter, so this one
    # test covers both the "no handlers" and "wrong formatter" cases.
    if not already_json:
        log.handlers.clear()
        stream = logging.StreamHandler()
        stream.setFormatter(JSONFormatter())
        log.addHandler(stream)
        log.setLevel(logging.INFO)

    # Keep records away from ancestor loggers to avoid duplicate messages
    # or differently formatted output.
    log.propagate = False

    return log
91
92
93
def read_config(
94
archive: str,
95
keys: Union[str, List[str]],
96
config_file: str = 'pyproject.toml',
97
) -> Dict[str, Any]:
98
"""
99
Read a key from a Toml config file.
100
101
Parameters
102
----------
103
archive : str
104
Path to an environment file
105
keys : str or List[str]
106
Period-separated paths to the desired keys
107
config_file : str, optional
108
Name of the config file in the zip file
109
110
Returns
111
-------
112
Dict[str, Any]
113
114
"""
115
defaults = {}
116
keys = [keys] if isinstance(keys, str) else list(keys)
117
with zipfile.ZipFile(archive) as arc:
118
try:
119
orig_options = tomllib.loads(arc.read(config_file).decode('utf8'))
120
verify_python_version(orig_options)
121
for key in keys:
122
path = key.split('.')
123
options = orig_options
124
while path:
125
options = options.get(path.pop(0), {})
126
for k, v in options.items():
127
defaults[k.lower().replace('-', '_')] = v
128
except KeyError:
129
pass
130
return defaults
131
132
133
def verify_python_version(options: Dict[str, Any]) -> None:
    """
    Verify the running Python satisfies the pyproject.toml requirement.

    The requirement is read from the ``project`` table, under either the
    standard ``requires-python`` key or the ``requires_python`` spelling.
    It may hold several comma-separated clauses, each an operator
    (``<``, ``<=``, ``=``, ``==``, ``>=``, ``>``) followed by a dotted
    numeric version. Every clause must hold. Versions are compared as
    release tuples padded with zeros, so ``3.11`` means ``3.11.0``.

    Parameters
    ----------
    options : Dict[str, Any]
        Parsed contents of a pyproject.toml file

    Raises
    ------
    ValueError
        If a clause cannot be parsed or uses an unsupported operator
    RuntimeError
        If the running Python version does not satisfy a clause

    """
    project = options.get('project', {})
    requires_python = (
        project.get('requires-python') or project.get('requires_python')
    )
    if not requires_python:
        return

    # operator -> (predicate, symbol describing the relation on failure)
    checks = {
        '<=': (lambda cur, req: cur <= req, '>'),
        '>=': (lambda cur, req: cur >= req, '<'),
        '==': (lambda cur, req: cur == req, '!='),
        '=': (lambda cur, req: cur == req, '!='),
        '>': (lambda cur, req: cur > req, '<='),
        '<': (lambda cur, req: cur < req, '>='),
    }

    for clause in requires_python.split(','):
        if not clause.strip():
            continue

        # Capture the whole operator, not just its last character
        m = re.fullmatch(r'\s*([<=>]+)\s*(\d+(?:\.\d+)*)\s*', clause)
        if not m:
            raise ValueError(
                f'python version string is not valid: {requires_python}',
            )

        op, version = m.group(1), m.group(2)
        if op not in checks:
            raise ValueError(f'invalid python_version operator: {op}')

        required = tuple(int(x) for x in version.split('.'))

        # sys.version_info also carries release level and serial, which
        # must not take part in the comparison; use major.minor.micro and
        # pad both sides with zeros to a common length.
        width = max(len(required), 3)
        required += (0,) * (width - len(required))
        current = tuple(sys.version_info[:3]) + (0,) * (width - 3)

        predicate, failed = checks[op]
        if not predicate(current, required):
            raise RuntimeError(
                'python version is not compatible: ' +
                f'{sys.version_info} {failed} {version}',
            )
183
184
185
def _toml_key(key: Any) -> str:
    """Return ``key`` as a TOML key, quoted unless it is a valid bare key."""
    text = key if isinstance(key, str) else json.dumps(key)
    if re.fullmatch(r'[A-Za-z0-9_-]+', text):
        return text
    return json.dumps(text)


def _toml_inline(value: Any) -> str:
    """Render ``value`` as a one-line TOML value, dropping ``None`` entries."""
    if isinstance(value, dict):
        pairs = [
            f'{_toml_key(k)} = {_toml_inline(v)}'
            for k, v in value.items() if v is not None
        ]
        return '{' + ', '.join(pairs) + '}'
    if isinstance(value, (tuple, list)):
        members = [_toml_inline(x) for x in value if x is not None]
        return '[' + ', '.join(members) + ']'
    # Scalars use their JSON spelling
    return json.dumps(value)


def to_toml(data: Dict[str, Any]) -> str:
    """
    Dump data to a pyproject.toml.

    Parameters
    ----------
    data : Dict[str, Any]
        Mapping of table names to mappings of option names and values.
        Underscores in table and option names are written as hyphens.
        Tables, options and list items that are ``None`` are omitted.

    Returns
    -------
    str
        The TOML document text

    """
    out = []
    for top_k, top_v in data.items():
        if top_v is None:
            continue
        top_k = top_k.replace('_', '-')
        out.append('')
        out.append(f'[{top_k}]')
        for k, v in top_v.items():
            if v is None:
                continue
            k = k.replace('_', '-')
            if isinstance(v, (tuple, list)):
                # Lists are written one item per line
                out.append(f'{k} = [')
                out.append(
                    ',\n'.join(
                        f' {_toml_inline(item)}'
                        for item in v if item is not None
                    ),
                )
                out.append(']')
            else:
                # Dicts become inline tables; the option name must be
                # written in front of them just as for scalar values.
                out.append(f'{k} = {_toml_inline(v)}')
    return '\n'.join(out).strip()
219
220