Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/optimizer/count_star.rs
8430 views
1
use std::env;
2
3
use polars_buffer::Buffer;
4
use polars_io::cloud::CloudOptions;
5
use polars_utils::pl_path::PlRefPath;
6
7
use super::*;
8
9
/// Optimizer pass that replaces a plain `COUNT(*)` over a data source with a
/// specialized fast-count map function. The pass itself carries no state.
pub(super) struct CountStar;
10
11
impl CountStar {
12
pub(super) fn new() -> Self {
13
Self
14
}
15
}
16
17
/// Name of the environment variable that toggles the count-star pass:
/// `"1"` disables it, `"0"` or an unset variable leaves it enabled, and any
/// other value makes `optimize_plan` panic.
const ENV_VAR_NAME: &str = "POLARS_NO_FAST_FILE_COUNT";
18
19
impl CountStar {
20
// Replace select count(*) from datasource with specialized map function.
21
pub(super) fn optimize_plan(
22
&mut self,
23
lp_arena: &mut Arena<IR>,
24
expr_arena: &mut Arena<AExpr>,
25
mut node: Node,
26
) -> PolarsResult<Option<IR>> {
27
// New-streaming always puts a sink on top.
28
if let IR::Sink { input, .. } = lp_arena.get(node) {
29
node = *input;
30
}
31
32
match env::var(ENV_VAR_NAME).as_deref() {
33
// Setting the value to 1 disables this optimization pass.
34
Ok("1") => return Ok(None),
35
// If the options is set to 0 or not set we allow the optimization.
36
Ok("0") | Err(_) => (),
37
Ok(v) => panic!("{ENV_VAR_NAME} must be one of ('0', '1'), got: {v}"),
38
}
39
40
Ok(
41
visit_logical_plan_for_scan_paths(node, lp_arena, expr_arena, false).map(
42
|count_star_expr| {
43
// MapFunction needs a leaf node, hence we create a dummy placeholder node
44
let placeholder = IR::DataFrameScan {
45
df: Arc::new(Default::default()),
46
schema: Arc::new(Default::default()),
47
output_schema: None,
48
};
49
let placeholder_node = lp_arena.add(placeholder);
50
51
let alp = IR::MapFunction {
52
input: placeholder_node,
53
function: FunctionIR::FastCount {
54
sources: count_star_expr.sources,
55
scan_type: count_star_expr.scan_type,
56
alias: count_star_expr.alias,
57
},
58
};
59
60
lp_arena.replace(count_star_expr.node, alp.clone());
61
alp
62
},
63
),
64
)
65
}
66
}
67
68
struct CountStarExpr {
69
// Top node of the projection to replace
70
node: Node,
71
// Paths to the input files
72
sources: ScanSources,
73
cloud_options: Option<CloudOptions>,
74
// File Type
75
scan_type: Box<FileScanIR>,
76
// Column Alias
77
alias: Option<PlSmallStr>,
78
}
79
80
// Visit the logical plan and return CountStarExpr with the expr information gathered
81
// Return None if query is not a simple COUNT(*) FROM SOURCE
82
fn visit_logical_plan_for_scan_paths(
83
node: Node,
84
lp_arena: &Arena<IR>,
85
expr_arena: &Arena<AExpr>,
86
inside_union: bool, // Inside union's we do not check for COUNT(*) expression
87
) -> Option<CountStarExpr> {
88
match lp_arena.get(node) {
89
IR::Union { inputs, .. } => {
90
enum MutableSources {
91
Paths(Vec<PlRefPath>),
92
Buffers(Vec<Buffer<u8>>),
93
}
94
95
let mut scan_type: Option<Box<FileScanIR>> = None;
96
let mut cloud_options = None;
97
let mut sources = None;
98
99
for input in inputs {
100
match visit_logical_plan_for_scan_paths(*input, lp_arena, expr_arena, true) {
101
Some(expr) => {
102
match (expr.sources, &mut sources) {
103
(
104
ScanSources::Paths(paths),
105
Some(MutableSources::Paths(mutable_paths)),
106
) => mutable_paths.extend_from_slice(&paths[..]),
107
(ScanSources::Paths(paths), None) => {
108
sources = Some(MutableSources::Paths(paths.to_vec()))
109
},
110
(
111
ScanSources::Buffers(buffers),
112
Some(MutableSources::Buffers(mutable_buffers)),
113
) => mutable_buffers.extend_from_slice(&buffers[..]),
114
(ScanSources::Buffers(buffers), None) => {
115
sources = Some(MutableSources::Buffers(buffers.to_vec()))
116
},
117
_ => return None,
118
}
119
120
// Take the first Some(_) cloud option
121
// TODO: Should check the cloud types are the same.
122
cloud_options = cloud_options.or(expr.cloud_options);
123
124
match &scan_type {
125
None => scan_type = Some(expr.scan_type),
126
Some(scan_type) => {
127
// All scans must be of the same type (e.g. csv / parquet)
128
if std::mem::discriminant(&**scan_type)
129
!= std::mem::discriminant(&*expr.scan_type)
130
{
131
return None;
132
}
133
},
134
};
135
},
136
None => return None,
137
}
138
}
139
Some(CountStarExpr {
140
sources: match sources {
141
Some(MutableSources::Paths(paths)) => ScanSources::Paths(paths.into()),
142
Some(MutableSources::Buffers(buffers)) => ScanSources::Buffers(buffers.into()),
143
None => ScanSources::default(),
144
},
145
scan_type: scan_type.unwrap(),
146
cloud_options,
147
node,
148
alias: None,
149
})
150
},
151
IR::Scan {
152
scan_type,
153
sources,
154
unified_scan_args,
155
..
156
} => {
157
// New-streaming is generally on par for all except CSV (see https://github.com/pola-rs/polars/pull/22363).
158
// In the future we can potentially remove the dedicated count codepaths.
159
160
let use_fast_file_count = match scan_type.as_ref() {
161
#[cfg(feature = "csv")]
162
FileScanIR::Csv { .. } => true,
163
_ => false,
164
};
165
166
use_fast_file_count.then(|| CountStarExpr {
167
sources: sources.clone(),
168
scan_type: scan_type.clone(),
169
cloud_options: unified_scan_args.cloud_options.clone(),
170
node,
171
alias: None,
172
})
173
},
174
// A union can insert a simple projection to ensure all projections align.
175
// We can ignore that if we are inside a count star.
176
IR::SimpleProjection { input, .. } if inside_union => {
177
visit_logical_plan_for_scan_paths(*input, lp_arena, expr_arena, false)
178
},
179
IR::Select { input, expr, .. } => {
180
if expr.len() == 1 {
181
let (valid, alias) = is_valid_count_expr(&expr[0], expr_arena);
182
if valid || inside_union {
183
return visit_logical_plan_for_scan_paths(*input, lp_arena, expr_arena, false)
184
.map(|mut expr| {
185
expr.alias = alias;
186
expr.node = node;
187
expr
188
});
189
}
190
}
191
None
192
},
193
_ => None,
194
}
195
}
196
197
fn is_valid_count_expr(e: &ExprIR, expr_arena: &Arena<AExpr>) -> (bool, Option<PlSmallStr>) {
198
match expr_arena.get(e.node()) {
199
AExpr::Len => (true, e.get_alias().cloned()),
200
_ => (false, None),
201
}
202
}
203
204