Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/optimizer/projection_pushdown/projection.rs
7889 views
1
use super::*;
2
3
#[inline]
4
pub(super) fn is_count(node: Node, expr_arena: &Arena<AExpr>) -> bool {
5
matches!(expr_arena.get(node), AExpr::Len)
6
}
7
8
#[allow(clippy::too_many_arguments)]
9
pub(super) fn process_projection(
10
proj_pd: &mut ProjectionPushDown,
11
input: Node,
12
mut exprs: Vec<ExprIR>,
13
mut ctx: ProjectionContext,
14
lp_arena: &mut Arena<IR>,
15
expr_arena: &mut Arena<AExpr>,
16
// Whether is SimpleProjection.
17
simple: bool,
18
) -> PolarsResult<IR> {
19
let mut local_projection = Vec::with_capacity(exprs.len());
20
21
// Special path for `SELECT count(*) FROM`
22
// as there would be no projections and we would read
23
// the whole file while we only want the count
24
if exprs.len() == 1 && is_count(exprs[0].node(), expr_arena) {
25
// Clear all accumulated projections since we only project a single column from this level.
26
ctx.acc_projections.clear();
27
ctx.projected_names.clear();
28
29
let input_lp = lp_arena.get(input);
30
31
// If the input node is not aware of `is_count_star` we must project a single column from
32
// this level, otherwise the upstream nodes may end up projecting everything.
33
let input_is_count_star_aware = match input_lp {
34
IR::DataFrameScan { .. } | IR::Scan { .. } => true,
35
#[cfg(feature = "python")]
36
IR::PythonScan { .. } => true,
37
_ => false,
38
};
39
40
if !input_is_count_star_aware {
41
if let Some(name) = input_lp
42
.schema(lp_arena)
43
.get_at_index(0)
44
.map(|(name, _)| name)
45
{
46
ctx.acc_projections
47
.push(ColumnNode(expr_arena.add(AExpr::Column(name.clone()))));
48
ctx.projected_names.insert(name.clone());
49
}
50
}
51
52
local_projection.push(exprs.pop().unwrap());
53
54
if input_is_count_star_aware {
55
ctx.inner.is_count_star = true;
56
proj_pd.is_count_star = true;
57
}
58
} else {
59
// `remove_names` tracks projected names that need to be removed as they may be aliased
60
// names that are created on this level.
61
let mut remove_names = PlHashSet::new();
62
63
// If there are non-scalar projections we must project at least one of them to maintain the
64
// output height.
65
let mut opt_non_scalar = None;
66
let mut projection_has_non_scalar = false;
67
68
let projected_exprs: Vec<ExprIR> = exprs
69
.into_iter()
70
.filter(|e| {
71
let is_non_scalar = !e.is_scalar(expr_arena);
72
73
if opt_non_scalar.is_none() && is_non_scalar {
74
opt_non_scalar = Some(e.clone())
75
}
76
77
let name = match e.output_name_inner() {
78
OutputName::LiteralLhs(name) | OutputName::Alias(name) => {
79
remove_names.insert(name.clone());
80
name
81
},
82
#[cfg(feature = "dtype-struct")]
83
OutputName::Field(name) => {
84
remove_names.insert(name.clone());
85
name
86
},
87
OutputName::ColumnLhs(name) => name,
88
OutputName::None => {
89
if cfg!(debug_assertions) {
90
panic!()
91
} else {
92
return false;
93
}
94
},
95
};
96
97
let project = ctx.acc_projections.is_empty() || ctx.projected_names.contains(name);
98
projection_has_non_scalar |= project & is_non_scalar;
99
project
100
})
101
.collect();
102
103
// Remove aliased before adding new ones.
104
if !remove_names.is_empty() {
105
if !ctx.projected_names.is_empty() {
106
for name in remove_names.iter() {
107
ctx.projected_names.remove(name);
108
}
109
}
110
111
ctx.acc_projections
112
.retain(|c| !remove_names.contains(column_node_to_name(*c, expr_arena)));
113
}
114
115
for e in projected_exprs {
116
add_expr_to_accumulated(
117
e.node(),
118
&mut ctx.acc_projections,
119
&mut ctx.projected_names,
120
expr_arena,
121
);
122
123
// do local as we still need the effect of the projection
124
// e.g. a projection is more than selecting a column, it can
125
// also be a function/ complicated expression
126
local_projection.push(e);
127
}
128
129
if !projection_has_non_scalar {
130
if let Some(non_scalar) = opt_non_scalar {
131
add_expr_to_accumulated(
132
non_scalar.node(),
133
&mut ctx.acc_projections,
134
&mut ctx.projected_names,
135
expr_arena,
136
);
137
138
local_projection.push(non_scalar);
139
}
140
}
141
}
142
143
ctx.inner.projections_seen += 1;
144
proj_pd.pushdown_and_assign(input, ctx, lp_arena, expr_arena)?;
145
146
let builder = IRBuilder::new(input, expr_arena, lp_arena);
147
148
let lp = if !local_projection.is_empty() && simple {
149
builder
150
.project_simple_nodes(local_projection.into_iter().map(|e| e.node()))?
151
.build()
152
} else {
153
proj_pd.finish_node(local_projection, builder)
154
};
155
156
Ok(lp)
157
}
158
159