Path: blob/main/singlestoredb/connection.py
469 views
#!/usr/bin/env python
"""SingleStoreDB connections and cursors."""
import abc
import functools
import html
import inspect
import io
import queue
import re
import sys
import warnings
import weakref
from collections.abc import Mapping
from collections.abc import MutableMapping
from typing import Any
from typing import Callable
from typing import Dict
from typing import Iterator
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import Union
from urllib.parse import parse_qs
from urllib.parse import unquote_plus
from urllib.parse import urlparse

import sqlparams
try:
    from pandas import DataFrame
except ImportError:
    # Minimal stand-in so ``isinstance(x, DataFrame)`` checks below still
    # work (and are always False) when pandas is not installed.
    class DataFrame(object):  # type: ignore
        def itertuples(self, *args: Any, **kwargs: Any) -> None:
            pass

from . import auth
from . import exceptions
from .config import get_option
from .utils.results import Description
from .utils.results import Result

# Type of queue accepted as an ``infile_stream``; the parameterized form is
# only used on Python 3.10+.
if sys.version_info < (3, 10):
    InfileQueue = queue.Queue
else:
    InfileQueue = queue.Queue[Union[bytes, str]]


# DB-API settings
apilevel = '2.0'
threadsafety = 1
# Dict parameters use ``pyformat`` (``%(name)s``); positional parameters
# use ``format`` (``%s``).  See ``Connection._convert_params``.
paramstyle = map_paramstyle = 'pyformat'
positional_paramstyle = 'format'


# Type codes for character-based columns
CHAR_COLUMNS = set(list(range(247, 256)) + [245])


def under2camel(s: str) -> str:
    """
    Format underscore-delimited strings to camel-case.

    ``xml``, ``sql``, and ``json`` words are fully uppercased,
    e.g. ``variable_name`` -> ``VariableName``, ``sql_mode`` -> ``SQLMode``.

    """

    def upper_mid(m: Any) -> str:
        """Uppercase middle group of matches."""
        return m.group(1) + m.group(2).upper() + m.group(3)

    def upper(m: Any) -> str:
        """Uppercase match."""
        return m.group(1).upper()

    s = re.sub(r'(\b|_)(xml|sql|json)(\b|_)', upper_mid, s, flags=re.I)
    s = re.sub(r'(?:^|_+)(\w)', upper, s)
    s = re.sub(r'_+$', r'', s)

    return s


def nested_converter(
    conv: Callable[[Any], Any],
    inner: Callable[[Any], Any],
) -> Callable[[Any], Any]:
    """Create a pipeline of two functions: ``conv(inner(value))``."""
    def converter(value: Any) -> Any:
        return conv(inner(value))
    return converter


def cast_bool_param(val: Any) -> bool:
    """
    Cast value to a bool.

    Accepts ``None``/``True``/``False``, values whose ``int()`` is 0 or 1,
    and common on/off words (case-insensitive).

    Raises
    ------
    ValueError
        If the value is not recognized

    """
    if val is None or val is False:
        return False

    if val is True:
        return True

    # Test ints; anything that can't be converted falls through to the
    # string checks below (deliberate best-effort).
    try:
        ival = int(val)
        if ival == 1:
            return True
        if ival == 0:
            return False
    except Exception:
        pass

    # Lowercase strings
    if hasattr(val, 'lower'):
        if val.lower() in ['on', 't', 'true', 'y', 'yes', 'enabled', 'enable']:
            return True
        elif val.lower() in ['off', 'f', 'false', 'n', 'no', 'disabled', 'disable']:
            return False

    raise ValueError('Unrecognized value for bool: {}'.format(val))


def build_params(**kwargs: Any) -> Dict[str, Any]:
    """
    Construct connection parameters from given URL and arbitrary parameters.

    Parameters
    ----------
    **kwargs : keyword-parameters, optional
        Arbitrary keyword parameters corresponding to connection parameters

    Returns
    -------
    dict

    """
    out: Dict[str, Any] = {}

    kwargs = {k: v for k, v in kwargs.items() if v is not None}

    # Set known parameters; anything not given falls back to the
    # configured option of the same name.
    for name in inspect.getfullargspec(connect).args:
        if name == 'conv':
            out[name] = kwargs.get(name, None)
        elif name == 'results_format':  # deprecated
            if kwargs.get(name, None) is not None:
                warnings.warn(
                    'The `results_format=` parameter has been '
                    'renamed to `results_type=`.',
                    DeprecationWarning,
                )
                out['results_type'] = kwargs.get(name, get_option('results.type'))
        elif name == 'results_type':
            out[name] = kwargs.get(name, get_option('results.type'))
        else:
            out[name] = kwargs.get(name, get_option(name))

    # See if host actually contains a URL; definitely not a perfect test.
    host = out['host']
    if host and (':' in host or '/' in host or '@' in host or '?' in host):
        urlp = _parse_url(host)
        if 'driver' not in urlp:
            urlp['driver'] = get_option('driver')
        out.update(urlp)

    out = _cast_params(out)

    # Set default port based on driver.  Both HTTP and HTTPS read the
    # ``http_port`` option before using the scheme's standard port.
    if 'port' not in out or not out['port']:
        if out['driver'] == 'http':
            out['port'] = int(get_option('http_port') or 80)
        elif out['driver'] == 'https':
            out['port'] = int(get_option('http_port') or 443)
        else:
            out['port'] = int(get_option('port') or 3306)

    # If there is no user and the password is empty, remove the password key.
    if 'user' not in out and not out.get('password', None):
        out.pop('password', None)

    if out.get('ssl_ca', '') and not out.get('ssl_verify_cert', None):
        out['ssl_verify_cert'] = True

    return out


