Path: blob/main/singlestoredb/functions/ext/mmap.py
469 views
#!/usr/bin/env python
r"""
Module for creating collocated Python UDFs

This module implements the collocated form of external functions for
SingleStoreDB. This mode uses a socket for control communications
and memory mapped files for passing the data to and from the UDF.

The command below is a sample invocation. It exports all functions
within the `myfuncs` Python module that have a `@udf` decorator on
them. The `--db` option specifies a database connection string.
If this exists, the UDF application will connect to the database
and register all functions. The `--replace-existing` option indicates
that existing functions should be replaced::

    python -m singlestoredb.functions.ext.mmap \
        --db=root:@127.0.0.1:9306/cosmeticshop --replace-existing \
        myfuncs

The `myfuncs` package can be any Python package in your Python path.
It must contain functions marked with a `@udf` decorator and the
types must be annotated or specified using the `@udf` decorator
similar to the following::

    import numpy as np
    from singlestoredb.functions import udf

    @udf
    def print_it(x2: float, x3: str) -> str:
        return int(x2) * x3

    @udf.pandas
    def print_it_pandas(x2: float, x3: str) -> str:
        return x2.astype(np.int64) * x3.astype(str)

Positional arguments may also be ``stage://`` or ``http(s)://`` URLs,
which are downloaded to a temporary directory first, or a single zip
"environment" file, which is added to the Python path and whose
configuration is used for the command-line defaults.

With the functions registered, you can now run the UDFs::

    SELECT print_it(3.14, 'my string');
    SELECT print_it_pandas(3.14, 'my string');

"""
import argparse
import array
import asyncio
import io
import logging
import mmap
import multiprocessing
import os
import secrets
import socket
import struct
import sys
import tempfile
import threading
import traceback
import urllib.parse
import urllib.request
import zipfile
from typing import Any
from typing import Dict
from typing import List
from typing import Optional

from . import asgi
from . import utils
from ... import manage_workspaces
from ...config import get_option


logger = utils.get_logger('singlestoredb.functions.ext.mmap')

# The output shared memory file is resized to at least this many bytes
# before results are written to it.
_MIN_OUTPUT_FILE_SIZE = 128 * 1024


def _recv_exact(connection: Any, size: int) -> bytes:
    """
    Receive exactly `size` bytes from a stream socket.

    ``socket.recv`` may legally return fewer bytes than requested on a
    stream socket, so keep reading until `size` bytes have arrived or the
    peer closes the connection.

    Parameters
    ----------
    connection : socket connection
        Socket to read from
    size : int
        Number of bytes to read

    Returns
    -------
    bytes
        The data received. It is shorter than `size` only if the peer
        closed the connection first (empty if nothing was received).

    """
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = connection.recv(remaining)
        if not chunk:
            break
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def _call_function(
    app: Any,
    connection: Any,
    name: str,
    ifile: Any,
    ofile: Any,
    length: int,
) -> bool:
    """
    Execute a single function call and send its status to the database.

    Parameters
    ----------
    app : ASGI app
        An ASGI application from the singlestoredb.functions.ext.asgi module
    connection : socket connection
        Socket connection for function control messages
    name : str
        Name of the function to call
    ifile : binary file
        Input shared memory file holding `length` bytes of row data
    ofile : binary file
        Output shared memory file the results are written to
    length : int
        Number of bytes of input row data

    Returns
    -------
    bool
        True if the call succeeded and a 200 status was sent; False if it
        failed and a 500 status with an error message was sent

    """
    try:
        # Map in the input shared memory segment from the fd we received via
        # recvmsg and copy the row data out of it. Reading through the
        # mapping (instead of `ifile.seek(0)` + `ifile.read()`) matters when
        # several requests share a connection: a buffered file object can
        # satisfy a seek back into its read-ahead buffer without touching the
        # file, and would then hand back the previous request's data.
        with mmap.mmap(
            ifile.fileno(),
            length,
            mmap.MAP_SHARED,
            mmap.PROT_READ,
        ) as mem:
            data = mem[:length]

        # Run the function
        out = io.BytesIO()
        asyncio.run(
            app.call(
                name,
                io.BytesIO(data),
                out,
                data_format='rowdat_1',
                data_version='1.0',
            ),
        )

        # Write results
        buf = out.getbuffer()
        response_size = len(buf)
        ofile.truncate(max(_MIN_OUTPUT_FILE_SIZE, response_size))
        ofile.seek(0)
        ofile.write(buf)
        ofile.flush()

        # Complete the request by sending back the status as two uint64s on
        # the socket:
        #   - http status
        #   - size of data in output shared memory
        connection.sendall(struct.pack('<qq', 200, response_size))
        return True

    except Exception as exc:
        errmsg = f'error occurred in executing function `{name}`: {exc}\n'
        logger.error(errmsg.rstrip())
        # The three-argument form works on all Python 3 versions; the
        # single-argument form requires Python 3.10+.
        for line in traceback.format_exception(
            type(exc), exc, exc.__traceback__,
        ):
            logger.error(line.rstrip())
        # Send the UTF-8 *byte* length; the character count would truncate
        # messages containing non-ASCII text.
        errbytes = errmsg.encode('utf8')
        connection.sendall(
            struct.pack(
                f'<qq{len(errbytes)}s', 500,
                len(errbytes), errbytes,
            ),
        )
        return False


