Path: blob/main/singlestoredb/functions/ext/asgi.py
#!/usr/bin/env python3
"""
Web application for SingleStoreDB external functions.

This module supplies a function that can create web apps intended for use
with the external function feature of SingleStoreDB. The application
function is a standard ASGI <https://asgi.readthedocs.io/en/latest/index.html>
request handler for use with servers such as Uvicorn <https://www.uvicorn.org>.

An external function web application can be created using the `create_app`
function. By default, the exported Python functions are specified by
environment variables starting with SINGLESTOREDB_EXT_FUNCTIONS. See the
documentation in `create_app` for the full syntax. If the application is
created in Python code rather than from the command-line, exported
functions can be specified in the parameters.

An example of starting a server is shown below.

Example
-------
> SINGLESTOREDB_EXT_FUNCTIONS='myfuncs.[percentile_90,percentile_95]' \
    python3 -m singlestoredb.functions.ext.asgi
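
The same functions can also be served programmatically. This is only an
illustrative sketch; the module path `myfuncs` and the host / port values
below are placeholders:

    import uvicorn
    from singlestoredb.functions.ext.asgi import create_app

    app = create_app(functions='myfuncs.[percentile_90,percentile_95]')
    uvicorn.run(app, host='127.0.0.1', port=8000)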

"""
import argparse
import asyncio
import contextvars
import dataclasses
import datetime
import functools
import importlib.util
import inspect
import io
import itertools
import json
import logging
import os
import re
import secrets
import sys
import tempfile
import textwrap
import threading
import time
import traceback
import typing
import urllib
import uuid
import zipfile
import zipimport
from collections.abc import Awaitable
from collections.abc import Iterable
from collections.abc import Sequence
from types import ModuleType
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple
from typing import Union

from . import arrow
from . import json as jdata
from . import rowdat_1
from . import utils
from ... import connection
from ... import manage_workspaces
from ...config import get_option
from ...mysql.constants import FIELD_TYPE as ft
from ..signature import get_signature
from ..signature import signature_to_sql
from ..typing import Masked
from ..typing import Table
from .timer import Timer
from singlestoredb.docstring.parser import parse
from singlestoredb.functions.dtypes import escape_name

try:
    import cloudpickle
    has_cloudpickle = True
except ImportError:
    has_cloudpickle = False

try:
    from pydantic import BaseModel
    has_pydantic = True
except ImportError:
    has_pydantic = False


logger = utils.get_logger('singlestoredb.functions.ext.asgi')

# If a number of processes is specified, create a pool of workers
num_processes = max(0, int(os.environ.get('SINGLESTOREDB_EXT_NUM_PROCESSES', 0)))
if num_processes > 1:
    try:
        from ray.util.multiprocessing import Pool
    except ImportError:
        from multiprocessing import Pool
    func_map = Pool(num_processes).starmap
else:
    func_map = itertools.starmap


async def to_thread(
    func: Any, /, *args: Any, **kwargs: Dict[str, Any],
) -> Any:
    loop = asyncio.get_running_loop()
    ctx = contextvars.copy_context()
    func_call = functools.partial(ctx.run, func, *args, **kwargs)
    return await loop.run_in_executor(None, func_call)


# Use negative values to indicate unsigned ints / binary data / usec time precision
rowdat_1_type_map = {
    'bool': ft.LONGLONG,
    'int8': ft.LONGLONG,
    'int16': ft.LONGLONG,
    'int32': ft.LONGLONG,
    'int64': ft.LONGLONG,
    'uint8': -ft.LONGLONG,
    'uint16': -ft.LONGLONG,
    'uint32': -ft.LONGLONG,
    'uint64': -ft.LONGLONG,
    'float32': ft.DOUBLE,
    'float64': ft.DOUBLE,
    'str': ft.STRING,
    'bytes': -ft.STRING,
}


def get_func_names(funcs: str) -> List[Tuple[str, str]]:
    """
    Parse all function names from string.

    Parameters
    ----------
    funcs : str
        String containing one or more function names. The syntax is
        as follows: [func-name-1@func-alias-1,func-name-2@func-alias-2,...].
        The optional '@name' portion is an alias if you want the function
        to be renamed.

    Returns
    -------
    List[Tuple[str, str]] : a list of tuples containing the names and aliases
        of each function.

    """
    if funcs.startswith('['):
        func_names = funcs.replace('[', '').replace(']', '').split(',')
        func_names = [x.strip() for x in func_names]
    else:
        func_names = [funcs]

    out = []
    for name in func_names:
        alias = name
        if '@' in name:
            name, alias = name.split('@', 1)
        out.append((name, alias))

    return out
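

# Example (illustrative) of the specification syntax handled above:
#
#     get_func_names('[percentile_90,percentile_95@p95]')
#     -> [('percentile_90', 'percentile_90'), ('percentile_95', 'p95')]
#
# A bare name such as 'percentile_90' is returned unchanged as
# [('percentile_90', 'percentile_90')].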


def as_tuple(x: Any) -> Any:
    """Convert object to tuple."""
    if has_pydantic and isinstance(x, BaseModel):
        return tuple(x.model_dump().values())
    if dataclasses.is_dataclass(x):
        return dataclasses.astuple(x)  # type: ignore
    if isinstance(x, dict):
        return tuple(x.values())
    return tuple(x)


def as_list_of_tuples(x: Any) -> Any:
    """Convert object to a list of tuples."""
    if isinstance(x, Table):
        x = x[0]
    if isinstance(x, (list, tuple)) and len(x) > 0:
        if isinstance(x[0], (list, tuple)):
            return x
        if has_pydantic and isinstance(x[0], BaseModel):
            return [tuple(y.model_dump().values()) for y in x]
        if dataclasses.is_dataclass(x[0]):
            return [dataclasses.astuple(y) for y in x]
        if isinstance(x[0], dict):
            return [tuple(y.values()) for y in x]
        return [(y,) for y in x]
    return x
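

# Example (illustrative): how TVF results are normalized into rows.
#
#     as_list_of_tuples([1, 2, 3])            -> [(1,), (2,), (3,)]
#     as_list_of_tuples([(1, 'a'), (2, 'b')]) -> [(1, 'a'), (2, 'b')]
#
# Pydantic models, dataclasses, and dicts in the list are converted to plain
# tuples of their field values.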


def get_dataframe_columns(df: Any) -> List[Any]:
    """Return columns of data from a dataframe/table."""
    if isinstance(df, Table):
        if len(df) == 1:
            df = df[0]
        else:
            return list(df)

    if isinstance(df, Masked):
        return [df]

    if isinstance(df, tuple):
        return list(df)

    rtype = str(type(df)).lower()
    if 'dataframe' in rtype:
        return [df[x] for x in df.columns]
    elif 'table' in rtype:
        return df.columns
    elif 'series' in rtype:
        return [df]
    elif 'array' in rtype:
        return [df]
    elif 'tuple' in rtype:
        return list(df)

    raise TypeError(
        'Unsupported data type for dataframe columns: '
        f'{rtype}',
    )


def get_array_class(data_format: str) -> Callable[..., Any]:
    """
    Get the array class for the current data format.

    """
    if data_format == 'polars':
        import polars as pl
        array_cls = pl.Series
    elif data_format == 'arrow':
        import pyarrow as pa
        array_cls = pa.array
    elif data_format == 'pandas':
        import pandas as pd
        array_cls = pd.Series
    else:
        import numpy as np
        array_cls = np.array
    return array_cls


def get_masked_params(func: Callable[..., Any]) -> List[bool]:
    """
    Get the list of masked parameters for the function.

    Parameters
    ----------
    func : Callable
        The function to call as the endpoint

    Returns
    -------
    List[bool]
        Boolean list of masked parameters

    """
    params = inspect.signature(func).parameters
    return [typing.get_origin(x.annotation) is Masked for x in params.values()]


def build_tuple(x: Any) -> Any:
    """Convert object to tuple."""
    return tuple(x) if isinstance(x, Masked) else (x, None)


def cancel_on_event(
    cancel_event: threading.Event,
) -> None:
    """
    Cancel the function call if the cancel event is set.

    Parameters
    ----------
    cancel_event : threading.Event
        The event to check for cancellation

    Raises
    ------
    asyncio.CancelledError
        If the cancel event is set

    """
    if cancel_event.is_set():
        task = asyncio.current_task()
        if task is not None:
            task.cancel()
        raise asyncio.CancelledError(
            'Function call was cancelled by client',
        )


def build_udf_endpoint(
    func: Callable[..., Any],
    returns_data_format: str,
) -> Callable[..., Any]:
    """
    Build a UDF endpoint for scalar / list types (row-based).

    Parameters
    ----------
    func : Callable
        The function to call as the endpoint
    returns_data_format : str
        The format of the return values

    Returns
    -------
    Callable
        The function endpoint

    """
    if returns_data_format in ['scalar', 'list']:

        is_async = asyncio.iscoroutinefunction(func)

        async def do_func(
            cancel_event: threading.Event,
            timer: Timer,
            row_ids: Sequence[int],
            rows: Sequence[Sequence[Any]],
        ) -> Tuple[Sequence[int], List[Tuple[Any, ...]]]:
            '''Call function on given rows of data.'''
            out = []
            async with timer('call_function'):
                for row in rows:
                    cancel_on_event(cancel_event)
                    if is_async:
                        out.append(await func(*row))
                    else:
                        out.append(func(*row))
            return row_ids, list(zip(out))

        return do_func

    return build_vector_udf_endpoint(func, returns_data_format)
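

# Example (illustrative): a scalar UDF such as
#
#     @udf
#     def add_one(x: int) -> int:
#         return x + 1
#
# (assuming the `udf` decorator from `singlestoredb.functions`) is wrapped by
# build_udf_endpoint above: each incoming row is unpacked into arguments and
# the per-row results are zipped back into single-column result rows.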


def build_vector_udf_endpoint(
    func: Callable[..., Any],
    returns_data_format: str,
) -> Callable[..., Any]:
    """
    Build a UDF endpoint for vector formats (column-based).

    Parameters
    ----------
    func : Callable
        The function to call as the endpoint
    returns_data_format : str
        The format of the return values

    Returns
    -------
    Callable
        The function endpoint

    """
    masks = get_masked_params(func)
    array_cls = get_array_class(returns_data_format)
    is_async = asyncio.iscoroutinefunction(func)

    async def do_func(
        cancel_event: threading.Event,
        timer: Timer,
        row_ids: Sequence[int],
        cols: Sequence[Tuple[Sequence[Any], Optional[Sequence[bool]]]],
    ) -> Tuple[
        Sequence[int],
        List[Tuple[Sequence[Any], Optional[Sequence[bool]]]],
    ]:
        '''Call function on given columns of data.'''
        row_ids = array_cls(row_ids)

        # Call the function with `cols` as the function parameters
        async with timer('call_function'):
            if cols and cols[0]:
                if is_async:
                    out = await func(*[x if m else x[0] for x, m in zip(cols, masks)])
                else:
                    out = func(*[x if m else x[0] for x, m in zip(cols, masks)])
            else:
                if is_async:
                    out = await func()
                else:
                    out = func()

        cancel_on_event(cancel_event)

        # Single masked value
        if isinstance(out, Masked):
            return row_ids, [tuple(out)]

        # Multiple return values
        if isinstance(out, tuple):
            return row_ids, [build_tuple(x) for x in out]

        # Single return value
        return row_ids, [(out, None)]

    return do_func


def build_tvf_endpoint(
    func: Callable[..., Any],
    returns_data_format: str,
) -> Callable[..., Any]:
    """
    Build a TVF endpoint for scalar / list types (row-based).

    Parameters
    ----------
    func : Callable
        The function to call as the endpoint
    returns_data_format : str
        The format of the return values

    Returns
    -------
    Callable
        The function endpoint

    """
    if returns_data_format in ['scalar', 'list']:

        is_async = asyncio.iscoroutinefunction(func)

        async def do_func(
            cancel_event: threading.Event,
            timer: Timer,
            row_ids: Sequence[int],
            rows: Sequence[Sequence[Any]],
        ) -> Tuple[Sequence[int], List[Tuple[Any, ...]]]:
            '''Call function on given rows of data.'''
            out_ids: List[int] = []
            out = []
            # Call function on each row of data
            async with timer('call_function'):
                for i, row in zip(row_ids, rows):
                    cancel_on_event(cancel_event)
                    if is_async:
                        res = await func(*row)
                    else:
                        res = func(*row)
                    out.extend(as_list_of_tuples(res))
                    out_ids.extend([row_ids[i]] * (len(out)-len(out_ids)))
            return out_ids, out

        return do_func

    return build_vector_tvf_endpoint(func, returns_data_format)


def build_vector_tvf_endpoint(
    func: Callable[..., Any],
    returns_data_format: str,
) -> Callable[..., Any]:
    """
    Build a TVF endpoint for vector formats (column-based).

    Parameters
    ----------
    func : Callable
        The function to call as the endpoint
    returns_data_format : str
        The format of the return values

    Returns
    -------
    Callable
        The function endpoint

    """
    masks = get_masked_params(func)
    array_cls = get_array_class(returns_data_format)

    async def do_func(
        cancel_event: threading.Event,
        timer: Timer,
        row_ids: Sequence[int],
        cols: Sequence[Tuple[Sequence[Any], Optional[Sequence[bool]]]],
    ) -> Tuple[
        Sequence[int],
        List[Tuple[Sequence[Any], Optional[Sequence[bool]]]],
    ]:
        '''Call function on given columns of data.'''
        # NOTE: There is no way to determine which row ID belongs to
        #       each result row, so we just have to use the same
        #       row ID for all rows in the result.

        is_async = asyncio.iscoroutinefunction(func)

        # Call function on each column of data
        async with timer('call_function'):
            if cols and cols[0]:
                if is_async:
                    func_res = await func(
                        *[x if m else x[0] for x, m in zip(cols, masks)],
                    )
                else:
                    func_res = func(
                        *[x if m else x[0] for x, m in zip(cols, masks)],
                    )
            else:
                if is_async:
                    func_res = await func()
                else:
                    func_res = func()

        res = get_dataframe_columns(func_res)

        cancel_on_event(cancel_event)

        # Generate row IDs
        if isinstance(res[0], Masked):
            row_ids = array_cls([row_ids[0]] * len(res[0][0]))
        else:
            row_ids = array_cls([row_ids[0]] * len(res[0]))

        return row_ids, [build_tuple(x) for x in res]

    return do_func
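

# Example (illustrative): a vectorized UDF that is annotated to receive whole
# columns, e.g. pandas Series (a sketch only; the `udf` decorator and
# annotation handling are assumed from the package's signature support):
#
#     @udf
#     def scale(x: pd.Series) -> pd.Series:
#         return x * 2
#
# is dispatched through the vector endpoints above, so the function is called
# once per request with full columns instead of once per row.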


def make_func(
    name: str,
    func: Callable[..., Any],
) -> Tuple[Callable[..., Any], Dict[str, Any]]:
    """
    Make a function endpoint.

    Parameters
    ----------
    name : str
        Name of the function to create
    func : Callable
        The function to call as the endpoint

    Returns
    -------
    (Callable, Dict[str, Any])

    """
    info: Dict[str, Any] = {}

    sig = get_signature(func, func_name=name)

    function_type = sig.get('function_type', 'udf')
    args_data_format = sig.get('args_data_format', 'scalar')
    returns_data_format = sig.get('returns_data_format', 'scalar')
    timeout = (
        func._singlestoredb_attrs.get('timeout') or  # type: ignore
        get_option('external_function.timeout')
    )

    if function_type == 'tvf':
        do_func = build_tvf_endpoint(func, returns_data_format)
    else:
        do_func = build_udf_endpoint(func, returns_data_format)

    do_func.__name__ = name
    do_func.__doc__ = func.__doc__

    # Store signature for generating CREATE FUNCTION calls
    info['signature'] = sig

    # Set data format
    info['args_data_format'] = args_data_format
    info['returns_data_format'] = returns_data_format

    # Set function type
    info['function_type'] = function_type

    # Set timeout
    info['timeout'] = max(timeout, 1)

    # Set async flag
    info['is_async'] = asyncio.iscoroutinefunction(func)

    # Setup argument types for rowdat_1 parser
    colspec = []
    for x in sig['args']:
        dtype = x['dtype'].replace('?', '')
        if dtype not in rowdat_1_type_map:
            raise TypeError(f'no data type mapping for {dtype}')
        colspec.append((x['name'], rowdat_1_type_map[dtype]))
    info['colspec'] = colspec

    # Setup return type
    returns = []
    for x in sig['returns']:
        dtype = x['dtype'].replace('?', '')
        if dtype not in rowdat_1_type_map:
            raise TypeError(f'no data type mapping for {dtype}')
        returns.append((x['name'], rowdat_1_type_map[dtype]))
    info['returns'] = returns

    return do_func, info


async def cancel_on_timeout(timeout: int) -> None:
    """Cancel request if it takes too long."""
    await asyncio.sleep(timeout)
    raise asyncio.CancelledError(
        'Function call was cancelled due to timeout',
    )


async def cancel_on_disconnect(
    receive: Callable[..., Awaitable[Any]],
) -> None:
    """Cancel request if client disconnects."""
    while True:
        message = await receive()
        if message.get('type', '') == 'http.disconnect':
            raise asyncio.CancelledError(
                'Function call was cancelled by client',
            )


async def cancel_all_tasks(tasks: Iterable[asyncio.Task[Any]]) -> None:
    """Cancel all tasks."""
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)


def start_counter() -> float:
    """Start a timer and return the start time."""
    return time.perf_counter()


def end_counter(start: float) -> float:
    """End a timer and return the elapsed time."""
    return time.perf_counter() - start
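

# Example (illustrative): every endpoint produced by make_func shares the same
# calling convention regardless of data format:
#
#     row_ids, rows = await endpoint(cancel_event, timer, row_ids, data)
#
# where `cancel_event` is a threading.Event used to abort a long-running call
# and `timer` is a Timer that records per-phase metrics.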


class Application(object):
    """
    Create an external function application.

    If `functions` is None, the environment is searched for function
    specifications in variables starting with `SINGLESTOREDB_EXT_FUNCTIONS`.
    Any number of environment variables can be specified as long as they
    have this prefix. The format of the environment variable value is the
    same as for the `functions` parameter.

    Parameters
    ----------
    functions : str or Iterable[str], optional
        Python functions are specified using a string format as follows:
        * Single function : <pkg1>.<func1>
        * Multiple functions : <pkg1>.[<func1-name>,<func2-name>,...]
        * Function aliases : <pkg1>.[<func1>@<alias1>,<func2>@<alias2>,...]
        * Multiple packages : <pkg1>.<func1>:<pkg2>.<func2>
    app_mode : str, optional
        The mode of operation for the application: remote, managed, or collocated
    url : str, optional
        The URL of the function API
    data_format : str, optional
        The format of the data rows: 'rowdat_1' or 'json'
    data_version : str, optional
        The version of the call format to expect: '1.0'
    link_name : str, optional
        The link name to use for the external function application. This is
        only for pre-existing links, and can only be used without
        ``link_config`` and ``link_credentials``.
    link_config : Dict[str, Any], optional
        The CONFIG section of a LINK definition. This dictionary gets
        converted to JSON for the CREATE LINK call.
    link_credentials : Dict[str, Any], optional
        The CREDENTIALS section of a LINK definition. This dictionary gets
        converted to JSON for the CREATE LINK call.
    name_prefix : str, optional
        Prefix to add to function names when registering with the database
    name_suffix : str, optional
        Suffix to add to function names when registering with the database
    function_database : str, optional
        The database to use for external function definitions.
    log_file : str, optional
        File path to write logs to instead of console. If None, logs are
        written to console. When specified, application logger handlers
        are replaced with a file handler.
    log_level : str, optional
        Logging level for the application logger. Valid values are 'info',
        'debug', 'warning', 'error'. Defaults to 'info'.
    disable_metrics : bool, optional
        Disable logging of function call metrics. Defaults to False.
    app_name : str, optional
        Name for the application instance. Used to create a logger-specific
        name. If not provided, a random name will be generated.
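
    Examples
    --------
    A minimal, illustrative setup; the module path ``myfuncs`` and the
    connection URL are placeholders::

        app = Application(functions='myfuncs.[percentile_90,percentile_95]')
        app.register_functions('user:password@localhost:3306/dbname', replace=True)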

    """

    # Plain text response start
    text_response_dict: Dict[str, Any] = dict(
        type='http.response.start',
        status=200,
        headers=[(b'content-type', b'text/plain')],
    )

    # Error response start
    error_response_dict: Dict[str, Any] = dict(
        type='http.response.start',
        status=500,
        headers=[(b'content-type', b'text/plain')],
    )

    # Timeout response start
    timeout_response_dict: Dict[str, Any] = dict(
        type='http.response.start',
        status=504,
        headers=[(b'content-type', b'text/plain')],
    )

    # Cancel response start
    cancel_response_dict: Dict[str, Any] = dict(
        type='http.response.start',
        status=503,
        headers=[(b'content-type', b'text/plain')],
    )

    # JSON response start
    json_response_dict: Dict[str, Any] = dict(
        type='http.response.start',
        status=200,
        headers=[(b'content-type', b'application/json')],
    )

    # ROWDAT_1 response start
    rowdat_1_response_dict: Dict[str, Any] = dict(
        type='http.response.start',
        status=200,
        headers=[(b'content-type', b'x-application/rowdat_1')],
    )

    # Apache Arrow response start
    arrow_response_dict: Dict[str, Any] = dict(
        type='http.response.start',
        status=200,
        headers=[(b'content-type', b'application/vnd.apache.arrow.file')],
    )

    # Path not found response start
    path_not_found_response_dict: Dict[str, Any] = dict(
        type='http.response.start',
        status=404,
    )

    # Response body template
    body_response_dict: Dict[str, Any] = dict(
        type='http.response.body',
    )

    # Data format + version handlers
    handlers = {
        (b'application/octet-stream', b'1.0', 'scalar'): dict(
            load=rowdat_1.load,
            dump=rowdat_1.dump,
            response=rowdat_1_response_dict,
        ),
        (b'application/octet-stream', b'1.0', 'list'): dict(
            load=rowdat_1.load,
            dump=rowdat_1.dump,
            response=rowdat_1_response_dict,
        ),
        (b'application/octet-stream', b'1.0', 'pandas'): dict(
            load=rowdat_1.load_pandas,
            dump=rowdat_1.dump_pandas,
            response=rowdat_1_response_dict,
        ),
        (b'application/octet-stream', b'1.0', 'numpy'): dict(
            load=rowdat_1.load_numpy,
            dump=rowdat_1.dump_numpy,
            response=rowdat_1_response_dict,
        ),
        (b'application/octet-stream', b'1.0', 'polars'): dict(
            load=rowdat_1.load_polars,
            dump=rowdat_1.dump_polars,
            response=rowdat_1_response_dict,
        ),
        (b'application/octet-stream', b'1.0', 'arrow'): dict(
            load=rowdat_1.load_arrow,
            dump=rowdat_1.dump_arrow,
            response=rowdat_1_response_dict,
        ),
        (b'application/json', b'1.0', 'scalar'): dict(
            load=jdata.load,
            dump=jdata.dump,
            response=json_response_dict,
        ),
        (b'application/json', b'1.0', 'list'): dict(
            load=jdata.load,
            dump=jdata.dump,
            response=json_response_dict,
        ),
        (b'application/json', b'1.0', 'pandas'): dict(
            load=jdata.load_pandas,
            dump=jdata.dump_pandas,
            response=json_response_dict,
        ),
        (b'application/json', b'1.0', 'numpy'): dict(
            load=jdata.load_numpy,
            dump=jdata.dump_numpy,
            response=json_response_dict,
        ),
        (b'application/json', b'1.0', 'polars'): dict(
            load=jdata.load_polars,
            dump=jdata.dump_polars,
            response=json_response_dict,
        ),
        (b'application/json', b'1.0', 'arrow'): dict(
            load=jdata.load_arrow,
            dump=jdata.dump_arrow,
            response=json_response_dict,
        ),
        (b'application/vnd.apache.arrow.file', b'1.0', 'scalar'): dict(
            load=arrow.load,
            dump=arrow.dump,
            response=arrow_response_dict,
        ),
        (b'application/vnd.apache.arrow.file', b'1.0', 'pandas'): dict(
            load=arrow.load_pandas,
            dump=arrow.dump_pandas,
            response=arrow_response_dict,
        ),
        (b'application/vnd.apache.arrow.file', b'1.0', 'numpy'): dict(
            load=arrow.load_numpy,
            dump=arrow.dump_numpy,
            response=arrow_response_dict,
        ),
        (b'application/vnd.apache.arrow.file', b'1.0', 'polars'): dict(
            load=arrow.load_polars,
            dump=arrow.dump_polars,
            response=arrow_response_dict,
        ),
        (b'application/vnd.apache.arrow.file', b'1.0', 'arrow'): dict(
            load=arrow.load_arrow,
            dump=arrow.dump_arrow,
            response=arrow_response_dict,
        ),
    }

    # Valid URL paths
    invoke_path = ('invoke',)
    show_create_function_path = ('show', 'create_function')
    show_function_info_path = ('show', 'function_info')
    status = ('status',)
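
    # Example (illustrative): a request arriving with
    #     Content-Type: application/octet-stream and s2-ef-version: 1.0
    # for a function whose args_data_format is 'pandas' is decoded with
    # handlers[(b'application/octet-stream', b'1.0', 'pandas')]['load'],
    # i.e. rowdat_1.load_pandas, and the response is encoded with the
    # matching 'dump' entry.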

    def __init__(
        self,
        functions: Optional[
            Union[
                str,
                Iterable[str],
                Callable[..., Any],
                Iterable[Callable[..., Any]],
                ModuleType,
                Iterable[ModuleType],
            ]
        ] = None,
        app_mode: str = get_option('external_function.app_mode'),
        url: str = get_option('external_function.url'),
        data_format: str = get_option('external_function.data_format'),
        data_version: str = get_option('external_function.data_version'),
        link_name: Optional[str] = get_option('external_function.link_name'),
        link_config: Optional[Dict[str, Any]] = None,
        link_credentials: Optional[Dict[str, Any]] = None,
        name_prefix: str = get_option('external_function.name_prefix'),
        name_suffix: str = get_option('external_function.name_suffix'),
        function_database: Optional[str] = None,
        log_file: Optional[str] = get_option('external_function.log_file'),
        log_level: str = get_option('external_function.log_level'),
        disable_metrics: bool = get_option('external_function.disable_metrics'),
        app_name: Optional[str] = get_option('external_function.app_name'),
    ) -> None:
        if link_name and (link_config or link_credentials):
            raise ValueError(
                '`link_name` can not be used with `link_config` or `link_credentials`',
            )

        if link_config is None:
            link_config = json.loads(
                get_option('external_function.link_config') or '{}',
            ) or None

        if link_credentials is None:
            link_credentials = json.loads(
                get_option('external_function.link_credentials') or '{}',
            ) or None

        # Generate application name if not provided
        if app_name is None:
            app_name = f'udf_app_{secrets.token_hex(4)}'

        self.name = app_name

        # Create logger instance specific to this application
        self.logger = utils.get_logger(f'singlestoredb.functions.ext.asgi.{self.name}')

        # List of functions specs
        specs: List[Union[str, Callable[..., Any], ModuleType]] = []

        # Look up Python function specifications
        if functions is None:
            env_vars = [
                x for x in os.environ.keys()
                if x.startswith('SINGLESTOREDB_EXT_FUNCTIONS')
            ]
            if env_vars:
                specs = [os.environ[x] for x in env_vars]
            else:
                import __main__
                specs = [__main__]

        elif isinstance(functions, ModuleType):
            specs = [functions]

        elif isinstance(functions, str):
            specs = [functions]

        elif callable(functions):
            specs = [functions]

        else:
            specs = list(functions)

        # Add functions to application
        endpoints = dict()
        external_functions = dict()
        for funcs in itertools.chain(specs):

            if isinstance(funcs, str):
                # Module name
                if importlib.util.find_spec(funcs) is not None:
                    items = importlib.import_module(funcs)
                    for x in vars(items).values():
                        if not hasattr(x, '_singlestoredb_attrs'):
                            continue
                        name = x._singlestoredb_attrs.get('name', x.__name__)
                        name = f'{name_prefix}{name}{name_suffix}'
                        external_functions[x.__name__] = x
                        func, info = make_func(name, x)
                        endpoints[name.encode('utf-8')] = func, info

                # Fully qualified function name
                elif '.' in funcs:
                    pkg_path, func_names = funcs.rsplit('.', 1)
                    pkg = importlib.import_module(pkg_path)

                    if pkg is None:
                        raise RuntimeError(f'Could not locate module: {pkg}')

                    # Add endpoint for each exported function
                    for name, alias in get_func_names(func_names):
                        item = getattr(pkg, name)
                        alias = f'{name_prefix}{name}{name_suffix}'
                        external_functions[name] = item
                        func, info = make_func(alias, item)
                        endpoints[alias.encode('utf-8')] = func, info

                else:
                    raise RuntimeError(f'Could not locate module: {funcs}')

            elif isinstance(funcs, ModuleType):
                for x in vars(funcs).values():
                    if not hasattr(x, '_singlestoredb_attrs'):
                        continue
                    name = x._singlestoredb_attrs.get('name', x.__name__)
                    name = f'{name_prefix}{name}{name_suffix}'
                    external_functions[x.__name__] = x
                    func, info = make_func(name, x)
                    endpoints[name.encode('utf-8')] = func, info

            else:
                alias = funcs.__name__
                external_functions[funcs.__name__] = funcs
                alias = f'{name_prefix}{alias}{name_suffix}'
                func, info = make_func(alias, funcs)
                endpoints[alias.encode('utf-8')] = func, info

        self.app_mode = app_mode
        self.url = url
        self.data_format = data_format
        self.data_version = data_version
        self.link_name = link_name
        self.link_config = link_config
        self.link_credentials = link_credentials
        self.endpoints = endpoints
        self.external_functions = external_functions
        self.function_database = function_database
        self.log_file = log_file
        self.log_level = log_level
        self.disable_metrics = disable_metrics

        # Configure logging
        self._configure_logging()

    def _configure_logging(self) -> None:
        """Configure logging based on the log_file settings."""
        # Set logger level
        self.logger.setLevel(getattr(logging, self.log_level.upper()))

        # Remove all existing handlers to ensure clean configuration
        self.logger.handlers.clear()

        # Configure log file if specified
        if self.log_file:
            # Create file handler
            file_handler = logging.FileHandler(self.log_file)
            file_handler.setLevel(getattr(logging, self.log_level.upper()))

            # Use JSON formatter for file logging
            formatter = utils.JSONFormatter()
            file_handler.setFormatter(formatter)

            # Add the handler to the logger
            self.logger.addHandler(file_handler)
        else:
            # For console logging, create a new stream handler with JSON formatter
            console_handler = logging.StreamHandler()
            console_handler.setLevel(getattr(logging, self.log_level.upper()))
            console_handler.setFormatter(utils.JSONFormatter())
            self.logger.addHandler(console_handler)

        # Prevent propagation to avoid duplicate or differently formatted messages
        self.logger.propagate = False
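
    # Example (illustrative): pass the application's log config to uvicorn so
    # that server logs share the same JSON format as the application logger:
    #
    #     uvicorn.run(app, log_config=app.get_uvicorn_log_config())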

    def get_uvicorn_log_config(self) -> Dict[str, Any]:
        """
        Create uvicorn log config that matches the Application's logging format.

        This method returns the log configuration used by uvicorn, allowing external
        users to match the logging format of the Application class.

        Returns
        -------
        Dict[str, Any]
            Log configuration dictionary compatible with uvicorn's log_config parameter

        """
        log_config = {
            'version': 1,
            'disable_existing_loggers': False,
            'formatters': {
                'json': {
                    '()': 'singlestoredb.functions.ext.utils.JSONFormatter',
                },
            },
            'handlers': {
                'default': {
                    'class': (
                        'logging.FileHandler' if self.log_file
                        else 'logging.StreamHandler'
                    ),
                    'formatter': 'json',
                },
            },
            'loggers': {
                'uvicorn': {
                    'handlers': ['default'],
                    'level': self.log_level.upper(),
                    'propagate': False,
                },
                'uvicorn.error': {
                    'handlers': ['default'],
                    'level': self.log_level.upper(),
                    'propagate': False,
                },
                'uvicorn.access': {
                    'handlers': ['default'],
                    'level': self.log_level.upper(),
                    'propagate': False,
                },
            },
        }

        # Add filename to file handler if log file is specified
        if self.log_file:
            log_config['handlers']['default']['filename'] = self.log_file  # type: ignore

        return log_config

    async def __call__(
        self,
        scope: Dict[str, Any],
        receive: Callable[..., Awaitable[Any]],
        send: Callable[..., Awaitable[Any]],
    ) -> None:
        '''
        Application request handler.

        Parameters
        ----------
        scope : dict
            ASGI request scope
        receive : Callable
            Function to receive request information
        send : Callable
            Function to send response information

        '''
        request_id = str(uuid.uuid4())

        timer = Timer(
            app_name=self.name,
            id=request_id,
            timestamp=datetime.datetime.now(
                datetime.timezone.utc,
            ).strftime('%Y-%m-%dT%H:%M:%S.%fZ'),
        )
        call_timer = Timer(
            app_name=self.name,
            id=request_id,
            timestamp=datetime.datetime.now(
                datetime.timezone.utc,
            ).strftime('%Y-%m-%dT%H:%M:%S.%fZ'),
        )

        if scope['type'] != 'http':
            raise ValueError(f"Expected HTTP scope, got {scope['type']}")

        method = scope['method']
        path = tuple(x for x in scope['path'].split('/') if x)
        headers = dict(scope['headers'])

        content_type = headers.get(
            b'content-type',
            b'application/octet-stream',
        )
        accepts = headers.get(b'accepts', content_type)
        func_name = headers.get(b's2-ef-name', b'')
        func_endpoint = self.endpoints.get(func_name)
        ignore_cancel = headers.get(b's2-ef-ignore-cancel', b'false') == b'true'

        timer.metadata['function'] = func_name.decode('utf-8') if func_name else ''
        call_timer.metadata['function'] = timer.metadata['function']

        func = None
        func_info: Dict[str, Any] = {}
        if func_endpoint is not None:
            func, func_info = func_endpoint

        # Call the endpoint
        if method == 'POST' and func is not None and path == self.invoke_path:

            self.logger.info(
                'Function call initiated',
                extra={
                    'app_name': self.name,
                    'request_id': request_id,
                    'function_name': func_name.decode('utf-8'),
                    'content_type': content_type.decode('utf-8'),
                    'accepts': accepts.decode('utf-8'),
                },
            )

            args_data_format = func_info['args_data_format']
            returns_data_format = func_info['returns_data_format']
            data = []
            more_body = True
            with timer('receive_data'):
                while more_body:
                    request = await receive()
                    if request.get('type', '') == 'http.disconnect':
                        raise RuntimeError('client disconnected')
                    data.append(request['body'])
                    more_body = request.get('more_body', False)

            data_version = headers.get(b's2-ef-version', b'')
            input_handler = self.handlers[(content_type, data_version, args_data_format)]
            output_handler = self.handlers[(accepts, data_version, returns_data_format)]

            try:
                all_tasks = []
                result = []

                cancel_event = threading.Event()

                with timer('parse_input'):
                    inputs = input_handler['load'](  # type: ignore
                        func_info['colspec'], b''.join(data),
                    )

                func_task = asyncio.create_task(
                    func(cancel_event, call_timer, *inputs)
                    if func_info['is_async']
                    else to_thread(
                        lambda: asyncio.run(
                            func(cancel_event, call_timer, *inputs),
                        ),
                    ),
                )
                disconnect_task = asyncio.create_task(
                    asyncio.sleep(int(1e9))
                    if ignore_cancel else cancel_on_disconnect(receive),
                )
                timeout_task = asyncio.create_task(
                    cancel_on_timeout(func_info['timeout']),
                )

                all_tasks += [func_task, disconnect_task, timeout_task]

                async with timer('function_wrapper'):
                    done, pending = await asyncio.wait(
                        all_tasks, return_when=asyncio.FIRST_COMPLETED,
                    )

                    await cancel_all_tasks(pending)

                    for task in done:
                        if task is disconnect_task:
                            cancel_event.set()
                            raise asyncio.CancelledError(
                                'Function call was cancelled by client disconnect',
                            )

                        elif task is timeout_task:
                            cancel_event.set()
                            raise asyncio.TimeoutError(
                                'Function call was cancelled due to timeout',
                            )

                        elif task is func_task:
                            result.extend(task.result())

                with timer('format_output'):
                    body = output_handler['dump'](
                        [x[1] for x in func_info['returns']], *result,  # type: ignore
                    )

                await send(output_handler['response'])

            except asyncio.TimeoutError:
                self.logger.exception(
                    'Function call timeout',
                    extra={
                        'app_name': self.name,
                        'request_id': request_id,
                        'function_name': func_name.decode('utf-8'),
                        'timeout': func_info['timeout'],
                    },
                )
                body = (
                    'TimeoutError: Function call timed out after ' +
                    str(func_info['timeout']) +
                    ' seconds'
                ).encode('utf-8')
                await send(self.timeout_response_dict)

            except asyncio.CancelledError:
                self.logger.exception(
                    'Function call cancelled',
                    extra={
                        'app_name': self.name,
                        'request_id': request_id,
                        'function_name': func_name.decode('utf-8'),
                    },
                )
                body = b'CancelledError: Function call was cancelled'
                await send(self.cancel_response_dict)

            except Exception as e:
                self.logger.exception(
                    'Function call error',
                    extra={
                        'app_name': self.name,
                        'request_id': request_id,
                        'function_name': func_name.decode('utf-8'),
                        'exception_type': type(e).__name__,
                    },
                )
                msg = traceback.format_exc().strip().split(' File ')[-1]
                if msg.startswith('"/tmp/ipykernel_'):
                    msg = 'Line ' + msg.split(', line ')[-1]
                else:
                    msg = 'File ' + msg
                body = msg.encode('utf-8')
                await send(self.error_response_dict)

            finally:
                await cancel_all_tasks(all_tasks)

        # Handle api reflection
        elif method == 'GET' and path == self.show_create_function_path:
            host = headers.get(b'host', b'localhost:80')
            reflected_url = f'{scope["scheme"]}://{host.decode("utf-8")}/invoke'

            syntax = []
            for key, (endpoint, endpoint_info) in self.endpoints.items():
                if not func_name or key == func_name:
                    syntax.append(
                        signature_to_sql(
                            endpoint_info['signature'],
                            url=self.url or reflected_url,
                            data_format=self.data_format,
                            database=self.function_database or None,
                        ),
                    )
            body = '\n'.join(syntax).encode('utf-8')

            await send(self.text_response_dict)

        # Return function info
        elif method == 'GET' and (path == self.show_function_info_path or not path):
            functions = self.get_function_info()
            body = json.dumps(dict(functions=functions)).encode('utf-8')
            await send(self.text_response_dict)

        # Return status
        elif method == 'GET' and path == self.status:
            body = json.dumps(dict(status='ok')).encode('utf-8')
            await send(self.text_response_dict)

        # Path not found
        else:
            body = b''
            await send(self.path_not_found_response_dict)

        # Send body
        with timer('send_response'):
            out = self.body_response_dict.copy()
            out['body'] = body
            await send(out)

        for k, v in call_timer.metrics.items():
            timer.metrics[k] = v

        if not self.disable_metrics:
            metrics = timer.finish()
            self.logger.info(
                'Function call metrics',
                extra={
                    'app_name': self.name,
                    'request_id': request_id,
                    'function_name': timer.metadata.get('function', ''),
                    'metrics': metrics,
                },
            )

    def _create_link(
        self,
        config: Optional[Dict[str, Any]],
        credentials: Optional[Dict[str, Any]],
    ) -> Tuple[str, str]:
        """Generate CREATE LINK command."""
        if self.link_name:
            return self.link_name, ''

        if not config and not credentials:
            return '', ''

        link_name = f'py_ext_func_link_{secrets.token_hex(14)}'
        out = [f'CREATE LINK {link_name} AS HTTP']

        if config:
            out.append(f"CONFIG '{json.dumps(config)}'")

        if credentials:
            out.append(f"CREDENTIALS '{json.dumps(credentials)}'")

        return link_name, ' '.join(out) + ';'

    def _locate_app_functions(self, cur: Any) -> Tuple[Set[str], Set[str]]:
        """Locate all current functions and links belonging to this app."""
        funcs, links = set(), set()
        if self.function_database:
            database_prefix = escape_name(self.function_database) + '.'
            cur.execute(f'SHOW FUNCTIONS IN {escape_name(self.function_database)}')
        else:
            database_prefix = ''
            cur.execute('SHOW FUNCTIONS')

        for row in list(cur):
            name, ftype, link = row[0], row[1], row[-1]
            # Only look at external functions
            if 'external' not in ftype.lower():
                continue
            # See if function URL matches url
            cur.execute(f'SHOW CREATE FUNCTION {database_prefix}{escape_name(name)}')
            for fname, _, code, *_ in list(cur):
                m = re.search(r" (?:\w+) (?:SERVICE|MANAGED) '([^']+)'", code)
                if m and m.group(1) == self.url:
                    funcs.add(f'{database_prefix}{escape_name(fname)}')
                    if link and re.match(r'^py_ext_func_link_\S{14}$', link):
                        links.add(link)

        return funcs, links
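
    # Example (illustrative): get_function_info() returns a mapping keyed by
    # function name, roughly of the form
    #
    #     {'my_func': {'args': [...], 'returns': [...], 'function_type': 'udf',
    #                  'sql_statement': 'CREATE ...', 'summary': '...',
    #                  'long_description': '...', 'examples': [...]}}
    #
    # where 'my_func' stands in for an actual registered function name.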

    def get_function_info(
        self,
        func_name: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Return the functions and function signature information.

        Returns
        -------
        Dict[str, Any]

        """
        functions = {}
        no_default = object()

        # Generate CREATE FUNCTION SQL for each function using get_create_functions
        create_sqls = self.get_create_functions(replace=True)
        sql_map = {}
        for (_, info), sql in zip(self.endpoints.values(), create_sqls):
            sig = info['signature']
            sql_map[sig['name']] = sql

        for key, (func, info) in self.endpoints.items():
            # Get info from docstring
            doc_summary = ''
            doc_long_description = ''
            doc_params = {}
            doc_returns = None
            doc_examples = []
            if func.__doc__:
                try:
                    docs = parse(func.__doc__)
                    doc_params = {p.arg_name: p for p in docs.params}
                    doc_returns = docs.returns
                    if not docs.short_description and docs.long_description:
                        doc_summary = docs.long_description or ''
                    else:
                        doc_summary = docs.short_description or ''
                        doc_long_description = docs.long_description or ''
                    for ex in docs.examples:
                        ex_dict: Dict[str, Any] = {
                            'description': None,
                            'code': None,
                            'output': None,
                        }
                        if ex.description:
                            ex_dict['description'] = ex.description
                        if ex.snippet:
                            code, output = [], []
                            for line in ex.snippet.split('\n'):
                                line = line.rstrip()
                                if re.match(r'^(\w+>|>>>|\.\.\.)', line):
                                    code.append(line)
                                else:
                                    output.append(line)
                            ex_dict['code'] = '\n'.join(code) or None
                            ex_dict['output'] = '\n'.join(output) or None
                        if ex.post_snippet:
                            ex_dict['postscript'] = ex.post_snippet
                        doc_examples.append(ex_dict)

                except Exception as e:
                    self.logger.warning(
                        'Could not parse docstring for function',
                        extra={
                            'app_name': self.name,
                            'function_name': key.decode('utf-8'),
                            'error': str(e),
                        },
                    )

            if not func_name or key == func_name:
                sig = info['signature']
                args = []

                # Function arguments
                for i, a in enumerate(sig.get('args', [])):
                    name = a['name']
                    dtype = a['dtype']
                    nullable = '?' in dtype
                    args.append(
                        dict(
                            name=name,
                            dtype=dtype.replace('?', ''),
                            nullable=nullable,
                            description=(doc_params[name].description or '')
                            if name in doc_params else '',
                        ),
                    )
                    if a.get('default', no_default) is not no_default:
                        args[-1]['default'] = a['default']

                # Return values
                ret = sig.get('returns', [])
                returns = []

                for a in ret:
                    dtype = a['dtype']
                    nullable = '?' in dtype
                    returns.append(
                        dict(
                            dtype=dtype.replace('?', ''),
                            nullable=nullable,
                            description=doc_returns.description
                            if doc_returns else '',
                        ),
                    )
                    if a.get('name', None):
                        returns[-1]['name'] = a['name']
                    if a.get('default', no_default) is not no_default:
                        returns[-1]['default'] = a['default']

                sql = sql_map.get(sig['name'], '')
                functions[sig['name']] = dict(
                    args=args,
                    returns=returns,
                    function_type=info['function_type'],
                    sql_statement=sql,
                    summary=doc_summary,
                    long_description=doc_long_description,
                    examples=doc_examples,
                )

        return functions

    def get_create_functions(
        self,
        replace: bool = False,
    ) -> List[str]:
        """
        Generate CREATE FUNCTION code for all functions.

        Parameters
        ----------
        replace : bool, optional
            Should existing functions be replaced?

        Returns
        -------
        List[str]

        """
        if not self.endpoints:
            return []

        out = []
        link = ''
        if self.app_mode.lower() == 'remote':
            link, link_str = self._create_link(self.link_config, self.link_credentials)
            if link and link_str:
                out.append(link_str)

        for key, (endpoint, endpoint_info) in self.endpoints.items():
            out.append(
                signature_to_sql(
                    endpoint_info['signature'],
                    url=self.url,
                    data_format=self.data_format,
                    app_mode=self.app_mode,
                    replace=replace,
                    link=link or None,
                    database=self.function_database or None,
                ),
            )

        return out
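
    # Example (illustrative): each string returned by get_create_functions is a
    # complete SQL statement; for a remote app the general shape is
    #
    #     CREATE EXTERNAL FUNCTION `my_func`(...) RETURNS ...
    #         AS REMOTE SERVICE '<url>' FORMAT ROWDAT_1;
    #
    # The exact text is produced by signature_to_sql and depends on the
    # function signature, data format, and app mode.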

    def register_functions(
        self,
        *connection_args: Any,
        replace: bool = False,
        **connection_kwargs: Any,
    ) -> None:
        """
        Register functions with the database.

        Parameters
        ----------
        *connection_args : Any
            Database connection parameters
        replace : bool, optional
            Should existing functions be replaced?
        **connection_kwargs : Any
            Database connection parameters

        """
        with connection.connect(*connection_args, **connection_kwargs) as conn:
            with conn.cursor() as cur:
                if replace:
                    funcs, links = self._locate_app_functions(cur)
                    for fname in funcs:
                        cur.execute(f'DROP FUNCTION IF EXISTS {fname}')
                    for link in links:
                        cur.execute(f'DROP LINK {link}')
                for func in self.get_create_functions(replace=replace):
                    cur.execute(func)

    def drop_functions(
        self,
        *connection_args: Any,
        **connection_kwargs: Any,
    ) -> None:
        """
        Drop registered functions from database.

        Parameters
        ----------
        *connection_args : Any
            Database connection parameters
        **connection_kwargs : Any
            Database connection parameters

        """
        with connection.connect(*connection_args, **connection_kwargs) as conn:
            with conn.cursor() as cur:
                funcs, links = self._locate_app_functions(cur)
                for fname in funcs:
                    cur.execute(f'DROP FUNCTION IF EXISTS {fname}')
                for link in links:
                    cur.execute(f'DROP LINK {link}')

    async def call(
        self,
        name: str,
        data_in: io.BytesIO,
        data_out: io.BytesIO,
        data_format: Optional[str] = None,
        data_version: Optional[str] = None,
    ) -> None:
        """
        Call a function in the application.

        Parameters
        ----------
        name : str
            Name of the function to call
        data_in : io.BytesIO
            The input data rows
        data_out : io.BytesIO
            The output data rows
        data_format : str, optional
            The format of the input and output data
        data_version : str, optional
            The version of the data format

        """
        data_format = data_format or self.data_format
        data_version = data_version or self.data_version

        async def receive() -> Dict[str, Any]:
            return dict(body=data_in.read())

        async def send(content: Dict[str, Any]) -> None:
            status = content.get('status', 200)
            if status != 200:
                raise KeyError(f'error occurred when calling `{name}`: {status}')
            data_out.write(content.get('body', b''))

        accepts = dict(
            json=b'application/json',
            rowdat_1=b'application/octet-stream',
            arrow=b'application/vnd.apache.arrow.file',
        )

        # Mock an ASGI scope
        scope = dict(
            type='http',
            path='invoke',
            method='POST',
            headers={
                b'content-type': accepts[data_format.lower()],
                b'accepts': accepts[data_format.lower()],
                b's2-ef-name': name.encode('utf-8'),
                b's2-ef-version': data_version.encode('utf-8'),
                b's2-ef-ignore-cancel': b'true',
            },
        )

        await self(scope, receive, send)
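
    # Example (illustrative): invoke an endpoint directly, bypassing HTTP
    # (handy for testing). The payload must already be encoded in the chosen
    # data format, and 'my_func' is a placeholder name:
    #
    #     out = io.BytesIO()
    #     await app.call('my_func', io.BytesIO(encoded_rows), out,
    #                    data_format='json')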

    def to_environment(
        self,
        name: str,
        destination: str = '.',
        version: Optional[str] = None,
        dependencies: Optional[List[str]] = None,
        authors: Optional[List[Dict[str, str]]] = None,
        maintainers: Optional[List[Dict[str, str]]] = None,
        description: Optional[str] = None,
        container_service: Optional[Dict[str, Any]] = None,
        external_function: Optional[Dict[str, Any]] = None,
        external_function_remote: Optional[Dict[str, Any]] = None,
        external_function_collocated: Optional[Dict[str, Any]] = None,
        overwrite: bool = False,
    ) -> None:
        """
        Convert application to an environment file.

        Parameters
        ----------
        name : str
            Name of the output environment
        destination : str, optional
            Location of the output file
        version : str, optional
            Version of the package
        dependencies : List[str], optional
            List of dependency specifications like in a requirements.txt file
        authors : List[Dict[str, Any]], optional
            Dictionaries of author information. Keys may include: email, name
        maintainers : List[Dict[str, Any]], optional
            Dictionaries of maintainer information. Keys may include: email, name
        description : str, optional
            Description of package
        container_service : Dict[str, Any], optional
            Container service specifications
        external_function : Dict[str, Any], optional
            External function specifications (applies to both remote and collocated)
        external_function_remote : Dict[str, Any], optional
            Remote external function specifications
        external_function_collocated : Dict[str, Any], optional
            Collocated external function specifications
        overwrite : bool, optional
            Should destination file be overwritten if it exists?

        """
        if not has_cloudpickle:
            raise RuntimeError('the cloudpickle package is required for this operation')

        # Write to temporary location if a remote destination is specified
        tmpdir = None
        if destination.startswith('stage://'):
            tmpdir = tempfile.TemporaryDirectory()
            local_path = os.path.join(tmpdir.name, f'{name}.env')
        else:
            local_path = os.path.join(destination, f'{name}.env')
            if not overwrite and os.path.exists(local_path):
                raise OSError(f'path already exists: {local_path}')

        with zipfile.ZipFile(local_path, mode='w') as z:
            # Write metadata
            z.writestr(
                'pyproject.toml', utils.to_toml({
                    'project': dict(
                        name=name,
                        version=version,
                        dependencies=dependencies,
                        requires_python='== ' +
                        '.'.join(str(x) for x in sys.version_info[:3]),
                        authors=authors,
                        maintainers=maintainers,
                        description=description,
                    ),
                    'tool.container-service': container_service,
                    'tool.external-function': external_function,
                    'tool.external-function.remote': external_function_remote,
                    'tool.external-function.collocated': external_function_collocated,
                }),
            )

            # Write Python package
            z.writestr(
                f'{name}/__init__.py',
                textwrap.dedent(f'''
                import pickle as _pkl
                globals().update(
                    _pkl.loads({cloudpickle.dumps(self.external_functions)}),
                )
                __all__ = {list(self.external_functions.keys())}''').strip(),
            )

        # Upload to Stage as needed
        if destination.startswith('stage://'):
            url = urllib.parse.urlparse(re.sub(r'/+$', r'', destination) + '/')
            if not url.path or url.path == '/':
                raise ValueError(f'no stage path was specified: {destination}')

            mgr = manage_workspaces()
            if url.hostname:
                wsg = mgr.get_workspace_group(url.hostname)
            elif os.environ.get('SINGLESTOREDB_WORKSPACE_GROUP'):
                wsg = mgr.get_workspace_group(
                    os.environ['SINGLESTOREDB_WORKSPACE_GROUP'],
                )
            else:
                raise ValueError(f'no workspace group specified: {destination}')

            # Make intermediate directories
            if url.path.count('/') > 1:
                wsg.stage.mkdirs(os.path.dirname(url.path))

            wsg.stage.upload_file(
                local_path, url.path + f'{name}.env',
                overwrite=overwrite,
            )
            os.remove(local_path)
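

# Example (illustrative): package an application's functions into an
# environment file and upload it to Stage (the workspace group name and
# path are placeholders):
#
#     app.to_environment('myfuncs_env', destination='stage://my-group/envs/')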


def main(argv: Optional[List[str]] = None) -> None:
    """
    Main program for HTTP-based Python UDFs.

    Parameters
    ----------
    argv : List[str], optional
        List of command-line parameters

    """
    try:
        import uvicorn
    except ImportError:
        raise ImportError('the uvicorn package is required to run this command')

    # Should we run in embedded mode (typically for Jupyter)
    try:
        asyncio.get_running_loop()
        use_async = True
    except RuntimeError:
        use_async = False

    # Temporary directory for Stage environment files
    tmpdir = None

    # Depending on whether we find an environment file specified, we
    # may have to process the command line twice.
    functions = []
    defaults: Dict[str, Any] = {}
    for i in range(2):

        parser = argparse.ArgumentParser(
            prog='python -m singlestoredb.functions.ext.asgi',
            description='Run an HTTP-based Python UDF server',
        )
        parser.add_argument(
            '--url', metavar='url',
            default=defaults.get(
                'url',
                get_option('external_function.url'),
            ),
            help='URL of the UDF server endpoint',
        )
        parser.add_argument(
            '--host', metavar='host',
            default=defaults.get(
                'host',
                get_option('external_function.host'),
            ),
            help='bind socket to this host',
        )
        parser.add_argument(
            '--port', metavar='port', type=int,
            default=defaults.get(
                'port',
                get_option('external_function.port'),
            ),
            help='bind socket to this port',
        )
        parser.add_argument(
            '--db', metavar='conn-str',
            default=defaults.get(
                'connection',
                get_option('external_function.connection'),
            ),
            help='connection string to use for registering functions',
        )
        parser.add_argument(
            '--replace-existing', action='store_true',
            help='should existing functions of the same name '
                 'in the database be replaced?',
        )
        parser.add_argument(
            '--data-format', metavar='format',
            default=defaults.get(
                'data_format',
                get_option('external_function.data_format'),
            ),
            choices=['rowdat_1', 'json'],
            help='format of the data rows',
        )
        parser.add_argument(
            '--data-version', metavar='version',
            default=defaults.get(
                'data_version',
                get_option('external_function.data_version'),
            ),
            help='version of the data row format',
        )
        parser.add_argument(
            '--link-name', metavar='name',
            default=defaults.get(
                'link_name',
                get_option('external_function.link_name'),
            ) or '',
            help='name of the link to use for connections',
        )
        parser.add_argument(
            '--link-config', metavar='json',
            default=str(
                defaults.get(
                    'link_config',
                    get_option('external_function.link_config'),
                ) or '{}',
            ),
            help='link config in JSON format',
        )
        parser.add_argument(
            '--link-credentials', metavar='json',
            default=str(
                defaults.get(
                    'link_credentials',
                    get_option('external_function.link_credentials'),
                ) or '{}',
            ),
            help='link credentials in JSON format',
        )
        parser.add_argument(
            '--log-level', metavar='[info|debug|warning|error]',
            default=defaults.get(
                'log_level',
                get_option('external_function.log_level'),
            ),
            help='logging level',
        )
        parser.add_argument(
            '--log-file', metavar='filepath',
            default=defaults.get(
                'log_file',
                get_option('external_function.log_file'),
            ),
            help='File path to write logs to instead of console',
        )
        parser.add_argument(
            '--disable-metrics', action='store_true',
            default=defaults.get(
                'disable_metrics',
                get_option('external_function.disable_metrics'),
            ),
            help='Disable logging of function call metrics',
        )
        parser.add_argument(
            '--name-prefix', metavar='name_prefix',
            default=defaults.get(
                'name_prefix',
                get_option('external_function.name_prefix'),
            ),
            help='Prefix to add to function names',
        )
        parser.add_argument(
            '--name-suffix', metavar='name_suffix',
            default=defaults.get(
                'name_suffix',
                get_option('external_function.name_suffix'),
            ),
            help='Suffix to add to function names',
        )
        parser.add_argument(
            '--function-database', metavar='function_database',
            default=defaults.get(
                'function_database',
                get_option('external_function.function_database'),
            ),
            help='Database to use for the function definition',
        )
        parser.add_argument(
            '--app-name', metavar='app_name',
            default=defaults.get(
                'app_name',
                get_option('external_function.app_name'),
            ),
            help='Name for the application instance',
        )
        parser.add_argument(
            'functions', metavar='module.or.func.path', nargs='*',
            help='functions or modules to export in UDF server',
        )

        args = parser.parse_args(argv)

        if i > 0:
            break

        # Download Stage files as needed
        for i, f in enumerate(args.functions):
            if f.startswith('stage://'):
                url = urllib.parse.urlparse(f)
                if not url.path or url.path == '/':
                    raise ValueError(f'no stage path was specified: {f}')
                if url.path.endswith('/'):
                    raise ValueError(f'an environment file must be specified: {f}')

                mgr = manage_workspaces()
                if url.hostname:
                    wsg = mgr.get_workspace_group(url.hostname)
                elif os.environ.get('SINGLESTOREDB_WORKSPACE_GROUP'):
                    wsg = mgr.get_workspace_group(
                        os.environ['SINGLESTOREDB_WORKSPACE_GROUP'],
                    )
                else:
                    raise ValueError(f'no workspace group specified: {f}')

                if tmpdir is None:
                    tmpdir = tempfile.TemporaryDirectory()

                local_path = os.path.join(tmpdir.name, url.path.split('/')[-1])
                wsg.stage.download_file(url.path, local_path)
                args.functions[i] = local_path

            elif f.startswith('http://') or f.startswith('https://'):
                if tmpdir is None:
                    tmpdir = tempfile.TemporaryDirectory()

                local_path = os.path.join(tmpdir.name, f.split('/')[-1])
                urllib.request.urlretrieve(f, local_path)
                args.functions[i] = local_path

        # See if any of the args are zip files (assume they are environment files)
        modules = [(x, zipfile.is_zipfile(x)) for x in args.functions]
        envs = [x[0] for x in modules if x[1]]
        others = [x[0] for x in modules if not x[1]]

        if envs and len(envs) > 1:
            raise RuntimeError('only one environment file may be specified')

        if envs and others:
            raise RuntimeError('environment files and other modules can not be mixed.')

        # See if an environment file was specified. If so, use those settings
        # as the defaults and reprocess command line.
        if envs:
            # Add pyproject.toml variables and redo command-line processing
            defaults = utils.read_config(
                envs[0],
                ['tool.external-function', 'tool.external-function.remote'],
            )

            # Load zip file as a module
            modname = os.path.splitext(os.path.basename(envs[0]))[0]
            zi = zipimport.zipimporter(envs[0])
            mod = zi.load_module(modname)
            if mod is None:
                raise RuntimeError(f'environment file could not be imported: {envs[0]}')
            functions = [mod]

            if defaults:
                continue

    args.functions = functions or args.functions or None
    args.replace_existing = args.replace_existing \
        or defaults.get('replace_existing') \
        or get_option('external_function.replace_existing')

    # Substitute in host / port if specified
    if args.host != defaults.get('host') or args.port != defaults.get('port'):
        u = urllib.parse.urlparse(args.url)
        args.url = u._replace(netloc=f'{args.host}:{args.port}').geturl()

    # Create application from functions / module
    app = Application(
        functions=args.functions,
        url=args.url,
        data_format=args.data_format,
        data_version=args.data_version,
        link_name=args.link_name or None,
        link_config=json.loads(args.link_config) or None,
        link_credentials=json.loads(args.link_credentials) or None,
        app_mode='remote',
        name_prefix=args.name_prefix,
        name_suffix=args.name_suffix,
        function_database=args.function_database or None,
        log_file=args.log_file,
        log_level=args.log_level,
        disable_metrics=args.disable_metrics,
        app_name=args.app_name,
    )

    funcs = app.get_create_functions(replace=args.replace_existing)
    if not funcs:
        raise RuntimeError('no functions specified')

    for f in funcs:
        app.logger.info(f)

    try:
        if args.db:
            app.logger.info('Registering functions with database')
            app.register_functions(
                args.db,
                replace=args.replace_existing,
            )

        app_args = {
            k: v for k, v in dict(
                host=args.host or None,
                port=args.port or None,
                log_level=args.log_level,
                lifespan='off',
            ).items() if v is not None
        }

        # Configure uvicorn logging to use JSON format matching Application's format
        app_args['log_config'] = app.get_uvicorn_log_config()

        if use_async:
            asyncio.create_task(_run_uvicorn(uvicorn, app, app_args, db=args.db))
        else:
            uvicorn.run(app, **app_args)

    finally:
        if not use_async and args.db:
            app.logger.info('Dropping functions from database')
            app.drop_functions(args.db)


async def _run_uvicorn(
    uvicorn: Any,
    app: Any,
    app_args: Any,
    db: Optional[str] = None,
) -> None:
    """Run uvicorn server and clean up functions after shutdown."""
    await uvicorn.Server(uvicorn.Config(app, **app_args)).serve()
    if db:
        app.logger.info('Dropping functions from database')
        app.drop_functions(db)


create_app = Application


if __name__ == '__main__':
    try:
        main()
    except RuntimeError as exc:
        logger.error(str(exc))
        sys.exit(1)
    except KeyboardInterrupt:
        pass
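

# Example (illustrative) command line: serve the functions in module `myfuncs`,
# registering them in the database at startup (the connection string is a
# placeholder):
#
#   python3 -m singlestoredb.functions.ext.asgi \
#       --db 'user:password@localhost:3306/dbname' --replace-existing \
#       myfuncs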