Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-lazy/src/frame/exitable.rs
6939 views
1
use std::sync::Mutex;
2
use std::sync::mpsc::{Receiver, channel};
3
4
use polars_core::POOL;
5
use polars_utils::relaxed_cell::RelaxedCell;
6
7
use super::*;
8
9
impl LazyFrame {
10
pub fn collect_concurrently(self) -> PolarsResult<InProcessQuery> {
11
let (mut state, mut physical_plan, _) = self.prepare_collect(false, None)?;
12
13
let (tx, rx) = channel();
14
let token = state.cancel_token();
15
16
if physical_plan.is_cache_prefiller() {
17
#[cfg(feature = "async")]
18
{
19
polars_io::pl_async::get_runtime().spawn_blocking(move || {
20
let result = physical_plan.execute(&mut state);
21
tx.send(result).unwrap();
22
});
23
}
24
#[cfg(not(feature = "async"))]
25
{
26
std::thread::spawn(move || {
27
let result = physical_plan.execute(&mut state);
28
tx.send(result).unwrap();
29
});
30
}
31
} else {
32
POOL.spawn_fifo(move || {
33
let result = physical_plan.execute(&mut state);
34
tx.send(result).unwrap();
35
});
36
}
37
38
Ok(InProcessQuery {
39
rx: Arc::new(Mutex::new(rx)),
40
token,
41
})
42
}
43
}
44
45
#[derive(Clone)]
46
pub struct InProcessQuery {
47
rx: Arc<Mutex<Receiver<PolarsResult<DataFrame>>>>,
48
token: Arc<RelaxedCell<bool>>,
49
}
50
51
impl InProcessQuery {
52
/// Cancel the query at earliest convenience.
53
pub fn cancel(&self) {
54
self.token.store(true)
55
}
56
57
/// Fetch the result.
58
///
59
/// If it is ready, a materialized DataFrame is returned.
60
/// If it is not ready it will return `None`.
61
pub fn fetch(&self) -> Option<PolarsResult<DataFrame>> {
62
let rx = self.rx.lock().unwrap();
63
rx.try_recv().ok()
64
}
65
66
/// Await the result synchronously.
67
pub fn fetch_blocking(&self) -> PolarsResult<DataFrame> {
68
let rx = self.rx.lock().unwrap();
69
rx.recv().unwrap()
70
}
71
}
72
73
impl Drop for InProcessQuery {
74
fn drop(&mut self) {
75
self.token.store(true);
76
}
77
}
78
79