Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-mem-engine/src/executors/scan/mod.rs
6940 views
1
#[cfg(feature = "python")]
2
mod python_scan;
3
4
use std::mem;
5
6
use polars_utils::slice_enum::Slice;
7
8
#[cfg(feature = "python")]
9
pub(crate) use self::python_scan::*;
10
use super::*;
11
use crate::ScanPredicate;
12
use crate::prelude::*;
13
14
/// Producer of an in memory DataFrame
15
pub struct DataFrameExec {
16
pub(crate) df: Arc<DataFrame>,
17
pub(crate) projection: Option<Vec<PlSmallStr>>,
18
}
19
20
impl Executor for DataFrameExec {
21
fn execute(&mut self, _state: &mut ExecutionState) -> PolarsResult<DataFrame> {
22
let df = mem::take(&mut self.df);
23
let mut df = Arc::try_unwrap(df).unwrap_or_else(|df| (*df).clone());
24
25
// projection should be before selection as those are free
26
// TODO: this is only the case if we don't create new columns
27
if let Some(projection) = &self.projection {
28
df = df.select(projection.iter().cloned())?;
29
}
30
31
Ok(df)
32
}
33
}
34
35
pub(crate) struct AnonymousScanExec {
36
pub(crate) function: Arc<dyn AnonymousScan>,
37
pub(crate) unified_scan_args: Box<UnifiedScanArgs>,
38
pub(crate) file_info: FileInfo,
39
pub(crate) predicate: Option<ScanPredicate>,
40
pub(crate) output_schema: Option<SchemaRef>,
41
pub(crate) predicate_has_windows: bool,
42
}
43
44
impl Executor for AnonymousScanExec {
45
fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
46
let mut args = AnonymousScanArgs {
47
n_rows: self.unified_scan_args.pre_slice.clone().map(|x| {
48
assert!(matches!(x, Slice::Positive { offset: 0, .. }));
49
50
x.len()
51
}),
52
with_columns: self.unified_scan_args.projection.clone(),
53
schema: self.file_info.schema.clone(),
54
output_schema: self.output_schema.clone(),
55
predicate: None,
56
};
57
if self.predicate.is_some() {
58
state.insert_has_window_function_flag()
59
}
60
61
match (self.function.allows_predicate_pushdown(), &self.predicate) {
62
(true, Some(predicate)) => state.record(
63
|| {
64
args.predicate = predicate.predicate.as_expression().cloned();
65
self.function.scan(args)
66
},
67
"anonymous_scan".into(),
68
),
69
(false, Some(predicate)) => state.record(
70
|| {
71
let mut df = self.function.scan(args)?;
72
let s = predicate.predicate.evaluate(&df, state)?;
73
if self.predicate_has_windows {
74
state.clear_window_expr_cache()
75
}
76
let mask = s.bool().map_err(
77
|_| polars_err!(ComputeError: "filter predicate was not of type boolean"),
78
)?;
79
df = df.filter(mask)?;
80
81
Ok(df)
82
},
83
"anonymous_scan".into(),
84
),
85
_ => state.record(|| self.function.scan(args), "anonymous_scan".into()),
86
}
87
}
88
}
89
90