Path: blob/main/singlestoredb/mysql/converters.py
469 views
"""Encoders that turn Python values into SQL literal text."""
import datetime
import time
from decimal import Decimal
from typing import Any
from typing import Callable
from typing import Dict
from typing import Optional
from typing import Tuple
from typing import Union

from ..converters import converters as decoders
from .err import ProgrammingError

try:
    import numpy as np
    has_numpy = True
except ImportError:
    has_numpy = False

try:
    import shapely.geometry
    import shapely.wkt
    has_shapely = True
except ImportError:
    has_shapely = False

try:
    import pygeos
    has_pygeos = True
except ImportError:
    has_pygeos = False


# Maps a Python type to the function that renders values of that type.
Encoders = Dict[type, Callable[..., Union[str, Dict[str, str]]]]


def escape_item(val: Any, charset: str, mapping: Optional[Encoders] = None) -> str:
    """
    Convert a single Python value to its SQL literal text.

    The encoder is looked up by the exact type of *val*. When that type has
    no entry, the encoder registered for ``str`` is used instead.

    Parameters
    ----------
    val : Any
        Value to convert
    charset : str
        Character set name; it is only forwarded to the container encoders
    mapping : Dict[type, Callable], optional
        Type-to-encoder table. Defaults to the module-level ``encoders``.

    Returns
    -------
    str
        The escaped value (a dict of escaped values when *val* is handled
        by ``escape_dict``)

    Raises
    ------
    TypeError
        If the type of *val* is not in *mapping* and *mapping* has no
        ``str`` entry to fall back on

    """
    if mapping is None:
        mapping = encoders
    encoder = mapping.get(type(val))

    # Fallback to default when no encoder found
    if encoder is None:
        try:
            encoder = mapping[str]
        except KeyError:
            raise TypeError('no default type converter defined') from None

    # The container encoders take (val, charset, mapping) because they
    # recurse through escape_item; every other encoder takes (val, mapping).
    if encoder in (escape_dict, escape_sequence, escape_set):
        val = encoder(val, charset, mapping)
    else:
        val = encoder(val, mapping)
    return val


def escape_dict(
    val: Dict[str, Any],
    charset: str,
    mapping: Optional[Encoders] = None,
) -> Dict[str, str]:
    """
    Escape every value of a dictionary.

    Parameters
    ----------
    val : Dict[str, Any]
        Dictionary whose values are escaped; keys are kept as they are
    charset : str
        Character set name forwarded to ``escape_item``
    mapping : Dict[type, Callable], optional
        Type-to-encoder table forwarded to ``escape_item``

    Returns
    -------
    Dict[str, str]
        New dictionary with the same keys and escaped values

    """
    return {k: escape_item(v, charset, mapping) for k, v in val.items()}


def escape_sequence(
    val: Any,
    charset: str,
    mapping: Optional[Encoders] = None,
) -> str:
    """
    Escape each item of an iterable and render them as ``(a,b,c)``.

    Parameters
    ----------
    val : Any
        Iterable of values
    charset : str
        Character set name forwarded to ``escape_item``
    mapping : Dict[type, Callable], optional
        Type-to-encoder table forwarded to ``escape_item``

    Returns
    -------
    str
        Comma-separated escaped items wrapped in parentheses

    """
    return '(' + ','.join(escape_item(item, charset, mapping) for item in val) + ')'


def escape_set(val: Any, charset: str, mapping: Optional[Encoders] = None) -> str:
    """Escape each item of an iterable and join them with commas (no parentheses)."""
    return ','.join([escape_item(x, charset, mapping) for x in val])


def escape_bool(value: Any, mapping: Optional[Encoders] = None) -> str:
    """Render a boolean as ``'1'`` or ``'0'``."""
    return str(int(value))


def escape_int(value: Any, mapping: Optional[Encoders] = None) -> str:
    """Render an integer using its ``str()`` form."""
    return str(value)