def _get_param_types(func: Any) -> Dict[str, Any]:
    """
    Retrieve the types for the parameters to the given function.

    Note that if a parameter has multiple possible types, only the
    first one is returned.

    Parameters
    ----------
    func : callable
        Callable object to inspect the parameters of

    Returns
    -------
    dict

    """
    out = {}
    args = inspect.getfullargspec(func)
    for name in args.args:
        ann = args.annotations[name]
        if isinstance(ann, str):
            # NOTE: ``eval`` is only applied to annotations of our own
            # functions (string annotations), never to external data.
            ann = eval(ann)
        if hasattr(ann, '__args__'):
            # ``Optional[X]`` is ``Union[X, None]``; take ``X``.
            out[name] = ann.__args__[0]
        else:
            out[name] = ann
    return out


def _cast_params(params: Dict[str, Any]) -> Dict[str, Any]:
    """
    Cast known keys to appropriate values.

    Parameters
    ----------
    params : dict
        Dictionary of connection parameters

    Raises
    ------
    ValueError
        If a key is not a parameter of :func:`connect`

    Returns
    -------
    dict

    """
    param_types = _get_param_types(connect)
    out = {}
    for key, val in params.items():
        key = key.lower()
        if val is None:
            continue
        if key not in param_types:
            raise ValueError('Unrecognized connection parameter: {}'.format(key))
        dtype = param_types[key]
        if dtype is bool:
            val = cast_bool_param(val)
        elif getattr(dtype, '_name', '') in ['Dict', 'Mapping'] or \
                str(dtype).startswith('typing.Dict'):
            val = dict(val)
        elif getattr(dtype, '_name', '') == 'List':
            val = list(val)
        elif getattr(dtype, '_name', '') == 'Tuple':
            val = tuple(val)
        else:
            val = dtype(val)
        out[key] = val
    return out


def _parse_url(url: str) -> Dict[str, Any]:
    """
    Parse a connection URL and return only the defined parts.

    Parameters
    ----------
    url : str
        The URL passed in can be a full URL or a partial URL. At a minimum,
        a host name must be specified. All other parts are optional.

    Returns
    -------
    dict

    """
    out: Dict[str, Any] = {}

    # Without ``//``, ``urlparse`` would not recognize the network location.
    if '//' not in url:
        url = '//' + url

    if url.startswith('singlestoredb+'):
        url = re.sub(r'^singlestoredb\+', r'', url)

    parts = urlparse(url, scheme='singlestoredb', allow_fragments=True)

    # Only the first path segment is used as the database name.
    url_db = parts.path
    if url_db.startswith('/'):
        url_db = url_db.split('/')[1].strip()
    url_db = url_db.split('/')[0].strip() or ''

    # Retrieve basic connection parameters
    out['host'] = parts.hostname or None
    out['port'] = parts.port or None
    out['database'] = url_db or None
    out['user'] = parts.username or None

    # Allow an empty string for password
    if out['user'] and parts.password is not None:
        out['password'] = parts.password

    if parts.scheme != 'singlestoredb':
        out['driver'] = parts.scheme.lower()

    if out.get('user'):
        out['user'] = unquote_plus(out['user'])

    if out.get('password'):
        out['password'] = unquote_plus(out['password'])

    if out.get('database'):
        out['database'] = unquote_plus(out['database'])

    # Convert query string to parameters; the last value of a repeated
    # key wins.
    out.update({k.lower(): v[-1] for k, v in parse_qs(parts.query).items()})

    return {k: v for k, v in out.items() if v is not None}


def _name_check(name: str) -> str:
    """
    Make sure the given name is a legal variable name.

    Parameters
    ----------
    name : str
        Name to check

    Raises
    ------
    ValueError
        If the name is not a letter/underscore followed by word characters

    Returns
    -------
    str
        The name with surrounding whitespace removed

    """
    name = name.strip()
    # The name is interpolated directly into SQL by callers, so only plain
    # identifier characters are accepted (``+`` and other symbols are not).
    if not re.fullmatch(r'[A-Za-z_]\w*', name):
        raise ValueError('Name contains invalid characters')
    return name


def quote_identifier(name: str) -> str:
    """
    Escape identifier value.

    The name is wrapped in backticks and any embedded backtick is doubled,
    so the result is always a single quoted identifier.

    """
    return '`{}`'.format(name.replace('`', '``'))


class Driver(object):
    """Compatibility class for driver name."""

    def __init__(self, name: str):
        self.name = name


class VariableAccessor(MutableMapping):  # type: ignore
    """
    Variable accessor class.

    Server variables are read with ``SHOW ... VARIABLES`` and written with
    ``SET ...`` through item or attribute access.

    """

    def __init__(self, conn: 'Connection', vtype: str):
        # ``object.__setattr__`` bypasses our ``__setattr__``, which would
        # otherwise try to set a server variable.
        object.__setattr__(self, 'connection', weakref.proxy(conn))
        object.__setattr__(self, 'vtype', vtype.lower())
        if self.vtype not in [
            'global', 'local', '',
            'cluster', 'cluster global', 'cluster local',
        ]:
            raise ValueError(
                'Variable type must be global, local, cluster, '
                'cluster global, cluster local, or empty',
            )

    def _cast_value(self, value: Any) -> Any:
        """Convert on/off and true/false strings to bools."""
        if isinstance(value, str):
            if value.lower() in ['on', 'true']:
                return True
            if value.lower() in ['off', 'false']:
                return False
        return value

    def __getitem__(self, name: str) -> Any:
        name = _name_check(name)
        out = self.connection._iquery(
            'show {} variables like %s;'.format(self.vtype),
            [name],
        )
        if not out:
            raise KeyError(f"No variable found with the name '{name}'.")
        if len(out) > 1:
            raise KeyError(f"Multiple variables found with the name '{name}'.")
        return self._cast_value(out[0]['Value'])

    def __setitem__(self, name: str, value: Any) -> None:
        name = _name_check(name)
        if value is True:
            value = 'ON'
        elif value is False:
            value = 'OFF'
        if 'local' in self.vtype:
            # ``SET`` uses the ``session`` keyword for local variables.
            self.connection._iquery(
                'set {} {}=%s;'.format(
                    self.vtype.replace('local', 'session'), name,
                ), [value],
            )
        else:
            self.connection._iquery('set {} {}=%s;'.format(self.vtype, name), [value])

    def __delitem__(self, name: str) -> None:
        raise TypeError('Variables can not be deleted.')

    def __getattr__(self, name: str) -> Any:
        # Only reached when normal lookup fails.  Dunder and IPython probes
        # (copy, pickle, notebook display) must not become server queries,
        # and an instance created without ``__init__`` has no connection
        # (looking it up here would recurse forever).
        if (name.startswith('__') and name.endswith('__')) or \
                name.startswith('_ipython') or \
                'connection' not in self.__dict__:
            raise AttributeError(name)
        return self[name]

    def __setattr__(self, name: str, value: Any) -> None:
        self[name] = value

    def __delattr__(self, name: str) -> None:
        del self[name]

    def __len__(self) -> int:
        out = self.connection._iquery('show {} variables;'.format(self.vtype))
        return len(list(out))

    def __iter__(self) -> Iterator[str]:
        out = self.connection._iquery('show {} variables;'.format(self.vtype))
        # The first column of each row is the variable name.
        return iter(list(x.values())[0] for x in out)


