Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/sqlx/magic.py
469 views
1
import os
2
from typing import Any
3
from typing import Optional
4
5
from IPython.core.interactiveshell import InteractiveShell
6
from IPython.core.magic import cell_magic
7
from IPython.core.magic import line_magic
8
from IPython.core.magic import Magics
9
from IPython.core.magic import magics_class
10
from IPython.core.magic import needs_local_scope
11
from IPython.core.magic import no_var_expand
12
from sql.magic import SqlMagic
13
from sqlalchemy import create_engine
14
from sqlalchemy import Engine
15
from sqlalchemy import PoolProxiedConnection
16
17
# Connection-pool settings that SqlxMagic.get_connection() passes to
# SQLAlchemy's create_engine().
DEFAULT_POOL_SIZE = 10  # Maximum number of connections in the pool
DEFAULT_MAX_OVERFLOW = 5  # Additional connections (temporary overflow)
DEFAULT_POOL_TIMEOUT = 30  # Wait time for a connection from the pool
20
21
22
@magics_class
class SqlxMagic(Magics):
    """
    IPython magics class providing ``%sqlx`` / ``%%sqlx``.

    Holds a jupysql ``SqlMagic`` instance that executes the statements and
    a SQLAlchemy engine that is created on first use from the
    ``DATABASE_URL`` environment variable.
    """

    def __init__(self, shell: InteractiveShell):
        Magics.__init__(self, shell=shell)
        # jupysql magic that performs the actual statement execution.
        self.magic = SqlMagic(shell)
        # Stays None until get_connection() builds the engine.
        self.engine: Optional['Engine'] = None

    @no_var_expand
    @needs_local_scope
    @line_magic('sqlx')
    @cell_magic('sqlx')
    def sqlx(self, line: str, cell: str = '', local_ns: Any = None) -> Any:
        """
        Run a SQL statement against the database identified by the
        SQLAlchemy connect string in the DATABASE_URL environment variable.

        Works both as a cell magic `%%sqlx` and as a line magic `%sqlx`
        (see examples below).

        This is a thin wrapper around the [jupysql](https://jupysql.ploomber.io/)
        magic, allowing multi-threaded execution.
        A connection pool is maintained internally.

        Examples::

            # Line usage

            %sqlx SELECT * FROM mytable

            result = %sqlx SELECT 1


            # Cell usage

            %%sqlx
            DELETE FROM mytable

            %%sqlx
            DROP TABLE mytable

        """
        conn = self.get_connection()
        try:
            # The finally clause still runs before the value is handed back,
            # so the connection is closed on success and on error alike.
            return self.magic.execute(line, cell, local_ns, conn)
        finally:
            conn.close()

    def get_connection(self) -> PoolProxiedConnection:
        """
        Return a raw connection obtained from the engine.

        The engine is built on the first call from the DATABASE_URL
        environment variable and the module-level pool defaults, then
        reused by later calls.

        Raises RuntimeError if no engine exists yet and DATABASE_URL is
        not set.
        """
        engine = self.engine
        if engine is None:
            database_url = os.environ.get('DATABASE_URL')
            if database_url is None:
                raise RuntimeError(
                    'Cannot create connection pool, environment variable'
                    " 'DATABASE_URL' is missing.",
                )

            # TODO: allow configuring engine
            # idea: %sqlx engine
            # idea: %%sqlx engine
            engine = create_engine(
                database_url,
                pool_size=DEFAULT_POOL_SIZE,
                max_overflow=DEFAULT_MAX_OVERFLOW,
                pool_timeout=DEFAULT_POOL_TIMEOUT,
            )
            self.engine = engine

        return engine.raw_connection()
91
92
93
# In order to actually use these magics, you must register them with a
94
# running IPython.
95
96
97
def load_ipython_extension(ip: InteractiveShell) -> None:
    """
    Load the jupysql extension and register the `sqlx` magics on `ip`.

    Any module file that defines a function named `load_ipython_extension`
    can be loaded via `%load_ext module.path` or be configured to be
    autoloaded by IPython at startup time.

    Raises RuntimeError if loading the `sql` extension reports
    'no load function'.
    """
    manager = ip.extension_manager
    assert manager is not None

    # Load the jupysql extension first: jupysql needs this to initialize
    # the internal state it uses to render messages.
    if manager.load_extension('sql') == 'no load function':
        raise RuntimeError('Could not load sql extension. Is jupysql installed?')

    # Register sqlx
    ip.register_magics(SqlxMagic(ip))
114
115