Path: blob/main/singlestoredb/utils/events.py
469 views
#!/usr/bin/env python1from typing import Any2from typing import Callable3from typing import Dict4from typing import Set56try:7from IPython import get_ipython8has_ipython = True9except ImportError:10has_ipython = False111213_subscribers: Set[Callable[[Dict[str, Any]], None]] = set()141516def subscribe(func: Callable[[Dict[str, Any]], None]) -> None:17"""18Subscribe to SingleStore portal events.1920Parameters21----------22func : Callable23The function to call when an event is received2425"""26_subscribers.add(func)272829def unsubscribe(func: Callable[[Dict[str, Any]], None]) -> None:30"""31Unsubscribe from SingleStore portal events.3233Parameters34----------35func : Callable36The function to call when an event is received3738"""39try:40_subscribers.remove(func)41except KeyError:42pass434445def _event_handler(stream: Any, ident: Any, msg: Dict[str, Any]) -> None:46"""Handle request on the control stream."""47if not _subscribers or not isinstance(msg, dict):48return4950content = msg.get('content', {})51if content.get('type', '') != 'event':52return5354for func in _subscribers:55func(content)565758# Inject a control handler to receive SingleStore events59if has_ipython:60try:61_handlers = get_ipython().kernel.control_handlers62_handlers['singlestore_portal_request'] = _event_handler63except AttributeError:64pass656667