class Cursor(metaclass=abc.ABCMeta):
    """
    Database cursor for submitting commands and queries.

    This object should not be instantiated directly.
    The ``Connection.cursor`` method should be used.

    """

    def __init__(self, connection: 'Connection'):
        """Call ``Connection.cursor`` instead."""
        self.errorhandler = connection.errorhandler
        # Weak proxy so the cursor does not keep its connection alive.
        self._connection: Optional[Connection] = weakref.proxy(connection)

        self._rownumber: Optional[int] = None

        self._description: Optional[List[Description]] = None

        #: Default batch size of ``fetchmany`` calls.
        self.arraysize = get_option('results.arraysize')

        self._converters: List[
            Tuple[
                int, Optional[str],
                Optional[Callable[..., Any]],
            ]
        ] = []

        #: Number of rows affected by the last query.
        self.rowcount: int = -1

        self._messages: List[Tuple[int, str]] = []

        #: Row ID of the last modified row.
        self.lastrowid: Optional[int] = None

    @property
    def messages(self) -> List[Tuple[int, str]]:
        """Messages created by the server."""
        return self._messages

    @property
    @abc.abstractmethod
    def description(self) -> Optional[List[Description]]:
        """The field descriptions of the last query."""
        return self._description

    @property
    @abc.abstractmethod
    def rownumber(self) -> Optional[int]:
        """The last modified row number."""
        return self._rownumber

    @property
    def connection(self) -> Optional['Connection']:
        """The connection that the cursor belongs to."""
        return self._connection

    @abc.abstractmethod
    def callproc(
        self, name: str,
        params: Optional[Sequence[Any]] = None,
    ) -> None:
        """
        Call a stored procedure.

        The result sets generated by a store procedure can be retrieved
        like the results of any other query using :meth:`fetchone`,
        :meth:`fetchmany`, or :meth:`fetchall`. If the procedure generates
        multiple result sets, subsequent result sets can be accessed
        using :meth:`nextset`.

        Examples
        --------
        >>> cur.callproc('myprocedure', ['arg1', 'arg2'])
        >>> print(cur.fetchall())

        Parameters
        ----------
        name : str
            Name of the stored procedure
        params : iterable, optional
            Parameters to the stored procedure

        """
        # NOTE: The `callproc` interface varies quite a bit between drivers
        #       so it is implemented using `execute` here.

        if not self.is_connected():
            raise exceptions.InterfaceError(2048, 'Cursor is closed.')

        name = _name_check(name)

        if not params:
            self.execute(f'CALL {name}();')
        else:
            # Positional parameters are converted from the ``format`` style
            # (``%s``), the same style documented for ``execute``.
            keys = ', '.join(['%s'] * len(params))
            self.execute(f'CALL {name}({keys});', params)

    @abc.abstractmethod
    def is_connected(self) -> bool:
        """Is the cursor still connected?"""
        raise NotImplementedError

    @abc.abstractmethod
    def close(self) -> None:
        """Close the cursor."""
        raise NotImplementedError

    @abc.abstractmethod
    def execute(
        self, query: str,
        args: Optional[Union[Sequence[Any], Dict[str, Any], Any]] = None,
        infile_stream: Optional[  # type: ignore
            Union[
                io.RawIOBase,
                io.TextIOBase,
                Iterator[Union[bytes, str]],
                InfileQueue,
            ]
        ] = None,
    ) -> int:
        """
        Execute a SQL statement.

        Queries can use the ``format``-style parameters (``%s``) when using a
        list of parameters or ``pyformat``-style parameters (``%(key)s``)
        when using a dictionary of parameters.

        Parameters
        ----------
        query : str
            The SQL statement to execute
        args : Sequence or dict, optional
            Parameters to substitute into the SQL code
        infile_stream : io.RawIOBase or io.TextIOBase or Iterator[bytes|str], optional
            Data stream for ``LOCAL INFILE`` statement

        Examples
        --------
        Query with no parameters

        >>> cur.execute('select * from mytable')

        Query with positional parameters

        >>> cur.execute('select * from mytable where id < %s', [100])

        Query with named parameters

        >>> cur.execute('select * from mytable where id < %(max)s', dict(max=100))

        Returns
        -------
        Number of rows affected

        """
        raise NotImplementedError

    def executemany(
        self, query: str,
        args: Optional[Sequence[Union[Sequence[Any], Dict[str, Any], Any]]] = None,
    ) -> int:
        """
        Execute SQL code against multiple sets of parameters.

        Queries can use the ``format``-style parameters (``%s``) when using
        lists of parameters or ``pyformat``-style parameters (``%(key)s``)
        when using dictionaries of parameters.

        Parameters
        ----------
        query : str
            The SQL statement to execute
        args : iterable of iterables or dicts, optional
            Sets of parameters to substitute into the SQL code

        Examples
        --------
        >>> cur.executemany('select * from mytable where id < %s',
        ...                 [[100], [200], [300]])

        >>> cur.executemany('select * from mytable where id < %(max)s',
        ...                 [dict(max=100), dict(max=100), dict(max=300)])

        Returns
        -------
        Number of rows affected (``rowcount`` after the last execution)

        """
        # NOTE: Just implement using `execute` to cover driver inconsistencies
        if not args:
            self.execute(query)
        else:
            for params in args:
                self.execute(query, params)
        return self.rowcount

    @abc.abstractmethod
    def fetchone(self) -> Optional[Result]:
        """
        Fetch a single row from the result set.

        Examples
        --------
        >>> while True:
        ...    row = cur.fetchone()
        ...    if row is None:
        ...       break
        ...    print(row)

        Returns
        -------
        tuple
            Values of the returned row if there are rows remaining

        """
        raise NotImplementedError

    @abc.abstractmethod
    def fetchmany(self, size: Optional[int] = None) -> Result:
        """
        Fetch `size` rows from the result.

        If `size` is not specified, the `arraysize` attribute is used.

        Examples
        --------
        >>> while True:
        ...    out = cur.fetchmany(100)
        ...    if not len(out):
        ...        break
        ...    for row in out:
        ...        print(row)

        Returns
        -------
        list of tuples
            Values of the returned rows if there are rows remaining

        """
        raise NotImplementedError

    @abc.abstractmethod
    def fetchall(self) -> Result:
        """
        Fetch all rows in the result set.

        Examples
        --------
        >>> for row in cur.fetchall():
        ...    print(row)

        Returns
        -------
        list of tuples
            Values of the returned rows if there are rows remaining
        None
            If there are no rows to return

        """
        raise NotImplementedError

    @abc.abstractmethod
    def nextset(self) -> Optional[bool]:
        """
        Skip to the next available result set.

        This is used when calling a procedure that returns multiple
        results sets.

        Note
        ----
        The ``nextset`` method must be called until it returns an empty
        set (i.e., once more than the number of expected result sets).
        This is to retain compatibility with PyMySQL and MySQLdb.

        Returns
        -------
        ``True``
            If another result set is available
        ``False``
            If no other result set is available

        """
        raise NotImplementedError

    @abc.abstractmethod
    def setinputsizes(self, sizes: Sequence[int]) -> None:
        """Predefine memory areas for parameters."""
        raise NotImplementedError

    @abc.abstractmethod
    def setoutputsize(self, size: int, column: Optional[str] = None) -> None:
        """Set a column buffer size for fetches of large columns."""
        raise NotImplementedError

    @abc.abstractmethod
    def scroll(self, value: int, mode: str = 'relative') -> None:
        """
        Scroll the cursor to the position in the result set.

        Parameters
        ----------
        value : int
            Value of the positional move
        mode : str
            Where to move the cursor from: 'relative' or 'absolute'

        """
        raise NotImplementedError

    def next(self) -> Optional[Result]:
        """
        Return the next row from the result set for use in iterators.

        Raises
        ------
        StopIteration
            If no more results exist

        Returns
        -------
        tuple of values

        """
        if not self.is_connected():
            raise exceptions.InterfaceError(2048, 'Cursor is closed.')
        out = self.fetchone()
        if out is None:
            raise StopIteration
        return out

    __next__ = next

    def __iter__(self) -> Any:
        """Return result iterator."""
        return self

    def __enter__(self) -> 'Cursor':
        """Enter a context."""
        return self

    def __exit__(
        self, exc_type: Optional[object],
        exc_value: Optional[Exception], exc_traceback: Optional[str],
    ) -> None:
        """Exit a context."""
        self.close()


