Path: blob/main/singlestoredb/connection.py
801 views
#!/usr/bin/env python1"""SingleStoreDB connections and cursors."""2import abc3import functools4import inspect5import io6import queue7import re8import sys9import warnings10import weakref11from collections.abc import Iterator12from collections.abc import Mapping13from collections.abc import MutableMapping14from collections.abc import Sequence15from typing import Any16from typing import Callable17from typing import Dict18from typing import List19from typing import Optional20from typing import Tuple21from typing import Union22from urllib.parse import parse_qs23from urllib.parse import unquote_plus24from urllib.parse import urlparse2526import sqlparams27try:28from pandas import DataFrame29except ImportError:30class DataFrame(object): # type: ignore31def itertuples(self, *args: Any, **kwargs: Any) -> None:32pass3334from . import auth35from . import exceptions36from .config import get_option37from .utils.results import Description38from .utils.results import Result3940if sys.version_info < (3, 10):41InfileQueue = queue.Queue42else:43InfileQueue = queue.Queue[Union[bytes, str]]444546# DB-API settings47apilevel = '2.0'48threadsafety = 149paramstyle = map_paramstyle = 'pyformat'50positional_paramstyle = 'format'515253# Type codes for character-based columns54CHAR_COLUMNS = set(list(range(247, 256)) + [245])555657def under2camel(s: str) -> str:58"""Format underscore-delimited strings to camel-case."""5960def upper_mid(m: Any) -> str:61"""Uppercase middle group of matches."""62return m.group(1) + m.group(2).upper() + m.group(3)6364def upper(m: Any) -> str:65"""Uppercase match."""66return m.group(1).upper()6768s = re.sub(r'(\b|_)(xml|sql|json)(\b|_)', upper_mid, s, flags=re.I)69s = re.sub(r'(?:^|_+)(\w)', upper, s)70s = re.sub(r'_+$', r'', s)7172return s737475def nested_converter(76conv: Callable[[Any], Any],77inner: Callable[[Any], Any],78) -> Callable[[Any], Any]:79"""Create a pipeline of two functions."""80def converter(value: Any) -> Any:81return conv(inner(value))82return converter838485def cast_bool_param(val: Any) -> bool:86"""Cast value to a bool."""87if val is None or val is False:88return False8990if val is True:91return True9293# Test ints94try:95ival = int(val)96if ival == 1:97return True98if ival == 0:99return False100except Exception:101pass102103# Lowercase strings104if hasattr(val, 'lower'):105if val.lower() in ['on', 't', 'true', 'y', 'yes', 'enabled', 'enable']:106return True107elif val.lower() in ['off', 'f', 'false', 'n', 'no', 'disabled', 'disable']:108return False109110raise ValueError('Unrecognized value for bool: {}'.format(val))111112113def build_params(**kwargs: Any) -> Dict[str, Any]:114"""115Construct connection parameters from given URL and arbitrary parameters.116117Parameters118----------119**kwargs : keyword-parameters, optional120Arbitrary keyword parameters corresponding to connection parameters121122Returns123-------124dict125126"""127out: Dict[str, Any] = {}128129kwargs = {k: v for k, v in kwargs.items() if v is not None}130131# Set known parameters132for name in inspect.getfullargspec(connect).args:133if name == 'conv':134out[name] = kwargs.get(name, None)135elif name == 'results_format': # deprecated136if kwargs.get(name, None) is not None:137warnings.warn(138'The `results_format=` parameter has been '139'renamed to `results_type=`.',140DeprecationWarning,141)142out['results_type'] = kwargs.get(name, get_option('results.type'))143elif name == 'results_type':144out[name] = kwargs.get(name, get_option('results.type'))145else:146out[name] = kwargs.get(name, get_option(name))147148# See if host actually contains a URL; definitely not a perfect test.149host = out['host']150if host and (':' in host or '/' in host or '@' in host or '?' in host):151urlp = _parse_url(host)152if 'driver' not in urlp:153urlp['driver'] = get_option('driver')154out.update(urlp)155156out = _cast_params(out)157158# Set default port based on driver.159if 'port' not in out or not out['port']:160if out['driver'] == 'http':161out['port'] = int(get_option('http_port') or 80)162elif out['driver'] == 'https':163out['port'] = int(get_option('http_port') or 443)164else:165out['port'] = int(get_option('port') or 3306)166167# If there is no user and the password is empty, remove the password key.168if 'user' not in out and not out.get('password', None):169out.pop('password', None)170171if out.get('ssl_ca', '') and not out.get('ssl_verify_cert', None):172out['ssl_verify_cert'] = True173174return out175176177def _get_param_types(func: Any) -> Dict[str, Any]:178"""179Retrieve the types for the parameters to the given function.180181Note that if a parameter has multiple possible types, only the182first one is returned.183184Parameters185----------186func : callable187Callable object to inspect the parameters of188189Returns190-------191dict192193"""194out = {}195args = inspect.getfullargspec(func)196for name in args.args:197ann = args.annotations[name]198if isinstance(ann, str):199ann = eval(ann)200if hasattr(ann, '__args__'):201out[name] = ann.__args__[0]202else:203out[name] = ann204return out205206207def _cast_params(params: Dict[str, Any]) -> Dict[str, Any]:208"""209Cast known keys to appropriate values.210211Parameters212----------213params : dict214Dictionary of connection parameters215216Returns217-------218dict219220"""221param_types = _get_param_types(connect)222out = {}223for key, val in params.items():224key = key.lower()225if val is None:226continue227if key not in param_types:228raise ValueError('Unrecognized connection parameter: {}'.format(key))229dtype = param_types[key]230if dtype is bool:231val = cast_bool_param(val)232elif getattr(dtype, '_name', '') in ['Dict', 'Mapping'] or \233str(dtype).startswith('typing.Dict'):234val = dict(val)235elif getattr(dtype, '_name', '') == 'List':236val = list(val)237elif getattr(dtype, '_name', '') == 'Tuple':238val = tuple(val)239else:240val = dtype(val)241out[key] = val242return out243244245def _parse_url(url: str) -> Dict[str, Any]:246"""247Parse a connection URL and return only the defined parts.248249Parameters250----------251url : str252The URL passed in can be a full URL or a partial URL. At a minimum,253a host name must be specified. All other parts are optional.254255Returns256-------257dict258259"""260out: Dict[str, Any] = {}261262if '//' not in url:263url = '//' + url264265if url.startswith('singlestoredb+'):266url = re.sub(r'^singlestoredb\+', r'', url)267268parts = urlparse(url, scheme='singlestoredb', allow_fragments=True)269270url_db = parts.path271if url_db.startswith('/'):272url_db = url_db.split('/')[1].strip()273url_db = url_db.split('/')[0].strip() or ''274275# Retrieve basic connection parameters276out['host'] = parts.hostname or None277out['port'] = parts.port or None278out['database'] = url_db or None279out['user'] = parts.username or None280281# Allow an empty string for password282if out['user'] and parts.password is not None:283out['password'] = parts.password284285if parts.scheme != 'singlestoredb':286out['driver'] = parts.scheme.lower()287288if out.get('user'):289out['user'] = unquote_plus(out['user'])290291if out.get('password'):292out['password'] = unquote_plus(out['password'])293294if out.get('database'):295out['database'] = unquote_plus(out['database'])296297# Convert query string to parameters298out.update({k.lower(): v[-1] for k, v in parse_qs(parts.query).items()})299300return {k: v for k, v in out.items() if v is not None}301302303def _name_check(name: str) -> str:304"""305Make sure the given name is a legal variable name.306307Parameters308----------309name : str310Name to check311312Returns313-------314str315316"""317name = name.strip()318if not re.match(r'^[A-Za-z_][\w+_]*$', name):319raise ValueError('Name contains invalid characters')320return name321322323def quote_identifier(name: str) -> str:324"""Escape identifier value."""325return f'`{name}`'326327328class Driver(object):329"""Compatibility class for driver name."""330331def __init__(self, name: str):332self.name = name333334335class VariableAccessor(MutableMapping): # type: ignore336"""Variable accessor class."""337338def __init__(self, conn: 'Connection', vtype: str):339object.__setattr__(self, 'connection', weakref.proxy(conn))340object.__setattr__(self, 'vtype', vtype.lower())341if self.vtype not in [342'global', 'local', '',343'cluster', 'cluster global', 'cluster local',344]:345raise ValueError(346'Variable type must be global, local, cluster, '347'cluster global, cluster local, or empty',348)349350def _cast_value(self, value: Any) -> Any:351if isinstance(value, str):352if value.lower() in ['on', 'true']:353return True354if value.lower() in ['off', 'false']:355return False356return value357358def __getitem__(self, name: str) -> Any:359name = _name_check(name)360out = self.connection._iquery(361'show {} variables like %s;'.format(self.vtype),362[name],363)364if not out:365raise KeyError(f"No variable found with the name '{name}'.")366if len(out) > 1:367raise KeyError(f"Multiple variables found with the name '{name}'.")368return self._cast_value(out[0]['Value'])369370def __setitem__(self, name: str, value: Any) -> None:371name = _name_check(name)372if value is True:373value = 'ON'374elif value is False:375value = 'OFF'376if 'local' in self.vtype:377self.connection._iquery(378'set {} {}=%s;'.format(379self.vtype.replace('local', 'session'), name,380), [value],381)382else:383self.connection._iquery('set {} {}=%s;'.format(self.vtype, name), [value])384385def __delitem__(self, name: str) -> None:386raise TypeError('Variables can not be deleted.')387388def __getattr__(self, name: str) -> Any:389return self[name]390391def __setattr__(self, name: str, value: Any) -> None:392self[name] = value393394def __delattr__(self, name: str) -> None:395del self[name]396397def __len__(self) -> int:398out = self.connection._iquery('show {} variables;'.format(self.vtype))399return len(list(out))400401def __iter__(self) -> Iterator[str]:402out = self.connection._iquery('show {} variables;'.format(self.vtype))403return iter(list(x.values())[0] for x in out)404405406class Cursor(metaclass=abc.ABCMeta):407"""408Database cursor for submitting commands and queries.409410This object should not be instantiated directly.411The ``Connection.cursor`` method should be used.412413"""414415def __init__(self, connection: 'Connection'):416"""Call ``Connection.cursor`` instead."""417self.errorhandler = connection.errorhandler418self._connection: Optional[Connection] = weakref.proxy(connection)419420self._rownumber: Optional[int] = None421422self._description: Optional[List[Description]] = None423424#: Default batch size of ``fetchmany`` calls.425self.arraysize = get_option('results.arraysize')426427self._converters: List[428Tuple[429int, Optional[str],430Optional[Callable[..., Any]],431]432] = []433434#: Number of rows affected by the last query.435self.rowcount: int = -1436437self._messages: List[Tuple[int, str]] = []438439#: Row ID of the last modified row.440self.lastrowid: Optional[int] = None441442@property443def messages(self) -> List[Tuple[int, str]]:444"""Messages created by the server."""445return self._messages446447@abc.abstractproperty448def description(self) -> Optional[List[Description]]:449"""The field descriptions of the last query."""450return self._description451452@abc.abstractproperty453def rownumber(self) -> Optional[int]:454"""The last modified row number."""455return self._rownumber456457@property458def connection(self) -> Optional['Connection']:459"""the connection that the cursor belongs to."""460return self._connection461462@abc.abstractmethod463def callproc(464self, name: str,465params: Optional[Sequence[Any]] = None,466) -> None:467"""468Call a stored procedure.469470The result sets generated by a store procedure can be retrieved471like the results of any other query using :meth:`fetchone`,472:meth:`fetchmany`, or :meth:`fetchall`. If the procedure generates473multiple result sets, subsequent result sets can be accessed474using :meth:`nextset`.475476Examples477--------478>>> cur.callproc('myprocedure', ['arg1', 'arg2'])479>>> print(cur.fetchall())480481Parameters482----------483name : str484Name of the stored procedure485params : iterable, optional486Parameters to the stored procedure487488"""489# NOTE: The `callproc` interface varies quite a bit between drivers490# so it is implemented using `execute` here.491492if not self.is_connected():493raise exceptions.InterfaceError(2048, 'Cursor is closed.')494495name = _name_check(name)496497if not params:498self.execute(f'CALL {name}();')499else:500keys = ', '.join([f':{i+1}' for i in range(len(params))])501self.execute(f'CALL {name}({keys});', params)502503@abc.abstractmethod504def is_connected(self) -> bool:505"""Is the cursor still connected?"""506raise NotImplementedError507508@abc.abstractmethod509def close(self) -> None:510"""Close the cursor."""511raise NotImplementedError512513@abc.abstractmethod514def execute(515self, query: str,516args: Optional[Union[Sequence[Any], Dict[str, Any], Any]] = None,517infile_stream: Optional[ # type: ignore518Union[519io.RawIOBase,520io.TextIOBase,521Iterator[Union[bytes, str]],522InfileQueue,523]524] = None,525) -> int:526"""527Execute a SQL statement.528529Queries can use the ``format``-style parameters (``%s``) when using a530list of paramters or ``pyformat``-style parameters (``%(key)s``)531when using a dictionary of parameters.532533Parameters534----------535query : str536The SQL statement to execute537args : Sequence or dict, optional538Parameters to substitute into the SQL code539infile_stream : io.RawIOBase or io.TextIOBase or Iterator[bytes|str], optional540Data stream for ``LOCAL INFILE`` statement541542Examples543--------544Query with no parameters545546>>> cur.execute('select * from mytable')547548Query with positional parameters549550>>> cur.execute('select * from mytable where id < %s', [100])551552Query with named parameters553554>>> cur.execute('select * from mytable where id < %(max)s', dict(max=100))555556Returns557-------558Number of rows affected559560"""561raise NotImplementedError562563def executemany(564self, query: str,565args: Optional[Sequence[Union[Sequence[Any], Dict[str, Any], Any]]] = None,566) -> int:567"""568Execute SQL code against multiple sets of parameters.569570Queries can use the ``format``-style parameters (``%s``) when using571lists of paramters or ``pyformat``-style parameters (``%(key)s``)572when using dictionaries of parameters.573574Parameters575----------576query : str577The SQL statement to execute578args : iterable of iterables or dicts, optional579Sets of parameters to substitute into the SQL code580581Examples582--------583>>> cur.executemany('select * from mytable where id < %s',584... [[100], [200], [300]])585586>>> cur.executemany('select * from mytable where id < %(max)s',587... [dict(max=100), dict(max=100), dict(max=300)])588589Returns590-------591Number of rows affected592593"""594# NOTE: Just implement using `execute` to cover driver inconsistencies595if not args:596self.execute(query)597else:598for params in args:599self.execute(query, params)600return self.rowcount601602@abc.abstractmethod603def fetchone(self) -> Optional[Result]:604"""605Fetch a single row from the result set.606607Examples608--------609>>> while True:610... row = cur.fetchone()611... if row is None:612... break613... print(row)614615Returns616-------617tuple618Values of the returned row if there are rows remaining619620"""621raise NotImplementedError622623@abc.abstractmethod624def fetchmany(self, size: Optional[int] = None) -> Result:625"""626Fetch `size` rows from the result.627628If `size` is not specified, the `arraysize` attribute is used.629630Examples631--------632>>> while True:633... out = cur.fetchmany(100)634... if not len(out):635... break636... for row in out:637... print(row)638639Returns640-------641list of tuples642Values of the returned rows if there are rows remaining643644"""645raise NotImplementedError646647@abc.abstractmethod648def fetchall(self) -> Result:649"""650Fetch all rows in the result set.651652Examples653--------654>>> for row in cur.fetchall():655... print(row)656657Returns658-------659list of tuples660Values of the returned rows if there are rows remaining661None662If there are no rows to return663664"""665raise NotImplementedError666667@abc.abstractmethod668def nextset(self) -> Optional[bool]:669"""670Skip to the next available result set.671672This is used when calling a procedure that returns multiple673results sets.674675Note676----677The ``nextset`` method must be called until it returns an empty678set (i.e., once more than the number of expected result sets).679This is to retain compatibility with PyMySQL and MySOLdb.680681Returns682-------683``True``684If another result set is available685``False``686If no other result set is available687688"""689raise NotImplementedError690691@abc.abstractmethod692def setinputsizes(self, sizes: Sequence[int]) -> None:693"""Predefine memory areas for parameters."""694raise NotImplementedError695696@abc.abstractmethod697def setoutputsize(self, size: int, column: Optional[str] = None) -> None:698"""Set a column buffer size for fetches of large columns."""699raise NotImplementedError700701@abc.abstractmethod702def scroll(self, value: int, mode: str = 'relative') -> None:703"""704Scroll the cursor to the position in the result set.705706Parameters707----------708value : int709Value of the positional move710mode : str711Where to move the cursor from: 'relative' or 'absolute'712713"""714raise NotImplementedError715716def next(self) -> Optional[Result]:717"""718Return the next row from the result set for use in iterators.719720Raises721------722StopIteration723If no more results exist724725Returns726-------727tuple of values728729"""730if not self.is_connected():731raise exceptions.InterfaceError(2048, 'Cursor is closed.')732out = self.fetchone()733if out is None:734raise StopIteration735return out736737__next__ = next738739def __iter__(self) -> Any:740"""Return result iterator."""741return self742743def __enter__(self) -> 'Cursor':744"""Enter a context."""745return self746747def __exit__(748self, exc_type: Optional[object],749exc_value: Optional[Exception], exc_traceback: Optional[str],750) -> None:751"""Exit a context."""752self.close()753754755class ShowResult(Sequence[Any]):756"""757Simple result object.758759This object is primarily used for displaying results to a760terminal or web browser, but it can also be treated like a761simple data frame where columns are accessible using either762dictionary key-like syntax or attribute syntax.763764Examples765--------766>>> conn.show.status().Value[10]767768>>> conn.show.status()[10]['Value']769770Parameters771----------772*args : Any773Parameters to send to underlying list constructor774**kwargs : Any775Keyword parameters to send to underlying list constructor776777See Also778--------779:attr:`Connection.show`780781"""782783def __init__(self, *args: Any, **kwargs: Any) -> None:784self._data: List[Dict[str, Any]] = []785item: Any = None786for item in list(*args, **kwargs):787self._data.append(item)788789def __getitem__(self, item: Union[int, slice]) -> Any:790return self._data[item]791792def __getattr__(self, name: str) -> List[Any]:793if name.startswith('_ipython'):794raise AttributeError(name)795out = []796for item in self._data:797out.append(item[name])798return out799800def __len__(self) -> int:801return len(self._data)802803def __repr__(self) -> str:804if not self._data:805return ''806return '\n{}\n'.format(self._format_table(self._data))807808@property809def columns(self) -> List[str]:810"""The columns in the result."""811if not self._data:812return []813return list(self._data[0].keys())814815def _format_table(self, rows: Sequence[Dict[str, Any]]) -> str:816if not self._data:817return ''818819keys = rows[0].keys()820lens = [len(x) for x in keys]821822for row in self._data:823align = ['<'] * len(keys)824for i, k in enumerate(keys):825lens[i] = max(lens[i], len(str(row[k])))826align[i] = '<' if isinstance(row[k], (bytes, bytearray, str)) else '>'827828fmt = '| %s |' % '|'.join([' {:%s%d} ' % (x, y) for x, y in zip(align, lens)])829830out = []831out.append(fmt.format(*keys))832out.append('-' * len(out[0]))833for row in rows:834out.append(fmt.format(*[str(x) for x in row.values()]))835return '\n'.join(out)836837def __str__(self) -> str:838return self.__repr__()839840def _repr_html_(self) -> str:841if not self._data:842return ''843cell_style = 'style="text-align: left; vertical-align: top"'844out = []845out.append('<table border="1" class="dataframe">')846out.append('<thead>')847out.append('<tr>')848for name in self._data[0].keys():849out.append(f'<th {cell_style}>{name}</th>')850out.append('</tr>')851out.append('</thead>')852out.append('<tbody>')853for row in self._data:854out.append('<tr>')855for item in row.values():856out.append(f'<td {cell_style}>{item}</td>')857out.append('</tr>')858out.append('</tbody>')859out.append('</table>')860return ''.join(out)861862863class ShowAccessor(object):864"""865Accessor for ``SHOW`` commands.866867See Also868--------869:attr:`Connection.show`870871"""872873def __init__(self, conn: 'Connection'):874self._conn = conn875876def columns(self, table: str, full: bool = False) -> ShowResult:877"""Show the column information for the given table."""878table = quote_identifier(table)879if full:880return self._iquery(f'full columns in {table}')881return self._iquery(f'columns in {table}')882883def tables(self, extended: bool = False) -> ShowResult:884"""Show tables in the current database."""885if extended:886return self._iquery('tables extended')887return self._iquery('tables')888889def warnings(self) -> ShowResult:890"""Show warnings."""891return self._iquery('warnings')892893def errors(self) -> ShowResult:894"""Show errors."""895return self._iquery('errors')896897def databases(self, extended: bool = False) -> ShowResult:898"""Show all databases in the server."""899if extended:900return self._iquery('databases extended')901return self._iquery('databases')902903def database_status(self) -> ShowResult:904"""Show status of the current database."""905return self._iquery('database status')906907def global_status(self) -> ShowResult:908"""Show global status of the current server."""909return self._iquery('global status')910911def indexes(self, table: str) -> ShowResult:912"""Show all indexes in the given table."""913table = quote_identifier(table)914return self._iquery(f'indexes in {table}')915916def functions(self) -> ShowResult:917"""Show all functions in the current database."""918return self._iquery('functions')919920def partitions(self, extended: bool = False) -> ShowResult:921"""Show partitions in the current database."""922if extended:923return self._iquery('partitions extended')924return self._iquery('partitions')925926def pipelines(self) -> ShowResult:927"""Show all pipelines in the current database."""928return self._iquery('pipelines')929930def plan(self, plan_id: int, json: bool = False) -> ShowResult:931"""Show the plan for the given plan ID."""932plan_id = int(plan_id)933if json:934return self._iquery(f'plan json {plan_id}')935return self._iquery(f'plan {plan_id}')936937def plancache(self) -> ShowResult:938"""Show all query statements compiled and executed."""939return self._iquery('plancache')940941def processlist(self) -> ShowResult:942"""Show details about currently running threads."""943return self._iquery('processlist')944945def reproduction(self, outfile: Optional[str] = None) -> ShowResult:946"""Show troubleshooting data for query optimizer and code generation."""947if outfile:948outfile = outfile.replace('"', r'\"')949return self._iquery('reproduction into outfile "{outfile}"')950return self._iquery('reproduction')951952def schemas(self) -> ShowResult:953"""Show schemas in the server."""954return self._iquery('schemas')955956def session_status(self) -> ShowResult:957"""Show server status information for a session."""958return self._iquery('session status')959960def status(self, extended: bool = False) -> ShowResult:961"""Show server status information."""962if extended:963return self._iquery('status extended')964return self._iquery('status')965966def table_status(self) -> ShowResult:967"""Show table status information for the current database."""968return self._iquery('table status')969970def procedures(self) -> ShowResult:971"""Show all procedures in the current database."""972return self._iquery('procedures')973974def aggregates(self) -> ShowResult:975"""Show all aggregate functions in the current database."""976return self._iquery('aggregates')977978def create_aggregate(self, name: str) -> ShowResult:979"""Show the function creation code for the given aggregate function."""980name = quote_identifier(name)981return self._iquery(f'create aggregate {name}')982983def create_function(self, name: str) -> ShowResult:984"""Show the function creation code for the given function."""985name = quote_identifier(name)986return self._iquery(f'create function {name}')987988def create_pipeline(self, name: str, extended: bool = False) -> ShowResult:989"""Show the pipeline creation code for the given pipeline."""990name = quote_identifier(name)991if extended:992return self._iquery(f'create pipeline {name} extended')993return self._iquery(f'create pipeline {name}')994995def create_table(self, name: str) -> ShowResult:996"""Show the table creation code for the given table."""997name = quote_identifier(name)998return self._iquery(f'create table {name}')9991000def create_view(self, name: str) -> ShowResult:1001"""Show the view creation code for the given view."""1002name = quote_identifier(name)1003return self._iquery(f'create view {name}')10041005# def grants(1006# self,1007# user: Optional[str] = None,1008# hostname: Optional[str] = None,1009# role: Optional[str] = None1010# ) -> ShowResult:1011# """Show the privileges for the given user or role."""1012# if user:1013# if not re.match(r'^[\w+-_]+$', user):1014# raise ValueError(f'User name is not valid: {user}')1015# if hostname and not re.match(r'^[\w+-_\.]+$', hostname):1016# raise ValueError(f'Hostname is not valid: {hostname}')1017# if hostname:1018# return self._iquery(f"grants for '{user}@{hostname}'")1019# return self._iquery(f"grants for '{user}'")1020# if role:1021# if not re.match(r'^[\w+-_]+$', role):1022# raise ValueError(f'Role is not valid: {role}')1023# return self._iquery(f"grants for role '{role}'")1024# return self._iquery('grants')10251026def _iquery(self, qtype: str) -> ShowResult:1027"""Query the given object type."""1028out = self._conn._iquery(f'show {qtype}')1029for i, row in enumerate(out):1030new_row = {}1031for j, (k, v) in enumerate(row.items()):1032if j == 0:1033k = 'Name'1034new_row[under2camel(k)] = v1035out[i] = new_row1036return ShowResult(out)103710381039class Connection(metaclass=abc.ABCMeta):1040"""1041SingleStoreDB connection.10421043Instances of this object are typically created through the1044:func:`singlestoredb.connect` function rather than creating them directly.1045See the :func:`singlestoredb.connect` function for parameter definitions.10461047See Also1048--------1049:func:`singlestoredb.connect`10501051"""10521053Warning = exceptions.Warning1054Error = exceptions.Error1055InterfaceError = exceptions.InterfaceError1056DataError = exceptions.DataError1057DatabaseError = exceptions.DatabaseError1058OperationalError = exceptions.OperationalError1059IntegrityError = exceptions.IntegrityError1060InternalError = exceptions.InternalError1061ProgrammingError = exceptions.ProgrammingError1062NotSupportedError = exceptions.NotSupportedError10631064#: Read-only DB-API parameter style1065paramstyle = 'pyformat'10661067# Must be set by subclass1068driver = ''10691070# Populated when first needed1071_map_param_converter: Optional[sqlparams.SQLParams] = None1072_positional_param_converter: Optional[sqlparams.SQLParams] = None10731074def __init__(self, **kwargs: Any):1075"""Call :func:`singlestoredb.connect` instead."""1076self.connection_params: Dict[str, Any] = kwargs1077self.errorhandler = None1078self._results_type: str = kwargs.get('results_type', None) or 'tuples'10791080#: Session encoding1081self.encoding = self.connection_params.get('charset', None) or 'utf-8'1082self.encoding = self.encoding.replace('mb4', '')10831084# Handle various authentication types1085credential_type = self.connection_params.get('credential_type', None)1086if credential_type == auth.BROWSER_SSO:1087# TODO: Cache info for token refreshes1088info = auth.get_jwt(self.connection_params['user'])1089self.connection_params['password'] = str(info)1090self.connection_params['credential_type'] = auth.JWT10911092#: Attribute-like access to global server variables1093self.globals = VariableAccessor(self, 'global')10941095#: Attribute-like access to local / session server variables1096self.locals = VariableAccessor(self, 'local')10971098#: Attribute-like access to cluster global server variables1099self.cluster_globals = VariableAccessor(self, 'cluster global')11001101#: Attribute-like access to cluster local / session server variables1102self.cluster_locals = VariableAccessor(self, 'cluster local')11031104#: Attribute-like access to all server variables1105self.vars = VariableAccessor(self, '')11061107#: Attribute-like access to all cluster server variables1108self.cluster_vars = VariableAccessor(self, 'cluster')11091110# For backwards compatibility with SQLAlchemy package1111self._driver = Driver(self.driver)11121113# Output decoders1114self.decoders: Dict[int, Callable[[Any], Any]] = {}11151116@classmethod1117def _convert_params(1118cls, oper: str,1119params: Optional[Union[Sequence[Any], Dict[str, Any], Any]],1120interpolate_query_with_empty_args: bool = False,1121) -> Tuple[Any, ...]:1122"""Convert query to correct parameter format."""1123if interpolate_query_with_empty_args:1124should_convert = params is not None1125else:1126should_convert = bool(params)11271128if should_convert:11291130if cls._map_param_converter is None:1131cls._map_param_converter = sqlparams.SQLParams(1132map_paramstyle, cls.paramstyle, escape_char=True,1133)11341135if cls._positional_param_converter is None:1136cls._positional_param_converter = sqlparams.SQLParams(1137positional_paramstyle, cls.paramstyle, escape_char=True,1138)11391140is_sequence = isinstance(params, Sequence) \1141and not isinstance(params, str) \1142and not isinstance(params, bytes)1143is_mapping = isinstance(params, Mapping)11441145param_converter = cls._map_param_converter \1146if is_mapping else cls._positional_param_converter11471148if not is_sequence and not is_mapping:1149params = [params]11501151return param_converter.format(oper, params)11521153return (oper, None)11541155def autocommit(self, value: bool = True) -> None:1156"""Set autocommit mode."""1157self.locals.autocommit = bool(value)11581159@abc.abstractmethod1160def connect(self) -> 'Connection':1161"""Connect to the server."""1162raise NotImplementedError11631164def _iquery(1165self, oper: str,1166params: Optional[Union[Sequence[Any], Dict[str, Any]]] = None,1167fix_names: bool = True,1168) -> List[Dict[str, Any]]:1169"""Return the results of a query as a list of dicts (for internal use)."""1170with self.cursor() as cur:1171cur.execute(oper, params)1172if not re.match(r'^\s*(select|show|call|echo)\s+', oper, flags=re.I):1173return []1174out = list(cur.fetchall())1175if not out:1176return []1177if isinstance(out, DataFrame):1178out = out.to_dict(orient='records')1179elif isinstance(out[0], (tuple, list)):1180if cur.description:1181names = [x[0] for x in cur.description]1182if fix_names:1183names = [under2camel(str(x).replace(' ', '')) for x in names]1184out = [{k: v for k, v in zip(names, row)} for row in out]1185return out11861187@abc.abstractmethod1188def close(self) -> None:1189"""Close the database connection."""1190raise NotImplementedError11911192@abc.abstractmethod1193def commit(self) -> None:1194"""Commit the pending transaction."""1195raise NotImplementedError11961197@abc.abstractmethod1198def rollback(self) -> None:1199"""Rollback the pending transaction."""1200raise NotImplementedError12011202@abc.abstractmethod1203def cursor(self) -> Cursor:1204"""1205Create a new cursor object.12061207See Also1208--------1209:class:`Cursor`12101211Returns1212-------1213:class:`Cursor`12141215"""1216raise NotImplementedError12171218@abc.abstractproperty1219def messages(self) -> List[Tuple[int, str]]:1220"""Messages generated during the connection."""1221raise NotImplementedError12221223def __enter__(self) -> 'Connection':1224"""Enter a context."""1225return self12261227def __exit__(1228self, exc_type: Optional[object],1229exc_value: Optional[Exception], exc_traceback: Optional[str],1230) -> None:1231"""Exit a context."""1232self.close()12331234@abc.abstractmethod1235def is_connected(self) -> bool:1236"""1237Determine if the database is still connected.12381239Returns1240-------1241bool12421243"""1244raise NotImplementedError12451246def enable_data_api(self, port: Optional[int] = None) -> int:1247"""1248Enable the data API in the server.12491250Use of this method requires privileges that allow setting global1251variables and starting the HTTP proxy.12521253Parameters1254----------1255port : int, optional1256The port number that the HTTP server should run on. If this1257value is not specified, the current value of the1258``http_proxy_port`` is used.12591260See Also1261--------1262:meth:`disable_data_api`12631264Returns1265-------1266int1267port number of the HTTP server12681269"""1270if port is not None:1271self.globals.http_proxy_port = int(port)1272self.globals.http_api = True1273self._iquery('restart proxy')1274return int(self.globals.http_proxy_port)12751276enable_http_api = enable_data_api12771278def disable_data_api(self) -> None:1279"""1280Disable the data API.12811282See Also1283--------1284:meth:`enable_data_api`12851286"""1287self.globals.http_api = False1288self._iquery('restart proxy')12891290disable_http_api = disable_data_api12911292@property1293def show(self) -> ShowAccessor:1294"""Access server properties managed by the SHOW statement."""1295return ShowAccessor(self)12961297@functools.cached_property1298def vector_db(self) -> Any:1299"""1300Get vectorstore API accessor1301"""1302from vectorstore import VectorDB1303return VectorDB(connection=self)130413051306#1307# NOTE: When adding parameters to this function, you should always1308# make the value optional with a default of None. The options1309# processing framework will fill in the default value based1310# on environment variables or other configuration sources.1311#1312def connect(1313host: Optional[str] = None, user: Optional[str] = None,1314password: Optional[str] = None, port: Optional[int] = None,1315database: Optional[str] = None, driver: Optional[str] = None,1316pure_python: Optional[bool] = None, local_infile: Optional[bool] = None,1317charset: Optional[str] = None,1318ssl_key: Optional[str] = None, ssl_cert: Optional[str] = None,1319ssl_ca: Optional[str] = None, ssl_disabled: Optional[bool] = None,1320ssl_cipher: Optional[str] = None, ssl_verify_cert: Optional[bool] = None,1321tls_sni_servername: Optional[str] = None,1322ssl_verify_identity: Optional[bool] = None,1323conv: Optional[Dict[int, Callable[..., Any]]] = None,1324credential_type: Optional[str] = None,1325autocommit: Optional[bool] = None,1326results_type: Optional[str] = None,1327buffered: Optional[bool] = None,1328results_format: Optional[str] = None,1329program_name: Optional[str] = None,1330conn_attrs: Optional[Dict[str, str]] = None,1331multi_statements: Optional[bool] = None,1332client_found_rows: Optional[bool] = None,1333connect_timeout: Optional[int] = None,1334nan_as_null: Optional[bool] = None,1335inf_as_null: Optional[bool] = None,1336encoding_errors: Optional[str] = None,1337track_env: Optional[bool] = None,1338enable_extended_data_types: Optional[bool] = None,1339vector_data_format: Optional[str] = None,1340parse_json: Optional[bool] = None,1341interpolate_query_with_empty_args: Optional[bool] = None,1342) -> Connection:1343"""1344Return a SingleStoreDB connection.13451346Parameters1347----------1348host : str, optional1349Hostname, IP address, or URL that describes the connection.1350The scheme or protocol defines which database connector to use.1351By default, the ``mysql`` scheme is used. To connect to the1352HTTP API, the scheme can be set to ``http`` or ``https``. The username,1353password, host, and port are specified as in a standard URL. The path1354indicates the database name. The overall form of the URL is:1355``scheme://user:password@host:port/db_name``. The scheme can1356typically be left off (unless you are using the HTTP API):1357``user:password@host:port/db_name``.1358user : str, optional1359Database user name1360password : str, optional1361Database user password1362port : int, optional1363Database port. This defaults to 3306 for non-HTTP connections, 801364for HTTP connections, and 443 for HTTPS connections.1365database : str, optional1366Database name1367pure_python : bool, optional1368Use the connector in pure Python mode1369local_infile : bool, optional1370Allow local file uploads1371charset : str, optional1372Character set for string values1373ssl_key : str, optional1374File containing SSL key1375ssl_cert : str, optional1376File containing SSL certificate1377ssl_ca : str, optional1378File containing SSL certificate authority1379ssl_cipher : str, optional1380Sets the SSL cipher list1381ssl_disabled : bool, optional1382Disable SSL usage1383ssl_verify_cert : bool, optional1384Verify the server's certificate. This is automatically enabled if1385``ssl_ca`` is also specified.1386ssl_verify_identity : bool, optional1387Verify the server's identity1388conv : dict[int, Callable], optional1389Dictionary of data conversion functions1390credential_type : str, optional1391Type of authentication to use: auth.PASSWORD, auth.JWT, or auth.BROWSER_SSO1392autocommit : bool, optional1393Enable autocommits1394results_type : str, optional1395The form of the query results: tuples, namedtuples, dicts,1396numpy, polars, pandas, arrow1397buffered : bool, optional1398Should the entire query result be buffered in memory? This is the default1399behavior which allows full cursor control of the result, but does consume1400more memory.1401results_format : str, optional1402Deprecated. This option has been renamed to results_type.1403program_name : str, optional1404Name of the program1405conn_attrs : dict, optional1406Additional connection attributes for telemetry. Example:1407{'program_version': "1.0.2", "_connector_name": "dbt connector"}1408multi_statements: bool, optional1409Should multiple statements be allowed within a single query?1410connect_timeout : int, optional1411The timeout for connecting to the database in seconds.1412(default: 10, min: 1, max: 31536000)1413nan_as_null : bool, optional1414Should NaN values be treated as NULLs when used in parameter1415substitutions including uploaded data?1416inf_as_null : bool, optional1417Should Inf values be treated as NULLs when used in parameter1418substitutions including uploaded data?1419encoding_errors : str, optional1420The error handler name for value decoding errors1421track_env : bool, optional1422Should the connection track the SINGLESTOREDB_URL environment variable?1423enable_extended_data_types : bool, optional1424Should extended data types (BSON, vector) be enabled?1425vector_data_format : str, optional1426Format for vector types: json or binary1427interpolate_query_with_empty_args : bool, optional1428Should the connector apply parameter interpolation even when the1429parameters are empty? This corresponds to pymysql/mysqlclient's handling14301431Examples1432--------1433Standard database connection14341435>>> conn = s2.connect('me:[email protected]/my_db')14361437Connect to HTTP API on port 808014381439>>> conn = s2.connect('http://me:[email protected]:8080/my_db')14401441Using an environment variable for connection string14421443>>> os.environ['SINGLESTOREDB_URL'] = 'me:[email protected]/my_db'1444>>> conn = s2.connect()14451446Specifying credentials using environment variables14471448>>> os.environ['SINGLESTOREDB_USER'] = 'me'1449>>> os.environ['SINGLESTOREDB_PASSWORD'] = 'p455w0rd'1450>>> conn = s2.connect('s2-host.com/my_db')14511452Specifying options with keyword parameters14531454>>> conn = s2.connect('s2-host.com/my_db', user='me', password='p455w0rd',1455local_infile=True)14561457Specifying options with URL parameters14581459>>> conn = s2.connect('s2-host.com/my_db?local_infile=True&charset=utf8')14601461Connecting within a context manager14621463>>> with s2.connect('...') as conn:1464... with conn.cursor() as cur:1465... cur.execute('...')14661467Setting session variables, the code below sets the ``autocommit`` option14681469>>> conn.locals.autocommit = True14701471Getting session variables14721473>>> conn.locals.autocommit1474True14751476See Also1477--------1478:class:`Connection`14791480Returns1481-------1482:class:`Connection`14831484"""1485params = build_params(**dict(locals()))1486driver = params.get('driver', 'mysql')14871488if not driver or driver == 'mysql':1489from .mysql.connection import Connection # type: ignore1490return Connection(**params)14911492if driver in ['http', 'https']:1493from .http.connection import Connection1494return Connection(**params)14951496raise ValueError(f'Unrecognized protocol: {driver}')149714981499