Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/utils/events.py
469 views
1
#!/usr/bin/env python
2
from typing import Any
3
from typing import Callable
4
from typing import Dict
5
from typing import Set
6
7
try:
8
from IPython import get_ipython
9
has_ipython = True
10
except ImportError:
11
has_ipython = False
12
13
14
_subscribers: Set[Callable[[Dict[str, Any]], None]] = set()
15
16
17
def subscribe(func: Callable[[Dict[str, Any]], None]) -> None:
18
"""
19
Subscribe to SingleStore portal events.
20
21
Parameters
22
----------
23
func : Callable
24
The function to call when an event is received
25
26
"""
27
_subscribers.add(func)
28
29
30
def unsubscribe(func: Callable[[Dict[str, Any]], None]) -> None:
31
"""
32
Unsubscribe from SingleStore portal events.
33
34
Parameters
35
----------
36
func : Callable
37
The function to call when an event is received
38
39
"""
40
try:
41
_subscribers.remove(func)
42
except KeyError:
43
pass
44
45
46
def _event_handler(stream: Any, ident: Any, msg: Dict[str, Any]) -> None:
47
"""Handle request on the control stream."""
48
if not _subscribers or not isinstance(msg, dict):
49
return
50
51
content = msg.get('content', {})
52
if content.get('type', '') != 'event':
53
return
54
55
for func in _subscribers:
56
func(content)
57
58
59
# Inject a control handler to receive SingleStore events
60
if has_ipython:
61
try:
62
_handlers = get_ipython().kernel.control_handlers
63
_handlers['singlestore_portal_request'] = _event_handler
64
except AttributeError:
65
pass
66
67