class ShowResult(Sequence[Any]):
    """
    Simple result object.

    This object is primarily used for displaying results to a
    terminal or web browser, but it can also be treated like a
    simple data frame where columns are accessible using either
    dictionary key-like syntax or attribute syntax.

    Examples
    --------
    >>> conn.show.status().Value[10]

    >>> conn.show.status()[10]['Value']

    Parameters
    ----------
    *args : Any
        Parameters to send to underlying list constructor
    **kwargs : Any
        Keyword parameters to send to underlying list constructor

    See Also
    --------
    :attr:`Connection.show`

    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        self._data: List[Dict[str, Any]] = list(*args, **kwargs)

    def __getitem__(self, item: Union[int, slice]) -> Any:
        return self._data[item]

    def __getattr__(self, name: str) -> List[Any]:
        # Only reached when normal lookup fails.  Never treat dunder or
        # IPython probe names as columns, and bail out when ``_data`` is not
        # set yet (copy/pickle create instances without ``__init__``);
        # otherwise ``self._data`` below would recurse forever.
        if name.startswith('_ipython') or \
                (name.startswith('__') and name.endswith('__')) or \
                '_data' not in self.__dict__:
            raise AttributeError(name)
        return [item[name] for item in self._data]

    def __len__(self) -> int:
        return len(self._data)

    def __repr__(self) -> str:
        if not self._data:
            return ''
        return '\n{}\n'.format(self._format_table(self._data))

    @property
    def columns(self) -> List[str]:
        """The columns in the result."""
        if not self._data:
            return []
        return list(self._data[0].keys())

    def _format_table(self, rows: Sequence[Dict[str, Any]]) -> str:
        """Render ``rows`` as a plain-text table using the first row's keys."""
        if not rows:
            return ''

        keys = list(rows[0].keys())
        lens = [len(x) for x in keys]
        align = ['<'] * len(keys)

        for row in rows:
            for i, k in enumerate(keys):
                lens[i] = max(lens[i], len(str(row[k])))
                # Text is left-aligned, anything else right-aligned; the
                # last row decides the alignment of each column.
                align[i] = '<' if isinstance(row[k], (bytes, bytearray, str)) else '>'

        fmt = '| %s |' % '|'.join([' {:%s%d} ' % (x, y) for x, y in zip(align, lens)])

        out = []
        out.append(fmt.format(*keys))
        out.append('-' * len(out[0]))
        for row in rows:
            out.append(fmt.format(*[str(row[k]) for k in keys]))
        return '\n'.join(out)

    def __str__(self) -> str:
        return self.__repr__()

    def _repr_html_(self) -> str:
        """Render the result as an HTML table (used by IPython/Jupyter)."""
        if not self._data:
            return ''
        cell_style = 'style="text-align: left; vertical-align: top"'
        out = []
        out.append('<table border="1" class="dataframe">')
        out.append('<thead>')
        out.append('<tr>')
        for name in self._data[0].keys():
            # Escape so values containing ``<`` or ``&`` display literally.
            out.append(f'<th {cell_style}>{html.escape(str(name))}</th>')
        out.append('</tr>')
        out.append('</thead>')
        out.append('<tbody>')
        for row in self._data:
            out.append('<tr>')
            for item in row.values():
                out.append(f'<td {cell_style}>{html.escape(str(item))}</td>')
            out.append('</tr>')
        out.append('</tbody>')
        out.append('</table>')
        return ''.join(out)