def _handle_request(app: Any, connection: Any, client_address: Any) -> None:
    """
    Handle function call requests on a single control connection.

    The database first sends a header with the function name and two file
    descriptors (input and output shared memory files), then any number of
    requests, each announced by the length of its input data. Processing
    stops when the peer stops sending, sends a zero length, or a call
    fails. The connection and both files are closed on return.

    Parameters
    ----------
    app : ASGI app
        An ASGI application from the singlestoredb.functions.ext.asgi module
    connection : socket connection
        Socket connection for function control messages
    client_address : string
        Address of connecting client

    """
    logger.info('connection from fd={}'.format(connection.fileno()))

    try:
        # Receive the request header. Format:
        #   server version: uint64
        #   length of function name: uint64
        header = _recv_exact(connection, 16)
        if len(header) < 16:
            logger.error('connection closed before the request header was received')
            return
        version, namelen = struct.unpack('<qq', header)

        # Python's recvmsg returns a tuple. We only really care about the
        # first two parts. The recvmsg call has a weird way of specifying the
        # size for the file descriptor array; basically, we're indicating we
        # want to read two 32-bit ints (for the input and output files).
        fd_model = array.array('i', [0, 0])
        msg, ancdata, _flags, _addr = connection.recvmsg(
            namelen,
            socket.CMSG_LEN(2 * fd_model.itemsize),
        )
        # Validate explicitly; `assert` is stripped when running with -O.
        if len(ancdata) != 1:
            logger.error(
                'expected 1 control message with file descriptors, '
                f'got {len(ancdata)}',
            )
            return

        # The function's name will be in the "message" area of the recvmsg
        # response. It will be populated with `namelen` bytes.
        name = msg.decode('utf8')

        # Two file descriptors are transferred to us from the database via
        # the `sendmsg` protocol. These are for reading the input rows and
        # writing the output rows, respectively.
        fd0, fd1 = struct.unpack('<ii', ancdata[0][2])
        with os.fdopen(fd0, 'rb') as ifile, os.fdopen(fd1, 'wb') as ofile:
            # Keep receiving requests on this socket until we run out.
            while True:
                # Read in the length of this request's data, a uint64.
                # No (or incomplete) data means we're done receiving.
                buf = _recv_exact(connection, 8)
                if len(buf) < 8:
                    break
                length = struct.unpack('<q', buf)[0]
                if not length:
                    break
                if not _call_function(app, connection, name, ifile, ofile, length):
                    break

    finally:
        # Close the connection
        connection.close()


