Path: blob/main/singlestoredb/functions/ext/asgi.py
469 views
#!/usr/bin/env python31"""2Web application for SingleStoreDB external functions.34This module supplies a function that can create web apps intended for use5with the external function feature of SingleStoreDB. The application6function is a standard ASGI <https://asgi.readthedocs.io/en/latest/index.html>7request handler for use with servers such as Uvicorn <https://www.uvicorn.org>.89An external function web application can be created using the `create_app`10function. By default, the exported Python functions are specified by11environment variables starting with SINGLESTOREDB_EXT_FUNCTIONS. See the12documentation in `create_app` for the full syntax. If the application is13created in Python code rather than from the command-line, exported14functions can be specified in the parameters.1516An example of starting a server is shown below.1718Example19-------20> SINGLESTOREDB_EXT_FUNCTIONS='myfuncs.[percentile_90,percentile_95]' \21python3 -m singlestoredb.functions.ext.asgi2223"""24import argparse25import asyncio26import contextvars27import dataclasses28import datetime29import functools30import importlib.util31import inspect32import io33import itertools34import json35import logging36import os37import re38import secrets39import sys40import tempfile41import textwrap42import threading43import time44import typing45import urllib46import uuid47import zipfile48import zipimport49from types import ModuleType50from typing import Any51from typing import Awaitable52from typing import Callable53from typing import Dict54from typing import Iterable55from typing import List56from typing import Optional57from typing import Sequence58from typing import Set59from typing import Tuple60from typing import Union6162from . import arrow63from . import json as jdata64from . import rowdat_165from . import utils66from ... import connection67from ... import manage_workspaces68from ...config import get_option69from ...mysql.constants import FIELD_TYPE as ft70from ..signature import get_signature71from ..signature import signature_to_sql72from ..typing import Masked73from ..typing import Table74from .timer import Timer75from singlestoredb.docstring.parser import parse76from singlestoredb.functions.dtypes import escape_name7778try:79import cloudpickle80has_cloudpickle = True81except ImportError:82has_cloudpickle = False8384try:85from pydantic import BaseModel86has_pydantic = True87except ImportError:88has_pydantic = False899091logger = utils.get_logger('singlestoredb.functions.ext.asgi')9293# If a number of processes is specified, create a pool of workers94num_processes = max(0, int(os.environ.get('SINGLESTOREDB_EXT_NUM_PROCESSES', 0)))95if num_processes > 1:96try:97from ray.util.multiprocessing import Pool98except ImportError:99from multiprocessing import Pool100func_map = Pool(num_processes).starmap101else:102func_map = itertools.starmap103104105async def to_thread(106func: Any, /, *args: Any, **kwargs: Dict[str, Any],107) -> Any:108loop = asyncio.get_running_loop()109ctx = contextvars.copy_context()110func_call = functools.partial(ctx.run, func, *args, **kwargs)111return await loop.run_in_executor(None, func_call)112113114# Use negative values to indicate unsigned ints / binary data / usec time precision115rowdat_1_type_map = {116'bool': ft.LONGLONG,117'int8': ft.LONGLONG,118'int16': ft.LONGLONG,119'int32': ft.LONGLONG,120'int64': ft.LONGLONG,121'uint8': -ft.LONGLONG,122'uint16': -ft.LONGLONG,123'uint32': -ft.LONGLONG,124'uint64': -ft.LONGLONG,125'float32': ft.DOUBLE,126'float64': ft.DOUBLE,127'str': ft.STRING,128'bytes': -ft.STRING,129}130131132def get_func_names(funcs: str) -> List[Tuple[str, str]]:133"""134Parse all function names from string.135136Parameters137----------138func_names : str139String containing one or more function names. The syntax is140as follows: [func-name-1@func-alias-1,func-name-2@func-alias-2,...].141The optional '@name' portion is an alias if you want the function142to be renamed.143144Returns145-------146List[Tuple[str]] : a list of tuples containing the names and aliases147of each function.148149"""150if funcs.startswith('['):151func_names = funcs.replace('[', '').replace(']', '').split(',')152func_names = [x.strip() for x in func_names]153else:154func_names = [funcs]155156out = []157for name in func_names:158alias = name159if '@' in name:160name, alias = name.split('@', 1)161out.append((name, alias))162163return out164165166def as_tuple(x: Any) -> Any:167"""Convert object to tuple."""168if has_pydantic and isinstance(x, BaseModel):169return tuple(x.model_dump().values())170if dataclasses.is_dataclass(x):171return dataclasses.astuple(x) # type: ignore172if isinstance(x, dict):173return tuple(x.values())174return tuple(x)175176177def as_list_of_tuples(x: Any) -> Any:178"""Convert object to a list of tuples."""179if isinstance(x, Table):180x = x[0]181if isinstance(x, (list, tuple)) and len(x) > 0:182if isinstance(x[0], (list, tuple)):183return x184if has_pydantic and isinstance(x[0], BaseModel):185return [tuple(y.model_dump().values()) for y in x]186if dataclasses.is_dataclass(x[0]):187return [dataclasses.astuple(y) for y in x]188if isinstance(x[0], dict):189return [tuple(y.values()) for y in x]190return [(y,) for y in x]191return x192193194def get_dataframe_columns(df: Any) -> List[Any]:195"""Return columns of data from a dataframe/table."""196if isinstance(df, Table):197if len(df) == 1:198df = df[0]199else:200return list(df)201202if isinstance(df, Masked):203return [df]204205if isinstance(df, tuple):206return list(df)207208rtype = str(type(df)).lower()209if 'dataframe' in rtype:210return [df[x] for x in df.columns]211elif 'table' in rtype:212return df.columns213elif 'series' in rtype:214return [df]215elif 'array' in rtype:216return [df]217elif 'tuple' in rtype:218return list(df)219220raise TypeError(221'Unsupported data type for dataframe columns: '222f'{rtype}',223)224225226def get_array_class(data_format: str) -> Callable[..., Any]:227"""228Get the array class for the current data format.229230"""231if data_format == 'polars':232import polars as pl233array_cls = pl.Series234elif data_format == 'arrow':235import pyarrow as pa236array_cls = pa.array237elif data_format == 'pandas':238import pandas as pd239array_cls = pd.Series240else:241import numpy as np242array_cls = np.array243return array_cls244245246def get_masked_params(func: Callable[..., Any]) -> List[bool]:247"""248Get the list of masked parameters for the function.249250Parameters251----------252func : Callable253The function to call as the endpoint254255Returns256-------257List[bool]258Boolean list of masked parameters259260"""261params = inspect.signature(func).parameters262return [typing.get_origin(x.annotation) is Masked for x in params.values()]263264265def build_tuple(x: Any) -> Any:266"""Convert object to tuple."""267return tuple(x) if isinstance(x, Masked) else (x, None)268269270def cancel_on_event(271cancel_event: threading.Event,272) -> None:273"""274Cancel the function call if the cancel event is set.275276Parameters277----------278cancel_event : threading.Event279The event to check for cancellation280281Raises282------283asyncio.CancelledError284If the cancel event is set285286"""287if cancel_event.is_set():288task = asyncio.current_task()289if task is not None:290task.cancel()291raise asyncio.CancelledError(292'Function call was cancelled by client',293)294295296def build_udf_endpoint(297func: Callable[..., Any],298returns_data_format: str,299) -> Callable[..., Any]:300"""301Build a UDF endpoint for scalar / list types (row-based).302303Parameters304----------305func : Callable306The function to call as the endpoint307returns_data_format : str308The format of the return values309310Returns311-------312Callable313The function endpoint314315"""316if returns_data_format in ['scalar', 'list']:317318is_async = asyncio.iscoroutinefunction(func)319320async def do_func(321cancel_event: threading.Event,322timer: Timer,323row_ids: Sequence[int],324rows: Sequence[Sequence[Any]],325) -> Tuple[Sequence[int], List[Tuple[Any, ...]]]:326'''Call function on given rows of data.'''327out = []328async with timer('call_function'):329for row in rows:330cancel_on_event(cancel_event)331if is_async:332out.append(await func(*row))333else:334out.append(func(*row))335return row_ids, list(zip(out))336337return do_func338339return build_vector_udf_endpoint(func, returns_data_format)340341342def build_vector_udf_endpoint(343func: Callable[..., Any],344returns_data_format: str,345) -> Callable[..., Any]:346"""347Build a UDF endpoint for vector formats (column-based).348349Parameters350----------351func : Callable352The function to call as the endpoint353returns_data_format : str354The format of the return values355356Returns357-------358Callable359The function endpoint360361"""362masks = get_masked_params(func)363array_cls = get_array_class(returns_data_format)364is_async = asyncio.iscoroutinefunction(func)365366async def do_func(367cancel_event: threading.Event,368timer: Timer,369row_ids: Sequence[int],370cols: Sequence[Tuple[Sequence[Any], Optional[Sequence[bool]]]],371) -> Tuple[372Sequence[int],373List[Tuple[Sequence[Any], Optional[Sequence[bool]]]],374]:375'''Call function on given columns of data.'''376row_ids = array_cls(row_ids)377378# Call the function with `cols` as the function parameters379async with timer('call_function'):380if cols and cols[0]:381if is_async:382out = await func(*[x if m else x[0] for x, m in zip(cols, masks)])383else:384out = func(*[x if m else x[0] for x, m in zip(cols, masks)])385else:386if is_async:387out = await func()388else:389out = func()390391cancel_on_event(cancel_event)392393# Single masked value394if isinstance(out, Masked):395return row_ids, [tuple(out)]396397# Multiple return values398if isinstance(out, tuple):399return row_ids, [build_tuple(x) for x in out]400401# Single return value402return row_ids, [(out, None)]403404return do_func405406407def build_tvf_endpoint(408func: Callable[..., Any],409returns_data_format: str,410) -> Callable[..., Any]:411"""412Build a TVF endpoint for scalar / list types (row-based).413414Parameters415----------416func : Callable417The function to call as the endpoint418returns_data_format : str419The format of the return values420421Returns422-------423Callable424The function endpoint425426"""427if returns_data_format in ['scalar', 'list']:428429is_async = asyncio.iscoroutinefunction(func)430431async def do_func(432cancel_event: threading.Event,433timer: Timer,434row_ids: Sequence[int],435rows: Sequence[Sequence[Any]],436) -> Tuple[Sequence[int], List[Tuple[Any, ...]]]:437'''Call function on given rows of data.'''438out_ids: List[int] = []439out = []440# Call function on each row of data441async with timer('call_function'):442for i, row in zip(row_ids, rows):443cancel_on_event(cancel_event)444if is_async:445res = await func(*row)446else:447res = func(*row)448out.extend(as_list_of_tuples(res))449out_ids.extend([row_ids[i]] * (len(out)-len(out_ids)))450return out_ids, out451452return do_func453454return build_vector_tvf_endpoint(func, returns_data_format)455456457def build_vector_tvf_endpoint(458func: Callable[..., Any],459returns_data_format: str,460) -> Callable[..., Any]:461"""462Build a TVF endpoint for vector formats (column-based).463464Parameters465----------466func : Callable467The function to call as the endpoint468returns_data_format : str469The format of the return values470471Returns472-------473Callable474The function endpoint475476"""477masks = get_masked_params(func)478array_cls = get_array_class(returns_data_format)479480async def do_func(481cancel_event: threading.Event,482timer: Timer,483row_ids: Sequence[int],484cols: Sequence[Tuple[Sequence[Any], Optional[Sequence[bool]]]],485) -> Tuple[486Sequence[int],487List[Tuple[Sequence[Any], Optional[Sequence[bool]]]],488]:489'''Call function on given columns of data.'''490# NOTE: There is no way to determine which row ID belongs to491# each result row, so we just have to use the same492# row ID for all rows in the result.493494is_async = asyncio.iscoroutinefunction(func)495496# Call function on each column of data497async with timer('call_function'):498if cols and cols[0]:499if is_async:500func_res = await func(501*[x if m else x[0] for x, m in zip(cols, masks)],502)503else:504func_res = func(505*[x if m else x[0] for x, m in zip(cols, masks)],506)507else:508if is_async:509func_res = await func()510else:511func_res = func()512513res = get_dataframe_columns(func_res)514515cancel_on_event(cancel_event)516517# Generate row IDs518if isinstance(res[0], Masked):519row_ids = array_cls([row_ids[0]] * len(res[0][0]))520else:521row_ids = array_cls([row_ids[0]] * len(res[0]))522523return row_ids, [build_tuple(x) for x in res]524525return do_func526527528def make_func(529name: str,530func: Callable[..., Any],531) -> Tuple[Callable[..., Any], Dict[str, Any]]:532"""533Make a function endpoint.534535Parameters536----------537name : str538Name of the function to create539func : Callable540The function to call as the endpoint541database : str, optional542The database to use for the function definition543544Returns545-------546(Callable, Dict[str, Any])547548"""549info: Dict[str, Any] = {}550551sig = get_signature(func, func_name=name)552553function_type = sig.get('function_type', 'udf')554args_data_format = sig.get('args_data_format', 'scalar')555returns_data_format = sig.get('returns_data_format', 'scalar')556timeout = (557func._singlestoredb_attrs.get('timeout') or # type: ignore558get_option('external_function.timeout')559)560561if function_type == 'tvf':562do_func = build_tvf_endpoint(func, returns_data_format)563else:564do_func = build_udf_endpoint(func, returns_data_format)565566do_func.__name__ = name567do_func.__doc__ = func.__doc__568569# Store signature for generating CREATE FUNCTION calls570info['signature'] = sig571572# Set data format573info['args_data_format'] = args_data_format574info['returns_data_format'] = returns_data_format575576# Set function type577info['function_type'] = function_type578579# Set timeout580info['timeout'] = max(timeout, 1)581582# Set async flag583info['is_async'] = asyncio.iscoroutinefunction(func)584585# Setup argument types for rowdat_1 parser586colspec = []587for x in sig['args']:588dtype = x['dtype'].replace('?', '')589if dtype not in rowdat_1_type_map:590raise TypeError(f'no data type mapping for {dtype}')591colspec.append((x['name'], rowdat_1_type_map[dtype]))592info['colspec'] = colspec593594# Setup return type595returns = []596for x in sig['returns']:597dtype = x['dtype'].replace('?', '')598if dtype not in rowdat_1_type_map:599raise TypeError(f'no data type mapping for {dtype}')600returns.append((x['name'], rowdat_1_type_map[dtype]))601info['returns'] = returns602603return do_func, info604605606async def cancel_on_timeout(timeout: int) -> None:607"""Cancel request if it takes too long."""608await asyncio.sleep(timeout)609raise asyncio.CancelledError(610'Function call was cancelled due to timeout',611)612613614async def cancel_on_disconnect(615receive: Callable[..., Awaitable[Any]],616) -> None:617"""Cancel request if client disconnects."""618while True:619message = await receive()620if message.get('type', '') == 'http.disconnect':621raise asyncio.CancelledError(622'Function call was cancelled by client',623)624625626async def cancel_all_tasks(tasks: Iterable[asyncio.Task[Any]]) -> None:627"""Cancel all tasks."""628for task in tasks:629task.cancel()630await asyncio.gather(*tasks, return_exceptions=True)631632633def start_counter() -> float:634"""Start a timer and return the start time."""635return time.perf_counter()636637638def end_counter(start: float) -> float:639"""End a timer and return the elapsed time."""640return time.perf_counter() - start641642643class Application(object):644"""645Create an external function application.646647If `functions` is None, the environment is searched for function648specifications in variables starting with `SINGLESTOREDB_EXT_FUNCTIONS`.649Any number of environment variables can be specified as long as they650have this prefix. The format of the environment variable value is the651same as for the `functions` parameter.652653Parameters654----------655functions : str or Iterable[str], optional656Python functions are specified using a string format as follows:657* Single function : <pkg1>.<func1>658* Multiple functions : <pkg1>.[<func1-name,func2-name,...]659* Function aliases : <pkg1>.[<func1@alias1,func2@alias2,...]660* Multiple packages : <pkg1>.<func1>:<pkg2>.<func2>661app_mode : str, optional662The mode of operation for the application: remote, managed, or collocated663url : str, optional664The URL of the function API665data_format : str, optional666The format of the data rows: 'rowdat_1' or 'json'667data_version : str, optional668The version of the call format to expect: '1.0'669link_name : str, optional670The link name to use for the external function application. This is671only for pre-existing links, and can only be used without672``link_config`` and ``link_credentials``.673link_config : Dict[str, Any], optional674The CONFIG section of a LINK definition. This dictionary gets675converted to JSON for the CREATE LINK call.676link_credentials : Dict[str, Any], optional677The CREDENTIALS section of a LINK definition. This dictionary gets678converted to JSON for the CREATE LINK call.679name_prefix : str, optional680Prefix to add to function names when registering with the database681name_suffix : str, optional682Suffix to add to function names when registering with the database683function_database : str, optional684The database to use for external function definitions.685log_file : str, optional686File path to write logs to instead of console. If None, logs are687written to console. When specified, application logger handlers688are replaced with a file handler.689log_level : str, optional690Logging level for the application logger. Valid values are 'info',691'debug', 'warning', 'error'. Defaults to 'info'.692disable_metrics : bool, optional693Disable logging of function call metrics. Defaults to False.694app_name : str, optional695Name for the application instance. Used to create a logger-specific696name. If not provided, a random name will be generated.697698"""699700# Plain text response start701text_response_dict: Dict[str, Any] = dict(702type='http.response.start',703status=200,704headers=[(b'content-type', b'text/plain')],705)706707# Error response start708error_response_dict: Dict[str, Any] = dict(709type='http.response.start',710status=401,711headers=[(b'content-type', b'text/plain')],712)713714# JSON response start715json_response_dict: Dict[str, Any] = dict(716type='http.response.start',717status=200,718headers=[(b'content-type', b'application/json')],719)720721# ROWDAT_1 response start722rowdat_1_response_dict: Dict[str, Any] = dict(723type='http.response.start',724status=200,725headers=[(b'content-type', b'x-application/rowdat_1')],726)727728# Apache Arrow response start729arrow_response_dict: Dict[str, Any] = dict(730type='http.response.start',731status=200,732headers=[(b'content-type', b'application/vnd.apache.arrow.file')],733)734735# Path not found response start736path_not_found_response_dict: Dict[str, Any] = dict(737type='http.response.start',738status=404,739)740741# Response body template742body_response_dict: Dict[str, Any] = dict(743type='http.response.body',744)745746# Data format + version handlers747handlers = {748(b'application/octet-stream', b'1.0', 'scalar'): dict(749load=rowdat_1.load,750dump=rowdat_1.dump,751response=rowdat_1_response_dict,752),753(b'application/octet-stream', b'1.0', 'list'): dict(754load=rowdat_1.load,755dump=rowdat_1.dump,756response=rowdat_1_response_dict,757),758(b'application/octet-stream', b'1.0', 'pandas'): dict(759load=rowdat_1.load_pandas,760dump=rowdat_1.dump_pandas,761response=rowdat_1_response_dict,762),763(b'application/octet-stream', b'1.0', 'numpy'): dict(764load=rowdat_1.load_numpy,765dump=rowdat_1.dump_numpy,766response=rowdat_1_response_dict,767),768(b'application/octet-stream', b'1.0', 'polars'): dict(769load=rowdat_1.load_polars,770dump=rowdat_1.dump_polars,771response=rowdat_1_response_dict,772),773(b'application/octet-stream', b'1.0', 'arrow'): dict(774load=rowdat_1.load_arrow,775dump=rowdat_1.dump_arrow,776response=rowdat_1_response_dict,777),778(b'application/json', b'1.0', 'scalar'): dict(779load=jdata.load,780dump=jdata.dump,781response=json_response_dict,782),783(b'application/json', b'1.0', 'list'): dict(784load=jdata.load,785dump=jdata.dump,786response=json_response_dict,787),788(b'application/json', b'1.0', 'pandas'): dict(789load=jdata.load_pandas,790dump=jdata.dump_pandas,791response=json_response_dict,792),793(b'application/json', b'1.0', 'numpy'): dict(794load=jdata.load_numpy,795dump=jdata.dump_numpy,796response=json_response_dict,797),798(b'application/json', b'1.0', 'polars'): dict(799load=jdata.load_polars,800dump=jdata.dump_polars,801response=json_response_dict,802),803(b'application/json', b'1.0', 'arrow'): dict(804load=jdata.load_arrow,805dump=jdata.dump_arrow,806response=json_response_dict,807),808(b'application/vnd.apache.arrow.file', b'1.0', 'scalar'): dict(809load=arrow.load,810dump=arrow.dump,811response=arrow_response_dict,812),813(b'application/vnd.apache.arrow.file', b'1.0', 'pandas'): dict(814load=arrow.load_pandas,815dump=arrow.dump_pandas,816response=arrow_response_dict,817),818(b'application/vnd.apache.arrow.file', b'1.0', 'numpy'): dict(819load=arrow.load_numpy,820dump=arrow.dump_numpy,821response=arrow_response_dict,822),823(b'application/vnd.apache.arrow.file', b'1.0', 'polars'): dict(824load=arrow.load_polars,825dump=arrow.dump_polars,826response=arrow_response_dict,827),828(b'application/vnd.apache.arrow.file', b'1.0', 'arrow'): dict(829load=arrow.load_arrow,830dump=arrow.dump_arrow,831response=arrow_response_dict,832),833}834835# Valid URL paths836invoke_path = ('invoke',)837show_create_function_path = ('show', 'create_function')838show_function_info_path = ('show', 'function_info')839status = ('status',)840841def __init__(842self,843functions: Optional[844Union[845str,846Iterable[str],847Callable[..., Any],848Iterable[Callable[..., Any]],849ModuleType,850Iterable[ModuleType],851]852] = None,853app_mode: str = get_option('external_function.app_mode'),854url: str = get_option('external_function.url'),855data_format: str = get_option('external_function.data_format'),856data_version: str = get_option('external_function.data_version'),857link_name: Optional[str] = get_option('external_function.link_name'),858link_config: Optional[Dict[str, Any]] = None,859link_credentials: Optional[Dict[str, Any]] = None,860name_prefix: str = get_option('external_function.name_prefix'),861name_suffix: str = get_option('external_function.name_suffix'),862function_database: Optional[str] = None,863log_file: Optional[str] = get_option('external_function.log_file'),864log_level: str = get_option('external_function.log_level'),865disable_metrics: bool = get_option('external_function.disable_metrics'),866app_name: Optional[str] = get_option('external_function.app_name'),867) -> None:868if link_name and (link_config or link_credentials):869raise ValueError(870'`link_name` can not be used with `link_config` or `link_credentials`',871)872873if link_config is None:874link_config = json.loads(875get_option('external_function.link_config') or '{}',876) or None877878if link_credentials is None:879link_credentials = json.loads(880get_option('external_function.link_credentials') or '{}',881) or None882883# Generate application name if not provided884if app_name is None:885app_name = f'udf_app_{secrets.token_hex(4)}'886887self.name = app_name888889# Create logger instance specific to this application890self.logger = utils.get_logger(f'singlestoredb.functions.ext.asgi.{self.name}')891892# List of functions specs893specs: List[Union[str, Callable[..., Any], ModuleType]] = []894895# Look up Python function specifications896if functions is None:897env_vars = [898x for x in os.environ.keys()899if x.startswith('SINGLESTOREDB_EXT_FUNCTIONS')900]901if env_vars:902specs = [os.environ[x] for x in env_vars]903else:904import __main__905specs = [__main__]906907elif isinstance(functions, ModuleType):908specs = [functions]909910elif isinstance(functions, str):911specs = [functions]912913elif callable(functions):914specs = [functions]915916else:917specs = list(functions)918919# Add functions to application920endpoints = dict()921external_functions = dict()922for funcs in itertools.chain(specs):923924if isinstance(funcs, str):925# Module name926if importlib.util.find_spec(funcs) is not None:927items = importlib.import_module(funcs)928for x in vars(items).values():929if not hasattr(x, '_singlestoredb_attrs'):930continue931name = x._singlestoredb_attrs.get('name', x.__name__)932name = f'{name_prefix}{name}{name_suffix}'933external_functions[x.__name__] = x934func, info = make_func(name, x)935endpoints[name.encode('utf-8')] = func, info936937# Fully qualified function name938elif '.' in funcs:939pkg_path, func_names = funcs.rsplit('.', 1)940pkg = importlib.import_module(pkg_path)941942if pkg is None:943raise RuntimeError(f'Could not locate module: {pkg}')944945# Add endpoint for each exported function946for name, alias in get_func_names(func_names):947item = getattr(pkg, name)948alias = f'{name_prefix}{name}{name_suffix}'949external_functions[name] = item950func, info = make_func(alias, item)951endpoints[alias.encode('utf-8')] = func, info952953else:954raise RuntimeError(f'Could not locate module: {funcs}')955956elif isinstance(funcs, ModuleType):957for x in vars(funcs).values():958if not hasattr(x, '_singlestoredb_attrs'):959continue960name = x._singlestoredb_attrs.get('name', x.__name__)961name = f'{name_prefix}{name}{name_suffix}'962external_functions[x.__name__] = x963func, info = make_func(name, x)964endpoints[name.encode('utf-8')] = func, info965966else:967alias = funcs.__name__968external_functions[funcs.__name__] = funcs969alias = f'{name_prefix}{alias}{name_suffix}'970func, info = make_func(alias, funcs)971endpoints[alias.encode('utf-8')] = func, info972973self.app_mode = app_mode974self.url = url975self.data_format = data_format976self.data_version = data_version977self.link_name = link_name978self.link_config = link_config979self.link_credentials = link_credentials980self.endpoints = endpoints981self.external_functions = external_functions982self.function_database = function_database983self.log_file = log_file984self.log_level = log_level985self.disable_metrics = disable_metrics986987# Configure logging988self._configure_logging()989990def _configure_logging(self) -> None:991"""Configure logging based on the log_file settings."""992# Set logger level993self.logger.setLevel(getattr(logging, self.log_level.upper()))994995# Remove all existing handlers to ensure clean configuration996self.logger.handlers.clear()997998# Configure log file if specified999if self.log_file:1000# Create file handler1001file_handler = logging.FileHandler(self.log_file)1002file_handler.setLevel(getattr(logging, self.log_level.upper()))10031004# Use JSON formatter for file logging1005formatter = utils.JSONFormatter()1006file_handler.setFormatter(formatter)10071008# Add the handler to the logger1009self.logger.addHandler(file_handler)1010else:1011# For console logging, create a new stream handler with JSON formatter1012console_handler = logging.StreamHandler()1013console_handler.setLevel(getattr(logging, self.log_level.upper()))1014console_handler.setFormatter(utils.JSONFormatter())1015self.logger.addHandler(console_handler)10161017# Prevent propagation to avoid duplicate or differently formatted messages1018self.logger.propagate = False10191020def get_uvicorn_log_config(self) -> Dict[str, Any]:1021"""1022Create uvicorn log config that matches the Application's logging format.10231024This method returns the log configuration used by uvicorn, allowing external1025users to match the logging format of the Application class.10261027Returns1028-------1029Dict[str, Any]1030Log configuration dictionary compatible with uvicorn's log_config parameter10311032"""1033log_config = {1034'version': 1,1035'disable_existing_loggers': False,1036'formatters': {1037'json': {1038'()': 'singlestoredb.functions.ext.utils.JSONFormatter',1039},1040},1041'handlers': {1042'default': {1043'class': (1044'logging.FileHandler' if self.log_file1045else 'logging.StreamHandler'1046),1047'formatter': 'json',1048},1049},1050'loggers': {1051'uvicorn': {1052'handlers': ['default'],1053'level': self.log_level.upper(),1054'propagate': False,1055},1056'uvicorn.error': {1057'handlers': ['default'],1058'level': self.log_level.upper(),1059'propagate': False,1060},1061'uvicorn.access': {1062'handlers': ['default'],1063'level': self.log_level.upper(),1064'propagate': False,1065},1066},1067}10681069# Add filename to file handler if log file is specified1070if self.log_file:1071log_config['handlers']['default']['filename'] = self.log_file # type: ignore10721073return log_config10741075async def __call__(1076self,1077scope: Dict[str, Any],1078receive: Callable[..., Awaitable[Any]],1079send: Callable[..., Awaitable[Any]],1080) -> None:1081'''1082Application request handler.10831084Parameters1085----------1086scope : dict1087ASGI request scope1088receive : Callable1089Function to receieve request information1090send : Callable1091Function to send response information10921093'''1094request_id = str(uuid.uuid4())10951096timer = Timer(1097app_name=self.name,1098id=request_id,1099timestamp=datetime.datetime.now(1100datetime.timezone.utc,1101).strftime('%Y-%m-%dT%H:%M:%S.%fZ'),1102)1103call_timer = Timer(1104app_name=self.name,1105id=request_id,1106timestamp=datetime.datetime.now(1107datetime.timezone.utc,1108).strftime('%Y-%m-%dT%H:%M:%S.%fZ'),1109)11101111if scope['type'] != 'http':1112raise ValueError(f"Expected HTTP scope, got {scope['type']}")11131114method = scope['method']1115path = tuple(x for x in scope['path'].split('/') if x)1116headers = dict(scope['headers'])11171118content_type = headers.get(1119b'content-type',1120b'application/octet-stream',1121)1122accepts = headers.get(b'accepts', content_type)1123func_name = headers.get(b's2-ef-name', b'')1124func_endpoint = self.endpoints.get(func_name)1125ignore_cancel = headers.get(b's2-ef-ignore-cancel', b'false') == b'true'11261127timer.metadata['function'] = func_name.decode('utf-8') if func_name else ''1128call_timer.metadata['function'] = timer.metadata['function']11291130func = None1131func_info: Dict[str, Any] = {}1132if func_endpoint is not None:1133func, func_info = func_endpoint11341135# Call the endpoint1136if method == 'POST' and func is not None and path == self.invoke_path:11371138self.logger.info(1139'Function call initiated',1140extra={1141'app_name': self.name,1142'request_id': request_id,1143'function_name': func_name.decode('utf-8'),1144'content_type': content_type.decode('utf-8'),1145'accepts': accepts.decode('utf-8'),1146},1147)11481149args_data_format = func_info['args_data_format']1150returns_data_format = func_info['returns_data_format']1151data = []1152more_body = True1153with timer('receive_data'):1154while more_body:1155request = await receive()1156if request.get('type', '') == 'http.disconnect':1157raise RuntimeError('client disconnected')1158data.append(request['body'])1159more_body = request.get('more_body', False)11601161data_version = headers.get(b's2-ef-version', b'')1162input_handler = self.handlers[(content_type, data_version, args_data_format)]1163output_handler = self.handlers[(accepts, data_version, returns_data_format)]11641165try:1166all_tasks = []1167result = []11681169cancel_event = threading.Event()11701171with timer('parse_input'):1172inputs = input_handler['load']( # type: ignore1173func_info['colspec'], b''.join(data),1174)11751176func_task = asyncio.create_task(1177func(cancel_event, call_timer, *inputs)1178if func_info['is_async']1179else to_thread(1180lambda: asyncio.run(1181func(cancel_event, call_timer, *inputs),1182),1183),1184)1185disconnect_task = asyncio.create_task(1186asyncio.sleep(int(1e9))1187if ignore_cancel else cancel_on_disconnect(receive),1188)1189timeout_task = asyncio.create_task(1190cancel_on_timeout(func_info['timeout']),1191)11921193all_tasks += [func_task, disconnect_task, timeout_task]11941195async with timer('function_wrapper'):1196done, pending = await asyncio.wait(1197all_tasks, return_when=asyncio.FIRST_COMPLETED,1198)11991200await cancel_all_tasks(pending)12011202for task in done:1203if task is disconnect_task:1204cancel_event.set()1205raise asyncio.CancelledError(1206'Function call was cancelled by client disconnect',1207)12081209elif task is timeout_task:1210cancel_event.set()1211raise asyncio.TimeoutError(1212'Function call was cancelled due to timeout',1213)12141215elif task is func_task:1216result.extend(task.result())12171218with timer('format_output'):1219body = output_handler['dump'](1220[x[1] for x in func_info['returns']], *result, # type: ignore1221)12221223await send(output_handler['response'])12241225except asyncio.TimeoutError:1226self.logger.exception(1227'Function call timeout',1228extra={1229'app_name': self.name,1230'request_id': request_id,1231'function_name': func_name.decode('utf-8'),1232'timeout': func_info['timeout'],1233},1234)1235body = (1236'[TimeoutError] Function call timed out after ' +1237str(func_info['timeout']) +1238' seconds'1239).encode('utf-8')1240await send(self.error_response_dict)12411242except asyncio.CancelledError:1243self.logger.exception(1244'Function call cancelled',1245extra={1246'app_name': self.name,1247'request_id': request_id,1248'function_name': func_name.decode('utf-8'),1249},1250)1251body = b'[CancelledError] Function call was cancelled'1252await send(self.error_response_dict)12531254except Exception as e:1255self.logger.exception(1256'Function call error',1257extra={1258'app_name': self.name,1259'request_id': request_id,1260'function_name': func_name.decode('utf-8'),1261'exception_type': type(e).__name__,1262},1263)1264body = f'[{type(e).__name__}] {str(e).strip()}'.encode('utf-8')1265await send(self.error_response_dict)12661267finally:1268await cancel_all_tasks(all_tasks)12691270# Handle api reflection1271elif method == 'GET' and path == self.show_create_function_path:1272host = headers.get(b'host', b'localhost:80')1273reflected_url = f'{scope["scheme"]}://{host.decode("utf-8")}/invoke'12741275syntax = []1276for key, (endpoint, endpoint_info) in self.endpoints.items():1277if not func_name or key == func_name:1278syntax.append(1279signature_to_sql(1280endpoint_info['signature'],1281url=self.url or reflected_url,1282data_format=self.data_format,1283database=self.function_database or None,1284),1285)1286body = '\n'.join(syntax).encode('utf-8')12871288await send(self.text_response_dict)12891290# Return function info1291elif method == 'GET' and (path == self.show_function_info_path or not path):1292functions = self.get_function_info()1293body = json.dumps(dict(functions=functions)).encode('utf-8')1294await send(self.text_response_dict)12951296# Return status1297elif method == 'GET' and path == self.status:1298body = json.dumps(dict(status='ok')).encode('utf-8')1299await send(self.text_response_dict)13001301# Path not found1302else:1303body = b''1304await send(self.path_not_found_response_dict)13051306# Send body1307with timer('send_response'):1308out = self.body_response_dict.copy()1309out['body'] = body1310await send(out)13111312for k, v in call_timer.metrics.items():1313timer.metrics[k] = v13141315if not self.disable_metrics:1316metrics = timer.finish()1317self.logger.info(1318'Function call metrics',1319extra={1320'app_name': self.name,1321'request_id': request_id,1322'function_name': timer.metadata.get('function', ''),1323'metrics': metrics,1324},1325)13261327def _create_link(1328self,1329config: Optional[Dict[str, Any]],1330credentials: Optional[Dict[str, Any]],1331) -> Tuple[str, str]:1332"""Generate CREATE LINK command."""1333if self.link_name:1334return self.link_name, ''13351336if not config and not credentials:1337return '', ''13381339link_name = f'py_ext_func_link_{secrets.token_hex(14)}'1340out = [f'CREATE LINK {link_name} AS HTTP']13411342if config:1343out.append(f"CONFIG '{json.dumps(config)}'")13441345if credentials:1346out.append(f"CREDENTIALS '{json.dumps(credentials)}'")13471348return link_name, ' '.join(out) + ';'13491350def _locate_app_functions(self, cur: Any) -> Tuple[Set[str], Set[str]]:1351"""Locate all current functions and links belonging to this app."""1352funcs, links = set(), set()1353if self.function_database:1354database_prefix = escape_name(self.function_database) + '.'1355cur.execute(f'SHOW FUNCTIONS IN {escape_name(self.function_database)}')1356else:1357database_prefix = ''1358cur.execute('SHOW FUNCTIONS')13591360for row in list(cur):1361name, ftype, link = row[0], row[1], row[-1]1362# Only look at external functions1363if 'external' not in ftype.lower():1364continue1365# See if function URL matches url1366cur.execute(f'SHOW CREATE FUNCTION {database_prefix}{escape_name(name)}')1367for fname, _, code, *_ in list(cur):1368m = re.search(r" (?:\w+) (?:SERVICE|MANAGED) '([^']+)'", code)1369if m and m.group(1) == self.url:1370funcs.add(f'{database_prefix}{escape_name(fname)}')1371if link and re.match(r'^py_ext_func_link_\S{14}$', link):1372links.add(link)13731374return funcs, links13751376def get_function_info(1377self,1378func_name: Optional[str] = None,1379) -> Dict[str, Any]:1380"""1381Return the functions and function signature information.13821383Returns1384-------1385Dict[str, Any]13861387"""1388functions = {}1389no_default = object()13901391# Generate CREATE FUNCTION SQL for each function using get_create_functions1392create_sqls = self.get_create_functions(replace=True)1393sql_map = {}1394for (_, info), sql in zip(self.endpoints.values(), create_sqls):1395sig = info['signature']1396sql_map[sig['name']] = sql13971398for key, (func, info) in self.endpoints.items():1399# Get info from docstring1400doc_summary = ''1401doc_long_description = ''1402doc_params = {}1403doc_returns = None1404doc_examples = []1405if func.__doc__:1406try:1407docs = parse(func.__doc__)1408doc_params = {p.arg_name: p for p in docs.params}1409doc_returns = docs.returns1410if not docs.short_description and docs.long_description:1411doc_summary = docs.long_description or ''1412else:1413doc_summary = docs.short_description or ''1414doc_long_description = docs.long_description or ''1415for ex in docs.examples:1416ex_dict: Dict[str, Any] = {1417'description': None,1418'code': None,1419'output': None,1420}1421if ex.description:1422ex_dict['description'] = ex.description1423if ex.snippet:1424code, output = [], []1425for line in ex.snippet.split('\n'):1426line = line.rstrip()1427if re.match(r'^(\w+>|>>>|\.\.\.)', line):1428code.append(line)1429else:1430output.append(line)1431ex_dict['code'] = '\n'.join(code) or None1432ex_dict['output'] = '\n'.join(output) or None1433if ex.post_snippet:1434ex_dict['postscript'] = ex.post_snippet1435doc_examples.append(ex_dict)14361437except Exception as e:1438self.logger.warning(1439'Could not parse docstring for function',1440extra={1441'app_name': self.name,1442'function_name': key.decode('utf-8'),1443'error': str(e),1444},1445)14461447if not func_name or key == func_name:1448sig = info['signature']1449args = []14501451# Function arguments1452for i, a in enumerate(sig.get('args', [])):1453name = a['name']1454dtype = a['dtype']1455nullable = '?' in dtype1456args.append(1457dict(1458name=name,1459dtype=dtype.replace('?', ''),1460nullable=nullable,1461description=(doc_params[name].description or '')1462if name in doc_params else '',1463),1464)1465if a.get('default', no_default) is not no_default:1466args[-1]['default'] = a['default']14671468# Return values1469ret = sig.get('returns', [])1470returns = []14711472for a in ret:1473dtype = a['dtype']1474nullable = '?' in dtype1475returns.append(1476dict(1477dtype=dtype.replace('?', ''),1478nullable=nullable,1479description=doc_returns.description1480if doc_returns else '',1481),1482)1483if a.get('name', None):1484returns[-1]['name'] = a['name']1485if a.get('default', no_default) is not no_default:1486returns[-1]['default'] = a['default']14871488sql = sql_map.get(sig['name'], '')1489functions[sig['name']] = dict(1490args=args,1491returns=returns,1492function_type=info['function_type'],1493sql_statement=sql,1494summary=doc_summary,1495long_description=doc_long_description,1496examples=doc_examples,1497)14981499return functions15001501def get_create_functions(1502self,1503replace: bool = False,1504) -> List[str]:1505"""1506Generate CREATE FUNCTION code for all functions.15071508Parameters1509----------1510replace : bool, optional1511Should existing functions be replaced?15121513Returns1514-------1515List[str]15161517"""1518if not self.endpoints:1519return []15201521out = []1522link = ''1523if self.app_mode.lower() == 'remote':1524link, link_str = self._create_link(self.link_config, self.link_credentials)1525if link and link_str:1526out.append(link_str)15271528for key, (endpoint, endpoint_info) in self.endpoints.items():1529out.append(1530signature_to_sql(1531endpoint_info['signature'],1532url=self.url,1533data_format=self.data_format,1534app_mode=self.app_mode,1535replace=replace,1536link=link or None,1537database=self.function_database or None,1538),1539)15401541return out15421543def register_functions(1544self,1545*connection_args: Any,1546replace: bool = False,1547**connection_kwargs: Any,1548) -> None:1549"""1550Register functions with the database.15511552Parameters1553----------1554*connection_args : Any1555Database connection parameters1556replace : bool, optional1557Should existing functions be replaced?1558**connection_kwargs : Any1559Database connection parameters15601561"""1562with connection.connect(*connection_args, **connection_kwargs) as conn:1563with conn.cursor() as cur:1564if replace:1565funcs, links = self._locate_app_functions(cur)1566for fname in funcs:1567cur.execute(f'DROP FUNCTION IF EXISTS {fname}')1568for link in links:1569cur.execute(f'DROP LINK {link}')1570for func in self.get_create_functions(replace=replace):1571cur.execute(func)15721573def drop_functions(1574self,1575*connection_args: Any,1576**connection_kwargs: Any,1577) -> None:1578"""1579Drop registered functions from database.15801581Parameters1582----------1583*connection_args : Any1584Database connection parameters1585**connection_kwargs : Any1586Database connection parameters15871588"""1589with connection.connect(*connection_args, **connection_kwargs) as conn:1590with conn.cursor() as cur:1591funcs, links = self._locate_app_functions(cur)1592for fname in funcs:1593cur.execute(f'DROP FUNCTION IF EXISTS {fname}')1594for link in links:1595cur.execute(f'DROP LINK {link}')15961597async def call(1598self,1599name: str,1600data_in: io.BytesIO,1601data_out: io.BytesIO,1602data_format: Optional[str] = None,1603data_version: Optional[str] = None,1604) -> None:1605"""1606Call a function in the application.16071608Parameters1609----------1610name : str1611Name of the function to call1612data_in : io.BytesIO1613The input data rows1614data_out : io.BytesIO1615The output data rows1616data_format : str, optional1617The format of the input and output data1618data_version : str, optional1619The version of the data format16201621"""1622data_format = data_format or self.data_format1623data_version = data_version or self.data_version16241625async def receive() -> Dict[str, Any]:1626return dict(body=data_in.read())16271628async def send(content: Dict[str, Any]) -> None:1629status = content.get('status', 200)1630if status != 200:1631raise KeyError(f'error occurred when calling `{name}`: {status}')1632data_out.write(content.get('body', b''))16331634accepts = dict(1635json=b'application/json',1636rowdat_1=b'application/octet-stream',1637arrow=b'application/vnd.apache.arrow.file',1638)16391640# Mock an ASGI scope1641scope = dict(1642type='http',1643path='invoke',1644method='POST',1645headers={1646b'content-type': accepts[data_format.lower()],1647b'accepts': accepts[data_format.lower()],1648b's2-ef-name': name.encode('utf-8'),1649b's2-ef-version': data_version.encode('utf-8'),1650b's2-ef-ignore-cancel': b'true',1651},1652)16531654await self(scope, receive, send)16551656def to_environment(1657self,1658name: str,1659destination: str = '.',1660version: Optional[str] = None,1661dependencies: Optional[List[str]] = None,1662authors: Optional[List[Dict[str, str]]] = None,1663maintainers: Optional[List[Dict[str, str]]] = None,1664description: Optional[str] = None,1665container_service: Optional[Dict[str, Any]] = None,1666external_function: Optional[Dict[str, Any]] = None,1667external_function_remote: Optional[Dict[str, Any]] = None,1668external_function_collocated: Optional[Dict[str, Any]] = None,1669overwrite: bool = False,1670) -> None:1671"""1672Convert application to an environment file.16731674Parameters1675----------1676name : str1677Name of the output environment1678destination : str, optional1679Location of the output file1680version : str, optional1681Version of the package1682dependencies : List[str], optional1683List of dependency specifications like in a requirements.txt file1684authors : List[Dict[str, Any]], optional1685Dictionaries of author information. Keys may include: email, name1686maintainers : List[Dict[str, Any]], optional1687Dictionaries of maintainer information. Keys may include: email, name1688description : str, optional1689Description of package1690container_service : Dict[str, Any], optional1691Container service specifications1692external_function : Dict[str, Any], optional1693External function specifications (applies to both remote and collocated)1694external_function_remote : Dict[str, Any], optional1695Remote external function specifications1696external_function_collocated : Dict[str, Any], optional1697Collocated external function specifications1698overwrite : bool, optional1699Should destination file be overwritten if it exists?17001701"""1702if not has_cloudpickle:1703raise RuntimeError('the cloudpicke package is required for this operation')17041705# Write to temporary location if a remote destination is specified1706tmpdir = None1707if destination.startswith('stage://'):1708tmpdir = tempfile.TemporaryDirectory()1709local_path = os.path.join(tmpdir.name, f'{name}.env')1710else:1711local_path = os.path.join(destination, f'{name}.env')1712if not overwrite and os.path.exists(local_path):1713raise OSError(f'path already exists: {local_path}')17141715with zipfile.ZipFile(local_path, mode='w') as z:1716# Write metadata1717z.writestr(1718'pyproject.toml', utils.to_toml({1719'project': dict(1720name=name,1721version=version,1722dependencies=dependencies,1723requires_python='== ' +1724'.'.join(str(x) for x in sys.version_info[:3]),1725authors=authors,1726maintainers=maintainers,1727description=description,1728),1729'tool.container-service': container_service,1730'tool.external-function': external_function,1731'tool.external-function.remote': external_function_remote,1732'tool.external-function.collocated': external_function_collocated,1733}),1734)17351736# Write Python package1737z.writestr(1738f'{name}/__init__.py',1739textwrap.dedent(f'''1740import pickle as _pkl1741globals().update(1742_pkl.loads({cloudpickle.dumps(self.external_functions)}),1743)1744__all__ = {list(self.external_functions.keys())}''').strip(),1745)17461747# Upload to Stage as needed1748if destination.startswith('stage://'):1749url = urllib.parse.urlparse(re.sub(r'/+$', r'', destination) + '/')1750if not url.path or url.path == '/':1751raise ValueError(f'no stage path was specified: {destination}')17521753mgr = manage_workspaces()1754if url.hostname:1755wsg = mgr.get_workspace_group(url.hostname)1756elif os.environ.get('SINGLESTOREDB_WORKSPACE_GROUP'):1757wsg = mgr.get_workspace_group(1758os.environ['SINGLESTOREDB_WORKSPACE_GROUP'],1759)1760else:1761raise ValueError(f'no workspace group specified: {destination}')17621763# Make intermediate directories1764if url.path.count('/') > 1:1765wsg.stage.mkdirs(os.path.dirname(url.path))17661767wsg.stage.upload_file(1768local_path, url.path + f'{name}.env',1769overwrite=overwrite,1770)1771os.remove(local_path)177217731774def main(argv: Optional[List[str]] = None) -> None:1775"""1776Main program for HTTP-based Python UDFs17771778Parameters1779----------1780argv : List[str], optional1781List of command-line parameters17821783"""1784try:1785import uvicorn1786except ImportError:1787raise ImportError('the uvicorn package is required to run this command')17881789# Should we run in embedded mode (typically for Jupyter)1790try:1791asyncio.get_running_loop()1792use_async = True1793except RuntimeError:1794use_async = False17951796# Temporary directory for Stage environment files1797tmpdir = None17981799# Depending on whether we find an environment file specified, we1800# may have to process the command line twice.1801functions = []1802defaults: Dict[str, Any] = {}1803for i in range(2):18041805parser = argparse.ArgumentParser(1806prog='python -m singlestoredb.functions.ext.asgi',1807description='Run an HTTP-based Python UDF server',1808)1809parser.add_argument(1810'--url', metavar='url',1811default=defaults.get(1812'url',1813get_option('external_function.url'),1814),1815help='URL of the UDF server endpoint',1816)1817parser.add_argument(1818'--host', metavar='host',1819default=defaults.get(1820'host',1821get_option('external_function.host'),1822),1823help='bind socket to this host',1824)1825parser.add_argument(1826'--port', metavar='port', type=int,1827default=defaults.get(1828'port',1829get_option('external_function.port'),1830),1831help='bind socket to this port',1832)1833parser.add_argument(1834'--db', metavar='conn-str',1835default=defaults.get(1836'connection',1837get_option('external_function.connection'),1838),1839help='connection string to use for registering functions',1840)1841parser.add_argument(1842'--replace-existing', action='store_true',1843help='should existing functions of the same name '1844'in the database be replaced?',1845)1846parser.add_argument(1847'--data-format', metavar='format',1848default=defaults.get(1849'data_format',1850get_option('external_function.data_format'),1851),1852choices=['rowdat_1', 'json'],1853help='format of the data rows',1854)1855parser.add_argument(1856'--data-version', metavar='version',1857default=defaults.get(1858'data_version',1859get_option('external_function.data_version'),1860),1861help='version of the data row format',1862)1863parser.add_argument(1864'--link-name', metavar='name',1865default=defaults.get(1866'link_name',1867get_option('external_function.link_name'),1868) or '',1869help='name of the link to use for connections',1870)1871parser.add_argument(1872'--link-config', metavar='json',1873default=str(1874defaults.get(1875'link_config',1876get_option('external_function.link_config'),1877) or '{}',1878),1879help='link config in JSON format',1880)1881parser.add_argument(1882'--link-credentials', metavar='json',1883default=str(1884defaults.get(1885'link_credentials',1886get_option('external_function.link_credentials'),1887) or '{}',1888),1889help='link credentials in JSON format',1890)1891parser.add_argument(1892'--log-level', metavar='[info|debug|warning|error]',1893default=defaults.get(1894'log_level',1895get_option('external_function.log_level'),1896),1897help='logging level',1898)1899parser.add_argument(1900'--log-file', metavar='filepath',1901default=defaults.get(1902'log_file',1903get_option('external_function.log_file'),1904),1905help='File path to write logs to instead of console',1906)1907parser.add_argument(1908'--disable-metrics', action='store_true',1909default=defaults.get(1910'disable_metrics',1911get_option('external_function.disable_metrics'),1912),1913help='Disable logging of function call metrics',1914)1915parser.add_argument(1916'--name-prefix', metavar='name_prefix',1917default=defaults.get(1918'name_prefix',1919get_option('external_function.name_prefix'),1920),1921help='Prefix to add to function names',1922)1923parser.add_argument(1924'--name-suffix', metavar='name_suffix',1925default=defaults.get(1926'name_suffix',1927get_option('external_function.name_suffix'),1928),1929help='Suffix to add to function names',1930)1931parser.add_argument(1932'--function-database', metavar='function_database',1933default=defaults.get(1934'function_database',1935get_option('external_function.function_database'),1936),1937help='Database to use for the function definition',1938)1939parser.add_argument(1940'--app-name', metavar='app_name',1941default=defaults.get(1942'app_name',1943get_option('external_function.app_name'),1944),1945help='Name for the application instance',1946)1947parser.add_argument(1948'functions', metavar='module.or.func.path', nargs='*',1949help='functions or modules to export in UDF server',1950)19511952args = parser.parse_args(argv)19531954if i > 0:1955break19561957# Download Stage files as needed1958for i, f in enumerate(args.functions):1959if f.startswith('stage://'):1960url = urllib.parse.urlparse(f)1961if not url.path or url.path == '/':1962raise ValueError(f'no stage path was specified: {f}')1963if url.path.endswith('/'):1964raise ValueError(f'an environment file must be specified: {f}')19651966mgr = manage_workspaces()1967if url.hostname:1968wsg = mgr.get_workspace_group(url.hostname)1969elif os.environ.get('SINGLESTOREDB_WORKSPACE_GROUP'):1970wsg = mgr.get_workspace_group(1971os.environ['SINGLESTOREDB_WORKSPACE_GROUP'],1972)1973else:1974raise ValueError(f'no workspace group specified: {f}')19751976if tmpdir is None:1977tmpdir = tempfile.TemporaryDirectory()19781979local_path = os.path.join(tmpdir.name, url.path.split('/')[-1])1980wsg.stage.download_file(url.path, local_path)1981args.functions[i] = local_path19821983elif f.startswith('http://') or f.startswith('https://'):1984if tmpdir is None:1985tmpdir = tempfile.TemporaryDirectory()19861987local_path = os.path.join(tmpdir.name, f.split('/')[-1])1988urllib.request.urlretrieve(f, local_path)1989args.functions[i] = local_path19901991# See if any of the args are zip files (assume they are environment files)1992modules = [(x, zipfile.is_zipfile(x)) for x in args.functions]1993envs = [x[0] for x in modules if x[1]]1994others = [x[0] for x in modules if not x[1]]19951996if envs and len(envs) > 1:1997raise RuntimeError('only one environment file may be specified')19981999if envs and others:2000raise RuntimeError('environment files and other modules can not be mixed.')20012002# See if an environment file was specified. If so, use those settings2003# as the defaults and reprocess command line.2004if envs:2005# Add pyproject.toml variables and redo command-line processing2006defaults = utils.read_config(2007envs[0],2008['tool.external-function', 'tool.external-function.remote'],2009)20102011# Load zip file as a module2012modname = os.path.splitext(os.path.basename(envs[0]))[0]2013zi = zipimport.zipimporter(envs[0])2014mod = zi.load_module(modname)2015if mod is None:2016raise RuntimeError(f'environment file could not be imported: {envs[0]}')2017functions = [mod]20182019if defaults:2020continue20212022args.functions = functions or args.functions or None2023args.replace_existing = args.replace_existing \2024or defaults.get('replace_existing') \2025or get_option('external_function.replace_existing')20262027# Substitute in host / port if specified2028if args.host != defaults.get('host') or args.port != defaults.get('port'):2029u = urllib.parse.urlparse(args.url)2030args.url = u._replace(netloc=f'{args.host}:{args.port}').geturl()20312032# Create application from functions / module2033app = Application(2034functions=args.functions,2035url=args.url,2036data_format=args.data_format,2037data_version=args.data_version,2038link_name=args.link_name or None,2039link_config=json.loads(args.link_config) or None,2040link_credentials=json.loads(args.link_credentials) or None,2041app_mode='remote',2042name_prefix=args.name_prefix,2043name_suffix=args.name_suffix,2044function_database=args.function_database or None,2045log_file=args.log_file,2046log_level=args.log_level,2047disable_metrics=args.disable_metrics,2048app_name=args.app_name,2049)20502051funcs = app.get_create_functions(replace=args.replace_existing)2052if not funcs:2053raise RuntimeError('no functions specified')20542055for f in funcs:2056app.logger.info(f)20572058try:2059if args.db:2060app.logger.info('Registering functions with database')2061app.register_functions(2062args.db,2063replace=args.replace_existing,2064)20652066app_args = {2067k: v for k, v in dict(2068host=args.host or None,2069port=args.port or None,2070log_level=args.log_level,2071lifespan='off',2072).items() if v is not None2073}20742075# Configure uvicorn logging to use JSON format matching Application's format2076app_args['log_config'] = app.get_uvicorn_log_config()20772078if use_async:2079asyncio.create_task(_run_uvicorn(uvicorn, app, app_args, db=args.db))2080else:2081uvicorn.run(app, **app_args)20822083finally:2084if not use_async and args.db:2085app.logger.info('Dropping functions from database')2086app.drop_functions(args.db)208720882089async def _run_uvicorn(2090uvicorn: Any,2091app: Any,2092app_args: Any,2093db: Optional[str] = None,2094) -> None:2095"""Run uvicorn server and clean up functions after shutdown."""2096await uvicorn.Server(uvicorn.Config(app, **app_args)).serve()2097if db:2098app.logger.info('Dropping functions from database')2099app.drop_functions(db)210021012102create_app = Application210321042105if __name__ == '__main__':2106try:2107main()2108except RuntimeError as exc:2109logger.error(str(exc))2110sys.exit(1)2111except KeyboardInterrupt:2112pass211321142115