class ShowAccessor(object):
    """
    Accessor for ``SHOW`` commands.

    See Also
    --------
    :attr:`Connection.show`

    """

    def __init__(self, conn: 'Connection'):
        self._conn = conn

    def columns(self, table: str, full: bool = False) -> ShowResult:
        """Show the column information for the given table."""
        table = quote_identifier(table)
        if full:
            return self._iquery(f'full columns in {table}')
        return self._iquery(f'columns in {table}')

    def tables(self, extended: bool = False) -> ShowResult:
        """Show tables in the current database."""
        if extended:
            return self._iquery('tables extended')
        return self._iquery('tables')

    def warnings(self) -> ShowResult:
        """Show warnings."""
        return self._iquery('warnings')

    def errors(self) -> ShowResult:
        """Show errors."""
        return self._iquery('errors')

    def databases(self, extended: bool = False) -> ShowResult:
        """Show all databases in the server."""
        if extended:
            return self._iquery('databases extended')
        return self._iquery('databases')

    def database_status(self) -> ShowResult:
        """Show status of the current database."""
        return self._iquery('database status')

    def global_status(self) -> ShowResult:
        """Show global status of the current server."""
        return self._iquery('global status')

    def indexes(self, table: str) -> ShowResult:
        """Show all indexes in the given table."""
        table = quote_identifier(table)
        return self._iquery(f'indexes in {table}')

    def functions(self) -> ShowResult:
        """Show all functions in the current database."""
        return self._iquery('functions')

    def partitions(self, extended: bool = False) -> ShowResult:
        """Show partitions in the current database."""
        if extended:
            return self._iquery('partitions extended')
        return self._iquery('partitions')

    def pipelines(self) -> ShowResult:
        """Show all pipelines in the current database."""
        return self._iquery('pipelines')

    def plan(self, plan_id: int, json: bool = False) -> ShowResult:
        """Show the plan for the given plan ID."""
        plan_id = int(plan_id)
        if json:
            return self._iquery(f'plan json {plan_id}')
        return self._iquery(f'plan {plan_id}')

    def plancache(self) -> ShowResult:
        """Show all query statements compiled and executed."""
        return self._iquery('plancache')

    def processlist(self) -> ShowResult:
        """Show details about currently running threads."""
        return self._iquery('processlist')

    def reproduction(self, outfile: Optional[str] = None) -> ShowResult:
        """Show troubleshooting data for query optimizer and code generation."""
        if outfile:
            # Escape backslashes first, then double quotes, so the path
            # stays a single double-quoted SQL string literal.
            outfile = outfile.replace('\\', '\\\\').replace('"', '\\"')
            return self._iquery(f'reproduction into outfile "{outfile}"')
        return self._iquery('reproduction')

    def schemas(self) -> ShowResult:
        """Show schemas in the server."""
        return self._iquery('schemas')

    def session_status(self) -> ShowResult:
        """Show server status information for a session."""
        return self._iquery('session status')

    def status(self, extended: bool = False) -> ShowResult:
        """Show server status information."""
        if extended:
            return self._iquery('status extended')
        return self._iquery('status')

    def table_status(self) -> ShowResult:
        """Show table status information for the current database."""
        return self._iquery('table status')

    def procedures(self) -> ShowResult:
        """Show all procedures in the current database."""
        return self._iquery('procedures')

    def aggregates(self) -> ShowResult:
        """Show all aggregate functions in the current database."""
        return self._iquery('aggregates')

    def create_aggregate(self, name: str) -> ShowResult:
        """Show the function creation code for the given aggregate function."""
        name = quote_identifier(name)
        return self._iquery(f'create aggregate {name}')

    def create_function(self, name: str) -> ShowResult:
        """Show the function creation code for the given function."""
        name = quote_identifier(name)
        return self._iquery(f'create function {name}')

    def create_pipeline(self, name: str, extended: bool = False) -> ShowResult:
        """Show the pipeline creation code for the given pipeline."""
        name = quote_identifier(name)
        if extended:
            return self._iquery(f'create pipeline {name} extended')
        return self._iquery(f'create pipeline {name}')

    def create_table(self, name: str) -> ShowResult:
        """Show the table creation code for the given table."""
        name = quote_identifier(name)
        return self._iquery(f'create table {name}')

    def create_view(self, name: str) -> ShowResult:
        """Show the view creation code for the given view."""
        name = quote_identifier(name)
        return self._iquery(f'create view {name}')

    # def grants(
    #     self,
    #     user: Optional[str] = None,
    #     hostname: Optional[str] = None,
    #     role: Optional[str] = None
    # ) -> ShowResult:
    #     """Show the privileges for the given user or role."""
    #     if user:
    #         if not re.match(r'^[\w+-_]+$', user):
    #             raise ValueError(f'User name is not valid: {user}')
    #         if hostname and not re.match(r'^[\w+-_\.]+$', hostname):
    #             raise ValueError(f'Hostname is not valid: {hostname}')
    #         if hostname:
    #             return self._iquery(f"grants for '{user}@{hostname}'")
    #         return self._iquery(f"grants for '{user}'")
    #     if role:
    #         if not re.match(r'^[\w+-_]+$', role):
    #             raise ValueError(f'Role is not valid: {role}')
    #         return self._iquery(f"grants for role '{role}'")
    #     return self._iquery('grants')

    def _iquery(self, qtype: str) -> ShowResult:
        """
        Run ``SHOW <qtype>`` and wrap the rows in a :class:`ShowResult`.

        The first column of every row is renamed to ``Name`` and all
        column names are converted with :func:`under2camel`.

        """
        out = self._conn._iquery(f'show {qtype}')
        for i, row in enumerate(out):
            new_row = {}
            for j, (k, v) in enumerate(row.items()):
                if j == 0:
                    k = 'Name'
                new_row[under2camel(k)] = v
            out[i] = new_row
        return ShowResult(out)