def escape_float(
    value: Any,
    mapping: Optional[Encoders] = None,
    nan_as_null: bool = False,
    inf_as_null: bool = False,
) -> str:
    """
    Render a float as a literal that always carries an exponent.

    Parameters
    ----------
    value : Any
        Float value (built-in ``float`` or a numpy floating scalar)
    mapping : Dict[type, Callable], optional
        Unused; accepted so all encoders share one call signature
    nan_as_null : bool, optional
        Return ``'NULL'`` for NaN instead of raising
    inf_as_null : bool, optional
        Return ``'NULL'`` for positive or negative infinity instead of raising

    Returns
    -------
    str

    Raises
    ------
    ProgrammingError
        If *value* is NaN or infinite and the matching ``*_as_null`` flag
        is not set

    """
    # str() is identical to repr() for built-in floats, but numpy >= 2
    # scalars repr as e.g. ``np.float64(1.5)``, which is not a SQL literal.
    s = str(value)
    if s == 'nan':
        if nan_as_null:
            return 'NULL'
        raise ProgrammingError(0, '%s can not be used with SingleStoreDB' % s)
    # Negative infinity must be caught too; otherwise it would fall through
    # and be emitted as the invalid literal ``-infe0``.
    if s in ('inf', '-inf'):
        if inf_as_null:
            return 'NULL'
        raise ProgrammingError(0, '%s can not be used with SingleStoreDB' % s)
    if 'e' not in s:
        s += 'e0'
    return s


# Translation table indexed by code point for the ASCII range. Characters
# with a code point >= 128 fall outside the table; str.translate leaves a
# character unchanged when the table lookup fails.
_escape_table = [chr(x) for x in range(128)]
_escape_table[0] = '\\0'
_escape_table[ord('\\')] = '\\\\'
_escape_table[ord('\n')] = '\\n'
_escape_table[ord('\r')] = '\\r'
_escape_table[ord('\032')] = '\\Z'
_escape_table[ord('"')] = '\\"'
_escape_table[ord("'")] = "\\'"


def escape_string(value: str, mapping: Optional[Encoders] = None) -> str:
    """
    Escape *value* without adding surrounding quotes.

    NUL, backslash, newline, carriage return, Ctrl-Z and both quote
    characters are replaced by their backslash escapes.

    Value should be unicode (``str``).

    """
    return value.translate(_escape_table)


def escape_bytes_prefixed(value: bytes, mapping: Optional[Encoders] = None) -> str:
    """Render bytes as a hex literal with the ``_binary`` introducer."""
    return "_binary X'{}'".format(value.hex())


def escape_bytes(value: bytes, mapping: Optional[Encoders] = None) -> str:
    """Render bytes as a hex literal (``X'...'``)."""
    return "X'{}'".format(value.hex())


def escape_str(value: str, mapping: Optional[Encoders] = None) -> str:
    """Escape ``str(value)`` and wrap it in single quotes."""
    return "'{}'".format(escape_string(str(value), mapping))


def escape_None(value: str, mapping: Optional[Encoders] = None) -> str:
    """Render ``None`` as ``NULL``."""
    return 'NULL'


def escape_timedelta(obj: datetime.timedelta, mapping: Optional[Encoders] = None) -> str:
    """
    Render a timedelta as a quoted ``[-]HH:MM:SS[.ffffff]`` literal.

    Days are folded into the hour field, so hours may exceed 23. The
    fractional part is emitted only when the microsecond component of
    *obj* is non-zero.

    Parameters
    ----------
    obj : datetime.timedelta
        Duration to render
    mapping : Dict[type, Callable], optional
        Unused; accepted so all encoders share one call signature

    Returns
    -------
    str

    """
    # timedelta normalizes negative durations to negative ``days`` with
    # non-negative ``seconds`` / ``microseconds``, so formatting the fields
    # directly gives wrong text (e.g. -30 minutes -> '-1:30:00'). Work from
    # the signed total and format its magnitude instead.
    total_us = (
        (int(obj.days) * 86400 + int(obj.seconds)) * 1000000
        + int(obj.microseconds)
    )
    sign = '-' if total_us < 0 else ''
    total_seconds, micros = divmod(abs(total_us), 1000000)
    total_minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    if obj.microseconds:
        fmt = "'{0}{1:02d}:{2:02d}:{3:02d}.{4:06d}'"
    else:
        fmt = "'{0}{1:02d}:{2:02d}:{3:02d}'"
    return fmt.format(sign, hours, minutes, seconds, micros)


def escape_time(obj: datetime.time, mapping: Optional[Encoders] = None) -> str:
    """Render a time as quoted ``HH:MM:SS``, with ``.ffffff`` if it has microseconds."""
    if obj.microsecond:
        fmt = "'{0.hour:02}:{0.minute:02}:{0.second:02}.{0.microsecond:06}'"
    else:
        fmt = "'{0.hour:02}:{0.minute:02}:{0.second:02}'"
    return fmt.format(obj)


