Path: blob/main/singlestoredb/fusion/graphql.py
469 views
import itertools
import os
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union

import requests

from . import result
from .result import FusionSQLResult


API_URL = 'https://backend.singlestore.com/public'


def pass_through(x: Any) -> Any:
    """Pass a value through."""
    return x


def find_path(d: Dict[str, Any], path: str) -> Tuple[bool, Any]:
    """
    Find key path in a dictionary.

    Parameters
    ----------
    d : Dict[str, Any]
        Dictionary to search
    path : str
        Period-delimited string indicating nested keys

    Returns
    -------
    (bool, Any) - bool indicating whether or not the path was found
        and the result itself. When the path is not found, the result
        is ``None``.

    """
    curr: Any = d
    for key in path.split('.'):
        # A non-dict value (including a null intermediate object) cannot be
        # descended into, so the remaining keys cannot be resolved.
        if not isinstance(curr, dict) or key not in curr:
            return False, None
        curr = curr[key]
    return True, curr


class GraphQueryField(object):
    """
    Field in a GraphQuery result.

    Parameters
    ----------
    path : str
        Period-delimited path to the result
    dtype : int, optional
        MySQL data type of the result, defaults to string
    include : str or List[str], optional
        Stored on the instance as a list with empty strings removed.
        It is not consulted by the code in this module.
    converter : function, optional
        Converter for the data value; ``None`` disables conversion

    """

    # Shared counter used to remember the order in which fields were declared
    _sort_index_count = itertools.count()

    def __init__(
        self,
        path: str,
        dtype: int = result.STRING,
        include: Union[str, List[str]] = '',
        converter: Optional[Callable[[Any], Any]] = pass_through,
    ) -> None:
        self.path = path
        self.dtype = dtype
        self.include = [include] if isinstance(include, str) else include
        self.include = [x for x in self.include if x]
        self.converter = converter
        self._sort_index = next(type(self)._sort_index_count)

    def get_path(self, value: Any) -> Tuple[bool, Any]:
        """
        Retrieve the field path in the given object.

        Parameters
        ----------
        value : Any
            Object parsed from nested dictionary object

        Returns
        -------
        (bool, Any) - bool indicating whether the path was found and
            the result itself. The converter is only applied to values
            that were found; a missing path yields ``(False, None)``.

        """
        found, out = find_path(value, self.path)
        # Do not feed the placeholder None of a missing path to the converter;
        # converters are written for real values and may fail on it.
        if found and self.converter is not None:
            return True, self.converter(out)
        return found, out


class GraphQuery(object):
    """
    Base class for all GraphQL classes.

    Parameters
    ----------
    api_token : str, optional
        API token to access the GraphQL endpoint
    api_url : str, optional
        GraphQL endpoint

    """

    def __init__(
        self,
        api_token: str = '',
        api_url: str = API_URL,
    ) -> None:
        self.api_token = api_token
        self.api_url = api_url

    @classmethod
    def get_query(cls) -> str:
        """Return the GraphQL for the class (the class docstring)."""
        return cls.__doc__ or ''

    @classmethod
    def get_fields(cls) -> List[Tuple[str, GraphQueryField]]:
        """
        Return fields for the query.

        Only ``GraphQueryField`` attributes defined directly on the class
        are returned, in the order in which they were created.

        Returns
        -------
        List[Tuple[str, GraphQueryField]] - tuple pairs of field name
            and definition

        """
        attrs = [
            (k, v) for k, v in vars(cls).items()
            if isinstance(v, GraphQueryField)
        ]
        return sorted(attrs, key=lambda x: x[1]._sort_index)

    def run(
        self,
        variables: Optional[Dict[str, Any]] = None,
        *,
        filter_expr: str = '',
    ) -> FusionSQLResult:
        """
        Run the query.

        Parameters
        ----------
        variables : Dict[str, Any], optional
            Dictionary of substitution parameters
        filter_expr : str, optional
            Accepted for interface compatibility; not used by this
            implementation

        Returns
        -------
        FusionSQLResult

        Raises
        ------
        ValueError
            If the endpoint does not answer with HTTP 200, or if the
            response carries GraphQL errors and no data

        """
        api_token = self.api_token or os.environ.get('SINGLESTOREDB_BACKEND_TOKEN')
        res = requests.post(
            self.api_url,
            headers={
                'Content-Type': 'application/json',
                'Authorization': f'Bearer {api_token}',
            },
            json={
                'query': type(self).get_query(),
                'variables': variables or {},
            },
        )

        if res.status_code != 200:
            raise ValueError(f'an error occurred: {res.text}')

        payload = res.json()
        is_mapping = isinstance(payload, dict)
        body = payload.get('data') if is_mapping else None

        # GraphQL reports failures in an 'errors' entry, possibly with HTTP 200
        # and with 'data' absent or null; surface them instead of returning an
        # empty result (or failing with a KeyError on the missing key).
        if not body and is_mapping and payload.get('errors'):
            raise ValueError(f"an error occurred: {payload['errors']}")

        if body:
            # The result set is the value of the last top-level entry
            data = body.popitem()[-1]
            if data is None:
                # A null top-level field means there is nothing to return
                data = []
            elif isinstance(data, dict):
                data = [data]
        else:
            data = []

        fres = FusionSQLResult()

        rows = []
        fields = type(self).get_fields()
        # Fields that resolved in the first object; they define the columns
        columns: List[GraphQueryField] = []
        for i, obj in enumerate(data):
            row = []
            if i == 0:
                for name, field in fields:
                    found, value = field.get_path(obj)
                    if found:
                        fres.add_field(name, field.dtype)
                        columns.append(field)
                        row.append(value)
            else:
                # Keep every row aligned with the declared columns: a path
                # missing in this object becomes None, and fields that were
                # not declared from the first object are not emitted.
                for field in columns:
                    found, value = field.get_path(obj)
                    row.append(value if found else None)
            rows.append(tuple(row))

        fres.set_rows(rows)

        return fres