class Connection(metaclass=abc.ABCMeta):
    """
    SingleStoreDB connection.

    Instances of this object are typically created through the
    :func:`singlestoredb.connect` function rather than creating them directly.
    See the :func:`singlestoredb.connect` function for parameter definitions.

    See Also
    --------
    :func:`singlestoredb.connect`

    """

    Warning = exceptions.Warning
    Error = exceptions.Error
    InterfaceError = exceptions.InterfaceError
    DataError = exceptions.DataError
    DatabaseError = exceptions.DatabaseError
    OperationalError = exceptions.OperationalError
    IntegrityError = exceptions.IntegrityError
    InternalError = exceptions.InternalError
    ProgrammingError = exceptions.ProgrammingError
    NotSupportedError = exceptions.NotSupportedError

    #: Read-only DB-API parameter style
    paramstyle = 'pyformat'

    # Must be set by subclass
    driver = ''

    # Populated when first needed
    _map_param_converter: Optional[sqlparams.SQLParams] = None
    _positional_param_converter: Optional[sqlparams.SQLParams] = None

    def __init__(self, **kwargs: Any):
        """Call :func:`singlestoredb.connect` instead."""
        self.connection_params: Dict[str, Any] = kwargs
        self.errorhandler = None
        self._results_type: str = kwargs.get('results_type', None) or 'tuples'

        #: Session encoding
        self.encoding = self.connection_params.get('charset', None) or 'utf-8'
        self.encoding = self.encoding.replace('mb4', '')

        # Handle various authentication types
        credential_type = self.connection_params.get('credential_type', None)
        if credential_type == auth.BROWSER_SSO:
            # TODO: Cache info for token refreshes
            info = auth.get_jwt(self.connection_params['user'])
            self.connection_params['password'] = str(info)
            self.connection_params['credential_type'] = auth.JWT

        #: Attribute-like access to global server variables
        self.globals = VariableAccessor(self, 'global')

        #: Attribute-like access to local / session server variables
        self.locals = VariableAccessor(self, 'local')

        #: Attribute-like access to cluster global server variables
        self.cluster_globals = VariableAccessor(self, 'cluster global')

        #: Attribute-like access to cluster local / session server variables
        self.cluster_locals = VariableAccessor(self, 'cluster local')

        #: Attribute-like access to all server variables
        self.vars = VariableAccessor(self, '')

        #: Attribute-like access to all cluster server variables
        self.cluster_vars = VariableAccessor(self, 'cluster')

        # For backwards compatibility with SQLAlchemy package
        self._driver = Driver(self.driver)

        # Output decoders
        self.decoders: Dict[int, Callable[[Any], Any]] = {}

    @classmethod
    def _convert_params(
        cls, oper: str,
        params: Optional[Union[Sequence[Any], Dict[str, Any], Any]],
    ) -> Tuple[Any, ...]:
        """
        Convert query to correct parameter format.

        Mappings are converted from ``pyformat`` and everything else from
        ``format`` into the class's ``paramstyle``.  A scalar is treated as a
        single positional parameter.  ``None`` or an empty sequence/mapping
        means "no parameters" and returns ``(oper, None)``.

        """
        if params is None:
            return (oper, None)

        is_sequence = isinstance(params, Sequence) \
            and not isinstance(params, str) \
            and not isinstance(params, bytes)
        is_mapping = isinstance(params, Mapping)

        # Empty collections mean "no parameters".  Falsy scalars such as
        # ``0``, ``''`` or ``False`` are real values and must still be bound.
        if (is_sequence or is_mapping) and not params:
            return (oper, None)

        if cls._map_param_converter is None:
            cls._map_param_converter = sqlparams.SQLParams(
                map_paramstyle, cls.paramstyle, escape_char=True,
            )

        if cls._positional_param_converter is None:
            cls._positional_param_converter = sqlparams.SQLParams(
                positional_paramstyle, cls.paramstyle, escape_char=True,
            )

        param_converter = cls._map_param_converter \
            if is_mapping else cls._positional_param_converter

        if not is_sequence and not is_mapping:
            params = [params]

        return param_converter.format(oper, params)

    def autocommit(self, value: bool = True) -> None:
        """Set autocommit mode."""
        self.locals.autocommit = bool(value)

    @abc.abstractmethod
    def connect(self) -> 'Connection':
        """Connect to the server."""
        raise NotImplementedError

    def _iquery(
        self, oper: str,
        params: Optional[Union[Sequence[Any], Dict[str, Any]]] = None,
        fix_names: bool = True,
    ) -> List[Dict[str, Any]]:
        """
        Return the results of a query as a list of dicts (for internal use).

        Only ``SELECT``, ``SHOW``, ``CALL``, and ``ECHO`` statements return
        rows; any other statement is executed and an empty list is returned.

        Parameters
        ----------
        oper : str
            SQL statement to execute
        params : Sequence or dict, optional
            Parameters to substitute into the statement
        fix_names : bool, optional
            Convert column names with :func:`under2camel` when rows are
            returned as tuples / lists

        Returns
        -------
        list of dicts

        """
        with self.cursor() as cur:
            cur.execute(oper, params)
            if not re.match(r'^\s*(select|show|call|echo)\s+', oper, flags=re.I):
                return []
            res = cur.fetchall()
            if res is None:
                return []
            # Check for a DataFrame *before* converting to a list: ``list()``
            # on a DataFrame yields its column labels, not its rows.
            if isinstance(res, DataFrame):
                out = res.to_dict(orient='records')
            else:
                out = list(res)
            if not out:
                return []
            if isinstance(out[0], (tuple, list)) and cur.description:
                names = [x[0] for x in cur.description]
                if fix_names:
                    names = [under2camel(str(x).replace(' ', '')) for x in names]
                out = [{k: v for k, v in zip(names, row)} for row in out]
            return out

    @abc.abstractmethod
    def close(self) -> None:
        """Close the database connection."""
        raise NotImplementedError

    @abc.abstractmethod
    def commit(self) -> None:
        """Commit the pending transaction."""
        raise NotImplementedError

    @abc.abstractmethod
    def rollback(self) -> None:
        """Rollback the pending transaction."""
        raise NotImplementedError

    @abc.abstractmethod
    def cursor(self) -> Cursor:
        """
        Create a new cursor object.

        See Also
        --------
        :class:`Cursor`

        Returns
        -------
        :class:`Cursor`

        """
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def messages(self) -> List[Tuple[int, str]]:
        """Messages generated during the connection."""
        raise NotImplementedError

    def __enter__(self) -> 'Connection':
        """Enter a context."""
        return self

    def __exit__(
        self, exc_type: Optional[object],
        exc_value: Optional[Exception], exc_traceback: Optional[str],
    ) -> None:
        """Exit a context."""
        self.close()

    @abc.abstractmethod
    def is_connected(self) -> bool:
        """
        Determine if the database is still connected.

        Returns
        -------
        bool

        """
        raise NotImplementedError

    def enable_data_api(self, port: Optional[int] = None) -> int:
        """
        Enable the data API in the server.

        Use of this method requires privileges that allow setting global
        variables and starting the HTTP proxy.

        Parameters
        ----------
        port : int, optional
            The port number that the HTTP server should run on. If this
            value is not specified, the current value of the
            ``http_proxy_port`` is used.

        See Also
        --------
        :meth:`disable_data_api`

        Returns
        -------
        int
            port number of the HTTP server

        """
        if port is not None:
            self.globals.http_proxy_port = int(port)
        self.globals.http_api = True
        self._iquery('restart proxy')
        return int(self.globals.http_proxy_port)

    enable_http_api = enable_data_api

    def disable_data_api(self) -> None:
        """
        Disable the data API.

        See Also
        --------
        :meth:`enable_data_api`

        """
        self.globals.http_api = False
        self._iquery('restart proxy')

    disable_http_api = disable_data_api

    @property
    def show(self) -> ShowAccessor:
        """Access server properties managed by the SHOW statement."""
        return ShowAccessor(self)

    @functools.cached_property
    def vector_db(self) -> Any:
        """
        Get vectorstore API accessor.

        The ``vectorstore`` package is imported lazily on first access and
        the accessor is cached on the connection.

        """
        from vectorstore import VectorDB
        return VectorDB(connection=self)