def escape_datetime(obj: datetime.datetime, mapping: Optional[Encoders] = None) -> str:
    """
    Render a datetime as quoted ``YYYY-MM-DD HH:MM:SS[.ffffff]``.

    Only the date and time fields are formatted; ``tzinfo`` is not consulted.

    """
    if obj.microsecond:
        fmt = "'{0.year:04}-{0.month:02}-{0.day:02} " \
            "{0.hour:02}:{0.minute:02}:{0.second:02}.{0.microsecond:06}'"
    else:
        fmt = "'{0.year:04}-{0.month:02}-{0.day:02} " \
            "{0.hour:02}:{0.minute:02}:{0.second:02}'"
    return fmt.format(obj)


def escape_date(obj: datetime.date, mapping: Optional[Encoders] = None) -> str:
    """Render a date as quoted ``YYYY-MM-DD``."""
    fmt = "'{0.year:04}-{0.month:02}-{0.day:02}'"
    return fmt.format(obj)


def escape_struct_time(obj: Tuple[Any, ...], mapping: Optional[Encoders] = None) -> str:
    """Render a ``time.struct_time`` as a datetime built from its first six fields."""
    return escape_datetime(datetime.datetime(*obj[:6]))


def Decimal2Literal(o: Any, d: Any) -> str:
    """Render a Decimal in fixed-point notation; *d* (the mapping) is unused."""
    return format(o, 'f')


def through(x: Any) -> Any:
    """Return *x* unchanged."""
    return x


# def convert_bit(b):
#     b = "\x00" * (8 - len(b)) + b # pad w/ zeroes
#     return struct.unpack(">Q", b)[0]
#
#     the snippet above is right, but MySQLdb doesn't process bits,
#     so we shouldn't either
convert_bit = through


encoders: Encoders = {
    bool: escape_bool,
    int: escape_int,
    float: escape_float,
    str: escape_str,
    bytes: escape_bytes,
    tuple: escape_sequence,
    list: escape_sequence,
    set: escape_sequence,
    frozenset: escape_sequence,
    dict: escape_dict,
    type(None): escape_None,
    datetime.date: escape_date,
    datetime.datetime: escape_datetime,
    datetime.timedelta: escape_timedelta,
    datetime.time: escape_time,
    time.struct_time: escape_struct_time,
    Decimal: Decimal2Literal,
}

if has_numpy:

    def escape_numpy(value: Any, mapping: Optional[Encoders] = None) -> str:
        """Convert numpy arrays to vectors of bytes."""
        return escape_bytes(value.tobytes(), mapping=mapping)

    encoders[np.ndarray] = escape_numpy
    encoders[np.float16] = escape_float
    encoders[np.float32] = escape_float
    encoders[np.float64] = escape_float
    # float128 is not available on every platform
    if hasattr(np, 'float128'):
        encoders[np.float128] = escape_float
    encoders[np.uint] = escape_int
    encoders[np.uint8] = escape_int
    encoders[np.uint16] = escape_int
    encoders[np.uint32] = escape_int
    encoders[np.uint64] = escape_int
    encoders[np.integer] = escape_int
    encoders[np.int_] = escape_int
    encoders[np.int8] = escape_int
    encoders[np.int16] = escape_int
    encoders[np.int32] = escape_int
    encoders[np.int64] = escape_int

if has_shapely:

    def escape_shapely(value: Any, mapping: Optional[Encoders] = None) -> str:
        """Convert shapely geo objects."""
        return escape_str(shapely.wkt.dumps(value), mapping=mapping)

    encoders[shapely.geometry.Polygon] = escape_shapely
    encoders[shapely.geometry.Point] = escape_shapely
    encoders[shapely.geometry.LineString] = escape_shapely

if has_pygeos:

    def escape_pygeos(value: Any, mapping: Optional[Encoders] = None) -> str:
        """Convert pygeos objects."""
        return escape_str(pygeos.io.to_wkt(value), mapping=mapping)

    encoders[pygeos.Geometry] = escape_pygeos


# for MySQLdb compatibility
conversions = encoders.copy()  # type: ignore
conversions.update(decoders)  # type: ignore
Thing2Literal = escape_str

# Run doctests with `pytest --doctest-modules singlestoredb/mysql/converters.py`