Path: blob/main/singlestoredb/magics/run_personal.py
469 views
import os
import tempfile
from pathlib import Path
from typing import Any
from warnings import warn

from IPython.core.interactiveshell import InteractiveShell
from IPython.core.magic import line_magic
from IPython.core.magic import Magics
from IPython.core.magic import magics_class
from IPython.core.magic import needs_local_scope
from IPython.core.magic import no_var_expand
from IPython.utils.contexts import preserve_keys
from IPython.utils.syspathcontext import prepended_to_syspath
from jinja2 import Template


@magics_class
class RunPersonalMagic(Magics):
    """Magics class providing the ``%run_personal`` line magic."""

    def __init__(self, shell: InteractiveShell):
        Magics.__init__(self, shell=shell)

    @no_var_expand
    @needs_local_scope
    @line_magic('run_personal')
    def run_personal(self, line: str, local_ns: Any = None) -> Any:
        """
        Downloads a personal file using the %sql magic and then executes it
        cell by cell in the current shell.

        The line is rendered as a Jinja2 template against ``local_ns`` first,
        and one pair of matching surrounding quotes is stripped from the
        result.

        Examples::

            # Line usage

            %run_personal personal_file.ipynb

            %run_personal {{ sample_notebook_name }}

        Parameters
        ----------
        line : str
            The magic's argument: the personal file name, optionally quoted
            and optionally containing Jinja2 template expressions.
        local_ns : Any (None)
            Namespace used to render the template. ``None`` is treated as an
            empty namespace.

        Raises
        ------
        ValueError
            If the rendered file name is empty, before or after quote
            stripping.

        """
        template = Template(line.strip())
        # Template.render() builds a dict from its argument, so passing None
        # through would raise TypeError; fall back to an empty namespace.
        namespace = {} if local_ns is None else local_ns
        personal_file = template.render(namespace)
        if not personal_file:
            raise ValueError('No personal file specified.')

        for quote in ("'", '"'):
            if personal_file.startswith(quote) and personal_file.endswith(quote):
                personal_file = personal_file[1:-1]
                break
        if not personal_file:
            raise ValueError('No personal file specified.')

        with tempfile.TemporaryDirectory() as temp_dir:
            # Only the final path component names the local copy. Joining the
            # raw name would point into a sub-directory that does not exist
            # for nested names, and would escape temp_dir for absolute names
            # (os.path.join discards earlier segments in that case).
            local_name = Path(personal_file).name or personal_file
            temp_file_path = os.path.join(temp_dir, local_name)
            sql_command = (
                f"DOWNLOAD PERSONAL FILE '{personal_file}' "
                f"TO '{temp_file_path}'"
            )

            # Execute the SQL command
            self.shell.run_line_magic('sql', sql_command)
            # Run the downloaded file; preserve_keys restores the caller's
            # __file__ once the block exits.
            with preserve_keys(self.shell.user_ns, '__file__'):
                self.shell.user_ns['__file__'] = temp_file_path
                self.safe_execfile_ipy(temp_file_path, raise_exceptions=True)

    def safe_execfile_ipy(
        self,
        fname: str,
        shell_futures: bool = False,
        raise_exceptions: bool = False,
    ) -> None:
        """Like safe_execfile, but for .ipy or .ipynb files with IPython syntax.

        For ``.ipynb`` files every non-empty code cell is run in order; cells
        whose metadata has ``language == 'sql'`` are wrapped in a ``%%sql``
        cell magic (with an output redirect when ``output_variable`` is set).
        Any other file is run as a single cell.

        Parameters
        ----------
        fname : str
            The name of the file to execute. The filename must have a
            .ipy or .ipynb extension.
        shell_futures : bool (False)
            If True, the code will share future statements with the interactive
            shell. It will both be affected by previous __future__ imports, and
            any __future__ imports in the code will affect the shell. If False,
            __future__ imports are not shared in either direction.
        raise_exceptions : bool (False)
            If True raise exceptions everywhere. Meant for testing.
        """
        fpath = Path(fname).expanduser().resolve()

        # Make sure we can open the file
        try:
            with fpath.open('rb'):
                pass
        except Exception:
            warn(f'Could not open file <{fpath}> for safe execution.')
            return

        # Find things also in current directory. This is needed to mimic the
        # behavior of running a script from the system command line, where
        # Python inserts the script's directory into sys.path
        dname = str(fpath.parent)

        def get_cells() -> Any:
            """generator for sequence of code blocks to run"""
            if fpath.suffix != '.ipynb':
                yield fpath.read_text(encoding='utf-8')
                return

            from nbformat import read
            nb = read(fpath, as_version=4)
            for cell in nb.cells or ():
                if cell.cell_type != 'code' or not cell.source.strip():
                    continue
                metadata = getattr(cell, 'metadata', {})
                if metadata.get('language', '') != 'sql':
                    yield cell.source
                    continue
                output_redirect = metadata.get('output_variable', '') or ''
                if output_redirect:
                    output_redirect = f' {output_redirect} <<'
                yield f'%%sql{output_redirect}\n{cell.source}'

        with prepended_to_syspath(dname):
            try:
                for cell in get_cells():
                    result = self.shell.run_cell(
                        cell, silent=True, shell_futures=shell_futures,
                    )
                    if raise_exceptions:
                        result.raise_error()
                    elif not result.success:
                        # Stop at the first failing cell.
                        break
            except Exception:
                if raise_exceptions:
                    raise
                self.shell.showtraceback()
                warn(f'Unknown failure executing file: <{fpath}>')