#
# NOTE: When adding parameters to this function, you should always
#       make the value optional with a default of None. The options
# The options processing framework will fill in the default value based
# on environment variables or other configuration sources.
#
def connect(
    host: Optional[str] = None, user: Optional[str] = None,
    password: Optional[str] = None, port: Optional[int] = None,
    database: Optional[str] = None, driver: Optional[str] = None,
    pure_python: Optional[bool] = None, local_infile: Optional[bool] = None,
    charset: Optional[str] = None,
    ssl_key: Optional[str] = None, ssl_cert: Optional[str] = None,
    ssl_ca: Optional[str] = None, ssl_disabled: Optional[bool] = None,
    ssl_cipher: Optional[str] = None, ssl_verify_cert: Optional[bool] = None,
    tls_sni_servername: Optional[str] = None,
    ssl_verify_identity: Optional[bool] = None,
    conv: Optional[Dict[int, Callable[..., Any]]] = None,
    credential_type: Optional[str] = None,
    autocommit: Optional[bool] = None,
    results_type: Optional[str] = None,
    buffered: Optional[bool] = None,
    results_format: Optional[str] = None,
    program_name: Optional[str] = None,
    conn_attrs: Optional[Dict[str, str]] = None,
    multi_statements: Optional[bool] = None,
    client_found_rows: Optional[bool] = None,
    connect_timeout: Optional[int] = None,
    nan_as_null: Optional[bool] = None,
    inf_as_null: Optional[bool] = None,
    encoding_errors: Optional[str] = None,
    track_env: Optional[bool] = None,
    enable_extended_data_types: Optional[bool] = None,
    vector_data_format: Optional[str] = None,
    parse_json: Optional[bool] = None,
) -> Connection:
    """
    Return a SingleStoreDB connection.

    Parameters
    ----------
    host : str, optional
        Hostname, IP address, or URL that describes the connection.
        The scheme or protocol defines which database connector to use.
        By default, the ``mysql`` scheme is used. To connect to the
        HTTP API, the scheme can be set to ``http`` or ``https``. The username,
        password, host, and port are specified as in a standard URL. The path
        indicates the database name. The overall form of the URL is:
        ``scheme://user:password@host:port/db_name``. The scheme can
        typically be left off (unless you are using the HTTP API):
        ``user:password@host:port/db_name``.
    user : str, optional
        Database user name
    password : str, optional
        Database user password
    port : int, optional
        Database port. This defaults to 3306 for non-HTTP connections, 80
        for HTTP connections, and 443 for HTTPS connections.
    database : str, optional
        Database name
    driver : str, optional
        Database connector to use: ``mysql`` (used when no driver is given),
        ``http``, or ``https``. Any other value raises :exc:`ValueError`.
        The connector can also be selected with the scheme of ``host``.
    pure_python : bool, optional
        Use the connector in pure Python mode
    local_infile : bool, optional
        Allow local file uploads
    charset : str, optional
        Character set for string values
    ssl_key : str, optional
        File containing SSL key
    ssl_cert : str, optional
        File containing SSL certificate
    ssl_ca : str, optional
        File containing SSL certificate authority
    ssl_disabled : bool, optional
        Disable SSL usage
    ssl_cipher : str, optional
        Sets the SSL cipher list
    ssl_verify_cert : bool, optional
        Verify the server's certificate. This is automatically enabled if
        ``ssl_ca`` is also specified.
    tls_sni_servername : str, optional
        Server name to send in the TLS Server Name Indication (SNI) extension
    ssl_verify_identity : bool, optional
        Verify the server's identity
    conv : dict[int, Callable], optional
        Dictionary of data conversion functions
    credential_type : str, optional
        Type of authentication to use: auth.PASSWORD, auth.JWT, or auth.BROWSER_SSO
    autocommit : bool, optional
        Enable autocommits
    results_type : str, optional
        The form of the query results: tuples, namedtuples, dicts,
        numpy, polars, pandas, arrow
    buffered : bool, optional
        Should the entire query result be buffered in memory? This is the default
        behavior which allows full cursor control of the result, but does consume
        more memory.
    results_format : str, optional
        Deprecated. This option has been renamed to results_type.
    program_name : str, optional
        Name of the program
    conn_attrs : dict, optional
        Additional connection attributes for telemetry. Example:
        {'program_version': '1.0.2', '_connector_name': 'dbt connector'}
    multi_statements : bool, optional
        Should multiple statements be allowed within a single query?
    client_found_rows : bool, optional
        Should the affected-row count report rows found (matched) rather
        than rows changed?
    connect_timeout : int, optional
        The timeout for connecting to the database in seconds.
        (default: 10, min: 1, max: 31536000)
    nan_as_null : bool, optional
        Should NaN values be treated as NULLs when used in parameter
        substitutions including uploaded data?
    inf_as_null : bool, optional
        Should Inf values be treated as NULLs when used in parameter
        substitutions including uploaded data?
    encoding_errors : str, optional
        The error handler name for value decoding errors
    track_env : bool, optional
        Should the connection track the SINGLESTOREDB_URL environment variable?
    enable_extended_data_types : bool, optional
        Should extended data types (BSON, vector) be enabled?
    vector_data_format : str, optional
        Format for vector types: json or binary
    parse_json : bool, optional
        Should JSON values be parsed into Python objects?

    Examples
    --------
    Standard database connection

    >>> conn = s2.connect('me:p455w0rd@s2-host.com/my_db')

    Connect to HTTP API on port 8080

    >>> conn = s2.connect('http://me:p455w0rd@s2-host.com:8080/my_db')

    Using an environment variable for connection string

    >>> os.environ['SINGLESTOREDB_URL'] = 'me:p455w0rd@s2-host.com/my_db'
    >>> conn = s2.connect()

    Specifying credentials using environment variables

    >>> os.environ['SINGLESTOREDB_USER'] = 'me'
    >>> os.environ['SINGLESTOREDB_PASSWORD'] = 'p455w0rd'
    >>> conn = s2.connect('s2-host.com/my_db')

    Specifying options with keyword parameters

    >>> conn = s2.connect('s2-host.com/my_db', user='me', password='p455w0rd',
                          local_infile=True)

    Specifying options with URL parameters

    >>> conn = s2.connect('s2-host.com/my_db?local_infile=True&charset=utf8')

    Connecting within a context manager

    >>> with s2.connect('...') as conn:
    ...     with conn.cursor() as cur:
    ...         cur.execute('...')

    Setting session variables, the code below sets the ``autocommit`` option

    >>> conn.locals.autocommit = True

    Getting session variables

    >>> conn.locals.autocommit
    True

    See Also
    --------
    :class:`Connection`

    Raises
    ------
    ValueError
        If the resolved driver is not ``mysql``, ``http``, or ``https``.

    Returns
    -------
    :class:`Connection`

    """
    # This must be the first statement. At this point locals() contains
    # exactly the keyword parameters of this function and nothing else.
    params = build_params(**dict(locals()))
    driver = params.get('driver', 'mysql')

    if not driver or driver == 'mysql':
        from .mysql.connection import Connection  # type: ignore
        return Connection(**params)

    if driver in ['http', 'https']:
        from .http.connection import Connection
        return Connection(**params)

    raise ValueError(f'Unrecognized protocol: {driver}')