def _parse_args(
    argv: Optional[List[str]],
    defaults: Dict[str, Any],
) -> argparse.Namespace:
    """
    Parse the command line and apply the requested log level.

    Values in `defaults` (e.g. read from an environment file) take
    precedence over the configuration options; explicit command-line
    options take precedence over both.

    Parameters
    ----------
    argv : List[str], optional
        List of command-line parameters; ``sys.argv[1:]`` if None
    defaults : Dict[str, Any]
        Default values keyed by argument destination name
        (e.g. ``max_connections``)

    Returns
    -------
    argparse.Namespace

    """
    parser = argparse.ArgumentParser(
        prog='python -m singlestoredb.functions.ext.mmap',
        description='Run a collocated Python UDF server',
    )
    parser.add_argument(
        '--max-connections', metavar='n', type=int,
        default=defaults.get(
            'max_connections',
            get_option('external_function.max_connections'),
        ),
        help='maximum number of server connections before refusing them',
    )
    parser.add_argument(
        '--single-thread', action='store_true',
        default=defaults.get(
            'single_thread',
            get_option('external_function.single_thread'),
        ),
        help='should the server run in single-thread mode?',
    )
    parser.add_argument(
        '--socket-path', metavar='file-path',
        default=(
            defaults.get('socket_path') or
            get_option('external_function.socket_path') or
            os.path.join(tempfile.gettempdir(), secrets.token_hex(16))
        ),
        help='path to communications socket',
    )
    parser.add_argument(
        '--db', metavar='conn-str',
        default=defaults.get('db', os.environ.get('SINGLESTOREDB_URL', '')),
        help='connection string to use for registering functions',
    )
    parser.add_argument(
        '--replace-existing', action='store_true',
        help='should existing functions of the same name '
             'in the database be replaced?',
    )
    parser.add_argument(
        '--log-level', metavar='[info|debug|warning|error]',
        default=defaults.get(
            'log_level',
            get_option('external_function.log_level'),
        ),
        help='logging level',
    )
    parser.add_argument(
        '--process-mode', metavar='[thread|subprocess]',
        default=defaults.get(
            'process_mode',
            get_option('external_function.process_mode'),
        ),
        help='how to handle concurrent handlers',
    )
    parser.add_argument(
        'functions', metavar='module.or.func.path', nargs='*',
        help='functions or modules to export in UDF server',
    )

    args = parser.parse_args(argv)

    logger.setLevel(getattr(logging, args.log_level.upper()))

    return args


def _download_remote_functions(
    functions: List[str],
) -> Optional['tempfile.TemporaryDirectory[str]']:
    """
    Replace ``stage://`` and ``http(s)://`` entries with local copies.

    Each remote entry is downloaded into a temporary directory and the list
    element is replaced, in place, with the path of the local copy.

    Parameters
    ----------
    functions : List[str]
        Function / module paths from the command line; modified in place

    Returns
    -------
    TemporaryDirectory or None
        The directory holding the downloads, or None if nothing was
        downloaded. The directory is deleted when this object is finalized,
        so the caller must keep a reference while the files are needed.

    Raises
    ------
    ValueError
        If a ``stage://`` URL has no file path, names a directory, or no
        workspace group can be determined

    """
    tmpdir = None

    for idx, f in enumerate(functions):
        if f.startswith('stage://'):
            url = urllib.parse.urlparse(f)
            if not url.path or url.path == '/':
                raise ValueError(f'no stage path was specified: {f}')
            if url.path.endswith('/'):
                raise ValueError(f'an environment file must be specified: {f}')

            mgr = manage_workspaces()
            if url.hostname:
                wsg = mgr.get_workspace_group(url.hostname)
            elif os.environ.get('SINGLESTOREDB_WORKSPACE_GROUP'):
                wsg = mgr.get_workspace_group(
                    os.environ['SINGLESTOREDB_WORKSPACE_GROUP'],
                )
            else:
                raise ValueError(f'no workspace group specified: {f}')

            if tmpdir is None:
                tmpdir = tempfile.TemporaryDirectory()

            local_path = os.path.join(tmpdir.name, url.path.split('/')[-1])
            wsg.stage.download_file(url.path, local_path)
            functions[idx] = local_path

        elif f.startswith(('http://', 'https://')):
            if tmpdir is None:
                tmpdir = tempfile.TemporaryDirectory()

            local_path = os.path.join(tmpdir.name, f.split('/')[-1])
            urllib.request.urlretrieve(f, local_path)
            functions[idx] = local_path

    return tmpdir


