Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/magics/run_personal.py
469 views
1
import os
2
import tempfile
3
from pathlib import Path
4
from typing import Any
5
from warnings import warn
6
7
from IPython.core.interactiveshell import InteractiveShell
8
from IPython.core.magic import line_magic
9
from IPython.core.magic import Magics
10
from IPython.core.magic import magics_class
11
from IPython.core.magic import needs_local_scope
12
from IPython.core.magic import no_var_expand
13
from IPython.utils.contexts import preserve_keys
14
from IPython.utils.syspathcontext import prepended_to_syspath
15
from jinja2 import Template
16
17
18
@magics_class
class RunPersonalMagic(Magics):
    """IPython magics that download a personal file and execute it in the shell."""

    def __init__(self, shell: InteractiveShell):
        Magics.__init__(self, shell=shell)

    @no_var_expand
    @needs_local_scope
    @line_magic('run_personal')
    def run_personal(self, line: str, local_ns: Any = None) -> Any:
        """
        Download a personal file using the %sql magic and then execute it.

        The magic line is rendered as a Jinja2 template against ``local_ns``,
        one pair of matching surrounding quotes is stripped, and the resulting
        name is downloaded into a temporary directory with a
        ``DOWNLOAD PERSONAL FILE`` statement. The downloaded copy is then
        executed cell by cell through :meth:`safe_execfile_ipy` with
        ``raise_exceptions=True``.

        Parameters
        ----------
        line : str
            The text following ``%run_personal``: a file name, optionally
            quoted, optionally containing Jinja2 expressions.
        local_ns : dict, optional
            Namespace used to render the template. ``None`` is treated as an
            empty namespace.

        Raises
        ------
        ValueError
            If no file name remains after rendering and unquoting, or the
            name has no usable final path component.

        Examples::

          # Line usage

          %run_personal personal_file.ipynb

          %run_personal {{ sample_notebook_name }}

        """
        template = Template(line.strip())
        # ``local_ns`` defaults to None, which Template.render cannot turn
        # into a context mapping; fall back to an empty namespace.
        personal_file = template.render(local_ns or {})
        if not personal_file:
            raise ValueError('No personal file specified.')
        if (personal_file.startswith("'") and personal_file.endswith("'")) or \
                (personal_file.startswith('"') and personal_file.endswith('"')):
            personal_file = personal_file[1:-1]
            if not personal_file:
                raise ValueError('No personal file specified.')

        # Only the final path component is used for the local copy. Joining
        # the raw name would point into a sub-directory that does not exist
        # for nested names, and would leave the temporary directory entirely
        # for absolute or parent-relative names.
        local_name = Path(personal_file).name
        if local_name in ('', '..'):
            raise ValueError(
                f'Invalid personal file name: {personal_file!r}',
            )

        with tempfile.TemporaryDirectory() as temp_dir:
            temp_file_path = os.path.join(temp_dir, local_name)
            # NOTE(review): the file name is interpolated into the statement
            # without escaping, so a name containing a single quote will
            # produce a malformed command.
            sql_command = (
                f"DOWNLOAD PERSONAL FILE '{personal_file}' "
                f"TO '{temp_file_path}'"
            )

            # Execute the SQL command
            self.shell.run_line_magic('sql', sql_command)
            # Run the downloaded file with ``__file__`` pointing at the local
            # copy for the duration of the run.
            with preserve_keys(self.shell.user_ns, '__file__'):
                self.shell.user_ns['__file__'] = temp_file_path
                self.safe_execfile_ipy(temp_file_path, raise_exceptions=True)

    def safe_execfile_ipy(
        self,
        fname: str,
        shell_futures: bool = False,
        raise_exceptions: bool = False,
    ) -> None:
        """Like safe_execfile, but for .ipy or .ipynb files with IPython syntax.

        For ``.ipynb`` files every non-empty code cell is run in order; cells
        whose metadata ``language`` is ``sql`` are wrapped in a ``%%sql`` cell
        magic (redirected into the cell's ``output_variable`` when one is
        set). Any other file is read as UTF-8 text and run as a single cell.

        Parameters
        ----------
        fname : str
            The name of the file to execute.  The filename must have a
            .ipy or .ipynb extension.
        shell_futures : bool (False)
            If True, the code will share future statements with the interactive
            shell. It will both be affected by previous __future__ imports, and
            any __future__ imports in the code will affect the shell. If False,
            __future__ imports are not shared in either direction.
        raise_exceptions : bool (False)
            If True raise exceptions from executed cells. If False, execution
            stops at the first unsuccessful cell. A file that cannot be opened
            only produces a warning in either mode.
        """
        fpath = Path(fname).expanduser().resolve()

        # Make sure we can open the file
        try:
            with fpath.open('rb'):
                pass
        except Exception:
            warn(f'Could not open file <{fpath}> for safe execution.')
            return

        # Find things also in current directory.  This is needed to mimic the
        # behavior of running a script from the system command line, where
        # Python inserts the script's directory into sys.path
        dname = str(fpath.parent)

        def get_cells() -> Any:
            """generator for sequence of code blocks to run"""
            if fpath.suffix == '.ipynb':
                from nbformat import read
                nb = read(fpath, as_version=4)
                if not nb.cells:
                    return
                for cell in nb.cells:
                    if cell.cell_type != 'code':
                        continue
                    if not cell.source.strip():
                        continue
                    metadata = getattr(cell, 'metadata', {})
                    if metadata.get('language', '') == 'sql':
                        output_redirect = metadata.get('output_variable', '') or ''
                        if output_redirect:
                            output_redirect = f' {output_redirect} <<'
                        yield f'%%sql{output_redirect}\n{cell.source}'
                    else:
                        yield cell.source
            else:
                yield fpath.read_text(encoding='utf-8')

        with prepended_to_syspath(dname):
            try:
                for cell in get_cells():
                    result = self.shell.run_cell(
                        cell, silent=True, shell_futures=shell_futures,
                    )
                    if raise_exceptions:
                        result.raise_error()
                    elif not result.success:
                        break
            except Exception:
                if raise_exceptions:
                    raise
                self.shell.showtraceback()
                warn(f'Unknown failure executing file: <{fpath}>')
138
139