Path: blob/main/crates/polars-lazy/src/frame/exitable.rs
6939 views
use std::sync::Mutex;1use std::sync::mpsc::{Receiver, channel};23use polars_core::POOL;4use polars_utils::relaxed_cell::RelaxedCell;56use super::*;78impl LazyFrame {9pub fn collect_concurrently(self) -> PolarsResult<InProcessQuery> {10let (mut state, mut physical_plan, _) = self.prepare_collect(false, None)?;1112let (tx, rx) = channel();13let token = state.cancel_token();1415if physical_plan.is_cache_prefiller() {16#[cfg(feature = "async")]17{18polars_io::pl_async::get_runtime().spawn_blocking(move || {19let result = physical_plan.execute(&mut state);20tx.send(result).unwrap();21});22}23#[cfg(not(feature = "async"))]24{25std::thread::spawn(move || {26let result = physical_plan.execute(&mut state);27tx.send(result).unwrap();28});29}30} else {31POOL.spawn_fifo(move || {32let result = physical_plan.execute(&mut state);33tx.send(result).unwrap();34});35}3637Ok(InProcessQuery {38rx: Arc::new(Mutex::new(rx)),39token,40})41}42}4344#[derive(Clone)]45pub struct InProcessQuery {46rx: Arc<Mutex<Receiver<PolarsResult<DataFrame>>>>,47token: Arc<RelaxedCell<bool>>,48}4950impl InProcessQuery {51/// Cancel the query at earliest convenience.52pub fn cancel(&self) {53self.token.store(true)54}5556/// Fetch the result.57///58/// If it is ready, a materialized DataFrame is returned.59/// If it is not ready it will return `None`.60pub fn fetch(&self) -> Option<PolarsResult<DataFrame>> {61let rx = self.rx.lock().unwrap();62rx.try_recv().ok()63}6465/// Await the result synchronously.66pub fn fetch_blocking(&self) -> PolarsResult<DataFrame> {67let rx = self.rx.lock().unwrap();68rx.recv().unwrap()69}70}7172impl Drop for InProcessQuery {73fn drop(&mut self) {74self.token.store(true);75}76}777879