def main(argv: Optional[List[str]] = None) -> None:
    """
    Main program for collocated Python UDFs

    Parses the command line, fetches any remote function / environment
    files, creates the ASGI application, optionally registers the functions
    with the database, and then serves requests on a Unix socket until
    interrupted.

    Parameters
    ----------
    argv : List[str], optional
        List of command-line parameters

    Raises
    ------
    RuntimeError
        If the positional arguments are inconsistent, no functions are
        found, or an existing socket file cannot be removed

    """
    functions: List[str] = []
    defaults: Dict[str, Any] = {}

    args = _parse_args(argv, defaults)

    # Download Stage / HTTP files as needed. Keep the reference: the
    # temporary directory is deleted when the object is finalized.
    tmpdir = _download_remote_functions(args.functions)

    # See if any of the args are zip files (assume they are environment files)
    modules = [(x, zipfile.is_zipfile(x)) for x in args.functions]
    envs = [path for path, is_zip in modules if is_zip]
    others = [path for path, is_zip in modules if not is_zip]

    if len(envs) > 1:
        raise RuntimeError('only one environment file may be specified.')

    if envs and others:
        raise RuntimeError('environment files and other modules can not be mixed.')

    # See if an environment file was specified. If so, use its settings
    # as the defaults and reprocess the command line.
    if envs:
        # Add zip file to the Python path
        sys.path.insert(0, envs[0])
        functions = [os.path.splitext(os.path.basename(envs[0]))[0]]

        # Add pyproject.toml variables and redo command-line processing.
        # NOTE(review): the section names mix `external_function` and
        # `external-function`; confirm which spelling is intended.
        defaults = utils.read_config(
            envs[0],
            ['tool.external_function', 'tool.external-function.collocated'],
        ) or {}
        if defaults:
            # Re-parsing resets `args.functions` to the raw command line,
            # which is fine here because `functions` replaces it below.
            args = _parse_args(argv, defaults)

    args.functions = functions or args.functions or None
    args.replace_existing = args.replace_existing \
        or defaults.get('replace_existing') \
        or get_option('external_function.replace_existing')

    if os.path.exists(args.socket_path):
        try:
            os.unlink(args.socket_path)
        except OSError as exc:
            raise RuntimeError(
                f'could not remove existing socket path: {args.socket_path}',
            ) from exc

    # Create application from functions / module
    app = asgi.create_app(
        functions=args.functions,
        url=args.socket_path,
        data_format='rowdat_1',
        app_mode='collocated',
    )

    funcs = app.get_create_functions(replace=args.replace_existing)
    if not funcs:
        raise RuntimeError('no functions specified')

    for f in funcs:
        logger.info(f'function: {f}')

    # Create the Unix socket server.
    server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

    # Bind our server to the path.
    server.bind(args.socket_path)

    logger.info(f'using socket path: {args.socket_path}')

    # Listen for incoming connections. The argument (--max-connections) is
    # the number of connections to keep in the backlog before we begin
    # refusing them.
    server.listen(args.max_connections)

    try:
        # Register functions with database
        if args.db:
            logger.info('registering functions with database')
            app.register_functions(args.db, replace=args.replace_existing)

        # Accept connections forever.
        while True:
            # Listen for the next connection on our port.
            connection, client_address = server.accept()

            use_threads = args.process_mode == 'thread'
            if use_threads:
                tcls = threading.Thread
            else:
                tcls = multiprocessing.Process  # type: ignore

            t = tcls(
                target=_handle_request,
                args=(app, connection, client_address),
            )

            t.start()

            # A child process has its own copy of the connection; close
            # ours so the descriptor does not leak in this process. (A
            # thread shares it, so it must stay open in that case.)
            if not use_threads:
                connection.close()

            # With --single-thread, wait for each handler to finish before
            # accepting the next connection, i.e. handle requests serially.
            # This makes it easier to understand what's going on; parallel
            # handling is much faster.
            if args.single_thread:
                t.join()

    except KeyboardInterrupt:
        return

    finally:
        try:
            if args.db:
                logger.info('dropping functions from database')
                app.drop_functions(args.db)
        finally:
            server.close()

            # Remove the socket file before we exit.
            try:
                os.unlink(args.socket_path)
            except OSError:
                logger.error(f'could not remove socket path: {args.socket_path}')

            if tmpdir is not None:
                try:
                    tmpdir.cleanup()
                except OSError:
                    logger.error(
                        f'could not remove temporary directory: {tmpdir.name}',
                    )


if __name__ == '__main__':
    try:
        main()
    except RuntimeError as exc:
        logger.error(str(exc))
        sys.exit(1)
    except KeyboardInterrupt:
        pass